有段时间没有写东西了,完成了一些似有似无的东西。今天学习Go-Netty
go-netty 是一款受netty启发的Go语言可扩展的高性能网络库
AddLast(frame.LengthFieldCodec(binary.LittleEndian, 1024, 0, 2, 0, 2)). // 按照自定义协议解码帧(2字节的长度字段)
AddLast(format.TextCodec()). // 消息内容为文本格式(可自定义为 json,protobuf 等编解码器)
bootstrap.Transport(tcp.New()) 配置服务器(客户端)所使用的传输协议
bootstrap.Listen(“tcp://0.0.0.0:6565”).Action(netty.WaitSignal(os.Kill, os.Interrupt)) 开始监听端口并开始提供服务,直到收到指定信号后退出
func LengthFieldCodec(
byteOrder binary.ByteOrder, // 字节序,大端 & 小端
maxFrameLength int, // 最大允许数据包长度
lengthFieldOffset int, // 长度域的偏移量,表示跳过指定长度个字节之后的才是长度域
lengthFieldLength int, // 记录该帧数据长度的字段本身的长度 1, 2, 4, 8
lengthAdjustment int, // 包体长度调整的大小,长度域的数值表示的长度加上这个修正值表示的就是带header的包长度
initialBytesToStrip int, // 拿到一个完整的数据包之后向业务解码器传递之前,应该跳过多少字节
)
给对端发送一条消息,将进入如下流程(视编解码配置): Text -> TextCodec -> LengthFieldCodec -> Channel.Write (文本 文本编码 组装协议格式(长度字段) 网络发送)
向后续的handler传递控制权 ctx.HandleActive()
基于此库,还有一个websocket的库go-netty-ws,实现websocket通信更简单
官方示例稍修改
package main
import (
"fmt"
"strings"
"github.com/go-netty/go-netty"
"github.com/go-netty/go-netty/codec/format"
"github.com/go-netty/go-netty/codec/frame"
)
func main() {
// 子连接的流水线配置
var childInitializer = func(channel netty.Channel) {
channel.Pipeline().
AddLast(frame.DelimiterCodec(128, "\n", true)). // 最大允许包长128字节,使用\n分割包, 丢弃分隔符
AddLast(format.TextCodec()). // 解包出来的bytes转换为字符串
AddLast(LoggerHandler{}). // 日志处理器, 打印连接建立断开消息,收到的消息
AddLast(UpperHandler{}) // 业务处理器 (将字符串全部大写)
}
// 创建Bootstrap & 监听端口 & 接受连接
netty.NewBootstrap(netty.WithChildInitializer(childInitializer)).
Listen(":9527").Sync()
}
type LoggerHandler struct{}
func (LoggerHandler) HandleActive(ctx netty.ActiveContext) {
fmt.Println("go-netty:", "->", "连接:", ctx.Channel().RemoteAddr())
ctx.Write("你好,我是" + "go-netty") // 写入欢迎信息
}
func (LoggerHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
fmt.Println("go-netty:", "->", "读取:", message)
ctx.HandleRead(message) // 交给下一个处理器处理(按照处理器的注册顺序, 此例下一个处理器应该是UpperHandler)
}
func (LoggerHandler) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
fmt.Println("go-netty:", "->", "断开:", ctx.Channel().RemoteAddr(), ex)
ctx.HandleInactive(ex) // 连接断开了,默认处理是关闭连接
}
type UpperHandler struct{}
func (UpperHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
text := message.(string)
upText := strings.ToUpper(text) // 业务逻辑,将字符串大写化
ctx.Write(text + " -> " + upText) // 写入返回结果给客户端
}
然后在PowerShell中向服务器发送消息:echo -n “Hello 你” | nc 127.0.0.1 9527
不出意外,双方都出现的是乱码
还得再看看官方示例才明白一点。
ChatServer示例,中文不是问题
package main
import (
"fmt"
"net/http"
"github.com/go-netty/go-netty"
"github.com/go-netty/go-netty-transport/websocket"
"github.com/go-netty/go-netty/codec/format"
"github.com/go-netty/go-netty/codec/frame"
)
// ManagerInst 是一个全局的 Manager 实例
var ManagerInst = NewManager()
func main() {
// 设置 index 页面的处理函数
websocket.DefaultOptions.ServeMux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
writer.Write(indexHtml)
})
// 子管道初始化函数
setupCodec := func(channel netty.Channel) {
channel.Pipeline().
AddLast(frame.PacketCodec(128)). // 读取 websocket 消息
AddLast(format.JSONCodec(true, false)). // 将字节解码为 map[string]interface{}
AddLast(ManagerInst). // 会话记录器
AddLast(chatHandler{}) // 聊天处理器
}
// 设置引导程序并启动服务器
netty.NewBootstrap(netty.WithChildInitializer(setupCodec), netty.WithTransport(websocket.New())).
Listen("0.0.0.0:8080/chat").Sync()
}
// chatHandler 结构体实现了 netty 的处理器接口
type chatHandler struct{}
// HandleActive 处理连接激活事件
func (chatHandler) HandleActive(ctx netty.ActiveContext) {
type wsTransport interface {
Route() string
Header() http.Header
}
// 打印连接信息
// 将当前通道的传输层转换为 wsTransport 接口,以便访问 wsTransport 接口中定义的 Route() 和 Header() 方法
// wsTransport 接口定义了两个方法:
// Route() string:返回 WebSocket 路由。
// Header() http.Header:返回 HTTP 头信息。
if wst, ok := ctx.Channel().Transport().(wsTransport); ok {
fmt.Printf("child connection from: %s, route: %s, Websocket-Key: %s, User-Agent: %s\n",
ctx.Channel().RemoteAddr(), wst.Route(), wst.Header().Get("Sec-Websocket-Key"), wst.Header().Get("User-Agent"))
}
ctx.HandleActive()
}
// HandleRead 处理读取消息事件
func (chatHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
// 打印接收到的消息
fmt.Printf("received child message from: %s, %v\n", ctx.Channel().RemoteAddr(), message)
// 如果消息是命令,则添加 ID
if cmd, ok := message.(map[string]interface{}); ok {
cmd["id"] = ctx.Channel().ID()
}
// 广播消息
ManagerInst.Broadcast(message)
}
// HandleInactive 处理连接断开事件
func (chatHandler) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
// 打印断开连接信息
fmt.Printf("child connection closed: %s %s\n", ctx.Channel().RemoteAddr(), ex.Error())
ctx.HandleInactive(ex)
}
服务器显示类似的连接信息:child connection from: 127.0.0.1:59756, route: /chat, Websocket-Key: 5Tp4LXIjcnwtLRWP/9yv5A==, User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 CCleaner/131.0.0.0
网页内容
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://127.0.0.1:8080/chat");
socket.onmessage = function(event) {
var cmd = JSON.parse(event.data);
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + (cmd.name + ': ' + cmd.message);
};
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "connection open!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "connection closed!";
};
} else {
alert("Your browser does not support WebSocket!");
}
function send(name, message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(JSON.stringify({"name" : name, "message" : message}));
} else {
alert("Connection is not open!");
}
}
</script>
<form onsubmit="return false;">
<h3>WebSocket Chatroom:</h3>
<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
<br>
<input type="text" name="name" style="width: 100px" value="Rob">
<input type="text" name="message" style="width: 300px" value="Hello WebSocket">
<input type="button" value="Send" onclick="send(this.form.name.value, this.form.message.value)">
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="Clear">
</form>
<br>
<br>
</body>
</html>
看起来就是普通的WebSocket连接,发送消息:socket.send(JSON.stringify({“name” : name, “message” : message}));
服务器的关键处理还在sesseion_manager.go中
ManagerInst.Broadcast(message): 广播消息
package main
import (
"fmt"
"sync"
"github.com/go-netty/go-netty"
)
// Manager 接口定义了会话管理器的行为
type Manager interface {
netty.ActiveHandler
netty.InactiveHandler
Size() int
Context(id int64) netty.HandlerContext
ForEach(func(netty.HandlerContext) bool)
Broadcast(message netty.Message)
BroadcastIf(message netty.Message, fn func(netty.HandlerContext) bool)
}
// NewManager 创建一个新的会话管理器实例
func NewManager() Manager {
return &sessionManager{
_sessions: make(map[int64]netty.HandlerContext, 64),
}
}
// sessionManager 结构体实现了 Manager 接口
type sessionManager struct {
_sessions map[int64]netty.HandlerContext
_mutex sync.RWMutex
}
// Size 返回当前会话的数量
func (s *sessionManager) Size() int {
s._mutex.RLock()
size := len(s._sessions)
s._mutex.RUnlock()
return size
}
// Context 根据 ID 返回对应的会话上下文
func (s *sessionManager) Context(id int64) netty.HandlerContext {
s._mutex.RLock()
ctx, _ := s._sessions[id]
s._mutex.RUnlock()
return ctx
}
// ForEach 遍历所有会话并执行给定的函数
func (s *sessionManager) ForEach(fn func(netty.HandlerContext) bool) {
s._mutex.RLock()
defer s._mutex.RUnlock()
for _, ctx := range s._sessions {
fn(ctx)
}
}
// Broadcast 向所有会话广播消息
func (s *sessionManager) Broadcast(message netty.Message) {
fmt.Println("广播")
s.ForEach(func(ctx netty.HandlerContext) bool {
ctx.Write(message)
return true
})
}
// BroadcastIf 向满足条件的会话广播消息
func (s *sessionManager) BroadcastIf(message netty.Message, fn func(netty.HandlerContext) bool) {
s.ForEach(func(ctx netty.HandlerContext) bool {
if fn(ctx) {
ctx.Write(message)
}
return true
})
}
// HandleActive 处理会话激活事件
func (s *sessionManager) HandleActive(ctx netty.ActiveContext) {
fmt.Println("激活")
s._mutex.Lock()
s._sessions[ctx.Channel().ID()] = ctx // 激活后保存一个session(当前通道),当广播时将通过遍历session来完成对各个通道的信息发送。
s._mutex.Unlock()
ctx.HandleActive()
}
// HandleInactive 处理会话断开事件
func (s *sessionManager) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
fmt.Println("断开")
s._mutex.Lock()
delete(s._sessions, ctx.Channel().ID())
s._mutex.Unlock()
ctx.HandleInactive(ex)
}
文件服务器示例。看起来只是在http原来服务之上接管了一下连接/读取/断开的事件。最终还是把服务功能交给原始的http来完成。
package main
import (
"fmt"
"net/http"
"github.com/go-netty/go-netty"
"github.com/go-netty/go-netty/codec/xhttp"
)
func main() {
// HTTP 文件服务器处理器
httpMux := http.NewServeMux()
httpMux.Handle("/", http.StripPrefix("/", http.FileServer(http.Dir("./"))))
// 通道管道初始化器
setupCodec := func(channel netty.Channel) {
channel.Pipeline().
AddLast(xhttp.ServerCodec()). // 从通道解码 HTTP 请求
AddLast(new(httpStateHandler)). // 打印 HTTP 访问日志
AddLast(xhttp.Handler(httpMux)) // 兼容 http.Handler
}
// 设置引导程序并启动服务器
netty.NewBootstrap(netty.WithChildInitializer(setupCodec)).
Listen("0.0.0.0:8080").Sync()
}
// httpStateHandler 结构体
type httpStateHandler struct{}
// HandleActive 方法在客户端连接时调用
func (*httpStateHandler) HandleActive(ctx netty.ActiveContext) {
fmt.Printf("HTTP 客户端激活: %s\n", ctx.Channel().RemoteAddr())
ctx.HandleActive()
}
// HandleRead 方法在读取到消息时调用
func (*httpStateHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
if request, ok := message.(*http.Request); ok {
fmt.Printf("[%d]%s: %s %s\n", ctx.Channel().ID(), ctx.Channel().RemoteAddr(), request.Method, request.URL.Path)
}
ctx.HandleRead(message)
}
// HandleInactive 方法在客户端断开连接时调用
func (*httpStateHandler) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
fmt.Printf("HTTP 客户端断开: %s %v\n", ctx.Channel().RemoteAddr(), ex)
ctx.HandleInactive(ex)
}
web服务示例亦同上面的文件服务示例,最终是交给了原始的http来完成。只是修改了一点点关于服务的响应:
httpMux := http.NewServeMux()
httpMux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
writer.Write([]byte("Hello, go-netty!"))
})
TCP服务示例代码,在一个示例中分示了服务端和客户端
frame.LengthFieldCodec(binary.LittleEndian, 1024, 0, 2, 0, 2) 不知是何意
package main
import (
"encoding/binary"
"fmt"
"time"
"github.com/go-netty/go-netty"
"github.com/go-netty/go-netty/codec/format"
"github.com/go-netty/go-netty/codec/frame"
"github.com/go-netty/go-netty/utils"
)
func main() {
// 设置子通道管道初始化器
childInitializer := func(channel netty.Channel) {
channel.Pipeline().
AddLast(frame.LengthFieldCodec(binary.LittleEndian, 1024, 0, 2, 0, 2)). // 添加长度字段解码器
AddLast(format.TextCodec()). // 添加文本编解码器
AddLast(EchoHandler{"Server"}) // 添加回显处理器
}
// 设置客户端管道初始化器
clientInitializer := func(channel netty.Channel) {
channel.Pipeline().
AddLast(frame.LengthFieldCodec(binary.LittleEndian, 1024, 0, 2, 0, 2)). // 添加长度字段解码器
AddLast(format.TextCodec()). // 添加文本编解码器
AddLast(EchoHandler{"Client"}) // 添加回显处理器
}
// 创建引导程序
var bootstrap = netty.NewBootstrap(netty.WithChildInitializer(childInitializer), netty.WithClientInitializer(clientInitializer))
// 1秒后连接到服务器
time.AfterFunc(time.Second, func() {
_, err := bootstrap.Connect("127.0.0.1:6565")
utils.Assert(err)
})
// 设置引导程序并启动服务器
bootstrap.Listen("0.0.0.0:6565").Sync()
}
// EchoHandler 结构体
type EchoHandler struct {
role string
}
// HandleActive 方法在客户端连接时调用
func (l EchoHandler) HandleActive(ctx netty.ActiveContext) {
fmt.Println(l.role, "->", "active:", ctx.Channel().RemoteAddr())
ctx.Write("Hello I'm " + l.role)
ctx.HandleActive()
}
// HandleRead 方法在读取到消息时调用
func (l EchoHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
fmt.Println(l.role, "->", "handle read:", message)
ctx.HandleRead(message)
}
服务端
// create websocket instance
var ws = nettyws.NewWebsocket()
// setup OnOpen handler
ws.OnOpen = func(conn nettyws.Conn) {
fmt.Println("OnOpen: ", conn.RemoteAddr())
}
// setup OnData handler
ws.OnData = func(conn nettyws.Conn, data []byte) {
fmt.Println("OnData: ", conn.RemoteAddr(), ", message: ", string(data))
conn.Write(data)
}
// setup OnClose handler
ws.OnClose = func(conn nettyws.Conn, err error) {
fmt.Println("OnClose: ", conn.RemoteAddr(), ", error: ", err)
}
fmt.Println("listening websocket connections ....")
// listen websocket server
if err := ws.Listen("ws://127.0.0.1:9527/ws"); nil != err {
panic(err)
}
客户端
// create websocket instance
var ws = nettyws.NewWebsocket()
// setup OnOpen handler
ws.OnOpen = func(conn nettyws.Conn) {
fmt.Println("OnOpen: ", conn.RemoteAddr())
conn.Write([]byte("hello world"))
}
// setup OnData handler
ws.OnData = func(conn nettyws.Conn, data []byte) {
fmt.Println("OnData: ", conn.RemoteAddr(), ", message: ", string(data))
}
// setup OnClose handler
ws.OnClose = func(conn nettyws.Conn, err error) {
fmt.Println("OnClose: ", conn.RemoteAddr(), ", error: ", err)
}
fmt.Println("open websocket connection ...")
// connect to websocket server
if _, err := ws.Open("ws://127.0.0.1:9527/ws"); nil != err {
panic(err)
}
看起来三个事件就完成了,外加一个发送write。
从标准 Http 服务器升级:
// 创建websocket实例
var ws = nettyws.NewWebsocket()
// 设置OnOpen事件处理器,当连接建立时被调用
ws.OnOpen = func(conn nettyws.Conn) {
fmt.Println("OnOpen: ", conn.RemoteAddr())
}
// 设置OnData事件处理器,当连接收到消息时被调用
ws.OnData = func(conn nettyws.Conn, data []byte) {
fmt.Println("OnData: ", conn.RemoteAddr(), ", message: ", string(data))
conn.Write(data)
}
// 设置OnClose事件处理器,当连接关闭时被回调
ws.OnClose = func(conn nettyws.Conn, err error) {
fmt.Println("OnClose: ", conn.RemoteAddr(), ", error: ", err)
}
fmt.Println("upgrade websocket connections ....")
// 绑定http路由处理函数,并从http升级到websocket
serveMux := http.NewServeMux()
serveMux.HandleFunc("/ws", func(writer http.ResponseWriter, request *http.Request) {
ws.UpgradeHTTP(writer, request)
})
// 启动端口监听,接受客户端请求
if err := http.ListenAndServe(":9527", serveMux); nil != err {
panic(err)
}
补充: go-netty库支持了KCP协议。补充一下KCP协议的知识:
TCP保证数据准确交付,UDP保证数据快速到达,KCP则是两种协议的一个折中。
KCP是一个快速可靠协议,能以比 TCP浪费10%-20%的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。
KCP力求在保证可靠性的情况下提高传输速度。
严格意义上讲KCP并不是一种网络传输协议,它是为UDP写的可靠传输算法,它是把TCP的主要可靠传输机制移植到了UDP身上,让UDP变的可靠了起来
Go中也有KCP的库,比如kcp-go
package main
import (
"log"
"net"
"github.com/xtaci/kcp-go"
)
func main() {
listener, err := kcp.ListenWithOptions(":12345", nil, 0, 0)
if err != nil {
log.Fatal(err)
}
defer listener.Close()
log.Println("KCP server is listening on :12345")
for {
conn, err := listener.AcceptKCP()
if err != nil {
log.Fatal(err)
}
go handleConnection(conn)
}
}
func handleConnection(conn *kcp.UDPSession) {
defer conn.Close()
buf := make([]byte, 1024)
for {
n, err := conn.Read(buf)
if err != nil {
log.Println("Read error:", err)
return
}
log.Printf("Received: %s", buf[:n])
}
}
package main
import (
"log"
"github.com/xtaci/kcp-go"
)
func main() {
conn, err := kcp.DialWithOptions("127.0.0.1:12345", nil, 0, 0)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
msg := "Hello, KCP!"
_, err = conn.Write([]byte(msg))
if err != nil {
log.Fatal(err)
}
log.Println("Message sent:", msg)
}