Go使用Gin框架+WebSocket实现聊天室后端

1.Go中websocket使用

全局变量

    // 创建一个用于存储WebSocket连接的map和互斥锁
    clients    = make(map[*websocket.Conn]bool)
    clientsMux sync.Mutex

测试消息:

    r.GET("/ws", func(c *gin.Context) {
        // 升级HTTP连接为WebSocket连接
        id := c.Query("id")
        fmt.Println("id=",id)
        ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
        if err != nil {
            log.Println(err)
            return
        }
        defer ws.Close()
        clients[ws] = true

        // 监听连接关闭
        go func(ws *websocket.Conn) {
            for {
                _, message, err := ws.ReadMessage()
                if err != nil {
                    clientsMux.Lock()
                    clients[ws] = false
                    clientsMux.Unlock()
                    fmt.Println("connection has closed")
                    break
                }
                fmt.Println("msg:", string(message))
            }
        }(ws)

        // 发送消息到 WebSocket 连接
        for {
            if v, e := clients[ws]; v {
                // 每隔两秒给前端推送一句消息“hello, WebSocket”
                err := ws.WriteMessage(websocket.TextMessage, []byte("hello, WebSocket"))
                if err != nil {
                    log.Println("send error:",err,e)
                }
                time.Sleep(time.Second * 2)
            }else{
                clientsMux.Lock() 
                delete(clients, ws)  
                clientsMux.Unlock() 
                break
            }
        }
    })

2.根据websocket实现实时聊天

2.1 实现原理

2.1.1 交互密钥

1.客户端通过轮询的方式请求服务器,对话双方需在同一时间段内请求;
2.服务器生成随机的密钥与随机的会话id;
3.客户端请求时若双方同时在线,读取redis 中的im_key,中is_read=1说明双方都在线且都可以获取到密钥;

2.2.2 消息收发

1.客户端与服务器建立websocket长连接,在建立连接时需要先鉴权,需要判断双方是否是需要收发消息的;
2.建立长连接后,收到的消息将保存在redis list中,通过redis_key_id的方式去查找该用户收到的消息,该用户发送的消息将存到redis_key_to_user_id中;
3.每个长连接中都能收发消息,实现消息的发送;
4.这里服务器不进行磁盘保存,所有消息通过前面分发的密钥在客户端加解密,服务器不持久保存密钥与消息内容;

2.2 实现代码

2.2.1 消息处理与交换密钥

package handler

import (
    "crypto/rand"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    "log"
    "net/http"
    "strconv"
    "sync"
    "time"
    "videoplayer/proto"
    "videoplayer/worker"
)

var (
    upgrader = websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
        CheckOrigin: func(r *http.Request) bool {
            // 允许所有来源的连接
            return true
        },
    }
)

// 创建一个用于存储WebSocket连接的map和互斥锁
var (
    clients    = make(map[*websocket.Conn]bool)
    clientsMux sync.Mutex
)

func SetUpIMGroup(router *gin.Engine) {
    imGroup := router.Group("/im")
    imGroup.POST("/get_imKey", GetImKey)
    imGroup.GET("/ws", SRMessage)

}
func generateRandomHexString(length int) (string, error) {
    bytes := make([]byte, length/2) // 16字节的字符串需要32个十六进制字符,即16个字节
    if _, err := rand.Read(bytes); err != nil {
        return "", err
    }
    return hex.EncodeToString(bytes), nil
}

