(原) 小智MCP:利用bigmodel对图片进行分析

原创文章,请后转载,并注明出处。

接上文“本地Ollama对图片分析”,这里是使用免费的bigmodel,对本地监控视频流进行图片截取,然后分析图片。

因为是使用bigmodel免费提供的模型,据说只能分析网址提供的图片,而不是通常使用的本地文件base64的方式。

这里使用https://gost.sian.one免费提供的映射功能,即将本地的服务映射为公网地址,使我们的图片有一个固定的公网地址,以提供bigmodel的模型进行分析。

  1. 在https://gost.sian.one上注册,新增客户端并获取key
  2. 在此网申请一个域名解析,与新增的客户端绑定,做好端口设置
  3. 本机运行客户端,完成于服务端通信:gostc.exe -addr gost.sian.one -key 你的key
  4. 启动本地服务,例如 algernon.exe -a -n -q –nocache –nodb –addr=0.0.0.0:80
    此时,即可以通过申请的域名访问到本机的服务内容了。即以下代码中的serverURL
package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
)

const (
	apiURL    = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
	apiKey    = "8927212a901cb72b58HC2Ow6"   // bigmodel的key
	modelName = "glm-4v-flash"                  // 免费模型
	serverURL = "https://aaaaa.gost.nyc.mn/" // 图片服务器地址
)

// 修正后的请求结构体
type RequestPayload struct {
	Model    string     `json:"model"`
	Messages []OMessage `json:"messages"`
}

type OMessage struct {
	Role    string         `json:"role"`
	Content []ContentUnion `json:"content"`
}

// 使用联合类型处理不同类型的content
type ContentUnion struct {
	Type     string     `json:"type"`
	Text     string     `json:"text,omitempty"`
	ImageURL *ImageURL  `json:"image_url,omitempty"`
	Image    *ImageData `json:"image,omitempty"`
}

type ImageURL struct {
	URL string `json:"url"`
}

type ImageData struct {
	Data string `json:"data"` // Base64编码的图片数据
}

// 响应结构体
type APIResponse struct {
	Choices []struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	} `json:"choices"`
	Error struct {
		Message string `json:"message"`
	} `json:"error"`
}

// 使用在线图片URL识别 (推荐方式)
func recognizeFromURL(imageURL, prompt string) string {
	// 修正后的content结构
	content := []ContentUnion{
		{
			Type: "image_url",
			ImageURL: &ImageURL{
				URL: imageURL,
			},
		},
		{
			Type: "text",
			Text: prompt,
		},
	}

	return sendRequest(content)
}

// 使用本地图片文件(Base64)识别 (仅支持付费模型)
func recognizeFromFile(filePath, prompt string) string {
	// 读取图片文件
	imageData, err := os.ReadFile(filePath)
	if err != nil {
		fmt.Printf("读取文件失败: %v\n", err)
		return ""
	}

	// 转换为Base64
	base64Data := base64.StdEncoding.EncodeToString(imageData)

	content := []ContentUnion{
		{
			Type: "image",
			Image: &ImageData{
				Data: base64Data,
			},
		},
		{
			Type: "text",
			Text: prompt,
		},
	}

	return sendRequest(content)
}

// 发送API请求
func sendRequest(content []ContentUnion) string {
	payload := RequestPayload{
		Model: modelName,
		Messages: []OMessage{
			{
				Role:    "user",
				Content: content,
			},
		},
	}

	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		fmt.Printf("JSON编码失败: %v\n", err)
		return ""
	}

	// 打印请求体用于调试
	// fmt.Printf("请求体: %s\n", string(payloadBytes))

	req, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(payloadBytes))
	if err != nil {
		fmt.Printf("创建请求失败: %v\n", err)
		return ""
	}

	req.Header.Set("Authorization", "Bearer "+apiKey)
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Printf("请求失败: %v\n", err)
		return ""
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Printf("读取响应失败: %v\n", err)
		return ""
	}

	// fmt.Printf("响应状态: %d\n", resp.StatusCode)
	// fmt.Printf("响应体: %s\n", string(body))

	var result APIResponse
	if err := json.Unmarshal(body, &result); err != nil {
		fmt.Printf("JSON解析失败: %v\n", err)
		return ""
	}

	// 处理响应
	if len(result.Choices) > 0 {
		// fmt.Println("\n识别结果:", result.Choices[0].Message.Content)
		return result.Choices[0].Message.Content
	} else if result.Error.Message != "" {
		fmt.Println("API错误:", result.Error.Message)
	} else {
		fmt.Println("未知响应:", string(body))
	}

	return ""
}

func analyzeImage_bigmodel(imagePath string) (string, error) {
	fileName := filepath.Base(imagePath)
	ret := recognizeFromURL(serverURL+fileName, "描述图片中的场景和物体")
	if ret != "" {
		return ret, nil
	} else {
		return ret, nil
	}
}

为了简单,使用ffmpeg对视频流进行截图
例如我使用萤石云开通的视频流: rtspURL := “rtsp://admin:WBTMHL@192.168.1.210:554/h264/ch1/main/av_stream”

// ffmpeg截取视频流中的图像
func getPicture(rtspURL string) string {
	tempDir := os.TempDir()

	outputPath, _ := filepath.Abs(filepath.Join(tempDir, fmt.Sprintf("%d.jpg", time.Now().UnixNano())))
	timeout := 10 * time.Second
	// 构建FFmpeg命令
	cmd := exec.Command(
		"ffmpeg.exe",
		"-rtsp_transport", "tcp", // 使用TCP传输,更稳定
		"-i", rtspURL, // 输入源
		// "-ss", "00:00:01", // 从2秒处开始(跳过缓冲)
		"-vframes", "1", // 只获取1帧
		"-y",                          // 覆盖已存在的文件
		"-q:v", fmt.Sprintf("%d", 20), // JPEG质量参数 (1-31, 1为最高)
		"-s", fmt.Sprintf("%dx%d", 1024, 768),
		// "-vf", fmt.Sprintf("crop=%d:%d:%d:%d", 1024, 768, 100, 200), // 裁剪滤镜
		outputPath, // 输出路径
	)

	// 捕获命令输出,用于调试
	var stderr bytes.Buffer
	cmd.Stderr = &stderr

	// 设置超时
	done := make(chan error, 1)
	go func() {
		done <- cmd.Run()
	}()

	select {
	case err := <-done:
		if err != nil {
			log.Fatalf("FFmpeg执行失败: %v\n%s", err, stderr.String())
			return ""
		}
		fmt.Printf("成功保存帧到: %s\n", outputPath)
		return outputPath
	case <-time.After(timeout):
		if cmd.Process != nil {
			cmd.Process.Kill()
		}
		log.Fatalf("获取帧超时(%v)", timeout)
		return ""
	}
}

相关文章