根据MapReduce计算的流程,在Map阶段选择好KeyValue,然后在reduce中输出计算结果,计算UV的话,最终的结果只是一个数字

我最开始的思路是:

map阶段选择常数1作为key,uid作为value,在reduce阶段将map输出放置到HashSet中排重,输出hashSet的size即为正确的UV

我运行的日志如下:

11/08/22 16:14:54 INFO mapred.JobClient:   FileSystemCounters
11/08/22 16:14:54 INFO mapred.JobClient:     FILE_BYTES_READ=817583842
11/08/22 16:14:54 INFO mapred.JobClient:     HDFS_BYTES_READ=11406242768
11/08/22 16:14:54 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1459132572
11/08/22 16:14:54 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=9
11/08/22 16:14:54 INFO mapred.JobClient:   Map-Reduce Framework
11/08/22 16:14:54 INFO mapred.JobClient:     Reduce input groups=1
11/08/22 16:14:54 INFO mapred.JobClient:     Combine output records=0
11/08/22 16:14:54 INFO mapred.JobClient:     Map input records=5097005
11/08/22 16:14:54 INFO mapred.JobClient:     Reduce shuffle bytes=0
11/08/22 16:14:54 INFO mapred.JobClient:     Reduce output records=1
11/08/22 16:14:54 INFO mapred.JobClient:     Spilled Records=17676018
11/08/22 16:14:54 INFO mapred.JobClient:     Map output bytes=61164060
11/08/22 16:14:54 INFO mapred.JobClient:     Combine input records=0
11/08/22 16:14:54 INFO mapred.JobClient:     Map output records=5097005
11/08/22 16:14:54 INFO mapred.JobClient:     Reduce input records=5097005
从以上红色标记日志可以看出处理记录数为5097005条,但reduce分组为1,reduce输出结果也为1
这是因为key值相同,在mapreduce过程中会将相同的key归并在一起,不难理解这种情况仅仅只有一个reduce任务
由此造成的后果是reduce任务只会在集群其中一个节点上运行,不仅没有利用好集群处理能力,反倒因为大量数据集中而导致计算效率低下,如果出现OOM异常,那也算正常(因为将所有用户都放置到一个hashset中)
改进:
既然reduce任务负载不均衡是因为key单一导致,那可以打散key,把reduce任务分解成N个,分配到整个集群上计算;这就可以解决问题
我在map阶段把uid整数化后(为什么要整数化,而不直接用字符串?这个大家都应该清楚,整数相对字符串而言,整数更节省存储容量,值比较效率更高) 对1000取模充当key,value为uid; 于是就有1000个不同的key,也将产生1000个reduce任务,这样就充分利用了集群的计算能力;
map阶段     key: uid%1000                   value: uid
这样就会产生0-999的key,reduce部分不需要改动,输出的结果可能如下:

uid%1000=0       XXX

uid%1000=1        XXX

uid%1000=2       XXXX

……………………….

uid%1000=998   XXXX

uid%1000=999   XXXX

上面输出的结果并非我需要的,但无碍,可以在这基础上再做mapreduce一遍,reduce任务里求和就可以了

map代码:
public void map(Object key, Text value, Context ctx)
throws IOException, InterruptedException {
try {

String[] elements = value.toString().split(“\\|”); // 按|分隔符打散

String userid = elements[7]; // 获取用户标识

long uidHash = NetUtil.hash(NetUtil.computeMd5(userid), 0); // 哈希转整数

int mapKey = (int) (uidHash % 1000);//取模将key打散

ctx.write(new IntWritable(mapKey), new LongWritable(uidHash));

} catch (Exception e) {
e.printStackTrace();
return;
}
}

reduce代码

public void reduce(IntWritable key, Iterable<LongWritable> values,
Context ctx) throws IOException, InterruptedException {

Set<Long> uidSet = new HashSet<Long>();

Iterator<LongWritable> iter = values.iterator();
while (iter.hasNext()) {
long uid = iter.next().get();
uidSet.add(uid);
}

ctx.write(key, new LongWritable(uidSet.size()));
}

运行后的日志如下:

11/08/22 16:08:39 INFO mapred.JobClient:   FileSystemCounters
11/08/22 16:08:39 INFO mapred.JobClient:     FILE_BYTES_READ=817583986
11/08/22 16:08:39 INFO mapred.JobClient:     HDFS_BYTES_READ=11406242768
11/08/22 16:08:39 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1459132860
11/08/22 16:08:39 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=7890
11/08/22 16:08:39 INFO mapred.JobClient:   Map-Reduce Framework
11/08/22 16:08:39 INFO mapred.JobClient:     Reduce input groups=1000
11/08/22 16:08:39 INFO mapred.JobClient:     Combine output records=0
11/08/22 16:08:39 INFO mapred.JobClient:     Map input records=5097005
11/08/22 16:08:39 INFO mapred.JobClient:     Reduce shuffle bytes=0
11/08/22 16:08:39 INFO mapred.JobClient:     Reduce output records=1000
11/08/22 16:08:39 INFO mapred.JobClient:     Spilled Records=17676018
11/08/22 16:08:39 INFO mapred.JobClient:     Map output bytes=61164060
11/08/22 16:08:39 INFO mapred.JobClient:     Combine input records=0
11/08/22 16:08:39 INFO mapred.JobClient:     Map output records=5097005
11/08/22 16:08:39 INFO mapred.JobClient:     Reduce input records=5097005
同样的输入记录数,但可以看到以分散成了1000个reduce任务,输出结果同样是1000个