python项目之网络聊天室_Python实现多人聊天室

时间:2025-03-25 11:01:18

项目简介

编程语言:Python3

界面实现:Pyside2(通过QT Designer设计)

基于TCP网络编程

项目概略图:

实现功能

(1)   客户端通过服务器访问数据库,进行登录注册

(2)   群发功能(默认是群发,所有在线用户可见)

(3)   私发功能(需要选定用户,发送消息为私发)

(4)   上线通知(已经在线用户收到其上线通知,刚上线用户收到欢迎语,并实时修改当前在线用户列表和在线人数)

(5)   下线通知(用户下线后通知在线用户,并实时修改在线列表和在线人数)

(6)   服务器可访问数据库,检查登陆信息是否正确,并向客户端返回登陆信息

(7)   服务器提供注册检查,发送是否注册

(8)   转发消息(群发,私发,上线,下线通知)

(9)   显示当前在线用户,用户下线时将其从列表中删除

服务端代码

# Author:wenfei

import datetime

import socket

import threading

import json

import pymysql

host ="127.0.0.1"

prot =8080

socket = (socket.AF_INET, socket.SOCK_STREAM)

((host, prot))# 服务端用bind,客户端用连接connec

(5)

condi = ()

user_list = {}# 存储用户名称和服务端与改用户的socket字典

conn_list = {}#  存储服务端与所有的客户端的地址与socket

thr = {}# 存储所开启的线程

if_the_first =True  # 第一次为发送开启线程

out_message = []# 服务端向外发送的消息存储

user_name = []#  用户名称列表

user_conn = []# 用户连接列表

def edit_stats(name):# 获得已经登录用户的名称

connection = (host="localhost",port=3306,user="root",password="",db="test",

charset="utf8mb4",cursorclass=)

cursor = ()

sql ="update user set stats = 0 where name = '%s'"%(name)

(sql)

()

()

def check_login(name, passwd):

print("执行查询语句\n")

stats =0

connectoin = (host="localhost",port=3306,user="root",password="",db="test",

charset="utf8mb4",cursorclass=)

cursor = ()

sql ="select *from user"

(sql)

result = ()

print("打印查询结果:\n", result)

for datain result:

if data['name'] == nameand data['passwd'] == passwd:

if not data['stats']:

sql ="update user set stats = 1 where name = '%s'" % (name)

try:

(sql)

()# 提交到数据库执行

stats =1  # 返回1表示登录成功

except:

()# 发生错误时,回滚

else:

stats =2  #2表示用户已近登录

else:

pass

()

return stats

def check_register(name, passwd):

stats =0

connection = (host="localhost",port=3306,user="root",password="",db="test",

charset="utf8mb4",cursorclass=)

cursor = ()

sql ="select name from user"

(sql)

result = ()

print("服务端打印  注册查询结果", result)

if_exist =False

for datein result:

if name == date['name']:

if_exist =True

if  not if_exist:

sql ="insert into user(name, passwd, stats) value ('%s','%s', 0)" % (name, passwd)

(sql)

()

stats =1

()

return stats

def threadOut(conn):

global out_message, user_list, message, user_name, user_conn

while True:

if len(out_message) ==2:

print("是都有数据发送", out_message)

try:

js = (out_message[0])

print("取得js值", js)

# 登录消息

if js['type'] =='login':

print("登录类型")

nickname =str(js['nickname'])

passwd =str(js['passwd'])

stats = check_login(nickname, passwd)

out_message[1].send(({

'type':'check',

'stats':stats

}).encode())

print("服务端登录验证发送成功")

if stats ==1:# 表示当前用户登录成功

user_list[nickname] = out_message[1]# 将当前用户加入到列表中r,并存储该用户的连接

print("服务端打印当前登录用户的信息", nickname, out_message[1])

print("打印全局连接字典的值", conn)

user_name.append(nickname)

for conin ():

if con != out_message[1]:

(({'type':'notify',

'message':"系统通知:" +  nickname +'上线',

'datetime':((),'%Y-%m-%d %H:%M:%S'),

'current_user_name_list':repr(user_name),

}).encode())

print("服务端通知其他用户")

else:

print("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%")

# 套接字直接发送过去,不能用eval和exec函数进行解析,因为其中含有<>字符

# 在发送之前先将字典转换为两个列表

# 在接收到之后,将两个列表合并为字典

out_message[1].send(({

'type':'welcome',

'message':"welcome " + nickname +" to the chat room ",

'current_user_name_list':repr(user_name),

'datetime': ((),'%Y-%m-%d %H:%M:%S')

}).encode())

print("服务端给当前用户发送欢迎")

print('登录成功!')

# 群发消息

elif js['type'] =='broadcast':

print("广播类型")

message = js['message']

nickname = js['nickname']

date_time = js['datetime']

for key, valuein ():

conn[key].send((

{

'type':'message',

'message': nickname +':' + message,

'datetime':date_time

}).encode())