func GetImKey(c *gin.Context) {
    id, _ := c.Get("id")
    var req proto.ImKeyReq
    if err := c.ShouldBind(&req); err == nil {
        id1 := int(id.(float64))
        var redis_key string
        if id1 < req.To_user_id {
            redis_key = strconv.Itoa(id1) + "_" + strconv.Itoa(req.To_user_id) + "_imKey"
        } else {
            redis_key = strconv.Itoa(req.To_user_id) + "_" + strconv.Itoa(id1) + "_imKey"
        }
        if worker.IsContainKey(redis_key) == true {
            res := worker.GetRedis(redis_key)
            var retrievedData map[string]interface{}
            err2 := json.Unmarshal([]byte(res), &retrievedData)
            if err2 != nil {
                c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"})
                return
            }
            if int(retrievedData["from_user_id"].(float64)) == id1 {
                if int(retrievedData["is_read"].(float64)) == 1 {
                    worker.DelRedis(redis_key)
                }
                c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
                return
            } else if int(retrievedData["to_user_id"].(float64)) == id1 {
                retrievedData["is_read"] = 1
                str, _ := json.Marshal(retrievedData)
                res3 := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300)
                res2 := worker.SetRedisWithExpire(redis_key+"_connection", retrievedData["im_session"].(string), time.Second*300)
                if res2 == false || res3 == false {
                    c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
                    return
                }
                c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
                return
            } else {
                c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"})
                return
            }
        }

        //生成随机字符串
        imkey, err2 := generateRandomHexString(16)
        imSession, err3 := generateRandomHexString(8)
        if err2 != nil || err3 != nil {
            c.JSON(http.StatusOK, gin.H{"error": "generate key failed", "code": proto.OperationFailed, "message": "failed"})
            return
        }

        data := make(map[string]interface{})
        data["im_key"] = imkey
        data["from_user_id"] = id1
        data["to_user_id"] = req.To_user_id
        data["im_session"] = imSession
        data["is_read"] = 0
        str, _ := json.Marshal(data)
        //将key存入redis
        res := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300)
        if res == false {
            c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
            return
        }
        c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"})
        return
    } else {
        c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"})
    }
}

// 接收及发送消息
func SRMessage(c *gin.Context) {
    id, _ := c.Get("id")
    id1 := int(id.(float64))
    var redis_key string
    to_user_id := c.Query("to_user_id")
    to_user_id_num, _ := strconv.ParseInt(to_user_id, 10, 64)
    if id1 < int(to_user_id_num) {
        redis_key = strconv.Itoa(id1) + "_" + to_user_id + "_imKey"
    } else {
        redis_key = to_user_id + "_" + strconv.Itoa(id1) + "_imKey"
    }
    if worker.IsContainKey(redis_key+"_connection") == false {
        c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"})
        return
    }

    // 升级HTTP连接为WebSocket连接
    ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
    clients[ws] = true
    if err != nil {
        // log.Println(err)
        fmt.Println(err)
        return
    }
    defer ws.Close()
    res := worker.GetRedis(redis_key + "_connection")
    if res == "" {
        return
    }

    // 接收客户端消息并发送到指定用户
    go func(ws *websocket.Conn, session string, to_id string) {
        for {
            _, message, err_r := ws.ReadMessage()
            if err_r != nil {
                // 当连接关闭时,退出循环
                clientsMux.Lock()
                clients[ws] = false
                clientsMux.Unlock()
                //设置ws关闭状态信息
                break
            }

            // 将接收到的消息解析为JSON
            var msg proto.Message
            if err2 := json.Unmarshal(message, &msg); err2 != nil {
                log.Println("unmarshal:", err2)
                continue
            }
            if msg.Type == "msg" {
                // 将消息发送到指定用户
                worker.PushRedisList(session+"_"+to_user_id, msg.Msg) //将消息存入redis
            }
        }
    }(ws, res, to_user_id)

    // 从Redis中读取消息并发送到客户端
    for {
        if v := clients[ws]; v == true {
            res2 := worker.PopRedisListLeft(res + "_" + strconv.Itoa(id1))
            if res2 != "" {
                var msg proto.Message
                msg.Type = "msg"
                msg.Msg = res2
                msg.From_user_id = id1
                msg.Session = res
                res3, _ := json.Marshal(msg)
                err2 := ws.WriteMessage(websocket.TextMessage, res3)
                if err2 != nil {
                    break
                }
            }
            time.Sleep(time.Second * 1) // 每100毫秒查询一次
        } else {
            clientsMux.Lock()
            delete(clients, ws)
            clientsMux.Unlock()
            break
        }
    }
}

2.2.2 redis操作

// push redis list from right
func PushRedisList(key string, value string) bool {
    ctx := context.Background()
    err := redisClient.RPush(ctx, key, value).Err()
    if err != nil {
        fmt.Println("Error setting key: %v", err)
        return false
    }
    return true
}

// pop redis list from left,as queue
func PopRedisListLeft(key string) string {
    ctx := context.Background()
    val, err := redisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
    if err != nil {
        return ""
    }
    return val
}

