Spark RDD

Reason is the light and the light of life.

Jerry Su Jan 03, 2019 3 mins
Spark RDD要点总结:
Spark RDD弹性分布式数据集
1. RDD简介
  - RDD的概述
  - RDD的属性
2. RDD的创建方式
  - 从文件系统中加载数据创建RDD
  - 通过并行集合创建RDD
3. RDD的处理过程
  - RDD的整体处理流程
  - Transformation算子
  - Action算子
  - 编写WordCount词频统计案例
4. RDD的依赖关系
5. RDD机制
  - 持久化机制
  - 容错机制
6. Spark的任务调度
  - DAG的概念
  - 任务调度流程

核心概念和抽象

RDD

RDD
 这是一个核心抽象,既能实现计算的高效执行,又能灵活方便的形式化定义计算

为什么需要一个新的抽象?

MapReduce中的迭代计算:
 1. 后续jobs之间的关系仅为用户代码所知,而不是框架。所以,框架无法优化整个计算。
 2. 框架必须可靠地持久保存中间数据,从而产生过多的IO

Spark将数据保存在内存中,有效地消除了中间磁盘持久性,从而改善了完成时间

RDD Operations

  1. RDDs支持两种类型操作:

    • Transformations:从已的数据集中创建一个新的数据集。
    • Actions:在数据集上执行完一个计算后,向driver program返回一个值。

    例如:
    map:是一个transformation,通过一个函数传递每个数据集元素,并返回一个表示结果的新RDD。
    reduce:是一个action,使用某个函数聚合RDD的所有元素,并将最终结果返回给driver program

  2. Spark中所有的transformations都是惰性的,因为他们不会立即计算他们的结果。相反,他们只记得应用于某些基础数据集的转换。transformations仅在actions需要将结果返回到driver program时计算。这种设计使Spark能够更有效地运行。

  3. 默认情况下,当每一次在转换后的RDD上执行一个action时,它都会重新计算。但是,也可以使用持久化(或缓存)的方法在内存中保留RDD,在这种情况下,Spark会在群集上保留元素,以便在下次查询时更快地访问。 当然也是支持在磁盘上保留RDD,或在多个节点之间复制的。

Basics

1  lines = sc.textFile("data.txt")
2  lineLengths = lines.map(lambda s: len(s))
3  totalLength = lineLengths.reduce(lambda a, b: a + b)

1: 从外部文件定义基础RDD。 此数据集未加载到内存中或以其他方式操作:lines 仅仅是指向文件的指针。
2: 将 lineLengths 定义为 map 转换的结果。由于惰性 lineLengths 不会立即计算。
3: 最终运行 reduce,是一个 action。此时Spark将计算分解为在不同机器上运行的 Tasks,并且每台机器都运行其 map 的部分和本地的 reduce,仅返回其对 driver program 的结果。

如果稍后需要再次使用 lineLengths,可以用lineLengths.persist()reduce 之前,将 lineLengths 在第一次计算之后保存在内存中。

Understanding closures

Spark的一个难点是在跨集群执行代码时理解变量和方法的范围和生命周期。 修改其范围之外的变量的RDD操作可能经常引起混淆。
函数式编程:理解闭包和延迟计算

1. Example
根据是否在运行在同一JVM中可能表现不同
Spark应用程序运行:本地模式 vs. 集群模式

使用foreach()增加counter

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

2. Local vs. cluster modes
- 上述代码的行为是未定义的,并且不同模式下运行情况不同。为了执行Job,Spark将RDD操作的处理分解为Tasks,每个TaskExecutor执行。在执行之前,Spark会计算Task的闭包。闭包是Executor在RDD上进行计算的时候必须可见的那些变量和方法(在这种情况下是foreach())。闭包会被序列化并发送给每个Executor

  • 发送给每个Executor的闭包中的变量是副本,因此,当foreach()函数内引用counter时,它不再是driver节点上的counterdriver节点的内存中仍有一个counter,但该变量是对Executor不可见的!Executor只能看到序列化闭包的副本。因此,counter的最终值仍然为0,因为counter上的所有操作都引用了序列化闭包内的值。

  • 在本地模式,在某些情况下,该foreach()函数实际上将在与driver相同的JVM内执行,并且会引用相同的原始counter,这样是可能实际更新它。

Spark中的Accumulator专门用于提供一种机制,用于在集群中的工作节点之间执行拆分时安全地更新变量。

Action

RDD Persistence

Resiliency

Spark如何实现弹性计算?(尽管集群出现机器故障,但仍可以继续计算操作)
1. tracking lineage
2. assuming deterministic & side-effect free execution of transformations(including closures)
3. assuming idempotency for actions
4. increasing durability of a data set 提高数据集的持久性

It is important to keep in mind that all the closures pass to Spark, must be deterministic and side effect free. Actions require a stronger property, idempotency.
 所有的闭包都传递给Spark,必须是确定性的,并且没有副作用。

  • What is the lineage?
    分区依赖关系图,包含计算中涉及数据源的所有分区。
  • What happens if the dependencies of a failed partition fails as well ?
     重新启动计算。首先重新计算这个分区的依赖,然后再计算这个分区。

