本节在数据量比较大的情况下,对比esproc和python。
数据量:7000多条万记录,5个字段分别是orderid,clientid,sellerid,amount,date。总大小超过3G。
1. 筛选8月份的交易记录
esproc
A | |
1 | =now() |
2 | =file("E:\\orders_big_data\\orders.csv")[email protected]() |
3 | =A2.select(month(date)==8).fetch() |
4 | [email protected](A1,now()) |
A2:f.cursor()
根据文件f创建游标并返回,数据扫描完将自动关闭游标。@t, f中第一行记录作为字段名,不使用本选项时默认使用_1,_2,…作为字段名. @c, 无s时用逗号分隔。如果同时有s则用s分隔。
A3:筛选出8月份的订单记录并取出结果
esproc并行代码:
A | |
1 | =now() |
2 | =file("E:\\orders_big_data\\orders.csv")[email protected](;16) |
3 | =A2.select(month(date)==8).fetch() |
4 | [email protected](A1,now()) |
A2:[email protected](;n),@m选项,返回成多路游标,n表示路数。这时结果可能改变原来数据的顺序(筛选数据大多数情况下也不需要保持原序)。
python:
import time
import pandas as pd
import numpy as np
s = time.time()
chunksize=1000000
order_data = pd.read_csv('E:\\orders_big_data\\orders.csv',iterator=True,chunksize=chunksize)
i = 0
month_8_list = []
for chunk in order_data:
chunk['date'] = pd.to_datetime(chunk['date'])
chunk_month_8 = chunk[chunk['date'].dt.month==8]
month_8_list.append(chunk_month_8)
month_8 = pd.concat(month_8_list,ignore_index=True)
print(month_8)
e = time.time()
print(e-s)
定义chunksize大小为1000000万条记录。
pd.read_csv(fileorbuf,iterator,chunksize) iterator,返回一个TextFileReader 对象,以便逐块处理文件,chunksize文件块大小。
循环读取文件,每次都取chunksize的大小,筛选出8月份的记录,放入初始化的list中。
合并list中的dataframe得到结果。
pandas本身不支持并行,所以这里没有python的并行测试。
结果:
esproc单线程
esproc并行:
python
并行数 | 耗时 | |
esproc | 不并行 | 121.068 |
esproc | 2 | 82.117 |
esproc | 4 | 68.897 |
python | 不并行 | 85.726 |
2. 计算出每个销售员每年的销售额和订单数
esproc
A | |
1 | =now() |
2 | =file("E:\\orders_big_data\\orders.csv")[email protected](sellerid,date,amount) |
3 | =A2.groups(sellerid,year(date):y;sum(amount):amount,count(~):count) |
4 | [email protected](A1,now()) |
A2:按照sellerid和年份进行分组,同时汇总amount和count数。只取出计算需要的字段。
esproc并行代码
A | |
1 | =now() |
2 | =file("E:\\orders_big_data\\orders.csv")[email protected](sellerid,date,amount;4) |
3 | =A2.groups(sellerid,year(date):y;sum(amount):amount,count(~):count) |
4 | [email protected](A1,now()) |
python:
import time
import pandas as pd
import numpy as np
s = time.time()
chunksize=1000000
order_data = pd.read_csv('E:\\orders_big_data\\orders.csv',usecols=['sellerid','date','amount'],iterator=True,chunksize=chunksize)
chunk_g_list = []
for chunk in order_data:
chunk['date'] = pd.to_datetime(chunk['date'])
chunk['y'] = chunk['date'].dt.year
chunk_g = chunk.groupby(by=['sellerid','y']).agg(['sum','count']).reset_index()
chunk_g_list.append(chunk_g)
order_group = pd.concat(chunk_g_list,ignore_index=True).groupby(by=['sellerid','y'],as_index=False).agg({('amount','sum'):['sum'],('amount','count'):['sum']})
order_group.columns = ['sellerid','y','amount','count']
print(order_group)
e = time.time()
print(e-s)
定义chunksize为100万
pd.read_csv(fileorbuf,usecols,iterator,chunksize)usecols是取出需要的字段。
按照chunksize循环读取数据
转换date字段的格式为pandas的datetime格式
新增一列年份y
df.groupby(by),按照sellerid和y进行分组。df.agg(),对分组数据进行多种计算。这里是对分组数据amount进行sum和count计算。
将结果放入list中
合并list中的df,然后再按照sellerid和y分组同时计算amount的sum值,(amount,count)的sum值。
结果:
esproc和esproc并行
python
并行数 | 耗时 | |
esproc | 不并行 | 113.417 |
esproc | 2 | 77.001 |
esproc | 4 | 54.078 |
python | 不并行 | 83.724 |
3. 列出客户名单
esproc
A | |
1 | =now() |
2 | =file("E:\\orders_big_data\\orders.csv")[email protected](clientid) |
3 | =A2.id(clientid) |
4 | [email protected](A1,now()) |
A2:游标读取clientid字段
A3:cs.id(x)获取游标cs中字段x的不同值形成的序列。
python:
import time
import pandas as pd
import numpy as np
s = time.time()
chunksize=1000000
order_data = pd.read_csv('E:\\orders_big_data\\orders.csv',usecols = ['clientid'],iterator=True,chunksize=chunksize)
client_set = set()
for chunk in order_data:
client_set = client_set|set(chunk['clientid'].values)
print(pd.Series(list(client_set)))
e = time.time()
print(e-s)
定义chunksize,100万
pd.read_csv(),usecols参数是读取的字段,这里只读取clientid字段。
定义一个集合
按照chunksize循环数据,取chunk的clientid字段的值组成集合并与原来的集合求并集,最终的集合即为客户的名单.
为了便于查看将其转成series结构
结果:
esproc
python
耗时 | |
esproc | 48.582 |
python | 57.797 |
4. 找到每个销售员销售额最大的3笔订单
esproc
A | |
1 | =now() |
2 | =file("E:\\orders_big_data\\orders.csv")[email protected]() |
3 | =A2.groups(sellerid;top(3; -amount):top3) |
4 | =A3.conj(top3) |
5 | [email protected](A1,now()) |
A3:按照sellerid分组并取amount最大的前三条记录。A.top(n;x)存在x时,返回的是记录。针对序列的每个成员计算表达式x,返回前n个最小值对应的记录。
A4:连接top3字段的序表组成新序表。
esproc并行代码
A | |
1 | =now() |
2 | =file("E:\\orders_big_data\\orders.csv")[email protected](;4) |
3 | =A2.groups(sellerid;top(3; -amount):top3) |
4 | =A3.conj(top3) |
5 | [email protected](A1,now()) |
python:
import time
import pandas as pd
import numpy as np
s = time.time()
chunksize=1000000
order_data = pd.read_csv('E:\\orders_big_data\\orders.csv',iterator=True,chunksize=chunksize)
group_list = []
for chunk in order_data:
for_inter_list = []
top_n = chunk.groupby(by='sellerid',as_index=False)
for index,group in top_n:
group = group.sort_values(by='amount',ascending=False).iloc[:3]
for_inter_list.append(group)
for_inter_df = pd.concat(for_inter_list,ignore_index=True)
group_list.append(for_inter_df)
top_n_gr = pd.concat(group_list,ignore_index=True).groupby(by='sellerid',as_index=False)
top_n_list=[]
for index,group in top_n_gr:
group = group.sort_values(by='amount',ascending=False).iloc[:3]
top_n_list.append(group)
top_3 = pd.concat(top_n_list)
print(top_3)
e = time.time()
print(e-s)
定义一个list,用来存放每个chunk生成的df
循环数据
定义一个循环内的list,用来存放分组以后的df
按照sellerid分组
循环分组,按照amount排序,ascending=Falese表示倒序排序,取前三个,然后将结果放入for循环内的list中
合并循环内list的df
循环结束后,合并最初定义的list中的df
再次按照sellerid分组
循环分组,按照amount排序,ascending=Falese表示倒序排序,取前三个
合并这次得到结果,得到每个销售员销售额最大的 3 笔订单
结果:
esproc
python
并行数 | 耗时 | |
esproc | 不并行 | 129.989 |
esproc | 2 | 85.624 |
esproc | 4 | 72.040 |
python | 不并行 | 224.048 |
小结:本节我们用比较大的数据进行了简单的计算,包括条件查询、分组汇总、获得唯一值和topn。从代码的复杂度和运行速度看,esproc都占据了优势,esproc可以轻松的通过并行提高运行效率,充分发挥多核cpu的优势,而python则很难做到。第四例中,python进行了多次循环、排序、合并。我尝试了使用python原生库做第4例的题目,由于一直都是对比的pandas所以这里没有重点介绍,运行效率比pandas快(耗时:183.164),但仍没有esproc快,这里仅供大家参考。python可以根据内存的大小调节chunksize的大小,在内存允许的情况下chunksize越大,运行效率越高。
第四例,python的另一种代码
s = time.time()
seller_dic = {}
with open('E:\\orders_big_data\\orders2.csv') as fd:
i=0
for line in fd:
if i ==0:
cols = line.strip().split(',')
i+=1
else:
ss = line.strip().split(',')
if len(ss) != 5:
continue
orderid = ss[0]
clientid = ss[1]
sellerid = int(ss[2])
amount = float(ss[3])
date = ss[4]
if sellerid not in seller_dic:
seller_dic[sellerid]={}
seller_dic[sellerid][amount] = ss
else:
if len(seller_dic[sellerid])<3:
seller_dic[sellerid][amount] = ss
else:
if amount>min(seller_dic[sellerid].keys()):
seller_dic[sellerid].pop(min(seller_dic[sellerid].keys()))
seller_dic[sellerid][amount]=ss
seller_list = sorted(seller_dic.items(),key=lambda x:x[0])
top_3_list = []
for item in seller_list:
for j in sorted(item[1].items(),key=lambda x:x[0],reverse=True):
top_3_list.append(j[1])
top_3_df = pd.DataFrame(top_3_list,columns=cols)
print(top_3_df)
e = time.time()
print(e-s)