(摘) Nats 轻量级、低延迟的分布式通信

声明:内容源自网络,版权归原作者所有。若有侵权请在网页聊天中联系我

Nats官网

Nats-io 云原生高性能消息系统,专注轻量级、低延迟的分布式通信。核心服务器用 Go 语言开发,客户端覆盖 40+ 语言(Go、Java、Python、JS 等)。

nats-server 单进程架构,无外部依赖,内存占用极低(~10MB)。支持 Pub/Sub、Request-Reply、队列等模式。

nats.go 官方 Go 语言客户端

应用场景:NATS 是一个高性能、轻量级的消息中间件,支持微服务之间的高效通信;IoT 设备间的数据收集与实时控制;可用于构建事件驱动型应用,如实时通知、用户行为跟踪等;可作为 Kafka、RabbitMQ 的轻量级替代方案,适用于低延迟、高吞吐场景;通过 NATS WebSocket 客户端(如 nats.ws),可在浏览器中实现高效消息传递,适用于聊天应用、在线协作工具等;适用于跨系统数据同步,如分布式数据库日志复制

示例:

服务器直接下载nats-server运行即可,无需配置

接收方:

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal("连接失败:", err)
	}
	defer nc.Close()

	subject := "demo.query"

	// 订阅请求并回复
	_, err = nc.Subscribe(subject, func(msg *nats.Msg) {
		log.Printf("收到请求: %s\n", string(msg.Data))

		if msg.Reply != "" {
			// 带回复的请求
			response := "响应时间: " + time.Now().Format("15:04:05.000")
			if err := nc.Publish(msg.Reply, []byte(response)); err != nil {
				log.Println("回复失败:", err)
			}
		} else {
			// 简单消息
			log.Println("这是一个无回复的简单消息")
		}
	})
	if err != nil {
		log.Fatal("订阅失败:", err)
	}

	log.Println("服务端已启动,等待请求...")
	select {} // 永久阻塞
}

发布方:

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// 连接到NATS服务器
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal("连接失败:", err)
	}
	defer nc.Close()

	subject := "demo.query"
	replySubject := "demo.reply"

	// 1. 简单消息(无回复)
	log.Println("=== 测试简单消息 ===")
	if err := nc.Publish(subject, []byte("这是一个简单消息")); err != nil {
		log.Fatal("发布失败:", err)
	}
	log.Println("已发布简单消息(无回复等待)")

	// 2. 带回复的消息(同步等待响应)
	log.Println("\n=== 测试带回复的消息 ===")
	startTime := time.Now()

	// 发布带回复的请求(同步方式)
	reply, err := nc.Request(subject, []byte("请求当前时间"), 2*time.Second)
	if err != nil {
		log.Fatal("请求失败:", err)
	}
	log.Printf("收到回复: %s (耗时: %v)\n",
		string(reply.Data),
		time.Since(startTime),
	)

	// 3. 带回复的消息(异步方式)
	log.Println("\n=== 测试异步回复处理 ===")

	// 先订阅回复主题
	startTime = time.Now()
	sub, err := nc.Subscribe(replySubject, func(msg *nats.Msg) {
		log.Printf("异步收到回复: %s (耗时: %v)\n", string(msg.Data), time.Since(startTime))
	})
	if err != nil {
		log.Fatal("订阅回复失败:", err)
	}
	defer sub.Unsubscribe()

	// 发布带回复地址的请求
	if err := nc.PublishRequest(subject, replySubject, []byte("异步请求时间")); err != nil {
		log.Fatal("发布请求失败:", err)
	}
	log.Println("已发送异步请求(不阻塞等待)")

	// 等待异步回复
	time.Sleep(1 * time.Second)
}