// 获取redis
func GetRedis(key string) string {
    ctx := context.Background()
    val, err := redisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
    if err != nil {
        fmt.Println(key, " Error getting key: %v", err)
        return ""
    }
    return val
}
func IsContainKey(key string) bool {
    ctx := context.Background()
    val, err := redisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0
    if err != nil {
        fmt.Println("Error getting key: %v", err)
        return false
    }
    if val == 0 {
        return false
    }
    return true
}

// 设置redis
func SetRedisWithExpire(key string, value string, expire time.Duration) bool { // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
    ctx := context.Background()
    // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
    err := redisClient.Set(ctx, key, value, expire).Err()
    if err != nil {
        fmt.Println("Error setting key: %v", err)
        return false
    }
    return true
}

2.2.3 Python测试代码

2.2.3.1 获取密钥

这需要与用户登录鉴权相关:

import subprocess
from datetime import datetime
from threading import Thread
import json
import sys
import os
import requests
import time
token=''
video_id=0

def get_token():
    url = "http://localhost:8083/user/login"
    global token
    user={"username":"","email":"","password":""}
    response = requests.post(url,data=user)
    res = json.loads(response.content.decode('utf-8'))
    print(res)
    if res['code'] ==0:
        token = res['data']['token']
        print("get token success")
    else:
        print('get token failed')
    return token

def create_im_conn():
    im={"to_user_id":2002}
    url ='http://localhost:8083/im/get_imKey'
    global token
    global video_id
    heades={'Content-Type': 'application/json','token':token}
    print(heades)
    response = requests.post(url,json=im,headers=heades)
    res = json.loads(response.content.decode('utf-8'))
    if res['code'] == 0:
        print(res)
        print("create im success")
        return res['data']['is_read']
    else:
        print(res)
        print("create im fail")
get_token()
for i in range(100):
    res = create_im_conn() #返回1说明对方已读,停止轮询
    if res ==1:
        break
    time.sleep(1)

2.2.3.2 消息传递

这里需要token,我的登录鉴权返回的是jwt token;

# -*- coding:utf-8 -*-
import websocket
from websocket import WebSocketApp

try:
    import thread
except ImportError:
    import _thread as thread
import time

class Test(object):
    def __init__(self):
        super(Test, self).__init__()
        self.url = "ws://localhost:8083/im/ws?to_user_id=2002&token=ey3JhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MTk0OTQ0MjYsImlkIjoxLCJ1c2VybmFtZSI6ImxpanVuIn0.cCKJfvXpVh_GaNaCsEeH8T5tP4DYjdIMURPKHnXj2Ng"
        self.ws = None

    def on_message(self, message,m):
        print("####### on_message #######")
        print("message:%s"%m)

    def on_error(self, error,e):
        print("####### on_error #######")
        print("error:%s" % error)
        print("e:%s"%e)

    def on_close(self,a,b,c):
        print("####### on_close #######")
        print(a,b,c)

    def on_ping(self, message):
        print("####### on_ping #######")
        print("ping message:%s" % message)

    def on_pong(self, message):
        print("####### on_pong #######")
        print("pong message:%s" % message)

    def on_open(self,e):
        print("####### on_open #######")

        thread.start_new_thread(self.run, ())

    def run(self, *args):
        while True:
            time.sleep(1)
            input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n")
            if input_msg == "close":
                self.ws.close()  # 关闭
                print("thread terminating...")
                break
            else:
                self.ws.send(input_msg)

    def start(self):
        websocket.enableTrace(True)  # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。

        self.ws = WebSocketApp(self.url,
                               on_open=self.on_open,
                               on_message=self.on_message,
                               on_error=self.on_error,
                               on_close=self.on_close)
        # self.ws.on_open = self.on_open  # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。

        self.ws.run_forever()

if __name__ == '__main__':
    Test().start()

#发送测试信息:{"type":"msg","data":"hello2002","to_user_id":2002,"from_user_id":1,"session":"11917957"}

评论

  1. sdc
    已编辑
    10 月前
    2024-6-28 11:22:32

    test

    • 博主
      sdc
      10 月前
      2024-6-29 13:10:53

      hello

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