lee的个人空间

    理论指导实践,实践验证理论

    正在浏览标签为 hadoop 的文章

    根据MapReduce计算的流程,在Map阶段选择好KeyValue,然后在reduce中输出计算结果,计算UV的话,最终的结果只是一个数字

    我最开始的思路是:

    map阶段选择常数1作为key,uid作为value,在reduce阶段将map输出放置到HashSet中排重,输出hashSet的size即为正确的UV

    我运行的日志如下:

    11/08/22 16:14:54 INFO mapred.JobClient:   FileSystemCounters
    11/08/22 16:14:54 INFO mapred.JobClient:     FILE_BYTES_READ=817583842
    11/08/22 16:14:54 INFO mapred.JobClient:     HDFS_BYTES_READ=11406242768
    11/08/22 16:14:54 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1459132572
    11/08/22 16:14:54 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=9
    11/08/22 16:14:54 INFO mapred.JobClient:   Map-Reduce Framework
    11/08/22 16:14:54 INFO mapred.JobClient:     Reduce input groups=1
    11/08/22 16:14:54 INFO mapred.JobClient:     Combine output records=0
    11/08/22 16:14:54 INFO mapred.JobClient:     Map input records=5097005
    11/08/22 16:14:54 INFO mapred.JobClient:     Reduce shuffle bytes=0
    11/08/22 16:14:54 INFO mapred.JobClient:     Reduce output records=1
    11/08/22 16:14:54 INFO mapred.JobClient:     Spilled Records=17676018
    11/08/22 16:14:54 INFO mapred.JobClient:     Map output bytes=61164060
    11/08/22 16:14:54 INFO mapred.JobClient:     Combine input records=0
    11/08/22 16:14:54 INFO mapred.JobClient:     Map output records=5097005
    11/08/22 16:14:54 INFO mapred.JobClient:     Reduce input records=5097005
    从以上红色标记日志可以看出处理记录数为5097005条,但reduce分组为1,reduce输出结果也为1
    这是因为key值相同,在mapreduce过程中会将相同的key归并在一起,不难理解这种情况仅仅只有一个reduce任务
    由此造成的后果是reduce任务只会在集群其中一个节点上运行,不仅没有利用好集群处理能力,反倒因为大量数据集中而导致计算效率低下,如果出现OOM异常,那也算正常(因为将所有用户都放置到一个hashset中)
    改进:
    既然reduce任务负载不均衡是因为key单一导致,那可以打散key,把reduce任务分解成N个,分配到整个集群上计算;这就可以解决问题
    我在map阶段把uid整数化后(为什么要整数化,而不直接用字符串?这个大家都应该清楚,整数相对字符串而言,整数更节省存储容量,值比较效率更高) 对1000取模充当key,value为uid; 于是就有1000个不同的key,也将产生1000个reduce任务,这样就充分利用了集群的计算能力;
    map阶段     key: uid%1000                   value: uid
    这样就会产生0-999的key,reduce部分不需要改动,输出的结果可能如下:

    uid%1000=0       XXX

    uid%1000=1        XXX

    uid%1000=2       XXXX

    ……………………….

    uid%1000=998   XXXX

    uid%1000=999   XXXX

    上面输出的结果并非我需要的,但无碍,可以在这基础上再做mapreduce一遍,reduce任务里求和就可以了

    map代码:
    public void map(Object key, Text value, Context ctx)
    throws IOException, InterruptedException {
    try {

    String[] elements = value.toString().split(“\\|”); // 按|分隔符打散

    String userid = elements[7]; // 获取用户标识

    long uidHash = NetUtil.hash(NetUtil.computeMd5(userid), 0); // 哈希转整数

    int mapKey = (int) (uidHash % 1000);//取模将key打散

    ctx.write(new IntWritable(mapKey), new LongWritable(uidHash));

    } catch (Exception e) {
    e.printStackTrace();
    return;
    }
    }

    reduce代码

    public void reduce(IntWritable key, Iterable<LongWritable> values,
    Context ctx) throws IOException, InterruptedException {

    Set<Long> uidSet = new HashSet<Long>();

    Iterator<LongWritable> iter = values.iterator();
    while (iter.hasNext()) {
    long uid = iter.next().get();
    uidSet.add(uid);
    }

    ctx.write(key, new LongWritable(uidSet.size()));
    }

    运行后的日志如下:

    11/08/22 16:08:39 INFO mapred.JobClient:   FileSystemCounters
    11/08/22 16:08:39 INFO mapred.JobClient:     FILE_BYTES_READ=817583986
    11/08/22 16:08:39 INFO mapred.JobClient:     HDFS_BYTES_READ=11406242768
    11/08/22 16:08:39 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1459132860
    11/08/22 16:08:39 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=7890
    11/08/22 16:08:39 INFO mapred.JobClient:   Map-Reduce Framework
    11/08/22 16:08:39 INFO mapred.JobClient:     Reduce input groups=1000
    11/08/22 16:08:39 INFO mapred.JobClient:     Combine output records=0
    11/08/22 16:08:39 INFO mapred.JobClient:     Map input records=5097005
    11/08/22 16:08:39 INFO mapred.JobClient:     Reduce shuffle bytes=0
    11/08/22 16:08:39 INFO mapred.JobClient:     Reduce output records=1000
    11/08/22 16:08:39 INFO mapred.JobClient:     Spilled Records=17676018
    11/08/22 16:08:39 INFO mapred.JobClient:     Map output bytes=61164060
    11/08/22 16:08:39 INFO mapred.JobClient:     Combine input records=0
    11/08/22 16:08:39 INFO mapred.JobClient:     Map output records=5097005
    11/08/22 16:08:39 INFO mapred.JobClient:     Reduce input records=5097005
    同样的输入记录数,但可以看到以分散成了1000个reduce任务,输出结果同样是1000个

    hive特性:

    数据存储在hdfs上,依托hadoop集群实现并行计算

    采用hiveQL作为查询语言,与SQL极其相似

    Hive中存储的数据无固定格式要求,可随用户自定义

    可追加数据,但不支持更改

    可扩展性强,支持大规模并行计算

     

    hive安装
    请确保 hadoop集群处在运行状态
    当前用户环境变量中有HADOOP_HOME,如果不设置,hive没法运行

    $cd /opt/soft
    $wget  http://apache.etoak.com/hive/hive-0.7.0/hive-0.7.0-bin.tar.gz
    $tar -zxvf  hive-0.7.0-bin.tar.gz
    $mv hive-0.7.0 hive
    $cd hive/conf

    $cp hive-default.xml hive-site.xml
    $cd ../bin
    $./hive   启动hive进入命令行

    操作指南
    可见官网文档:https://cwiki.apache.org/confluence/display/Hive/GettingStarted#GettingStarted-SQLOperations

    Hive提供了很多的函数,可以在命令行下show functions罗列所有的函数
    你会发现这些函数名与mysql的很相近,绝大多数相同的,
    可通过describe function functionName 查看函数使用方法

    hive支持的数据类型很简单就int,string等原子类型, 连日期时间类型也不支持,

    但通过to_date unix_timestamp date_diff date_add date_sub等函数就能完成mysql同样的时间日期复杂操作

    分区
    hive与mysql分区有些区别,mysql分区是用表结构中的字段来分区(range,list,hash等),而hive不同,他需要手工指定分区列,这个列是独立于表结构,但属于表中一列,在加载数据时手动指定分区

    试用:
    create table mytest(id int,name string ,regtime string) PARTITIONED by (ds string) row format delimited fields terminated by ‘\t’ stored as textfile ;

    创建一个数据文件
    1       sawenlee        2011-07-28 22:23:12
    2       tonylee 2011-08-03 22:24:21
    3       nick    2011-08-02 6:2:43

    执行
    LOAD DATA LOCAL INPATH ‘../data/test.txt’ OVERWRITE INTO TABLE mytest  PARTITION (ds=’2011-08-02′);

    LOAD DATA LOCAL INPATH ‘../data/test.txt’ OVERWRITE INTO TABLE mytest  PARTITION (ds=’2011-08-03′);

    再看hive文件结构

    如果执行一个查询,hive会将其转换成map reduce在hadoop上执行(select * from mytest除外)

    select * from mytest where to_date(regtime) >to_date(’2011-8-1′);

    在两台测试服务器(redhat 5 )搭建分布式的hadoop集群,服务器 jdk,ssh,rsync等都安装好

    10.11.20.140  server1   #master   将主机名设置成server1方法:以root身份执行hostname server1即可

    10.11.20.139  server2   #slave

    第一步,新建帐户

    在server1和server2上同时做操作:

    useradd dev  ; passwd dev;

    mkdirs /opt/soft/hadoop/

    chown dev.dev /opt/soft/hadoop -R

    第二步,下载安装程序

    版本:0.20.2

    [server1] cd /opt/soft/hadoop/

    [server1] wget http://mirror.bjtu.edu.cn/apache/hadoop/common/hadoop-0.20.2/hadoop-0.20.2.tar.gz

    第三步,安装配置

    [server1]  tar -zxvf hadoop-0.20.2.tar.gz

    [server1]  mv hadoop-0.20.2 hadoop

    [server1]  cd hadoop/conf/

    [server1]  vi masters  将内容设置成server1 ,注意:这里设置的并不是namenode,而是secondaryNameNode,决定谁是namenode的配置是core-site.xml#fs.default.name

    vi slaves 将内容设置成server2

    [server1]  vi core-site.xml 修改成

    <configuration>
    <property>
    <name>fs.default.name</name>
    <value>hdfs://server1:9770</value>
    </property>
    </configuration>

    [server1] vi hdfs-site.xml 修改成

    <configuration>
    <property>
    <name>dfs.name.dir</name>
    <value>/opt/soft/hadoop/data/dfs.name.dir</value>
    </property>
    <property>
    <name>dfs.data.dir</name>
    <value>/opt/soft/hadoop/data/dfs.data.dir</value>
    </property>
    <property>
    <name>dfs.permissions</name>
    <value>false</value>
    </property>
    <property>
    <name>dfs.replication</name>
    <value>1</value>
    </property>
    </configuration>

    [server1] vi mapred-site.xml 修改成

    <configuration>
    <property>
    <name>mapred.job.tracker</name>
    <value>server1:9771</value>
    </property>
    <property>
    <name>mapred.system.dir</name>
    <value>/opt/soft/hadoop/system/mapred.system.dir</value>
    </property>
    <property>
    <name>mapred.local.dir</name>
    <value>/opt/soft/hadoop/data/mapred.local.dir</value>
    </property>
    </configuration>

    [server1] vi hadoop-env.sh 设置HADOOP_HOME  JAVA_HOME变量

    export JAVA_HOME=/usr/local/jdk

    export HADOO_HOME=/opt/soft/hadoop

    [server1] scp -r hadoop dev@server2:/opt/soft/

    值得注意的是Hadoop集群的所有机器的配置应该保持一致,一般我们在配置完master后,使用scp将配置文件同步到集群的其它服务器上

    第四步 设置机器互通(master到slave不需要密码自动登录)

    [server1]  ssh-keygen   一直按回车直到执行结束

    [server1]  cat ~/.ssh/id_rsa.pub >>authorized_keys

    [server1]  scp ~/.ssh/id_rsa.pub server2:~/.ssh/

    [server2]  cat ~/.ssh/id_rsa.pub >>authorized_keys

    如果在server1上执行ssh localhost或者ssh server2还需要密码的话,就分别在两台机器上以root身份执行chmod 755 authorized_keys

    另机器可能设置了访问限制,请确认/etc/hosts.allow  /etc/hosts.deny 中是允许localhost访问localhost的

    第五步,启动服务

    [server1] bin/hadoop namenode -format

    [server1] bin/start-all.sh

    如果报Exception in thread “IPC Client (47) connection to localhost/127.0.0.1:9000 from root” java.lang.RuntimeException: readObject can’t find class   可能是端口被占用可以查看$HADOOP_HOME/logs/下日志

    第六步,验证

    如果没有错误提示或者出现文件列表,那么恭喜你,Hadoop成功启动了,另外,我们可以通过访问http://10.11.20.140:50070来查看hdfs的状态,访问http://10.11.20.140:50030来查看map/reduce的状态。
    如果出现错误,或Hadoop集群未启动,可以查看$HADOOP_HOME/logs/下的日志文件。

     

    如果要增加更多的节点,只需在conf/slaves中新增节点的ip or name ,整个按照步骤将hadoop程序复制到节点上即可