在您的例子中, FORACH 中有许多嵌套的 DISTRIT 运算符, 它们在缩放器中执行, 它依靠 RAM 来计算独有的值, 此查询只产生一个任务 。 如果组合中只有太多的独有元素, 您也可以得到与记忆相关的例外 。
幸运的是, PIG 拉丁文是一种数据流语言, 您会写入某种执行计划 。 为了使用更多的 CPU, 您可以修改您的代码, 从而强制执行更多可以平行执行的 MapRduce 任务 。 因为我们应该重写查询而不使用嵌套的 DISTINCT, 关键是进行不同的操作, 而不是组合, 仿佛您只有一列, 而不是合并结果 。 它非常像 SQL, 但它是有效的 。 这里是 :
records = LOAD .... USING PigStorage( , ) AS (g, a, b, c, d, fd, s, w);
selected = FOREACH records GENERATE g, a, b, c, d;
grouped_a = FOREACH selected GENERATE g, a;
grouped_a = DISTINCT grouped_a;
grouped_a_count = GROUP grouped_a BY g;
grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) as g, COUNT(grouped_a) as a_count;
grouped_b = FOREACH selected GENERATE g, b;
grouped_b = DISTINCT grouped_b;
grouped_b_count = GROUP grouped_b BY g;
grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) as g, COUNT(grouped_b) as b_count;
grouped_c = FOREACH selected GENERATE g, c;
grouped_c = DISTINCT grouped_c;
grouped_c_count = GROUP grouped_c BY g;
grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) as g, COUNT(grouped_c) as c_count;
grouped_d = FOREACH selected GENERATE g, d;
grouped_d = DISTINCT grouped_d;
grouped_d_count = GROUP grouped_d BY g;
grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) as g, COUNT(grouped_d) as d_count;
mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g;
out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count;
STORE out into .... USING PigStorage( , );
执行后,我得到了以下摘要,其中显示不同的操作没有因数据扭曲而受到影响,由第一份工作处理:
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime MinReduceTime AvgReduceTime Alias Feature Outputs
job_201206061712_0244 669 45 75 8 13 376 18 202 grouped_a,grouped_b,grouped_c,grouped_d,records,selected DISTINCT,MULTI_QUERY
job_201206061712_0245 1 1 3 3 3 12 12 12 grouped_c_count GROUP_BY,COMBINER
job_201206061712_0246 1 1 3 3 3 12 12 12 grouped_b_count GROUP_BY,COMBINER
job_201206061712_0247 5 1 48 27 33 30 30 30 grouped_a_count GROUP_BY,COMBINER
job_201206061712_0248 1 1 3 3 3 12 12 12 grouped_d_count GROUP_BY,COMBINER
job_201206061712_0249 4 1 3 3 3 12 12 12 mrg,out HASH_JOIN ...,
Input(s):
Successfully read 52215768 records (44863559501 bytes) from: "...."
Output(s):
Successfully stored 9 records (181 bytes) in: "..."
我们可以看到,从工作DAG, 集体行动是平行执行的:
Job DAG:
job_201206061712_0244 -> job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245,
job_201206061712_0248 -> job_201206061712_0249,
job_201206061712_0246 -> job_201206061712_0249,
job_201206061712_0247 -> job_201206061712_0249,
job_201206061712_0245 -> job_201206061712_0249,
job_201206061712_0249
它在我的数据集上运作良好, 组关键值之一( g列) 占数据95%。 它也会消除与内存有关的例外 。