spark cache checkpoints mapPartitionsWithIndex aggregate

2022年6月15日23:22:08 发表评论 2,504 views

 

 

scala> var rdd1 = sc.textFile("hdfs://bigdata111:9000/spark/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://bigdata166:9000/spark/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> sc.setCheckpointDir("hdfs://bigdata166:9000/spark/")

scala> sc.setCheckpointDir("hdfs://bigdata166:9000/spark/checkpoint")

scala> rdd1.count
scala> rdd1.cache    //缓存
scala> rdd1.count
res2: Long = 1846904 

//
scala> def fun1(index:Int,itea:Iterator[Int]):Iterator[String] = {
     | itea.toList.map(x => "[partid:" + index +", value="+x+"]" ).iterator
     | }
fun1: (index: Int, itea: Iterator[Int])Iterator[String]

scala> rdd1.mapPartitions
mapPartitions   mapPartitionsWithIndex

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
    [partid:0, value=1], [partid:0, value=2], [partid:0, value=3], 
    [partid:1, value=4], [partid:1, value=5], [partid:1, value=6], 
    [partid:2, value=7], [partid:2, value=8], [partid:2, value=9])

 

aggregate:聚合,

先局部聚合,然后全局聚合

求每个分区的最大值,然后求和

现求每个分区的最大值

求和

zeroValue: U:初始化,需要赋值:初始值在局部操作起作用,全局操作也起作用

后面两个函数参数

第一个函数:表示局部操作

第二个:表示全局操作

scala> var rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
    [partid:0, value=1], [partid:0, value=2],
    [partid:1, value=3], [partid:1, value=4], [partid:1, value=5])

scala> rdd1.aggregate(0)(max(_,_),_+_)
res5: Int = 7
scala> rdd1.aggregate(10)(max(_,_),_+_)
res6: Int = 30
//第一个分区数据:10(初始值),1,2————10
//第二个分区:10(初始值),3,4,5————10
//求和:10(初始值)+10+10=30

//第一个分区的最大值:2
//二:5
//求和:2+5=7

scala> rdd1.aggregate(0)(_+_,_+_)
res7: Int = 15

scala> rdd1.aggregate(10)(_+_,_+_)
res8: Int = 45

//第一个分区:10+1+2=3
//第二个分区:10+3+4+5=12
//求和:10+3+12=15

//其它操作:</pre>
scala> var rdd1 = sc.parallelize(List("12","34","567","8901"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> def fun1(index:Int,iter:Iterator[String]):Iterator[String]={
| iter.toList.map(x => "[partID: "+index+",value:"+x+"]").iterator}
fun1: (index: Int, iter: Iterator[String])Iterator[String]

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res1: Array[String] = Array(
[partID: 0,value:12], [partID: 0,value:34],
[partID: 1,value:567], [partID: 1,value:8901])

scala> rdd1.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y)

scala> rdd1.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y)
res3: String = 42

scala> rdd1.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y)
res4: String = 24

分析:
第一个分区:“12”,“34”
第一次比较:“”,“12”=2.toString ==》 “2”
第二次比较:“2”,“34”=2.toString ==》 “2”
第二个分区:“567”,“8901”
第一次比较:“”,“567”=3.toString ==》“3”
第二次比较:“3”,“8901”=4.toString ==》 “4”
“24”或者“42”

scala> var rdd1 = sc.parallelize(List("12","23","345",""),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res6: Array[String] = Array([partID: 0,value:12], [partID: 0,value:23], [partID: 1,value:345], [partID: 1,value:])

scala> rdd1.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
res7: String = 10

scala> rdd1.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
res9: String = 01

分析:
第一个分区:“12”,“34”
第一次比较:“”,“12”=0.toString ==》 “0”
第二次比较:“0”,“34”=1.toString ==》 “1”
第二个分区:“345”,“”
第一次比较:“”,“345”=0.toString ==》“0”
第二次比较:“0”,“”=0.toString ==》 “0”
“10”或者“01”

scala> var rdd1 = sc.parallelize(List("12","23","","345"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res10: Array[String] = Array([partID: 0,value:12], [partID: 0,value:23], [partID: 1,value:], [partID: 1,value:345])

scala> rdd1.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
res11: String = 11

分析:
第一个分区:“12”,“34”
第一次比较:“”,“12”=0.toString ==》 “0”
第二次比较:“0”,“34”=1.toString ==》 “1”
第二个分区:“”,“345”
第一次比较:“”,“”=0.toString ==》“0”
第二次比较:“0”,“345”=1.toString ==》 “1”
“11”

 

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: