這篇文章主要講解了“怎么用flink 1.11使sql客戶端支持執(zhí)行sql文件”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么用flink 1.11使sql客戶端支持執(zhí)行sql文件”吧!
成都創(chuàng)新互聯(lián)公司主營白河網(wǎng)站建設的網(wǎng)絡公司,主營網(wǎng)站建設方案,app軟件開發(fā)公司,白河h5小程序制作搭建,白河網(wǎng)站營銷推廣歡迎白河等地區(qū)企業(yè)咨詢
目前flink的sql客戶端提供了一種交互式的sql查詢服務,用戶可以使用sql客戶端執(zhí)行一些sql的批任務或者流任務。但是當我想執(zhí)行一些sql的定時任務時,flink卻沒有提供一個合適的方式,所以綜合考慮了一下,我決定在sql的客戶端基礎上給加一個 '-filename (-f)' 參數(shù),就像類似'hive -f abc.sql' 一樣,可以執(zhí)行一批sql任務。
目前我只是想通過sql客戶端執(zhí)行一些批任務,再加上flink sql 客戶端本身的一些設計,所以目前修改后的sql client 執(zhí)行sql文件的時候支持 SET,DDL,INSERT INTO SELECT ...等語句,其他比如select暫不支持。
修改后執(zhí)行的方式為:
/home/flink/bin/sql-client.sh embedded -f flink.sql
在這個sql 客戶端參數(shù)解析類里添加一個選項,用于解析-f參數(shù)。
public static final Option OPTION_FILENAME = Option
.builder("f")
.required(false)
.longOpt("filename")
.numberOfArgs(1)
.argName("the path of the sql file")
.desc("SQL from files")
.build();
在這里添加一個變量filename
private final String filename;
在SqlClient里添加對于-filename的處理
if (options.getUpdateStatement() != null){
// execute update statement
final boolean success = cli.submitUpdate(options.getUpdateStatement());
if (!success) {
throw new SqlClientException("Could not submit given SQL update statement to cluster.");
}
} else if (options.getFilename() != null){
final boolean success = cli.executeFile(options.getFilename());
if (!success) {
throw new SqlClientException("Could not submit given SQL file to cluster.");
}
} else {
cli.open();
}
添加具體的執(zhí)行sql文件的方法,sql文件里的所有sql以分號切分,然后分別判斷是什么類型,調(diào)用不同的方法來執(zhí)行。
public boolean executeFile(String filename){
File file = new File(filename);
if (!file.exists()){
printError("the file do not exist");
return false;
} else {
String statement = null;
try {
statement = FileUtils.readFileToString(file);
} catch (IOException e){
printError("read the sql file error , " + e.getMessage());
return false;
}
String[] sqls = statement.split(";");
for (String sql : sqls){
if (sql == null || "".equals(sql.trim())){
continue;
}
final Optional<SqlCommandCall> parsedStatement = parseCommand(sql);
if (parsedStatement.isPresent()){
SqlCommandCall cmdCall = parsedStatement.get();
switch (cmdCall.command) {
case SET:
callSet(cmdCall);
break;
................
case INSERT_INTO:
case INSERT_OVERWRITE:
callInsert(cmdCall);
break;
case CREATE_TABLE:
callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED);
break;
.....................
throw new SqlClientException("Unsupported command: " + cmdCall.command);
}
}
}
}
return true;
}
感謝各位的閱讀,以上就是“怎么用flink 1.11使sql客戶端支持執(zhí)行sql文件”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對怎么用flink 1.11使sql客戶端支持執(zhí)行sql文件這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關知識點的文章,歡迎關注!
名稱欄目:怎么用flink1.11使sql客戶端支持執(zhí)行sql文件
本文URL:http://aaarwkj.com/article14/peepge.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供動態(tài)網(wǎng)站、網(wǎng)站制作、域名注冊、企業(yè)網(wǎng)站制作、營銷型網(wǎng)站建設、App開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)