1.Go中websocket使用
全局变量
// 创建一个用于存储WebSocket连接的map和互斥锁
clients = make(map[*websocket.Conn]bool)
clientsMux sync.Mutex
测试消息:
r.GET("/ws", func(c *gin.Context) {
// 升级HTTP连接为WebSocket连接
id := c.Query("id")
fmt.Println("id=",id)
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Println(err)
return
}
defer ws.Close()
clients[ws] = true
// 监听连接关闭
go func(ws *websocket.Conn) {
for {
_, message, err := ws.ReadMessage()
if err != nil {
clientsMux.Lock()
clients[ws] = false
clientsMux.Unlock()
fmt.Println("connection has closed")
break
}
fmt.Println("msg:", string(message))
}
}(ws)
// 发送消息到 WebSocket 连接
for {
if v, e := clients[ws]; v {
// 每隔两秒给前端推送一句消息“hello, WebSocket”
err := ws.WriteMessage(websocket.TextMessage, []byte("hello, WebSocket"))
if err != nil {
log.Println("send error:",err,e)
}
time.Sleep(time.Second * 2)
}else{
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
}
}
})
2.根据websocket实现实时聊天
2.1 实现原理
2.1.1 交互密钥
1.客户端通过轮询的方式请求服务器,对话双方需在同一时间段内请求;
2.服务器生成随机的密钥与随机的会话id;
3.客户端请求时若双方同时在线,读取redis 中的im_key,中is_read=1说明双方都在线且都可以获取到密钥;
2.2.2 消息收发
1.客户端与服务器建立websocket长连接,在建立连接时需要先鉴权,需要判断双方是否是需要收发消息的;
2.建立长连接后,收到的消息将保存在redis list中,通过redis_key_id的方式去查找该用户收到的消息,该用户发送的消息将存到redis_key_to_user_id中;
3.每个长连接中都能收发消息,实现消息的发送;
4.这里服务器不进行磁盘保存,所有消息通过前面分发的密钥在客户端加解密,服务器不持久保存密钥与消息内容;
2.2 实现代码
2.2.1 消息处理与交换密钥
package handler
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
"strconv"
"sync"
"time"
"videoplayer/proto"
"videoplayer/worker"
)
var (
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// 允许所有来源的连接
return true
},
}
)
// 创建一个用于存储WebSocket连接的map和互斥锁
var (
clients = make(map[*websocket.Conn]bool)
clientsMux sync.Mutex
)
func SetUpIMGroup(router *gin.Engine) {
imGroup := router.Group("/im")
imGroup.POST("/get_imKey", GetImKey)
imGroup.GET("/ws", SRMessage)
}
func generateRandomHexString(length int) (string, error) {
bytes := make([]byte, length/2) // 16字节的字符串需要32个十六进制字符,即16个字节
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
func GetImKey(c *gin.Context) {
id, _ := c.Get("id")
var req proto.ImKeyReq
if err := c.ShouldBind(&req); err == nil {
id1 := int(id.(float64))
var redis_key string
if id1 < req.To_user_id {
redis_key = strconv.Itoa(id1) + "_" + strconv.Itoa(req.To_user_id) + "_imKey"
} else {
redis_key = strconv.Itoa(req.To_user_id) + "_" + strconv.Itoa(id1) + "_imKey"
}
if worker.IsContainKey(redis_key) == true {
res := worker.GetRedis(redis_key)
var retrievedData map[string]interface{}
err2 := json.Unmarshal([]byte(res), &retrievedData)
if err2 != nil {
c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"})
return
}
if int(retrievedData["from_user_id"].(float64)) == id1 {
if int(retrievedData["is_read"].(float64)) == 1 {
worker.DelRedis(redis_key)
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
return
} else if int(retrievedData["to_user_id"].(float64)) == id1 {
retrievedData["is_read"] = 1
str, _ := json.Marshal(retrievedData)
res3 := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300)
res2 := worker.SetRedisWithExpire(redis_key+"_connection", retrievedData["im_session"].(string), time.Second*300)
if res2 == false || res3 == false {
c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
return
} else {
c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"})
return
}
}
//生成随机字符串
imkey, err2 := generateRandomHexString(16)
imSession, err3 := generateRandomHexString(8)
if err2 != nil || err3 != nil {
c.JSON(http.StatusOK, gin.H{"error": "generate key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
data := make(map[string]interface{})
data["im_key"] = imkey
data["from_user_id"] = id1
data["to_user_id"] = req.To_user_id
data["im_session"] = imSession
data["is_read"] = 0
str, _ := json.Marshal(data)
//将key存入redis
res := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300)
if res == false {
c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"})
return
} else {
c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"})
}
}
// 接收及发送消息
func SRMessage(c *gin.Context) {
id, _ := c.Get("id")
id1 := int(id.(float64))
var redis_key string
to_user_id := c.Query("to_user_id")
to_user_id_num, _ := strconv.ParseInt(to_user_id, 10, 64)
if id1 < int(to_user_id_num) {
redis_key = strconv.Itoa(id1) + "_" + to_user_id + "_imKey"
} else {
redis_key = to_user_id + "_" + strconv.Itoa(id1) + "_imKey"
}
if worker.IsContainKey(redis_key+"_connection") == false {
c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"})
return
}
// 升级HTTP连接为WebSocket连接
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
clients[ws] = true
if err != nil {
// log.Println(err)
fmt.Println(err)
return
}
defer ws.Close()
res := worker.GetRedis(redis_key + "_connection")
if res == "" {
return
}
// 接收客户端消息并发送到指定用户
go func(ws *websocket.Conn, session string, to_id string) {
for {
_, message, err_r := ws.ReadMessage()
if err_r != nil {
// 当连接关闭时,退出循环
clientsMux.Lock()
clients[ws] = false
clientsMux.Unlock()
//设置ws关闭状态信息
break
}
// 将接收到的消息解析为JSON
var msg proto.Message
if err2 := json.Unmarshal(message, &msg); err2 != nil {
log.Println("unmarshal:", err2)
continue
}
if msg.Type == "msg" {
// 将消息发送到指定用户
worker.PushRedisList(session+"_"+to_user_id, msg.Msg) //将消息存入redis
}
}
}(ws, res, to_user_id)
// 从Redis中读取消息并发送到客户端
for {
if v := clients[ws]; v == true {
res2 := worker.PopRedisListLeft(res + "_" + strconv.Itoa(id1))
if res2 != "" {
var msg proto.Message
msg.Type = "msg"
msg.Msg = res2
msg.From_user_id = id1
msg.Session = res
res3, _ := json.Marshal(msg)
err2 := ws.WriteMessage(websocket.TextMessage, res3)
if err2 != nil {
break
}
}
time.Sleep(time.Second * 1) // 每100毫秒查询一次
} else {
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
}
}
}
2.2.2 redis操作
// push redis list from right
func PushRedisList(key string, value string) bool {
ctx := context.Background()
err := redisClient.RPush(ctx, key, value).Err()
if err != nil {
fmt.Println("Error setting key: %v", err)
return false
}
return true
}
// pop redis list from left,as queue
func PopRedisListLeft(key string) string {
ctx := context.Background()
val, err := redisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
if err != nil {
return ""
}
return val
}
// 获取redis
func GetRedis(key string) string {
ctx := context.Background()
val, err := redisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
if err != nil {
fmt.Println(key, " Error getting key: %v", err)
return ""
}
return val
}
func IsContainKey(key string) bool {
ctx := context.Background()
val, err := redisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0
if err != nil {
fmt.Println("Error getting key: %v", err)
return false
}
if val == 0 {
return false
}
return true
}
// 设置redis
func SetRedisWithExpire(key string, value string, expire time.Duration) bool { // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
ctx := context.Background()
// 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
err := redisClient.Set(ctx, key, value, expire).Err()
if err != nil {
fmt.Println("Error setting key: %v", err)
return false
}
return true
}
2.2.3 Python测试代码
2.2.3.1 获取密钥
这需要与用户登录鉴权相关:
import subprocess
from datetime import datetime
from threading import Thread
import json
import sys
import os
import requests
import time
token=''
video_id=0
def get_token():
url = "http://localhost:8083/user/login"
global token
user={"username":"","email":"","password":""}
response = requests.post(url,data=user)
res = json.loads(response.content.decode('utf-8'))
print(res)
if res['code'] ==0:
token = res['data']['token']
print("get token success")
else:
print('get token failed')
return token
def create_im_conn():
im={"to_user_id":2002}
url ='http://localhost:8083/im/get_imKey'
global token
global video_id
heades={'Content-Type': 'application/json','token':token}
print(heades)
response = requests.post(url,json=im,headers=heades)
res = json.loads(response.content.decode('utf-8'))
if res['code'] == 0:
print(res)
print("create im success")
return res['data']['is_read']
else:
print(res)
print("create im fail")
get_token()
for i in range(100):
res = create_im_conn() #返回1说明对方已读,停止轮询
if res ==1:
break
time.sleep(1)
2.2.3.2 消息传递
这里需要token,我的登录鉴权返回的是jwt token;
# -*- coding:utf-8 -*-
import websocket
from websocket import WebSocketApp
try:
import thread
except ImportError:
import _thread as thread
import time
class Test(object):
def __init__(self):
super(Test, self).__init__()
self.url = "ws://localhost:8083/im/ws?to_user_id=2002&token=ey3JhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MTk0OTQ0MjYsImlkIjoxLCJ1c2VybmFtZSI6ImxpanVuIn0.cCKJfvXpVh_GaNaCsEeH8T5tP4DYjdIMURPKHnXj2Ng"
self.ws = None
def on_message(self, message,m):
print("####### on_message #######")
print("message:%s"%m)
def on_error(self, error,e):
print("####### on_error #######")
print("error:%s" % error)
print("e:%s"%e)
def on_close(self,a,b,c):
print("####### on_close #######")
print(a,b,c)
def on_ping(self, message):
print("####### on_ping #######")
print("ping message:%s" % message)
def on_pong(self, message):
print("####### on_pong #######")
print("pong message:%s" % message)
def on_open(self,e):
print("####### on_open #######")
thread.start_new_thread(self.run, ())
def run(self, *args):
while True:
time.sleep(1)
input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n")
if input_msg == "close":
self.ws.close() # 关闭
print("thread terminating...")
break
else:
self.ws.send(input_msg)
def start(self):
websocket.enableTrace(True) # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。
self.ws = WebSocketApp(self.url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
# self.ws.on_open = self.on_open # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。
self.ws.run_forever()
if __name__ == '__main__':
Test().start()
#发送测试信息:{"type":"msg","data":"hello2002","to_user_id":2002,"from_user_id":1,"session":"11917957"}
test
hello