发现个好东西: 一个免费短链生成项目:“ https://www.isok.co”一个可以生成所有短链的服务,isok.co的这个含义能覆盖所有短链含义isok, 我试过了很不错,都可以试试
scala> var rdd1 = sc.textFile("hdfs://bigdata111:9000/spark/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://bigdata166:9000/spark/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> sc.setCheckpointDir("hdfs://bigdata166:9000/spark/")
scala> sc.setCheckpointDir("hdfs://bigdata166:9000/spark/checkpoint")
scala> rdd1.count
scala> rdd1.cache //缓存
scala> rdd1.count
res2: Long = 1846904
//
scala> def fun1(index:Int,itea:Iterator[Int]):Iterator[String] = {
| itea.toList.map(x => "[partid:" + index +", value="+x+"]" ).iterator
| }
fun1: (index: Int, itea: Iterator[Int])Iterator[String]
scala> rdd1.mapPartitions
mapPartitions mapPartitionsWithIndex
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
[partid:0, value=1], [partid:0, value=2], [partid:0, value=3],
[partid:1, value=4], [partid:1, value=5], [partid:1, value=6],
[partid:2, value=7], [partid:2, value=8], [partid:2, value=9])
:聚合,
先局部聚合,然后全局聚合
求每个分区的最大值,然后求和
现求每个分区的最大值
求和
zeroValue: U:初始化,需要赋值:初始值在局部操作起作用,全局操作也起作用
后面两个函数参数
第一个函数:表示局部操作
第二个:表示全局操作
scala> var rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
[partid:0, value=1], [partid:0, value=2],
[partid:1, value=3], [partid:1, value=4], [partid:1, value=5])
scala> rdd1.aggregate(0)(max(_,_),_+_)
res5: Int = 7
scala> rdd1.aggregate(10)(max(_,_),_+_)
res6: Int = 30
//第一个分区数据:10(初始值),1,2————10
//第二个分区:10(初始值),3,4,5————10
//求和:10(初始值)+10+10=30
//第一个分区的最大值:2
//二:5
//求和:2+5=7
scala> rdd1.aggregate(0)(_+_,_+_)
res7: Int = 15
scala> rdd1.aggregate(10)(_+_,_+_)
res8: Int = 45
//第一个分区:10+1+2=3
//第二个分区:10+3+4+5=12
//求和:10+3+12=15
//其它操作:</pre>
scala> var rdd1 = sc.parallelize(List("12","34","567","8901"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> def fun1(index:Int,iter:Iterator[String]):Iterator[String]={
| iter.toList.map(x => "[partID: "+index+",value:"+x+"]").iterator}
fun1: (index: Int, iter: Iterator[String])Iterator[String]
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res1: Array[String] = Array(
[partID: 0,value:12], [partID: 0,value:34],
[partID: 1,value:567], [partID: 1,value:8901])
scala> rdd1.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y)
scala> rdd1.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y)
res3: String = 42
scala> rdd1.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y)
res4: String = 24
分析:
第一个分区:“12”,“34”
第一次比较:“”,“12”=2.toString ==》 “2”
第二次比较:“2”,“34”=2.toString ==》 “2”
第二个分区:“567”,“8901”
第一次比较:“”,“567”=3.toString ==》“3”
第二次比较:“3”,“8901”=4.toString ==》 “4”
“24”或者“42”
scala> var rdd1 = sc.parallelize(List("12","23","345",""),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res6: Array[String] = Array([partID: 0,value:12], [partID: 0,value:23], [partID: 1,value:345], [partID: 1,value:])
scala> rdd1.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
res7: String = 10
scala> rdd1.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
res9: String = 01
分析:
第一个分区:“12”,“34”
第一次比较:“”,“12”=0.toString ==》 “0”
第二次比较:“0”,“34”=1.toString ==》 “1”
第二个分区:“345”,“”
第一次比较:“”,“345”=0.toString ==》“0”
第二次比较:“0”,“”=0.toString ==》 “0”
“10”或者“01”
scala> var rdd1 = sc.parallelize(List("12","23","","345"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res10: Array[String] = Array([partID: 0,value:12], [partID: 0,value:23], [partID: 1,value:], [partID: 1,value:345])
scala> rdd1.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
res11: String = 11
分析:
第一个分区:“12”,“34”
第一次比较:“”,“12”=0.toString ==》 “0”
第二次比较:“0”,“34”=1.toString ==》 “1”
第二个分区:“”,“345”
第一次比较:“”,“”=0.toString ==》“0”
第二次比较:“0”,“345”=1.toString ==》 “1”
“11”
