跳转至

设计简介

1. 概述

1.1 项目定位

gort 是一个轻量级多渠道通信网关,专为桌面 App 单用户场景设计,提供统一的 WebSocket 聊天通信服务接入层。通过标准化的 Channel 接口抽象,实现多种即时通讯平台的统一接入和消息转发。

1.2 设计目标

  • 统一接入:为不同通信渠道提供一致的抽象接口
  • 协议适配:将各种异构协议(Webhook 等)转换为标准消息格式
  • 轻量简洁:针对桌面 App 单用户场景,去除不必要的复杂设计
  • 易于集成:作为库嵌入桌面应用,低资源占用
  • 插件化:支持渠道热插拔和动态配置

1.3 非目标(明确排除)

  • ❌ 分布式部署(单实例运行)
  • ❌ 高并发海量连接(单用户场景)
  • ❌ 消息持久化存储(内存级转发)
  • ❌ 复杂限流策略(平台方已有限制)
  • ❌ 多租户隔离

2. 整体架构

2.1 架构图

graph TB
    subgraph "外部通信渠道"
        WeChat[微信
Webhook] DingTalk[钉钉
Webhook] Feishu[飞书
Webhook] Telegram[Telegram
Webhook] Custom[其他自定义协议
Webhook] end subgraph "Channel 层 - 协议适配器" direction TB WC_Adapter[WeChat Channel
协议解析
消息标准化
签名验证] DT_Adapter[DingTalk Channel
协议解析
消息标准化
签名验证] FS_Adapter[Feishu Channel
协议解析
消息标准化
签名验证] TG_Adapter[Telegram Channel
协议解析
消息标准化] WH_Adapter[Generic Webhook Channel
通用 Webhook
协议解析] end subgraph "Gateway 层 - WebSocket 统一接入服务" GW["Gateway Core
消息协调
会话管理
生命周期控制"] MW["Middleware Chain
日志中间件
Token 验证"] Router["Message Router
客户端路由"] SessionMgr["Session Manager
WebSocket 会话管理"] end subgraph "客户端层" Browser[浏览器客户端
WebSocket 连接] DesktopApp[桌面 App 客户端
WebSocket 连接] MobileApp[移动 App 客户端
WebSocket 连接] end subgraph "业务层" Business[Business Logic
AI 助手/业务处理] end %% 外部渠道通过 Webhook 接入 Channel WeChat -->|HTTP POST| WC_Adapter DingTalk -->|HTTP POST| DT_Adapter Feishu -->|HTTP POST| FS_Adapter Telegram -->|HTTP POST| TG_Adapter Custom -->|HTTP POST| WH_Adapter %% Channel 将标准消息发送给 Gateway WC_Adapter -->|标准 Message| GW DT_Adapter -->|标准 Message| GW FS_Adapter -->|标准 Message| GW TG_Adapter -->|标准 Message| GW WH_Adapter -->|标准 Message| GW %% Gateway 内部处理流程 GW <--> MW MW <--> Router Router <--> SessionMgr SessionMgr <--> Business %% Gateway 与客户端双向通信 SessionMgr <==>|WebSocket| Browser SessionMgr <==>|WebSocket| DesktopApp SessionMgr <==>|WebSocket| MobileApp %% Gateway 通过 Channel 发送响应回外部渠道 GW -.->|SendMessage| WC_Adapter WC_Adapter -.->|HTTP Response| WeChat

2.2 核心架构说明

2.2.1 数据流向

入站流程(外部渠道 → WebSocket 客户端):

外部渠道 → Channel(协议适配) → Gateway.HandleChannelMessage() → 
中间件处理 → 业务处理 → WebSocket 推送 → 客户端

出站流程(WebSocket 客户端 → 外部渠道):

客户端 → WebSocket → Gateway.HandleClientMessage() → 
中间件处理 → 业务处理 → Channel.SendMessage() → 外部渠道

2.2.2 组件职责

组件 职责 说明
Channel 协议适配器 接收 Webhook,协议转换,签名验证
Gateway 消息协调中心 统一入口,消息分发,生命周期管理
SessionMgr WebSocket 管理 客户端连接、心跳、消息推送
Middleware 横切处理 日志、认证
Router 客户端路由 决定消息推送到哪个客户端

