文章目录
- 建立TLS保护的tcp连接
- 发送http请求
- 获取响应状态码
- 获取响应头
- 获取消息体
- 非chunk格式的body
- chunk格式的body
- 模拟wget基础功能的完整代码
asyncio是Python标准库中用于实现非阻塞(non-blocking)I/O的模块;
通过await调用asyncio提供的I/O方法时,当前协程会把线程的执行权交还给事件循环,等操作系统通知I/O就绪后再恢复执行
建立TLS保护的tcp连接
核心方法:asyncio.open_connection
async def _tsl_connect(host: str) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Open a TLS-protected TCP connection to ``host`` on port 443.

    Args:
        host: Server name to connect to; it is also sent as the SNI name.

    Returns:
        The ``(reader, writer)`` pair produced by ``asyncio.open_connection``.
    """
    # Start from the library's default SSL context.
    tls_context = ssl.create_default_context()
    # Lower the OpenSSL security level so that more signature types are
    # accepted (the author needed this on Python 3.10).
    tls_context.set_ciphers("ALL:@SECLEVEL=1")
    # server_hostname supplies the SNI value for the handshake.
    streams = await asyncio.open_connection(
        host, 443, ssl=tls_context, server_hostname=host
    )
    return streams
发送http请求
async def _send_request(writer, host, url):
    """Send a minimal HTTP/1.1 GET request for ``url`` through ``writer``.

    The request carries a ``Host`` header and ``Connection: close``. The
    function returns once the writer's buffer has been drained.
    """
    head_lines = (
        f"GET {url} HTTP/1.1",
        f"Host: {host}",
        "Connection: close",
    )
    # Every header line is terminated by CRLF.
    request_head = "".join(f"{line}\r\n" for line in head_lines)
    writer.write(request_head.encode("utf-8"))
    # An empty line marks the end of the HTTP header section.
    writer.write(b"\r\n")
    await writer.drain()
获取响应状态码
async def _receive_http_status(reader) -> Tuple[str, int]:
    """Read and parse the HTTP status line from ``reader``.

    Args:
        reader: A stream reader exposing an awaitable ``readuntil``.

    Returns:
        ``(version, status_code)``, for example ``("HTTP/1.1", 200)``.

    Raises:
        ValueError: If the line has fewer than two space-separated fields,
            or the status code is not an integer.
    """
    # readuntil() consumes everything up to and including the CRLF.
    # readexactly() expects a byte count and cannot take a separator.
    status_line = await reader.readuntil(b"\r\n")
    status_parts = status_line.rstrip(b"\r\n").decode("utf-8").split(" ", 2)
    if len(status_parts) < 2:
        raise ValueError("无效的状态行")
    version = status_parts[0]  # e.g. 'HTTP/1.1'
    status_code = int(status_parts[1])  # e.g. 200
    return version, status_code
获取响应头
async def _receive_http_headers(reader) -> dict:
    """Read the HTTP header section from ``reader`` up to the blank line.

    Args:
        reader: A stream reader exposing an awaitable ``readuntil``.

    Returns:
        A dict mapping each header name (as received) to its value. When a
        header occurs more than once, the value is a list of all values in
        arrival order.

    Raises:
        ValueError: If a header line contains no ``:`` separator.
    """
    headers = {}
    while True:
        # readuntil() returns one CRLF-terminated line; readexactly() only
        # accepts a byte count and cannot be used with a separator.
        line = await reader.readuntil(b"\r\n")
        # An empty line marks the end of the header section.
        if line == b"\r\n":
            break
        text = line.rstrip(b"\r\n").decode("utf-8")
        # Whitespace after the colon is optional, so split on ":" alone and
        # strip the optional padding instead of requiring ": ".
        name, colon, value = text.partition(":")
        if not colon:
            raise ValueError(f"invalid header line: {text!r}")
        name = name.strip()
        value = value.strip()
        # HTTP allows repeated headers; collect repeated values in a list.
        if name in headers:
            existing = headers[name]
            if isinstance(existing, list):
                existing.append(value)
            else:
                headers[name] = [existing, value]
        else:
            headers[name] = value
    return headers
获取消息体
非chunk格式的body
Content-Length指定了具体的body字节数
async def _receive_body_by_length(reader, length):
    """Read a body of exactly ``length`` bytes and return it decoded as UTF-8.

    Args:
        reader: A stream reader exposing an awaitable ``readexactly``.
        length: Number of body bytes to read (the Content-Length value).

    Returns:
        The body as a ``str``.
    """
    payload = await reader.readexactly(length)
    # Sanity check that the reader delivered the requested number of bytes.
    assert length == len(payload)
    return payload.decode("utf-8")
chunk格式的body
响应头中没有Content-Length,而是由Transfer-Encoding指定body格式为chunked
# Line terminator used by the chunked framing.
seperator = b"\r\n"
# Text encoding used to decode the body and the trailer lines.
endec = "utf-8"


async def _receive_body_by_chunk(reader, trailer) -> Tuple[str, Optional[str]]:
    """Read a ``Transfer-Encoding: chunked`` body from ``reader``.

    Args:
        reader: A stream reader exposing ``at_eof`` plus awaitable
            ``readuntil`` and ``readexactly``.
        trailer: Name of the trailer field whose value should be returned,
            or ``None`` when the caller is not interested in a trailer.

    Returns:
        ``(body, trailer_value)``. ``body`` is the concatenated chunk data
        decoded with ``endec``. ``trailer_value`` is the value of the
        requested trailer field, or ``None`` if it was not requested or not
        sent.

    Raises:
        ValueError: If a chunk size is not valid hexadecimal or is negative,
            or chunk data is not followed by CRLF.
    """
    wanted = trailer.strip().lower() if isinstance(trailer, str) else None
    chunks = []
    trailer_value = None
    while not reader.at_eof():
        size_line = await reader.readuntil(seperator)
        # The chunk size is hexadecimal; optional chunk extensions may
        # follow after ';' and are ignored.
        size_field = size_line.rstrip(seperator).split(b";", 1)[0].strip()
        chunk_length = int(size_field, 16)
        if chunk_length < 0:
            raise ValueError(f"negative chunk size: {chunk_length}")
        if chunk_length > 0:
            chunks.append(await reader.readexactly(chunk_length))
            # Chunk data is immediately followed by CRLF. The read is done
            # outside any assert so it still runs under ``python -O``.
            terminator = await reader.readuntil(seperator)
            if terminator != seperator:
                raise ValueError("chunk data is not terminated by CRLF")
            continue
        # A zero-sized chunk ends the data; zero or more trailer lines
        # follow, terminated by an empty line.
        while True:
            line = await reader.readuntil(seperator)
            if line == seperator:
                break
            name, _, value = line.rstrip(seperator).decode(endec).partition(":")
            if wanted is not None and name.strip().lower() == wanted:
                trailer_value = value.strip()
        # Leave the loop explicitly instead of forcing EOF on the reader,
        # so the reader's state is not mutated behind the transport's back.
        break
    return b"".join(chunks).decode(endec), trailer_value
模拟wget基础功能的完整代码
async def wget(host, url):
    """Fetch ``url`` from ``host`` over HTTPS and print a summary.

    Prints the status line, every response header, the body length with the
    first 100 characters of the body, and the trailer value when the
    response announced one.

    Args:
        host: Server name to connect to (port 443).
        url: Request target, for example ``"/"``.

    Raises:
        ValueError: If the response uses a Transfer-Encoding other than
            ``chunked``.
    """
    reader, writer = await _tsl_connect(host)
    try:
        await _send_request(writer, host, url)
        version, status = await _receive_http_status(reader)
        print(f"【{host}】 {version} {status}")

        # Read the response headers.
        headers = await _receive_http_headers(reader)
        content_length = None
        chunked = False
        trailer = None
        for key, value in headers.items():
            if isinstance(value, list):
                print(f"【{host}】 {key}: ", value)
                continue
            # Header field names are case-insensitive.
            field = key.lower()
            if field == "content-length":
                content_length = int(value)
            elif field == "transfer-encoding":
                if value.strip().lower() != "chunked":
                    raise ValueError(f"unsupported Transfer-Encoding: {value}")
                chunked = True
            elif field == "trailer":
                # The field named by Trailer gets its value after the body.
                trailer = value
            print(f"【{host}】 {key}: {value}")

        # Stays None unless the chunked reader finds the requested trailer,
        # so the print below never hits an unbound name.
        trailer_value = None
        if chunked:
            # Transfer-Encoding takes precedence over Content-Length.
            body, trailer_value = await _receive_body_by_chunk(reader, trailer)
        elif content_length is not None:
            body = await _receive_body_by_length(reader, content_length)
        else:
            # With neither framing header the body runs until the server
            # closes the connection (the request sent "Connection: close").
            body = (await reader.read()).decode("utf-8")
        print(f"【{host}】 响应字符数{len(body)}: {body[:100]}")
        if trailer is not None:
            print(f"【{host}】 {trailer}: {trailer_value}")
    finally:
        # Close the connection even when a step above raised.
        writer.close()
        await writer.wait_closed()
    print(f"Done 【{host}】.")
async def main():
    """Download the demo pages concurrently.

    A top-level ``await`` is only valid in a notebook or the asyncio REPL;
    there, call ``await main()`` directly.
    """
    tasks = [
        wget("www.baidu.com", "/"),
        wget("wenku.baidu.com", "/"),
    ]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    # Regular scripts need an event loop to drive the coroutines.
    asyncio.run(main())