Apache Pig的一些基础概念及用法总结2(转)

时间:2023-01-30 23:28:39

(7)在多维度组合下,如何计算某个维度组合里的不重复记录的条数
以数据文件 c.txt 为例:

1
2
3
4
5
6
7
[root@localhost pig]$
cat c.txt
a 1 2 3 4.2 9.8 100
a 3 0 5 3.5 2.1 200
b 7 9 9 - - 300
a 7 9 9 2.6 6.2 300
a 1 2 5 7.7 5.9 200
a 1 2 3 1.4 0.2 500

问题:如何计算在第2、3、4列的所有维度组合下,最后一列不重复的记录分别有多少条?例如,第2、3、4列有一个维度组合是(1,2,3),在这个维度维度下,最后一列有两种值:100 和 500,因此不重复的记录数为2。同理可求得其他的记录条数。
pig代码及输出结果如下:

1
2
3
4
5
6
7
8
grunt> A = LOAD 'c.txt' AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double, col7:int);
grunt> B = GROUP A BY (col2, col3, col4);
grunt> C = FOREACH B {D = DISTINCT A.col7; GENERATE group, COUNT(D);};
grunt> DUMP C;
((1,2,3),2)
((1,2,5),1)
((3,0,5),1)
((7,9,9),1)

我们来看看每一步分别生成了什么样的数据:
LOAD不用说了,就是加载数据;
GROUP也不用说了,和前文所说的一样。GROUP之后得到了这样的数据:

1
2
3
4
5
grunt> DUMP B;
((1,2,3),{(a,1,2,3,4.2,9.8,100),(a,1,2,3,1.4,0.2,500)})
((1,2,5),{(a,1,2,5,7.7,5.9,200)})
((3,0,5),{(a,3,0,5,3.5,2.1,200)})
((7,9,9),{(b,7,9,9,,,300),(a,7,9,9,2.6,6.2,300)})

其实到这里,我们肉眼就可以看出来最后要求的结果是什么了,当然,必须要由pig代码来完成,要不然怎么应对海量数据?
文章来源:http://www.codelast.com/
这里的 FOREACH 与前面有点不一样,第一次看到这种写法,肯定会觉得很奇怪。先看一下用于去重DISTINCT关键字的说明:

Removes duplicate tuples in a relation.

然后再解释一下:FOREACH 是对B的每一行进行遍历,其中,B的每一行里含有一个包(bag),每一个包中含有若干元组(tuple)A,因此,FOREACH 后面的大括号里的操作,其实是对所谓的“内部包”(inner bag)的操作(详情请参看FOREACH的说明),在这里,我们指定了对A的col7这一列进行去重,去重的结果被命名为D,然后再对D计数(COUNT),就得到了我们想要的结果。
输出结果数据,与前文所述的差不多。
这样就达成了我们的目的。从总体上说,刚接触pig不久的人会觉得这些写法怪怪的,就是扭不过来,但是要坚持,时间长了,连倒影也会让你觉得是正的了。

(8)如何将关系(relation)转换为标量(scalar)
在前文中,我们要统计符合某些条件的数据的条数,使用了COUNT函数来计算,但在COUNT之后,我们得到的还是一个关系(relation),而不是一个标量的数字,如何把一个关系转换为标量,从而可以在后续处理中便于使用呢?
具体请看这个链接

(9)pig中如何使用shell进行辅助数据处理
pig中可以嵌套使用shell进行辅助处理,下面,就以一个实际的例子来说明。
假设我们在某一步pig处理后,得到了类似于下面 b.txt 中的数据:

1
2
3
4
[root@localhost pig]$
cat b.txt
1 5 98  = 7
34  8 6 3 2
62  0 6 = 65

问题:如何将数据中第4列中的“=”符号全部替换为9999?
pig代码及输出结果如下:

1
2
3
4
5
6
grunt> A = LOAD 'b.txt' AS (col1:int, col2:int, col3:int, col4:chararray, col5:int);
grunt> B = STREAM A THROUGH ` awk '{if($4 == "=") print $1"\t"$2"\t"$3"\t9999\t"$5; else print $0}' `;
grunt> DUMP B;
(1,5,98,9999,7)
(34,8,6,3,2)
(62,0,6,9999,65)

我们来看看这段代码是如何做到的:
加载数据,这个没什么好说的。
通过“STREAM … THROUGH …”的方式,我们可以调用一个shell语句,用该shell语句对A的每一行数据进行处理。此处的shell逻辑为:当某一行数据的第4列为“=”符号时,将其替换为“9999”;否则就照原样输出这一行。
输出B,可见结果正确。

