![Spark MLlib机器学习实践(第2版)](https://wfqqreader-1252317822.image.myqcloud.com/cover/926/26943926/b_26943926.jpg)
2.2 经典的WordCount
上节成功安装完Spark单机版,下面可以开始MLlib的学习了,这也是我们学习MLlib万里长征的第一步。
2.2.1 Spark实现WordCount
相信有不少读者都有Hadoop的学习经验,经典的WordCount是MapReduce入门必看的例子,可以称为分布式框架的Hello World,也是大数据处理程序员必须掌握的入门技能。WordCount的主要功能是统计文本中某个单词出现的次数,其形式如图2-46所示。
![](https://epubservercos.yuewen.com/90E409/15367252404214006/epubprivate/OEBPS/Images/Figure-0042-0049.jpg?sign=1739531149-UkSm4v55utNv2uUK8MIxABsu8B33q3SF-0-59ba12f37a6e6f7ff997eff4fb27d064)
图2-46 WordCount统计流程
首先是数据的准备工作,这里为了简化起见,采用小数据集(本书将以小数据为主演示MLlib的使用和原理)。
在C盘建立名为wc.txt的文本文件,文件名可以自行设定,内容如下:
数据位置://DATA//D02//wc.txt
good bad cool hadoop spark mllib good spark mllib cool spark bad
这是需要计数的数据内容,我们需要计算出文章中每个单词出现的次数,Spark代码如程序2-3所示。
代码位置:// SRC//C02// wordCount.scala
程序2-3 Spark代码
import org.apache.spark.{SparkContext, SparkConf} object wordCount { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("wordCount") //创建环境变量 val sc = new SparkContext(conf) //创建环境变量实例 val data = sc.textFile("c://wc.txt") //读取文件 data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println) //word计数 } }
下面是对程序进行分析。
(1)首先笔者new了一个SparkConf(),目的是创建了一个环境变量实例,告诉系统开始Spark计算。之后的setMaster("local")启动了本地化运算,setAppName("wordCount")是设置本程序名称。
(2)new SparkContext(conf)的作用是创建环境变量实例,准备开始任务。
(3)sc.textFile("c://wc.txt")的作用是读取文件,这里的文件是在C盘上,因此路径目录即为c://wc.txt。顺便提一下,此时的文件读取是按正常的顺序读取,本书后面章节会介绍如何读取特定格式的文件。
(4)第4行是对word进行计数。flatMap()是Scala中提取相关数据按行处理的一个方法,_.split(" ")中,下划线_是一个占位符,代表传送进来的任意一个数据,对其进行按" "分割。map((_, 1))是对每个字符开始计数,在这个过程中,并不涉及合并和计算,只是单纯地将每个数据行中单词加1。最后的reduceByKey()方法是对传递进来的数据按key值相加,最终形成wordCount计算结果。
目前程序流程如图2-47所示。
![](https://epubservercos.yuewen.com/90E409/15367252404214006/epubprivate/OEBPS/Images/Figure-0044-0050.jpg?sign=1739531149-mBtY1eu6GX1GaVnUnTY8QAjRSxxhvx3Q-0-94f349f62c5a4abd6bba83e30bf4efdf)
图2-47 WordCount流程图
(5)collect()是对程序的启动,因Spark编程的优化,很多方法在计算过程中属于lazy模式,因此需要一个显性启动支持。foreach(println)是打印的一个调用方法,打印出数据内容。
具体打印结果如下:
(cool,2) (spark,3) (hadoop,1) (bad,2) (good,2) (mllib,2)
2.2.2 MapReduce实现WordCount
与Spark对比的是MapReduce中wordCount程序的设计,如程序2-4所示,在这里笔者只是为了做对比,如果有读者想深入学习MapReduce程序设计,请参考相关的专业书籍。
代码位置:// SRC//C02// wordCount.java
程序2-4 MapReduce中wordCount程序的设计
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCount { public static class Map extends MapReduceBase implements //创建固定Map格式 Mapper<LongWritable, Text, Text, IntWritable> { //创建数据1格式 private final static IntWritable one = new IntWritable(1); //设定输入格式 private Text word = new Text(); //开始map程序 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { //将传入值定义为line String line = value.toString(); //格式化传入值 StringTokenizer tokenizer = new StringTokenizer(line); //开始迭代计算 while (tokenizer.hasMoreTokens()) { //设置输入值 word.set(tokenizer.nextToken()); //写入输出值 output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements //创建固定Reduce格式 Reducer<Text, IntWritable, Text, IntWritable> { //开始Reduce程序 public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { //初始化计算器 int sum = 0; //开始迭代计算输入值 while (values.hasNext()) { sum += values.next().get(); //计数器计算 } //创建输出结果 output.collect(key, new IntWritable(sum)); } } //开始主程序 public static void main(String[] args) throws Exception { //设置主程序 JobConf conf = new JobConf(WordCount.class); //设置主程序名 conf.setJobName("wordcount"); //设置输出Key格式 conf.setOutputKeyClass(Text.class); //设置输出Vlaue格式 conf.setOutputValueClass(IntWritable.class); //设置主Map conf.setMapperClass(Map.class); //设置第一次Reduce方法 conf.setCombinerClass(Reduce.class); //设置主Reduce方法 conf.setReducerClass(Reduce.class); //设置输入格式 conf.setInputFormat(TextInputFormat.class); //设置输出格式 conf.setOutputFormat(TextOutputFormat.class); //设置输入文件路径 FileInputFormat.setInputPaths(conf, new Path(args[0])); //设置输出路径 FileOutputFormat.setOutputPath(conf, new Path(args[1])); //开始主程序 JobClient.runJob(conf); } }
从程序2-3和程序2-4的对比可以看到,采用了Scala的Spark程序设计能够简化程序编写的过程与步骤,同时在后端,Scala对编译后的文件有较好的优化性,这些都是目前使用Java语言所欠缺的。
这里顺便提一下,可能有部分使用者在使用Scala时感觉较为困难,但实际上,Scala在使用中主要将其进行整体化考虑,而非Java的面向对象的思考方法,这点请读者注意。