Spark MLlib机器学习实践(第2版)
上QQ阅读APP看书,第一时间看更新

2.2 经典的WordCount

上节成功安装完Spark单机版,下面可以开始MLlib的学习了,这也是我们学习MLlib万里长征的第一步。

2.2.1 Spark实现WordCount

相信有不少读者都有Hadoop的学习经验,经典的WordCount是MapReduce入门必看的例子,可以称为分布式框架的Hello World,也是大数据处理程序员必须掌握的入门技能。WordCount的主要功能是统计文本中某个单词出现的次数,其形式如图2-46所示。

图2-46 WordCount统计流程

首先是数据的准备工作,这里为了简化起见,采用小数据集(本书将以小数据为主演示MLlib的使用和原理)。

在C盘建立名为wc.txt的文本文件,文件名可以自行设定,内容如下:

数据位置://DATA//D02//wc.txt

        good bad cool
        hadoop spark mllib
        good spark mllib
        cool spark bad

这是需要计数的数据内容,我们需要计算出文章中每个单词出现的次数,Spark代码如程序2-3所示。

代码位置:// SRC//C02// wordCount.scala

程序2-3 Spark代码

        import org.apache.spark.{SparkContext, SparkConf}
        object wordCount {
          def main(args: Array[String]) {
            val conf = new SparkConf().setMaster("local").setAppName("wordCount")
    //创建环境变量
            val sc = new SparkContext(conf)                        //创建环境变量实例
            val data = sc.textFile("c://wc.txt")                  //读取文件
            data.flatMap(_.split(" ")).map((_,
    1)).reduceByKey(_+_).collect().foreach(println)    //word计数
          }
        }

下面是对程序进行分析。

(1)首先笔者new了一个SparkConf(),目的是创建了一个环境变量实例,告诉系统开始Spark计算。之后的setMaster("local")启动了本地化运算,setAppName("wordCount")是设置本程序名称。

(2)new SparkContext(conf)的作用是创建环境变量实例,准备开始任务。

(3)sc.textFile("c://wc.txt")的作用是读取文件,这里的文件是在C盘上,因此路径目录即为c://wc.txt。顺便提一下,此时的文件读取是按正常的顺序读取,本书后面章节会介绍如何读取特定格式的文件。

(4)第4行是对word进行计数。flatMap()是Scala中提取相关数据按行处理的一个方法,_.split(" ")中,下划线_是一个占位符,代表传送进来的任意一个数据,对其进行按" "分割。map((_, 1))是对每个字符开始计数,在这个过程中,并不涉及合并和计算,只是单纯地将每个数据行中单词加1。最后的reduceByKey()方法是对传递进来的数据按key值相加,最终形成wordCount计算结果。

目前程序流程如图2-47所示。

图2-47 WordCount流程图

(5)collect()是对程序的启动,因Spark编程的优化,很多方法在计算过程中属于lazy模式,因此需要一个显性启动支持。foreach(println)是打印的一个调用方法,打印出数据内容。

具体打印结果如下:

        (cool,2)
        (spark,3)
        (hadoop,1)
        (bad,2)
        (good,2)
        (mllib,2)

2.2.2 MapReduce实现WordCount

与Spark对比的是MapReduce中wordCount程序的设计,如程序2-4所示,在这里笔者只是为了做对比,如果有读者想深入学习MapReduce程序设计,请参考相关的专业书籍。

代码位置:// SRC//C02// wordCount.java

程序2-4 MapReduce中wordCount程序的设计

        import java.io.IOException;
        import java.util.Iterator;
        import java.util.StringTokenizer;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.FileInputFormat;
        import org.apache.hadoop.mapred.FileOutputFormat;
        import org.apache.hadoop.mapred.JobClient;
        import org.apache.hadoop.mapred.JobConf;
        import org.apache.hadoop.mapred.MapReduceBase;
        import org.apache.hadoop.mapred.Mapper;
        import org.apache.hadoop.mapred.OutputCollector;
        import org.apache.hadoop.mapred.Reducer;
        import org.apache.hadoop.mapred.Reporter;
        import org.apache.hadoop.mapred.TextInputFormat;
        import org.apache.hadoop.mapred.TextOutputFormat;

    public class WordCount {

      public static class Map extends MapReduceBase implements
        //创建固定Map格式
        Mapper<LongWritable, Text, Text, IntWritable> {
        //创建数据1格式
          private final static IntWritable one = new IntWritable(1);
        //设定输入格式
          private Text word = new Text();
        //开始map程序
          public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            //将传入值定义为line
          String line = value.toString();
            //格式化传入值
          StringTokenizer tokenizer = new StringTokenizer(line);
          //开始迭代计算
          while (tokenizer.hasMoreTokens()) {
            //设置输入值
            word.set(tokenizer.nextToken());
            //写入输出值
            output.collect(word, one);
          }
        }
      }

      public static class Reduce extends MapReduceBase implements
        //创建固定Reduce格式
        Reducer<Text, IntWritable, Text, IntWritable> {
        //开始Reduce程序
        public void reduce(Text key, Iterator<IntWritable> values,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
                  throws IOException {
          //初始化计算器
          int sum = 0;
          //开始迭代计算输入值

          while (values.hasNext()) {
                sum += values.next().get();          //计数器计算
              }
              //创建输出结果
              output.collect(key, new IntWritable(sum));
            }
          }
          //开始主程序
          public static void main(String[] args) throws Exception {
            //设置主程序
            JobConf conf = new JobConf(WordCount.class);
            //设置主程序名
            conf.setJobName("wordcount");
            //设置输出Key格式
            conf.setOutputKeyClass(Text.class);
            //设置输出Vlaue格式
            conf.setOutputValueClass(IntWritable.class);
            //设置主Map
            conf.setMapperClass(Map.class);
            //设置第一次Reduce方法
            conf.setCombinerClass(Reduce.class);
            //设置主Reduce方法
            conf.setReducerClass(Reduce.class);
            //设置输入格式
            conf.setInputFormat(TextInputFormat.class);
            //设置输出格式
            conf.setOutputFormat(TextOutputFormat.class);
            //设置输入文件路径
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            //设置输出路径

            FileOutputFormat.setOutputPath(conf, new Path(args[1]));
            //开始主程序
            JobClient.runJob(conf);
          }
        }

从程序2-3和程序2-4的对比可以看到,采用了Scala的Spark程序设计能够简化程序编写的过程与步骤,同时在后端,Scala对编译后的文件有较好的优化性,这些都是目前使用Java语言所欠缺的。

这里顺便提一下,可能有部分使用者在使用Scala时感觉较为困难,但实际上,Scala在使用中主要将其进行整体化考虑,而非Java的面向对象的思考方法,这点请读者注意。