Spark采用Scala语言实现,使用Scala作为应用框架。与Hadoop不同的是,Spark和Scala紧密集成,Scala像管理本地collective对象那样管理分布式数据集。
Spark支持分布式数据集上的迭代式任务,实际上它可以在Hadoop文件系统上与Hadoop一起运行,这是由第三方集群框架Mesos实现的。Spark由加州大学伯克利分校开发,用于构建大规模、低延时的数据分析应用。
Spark集群计算架构
Spark是一种类似于Hadoop的新型集群计算框架。不同的是,Spark用于特定工作负载类型的集群计算,这种计算在多个并行操作之间重用工作数据集(如机器学习算法)。为了优化这种类型的计算,Spark引入基于内存的集群计算,即将数据集缓存在内存中,减少访问延迟。
Spark还引入了一个抽象概念,即弹性分布式数据集RDD(resilient distributed datasets )。RDD是一个分布在一组节点之间的只读的对象集合。这些集合是弹性的,即能够在部分数据集丢失的情况下重建。重建部分数据集的过程需要一种维护血统(lineage,即重建部分数据集所需的信息,说明数据是根据什么过程产生的)的容错机制支持。一个RDD可以是:(1)一个从文件创建的Scala对象,或(2)一个并行切片(分布在各个节点之间),或(3)从其他RDD转换得来,或(4)改变已有RDD的持久性,如请求将已有RDD缓存在内存中。
Spark应用称为driver,实现单个节点或一组节点上的操作。与Hadoop一样,Spark支持单节点和多节点集群。对于多节点操作,Spark依附于Mesos集群管理器。Mesos为分布式应用提供了有效的资源共享和隔离的平台(见图1)。这种配置允许Spark与Hadoop共用一个节点共享池。
图1 Spark依赖于Mesos集群管理器实现资源共享和隔离
Spark编程模型
Driver在数据集上执行两种操作:行为(action)和转换(transformation)。action,即在数据集上执行计算,并向driver返回一个值;transformation,即从已有数据集创建新的数据集。例如,执行Reduce操作(使用某个函数)、遍历数据集(即在每个元素上执行一个函数,类似Map操作),属于action;Map操作、Cache操作(即请求新的数据集缓存在内存中),属于transformation。
下面我们将简单介绍一下这两种操作的实例。不过首先熟悉一下Scala语言。
Scala简介
很多著名网站都使用Scala,像Twitter,LinkedIn,及Foursquare(其web应用框架叫Lift)。此外,有证据表明金融机构也对Scala的性能感兴趣(例如使用EDF Trading进行衍生工具定价)。
Scala是一种多范式的编程语言,支持命令式、函数式和面向对象的编程范式。从面向对象的角度来看,Scala中的每个值都是一个对象。同理,从函数式编程的角度来看,每个函数也都是一个值。Scala还是一种静态类型语言,其类型系统表达能力强且安全。
此外,Scala还是一种虚拟机语言,Scala编译器生成字节码,使用JRE2直接在Java虚拟机(JVM)上运行。这样,Scala可以在几乎任何支持JVM的地方运行(需要增加Scala运行时库),并使用已有的Java库和Java代码。
最后,Scala是可扩展的,可以以库的形式轻易无缝地集成到其他语言中去。
Scala实例
现在我们来看看Scala的几个实例。Scala有自己的解释器,可以交互式地使用它。本文不对Scala语言进行具体论述,可以参考这里。
清单1 使用解释器快速了解一下Scala语言。启动Scala之后,出现命令提示符,你就可以在交互模式下评估表达式和程序。创建变量有两种方式,一是使用val创建不可变变量(称为单一赋值的变量),二是使用var创建可变变量。如果试图对val变量进行更改,将提示错误。
清单1 Scala中的变量
$ scala
Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.
scala> val a = 1
a: Int = 1
scala> var b = 2
b: Int = 2
scala> b = b + a
b: Int = 3
scala> a = 2
a = 2
^
接下来,定义一个简单的函数,计算一个Int类型的平方并返回这个值。使用def定义函数,后面紧跟函数名和参数列表。不需要指定返回值,函数本身可以推断出返回值。注意,这与变量赋值操作类似。这里我演示了在3这个对象上执行这个函数,返回一个名为res0的结果变量(该变量是Scala解释器自动创建的)。见清单2。
清单2 Scala中的函数
scala> def square(x: Int) = x*x
square: (x: Int)Int
scala> square(3)
res0: Int = 9
scala> square(res0)
res1: Int = 81
接着,我们看看如何在Scala中创建简单的类(见清单3)。定义一个简单的类Dog,接受String类型的参数(相当于构造器)。注意这里类直接接受参数,而不需要在类主体中定义这个类参数。类中只有一个打印该字符串的函数。创建一个类的实例,然后调用这个函数。注意解释器会插入一些竖线,它们不是代码的一部分。
清单3 Scala中的类
scala> class Dog( name: String ) {
| def bark() = println(name + " barked")
| }
defined class Dog
scala> val stubby = new Dog("Stubby")
stubby: Dog = Dog@1dd5a3d
scala> stubby.bark
Stubby barked
scala>
完成工作以后,只需要敲入:quit就可以退出Scala解释器。
安装Scala和Spark
首先下载和配置Scala。清单4给出了Scala的下载命令,并准备安装。根据Spark文档,这里使用2.8版本。
清单4 Scala安装
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.8.1.final.tgz
$ sudo tar xvfz scala-2.8.1.final.tgz --directory /opt/
为了使Scala可见,将以下语句添加到.bashrc文件中(假设你使用Bash):
export SCALA_HOME=/opt/scala-2.8.1.finalexport PATH=SCALAHOME/bin:PATH
然后按照清单5测试安装。这组命令加载bashrc文件,然后快速测试了Scala解释器。
清单5 配置并在交互模式下运行Scala
$ scala
Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.
scala> println("Scala is installed!")
Scala is installed!
scala> :quit
$
现在可以看到Scala命令提示符了,输入:quit退出。注意Scala在JVM上下文中执行,所以还需要JVM。我用的是Ubuntu,默认自带了OpenJDK。
接下来,根据清单6获取最新的Spark框架。
清单6 下载和安装Spark框架
$ wget https://github.com/mesos/spark/tarball/0.3-scala-2.8/mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz
$ sudo tar xvfz mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz
然后,设置Spark配置文件 ./conf/spar-env.sh,添加SCALA_HOME环境变量:
export SCALA_HOME=/opt/scala-2.8.1.final
最后,使用简单构建工具(sbt, simple build tool)更新Spark。sbt是Scala的构建工具,Spark中也使用它。在mesos-spark-c86af80子目录下执行更新和编译:
$ sbt/sbt update compile
注意这一步需要连接到互联网。完成以后,按照清单7测试一下Spark。这个测试例子运行SparkPi计算pi的估计值(在单位正方形中随机取点)。命令格式是示例程序(spark.examples.SparkPi),加上主机参数(即定义Mesos master)。本例实在localhost上运行,因为这是一个单节点集群。注意清单7执行了两个任务,但是它们是顺序执行的(任务0结束后任务1才开始)。
主要是jre目录下缺少了libhadoop.so和libsnappy.so两个文件。具体是,spark-shell依赖的是scala,scala依赖的是JAVA_HOME下的jdk,libhadoop.so和libsnappy.so两个文件应该放到$JAVA_HOME/jre/lib/amd64下面。
这两个so:libhadoop.so和libsnappy.so。前一个so可以在HADOOP_HOME下找到,如hadoop\lib\native。第二个libsnappy.so需要下载一个snappy-1.1.0.tar.gz,然后./configure,make编译出来,编译成功之后在.libs文件夹下。当这两个文件准备好后再次启动spark shell不会出现这个问题。