欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

Spark組件SparkSQL的實(shí)例分析

Spark組件Spark SQL的實(shí)例分析,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。

創(chuàng)新互聯(lián)長(zhǎng)期為上千余家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開(kāi)放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為雞冠企業(yè)提供專業(yè)的成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì),雞冠網(wǎng)站改版等技術(shù)服務(wù)。擁有十余年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開(kāi)發(fā)。

Spark SQL是一個(gè)用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,前身是shark,但是shark過(guò)多的依賴于hive如采用hive的語(yǔ)法解析器、查詢優(yōu)化器等,制約了Spark各個(gè)組件之間的相互集成,因此Spark SQL應(yīng)運(yùn)而生。Spark SQL在汲取了shark諸多優(yōu)勢(shì)如內(nèi)存列存儲(chǔ)、兼容hive等基礎(chǔ)上,做了重新的構(gòu)造,因此也擺脫了對(duì)hive的依賴,但同時(shí)兼容hive。除了采取內(nèi)存列存儲(chǔ)優(yōu)化性能,還引入了字節(jié)碼生成技術(shù)、CBO和RBO對(duì)查詢等進(jìn)行動(dòng)態(tài)評(píng)估獲取最優(yōu)邏輯計(jì)劃、物理計(jì)劃執(zhí)行等。基于這些優(yōu)化,使得Spark SQL相對(duì)于原有的SQL on Hadoop技術(shù)在性能方面得到有效提升。同時(shí),Spark SQL支持多種數(shù)據(jù)源,如JDBC、HDFS、HBase。它的內(nèi)部組件,如SQL的語(yǔ)法解析器、分析器等支持重定義進(jìn)行擴(kuò)展,能更好的滿足不同的業(yè)務(wù)場(chǎng)景。與Spark Core無(wú)縫集成,提供了DataSet/DataFrame的可編程抽象數(shù)據(jù)模型,并且可被視為一個(gè)分布式的SQL查詢引擎。Spark組件Spark SQL的實(shí)例分析 DataSet/DataFrameDataSet/DataFrame都是Spark SQL提供的分布式數(shù)據(jù)集,相對(duì)于RDD而言,除了記錄數(shù)據(jù)以外,還記錄表的schema信息。  
DataSet是自Spark1.6開(kāi)始提供的一個(gè)分布式數(shù)據(jù)集,具有RDD的特性比如強(qiáng)類型、可以使用強(qiáng)大的lambda表達(dá)式,并且使用Spark SQL的優(yōu)化執(zhí)行引擎。DataSet API支持Scala和Java語(yǔ)言,不支持Python。但是鑒于Python的動(dòng)態(tài)特性,它仍然能夠受益于DataSet API(如,你可以通過(guò)一個(gè)列名從Row里獲取這個(gè)字段 row.columnName),類似的還有R語(yǔ)言。

DataFrame是DataSet以命名列方式組織的分布式數(shù)據(jù)集,類似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame變成類型為Row的Dataset:

type DataFrame = Dataset[Row]。

DataFrame在編譯期不進(jìn)行數(shù)據(jù)中字段的類型檢查,在運(yùn)行期進(jìn)行檢查。但DataSet則與之相反,因?yàn)樗菑?qiáng)類型的。此外,二者都是使用catalyst進(jìn)行sql的解析和優(yōu)化。為了方便,以下統(tǒng)一使用DataSet統(tǒng)稱。  
DataSet創(chuàng)建  
DataSet通常通過(guò)加載外部數(shù)據(jù)或通過(guò)RDD轉(zhuǎn)化創(chuàng)建。1.加載外部數(shù)據(jù)

以加載json和MySQL為例:

val ds = sparkSession.read.json("/路徑/people.json")

val ds = sparkSession.read.format("jdbc")
.options(Map("url" -> "jdbc:mysql://ip:port/db",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "tableName", "user" -> "root", "root" -> "123")).load()

2.RDD轉(zhuǎn)換為DataSet通過(guò)RDD轉(zhuǎn)化創(chuàng)建DataSet,關(guān)鍵在于為RDD指定schema,通常有兩種方式(偽代碼):  
1.定義一個(gè)case class,利用反射機(jī)制來(lái)推斷

1) 從HDFS中加載文件為普通RDD
val lineRDD = sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))