print("服务端发送消息成功")

print("打印当前所有用户信息", conn_list)

# 注册类型

elif js['type'] =='register':

stats = check_register(js['register_name'], js['register_passwd'])

out_message[1].send(({

'type':'register_check',

'stats':stats

}).encode())

# 私发消息

elif js['type'] =='sendto':

print("私发类类型")

who = js['who']

nickname = js['nickname']

message = js['message']

date_time = js['datetime']

# 检查用户是否存在

if whonot in user_list.keys():

out_message[1].send((

{

'type':'message',

'message': who +' 不在线 .please try later.',

'datetime':((),'%Y-%m-%d %H:%M:%S')

}).encode())

print("服务端确认用户不在线")

else:

user_list[who].send((

{

'type':'message',

'message':"私聊信息:" + nickname +' whisper to you: ' + message,

'datetime':date_time

}).encode())

print("服务端私发信息成功")

# 离线消息

elif js['type'] =='offline':

date_time = js['datetime']

off_name = js['off_name']

print(message)

for key, valuein ():

conn[key].send((

{

'type':'offline',

'message':'系统通知:' + off_name +' 下线了 !!',

'off_name':off_name,

'datetime':date_time

}).encode())

print("服务端离线信息通知在线用户")

out_message.clear()

except :

print("服务端发送消息时出现错误")

out_message.clear()

def threadIn(conn):# 接收消息

global out_message, conn_list, user_list, user_name

while True:

try:

temp = (1024)

if not temp:

print("跳出循环")

break

else:

out_message.clear()

out_message.append(())#将受到的消息复制给全局变量

out_message.append(conn)# 存储收到当前消息的socket 连接对象

except:

out_message.clear()

for keyin list(user_list.keys()):

if user_list[key] == conn:

user_list.pop(key)#将当前用户从在线字典中删除

user_name.remove(key)# 将当前用户从在线列表中删除

out_message.append(({

'type':'offline',

'off_name':key,

'datetime':((),'%Y-%m-%d %H:%M:%S')

}))

out_message.append(conn)

for keyin list(conn_list.keys()):

if conn_list[key] == conn:

conn_list.pop(key)

break

while True:

conn, client_address = ()

print("有新用户连接", conn, client_address )

conn_list[client_address] = conn

print("打印全局变量conn_list的值", conn_list)

thr[client_address] = (target=threadIn,args=(conn,))

if if_the_first:

thr['fasong'] = (target=threadOut,args=(conn_list,))

thr['fasong'].start()# 发送只开启一个线程

print("启动发送线程")

if_the_first =False

thr[client_address].start()

登录注册界面(客户端)

import json

from QApplication, QMessageBox, QWidget

from QUiLoader

from QFile

import socket

from loginimport Login

class Main_window(QWidget):

def __init__(self):

super(Main_window,self).__init__()

main_xml = QFile("E:/Python_demo/ui/login_register.ui")

main_xml.open()

main_xml.close()

self.main_ui = QUiLoader().load(main_xml)

self.main_ui.register_btn.()# 给登录按钮绑定点击事件

self.main_ui.login_button.()# 给注册按钮绑定点击事件

self.check_stats =False

= (socket.AF_INET, socket.SOCK_STREAM)

(('127.0.0.1',8080))

def login(self):

name =self.main_ui.()

passwd =self.main_ui.()

print("点击登录按钮,获取到账号密码", name, passwd)

(({

'type':'login',

'nickname':name,

'passwd':passwd

}).encode())

print("登录窗口发送账号密码成功")

recive =(1024)

print("客户端收到验证结果", ())

js = (())

print("客户端收到js数据", js)

if js['type'] =='check':

stats = js['stats']

if stats ==0:# 表示用户名或密码不正确

(self.main_ui,

"警告",

"用户名或密码错误")

elif stats ==1:

login = Login(name,)

login.chat_window.show()

print("登录成功")# 此时关闭窗口,将socket传给子窗口

self.main_ui.setVisible(False)

elif stats ==2:

(self.main_ui,

"提示",

"该用户已经登录")

def register(self):

send_tag =True

print("you click the registerButton")

name =self.main_ui.()

passwd =self.main_ui.()

if name =='' or passwd =='':

(self.main_ui,

"警告",

"用户名和密码不能为空")

send_tag =False

if send_tag:

(({

'type':'register',

'register_name':name,

'register_passwd':passwd

}).encode())

print("将注册用户名和密码发给服务端")

rec =(1024)

js = (())

print("从服务端收到注册验证")

if js['type'] =='register_check':

stats = js['stats']

if stats ==0:

(self.main_ui,

"提示",

"用户名已存在")

elif stats ==1:

(self.main_ui,

"提示",

"注册成功")

if __name__ =='__main__':

app = QApplication([])

main_login = Main_window()

main_login.main_ui.show()

app.exec_()

聊天界面代码

import json

from QApplication, QMessageBox, QWidget, QMainWindow, QLabel

