跳转至

SessionManager

1. 模块定位

1.1 核心作用

SessionManager 是 WebSocket 连接管理器,负责管理所有 WebSocket 客户端连接的生命周期,包括连接建立、消息收发、心跳检测和连接关闭。

一句话定义:SessionManager 是 WebSocket 连接的"管家",确保每个客户端连接都被正确管理和维护。

1.2 设计目标

目标 说明
连接管理 维护所有活跃的 WebSocket 连接
消息推送 向指定客户端或广播消息
心跳检测 检测客户端是否存活,清理死连接
并发安全 支持多客户端同时连接和消息收发

1.3 职责边界

graph TB
    subgraph "SessionManager 职责范围"
        subgraph "✅ 负责"
            A1[WebSocket 连接升级]
            A2[Session 生命周期管理]
            A3[消息推送和广播]
            A4[心跳检测]
            A5[连接状态统计]
        end
        subgraph "❌ 不负责"
            B1[消息内容解析]
            B2[业务逻辑处理]
            B3[消息路由决策]
            B4[HTTP 服务器管理]
        end
    end

2. 设计决策

2.1 为什么需要独立的 SessionManager?

问题背景: - 需要支持多客户端同时连接 - 需要管理每个连接的状态 - 需要实现心跳检测和死连接清理

没有 SessionManager 的情况

graph LR
    subgraph "Gateway 直接管理"
        GW[Gateway]
        C1[Client 1]
        C2[Client 2]
        C3[Client 3]

        GW <--> C1
        GW <--> C2
        GW <--> C3
    end

    style GW fill:#f99

问题:Gateway 职责过重,连接管理与消息协调耦合

有 SessionManager 的情况

graph TB
    subgraph "分层管理"
        GW[Gateway
消息协调] SM[SessionManager
连接管理] C1[Client 1] C2[Client 2] C3[Client 3] end GW --> SM SM <--> C1 SM <--> C2 SM <--> C3 style GW fill:#9f9 style SM fill:#9f9

优势:职责分离,Gateway 专注消息协调,SessionManager 专注连接管理

2.2 为什么使用 Session 抽象?

决策:每个 WebSocket 连接封装为一个 Session 对象。

classDiagram
    class SessionManager {
        +sessions: map[string]*Session
        +AddSession(id, conn)
        +RemoveSession(id)
        +Broadcast(msg)
        +SendTo(id, msg)
        +GetClientCount()
    }

    class Session {
        +ID: string
        +Conn: *websocket.Conn
        +Connected: time.Time
        +LastActive: time.Time
        +Send(msg)
        +Close()
    }

    SessionManager "1" --> "*" Session

理由: 1. 状态隔离:每个连接的状态独立管理 2. 并发安全:每个 Session 有独立的写锁 3. 便于扩展:可在 Session 中添加更多元数据

2.3 为什么使用回调模式通知消息?

决策:SessionManager 收到客户端消息后,通过回调通知 Gateway。

sequenceDiagram
    participant C as Client
    participant S as Session
    participant SM as SessionManager
    participant CB as Callback

    C->>S: WebSocket Send
    S->>SM: 消息到达
    SM->>CB: OnMessage(clientID, msg)

    Note over CB: Gateway 注册的回调

理由: 1. 解耦:SessionManager 不关心消息如何处理 2. 灵活:业务逻辑可动态更换 3. 简单:避免复杂的事件系统


3. 接口契约

3.1 SessionManager 接口

// SessionManager WebSocket 会话管理器接口
type SessionManager interface {
    // 连接管理
    AddSession(id string, conn *websocket.Conn) error
    RemoveSession(id string)
    GetSession(id string) (*Session, bool)

    // 消息推送
    Broadcast(ctx context.Context, msg *Message) error
    SendTo(ctx context.Context, sessionID string, msg *Message) error

    // 状态查询
    GetClientCount() int
    GetSessionIDs() []string

    // 生命周期
    Stop(ctx context.Context) error
}

3.2 Session 结构

// Session WebSocket 会话
type Session struct {
    ID         string            // 会话唯一标识
    Conn       *websocket.Conn   // WebSocket 连接
    Connected  time.Time         // 连接时间
    LastActive time.Time         // 最后活跃时间
    Metadata   map[string]any    // 可选元数据
}

3.3 回调接口

// SessionConfig SessionManager 配置
type SessionConfig struct {
    // 消息回调(必需)
    OnMessage func(clientID string, msg *Message)

    // 连接事件回调(可选)
    OnConnect    func(clientID string)
    OnDisconnect func(clientID string)

    // 心跳配置
    HeartbeatInterval time.Duration  // 心跳间隔,默认 30s
    HeartbeatTimeout  time.Duration  // 心跳超时,默认 90s
}

3.4 接口设计说明

方法 调用时机 行为约定
AddSession 新连接建立时 添加到管理列表,启动消息读取循环
RemoveSession 连接关闭时 从列表移除,关闭连接
Broadcast 需要广播时 向所有活跃 Session 发送消息
SendTo 需要单播时 向指定 Session 发送消息
Stop Gateway 停止时 关闭所有连接

4. 消息推送设计

4.1 广播流程

