说明:用fastapi+tcp+android实现在线聊天,测试完成
效果图:
客户端1:
(.venv) PS C:\Users\wangrusheng\PycharmProjects\FastAPIProject> python client.py
请输入用户名: a
输入消息(格式:@用户ID 内容 或 直接输入内容):
==================================================
在线用户列表:
0c868691 | a
==================================================
[2025-03-15 08:26:31] 系统通知: a 进入聊天室
==================================================
在线用户列表:
0c868691 | a
1add816b |
==================================================
[2025-03-15 08:26:36] 系统通知: 进入聊天室
==================================================
在线用户列表:
0c868691 | a
==================================================
[2025-03-15 08:26:41] 系统通知: 已离开
==================================================
在线用户列表:
0c868691 | a
466947dd | b
==================================================
[2025-03-15 08:26:47] 系统通知: b 进入聊天室
[2025-03-15 08:26:52] 公共消息 b: hello
good boy
输入消息(格式:@用户ID 内容 或 直接输入内容):
[2025-03-15 08:27:05] 公共消息 a: good boy
[2025-03-15 08:27:25] 私聊来自 b: what is your name
@466947dd i name is bob
输入消息(格式:@用户ID 内容 或 直接输入内容):
[2025-03-15 08:27:46] 私聊来自 a: i name is bob
==================================================
在线用户列表:
0c868691 | a
466947dd | b
74cfb061 | UserA
==================================================
[2025-03-15 08:40:37] 系统通知: UserA 进入聊天室
[2025-03-15 08:40:39] 公共消息 UserA: Hello everyone, this is UserA
==================================================
在线用户列表:
0c868691 | a
466947dd | b
==================================================
[2025-03-15 08:40:43] 系统通知: UserA 已离开
==================================================
在线用户列表:
0c868691 | a
466947dd | b
432066ef | UserA
==================================================
[2025-03-15 08:41:20] 系统通知: UserA 进入聊天室
[2025-03-15 08:41:21] 公共消息 UserA: Hello everyone, this is UserA
==================================================
在线用户列表:
0c868691 | a
466947dd | b
==================================================
[2025-03-15 08:41:25] 系统通知: UserA 已离开
客户端2:
(.venv) PS C:\Users\wangrusheng\PycharmProjects\FastAPIProject> python client.py
请输入用户名: b
输入消息(格式:@用户ID 内容 或 直接输入内容):
==================================================
在线用户列表:
0c868691 | a
466947dd | b
==================================================
[2025-03-15 08:26:47] 系统通知: b 进入聊天室
hello
输入消息(格式:@用户ID 内容 或 直接输入内容):
[2025-03-15 08:26:52] 公共消息 b: hello
[2025-03-15 08:27:05] 公共消息 a: good boy
@0c868691 what is your name
输入消息(格式:@用户ID 内容 或 直接输入内容):
[2025-03-15 08:27:25] 私聊来自 b: what is your name
[2025-03-15 08:27:46] 私聊来自 a: i name is bob
==================================================
在线用户列表:
0c868691 | a
466947dd | b
74cfb061 | UserA
==================================================
[2025-03-15 08:40:37] 系统通知: UserA 进入聊天室
[2025-03-15 08:40:39] 公共消息 UserA: Hello everyone, this is UserA
==================================================
在线用户列表:
0c868691 | a
466947dd | b
==================================================
[2025-03-15 08:40:43] 系统通知: UserA 已离开
==================================================
在线用户列表:
0c868691 | a
466947dd | b
432066ef | UserA
==================================================
[2025-03-15 08:41:20] 系统通知: UserA 进入聊天室
[2025-03-15 08:41:21] 公共消息 UserA: Hello everyone, this is UserA
==================================================
在线用户列表:
0c868691 | a
466947dd | b
==================================================
[2025-03-15 08:41:25] 系统通知: UserA 已离开
step1:C:\Users\wangrusheng\PycharmProjects\FastAPIProject\main.py
import asyncio
import json
import uuid
import datetime
import logging
from typing import Dict
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ChatServer")
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, dict] = {}
async def connect(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, client_id: str, username: str):
self.active_connections[client_id] = {
"reader": reader,
"writer": writer,
"username": username,
"addr": writer.get_extra_info('peername')
}
logger.info(f"用户 {username}({client_id}) 已连接")
await self._broadcast_user_list()
await self._send_system_message(f"{username} 进入聊天室")
async def disconnect(self, client_id: str):
if client_id in self.active_connections:
user = self.active_connections[client_id]
user["writer"].close()
del self.active_connections[client_id]
logger.info(f"用户 {user['username']}({client_id}) 已断开")
await self._broadcast_user_list()
await self._send_system_message(f"{user['username']} 已离开")
async def handle_message(self, sender_id: str, data: dict):
if data["type"] == "private":
await self._send_private_message(
sender_id=sender_id,
recipient_id=data["to"],
content=data["content"]
)
else:
await self._broadcast_message(sender_id, data["content"])
async def _send_private_message(self, sender_id: str, recipient_id: str, content: str):
sender = self.active_connections.get(sender_id)
recipient = self.active_connections.get(recipient_id)
if sender and recipient:
message = {
"type": "private",
"from": sender_id,
"to": recipient_id,
"sender_name": sender["username"],
"content": content,
"timestamp": self._current_time()
}
await self._send_to_client(recip_id=recipient_id, message=message)
await self._send_to_client(recip_id=sender_id, message=message) # 回显
async def _broadcast_message(self, sender_id: str, content: str):
sender = self.active_connections.get(sender_id)
if sender:
message = {
"type": "public",
"from": sender_id,
"sender_name": sender["username"],
"content": content,
"timestamp": self._current_time()
}
for client_id in self.active_connections:
await self._send_to_client(client_id, message)
async def _send_system_message(self, content: str):
message = {
"type": "system",
"content": content,
"timestamp": self._current_time()
}
for client_id in self.active_connections:
await self._send_to_client(client_id, message)
async def _broadcast_user_list(self):
users = [{
"client_id": cid,
"username": info["username"]
} for cid, info in self.active_connections.items()]
message = {
"type": "user_list",
"users": users
}
for client_id in self.active_connections:
await self._send_to_client(client_id, message)
async def _send_to_client(self, recip_id: str, message: dict):
try:
writer = self.active_connections[recip_id]["writer"]
data = json.dumps(message) + "\n"
writer.write(data.encode())
await writer.drain()
except (KeyError, ConnectionError):
await self.disconnect(recip_id)
def _current_time(self):
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
manager = ConnectionManager()
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
client_id = str(uuid.uuid4())[:8]
try:
# 接收初始化信息
data = await reader.readuntil(b"\n")
init_data = json.loads(data.decode().strip())
username = init_data.get("username", f"用户{client_id}")
await manager.connect(reader, writer, client_id, username)
while True:
data = await reader.readuntil(b"\n")
msg = json.loads(data.decode().strip())
await manager.handle_message(client_id, msg)
except (asyncio.IncompleteReadError, json.JSONDecodeError):
logger.error("收到无效数据")
except ConnectionResetError:
logger.info("客户端强制断开连接")
finally:
await manager.disconnect(client_id)
writer.close()
async def main():
server = await asyncio.start_server(
handle_client,
host="0.0.0.0",
port=8000
)
async with server:
await server.serve_forever()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("服务器已关闭")
step2:C:\Users\wangrusheng\PycharmProjects\FastAPIProject\client.py
import asyncio
import json
import sys
class ChatClient:
def __init__(self):
self.reader = None
self.writer = None
self.client_id = ""
self.username = ""
async def connect(self, host: str, port: int):
self.reader, self.writer = await asyncio.open_connection(host, port)
self.username = input("请输入用户名: ")
await self._send_init_message()
async def _send_init_message(self):
init_msg = {"username": self.username}
await self._send_message(init_msg)
async def _send_message(self, msg: dict):
data = json.dumps(msg) + "\n"
self.writer.write(data.encode())
await self.writer.drain()
async def receive_messages(