swift中创建对象,即PUT object,根据选定的存储策略将对象内容写入至相应的服务器(object server)。我们重点关注object controller和object servers之间的通信过程,其它从略。
在proxy-server上对client发送来的HTTP请求进行解析、wsgi环境变量进行设置、认证及相应错误处理过程从略。唯一需要说明的是,对外部client 通过HTTP请求发送来的对象(object),swift会将其具体内容存放于环境变量env中,即:self.environ['wsgi.input'] = WsgiStringIO(value)
对象创建最终会定位到BaseObjectController类中的PUT方法:
@public
@cors_validation
@delay_denial
def PUT(self, req):
"""HTTP PUT request handler."""
if req.if_none_match is not None and '*' not in req.if_none_match:
# Sending an etag with if-none-match isn't currently supported
return HTTPBadRequest(request=req, content_type='text/plain',
body='If-None-Match only supports *')
container_info = self.container_info(
self.account_name, self.container_name, req)
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
container_nodes = container_info['nodes']
container_partition = container_info['partition']
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
# pass the policy index to storage nodes via req header
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
req.acl = container_info['write_acl']
req.environ['swift_sync_key'] = container_info['sync_key']
# is request authorized
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if not container_info['nodes']:
return HTTPNotFound(request=req)
# update content type in case it is missing
self._update_content_type(req)
# check constraints on object name and request headers
error_response = check_object_creation(req, self.object_name) or \
check_content_type(req)
if error_response:
return error_response
self._update_x_timestamp(req)
# check if versioning is enabled and handle copying previous version
self._handle_object_versions(req)
# check if request is a COPY of an existing object
source_header = req.headers.get('X-Copy-From')
if source_header:
error_response, req, data_source, update_response = \
self._handle_copy_request(req)
if error_response:
return error_response
else:
reader = req.environ['wsgi.input'].read
data_source = iter(lambda: reader(self.app.client_chunk_size), '')
update_response = lambda req, resp: resp
# check if object is set to be automaticaly deleted (i.e. expired)
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
# add special headers to be handled by storage nodes
outgoing_headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,
delete_at_container, delete_at_part, delete_at_nodes)
# send object to storage nodes
resp = self._store_object(
req, data_source, nodes, partition, outgoing_headers)
return update_response(req, resp)
下面我们就从这七十来行代码来详细探讨此PUT操作究竟做了什么。其中错误处理和有些有明显注释的代码不深究,专注于主要流程。
1.获取node及partition信息
要执行PUT object,首先便要获取关于此object的必要信息,它从哪里来(前文提到过,env中直接暂存对象内容)、它要放到哪里去,同时也要知道它所在container的相应信息,因为PUT object会增加container内容,影响其元数据信息,故需要对相应存放container信息的数据表进行更新,这必然要涉及到存放container信息数据表partition及具体存放结点信息。
10-18行,是这一过程的具体代码。我们可以看到,继承自Controller类的container_info方法被首先调用,此函数相当于读取container元数据信息缓存,返回一个字典,包含相应container的partition、nodes及policy_index信息。即某一个具体container(容器)的所在区、存放结点列表、所使用的存储策略(副本or纠删码)。
还会根据object所在container具体存储策略,获得其object ring,然后根据ring中定义的映射规则,取得该object所在区及存放结点列表(line. 17-18)。
2. 认证、更新、检查
认证(line. 26-29),根据env中'swift.authorize'存放的认证函数,校验token,确认请求的合法性。
更新请求信息(line. 21-23,35,43),更新HTTP请求中相应信息,包括content_type(内容类型)、timestamp(时间戳)等等。
检查,包括检查object命名与请求是否合法(line. 38-41),检查object版本信息(line. 46),检查是否从已有对象拷贝(line. 49-58),检查是否设定自动删除时间(line. 61-62)。
虽然没有研究过关于对象自动删除的代码,但我猜想对象的自动删除要额外启动服务进程(类似于swift-object-server之类),定期对所有servers上object delete_time进行轮询,到达相应截止时间后删除相应对象。
还有一点需要注意的便是,若要创建的object不是从已有对象拷贝,将定义一个对object内容进行读取的迭代器data_source(line. 56-57)。
3.生成请求头
虽然生成请求头和接下来的上传数据在上面的PUT代码中,均只封装成一个函数,但它们会调用其它许多函数。这两个流程也是PUT object操作的核心。
先附上65行_backend_request()
函数的源码:
def _backend_requests(self, req, n_outgoing,
container_partition, containers,
delete_at_container=None, delete_at_partition=None,
delete_at_nodes=None):
headers = [self.generate_request_headers(req,additional=req.headers)
for _junk in range(n_outgoing)]
for i, container in enumerate(containers):
i = i % len(headers)
headers[i]['X-Container-Partition'] = container_partition
headers[i]['X-Container-Host'] = csv_append(
headers[i].get('X-Container-Host'),
'%(ip)s:%(port)s' % container)
headers[i]['X-Container-Device'] = csv_append(
headers[i].get('X-Container-Device'),
container['device'])
for i, node in enumerate(delete_at_nodes or []):
i = i % len(headers)
headers[i]['X-Delete-At-Container'] = delete_at_container
headers[i]['X-Delete-At-Partition'] = delete_at_partition
headers[i]['X-Delete-At-Host'] = csv_append(
headers[i].get('X-Delete-At-Host'),
'%(ip)s:%(port)s' % node)
headers[i]['X-Delete-At-Device'] = csv_append(
headers[i].get('X-Delete-At-Device'),
node['device'])
return headers
这个函数的目的在于,针对要上传的object所在container的存放信息,生成若干个HTTP请求头,用于object上传成功后container信息的更新。object存放在多少个node上,便生成多少相应的HTTP请求头,即len(nodes)
。可以预料,对container信息的更新,将会在接收PUT object的若干个object-server上并发。
在_backend_request()
函数中,首先生成最基本的HTTP请求头列表headers
, headers = [header_for_node1, header_for_node2,......]
,其中的每一个元素都是一个完整的HTTP请求头字典,最初的请求头仅包括时间戳、x-trans-id、user-agent等信息。
接下来的两个for循环,完成的工作类似,第一个for循环将container更新所需要的信息平均分配给headers列表中的header;第二个for循环将定时删除所需要的信息平均分配给headers中的header。我们重点讲讲第一个for循环,第二个,类比即可。
line. 9的取余操作,保证了将container信息更新平均分配至存放该object的结点进行并发,若container信息存放的结点数小于存放object的结点,那么headers列表中后面的一些header将只有基本初始化信息,接收这些header的node不用负责container信息的更新。
line. 11进行container partition信息的设定,每个container的partition信息均相同,所以不用担心它们会打架(被覆盖)。
line. 12-14和line. 15-17分别设定header中需要更新的container信息所在的主机地址(X-Container-Host
)和设备名(X-Container-Device
)。container信息是通过数据表存放的,所以container信息的更新最终会化为对相应服务器相应设备上数据表的update操作。csv_append
函数会将不同Host和Device用逗号隔开,相应node接收到这样的header后,将会利用两者信息定位到所有相应数据表,完成update。
通过这整个for循环,将每个存放object的node更新container信息的进行了分配和设定,简单明了。
4.上传数据
_store_object()
是PUT流程中最复杂也最重要的函数,直接负责向相应的存储结点上传数据。值得一提的是,使用纠删码作为存储策略的话,将会重定义_store_object()
函数。这里仅就原有_store_object()
进行探讨,重定义的_store_object()
留待以后讨论。_store_object()
执行具体流程如下:
def _store_object(self, req, data_source, nodes, partition,
outgoing_headers):
policy_index = req.headers.get('X-Backend-Storage-Policy-Index')
policy = POLICIES.get_by_index(policy_index)
if not nodes:
return HTTPNotFound()
# RFC2616:8.2.3 disallows 100-continue without a body
if (req.content_length > 0) or req.is_chunked:
expect = True
else:
expect = False
conns = self._get_put_connections(req, nodes, partition,
outgoing_headers, policy, expect)
min_conns = quorum_size(len(nodes))
try:
# check that a minimum number of connections were established and
# meet all the correct conditions set in the request
self._check_failure_put_connections(conns, req, nodes, min_conns)
# transfer data
self._transfer_data(req, data_source, conns, nodes)
# get responses
statuses, reasons, bodies, etags = self._get_put_responses(
req, conns, nodes)
except HTTPException as resp:
return resp
finally:
for conn in conns:
conn.close()
if len(etags) > 1:
self.app.logger.error(
_('Object servers returned %s mismatched etags'), len(etags))
return HTTPServerError(request=req)
etag = etags.pop() if len(etags) else None
resp = self.best_response(req, statuses, reasons, bodies,
_('Object PUT'), etag=etag)
resp.last_modified = math.ceil(
float(Timestamp(req.headers['X-Timestamp'])))
return resp
其中除第一个操作获取存储策略之外,其它每个流程都对应一个具体的函数。_get_put_connections
建立从proxy-server到存放object的node之间的http连接,其中利用eventlet中的GreenPile实现并发:
pile = GreenPile(len(nodes))
for nheaders in outgoing_headers:
if expect:
nheaders['Expect'] = '100-continue'
pile.spawn(self._connect_put_node, node_iter, partition,
req.swift_entity_path, nheaders,
self.app.logger.thread_locals)
具体connection的建立,在_connect_put_node()
中:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'PUT', path, headers)
其中,node_iter迭代器在不同的nodes之间进行遍历,创建链接失败即跳过,保证min_conns个链接建立成功即可PUT数据。但这里有两个问题:(1)node_iter如何在多个_get_put_node间起作用,难道是和协程的实际顺序执行有关?(2)创建链接数满足min_conns可传输数据,但要保证数据指定冗余,数据必须存放在所链接所有结点上,那么链接不成功那些结点上数据什么时候以何种方式上传?(关于这个问题,研究了半天之后,发现它和swift中采用的NWR策略相关,至于剩余冗余的上传初步估计是额外的数据同步服务直接执行对象的拷贝)
接下来对建立的connections进行检查,查看其返回状态,及_check_min_conn()
验证成功创建的连接是否满足最低需求,错误则直接返回异常状态码。_transfer_data()
在上传数据的过程中直接使用了继承自GreenPool类的ContextPool类,ContextPool类增添了退出时杀死正在运行的协程的方法。_transfer_data()
中使用队列(Queue)定义缓冲区暂存上传的数据,其中使用绿色线程池(ContextPool)定义多个协程执行_send_file()
将Queue中chunk传输至建立的HTTP链接,每个storage _transfer_data()
后面的过程便是将上传的object数据写入conns队列中。node和proxy之间建立的HTTP链接conn都拥有独立的队列,副本模式下数据同时写入活动的诸多conns中队列。
for conn in list(conns):
if not conn.failed:
conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk)
if req.is_chunked else chunk)
最后获得响应,从获得的诸多response中选取一定数量的(quorum_size)最优响应,生成对用户的响应返回,PUT object至此终结。
通过对PUT Object这一过程的学习,发现自己对ring部分代码非常陌生,有些地方完全弄不明白,映射关系和node管理完全靠ring的原理猜测。比如三副本存取且整个系统仅有一个storage node,那么系统会报错吗?如果正常运行node及node_iter又是何种状态?
特别是后期,我还打算对ring进行功能扩展,代码都没弄懂就谈扩展,无异痴人说梦。
还有,NWR策略相关内容还要进一步研究,整理出博文。