Implementing MapReduce in Python with multiple CPU cores

Date: 2021-01-05 20:39:34

1. Recently the company had a somewhat odd requirement: find one specific record, matching a given condition, in our transaction logs. The records live on HDFS, one file per day, and the files are fairly large, roughly 25 GB of transaction data per day. Given a card payment receipt, we need to locate the corresponding record on HDFS so that we can recover a certain piece of information (can't say which one...).

2. From the receipt we can get the first 6 and last 4 digits of the card number, the transaction time, and the transaction amount. At first I filtered line by line with a shell pipeline, i.e. cat xxx | grep xxx | grep xxx | grep xxx, but that took ten-odd minutes to produce a result.

3. That is obviously not acceptable. A better approach is to write a MapReduce job for it (only the map step does any work; the reducer just echoes its input). I already wrote that; the code is below, followed by a note on how the lookup parameters get passed in:

map.py

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
import os


def main():
    # lookup parameters are passed in through environment variables
    card_start = os.environ.get('card_start')     # first 6 digits of the card number
    card_last = os.environ.get('card_last')       # last 4 digits of the card number
    trans_at = float(os.environ.get('trans_at'))  # transaction amount

    for line in sys.stdin:
        detail = line.strip().split(',')
        card = detail[0]           # card number field (it apparently carries a leading '@', hence the [1:7] slice)
        money = float(detail[17])  # transaction amount field
        if trans_at == money and card_start == card[1:7] and card_last == card[-4:]:
            print '%s\t%s' % (line.strip(), detail[1])

if __name__ == '__main__':
    main()

reduce.py

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
import os


def main():
    # identity reducer: just echo the mapper's output unchanged
    for line in sys.stdin:
        detail = line.strip().split('\t')
        print '%s\t%s' % (detail[0], detail[1])

if __name__ == '__main__':
    main()
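
A quick note on how card_start, card_last and trans_at reach map.py: the script reads them from os.environ, which is what Hadoop Streaming's -cmdenv option is for. Assuming the job is submitted with Hadoop Streaming, the command would look roughly like this (the jar name, paths and values are only placeholders):

hadoop jar hadoop-streaming.jar \
    -input /path/to/detail/20150115 -output /path/to/result \
    -mapper map.py -reducer reduce.py -file map.py -file reduce.py \
    -cmdenv card_start=123456 -cmdenv card_last=7890 -cmdenv trans_at=100.0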

4. The task itself is not complicated, but spinning up the production cluster for it feels like overkill, so I thought of using Python's multiprocessing across CPU cores to simulate MapReduce locally (again, only the map step is involved here; a reduce step would only be needed if we wanted to aggregate some statistics, and a sketch of that is at the end of this post).

5. Here is the code; a usage note follows it:

#!/usr/bin/env python
# vim: set fileencoding=utf-8

'''
Created on Jan 15, 2015

@author: qianjc
'''
import sys
import multiprocessing
import time


class TrMapReduce:

    def __init__(self, map_fun, reduce_fun, num_workers=None):
        '''
        map_fun: map function, returning e.g. [(a, 1), (b, 1)]
        reduce_fun: reduce function, returning e.g. (c, 10)
        num_workers: number of worker processes; defaults to the number of CPU cores
        '''
        self.map_fun = map_fun
        self.reduce_fun = reduce_fun  # not used yet, kept for a future reduce step
        # the Pool is created after main() has set the globals below, so on Linux
        # the forked workers inherit card_start / card_last / trans_at
        self.pool = multiprocessing.Pool(num_workers)

    def __call__(self, inputs, chunksize=1):
        # hand the input lines to the worker processes in chunks
        return self.pool.map(self.map_fun, inputs, chunksize=chunksize)


# lookup parameters, overwritten in main() from the command-line arguments
card_start = '@666666'
card_last = '1111'
trans_at = 0.0


def mapper(line):
    '''
    Filter the transaction stream: match on the first 6 digits and the last 4
    digits of the card number and on the transaction amount.
    '''
    detail = line.strip().split(',')
    card = detail[0]
    money = float(detail[17])

    global card_start
    global card_last
    global trans_at

    if trans_at == money and card_start == card[0:7] and card_last == card[-4:]:
        print line.strip()


def reducer():
    # no reduce step is needed for now
    pass


def main(argv):
    start = time.time()

    global card_start
    global card_last
    global trans_at
    card_start = '@' + str(argv[1])  # the card field starts with '@'
    card_last = str(argv[2])
    trans_at = float(argv[3]) * 100  # the amount argument is scaled by 100 before comparing with detail[17]

    trMapReduce = TrMapReduce(mapper, reducer)
    res = trMapReduce(sys.stdin, 10000)  # mapper prints matches directly, so res is not used

    end = time.time()
    print 'cost time: ', end - start


if __name__ == '__main__':
    main(sys.argv)
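
Usage mirrors the original grep pipeline: pipe a day's detail file into the script and pass the card digits and the amount as arguments, e.g. cat 20150115.detail | python multi_grep.py 123456 7890 56.78 (the file name, script name and values here are only placeholders).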

Main reference: http://pymotw.com/2/multiprocessing/mapreduce.html
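
That example goes one step further than the script above: after Pool.map it partitions the mapped (key, value) pairs by key and then reduces each group, which is what the aggregation mentioned in item 4 would need. A minimal sketch along those lines (the count-per-card-number use case and all names here are only illustrative):

#!/usr/bin/env python
# count how many records each card number has (illustrative only)
import collections
import itertools
import multiprocessing
import sys


def count_mapper(line):
    # emit (key, 1) pairs, one per input record; the key is the card number field
    detail = line.strip().split(',')
    return [(detail[0], 1)]


def partition(mapped_values):
    # group all emitted values by key, like the shuffle phase in MapReduce
    partitioned = collections.defaultdict(list)
    for key, value in itertools.chain(*mapped_values):
        partitioned[key].append(value)
    return partitioned.items()


def count_reducer(item):
    # item is (key, [1, 1, ...]); return (key, total count)
    key, occurrences = item
    return (key, sum(occurrences))


if __name__ == '__main__':
    pool = multiprocessing.Pool()
    mapped = pool.map(count_mapper, sys.stdin, chunksize=10000)
    reduced = pool.map(count_reducer, partition(mapped))
    for key, total in reduced:
        print('%s\t%s' % (key, total))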


I did verify that the multiprocessing version really keeps multiple cores busy, but I never measured how much time it saves; my guess is it ends up roughly as fast as grep, probably because a single parent process still has to read the whole stream and hand the lines to the workers. Anyway, treat it as having learned the idea!