Key Questions:
1. 数据集必须具备哪些功能才能实现为RDD?
partitions, iterator and dependencies

高级主题

Execution & Scheduling

当在Spark上运行我们的应用程序 ( 调用一个Action ) 时会发生什么?
Spark
1. SparkContext:是应用的核心
- 告诉应用如何访问集群
- 协调集群上的进程集来运行我们的应用
- 在同一应用程序内,调度多个并发作业
- 在不同应用程序间,控制动态资源分配

Cluster Manager: 用于获取群集资源的一个外部服务。例如:YARN, Mesos or a standalone Spark cluster.(资源经纪人)

  1. Jobs, stages, tasks
    Job:在响应Spark action时而产生的活动
    stages:将Job分解成更小的tasks集合,叫做stages
    Tasksjob scheduler为所有的job stage创建Tasks
    Task:由Executor执行的一个单位工作

Action -> Job -> Job stages -> Tasks
最终,Tasks被分发给Executors,执行实际的工作。

Example:
1. Invoking an action

Job stages 和 Tasks的区别?
- Job stage: 是跨越物化边界的流水线计算。定义在RDD level,不是可立即可执行的。a pipelined computation spanning between materialization boundaries. not immediately executable.

  • Task: 是绑定到特定分区的job stage,是可立即可执行的。a job stage bound to particular partitions. immediately executable.

The idea behind the job stages is to pipeline computation as much as possible, avoiding the unnecessary data materializations.

  1. Transformations with narrow dependencies allow pipelining

    For example, if you applied two filter transformations in a row, it is not necessary to serialize and deserialize data in between. You can simply pass the data through the next predicate. Data materialization occurs in a few places. when reading, shuffling, or passing data to an action. This is where the distinction between narrow and wide dependencies comes up.
    - Materialization happens when reading, shuffling or passing data to an action.
    Narrow dependencies allow pipelining.
    Wide dependencies forbid it.

Caching & Persistence

1. Spark如何管理中间数据?
2. 如何向Spark提示我们的访问模式以获得更好的弹性和性能?

Shared Variables

  • Spark中的第二个抽象是可以在并行操作中使用的共享变量
  • 默认情况下,当Spark并行运行一个函数作为不同节点上的一组Tasks时,它会将函数中使用的每个变量的副本发送给每个Task。即Spark实际上操作的是这个函数所用变量的一个独立副本。这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回driver program
  • 有时,变量需要跨任务共享,或者在任务和驱动程序之间共享。通常跨任务的读写共享变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量 ( Broadcast Variables ) 和累加器 ( Accumulator )

Broadcast Variables

BroadcastVariable
1. 为什么需要广播变量?
如果我们要在分布式计算里面分发大的对象,例如:字典,模型等,这个都会由Driver端进行分发。一般来讲,如果这个变量不是广播变量,那么每个Task就会分发一份,这在Task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗Task服务器上的资源,如果将这个变量声明为广播变量,那么只是每个Executor拥有一份,由这个Executor启动的Task会共享这个变量,节省了通信的成本和服务器的资源。

注:
- 广播变量是只读的共享变量
- 用于共享字典和模型
- 能不能将一个RDD使用广播变量广播出去?
不能,因为RDD是不存储数据的。可以将RDD的结果广播出去
- 广播变量只能在Driver端定义和修改,不能在Executor端定义和修改
- 如果Executor端用到了Driver的变量,如果不使用广播变量Executor有多少Task就有多少Driver端的变量副本
- 如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本

2. 使用广播变量
通过调用SparkContext.broadcast(v)从变量v创建广播变量。 广播变量是v的包装器,可以通过调用value方法访问其值:

>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>> broadcastVar.value
[1, 2, 3]

创建广播变量后,应该在群集上运行的任何函数中使用它而不是值v,这样v不会多次传送到节点。 另外,在广播之后不应修改对象v,以确保所有节点获得广播变量的相同值(例如,如果稍后将变量发送到新节点)。

Accumulators

在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个Accumulator,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个Task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为Accumulator后,该变量就会有分布式计数的功能。

1. 使用累加器
- Driver端创建:SparkContext.accumulator(v)
- Executor端更新:集群上运行的Task更新:add+=
- Driver端读取:value

>> accum = sc.accumulator(0)
>> accum
Accumulator<id=0, value=0>

>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>> accum.value
10

注:
累加器不会改变Spark的惰性模型。 如果在RDD上的操作中更新它们,则只有在RDD作为action部分计算时才更新它的值。 因此,在像map()这样的惰性transformation中进行累积器更新时,不能保证执行累加器更新。

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.

References

[1] Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
[2] RDD Programming Guide
[3] Coursera: Spark RDD
[4] StackOverflow: Internal Work of Spark
[5] Advanced Apache Spark- Sameer Farooqui (Databricks)
[6] A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)
[7] Introduction to AmpLab Spark Internals


Read more:

Related posts: