注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

网易杭研后台技术中心的博客

 
 
 
 
 

日志

 
 

hadoop pig 使用笔录  

来自genww   2012-08-14 20:49:45|  分类: Hadoop |举报 |字号 订阅

  下载LOFTER 我的照片书  |

1. 简单说一下什么是hadoop pig

    pig是用来简化hadoop为了处理大数据时编写相应的mapreduce实例的系统。想象一下在大多数情况想我们只需要对hadoop数据进行简单的分析而得到相应的结果,需求简单明确,创建出来的mapreduce实例也不会很复杂。但是,为了实现这种不是很复杂的数据处理时,通常需要对应的mapreduce代码,然后打包发布。有的时候还需要考虑mapreduce对数据如何进行切分,如何进行合并(reduce),甚至可能还要考虑怎样作key,怎样实现相应的排序功能等。而且一旦需求稍有变动,我们又得重新做之前的所有动作,且还不能出错。
    实际上,我们可以单独写一个简单的脚本,或是几行命令,自动生成mapreduce结构的实例,然后进行mapreduce任务来实现大数据的处理,而不用关系mapreduce其他与数据关系不大的逻辑,从而减少了开发量。这就是pig要实现的功能。

2. 如何运行hadoop pig

2.1 如何在本地模式下运行pig

本地模式可以方便测试pig脚本的运行,抛开hadoop等其他环境的影响

1.进入bin
2.本地模式运行,直接:
./pig -x local
这个时候将会看到pig的shell

2.2 在mapreduce模式运行pig
将hadoop的conf目录指向PIG_CLASSPATH环境变量(或设置fs.default.name,mapred.job.tracker这两个参数到pig.properties)


3. pig shell 中常用的方法

3.1 创建一个名称为records的关系

描述:用于告诉pig要处理的数据是什么,这里为简单起见使用了本地的文件系统(文件中字段用tab分割):

grunt> records =LOAD '/home/genww/test/sample_one'
>> AS (name:chararray,year:chararray,temperature:int,quality:int);

注意:录入后,pig不会立即进行导入工作,这要在进行mapreduce时才开始,如果要查看这个关系可用DUMP records,或者查看关系结构可用describe records;

3.2 创建一个过滤对象

描述:对原数据进行过滤

filtered_records = FILTER records BY text eq 'three 1' and year != 9999;
DUMP filtered_records;

3.3. 分组

描述:按指定字段对数据进行分组操作

grouped_records =GROUP filtered_records BY text;

更深入的,把所有关系分成一个all组中这样使用:grouped = group record all;
我们想指定使用30个reducer时g可以这样:roup record by year parallel 30;

3.4. 计算

描述,当分组成功以后,即可进行计算,结合上面操作举例说明,为了获得每组的最大温度(对每组数据进行处理,并新生成一行,字段由generate确定,group,filtered_records.temperature等可通过describe grouped_records看结构获得,MAX是内置函数)可以用如下方法

max = foreach grouped_records generate group, MAX(filtered_records.temperature);

运行一下DUMP max;看看效果

使用举例2:创建一个新的关系(把字段1和字段4作为新关系的字段,有关表达式参见注释2)
re1 = foreach record generate $0,$3;
计算还可以嵌套eg:grp为组关系,这里会对每个组分别进行处理。子句中record就是grp中的一个字段。a表示从各组中取出t==1的放在字段1,同理b放在字段2.得到的结果应该和grp组数个数相同,每条记录返回满足条件的字段。若不满足则该字段返回空
grunt> stat = foreach grp {
>> a = filter record by t == 1;
>> b = filter record by t == 2;
>> generate a as aline,b as bline;
>> }



3.5 其他操作

//查看各个关联数据集
ILLUSTRATE max;

//导出为文件(注意在脚本中store不触发执行操作)
STORE max into '/home/genww/test/a' USING PigStorage(':');//导出文件,分隔符为:


//使用外部程序对关系进行变幻
STREAM
eg:c = stream red through `cut -f 2`;


//连接
JOIN
eg: c = join a by $0, b by $1;连接两个关系, 其中满足$0=$1
eg2: c = join a by $1, b by $1 using "replicated";//使用分段复制连接(关系a必须是大的关系,关系b是小的关系,且可以放在内存中,这时它把小的关系发送到所有的mapper中进行连接)
eg3: c = join a by $0 left outer, b by $1;//使用左外连接
//多个关系分组(和JOIN格式有点类似,结果是将连接匹配字段为一组来分组。每组有两个包,包1为a关系满足条件的关系,包2为b满足条件的关系,也可以使用inner和outer,inner去除包含空包的行,默认outer)
COGROUP
eg:c = COGROUP a by $0, b by $1;

//多个关系乘积(将关系各个字段按笛卡儿积方式组合成一个新的关系)
CROSS
eg: o = cross a,b

//排序
ORDER
eg: b = order a by $0,$1 desc;

//数量限定
LIMIT
eg: c = limit b 2;

//切分
SPLIT
eg:SPLIT record INTO re1 IF type==1, re2 IF type==2;

//注册jar文件
REGISTER
eg:register /home/genww/test/pig.jar

//新建别名
DEFINE
eg:define myfunc cn.com.genww.pig.MyFunc();
eg:define scripts1 `scripts1.sh` SHIP('/home/genww/scripts1.sh');//将脚本发放到集群中,供集群调用

3.6 pig shell 其他命令

