Apache Spark-简介
行业正在广泛使用Hadoop分析其数据集。原因是Hadoop框架基于简单的编程模型(MapReduce),它使计算解决方案具有可扩展性,灵活性,容错性和成本效益。在这里,主要的关注点是在查询之间的等待时间和运行程序的等待时间方面,保持处理大型数据集的速度。
Apache软件基金会(Apache Software Foundation)引入Spark是为了加快Hadoop计算计算软件流程。
与通常的看法相反,Spark不是Hadoop的修改版,并且实际上不依赖Hadoop,因为它具有自己的集群管理。Hadoop只是实施Spark的方法之一。
Spark通过两种方式使用Hadoop:一种是存储,另一种是处理。由于Spark具有自己的集群管理计算,因此仅将Hadoop用于存储目的。
Apache Spark
Apache Spark是一种快如闪电的集群计算技术,专为快速计算而设计。它基于Hadoop MapReduce,并且扩展了MapReduce模型以有效地将其用于更多类型的计算,包括交互式查询和流处理。Spark的主要功能是其内存中的群集计算,可提高应用程序的处理速度。
Spark旨在涵盖各种工作负载,例如批处理应用程序,迭代算法,交互式查询和流。除了在各自的系统中支持所有这些工作负载之外,它还减轻了维护单独工具的管理负担。
Apache Spark的演变
Spark是Hadoop的子项目之一,该子项目由Matei Zaharia在2009年在UC Berkeley的AMPLab中开发。它在BSD许可下于2010年开源。它在2013年捐赠给Apache软件基金会,现在Apache Spark从2014年2月开始成为Apache的顶级项目。
Apache Spark的功能
Apache Spark具有以下功能。
速度 -Spark有助于在Hadoop集群中运行应用程序,内存速度最高可提高100倍,而在磁盘上运行时则可提高10倍。这可以通过减少对磁盘的读/写操作次数来实现。它将中间处理数据存储在内存中。
支持多种语言 -Spark提供Java,Scala或Python内置的API。因此,您可以使用不同的语言编写应用程序。Spark提供了80个高级操作员用于交互式查询。
高级分析 -Spark不仅支持"地图"和"减少"。它还支持SQL查询,流数据,机器学习(ML)和图算法。
基于Hadoop构建的Spark
下图显示了如何使用Hadoop组件构建Spark的三种方式。
Spark部署有以下三种方式。
独立 -Spark独立部署意味着Spark占据了HDFS(Hadoop分布式文件系统)之上的位置,并且为HDFS明确分配了空间。在这里,Spark和MapReduce将并排运行以覆盖集群上的所有Spark作业。
Hadoop Yarn -Hadoop Yarn部署意味着,Spark可以在Yarn上运行,而无需任何预安装或root访问。它有助于将Spark集成到Hadoop生态系统或Hadoop堆栈中。它允许其他组件在堆栈顶部运行。
MapReduce中的Spark(SIMR) -除了独立部署外,MapReduce中的Spark还用于启动Spark作业。使用SIMR,用户可以启动Spark并使用其shell,而无需任何管理访问权限。
Spark的组成
下图描述了Spark的不同组件。
Apache Spark核心
Spark Core是所有其他功能都基于的Spark平台的基础通用执行引擎。它提供了外部存储系统中的内存中计算和引用数据集。
Spark SQL
Spark SQL是Spark Core之上的组件,它引入了一个称为SchemaRDD的新数据抽象,它为结构化和半结构化数据提供支持。
火花流
Spark Streaming利用Spark Core的快速调度功能来执行流分析。它以小批量提取数据,并对那些小批量数据执行RDD(弹性分布式数据集)转换。
MLlib(机器学习库)
MLlib是基于Spark的分布式机器学习框架,因为它基于分布式内存的Spark体系结构。根据基准,它是MLlib开发人员针对交替最小二乘(ALS)实现而完成的。Spark MLlib的速度是Apache Mahout的基于Hadoop磁盘的版本的9倍(在Mahout获得Spark接口之前)。
GraphX
GraphX是基于Spark的分布式图形处理框架。它提供了一个用于表达图形计算的API,该API可以使用Pregel抽象API对用户定义的图形进行建模。它还为此抽象提供了优化的运行时。
Apache Spark-RDD
弹性分布式数据集
弹性分布式数据集(RDD)是Spark的基本数据结构。它是对象的不可变分布式集合。RDD中的每个数据集都分为逻辑分区,可以在群集的不同节点上进行计算。RDD可以包含任何类型的Python,Java或Scala对象,包括用户定义的类。
正式而言,RDD是记录的只读分区集合。可以通过对稳定存储上的数据或其他RDD进行确定性操作来创建RDD。RDD是可以并行操作的元素的容错集合。
有两种创建RDD的方法- 并行化驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统,HDFS,HBase或提供Hadoop输入格式的任何数据源。
Spark利用RDD的概念来实现更快,更有效的MapReduce操作。让我们首先讨论MapReduce操作是如何发生的以及为什么它们效率不高。
MapReduce中的数据共享速度很慢
MapReduce被广泛采用,用于在集群上使用并行分布式算法处理和生成大型数据集。它允许用户使用一组高级运算符编写并行计算,而不必担心工作分配和容错性。
不幸的是,在大多数当前框架中,在计算之间(两个MapReduce作业之间的Ex-)重用数据的唯一方法是将其写入外部稳定的存储系统(Ex-HDFS)。尽管此框架提供了许多用于访问群集的计算资源的抽象,但用户仍然需要更多。
这两个迭代和互动应用需要更快速地并行作业的数据共享。由于复制,序列化和磁盘IO,MapReduce中的数据共享速度很慢。对于存储系统,大多数Hadoop应用程序花费90%以上的时间进行HDFS读写操作。
MapReduce上的迭代操作
在多阶段应用程序中跨多个计算重用中间结果。下图说明了在MapReduce上进行迭代操作时当前框架的工作方式。由于数据复制,磁盘I / O和序列化,这会导致大量开销,从而使系统变慢。
MapReduce上的交互式操作
用户对同一数据子集运行临时查询。每个查询都将在稳定存储上执行磁盘I / O,这可能会影响应用程序的执行时间。
下图说明了在MapReduce上进行交互式查询时当前框架的工作方式。
使用Spark RDD进行数据共享
由于复制,序列化和磁盘IO,MapReduce中的数据共享速度很慢。在大多数Hadoop应用程序中,它们花费90%以上的时间进行HDFS读写操作。
认识到此问题后,研究人员开发了一种称为Apache Spark的专门框架。火花的关键思想是- [R esilient d istributed d atasets(RDD); 它支持内存中的处理计算。这意味着,它将内存状态存储为跨作业的对象,并且该对象可在这些作业之间共享。内存中的数据共享比网络和磁盘快10到100倍。
现在让我们尝试找出在Spark RDD中如何进行迭代和交互操作。
Spark RDD上的迭代操作
下图显示了Spark RDD上的迭代操作。它将中间结果存储在分布式内存中,而不是稳定存储(磁盘)中,并使系统运行更快。
注 –如果分布式内存(RAM)足以存储中间结果(JOB的状态),则它将这些结果存储在磁盘上。
Spark RDD上的交互式操作
此图显示了在Spark RDD上的交互操作。如果对同一组数据重复执行不同的查询,则可以将这些特定数据保留在内存中以缩短执行时间。
默认情况下,每次在其上执行操作时,都可能会重新计算每个转换后的RDD。但是,您也可以将RDD 保留在内存中,在这种情况下,Spark将在下次查询时将元素保留在群集中,以加快访问速度。还支持将RDD持久存储在磁盘上,或在多个节点之间复制。
Apache Spark-安装
Spark是Hadoop的子项目。因此,最好将Spark安装到基于Linux的系统中。以下步骤显示了如何安装Apache Spark。
步骤1:验证Java安装
Java安装是安装Spark的必要步骤之一。尝试使用以下命令来验证JAVA版本。
$java -version
如果您的系统上已经安装了Java,则会看到以下响应-
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
如果您的系统上未安装Java,请在继续下一步之前先安装Java。
步骤2:验证Scala安装
您应该使用Scala语言来实现Spark。因此,让我们使用以下命令来验证Scala的安装。
$scala -version
如果您的系统上已经安装了Scala,则会看到以下响应-
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
如果您的系统上未安装Scala,请继续执行下一步以进行Scala安装。
第三步:下载Scala
通过访问以下链接下载最新版本的。对于本教程,我们使用的是scala-2.11.6版本。下载后,您将在下载文件夹中找到Scala tar文件。
步骤4:安装Scala
请按照以下给定的步骤安装Scala。
提取Scala tar文件
键入以下命令以提取Scala tar文件。
$ tar xvf scala-2.11.6.tgz
移动Scala软件文件
使用以下命令将Scala软件文件移动到相应目录(/ usr / local / scala)。
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit
设置Scala的PATH
使用以下命令为Scala设置PATH。
$ export PATH = $PATH:/usr/local/scala/bin
验证Scala安装
安装后,最好进行验证。使用以下命令来验证Scala安装。
$scala -version
如果您的系统上已经安装了Scala,则会看到以下响应-
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
步骤5:下载Apache Spark
通过访问以下链接下载最新版本的。在本教程中,我们使用spark-1.3.1-bin-hadoop2.6版本。下载后,您将在下载文件夹中找到Spark tar文件。
步骤6:安装Spark
请按照以下给出的步骤安装Spark。
提取火花焦油
以下命令用于提取spark tar文件。
$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz
移动Spark软件文件
以下命令用于将Spark软件文件移动到相应目录(/ usr / local / spark)。
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit
设置Spark环境
将以下行添加到?/ .bashrc文件。这意味着将火花软件文件所在的位置添加到PATH变量中。
export PATH=$PATH:/usr/local/spark/bin
使用以下命令来获取?/ .bashrc文件。
$ source ~/.bashrc
步骤7:验证Spark安装
编写以下命令以打开Spark Shell。
$spark-shell
如果spark安装成功,那么您将找到以下输出。
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Apache Spark-核心编程
Spark Core是整个项目的基础。它提供了分布式任务分配,调度和基本的I / O功能。Spark使用称为RDD(弹性分布式数据集)的专用基础数据结构,该结构是跨机器分区的逻辑数据集合。RDD可以通过两种方式创建:一种是通过引用外部存储系统中的数据集,第二种是通过对现有RDD进行转换(例如,映射,过滤器,化简,联接)。
RDD抽象是通过语言集成的API公开的。这简化了编程的复杂性,因为应用程序处理RDD的方式类似于处理本地数据集合。
火花壳
Spark提供了一个交互式外壳程序-一个强大的工具来交互式地分析数据。它可以使用Scala或Python语言。Spark的主要抽象是称为"弹性分布式数据集(RDD)"的项目的分布式集合。可以从Hadoop输入格式(例如HDFS文件)或通过转换其他RDD创建RDD。
打开火花壳
以下命令用于打开Spark Shell。
$ spark-shell
创建简单的RDD
让我们从文本文件创建一个简单的RDD。使用以下命令创建一个简单的RDD。
scala> val inputfile = sc.textFile("input.txt")
上面命令的输出是
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at
Spark RDD API引入了很少的转换和很少的操作来操纵RDD。
RDD转换
RDD转换返回指向新RDD的指针,并允许您在RDD之间创建依赖关系。依赖关系链中的每个RDD(依赖关系字符串)都有一个计算其数据的功能,并具有指向其父RDD的指针(依赖关系)。
Spark是懒惰的,因此除非您调用将触发作业创建和执行的某些转换或操作,否则将不会执行任何操作。请看下面的单词计数示例片段。
因此,RDD转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉Spark如何获取数据以及如何处理数据。
下面给出了RDD转换的列表。
动作
下表列出了返回值的操作。
用RDD编程
让我们借助示例来了解RDD编程中一些RDD转换和动作的实现。
例
考虑一个单词计数示例-它计算文档中出现的每个单词。将以下文本视为输入,并将其另存为主目录中的input.txt文件。
input.txt-输入文件。
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
请按照下面给出的步骤执行给定的示例。
打开Spark-Shell
以下命令用于打开火花壳。通常,使用Scala构建spark。因此,Spark程序在Scala环境中运行。
$ spark-shell
如果Spark Shell成功打开,则将找到以下输出。看输出"可作为SC火花上下文"的最后一行表示Spark容器是自动创建的火花上下文对象与名称SC。在开始程序的第一步之前,应创建SparkContext对象。
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
创建一个RDD
首先,我们必须使用Spark-Scala API读取输入文件并创建一个RDD。
以下命令用于从给定位置读取文件。在这里,使用输入文件名创建新的RDD。在textFile("")方法中作为参数给出的String是输入文件名的绝对路径。但是,如果仅给出文件名,则表示输入文件位于当前位置。
scala> val inputfile = sc.textFile("input.txt")
执行字数转换
我们的目的是计算文件中的单词数。创建一个平面地图,将每行分割成多个单词(flatMap(line?line.split(""))。
接下来,使用map函数(map(word?(word,1)),将每个单词作为键读取,值为'1'(
最后,通过添加相似键的值(reduceByKey(_ + _))来减少这些键。
以下命令用于执行字计数逻辑。执行此操作后,您将找不到任何输出,因为这不是操作,而是转换。指向新的RDD或告知Spark如何处理给定的数据)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
当前RDD
在使用RDD时,如果您想了解当前的RDD,请使用以下命令。它将向您显示有关当前RDD及其调试依赖关系的描述。
scala> counts.toDebugString
缓存转换
您可以使用其上的persist()或cache()方法将RDD标记为持久。第一次在操作中对其进行计算时,它将被保存在节点上的内存中。使用以下命令将中间转换存储在内存中。
scala> counts.cache()
采取行动
应用动作(如存储所有转换)将结果生成文本文件。saveAsTextFile("")方法的String参数是输出文件夹的绝对路径。尝试使用以下命令将输出保存在文本文件中。在以下示例中,"输出"文件夹位于当前位置。
scala> counts.saveAsTextFile("output")
检查输出
打开另一个终端以转到主目录(在另一个终端中执行spark)。使用以下命令检查输出目录。
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
以下命令用于查看Part-00000文件的输出。
[hadoop@localhost output]$ cat part-00000
输出量
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
以下命令用于查看Part-00001文件的输出。
[hadoop@localhost output]$ cat part-00001
输出量
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
联合国坚持储存
取消永久保留之前,如果要查看用于该应用程序的存储空间,请在浏览器中使用以下URL。
http://localhost:4040
您将看到以下屏幕,其中显示了用于应用程序的存储空间,这些存储空间在Spark Shell上运行。
如果要取消永久保留特定RDD的存储空间,请使用以下命令。
Scala> counts.unpersist()
您将看到如下输出:
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at
要验证浏览器中的存储空间,请使用以下URL。
http://localhost:4040/
您将看到以下屏幕。它显示了用于应用程序的存储空间,这些存储空间在Spark Shell上运行。
Apache Spark-部署
使用spark-submit的Spark应用程序是一个Shell命令,用于在群集上部署Spark应用程序。它通过统一的界面使用所有各自的集群管理器。因此,您不必为每个应用程序都配置您的应用程序。
例
让我们以以前使用shell命令的单词计数为例。在这里,我们考虑与Spark应用程序相同的示例。
样本输入
以下文本是输入数据,名为in.txt的文件。
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
看下面的程序-
SparkWordCount.scala
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ? line.split(" "))
.map(word ? (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
} }
将上述程序保存到名为SparkWordCount.scala的文件中,并将其放置在名为spark-application的用户定义目录中。
注意 -在将inputRDD转换为countRDD时,我们使用flatMap()将行(从文本文件中)标记为单词,使用map()方法计算单词频率,并使用reduceByKey()方法计算每个单词重复。
使用以下步骤提交此申请。通过终端执行spark-application目录中的所有步骤。
步骤1:下载Spark Ja
编译需要Spark核心jar,因此,请从以下链接下载spark-core_2.10-1.3.0.jar 并将jar文件从下载目录移至spark-application目录。
步骤2:编译程式
使用下面给出的命令编译以上程序。该命令应从spark-application目录执行。在这里,
/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar是从Spark库获取的Hadoop支持jar。
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
第3步:创建一个JAR
使用以下命令创建spark应用程序的jar文件。在这里,wordcount是jar文件的文件名。
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
步骤4:提交Spark申请
使用以下命令提交spark应用程序-
spark-submit --class SparkWordCount --master local wordcount.jar
如果执行成功,您将在下面找到输出。以下输出中的OK(确定)命令用于用户识别,这是程序的最后一行。如果您仔细阅读以下输出,将会发现不同的内容,例如-
在端口42954上成功启动服务" sparkDriver"
MemoryStore开始时的容量为267.3 MB
从http://192.168.1.217:4040启动SparkUI
添加了JAR文件:
/home/hadoop/piapplication/count.jar
ResultStage 1(SparkPi.scala:saveAsTextFile:11)在0.566秒内完成
在http://192.168.1.217:4040停止了Spark Web UI
MemoryStore已清除
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
步骤5:检查输出
程序成功执行后,您将在spark-application目录中找到名为outfile的目录。
以下命令用于打开和检查outfile目录中的文件列表。
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
用于检查part-00000文件中的输出的命令是-
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
用于检查part-00001文件中的输出的命令是-
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
浏览以下部分以了解有关" spark-submit"命令的更多信息。
Spark提交语法
spark-submit [options]
选件
下表给出了选项列表-
高级Spark编程
Spark包含两种不同类型的共享变量-一种是广播变量,另一种是累加器。
广播变量 -用于有效地分配较大的值。
累加器 -用于汇总特定集合的信息。
广播变量
广播变量使程序员可以在每台计算机上保留一个只读变量,而不用将其副本与任务一起发送。例如,可以使用它们以有效的方式为每个节点提供大型输入数据集的副本。Spark还尝试使用有效的广播算法分配广播变量,以降低通信成本。
火花动作是通过一组阶段执行的,这些阶段由分布式"随机"操作分开。Spark自动广播每个阶段任务所需的通用数据。
以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着仅当跨多个阶段的任务需要相同数据或以反序列化形式缓存数据非常重要时,显式创建广播变量才有用。
广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。broadcast变量是v的包装,可以通过调用value方法来访问其值。下面给出的代码显示了这一点-
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
输出 -
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
创建广播变量之后,在集群上运行的任何函数中都应使用它代替值v,以使v不会多次传送给节点。另外,对象v广播后不应修改,以确保所有节点都具有相同的广播变量值。
蓄能器
累加器是仅通过关联操作"添加"到的变量,因此可以有效地并行支持。它们可用于实现计数器(如在MapReduce中)或总和。Spark本机支持数字类型的累加器,程序员可以添加对新类型的支持。如果使用名称创建累加器,它们将显示在Spark的UI中。这对于了解运行阶段的进度很有用(注意-Python尚不支持此功能)。
通过调用SparkContext.accumulator(v)从初始值v创建一个累加器。然后,可以使用add方法或+ =运算符(在Scala和Python中)向集群中运行的任务添加集群。但是,他们无法读取其值。只有驱动程序才能使用其value方法读取累加器的值。
下面给出的代码显示了一个累加器,用于累加一个数组的元素-
scala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
如果您想查看以上代码的输出,请使用以下命令-
scala> accum.value
输出量
res2: Int = 10
数字RDD运算
Spark允许您使用预定义的API方法之一对数字数据执行不同的操作。Spark的数字运算是通过流算法实现的,该算法允许一次仅一个元素地构建模型。
通过调用status()方法来计算并返回这些操作作为StatusCounter对象。
以下是StatusCounter中可用的数字方法的列表。
如果只想使用这些方法之一,则可以直接在RDD上调用相应的方法。