3. 核心接口定义

3.1 Channel 接口

// Channel 协议适配器接口
type Channel interface {
    // 基本信息
    Name() string
    Type() ChannelType
    Protocol() ProtocolType

    // 生命周期
    Start(ctx context.Context, handler MessageHandler) error
    Stop(ctx context.Context) error
    IsRunning() bool

    // 消息发送(出站)
    SendMessage(ctx context.Context, msg *Message) error

    // 状态查询
    GetStatus() ChannelStatus
}

// MessageHandler 消息处理回调
type MessageHandler func(ctx context.Context, msg *Message) error

关键变更说明: - 移除 SetOnMessage,改为 Start 时注入 handler,避免状态不一致 - 移除 HealthCheck,桌面场景下单实例无需健康检查 - 简化 ChannelStatus,移除 HealthScore 等过度设计字段

3.2 Gateway 接口

// Gateway WebSocket 网关接口
type Gateway interface {
    // 生命周期
    Start(ctx context.Context) error
    Stop(ctx context.Context) error

    // 消息处理(入站)
    HandleChannelMessage(ctx context.Context, msg *Message) error

    // 客户端管理
    RegisterClientHandler(handler ClientHandler)
    Broadcast(ctx context.Context, msg *Message) error
    SendToClient(ctx context.Context, clientID string, msg *Message) error

    // 状态查询
    GetClientCount() int
    IsRunning() bool
}

// ClientHandler 客户端消息处理器
type ClientHandler func(ctx context.Context, clientID string, msg *Message) (*Message, error)

3.3 核心数据结构

// Message 标准消息格式
type Message struct {
    ID        string                 `json:"id"`
    ChannelID string                 `json:"channel_id"`      // 来源渠道
    Direction MessageDirection       `json:"direction"`       // inbound/outbound
    From      UserInfo               `json:"from"`
    To        UserInfo               `json:"to"`
    Content   string                 `json:"content"`
    Type      MessageType            `json:"type"`            // text/image/file
    Metadata  map[string]interface{} `json:"metadata"`
    Timestamp time.Time              `json:"timestamp"`
}

// UserInfo 用户信息
type UserInfo struct {
    ID       string `json:"id"`
    Name     string `json:"name"`
    Avatar   string `json:"avatar"`
    Platform string `json:"platform"` // wechat/dingtalk 等
}

// ChannelConfig 渠道配置
type ChannelConfig struct {
    Name    string                 `json:"name"`
    Type    ChannelType            `json:"type"`
    Enabled bool                   `json:"enabled"`
    Webhook WebhookConfig          `json:"webhook"`
    Secrets map[string]string      `json:"secrets"`         // 敏感配置
    Options map[string]interface{} `json:"options"`
}

// WebhookConfig Webhook 配置
type WebhookConfig struct {
    Path   string `json:"path"`    // 如 "/wechat"
    Method string `json:"method"`  // 默认 POST
}

4. 组件详细设计

4.1 Channel 层设计

4.1.1 统一 HTTP 服务器

设计决策:所有 Channel 共享 Gateway 的统一 HTTP 服务器,而非每个 Channel 独立端口。

type Gateway struct {
    httpServer *http.Server    // 统一 HTTP 服务器
    channels   map[string]Channel
    mux        *http.ServeMux
}

// 注册 Channel Webhook 路由
func (g *Gateway) registerChannelRoutes() {
    for _, ch := range g.channels {
        path := ch.GetWebhookPath()
        g.mux.HandleFunc(path, g.createWebhookHandler(ch))
    }
}

优势: - 只需开放 2 个端口(HTTP + WebSocket) - 避免防火墙配置复杂性 - 统一入口便于日志和监控

4.1.2 Channel 实现示例

// WeChatChannel 微信渠道实现
type WeChatChannel struct {
    name      string
    config    ChannelConfig
    handler   MessageHandler
    isRunning bool
}

func (c *WeChatChannel) Start(ctx context.Context, handler MessageHandler) error {
    c.handler = handler
    c.isRunning = true
    return nil
}

