本博客的所有原创文章采用创作公用版协议。要求署名、非商业用途和保持一致。要求署名必须包含我的网名(geokai)以及文章来源(选择博客首地址或者具体博文地址)。
商业性使用须预先征得本人同意(发送Email到 geokai@126.com)
18年下半年太忙了,直接停止软件的开发计划。在18年最后几天使用python自带的Tkinter框架简单的先把软件功能实现出来了。占时把这一期的标题改成Tkinter开发上位机软件。
先说一下软件实现的功能把
1)获取二氧化碳传感器探头的数据,使用到pyserial,crcmod库
2)使用matplolib进行实时绘图,使用到matplotlib库
3)定时将数据回传到邮箱,使用到email,smtplib库
这里只放出最核心部分的代码
导入关键的库
#导入数值GUI框架 import tkinter as tk from tkinter import scrolledtext #导入绘图包 from matplotlib.backends.backend_tkagg import ( FigureCanvasTkAgg, NavigationToolbar2Tk) # Implement the default Matplotlib key bindings. #from matplotlib.backend_bases import key_press_handler from matplotlib.figure import Figure import matplotlib.dates as mdates #导入数学计算包 import pandas as pd import numpy as np #导入系统包 import threading import time from datetime import datetime,timedelta import serial.tools.list_ports import crcmod #导入网络包,邮件发送 import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.image import MIMEImage from email.header import Header
GUI框架,其中包括绘图部分
#定义GUI界面及功能 class Application(tk.Tk): def __init__(self): \'\'\'初始化\'\'\' self.createWidgets() def createWidgets(self): \'\'\'设置绘图区\'\'\' self.fig = Figure(figsize=(10,5), dpi=100) self.ax_co2 = self.fig.add_subplot(1,1,1) self.canvas = FigureCanvasTkAgg(self.fig, master=self) self.canvas.get_tk_widget() self.canvas._tkcanvas.place(x=0, y=0, width=1024, height=350)#pack(side=tk.TOP, fill=tk.BOTH, expand=1) \'\'\'设置二氧化碳的数据接收区\'\'\' self.log_co2 = scrolledtext.ScrolledText(self, font=("Calibri", 8), background=\'#ffffff\') self.log_co2.place(x=720, y=450, width=300, height=60) self.log_co2.insert(tk.END,\'Strat\r\n\') self.log_co2_neat = scrolledtext.ScrolledText(self, font=("Calibri", 10), background=\'#ffffff\') self.log_co2_neat.place(x=720, y=530, width=300, height=100) self.log_co2_neat.insert(tk.END,\'Strat\r\n\') self.log_co2_col_name=tk.Label(bg=\'gray\', font=("Calibri", 10), justify=tk.LEFT, anchor=tk.W, text=\'Time\t\tCO2(ppm)\') self.log_co2_col_name.place(x=720, y=510, width=300, height=20) \'\'\'设置按钮区\'\'\' self.bt_connect_str = tk.StringVar() if self.trans_data_status==False: self.bt_connect_str.set(\'开始传输数据\') else: self.bt_connect_str.set(\'停止传输数据\') self.bt_connect = tk.Button(self, textvariable=self.bt_connect_str, command=self.ActivateTrans) self.bt_connect.pack(side=tk.LEFT , anchor=tk.S) self.Draw() # 绘图 \'\'\'设置状态区\'\'\' self.lb_port_co2_status=tk.Label(bg=\'red\', width=10, height=1, text=\'CO2 Status\') self.lb_port_co2_status.pack(side=tk.RIGHT , anchor=tk.S) def ActivateTrans(self): \'\'\' 点击数据传输按钮后激活数据传输 1)激活一个从串口获取二氧化碳数据的线程 3)激活邮件发送线程 4)运行以上三个线程,并判断是否正确连接串口,并显示串口连接状态 ***注意以上三个线程的功能较为复杂,使用了单独的Thread类进行了继承,因此停止线程 采用定义在类里面的Stop()方法 5)激活文本数据刷新线程 6)激活绘图区刷新线程 ***以上两个线程仅有单独的函数并且封装在窗体类下,直接采用Threading类进行定义,所 以需要注意停止需采用threading.Event()方法进行停止 \'\'\' if self.trans_data_status==False: self.trans_thread_co2 = Thread_CO2(0, "Thread_CO2_1") self.trans_thread_co2.setDaemon(True) self.trans_thread_co2.start() if self.trans_thread_co2.port_available==True: self.lb_port_co2_status.config(bg=\'green\') print(\'CO2 Port Failed to Connect\') self.email_thread_event = threading.Event() self.email_thread = threading.Thread(target = SendEmail, args=(EMAIL_RESEND_INTERVAL, self.email_thread_event)) self.email_thread.start() self.refresh_thread_event = threading.Event() self.refresh_thread = threading.Thread(target = self.RefreshThread, args=(1, self.refresh_thread_event)) self.refresh_thread.start() self.redraw_thread_event = threading.Event() self.redraw_thread = threading.Thread(target = self.ReDrawThread, args=(10, self.redraw_thread_event)) self.redraw_thread.start() self.trans_data_status=True self.bt_connect_str.set(\'停止传输数据\') else: self.trans_thread_co2.Stop() self.lb_port_co2_status.config(bg=\'red\') self.lb_port_hg_status.config(bg=\'red\') self.trans_data_status=False self.bt_connect_str.set(\'开始传输数据\') self.email_thread_event.set() self.refresh_thread_event.set() self.redraw_thread_event.set() # self.email_thread.join(0) def RefreshThread(self, time_interval, stop_event): \'\'\' 原始数据刷新程序 \'\'\' while(not stop_event.is_set() ): print(\'refresh\') self.RefreshDataText() #pinrt(time_interval) time.sleep(time_interval) def RefreshDataText(self): \'\'\' 判断是否有新的数据并显示在文本框中 \'\'\' text = self.log_co2.get(0.0, tk.END).splitlines() # print(raw_trans_data_co2) if len(raw_trans_data_co2)>0 : if raw_trans_data_co2[-1]==text[-2]: pass else: self.log_co2.insert(tk.END, raw_trans_data_co2[-1]+\'\r\n\') self.log_co2.see(tk.END) #print(np.array(compiled_data_co2.iloc[-1])) if len(compiled_data_co2)>0: one_data = np.array(compiled_data_co2.iloc[-1]) #print(one_data) one_data = str(one_data[0]) + \'\t\t\' + str(one_data[1]) text = self.log_co2_neat.get(0.0, tk.END).splitlines() if one_data==text[-2]: print(\'same\') else: self.log_co2_neat.insert(tk.END, one_data + \'\r\n\') self.log_co2_neat.see(tk.END) def AdjustScale(self,_): \'\'\' 调整绘图区坐标轴范围 \'\'\' def ReDrawThread(self, time_interval, stop_event): \'\'\' 绘图区刷新程序 \'\'\' while(not stop_event.is_set()): try: self.Draw() except: pass time.sleep(time_interval) def Draw(self): \'\'\' 实时绘图程序 TODO: 1)个人认为使用matplotlib的这种绘图方式效率有些底下,是否采用诸如Animation的 动态绘图功能改善绘图性能有待检验 \'\'\' #判断是否有有效数据 if len(compiled_data_co2)>0 or len(compiled_data_hg)>0: #由于二氧化碳数据量太大,选择最后16000条数据,足够保证最大3天的显示量,降低绘图负担 #注意原始数据中时间数据最好经过to_datetime函数规整一遍,以免造成数据错误 co2_x_data = pd.to_datetime(compiled_data_co2.iloc[-16000:,0]) co2_y_data = compiled_data_co2.iloc[-16000:,1] # co2_xlim_min = datetime.strptime(co2_x_data.iloc[-1], \'%Y-%m-%d %H:%M:%S\') - self.fig_xlim_delta co2_xlim_min = co2_x_data.iloc[-1] - self.fig_xlim_delta print( co2_y_data.min()) self.ax_co2.clear() self.ax_co2.xaxis.set_major_formatter(mdates.DateFormatter(\'%m-%d\n%H:%M\')) self.ax_co2.xaxis.set_major_locator(mdates.AutoDateLocator()) self.ax_co2.scatter(co2_x_data.values,co2_y_data.values, s = 1, c=\'green\') self.ax_co2.set_xlim(co2_xlim_min,co2_x_data.iloc[-1]+ self.fig_xlim_delta/9) self.ax_co2.set_ylim(co2_y_data.min(), co2_y_data.min()+self.fig_co2_ylim_delta) self.ax_co2.set_ylabel(\'$CO_2(ppm)$\') self.ax_co2.grid(linestyle=\'--\') self.fig.savefig(\'D:/figure.png\') self.canvas.draw() def _quit(self): \'\'\'退出\'\'\' if self.trans_data_status==True: self.ActivateTrans() self.quit() # 停止 mainloop self.destroy() # 销毁所有部件
二氧化碳数据的传输
class Thread_CO2 (threading.Thread): \'\'\' 接收CO2数据的线程 该CO2探头的购买链接https://m.tb.cn/h.3phrPcr \'\'\' def __init__(self, threadID, name): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.port_available = False com_list = serial.tools.list_ports.comports() for port in com_list: print(port.device) #返回端口号如COM3 print(port.description) #返回设备名字 print(port.pid) #返回设备在计算机上的位置 if port.pid==29987: port_num_co2=port.device self.port_available = True self.ser_co2=serial.Serial() if self.port_available==True: self.ser_co2.port=port_num_co2 self.ser_co2.baudrate=19200 self.ser_co2.parity=serial.PARITY_EVEN self.ser_co2.timeout=0.5 self.__runing_flag=True def run(self): print ("开始线程:" + self.name) if self.port_available==False: if DEBUG_MODE==True: while(self.__runing_flag): self.FakeData() time.sleep(1) return 1 if not self.ser_co2.is_open: self.ser_co2.open() print(self.ser_co2) while(self.__runing_flag): self.GetData() time.sleep(CO2_REFRESH_INTERVAL) def GetData(self): \'\'\' 获取CO2数据需要注意进制的转换,以及最终的CRC16校验 CRC的校验使用CRCMOD库,不同类型的CRC均可以采用此库进行计算 其中特别要关注poly这个参数,参考http://www.ip33.com/crc.html 在该网站查询CRC多项式,并在开头补1 \'\'\' retry_time= 10 while(retry_time): request_code_co2=[] #首先配置需要发送的信息,Serial库接收直接以0-255的int值 #因此需要将16进制字符串转换为10进制整数 for i in \'15 04 13 8B 00 01 46 70 \'.split(): request_code_co2.append(int(i,16)) # print(request_code_co2) if not self.ser_co2.is_open: self.ser_co2.open() self.ser_co2.write(request_code_co2) #获取的格式为b\'\',byte型 temp =self.ser_co2.readline() #print(temp) #定义CRC,并计算CRC crc16_func = crcmod.Crc(poly=0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000) crc16_func.update(temp[:-2]) co2_conc=[] #CRC的计算结果为hex型,采用bytes.fromhex()转换为byte再与传输的最后两位byte对比,注意顺序 #如果获取成功就退出,没有成功则重复,最多10次 #TODO:如果多词未获取成功,未来需要加入一个错误信息日志 if bytes.fromhex(crc16_func.hexdigest()) == temp[-2:][::-1]: print(temp[-4:-2]) co2_conc = (int.from_bytes(temp[-4:-2], byteorder=\'big\', signed=False)) break retry_time-=1 #raw_trans_data_co2用来显示文本信息,需要将DateTime和获取的16进制值转换成str类型,否则文本框无法显示 #TODO:这个语句应该可以优化 co2_one_data_raw = str([temp.hex()[x*2:x*2+2] for x in range(len(temp.hex())//2) ]) co2_one_data_time = (datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\')) raw_trans_data_co2.append(str(co2_one_data_time) + co2_one_data_raw) try: if co2_conc>=0: compiled_data_co2.loc[len(compiled_data_co2)] = [co2_one_data_time, co2_conc] except: pass with open(\'D:/raw_data_CO2.txt\', mode=\'a\') as f: f.write(str(co2_one_data_time)) f.write(\'\t\') f.write(str(co2_conc)) f.write(\'\r\n\') #print(compiled_data_co2) def FakeData(self): #产生伪数据 def Stop(self): if self.ser_co2.is_open and self.port_available==True: self.ser_co2.close() print(self.ser_co2) self.__runing_flag=False