sequenceDiagram
    participant GW as Gateway
    participant SM as SessionManager
    participant S1 as Session 1
    participant S2 as Session 2
    participant S3 as Session 3

    GW->>SM: Broadcast(ctx, msg)
    SM->>SM: 序列化消息

    par 并行发送
        SM->>S1: Send(msg)
        SM->>S2: Send(msg)
        SM->>S3: Send(msg)
    end

    Note over S1,S3: 各 Session 独立发送

设计要点: - 消息只序列化一次 - 各 Session 并行发送,互不阻塞 - 单个 Session 发送失败不影响其他

4.2 单播流程

sequenceDiagram
    participant GW as Gateway
    participant SM as SessionManager
    participant S as Session

    GW->>SM: SendTo(ctx, sessionID, msg)
    SM->>SM: 查找 Session

    alt Session 存在
        SM->>S: Send(msg)
        S-->>SM: 发送完成
        SM-->>GW: nil
    else Session 不存在
        SM-->>GW: ErrSessionNotFound
    end

4.3 消息接收流程

sequenceDiagram
    participant C as Client
    participant S as Session
    participant SM as SessionManager
    participant CB as OnMessage Callback

    loop 读取循环
        C->>S: WebSocket Message
        S->>S: 反序列化
        S->>SM: 消息到达
        SM->>CB: OnMessage(sessionID, msg)
    end

    Note over C,S: 连接关闭时退出循环

5. 心跳检测设计

5.1 心跳机制

stateDiagram-v2
    [*] --> Active: 连接建立
    Active --> Active: 收到消息/Pong
    Active --> Inactive: 超时无响应
    Inactive --> Active: 收到消息
    Inactive --> Closed: 超时关闭
    Closed --> [*]

5.2 心跳流程

sequenceDiagram
    participant SM as SessionManager
    participant S as Session
    participant C as Client

    loop 每 HeartbeatInterval
        SM->>S: 检查 LastActive
        alt 未超时
            S->>C: Ping
            C-->>S: Pong
            S->>S: 更新 LastActive
        else 已超时
            SM->>S: Close()
            Note over S: 触发 OnDisconnect
        end
    end

5.3 配置参数

参数 默认值 说明
HeartbeatInterval 30s 发送心跳间隔
HeartbeatTimeout 90s 判定连接超时的时间

6. 并发安全设计

6.1 数据结构

classDiagram
    class SessionManager {
        -sessions: sync.Map
        -mu: sync.RWMutex
        +AddSession()
        +RemoveSession()
        +Broadcast()
    }

    class Session {
        -mu: sync.Mutex
        +Send()
        +Close()
    }

    SessionManager --> Session

6.2 并发控制策略

操作 锁策略 说明
添加 Session 写锁 防止并发添加
移除 Session 写锁 防止并发移除
遍历 Session 读锁 允许并发遍历
发送消息 Session 级别锁 每个 Session 独立

6.3 并发场景

sequenceDiagram
    participant G1 as Goroutine 1
    participant SM as SessionManager
    participant G2 as Goroutine 2
    participant S as Session

    par 并发操作
        G1->>SM: Broadcast(msg1)
        G2->>SM: Broadcast(msg2)
    end

    SM->>S: Send(msg1)
    SM->>S: Send(msg2)

    Note over S: Session 内部串行化

7. 与其他模块的关系

7.1 依赖关系

graph TB
    SM[SessionManager]

    SM --> WS[gorilla/websocket]
    SM --> MSG[Message]

    GW[Gateway] --> SM

    style SM fill:#9f9

7.2 交互方式

交互方 交互方式 说明
Gateway 创建和管理 Gateway 创建 SessionManager
Gateway 回调注册 注册 OnMessage 回调
WebSocket 库 连接持有 使用 gorilla/websocket

8. 约束与限制

8.1 使用约束

约束 原因
必须注册 OnMessage 回调 否则无法处理客户端消息
Session ID 必须唯一 用于路由和日志
Stop 后不能复用 简化生命周期

8.2 性能限制

限制 说明
广播是异步的 不等待所有 Session 发送完成
无消息队列 直接发送,不缓冲
无消息确认 不等待客户端 ACK

8.3 错误处理约定

错误类型 处理方式
Session 不存在 返回 ErrSessionNotFound
发送失败 记录日志,移除 Session
连接断开 触发 OnDisconnect,移除 Session

9. 扩展点

9.1 可扩展的设计

扩展点 扩展方式
Session 元数据 在 Metadata 字段添加自定义数据
连接事件 注册 OnConnect/OnDisconnect 回调
心跳策略 调整 HeartbeatInterval/Timeout

9.2 不支持扩展的设计

限制 原因
不支持消息确认 增加复杂度,桌面场景不需要
不支持消息队列 内存限制,避免内存泄漏

10. 使用场景

10.1 多客户端广播

sequenceDiagram
    participant WX as 微信消息
    participant GW as Gateway
    participant SM as SessionManager
    participant BR as 浏览器
    participant DA as 桌面App
    participant MA as 移动App

    WX->>GW: 消息到达
    GW->>SM: Broadcast(msg)

    par 并行推送
        SM->>BR: WebSocket Push
        SM->>DA: WebSocket Push
        SM->>MA: WebSocket Push
    end

10.2 指定客户端推送

sequenceDiagram
    participant AI as AI处理
    participant GW as Gateway
    participant SM as SessionManager
    participant BR as 浏览器

    AI->>GW: 响应消息(指定客户端)
    GW->>SM: SendTo(sessionID, msg)
    SM->>BR: WebSocket Push

11. 相关文档