map(func)
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U]
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
Maps each element of the source RDD through the function func to produce a new element, forming a new RDD.
Example:
The second argument to sc.parallelize specifies the number of partitions of the RDD.
val rdd = sc.parallelize(1 to 9,2)
val rdd1 = rdd.map(x=>x+1)
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.take(20)
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> val rdd1 = rdd.map(x=>x+1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26

scala> rdd1.take(20)
res5: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10)
filter(func)
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T]
filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. |
Forms a new RDD from the elements of the source RDD for which func returns true.
val rdd = sc.parallelize(1 to 9,2)
val rdd1 = rdd.filter(_>=5)
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val rdd1 = rdd.filter(_>=5)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at filter at <console>:26

scala> rdd1.take(10)
res13: Array[Int] = Array(5, 6, 7, 8, 9)
flatMap(func)
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
Similar to map, but each element may be mapped to zero or more output elements (so func should return a Seq rather than a single element); in effect, it performs a map and then flattens the results.
val rdd = sc.parallelize(1 to 3,2)
val rdd1 = rdd.flatMap( _ to 5)
scala> val rdd = sc.parallelize(1 to 3,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd1 = rdd.flatMap( _ to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at flatMap at <console>:26

scala> rdd1.take(100)
res14: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 3, 4, 5)
mapPartitions(func)
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. |
Similar to map, except that the mapping function receives an iterator over each partition of the RDD instead of each individual element. If the mapping needs to create additional objects repeatedly, mapPartitions can be much more efficient than map, because such objects can be created once per partition rather than once per element; a sketch of this pattern follows the example below.
Count the number of elements in each partition:
def countPartitionEle(it : Iterator[Int]) = {
  var result = List[Int]()
  var i = 0
  while(it.hasNext){
    i += 1
    it.next
  }
  result.::(i).iterator // :: prepends i to the front of the list (i must be wrapped in parentheses), then creates an iterator
}
val rdd = sc.parallelize(1 to 10, 3)
val rdd1 = rdd.mapPartitions(countPartitionEle(_))
rdd1.take(10)
scala> def countPartitionEle(it : Iterator[Int]) = {
     |   var result = List[Int]()
     |   var i = 0
     |   while(it.hasNext){
     |     i += 1
     |     it.next
     |   }
     |   result.::(i).iterator
     | }
countPartitionEle: (it: Iterator[Int])Iterator[Int]

scala> val rdd = sc.parallelize(1 to 10, 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd1 = rdd.mapPartitions(countPartitionEle(_))
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at mapPartitions at <console>:30

scala> rdd1.take(10)
res8: Array[Int] = Array(3, 3, 4)
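To illustrate the efficiency note above, here is a minimal sketch (not from the original session) in which a hypothetical ExpensiveResource stands in for anything costly to construct, such as a connection or a parser. Both variants produce the same result; only the number of constructions differs.

// Hypothetical stand-in for something expensive to construct (a connection, a parser, ...).
class ExpensiveResource {
  def format(x: Int): String = "value=" + x
}

val rdd = sc.parallelize(1 to 10, 3)

// map: one ExpensiveResource per element (10 constructions here).
val perElement = rdd.map { x =>
  val res = new ExpensiveResource()
  res.format(x)
}

// mapPartitions: one ExpensiveResource per partition (3 constructions here).
val perPartition = rdd.mapPartitions { it =>
  val res = new ExpensiveResource()
  it.map(x => res.format(x))
}

perElement.collect
perPartition.collect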
mapPartitionsWithIndex(func)
/**
 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
 * of the original partition.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]

mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
Similar to mapPartitions, also operating on each partition, but func takes two arguments: the first is the partition index and the second is the iterator over that partition's elements.
def func(index :Int, it : Iterator[Int]) = {
  var result = List[String]()
  var i = ""
  while(it.hasNext){
    i += it.next + ","
  }
  result.::(i.dropRight(1) + " at partition "+index+".").iterator
}
val rdd = sc.parallelize(1 to 10, 3)
val rdd1 = rdd.mapPartitionsWithIndex((x,it) => func(x,it))
rdd1.take(3)
scala> def func(index :Int, it : Iterator[Int]) = {
     |   var result = List[String]()
     |   var i = ""
     |   while(it.hasNext){
     |     i += it.next + ","
     |   }
     |   result.::(i.dropRight(1) + " at partition "+index+".").iterator
     | }
func: (index: Int, it: Iterator[Int])Iterator[String]

scala> val rdd = sc.parallelize(1 to 10, 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd1 = rdd.mapPartitionsWithIndex((x,it) => func(x,it))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:28

scala> rdd1.take(3)
res2: Array[String] = Array(1,2,3 at partition 0., 4,5,6 at partition 1., 7,8,9,10 at partition 2.)
sample(withReplacement, fraction, seed)
/**
 * Return a sampled subset of this RDD.
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
 * @param fraction expected size of the sample as a fraction of this RDD's size
 *   without replacement: probability that each element is chosen; fraction must be [0, 1]
 *   with replacement: expected number of times each element is chosen; fraction must be >= 0
 * @param seed seed for the random number generator
 */
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T]

sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
Samples the source RDD. withReplacement indicates whether to sample with replacement, fraction is the expected size of the sample as a fraction of the source RDD, and seed is the seed for the random number generator.
scala> val rdd = sc.parallelize(1 to 10, 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val rdd1 = rdd.sample(true,0.5,0)
rdd1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[4] at sample at <console>:26

scala> val rdd2 = rdd.sample(false,0.5,0)
rdd2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[5] at sample at <console>:26

scala> rdd1.collect
res3: Array[Int] = Array(2)

scala> rdd2.collect
res4: Array[Int] = Array(1, 2, 4, 5, 6, 9)

scala> val rdd1 = rdd.sample(true,0.5,1)
rdd1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[7] at sample at <console>:26

scala> rdd1.collect
res6: Array[Int] = Array(1, 3, 7, 7, 8, 8, 9, 10)
union(otherDataset)
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T]

union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
Merges two RDDs into one. Identical elements may appear multiple times; use distinct to remove duplicates.
val rdd1 = sc.parallelize(1 to 5,2)
val rdd2 = sc.parallelize(1 to 5,3)
val rdd3 = sc.parallelize(2 to 8,3)
val rdd = rdd1.union(rdd2).union(rdd3)
rdd.collect
rdd.distinct.collect
scala> val rdd1 = sc.parallelize(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(1 to 5,3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd3 = sc.parallelize(2 to 8,3)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val rdd = rdd1.union(rdd2).union(rdd3)
rdd: org.apache.spark.rdd.RDD[Int] = UnionRDD[12] at union at <console>:30

scala> rdd.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 2, 3, 4, 5, 6, 7, 8)

scala> rdd.distinct.collect
res8: Array[Int] = Array(8, 1, 2, 3, 4, 5, 6, 7)
intersection(otherDataset)
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * Note that this method performs a shuffle internally.
 */
def intersection(other: RDD[T]): RDD[T]

intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. |
Forms a new RDD from the elements that the two RDDs have in common.
val rdd1 = sc.parallelize(1 to 5,2)
val rdd2 = sc.parallelize(4 to 8,3)
val rdd = rdd1.intersection(rdd2)
rdd.collect
scala> val rdd1 = sc.parallelize(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(4 to 8,3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> val rdd = rdd1.intersection(rdd2)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at intersection at <console>:28

scala> rdd.collect
res9: Array[Int] = Array(4, 5)
scala> rdd.partitions.length
res10: Int = 3
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did. Performs a hash partition across the cluster.
 *
 * Note that this method performs a shuffle internally.
 *
 * @param numPartitions How many partitions to use in the resulting RDD
 */
def intersection(other: RDD[T], numPartitions: Int): RDD[T]

Same as def intersection(other: RDD[T]): RDD[T], with numPartitions specifying the number of partitions of the resulting RDD.
scala> val rdd = rdd1.intersection(rdd2,1)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at intersection at <console>:28

scala> rdd.partitions.length
res12: Int = 1
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * Note that this method performs a shuffle internally.
 *
 * @param partitioner Partitioner to use for the resulting RDD
 */
def intersection(
    other: RDD[T],
    partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
Custom partitioning
A custom partitioner class must extend Partitioner: numPartitions sets the number of partitions, and getPartition returns the partition index for a given key.
class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    key.toString.toInt%numPartitions
  }
}
val rdd1 = sc.parallelize(1 to 15,2)
val rdd2 = sc.parallelize(5 to 25,2)
val rdd = rdd1.intersection(rdd2,new MyPartitioner(5))
rdd.collect
rdd.partitions.length
def func(index :Int, it : Iterator[Int]) = {
  var result = List[String]()
  var i = ""
  while(it.hasNext){
    i += it.next + ","
  }
  result.::(i.dropRight(1) + " at partition "+index+".").iterator
}
val rdd3 = rdd.mapPartitionsWithIndex((x,it) => func(x,it))
rdd3.collect
scala> val rdd1 = sc.parallelize(1 to 15,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:27

scala> val rdd2 = sc.parallelize(5 to 25,2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at <console>:27

scala> val rdd = rdd1.intersection(rdd2,new MyPartitioner(5))
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[61] at intersection at <console>:32

scala> rdd.collect
res25: Array[Int] = Array(15, 10, 5, 11, 6, 7, 12, 13, 8, 14, 9)

scala> rdd.partitions.length
res26: Int = 5

scala> def func(index :Int, it : Iterator[Int]) = {
     |   var result = List[String]()
     |   var i = ""
     |   while(it.hasNext){
     |     i += it.next + ","
     |   }
     |   result.::(i.dropRight(1) + " at partition "+index+".").iterator
     | }
func: (index: Int, it: Iterator[Int])Iterator[String]

scala> val rdd3 = rdd.mapPartitionsWithIndex((x,it) => func(x,it))
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[62] at mapPartitionsWithIndex at <console>:36

scala> rdd3.collect
res27: Array[String] = Array(15,10,5 at partition 0., 11,6 at partition 1., 7,12 at partition 2., 13,8 at partition 3., 14,9 at partition 4.)
distinct([numTasks])
distinct([numTasks]) | Return a new dataset that contains the distinct elements of the source dataset. |
Forms a new RDD from the distinct elements of the source RDD.
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

numPartitions specifies the number of partitions of the resulting RDD.
val a = Array(1,1,1,2,2,3,4,5)
val rdd = sc.parallelize(a,2)
rdd.collect
val rdd1 = rdd.distinct(1)
rdd1.collect
rdd1.partitions.length
scala> val a = Array(1,1,1,2,2,3,4,5)
a: Array[Int] = Array(1, 1, 1, 2, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(a,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:26

scala> rdd.collect
res13: Array[Int] = Array(1, 1, 1, 2, 2, 3, 4, 5)

scala> val rdd1 = rdd.distinct(1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[46] at distinct at <console>:28

scala> rdd1.collect
res14: Array[Int] = Array(4, 1, 3, 5, 2)

scala> rdd1.partitions.length
res15: Int = 1
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T]

Same as distinct(numPartitions: Int), except that the number of partitions of the result depends on the parent RDD.
scala> val rdd1 = rdd.distinct()
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[49] at distinct at <console>:28

scala> rdd1.partitions.length
res16: Int = 2

scala> rdd1.collect
res17: Array[Int] = Array(4, 2, 1, 3, 5)
keyBy(func)
/**
 * Creates tuples of the elements in this RDD by applying `f`.
 */
def keyBy[K](f: T => K): RDD[(K, T)]
Uses func to derive a key for each element of the RDD, producing an RDD of key-value pairs.
val rdd = sc.parallelize(1 to 9 ,2)
val rdd1 = rdd.keyBy(_%3)
rdd1.collect
scala> val rdd = sc.parallelize(1 to 9 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd1 = rdd.keyBy(_%3)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at keyBy at <console>:26

scala> rdd1.collect
res0: Array[(Int, Int)] = Array((1,1), (2,2), (0,3), (1,4), (2,5), (0,6), (1,7), (2,8), (0,9))
groupByKey([numTasks])
/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
 * within each group is not guaranteed, and may even differ each time the resulting RDD is
 * evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupByKey(): RDD[(K, Iterable[V])]
val rdd = sc.parallelize(1 to 9 ,2)
val rdd1 = rdd.keyBy(_%3)
val rdd2 = rdd1.groupByKey()
rdd2.collect
scala> val rdd = sc.parallelize(1 to 9 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val rdd1 = rdd.keyBy(_%3)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at keyBy at <console>:26

scala> val rdd2 = rdd1.groupByKey()
rdd2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[4] at groupByKey at <console>:28

scala> rdd2.collect
res1: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (2,CompactBuffer(2, 5, 8)), (1,CompactBuffer(1, 4, 7)))
/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD into `numPartitions` partitions. The ordering of elements within
 * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

Same as groupByKey(), except that the number of partitions is specified.
scala> val rdd = sc.parallelize(1 to 9 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val rdd1 = rdd.keyBy(_%3)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[6] at keyBy at <console>:26

scala> val rdd2 = rdd1.groupByKey(3)
rdd2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[7] at groupByKey at <console>:28

scala> rdd2.partitions.length
res2: Int = 3
/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    key.toString.toInt%numPartitions
  }
}
val rdd = sc.parallelize(1 to 9 ,2)
val rdd1 = rdd.keyBy(_%3)
rdd1.collect
val rdd2 = rdd1.groupByKey(new MyPartitioner(2))
rdd2.collect
scala> class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
     |   override def numPartitions: Int = numParts
     |   override def getPartition(key: Any): Int = {
     |     key.toString.toInt%numPartitions
     |   }
     | }
defined class MyPartitioner

scala> val rdd = sc.parallelize(1 to 9 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val rdd1 = rdd.keyBy(_%3)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[9] at keyBy at <console>:26

scala> val rdd2 = rdd1.groupByKey(new MyPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[10] at groupByKey at <console>:29

scala> rdd2.collect
res3: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (2,CompactBuffer(2, 5, 8)), (1,CompactBuffer(1, 4, 7)))

scala> rdd1.collect
res4: Array[(Int, Int)] = Array((1,1), (2,2), (0,3), (1,4), (2,5), (0,6), (1,7), (2,8), (0,9))

scala> rdd2.partitions.length
res5: Int = 2
groupBy(func)
/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

def func(x:Int) = {x%3}
val rdd = sc.parallelize(1 to 10,2)
val rdd1 = rdd.groupBy(func(_))
rdd1.collect
scala> def func(x:Int) = {x%3}
func: (x: Int)Int

scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[64] at parallelize at <console>:27

scala> val rdd1 = rdd.groupBy(func(_))
rdd1: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[66] at groupBy at <console>:31

scala> rdd1.collect
res29: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (2,CompactBuffer(2, 5, 8)), (1,CompactBuffer(1, 4, 7, 10)))
/**
 * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](
    f: T => K,
    numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
Same as groupBy[K](f: T => K), except that the number of partitions is specified.
def func(x:Int) = {x%3}
val rdd = sc.parallelize(1 to 10,2)
val rdd1 = rdd.groupBy(func(_),3)
rdd1.collect
scala> def func(x:Int) = {x%3}
func: (x: Int)Int

scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> val rdd1 = rdd.groupBy(func(_),3)
rdd1: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[13] at groupBy at <console>:28

scala> rdd1.partitions.length
res6: Int = 3

scala> rdd1.collect
res7: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (1,CompactBuffer(1, 4, 7, 10)), (2,CompactBuffer(2, 5, 8)))
/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])]
class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    key.toString.toInt%numPartitions
  }
}
def func(x:Int) = {x%3}
val rdd = sc.parallelize(1 to 10,2)
val rdd1 = rdd.groupBy(func(_),new MyPartitioner(3))
rdd1.collect
rdd1.partitions.length
scala> class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
     |   override def numPartitions: Int = numParts
     |   override def getPartition(key: Any): Int = {
     |     key.toString.toInt%numPartitions
     |   }
     | }
defined class MyPartitioner

scala> def func(x:Int) = {x%3}
func: (x: Int)Int

scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> val rdd1 = rdd.groupBy(func(_),new MyPartitioner(3))
rdd1: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[16] at groupBy at <console>:29

scala> rdd1.collect
res8: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (1,CompactBuffer(1, 4, 7, 10)), (2,CompactBuffer(2, 5, 8)))

scala> rdd1.partitions.length
res9: Int = 3
reduceByKey(func, [numTasks])
reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey , the number of reduce tasks is configurable through an optional second argument. |
Aggregates all the values of each key using the function func.

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
val words = Array("one", "two", "two", "three", "three", "three")
val rdd = sc.parallelize(words).map(word => (word, 1))
val rdd1 = rdd.reduceByKey(_ + _)
rdd1.collect
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val rdd = sc.parallelize(words).map(word => (word, 1))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:26

scala> val rdd1 = rdd.reduceByKey(_ + _)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:28

scala> rdd1.collect
res10: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

Same as above, except that the number of partitions is specified.
scala> val rdd2 = rdd.reduceByKey(_ + _,3)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:28

scala> rdd2.collect
res11: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

scala> rdd2.partitions.length
res12: Int = 3
/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
Same as above, but a custom partitioner controls the partitioning of the result; a sketch follows below.
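The original gives no example for this overload, so here is a minimal sketch. It assumes the MyPartitioner class defined in the custom-partitioning example above and keys the word-count data by word length so that the Int-based MyPartitioner applies. Note the argument order: the partitioner comes first, then the reduce function.

// Assumes MyPartitioner from the custom-partitioning example above (its keys must be Ints).
val words = Array("one", "two", "two", "three", "three", "three")
val pairs = sc.parallelize(words).map(word => (word.length, 1))

// This overload takes the Partitioner first, then the reduce function.
val counts = pairs.reduceByKey(new MyPartitioner(2), _ + _)

counts.collect           // e.g. Array((3,3), (5,3)) -- counts per word length (order may vary)
counts.partitions.length // 2, as set by MyPartitioner(2)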