本文主要列出了一些 Apache Spark 的函数的使用,配上了具体的代码示例,更方便初学者的使用(我自己就是初学者hh)

首先,分享一个网站:https://nbviewer.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb

里面每个函数都配有图和案例,非常的实用!

RDD转换操作(RDD Transformation)

Spark 转换操作是一个从现有的 RDD 生成新 RDD 的函数。它以 RDD 作为输入参数,生成并输出一个或多个RDD。每当我们使用任何一个转换操作函数的时候,它都会生成新的 RDD。由于 RDD 本质上是不可变的,因此输入的 RDD 也是不可变的。

转换操作本质上是一种懒加载,也就是说不会立即执行转换函数,只有当我们调用执行操作函数的时候才会执行转换操作函数。转换操作最基本最常用的两个函数是:map() 和 fliter() 函数。转换后生成的 RDD 始终与其父 RDD 不同。转换后的RDD 可能变得比小(如:filter(), count(), distinct(), sample()),也可能比大(如:flatMap(), union(), Cartesian())甚至大小一样的(如:map())。

map(func)

map(func) 函数遍历 RDD 中的每一行并拆分为新的 RDD。使用接受任何函数作为输入参数的 map() 函数,并且该函数(入参函数)作用到 RDD 的每个元素上面。

在 map() 函数中,我们可以灵活地选择 RDD 的输入和返回类型。例如,我们可以将字符串作为RDD的输入类型,在应用map() 函数之后,返回的 RDD 可以是布尔类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
def sub(value):
return (value - 1)

data = range(1, 5)

# 需要转换成 RDD
rangeRDD = sc.parallelize(data)

subRDD = rangeRDD.map(sub)
# 也可以使用 lambda 表达式
# subRDD = rangeRDD.map(lambda x: x-1)

print(subRDD.collect())

注意,如果数据量很大的情况下,直接调用 collect() 会造成一些问题,所以再这之前可以调用譬如 filter 等函数,来缩小要显示的数据量。而collect() 也是一个 action operations,这意味着从这里 Spark 才开始执行计算的操作。

一些用途,比如说我们想将统计一篇文章每个单词出现的频率,我们可以把所有的单词放在一个列表(比如下文的 data 中),然后使用 map() 方法,将 data 中每个单词记录一次。

1
2
3
4
5
6
data = ['d', 'e', 'd', 'b', 'a']

rangeRDD = sc.parallelize(data)
subRDD = rangeRDD.map(lambda x: [x, 1])

print(subRDD.collect())
1
[['d', 1], ['e', 1], ['d', 1], ['b', 1], ['a', 1]]

通常,我们会写成这样

1
2
3
4
5
6
data = ['d', 'e', 'd', 'b', 'a']

rangeRDD = sc.parallelize(data)
subRDD = rangeRDD.map(lambda x: (x, 1)) #看这里

print(subRDD.collect())
1
[('d', 1), ('e', 1), ('d', 1), ('b', 1), ('a', 1)]

当然 map() 同样可以接受多个参数的函数,比如以下例子:

1
2
3
4
5
6
7
8
def sub(x, y):
return x-y

data = [1,2,3,4]
rangeRDD = sc.parallelize(data)
subRDD = rangeRDD.map(lambda x: sub(x, 2))

print(subRDD.collect())
1
[-1, 0, 1, 2]

又或者是:

1
2
3
4
5
6
7
8
def sub(x, y):
return (x[0], x[1]-y)

data = [('a', 1), ('e', 3), ('s', 5), ('s', 1)]
rangeRDD = sc.parallelize(data)
subRDD = rangeRDD.map(lambda x: sub(x, 1))

print(subRDD.collect())
1
[('a', 0), ('e', 2), ('s', 4), ('s', 0)]

flatMap()

通过首先将一个函数应用于此 RDD 的所有元素,然后展平结果来返回一个新的 RDD。这里的展平可以理解为把高维的数组变成一维的操作。

1
2
3
4
5
6
7
8
rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"])
print(rdd2.collect())
print(rdd2.map(lambda x: x.split(" ")).collect())
print(rdd2.flatMap(lambda x: x.split(" ")).collect())

# ['hello SamShare', 'hello PySpark']
# [['hello', 'SamShare'], ['hello', 'PySpark']]
# ['hello', 'SamShare', 'hello', 'PySpark']
1
2
3
4
5
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

map() 的区别:https://blog.csdn.net/CYJ2014go/article/details/83014075

再举例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Let's create a new base RDD to work from
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)

# Use map
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
# Use flatMap
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))

# View the results
print(singularAndPluralWordsRDDMap.collect())
print(singularAndPluralWordsRDD.collect())
# View the number of elements in the RDD
print(singularAndPluralWordsRDDMap.count())
print(singularAndPluralWordsRDD.count())
1
2
3
4
[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
5
10

再举例子:

1
2
3
simpleRDD = sc.parallelize([2, 3, 4])
print(simpleRDD.map(lambda x: range(1, x)).collect())
print(simpleRDD.flatMap(lambda x: range(1, x)).collect())
1
2
[range(1, 2), range(1, 3), range(1, 4)]
[1, 1, 2, 1, 2, 3]

filter(func)

Spark RDD filter() 函数返回一个新的 RDD,其中仅包含满足谓词的元素。这是一个窄转换操作,因为它不会把数据从一个分区映射到多个分区。例如,假设 RDD 包含前五个自然数(1、2、3、4和5),并且谓词检查偶数。那么过滤器后的结果 RDD 中就只包含偶数,即2和4。

简单来说,这是一个用于过滤掉特定数据的函数。

1
2
3
4
5
6
7
8
9
10
11
12
def ten(value):
if (value < 10):
return True
else:
return False

data = range(1, 15)
subRDD = sc.parallelize(data, 8)

filteredRDD = subRDD.filter(ten)

print(filteredRDD.collect())

同样可以用 Python 的 lambda 表达式:

1
2
3
4
5
data = range(1, 15)
subRDD = sc.parallelize(data)
lambdaRDD = subRDD.filter(lambda x: x < 10)
lambdaRDD.collect()
# [1, 2, 3, 4, 5, 6, 7, 8, 9]

groupByKey()

将 RDD 中每个键的值分组为一个序列。使用 numPartitions 分区对生成的 RDD 进行哈希分区。

1
2
3
4
5
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
1
2
3
4
5
6
7
8
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
# mapValues only used to improve format for printing
print(pairRDD.groupByKey().mapValues(lambda x: list(x)).collect())

# Different ways to sum by key
print(pairRDD.groupByKey().map(lambda kv: (kv[0], sum(kv[1]))).collect())
# Using mapValues, which is recommended when they key doesn't change
print(pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect())
1
2
3
[('b', [1]), ('a', [1, 2])]
[('b', 1), ('a', 3)]
[('b', 1), ('a', 3)]

reduceByKey()

使用关联和可交换的 reduce 函数合并每个键的值。 这还将在将结果发送到 reducer 之前在每个映射器上执行本地合并,类似于 MapReduce 中的“组合器”。 输出将使用 numPartitions 分区进行分区,如果未指定 numPartitions,则使用默认并行级别。默认分区程序是散列分区。

1
2
3
4
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]

reduceByKey() 将具有相同键的对聚集在一起,并一次将一个函数应用于两个关联的值。 reduceByKey() 通过首先在每个分区内基于每个键然后跨分区应用该函数来运行。

也可以这样写

1
rdd.reduceByKey(lambda a, b : a + b).collect()

虽然 groupByKey() 和 reduceByKey() 转换通常可用于解决相同的问题并产生相同的答案,但 reduceByKey() 转换对于大型分布式数据集的效果要好得多。这是因为 Spark 知道它可以在跨节点洗牌(重新分配)数据之前将输出与每个分区上的公共密钥结合起来。只有在随机播放发生之前操作不会从减少数据中获益时才使用 groupByKey()。

combineByKey()

组合元素时可以使用 combineByKey() 但返回类型与输入值类型不同。

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.combineByKey.html?highlight=combinebykey#pyspark.RDD.combineByKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
def to_list(a):
return [a]

def append(a, b):
a.append(b)
return a

def extend(a, b):
a.extend(b)
return a

sorted(x.combineByKey(to_list, append, extend).collect())
# [('a', [1, 2]), ('b', [1])]

sortBy()

sortBy(keyfunc) 可以根据 keyfunc 进行排序,注意这个返回的是一个 RDD。而 takeOrdered() 虽然也可以排序,但是返回的是一个 List

1
2
3
4
5
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

foldByKey()

foldByKey() 使用关联函数和中性“零值”合并每个键的值。

distinct 去重元素

例子如下

1
2
3
4
5
rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32])
print(rdd.collect())
print(rdd.distinct().collect())
# [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
# [4, 8, 16, 32, 2]

RDD执行操作(RDD Action)

转换操作函数从一个RDD转换成另一个 RDD,但当我们处理实际的数据集时会调用执行函数进行数据的处理。在结果之后触发执行操作函数时,不会像转换操作函数那样生成新的RDD。因此,执行操作函数是提供非RDD值的Spark RDD操作。