func (c *WeChatChannel) HandleWebhook(w http.ResponseWriter, r *http.Request) {
    // 1. 验证签名
    if !c.verifySignature(r) {
        http.Error(w, "Invalid signature", http.StatusUnauthorized)
        return
    }

    // 2. 解析消息
    msg, err := c.parseMessage(r)
    if err != nil {
        http.Error(w, "Invalid message", http.StatusBadRequest)
        return
    }

    // 3. 调用 handler 转发给 Gateway
    if err := c.handler(r.Context(), msg); err != nil {
        http.Error(w, "Processing failed", http.StatusInternalServerError)
        return
    }

    // 4. 返回成功响应
    w.WriteHeader(http.StatusOK)
}

4.2 Gateway 层设计

4.2.1 消息处理流程

func (g *Gateway) HandleChannelMessage(ctx context.Context, msg *Message) error {
    // 1. 中间件处理
    if err := g.middlewareChain.Execute(ctx, msg); err != nil {
        return fmt.Errorf("middleware failed: %w", err)
    }

    // 2. 业务处理(如果注册了 handler)
    if g.clientHandler != nil {
        response, err := g.clientHandler(ctx, "", msg)
        if err != nil {
            return fmt.Errorf("business logic failed: %w", err)
        }

        // 3. 推送到 WebSocket 客户端
        if response != nil {
            return g.sessionMgr.Broadcast(ctx, response)
        }
    }

    // 4. 无业务 handler 时直接广播
    return g.sessionMgr.Broadcast(ctx, msg)
}

func (g *Gateway) HandleClientMessage(ctx context.Context, clientID string, msg *Message) error {
    // 1. 中间件处理
    if err := g.middlewareChain.Execute(ctx, msg); err != nil {
        return err
    }

    // 2. 业务处理
    if g.clientHandler != nil {
        response, err := g.clientHandler(ctx, clientID, msg)
        if err != nil {
            return err
        }

        // 3. 发送到指定渠道
        if response != nil && response.ChannelID != "" {
            channel, ok := g.channels[response.ChannelID]
            if !ok {
                return fmt.Errorf("channel not found: %s", response.ChannelID)
            }
            return channel.SendMessage(ctx, response)
        }
    }

    return nil
}

4.2.2 Session 管理

type SessionManager struct {
    sessions    map[string]*Session
    mu          sync.RWMutex
    upgrader    websocket.Upgrader
}

type Session struct {
    ID         string
    Conn       *websocket.Conn
    Connected  time.Time
    LastActive time.Time
    mu         sync.Mutex
}

func (sm *SessionManager) Broadcast(ctx context.Context, msg *Message) error {
    sm.mu.RLock()
    defer sm.mu.RUnlock()

    data, err := json.Marshal(msg)
    if err != nil {
        return err
    }

    for _, session := range sm.sessions {
        go func(s *Session) {
            s.mu.Lock()
            defer s.mu.Unlock()
            s.Conn.WriteMessage(websocket.TextMessage, data)
        }(session)
    }

    return nil
}

4.3 中间件设计

// Middleware 中间件接口
type Middleware interface {
    Name() string
    Handle(ctx context.Context, msg *Message, next Handler) error
}

type Handler func(ctx context.Context, msg *Message) error

// MiddlewareChain 中间件链
type MiddlewareChain struct {
    middlewares []Middleware
}

func (mc *MiddlewareChain) Execute(ctx context.Context, msg *Message) error {
    var index int

    var next Handler
    next = func(ctx context.Context, msg *Message) error {
        if index >= len(mc.middlewares) {
            return nil
        }
        mw := mc.middlewares[index]
        index++
        return mw.Handle(ctx, msg, next)
    }

    return next(ctx, msg)
}

4.3.1 日志中间件

type LoggingMiddleware struct {
    logger *slog.Logger
}

func (m *LoggingMiddleware) Handle(ctx context.Context, msg *Message, next Handler) error {
    start := time.Now()

    m.logger.Info("message received",
        "channel", msg.ChannelID,
        "from", msg.From.ID,
        "type", msg.Type,
    )

    err := next(ctx, msg)

    m.logger.Info("message processed",
        "duration", time.Since(start),
        "error", err,
    )

    return err
}

4.3.2 Token 验证中间件

type TokenMiddleware struct {
    token string
}

