(摘) Go-Netty 高性能网络库

声明:内容源自网络,版权归原作者所有。若有侵权请在网页聊天中联系我

Github

有段时间没有写东西了,完成了一些似有似无的东西。今天学习Go-Netty
go-netty 是一款受netty启发的Go语言可扩展的高性能网络库

AddLast(frame.LengthFieldCodec(binary.LittleEndian, 1024, 0, 2, 0, 2)). // 按照自定义协议解码帧(2字节的长度字段)
AddLast(format.TextCodec()). // 消息内容为文本格式(可自定义为 json,protobuf 等编解码器)
bootstrap.Transport(tcp.New()) 配置服务器(客户端)所使用的传输协议
bootstrap.Listen(“tcp://0.0.0.0:6565”).Action(netty.WaitSignal(os.Kill, os.Interrupt)) 开始监听端口并开始提供服务,直到收到指定信号后退出

func LengthFieldCodec(
	byteOrder binary.ByteOrder, // 字节序,大端 & 小端
	maxFrameLength int, // 最大允许数据包长度
	lengthFieldOffset int, // 长度域的偏移量,表示跳过指定长度个字节之后的才是长度域
	lengthFieldLength int, // 记录该帧数据长度的字段本身的长度 1, 2, 4, 8
	lengthAdjustment int, // 包体长度调整的大小,长度域的数值表示的长度加上这个修正值表示的就是带header的包长度
	initialBytesToStrip int, // 拿到一个完整的数据包之后向业务解码器传递之前,应该跳过多少字节
)

给对端发送一条消息,将进入如下流程(视编解码配置): Text -> TextCodec -> LengthFieldCodec -> Channel.Write (文本 文本编码 组装协议格式(长度字段) 网络发送)
向后续的handler传递控制权 ctx.HandleActive()

基于此库,还有一个websocket的库go-netty-ws,实现websocket通信更简单

官方示例稍修改

package main

import (
	"fmt"
	"strings"

	"github.com/go-netty/go-netty"
	"github.com/go-netty/go-netty/codec/format"
	"github.com/go-netty/go-netty/codec/frame"
)

func main() {

	// 子连接的流水线配置
	var childInitializer = func(channel netty.Channel) {
		channel.Pipeline().
			AddLast(frame.DelimiterCodec(128, "\n", true)).  // 最大允许包长128字节,使用\n分割包, 丢弃分隔符
			AddLast(format.TextCodec()).  // 解包出来的bytes转换为字符串
			AddLast(LoggerHandler{}).  // 日志处理器, 打印连接建立断开消息,收到的消息
			AddLast(UpperHandler{})  // 业务处理器 (将字符串全部大写)
	}

	// 创建Bootstrap & 监听端口 & 接受连接
	netty.NewBootstrap(netty.WithChildInitializer(childInitializer)).
		Listen(":9527").Sync()
}

type LoggerHandler struct{}

func (LoggerHandler) HandleActive(ctx netty.ActiveContext) {
	fmt.Println("go-netty:", "->", "连接:", ctx.Channel().RemoteAddr())
	ctx.Write("你好,我是" + "go-netty") // 写入欢迎信息
}

func (LoggerHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
	fmt.Println("go-netty:", "->", "读取:", message)
	ctx.HandleRead(message) // 交给下一个处理器处理(按照处理器的注册顺序, 此例下一个处理器应该是UpperHandler)
}

func (LoggerHandler) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
	fmt.Println("go-netty:", "->", "断开:", ctx.Channel().RemoteAddr(), ex)
	ctx.HandleInactive(ex) // 连接断开了,默认处理是关闭连接
}

type UpperHandler struct{}

func (UpperHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
	text := message.(string)
	upText := strings.ToUpper(text) // 业务逻辑,将字符串大写化
	ctx.Write(text + " -> " + upText) // 写入返回结果给客户端
}

然后在PowerShell中向服务器发送消息:echo -n “Hello 你” | nc 127.0.0.1 9527
不出意外,双方都出现的是乱码

还得再看看官方示例才明白一点。


ChatServer示例,中文不是问题

package main

