1. Recently my company had a somewhat unusual requirement: given a card payment receipt, find the matching record in our transaction logs on HDFS so that a certain piece of information can be looked up (I can't say which one...). The logs are stored as one file per day, and the files are fairly large, roughly 25 GB of transaction data per day.
2. From the receipt we can get the first 6 and last 4 digits of the card number, the transaction time, and the transaction amount. At first I filtered the file line by line with a pipeline, i.e. cat xxx | grep xxx | grep xxx | grep xxx, but it took ten-odd minutes to get a result.
3. That is obviously too slow. A better approach is to write a MapReduce job (only the map side does real work; the reduce side just passes records through). I wrote one; the code is below:
map.py
#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
import os


def main():
    # The search criteria are passed in as environment variables.
    card_start = os.environ.get('card_start')
    card_last = os.environ.get('card_last')
    trans_at = float(os.environ.get('trans_at'))
    for line in sys.stdin:
        detail = line.strip().split(',')
        card = detail[0]
        money = float(detail[17])
        # Match on the first 6 digits of the card number (skipping the leading
        # marker character of the card field), the last 4 digits, and the amount.
        if trans_at == money and card_start == card[1:7] and card_last == card[-4:]:
            print '%s\t%s' % (line.strip(), detail[1])


if __name__ == '__main__':
    main()
reduce.py
#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys


def main():
    # Identity reducer: emit every record unchanged.
    for line in sys.stdin:
        detail = line.strip().split('\t')
        print '%s\t%s' % (detail[0], detail[1])


if __name__ == '__main__':
    main()
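For reference, a job like this could be submitted with Hadoop Streaming roughly as follows. This is only a sketch: the streaming jar path, the HDFS input/output paths and the filter values are placeholders, and trans_at is assumed to be in the same unit as the amount field (detail[17]) in the files. The criteria reach map.py via -cmdenv, which is what os.environ.get() reads.

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
    -input /flow/2015-01-14/ \
    -output /tmp/card_lookup_out \
    -mapper 'python map.py' \
    -reducer 'python reduce.py' \
    -file map.py \
    -file reduce.py \
    -cmdenv card_start=622666 \
    -cmdenv card_last=1111 \
    -cmdenv trans_at=13750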
4. The requirement itself is not complicated, and running it on the production cluster would be overkill, so I decided to use Python to drive multiple CPU cores and simulate MapReduce locally (again, only a map step is involved here; a reduce step would only be needed if we wanted to aggregate statistics).
5. Here is the code:
#!/usr/bin/env python
# vim: set fileencoding=utf-8
'''
Created on Jan 15, 2015

@author: qianjc
'''
import sys
import multiprocessing
import time


class TrMapReduce:

    def __init__(self, map_fun, reduce_fun, num_workers=None):
        '''
        map_fun: map function, returns e.g. [(a, 1), (b, 1)]
        reduce_fun: reduce function, returns e.g. (c, 10)
        num_workers: number of worker processes; defaults to the number of CPU cores
        '''
        self.map_fun = map_fun
        self.reduce_fun = reduce_fun
        self.card_start = card_start
        self.card_last = card_last
        self.trans_at = trans_at
        self.pool = multiprocessing.Pool(num_workers)

    def __call__(self, inputs, chunksize=1):
        # Note: Pool.map materializes the whole input iterable before dispatching,
        # so stdin is read completely into memory first.
        return self.pool.map(self.map_fun, inputs, chunksize=chunksize)


card_start = '@666666'
card_last = '1111'
trans_at = 0.0


def mapper(line):
    '''
    Filter the transaction log: find records matching the first 6 digits and
    last 4 digits of the card number and the transaction amount.
    '''
    detail = line.strip().split(',')
    card = detail[0]
    money = float(detail[17])
    global card_start
    global card_last
    global trans_at
    if trans_at == money and card_start == card[0:7] and card_last == card[-4:]:
        print line.strip()


def reducer():
    # No reduce step is needed for now.
    pass


def main(argv):
    start = time.time()
    global card_start
    global card_last
    global trans_at
    card_start = '@' + str(argv[1])
    card_last = str(argv[2])
    trans_at = float(argv[3]) * 100
    trMapReduce = TrMapReduce(mapper, reducer)
    trMapReduce(sys.stdin, 10000)
    end = time.time()
    print 'cost time: ', end - start


if __name__ == '__main__':
    main(sys.argv)
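Assuming the script above is saved as, say, local_mr.py (the script name, the HDFS path and the argument values below are placeholders), it is fed the day's file on stdin just like the grep pipeline; the three arguments are the first 6 digits, the last 4 digits, and the amount in yuan (the script multiplies it by 100 itself):

hadoop fs -cat /flow/2015-01-14/* | python local_mr.py 622666 1111 137.50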
Main reference: http://pymotw.com/2/multiprocessing/mapreduce.html
I did verify that multiple cores were actually working, but I didn't measure how much time it saves; I suspect it ends up roughly as fast as grep, presumably because the job is dominated by reading the data anyway. In any case, treat it as a way to learn the idea!