func (m *TokenMiddleware) Handle(ctx context.Context, msg *Message, next Handler) error {
    // 从 metadata 中验证 token
    if m.token != "" {
        msgToken, ok := msg.Metadata["token"].(string)
        if !ok || msgToken != m.token {
            return fmt.Errorf("invalid token")
        }
    }
    return next(ctx, msg)
}

5. 消息处理流程

5.1 入站消息时序图

sequenceDiagram
    participant EC as 外部渠道
    participant CH as Channel
    participant GW as Gateway
    participant MW as Middleware
    participant BL as 业务逻辑
    participant SM as SessionMgr
    participant CL as WebSocket客户端

    EC->>CH: HTTP POST Webhook
    CH->>CH: 验证签名
    CH->>CH: 解析并标准化消息
    CH->>GW: HandleChannelMessage(msg)
    GW->>MW: 中间件处理
    MW-->>GW: 继续
    GW->>BL: 业务处理
    BL-->>GW: 返回响应消息
    GW->>SM: Broadcast(response)
    SM->>CL: WebSocket 推送
    CL-->>SM: 收到确认

5.2 出站消息时序图

sequenceDiagram
    participant CL as WebSocket客户端
    participant SM as SessionMgr
    participant GW as Gateway
    participant MW as Middleware
    participant BL as 业务逻辑
    participant CH as Channel
    participant EC as 外部渠道

    CL->>SM: WebSocket 发送消息
    SM->>GW: HandleClientMessage(clientID, msg)
    GW->>MW: 中间件处理
    MW-->>GW: 继续
    GW->>BL: 业务处理
    BL-->>GW: 返回响应消息
    GW->>CH: SendMessage(msg)
    CH->>CH: 转换为平台格式
    CH->>EC: HTTP POST/API 调用
    EC-->>CH: 响应
    CH-->>GW: 发送成功

6. 配置管理

6.1 配置优先级

环境变量 > 配置文件 > 默认值

6.2 配置结构

# config.yaml
gateway:
  name: "gort-gateway"
  websocket_addr: ":9000"      # WebSocket 监听地址
  http_addr: ":8080"           # HTTP 监听地址(Webhook)
  token: ""                    # 可选的访问令牌

channels:
  - name: "wechat"
    type: "wechat"
    enabled: true
    webhook:
      path: "/webhook/wechat"
    secrets:
      app_id: "${WECHAT_APP_ID}"
      app_secret: "${WECHAT_APP_SECRET}"
      token: "${WECHAT_TOKEN}"

  - name: "dingtalk"
    type: "dingtalk"
    enabled: true
    webhook:
      path: "/webhook/dingtalk"
    secrets:
      app_key: "${DINGTALK_APP_KEY}"
      app_secret: "${DINGTALK_APP_SECRET}"

6.3 环境变量命名

# Gateway 配置
export GORT_WEBSOCKET_ADDR=":9000"
export GORT_HTTP_ADDR=":8080"
export GORT_TOKEN="optional-token"

# 渠道配置(使用渠道名称前缀)
export GORT_CHANNEL_WECHAT_APP_ID="wx..."
export GORT_CHANNEL_WECHAT_APP_SECRET="secret..."
export GORT_CHANNEL_WECHAT_TOKEN="token..."

7. 错误处理与重试

7.1 错误分类

错误类型 处理方式 说明
协议错误 立即返回 400 签名验证失败、格式错误
临时错误 重试 3 次 网络超时、服务端 5xx
业务错误 记录日志 业务逻辑异常
系统错误 优雅降级 内存不足、连接断开

7.2 重试策略

