本文用python在tcp的基础上实现一个http客户端, 该客户端能够复用tcp连接, 使用http1.1协议.
一. 创建http请求
http是基于tcp连接的, 它的请求报文格式如下:
因此, 我们只需要创建一个到服务器的tcp连接, 然后按照上面的格式写好报文并发给服务器, 就实现了一个http请求.
1. httpconnection类
基于以上的分析, 我们首先定义一个httpconnection类来管理连接和请求内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class httpconnection:
default_port = 80
_http_vsn = 11
_http_vsn_str = 'http/1.1'
def __init__( self , host: str , port: int = none) - > none:
self .sock = none
self ._buffer = []
self .host = host
self .port = port if port is not none else self .default_port
self ._state = _cs_idle
self ._response = none
self ._method = none
self .block_size = 8192
def _output( self , s: union[ str , bytes]) - > none:
if hasattr (s, 'encode' ):
s = s.encode( 'latin-1' )
self ._buffer.append(s)
def connect( self ) - > none:
self .sock = socket.create_connection(( self .host, self .port))
|
对于这个httpconnection对象, 我们只需要创建tcp连接, 然后按照http协议的格式把请求数据写入buffer中, 最后把buffer中的数据发送出去就行了.
2. 编写请求行
请求行的内容比较简单, 就是说明请求方法, 请求路径和http协议. 使用下面的方法来编写一个请求行:
1
2
3
4
5
6
7
|
def put_request( self , method: str , url: str ) - > none:
self ._method = method
url = url or '/'
request = f '{method} {url} {self._http_vsn_str}'
self ._output(request)
|
3. 添加请求头
http请求头和python的字典类似, 每行都是一个字段名与值的映射关系. http协议并不要求设置所有合法的请求头的值, 我们只需要按照需要, 设置特定的请求头即可. 使用如下代码添加请求头:
1
2
3
4
5
6
7
8
9
10
11
|
def put_header( self , header: union[bytes, str ], value: union[bytes, str , int ]) - > none:
if hasattr (header, 'encode' ):
header = header.encode( 'ascii' )
if hasattr (value, 'encode' ):
value = value.encode( 'latin-1' )
elif isinstance (value, int ):
value = str (value).encode( 'ascii' )
header = header + b ': ' + value
self ._output(header)
|
此外, 在http请求中, host请求头字段是必须的, 否则网站可能会拒绝响应. 因此, 如果用户没有设置这个字段, 这里就应该主动把它加上去:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
def _add_host( self , url: str ) - > none:
# 所有http / 1.1请求报文中必须包含一个host头字段
# 如果用户没给,就调用这个函数来生成
netloc = ''
if url.startswith( 'http' ):
nil, netloc, nil, nil, nil = urllib.parse.urlsplit(url)
if netloc:
try :
netloc_enc = netloc.encode( 'ascii' )
except unicodeencodeerror:
netloc_enc = netloc.encode( 'idna' )
self .put_header( 'host' , netloc_enc)
else :
host = self .host
port = self .port
try :
host_enc = host.encode( 'ascii' )
except unicodeencodeerror:
host_enc = host.encode( 'idna' )
# 对ipv6的地址进行额外处理
if host.find( ':' ) > = 0 :
host_enc = b '[' + host_enc + b ']'
if port = = self .default_port:
self .put_header( 'host' , host_enc)
else :
host_enc = host_enc.decode( 'ascii' )
self .put_header( 'host' , f '{host_enc}:{port}' )
|
4. 发送请求正文
我们接受两种形式的body数据: 一个基于io.iobase的可读文件对象, 或者是一个能通过迭代得到数据的对象. 在传输数据之前, 我们首先要确定数据是否采用分块传输:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
def request( self , method: str , url: str , headers: dict = none, body: union[io.iobase, iterable] = none,
encode_chunked: bool = false) - > none:
...
if 'content-length' not in header_names:
if 'transfer-encoding' not in header_names:
encode_chunked = false
content_length = self ._get_content_length(body, method)
if content_length is none:
if body is not none:
# 在这种情况下, body一般是个生成器或者可读文件之类的东西,应该分块传输
encode_chunked = true
self .put_header( 'transfer-encoding' , 'chunked' )
else :
self .put_header( 'content-length' , str (content_length))
else :
# 如果设置了transfer-encoding,则根据用户给的encode_chunked参数决定是否分块
pass
else :
# 只要给了content-length,那么一定不是分块传输
encode_chunked = false
...
@staticmethod
def _get_content_length(body: union[ str , bytes, bytearray, iterable, io.iobase], method: str ) - > optional[ int ]:
if body is none:
# put,post,patch三个方法默认是有body的
if method.upper() in _methods_expecting_body:
return 0
else :
return none
if hasattr (body, 'read' ):
return none
try :
# 对于bytes或者bytearray格式的数据,通过memoryview获取它的长度
return memoryview(body).nbytes
except typeerror:
pass
if isinstance (body, str ):
return len (body)
return none
|
在确定了是否分块之后, 就可以把正文发出去了. 如果body是一个可读文件的话, 就调用_read_readable方法把它封装为一个生成器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
def _send_body( self , message_body: union[ str , bytes, bytearray, iterable, io.iobase], encode_chunked: bool ) - > none:
if hasattr (message_body, 'read' ):
chunks = self ._read_readable(message_body)
else :
try :
memoryview(message_body)
except typeerror:
try :
chunks = iter (message_body)
except typeerror:
raise typeerror(
f 'message_body should be a bytes-like object or an iterable, got {repr(type(message_body))}' )
else :
# 如果是字节类型的,通过一次迭代把它发出去
chunks = (message_body,)
for chunk in chunks:
if not chunk:
continue
if encode_chunked:
chunk = f '{len(chunk):x}\r\n' .encode( 'ascii' ) + chunk + b '\r\n'
self .send(chunk)
if encode_chunked:
self .send(b '0\r\n\r\n' )
def _read_readable( self , readable: io.iobase) - > generator[bytes, none, none]:
need_encode = false
if isinstance (readable, io.textiobase):
need_encode = true
while true:
data_block = readable.read( self .block_size)
if not data_block:
break
if need_encode:
data_block = data_block.encode( 'utf-8' )
yield data_block
|
二. 获取响应数据
http响应报文的格式与请求报文大同小异, 它大致是这样的:
因此, 我们只要用httpconnection的socket对象读取服务器发送的数据, 然后按照上面的格式对数据进行解析就行了.
1. httpresponse类
我们首先定义一个简单的httpresponse类. 它的属性大致上就是socket的文件对象以及一些请求的信息等等, 调用它的begin方法来解析响应行和响应头的数据, 然后调用read方法读取响应正文:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
class httpresponse:
def __init__( self , sock: socket.socket, method: str = none) - > none:
self .fp = sock.makefile( 'rb' )
self ._method = method
self .headers = none
self .version = _unknown
self .status = _unknown
self .reason = _unknown
self .chunked = _unknown
self .chunk_left = _unknown
self .length = _unknown
self .will_close = _unknown
def begin( self ) - > none:
...
def read( self , amount: int = none) - > bytes:
...
|
2. 解析状态行
状态行的解析比较简单, 我们只需要读取响应的第一行数据, 然后把它解析为http协议版本,状态码和原因短语三部分就行了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
def _read_status( self ) - > tuple [ str , int , str ]:
line = str ( self ._read_line(), 'latin-1' )
if not line:
raise remotedisconnected( 'remote end closed connection without response' )
try :
version, status, reason = line.split(none, 2 )
except valueerror:
# reason只是给人看的, 一般和status对应, 所以它有可能不存在
try :
version, status = line.split(none, 1 )
reason = ''
except valueerror:
version, status, reason = ' ', ' ', ' '
if not version.startswith( 'http/' ):
self ._close_conn()
raise badstatusline(line)
try :
status = int (status)
if status < 100 or status > 999 :
raise badstatusline(line)
except valueerror:
raise badstatusline(line)
return version, status, reason.strip()
|
如果状态码为100, 则客户端需要解析多个响应状态行. 它的原理是这样的: 在请求数据过大的时候, 有的客户端会先不发送请求数据, 而是先在header中添加一个expect: 100-continue, 如果服务器愿意接收数据, 会返回100的状态码, 这时候客户端再把数据发过去. 因此, 如果读取到100的状态码, 那么后面往往还会收到一个正式的响应数据, 应该继续读取响应头. 这部分的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def begin( self ) - > none:
while true:
version, status, reason = self ._read_status()
if status ! = httpstatus. continue :
break
# 跳过100状态码部分的响应头
while true:
skip = self ._read_line().strip()
if not skip:
breakself.status = status
self .reason = reason
if version in ( 'http/1.0' , 'http/0.9' ):
self .version = 10
elif version.startswith( 'http/1.' ):
self .version = 11
else :
# http2还没研究, 这里就不写了
raise unknownprotocol(version)
...
|
3. 解析响应头
解析响应头比响应行还要简单. 因为每个header字段占一行, 我们只需要一直调用read_line方法读取字段, 直到读完header为止就行了.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
def _parse_header( self ) - > none:
headers = {}
while true:
line = self ._read_line()
if len (headers) > _max_headers:
raise httpexception( 'got more than %d headers' % _max_headers)
if line in _empty_line:
break
line = line.decode( 'latin-1' )
i = line.find( ':' )
if i = = - 1 :
raise badheaderline(line)
# 这里默认没有重名的情况
key, value = line[:i].lower(), line[i + 1 :].strip()
headers[key] = value
self .headers = headers
|
4. 接收响应正文
在接收响应正文之前, 首先要确定它的传输方式和长度:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
def _set_chunk( self ) - > none:
transfer_encoding = self .get_header( 'transfer-encoding' )
if transfer_encoding and transfer_encoding.lower() = = 'chunked' :
self .chunked = true
self .chunk_left = none
else :
self .chunked = false
def _set_length( self ) - > none:
# 首先要知道数据是否是分块传输的
if self .chunked = = _unknown:
self ._set_chunk()
# 如果状态码是1xx或者204(无响应内容)或者304(使用上次缓存的内容),则没有响应正文
# 如果这是个head请求,那么也不能有响应正文
if ( self .status = = httpstatus.no_content or
self .status = = httpstatus.not_modified or
100 < = self .status < 200 or
self ._method = = 'head' ):
self .length = 0
return
length = self .get_header( 'content-length' )
if length and not self .chunked:
try :
self .length = int (length)
except valueerror:
self .length = none
else :
if self .length < 0 :
self .length = none
else :
self .length = none
|
然后, 我们实现一个read方法, 从body中读取指定大小的数据:
1
2
3
4
5
6
7
8
9
|
def read( self , amount: int = none) - > bytes:
if self .is_closed():
return b''
if self ._method = = 'head' :
self .close()
return b''
if amount is none:
return self ._read_all()
return self ._read_amount(amount)
|
如果没有指定需要的数据大小, 就默认读取所有数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
def _read_all( self ) - > bytes:
if self .chunked:
return self ._read_all_chunk()
if self .length is none:
s = self .fp.read()
else :
try :
s = self ._read_bytes( self .length)
except incompleteread:
self .close()
raise
self .length = 0
self .close()
return s
def _read_all_chunk( self ) - > bytes:
assert self .chunked ! = _unknown
value = []
try :
while true:
chunk = self ._read_chunk()
if chunk is none:
break
value.append(chunk)
return b''.join(value)
except incompleteread:
raise incompleteread(b''.join(value))
def _read_chunk( self ) - > optional[bytes]:
try :
chunk_size = self ._read_chunk_size()
except valueerror:
raise incompleteread(b'')
if chunk_size = = 0 :
self ._read_and_discard_trailer()
self .close()
return none
chunk = self ._read_bytes(chunk_size)
# 每块的结尾会有一个\r\n,这里把它读掉
self ._read_bytes( 2 )
return chunk
def _read_chunk_size( self ) - > int :
line = self ._read_line(error_message = 'chunk size' )
i = line.find(b ';' )
if i > = 0 :
line = line[:i]
try :
return int (line, 16 )
except valueerror:
self .close()
raise
def _read_and_discard_trailer( self ) - > none:
# chunk的尾部可能会挂一些额外的信息,比如md5值,过期时间等等,一般会在header中用trailer字段说明
# 当chunk读完之后调用这个函数, 这些信息就先舍弃掉得了
while true:
line = self ._read_line(error_message = 'chunk size' )
if line in _empty_line:
break
|
否则的话, 就读取部分数据, 如果正好是分块数据的话, 就比较复杂了. 简单来说, 就是用bytearray制造一个所需大小的数组, 然后依次读取chunk把数据往里面填, 直到填满或者没数据为止. 然后用chunk_left记录下当前块剩余的量, 以便下次读取.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
def _read_amount( self , amount: int ) - > bytes:
if self .chunked:
return self ._read_amount_chunk(amount)
if isinstance ( self .length, int ) and amount > self .length:
amount = self .length
container = bytearray(amount)
n = self .fp.readinto(container)
if not n and container:
# 如果读不到字节了,也就可以关了
self .close()
elif self .length is not none:
self .length - = n
if not self .length:
self .close()
return memoryview(container)[:n].tobytes()
def _read_amount_chunk( self , amount: int ) - > bytes:
# 调用这个方法,读取amount大小的chunk类型数据,不足就全部读取
assert self .chunked ! = _unknown
total_bytes = 0
container = bytearray(amount)
mvb = memoryview(container)
try :
while true:
# mvb可以理解为容器的空的那一部分
# 这里一直调用_full_readinto把数据填进去,让mvb越来越小,同时记录填入的量
# 等没数据或者当前数据足够把mvb填满之后,跳出循环
chunk_left = self ._get_chunk_left()
if chunk_left is none:
break
if len (mvb) < = chunk_left:
n = self ._full_readinto(mvb)
self .chunk_left = chunk_left - n
total_bytes + = n
break
temp_mvb = mvb[:chunk_left]
n = self ._full_readinto(temp_mvb)
mvb = mvb[n:]
total_bytes + = n
self .chunk_left = 0
except incompleteread:
raise incompleteread(bytes(container[:total_bytes]))
return memoryview(container)[:total_bytes].tobytes()
def _full_readinto( self , container: memoryview) - > int :
# 返回读取的量.如果没能读满,这个方法会报警
amount = len (container)
n = self .fp.readinto(container)
if n < amount:
raise incompleteread(bytes(container[:n]), amount - n)
return n
def _get_chunk_left( self ) - > optional[ int ]:
# 如果当前块读了一半,那么直接返回self.chunk_left就行了
# 否则,有三种情况
# 1). chunk_left为none,说明body压根没开始读,于是返回当前这一整块的长度
# 2). chunk_left为0,说明这块读完了,于是返回下一块的长度
# 3). body数据读完了,返回none,顺便做好善后工作
chunk_left = self .chunk_left
if not chunk_left:
if chunk_left = = 0 :
# 如果剩余零,说明上一块已经读完了,这里把\r\n读掉
# 如果是none,就说明chunk压根没开始读
self ._read_bytes( 2 )
try :
chunk_left = self ._read_chunk_size()
except valueerror:
raise incompleteread(b'')
if chunk_left = = 0 :
self ._read_and_discard_trailer()
self .close()
chunk_left = none
self .chunk_left = chunk_left
return chunk_left
|
三. 复用tcp连接
http通信本质上是基于tcp连接发送和接收http请求和响应, 因此, 只要tcp连接不断开, 我们就可以继续用它进行http请求, 这样就避免了创建和销毁tcp连接产生的消耗.
1. 判断连接是否会断开
在下面几种情况中, 服务端会自动断开连接:
- http协议小于1.1且没有在头部设置了keep-alive
- http协议大于等于1.1但是在头部设置了connection: close
- 数据没有分块传输, 也没有说明数据的长度, 这种情况下, 服务器一般会在发送完成后断开连接, 让客户端知道数据发完了
根据上面列出来的几种情况, 通过下面的代码来判断连接是否会断开:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
def _check_close( self ) - > bool :
conn = self .get_header( 'connection' )
if not self .chunked and self .length is none:
return true
if self .version = = 11 :
if conn and 'close' in conn.lower():
return true
return false
else :
if self .headers.get( 'keep-alive' ):
return false
if conn and 'keep-alive' in conn.lower():
return false
return true
|
2. 正确地关闭httpresponse对象
由于tcp连接的复用, 一个httpconnection可以产生多个httpresponse对象, 而这些对象在同一个tcp连接上, 会共用这个连接的读缓冲区. 这就导致, 如果上一个httpresponse对象没有把它的那部分数据读完, 就会对下一个响应产生影响.
另一方面来看, 我们也需要及时地关闭与这个tcp关联的文件对象来避免占用资源. 因此, 我们定义如下的close方法关闭一个httpresponse对象:
1
2
3
4
5
6
7
8
9
10
|
def close( self ) - > none:
if self .is_closed():
return
fp = self .fp
self .fp = none
fp.close()
def is_closed( self ) - > bool :
return self .fp is none
|
用户调用httpresponse对象的read方法, 把缓冲区数据读完之后, 就会自动调用close方法(具体实现见上一章的第四节: 读取响应数据这部分). 因此, 在获取下一个响应数据之前, 我们只需要调用这个对象的is_closed方法, 就能判断读缓冲区是否已经读完, 能否继续接收响应了.
3. http请求的生命周期
不使用管道机制的话, 不同的http请求必须按次序进行, 相互之间不能重叠. 基于这个原因, 我们为httpconnection对象设置idle, req_started和req_sent三种状态, 一个完整的请求应该经历这几种状态:
根据上面的流程, 对httpconnection中对应的方法进行修改:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
def get_response( self ) - > httpresponse:
if self ._response and self ._response.is_closed():
self ._response = none
if self ._state ! = _cs_req_sent or self ._response:
raise responsenotready( self ._state)
response = httpresponse( self .sock, method = self ._method)
try :
try :
response.begin()
except connectionerror:
self .close()
raise
assert response.will_close ! = _unknown
self ._state = _cs_idle
if response.will_close:
self .close()
else :
self ._response = response
return response
except exception as _:
response.close()
raise
def put_request( self , method: str , url: str ) - > none:
# 调用这个函数开始新一轮的请求,它负责写好请求行输出到缓存里面去
# 调用它的前提是当前处于空闲状态
# 如果之前的response还在并且已结束,会自动把它消除掉
if self ._response and self ._response.is_closed():
self ._response = none
if self ._state = = _cs_idle:
self ._state = _cs_req_started
else :
raise cannotsendrequest( self ._state)
...
def put_header( self , header: union[bytes, str ], value: union[bytes, str , int ]) - > none:
if self ._state ! = _cs_req_started:
raise cannotsendheader()
...
def end_headers( self , message_body = none, encode_chunked = false) - > none:
if self ._state = = _cs_req_started:
self ._state = _cs_req_sent
else :
raise cannotsendheader()
...
|
需要注意的是, 如果第二个请求已经进入到获取响应的阶段了, 而上一个请求的响应还没关闭, 那么就应该直接报错, 否则读取到的会是上一个请求剩余的响应部分数据, 导致解析响应出现问题.
事实上, http1.1开始支持管道化技术, 也就是一次提交多个http请求, 然后等待响应, 而不是在接收到上一个请求的响应后, 才发送后面的请求.
基于这种处理模式, 管道化技术理论上可以减少io时间的损耗, 提升效率, 不过, 需要服务端的支持, 而且会增加程序的复杂程度, 这里就不实现了.
四. 总结
1. 完整代码
httpconnection的完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
|
class httpconnection:
default_port = 80
_http_vsn = 11
_http_vsn_str = 'http/1.1'
def __init__( self , host: str , port: int = none) - > none:
self .sock = none
self ._buffer = []
self .host = host
self .port = port if port is not none else self .default_port
self ._state = _cs_idle
self ._response = none
self ._method = none
self .block_size = 8192
def request( self , method: str , url: str , headers: dict = none, body: union[io.iobase, iterable] = none,
encode_chunked: bool = false) - > none:
self .put_request(method, url)
headers = headers or {}
header_names = frozenset (k.lower() for k in headers.keys())
if 'host' not in header_names:
self ._add_host(url)
if 'content-length' not in header_names:
if 'transfer-encoding' not in header_names:
encode_chunked = false
content_length = self ._get_content_length(body, method)
if content_length is none:
if body is not none:
encode_chunked = true
self .put_header( 'transfer-encoding' , 'chunked' )
else :
self .put_header( 'content-length' , str (content_length))
else :
# 如果设置了transfer-encoding,则根据用户给的encode_chunked参数决定是否分块
pass
else :
# 只要给了content-length,那么一定不是分块传输
encode_chunked = false
for hdr, value in headers.items():
self .put_header(hdr, value)
if isinstance (body, str ):
body = _encode(body)
self .end_headers(body, encode_chunked = encode_chunked)
def send( self , data: bytes) - > none:
if self .sock is none:
self .connect()
self .sock.sendall(data)
def get_response( self ) - > httpresponse:
if self ._response and self ._response.is_closed():
self ._response = none
if self ._state ! = _cs_req_sent or self ._response:
raise responsenotready( self ._state)
response = httpresponse( self .sock, method = self ._method)
try :
try :
response.begin()
except connectionerror:
self .close()
raise
assert response.will_close ! = _unknown
self ._state = _cs_idle
if response.will_close:
self .close()
else :
self ._response = response
return response
except exception as _:
response.close()
raise
def connect( self ) - > none:
self .sock = socket.create_connection(( self .host, self .port))
def close( self ) - > none:
self ._state = _cs_idle
try :
sock = self .sock
if sock:
self .sock = none
sock.close()
finally :
response = self ._response
if response:
self ._response = none
response.close()
def put_request( self , method: str , url: str ) - > none:
# 调用这个函数开始新一轮的请求,它负责写好请求行输出到缓存里面去
# 调用它的前提是当前处于空闲状态
# 如果之前的response还在并且已结束,会自动把它消除掉
if self ._response and self ._response.is_closed():
self ._response = none
if self ._state = = _cs_idle:
self ._state = _cs_req_started
else :
raise cannotsendrequest( self ._state)
self ._method = method
url = url or '/'
request = f '{method} {url} {self._http_vsn_str}'
self ._output(request)
def put_header( self , header: union[bytes, str ], value: union[bytes, str , int ]) - > none:
if self ._state ! = _cs_req_started:
raise cannotsendheader()
if hasattr (header, 'encode' ):
header = header.encode( 'ascii' )
if hasattr (value, 'encode' ):
value = value.encode( 'latin-1' )
elif isinstance (value, int ):
value = str (value).encode( 'ascii' )
header = header + b ': ' + value
self ._output(header)
def end_headers( self , message_body = none, encode_chunked = false) - > none:
if self ._state = = _cs_req_started:
self ._state = _cs_req_sent
else :
raise cannotsendheader()
self ._send_output(message_body, encode_chunked = encode_chunked)
def _add_host( self , url: str ) - > none:
# 所有http / 1.1请求报文中必须包含一个host头字段
# 如果用户没给,就调用这个函数来生成
netloc = ''
if url.startswith( 'http' ):
nil, netloc, nil, nil, nil = urlsplit(url)
if netloc:
try :
netloc_enc = netloc.encode( 'ascii' )
except unicodeencodeerror:
netloc_enc = netloc.encode( 'idna' )
self .put_header( 'host' , netloc_enc)
else :
host = self .host
port = self .port
try :
host_enc = host.encode( 'ascii' )
except unicodeencodeerror:
host_enc = host.encode( 'idna' )
# 对ipv6的地址进行额外处理
if host.find( ':' ) > = 0 :
host_enc = b '[' + host_enc + b ']'
if port = = self .default_port:
self .put_header( 'host' , host_enc)
else :
host_enc = host_enc.decode( 'ascii' )
self .put_header( 'host' , f '{host_enc}:{port}' )
def _output( self , s: union[ str , bytes]) - > none:
# 将数据添加到缓冲区
if hasattr (s, 'encode' ):
s = s.encode( 'latin-1' )
self ._buffer.append(s)
def _send_output( self , message_body = none, encode_chunked = false) - > none:
# 发送并清空缓冲数据.然后,如果有请求正文,就也顺便发送
self ._buffer.extend((b' ', b' '))
msg = b '\r\n' .join( self ._buffer)
self ._buffer.clear()
self .send(msg)
if message_body is not none:
self ._send_body(message_body, encode_chunked)
def _send_body( self , message_body: union[bytes, str , bytearray, iterable, io.iobase], encode_chunked: bool ) - > none:
if hasattr (message_body, 'read' ):
chunks = self ._read_readable(message_body)
else :
try :
memoryview(message_body)
except typeerror:
try :
chunks = iter (message_body)
except typeerror:
raise typeerror(
f 'message_body should be a bytes-like object or an iterable, got {repr(type(message_body))}' )
else :
# 如果是字节类型的,通过一次迭代把它发出去
chunks = (message_body,)
for chunk in chunks:
if not chunk:
continue
if encode_chunked:
chunk = f '{len(chunk):x}\r\n' .encode( 'ascii' ) + chunk + b '\r\n'
self .send(chunk)
if encode_chunked:
self .send(b '0\r\n\r\n' )
def _read_readable( self , readable: io.iobase) - > generator[bytes, none, none]:
need_encode = false
if isinstance (readable, io.textiobase):
need_encode = true
while true:
data_block = readable.read( self .block_size)
if not data_block:
break
if need_encode:
data_block = data_block.encode( 'utf-8' )
yield data_block
@staticmethod
def _get_content_length(body: union[ str , bytes, bytearray, iterable, io.iobase], method: str ) - > optional[ int ]:
if body is none:
# put,post,patch三个方法默认是有body的
if method.upper() in _methods_expecting_body:
return 0
else :
return none
if hasattr (body, 'read' ):
return none
try :
# 对于bytes或者bytearray格式的数据,通过memoryview获取它的长度
return memoryview(body).nbytes
except typeerror:
pass
if isinstance (body, str ):
return len (body)
return none
|
httpresponse的完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
|
class httpresponse:
def __init__( self , sock: socket.socket, method: str = none) - > none:
self .fp = sock.makefile( 'rb' )
self ._method = method
self .headers = none
self .version = _unknown
self .status = _unknown
self .reason = _unknown
self .chunked = _unknown
self .chunk_left = _unknown
self .length = _unknown
self .will_close = _unknown
def begin( self ) - > none:
if self .headers is not none:
return
self ._parse_status_line()
self ._parse_header()
self ._set_chunk()
self ._set_length()
self .will_close = self ._check_close()
def _read_line( self , limit: int = _max_line + 1 , error_message: str = '') - > bytes:
# 注意,这个方法默认不去除line尾部的\r\n
line = self .fp.readline(limit)
if len (line) > _max_line:
raise linetoolong(error_message)
return line
def _read_bytes( self , amount: int ) - > bytes:
data = self .fp.read(amount)
if len (data) < amount:
raise incompleteread(data, amount - len (data))
return data
def _parse_status_line( self ) - > none:
while true:
version, status, reason = self ._read_status()
if status ! = httpstatus. continue :
break
while true:
skip = self ._read_line(error_message = 'header line' ).strip()
if not skip:
break
self .status = status
self .reason = reason
if version in ( 'http/1.0' , 'http/0.9' ):
self .version = 10
elif version.startswith( 'http/1.' ):
self .version = 11
else :
raise unknownprotocol(version)
def _read_status( self ) - > tuple [ str , int , str ]:
line = str ( self ._read_line(error_message = 'status line' ), 'latin-1' )
if not line:
raise remotedisconnected( 'remote end closed connection without response' )
try :
version, status, reason = line.split(none, 2 )
except valueerror:
# reason只是给人看的, 和status对应, 所以它有可能不存在
try :
version, status = line.split(none, 1 )
reason = ''
except valueerror:
version, status, reason = ' ', ' ', ' '
if not version.startswith( 'http/' ):
self .close()
raise badstatusline(line)
try :
status = int (status)
if status < 100 or status > 999 :
raise badstatusline(line)
except valueerror:
raise badstatusline(line)
return version, status, reason.strip()
def _parse_header( self ) - > none:
headers = {}
while true:
line = self ._read_line(error_message = 'header line' )
if len (headers) > _max_headers:
raise httpexception( 'got more than %d headers' % _max_headers)
if line in _empty_line:
break
line = line.decode( 'latin-1' )
i = line.find( ':' )
if i = = - 1 :
raise badheaderline(line)
# 这里默认没有重名的情况
key, value = line[:i].lower(), line[i + 1 :].strip()
headers[key] = value
self .headers = headers
def _set_chunk( self ) - > none:
transfer_encoding = self .get_header( 'transfer-encoding' )
if transfer_encoding and transfer_encoding.lower() = = 'chunked' :
self .chunked = true
self .chunk_left = none
else :
self .chunked = false
def _set_length( self ) - > none:
# 首先要知道数据是否是分块传输的
if self .chunked = = _unknown:
self ._set_chunk()
# 如果状态码是1xx或者204(无响应内容)或者304(使用上次缓存的内容),则没有响应正文
# 如果这是个head请求,那么也不能有响应正文
assert isinstance ( self .status, int )
if ( self .status = = httpstatus.no_content or
self .status = = httpstatus.not_modified or
100 < = self .status < 200 or
self ._method = = 'head' ):
self .length = 0
return
length = self .get_header( 'content-length' )
if length and not self .chunked:
try :
self .length = int (length)
except valueerror:
self .length = none
else :
if self .length < 0 :
self .length = none
else :
self .length = none
def _check_close( self ) - > bool :
conn = self .get_header( 'connection' )
if not self .chunked and self .length is none:
return true
if self .version = = 11 :
if conn and 'close' in conn.lower():
return true
return false
else :
if self .headers.get( 'keep-alive' ):
return false
if conn and 'keep-alive' in conn.lower():
return false
return true
def close( self ) - > none:
if self .is_closed():
return
fp = self .fp
self .fp = none
fp.close()
def is_closed( self ) - > bool :
return self .fp is none
def read( self , amount: int = none) - > bytes:
if self .is_closed():
return b''
if self ._method = = 'head' :
self .close()
return b''
if amount is none:
return self ._read_all()
print (amount, amount is none)
return self ._read_amount(amount)
def _read_all( self ) - > bytes:
if self .chunked:
return self ._read_all_chunk()
if self .length is none:
s = self .fp.read()
else :
try :
s = self ._read_bytes( self .length)
except incompleteread:
self .close()
raise
self .length = 0
self .close()
return s
def _read_all_chunk( self ) - > bytes:
assert self .chunked ! = _unknown
value = []
try :
while true:
chunk = self ._read_chunk()
if chunk is none:
break
value.append(chunk)
return b''.join(value)
except incompleteread:
raise incompleteread(b''.join(value))
def _read_chunk( self ) - > optional[bytes]:
try :
chunk_size = self ._read_chunk_size()
except valueerror:
raise incompleteread(b'')
if chunk_size = = 0 :
self ._read_and_discard_trailer()
self .close()
return none
chunk = self ._read_bytes(chunk_size)
# 每块的结尾会有一个\r\n,这里把它读掉
self ._read_bytes( 2 )
return chunk
def _read_chunk_size( self ) - > int :
line = self ._read_line(error_message = 'chunk size' )
i = line.find(b ';' )
if i > = 0 :
line = line[:i]
try :
return int (line, 16 )
except valueerror:
self .close()
raise
def _read_and_discard_trailer( self ) - > none:
# chunk的尾部可能会挂一些额外的信息,比如md5值,过期时间等等,一般会在header中用trailer字段说明
# 当chunk读完之后调用这个函数, 这些信息就先舍弃掉得了
while true:
line = self ._read_line(error_message = 'chunk size' )
if line in _empty_line:
break
def _read_amount( self , amount: int ) - > bytes:
if self .chunked:
return self ._read_amount_chunk(amount)
if isinstance ( self .length, int ) and amount > self .length:
amount = self .length
container = bytearray(amount)
n = self .fp.readinto(container)
if not n and container:
# 如果读不到字节了,也就可以关了
self .close()
elif self .length is not none:
self .length - = n
if not self .length:
self .close()
return memoryview(container)[:n].tobytes()
def _read_amount_chunk( self , amount: int ) - > bytes:
# 调用这个方法,读取amount大小的chunk类型数据,不足就全部读取
assert self .chunked ! = _unknown
total_bytes = 0
container = bytearray(amount)
mvb = memoryview(container)
try :
while true:
# mvb可以理解为容器的空的那一部分
# 这里一直调用_full_readinto把数据填进去,让mvb越来越小,同时记录填入的量
# 等没数据或者当前数据足够把mvb填满之后,跳出循环
chunk_left = self ._get_chunk_left()
if chunk_left is none:
break
if len (mvb) < = chunk_left:
n = self ._full_readinto(mvb)
self .chunk_left = chunk_left - n
total_bytes + = n
break
temp_mvb = mvb[:chunk_left]
n = self ._full_readinto(temp_mvb)
mvb = mvb[n:]
total_bytes + = n
self .chunk_left = 0
except incompleteread:
raise incompleteread(bytes(container[:total_bytes]))
return memoryview(container)[:total_bytes].tobytes()
def _full_readinto( self , container: memoryview) - > int :
# 返回读取的量.如果没能读满,这个方法会报警
amount = len (container)
n = self .fp.readinto(container)
if n < amount:
raise incompleteread(bytes(container[:n]), amount - n)
return n
def _get_chunk_left( self ) - > optional[ int ]:
# 如果当前块读了一半,那么直接返回self.chunk_left就行了
# 否则,有三种情况
# 1). chunk_left为none,说明body压根没开始读,于是返回当前这一整块的长度
# 2). chunk_left为0,说明这块读完了,于是返回下一块的长度
# 3). body数据读完了,返回none,顺便做好善后工作
chunk_left = self .chunk_left
if not chunk_left:
if chunk_left = = 0 :
# 如果剩余零,说明上一块已经读完了,这里把\r\n读掉
# 如果是none,就说明chunk压根没开始读
self ._read_bytes( 2 )
try :
chunk_left = self ._read_chunk_size()
except valueerror:
raise incompleteread(b'')
if chunk_left = = 0 :
self ._read_and_discard_trailer()
self .close()
chunk_left = none
self .chunk_left = chunk_left
return chunk_left
def get_header( self , name, default: str = none) - > optional[ str ]:
if self .headers is none:
raise responsenotready()
return self .headers.get(name, default)
@property
def info( self ) - > str :
return repr ( self .headers)
|
这两个类应该放到同一个py文件中, 同时这个文件内还有其他一些辅助性质的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
import io
import socket
from typing import generator, iterable, optional, tuple , union
from urllib.parse import urlsplit
_cs_idle = 'idle'
_cs_req_started = 'request-started'
_cs_req_sent = 'request-sent'
_methods_expecting_body = { 'patch' , 'post' , 'put' }
_unknown = 'unknown'
_max_line = 65536
_max_headers = 100
_empty_line = (b '\r\n' , b '\n' , b'')
class httpstatus:
continue = 100
switching_protocols = 101
processing = 102
ok = 200
created = 201
accepted = 202
non_authoritative_information = 203
no_content = 204
reset_content = 205
partial_content = 206
multi_status = 207
already_reported = 208
im_used = 226
multiple_choices = 300
moved_permanently = 301
found = 302
see_other = 303
not_modified = 304
use_proxy = 305
temporary_redirect = 307
permanent_redirect = 308
bad_request = 400
unauthorized = 401
payment_required = 402
forbidden = 403
not_found = 404
method_not_allowed = 405
not_acceptable = 406
proxy_authentication_required = 407
request_timeout = 408
conflict = 409
gone = 410
length_required = 411
precondition_failed = 412
request_entity_too_large = 413
request_uri_too_long = 414
unsupported_media_type = 415
requested_range_not_satisfiable = 416
expectation_failed = 417
misdirected_request = 421
unprocessable_entity = 422
locked = 423
failed_dependency = 424
upgrade_required = 426
precondition_required = 428
too_many_requests = 429
request_header_fields_too_large = 431
unavailable_for_legal_reasons = 451
internal_server_error = 500
not_implemented = 501
bad_gateway = 502
service_unavailable = 503
gateway_timeout = 504
http_version_not_supported = 505
variant_also_negotiates = 506
insufficient_storage = 507
loop_detected = 508
not_extended = 510
network_authentication_required = 511
class httpresponse:
...
class httpconnection:
...
def _encode(data: str , encoding: str = 'latin-1' , name: str = 'data' ) - > bytes:
# 给请求正文等不知道能怎么转码的东西转码时用这个,默认使用latin-1编码
# 它的好处是,转码失败后能抛出详细的错误信息,一目了然
try :
return data.encode(encoding)
except unicodeencodeerror as err:
raise unicodeencodeerror(
err.encoding,
err. object ,
err.start,
err.end,
"{} ({:.20!r}) is not valid {}. use {}.encode('utf-8') if you want to send it encoded in utf-8." . format (
name.title(), data[err.start:err.end], encoding, name)
) from none
class httpexception(exception):
pass
class improperconnectionstate(httpexception):
pass
class cannotsendrequest(improperconnectionstate):
pass
class cannotsendheader(improperconnectionstate):
pass
class cannotclosestream(improperconnectionstate):
pass
class responsenotready(improperconnectionstate):
pass
class linetoolong(httpexception):
def __init__( self , line_type):
httpexception.__init__( self , 'got more than %d bytes when reading %s'
% (_max_line, line_type))
class badstatusline(httpexception):
def __init__( self , line):
if not line:
line = repr (line)
self .args = line,
self .line = line
class badheaderline(httpexception):
def __init__( self , line):
if not line:
line = repr (line)
self .args = line,
self .line = line
class remotedisconnected(connectionreseterror, badstatusline):
def __init__( self , * args, * * kwargs):
badstatusline.__init__( self , '')
connectionreseterror.__init__( self , * args, * * kwargs)
class unknownprotocol(httpexception):
def __init__( self , version):
self .args = version,
self .version = version
class unknowntransferencoding(httpexception):
pass
class incompleteread(httpexception):
def __init__( self , partial, expected = none):
self .args = partial,
self .partial = partial
self .expected = expected
def __repr__( self ):
if self .expected is not none:
e = f ', {self.expected} more expected'
else :
e = ''
return f '{self.__class__.__name__}({len(self.partial)} bytes read{e})'
__str__ = object .__str__
|
2. 需要注意的点
总的来说, 本文的内容不算复杂, 毕竟http属于不难理解, 但知识点很多很杂的类型. 这里把本文中一些需要注意的点总结一下:
- 请求和响应数据的结构大致相同, 都是状态行+头部+正文, 状态行和头部的每个字段都用一个\r\n分割, 与正文之间用两个分割;
- 状态行是必须的, 请求头则最少需要host这个字段, 同时为了大家的方便, 你最好也设置一下accept-encoding和accept来限制服务器返回给你的数据内容和格式;
- 正文不是必须的, 特别是对于除了3p(patch, post, put)之外的方法来说. 如果你有正文, 你最好在header中使用content-length说明正文的长度, 如果是分块发送, 则使用transfer-encoding字段说明;
- 如果对正文使用分块传输, 每块的格式是: 16进制的数据长度+\r\n+数据+\r\n, 使用0\r\n\r\n来收尾. 收尾之后, 你还可以放一个trailer, 里面放数据的md5值或者过期时间什么的, 这时候最好在header中设置trailer字段;
- 在一个请求的生命周期完成后, tcp连接是否会断开取决于三点: 响应数据的http版本, 响应头中的connection和keep-alive字段, 是否知道响应正文的长度;
- 最最重要的一点, http协议只是一个约定而非限制, 这就和矿泉水的建议零售价差不多, 你可以选择遵守, 也可以不遵守, 后果自负.
3. 结果测试
首先, 我们用tornado写一个简单的服务器, 它会显示客户端的地址和接口;
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import tornado.web
import tornado.ioloop
class indexhandler(tornado.web.requesthandler):
def get( self ) - > none:
print (f 'new connection from {self.request.connection.context.address}' )
self .write( 'hello world' )
app = tornado.web.application([(r '/' , indexhandler)])
app.listen( 8888 )
tornado.ioloop.ioloop.current().start()
|
然后, 使用我们刚写好的客户端进行测试:
1
2
3
4
5
6
7
8
9
10
11
12
|
from client import httpconnection
def fetch(conn: httpconnection, url: str = '') - > none:
conn.request( 'get' , url)
res = conn.get_response()
print (res.read())
connection = httpconnection( '127.0.0.1' , 8888 )
for i in range ( 10 ):
fetch(connection)
|
结果如下:
以上就是python用700行代码实现http客户端的详细内容,更多关于python http客户端的资料请关注服务器之家其它相关文章!
原文链接:https://www.cnblogs.com/q1214367903/p/13531859.html