Spark
1. RDD
abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging
- RDD是一个抽象类
- 带泛型的,可以支持多种类型:String / 类对象 / ...
Resilient Distributed Dataset — 弹性 分布式 数据集
the basic abstraction in Spark
RDD是spark最基本的抽象单位
Represents an
- immutable:不可变
- partitioned collection of elements:分区
- Array(1,2,3,4,5,6,7,8,9) 分区:(1,2,3),(4,5,6),(7,8,9)
- that can be operated on in parallel:并行计算
PairRDDFunctions — 键值对
contains operations available only on RDDs of key-value pairs, such as
groupByKey
andjoin
;
2. RDD 特性
单机存储 / 计算 ==> 分布式存储 / 计算
- 数据的存储:切割 HDFS的block
- 数据的计算:切割(分布式并行计算) MapReduce / Spark
- 存储+计算: 切割? HDFS / + MapReduce / Spark ===> OK
RDD 的五大特性:
Internally, each RDD is characterized by five main properties:
在内部,每个RDD都有五个主要特性
- A list of partitions
一系列的分区(partitions) / 分片
- A function for computing each split/partition
对一个rdd执行一个函数其实就是对这个rdd的所有的分区都执行一个相同的函数操作
rdd.map(_+1)
- A list of dependencies on other RDDs
一个 RDD 有一系列依赖在其他 RDD 上
rdd1 => rdd2 => rdd3 => rdd4 dependencies:***** 比如: rdd1 = 5 partition ==>map rdd2 = 5 partition
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
对于一个key-value的 RDD 可以定制一个 partitioner ,告诉它如何分片。常用的有 hash / range
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for
- an HDFS file)
数据在哪优先把作业调度到数据所在的节点进行计算
移动数据不如移动计算
- All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
- to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
- reading data from a new storage system) by overriding these functions.
机器翻译: Spark中的所有调度和执行都是基于这些方法完成的,允许使用每个RDD实现自己的计算方法。实际上,用户可以实现自定义RDDs通过覆盖这些函数从新的存储系统中读取数据。
RDD五大特性在源码中的 体现:
- def compute(split: Partition, context: TaskContext): Iterator[T]
- partition , 上下文 ===> 特性2
partition源码:package org.apache.spark/** * An identifier for a partition in an RDD. * RDD中分区的标识符 */// trait 接口trait Partition extends Serializable { /** * Get the partition's index within its parent RDD */ def index: Int // A better default implementation of HashCode override def hashCode(): Int = index override def equals(other: Any): Boolean = super.equals(other)}
- protected def getPartitions: Array[Partition]
- ===> 特性1
- 该方法只调用一次,因此在其中实现耗时的计算是安全的。
- protected def getDependencies: Seq[Dependency[_]] = deps
- 由子类实现,以返回此RDD如何依赖于父RDDs
- ===> 特性3
- protected def getPreferredLocations(split: Partition): Seq[String] = Nil
- 根据分区获得最佳的位置所在
- 可选地由子类重写,以指定放置首选项。(机器翻译)
- ===> 特性5
- @transient val partitioner: Option[Partitioner] = None
- 特性4
Scala Option(选项)类型用来表示一个值是可选的(有值或无值)。
Option[T] 是一个类型为 T 的可选值的容器: 如果值存在, Option[T] 就是一个 Some[T] ,如果不存在, Option[T] 就是对象 None 。
第一要务:创建 SparkContext
class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
- 连接到Spark"集群" : local / standalone / yarn ....
- 通过SparkContext来创建RDD,广播变量到集群
class pyspark.SparkConf(loadDefaults=True,_jvm=None, _jconf=None)[source]
Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
Most of the time, you would create a SparkConf object with SparkConf(), which will load values from spark.* Java system properties as well. In this case, any parameters you set directly on the SparkConf object take priority over system properties.
For unit tests, you can also call SparkConf(false) to skip loading external settings and get the same configuration no matter what the system properties are.
All setter methods in this class support chaining. For example, you can write conf.setMaster(“local”).setAppName(“My app”).
Spark应用程序的配置。 用于将各种Spark参数设置为键值对。大多数情况下,您将使用SparkConf()创建一个SparkConf对象,它将从spark。* Java系统属性加载值。 在这种情况下,您直接在SparkConf对象上设置的任何参数都优先于系统属性。
对于单元测试,您也可以调用SparkConf(false)来跳过加载外部设置,并获得相同的配置,无论系统属性如何。
此类中的所有setter方法都支持链接。 例如,您可以编写conf.setMaster(“local”)。setAppName(“我的应用程序”)。