(码) 小智AI加入MCP功能

请微信扫码打赏,留言中输入文章编号. 附费后将自动刷新出完整内容.

之前购了一个小智AI,最近比较火的一个项目,它就是你的随身AI助手。

当前1.6.2版本,经网友介绍,发现它已经支持MCP功能。这样就不用再在本地部署小智服务端,运行一个MCP服务就,通过官方(https://xiaozhi.me/)就可以支持本地丰富的功能了。

给的官方示例很简单,也不算复杂。计划将它Golang化。

添加了一些自己的功能

2025.6.4

新增更多的功能:

  1. 打开本地工具
  2. 发送(企业)微信信息
  3. 锁屏
  4. 静音/最大声
  5. 最大化/关闭窗口
  6. 打开网站
  7. 关机
  8. 发送ntfy信息
  9. 按键
  10. 输入文本
  11. WOL(未测试,需要与另一台主机合作)

服务端

"""
This script is used to connect to the MCP server and pipe the input and output to the websocket endpoint.
Version: 0.1.0

Usage:

export MCP_ENDPOINT=<mcp_endpoint>
python mcp_pipe.py <mcp_script>

"""

import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# Reconnection settings
INITIAL_BACKOFF = 1  # 初始等待时间,单位为秒
MAX_BACKOFF = 600    # 最大等待时间,以秒为单位
reconnect_attempt = 0
backoff = INITIAL_BACKOFF

# 无限重试连接
async def connect_with_retry(uri):
    """使用重试机制连接到WebSocket服务器"""
    global reconnect_attempt, backoff
    while True:  # 无限重连
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)  # 添加一些随机抖动
                logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
                await asyncio.sleep(wait_time)
                
            # 尝试连接
            await connect_to_server(uri)
        
        except Exception as e:
            reconnect_attempt += 1
            logger.warning(f"Connection closed (attempt: {reconnect_attempt}): …
请微信扫码打赏,留言中输入文章编号. 附费后将自动刷新出完整内容.

相关文章