用python的asyncio解析http响应

时间:2024-12-13 07:07:24

文章目录

  • 建立TLS保护的tcp连接
  • 发送http请求
  • 获取响应状态码
  • 获取响应头
  • 获取消息体
    • 非chunk格式的body
    • chunk格式的body
  • 模拟wget基础功能的完整代码

asyncio是python里实现non-blocking I/O的标准库;
asyncio提供的IO方法在等待期间会把控制权交还给事件循环(event loop),当前线程可以先去执行其它协程,等操作系统通知IO就绪后再从await处恢复执行

建立TLS保护的tcp连接

核心方法:asyncio.open_connection

async def _tsl_connect(host: str) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Open a TLS-protected TCP connection to ``host`` on port 443.

    Args:
        host: Server host name; also sent as the SNI name via
            ``server_hostname``.

    Returns:
        The ``(reader, writer)`` stream pair produced by
        ``asyncio.open_connection``.
    """
    context = ssl.create_default_context()
    # Lower the OpenSSL security level (needed on Python 3.10) so that more
    # signature types are accepted. NOTE(review): this deliberately weakens
    # the default TLS policy.
    context.set_ciphers("ALL:@SECLEVEL=1")
    connection_options = {
        "host": host,
        "port": 443,
        "ssl": context,
        "server_hostname": host,
    }
    streams = await asyncio.open_connection(**connection_options)
    return streams

发送http请求

async def _send_request(writer, host, url):
    """Send a minimal HTTP/1.1 GET request for ``url`` and flush it.

    Args:
        writer: Stream writer exposing ``write()`` and an awaitable ``drain()``.
        host: Value of the ``Host`` header.
        url: Request target placed on the request line (e.g. ``"/"``).
    """
    header_lines = (
        f"GET {url} HTTP/1.1",
        f"Host: {host}",
        "Connection: close",
    )
    head = "".join(f"{line}\r\n" for line in header_lines)
    writer.write(head.encode("utf-8"))
    # An empty line marks the end of the header section.
    writer.write(b"\r\n")
    await writer.drain()

获取响应状态码

async def _receive_http_status(reader) -> Tuple[str, int]:
    """Read and parse the HTTP status line.

    Args:
        reader: Stream reader positioned at the start of the response.

    Returns:
        ``(version, status_code)``, e.g. ``("HTTP/1.1", 200)``.

    Raises:
        ValueError: If the line has fewer than two space-separated parts or
            the status code is not an integer.
        asyncio.IncompleteReadError: If EOF is reached before CRLF.
    """
    # readuntil() takes a separator; readexactly() expects a byte count and
    # fails when handed b"\r\n".
    status_line = await reader.readuntil(b"\r\n")
    status_parts = status_line.rstrip(b"\r\n").decode("utf-8").split(" ", 2)
    if len(status_parts) < 2:
        raise ValueError("无效的状态行")
    version = status_parts[0]  # e.g. 'HTTP/1.1'
    status_code = int(status_parts[1])  # e.g. 200
    return version, status_code

获取响应头

async def _receive_http_headers(reader) -> dict:
    """Read header lines up to and including the blank line that ends them.

    Args:
        reader: Stream reader positioned just after the status line.

    Returns:
        A dict keyed by header name exactly as received. A header seen once
        maps to its string value; a repeated header maps to a list of its
        values in arrival order.

    Raises:
        ValueError: If a header line contains no ``:``.
        asyncio.IncompleteReadError: If EOF is reached before the blank line.
    """
    headers = {}
    while True:
        # readuntil() takes a separator; readexactly() expects a byte count.
        line = await reader.readuntil(b"\r\n")
        # An empty line marks the end of the header section.
        if line == b"\r\n":
            break
        text = line.rstrip(b"\r\n").decode("utf-8")
        # Whitespace around the value is optional, so split on the bare
        # colon and strip instead of requiring ": ".
        header, colon, value = text.partition(":")
        if not colon:
            raise ValueError(f"invalid header line: {text!r}")
        value = value.strip()
        # HTTP allows repeated headers; collect repeated values in a list.
        if header not in headers:
            headers[header] = value
        elif isinstance(headers[header], list):
            headers[header].append(value)
        else:
            headers[header] = [headers[header], value]
    return headers

获取消息体

非chunk格式的body

Content-Length指定了具体的body字节数

async def _receive_body_by_length(reader, length):
    """Read a body whose size is given by ``Content-Length``.

    Args:
        reader: Stream reader positioned at the first body byte.
        length: Number of body bytes to read.

    Returns:
        The body decoded as UTF-8 text.
    """
    raw = await reader.readexactly(length)
    assert length == len(raw)
    return raw.decode("utf-8")

chunk格式的body

响应头里没有Content-Length,而是用Transfer-Encoding: chunked声明body采用分块(chunked)格式

seperator = b"\r\n"
endec = "utf-8"


async def _receive_body_by_chunk(reader, trailer) -> Tuple[str, Optional[str]]:
    """Read a ``Transfer-Encoding: chunked`` body and its trailer section.

    Args:
        reader: Stream reader positioned at the first chunk-size line.
        trailer: Name of the trailer field whose value should be returned
            (normally the value of the ``Trailer`` response header), or
            ``None`` when no trailer value is wanted.

    Returns:
        ``(body, trailer_value)``: the concatenated chunks decoded with
        ``endec``, and the value of the requested trailer field, or ``None``
        when it was not requested or not sent.

    Raises:
        ValueError: If a chunk size is not hexadecimal, chunk data is not
            followed by CRLF, or a trailer line contains no ``:``.
        asyncio.IncompleteReadError: If EOF arrives before the body is
            complete.
    """
    chunks = []
    while True:
        size_line = await reader.readuntil(seperator)
        # The hexadecimal chunk size may be followed by ";name=value" chunk
        # extensions, which are ignored here.
        size_field = size_line.rstrip(seperator).split(b";", 1)[0].strip()
        chunk_length = int(size_field, 16)
        if chunk_length == 0:
            # The zero-sized chunk ends the chunk data.
            break
        chunks.append(await reader.readexactly(chunk_length))
        # Chunk data is immediately followed by CRLF.
        if await reader.readexactly(len(seperator)) != seperator:
            raise ValueError("chunk data is not terminated by CRLF")

    # Trailer section: zero or more header lines, closed by an empty line.
    # All lines are consumed so the stream ends up just past the body, even
    # when the caller did not ask for a trailer.
    wanted = trailer.strip().lower() if isinstance(trailer, str) else None
    trailer_value = None
    while True:
        line = await reader.readuntil(seperator)
        if line == seperator:
            break
        text = line.rstrip(seperator).decode(endec)
        name, colon, value = text.partition(":")
        if not colon:
            raise ValueError(f"invalid trailer line: {text!r}")
        if wanted is not None and name.strip().lower() == wanted:
            trailer_value = value.strip()
    return b"".join(chunks).decode(endec), trailer_value

模拟wget基础功能的完整代码

async def wget(host, url):
    """Fetch ``https://{host}{url}`` and print the status, headers and body preview.

    The body is read according to the response framing: chunked when
    ``Transfer-Encoding: chunked`` is present (it takes precedence over
    ``Content-Length``), by length when ``Content-Length`` is present, and
    otherwise until the server closes the connection (the request is sent
    with ``Connection: close``).

    Args:
        host: Server host name.
        url: Request target, e.g. ``"/"``.

    Raises:
        ValueError: If the response uses a transfer coding other than
            ``chunked``.
    """
    reader, writer = await _tsl_connect(host)
    try:
        await _send_request(writer, host, url)

        version, status = await _receive_http_status(reader)
        print(f"【{host}】 {version} {status}")

        headers = await _receive_http_headers(reader)
        for key, value in headers.items():
            if isinstance(value, list):
                print(f"【{host}】 {key}: ", value)
            else:
                print(f"【{host}】 {key}: {value}")

        # Header names are case-insensitive, so look them up in lower case.
        # Repeated headers (stored as lists) are folded into one
        # comma-separated value.
        lowered = {
            key.lower(): ", ".join(value) if isinstance(value, list) else value
            for key, value in headers.items()
        }
        transfer_encoding = lowered.get("transfer-encoding")
        content_length = lowered.get("content-length")
        # The field named by Trailer gets its value at the end of a chunked body.
        trailer: Optional[str] = lowered.get("trailer")

        # Stays None unless a chunked body actually delivers the trailer.
        trailer_value = None
        if transfer_encoding is not None:
            if transfer_encoding.strip().lower() != "chunked":
                raise ValueError(
                    f"unsupported Transfer-Encoding: {transfer_encoding}"
                )
            body, trailer_value = await _receive_body_by_chunk(reader, trailer)
        elif content_length is not None:
            body = await _receive_body_by_length(reader, int(content_length))
        else:
            # No framing information: the body runs until the server closes.
            body = (await reader.read()).decode("utf-8")

        print(f"【{host}】 响应字符数{len(body)}: {body[:100]}")
        if trailer is not None:
            print(f"【{host}】 {trailer}: {trailer_value}")
    finally:
        # Close the connection even when parsing fails part-way through.
        writer.close()
        await writer.wait_closed()
    print(f"Done 【{host}】.")

async def main():
    """Download the demo pages concurrently."""
    tasks = [
        wget("www.baidu.com", "/"),
        wget("wenku.baidu.com", "/"),
    ]
    await asyncio.gather(*tasks)


# A bare top-level ``await`` is only valid in an async REPL or notebook; a
# regular script needs asyncio.run() to start the event loop.
if __name__ == "__main__":
    asyncio.run(main())