Spark基础函数指南(python版)

本文主要列出了一些 Apache Spark 的函数的使用,配上了具体的代码示例,更方便初学者的使用(我自己就是初学者hh)
首先,分享一个网站:https://nbviewer.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb
里面每个函数都配有图和案例,非常的实用!
RDD转换操作(RDD Transformation)
Spark 转换操作是一个从现有的 RDD 生成新 RDD 的函数。它以 RDD 作为输入参数,生成并输出一个或多个RDD。每当我们使用任何一个转换操作函数的时候,它都会生成新的 RDD。由于 RDD 本质上是不可变的,因此输入的 RDD 也是不可变的。
转换操作本质上是一种懒加载,也就是说不会立即执行转换函数,只有当我们调用执行操作函数的时候才会执行转换操作函数。转换操作最基本最常用的两个函数是:map() 和 fliter() 函数。转换后生成的 RDD 始终与其父 RDD 不同。转换后的RDD 可能变得比小(如:filter(), count(), distinct(), sample()),也可能比大(如:flatMap(), union(), Cartesian())甚至大小一样的(如:map())。
map(func)
map(func) 函数遍历 RDD 中的每一行并拆分为新的 RDD。使用接受任何函数作为输入参数的 map() 函数,并且该函数(入参函数)作用到 RDD 的每个元素上面。
在 map() 函数中,我们可以灵活地选择 RDD 的输入和返回类型。例如,我们可以将字符串作为RDD的输入类型,在应用map() 函数之后,返回的 RDD 可以是布尔类型。
1 | def sub(value): |
注意,如果数据量很大的情况下,直接调用 collect()
会造成一些问题,所以再这之前可以调用譬如 filter
等函数,来缩小要显示的数据量。而collect()
也是一个 action operations,这意味着从这里 Spark 才开始执行计算的操作。
一些用途,比如说我们想将统计一篇文章每个单词出现的频率,我们可以把所有的单词放在一个列表(比如下文的 data 中),然后使用 map()
方法,将 data 中每个单词记录一次。
1 | data = ['d', 'e', 'd', 'b', 'a'] |
1 | [['d', 1], ['e', 1], ['d', 1], ['b', 1], ['a', 1]] |
通常,我们会写成这样
1 | data = ['d', 'e', 'd', 'b', 'a'] |
1 | [('d', 1), ('e', 1), ('d', 1), ('b', 1), ('a', 1)] |
当然 map()
同样可以接受多个参数的函数,比如以下例子:
1 | def sub(x, y): |
1 | [-1, 0, 1, 2] |
又或者是:
1 | def sub(x, y): |
1 | [('a', 0), ('e', 2), ('s', 4), ('s', 0)] |
flatMap()
通过首先将一个函数应用于此 RDD 的所有元素,然后展平结果来返回一个新的 RDD。这里的展平可以理解为把高维的数组变成一维的操作。
1 | rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"]) |
1 | 2, 3, 4]) rdd = sc.parallelize([ |
跟 map()
的区别:https://blog.csdn.net/CYJ2014go/article/details/83014075
再举例子:
1 | # Let's create a new base RDD to work from |
1 | [('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')] |
再举例子:
1 | simpleRDD = sc.parallelize([2, 3, 4]) |
1 | [range(1, 2), range(1, 3), range(1, 4)] |
filter(func)
Spark RDD filter() 函数返回一个新的 RDD,其中仅包含满足谓词的元素。这是一个窄转换操作,因为它不会把数据从一个分区映射到多个分区。例如,假设 RDD 包含前五个自然数(1、2、3、4和5),并且谓词检查偶数。那么过滤器后的结果 RDD 中就只包含偶数,即2和4。
简单来说,这是一个用于过滤掉特定数据的函数。
1 | def ten(value): |
同样可以用 Python 的 lambda
表达式:
1 | data = range(1, 15) |
groupByKey()
将 RDD 中每个键的值分组为一个序列。使用 numPartitions 分区对生成的 RDD 进行哈希分区。
1 | "a", 1), ("b", 1), ("a", 1)]) rdd = sc.parallelize([( |
1 | pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)]) |
1 | [('b', [1]), ('a', [1, 2])] |
reduceByKey()
使用关联和可交换的 reduce 函数合并每个键的值。 这还将在将结果发送到 reducer 之前在每个映射器上执行本地合并,类似于 MapReduce 中的“组合器”。 输出将使用 numPartitions 分区进行分区,如果未指定 numPartitions,则使用默认并行级别。默认分区程序是散列分区。
1 | from operator import add |
reduceByKey() 将具有相同键的对聚集在一起,并一次将一个函数应用于两个关联的值。 reduceByKey() 通过首先在每个分区内基于每个键然后跨分区应用该函数来运行。
也可以这样写
1 | rdd.reduceByKey(lambda a, b : a + b).collect() |
虽然 groupByKey() 和 reduceByKey() 转换通常可用于解决相同的问题并产生相同的答案,但 reduceByKey() 转换对于大型分布式数据集的效果要好得多。这是因为 Spark 知道它可以在跨节点洗牌(重新分配)数据之前将输出与每个分区上的公共密钥结合起来。只有在随机播放发生之前操作不会从减少数据中获益时才使用 groupByKey()。
combineByKey()
组合元素时可以使用 combineByKey() 但返回类型与输入值类型不同。
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.combineByKey.html?highlight=combinebykey#pyspark.RDD.combineByKey
1 | x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)]) |
sortBy()
sortBy(keyfunc)
可以根据 keyfunc
进行排序,注意这个返回的是一个 RDD。而 takeOrdered()
虽然也可以排序,但是返回的是一个 List
。
1 | 'a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] tmp = [( |
foldByKey()
foldByKey() 使用关联函数和中性“零值”合并每个键的值。
distinct 去重元素
例子如下
1 | rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32]) |
RDD执行操作(RDD Action)
转换操作函数从一个RDD转换成另一个 RDD,但当我们处理实际的数据集时会调用执行函数进行数据的处理。在结果之后触发执行操作函数时,不会像转换操作函数那样生成新的RDD。因此,执行操作函数是提供非RDD值的Spark RDD操作。
执行函数生成的值将被存储到驱动程序或外部存储系统中。因此形成了RDD的惰性特性。执行操作函数是从执行器向驱动程序发送数据的方法之一,执行器代理执行操作任务;而驱动程序是一个协调工作人员和任务执行的JVM进程。Spark的部分执行函数有如下这些:
count() 返回 RDD 中的元素数量
take(n)
返回 RDD 的前 num 个元素。
1 | sc.parallelize([2, 3, 4, 5, 6]).take(10) |
first()
first() 操作返回 RDD 的第一个元素,等同于 take(1)
takeOrdered()
从按升序排列或按可选键函数指定的顺序排列的 RDD 中获取 N 个元素。
1 | sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) |
使用 takeOrdered() 而不是 first() 或 take() 的主要优点是 takeOrdered() 返回确定性结果,而其他两个操作可能返回不同的结果,具体取决于分区数或执行环境。
takeOrdered() 返回按升序排序的列表。 top() 操作与 takeOrdered() 类似,只是它按降序返回列表。
上面可以使用 key=lambda x: -x
实现倒叙,实际上有更加高级一点的用法。如下例:
1 | tempData = sc.parallelize(['a', 'b', 'c', 'c', 'c']) |
takeOrdered()
这个函数里 key
接受的 lambda 表达式带的参数只允许有一个,而上例的 x
相当于 [('b', 1), ('c', 3), ('a', 1)]
的每一个元素(如 ('b', 1)
),lambda x: -x[1]
所做的即是依据 ('b', 1)
的第二个元素作为排序。
top()
从 RDD 中获取前 N 个元素。(即降序)
1 | sc.parallelize([10, 4, 2, 12, 3]).top(1) |
reduce()
reduce() 函数从 RDD 中取两个元素作为输入参数,然后输出与输入元素类型相同的结果。我们可以往 RDD 中添加元素,然后统计单词数量。它接受交换与结合运算作为参数。
1 | from operator import add |
但注意不能对空数组进行 reduce
takeSample()
takeSample() 操作返回一个数组,其中包含来自数据集的随机元素样本。它接受一个 withReplacement 参数,它指定是否可以从父 RDD 中多次随机选择相同的项目(因此当 withReplacement=True 时,您可以多次取回相同的项目)。它还采用可选的种子参数,允许您为随机数生成器指定种子值,以便可以获得可重现的结果。
1 | print(filteredRDD.collect()) |
1 | [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] |
1 | # Set seed for predictability |
1 | [6, 4, 0, 1, 3, 5] |
countByValue()
countByValue() 操作将 RDD 中每个唯一值的计数作为将值映射到计数的字典返回。
1 | sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) |
一些辅助的函数
1 | # 给 RDD 设置名字 |
引用参考资料
作者:孤影渡寒江 链接:https://juejin.cn/post/6844904147502759943 来源:稀土掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。