一、背景
1. Cassandra数据库中有2个Column Family,一个叫U,一个叫S。U表的主键是user_id;S表有如下4个字段:user_id,parent_path, path, json_text,其中主键是前三者。值得注意的是json_text字段是string类型,但其实存的是一个json结构,信息丰富。
2. ElasticSearch被用来存储一些元信息。1中所描述的user_id和path是被用来生成ElasticSearch所用的document_id. 而实际上,ElasticSearch中所存放的元信息的主要部分和json_text一样.
3. 现在的问题在于,ElasticSearch中的极少量数据可能会因为某种原因而遗失。为了恢复ElasticSearch中的信息,必须从Cassandra的S表中取得原始信息,进而插入ElasticSearch中。但是要先做判断 -- 当发现该条记录存在于Cassandra中但并不存在于ElasticSearch中时,才做恢复工作。
4. Cassandra使用的是很老的1.2版本,不支持直接的Paging。
5. 以上就是问题描述。现在已经有一个脚本可以来做这个恢复工作了,可惜这个脚本的Performance实在太差,需要进行优化。
看了这些,可能有人会问,在这里似乎不关U表什么事啊,为何要提它呢?这个问题,留到本文结束再说吧。这里给个简单的答案,是为了帮助Paging和并行处理。
二、原脚本的工作原理
1. 从Cassandra的U表通过间接Paging的方式(即使用token函数,如:SELECT user_id FROM U_cf WHERE token(user_id) > token(<last_user_id>) LIMIT page_size;) 依次从U表取得所有user_id;
2. 在每取得一个page之后,再去S表进行查询以获取path和json_text以备后续使用,如:SELECT user_id, path, json_text FROM S_cf where user_id = '<user_id>';
3. 使用某内定方法通过user_id和path算出document_id后,去ElasticSearch中查询该document_id是否存在,若不存在则说明遗失,那么将该信息的user_id,path和json_text全部存在内存中,待将来统一恢复。
原脚本伪代码如下:
missing_info = list()
while there_is_still_user:
for user_id in 500_users:
user_id, path, json_text = check_cass(user_id, S_cf)
doc_id = make_doc_id(user_id, path)
if not check_es(doc_id):
missing_info.append((user_id,path,json_text))
三、优化
1. 定位
优化的第一步需要定位出问题所在。笔者当时直接看去,发现的问题是,对于每一个user_id,都会去遍历一遍Cassandra的S表,似乎是个巨大的开销。比如,20万的用户,而S表是100万条记录,那么相当于对这100万条记录扫描了20万遍。这是直接感觉。但是实际上,笔者在这里获得的一个经验就是:
第一个经验:Performance到底差在哪儿,还是要用真实的数据说话。
因为代码本身用Python编写,我们可以使用profile或cProfile模块来帮助我们定位到底哪里耗时最多。这里就不上图了,直接说结论。
结论就是,访问ElasticSearch的时间占了所有时间的70%左右,而访问Cassandra的时间只占了约10%. 所以,这里获得的2个经验是:
第二个经验:除了磁盘访问,网络I/O确实也是相当厉害的耗时大户(这一点在当年Hadoop的学习中也有提到);
第三个经验:Cassandra比想象中的要快!
2. 第一个优化 -- 从ElasticSearch一次性获得所有document_id
既然为每一个document_id都要访问一次ElasticSearch,那么能否一次性取得所有的document_id呢?这样就可以将上亿次(假设有1亿个document_id)网络I/O减少到只有一次外加所有的本地检查。
那么要解决的第一个问题就是,怎么写这个ES的Query。这个费了笔者一点点时间,因为其中写出了一个错的Query,当时还自以为正确,结果怎么测结果都不对。最后发现问题所在,换了正确的Query就好了。分享如下:
curl -XPOST 'http://<ES_IP>:9200/_search?scroll=120m&size=10000&pretty' -d '
{
"query": {
"bool": {
"must": [
{
"term": {
"document.field1": "xxxxx"
}
}
],
"must_not": [
{
"term": {
"document.field2": "deleted"
}
}
]
}
},
"fields": ["_id"]
}'
# 使用上面的Query取得scroll_id,然后每次运行下面的Query直至取得所有的document_idcurl -XPOST http://<ES_IP>:9200/_search/scroll?pretty -d '<scroll_id>'
第四个经验:requests发ES的Query,得到结果比用pyes快很多。(这个经验或许值得商榷,因为时间有限,笔者没有深究pyes了。)
第一个优化做完之后,程序确实一下子性能提升了很多,提升了50%-60%吧。但是,这真的可行吗?
3. 推翻第一个优化! -- 内存限制
第一个优化其实并不真实可行。理由就是内存不够。实际中,一个document_id占用了22个字节,假设使用document_id作为key的Hash表的value部分仅仅占一个字节(比如True),那么一条记录本身也占了23个字节了。那么一亿条document_id组成的Hash表就会占2.3GB左右。另外,组内有高手告诉我,像Python这样的高级语言所实现的Hash表,比如dict,其实不是真正的Hash表。为了解决过大问题,其实是红黑树或类似红黑树的树形数据结构,那么所占空间就不止2.3GB了。笔者虽然尚未亲自求证这个说法,但即使是2.3GB,外加这个脚本在其他地方所耗的内存,在其运行环境中已经是难以接受的了。
这个问题的本质是,一个过大的Hash表,以致于大过了内存限制,应该怎么办呢?
看到这个问题,笔者不由得感觉到有些像百度的面试题了。不知道正解到底为何,笔者自己的想法如下:
1. 将该巨大Hash表的key排序后分成若干段小Hash表,比如20段。那么原来的2GB,就变成了每段约200MB了;内存中放X段,剩下20-X段放磁盘上;
2. 查询时,可以很快定位出要查的key位于这20段的到底哪一段。如果该段在内存中,那么直接查就好;若不再,则需要替换到内存中,再查。
很明显,这个方法要命的地方在于,如果要查的值很随机,那么这种磁盘内存间的替换将耗费大量时间。当然,你可以说,将要查的值也排个序,再查的时候就会大大减少替换的次数,即大大增加命中的概率。但是,这同样要耗费排序的时间,还会增加代码的复杂性。所以看起来不是个特别好的法子。
4. 第一个优化的新版 -- 一次取PAGE_SIZE个user_id的所有的document_id
既然不能一次取完所有的document_id,那么自然就少取点。取多少呢?就是每个Page里面所有user_id的相关的所有document_id. 这样只要把上面那个Query略微改改就行了。这个优化是可行的。
5. 第二个优化 -- 针对Cassandra,使用in
这个优化也是一个简单的优化,它能提升性能的30%左右。这个优化就是,使用in语句一次性取得PAGE_SIZE个用户的数据,而不用对每个用户都运行一遍CQL语句。
所以,原先的SELECT语句改成了类似如下这样:
SELECT user_id, path, json_text FROM S_cf WHERE user_id <span style="color:#3366ff;">in</span> (xxx, yyy, zzz, aaa, ...);
6. 第三个优化 -- 意料之外的一击(fast mode)
有同事提醒,在Cassandra中还有一个F表,该表记录了每个user_id拥有的某种特定信息。但是,通过
SELECT count(*) FROM F_cf WHERE user_id = <user_id>;
就能获得在正常情况下该用户在ElasticSearch中的所有document_id的数目。换句话说,取得这里的count(*)和数该用户在ES中的document_id数目,若二者一致,则在较大概率上说明了该用户在ElasticSearch中的信息没有丢失。
这个方法有可能极大提升速度。因为虽然count(*)对Cassandra来说也很耗时,但是还是比为该用户去遍历S表,再去下载document_id和比较,要快不少。
当然,该方法也有个缺陷,那就是,若该用户在ElasticSearch中丢失了A信息,而在Cassandra中丢失了B信息,那么算出来的count(*)和ES中的document_id的数目还是会一致的。不过这种概率还是相当小的。
为了这个方法,笔者特别为该脚本开发了一个选项,叫fast mode,意即采用这种方式先做判断,当判断不通过时再做普通的检查。
7. 第四个优化 -- 并行处理
并行处理是很容易想到的优化。在这里的并行处理可以有2种。
一、为每500个用户开一个线程或进程;
二、在上述线程或进程内,因为既要访问ElasticSearch(通过user_id取得其所有document_id),又要访问Cassandra,所以这又可以分成2个线程或进程。
那么到底采用线程还是进程,抑或协程呢?
首先,协程往往就意味着gevent,而据有的同事说,这个过于耗费CPU了,笔者就没有亲自试了。(笔者后面会试的。因为项目时间关系,这里较快地忽略了这种方法。但是想来,还是应该深入研究下去。)
其次,大家都知道,因为GIL的关系,普通的Python是不支持真正的线程的。所以很自然想到,使用多进程。可是多个子进程对父进程汇报数据,似乎不是那么直接。同样因为项目时间的关系,这里笔者又暂时没有深究该如何用Python去做了。写文件当然可以,可是很慢。似乎还有Queue和Pipe的方法,但似乎编程接口有点麻烦。
最后,回到多线程。其实没有想的那么糟。因为当一个线程在做I/O操作时,另一个线程可以将CPU利用起来。实际测试中,在测试数据集不大时,笔者看到的结果是:加了本节开头的2种并行优化的多线程方法后,性能提升约35%.
补充一下,使用的是:
from multiprocessing.pool import ThreadPool
另外,因为考虑到不要影响使用环境,这里的Pool Size用的是默认值,即CPU个数。
8. 总结
如果不考虑fast mode的优化(因为其不算真正的技术优化),用到的优化技术有:
1. 每PAGE_SIZE(如500)个用户集中做一次ElasticSearch的Query取document_id;
2. 用in语句来一次性从Cassandra查出所有500个用户的信息;
3. 用多线程做并行处理;
9. 一个问题
回到文章之初的问题,为何提及U表呢?其实,我们只需要遍历S表就可以了啊 -- 遍历S表,取得每条记录的user_id,path,json_text,然后计算出document_id,再去ElasticSearch中查找。这么看来,确实是不需要U表的。
但是一个很简单的回答是,笔者没有找到方法如何遍历S表。因为S表具有三个主键:user_id,parent_path,path,而笔者环境中的Cassandra是一个较老的版本1.2,并不支持Paging。笔者尝试过一些方法,可惜最终还是没找到如何遍历有多个主键的表的方法。
而U表的主键呢,就是user_id一个,这样我们可以利用token方法来模拟paging,产生一个Page又一个Page,每个Page里有500个user_id。然后利用这些user_id去遍历S表 --- S表中不会出现在U表中没有的user_id。
但是,回头来想,即使我们有办法遍历S表,我们很可能也不会去那么做。参看上节中的第一个优化,如果我们不能一次性取得多个document_id,那么网络的I/O会使得程序的执行速度相当之慢。