Nats-io 云原生高性能消息系统,专注轻量级、低延迟的分布式通信。核心服务器用 Go 语言开发,客户端覆盖 40+ 语言(Go、Java、Python、JS 等)。
nats-server 单进程架构,无外部依赖,内存占用极低(~10MB)。支持 Pub/Sub、Request-Reply、队列等模式。
nats.go 官方 Go 语言客户端
应用场景:NATS 是一个高性能、轻量级的消息中间件,支持微服务之间的高效通信;IoT 设备间的数据收集与实时控制;可用于构建事件驱动型应用,如实时通知、用户行为跟踪等;可作为 Kafka、RabbitMQ 的轻量级替代方案,适用于低延迟、高吞吐场景;通过 NATS WebSocket 客户端(如 nats.ws),可在浏览器中实现高效消息传递,适用于聊天应用、在线协作工具等;适用于跨系统数据同步,如分布式数据库日志复制
示例:
服务器直接下载nats-server运行即可,无需配置
接收方:
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal("连接失败:", err)
}
defer nc.Close()
subject := "demo.query"
// 订阅请求并回复
_, err = nc.Subscribe(subject, func(msg *nats.Msg) {
log.Printf("收到请求: %s\n", string(msg.Data))
if msg.Reply != "" {
// 带回复的请求
response := "响应时间: " + time.Now().Format("15:04:05.000")
if err := nc.Publish(msg.Reply, []byte(response)); err != nil {
log.Println("回复失败:", err)
}
} else {
// 简单消息
log.Println("这是一个无回复的简单消息")
}
})
if err != nil {
log.Fatal("订阅失败:", err)
}
log.Println("服务端已启动,等待请求...")
select {} // 永久阻塞
}
发布方:
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// 连接到NATS服务器
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal("连接失败:", err)
}
defer nc.Close()
subject := "demo.query"
replySubject := "demo.reply"
// 1. 简单消息(无回复)
log.Println("=== 测试简单消息 ===")
if err := nc.Publish(subject, []byte("这是一个简单消息")); err != nil {
log.Fatal("发布失败:", err)
}
log.Println("已发布简单消息(无回复等待)")
// 2. 带回复的消息(同步等待响应)
log.Println("\n=== 测试带回复的消息 ===")
startTime := time.Now()
// 发布带回复的请求(同步方式)
reply, err := nc.Request(subject, []byte("请求当前时间"), 2*time.Second)
if err != nil {
log.Fatal("请求失败:", err)
}
log.Printf("收到回复: %s (耗时: %v)\n",
string(reply.Data),
time.Since(startTime),
)
// 3. 带回复的消息(异步方式)
log.Println("\n=== 测试异步回复处理 ===")
// 先订阅回复主题
startTime = time.Now()
sub, err := nc.Subscribe(replySubject, func(msg *nats.Msg) {
log.Printf("异步收到回复: %s (耗时: %v)\n", string(msg.Data), time.Since(startTime))
})
if err != nil {
log.Fatal("订阅回复失败:", err)
}
defer sub.Unsubscribe()
// 发布带回复地址的请求
if err := nc.PublishRequest(subject, replySubject, []byte("异步请求时间")); err != nil {
log.Fatal("发布请求失败:", err)
}
log.Println("已发送异步请求(不阻塞等待)")
// 等待异步回复
time.Sleep(1 * time.Second)
}