做了一個仿微信的即時通訊系統,從協議設計到消息投遞,從 Java 微服務到 Flutter 跨端 SDK。系統做完了,但我發現,故事似乎剛剛開始。
與 IM 結緣是在 2022 年。當時在研究各類分散式系統時,我發現即時通訊是後端領域挑戰性最大的方向之一,長連線管理、消息可靠投遞、多端同步、高併發處理,每一個單獨拿出來都是硬核問題,而 IM 系統需要把這些全部整合到一起,還能穩定運行。
從那時起,我就對 IM 如痴如醉。
但我很快發現,看文章和動手做完全是兩碼事。不落地的設計不是好設計——這是我在這個專案中反覆驗證的道理。很多方案在紙面上看起來很完美,真正寫程式部署上去才發現各種邊界條件和異常場景,這些都是不親手做永遠體會不到的。
IM 系統有一個很大的「欺騙性」——看起來很簡單,好像人人都能寫一個。發消息、收消息、聊天列表,介面就那幾個,功能說出來誰都懂。但真正動手做才會發現,想做好、做穩定,要考慮的東西多到讓人頭皮發麻。
我把這個專案過程中踩過的核心難題列出來,也給出我的解法,後文會展開每個點的具體實作:
消息可靠性
難題到底難在哪我的解法消息不丟網路抖動、服務重啟、客戶端當機,任何環節都可能丟消息Redis ZSet 延遲重試佇列 + RocketMQ 持久化兜底,三次漸進式重試(5s→30s→300s),超過重試次數轉離線存儲消息去重重試、網路重傳、斷線重連都會產生重複消息雙 ID 設計:clientMsgId(UUID)客戶端去重 + msgId(雪花演算法)服務端排序;SDK 記憶體快取 2 分鐘 TTL消息有序性群聊多人同時發言,消息到達順序可能和發送順序不一致雪花 ID 天然遞增,客戶端按 msgId 排序展示,不依賴到達順序重試消息重試過程中服務重啟怎麼辦?多個重試任務並發怎麼防衝突?Lua 腳本原子認領(ZRANGEBYSCORE + ZREM 原子操作),LZ4 壓縮存儲節省 Redis 記憶體連線與狀態
難題到底難在哪我的解法斷線重連行動端網路頻繁切換(WiFi↔4G),重連不能丟消息SDK 指數退避自動重連(1s→30s),_pendingSends 佇列保存發送失敗的消息,重連後自動 flush弱網優化地鐵、電梯裡網路時斷時續,消息發不出去又不能丟客戶端消息級重試(2s→4s→8s,最多 3 次)+ 離線消息佇列雙重保障,網路恢復後自動補發在線狀態一致性用戶連在節點 A,但節點 B 還以為他在線;用戶斷網了服務端沒感知到Netty IdleStateHandler 心跳檢測 + 本地 ConcurrentHashMap 管理連線 + 定時清理 stale channel百萬連線單機記憶體、執行緒模型、GC 壓力都是瓶頸Netty Epoll + Boss/Worker 執行緒分離 + PooledByteBufAllocator 池化記憶體 + TCP 參數調校(目前還在優化中)消息投遞
難題到底難在哪我的解法離線消息用戶離線期間積壓了幾百條消息,上線後怎麼高效拉取?推拉結合:MobPush 推送通知喚醒 → 客戶端主動拉取會話列表摘要 → 打開聊天時按需拉歷史,避免大量消息瞬間湧入群消息廣播一個 500 人的群,一條消息要推到所有在線成員,延遲怎麼控?RocketMQ 廣播消費 + 每個 im-connect 只推本地在線成員,讀擴散模式只存一份消息多端同步同一帳號手機 + 平板同時登入,消息狀態怎麼保持一致?支援最多 5 裝置同時在線,每裝置獨立 Token(Redis Key 含 deviceType),已讀回執即時同步已讀未讀用戶打開聊天就標記已讀,但快速切換多個聊天會話時容易亂SDK 上下文感知 ACK:ChatSessionManager 追蹤當前打開的會話,自動發送已讀回執,用戶無感知已讀回執一條一條發已讀 ACK 太頻繁,浪費頻寬和效能水位線機制:客戶端只發一個 lastReadMsgId,服務端按 msgId ≤ lastReadMsgId 批次標記已讀,一條指令搞定,無需逐條確認協議與存儲
難題到底難在哪我的解法消息 ID 設計UUID 無序不好排序,自增 ID 分散式環境下會衝突雙軌設計:clientMsgId(UUID 去重)+ msgId(雪花演算法,全域唯一且天然有序)協議設計JSON 體積大解析慢,高頻消息場景下是效能瓶頸Protobuf 二進位協議 + 信封模式 + 欄位級優化(fixed64 存 ID、bytes 存 UUID、chatId 動態計算)客戶端消息存儲聊天記錄全放服務端?每次打開聊天都網路請求?本地 SQLite 快取 + 服務端 MongoDB 持久化,拉取歷史時按時間範圍增量同步高併發低延遲消息鏈路越長延遲越高,怎麼把端到端延遲壓到毫秒級?三層路由(本地直推 → gRPC 跨節點 → 離線存儲),Protobuf 減少序列化開銷,TCP_NODELAY 禁用 Nagle上面這張表基本覆蓋了 IM 系統的核心難點。每一行背後都是反覆調試、推翻重來的過程。所以很多人寫了聊天 Demo 就覺得 IM 不過如此,但 Demo 和生產級系統之間的差距,可能比 Java 和 JavaScript 的差距還大,哈哈,都是淚啊。
後面的章節會逐一展開每個點的具體實作細節和踩坑經歷。
說實話,這個專案過程中有太多次我想放棄:
每次遇到這些問題,我都會想:要不就算了吧,誰也不缺這麼一個專案。但每次想到「不落地的設計不是好設計」這句話,又覺得如果連一個能真正跑起來的 IM 系統都沒做過,那之前所有的學習和研究都是紙上談兵。
於是就這麼堅持下來了。這個專案從 2024 年 5 月正式啟動,那時候還沒有 AI 輔助程式設計的概念,Vibe Coding 更是聞所未聞——每一行程式碼都是純手搓出來的。一直到 2025 年才慢慢開始接入 AI 輔助,但核心的架構設計和關鍵模組都是自己一行一行敲的。兩年下來,記不清有多少個深夜對著螢幕 debug 到天亮,只記得每次調通一個複雜鏈路時的那種興奮感——值了。
以下是 git 提交和程式碼情況:

GitLab 上一共維護了 4 個專案,各司其職:
專案說明xzll-im-serverJava 微服務後端,包含 gateway、auth、connect(Netty 長連線)、business(核心業務)、console(管理後台 API)、data-sync(ES 資料同步)、social(社交功能)7 個模組xzll-im-flutter-clientFlutter 客戶端 UI 層,基於 GetX 狀態管理,22 個頁面,負責聊天、好友、群組、音視頻通話等使用者介面xzll-im-flutter-sdk自研 IM SDK(獨立 Flutter Package),封裝 WebSocket 連線管理、Protobuf 序列化、消息收發、自動重連、離線佇列、去重、ACK 等底層能力,與 UI 層完全解耦xzll-im-app-uniapp鑑於 uniapp 的效能問題,基本廢棄。後期會出個 uniapp 的 sdk。
目前這個 IM 系統已經實現了以下功能:
整個系統目前有 104+ 個 API 介面、約 25 張資料表、22 個 Flutter 頁面。
技術棧也經歷了幾輪演進,最終定型為:
後端:Spring Boot 3.3.7 + Spring Cloud 2023.0.4 + JDK 21
通信:Netty WebSocket + Protobuf + gRPC
存儲:MySQL + MongoDB(分片叢集)+ Redis + Elasticsearch + MinIO
訊息佇列:RocketMQ(DLedger 叢集模式)
客戶端:Flutter 3.35 + GetX + 自研 IM SDK
配置中心:Nacos
為了支撐這套系統,我自建了 ESXi 伺服器,跑了 6 個虛擬機來模擬生產環境。基礎設施全自建,從資料庫到消息佇列到配置中心,全部自己搭建和維護。
這個專案全部是在業餘時間完成的。白天上班,晚上和週末寫程式,經常寫到凌晨兩三點,有時候一個 bug 調通抬頭天已經亮了。兩年下來,長時間夜間編碼對眼睛和頸椎的考驗很大,後來換了明基 RD270Q 編程顯示器,夜間編碼的體驗提升很大——後文會結合具體的開發場景聊感受。
深夜編碼場景實拍:

螢幕光線很柔和,長時間盯著程式碼眼睛也不酸,凌晨兩三點寫程式和白天一樣舒服。
另外說明一下,文中所有顯示器實拍照片由於像素壓縮和燈光環境的影響,和實際顯示效果有較大差別,拍不出真實螢幕的細膩度和畫質,其實實際觀感要好得多。
IM 系統的業務特點決定了它天然適合微服務架構——長連線服務(有狀態)和業務處理服務(無狀態)的擴縮容節奏完全不同,必須拆開。
我把系統拆成了以下 7 個核心模組:
模組職責埠im-gatewaySpring Cloud Gateway API 閘道器,JWT 驗證過濾,限流8081im-authOAuth2 授權伺服器,JWT Token 簽發與驗證8082im-connectNetty WebSocket 長連線層,即時消息收發8085im-business核心業務邏輯(消息存儲、好友、群組、搜尋)8083im-data-syncMongoDB → Elasticsearch 資料同步(CQRS)8086im-console管理後台 API8084im-social社交功能(圈子、貼文、活動)8087### 2.2 為什麼這樣拆
最關鍵的設計決策是把 im-connect(有狀態)和 im-business(無狀態)拆開:
im-connect 維護著所有客戶端的 WebSocket 長連線,是典型的有狀態服務。它只負責消息的路由和推送,不做持久化。im-business 消費 RocketMQ 消息做持久化,完全無狀態,可以隨意水平擴展。這種拆分意味著:如果業務邏輯的 QPS 上來了,直接加 im-business 實例就行,不需要動長連線層。反過來,如果連線數不夠了,加 im-connect 節點,透過 Nacos 服務發現自動註冊。
資料同步也獨立成了 im-data-sync 模組,採用 CQRS 模式——消息先寫到 MongoDB,然後異步同步到 Elasticsearch 做全文搜尋,不影響主消息鏈路的寫入效能。
在技術選型上,我走過一些彎路,最終的選擇都有明確的理由:
gRPC 取代 Dubbo:專案早期用的是 Dubbo 做服務間通訊,遇到了不少序列化相容問題和版本升級痛點。後來遷移到 gRPC,配合 Protobuf 的強型別定義,介面契約清晰,跨語言支援好(服務端 Java,客戶端 Flutter 都能用同一套 proto 定義),維護成本大幅降低。這個遷移過程很痛苦,但值得。
RocketMQ 取代 Kafka:IM 場景需要支援廣播消費(群消息推送到所有 connect 節點)、交易消息、延遲消息。RocketMQ 原生支援這些特性,不需要像 Kafka 那樣額外搭建基礎設施。而且 RocketMQ 對消息軌跡和死信佇列的支援更完善,方便排查消息丟失問題。部署上採用 DLedger 叢集模式,基於 Raft 協議實作 Broker 的主從自動切換,避免了單點故障導致消息丟失的風險——IM 系統對消息可靠性要求極高,任何一條消息都不能丟。
消息存儲的三次演進(MySQL 分庫分表 → HBase → MongoDB):消息存儲是整個專案調整最多的部分。第一版用 MySQL,資料量上來後做了分庫分表,但 IM 消息的高頻寫入很快遇到瓶頸。第二版遷移到 HBase,寫入吞吐上去了,但發現 IM 的查詢模式(按會話拉取歷史、按時間範圍分頁、按使用者 ID 查最近 N 條)和 HBase 的 scan 模型並不匹配——HBase 擅長基於 rowkey 的精確查找,而 IM 消息更多是範圍查詢。第三版遷移到 MongoDB,靈活的 Schema 和豐富的索引能力讓消息存儲和查詢都順暢了。這個選型過程走了不少彎路,但每一步都加深了對不同存儲引擎特性的理解。
Spring Cloud Gateway:WebFlux 非同步非阻塞模型,適合閘道器這種高併發低延遲的場景。配合 JWT Filter 做驗證,整體鏈路簡潔高效。
系統整體架構圖如下:

部署拓撲如下:

上邊是設計層面的架構和部署拓撲,實際跑起來是這樣的——各服務在虛擬機上的運行日誌:
IM 場景下,協議效率直接影響系統效能。每秒可能有數千條消息在傳輸,如果用 JSON 這種文字協議,體積大、解析慢,在高頻消息場景下是明顯的瓶頸。
Protobuf 作為二進位協議,相比 JSON 有幾個關鍵優勢:
還有一個我選擇 Protobuf 的重要原因:三端協議一致性。IM 系統有 Java 服務端、Flutter SDK、Flutter 客戶端三個端,Protobuf 的 .proto 文件就是三端共享的介面契約。改了 proto 文件,三端編譯時立刻就能發現不相容的地方,比 JSON 靠文件約束欄位名可靠得多。
當然 Protobuf 也有代價——proto 文件的維護和跨端同步需要額外的工作量,但這個代價和它帶來的收益相比完全值得。
我採用了經典的「信封模式」來設計協議——外層是統一的信封,內層按消息類型分發不同的載荷:
protobuf
// 客戶端 → 服務端
message ImProtoRequest {
MsgType type = 1; // 消息類型
bytes payload = 2; // 具體消息內容(按 type 反序列化)
}
// 服務端 → 客戶端
message ImProtoResponse {
MsgType type = 1;
bytes payload = 2;
int32 code = 3; // 回應碼
string msg = 4; // 回應描述
}
MsgType 列舉定義了 24 種消息類型,涵蓋了 IM 系統的所有場景:
protobuf
enum MsgType {
C2C_SEND = 1; // 單聊發送
C2C_ACK = 2; // 單聊 ACK
C2C_WITHDRAW = 3; // 單聊撤回
C2C_MSG_PUSH = 5; // 單聊消息推送
GROUP_SEND = 7; // 群聊發送
GROUP_MSG_PUSH = 8; // 群消息推送
GROUP_ACK = 9; // 群消息 ACK
GROUP_WITHDRAW = 10; // 群消息撤回
FRIEND_REQUEST = 11; // 好友申請
FRIEND_RESPONSE = 12; // 好友回應
READ_RECEIPT = 14; // 已讀回執
TYPING_STATUS = 16; // 正在輸入
CALL_OFFER = 20; // 通話邀請
CALL_ANSWER = 22; // 通話接聽
CALL_REJECT = 23; // 通話拒絕
CALL_END = 24; // 通話結束
// ...更多類型
}
程式碼位置:
im-common/src/main/proto/message_service.proto
這是我覺得最有意思的部分。在 IM 系統中,每條消息雖然可能只省幾十個位元組,但在百萬級消息量下,累計節省的頻寬非常可觀。
我做了三個關鍵的欄位優化:
1. fixed64 存雪花 ID
protobuf
fixed64 msgId = 2; // 8 位元組
fixed64 from = 3; // 8 位元組(使用者 ID)
fixed64 to = 4; // 8 位元組(使用者 ID)
雪花演算法生成的 ID 是純數字,用 fixed64(固定 8 位元組)儲存,而不是用 string(19 位元組的十進位字串)。每個 ID 欄位省 60% 的空間。一條消息裡有 msgId、from、to、time 四個 ID 欄位,加起來就省了很多。
2. bytes 存 UUID
protobuf
bytes clientMsgId = 1; // 16 位元組(UUID 原始位元組)
客戶端生成的消息唯一 ID 是 UUID,用 bytes 類型儲存 16 位元組的原始值,而不是 36 位元組的字串表示。省 56%。
3. chatId 動態計算,不在協議中傳輸
會話 ID 的格式是 bizType-chatType-smallUserId-bigUserId(例如 100-1-123-456,小 ID 在前),這個 ID 可以由發送者和接收者的 ID 動態計算出來,根本不需要在消息中傳輸。每條消息省 33 位元組。
protobuf
message C2CSendReq {
bytes clientMsgId = 1;
fixed64 msgId = 2;
fixed64 from = 3;
fixed64 to = 4;
int32 format = 5;
string content = 6;
fixed64 time = 7;
// chatId 已移除!由服務端/客戶端根據 from + to 動態計算
optional ReplyInfo reply_info = 10;
}
不同 im-connect 節點之間透過 gRPC 轉發消息,定義了 6 個 RPC 方法:
protobuf
service MessageService {
rpc ResponseServerAck2Client(ServerAckPush) returns (WebBaseResponse);
rpc ResponseClientAck2Client(ClientAckPush) returns (WebBaseResponse);
rpc SendWithdrawMsg2Client(WithdrawPush) returns (WebBaseResponse);
rpc PushFriendRequest2Client(FriendRequestPush) returns (WebBaseResponse);
rpc PushFriendResponse2Client(FriendResponsePush) returns (WebBaseResponse);
rpc TransferC2CMsg(ImProtoRequest) returns (WebBaseResponse); // 跨節點轉發
}
其中 TransferC2CMsg 是最核心的——當發送者和接收者連在不同的 im-connect 節點上時,透過這個 RPC 方法做跨節點轉發。
proto 文件程式碼截圖:

寫 proto 文件的時候有個感受很明顯——像 l/1、O/0、q/g 這種在普通顯示器上容易看混的相似字元,在編碼模式下清晰銳利,一眼就能區分。分區對比度技術讓深色程式碼主題下關鍵字、變數、註解的顏色層次更鮮明,長時間看協議定義不會串行。
編碼模式可以透過熱鍵一鍵切換,設定入口很方便:

im-connect 模組是整個系統最核心的部分——它維護著所有客戶端的 WebSocket 長連線,負責消息的即時路由和推送。
啟動時自動檢測執行平台,Linux 用 Epoll(效能最優),其他平台回退到 NIO:
java
private void initEventLoopGroups() {
int cpuCores = Runtime.getRuntime().availableProcessors();
int bossThreads = Math.max(1, cpuCores / 4); // 連線接收執行緒
int workerThreads = cpuCores * 2; // IO 處理執行緒
if (Epoll.isAvailable()) {
bossGroup = new EpollEventLoopGroup(bossThreads,
new DefaultThreadFactory("netty-boss", Thread.MAX_PRIORITY));
workerGroup = new EpollEventLoopGroup(workerThreads,
new DefaultThreadFactory("netty-worker", Thread.NORM_PRIORITY));
} else {
bossGroup = new NioEventLoopGroup(bossThreads, ...);
workerGroup = new NioEventLoopGroup(workerThreads, ...);
}
}
TCP 參數也做了針對性調校:
java
bootstrap
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle 演算法,降低延遲
.option(ChannelOption.SO_BACKLOG, 65535) // 連線佇列
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 池化記憶體
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(writeBufferLow, writeBufferHigh)); // 寫緩衝水位
程式碼位置:
im-connect/im-connect-service/.../netty/NettyServer.java
每個客戶端連線進來後,消息會經過一系列 Handler 的鏈式處理:
java
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec()); // HTTP 編解碼
pipeline.addLast(new HttpObjectAggregator(65536)); // HTTP 聚合
pipeline.addLast(new ChunkedWriteHandler()); // 大資料塊寫入
// WebSocket 壓縮(可選)
if (config.isEnableCompression()) {
pipeline.addLast(new WebSocketServerCompressionHandler());
}
pipeline.addLast("heart-notice",
new IdleStateHandler(idleCheckInterval, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("connection-limit", connectionLimitHandler); // 連線數限制
pipeline.addLast("flow-control", flowControlHandler); // 流量控制
pipeline.addLast("metrics", new MetricsHandler()); // 指標採集
pipeline.addLast("auth", authHandler); // JWT 驗證
pipeline.addLast("websocket", webSocketServerHandler); // 消息分發
}
程式碼位置:
im-connect/.../netty/channel/WebSocketChannelInitializer.java
幾個關鍵 Handler 的職責:
驗證是在 WebSocket 握手的 HTTP 升級請求中完成的:
java
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof FullHttpRequest)) {
ctx.fireChannelRead(msg); // 非 HTTP 請求,放行給下一個 Handler
return;
}
FullHttpRequest request = (FullHttpRequest) msg;
String clientIp = getClientIp(ctx);
// IP 黑名單檢查
if (isIpLocked(clientIp)) {
ctx.channel().close();
return;
}
// IP 白名單放行
if (isIpWhitelisted(clientIp)) {
handleWhitelistAccess(ctx, request, clientIp);
return;
}
// JWT Token 校驗
if (performAuthentication(ctx, request, clientIp)) {
resetAuthFailures(clientIp);
ctx.pipeline().remove(this); // 驗證通過,移除自身
ctx.fireChannelRead(msg); // 放行
}
}
Token 校驗透過 Redis 完成,Key 格式為 USER_TOKEN_KEY:{userId}:{deviceType}:{tokenMd5},確保多裝置登入時 Token 不會混淆。
還有防暴力破解機制——同一個 IP 連續驗證失敗超過閾值,直接鎖定 IP:
java
private void handleAuthFailure(ChannelHandlerContext ctx, String clientIp, String reason) {
RAtomicLong failureCounter = redissonUtils.getAtomicLong(AUTH_FAILURE_KEY_PREFIX + clientIp);
long currentFailures = failureCounter.incrementAndGet();
failureCounter.expire(lockoutDurationMinutes * 2, TimeUnit.MINUTES);
if (currentFailures >= maxAuthFailures) {
lockIp(clientIp); // 鎖定 IP
}
ctx.channel().close();
}
程式碼位置:
im-connect/.../netty/handler/AuthHandler.java
本地連線透過 LocalChannelManager 管理,底層是 ConcurrentHashMap<String, Channel>:
像 WebSocketServerHandler(682 行)這種核心分發器,消息接收、驗證判斷、異常處理、連線管理的邏輯全在一個類裡,橫屏看需要來回捲動才能理清完整的呼叫鏈。直屏旋轉之後一屏多看幾十行,從消息入口到分發出口的完整鏈路一眼看全,高度和角度也能自由調節,找到最舒服的姿勢,長時間編碼脖子不酸。
順帶說一句,這台顯示器的支架設計確實省心——快拆式安裝不需要螺絲刀,單手就能完成橫豎屏切換、高度升降和角度傾斜,幾分鐘就搞定,對經常切換橫豎屏的開發場景很友好。
核心 Handler 直屏編碼實拍:

這是整個 IM 系統最核心的部分——一條消息從發送者發出,到接收者收到,中間經歷了什麼?
所有消息進來後,先由 HandlerDispatcher 做分發。它利用 Spring 的 ApplicationContext 自動發現所有 ProtoMsgHandlerStrategy 實作,按 MsgType 註冊到 Map 中:
java
@Component
public class HandlerDispatcher implements ApplicationContextAware {
private final Map<MsgType, ProtoMsgHandlerStrategy> protoHandlers = new HashMap<>();
public void dispatcher(ChannelHandlerContext ctx, ImProtoRequest protoRequest) {
MsgType msgType = protoRequest.getType();
ProtoMsgHandlerStrategy handler = protoHandlers.get(msgType);
handler.exchange(ctx, protoRequest);
}
@Override
public void setApplicationContext(ApplicationContext ctx) {
Collection<ProtoMsgHandlerStrategy> strategies =
ctx.getBeansOfType(ProtoMsgHandlerStrategy.class).values();
for (ProtoMsgHandlerStrategy strategy : strategies) {
protoHandlers.put(strategy.supportMsgType(), strategy);
}
}
}
策略介面定義很簡單:
java
public interface ProtoMsgHandlerStrategy {
MsgType supportMsgType();
void exchange(ChannelHandlerContext ctx, ImProtoRequest protoRequest);
default WebBaseResponse receiveAndSendMsg(ImProtoRequest protoRequest) {
return WebBaseResponse.returnResultError("不支援跨伺服器轉發");
}
}
新增消息類型時,只需要實作這個介面並加上 @Component 註解,Spring 自動註冊,完全不需要改分發器的程式碼。
而且分發時使用 CompletableFuture.runAsync() 將業務邏輯放到獨立執行緒池執行,實現了 Netty IO 執行緒與業務執行緒的隔離——即使某個消息處理耗時很長,也不會阻塞 Netty 的 EventLoop。
程式碼位置:
im-connect/.../dispatcher/HandlerDispatcher.java、im-connect/.../strategy/ProtoMsgHandlerStrategy.java
這是整個系統最複雜的部分。當用戶 A 給用戶 B 發一條消息,完整的流轉路徑如下:
客戶端 A(Flutter)透過 WebSocket 發送 Protobuf 二進位幀
↓
WebSocketServerHandler 解析 ImProtoRequest
↓
HandlerDispatcher 按 MsgType=C2C_SEND 路由到 C2CMsgSendProtoStrategyImpl
↓
exchange() 方法執行三步:
① RocketMQ 發消息 → im-business 消費並持久化到 MySQL + MongoDB
② 查找接收者 B 的在線狀態
③ 根據狀態走不同的路由策略
三層路由的核心邏輯:
java
public void exchange(ChannelHandlerContext ctx, ImProtoRequest protoRequest) {
C2CSendReq req = C2CSendReq.parseFrom(protoRequest.getPayload());
C2CSendMsgAO packet = convertToAO(req);
// 第一步:消息發到 RocketMQ,由 im-business 非同步持久化
boolean mqSent = c2CMsgProvider.sendC2CMsg(packet);
// 第二步:查找接收者在線狀態
ReceiveUserDataDTO receiveUserData =
super.getReceiveUserDataTemplate(packet.getToUserId(), this.redissonUtils);
// 第三步:三層路由
if (targetChannel != null && userStatus == ONLINE) {
// 🟢 第一層:接收者在本地(同一台 im-connect 伺服器)
sendProtoMsg(targetChannel, buildPushMsgResp(packet), packet);
c2CMsgRetryService.addToRetryQueue(packet); // 加入重試佇列保底
} else if (userStatus == null && targetChannel == null) {
// 🔴 第三層:接收者離線
c2CMsgProvider.offLineMsg(buildOffLineMsgDTO(packet));
} else if (targetChannel == null && userStatus == ONLINE && ipPortStr != null) {
// 🟡 第二層:接收者在線,但在另一台 im-connect 伺服器上
SmartGrpcClientManager.GrpcStubWrapper stubWrapper =
grpcClientManager.getStubByIP(targetIp, targetPort);
MessageServiceBlockingStub stub =
MessageServiceGrpc.newBlockingStub(stubWrapper.getChannelInfo().getChannel());
// 透過 gRPC 轉發到目標 im-connect 節點
ImProtoRequest forwardRequest = ImProtoRequest.newBuilder()
.setType(MsgType.C2C_SEND)
.setPayload(ByteString.copyFrom(c2cReq.toByteArray()))
.build();
stub.transferC2CMsg(forwardRequest);
}
}
程式碼位置:
im-connect/.../strategy/impl/c2c/C2CMsgSendProtoStrategyImpl.java
為什麼是三層?
im-connect 上,直接透過 Channel 寫入推送,毫秒級延遲。im-connect 上。透過 gRPC TransferC2CMsg 方法轉發到目標節點,目標節點的 receiveAndSendMsg() 方法接收並推送到本地 Channel。為什麼要推拉結合?
純推送模式有個問題:用戶離線期間積壓了幾百條消息,上線時一次性全推過來,WebSocket 連線瞬間被打滿,客戶端解析不過來容易卡頓甚至當機。純拉取模式也有問題:用戶不知道有新消息,不會主動來拉。
所以採用推拉結合:
這樣既保證了消息及時性,又避免了大量離線消息瞬間湧入導致客戶端壓力過大。
消息投遞不能丟消息。即使第一層推送成功了,也存在網路抖動導致客戶端沒收到的情況。所以我設計了一套基於 Redis ZSet 的延遲重試佇列。
核心設計:
java
public void addToRetryQueue(C2CSendMsgAO packet) {
C2CMsgRetryEvent retryEvent = buildRetryEvent(packet);
String jsonValue = JSONUtil.toJsonStr(retryEvent);
String compressedValue = CompressionUtil.compressToBase64(jsonValue); // LZ4 壓縮
long executeTime = System.currentTimeMillis() + retryDelays[0] * 1000;
// Lua 原子操作:同時寫入 ZSet(時間排序)和 Hash(資料儲存)
redissonUtils.executeLuaScriptAsLongUseStringCodec(
addToRetryQueueScript,
Arrays.asList(C2C_MSG_RETRY_QUEUE, C2C_MSG_RETRY_INDEX),
compressedValue, String.valueOf(executeTime), packet.getMsgId()
);
}
重試策略:漸進式延遲,5s → 30s → 300s,共 3 次重試:
java
@Value("${im-server.c2c.retry.max-retries:3}")
private int maxRetries;
@Value("${im-server.c2c.retry.delays:5,30,300}")
private String delaysConfig; // 5 秒、30 秒、5 分鐘
定時掃描:@Scheduled 每秒執行一次,掃描 ZSet 中到期的消息:
java
@Scheduled(fixedRateString = "${im-server.c2c.retry.scan-interval:1000}")
public void scanRetryQueue() {
// 第一步:Lua 原子認領(ZRANGEBYSCORE + ZREM)
List<String> expiredMsgIds = redissonUtils.executeLuaScriptAsStringListUseStringCodec(
claimRetryMessagesScript, ...);
// 第二步:批次獲取 Hash 中的壓縮資料
Map<String, String> compressedDataMap =
redissonUtils.batchGetHashWithStringCodec(...);
// 第三步:解壓、按用戶分組、非同步處理
for (Map.Entry<String, List<C2CMsgRetryEvent>> entry : groupedByUserId.entrySet()) {
CompletableFuture.runAsync(
() -> processRetryBatch(toUserId, userRetryEvents), retryExecutor);
}
}
重試時檢查接收者是否在線:在線就重新推送並再次加入重試佇列;離線或超過最大重試次數,則標記為離線消息透過 RocketMQ 存入持久化佇列。
程式碼位置:
im-connect/.../service/impl/C2CMsgRetryServiceImpl.java
每條消息有兩個 ID:
雙 ID 設計兼顧了客戶端去重和服務端排序兩個不同需求。
除了基本的消息收發,系統還支援幾個增強功能:
ReadReceiptProtoStrategyImpl):客戶端打開聊天後發送一個 lastReadMsgId 水位線,服務端按 msgId ≤ lastReadMsgId 批次標記已讀,MySQL 和 MongoDB 同時更新TypingStatusProtoStrategyImpl):A 正在輸入時,B 能即時看到「對方正在輸入...」WithdrawMsgSendProtoStrategyImpl):支援撤回已發送的消息,帶時間校驗消息流轉流程圖:

多視窗調試場景實拍(IDE + 終端機 + 日誌):

調試這些功能比實作它們更花時間——比如已讀回執,需要同時觀察客戶端的消息狀態變化、服務端的 ACK 處理邏輯、Redis 中的已讀標記是否正確更新,一個問題可能涉及三四個環節。多視窗同時打開不顯得擁擠,144Hz 切換視窗沒有拖影。這種一盯就是幾個小時的 debug 場景,護眼認證是真的能感受到差別——同樣的時長,眼睛疲勞感明顯減輕。
低藍光模式效果:

群聊消息的存儲模型有一個經典的設計選擇:讀擴散還是寫擴散。
我的方案採用的是讀擴散模式。消息只存一份到 MongoDB(透過 RocketMQ 非同步寫入),客戶端拉取群聊歷史時根據 chatId(格式:bizType-chatType-groupId,如 100-2-789)+ 時間範圍分頁查詢。這樣寫入效率最高,而且 MongoDB 分片叢集的範圍查詢能力足以支撐讀側的壓力。
群聊消息和單聊的最大區別在於:一條消息需要推送給群內所有在線成員。
我的實作方案是利用 RocketMQ 的廣播消費模式:
im-connect 節點都以廣播模式消費這條消息im-connect 節點檢查本地有哪些群成員在線,逐一推送這種方案的好處是:im-connect 節點不需要知道全域的群成員在線分布,只需要關注本地連線的用戶,邏輯簡單清晰。
程式碼位置:
im-connect/.../strategy/impl/group/GroupMsgSendProtoStrategyImpl.java
GroupMsgWithdrawProtoStrategyImpl):和單聊撤回類似,帶時間校驗,撤回後通知所有在線群成員GROUP_MENTION):支援 @特定成員,被 @ 的成員會收到特殊通知群消息廣播的核心邏輯和運行時的終端日誌:

IM 系統的資料特點決定了需要多種存儲引擎配合:
存儲用途選擇原因MySQL用戶、好友、群組、會話等關係資料交易一致性要求高,關係查詢成熟MongoDB消息記錄(分片叢集)高寫入吞吐,Schema 靈活,適合非結構化消息體Elasticsearch消息全文搜尋倒排索引,支援斷詞搜尋和高亮Redis快取、未讀計數、離線消息、分散式鎖記憶體級讀寫性能,支援豐富的資料結構MinIO圖片、語音、視頻等檔案存儲相容 S3 協議,自部署,成本低這裡特別說一下消息存儲的選型。這條路上我走了三次彎路:第一版用 MySQL 存消息,資料量上來後做了分庫分表,但 IM 消息的高頻寫入很快把 MySQL 打到瓶頸;第二版遷移到 HBase,寫入效能上去了,但 IM 的查詢模式(按會話拉取歷史消息、按時間範圍查詢、按使用者 ID 查最近 N 條)和 HBase 的 scan 模型並不匹配——HBase 擅長基於 rowkey 的精確查找,而 IM 消息更多是範圍查詢和分頁查詢。
遷移到 MongoDB 後,利用其靈活的 Schema 和豐富的查詢能力,消息存儲和查詢都順暢了很多。目前 MongoDB 部署為分片叢集架構,配合智慧查詢路由(Smart Query Router),可以根據查詢條件自動選擇最優的分片和索引,避免全分片掃描。
消息寫入路徑:
客戶端發消息 → im-connect → RocketMQ → im-business → MySQL(關係資料)+ MongoDB(消息體)
↓(非同步)
im-data-sync → Elasticsearch
MySQL 存儲消息的關係資料(發送者、接收者、時間、狀態),MongoDB 存儲消息的完整內容(文字、附件、擴充欄位)。兩者配合實現高效的消息存儲和查詢。
搜尋功能透過 CQRS(Command Query Responsibility Segregation,命令查詢職責分離)模式實現——簡單說就是寫入和讀取用不同的存儲引擎,各管各的:
im-data-sync 消費 RocketMQ 消息,非同步同步到 Elasticsearch程式碼位置:
im-data-sync模組的DataSyncMessageConsumer
分屏場景實拍(左半屏程式碼,右半屏資料庫客戶端):

日常開發中,同時管理 MySQL、MongoDB、Redis、ES 四種資料源的連線和調試是家常便飯。PBP 分屏模式下左半屏寫程式,右半屏開資料庫客戶端查看資料,不用 Alt+Tab 切來切去,效率高很多。需要對比兩端資料時,PIP 畫中畫也很方便——主屏寫程式,小窗監控 Redis 狀態。
客戶端技術選型糾結過一陣子。IM 客戶端需要同時支援 Android 和 iOS,原生開發意味著要寫兩套程式碼,維護成本翻倍。React Native 和 Flutter 是兩個主要的跨平台選項。
最終選擇 Flutter 有幾個原因:
當然,最現實的原因是——找不到合適的 app 端同學來幫忙,我決定自己寫。Flutter 的學習曲線對後端開發來說比較友好,Dart 的語法和 Java/Kotlin 有很多相似之處。從一個純後端開發到能獨立寫出完整的 Flutter 客戶端和 IM SDK,大概花了兩個月時間。
日常就是左邊開 IDEA 寫後端 Java 程式碼,右邊寫 Flutter 客戶端和 SDK,雙端聯調:

Flutter 客戶端採用 GetX 做狀態管理、路由和依賴注入,遵循 Binding + Controller + View 的分層模式。
啟動時註冊一系列全域服務:
dart
Future<void> initServices() async {
Get.put(ThemeController());
Get.put(AppData());
final imSdkManager = Get.put(IMSDKManager());
await imSdkManager.init(); // IM SDK 初始化
final mobPushService = Get.put(MobPushService());
await mobPushService.init(); // 推送服務初始化
Get.put(BadgeService()); // 角標服務
Get.put(ChatBackgroundService()); // 聊天背景
Get.put(LiveKitService()); // 音視頻服務
Get.put(CallAudioService()); // 通話音效
Get.put(CallManagerService()); // 通話管理
}
App 生命週期管理也很重要——前後台切換時需要正確處理 WebSocket 連線:
dart
void didChangeAppLifecycleState(AppLifecycleState state) {
switch (state) {
case AppLifecycleState.resumed:
// 回到前台:檢查 WebSocket 狀態,斷連則重連
Get.find<IMSDKManager>().setAppInBackground(false);
if (wsStatus != WebSocketStatus.connected) {
IMSDKManager.to.connect();
}
case AppLifecycleState.paused:
// 進入背景:標記背景狀態,連線保持不斷(用於接收推送)
Get.find<IMSDKManager>().setAppInBackground(true);
case AppLifecycleState.detached:
// 程序終止:清理資源
}
}
程式碼位置:
xzll-im-flutter-client/lib/main.dart
自研的 IM SDK(xzll_im_sdk)是獨立 Flutter Package,封裝了所有 IM 能力,客戶端透過它來收發消息。
SDK 採用工廠單例模式,確保全域只有一個實例:
dart
static final XZLLIMClient _instance = XZLLIMClient._internal();
factory XZLLIMClient() => _instance;
XZLLIMClient._internal();
斷線後自動重連,使用指數退避策略(1s → 2s → 4s → 8s → ... → 最大 30s),背景狀態下直接等 30s:
dart
void _scheduleReconnect({bool immediate = false}) {
if (_isManualDisconnect) return; // 手動斷開不重連
_reconnectAttempts++;
Duration delay;
if (immediate) {
delay = Duration.zero;
} else if (_isAppInBackground) {
delay = Duration(seconds: 30); // 背景 30 秒
} else {
final exponent = _reconnectAttempts - 1;
final delaySeconds = (1 << exponent).clamp(1, 30); // 2^n,上限 30s
delay = Duration(seconds: delaySeconds);
}
_reconnectTimer = Timer(delay, () => _performReconnect());
}
網路重傳和重試機制可能導致重複消息。SDK 使用記憶體快取做去重,2 分鐘 TTL,最多快取 1000 條:
dart
final Map<String, int> _messageCache = {};
static const int _maxCacheSize = 1000;
static const int _messageCacheTtlMs = 2 * 60 * 1000; // 2 分鐘
bool _isDuplicateMessage({String? msgId, String? clientMsgId}) {
_evictExpiredMessageCacheEntries();
if (msgId != null && _messageCache.containsKey('s:$msgId')) return true;
if (clientMsgId != null && _messageCache.containsKey('c:$clientMsgId')) return true;
return false;
}
這是 SDK 中一個比較巧妙的設計。ChatSessionManager 追蹤當前打開的聊天會話,SDK 自動對當前聊天發送已讀 ACK——用戶完全無感知:
dart
class ChatSessionManager {
final XZLLIMClient _imClient;
final String _chatId;
bool _isClosed = false;
ChatSessionManager._({required XZLLIMClient imClient, required String chatId})
: _imClient = imClient, _chatId = chatId {
_imClient.setCurrentOpenChatId(_chatId); // 打開聊天時自動設定
}
void close() {
if (_isClosed) return;
_isClosed = true;
_imClient.setCurrentOpenChatId(''); // 關閉時清空
}
}
客戶端使用方式很簡單:
dart
// 進入聊天頁面
final session = XZLLIMClient().openChatSession(chatId);
// ... 收發消息 ...
// 離開聊天頁面
session.close();
打開聊天會話後,SDK 收到的該會話消息會自動發送已讀 ACK,不需要手動呼叫。
程式碼位置:
xzll-im-flutter-sdk/lib/src/core/chat_session_manager.dart
網路斷開時,發送失敗的消息不會丟失。SDK 用 _pendingSends Map 保存待重發的消息,重連後自動 flush:
dart
final Map<String, _PendingSendTask> _pendingSends = {};
Future<void> _flushPendingMessages() async {
if (_pendingSends.isEmpty) return;
final tasks = _pendingSends.values.toList();
for (final task in tasks) {
await _executeRetrySend(task); // 串行 flush,避免 WebSocket 擁堵
}
}
消息級別也有重試機制,每條消息最多重試 3 次,指數退避(2s → 4s → 8s):
dart
void _scheduleMessageRetry(_PendingSendTask task) {
if (task.retryCount >= _maxMessageRetry) {
_markMessageFailed(task.clientMsgId, task.chatId);
return;
}
final delaySeconds = 2 << task.retryCount; // 2s, 4s, 8s
_messageRetryTimers[task.clientMsgId] = Timer(Duration(seconds: delaySeconds), () {
_executeRetrySend(task);
});
}
程式碼位置:
xzll-im-flutter-sdk/lib/src/core/xzll_im_client.dart
檔案存儲使用 MinIO(自部署,相容 S3 協議),支援圖片、語音、視頻上傳。SDK 中的 MediaDownloadManager 提供非同步下載能力,透過 Stream 通知 UI 層下載進度和完成狀態。
程式碼位置:
xzll-im-flutter-sdk/lib/src/media/
音視頻通話是正在開發的功能,方案設計如下:
CallManagerService 管理通話的完整生命週期信令重用 WebSocket 長連線,不需要額外建立連線,減少了延遲和複雜度。
Flutter 開發需要同時跑 Android 和 iOS 兩端除錯,加上後端服務,設備多了管理起來麻煩。USB-C 65W 反向供電一線連 MacBook,筆記型電腦充電和視訊輸出一條線搞定。KVM 切換功能在多設備除錯時也很實用——同一套鍵鼠控制 MacBook 和另一台測試機,不用來回切換,客戶端介面在 2K 螢幕上各種細節看得清清楚楚。
USB-C 接口還能直接給手機反向充電,除錯時手機不用找充電器:

從 2022 年與 IM 結緣到現在,經歷了多次技術選型推翻和重寫,每一次推倒重來都很痛苦,但「不落地的設計不是好設計」這句話一直驅動著我——方案好不好,只有真正跑起來才知道。
目前這個 IM 系統已經具備了微信的基礎通訊能力:單聊、群聊、好友系統、多媒體檔案發送,音視頻通話正在基於 LiveKit 開發中。整個系統有 104+ 個 API 介面、約 25 張資料表、22 個 Flutter 頁面。
這個專案還有很多事情要做:
目前正在準備用 Gatling 做系統級的壓力測試,後續會分享詳細的壓測資料和優化過程。除此之外:
1. LiveKit 音視頻通話完善
目前信令層(CALL_OFFER / CALL_ANSWER / CALL_REJECT / CALL_END)已經透過 WebSocket 實現,CallManagerService 也搭建好了生命週期管理框架。接下來要完成的是:
2. 百萬連線壓測
1 萬用戶只是熱身。下一步目標是:
3. 整體效能優化
4. UI 優化
5. 社交功能
im-social 模組已搭建好基礎框架(圈子、貼文、活動、話題、打招呼)6. SDK 與 UI 分離:一套內核,無限可能
這個專案從設計之初就把 SDK 和 UI 徹底分離了。xzll-im-flutter-sdk 是一個獨立的 Flutter Package,封裝了全部 IM 能力——WebSocket 連線管理、Protobuf 序列化、消息收發、自動重連、離線佇列、去重、ACK,全部在 SDK 層完成。而 xzll-im-flutter-client 只是一個基於 SDK 的 UI 實作。
這意味著什麼?拿這套 SDK 換一層 UI 外殼,就是一個全新的產品。
幾個可行的方向:
IM 的底層能力(連線、消息、可靠性)是通用的,差異只在業務層和 UI 層。SDK 和 UI 分離後,上層業務可以自由定制,不用重複造輪子。
7. AI + IM:智慧化方向
AI 是 IM 系統最天然的結合點之一,也是我後續最想探索的方向:
這些方向之所以可行,正是因為 SDK 層已經把 IM 的底層能力全部封裝好了,AI 只需要對接消息的輸入輸出,不需要關心連線管理、消息可靠性這些複雜邏輯。
前面聊了不少技術實作,最後結合整個專案的開發體驗,補充兩個 RD270Q 最讓我印象深刻的點。
編碼模式確實是這台顯示器最核心的功能。專門為程式設計優化的顯示模式 + 高對比度,整個專案開發下來,深色主題下的程式碼可讀性提升非常明顯。
Eye Care 夜間保護是深夜編碼的加分項——這個專案大部分程式碼都是深夜寫的,夜間防護模式可以調到極低亮度,螢幕光線柔和不刺眼,搭配 MoonHalo 背面光環營造的氛圍,凌晨兩三點寫程式的體驗和白天一樣好。
Eye Care 夜間保護設定:

RD270Q 整體外觀——27 吋 2K 編程顯示器,正面和側面:

直屏模式——橫豎切換單手搞定,不需要任何工具:

說實話,專案做出來是一回事,怎麼變現是另一回事。這也是我目前最頭疼的問題——技術做完了,但怎麼讓技術產生商業價值?
目前想到的幾個方向,但都不太確定:
但說實話,這些方向都還在摸索中,沒有哪個跑通了。白天上班晚上寫程式,最難的不是技術——技術再難熬幾個通宵總能啃下來——而是找到一個值得投入時間的方向。
所以想問問各位讀者——如果你是我,你會怎麼變現這樣的專案?歡迎在留言區聊聊你的想法,也許你的一句話就能點醒我。
另外,如果有同學對這個專案感興趣,想一起開發,或者有好的變現思路想聊聊,歡迎私訊我或者在文章底下留言,一個人的力量終究有限,期待和你交流。
這個專案之前是完全開源的,但由於一些商業化考量,目前已經閉源,程式碼託管在我個人的 GitLab 上。不過早期的開源版本仍然可以訪問:開源地址:github.com/598572/xzll…
我是蝎子萊萊愛打怪,全網同名,歡迎關注我的公眾號。
文中使用的明基 RD270Q 為 27 英寸 2K 編碼顯示器,144Hz 高刷新率,支援編碼模式、分區對比度、六大護眼認證、USB-C 65W 供電、KVM 多設備切換,適合長時間編碼的開發者使用。