2) 定義case class(相當(dāng)于表的schema)
case class Person(id:Int, name:String, age:Int)

3) 將RDD和case class關(guān)聯(lián)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

4) 將RDD轉(zhuǎn)換成DataFrame
val ds= personRDD.toDF

2.手動(dòng)定義一個(gè)schema StructType,直接指定在RDD上

val schemaString ="name age"

val schema =  StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

val rowRdd = peopleRdd.map(p=>Row(p(0),p(1)))

val ds = sparkSession.createDataFrame(rowRdd,schema)
操作DataSet的兩種風(fēng)格語(yǔ)法  
DSL語(yǔ)法 1.查詢DataSet部分列中的內(nèi)容personDS.select(col("name"))personDS.select(col("name"), col("age"))2.查詢所有的name和age和salary,并將salary加1000personDS.select(col("name"), col("age"), col("salary") + 1000)personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)3.過(guò)濾age大于18的personDS.filter(col("age") > 18)4.按年齡進(jìn)行分組并統(tǒng)計(jì)相同年齡的人數(shù)personDS.groupBy("age").count()

注意:直接使用col方法需要import org.apache.spark.sql.functions._

SQL語(yǔ)法如果想使用SQL風(fēng)格的語(yǔ)法,需要將DataSet注冊(cè)成表personDS.registerTempTable("person")

//查詢年齡最大的前兩名

val result = sparkSession.sql("select * from person order by age desc limit 2")//保存結(jié)果為json文件。注意:如果不指定存儲(chǔ)格式,則默認(rèn)存儲(chǔ)為parquet  
result.write.format("json").save("hdfs://ip:port/res2") Spark SQL的幾種使用方式1.sparksql-shell交互式查詢就是利用Spark提供的shell命令行執(zhí)行SQL2.編程

首先要獲取Spark SQL編程"入口":SparkSession(當(dāng)然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive則為HiveContext)。這里以讀取parquet為例:

val spark = SparkSession.builder()  
.appName("example").master("local[*]").getOrCreate();val df = sparkSession.read.format("parquet").load("/路徑/parquet文件")然后就可以針對(duì)df進(jìn)行業(yè)務(wù)處理了。 3.Thriftserverbeeline客戶端連接操作啟動(dòng)spark-sql的thrift服務(wù),sbin/start-thriftserver.sh,啟動(dòng)腳本中配置好Spark集群服務(wù)資源、地址等信息。然后通過(guò)beeline連接thrift服務(wù)進(jìn)行數(shù)據(jù)處理。hive-jdbc驅(qū)動(dòng)包來(lái)訪問(wèn)spark-sql的thrift服務(wù)

在項(xiàng)目pom文件中引入相關(guān)驅(qū)動(dòng)包,跟訪問(wèn)mysql等jdbc數(shù)據(jù)源類似。示例:

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://ip:port", "root", "123");
try {
 val stat = conn.createStatement()
 val res = stat.executeQuery("select * from people limit 1")
 while (res.next()) {
   println(res.getString("name"))
 }
} catch {
 case e: Exception => e.printStackTrace()
} finally{
 if(conn!=null) conn.close()
}

Spark SQL 獲取Hive數(shù)據(jù)

Spark SQL讀取hive數(shù)據(jù)的關(guān)鍵在于將hive的元數(shù)據(jù)作為服務(wù)暴露給Spark。除了通過(guò)上面thriftserver jdbc連接hive的方式,也可以通過(guò)下面這種方式:  
首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下內(nèi)容:

<property>

<name>hive.metastore.uris</name>

<value>thrift://ip:port</value>

</property>然后,啟動(dòng)hive metastore

最后,將hive-site.xml復(fù)制或者軟鏈到$SPARK_HOME/conf/。如果hive的元數(shù)據(jù)存儲(chǔ)在mysql中,那么需要將mysql的連接驅(qū)動(dòng)jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,啟動(dòng)spark-sql即可操作hive中的庫(kù)和表。而此時(shí)使用hive元數(shù)據(jù)獲取SparkSession的方式為:

val spark = SparkSession.builder()

