How to optimize a GROUP BY statement in Pig Latin?
  • Asked: 2012-05-24 06:45:48
  • Tags: apache-pig

I have a skewed data set, and I need to do a GROUP BY followed by a nested DISTINCT on it. Because of the skew, a few reducers take a very long time while the others finish almost immediately. I know a skewed join exists, but is there anything similar for GROUP BY and FOREACH? Here is my Pig code (with the variables renamed):

foo_grouped = GROUP foo BY FOO;
FOO_stats = FOREACH foo_grouped
{
    a_FOO_total = foo.ATTR;
    a_FOO_total = DISTINCT a_FOO_total;

    bar_count = foo.BAR;
    bar_count = DISTINCT bar_count;

    a_FOO_type1 = FILTER foo BY COND1 == 'Y';
    a_FOO_type1 = a_FOO_type1.ATTR;
    a_FOO_type1 = DISTINCT a_FOO_type1;

    a_FOO_type2 = FILTER foo BY COND2 == 'Y' OR COND3 == 'HIGH';
    a_FOO_type2 = a_FOO_type2.ATTR;
    a_FOO_type2 = DISTINCT a_FOO_type2;

    GENERATE group AS FOO,
        COUNT(a_FOO_total) AS a_FOO_total,
        COUNT(a_FOO_type1) AS a_FOO_type1,
        COUNT(a_FOO_type2) AS a_FOO_type2,
        COUNT(bar_count) AS bar_count;
}
Best answer

In your example there are many nested DISTINCT operators inside the FOREACH. They are executed in the reducer and rely on RAM to compute the unique values, and the whole query produces just one MapReduce job. If a group has too many unique elements, you can also get memory-related exceptions.

Fortunately, Pig Latin is a dataflow language, and what you write is a kind of execution plan. To utilize more CPUs, you can modify your code so that it forces more MapReduce jobs which can be executed in parallel. To do that, we should rewrite the query without the nested DISTINCTs: the trick is to do the distinct operations and then the group as if you had just one column, and then merge the results. It is very SQL-like, but it works. Here it is:

records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
selected = FOREACH records GENERATE g, a, b, c, d;
grouped_a = FOREACH selected GENERATE g, a;
grouped_a = DISTINCT grouped_a;
grouped_a_count = GROUP grouped_a BY g;
grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) as g, COUNT(grouped_a) as a_count;

grouped_b = FOREACH selected GENERATE g, b;
grouped_b = DISTINCT grouped_b;
grouped_b_count = GROUP grouped_b BY g;
grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) as g, COUNT(grouped_b) as b_count;

grouped_c = FOREACH selected GENERATE g, c;
grouped_c = DISTINCT grouped_c;
grouped_c_count = GROUP grouped_c BY g;
grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) as g, COUNT(grouped_c) as c_count;

grouped_d = FOREACH selected GENERATE g, d;
grouped_d = DISTINCT grouped_d;
grouped_d_count = GROUP grouped_d BY g;
grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) as g, COUNT(grouped_d) as d_count;

mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g;
out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count;
STORE out INTO '....' USING PigStorage(',');

After execution I got the following summary, which shows that the distinct operations did not suffer from the data skew and were processed by the first job:

Job Stats (time in seconds):
      JobId            Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201206061712_0244   669     45      75      8       13      376     18      202     grouped_a,grouped_b,grouped_c,grouped_d,records,selected        DISTINCT,MULTI_QUERY
job_201206061712_0245   1       1       3       3       3       12      12      12      grouped_c_count GROUP_BY,COMBINER
job_201206061712_0246   1       1       3       3       3       12      12      12      grouped_b_count GROUP_BY,COMBINER
job_201206061712_0247   5       1       48      27      33      30      30      30      grouped_a_count GROUP_BY,COMBINER
job_201206061712_0248   1       1       3       3       3       12      12      12      grouped_d_count GROUP_BY,COMBINER
job_201206061712_0249   4       1       3       3       3       12      12      12      mrg,out HASH_JOIN       ...,
Input(s):
Successfully read 52215768 records (44863559501 bytes) from: "...."

Output(s):
Successfully stored 9 records (181 bytes) in: "..."

And we can see from the job DAG that the group operations were executed in parallel:

Job DAG:
job_201206061712_0244   ->      job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245,
job_201206061712_0248   ->      job_201206061712_0249,
job_201206061712_0246   ->      job_201206061712_0249,
job_201206061712_0247   ->      job_201206061712_0249,
job_201206061712_0245   ->      job_201206061712_0249,
job_201206061712_0249

It worked well on my data set, where one of the group key values (in column g) makes up 95% of the data. It also gets rid of the memory-related exceptions.
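As a side note, if the reduce phase of any of these jobs still needs more parallelism, Pig's standard PARALLEL clause sets the reducer count per operator explicitly. A minimal sketch for one of the columns above (the value 20 is just a placeholder, not a recommendation):

```
-- Sketch: force explicit reducer counts on the DISTINCT and GROUP
-- operators; repeat for the b, c and d branches as needed.
grouped_a = DISTINCT grouped_a PARALLEL 20;
grouped_a_count = GROUP grouped_a BY g PARALLEL 20;
```

PARALLEL only affects reduce-side parallelism; the number of map tasks is still driven by the input splits.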

Other answers

I recently ran into a bug with the join in this approach: if the group key contains any nulls, the whole row gets dropped.
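A possible guard, sketched against the relation names from the accepted answer (untested): null keys never match in a Pig inner join, so filtering them out of each branch before the merge at least makes the behavior explicit instead of silently dropping rows at the JOIN.

```
-- Sketch: drop null group keys from each branch before the 4-way join,
-- so the inner JOIN's null semantics cannot silently lose rows.
grouped_a_count = FILTER grouped_a_count BY g IS NOT NULL;
grouped_b_count = FILTER grouped_b_count BY g IS NOT NULL;
grouped_c_count = FILTER grouped_c_count BY g IS NOT NULL;
grouped_d_count = FILTER grouped_d_count BY g IS NOT NULL;
mrg = JOIN grouped_a_count BY g, grouped_b_count BY g,
           grouped_c_count BY g, grouped_d_count BY g;
```

If the null group itself needs to be counted, it would have to be split off with a FILTER ... BY g IS NULL branch and handled separately, since it cannot survive the join.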




