欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

如何解析SparkSQL外部數(shù)據(jù)源

這期內(nèi)容當中小編將會給大家?guī)碛嘘P(guān)如何解析SparkSQL外部數(shù)據(jù)源,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

成都創(chuàng)新互聯(lián)公司是創(chuàng)新、創(chuàng)意、研發(fā)型一體的綜合型網(wǎng)站建設(shè)公司,自成立以來公司不斷探索創(chuàng)新,始終堅持為客戶提供滿意周到的服務(wù),在本地打下了良好的口碑,在過去的十多年時間我們累計服務(wù)了上千家以及全國政企客戶,如成都自拌料攪拌車等企業(yè)單位,完善的項目管理流程,嚴格把控項目進度與質(zhì)量監(jiān)控加上過硬的技術(shù)實力獲得客戶的一致贊美。

場景介紹:

    大數(shù)據(jù)MapReduce,Hive,Spark作業(yè),首先需要加載數(shù)據(jù),數(shù)據(jù)的存放源可能是HDFS、HBase、S3、OSS MongoDB;數(shù)據(jù)格式也可能為json、text、csv、parquet、jdbc..或者數(shù)據(jù)格式經(jīng)過壓縮,不同格式文件需要不同的解析方式,

    如果需要HDFS關(guān)聯(lián)MySQL數(shù)據(jù),可以通過sqoop進行一些列轉(zhuǎn)換到,如果使用External Data Source API直接加載為DF拿到數(shù)據(jù),簡單的說可以通過SparkSQL拿到外部數(shù)據(jù)源數(shù)據(jù)加載成DF。

加載方式:

build-in :內(nèi)置加載外部數(shù)據(jù)如 json、text、parquet、jdbc、HDFS;

third-party:第三方加載外部數(shù)據(jù)如HBase、S3、OSS mongoDB

    第三方JAR地址:https://spark-packages.org/  

    Maven工程需要導入gav

    spark-shell:需要外部導入--package g:a:v  

    SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

        優(yōu)勢:下載依賴包到本地

缺點:內(nèi)網(wǎng)環(huán)境沒有網(wǎng)絡(luò)無法下載

一、外部數(shù)據(jù)源讀取parquet文件:

Spark context Web UI available at http://hadoop001:4040

Spark context available as 'sc' (master = local[2], app id = local-1536244013147).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1

      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)

Type in expressions to have them evaluated.

Type :help for more information.

scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/people.txt").show

提示錯誤:/people.txt is not a Parquet file

注意:spark.read.load()底層默認讀取Parquet file

scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet").show

18/09/06 10:32:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

+------+--------------+----------------+                                        
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

scala> val users = spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet")

users: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> users.printSchema

root

 |-- name: string (nullable = true)

 |-- favorite_color: string (nullable = true)

 |-- favorite_numbers: array (nullable = true)

 |    |-- element: integer (containsNull = true)

scala> users.show

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

-- 查看列,常規(guī)操作

scala> users.select("name").show

+------+
|  name|
+------+
|Alyssa|
|   Ben|
+------+

二、轉(zhuǎn)換操作

-- 轉(zhuǎn)成json格式輸出

scala> users.select("name","favorite_color").write.format("json").save("file:////home/hadoop/data/parquet/")

[hadoop@hadoop001 parquet]$ cat *
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}

-- 不采取壓縮

.option("compression","none")  

-- 轉(zhuǎn)成text格式輸出

scala> users.select("name").write.format("text").save("file:////home/hadoop/data/parquet2/")

[hadoop@hadoop001 parquet2]$ cat *

Alyssa

-- Save Modes

用法:.mode("")

1、default  -- 目標目錄存在,拋出異常

2、append   -- 目標目錄存在,重跑數(shù)據(jù)+1,無法保證數(shù)據(jù)冪等

3、overwrite-- 目標目錄存在,覆蓋原文件

4、ignore-- 忽略你的模式,目標純在將不保存

三、spark-shell操作JDBC數(shù)據(jù)

-- 讀取外部MySQL數(shù)據(jù)為DF

val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/ruozedata").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info").option("user","root").option("password", "root").load()

-- 查看表信息

jdbcDF.show()

-- 獲取本地數(shù)據(jù) 

val deptDF = spark.table("dept") 

-- join關(guān)聯(lián)使用

deptDF.join(jdbcDF,deptDF.col("deptid") === jdbcDF.col("deptid"))

-- DF寫入MySQL本地,數(shù)據(jù)類型有變化,重復(fù)寫入需要加上.mode("overwrite")

jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/hive_data").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info_bak").option("user","root").option("password", "root").save()

mysql> show tables

+---------------------------+
| Tables_in_hive_data       |
+---------------------------+
| bucketing_cols            |
| cds                       |
| city_info_bak             |
+---------------------------+

-- 如果想類型不發(fā)生變化指定option指定字段類型

.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")

四、spark-sql操作JDBC數(shù)據(jù)

-- SQL創(chuàng)建臨時表視圖,單session

CREATE TEMPORARY VIEW emp_sql
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://hadoop001:3306/ruozedata",
  dbtable "city_info",
  user 'root',
  password 'root'
)

show tbales;

INSERT INTO TABLE emp_sql

SELECT * FROM emp_sql

五、Perdicate Push Down(PPD)

               disk         network                  CPU

外部數(shù)據(jù)外(1T)------->獲取本地磁盤(1T)---------->提交到集群(1T)--------->結(jié)果(1G)

               disk        network                   CPU

外部數(shù)據(jù)外(1T)------->經(jīng)過列裁剪(10G)----------->提交到集群(10G)----------->傳結(jié)果(1g)

               disk          CPU                 network

外部數(shù)據(jù)外(1T)------->經(jīng)過列裁剪(10G)---------->進過計算(1G)----------->傳輸結(jié)果

六、SparkSQL外部數(shù)據(jù)源實現(xiàn)機制

-- 0.有效的讀取外部數(shù)據(jù)源的數(shù)據(jù)的

-- 1.buildScan掃描整張表,變成一個RDD[ROW]

trait TableScan {

def buildScan(): RDD[Row]  

}

-- 2.PrunedScan獲取表的裁剪列

trait PrunedScan {

def buildScan(requiredColumns: Array[String]): RDD[Row] 

-- 3.PrunedFilteredScan列裁剪,行過濾

trait PrunedFilteredScan {

def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

-- 4.加載外部數(shù)據(jù)源的數(shù)據(jù),定義數(shù)據(jù)的schema信息

abstract class BaseRelation{

-- 5.Relation產(chǎn)生BaseRelation使用

trait RelationProvider

}

總歸:

-- 查詢類操作

trait PrunedScan {

  def buildScan(requiredColumns: Array[String]): RDD[Row]

}  

-- 列裁剪,行過濾

trait PrunedFilteredScan {

  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

}  

-- 寫入類操作

trait InsertableRelation {

  def insert(data: DataFrame, overwrite: Boolean): Unit

}

上述就是小編為大家分享的如何解析SparkSQL外部數(shù)據(jù)源了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

文章題目:如何解析SparkSQL外部數(shù)據(jù)源
鏈接地址:http://aaarwkj.com/article16/godsgg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供用戶體驗網(wǎng)站設(shè)計公司、外貿(mào)網(wǎng)站建設(shè)軟件開發(fā)、網(wǎng)站改版、定制網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

微信小程序開發(fā)
中文字幕av在线有码| 中文字幕成人资源网站| 亚洲限制级电影一区二区| 国产三级网站在线观看播放| 国产夫妻一区二区三区| 欧美成人精品欧美一级黄片| 日韩在线视频精品一区| 久久99精品久久久子伦| 精品国产女同一区二区| 国产日韩一区二区三区电影| 欧美日韩亚洲一区二区搜索| 亚洲黄色成人在线观看| 欧美v日韩v亚洲综合国产高清| 欧美二区三区精品在线| 亚洲第六页亚洲第一页| 色琪琪原网另类欧美日韩| 欧美日韩精品人妻一区| 国产黄色大片在线关看| 中文字幕熟女人妻另类癖好| jk黑丝白丝国产精品| 国产激情av网站在线观看| 亚洲三级伦理中文字幕| 2004年亚洲中文字幕| 亚洲精品一区二区三区网站| 高清一区高清二区高清三区| 久久精品高潮999久久久| 亚洲国产精品热久久网站| 久久熟女av一区二区三区| 亚洲av毛片一区二区三区网| 免费在线观看av不卡| 欧美福利免费在线视频| 亚州欧美精品一区二区| 成人av久久一区二区三区| 日本不卡免费一区二区视频| 国产精品国产精品三级在线观看 | 欧美精品黄片免费在线观看| 日本最新一区二区三区视频| 亚洲精品乱码精品乱码不卡| 日本高清区一区二区三区四区五区| 日韩精品人妻一区二区三区蜜桃臀| 91国产性感美女视频|