lee的个人空间

    理论指导实践,实践验证理论

    正在浏览 Java 里的文章

    根据MapReduce计算的流程,在Map阶段选择好KeyValue,然后在reduce中输出计算结果,计算UV的话,最终的结果只是一个数字

    我最开始的思路是:

    map阶段选择常数1作为key,uid作为value,在reduce阶段将map输出放置到HashSet中排重,输出hashSet的size即为正确的UV

    我运行的日志如下:

    11/08/22 16:14:54 INFO mapred.JobClient:   FileSystemCounters
    11/08/22 16:14:54 INFO mapred.JobClient:     FILE_BYTES_READ=817583842
    11/08/22 16:14:54 INFO mapred.JobClient:     HDFS_BYTES_READ=11406242768
    11/08/22 16:14:54 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1459132572
    11/08/22 16:14:54 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=9
    11/08/22 16:14:54 INFO mapred.JobClient:   Map-Reduce Framework
    11/08/22 16:14:54 INFO mapred.JobClient:     Reduce input groups=1
    11/08/22 16:14:54 INFO mapred.JobClient:     Combine output records=0
    11/08/22 16:14:54 INFO mapred.JobClient:     Map input records=5097005
    11/08/22 16:14:54 INFO mapred.JobClient:     Reduce shuffle bytes=0
    11/08/22 16:14:54 INFO mapred.JobClient:     Reduce output records=1
    11/08/22 16:14:54 INFO mapred.JobClient:     Spilled Records=17676018
    11/08/22 16:14:54 INFO mapred.JobClient:     Map output bytes=61164060
    11/08/22 16:14:54 INFO mapred.JobClient:     Combine input records=0
    11/08/22 16:14:54 INFO mapred.JobClient:     Map output records=5097005
    11/08/22 16:14:54 INFO mapred.JobClient:     Reduce input records=5097005
    从以上红色标记日志可以看出处理记录数为5097005条,但reduce分组为1,reduce输出结果也为1
    这是因为key值相同,在mapreduce过程中会将相同的key归并在一起,不难理解这种情况仅仅只有一个reduce任务
    由此造成的后果是reduce任务只会在集群其中一个节点上运行,不仅没有利用好集群处理能力,反倒因为大量数据集中而导致计算效率低下,如果出现OOM异常,那也算正常(因为将所有用户都放置到一个hashset中)
    改进:
    既然reduce任务负载不均衡是因为key单一导致,那可以打散key,把reduce任务分解成N个,分配到整个集群上计算;这就可以解决问题
    我在map阶段把uid整数化后(为什么要整数化,而不直接用字符串?这个大家都应该清楚,整数相对字符串而言,整数更节省存储容量,值比较效率更高) 对1000取模充当key,value为uid; 于是就有1000个不同的key,也将产生1000个reduce任务,这样就充分利用了集群的计算能力;
    map阶段     key: uid%1000                   value: uid
    这样就会产生0-999的key,reduce部分不需要改动,输出的结果可能如下:

    uid%1000=0       XXX

    uid%1000=1        XXX

    uid%1000=2       XXXX

    ……………………….

    uid%1000=998   XXXX

    uid%1000=999   XXXX

    上面输出的结果并非我需要的,但无碍,可以在这基础上再做mapreduce一遍,reduce任务里求和就可以了

    map代码:
    public void map(Object key, Text value, Context ctx)
    throws IOException, InterruptedException {
    try {

    String[] elements = value.toString().split(“\\|”); // 按|分隔符打散

    String userid = elements[7]; // 获取用户标识

    long uidHash = NetUtil.hash(NetUtil.computeMd5(userid), 0); // 哈希转整数

    int mapKey = (int) (uidHash % 1000);//取模将key打散

    ctx.write(new IntWritable(mapKey), new LongWritable(uidHash));

    } catch (Exception e) {
    e.printStackTrace();
    return;
    }
    }

    reduce代码

    public void reduce(IntWritable key, Iterable<LongWritable> values,
    Context ctx) throws IOException, InterruptedException {

    Set<Long> uidSet = new HashSet<Long>();

    Iterator<LongWritable> iter = values.iterator();
    while (iter.hasNext()) {
    long uid = iter.next().get();
    uidSet.add(uid);
    }

    ctx.write(key, new LongWritable(uidSet.size()));
    }

    运行后的日志如下:

    11/08/22 16:08:39 INFO mapred.JobClient:   FileSystemCounters
    11/08/22 16:08:39 INFO mapred.JobClient:     FILE_BYTES_READ=817583986
    11/08/22 16:08:39 INFO mapred.JobClient:     HDFS_BYTES_READ=11406242768
    11/08/22 16:08:39 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1459132860
    11/08/22 16:08:39 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=7890
    11/08/22 16:08:39 INFO mapred.JobClient:   Map-Reduce Framework
    11/08/22 16:08:39 INFO mapred.JobClient:     Reduce input groups=1000
    11/08/22 16:08:39 INFO mapred.JobClient:     Combine output records=0
    11/08/22 16:08:39 INFO mapred.JobClient:     Map input records=5097005
    11/08/22 16:08:39 INFO mapred.JobClient:     Reduce shuffle bytes=0
    11/08/22 16:08:39 INFO mapred.JobClient:     Reduce output records=1000
    11/08/22 16:08:39 INFO mapred.JobClient:     Spilled Records=17676018
    11/08/22 16:08:39 INFO mapred.JobClient:     Map output bytes=61164060
    11/08/22 16:08:39 INFO mapred.JobClient:     Combine input records=0
    11/08/22 16:08:39 INFO mapred.JobClient:     Map output records=5097005
    11/08/22 16:08:39 INFO mapred.JobClient:     Reduce input records=5097005
    同样的输入记录数,但可以看到以分散成了1000个reduce任务,输出结果同样是1000个

    最近自己写的nio程序持续报错(client-server端借助mina实现)

    DefaultExceptionMonitor.exceptionCaught(45) 2011-09-02 09:40:57 | Unexpected exception.
    java.nio.channels.ClosedSelectorException
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:66)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
    at org.apache.mina.transport.socket.nio.NioProcessor.select(NioProcessor.java:69)
    at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:961)
    at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
    at java.lang.Thread.run(Thread.java:595)

    通过搜索引擎没有查出问题,但可能的原因是线程阻塞在Selector.select()时,调用了wakeup方法,而此时selector已经关闭

    与淘宝大牛 @dennis zhuang 对这个问题做了一次交流,他与我一致认为问题不在mina上,而在于使用错误,导致mina关闭了selector。

    因client每发送一次消息都会报一次异常,Dennis 以此推断是我用短连接但没有复用NioSocketConnector造成的,client端发送完消息关闭NioSocketConnector就可能报ClosedSelectorException问题,这涉及mina的内部实现:

    当client端关闭connector,mina要关闭NioProcessor线程,此时线程正阻塞在select上,所以先关闭selector,然后唤醒它,mina就会提示selector已经被关闭

    此外创建NioSocketConnector代价太大,所以即使用短连接也得考虑复用它,而不是每通信完一次,就将其关闭,这样就可以解决异常问题(复用connector不关闭它),同时还能提高性能。当然最好的是用mina构建client-server的长连接,实现重连机制,通信稳定且性能更优!

    项目地址:http://code.google.com/p/newrsslib4j/

    源码下载:svn checkout http://newrsslib4j.googlecode.com/svn/trunk/ newrsslib4j-read-only

    下载jar包:   http://code.google.com/p/newrsslib4j/downloads/list

    描述:在rsslib4j源码基础上,保持外接口不变,添加了rss字节流编码探测及内部编码转换,理论上支持所有的编码,组件使用较原版更健壮、更稳定


    change log

    #1,引入了mozilla的jchardet包,作为组件一部分,用于探测rss字节流编码,新增了一辅助类Chaset.java 提供了几个静态编码探测方法:
    String guess(URL url);
    String guess(String path)
    String guess(InputStream in)


    #2,引入UnicodeReader替换原有的InputStreamReader,不用InputStreamReader的 原因在于某些UTF-8编码开始会带有BOM (Byte Order Mark) ;导致解析xml失败:Content is not allowed in prolog. 这是jdk1.5的bug,1.6已修复;在sun的buglist里发现了这个问题,并找到了UnicodeReader


    #3,改造RSSParser解析类,新加一个类属性 private InputSource is; 替换原有的InputStream,因原有没有考虑国际化编码问题,而InputSource可以Reader构造产生,改造完如下:
    /**
    * Set rss resource by URL
    * @param ur the remote url
    * @throws RSSException
    */
    public void setXmlResource(URL ur) throws RSSException{
    try{

    URLConnection con = u.openConnection();
    con.setReadTimeout(10000);
    String charset = Charset.guess(ur);
    is = new InputSource (new UnicodeReader(con.getInputStream(),charset));
    if (con.getContentLength() == -1 && is == null){
    this.fixZeroLength();
    }
    }catch(IOException e){
    throw new RSSException(“RSSParser::setXmlResource fails: “+e.getMessage());
    }
    }


    user guide
    代码片段:
    URL url = new URL(“http://cn.engadget.com/rss.xml”);
    RSSHandler handler = new RSSHandler();   RSSParser.parseXmlFile(url, handler, false);
    RSSChannel ch = handler.getRSSChannel();
    System.out.println(ch.toString());
    List<RSSItem> lst = handler.getRSSChannel().getItems();
    for (int j = 0; j < lst.size(); j++) {
    RSSItem itm = lst.get(j);
    System.out.println(itm.toString());
    }

    准备

    1. jdk1.6及以上
    2. cygwin
    3. eclipse3.X
    4. hadoop-0.20.2.tar.gz
    5. 参见hadoop分布式集群搭建一文

    cygwin安装
    进入cygwin首页,点击setup.exe
    在线安装,选择一个镜像点直接下一步;如果安装不成功,则换一个镜像

    默认的cygwin安装是不带openssh的以及vi工具的;前者是hadoop环境必备的,后者则用来编辑配置文件

    可以重新再点击setup.exe,进入到软件列表里,在查询框分别输入openssh 、vi
    点击条目变成install,而非skip ,下一步即可实现在线安装


    配置sshd
    $ ssh-keygen   一直按回车直到执行结束

    $ cat ~/.ssh/id_rsa.pub >>authorized_keys
    执行ssh localhost还需要密码的话,就分别执行chmod 755 authorized_keys


    安装hadoop
    将hadoop-0.20.2.tar.gz拷贝到CYGWIN_HOME/home/USER_NAME/下
    $ tar -zxvf hadoop-0.20.2.tar.gz
    $ mv hadoop-0.20.2 hadoop
    $ cd hadoop/conf
    $ vi hadoop-env.sh  设置JAVA_HOME 如果不设置则无法启动hadoop

    $ vi core-site.xml
    内容修改为:
    <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9770</value>
    </property>

    $ vi mapred-site.xml    我的cygwin_home是c:\cygwin 用户名是sawenlee
    内容修改为
    <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9771</value>
    </property>
    <property>
    <name>mapred.system.dir</name>
    <value>C:\cygwin\home\sawenlee\hadoop\system\mapred.system.dir</value>
    </property>
    <property>
    <name>mapred.local.dir</name>
    <value>C:\cygwin\home\sawenlee\hadoop\data\mapred.local.dir</value>
    </property>
    <property>
    <name>mapred.child.tmp</name>
    <value>C:\cygwin\home\sawenlee\hadoop\data\temp</value>
    </property>

    $ vi hdfs-site.xml
    内容修改为
    <property>
    <name>dfs.name.dir</name>
    <value>C:\cygwin\home\sawenlee\hadoop\data\dfs.name.dir</value>
    </property>
    <property>
    <name>dfs.data.dir</name>
    <value>C:\cygwin\home\sawenlee\hadoop\data\dfs.data.dir</value>
    </property>
    <property>
    <name>dfs.permissions</name>
    <value>false</value>
    </property>
    <property>
    <name>dfs.replication</name>
    <value>1</value>
    </property>


    启动hadoop服务
    $ bin/hadoop namenode -format  #格式化namenode
    $ bin/start-all.sh  启动namenode datanode jobtracker tasktracker等


    配置eclipse插件
    将hadoop目录下的contrib/eclipse-plugin.jar拷贝到eclipse安装目录的plugins下
    重新启动eclipse即可看到右上角视图中多了一个map/reduce视角

    配置hadoop_home
    window–>preferences–>map/reduce设置hadoop安装路径,必须与服务配置一致,不然会产生意料不到的问题

    配置hdfs location
    在map/reduce视图下,new hadoop location…
    如下图配置: 注意第一个端口是在core-site.xml中配置的,第二个是在mapred-site.xml配置的,根据实际情况填写

    如果配置成功可以直接浏览到hdfs上的文件,如下图


    运行本地测试程序
    new project—>map/reduce project —>new class

    public class HdfsCat {

    /**
    * @param args
    * @throws IOException
    */
    public static void main(String[] args) throws IOException {
    String uri = “hdfs://localhost:9770/tmp/wordcount/test.txt”;
    Configuration cnf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri),cnf);
    InputStream in = null;
    try {
    in = fs.open(new Path(uri));
    IOUtils.copyBytes(in, System.out, 4096,false);
    }finally{
    IOUtils.closeStream(in);
    }
    }
    }

    直接run,如果没有出错,在控制台看到文件内容了,那么恭喜你,本地调试环境搭建成功了

    在两台测试服务器(redhat 5 )搭建分布式的hadoop集群,服务器 jdk,ssh,rsync等都安装好

    10.11.20.140  server1   #master   将主机名设置成server1方法:以root身份执行hostname server1即可

    10.11.20.139  server2   #slave

    第一步,新建帐户

    在server1和server2上同时做操作:

    useradd dev  ; passwd dev;

    mkdirs /opt/soft/hadoop/

    chown dev.dev /opt/soft/hadoop -R

    第二步,下载安装程序

    版本:0.20.2

    [server1] cd /opt/soft/hadoop/

    [server1] wget http://mirror.bjtu.edu.cn/apache/hadoop/common/hadoop-0.20.2/hadoop-0.20.2.tar.gz

    第三步,安装配置

    [server1]  tar -zxvf hadoop-0.20.2.tar.gz

    [server1]  mv hadoop-0.20.2 hadoop

    [server1]  cd hadoop/conf/

    [server1]  vi masters  将内容设置成server1 ,注意:这里设置的并不是namenode,而是secondaryNameNode,决定谁是namenode的配置是core-site.xml#fs.default.name

    vi slaves 将内容设置成server2

    [server1]  vi core-site.xml 修改成

    <configuration>
    <property>
    <name>fs.default.name</name>
    <value>hdfs://server1:9770</value>
    </property>
    </configuration>

    [server1] vi hdfs-site.xml 修改成

    <configuration>
    <property>
    <name>dfs.name.dir</name>
    <value>/opt/soft/hadoop/data/dfs.name.dir</value>
    </property>
    <property>
    <name>dfs.data.dir</name>
    <value>/opt/soft/hadoop/data/dfs.data.dir</value>
    </property>
    <property>
    <name>dfs.permissions</name>
    <value>false</value>
    </property>
    <property>
    <name>dfs.replication</name>
    <value>1</value>
    </property>
    </configuration>

    [server1] vi mapred-site.xml 修改成

    <configuration>
    <property>
    <name>mapred.job.tracker</name>
    <value>server1:9771</value>
    </property>
    <property>
    <name>mapred.system.dir</name>
    <value>/opt/soft/hadoop/system/mapred.system.dir</value>
    </property>
    <property>
    <name>mapred.local.dir</name>
    <value>/opt/soft/hadoop/data/mapred.local.dir</value>
    </property>
    </configuration>

    [server1] vi hadoop-env.sh 设置HADOOP_HOME  JAVA_HOME变量

    export JAVA_HOME=/usr/local/jdk

    export HADOO_HOME=/opt/soft/hadoop

    [server1] scp -r hadoop dev@server2:/opt/soft/

    值得注意的是Hadoop集群的所有机器的配置应该保持一致,一般我们在配置完master后,使用scp将配置文件同步到集群的其它服务器上

    第四步 设置机器互通(master到slave不需要密码自动登录)

    [server1]  ssh-keygen   一直按回车直到执行结束

    [server1]  cat ~/.ssh/id_rsa.pub >>authorized_keys

    [server1]  scp ~/.ssh/id_rsa.pub server2:~/.ssh/

    [server2]  cat ~/.ssh/id_rsa.pub >>authorized_keys

    如果在server1上执行ssh localhost或者ssh server2还需要密码的话,就分别在两台机器上以root身份执行chmod 755 authorized_keys

    另机器可能设置了访问限制,请确认/etc/hosts.allow  /etc/hosts.deny 中是允许localhost访问localhost的

    第五步,启动服务

    [server1] bin/hadoop namenode -format

    [server1] bin/start-all.sh

    如果报Exception in thread “IPC Client (47) connection to localhost/127.0.0.1:9000 from root” java.lang.RuntimeException: readObject can’t find class   可能是端口被占用可以查看$HADOOP_HOME/logs/下日志

    第六步,验证

    如果没有错误提示或者出现文件列表,那么恭喜你,Hadoop成功启动了,另外,我们可以通过访问http://10.11.20.140:50070来查看hdfs的状态,访问http://10.11.20.140:50030来查看map/reduce的状态。
    如果出现错误,或Hadoop集群未启动,可以查看$HADOOP_HOME/logs/下的日志文件。

     

    如果要增加更多的节点,只需在conf/slaves中新增节点的ip or name ,整个按照步骤将hadoop程序复制到节点上即可

     

    关于XStream

    XStream是一个将对象序列化成XML同样可从XML还原对象的简单类库

    特性.

    • 简单易用. 提供了高级封装来简化用户使用
    • 不需要任何映射 在不需要映射的情况下, 大多数对象可以序列化成xml
    • 性能好,消耗内存低
    • 生成的xml方便阅读,格式紧凑
    • 不需要对对象修改,就可以序列化内部字段,包括private & final。支持非public类和内部类,并且不需要有默认构造器
    • 定制化转换策略

    应用场景

    • 协议传输
    • 持久化
    • 配置
    • 单元测试

    已知的局限性

    Known Limitations
    If using the enhanced mode, XStream can re-instantiate classes that do not have a default constructor. However, if using a different JVM like an old JRockit version, a JDK 1.3 or you have restrictions because of a SecurityManager, a default constructor is required.
    The enhanced mode is also necessary to restore final fields for any JDK < 1.5. This implies deserialization of instances of an inner class.
    Auto-detection of annotations may cause race conditions. Preprocessing annotations is safe though

    入门资料参见: http://xstream.codehaus.org/tutorial.html

    我们做一个稍微复杂的例子,Object包含类属性,List成员

    Object2XML

    操作的对象如下:

    public class Member {
    private String name;
    private int age;
    //getters and setters are omitted

    public class Team {

    private String name;

    private List<Member> members;

    private Member leader;
    //getters and setters are omitted
    }

    我们定义一个Writer调用Xstream API直接输出文件:

    Member member1 = new Member(“lee”,28);
    Member member2 = new Member(“tae”,26);
    Member leader = new Member(“Kal”,30);
    List<Member> memberList = new ArrayList<Member>();
    memberList.add(member1);
    memberList.add(member2);
    memberList.add(leader); 

    Team team = new Team();
    team.setLeader(leader);
    team.setName(“Mobile Technology Center”);
    team.setMembers(memberList);

    // Serialize the object
    XStream xs = new XStream();
    try
    { FileOutputStream fs = new FileOutputStream( “d:/tmp/team1.txt”); xs.toXML(team, fs); }

    catch (FileNotFoundException e1)
    { e1.printStackTrace(); }

    输出文件:
    <com.sohu.xstream.mine.Team>
    <name>Mobile Technology Center</name>
    <members>
    <com.sohu.xstream.mine.Member>
    <name>lee</name>
    <age>28</age>
    </com.sohu.xstream.mine.Member>
    <com.sohu.xstream.mine.Member>
    <name>tae</name>
    <age>26</age>
    </com.sohu.xstream.mine.Member>
    <com.sohu.xstream.mine.Member>
    <name>Kal</name>
    <age>30</age>
    </com.sohu.xstream.mine.Member>
    </members>
    <leader reference=”../members/com.sohu.xstream.mine.Member3″/>
    </com.sohu.xstream.mine.Team>

    这显然不可能拿来当协议输出,至少应该把类的package去掉,另外leader节点也没有输出正常的内容,Xstream提供了良好的扩展,可以自定义输出格式,只需要注册一个Converter即可
    我们定义TeamConverter覆盖marshal方法

    @Override
    public void marshal(Object obj, HierarchicalStreamWriter writer,
    MarshallingContext ctx) {
    Team team = (Team) obj;
    Member leader = team.getLeader();
    writer.startNode(“Name”);
    writer.setValue(team.getName());
    writer.endNode();
    writer.startNode(“leader”);
    writer.setValue(leader.getName());
    writer.endNode(); 

    writer.startNode(“members”);
    for (Member member : team.getMembers())
    { writer.startNode(“Member”); // writer.setValue(member.) writer.addAttribute(“Name”, member.getName()); writer.addAttribute(“age”, member.getAge()+”"); writer.endNode(); }

    writer.endNode();
    }

    客户端调用时加上属性设置和注册转换器:
    xs.alias(Team.class.getSimpleName(),Team.class);
    xs.alias(Member.class.getSimpleName(), Member.class);
    xs.registerConverter(new TeamConverter());
    这样输出的文档就比较友好了:
    <Team>
    <Name>Mobile Technology Center</Name>
    <leader>Kal</leader>
    <members>
    <Member Name=”lee” age=”28″/>
    <Member Name=”tae” age=”26″/>
    <Member Name=”Kal” age=”30″/>
    </members>
    </Team>

    XML2Object

    在TeamConverter中覆盖unmarshal方法

    @Override
    public Object unmarshal(HierarchicalStreamReader reader,
    UnmarshallingContext ctx) { 

    Team team = new Team();
    reader.moveDown();
    team.setName(reader.getValue());
    reader.moveUp();
    reader.moveDown();
    team.setLeader(new Member(reader.getValue(), 20));
    reader.moveUp();
    reader.moveDown();
    List<Member> memberList = new ArrayList<Member>();
    while (reader.hasMoreChildren()) {
    reader.moveDown();
    Member member = new Member(reader.getAttribute(0),
    Integer.parseInt(reader.getAttribute(1)));
    System.out.println(reader.getNodeName() + reader.getValue() + “|”
    + reader.getAttributeCount());
    memberList.add(member);
    reader.moveUp();
    }
    team.setMembers(memberList);
    reader.moveUp();
    return team;
    }

    然后在客户端调用反序列类Reader

    XStream xs = new XStream(new DomDriver());
    Team t = new Team(); 

    try {
    FileInputStream fis = new FileInputStream(“d:/tmp/team.txt”);
    xs.alias(Team.class.getSimpleName(), Team.class);
    xs.alias(Member.class.getSimpleName(), Member.class);
    xs.registerConverter(new TeamConverter());
    Object obj = xs.fromXML(fis);
    t = (Team) obj;

    // print the data from the object that has been read
    System.out.println(t.getName());

    //why not work???
    // xs.fromXML(fis, t);
    // System.out.println(t.getName());
    } catch (FileNotFoundException ex) {
    ex.printStackTrace();
    }

    经过测试是成功的!

    但Reader里调用xs.fromXML(InputStream,Object)方法不起作用,暂时还未找到原因

    //it doesn’t work
    xs.fromXML(fis, t);
    System.out.println(t.getName()); 

    附上源码:xstream

    通过实践,有两种方式是可实现SS的汉化

    第一种,利用Spring webmvc的本地化类CookieLocaleResolver

    这种方式需要引用springframework webmvc的jar包;

    源码:simpleDemo_db_i18n 如部署有问题直达:Spring Security入门篇——集成DB认证

    在applicationContext.xml里定义它,并制定默认本地语言

    1. <bean
    2. id=“localeResolver”
    3. class=“org.springframework.web.servlet.i18n.CookieLocaleResolver”>
    4. <property
    5. name=“defaultLocale”
    6. value=“zh_CN” />
    7. </bean>

    同时在security.xml中指定messageSource所在位置:

    1. <beans:bean
    2. id=“messageSource”
    3. class=“org.springframework.context.support.ResourceBundleMessageSource”>
    4. <beans:property
    5. name=“basenames”>
    6. <beans:list>
    7. <beans:value>messages</beans:value>
    8. <beans:value>org/springframework/security/messages</beans:value>
    9. </beans:list>
    10. </beans:property>
    11. </beans:bean>

    注意:我的applicationContext.xml和security.xml定义bean的命名空间不一样;

    这时候重新启动应用,尝试用错误的用户名密码登录,看提示有没有变化,是否跟下图一样

    SS的中文化也确认让人蛋疼的,命名用户名错误,翻译成了坏的凭证,这个问题可以修改下Messages_zh_CN.properties中的定义

     

    第二种方案:解压出messages_zh_CN.properties放置到项目的classpath(resources)下

    资源文件位于spring-security-core*.*的jar包的org.springframework.security目录下,将其解压后把messages_zh_CN.properties复制到resources的目录下

    在security.xml中定义messageSource

    1. <beans:bean
    2. id=“messageSource”
    3. class=“org.springframework.context.support.ReloadableResourceBundleMessageSource”>
    4. <beans:property
    5. name=“basename”
    6. value=“classpath:messages_zh_CN” />
    7. </beans:bean>

    再将其注入到authProvider

    1. <beans:property
    2. name=“messageSource”
    3. ref=“messageSource” />

    重启应用,用错误用户名,密码登录同样可看到应用汉化了。

    UserDetails在Spring Security里是用户的信息持有者,包括用户名,是否激活,拥有的权限等信息,用户登录成功后SS会将用户信息(UserDetails)存放在SecurityContextHolder的Context里,我们在项目中所需要的信息远远不止这些;email,phone,真实姓名等SS就没法满足需求,还好SS提供了良好的扩展。

    我们来看UserDetails类的层次结构:userdetail我们先看下面的LdapUserDetails部分:LdapUserDetails继承自UserDetails,并附加了一个dn属性,接下来看他的实现,最底层InetOrgPerson类包含有email,homeAddress,postalCode等从Ldap获取的详细信息,所以ldap部分我们可以忽视UserDetails接口,而直接用InetOrgPerson就可以获取更丰富的用户信息了,获取方式如下:



    1. /**  

    2. * 取得当前用户  

    3. * @return UserDetails  

    4. */  

    5. public static InetOrgPerson getLdapUserDetails() {   

    6. SecurityContext sc = SecurityContextHolder.getContext();   

    7. Authentication auth = sc.getAuthentication();   

    8. Object principal = auth.getPrincipal();   

    9. if (principal instanceof InetOrgPerson) {   

    10. return (InetOrgPerson) principal;   

    11. else {   

    12. return null;   

    13. }   

    14. }  

    这种方式仅适用于LDAP认证的场景,如果是DB认证就需要多做点工作了,但我们也可以从默认实现方式上获取灵感,参见org.springframework.security.core.userdetails.jdbc.JdbcDaoImpl类,SS就是通过它从数据库中获取用户信息的,这样我们就可以模仿它来从数据库获取更多的信息比如email和accountId 我们首先定义一个可以取代UserDetails的类


    1. public class Account implements java.io.Serializable,UserDetails {   

    2.  private int accountId;   

    3.  private String accountName;   

    4.  private String email;   

    5.  private String password;   

    6.  private Collection grantedAuths = new HashSet();   

    7. //getters and setters are omitted   

    8. }  

    然后模仿JdbcDaoImpl重新建一个public class DatabaseAuthProvider extends JdbcDaoSupport implements UserDetailsService
    在这里就可以重新定义规则了,比如rolePrefix以及根据用户名获取权限等不再对外提供可配置了,SS考虑通用性所以必须提供,咱自定义需求就没太大必要,所以把所有属性的setters都remove掉。调整类构造器并覆盖UserDetails loadUserByUsername(String username)
    throws UsernameNotFoundException, DataAccessException 方法。


    1. public DatabaseAuthProvider() {   

    2.         authoritiesByUsernameQuery = “select a.accountName ,r.roleName from account a , role r , ”  

    3.                 + “authority au where au.accountId = a.accountId and au.roleId = r.roleId and a.accountName =?”;   

    4.         usersByUsernameQuery = “SELECT accountName,password,enabled,email,accountId FROM account WHERE accountName = ?”;   

    5.         rolePrefix = “ROLE_”;   

    6.     }   

    7.     public UserDetails loadUserByUsername(String username)   

    8.             throws UsernameNotFoundException, DataAccessException {   

    9.         List<Account> users = loadUsersByUsername(username);   

    10.     if (users.size() == 0) {   

    11.             logger.debug(“Query returned no results for user ’” + username + “‘”);   

    12.         throw new UsernameNotFoundException(   

    13.                     messages.getMessage(“JdbcDaoImpl.notFound”new Object[]{username}, “Username {0} not found”), username);   

    14.         }   

    15.     Account user = users.get(0); // contains no GrantedAuthority[]   

    16.     Set<GrantedAuthority> dbAuthsSet = new HashSet<GrantedAuthority>();   

    17.         dbAuthsSet.addAll(this.loadUserAuthorities(username));   

    18.         user.setGrantedAuths(dbAuthsSet);   

    19.     return user;   

    20.     }   

    21.     /**  

    22.      * Executes the SQL <tt>usersByUsernameQuery</tt> and returns a list of  

    23.      * UserDetails objects. There should normally only be one matching user.  

    24.      */  

    25.     private List<Account> loadUsersByUsername(String username) {   

    26.         return getJdbcTemplate().query(usersByUsernameQuery,   

    27.                 new String[] { username }, new RowMapper<Account>() {   

    28.                     public Account mapRow(ResultSet rs, int rowNum)   

    29.                             throws SQLException {   

    30.                         Account account = new Account();   

    31.                         account.setAccountName(rs.getString(1));   

    32.                         account.setPassword(rs.getString(2));   

    33.                         account.setEmail(rs.getString(4));   

    34.                         account.setAccountId(rs.getInt(5));   

    35.                         return account;   

    36.                     }   

    37.         });   

    38.     }  

    最后再重新配置security.xml的authentication-provider节点


    1. <authentication-manager  

    2.         alias=“authenticationManager”>  

    3.         <authentication-provider  

    4.             user-service-ref=‘dbAuthProvider’>  

    5.             <password-encoder  

    6.                 hash=“plaintext” />  

    7.         </authentication-provider>  

    8.     </authentication-manager>  

    9.   

    10.     <beans:bean  

    11.         id=“dbAuthProvider”  

    12.         class=“org.javali.security.ext.DatabaseAuthProvider”>  

    13.         <beans:property  

    14.             name=“dataSource”  

    15.             ref=“dataSource” />  

    16.     </beans:bean>  

    是不是简洁多了,除了注入jdbcTemplate所需的dataSource,没有任何其他注入

    将扩展的信息显示在profile.jsp页
    accountId: <%=SecurityUtils.getUserDetails().getAccountId() %> accountName: <%=SecurityUtils.getUserDetails().getAccountName() %>
    email: <%=SecurityUtils.getUserDetails().getEmail() %>
    最终显示效果如下图:

    profile

    可以看到accountId和email都已经能正常获取到了。

    附上扩展源码: simpleDemo_db_ext

    Spring Security入门篇——搭建简易权限框架 文中,用户信息是保存在xml中的,对于大型业务系统用户、角色、权限、功能等数据量非常大,关系非常复杂,基于xml配置的方式管理简直就是灾难,我们都会考虑放置到DB或者LDAP来维护。

    电梯直接到 Spring Security进阶——集成LDAP认证

    集成DB认证的Demo源码: simpleDemo_db

    Step1.导入数据库脚本

    见simpleDemo_db#resources目录下的account.sql 导入到mysql库中,对database.properties做相应调整

    数据库结构很简单,只有三个表: 用户表,角色表,权限表;

    db

    Step2.引用数据库相关Jar包,配置spring数据源DataSource

    jar包引用见pom.xml

    数据源配置见resources#spring#applicationContext.xml

    Step3.SS集成DB认证配置

    前面我们已经介绍了xml 以及ldap认证的配置,db认证配置也是相通的,我们只需要提供一个DB鉴权的Provider给它就行了,配置都是从<authentication-manager>节点开始。

    我们定义一个dbAuthProvider注入到authentication-manager,它是由org.springframework.security.core.userdetails.jdbc.JdbcDaoImpl实现的,我们

    可以从源码就能看出第一要给它注入dataSource,然后告诉provider怎样根据用户查找用户的

    密码进行验证(usersByUsernameQuery);通过验证最后再根据用户名获取他的角色列表

    authoritiesByUsernameQuery);如果还有用户组的概念,还可以配置上组权限(groupAuthoritiesByUsernameQuery)

    注意在咱们的Demo程序里,所有角色都加上了前缀ROLE_,所以配置如下:

     

     

     

    1. <beans:bean
    2. id=“dbAuthProvider”
    3. class=“org.springframework.security.core.userdetails.jdbc.JdbcDaoImpl”>
    4. <beans:property
    5. name=“dataSource”
    6. ref=“dataSource” />
    7. <beans:property
    8. name=“usersByUsernameQuery”
    9. value=“SELECT accountName,password,enabled FROM account WHERE accountName = ?” />
    10. <beans:property
    11. name=“authoritiesByUsernameQuery”
    12. value=”select a.accountName ,r.roleName from account a , role r , authority au
    13. where aau.accountId = a.accountId and au.roleId = r.roleId and a.accountName =?” />
    14. <beans:property
    15. name=“rolePrefix”
    16. value=“ROLE_”></beans:property>
    17. </beans:bean>

     

    这类帖子在网上很多,本来没曾想写这篇文章,但为了Security系列的完整性,还是加上了;Spring世界很多东西都是相通的,我们更多的应该学习到方法,正如授人以鱼不如授人以渔,从源码上思考如何配置,比直接从网上拷贝一份完成功能肯定要深刻的多,有意义的多!

    Spring Security提供了良好的LDAP集成方式,基于配置即可

    在simpleDemo项目pom.xml的基础上引入:

    <dependency>
     <groupId>org.springframework.security</groupId>
     <artifactId>spring-security-ldap</artifactId>
     <version>3.0.5.RELEASE</version>
     <type>jar</type>
     <scope>compile</scope>
    </dependency>

    但是集成方式取决于LDAP存储的数据及业务系统需求,需要分两种情况考虑:

    1,LDAP服务器中存有帐号信息和角色信息

    2,LDAP服务器只存了帐号信息

    大多场合都属于后者,因为角色是由业务系统需求决定的,而LDAP在企业仅作为用户信息存储的一种方案,下面就这两种情况来看SS是怎样集成LDAP认证的

    第一种情况:

    LDAP上已经存有用户帐号及角色信息,我们只需要根据SS提供的对外接口,配置上相应的参数就行了,用户密码验证,获取角色等交给SS的LDAP扩展模块自己完成,将security_ldap1覆盖SimpleDemo中的security.xml配置文件即可,建议用Apache Directory Studio了解LDAP数据的内部结构后再替换红色字体部分

    <ldap-server
    id=”sohuLdap”
    port=”389″
    url=”ldap://ldap.sohu-inc.com:389/dc=sohu-inc,dc=com”
    manager-dn=”mobad@sohu-inc.com”
    manager-password=”78963214″ />
    <authentication-manager>
    <ldap-authentication-provider
    server-ref=”sohuLdap”
    user-search-filter=”(sAMAccountName={0})”
    user-search-base=”OU=Users,OU=MTC,OU=BJ-OFFICE”
    group-search-base=”OU=Groups”
    group-role-attribute=”cn”
    group-search-filter=”(member={0})”
    role-prefix=”ROLE_”>
    </ldap-authentication-provider>
    </authentication-manager>
    <ldap-server
     id="myLdap"
     port="389"
     url="ldap://ldap.javali.org:389/dc=javali,dc=org"   #LDAP服务地址
     manager-dn="manager@javali.org"     #提供一个有权限访问ldap服务的帐号
     manager-password="*******" />
     <authentication-manager>
     <ldap-authentication-provider
     server-ref="myLdap"
     user-search-filter="(accountName={0})"     #ldap目录中存帐号名的节点名称
     user-search-base="OU=Users,OU=DEPARTMENT"  #用户查找base
     group-search-base="OU=Groups"
     group-role-attribute="cn"
     group-search-filter="(member={0})"      #用户所属角色的节点名称
     role-prefix="ROLE_">
     </ldap-authentication-provider>
     </authentication-manager>

    第二种情况:

    比第一种情况复杂很多,既然角色由业务系统需求决定,角色信息就保存在业务系统的数据库;处理流程应该是:LDAP负责密码认证,然后在SS-LDAP基础上做扩展,从DB中加载用户角色,思路是正确的,但如何扩展,如何实现,困扰了我很长时间;最后多亏了snowolf的鼎力支持(暗自庆幸身边有spring大牛真好)

    我们在阅读ss集成ldap源码时发现了DefaultLdapAuthoritiesPopulator类,它有一个方法签名

    protected Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, String username)

    我想你看到方法名就应该知道它的作用了,我们就可以写一个类继承DefaultLdapAuthoritiesPopulator,在getAdditionalRoles方法里加上从数据库获取username的角色列表代码就完事了,代码如下

    public class MyLdapAuthoritiesPopulator extends DefaultLdapAuthoritiesPopulator {
     public MyLdapAuthoritiesPopulator(ContextSource contextSource,
     String groupSearchBase) {
     super(contextSource, groupSearchBase);
     }
     protected Set<GrantedAuthority> getAdditionalRoles(
     DirContextOperations user, String username) {
     // 授权集合
     Set<GrantedAuthority> authorites = new HashSet<GrantedAuthority>();
     // 根据username读取用户信息
    
     //TODO 根据用户信息从DB取得用户角色列表
      //将角色添加到集合里即可
     GrantedAuthority authority = new GrantedAuthorityImpl("");
     authorites.add(authority);
     return authorites;
     }
    }

    此种场景只能采用Bean的配置方式 :security_ldap2

    <beans:bean
     id="contextSource"
     class="org.springframework.security.ldap.DefaultSpringSecurityContextSource">
     <beans:constructor-arg
     value="ldap://ldap.javali.org:389/dc=javali,dc=org" />
     <beans:property
     name="userDn"
     value="manager@javali.org" />
     <beans:property
     name="password"
     value="******" />
     </beans:bean>
     <beans:bean
     id="ldapAuthProvider"
     class="org.springframework.security.ldap.authentication.LdapAuthenticationProvider">
     <beans:constructor-arg>
     <beans:bean
     class="org.springframework.security.ldap.authentication.BindAuthenticator">
     <beans:constructor-arg
     ref="contextSource" />
     <beans:property
     name="userSearch">
     <beans:bean
     class="org.springframework.security.ldap.search.FilterBasedLdapUserSearch">
     <beans:constructor-arg
     value="dc=javali,dc=org" />
     <beans:constructor-arg
     value="(sAMAccountName={0})" />
     <beans:constructor-arg
     ref="contextSource" />
     </beans:bean>
     </beans:property>
     </beans:bean>
     </beans:constructor-arg>
     <beans:constructor-arg>
     <beans:bean
     id="ldapAuthoritiesPopulator"
     class="org.javali.security.ext.MyLdapAuthoritiesPopulator">
     <beans:constructor-arg
     ref="contextSource" />
     <beans:constructor-arg
     value="ou=Groups,dc=javali,dc=org" />
     </beans:bean>
     </beans:constructor-arg>
     <beans:property
     name="userDetailsContextMapper">
     <beans:bean
     id="ldapUserDetailsMapper"
     class="org.springframework.security.ldap.userdetails.LdapUserDetailsMapper">
     <beans:property
     name="rolePrefix"
     value="ROLE_" />
     <beans:property
     name="convertToUpperCase"
     value="true" />
     </beans:bean>
     </beans:property>
     </beans:bean>
     <authentication-manager>
     <authentication-provider
     ref="ldapAuthProvider">
     </authentication-provider>
     </authentication-manager>

    在配置 BindAuthenticator类时查询用户可有两种方式(可参见它的父类AbstractLdapAuthenticator),一种是userDnFormat,另一种是LdapUserSearch,我们采用的是第二种,使用第一种配置的时候发现它只能查找当前userDn目录下用户,如果有用户在更深层次就没法做到了,LdapUserSearch可以支持基于目录模糊匹配的查询方式,更强大,至少满足了我的ldap用户查询