流水账 - 对某脚本优化的记录

时间:2022-12-31 04:51:11

一、背景

1. Cassandra数据库中有2个Column Family,一个叫U,一个叫S。U表的主键是user_id;S表有如下4个字段:user_id,parent_path, path, json_text,其中主键是前三者。值得注意的是json_text字段是string类型,但其实存的是一个json结构,信息丰富。

2. ElasticSearch被用来存储一些元信息。1中所描述的user_id和path是被用来生成ElasticSearch所用的document_id. 而实际上,ElasticSearch中所存放的元信息的主要部分和json_text一样. 

3. 现在的问题在于,ElasticSearch中的极少量数据可能会因为某种原因而遗失。为了恢复ElasticSearch中的信息,必须从Cassandra的S表中取得原始信息,进而插入ElasticSearch中。但是要先做判断 -- 当发现该条记录存在于Cassandra中但并不存在于ElasticSearch中时,才做恢复工作。

4. Cassandra使用的是很老的1.2版本,不支持直接的Paging。

5. 以上就是问题描述。现在已经有一个脚本可以来做这个恢复工作了,可惜这个脚本的Performance实在太差,需要进行优化。


看了这些,可能有人会问,在这里似乎不关U表什么事啊,为何要提它呢?这个问题,留到本文结束再说吧。这里给个简单的答案,是为了帮助Paging和并行处理。


二、原脚本的工作原理

1. 从Cassandra的U表通过间接Paging的方式(即使用token函数,如:SELECT user_id FROM U_cf WHERE token(user_id) > token(<last_user_id>) LIMIT page_size;) 依次从U表取得所有user_id;

2. 在每取得一个page之后,再去S表进行查询以获取path和json_text以备后续使用,如:SELECT user_id, path, json_text FROM S_cf where user_id = '<user_id>'; 

3. 使用某内定方法通过user_id和path算出document_id后,去ElasticSearch中查询该document_id是否存在,若不存在则说明遗失,那么将该信息的user_id,path和json_text全部存在内存中,待将来统一恢复。


原脚本伪代码如下:

missing_info = list()
while there_is_still_user:
for user_id in 500_users:
user_id, path, json_text = check_cass(user_id, S_cf)
doc_id = make_doc_id(user_id, path)
if not check_es(doc_id):
missing_info.append((user_id,path,json_text))



三、优化

1. 定位

优化的第一步需要定位出问题所在。笔者当时直接看去,发现的问题是,对于每一个user_id,都会去遍历一遍Cassandra的S表,似乎是个巨大的开销。比如,20万的用户,而S表是100万条记录,那么相当于对这100万条记录扫描了20万遍。这是直接感觉。但是实际上,笔者在这里获得的一个经验就是:

第一个经验:Performance到底差在哪儿,还是要用真实的数据说话。

因为代码本身用Python编写,我们可以使用profile或cProfile模块来帮助我们定位到底哪里耗时最多。这里就不上图了,直接说结论。

结论就是,访问ElasticSearch的时间占了所有时间的70%左右,而访问Cassandra的时间只占了约10%. 所以,这里获得的2个经验是:

第二个经验:除了磁盘访问,网络I/O确实也是相当厉害的耗时大户(这一点在当年Hadoop的学习中也有提到);

第三个经验:Cassandra比想象中的要快!


2. 第一个优化 -- 从ElasticSearch一次性获得所有document_id

既然为每一个document_id都要访问一次ElasticSearch,那么能否一次性取得所有的document_id呢?这样就可以将上亿次(假设有1亿个document_id)网络I/O减少到只有一次外加所有的本地检查。 

那么要解决的第一个问题就是,怎么写这个ES的Query。这个费了笔者一点点时间,因为其中写出了一个错的Query,当时还自以为正确,结果怎么测结果都不对。最后发现问题所在,换了正确的Query就好了。分享如下:

curl -XPOST 'http://<ES_IP>:9200/_search?scroll=120m&size=10000&pretty' -d '
{
"query": {
"bool": {
"must": [
{
"term": {
"document.field1": "xxxxx"
}
}
],
"must_not": [
{
"term": {
"document.field2": "deleted"
}
}
]
}
},
"fields": ["_id"]
}'

# 使用上面的Query取得scroll_id,然后每次运行下面的Query直至取得所有的document_idcurl -XPOST http://<ES_IP>:9200/_search/scroll?pretty -d '<scroll_id>'


当然,以上我加了一些限制条件。这是真实环境所需的。比较奇怪的一件事情是,在写完了Query,再用pyes来实现的时候,发现非常慢,但是curl命令确实是很快。最后不得已,笔者在脚本中使用了Python的requests模块来做。

第四个经验:requests发ES的Query,得到结果比用pyes快很多。(这个经验或许值得商榷,因为时间有限,笔者没有深究pyes了。)


第一个优化做完之后,程序确实一下子性能提升了很多,提升了50%-60%吧。但是,这真的可行吗?


3. 推翻第一个优化! -- 内存限制

第一个优化其实并不真实可行。理由就是内存不够。实际中,一个document_id占用了22个字节,假设使用document_id作为key的Hash表的value部分仅仅占一个字节(比如True),那么一条记录本身也占了23个字节了。那么一亿条document_id组成的Hash表就会占2.3GB左右。另外,组内有高手告诉我,像Python这样的高级语言所实现的Hash表,比如dict,其实不是真正的Hash表。为了解决过大问题,其实是红黑树或类似红黑树的树形数据结构,那么所占空间就不止2.3GB了。笔者虽然尚未亲自求证这个说法,但即使是2.3GB,外加这个脚本在其他地方所耗的内存,在其运行环境中已经是难以接受的了。