from QUiLoader

from QFile, QThread, Signal

import threading

import pymysql

import datetime

outString =''

inString =''

IP ="192.168.0.107"

nick =''

port =8080

if_send_tag =False

tag_if_edit =False

current_user_list_of_server = []

conn_list = []

current_user_list = []

if_chat_individually =False

chat_individually_message ="chat_individually"

to_who =''

off_name =''

class Login(QMainWindow):

def __init__(self, name, sock, parent=None):# 接受一个连接的socket,登录的姓名

global current_user_list

super().__init__(parent)

= name

= sock

chat = QFile("E:/Python_demo/ui/chat_window.ui")

()

()

self.chat_window = QUiLoader().load(chat)# 加载聊天界面

self.chat_window.name_user.setText()# 设定当前用户名

self.chat_string =self.chat_window.chat_string

=self.chat_window.listWidget

(self.item_click)# 绑定在线列表的点击事件,用于私发信息

self.chat_window.send_button.(self.send_message)# 绑定发送按钮事件,用于发送消息

self.th_send = (target=self.client_send)# 默认有一个self参数,不用写进去

self.th_accept = (target=self.client_accept)# 默认有一个self参数,不用写进去

self.th_accept.setDaemon(True)

self.th_send.setDaemon(True)

= testThread(self)

=''  # 接受到服务端的信息

self.current_people_count_label =self.chat_window.user_count# 聊天界面显示当前在线人数

self.th_send.start()# 启动发送线程

self.th_accept.start()# 启动接收线程

()# 启动QThread线程,用于实时更新界面

def item_click(self, item):

global if_chat_individually, to_who

if_chat_individually =True

to_who = ()[0:().find('在')]

# print("点击了----------item-----------", ())

def send_message(self):

global if_send_tag, outString

if_send_tag =True

outString =self.chat_window.()

self.chat_window.('')

def client_send(self):

global if_send_tag, outString, if_chat_individually

while True:

if if_send_tag:

if if_chat_individually:# 判断是否是私发消息

print("客户端发送,为私发类型")

(({

'type':'sendto',

'message': outString,

'who': to_who,

'nickname':,

'datetime':((),'%Y-%m-%d %H:%M:%S')

}).encode())

print("私发类型消息发送成功")

if_chat_individually =False

if_send_tag =False

else:

print("客户端发送,为群发类型")

(({

'type':'broadcast',

'message': outString,

'nickname':,

'datetime': ((),'%Y-%m-%d %H:%M:%S')

}).encode())

print("群发类型消息发送成功")

if_send_tag =False

def client_accept(self):

global inString, tag_if_edit, current_user_list, current_user_list_of_server, off_name

while True:

try:

inString =(1024)

if not inString:# 如果没有收到

continue

else:

js = (inString)

if js['type'] =='message':

print("收到群发消息=", js['message'])

= js['message'] +'  ' + js['datetime']

elif js['type'] =='notify':

print("客户端收到服务端发来的通知信息")

current_user_list_of_server =eval(js['current_user_name_list'])

= js['message'] +'  ' + js['datetime']

elif js['type'] =='welcome':

print("收到来自服务端的欢迎信息", js['current_user_name_list'],"^^^type^^^^^^^")

current_user_list_of_server =eval(js['current_user_name_list'])

# print("打印从服务端收到的当前用户信息", current_user_list_of_server, type(current_user_list_of_server))

= js['message'] + js['datetime']

elif js['type'] =='offline':

print("收到离线消息=", js['message'])

= js['message'] +'  ' + js['datetime']

off_name = js['off_name']# 获取下线人的名称

tag_if_edit =True  # 此时可以修改显示信息

except :

print("客户端接数消息出错")

break

class testThread(QThread):

finished = Signal()

def __init__(self, win, parent=None):

super(testThread,self).__init__(parent)

= win

def run(self):

global tag_if_edit, current_user_list, current_user_list_of_server, off_name

while True:

if tag_if_edit:

.chat_string.appendPlainText(str())

print("打印当前用户字典和在线列表", current_user_list, current_user_list_of_server)

for itemin current_user_list_of_server:

.current_people_count_label.setText(str(len(current_user_list_of_server)))

print(itemnot in current_user_list)

if itemnot in current_user_list:

current_user_list.append(item)

print("子线程操作UI")

(item +"在线")# 将在线人添加到QlistWidget的item中

if off_name:

count =()

for iin range(count):

str_item =(i).text()

name = str_item[0:str_item.find('在')]

if name == off_name:

(i)# 从在线列表中删除当前下线的人

off_name =''

print("删除当前下线用户")

.current_people_count_label.setText(str(count-1))# 将在线人数减去1

print("重置当前人数")

tag_if_edit =False

# (('127.0.0.1',)# 连接

if __name__ =='__main__':

app = QApplication([])

login = Login("直接启动")

login.chat_window.show()

app.exec_()

启动