(10)向pig脚本中传入参数
假设你的pig脚本输出的文件是通过外部参数指定的,则此参数不能写死,需要传入。在pig中,使用传入的参数如下所示:

1
STORE A INTO '$output_dir' ;

则这个“output_dir”就是个传入的参数。在调用这个pig脚本的shell脚本中,我们可以这样传入参数:

1
pig -param output_dir= "/home/my_ourput_dir/" my_pig_script.pig

这里传入的参数“output_dir”的值为“/home/my_output_dir/”。
文章来源:http://www.codelast.com/
(11)就算是同样一段pig代码,多次计算所得的结果也有可能是不同的
例如用AVG函数来计算平均值时,同样一段pig代码,多次计算所得的结果中,小数点的最后几位也有可能是不相同的(当然也有可能相同),大概是因为精度的原因吧。不过,一般来说小数点的最后几位已经不重要了。例如我对一个数据集进行处理后,小数点后13位才开始有不同,这样的精度完全足够了。

(12)如何编写及使用自定义函数(UDF)
首先给出一个链接:Pig 0.8.1 API,还有Pig UDF Manual。这两个文档能提供很多有用的参考。
自定义函数有何用?这里以一个极其简单的例子来说明一下:
假设你有如下数据:

1
2
3
4
5
[root@localhost pig]$ cat a.txt
uidk  12  3
hfd 132 99
bbN 463 231
UFD 13  10

现在你要将第二列的值先+500,再-300,然后再÷2.6,那么我们可以这样写:

1
2
3
4
5
6
7
grunt> A = LOAD 'a.txt' AS (col1:chararray, col2: double , col3: int );
grunt> B = FOREACH A GENERATE col1, (col2 + 500 - 300)/2.6, col3;
grunt> DUMP B;
(uidk,81.53846153846153,3)
(hfd,127.6923076923077,99)
(bbN,255.0,231)
(UFD,81.92307692307692,10)

我们看到,对第二列进行了 (col2 + 500 – 300)/2.6 这样的计算。麻烦不?或许这点小意思没什么。但是,如果有比这复杂得多的处理,每次你需要输入多少pig代码呢?我们希望有这样一个函数,可以让第二行pig代码简化如下:

1
grunt> B = FOREACH A GENERATE col1, com.codelast.MyUDF(col2), col3;

这样的话,对于我们经常使用的操作,岂不是很方便?
pig的UDF(user-defined function)就是拿来做这个的。
文章来源:http://www.codelast.com/
下面,就以IntelliJ这个IDE为例(其实用什么IDE倒无所谓,大同小异吧),说明我们如何实现这样一个功能。
新建一个新工程,在工程下创建“lib”目录,然后把pig安装包中的“pig-0.8.1-core.jar”文件放置到此lib目录下,然后在“Project Structure→Libraries”下添加(点击“+”号)一个库,就命名为“lib”,然后点击右侧的“Attach Classes”按钮,选择pig-0.8.1-core.jar文件,再点击下方的“Apply”按钮应用此更改。这样做之后,你就可以在IDE的编辑器中实现输入代码时看到智能提示了。
此外,你还需要用同样的方法,将一堆Hadoop的jar包添加到工程中,包括以下文件:

1
2
3
4
5
hadoop-XXX-ant.jar
hadoop-XXX-core.jar
hadoop-XXX-examples.jar
hadoop-XXX- test .jar
hadoop-XXX-tools.jar

其中,XXX是版本号。
如果没有这些文件,你在编译jar包的时候会报错。
文章来源:http://www.codelast.com/
跟我一起,在工程目录下的 src/com/coldelast/ 目录下创建Java源代码文件 MyUDF.java,其内容如下:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.codelast;
 
import java.io.IOException;
 
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
 
/**
  * Author: Darran Zhang @ codelast.com
  * Date: 2011-09-29
  */
 
public class MyUDF extends EvalFunc<Double> {
 
   @Override
   public Double exec(Tuple input) throws IOException {
     if (input == null || input.size() == 0 ) {
       return null ;
     }
 
     try {
       Double val = (Double) input.get( 0 );
       val = (val + 500 - 300 ) / 2.6 ;
       return val;
     } catch (Exception e) {
       throw new IOException(e.getMessage());
     }
   }
}

在上面的代码中,input.get(0)是获取UDF的第一个参数(可以向UDF传入多个参数);同理,如果你的UDF接受两个参数(例如一个求和的UDF),那么input.get(1)可以取到第二个参数。
然后编写build.xml(相当于C++里面的Makefile),用ant来编译、打包此工程——这里就不把冗长的build.xml写上来了,而且这也不是关键,没有太多意义。
文章来源:http://www.codelast.com/
假定编译、打包得到的jar包名为cl.jar,我们到这里几乎已经完成了大部分工作。下面就看看如何在pig中调用我们刚编写的自定义函数了。

