Building a Clustering Setup with Mahout and Hadoop

Harper 2011-09-23

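Mahout is a data-mining toolkit built on Hadoop. With Hadoop underneath, mining very large data sets becomes much simpler. However, because the algorithms have to be expressible as MapReduce (M/R) jobs, not every commonly used data-mining algorithm is supported. This post shows how to put together a simple clustering tool with Hadoop + Mahout.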

Step 1: Set up the Hadoop platform.

I'm using Ubuntu 11.04. If you don't have a development environment on Ubuntu yet, see my post "Ubuntu 10.10 Java Development Environment".

#1 Create a user group and a user on Ubuntu

beneo@ubuntu:~$ sudo addgroup hadoop
beneo@ubuntu:~$ sudo adduser --ingroup hadoop hduser

#2 Install the SSH server

beneo@ubuntu:~$ sudo apt-get install ssh
beneo@ubuntu:~$ su - hduser
hduser@ubuntu:~$ ssh-keygen -t rsa -P ""
hduser@ubuntu:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

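#3 Verify SSH connectivity

hduser@ubuntu:~$ ssh localhost

After running ssh localhost, answer yes at the host-key prompt. If the login works without problems, you can go on to install Hadoop.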

#4 Set JAVA_HOME

Edit conf/hadoop-env.sh so that JAVA_HOME points to the correct location.
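
If you are not sure what the correct location is, one option is to derive it from the java executable. The helper below is only a sketch: the function name java_home_from_bin is made up for this post, and it assumes the usual JDK layout where the binary lives under bin/java (optionally below a jre/ directory).

```shell
# Hypothetical helper: turn the resolved path of a java binary into a JAVA_HOME value.
java_home_from_bin() {
    # $1: fully resolved path of a java executable
    # Strip the trailing /bin/java, then a trailing /jre if there is one.
    printf '%s\n' "$1" | sed -e 's#/bin/java$##' -e 's#/jre$##'
}

# Possible usage (assumes java is on the PATH and readlink supports -f):
#   java_home_from_bin "$(readlink -f "$(command -v java)")"
```

The printed directory is what would go into the `export JAVA_HOME=...` line of conf/hadoop-env.sh.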

#5 Edit the following configuration files

conf/core-site.xml:

<configuration>
     <property>
         <name>fs.default.name</name>
         <value>hdfs://localhost:9000</value>
     </property>
</configuration>

conf/hdfs-site.xml:

<configuration>
     <property>
         <name>dfs.replication</name>
         <value>1</value>
     </property>
</configuration>

conf/mapred-site.xml:

<configuration>
     <property>
         <name>mapred.job.tracker</name>
         <value>localhost:9001</value>
     </property>
</configuration>

#6 Format a new distributed filesystem:

$ bin/hadoop namenode -format

#7 Start the Hadoop daemons:

$ bin/start-all.sh

#8 Check whether startup succeeded

$ jps

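Count the processes in the output: there should be 6 (NameNode, DataNode, SecondaryNameNode, JobTracker, TaskTracker, plus Jps itself). If any are missing, delete the files under logs and start again from #6.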
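
Instead of counting by eye, the check can be scripted. This is a minimal sketch, assuming the five daemon names of a pseudo-distributed Hadoop setup of this era; the function name missing_daemons is my own and not part of Hadoop.

```shell
# Hypothetical helper: read `jps` output on stdin and print every expected
# Hadoop daemon that does not appear in it.
missing_daemons() {
    jps_output=$(cat)
    for daemon in NameNode DataNode SecondaryNameNode JobTracker TaskTracker; do
        # jps prints "<pid> <ClassName>"; compare the whole second field so
        # that SecondaryNameNode is not mistaken for NameNode.
        printf '%s\n' "$jps_output" \
            | awk -v d="$daemon" '$2 == d { found = 1 } END { exit !found }' \
            || printf '%s\n' "$daemon"
    done
}

# Possible usage:
#   jps | missing_daemons
```

An empty result means all five daemons are running; any name that is printed tells you which log file to look at first.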

#9 Don't panic: open the web pages first. If they don't load yet, wait!

NameNode - http://localhost:50070/
JobTracker - http://localhost:50030/
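
The waiting can also be automated with a small retry loop. This is a sketch only: wait_until is a name I made up, and the usage line assumes curl is installed.

```shell
# Hypothetical helper: run a command up to TRIES times, DELAY seconds apart,
# and succeed as soon as the command succeeds.
# Usage: wait_until TRIES DELAY CMD [ARGS...]
wait_until() {
    tries=$1
    delay=$2
    shift 2
    attempt=1
    while [ "$attempt" -le "$tries" ]; do
        "$@" >/dev/null 2>&1 && return 0
        # Only sleep if another attempt is still coming.
        [ "$attempt" -lt "$tries" ] && sleep "$delay"
        attempt=$((attempt + 1))
    done
    return 1
}

# Possible usage: poll the NameNode page for up to about a minute.
#   wait_until 30 2 curl -sf http://localhost:50070/
```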

That completes Step 1: Hadoop is set up.

Step 2: Configure Mahout

#1 Download Mahout and unpack it

#2 Set HADOOP_HOME in .bash_profile
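
The lines for this step might look like the fragment below. The install path $HOME/hadoop is purely an assumption for illustration; use the directory where Hadoop was actually unpacked.

```shell
# ~/.bash_profile fragment. The path is hypothetical, adjust it to your install.
export HADOOP_HOME="$HOME/hadoop"
# Optional convenience: put the hadoop launcher on the PATH.
export PATH="$PATH:$HADOOP_HOME/bin"
```

Log in again (or source the file) so that the variable is visible to the mahout launcher.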

#3 Run mahout/bin/mahout and look at what it prints

Step 3: Build a clustering demo

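My clustering pipeline is: text -> Lucene index -> Mahout -> clustering dumper

An alternative is: SequenceFile -> Mahout -> clustering dumper

I'll just paste the code. It is written in Groovy and may not be very polished.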

#1 text -> Lucene index

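import org.apache.lucene.document.Document
import org.apache.lucene.document.Field
import org.apache.lucene.index.IndexWriter
import org.apache.lucene.store.FSDirectory
import org.apache.lucene.store.NoLockFactory
import org.wltea.analyzer.lucene.IKAnalyzer

// Build a Lucene document: "label" is stored as-is, "content" is analyzed
// and keeps term vectors, which the lucene.vector step reads later.
def assembleDoc = {
    label, content ->
    assert !label.toString().trim().empty
    assert !content.toString().trim().empty

    def doc = new Document()
    doc.add(new Field("label", label, Field.Store.YES, Field.Index.NOT_ANALYZED))
    doc.add(new Field("content", content, Field.Store.NO, Field.Index.ANALYZED, Field.TermVector.YES))
    doc
}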

def mockContent = {
    def set = []
    new File("""/home/beneo/text.txt""").newReader().eachLine {
        String line ->
        set << line
    }
    set
}


def mockExpandoSet = {

    def lst = []

    mockContent()?.each {
        content ->
        // Strip every non-Chinese character
        def line = content.replaceAll("[^\u4e00-\u9fa5]+", "")
        if (line != null && line.trim().length() > 2) {
            println(content)
            def expando = new Expando()
            expando.label = content
            expando.content = line
            lst << expando
        }
    }
    lst
}

// Open the directory that will hold the index
def directory = FSDirectory.open(new File("""/home/beneo/index"""), NoLockFactory.getNoLockFactory())
// Use the IK analyzer for Chinese word segmentation
def analyzer = new IKAnalyzer()
// Create the IndexWriter that produces the index
def indexWriter = new IndexWriter(directory, analyzer, true, IndexWriter.MaxFieldLength.UNLIMITED)

// Index the text loaded from the local file
mockExpandoSet().each {
    expando ->
    indexWriter.addDocument(assembleDoc(expando.label, expando.content))
}

indexWriter.commit()
indexWriter.close()
directory.close()
        

#2 Lucene index -> Mahout vectors

mahout/bin/mahout lucene.vector -d index/ -i label -o tmp/vector/vector -f content -t tmp/vector/dict -n 2

#3 Mahout vectors -> Mahout canopy

mahout/bin/mahout canopy -i tmp/vector/vector -o tmp/canopy/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -t1 0.32 -t2 0.08 -ow
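
To get a feel for the -t1 and -t2 values: with CosineDistanceMeasure the distance between two vectors is 1 minus their cosine similarity, so 0 means the same direction and 1 means no terms in common. In canopy clustering, T1 is the loose threshold (points within it join a canopy) and T2 the tight one (points within it are no longer considered as new canopy centers). The function below is my own illustration of the distance itself, not Mahout code, and cosine_distance is a made-up name.

```shell
# Hypothetical helper: cosine distance between two space-separated vectors
# of equal length, printed with four decimals.
cosine_distance() {
    awk -v a="$1" -v b="$2" 'BEGIN {
        n = split(a, x, " ")
        split(b, y, " ")
        for (i = 1; i <= n; i++) {
            dot += x[i] * y[i]   # dot product
            na  += x[i] * x[i]   # squared length of a
            nb  += y[i] * y[i]   # squared length of b
        }
        printf "%.4f\n", 1 - dot / sqrt(na * nb)
    }'
}

# Possible usage:
#   cosine_distance "1 0 1" "1 1 0"
```

Two documents that share half of their terms in this toy example are 0.5 apart, which is further than the 0.32 used for -t1 above, so they would not land in the same canopy.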

#4 Mahout canopy -> Mahout k-means

mahout/bin/mahout kmeans -i tmp/vector/vector -c tmp/canopy/clusters-0/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -o tmp/kmeans/ -cd 0.001 -x 10 -ow -cl
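
k-means writes one clusters-N directory per iteration, and the analysis code in #5 hard-codes clusters-2, which is only right if the run happened to stop there. A small sketch for picking the highest-numbered directory from a listing follows; latest_clusters_dir is a hypothetical name, and it assumes the directories are named clusters-<number> as in this run.

```shell
# Hypothetical helper: read path names on stdin, keep those ending in
# clusters-<number>, and print the one with the highest number.
latest_clusters_dir() {
    grep -E 'clusters-[0-9]+/?$' \
        | sed 's#/$##' \
        | awk -F'clusters-' '{ print $NF "\t" $0 }' \
        | sort -n \
        | tail -n 1 \
        | cut -f2-
}

# Possible usage against the k-means output directory:
#   bin/hadoop fs -ls tmp/kmeans | awk '{ print $NF }' | latest_clusters_dir
```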

#5 Mahout k-means -> analyzing the results

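import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.PathFilter
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.SequenceFile
// The Mahout package names below assume the 0.5-era layout; adjust them for your version
import org.apache.mahout.clustering.Cluster
import org.apache.mahout.clustering.WeightedVectorWritable
import org.apache.mahout.math.NamedVector

// One clusters-N directory is written per k-means iteration; point this at the last one produced
String seqFileDir = "/home/hduser/tmp/kmeans/clusters-2/"
// clusteredPoints is the output of the -cl flag on the kmeans command
String pointsDir = "/home/hduser/tmp/kmeans/clusteredPoints/"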

def conf = new Configuration()

FileSystem fs = new Path(seqFileDir).getFileSystem(conf)

Map<Integer, List<WeightedVectorWritable>> clusterIdToPoints = readPoints(new Path(pointsDir), new Configuration());

for (FileStatus seqFile: fs.globStatus(new Path(seqFileDir, "part-*"))) {
    Path path = seqFile.getPath()

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);

    org.apache.hadoop.io.Writable key = reader.getKeyClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();
    org.apache.hadoop.io.Writable value = reader.getValueClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();

    while (reader.next(key, value)) {
        Cluster cluster = (Cluster) value;
        int id = cluster.getId()
        int np = cluster.getNumPoints()

        List<WeightedVectorWritable> points = clusterIdToPoints.get(cluster.getId());
        if (points != null && points.size() > 4) {
            for (Iterator<WeightedVectorWritable> iterator = points.iterator(); iterator.hasNext();) {
                println(((NamedVector) iterator.next().getVector()).getName())
            }
            println "======================================"
        }
    }
    // Release the file handle before moving on to the next part file
    reader.close()
}


private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, Configuration conf)
throws IOException {
    Map<Integer, List<WeightedVectorWritable>> result = new TreeMap<Integer, List<WeightedVectorWritable>>();

    FileSystem fs = pointsPathDir.getFileSystem(conf);
    FileStatus[] children = fs.listStatus(pointsPathDir, new PathFilter() {
        @Override
        public boolean accept(Path path) {
            String name = path.getName();
            return !(name.endsWith(".crc") || name.startsWith("_"));
        }
    });

    for (FileStatus file: children) {
        Path path = file.getPath();
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);

        IntWritable key = reader.getKeyClass().asSubclass(IntWritable.class).newInstance();
        WeightedVectorWritable value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();
        while (reader.next(key, value)) {
            // key is the cluster id; value is a weighted vector assigned to
            // that cluster. Group the vectors by cluster id.
            List<WeightedVectorWritable> pointList = result.get(key.get());
            if (pointList == null) {
                pointList = new ArrayList<WeightedVectorWritable>();
                result.put(key.get(), pointList);
            }
            pointList.add(value);
            value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();
        }
        // Release the file handle before moving on to the next file
        reader.close()
    }

    return result;
}

I won't show the results here.
