python 自动识别并连接串口的实现

时间:2022-09-14 07:40:04

这个属于我项目中一个函数,跟大家分享一下我的思路及最终实现

在编写串口通信工具中,需要实现一个函数,自动找到对应com 口,并且连接该com口,保证后续通信正常
作为初始化过程的一部分。

思路

在win 设备管理器中,经常会出现多个com 口,但并不是每个com 口都是目标设备所链接的。
尝试打开每个com 口,输入enter 按键, 正确的com 口,会有ack log 返回,表明通信 正常

否则,没有任何log 返回,则判断为非目标设备所连接的com 口。

实现

python 自动识别并连接串口的实现

尝试去打开所有com 口,然后发送enter, 如果在一段时间内有返回值,检查com 口收到的字节数,如果非零,则表明找到了对应的com 口。

完整测试代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import serial
import serial.tools.list_ports
import threading
import binascii
import time
from datetime import datetime
 
# default value
baunrate = 115200
is_read = false
is_write = false
write_buff = []
sys_buff = []
mserial = none
callback = none
is_opened = 0
is_registed = 0
 
 
class serialport:
 
  def __init__(self,port,buand):
 
    self.port = serial.serial(port,buand)
    self.port.close()
    if not self.port.isopen():
      self.port.open()
 
    #the index of data_bytes for read operation,私有属性
    #only used in read lines
    self.__read_ptr = 0
    self.__read_head = 0
    #store all read bytes
    # used in read date, read lines
    self.__data_bytes = bytearray()
 
  def port_open(self):
    if not self.port.isopen():
      self.port.open()
 
  def port_close(self):
    self.port.close()
 
  def send(self):
    global is_write
    global write_buff
 
    while is_write:
      if len(write_buff):
        msg = write_buff.pop(0)
        msg = msg+"\n"
        cmd = msg.encode()
        try:
 
          self.port.write(cmd)
        except:
          write_buff.clear()
          is_write = false
    write_buff.clear()
 
  def read_data(self):
    global is_read
    global is_opened
    byte_cnt = 0
    while is_read:
      try:
        count = self.port.inwaiting()
        if count > 0:
          rec_str = self.port.read(count)
          self.__data_bytes = self.__data_bytes+rec_str
        #print("receive:",rec_str.decode())
          #print(rec_str)
          byte_cnt += count
          if not is_opened:
            is_opened = 1
      #print("累计收到:",byte_cnt)
      #time.sleep(0.5)
        self.read_lines()
      except:
        deinit()
 
 
  #将当前所有的数据都读出,读取位置不变,每次读取指针依次移动,不漏数据, 读取行为一直在进行
  def read_lines(self):
    #reset
    line_cnt = 0
    data_len = len(self.__data_bytes)
    #print ("")
    #print ("begin: prt=:", self.__read_ptr, " head =", self.__read_head,"current len =",data_len)
    if self.__read_ptr >=data_len:
      return
    #get all lines in current data_bytes
    while(self.__read_ptr < data_len-1):
      if(self.__data_bytes[self.__read_ptr+1] == 0x0a and self.__data_bytes[self.__read_ptr] == 0x0d):
        tmp = bytearray()
        tmp = self.__data_bytes[self.__read_head:self.__read_ptr]
 
        try:
          line = tmp.decode()
        except:
          self.__read_head = self.__read_ptr + 2
          self.__read_ptr = self.__read_head
          continue
        iprint(line)
        line_cnt += 1
        self.__read_head = self.__read_ptr + 2
        self.__read_ptr = self.__read_head
      else:
        self.__read_ptr = self.__read_ptr + 1
 
def auto_open_serial():
  global baunrate
  global mserial
  global callback
  global is_registed
  global is_opened
  #reset
  deinit()
  # 列出所有当前的com口
  port_list = list(serial.tools.list_ports.comports())
  port_list_name = []
  #get all com
  if len(port_list) <= 0:
    iprint("the serial port can't find!")
    return false
  else:
    for itms in port_list:
      port_list_name.append(itms.device)
  #try open
  #print(port_list_name)
  for i in port_list_name:
    try:
      mserial = serialport(i,baunrate)
      iprint("try open %s"%i)
      start_task()
      send("")
      #return true
      time.sleep(1)
      if is_opened:
        iprint("connect %s successfully"%i)
        return true
      else:
        deinit()
        if i == port_list_name[len(port_list_name)-1]:
          iprint("uart don't open")
          break
        continue
    except:
      iprint(" uart don't open")
  deinit()
  return false
 
def deinit():
  global mserial
  global is_write
  global is_read
  global write_buff
  global is_opened
 
  if mserial:
    mserial.port_close()
 
  is_opened = 0
  is_read = false
  is_write = false
  write_buff = []
 
  mserial = none
  time.sleep(1)
 
def init():
  global mserial
  global callback
  global is_registed
  global is_opened
  global is_read
  global is_write
  #retry
  retry_time = 0
  while not auto_open_serial():
    if not is_opened:
      iprint("wait for uart connect, retry %s"%str(retry_time))
    else:
      return true
    retry_time += 1
    time.sleep(2)
    if retry_time == 10:
      iprint(" open uart fail")
      return false
 
def send(msg):
  global mserial
  global is_write
  global write_buff
  if is_write:
    write_buff.append(msg)
 
def start_task():
  global mserial
  global is_write
  global is_read
 
  if mserial:
    is_write = true
    t1 = threading.thread (target=mserial.send)
    t1.setdaemon (false)
    t1.start ()
 
    is_read = true
    t2 = threading.thread (target=mserial.read_data)
    t2.setdaemon (false)
    t2.start ()
 
def iprint(msg):
  global callback
  global is_registed
 
  msg = "[uart] "+str(msg)
  if is_registed:
    callback.append(msg)
  else:
    print(msg)
 
def start_sys_cmd():
  global is_registed
  if is_registed:
    t3 = threading.thread (target=process_receive_sys_cmd)
    t3.setdaemon (false)
    t3.start()
 
def process_receive_sys_cmd():
  global sys_buff
  global is_registed
  global callback
  #print("process_receive_sys_cmd")
  while is_registed:
    #print ("wait,process_receive_sys_cmd")
    if len(sys_buff):
      #print ("receive,process_receive_sys_cmd")
      line = sys_buff.pop(0)
      if "init" in line:
        if is_opened and is_read and is_write:
          iprint("already open uart")
          break
        iprint("start init")
        init()
    if is_opened:
      break
  iprint("eixt uart sys thread")
 
def register_cback(list):
  global callback
  global is_registed
 
  callback = list
  is_registed = 1
 
 
def unregister_cback():
  global callback
  callback.clear()
 
if __name__ == '__main__':
 
  receive = []
  register_cback(receive)
  sys_buff.append("init")
  start_sys_cmd()
 
  def process_receive_msg():
    global receive
    while true:
      #print("wait")
      if len(receive):
        #print("receive")
        print(receive.pop(0))
 
  t = threading.thread(target=process_receive_msg)
  t.setdaemon(false)
  t.start()

到此这篇关于python 自动识别并连接串口的实现的文章就介绍到这了,更多相关python 自动识别并连接串口内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/cai472861/article/details/106366598