環(huán)境:
源端:oracle12.2 ogg for oracle 12.3
目標端:KAFKA ogg for bigdata 12.3
將oracle中的數(shù)據(jù)通過OGG同步到KAFKA
源端配置:
1、為要同步的表添加附加日志
dblogin USERID ogg@orclpdb, PASSWORD ogg
add trandata scott.tab1
add trandata scott.tab2
2、 添加抽取進程
GGSCI>add extract EXT_KAF1,integrated tranlog, begin now
GGSCI>add EXTTRAIL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200
編輯抽取進程參數(shù):
GGSCI> edit params EXT_KAF1
extract EXT_KAF1
userid c##ggadmin,PASSWORD ggadmin
LOGALLSUPCOLS
UPDATERECORDFORMAT COMPACT
exttrail ./dirdat/k1,FORMAT RELEASE 12.3
SOURCECATALOG orclpdb --(指定pdb)
table scott.tab1;
table scott.tab2;
注冊進程
GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin
GGSCI> register extract EXT_KAF1 database container (orclpdb)
3、添加投遞進程:
GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1
GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200
編輯投遞進程參數(shù):
GGSCI>edit param PMP_KAF1
EXTRACT PMP_KAF1
USERID c##ggadmin,PASSWORD ggadmin
PASSTHRU
RMTHOST 10.1.1.247, MGRPORT 9178
RMTTRAIL ./dirdat/f1,format release 12.3
SOURCECATALOG orclpdb
TABLE scott.tab1;
table scott.tab2;
4、添加數(shù)據(jù)初始化進程(Oracle initial load) 可以多個表分開初始化也可以一起初始化,此處選擇分開初始化
GGSCI> add extract ek_01, sourceistable
編輯參數(shù):
GGSCI> EDIT PARAMS ek_01
EXTRACT ek_01
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;
GGSCI> add extract ek_02, sourceistable
EDIT PARAMS ek_02
EXTRACT ek_02
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab2;
5、生成def文件:
GGSCI> edit param defgen1
USERID c##ggadmin,PASSWORD ggadmin
defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;
table scott.tab2;
在OGG_HOME下執(zhí)行如下命令生成def文件
defgen paramfile dirprm/defgen1.prm
將生成的def文件傳到目標端$OGG_HOME/dirdef下
目標端配置:
1、將$OGG_HOME/AdapterExamples/big-data/kafka下的所有文件copy到$OGG_HOME/dirprm下
cd $OGG_HOME/AdapterExamples/big-data/kafka
cp * $OGG_HOME/dirprm
2、將$ORACLE_HOME/AdapterExamples/trail下的文件tr000000000 copy到$OGG_HOME/dirdat下
cd $ORACLE_HOME/AdapterExamples/trail
cp tr000000000 $OGG_HOME/dirdat
3、添加初始化進程:(可以多表一起初始化也可以分開初始化,此處選擇單獨初始化)
GGSCI> ADD replicat rp_01, specialrun
GGSCI> EDIT PARAMS rp_01
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/ka
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;
GGSCI> ADD replicat rp_02, specialrun
GGSCI> EDIT PARAMS rp_02
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/kb
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;
4、添加恢復進程:
GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1
GGSCI>edit params r_kaf1
REPLICAT r_kaf1
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;
GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2
GGSCI> edit params r_kaf2
REPLICAT r_kaf2
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;
5、參數(shù)配置:
custom_kafka_producer.properties文件內(nèi)容如下:
bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 --只需要改動這一行就行,指定kafka的地址和端口號
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=10000
kafka1.props文件內(nèi)容如下:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= topic1
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =json --這里做了改動,指定格式為json格式
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主鍵
gg.handler.kafkahandler.SchemaTopicName= topic1 --此處指定為要同步到的目標topic名字
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --指定classpath,這里很重要,必須有kafka安裝文件的類庫。
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
啟動進程進程恢復:
1、啟動源端抓取進程
GGSCI> start EXT_KAF1
2、啟動源端投遞進程
GGSCI> start PMP_KAF1
3、啟動源端初始化進程
GGSCI> start ek_01
4、啟動目標端初始化進程
在$OGG_HOME下執(zhí)行如下命令:
./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD
5、啟動目標端恢復進程
GGSCI> start R_KAF1
遇到的錯誤:
1、ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)
原因:找不到類庫(配置好環(huán)境變量之后,OGG的mgr進程沒有重啟,導致的)
解決:重啟MGR進程
2、ERROR OG-15051 Java or JNI exception
原因:沒有使用ogg12.3.1.1.1自帶的kafka.props,而是copy了ogg12.2的kafka.props,導致出現(xiàn)異常。
解決:使用ogg12.3.1.1.1自帶的kafka.props,并指定相關(guān)的屬性,解決。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
當前題目:OGG同步ORACLE數(shù)據(jù)到KAFKA-創(chuàng)新互聯(lián)
本文路徑:http://aaarwkj.com/article10/gccgo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、響應(yīng)式網(wǎng)站、網(wǎng)站維護、外貿(mào)建站、外貿(mào)網(wǎng)站建設(shè)、電子商務(wù)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容