// 简单的指数退避重试
func retryWithBackoff(ctx context.Context, fn func() error, maxRetries int) error {
    var err error
    for i := 0; i < maxRetries; i++ {
        if err = fn(); err == nil {
            return nil
        }

        // 指数退避:1s, 2s, 4s
        delay := time.Duration(1<<i) * time.Second
        select {
        case <-time.After(delay):
            continue
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return err
}

8. 安全措施

8.1 Webhook 安全

措施 实现
签名验证 每个 Channel 验证平台签名
Token 验证 可选的 WebSocket 连接 Token
HTTPS 生产环境强制使用 TLS
IP 白名单 可选的平台 IP 限制

8.2 WebSocket 安全

// 升级配置
upgrader := websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        // 生产环境应限制 Origin
        return true
    },
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

9. 监控与日志

9.1 关键指标

type Metrics struct {
    MessagesReceived  int64     // 接收消息数
    MessagesSent      int64     // 发送消息数
    ActiveClients     int       // 活跃客户端数
    ChannelErrors     map[string]int64  // 各渠道错误数
    ProcessingTime    time.Duration     // 平均处理时间
}

9.2 日志规范

// 结构化日志
logger.Info("message processed",
    slog.String("channel", msg.ChannelID),
    slog.String("msg_id", msg.ID),
    slog.Duration("duration", duration),
    slog.String("error", errStr),
)

10. 优化变更总结

10.1 移除的过度设计

原设计 问题 优化方案
MaxWorkers/QueueSize 单用户场景无需复杂并发控制 使用简单 goroutine
HealthCheck 接口 单实例无需健康检查 移除接口,保留简单状态查询
HealthScore 过度设计 简化为布尔状态
DLQ(死信队列) 单用户场景可即时处理错误 直接返回错误,由调用方处理
复杂路由策略 单用户场景只需简单广播 简化为广播或指定客户端
ChannelSwitcher 与 Router 职责重叠 合并到业务层处理
MessageForwarder 多余抽象层 Gateway 直接调用 Channel
多端口 Webhook 防火墙配置复杂 统一 HTTP 服务器

10.2 修复的逻辑谬误

问题 修复方案
Channel 与 Gateway 交互不明确 明确使用 Start 时注入 handler 模式
Router 与 Switcher 职责重叠 移除 Switcher,Router 只负责客户端路由
消息流向不清晰 明确入站/出站两条独立流程
缺少客户端标识 Message 增加 Direction 字段区分流向

10.3 补充的缺失组件

新增组件 说明
统一 HTTP 服务器 所有 Channel 共享,减少端口占用
SessionManager 明确 WebSocket 会话管理职责
Message Direction 区分入站/出站消息
结构化日志 使用 slog 替代简单日志
配置验证 启动时验证配置完整性
优雅关闭 确保消息处理完成后再关闭

11. 使用示例

11.1 基础使用

package main

import (
    "context"
    "log"
    "gort/pkg/gateway"
)

func main() {
    // 1. 创建 Gateway
    gw, err := gateway.New(gateway.Config{
        WebSocketAddr: ":9000",
        HTTPAddr:      ":8080",
    })
    if err != nil {
        log.Fatal(err)
    }

    // 2. 注册渠道
    wechatCh := channels.NewWeChat(channels.WeChatConfig{
        AppID:     os.Getenv("WECHAT_APP_ID"),
        AppSecret: os.Getenv("WECHAT_APP_SECRET"),
        Token:     os.Getenv("WECHAT_TOKEN"),
        WebhookPath: "/webhook/wechat",
    })
    gw.RegisterChannel(wechatCh)

    // 3. 设置业务处理器
    gw.RegisterClientHandler(func(ctx context.Context, clientID string, msg *gateway.Message) (*gateway.Message, error) {
        // 业务逻辑处理
        log.Printf("Received: %s", msg.Content)

        // 返回响应
        return &gateway.Message{
            ChannelID: msg.ChannelID,
            Content:   "Received: " + msg.Content,
            Type:      gateway.MessageTypeText,
        }, nil
    })

    // 4. 启动
    ctx := context.Background()
    if err := gw.Start(ctx); err != nil {
        log.Fatal(err)
    }

    // 5. 优雅关闭
    <-ctx.Done()
    gw.Stop(context.Background())
}

12. 附录

12.1 术语表

术语 说明
Channel 协议适配器,对接外部 IM 平台
Gateway 网关核心,消息协调中心
Session WebSocket 客户端连接会话
Webhook HTTP 回调接口,接收外部消息
入站消息 从外部渠道进入系统的消息
出站消息 从系统发送到外部渠道的消息

12.2 技术选型

组件 选型 理由
WebSocket gorilla/websocket 成熟稳定,广泛使用
HTTP net/http 标准库,无需额外依赖
日志 log/slog Go 1.21+ 标准结构化日志
配置 envconfig + yaml 简单实用