Python通过Ymodem协议传输文件

时间:2025-03-25 08:38:44

Xmodem、Ymodem和Zmodem协议是最常用的三种通信协议。

Xmodem协议是最早的,传输128字节信息块。

Ymodem支持128字节和1024字节进行传输。

工作中需要使用到Ymodem传输文件后进行启动,找了一圈没有找到合适的,简单了解一下协议以后自己通过Python的serial模块写了一个,目前使用128字节进行传输,目前已经进行过测试,使用128和1024传输没有存在bug。

协议部分不再细说,直接上代码,可能有部分地方考虑不足的地方,欢迎留言

import os
from serial import Serial


class YModem:
    def __init__(self):
        self.ser = None
        self.EOT = b'\x04'
        self.ACK = b'\x06'
        self.NAK = b'\x15'
        self.CAN = b'\x18'
        self.CRC = b'\x43'
        self.ACK_FLAG = False

    @staticmethod
    def crc16(x, invert):
        crc_in = 0x0000
        poly = 0x1021
        for b in x:
            if type(b) is str:  # 这里做了个判断可以直接传入字符串,但存在意义不大
                crc_in ^= (ord(b) << 8)
            else:
                crc_in ^= (b << 8)
            for i in range(8):
                if crc_in & 0x8000:
                    crc_in = (crc_in << 1) ^ poly
                else:
                    crc_in = (crc_in << 1)
        s = hex(crc_in).upper()
        # return (s[-2:], s[-4:-2]) if invert else (s[-4:-2], s[-2:])
        return (s[-2:], s[-4:-2]) if invert else (s[-4:-2], s[-2:])

    @staticmethod
    def str2hex(s):
        i_data = 0
        su = s.upper()
        for c in su:
            tmp = ord(c)
            if tmp <= ord('9'):
                i_data = i_data << 4
                i_data += tmp - ord('0')
            elif ord('A') <= tmp <= ord('F'):
                i_data = i_data << 4
                i_data += tmp - ord('A') + 10
        return i_data

    def open(self, port, baud):
        try:
            self.ser = Serial(port, baud, timeout=1.2)
            print(f'port {port} open ...')
            return True
        except Exception as e:
            print(f'serial {port} is not open {str(e)}')
            return False

    def send_first(self, mode, file_name, file_size):
        if mode == 128:
            x = 1
        else:
            x = 2
        print('start send YModem first package')
        space = bytearray()
        header = bytearray([x, 0, 255])
        file_byte = bytearray([ord(i) for i in file_name])
        size = bytearray([ord(i) for i in str(file_size)])
        for i in range(mode - len(file_byte) - 1 - len(size)):
            space.append(0)
        a1, b1 = self.crc16(file_byte + bytearray([0]) + size + space, False)
        hex_a1 = self.str2hex(a1)
        hex_b1 = self.str2hex(b1)
        last = bytearray([hex_a1, hex_b1])
        data = header + file_byte + bytearray([0]) + size + space + last
        self.ser.write(data)
        print('send first package success')

    def send_data(self, file_data, mode, package_count):
        print('start send file data')
        if mode == 128:
            x = 1
        else:
            x = 2
        for i in range(1, package_count + 1):
            header = bytearray()
            header.append(x)
            header.append(i % 256)
            header.append(255 - (i % 256))
            if i == package_count:
                data = bytearray(file_data[(i - 1) * mode:])
                for j in range(mode - len(data)):
                    data.append(26)
            else:
                data = file_data[(i - 1) * mode: i * mode]
            a1, b1 = self.crc16(data, False)
            hex_a1 = self.str2hex(a1)
            hex_b1 = self.str2hex(b1)
            last = bytearray([hex_a1, hex_b1])
            self.ser.write(header + data + last)
            # 判断ack
            for j in range(10):
                char = self.ser.read(1)
                if char == self.ACK:
                    break
                elif char == self.NAK or j == 9:
                    print('error')
                    return False
                else:
                    continue

        self.ser.write(self.EOT)
        is_ack = self.ser.read(1)
        if is_ack == self.ACK:
            return True
        print('send file data end')

    def send_last(self, mode):
        print('send last data')
        if mode == 128:
            x = 1
        else:
            x = 2
        last_package = bytearray([x, 0, 255])
        for i in range(mode):
            last_package.append(0)
        self.ser.write(last_package)
        self.ser.write(bytearray([0, 0]))
        for i in range(10):
            is_ack = self.ser.read(1)
            if is_ack == self.ACK:
                print('send last ok')
                return True
        print('send last error')
        return False

    def send(self, file_path, mode=128):
        if not os.path.exists(file_path):
            print('file is not found')
            return False
        data_byte = bytes()
        with open(file_path, 'rb') as f:
            for i in f.readlines():
                data_byte += i
        name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        if file_size % mode >= 1:
            package_count = int(file_size / mode) + 1
        else:
            package_count = int(file_size / mode)
        count = 0
        while True:
            count += 1
            if count == 50:
                print('recv char is not crc')
                return False
            char = self.ser.read(1)
            if char == self.CRC:
                print('recv char is crc')
                self.send_first(mode, name, file_size)
                for i in range(30):
                    is_ack = self.ser.read(1)
                    if is_ack == self.ACK:
                        self.ACK_FLAG = True
                        break
                if self.ACK_FLAG:
                    self.send_data(data_byte, mode, package_count)
                    if self.send_last(mode):
                        return True
                    else:
                        print('send data ok but not start app')
            continue

    def close(self):
        self.ser.close()
        self.ser = None


if __name__ == '__main__':
    y_mode = YModem()
    y_mode.open('COM13', 921600)
    y_mode.send(r'.\')
    y_mode.close()

相关文章