1
2
3
4
5
6
7
8
grunt> REGISTER cl.jar;
grunt> A = LOAD 'a.txt' AS (col1:chararray, col2: double , col3: int );
grunt> B = FOREACH A GENERATE col1, com.codelast.MyUDF(col2), col3;
grunt> DUMP B;
(uidk,81.53846153846153,3)
(hfd,127.6923076923077,99)
(bbN,255.0,231)
(UFD,81.92307692307692,10)

注:第一句是注册你编写的UDF,使用前必须先注册。
从结果可见,我们实现了预定的效果。
UDF大有用途!
注意:对如果你的UDF返回一个标量类型(类似于我上面的例子),那么pig就可以使用反射(reflection)来识别出返回类型。如果你的UDF返回的是一个包(bag)或一个元组(tuple),并且你希望pig能理解包(bag)或元组(tuple)的内容的话,那么你就要实现outputSchema方法,否则后果很不好。具体可看这个链接的说明。

(13)什么是聚合函数(Aggregate Function)
在pig中,聚合函数就是那些接受一个输入包(bag),返回一个标量(scalar)值的函数。COUNT函数就是一个例子。

(14)COGROUP做了什么
与GROUP操作符一样,COGROUP也是用来分组的,不同的是,COGROUP可以按多个关系中的字段进行分组。
还是以一个实例来说明,假设有以下两个数据文件:

01
02
03
04
05
06
07
08
09
10
[root@localhost pig]$
cat a.txt
uidk  12  3
hfd 132 99
bbN 463 231
UFD 13  10
 
[root@localhost pig]$
cat b.txt
908 uidk  888
345 hfd 557
28790 re  00000

现在我们用pig做如下操作及得到的结果为:

1
2
3
4
5
6
7
8
9
grunt> A = LOAD 'a.txt' AS (acol1:chararray, acol2: int , acol3: int );
grunt> B = LOAD 'b.txt' AS (bcol1: int , bcol2:chararray, bcol3: int );
grunt> C = COGROUP A
BY acol1, B BY bcol2;
grunt> DUMP C;
(re,{},{(28790,re,0)})
(UFD,{(UFD,13,10)},{})
(bbN,{(bbN,463,231)},{})
(hfd,{(hfd,132,99)},{(345,hfd,557)})
(uidk,{(uidk,12,3)},{(908,uidk,888)})

每一行输出的第一项都是分组的key,第二项和第三项分别都是一个包(bag),其中,第二项是根据前面的key找到的A中的数据包,第三项是根据前面的key找到的B中的数据包。
来看看第一行输出:“re”作为group的key时,其找不到对应的A中的数据,因此第二项就是一个空的包“{}”,“re”这个key在B中找到了对应的数据(28790    re    00000),因此第三项就是包{(28790,re,0)}。
其他输出数据也类似。

(15)安装pig后,运行pig命令时提示“Cannot find hadoop configurations in classpath”等错误的解决办法
pig安装好后,运行pig命令时提示以下错误:

ERROR org.apache.pig.Main – ERROR 4010: Cannot find hadoop configurations in classpath (neither hadoop-site.xml nor core-site.xml was found in the classpath).If you plan to use local mode, please put -x local option in command line

显而易见,提示找不到与hadoop相关的配置文件。所以我们需要把hadoop安装目录下的“conf”子目录添加到系统环境变量PATH中:
修改 /etc/profile 文件,添加:

1
2
3
4
export HADOOP_HOME= /usr/local/hadoop
export PIG_CLASSPATH=$HADOOP_HOME /conf
 
PATH=$JAVA_HOME /bin :$HADOOP_HOME /bin :$PIG_CLASSPATH:$PATH

然后重新加载 /etc/profile 文件:

1
source /etc/profile

文章来源:http://www.codelast.com/
(16)piggybank是什么东西

Pig also hosts a UDF repository called piggybank that allows users to share UDFs that they have written.

说白了就是Apache把大家写的自定义函数放在一块儿,起了个名字,就叫做piggybank。你可以把它理解为一个SVN代码仓库。具体请看这里

(17)UDF的构造函数会被调用几次
你可能会想在UDF的构造函数中做一些初始化的工作,例如创建一些文件,等等。但是你不能假设UDF的构造函数只被调用一次,因此,如果你要在构造函数中做一些只能做一次的工作,你就要当心了——可能会导致错误。

转载:http://www.codelast.com/