1 架构设计演变
(1) 1.0

缺点: 没有必要把app server和 im server分开, 然后强行通过rpc来通信
(2) 2.0

缺点: 未采用微服务架构, 不方便协同开发,
有其它服务需要服务端主动向客户端推送消息,为了减少重复开发,需要微服务。
(3) 3.0

缺点: 当用户量过多, 一台服务器支持不了这么多websocket长连接时, 无法进行水平扩展
(4) 4.0 (当前架构)

2 技术难点
(1) 如何保证消息不丢失?

在这个过程中,丢失消息有以下几种情况:
- 因为网络不通等原因导致用户A把消息发送到IM服务器失败;
- IM服务器存储消息失败;
- 用户A在超时时间内未收到IM服务器返回的结果;
- 由于IM服务器断电等原因导致消息未能成功推送给用户B
解决方案:
- 前三种情况(http),用户A将被提示消息发送失败;用户A可自行选择是否重发
- 后一种情况(ws),用户B未收到消息. websocket API可判断出消息发送是否成功, 未发送成功, 则断开ws连接, 视为用户B离线, 用户B下次建立连接后自行通过http拉取离线消息
(2) 如何保证消息不重复?
- 重复存储
在上述步骤3中, 由于网络原因导致 用户A没收到响应成功, 这时触发重传机制, 然后server端会再收到一次该消息导致重复存储
解决方案:
客户端发送消息时为每个消息生成一个uuid发给服务端, 服务端存储时若数据库报错uuid已存在, 则不再把消息放入消息队列, 从而不会重复存储
- 重复推送
server端推送给用户B消息时, 若推送失败, 这部分代码是我们自己写的, 可以不重复推送, 直接断开ws连接, 视为用户B离线,
而且就算断线重连后, 拉取离线消息可能重复, client端消息采用插入机制, 若时间戳相同, 且uuid相同, 则不插入重复收到的消息
(3) 如何保证消息的时序一致性?
什么是时序一致性?
对于点对点的聊天场景,时序一致性保证接收方的接收顺序和发送方的发出顺序一致; 对于群聊场景,时序一致性保证所有接收人看到的消息展现顺序一致。
时序一致性的难点?
多发送方、多接收方、服务端多线程并发处理情况下,无法保证时序一致性。 分布式环境下,多个机器的本地时钟不一致,没有“全局时钟”,不能用“本地时间”保证时序的一致性。
其实有”全局时钟”, unix时间戳就是全球统一的, 不随时区变化
- 单聊, 如何保证一致性?
发送方A在某群内依次发出了msg1,msg2,msg3, 如何保证接收方B的群内消息显示顺序也是1,2,3呢?
在A往B发出的消息中,加上发送方A本地的一个绝对时序,来表示接收方B的展现时序, 如果接收方B先收到msg3,msg3会先展现,后收到msg1和msg2后,会展现在msg3的前面。这意味着, 前端不能直接append, 而是从后往前找到合适的位置插入接收到的消息
- 群聊, 如何保证一致性?
N个群友在一个群里聊,怎么保证所有群友收到的消息显示时序一致?
不能再利用发送方的seq来保证时序,因为发送方不单点,时间也不一致
可以利用数据库的入库unix时间来统一时间
综上, 最终方案, 前端根据消息的入库时间的unix时间戳来对收到的消息进行排序
(4) 群聊的消息是读扩散, 还是写扩散?
- 读扩散: 群内消息只存一份, 订阅了该发布者的用户,从内容发布者处获取数据。当订阅者过多时,存在读瓶颈。适用于写多,读少场景。
- 写扩散:内容发布者会将每次发布的数据推送到每个接收者处,接收者只需要从自己处读取数据便可。当订阅者过多时,存在写瓶颈。适用于读多,写少场景。

群聊的消息 写少读多, 所以采用读扩散
聊天消息 在 kafka中作为一个topic, 分区只有一个, 但可以有多个副本, 副本数=kafka的broker数
每个用户上线后, message-api都要查看ta在哪些组里, 给它加入到这些组名对应的channel里, 后面收到消息, 直接取出channel里的消息广播给所有在线client即可
(5) 如何保证所有聊天信息的完整性?
- 用户先调用user_group_list, 获取自己的group_list
- 然后调用message 的 pull, 对每个group, 根据group_user中的last_msg_id, 获取到上次更新的消息id, 到message表里查找该group对应的消息, 且msg_id>last_msg_id的所有消息, 返回给用户
但是, 鉴于离线消息可能太多的情况, 每个群组的离线最多返回10条, 用户需要自己上拉刷新
(7) 用户A在server1上, 用户B在server2上, 用户A发给用户B的消息如何推送给用户B?
用户在获取到自己的群组后, 加入每个群对应的group, 方便websocket广播
用户A 上传到 某个群组 的消息先放到MQ中(生产者), message-api监听MQ, 取出消息广播给该room中的所有用户
(8) 如何实现消息漫游?
- QQ能够拉取到所有历史聊天消息,这个就是消息漫游。
- wechat目前只支持“多点登录”同时收发在线消息,没有实现“消息漫游”
- 本开源项目的设计是希望能够像qq一样实现消息漫游, 有两种方案
- 方案1: 用redis缓存要记录每个客户端(key=uid+平台) 拉取到的历史消息的最新时间戳, 每次上线后拉取离线消息都从这个时间后开始拉取
- 方案2(实际采用): 离线消息全部通过上拉获取, 每次上拉可获取10条, 这种方式最简单, 不用前端进行离线消息存储
(9) 数据量大之后如何分库分表?
数据量大之后, 消息表一定是最先突破2100w的,
可以按 消息表的主键id来分库分表
具体可以看看TiDB怎么做的(10) 用户量大后, ws长连接多了怎么处理?
msg-api水平扩容, 按每50w个同时在线用户 加一台机器每个tcp缓冲区消耗的内存大约是4KB, 在golang中, 每个tcp会分配一个协程, 最小2KB 平均一个websocket, 大约是6KB以上, 就按10KB来算吧 一台机器内存为8GB, 按msg-api最大能用5GB算吧 xx = 5GB / 10KB = 50w
3 技术栈
- Redis: 缓存, 存储在线用户uid, 和 与用户建立websocket长连接的接入层server的addr
- Mysql: 存储用户信息, 聊天信息, 群组信息
- kafka: 逻辑层 与 接入层的解耦, 异步, 传递在线消息
- OSS: 存储图片,视频等附件
4 User服务
(1) 数据表
| id | user表主键, 自增 | | —- | —- | | email | unique | | password | | | nick_name | | | gender | 1男, 2女 | | avator_url | | | create_time | timestamp |
(2) 协议
syntax = "proto3";option go_package = "./proto";// 注册message RegisterRequest {string email = 1;string password = 2;string nickName = 3;int64 gender = 4;}message RegisterResponse {}// 登录message LoginRequest {string email = 1;string password = 2;}message LoginResponse {string accessToken = 1;int64 accessExpire = 2;}// 个人信息message PersonalInfoRequest {int64 id = 1;}message PersonalInfoResponse {string nickName = 1;int64 gender = 2;string email = 3;}service UserClient {rpc Login(LoginRequest) returns(LoginResponse);rpc Register(RegisterRequest) returns(RegisterResponse);rpc PersonalInfo(PersonalInfoRequest) returns(PersonalInfoResponse);}
5 Message服务
(1) 数据表
msg_chat聊天消息表, 持久化存放全部消息(无论在线离线)
| id | 自增, uint64 |
|---|---|
| group_id | 分组id, 两人聊天: A_B, 多人聊天: A_timestamp |
| msg_type | 枚举值(1系统, 2文本, 3文件, 4语音, 5链接) |
| sender_id | 消息发送者的用户id |
| content | 消息内容, 最多65535个字节 |
| create_time | 创建时间 (服务器时间戳), 每次返回给前端的是Unix时间戳的毫秒 |
(2) 协议
syntax = "proto3";option go_package = "./proto";// 聊天消息结构message ChatMsg {int64 groupId=1; // 群idint64 senderId=2; // 发送者uidint64 type=3; // 消息类型 0文本, 1图片, 2视频, 3语音string content=4; // 消息内容string uuid=5; // 作用是去重int64 createTime=6; // 创建时间}// 上传消息message UploadRequest {int64 groupId=1;int64 senderId=2;int64 type=3;string content=4;string uuid=5;}message UploadResponse {string uuid=1;int64 createTime=2; // 创建时间}// 拉取消息message PullRequest {string email=1; // 用户邮箱string platform=2; // 设备信息int64 groupId=3; // 要拉取哪个群的消息}message PullResponse {repeated ChatMsg list=1;}service MessageClient {rpc Upload(UploadRequest) returns(UploadResponse);rpc Pull(PullRequest) returns(PullResponse);}
6 Group服务
(1) 数据表
group | id | 字符串, 两人的为uid1_uid2, 多人的为uuid | | —- | —- | | name | 群名称(默认是 用户A昵称,用户B昵称) | | type | 群类型, 单聊1, 群聊2 | | status | 0表示无效(未同意), 1表示有效, 2表示无效(拉黑) | | config | 群配置, json
{
“notice”: “群公告”,
“owner”: 2,
“curNum”: 2,
} | | create_time | 创建时间 |group_user | id | 自增 | | —- | —- | | group_id | 群id (index) | | user_id | 用户id (index) | | alias_name | 用户对该群(或该用户)的备注名 | | create_time | 进群时间 |
redis缓存
“uid:platform:group_id”: last_msg_id
(2) 协议
syntax = "proto3";option go_package = "./proto";// 添加好友message AddFriendRequest {int64 fromUid=1;int64 toUid=2;}message AddFriendResponse {string groupId=1; // 返回创建的这个新群, 但是这个群还处于删除态, 不可发聊天消息}// 处理好友申请message HandleFriendRequest {string groupId=1;bool isAgree=2;}message HandleFriendResponse {string groupId=1;}// 获取群内的用户message GroupUsersRequest {string groupId=1;}message GroupUsersResponse {repeated int64 list=1;}service GroupClient {rpc AddFriend(AddFriendRequest) returns(AddFriendResponse);rpc HandleFriend(HandleFriendRequest) returns(HandleFriendResponse);rpc GroupUsers(GroupUsersRequest) returns(GroupUsersResponse);}3) invite群内用户 可以 邀请用户进群4) remove群主 可以 把某用户移出群聊5) exit群内用户 可以自行退群, 群主会自动移交
7 前端技术难点
(1) 下拉刷新
### `pull_to_refresh` 下拉组件lib/pages/category/widgets/news_page_list.dart```dart@overrideWidget build(BuildContext context) {super.build(context);return GetX<CategoryController>(init: controller,builder: (controller) => SmartRefresher(enablePullUp: true,controller: controller.refreshController,onRefresh: controller.onRefresh,onLoading: controller.onLoading,child: CustomScrollView(slivers: [SliverPadding(padding: EdgeInsets.symmetric(vertical: 0.w,horizontal: 0.w,),sliver: SliverList(delegate: SliverChildBuilderDelegate((content, index) {var item = controller.state.newsList[index];return newsListItem(item);},childCount: controller.state.newsList.length,),),),],),),);}
controller: controller.refreshController 上下拉控制器
onRefresh: controller.onRefresh 下拉刷新数据
onLoading: controller.onLoading 上拉载入数据
SliverChildBuilderDelegate 动态构建每一项, childCount 告诉组件一共有多少数据
controller 中写入业务
lib/pages/category/controller.dart
onRefresh下拉刷新
void onRefresh() {fetchNewsList(isRefresh: true).then((_) {refreshController.refreshCompleted(resetFooterState: true);}).catchError((_) {refreshController.refreshFailed();});}
refreshController.refreshCompleted() 刷新完成
refreshController.refreshFailed() 刷新失败
onLoading上拉载入
void onLoading() {if (state.newsList.length < total) {fetchNewsList().then((_) {refreshController.loadComplete();}).catchError((_) {refreshController.loadFailed();});} else {refreshController.loadNoData();}}
refreshController.loadComplete() 载入完成
refreshController.loadFailed() 载入失败
refreshController.loadNoData() 没有数据
fetch所有数据
// 拉取数据Future<void> fetchNewsList({bool isRefresh = false}) async {var result = await NewsAPI.newsPageList(params: NewsPageListRequestEntity(categoryCode: categoryCode,pageNum: curPage + 1,pageSize: pageSize,),);if (isRefresh == true) {curPage = 1;total = result.counts!;state.newsList.clear();} else {curPage++;}state.newsList.addAll(result.items!);}
state.newsList.addAll(result.items!); 合并 list 集合 RxList 封装的
dispose记得释放
///dispose 释放内存@overridevoid dispose() {super.dispose();// dispose 释放对象refreshController.dispose();}
refreshController.dispose() 这个业务中就是下拉控件了,还有视频播放器、文本框啥的控制器都要记得释放。
```
参考资料
后端:
- 极客时间专栏课《即时消息技术剖析与实战》
- Github (go-websocket)
- Github (micro-message-system)
- Github (Open-IM-Server)
- Github (gowebsocket)
- Github (gin-chat-demo)
- 哔哩哔哩-如何设计10亿流量的IM系统
- 飞书-从0到1再到N,探索亿级流量的IM架构演绎
- 52im-海量在线用户的移动端IM架构设计实践
- 消息“时序”与“一致性”为何这么难?
- 细聊分布式ID生成方法
- 如何设计一个亿级消息量的 IM 系统
前端:
- 仅界面: https://gitee.com/laogede/flutter_wechat(视频教程)
- 完整源码: https://github.com/CoderMikeHe/flutter_wechat (前端2)
- 仿微信录制音频开源库:https://github.com/yxwandroid/flutter_plugin_record
- 微信图库:https://github.com/fluttercandies/flutter_wechat_assets_picker
- youtube教程: https://www.youtube.com/watch?v=Da8vg2RIFxM&list=PL-BBzWII3MPEYXtI6HgqVHGaL6lb4sA6e&index=4
