(7)在多维度组合下,如何计算某个维度组合里的不重复记录的条数
以数据文件 c.txt 为例:
1
2
3
4
5
6
7
|
[root@localhost pig]$
cat
c.txt
a 1 2 3 4.2 9.8 100
a 3 0 5 3.5 2.1 200
b 7 9 9 - - 300
a 7 9 9 2.6 6.2 300
a 1 2 5 7.7 5.9 200
a 1 2 3 1.4 0.2 500
|
问题:如何计算在第2、3、4列的所有维度组合下,最后一列不重复的记录分别有多少条?例如,第2、3、4列有一个维度组合是(1,2,3),在这个维度维度下,最后一列有两种值:100 和 500,因此不重复的记录数为2。同理可求得其他的记录条数。
pig代码及输出结果如下:
1
2
3
4
5
6
7
8
|
grunt> A = LOAD
'c.txt'
AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double, col7:int);
grunt> B = GROUP A BY (col2, col3, col4);
grunt> C = FOREACH B {D = DISTINCT A.col7; GENERATE group, COUNT(D);};
grunt> DUMP C;
((1,2,3),2)
((1,2,5),1)
((3,0,5),1)
((7,9,9),1)
|
我们来看看每一步分别生成了什么样的数据:
①LOAD不用说了,就是加载数据;
②GROUP也不用说了,和前文所说的一样。GROUP之后得到了这样的数据:
1
2
3
4
5
|
grunt> DUMP B;
((1,2,3),{(a,1,2,3,4.2,9.8,100),(a,1,2,3,1.4,0.2,500)})
((1,2,5),{(a,1,2,5,7.7,5.9,200)})
((3,0,5),{(a,3,0,5,3.5,2.1,200)})
((7,9,9),{(b,7,9,9,,,300),(a,7,9,9,2.6,6.2,300)})
|
其实到这里,我们肉眼就可以看出来最后要求的结果是什么了,当然,必须要由pig代码来完成,要不然怎么应对海量数据?
文章来源:http://www.codelast.com/
③这里的 FOREACH 与前面有点不一样,第一次看到这种写法,肯定会觉得很奇怪。先看一下用于去重的DISTINCT关键字的说明:
Removes duplicate tuples in a relation.
然后再解释一下:FOREACH 是对B的每一行进行遍历,其中,B的每一行里含有一个包(bag),每一个包中含有若干元组(tuple)A,因此,FOREACH 后面的大括号里的操作,其实是对所谓的“内部包”(inner bag)的操作(详情请参看FOREACH的说明),在这里,我们指定了对A的col7这一列进行去重,去重的结果被命名为D,然后再对D计数(COUNT),就得到了我们想要的结果。
④输出结果数据,与前文所述的差不多。
这样就达成了我们的目的。从总体上说,刚接触pig不久的人会觉得这些写法怪怪的,就是扭不过来,但是要坚持,时间长了,连倒影也会让你觉得是正的了。
(8)如何将关系(relation)转换为标量(scalar)
在前文中,我们要统计符合某些条件的数据的条数,使用了COUNT函数来计算,但在COUNT之后,我们得到的还是一个关系(relation),而不是一个标量的数字,如何把一个关系转换为标量,从而可以在后续处理中便于使用呢?
具体请看这个链接。
(9)pig中如何使用shell进行辅助数据处理
pig中可以嵌套使用shell进行辅助处理,下面,就以一个实际的例子来说明。
假设我们在某一步pig处理后,得到了类似于下面 b.txt 中的数据:
1
2
3
4
|
[root@localhost pig]$
cat
b.txt
1 5 98 = 7
34 8 6 3 2
62 0 6 = 65
|
问题:如何将数据中第4列中的“=”符号全部替换为9999?
pig代码及输出结果如下:
1
2
3
4
5
6
|
grunt> A = LOAD
'b.txt'
AS (col1:int, col2:int, col3:int, col4:chararray, col5:int);
grunt> B = STREAM A THROUGH `
awk
'{if($4 == "=") print $1"\t"$2"\t"$3"\t9999\t"$5; else print $0}'
`;
grunt> DUMP B;
(1,5,98,9999,7)
(34,8,6,3,2)
(62,0,6,9999,65)
|
我们来看看这段代码是如何做到的:
①加载数据,这个没什么好说的。
②通过“STREAM … THROUGH …”的方式,我们可以调用一个shell语句,用该shell语句对A的每一行数据进行处理。此处的shell逻辑为:当某一行数据的第4列为“=”符号时,将其替换为“9999”;否则就照原样输出这一行。
③输出B,可见结果正确。
(10)向pig脚本中传入参数
假设你的pig脚本输出的文件是通过外部参数指定的,则此参数不能写死,需要传入。在pig中,使用传入的参数如下所示:
1
|
STORE A
INTO
'$output_dir'
;
|
则这个“output_dir”就是个传入的参数。在调用这个pig脚本的shell脚本中,我们可以这样传入参数:
1
|
pig -param output_dir=
"/home/my_ourput_dir/"
my_pig_script.pig
|
这里传入的参数“output_dir”的值为“/home/my_output_dir/”。
文章来源:http://www.codelast.com/
(11)就算是同样一段pig代码,多次计算所得的结果也有可能是不同的
例如用AVG函数来计算平均值时,同样一段pig代码,多次计算所得的结果中,小数点的最后几位也有可能是不相同的(当然也有可能相同),大概是因为精度的原因吧。不过,一般来说小数点的最后几位已经不重要了。例如我对一个数据集进行处理后,小数点后13位才开始有不同,这样的精度完全足够了。
(12)如何编写及使用自定义函数(UDF)
首先给出一个链接:Pig 0.8.1 API,还有Pig UDF Manual。这两个文档能提供很多有用的参考。
自定义函数有何用?这里以一个极其简单的例子来说明一下:
假设你有如下数据:
1
2
3
4
5
|
[root@localhost pig]$ cat a.txt
uidk 12 3
hfd 132 99
bbN 463 231
UFD 13 10
|
现在你要将第二列的值先+500,再-300,然后再÷2.6,那么我们可以这样写:
1
2
3
4
5
6
7
|
grunt> A =
LOAD
'a.txt'
AS
(col1:chararray, col2:
double
, col3:
int
);
grunt> B = FOREACH A GENERATE col1, (col2 + 500 - 300)/2.6, col3;
grunt> DUMP B;
(uidk,81.53846153846153,3)
(hfd,127.6923076923077,99)
(bbN,255.0,231)
(UFD,81.92307692307692,10)
|
我们看到,对第二列进行了 (col2 + 500 – 300)/2.6 这样的计算。麻烦不?或许这点小意思没什么。但是,如果有比这复杂得多的处理,每次你需要输入多少pig代码呢?我们希望有这样一个函数,可以让第二行pig代码简化如下:
1
|
grunt> B = FOREACH A GENERATE col1, com.codelast.MyUDF(col2), col3;
|
这样的话,对于我们经常使用的操作,岂不是很方便?
pig的UDF(user-defined function)就是拿来做这个的。
文章来源:http://www.codelast.com/
下面,就以IntelliJ这个IDE为例(其实用什么IDE倒无所谓,大同小异吧),说明我们如何实现这样一个功能。
①新建一个新工程,在工程下创建“lib”目录,然后把pig安装包中的“pig-0.8.1-core.jar”文件放置到此lib目录下,然后在“Project Structure→Libraries”下添加(点击“+”号)一个库,就命名为“lib”,然后点击右侧的“Attach Classes”按钮,选择pig-0.8.1-core.jar文件,再点击下方的“Apply”按钮应用此更改。这样做之后,你就可以在IDE的编辑器中实现输入代码时看到智能提示了。
此外,你还需要用同样的方法,将一堆Hadoop的jar包添加到工程中,包括以下文件:
1
2
3
4
5
|
hadoop-XXX-ant.jar
hadoop-XXX-core.jar
hadoop-XXX-examples.jar
hadoop-XXX-
test
.jar
hadoop-XXX-tools.jar
|
其中,XXX是版本号。
如果没有这些文件,你在编译jar包的时候会报错。
文章来源:http://www.codelast.com/
②跟我一起,在工程目录下的 src/com/coldelast/ 目录下创建Java源代码文件 MyUDF.java,其内容如下:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package
com.codelast;
import
java.io.IOException;
import
org.apache.pig.EvalFunc;
import
org.apache.pig.data.Tuple;
/**
* Author: Darran Zhang @ codelast.com
* Date: 2011-09-29
*/
public
class
MyUDF
extends
EvalFunc<Double> {
@Override
public
Double exec(Tuple input)
throws
IOException {
if
(input ==
null
|| input.size() ==
0
) {
return
null
;
}
try
{
Double val = (Double) input.get(
0
);
val = (val +
500
-
300
) /
2.6
;
return
val;
}
catch
(Exception e) {
throw
new
IOException(e.getMessage());
}
}
}
|
在上面的代码中,input.get(0)是获取UDF的第一个参数(可以向UDF传入多个参数);同理,如果你的UDF接受两个参数(例如一个求和的UDF),那么input.get(1)可以取到第二个参数。
然后编写build.xml(相当于C++里面的Makefile),用ant来编译、打包此工程——这里就不把冗长的build.xml写上来了,而且这也不是关键,没有太多意义。
文章来源:http://www.codelast.com/
③假定编译、打包得到的jar包名为cl.jar,我们到这里几乎已经完成了大部分工作。下面就看看如何在pig中调用我们刚编写的自定义函数了。
1
2
3
4
5
6
7
8
|
grunt> REGISTER cl.jar;
grunt> A =
LOAD
'a.txt'
AS
(col1:chararray, col2:
double
, col3:
int
);
grunt> B = FOREACH A GENERATE col1, com.codelast.MyUDF(col2), col3;
grunt> DUMP B;
(uidk,81.53846153846153,3)
(hfd,127.6923076923077,99)
(bbN,255.0,231)
(UFD,81.92307692307692,10)
|
注:第一句是注册你编写的UDF,使用前必须先注册。
从结果可见,我们实现了预定的效果。
UDF大有用途!
注意:对如果你的UDF返回一个标量类型(类似于我上面的例子),那么pig就可以使用反射(reflection)来识别出返回类型。如果你的UDF返回的是一个包(bag)或一个元组(tuple),并且你希望pig能理解包(bag)或元组(tuple)的内容的话,那么你就要实现outputSchema方法,否则后果很不好。具体可看这个链接的说明。
(13)什么是聚合函数(Aggregate Function)
在pig中,聚合函数就是那些接受一个输入包(bag),返回一个标量(scalar)值的函数。COUNT函数就是一个例子。
(14)COGROUP做了什么
与GROUP操作符一样,COGROUP也是用来分组的,不同的是,COGROUP可以按多个关系中的字段进行分组。
还是以一个实例来说明,假设有以下两个数据文件:
01
02
03
04
05
06
07
08
09
10
|
[root@localhost pig]$
cat
a.txt
uidk 12 3
hfd 132 99
bbN 463 231
UFD 13 10
[root@localhost pig]$
cat
b.txt
908 uidk 888
345 hfd 557
28790 re 00000
|
现在我们用pig做如下操作及得到的结果为:
1
2
3
4
5
6
7
8
9
|
grunt> A =
LOAD
'a.txt'
AS
(acol1:chararray, acol2:
int
, acol3:
int
);
grunt> B =
LOAD
'b.txt'
AS
(bcol1:
int
, bcol2:chararray, bcol3:
int
);
grunt> C = COGROUP A
BY
acol1, B
BY
bcol2;
grunt> DUMP C;
(re,{},{(28790,re,0)})
(UFD,{(UFD,13,10)},{})
(bbN,{(bbN,463,231)},{})
(hfd,{(hfd,132,99)},{(345,hfd,557)})
(uidk,{(uidk,12,3)},{(908,uidk,888)})
|
每一行输出的第一项都是分组的key,第二项和第三项分别都是一个包(bag),其中,第二项是根据前面的key找到的A中的数据包,第三项是根据前面的key找到的B中的数据包。
来看看第一行输出:“re”作为group的key时,其找不到对应的A中的数据,因此第二项就是一个空的包“{}”,“re”这个key在B中找到了对应的数据(28790 re 00000),因此第三项就是包{(28790,re,0)}。
其他输出数据也类似。
(15)安装pig后,运行pig命令时提示“Cannot find hadoop configurations in classpath”等错误的解决办法
pig安装好后,运行pig命令时提示以下错误:
ERROR org.apache.pig.Main – ERROR 4010: Cannot find hadoop configurations in classpath (neither hadoop-site.xml nor core-site.xml was found in the classpath).If you plan to use local mode, please put -x local option in command line
显而易见,提示找不到与hadoop相关的配置文件。所以我们需要把hadoop安装目录下的“conf”子目录添加到系统环境变量PATH中:
修改 /etc/profile 文件,添加:
1
2
3
4
|
export
HADOOP_HOME=
/usr/local/hadoop
export
PIG_CLASSPATH=$HADOOP_HOME
/conf
PATH=$JAVA_HOME
/bin
:$HADOOP_HOME
/bin
:$PIG_CLASSPATH:$PATH
|
然后重新加载 /etc/profile 文件:
1
|
source
/etc/profile
|
文章来源:http://www.codelast.com/
(16)piggybank是什么东西
Pig also hosts a UDF repository called piggybank that allows users to share UDFs that they have written.
说白了就是Apache把大家写的自定义函数放在一块儿,起了个名字,就叫做piggybank。你可以把它理解为一个SVN代码仓库。具体请看这里。
(17)UDF的构造函数会被调用几次
你可能会想在UDF的构造函数中做一些初始化的工作,例如创建一些文件,等等。但是你不能假设UDF的构造函数只被调用一次,因此,如果你要在构造函数中做一些只能做一次的工作,你就要当心了——可能会导致错误。