.config(sparkConf).enableHiveSupport().getOrCreate()UDF、UDAF、AggregatorUDFUDF是最基礎(chǔ)的用戶自定義函數(shù),以自定義一個(gè)求字符串長(zhǎng)度的udf為例:  
val udf_str_length = udf{(str:String) => str.length}
spark.udf.register("str_length",udf_str_length)
val ds =sparkSession.read.json("路徑/people.json")
ds.createOrReplaceTempView("people")
sparkSession.sql("select str_length(address) from people")
UDAF  
定義UDAF,需要繼承抽象類UserDefinedAggregateFunction,它是弱類型的,下面的aggregator是強(qiáng)類型的。以求平均數(shù)為例:  
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
 // Data types of input arguments of this aggregate function
 def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
 // Data types of values in the aggregation buffer
 def bufferSchema: StructType = {
   StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
 }
 // The data type of the returned value
 def dataType: DataType = DoubleType
 // Whether this function always returns the same output on the identical input
 def deterministic: Boolean = true
 // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
 // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
 // the opportunity to update its values. Note that arrays and maps inside the buffer are still
 // immutable.
 def initialize(buffer: MutableAggregationBuffer): Unit = {
   buffer(0) = 0L
   buffer(1) = 0L
 }
 // Updates the given aggregation buffer `buffer` with new input data from `input`
 def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
   if (!input.isNullAt(0)) {
     buffer(0) = buffer.getLong(0) + input.getLong(0)
     buffer(1) = buffer.getLong(1) + 1
   }
 }
 // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
   buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
   buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
 }
 // Calculates the final result
 def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()

Aggregator

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
 // A zero value for this aggregation. Should satisfy the property that any b + zero = b
 def zero: Average = Average(0L, 0L)
 // Combine two values to produce a new value. For performance, the function may modify `buffer`
 // and return it instead of constructing a new object
 def reduce(buffer: Average, employee: Employee): Average = {
   buffer.sum += employee.salary
   buffer.count += 1
   buffer
 }
 // Merge two intermediate values
 def merge(b1: Average, b2: Average): Average = {
   b1.sum += b2.sum
   b1.count += b2.count
   b1
 }
 // Transform the output of the reduction
 def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
 // Specifies the Encoder for the intermediate value type
 def bufferEncoder: Encoder[Average] = Encoders.product
 // Specifies the Encoder for the final output value type
 def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
Spark SQL與Hive的對(duì)比  
Spark組件Spark SQL的實(shí)例分析

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。

網(wǎng)頁(yè)標(biāo)題:Spark組件SparkSQL的實(shí)例分析
URL地址:http://aaarwkj.com/article40/ispeho.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)頁(yè)設(shè)計(jì)公司、網(wǎng)站設(shè)計(jì)公司、動(dòng)態(tài)網(wǎng)站、響應(yīng)式網(wǎng)站搜索引擎優(yōu)化、靜態(tài)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化
国产一级二级三级黄色| 一本久久综合亚洲鲁鲁五月天| 久久99精品人妻一区二区三区| 欧美一区二区三区高清正版| 亚洲女久久久噜噜噜综合| 妇女人妻丰满少妇中文字幕| 日本一区二区电影大全| 久久熟女av一区二区三区| 色哟哟91精品色哟哟| 日韩精品色av一区二区| 91成人大片在线观看| 国产三级黄色片免费看| 蜜臀av在线精品国自产拍| 亚洲天堂,男人的天堂| 未满十八禁止在线播放| 欧美日韩另类综合久久久| 日韩视频一区二区三区四区| 亚洲一区二区精品天堂| 99精品国产麻豆一区二区三区| 欧美日韩福利一区二区三区| 在线观看高清国产黄色片| 亚洲av乱码国产精品观看| 久久精品国产免费夜夜嗨| 亚洲天堂免费在线播放| 中文字幕av在线日韩| 三级黄色片免费久久久| 未满十八禁止观看免费| 蜜桃视频在线观看视频免费| 91日本在线免费观看视频| 欧美久久久久综合一区| 人妻少妇亚洲精品视频| 2004年亚洲中文字幕| 91精品国产综合久久男男 | 国产三级三级三级三级| 国产精品自拍国产精品| 亚洲综合偷拍日韩av| 大片天天看菲色亚洲黄色| 欧美日韩国产亚洲免费| 国产在线乱码一区二区| 国产av不卡精品影片| 日韩欧美国产亚洲在线|