执行函数生成的值将被存储到驱动程序或外部存储系统中。因此形成了RDD的惰性特性。执行操作函数是从执行器向驱动程序发送数据的方法之一,执行器代理执行操作任务;而驱动程序是一个协调工作人员和任务执行的JVM进程。Spark的部分执行函数有如下这些:

count() 返回 RDD 中的元素数量

take(n)

返回 RDD 的前 num 个元素。

1
sc.parallelize([2, 3, 4, 5, 6]).take(10)

first()

first() 操作返回 RDD 的第一个元素,等同于 take(1)

takeOrdered()

从按升序排列或按可选键函数指定的顺序排列的 RDD 中获取 N 个元素。

1
2
3
4
sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
# [1, 2, 3, 4, 5, 6]
sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
# [10, 9, 7, 6, 5, 4]

使用 takeOrdered() 而不是 first() 或 take() 的主要优点是 takeOrdered() 返回确定性结果,而其他两个操作可能返回不同的结果,具体取决于分区数或执行环境。

takeOrdered() 返回按升序排序的列表。 top() 操作与 takeOrdered() 类似,只是它按降序返回列表。

上面可以使用 key=lambda x: -x 实现倒叙,实际上有更加高级一点的用法。如下例:

1
2
3
4
5
6
7
8
9
10
11
tempData = sc.parallelize(['a', 'b', 'c', 'c', 'c'])
tempData1 = tempData.map(lambda x: (x, 1))
tempData2 = tempData1.reduceByKey(lambda a,b: a+b)
res = tempData2.takeOrdered(1, key=lambda x: -x[1])

print(tempData1.collect())
print(tempData2.collect())
print(res)
# [('a', 1), ('b', 1), ('c', 1), ('c', 1), ('c', 1)]
# [('b', 1), ('c', 3), ('a', 1)]
# [('c', 3)]

takeOrdered() 这个函数里 key 接受的 lambda 表达式带的参数只允许有一个,而上例的 x 相当于 [('b', 1), ('c', 3), ('a', 1)] 的每一个元素(如 ('b', 1)),lambda x: -x[1] 所做的即是依据 ('b', 1) 的第二个元素作为排序。

top()

从 RDD 中获取前 N 个元素。(即降序)

1
2
3
4
5
6
sc.parallelize([10, 4, 2, 12, 3]).top(1)
# [12]
sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
# [6, 5]
sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
# [4, 3, 2]

reduce()

reduce() 函数从 RDD 中取两个元素作为输入参数,然后输出与输入元素类型相同的结果。我们可以往 RDD 中添加元素,然后统计单词数量。它接受交换与结合运算作为参数。

1
2
3
4
5
from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10

但注意不能对空数组进行 reduce

takeSample()

takeSample() 操作返回一个数组,其中包含来自数据集的随机元素样本。它接受一个 withReplacement 参数,它指定是否可以从父 RDD 中多次随机选择相同的项目(因此当 withReplacement=True 时,您可以多次取回相同的项目)。它还采用可选的种子参数,允许您为随机数生成器指定种子值,以便可以获得可重现的结果。

1
2
3
4
5
print(filteredRDD.collect())
# takeSample reusing elements
print(filteredRDD.takeSample(withReplacement=True, num=6))
# takeSample without reuse
print(filteredRDD.takeSample(withReplacement=False, num=6))
1
2
3
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[3, 0, 7, 1, 7, 6]
[7, 1, 5, 9, 0, 6]
1
2
# Set seed for predictability
print(filteredRDD.takeSample(withReplacement=False, num=6, seed=500))
1
[6, 4, 0, 1, 3, 5]

countByValue()

countByValue() 操作将 RDD 中每个唯一值的计数作为将值映射到计数的字典返回。

1
2
sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
# [(1, 2), (2, 3)]

一些辅助的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 给 RDD 设置名字
filteredRDD.setName('My Filtered RDD')
# 缓存 RDD
filteredRDD.cache()
# 查看当前 RDD 是否缓存了
print(filteredRDD.is_cached)
# 使用完了 RDD,可以停止在内存中缓存它
filteredRDD.unpersist()
# Storage level for a non cached RDD
print(filteredRDD.getStorageLevel())
filteredRDD.cache()
# Storage level for a cached RDD
print(filteredRDD.getStorageLevel())
# Note that toDebugString also provides storage information
print(filteredRDD.toDebugString())

引用参考资料

作者:孤影渡寒江 链接:https://juejin.cn/post/6844904147502759943 来源:稀土掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。