引言

自己写着玩的项目需要使用WebSocket作为客户端回传长时数据流和确定在线状态能力的依赖,但是go-zero并没有直接提供使用WebSocket的功能,go-zero只有一个极其简单的demo,仅有的博客园教程也像是在AI生成,所以自己研究了一下,顺带写一篇教程。

实现

/pkg/ws/ws.go(自创建)

package ws

import (
	"net/http"
	"sync"
	"github.com/gorilla/websocket"
	"github.com/zeromicro/go-zero/core/logx"
)

var Upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true // 允许所有来源
	},
}

type Connection struct {
	Conn *websocket.Conn
	mu   sync.Mutex
}

type Hub struct {
	Connections map[*Connection]bool
	Broadcast   chan []byte
	Register    chan *Connection
	Unregister  chan *Connection
}

func NewHub() *Hub {
	return &Hub{
		Connections: make(map[*Connection]bool),
		Broadcast:   make(chan []byte),
		Register:    make(chan *Connection, 10),
		Unregister:  make(chan *Connection),
	}
}

func (h *Hub) Run() {
	for {
		select {
		case conn := <-h.Register:
			h.Connections[conn] = true
		case conn := <-h.Unregister:
			if _, ok := h.Connections[conn]; ok {
				delete(h.Connections, conn)
				conn.Conn.Close()
			}
		case message := <-h.Broadcast:
			for conn := range h.Connections {
				conn.Write(message)
			}

		}

	}
}

func (c *Connection) Write(message []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	err := c.Conn.WriteMessage(websocket.TextMessage, message)
	if err != nil {
		logx.Errorf("Error writing message: %v", err)
	}
}

ServiceContext(svcCtx)

type ServiceContext struct {
    Config      config.Config
    Casbin      *casbin.Enforcer
    Authority   rest.Middleware
    Trans       *i18n.Translator
    CoreRpc     coreclient.Core
    DanmakusRpc danmakusclient.Danmakus
    WSHub       *ws.Hub // 自行添加
    DataPerm    rest.Middleware
}

func NewServiceContext(c config.Config) *ServiceContext {

    rds := c.RedisConf.MustNewUniversalRedis()

    cbn := c.CasbinConf.MustNewCasbinWithOriginalRedisWatcher(c.CasbinDatabaseConf.Type, c.CasbinDatabaseConf.GetDSN(), c.RedisConf)

    wsHub := ws.NewHub() //创建Hub
    go wsHub.Run() // 启动Hub线程

    trans := i18n.NewTranslator(c.I18nConf, i18n2.LocaleFS)

    return &ServiceContext{
       Config:      c,
       Authority:   middleware.NewAuthorityMiddleware(cbn, rds).Handle,
       Trans:       trans,
       CoreRpc:     coreclient.NewCore(zrpc.NewClientIfEnable(c.CoreRpc)),
       WSHub:       wsHub,  // 添加
    }
}

创建API及其Handle

路径随意,Method为GET

Handle.go

func WsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
       conn, err := ws2.Upgrader.Upgrade(w, r, nil)
       if err != nil {
          logx.Errorf("Error upgrading to WebSocket: %v", err)
          return
       }
       c := &ws2.Connection{Conn: conn}
       svcCtx.WSHub.Register <- c
       defer func() {
          svcCtx.WSHub.Unregister <- c
       }()
       var resp types.DanmakusReq
       l := ws.NewWsLogic(r.Context(), svcCtx)

       // 该channel用于往goroutine中发送控制
       resetChan := make(chan bool)

       // 定时器用于设置心跳包
       timer := time.NewTimer(70 * time.Second)

       // 启动一个 goroutine 来处理定时器
       go func() { // 这是我用来验证心跳包的,可以去掉相关代码
          for {
             select {
             case <-timer.C:
                // 无心跳包
                c.Conn.Close()
             case reset := <-resetChan:
                if reset {
                   // 重置心跳定时
                   timer.Reset(70 * time.Second)
                }
             }
          }
       }()

       for {
          err := c.Conn.ReadJSON(&resp)
          if err != nil {
             if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                logx.Errorf("Error reading message: %v", err)
             }
             break
          }
          if resp.Type == 0 {
             resetChan <- true
          }
          l.Ws(resp) // 这里写ReadMeassge相关处理等Logic,其他的也行,自己写就行
       }
    }
}

结语

这时候启动应该就能正常使用了,具体更多功能就需要自己定制了。