实现功能:
- 单人私聊(1:1)
- 群聊(rooms/channels)
- 群聊内
@某人(提及)并产生提醒 - 聊天记录(持久化、分页、检索)
- 在线/离线消息处理与离线推送(概要)
- 安全、扩展与性能建议
设计思路 → 数据模型 → WebSocket 事件协议 → 后端(Node.js + Socket.IO + MongoDB)完整示例 → 前端(HTML + JS + socket.io-client)示例 → 进阶与部署关注点
设计思路
传输层:用 WebSocket(建议用 socket.io,简化房间/重连/心跳/跨域等),也可以用原生 ws。
鉴权:登录后返回 JWT(或 session),WebSocket 连接时带 token 验证。
房间模型:
- 私聊:为每对用户建立“私聊房间 id”,例如
dm:<userAId>:<userBId>(id 可按小->大排序避免重复)。 - 群聊:使用
room:<roomId>。
- 私聊:为每对用户建立“私聊房间 id”,例如
消息持久化:消息写入 DB(MongoDB 推荐,文档型适合聊天),并记录
mentions字段数组(被 @ 的用户 id)。@ 实现:客户端在输入实时完成 username 列表(或 uid),发送消息时把消息体中
mentions: [userId...],服务器也进行一次解析检查(防伪)。离线/未读:在写入消息时记录哪些目标用户未读(unread map),或维护
lastReadAt,方便统计未读数与推送。消息拉取/分页:REST API 用
GET /rooms/:id/messages?before=<timestamp|id>&limit=20分页加载历史。通知:当消息中包含某人的 id(mentions),立刻向该用户推送
mention事件(如果在线),或在 DB 中记录未读并配合 Push Notification(APNs/FCM)发送移动通知。扩展/横向:使用 Redis pub/sub 或 socket.io-adapter(redis-adapter)进行多实例事件广播。
数据模型(MongoDB / Mongoose 风格示例)
// User
{
_id: ObjectId,
username: String, // 唯一
displayName: String,
avatarUrl: String,
passwordHash: String,
lastSeenAt: Date,
createdAt: Date
}
// Room
{
_id: ObjectId,
type: "dm" | "group",
members: [ObjectId],
name?: String, // 群名
ownerId?: ObjectId,
createdAt: Date,
meta: {}
}
// Message
{
_id: ObjectId,
roomId: ObjectId,
senderId: ObjectId,
content: String, // 原始文本(可存富文本或 markdown)
contentHtml?: String, // 可选:预渲染后的 html
mentions: [ObjectId], // 被 @ 的用户 id 列表
attachments?: [{ url, type, filename }],
createdAt: Date,
editedAt?: Date,
deleted?: Boolean,
readBy?: [{ userId: ObjectId, at: Date }] // 或在另表维护 read 状态
}WebSocket / 事件协议(建议)
统一使用 JSON 事件对象 { type, payload } 或 socket.io 的事件名方式。下面用 socket.io 事件名表述。
客户端事件(client → server):
auth:连接后发送 token(若用 socket.io 的 auth 选项可省略)。join_room{ roomId }:加入房间(server 验证权限)。leave_room{ roomId }send_message{ roomId, content, mentions: [userId], attachments?: [...] }typing{ roomId, isTyping: true|false }fetch_messages{ roomId, beforeId?, limit? }(也可用 REST)
服务器事件(server → client):
message{ message }(实时下发)message_ack{ tempId, messageId }(可用于客户端回显确认)typing{ roomId, userId, isTyping }mention{ roomId, messageId, fromUserId }(专门的提及提醒)presence{ userId, online }或presence_updatehistory{ messages: [...] }(回应 fetch_messages)error{ code, message }
后端示例(Node.js + Express + Socket.IO + Mongoose)
下面是一个最小但可运行的示例,涵盖鉴权、加入房间、发消息、保存 DB、@ 通知、历史分页。
先 npm init -y,安装:
npm i express socket.io mongoose jsonwebtoken bcryptjs cors示例 server.js:
// server.js
const express = require('express');
const http = require('http');
const mongoose = require('mongoose');
const jwt = require('jsonwebtoken');
const { Server } = require('socket.io');
const cors = require('cors');
const JWT_SECRET = process.env.JWT_SECRET || 'change_this';
const MONGO = process.env.MONGO || 'mongodb://localhost:27017/chatapp';
mongoose.connect(MONGO, { useNewUrlParser: true, useUnifiedTopology: true });
/* ============ Mongoose Schemas (简化) ============ */
const UserSchema = new mongoose.Schema({
username: { type: String, unique: true },
displayName: String,
passwordHash: String,
createdAt: { type: Date, default: Date.now }
});
const RoomSchema = new mongoose.Schema({
type: { type: String, enum: ['dm','group'] },
members: [{ type: mongoose.Schema.Types.ObjectId, ref: 'User' }],
name: String,
createdAt: { type: Date, default: Date.now }
});
const MessageSchema = new mongoose.Schema({
roomId: { type: mongoose.Schema.Types.ObjectId, ref: 'Room' },
senderId: { type: mongoose.Schema.Types.ObjectId, ref: 'User' },
content: String,
mentions: [{ type: mongoose.Schema.Types.ObjectId, ref: 'User' }],
createdAt: { type: Date, default: Date.now }
});
const User = mongoose.model('User', UserSchema);
const Room = mongoose.model('Room', RoomSchema);
const Message = mongoose.model('Message', MessageSchema);
/* ============ Express ============ */
const app = express();
app.use(cors());
app.use(express.json());
// 简化登录接口(真实请加密码校验)
app.post('/login', async (req, res) => {
// req.body: { username }
const { username } = req.body;
let user = await User.findOne({ username });
if (!user) {
user = await new User({ username, displayName: username }).save();
}
const token = jwt.sign({ userId: user._id }, JWT_SECRET, { expiresIn: '7d' });
res.json({ token, user: { _id: user._id, username: user.username, displayName: user.displayName } });
});
// 获取房间历史(分页)
app.get('/rooms/:roomId/messages', async (req, res) => {
const { roomId } = req.params;
const limit = Math.min(50, parseInt(req.query.limit || 20));
const before = req.query.before; // message id
const q = { roomId };
if (before) q._id = { $lt: before };
const msgs = await Message.find(q).sort({ _id: -1 }).limit(limit).lean();
res.json({ messages: msgs.reverse() }); // 返回正序
});
const server = http.createServer(app);
const io = new Server(server, {
cors: { origin: '*' }
});
/* ============ Socket Auth Middleware ============ */
io.use(async (socket, next) => {
const token = socket.handshake.auth?.token || socket.handshake.query?.token;
if (!token) return next(new Error('auth error'));
try {
const payload = jwt.verify(token, JWT_SECRET);
socket.userId = payload.userId;
const user = await User.findById(socket.userId);
socket.user = user;
return next();
} catch (err) {
return next(new Error('auth error'));
}
});
/* ============ Socket Handlers ============ */
io.on('connection', (socket) => {
console.log('connected', socket.user.username, socket.userId);
// join a room
socket.on('join_room', async ({ roomId }) => {
const room = await Room.findById(roomId);
if (!room) return socket.emit('error', { message: 'room not found' });
// 权限校验:是否为成员(简化)
if (!room.members.map(m => m.toString()).includes(socket.userId.toString())) {
return socket.emit('error', { message: 'no permission' });
}
socket.join(roomId);
socket.emit('joined', { roomId });
});
// send message
socket.on('send_message', async ({ roomId, content, mentions = [] , tempId }) => {
// basic checks
if (!content || !roomId) return;
// verify room membership
const room = await Room.findById(roomId);
if (!room) return socket.emit('error', { message: 'room not found' });
if (!room.members.map(m => m.toString()).includes(socket.userId.toString())) {
return socket.emit('error', { message: 'no permission' });
}
// create message
const msg = await new Message({
roomId,
senderId: socket.userId,
content,
mentions
}).save();
// broadcast to room
const payload = {
_id: msg._id,
roomId,
senderId: socket.userId,
content,
mentions,
createdAt: msg.createdAt
};
io.to(roomId).emit('message', payload);
// ack to sender with mapping if client used tempId for optimistic UI
socket.emit('message_ack', { tempId, messageId: msg._id });
// fire mention events to mentioned users if they are online
for (const uid of mentions) {
// find sockets of that user (could be multiple devices)
const sockets = await io.fetchSockets();
for (const s of sockets) {
if (s.userId.toString() === uid.toString()) {
s.emit('mention', { roomId, messageId: msg._id, from: socket.userId, content });
}
}
// 如果离线:这里可写入 unread/notification 表,或推送 FCM/APNs
}
});
// typing
socket.on('typing', ({ roomId, isTyping }) => {
socket.to(roomId).emit('typing', { roomId, userId: socket.userId, isTyping });
});
socket.on('disconnect', () => {
console.log('disconnect', socket.user?.username);
// 可广播 presence 更新
});
});
server.listen(3000, () => console.log('listening 3000'));运行方法:
node server.jsPOST /login得到token,然后前端io({ auth: { token }})连接。
前端最小示例(HTML + socket.io-client)
安装:在前端页面引入 socket.io-client(CDN 或 bundler)。
示例 index.html:
<!doctype html>
<html>
<head><meta charset="utf-8"><title>Chat Demo</title></head>
<body>
<div>
<input id="token" placeholder="paste token from /login" style="width:400px"/>
<button id="connectBtn">Connect</button>
</div>
<div id="chat" style="display:none;">
<div>
Room: <input id="roomId" value="" />
<button id="join">Join</button>
</div>
<div id="messages" style="height:300px;overflow:auto;border:1px solid #ccc;padding:8px;"></div>
<div>
<input id="msg" style="width:70%" placeholder="@username to mention" />
<button id="send">Send</button>
</div>
</div>
<script src="https://cdn.socket.io/4.7.2/socket.io.min.js"></script>
<script>
let socket;
document.getElementById('connectBtn').onclick = () => {
const token = document.getElementById('token').value;
socket = io('http://localhost:3000', { auth: { token }});
socket.on('connect', () => {
console.log('connected', socket.id);
document.getElementById('chat').style.display = 'block';
});
socket.on('message', (m) => {
const el = document.createElement('div');
el.innerText = `[${new Date(m.createdAt).toLocaleTimeString()}] ${m.senderId}: ${m.content}`;
document.getElementById('messages').appendChild(el);
});
socket.on('mention', (x) => {
const el = document.createElement('div');
el.style.background = '#ffc';
el.innerText = `MENTION from ${x.from} in ${x.roomId}: ${x.content}`;
document.getElementById('messages').appendChild(el);
});
};
document.getElementById('join').onclick = () => {
const roomId = document.getElementById('roomId').value;
socket.emit('join_room', { roomId });
};
document.getElementById('send').onclick = () => {
const roomId = document.getElementById('roomId').value;
const content = document.getElementById('msg').value;
// 简化:客户端解析 @username => 替换为 userId (生产需先 resolve username->id)
// 这里示例直接不解析 mentions
socket.emit('send_message', { roomId, content, mentions: [] , tempId: 't'+Date.now()});
};
</script>
</body>
</html>注意:实际生产中,客户端发送消息前 必须 把
@username解析成userId并在mentions数组中传给后端(可通过输入时调用 autocomplete 接口GET /users?query=...)。
@mentions 的实现细节
- 客户端:在输入框实现
@触发 autocomplete,返回匹配用户名与 id(REST:GET /users?query=...)。 - 发送消息:消息 payload 含
mentions: [{ userId, username, indexStart, indexEnd }](index 用于前端高亮)。 - 后端:校验
mentions中的 userId 是否确实是房间成员并在 content 中实际存在(防止伪造)。把mentions存到 message 文档。然后给这些用户下发mention事件(若在线)。 - 通知:若用户离线,推送移动通知或邮件。并在 UI 上显示未读 mention 列表。
存储未读与已读
两种常见方案:
readBy字段内存每个人阅读时间(适合小群);写入成本高于查询成本。user_rooms表维护lastReadAt(或lastReadMessageId),通过比对消息时间/ID 判断未读数(更常见且可扩展)。
安全与性能建议(精通部分)
- 鉴权:WebSocket 握手时强制 jwt 验证;所有 socket 事件在服务器端再做权限检查(不要信任客户端)。
- 速率控制:对
send_message限流,防止刷屏(Redis 令牌桶)。 - 内容过滤:进行敏感词过滤 / XSS 过滤(例如把 contentHtml 用库安全渲染)。
- 防重复/幂等:客户端发消息带
tempId,服务器去重后返回message_ack,客户端替换回临时消息。 - 水平扩展:部署多实例时使用
socket.io-redisadapter,消息与 presence 同步靠 Redis pub/sub。 - 存储与索引:在 Message collection 建立
roomId + createdAt索引,查询历史与分页高效。 - 备份与归档:对过期消息做归档或冷存,避免主库膨胀。
- 消息审计/合规:若有合规需求,保留完整审计日志、删除策略和导出功能。
- 文件/大附件:文件不直接上传 DB,使用 object storage(S3),把元数据与 url 存在 message 文档,并为私有文件生成临时访问 token。
- 隐私:对敏感字段加密(例如某些企业需加密内容)。
进阶功能建议(可选)
- 富文本、表情、回复(引用 messageId)、消息撤回、编辑。
- 在线/离线设备推送(FCM/APNs)。
- 事件溯源(Event sourcing)或 Kafka 消息总线用于复杂场景。
- 搜索(Elasticsearch)用于聊天内容搜索。
- Typing indicator、read receipts(已读回执)、用户在线列表。
- 群管理(邀请、踢出、权限分层、公告、置顶消息)。
部署 & 运维要点
- 在生产用 HTTPS + WSS。
- 使用负载均衡(ALB/Nginx)并开启 sticky session(如果不使用 redis adapter),推荐使用 redis adapter 则不用 sticky。
- 监控:连接数、消息吞吐、延迟、错误率、内存/GC。
- 灰度、回滚策略、日志采集(结构化日志)。