引言
自己写着玩的项目需要使用WebSocket作为客户端回传长时数据流和确定在线状态能力的依赖,但是go-zero并没有直接提供使用WebSocket的功能,go-zero只有一个极其简单的demo,仅有的博客园教程也像是在AI生成,所以自己研究了一下,顺带写一篇教程。
实现
/pkg/ws/ws.go(自创建)
package ws
import (
"net/http"
"sync"
"github.com/gorilla/websocket"
"github.com/zeromicro/go-zero/core/logx"
)
var Upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 允许所有来源
},
}
type Connection struct {
Conn *websocket.Conn
mu sync.Mutex
}
type Hub struct {
Connections map[*Connection]bool
Broadcast chan []byte
Register chan *Connection
Unregister chan *Connection
}
func NewHub() *Hub {
return &Hub{
Connections: make(map[*Connection]bool),
Broadcast: make(chan []byte),
Register: make(chan *Connection, 10),
Unregister: make(chan *Connection),
}
}
func (h *Hub) Run() {
for {
select {
case conn := <-h.Register:
h.Connections[conn] = true
case conn := <-h.Unregister:
if _, ok := h.Connections[conn]; ok {
delete(h.Connections, conn)
conn.Conn.Close()
}
case message := <-h.Broadcast:
for conn := range h.Connections {
conn.Write(message)
}
}
}
}
func (c *Connection) Write(message []byte) {
c.mu.Lock()
defer c.mu.Unlock()
err := c.Conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
logx.Errorf("Error writing message: %v", err)
}
}
ServiceContext(svcCtx)
type ServiceContext struct {
Config config.Config
Casbin *casbin.Enforcer
Authority rest.Middleware
Trans *i18n.Translator
CoreRpc coreclient.Core
DanmakusRpc danmakusclient.Danmakus
WSHub *ws.Hub // 自行添加
DataPerm rest.Middleware
}
func NewServiceContext(c config.Config) *ServiceContext {
rds := c.RedisConf.MustNewUniversalRedis()
cbn := c.CasbinConf.MustNewCasbinWithOriginalRedisWatcher(c.CasbinDatabaseConf.Type, c.CasbinDatabaseConf.GetDSN(), c.RedisConf)
wsHub := ws.NewHub() //创建Hub
go wsHub.Run() // 启动Hub线程
trans := i18n.NewTranslator(c.I18nConf, i18n2.LocaleFS)
return &ServiceContext{
Config: c,
Authority: middleware.NewAuthorityMiddleware(cbn, rds).Handle,
Trans: trans,
CoreRpc: coreclient.NewCore(zrpc.NewClientIfEnable(c.CoreRpc)),
WSHub: wsHub, // 添加
}
}
创建API及其Handle
路径随意,Method为GET
Handle.go
func WsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
conn, err := ws2.Upgrader.Upgrade(w, r, nil)
if err != nil {
logx.Errorf("Error upgrading to WebSocket: %v", err)
return
}
c := &ws2.Connection{Conn: conn}
svcCtx.WSHub.Register <- c
defer func() {
svcCtx.WSHub.Unregister <- c
}()
var resp types.DanmakusReq
l := ws.NewWsLogic(r.Context(), svcCtx)
// 该channel用于往goroutine中发送控制
resetChan := make(chan bool)
// 定时器用于设置心跳包
timer := time.NewTimer(70 * time.Second)
// 启动一个 goroutine 来处理定时器
go func() { // 这是我用来验证心跳包的,可以去掉相关代码
for {
select {
case <-timer.C:
// 无心跳包
c.Conn.Close()
case reset := <-resetChan:
if reset {
// 重置心跳定时
timer.Reset(70 * time.Second)
}
}
}
}()
for {
err := c.Conn.ReadJSON(&resp)
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logx.Errorf("Error reading message: %v", err)
}
break
}
if resp.Type == 0 {
resetChan <- true
}
l.Ws(resp) // 这里写ReadMeassge相关处理等Logic,其他的也行,自己写就行
}
}
}
结语
这时候启动应该就能正常使用了,具体更多功能就需要自己定制了。