1、带证书上传文件
filename = '/tmp/test.cert'
hash_v = 'assumethisisahash'
# Multipart upload: the open file object goes in through `files=`, the extra
# form field through `data=`, and `verify=` names the certificate file used to
# validate the server's TLS certificate.
# NOTE: per the requests documentation a `files=` multipart body is not
# streamed by default, so this is not a true streaming upload.
# `link` (the target URL) is not defined in this snippet and must already be
# in scope.
with open(filename, 'rb') as f:
    requests.post(link, data={'hash': hash_v}, files={'filename': f}, verify='/tmp/test.cert')
2、最简单的流式上传
# Streaming upload: handing the open file object to `data=` lets requests send
# the body straight from the file instead of loading it into memory first.
# The file is opened in binary mode because requests may derive Content-Length
# from the file's size in bytes; text mode can make that value wrong.
with open('massive-body', 'rb') as f:
    requests.post('http://some.url/streamed', data=f)
3、块编码请求
def gen():
    """Yield the request body one piece at a time."""
    yield 'hi'
    yield 'there'


# Passing a generator (an iterable with no known length) as `data=` makes
# requests send the body with chunked transfer encoding.
requests.post('http://some.url/chunked', data=gen())
其他内容可参见:http://www.ziliao1.com/Article/Show/05534046411C9B8866742DE312F126CB.html
4、复杂的流式上传:为了兼容中文文件名,先用编码后的文件名生成请求体,再把请求体中的该文件名 replace 替换为原始文件名
def get_content_code(url, res_path, encoded_name, file_name):
    """Upload a file as multipart/form-data and return the server's reply.

    The multipart body is first built with ``encoded_name`` as the part's
    filename. It is then rendered to a single byte string in which every
    occurrence of ``encoded_name`` is replaced by ``file_name`` before the
    request is sent.

    Args:
        url: Address the body is POSTed to.
        res_path: Path of the local file to upload; opened in binary mode.
        encoded_name: Placeholder filename handed to ``MultipartEncoder``.
        file_name: Filename substituted for ``encoded_name`` in the body.

    Returns:
        A ``(content, status_code)`` tuple. ``content`` is the JSON-decoded
        response body, or the raw response body when it is not valid JSON.
    """
    with open(res_path, 'rb') as f_:
        m = MultipartEncoder(
            fields={'file': (encoded_name, f_,
                             'application/octet-stream')}
        )
        utils.logger.info('body:{},encoded_name:{},file_name:{}'.format(m, encoded_name, file_name))
        # The encoder reads from the open file, so the body has to be rendered
        # before the ``with`` block closes it.
        decoded_m = m.to_string()

    # The rendered body is presumably a byte string (requests-toolbelt's
    # MultipartEncoder -- confirm against the import). Replacing inside it
    # with text arguments raises TypeError on Python 3 and forces an implicit
    # ASCII decode on Python 2, so both names are coerced to UTF-8 bytes first.
    old_name = encoded_name if isinstance(encoded_name, bytes) else encoded_name.encode('utf-8')
    new_name = file_name if isinstance(file_name, bytes) else file_name.encode('utf-8')
    decoded_m = decoded_m.replace(old_name, new_name)

    utils.logger.info('decoded_m:{}'.format(decoded_m))
    utils.logger.info('url:{}'.format(url))
    # NOTE(review): 'charset' is sent as its own header here rather than as a
    # Content-Type parameter; kept as-is -- confirm the server relies on it.
    response = requests.post(url,
                             data=decoded_m,
                             headers={'Content-Type': m.content_type,
                                      'charset': 'UTF-8'})
    utils.logger.info('content: {}'.format(response.content))

    try:
        content = json.loads(response.content)
    except ValueError:
        # Not JSON: hand the raw body back unchanged.
        content = response.content
    return content, response.status_code