这个问题的本质是,一个过大的Hash表,以致于大过了内存限制,应该怎么办呢?

看到这个问题,笔者不由得感觉到有些像百度的面试题了。不知道正解到底为何,笔者自己的想法如下:

1. 将该巨大Hash表的key排序后分成若干段小Hash表,比如20段。那么原来的2GB,就变成了每段约200MB了;内存中放X段,剩下20-X段放磁盘上;

2. 查询时,可以很快定位出要查的key位于这20段的到底哪一段。如果该段在内存中,那么直接查就好;若不再,则需要替换到内存中,再查。

很明显,这个方法要命的地方在于,如果要查的值很随机,那么这种磁盘内存间的替换将耗费大量时间。当然,你可以说,将要查的值也排个序,再查的时候就会大大减少替换的次数,即大大增加命中的概率。但是,这同样要耗费排序的时间,还会增加代码的复杂性。所以看起来不是个特别好的法子。


4. 第一个优化的新版 -- 一次取PAGE_SIZE个user_id的所有的document_id

既然不能一次取完所有的document_id,那么自然就少取点。取多少呢?就是每个Page里面所有user_id的相关的所有document_id. 这样只要把上面那个Query略微改改就行了。这个优化是可行的。


5. 第二个优化 -- 针对Cassandra,使用in

这个优化也是一个简单的优化,它能提升性能的30%左右。这个优化就是,使用in语句一次性取得PAGE_SIZE个用户的数据,而不用对每个用户都运行一遍CQL语句。

所以,原先的SELECT语句改成了类似如下这样:

SELECT user_id, path, json_text FROM S_cf WHERE user_id <span style="color:#3366ff;">in</span> (xxx, yyy, zzz, aaa, ...);


6. 第三个优化 -- 意料之外的一击(fast mode)

有同事提醒,在Cassandra中还有一个F表,该表记录了每个user_id拥有的某种特定信息。但是,通过

SELECT count(*) FROM F_cf WHERE user_id = <user_id>;

就能获得在正常情况下该用户在ElasticSearch中的所有document_id的数目。换句话说,取得这里的count(*)和数该用户在ES中的document_id数目,若二者一致,则在较大概率上说明了该用户在ElasticSearch中的信息没有丢失。

这个方法有可能极大提升速度。因为虽然count(*)对Cassandra来说也很耗时,但是还是比为该用户去遍历S表,再去下载document_id和比较,要快不少。

当然,该方法也有个缺陷,那就是,若该用户在ElasticSearch中丢失了A信息,而在Cassandra中丢失了B信息,那么算出来的count(*)和ES中的document_id的数目还是会一致的。不过这种概率还是相当小的。

为了这个方法,笔者特别为该脚本开发了一个选项,叫fast mode,意即采用这种方式先做判断,当判断不通过时再做普通的检查。


7. 第四个优化 -- 并行处理

并行处理是很容易想到的优化。在这里的并行处理可以有2种。

一、为每500个用户开一个线程或进程;

二、在上述线程或进程内,因为既要访问ElasticSearch(通过user_id取得其所有document_id),又要访问Cassandra,所以这又可以分成2个线程或进程。


那么到底采用线程还是进程,抑或协程呢?

首先,协程往往就意味着gevent,而据有的同事说,这个过于耗费CPU了,笔者就没有亲自试了。(笔者后面会试的。因为项目时间关系,这里较快地忽略了这种方法。但是想来,还是应该深入研究下去。)

其次,大家都知道,因为GIL的关系,普通的Python是不支持真正的线程的。所以很自然想到,使用多进程。可是多个子进程对父进程汇报数据,似乎不是那么直接。同样因为项目时间的关系,这里笔者又暂时没有深究该如何用Python去做了。写文件当然可以,可是很慢。似乎还有Queue和Pipe的方法,但似乎编程接口有点麻烦。

最后,回到多线程。其实没有想的那么糟。因为当一个线程在做I/O操作时,另一个线程可以将CPU利用起来。实际测试中,在测试数据集不大时,笔者看到的结果是:加了本节开头的2种并行优化的多线程方法后,性能提升约35%. 

补充一下,使用的是:

from multiprocessing.pool import ThreadPool

另外,因为考虑到不要影响使用环境,这里的Pool Size用的是默认值,即CPU个数。


8. 总结

如果不考虑fast mode的优化(因为其不算真正的技术优化),用到的优化技术有:

1. 每PAGE_SIZE(如500)个用户集中做一次ElasticSearch的Query取document_id;

2. 用in语句来一次性从Cassandra查出所有500个用户的信息;

3. 用多线程做并行处理;


9. 一个问题

回到文章之初的问题,为何提及U表呢?其实,我们只需要遍历S表就可以了啊 -- 遍历S表,取得每条记录的user_id,path,json_text,然后计算出document_id,再去ElasticSearch中查找。这么看来,确实是不需要U表的。

但是一个很简单的回答是,笔者没有找到方法如何遍历S表。因为S表具有三个主键:user_id,parent_path,path,而笔者环境中的Cassandra是一个较老的版本1.2,并不支持Paging。笔者尝试过一些方法,可惜最终还是没找到如何遍历有多个主键的表的方法。

而U表的主键呢,就是user_id一个,这样我们可以利用token方法来模拟paging,产生一个Page又一个Page,每个Page里有500个user_id。然后利用这些user_id去遍历S表 --- S表中不会出现在U表中没有的user_id。


但是,回头来想,即使我们有办法遍历S表,我们很可能也不会去那么做。参看上节中的第一个优化,如果我们不能一次性取得多个document_id,那么网络的I/O会使得程序的执行速度相当之慢。