參考IntAccumulatorParam的實現(xiàn)思路(上述文章中有講):
網(wǎng)站建設哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁設計、網(wǎng)站建設、微信開發(fā)、小程序設計、集團企業(yè)網(wǎng)站建設等服務項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了桂林免費建站歡迎大家使用!
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
// addInPlace有很多具體的實現(xiàn)類
// 如果想要實現(xiàn)自定義的話,就得實現(xiàn)這個方法
addInPlace(t1, t2)
}
}
自定義也可以通過這個方法去實現(xiàn),從而兼容我們自定義的累加器
**
* 自定義的AccumulatorParam
*
* Created by lemon on 2018/7/28.
*/
object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] {
override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = {
// ++用于兩個集合相加
r1++r2
}
override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = {
var data: Map[Int, Int] = Map()
data
}
}
/**
* 使用自定義的累加器,實現(xiàn)隨機數(shù)
*
* Created by lemon on 2018/7/28.
*/
object CustomAccumulator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator)
val distData = sc.parallelize(1 to 10)
val mapCount = distData.map(x => {
val randomNum = new Random().nextInt(20)
// 構(gòu)造一個k-v對
val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum)
uniqueKeyAccumulator += map
})
println(mapCount.count())
// 獲取到累加器的值 中的key值,并進行打印
uniqueKeyAccumulator.value.keys.foreach(println)
sc.stop()
}
}
運行結(jié)果如下圖:## 思路 & 需求
參考IntAccumulatorParam的實現(xiàn)思路(上述文章中有講):
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
// addInPlace有很多具體的實現(xiàn)類
// 如果想要實現(xiàn)自定義的話,就得實現(xiàn)這個方法
addInPlace(t1, t2)
}
}
自定義也可以通過這個方法去實現(xiàn),從而兼容我們自定義的累加器
**
* 自定義的AccumulatorParam
*
* Created by lemon on 2018/7/28.
*/
object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] {
override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = {
// ++用于兩個集合相加
r1++r2
}
override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = {
var data: Map[Int, Int] = Map()
data
}
}
/**
* 使用自定義的累加器,實現(xiàn)隨機數(shù)
*
* Created by lemon on 2018/7/28.
*/
object CustomAccumulator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator)
val distData = sc.parallelize(1 to 10)
val mapCount = distData.map(x => {
val randomNum = new Random().nextInt(20)
// 構(gòu)造一個k-v對
val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum)
uniqueKeyAccumulator += map
})
println(mapCount.count())
// 獲取到累加器的值 中的key值,并進行打印
uniqueKeyAccumulator.value.keys.foreach(println)
sc.stop()
}
}
運行結(jié)果如下圖:
網(wǎng)頁名稱:生產(chǎn)常用Spark累加器剖析之三(自定義累加器)
分享路徑:http://aaarwkj.com/article8/psojip.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供微信公眾號、ChatGPT、、面包屑導航、定制網(wǎng)站、移動網(wǎng)站建設
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)