import (
	"fmt"
	"net/http"

	"github.com/go-netty/go-netty"
	"github.com/go-netty/go-netty-transport/websocket"
	"github.com/go-netty/go-netty/codec/format"
	"github.com/go-netty/go-netty/codec/frame"
)

// ManagerInst 是一个全局的 Manager 实例
var ManagerInst = NewManager()

func main() {

	// 设置 index 页面的处理函数
	websocket.DefaultOptions.ServeMux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		writer.Write(indexHtml)
	})

	// 子管道初始化函数
	setupCodec := func(channel netty.Channel) {
		channel.Pipeline().
			AddLast(frame.PacketCodec(128)).  // 读取 websocket 消息
			AddLast(format.JSONCodec(true, false)).  // 将字节解码为 map[string]interface{}
			AddLast(ManagerInst).	// 会话记录器
			AddLast(chatHandler{})	// 聊天处理器
	}

	// 设置引导程序并启动服务器
	netty.NewBootstrap(netty.WithChildInitializer(setupCodec), netty.WithTransport(websocket.New())).
		Listen("0.0.0.0:8080/chat").Sync()
}

// chatHandler 结构体实现了 netty 的处理器接口
type chatHandler struct{}

// HandleActive 处理连接激活事件
func (chatHandler) HandleActive(ctx netty.ActiveContext) {
	type wsTransport interface {
		Route() string
		Header() http.Header
	}

	// 打印连接信息
	// 将当前通道的传输层转换为 wsTransport 接口,以便访问 wsTransport 接口中定义的 Route() 和 Header() 方法
	// wsTransport 接口定义了两个方法:
	// 		Route() string:返回 WebSocket 路由。
	// 	    Header() http.Header:返回 HTTP 头信息。
	if wst, ok := ctx.Channel().Transport().(wsTransport); ok {
		fmt.Printf("child connection from: %s, route: %s, Websocket-Key: %s, User-Agent: %s\n",
			ctx.Channel().RemoteAddr(), wst.Route(), wst.Header().Get("Sec-Websocket-Key"), wst.Header().Get("User-Agent"))
	}

	ctx.HandleActive()
}

// HandleRead 处理读取消息事件
func (chatHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {

	// 打印接收到的消息
	fmt.Printf("received child message from: %s, %v\n", ctx.Channel().RemoteAddr(), message)

	// 如果消息是命令,则添加 ID
	if cmd, ok := message.(map[string]interface{}); ok {
		cmd["id"] = ctx.Channel().ID()
	}

	// 广播消息
	ManagerInst.Broadcast(message)
}

// HandleInactive 处理连接断开事件
func (chatHandler) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
	// 打印断开连接信息
	fmt.Printf("child connection closed: %s %s\n", ctx.Channel().RemoteAddr(), ex.Error())
	ctx.HandleInactive(ex)
}

服务器显示类似的连接信息:child connection from: 127.0.0.1:59756, route: /chat, Websocket-Key: 5Tp4LXIjcnwtLRWP/9yv5A==, User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 CCleaner/131.0.0.0

网页内容

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
	<script type="text/javascript">
		var socket;
		if (!window.WebSocket) {
			window.WebSocket = window.MozWebSocket;
		}
		if (window.WebSocket) {
			socket = new WebSocket("ws://127.0.0.1:8080/chat");
			socket.onmessage = function(event) {
				var cmd = JSON.parse(event.data);
				var ta = document.getElementById('responseText');
				ta.value = ta.value + '\n' + (cmd.name + ': ' + cmd.message);
			};
			socket.onopen = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = "connection open!";
			};
			socket.onclose = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = ta.value + "connection closed!";
			};
		} else {
			alert("Your browser does not support WebSocket!");
		}

		function send(name, message) {
			if (!window.WebSocket) {
				return;
			}
			if (socket.readyState == WebSocket.OPEN) {
				socket.send(JSON.stringify({"name" : name, "message" : message}));
			} else {
				alert("Connection is not open!");
			}
		}
	</script>
	<form onsubmit="return false;">
		<h3>WebSocket Chatroom:</h3>
		<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
		<br>
		<input type="text" name="name" style="width: 100px" value="Rob">
		<input type="text" name="message" style="width: 300px" value="Hello WebSocket">
		<input type="button" value="Send" onclick="send(this.form.name.value, this.form.message.value)">
		<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="Clear">
	</form>
	<br>
	<br>
</body>
</html>

看起来就是普通的WebSocket连接,发送消息:socket.send(JSON.stringify({“name” : name, “message” : message}));

服务器的关键处理还在sesseion_manager.go中
ManagerInst.Broadcast(message): 广播消息

package main

import (
	"fmt"
	"sync"

	"github.com/go-netty/go-netty"
)

// Manager 接口定义了会话管理器的行为
type Manager interface {
	netty.ActiveHandler
	netty.InactiveHandler
	Size() int
	Context(id int64) netty.HandlerContext
	ForEach(func(netty.HandlerContext) bool)
	Broadcast(message netty.Message)
	BroadcastIf(message netty.Message, fn func(netty.HandlerContext) bool)
}

// NewManager 创建一个新的会话管理器实例
func NewManager() Manager {
	return &sessionManager{
		_sessions: make(map[int64]netty.HandlerContext, 64),
	}
}

// sessionManager 结构体实现了 Manager 接口
type sessionManager struct {
	_sessions map[int64]netty.HandlerContext
	_mutex    sync.RWMutex
}

// Size 返回当前会话的数量
func (s *sessionManager) Size() int {
	s._mutex.RLock()
	size := len(s._sessions)
	s._mutex.RUnlock()
	return size
}

// Context 根据 ID 返回对应的会话上下文
func (s *sessionManager) Context(id int64) netty.HandlerContext {
	s._mutex.RLock()
	ctx, _ := s._sessions[id]
	s._mutex.RUnlock()
	return ctx
}

// ForEach 遍历所有会话并执行给定的函数
func (s *sessionManager) ForEach(fn func(netty.HandlerContext) bool) {
	s._mutex.RLock()
	defer s._mutex.RUnlock()

	for _, ctx := range s._sessions {
		fn(ctx)
	}
}

// Broadcast 向所有会话广播消息
func (s *sessionManager) Broadcast(message netty.Message) {
	fmt.Println("广播")
	s.ForEach(func(ctx netty.HandlerContext) bool {
		ctx.Write(message)
		return true
	})
}

// BroadcastIf 向满足条件的会话广播消息
func (s *sessionManager) BroadcastIf(message netty.Message, fn func(netty.HandlerContext) bool) {
	s.ForEach(func(ctx netty.HandlerContext) bool {
		if fn(ctx) {
			ctx.Write(message)
		}
		return true
	})
}

// HandleActive 处理会话激活事件
func (s *sessionManager) HandleActive(ctx netty.ActiveContext) {
	fmt.Println("激活")
	s._mutex.Lock()
	s._sessions[ctx.Channel().ID()] = ctx  // 激活后保存一个session(当前通道),当广播时将通过遍历session来完成对各个通道的信息发送。  
	s._mutex.Unlock()

	ctx.HandleActive()
}

// HandleInactive 处理会话断开事件
func (s *sessionManager) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
	fmt.Println("断开")
	s._mutex.Lock()
	delete(s._sessions, ctx.Channel().ID())
	s._mutex.Unlock()

	ctx.HandleInactive(ex)
}

文件服务器示例。看起来只是在http原来服务之上接管了一下连接/读取/断开的事件。最终还是把服务功能交给原始的http来完成。

package main

import (
	"fmt"
	"net/http"

	"github.com/go-netty/go-netty"
	"github.com/go-netty/go-netty/codec/xhttp"
)

func main() {

	// HTTP 文件服务器处理器
	httpMux := http.NewServeMux()
	httpMux.Handle("/", http.StripPrefix("/", http.FileServer(http.Dir("./"))))

	// 通道管道初始化器
	setupCodec := func(channel netty.Channel) {
		channel.Pipeline().
			AddLast(xhttp.ServerCodec()).   // 从通道解码 HTTP 请求
			AddLast(new(httpStateHandler)). // 打印 HTTP 访问日志
			AddLast(xhttp.Handler(httpMux)) // 兼容 http.Handler
	}

	// 设置引导程序并启动服务器
	netty.NewBootstrap(netty.WithChildInitializer(setupCodec)).
		Listen("0.0.0.0:8080").Sync()
}

// httpStateHandler 结构体
type httpStateHandler struct{}

// HandleActive 方法在客户端连接时调用
func (*httpStateHandler) HandleActive(ctx netty.ActiveContext) {
	fmt.Printf("HTTP 客户端激活: %s\n", ctx.Channel().RemoteAddr())
	ctx.HandleActive()
}

// HandleRead 方法在读取到消息时调用
func (*httpStateHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
	if request, ok := message.(*http.Request); ok {
		fmt.Printf("[%d]%s: %s %s\n", ctx.Channel().ID(), ctx.Channel().RemoteAddr(), request.Method, request.URL.Path)
	}
	ctx.HandleRead(message)
}

// HandleInactive 方法在客户端断开连接时调用
func (*httpStateHandler) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
	fmt.Printf("HTTP 客户端断开: %s %v\n", ctx.Channel().RemoteAddr(), ex)
	ctx.HandleInactive(ex)
}

web服务示例亦同上面的文件服务示例,最终是交给了原始的http来完成。只是修改了一点点关于服务的响应:

	httpMux := http.NewServeMux()
	httpMux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		writer.Write([]byte("Hello, go-netty!"))
	})

TCP服务示例代码,在一个示例中分示了服务端和客户端

frame.LengthFieldCodec(binary.LittleEndian, 1024, 0, 2, 0, 2) 不知是何意

package main

import (
	"encoding/binary"
	"fmt"
	"time"

	"github.com/go-netty/go-netty"
	"github.com/go-netty/go-netty/codec/format"
	"github.com/go-netty/go-netty/codec/frame"
	"github.com/go-netty/go-netty/utils"
)

func main() {

	// 设置子通道管道初始化器
	childInitializer := func(channel netty.Channel) {
		channel.Pipeline().
			AddLast(frame.LengthFieldCodec(binary.LittleEndian, 1024, 0, 2, 0, 2)). // 添加长度字段解码器
			AddLast(format.TextCodec()).                                            // 添加文本编解码器
			AddLast(EchoHandler{"Server"})                                          // 添加回显处理器
	}

	// 设置客户端管道初始化器
	clientInitializer := func(channel netty.Channel) {
		channel.Pipeline().
			AddLast(frame.LengthFieldCodec(binary.LittleEndian, 1024, 0, 2, 0, 2)). // 添加长度字段解码器
			AddLast(format.TextCodec()).                                            // 添加文本编解码器
			AddLast(EchoHandler{"Client"})                                          // 添加回显处理器
	}

	// 创建引导程序
	var bootstrap = netty.NewBootstrap(netty.WithChildInitializer(childInitializer), netty.WithClientInitializer(clientInitializer))

	// 1秒后连接到服务器
	time.AfterFunc(time.Second, func() {
		_, err := bootstrap.Connect("127.0.0.1:6565")
		utils.Assert(err)
	})

	// 设置引导程序并启动服务器
	bootstrap.Listen("0.0.0.0:6565").Sync()
}

// EchoHandler 结构体
type EchoHandler struct {
	role string
}

// HandleActive 方法在客户端连接时调用
func (l EchoHandler) HandleActive(ctx netty.ActiveContext) {
	fmt.Println(l.role, "->", "active:", ctx.Channel().RemoteAddr())

	ctx.Write("Hello I'm " + l.role)
	ctx.HandleActive()
}

// HandleRead 方法在读取到消息时调用
func (l EchoHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
	fmt.Println(l.role, "->", "handle read:", message)
	ctx.HandleRead(message)
}

附加go-netty-ws

服务端

// create websocket instance
var ws = nettyws.NewWebsocket()

// setup OnOpen handler
ws.OnOpen = func(conn nettyws.Conn) {
    fmt.Println("OnOpen: ", conn.RemoteAddr())
}

// setup OnData handler
ws.OnData = func(conn nettyws.Conn, data []byte) {
    fmt.Println("OnData: ", conn.RemoteAddr(), ", message: ", string(data))
    conn.Write(data)
}

// setup OnClose handler
ws.OnClose = func(conn nettyws.Conn, err error) {
    fmt.Println("OnClose: ", conn.RemoteAddr(), ", error: ", err)
}

fmt.Println("listening websocket connections ....")

// listen websocket server
if err := ws.Listen("ws://127.0.0.1:9527/ws"); nil != err {
    panic(err)
}

客户端

// create websocket instance
var ws = nettyws.NewWebsocket()

// setup OnOpen handler
ws.OnOpen = func(conn nettyws.Conn) {
    fmt.Println("OnOpen: ", conn.RemoteAddr())
    conn.Write([]byte("hello world"))
}

// setup OnData handler
ws.OnData = func(conn nettyws.Conn, data []byte) {
    fmt.Println("OnData: ", conn.RemoteAddr(), ", message: ", string(data))
}

// setup OnClose handler
ws.OnClose = func(conn nettyws.Conn, err error) {
    fmt.Println("OnClose: ", conn.RemoteAddr(), ", error: ", err)
}

fmt.Println("open websocket connection ...")

// connect to websocket server
if _, err := ws.Open("ws://127.0.0.1:9527/ws"); nil != err {
    panic(err)
}

看起来三个事件就完成了,外加一个发送write。


从标准 Http 服务器升级:

// 创建websocket实例
var ws = nettyws.NewWebsocket()

// 设置OnOpen事件处理器,当连接建立时被调用
ws.OnOpen = func(conn nettyws.Conn) {
    fmt.Println("OnOpen: ", conn.RemoteAddr())
}

// 设置OnData事件处理器,当连接收到消息时被调用
ws.OnData = func(conn nettyws.Conn, data []byte) {
    fmt.Println("OnData: ", conn.RemoteAddr(), ", message: ", string(data))
    conn.Write(data)
}

// 设置OnClose事件处理器,当连接关闭时被回调
ws.OnClose = func(conn nettyws.Conn, err error) {
    fmt.Println("OnClose: ", conn.RemoteAddr(), ", error: ", err)
}

fmt.Println("upgrade websocket connections ....")

// 绑定http路由处理函数,并从http升级到websocket
serveMux := http.NewServeMux()
serveMux.HandleFunc("/ws", func(writer http.ResponseWriter, request *http.Request) {
    ws.UpgradeHTTP(writer, request)
})

// 启动端口监听,接受客户端请求
if err := http.ListenAndServe(":9527", serveMux); nil != err {
    panic(err)
}

补充: go-netty库支持了KCP协议。补充一下KCP协议的知识:

TCP保证数据准确交付,UDP保证数据快速到达,KCP则是两种协议的一个折中。
KCP是一个快速可靠协议,能以比 TCP浪费10%-20%的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。
KCP力求在保证可靠性的情况下提高传输速度。
严格意义上讲KCP并不是一种网络传输协议,它是为UDP写的可靠传输算法,它是把TCP的主要可靠传输机制移植到了UDP身上,让UDP变的可靠了起来

Go中也有KCP的库,比如kcp-go

package main

import (
    "log"
    "net"
    "github.com/xtaci/kcp-go"
)

func main() {
    listener, err := kcp.ListenWithOptions(":12345", nil, 0, 0)
    if err != nil {
        log.Fatal(err)
    }
    defer listener.Close()

    log.Println("KCP server is listening on :12345")

    for {
        conn, err := listener.AcceptKCP()
        if err != nil {
            log.Fatal(err)
        }

        go handleConnection(conn)
    }
}

func handleConnection(conn *kcp.UDPSession) {
    defer conn.Close()

    buf := make([]byte, 1024)
    for {
        n, err := conn.Read(buf)
        if err != nil {
            log.Println("Read error:", err)
            return
        }

        log.Printf("Received: %s", buf[:n])
    }
}
package main

import (
    "log"
    "github.com/xtaci/kcp-go"
)

func main() {
    conn, err := kcp.DialWithOptions("127.0.0.1:12345", nil, 0, 0)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    msg := "Hello, KCP!"
    _, err = conn.Write([]byte(msg))
    if err != nil {
        log.Fatal(err)
    }

    log.Println("Message sent:", msg)
}