pig shell有和hadoop的fs操作类似的命令,如:cat,cd,copyFormLocal,copyToLocal,cp,fs,ls,mkdir,mv,pwd,rm,rmf,kill,exec,help,quit,run等,具体可查看pig官方文档或hadoop有关fs部分的shell操作文档


4 hadoop pig 函数

4.1 内置函数

    在3.4中的例子,已经有使用了max这样的函数,起始hadoop pig为了方便数据处理,内置了很多常用的函数,下面简短做一个归纳:

计算类:SIZE取字串长度,AVG,CONCAT合并字符串,COUNT,COUNTSTAR计算包的项的个数,DIFF计算包的差,MAX,MIN,SUM,TOKENIZE对字符数组进行标记
过滤类:IsEmpty
加载/存储类:PigStorage设置分割字符,BinStorage二进制加载,BinaryStorage加载包含bytearray类型的字段,TextLoader加载纯文本,PigDump用其toString形式存储关系

4.2 自定义函数
    如果觉得内置函数不够满足我们的需求,还可以自定义函数,下面举例描述如何编写一个自定义的函数

自定义过滤类函数:

1.编写自定义函数并打jar包:
eg:
public class MyFunc extends FilterFunc{
@Override
public Boolean exec(Tuple tuple) throws IOException {
if(tuple == null || tuple.size() == 0){
return false;
}
Object obj = tuple.get(0);
if(obj == null){
return false;
}
int i = (Integer)obj;
return i == 1;
}

}
2.注册
grunt> register /home/genww/test/pig.jar 

3.使用
grunt> ret = filter record by cn.com.genww.pig.MyFunc($3);

另外,通过别名可以缩短自定义的函数名称eg:define myfunc cn.com.genww.pig.MyFunc();我们还应该告诉pig这个函数的传入参数应该是什么:这样pig将尝试转换成对应的类型,如果转换失败,则返回null
@Override
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>();
funcSpecs.add(new FuncSpec(this.getClass().getName(),new Schema(new Schema.FieldSchema(null, DataType.INTEGER))));
return funcSpecs;
}

自定义计算类函数:

说明,和上面类似,主要是在定义函数这里有区别,例子如下:

public class MyFunc extends EvalFunc<String>{
@Override
public String exec(Tuple tuple) throws IOException {
if(tuple == null || tuple.size() == 0){
return null;
}
Object obj = tuple.get(0);
if(obj == null){
return null;
}
return ((String)obj).toString().trim();
}
@Override
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>();
funcSpecs.add(new FuncSpec(this.getClass().getName(),new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY))));
return funcSpecs;
}
}

自定义加载类函数:

public class MyFunc extends LoadFunc{
private RecordReader reader;
private final TupleFactory tupleFactory = TupleFactory.getInstance();
public MyFunc(String params){
//传入参数
}
@Override
public void setLocation(String location, Job job) throws IOException {
//设置输入路径位置
FileInputFormat.setInputPaths(job, location);
}
@Override
public InputFormat getInputFormat() throws IOException {
//因底层是mapreduce,故需要告诉它文档如何分割
return new TextInputFormat();
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
//每个分片把RecordReader传递于此
this.reader = reader;
}
@Override
public Tuple getNext() throws IOException {
//负责将内容转换成Tuple对象,该方法反复调用,知道返回null,表示数据读完了
try {
if(!reader.nextKeyValue()){
return null;
}
Text val = (Text)reader.getCurrentValue();
String line = val.toString();
Tuple tuple = tupleFactory.newTuple(1);
tuple.set(1, new DataByteArray(line));
return tuple;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}
}
加载类函数的使用:record = load '...' USING cn.com.genww.MyFunc('params1') as (...);当发生类型转换时,可重载public LoadCaster getLoadCaster()来指定如何将一种类型转换成另一种类型

附录1  pig支持的数据类型

简单类型:int,long,float,double,chararray,bytearray
复杂类型:tuple,bag,map

附录2 pig shell中参数替换(注释1)

参数支持

描述:可以在pig shell中支持参数,内容在运行pig shell前定义,直接举例说明

若在脚本中有:record = load '$path/$file' ...
则可这样运行脚本:
pig -param path=... -param file=... script.pig
或者创建一个存放参数值的文件param.param,内容如下:
path=...
file=...
然后
pig -param_file /../param.param script.pig

加如下参数,可以预先查看pig对参数代换的情况
-dryrun

exec与run区别
前者在一个新的grunt外壳中运行,所定义的别名等信息在脚本结束后消失。后者在当前外壳中执行。别名的信息可获取查询。另外多查询执行仅支持exec(多查询执行:一个关系的结果可以被多个其他关系调用。其他关系调用时不会重复执行前面的关系以重新产生结果)
通过在conf目录或grunt运行的目录下添加autocomplete文件来定制自动补全的单词
-- 写注释时的前缀



附录2 支持的表达式(注释2)
//常数:1.0,'a'

//字段:$n或name eg:字段1:$0
eg1:将字段1和字段名为year作为新的关系
re1 = foreach record generate $0,year;

//投影(取复杂关系的字段):eg:record.$0 record.year等

//map:eg:items'coat'

//类型转换eg:(int)year
foreach record generate $0,year,(float)123;

//算术:eg:$1-$0,$2 % $1

//条件:eg: quality == 0 ? 0 : 1

//比较:eg:quality == 0,quality matches '[0193]',quality is not null
eg:re1 = filter record by quality == 0;

//boolean: and or not

//函数形
ret = foreach grouped generate COUNT(record);
  评论这张
 
阅读(689)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017