使用ogg同步数据给kafka

技术使用ogg同步数据给kafka 使用ogg同步数据给kafka使用ogg同步数据给kafka两榜进士2017-06-20 14:55:3123011收藏9分类专栏:大数据-Kafka大数据-Kafk

使用ogg将数据同步到kafka。

使用ogg同步数据给kafka

二2017年6月20日学者名单14:5533603123011收藏9。

专栏:大数据-卡夫卡。

大数据-卡夫卡专栏包含此内容。

订阅5篇文章0。

订阅栏

为了更好的配合公司自然人项目的实施,我利用虚拟机搭建了一个测试环境,通过ogg软件将生产库中一些表的数据同步到kafka。

1测试环境描述。

1)客观。

源hr用户下的表t1,当dml操作发生时,操作数据被写入kafka集群并显示。

2)环境构成。

192.168.60.88 tdb1

192.168.60.89次

192.168.60.91 kafka01

192.168.60.92 kafka02

192.168.60.93 kafka03

Tdb1是源,Oracle数据库,版本10.2.0.4,sid:tdb。

Reps是一个接口服务器,用于为bigdata软件安装ogg,在源头接收ogg pump进程发送的数据,并将这些数据写入kafka集群。

卡夫卡01-03是卡夫卡的一个集群。

所有这些服务器的操作系统都是rhel 6.8。

Td,Kafka01-03的jdk版本是1.7.0.79。

reps的jdk版本是1.8.0.121,因为bigdata 12.2.xxx的ogg版本要求jdk版本在1.8以上,jdk 1.7会报错版本不足。

所有五台服务器都需要关闭防火墙和selinux。

2卡夫卡集群安装。

1)下载软件。

动物园管理员,3.4.10版本,文件名:zookeeper-3.4.10.tar.gz,下载地址:http://zookeeper.apache.org/releases.html.

Kafka,版本:2.10-0.10.2.0,文件名:kafka_2.10-0.10.2.0.tgz,下载地址:http://kafka.apache.org/downloads.

2)准备工作。

Kafka01-03这三台机器调整/etc/hosts文件。

[root@kafka01 ~]# cd /etc

[root@kafka01等]#卡特彼勒主机

127 . 0 . 0 . 1 localhost localhost . local domain localhost 4 localhost 4 . localdomain4

:1 localhost localhost . local domain localhost 6 localhost 6 . localdomain 6

192.168.60.91 kafka01

192.168.60.92 kafka02

192.168.60.93 kafka03

Kafka01-03都有一个叫Kafka的用户。这三台机器的kafka用户需要做到ssh互信,而怎么做都可以百度。

Kafka01主机,Kafka用户示例:

[kafka@kafka01 ~]$ pwd

/家/卡夫卡

[kafka@kafka01 ~]$ id

uid=500(卡夫卡)gid=501(卡夫卡)组=501(卡夫卡)

[kafka@kafka01 ~]$

[kafka@kafka01 ~]$ cat。bash_profile

# .bash_profile

#获取别名和函数

if [ -f ~/。bashrc];然后。~/.bashrc

船方不负担装货费用

#用户特定的环境和启动程序

路径=$PATH:$HOME/bin

导出路径

导出JAVA _ HOME=/usr/JAVA/JDK 1 . 7 . 0 _ 79

导出JRE_HOME=/usr/java/jdk1.7.0

_79/jre

  • export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
  • export ZOOKEEPER_HOME=/home/kafaka/zookeeper
  • export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH
  • [kafka@kafka01 ~]$
  • 3)安装配置zookeeper
    如下操作在kafka01-03都要做,如下kafka01上做示例
    a.解压
    zookeeper-3.4.10.tar.gz解压后,目录重命名为:zookeeper,并挪到/home/kafka/下,效果如下:

    1. [kafka@kafka01 zookeeper]$ pwd
    2. /home/kafka/zookeeper
    3. [kafka@kafka01 zookeeper]$ ls
    4. bin conf data docs ivy.xml LICENSE.txt README_packaging.txt recipes zookeeper-3.4.10.jar zookeeper-3.4.10.jar.md5 zookeeper.out
    5. build.xml contrib dist-maven ivysettings.xml lib NOTICE.txt README.txt src zookeeper-3.4.10.jar.asc zookeeper-3.4.10.jar.sha1
    6. [kafka@kafka01 zookeeper]$

    b.配置zoo.cfg

    1. cd /home/kafka/ zookeeper
    2. cp zoo_sample.cfg zoo.cfg

    编辑zoo.cfg,内容如下:

    1. [kafka@kafka01 conf]$ pwd
    2. /home/kafka/zookeeper/conf
    3. zoo.cfg设置如下参数:
    4. dataDir=/home/kafka/zookeeper/data
    5. clientPort=2181
    6. server.1=kafka01:2888:3888
    7. server.2=kafka02:2888:3888
    8. server.3=kafka03:2888:3888

    c.设置节点标识

    1. cd /home/kafka/zookeeper
    2. mkdir data
    3. cd data
    4. vi myid
    5. 输入1
    6. [kafka@kafka01 data]$ pwd
    7. /home/kafka/zookeeper/data
    8. [kafka@kafka01 data]$ cat myid
    9. 1

    d.启动Zookeeper

    1. [kafka@kafka01 bin]$ pwd
    2. /home/kafka/zookeeper/bin
    3. ./zkServer.sh start
    4. 看状态:
    5. [kafka@kafka01 bin]$ ./zkServer.sh status
    6. ZooKeeper JMX enabled by default
    7. Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
    8. Mode: follower
    9. 三台中一台Mode是leader,其余两台Mode为follower
    10. 排错:
    11. 如果没启动,可以使用./zkServer.sh start-foreground启动,屏幕上会显示日志信息,能看出哪块出了问题。

    4)安装配置kafka
    如下操作在kafka01-03都要做,kafka01上示例如下:
    a.解压
    kafka_2.10-0.10.2.0.tgz解压后,目录重命名为:kafka,并挪到/home/kafka/下,效果如下:

    1. [kafka@kafka02 kafka]$ pwd
    2. /home/kafka/kafka
    3. [kafka@kafka02 kafka]$ ls
    4. bin config libs LICENSE logs NOTICE site-docs

    b.修改Kafka Servre配置文件

    1. cd /home/kafka/kafka/config
    2. vi server.properties
    3. [kafka@kafka01 config]$ cat server.properties --注:不需改的条目去掉了
    4. broker.id=1 #kafka01为1,kafka02为2,kafka03为3
    5. host.name=kafka01 #按主机名相应调整
    6. listeners=PLAINTEXT://kafka01:9092 #按主机名相应调整
    7. advertised.listeners=PLAINTEXT://kafka01:9092 #按主机名相应调整
    8. log.dirs=/home/kafka/kafka/logs
    9. num.partitions=4

    c.后台启动kakfa
    在集群中的这三个节点上分别后台启动Kafka,分别执行如下命令:

    1. cd /home/kafka/kafka/bin
    2. ./kafka-server-start.sh/home/kafka/kafka/config/server.properties

    d.测试
    创建一个名称为oggtest的Topic,4个分区,并且复制因子为3:
    可以任意一节点

    1. ./kafka-topics.sh -create -zookeeper kafka01:2181,kafka02:2181,kafka03:2181 -replication-factor 3 -partitions 3 –topic oggtest

    查看创建的Topic,执行如下命令:
    可以任意一节点

    1. ./kafka-topics.sh -describe -zookeeper kafka01:2181,kafka02:2181,kafka03:2181 -topic oggtest
    2. 查看所有topic:
    3. ./kafka-topics.sh -describe -zookeeper kafka01:2181,kafka02:2181,kafka03:2181

    可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如何发布消息、消费消息:
    在一个终端,启动Producer,并向我们上面创建的名称为oggtest的Topic中生产消息,执行如下脚本:

    1. ./kafka-console-producer.sh -broker-list kafka01:9092,kafka02:9092,kafka03:9092 -topic oggtest

    在另一个终端,启动Consumer,并订阅我们上面创建的Topic中生产的消息:

    1. ./kafka-console-consumer.sh –zookeeperkafka01:2181,kafka02:2181,kafka03:2181 –from-beginning –topic oggtest

    如果kafka集群配置的没有问题,随便在producer窗口敲入一些信息回车后,在consumer窗口便能看到相应的消息

    3 ogg源端(sdb1主机)的安装配置
    1)准备工作
    a.ogg 软件
    软件名:Oracle GoldenGate V11.2.1.0.3 for Oracle 11g on Linux x86-64.zip,在https://edelivery.oracle.com可以下载到
    b.源数据库要开归档,置成force logging,开追加日志

    1. [oracle@tdb1 ~]$ sqlplus / as sysdba
    2. SQL archive log list
    3. Database log mode Archive Mode
    4. Automatic archival Enabled
    5. Archive destination /oracle/arc
    6. Oldest online log sequence 9
    7. Next log sequence to archive 11
    8. Current log sequence 11
    9. SQL
    10. SQL Select
    11. 2 SUPPLEMENTAL_LOG_DATA_MIN
    12. 3 ,SUPPLEMENTAL_LOG_DATA_PK
    13. 4 ,SUPPLEMENTAL_LOG_DATA_UI
    14. 5 ,SUPPLEMENTAL_LOG_DATA_ALL
    15. 6 , FORCE_LOGGING from v$database;
    16. SUPPLEME SUP SUP SUP FOR
    17. -------- --- --- --- ---
    18. YES NO NO NO YES
    19. SQL

    c.推荐添加一ogg用户,以避免对oracle用户的影响,放在oracle用户的主group,根据数据extract的量为ogg用户建立一个合适大小的文件系统
    ogg用户最终效果示例:

    1. [root@tdb1 ~]# su - ogg
    2. [ogg@tdb1 ~]$ id
    3. uid=501(ogg) gid=500(dba) groups=500(dba)
    4. [ogg@tdb1 ~]$
    5. [ogg@tdb1 ~]$ cat .bash_profile
    6. # .bash_profile
    7. # Get the aliases and functions
    8. if [ -f ~/.bashrc ]; then
    9. . ~/.bashrc
    10. fi
    11. # User specific environment and startup programs
    12. PATH=$PATH:$HOME/bin
    13. export PATH
    14. umask 022
    15. export ORACLE_BASE=/oracle/app/oracle
    16. export ORACLE_HOME=$ORACLE_BASE/product/10.2.0
    17. export ORACLE_SID=tdb
    18. export PATH=$ORACLE_HOME/bin:$PATH:.
    19. export NLS_LANG=AMERICAN_AMERICA.ZHS16GBK
    20. export NLS_DATE_FORMAT=YYYYMMDDHH24MISS
    21. export DISPLAY=192.168.60.1:0.0
    22. #ogg
    23. export GG_HOME=/ogg
    24. #export PATH=$PATH:$GG_HOME
    25. export LD_LIBRARY_PATH=/ogg:$ORACLE_HOME/lib
    26. [ogg@tdb1 ~]$
    27. [ogg@tdb1 ~]$ sqlplus / as sysdba
    28. SQL*Plus: Release 10.2.0.5.0 - Production on Thu Apr 13 11:10:59 2017
    29. Copyright (c) 1982, 2010, Oracle. All Rights Reserved.
    30. Connected to:
    31. Oracle Database 10g Enterprise Edition Release 10.2.0.5.0 - 64bit Production
    32. With the Partitioning, Data Mining and Real Application Testing options
    33. SQL

    d.数据库内建立ogg用户,并给其授权

    1. create tablespace ogg datafile '/oracle/oradata/tdb/ogg.dbf' size 1G;
    2. create user ogg identified by gg_888 default tablespace ogg;
    3. grant connect,resource to ogg;
    4. grant dba to ogg; --如果不做ddl trigger,dba权限可以不给
    5. GRANT CREATE SESSION TO ogg;
    6. GRANT ALTER SESSION TO ogg;
    7. GRANT SELECT ANY DICTIONARY TO ogg;
    8. GRANT SELECT ANY TABLE TO ogg;
    9. GRANT ALTER ANY TABLE TO ogg; --用户配置表级追加日志
    10. GRANT FLASHBACK ANY TABLE TO ogg;
    11. GRANT EXECUTE on DBMS_FLASHBACK package TO ogg;
    12. GRANT EXECUTE on DBMS_FLASHBACK TO ogg;
    13. GRANT EXECUTE ON utl_file TO ogg;
    14. execute DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('OGG');
    15. grant execute on sys.dbms_lob to ogg;
    16. --如下pl/sql块是在oracle 11g之上版本用的,10g版本不需要执行
    17. BEGIN
    18. DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(
    19. Grantee = 'OGG',
    20. privilege_type = 'CAPTURE',
    21. grant_select_privileges = TRUE,
    22. do_grants = TRUE);
    23. END;
    24. /

    e.为了测试,我建立了一个hr用户,并在其下面建了一个t1的表

    1. -- Create table
    2. create table T1
    3. (
    4. id NUMBER not null,
    5. name VARCHAR2(100)
    6. )
    7. tablespace USERS;
    8. -- Create/Recreate primary, unique and foreign key constraints
    9. alter table T1
    10. add constraint PK_T1_ID primary key (ID)
    11. using index
    12. tablespace USERS;

    2)配置ogg
    a.解压缩ogg软件,放在$GG_HOME下
    效果类似如下:

    1. [ogg@tdb1 ogg]$ ls -l gg*
    2. -rwxr-x--- 1 ogg dba 6577392 Aug 24 2012 ggcmd
    3. -rw-r----- 1 ogg dba 1841 Apr 12 15:58 gglog-defgen.dmp
    4. -rw-r----- 1 ogg dba 1239 Apr 12 16:40 gglog-DPE_TEST-43680.dmp
    5. -rw-r----- 1 ogg dba 962 Apr 12 16:49 gglog-DPE_TEST-43782.dmp
    6. -rw-r----- 1 ogg dba 0 Apr 12 16:40 gglog-DPE_TEST.dmp
    7. -rw-r----- 1 ogg dba 1280336 Aug 24 2012 ggMessage.dat
    8. -rwxr-x--- 1 ogg dba 13899588 Aug 24 2012 ggsci
    9. -rw-rw-rw- 1 ogg dba 21819 Apr 13 08:47 ggserr.log
    10. [ogg@tdb1 ogg]$

    b.创建ogg相关子目录

    1. [ogg@tdb1 ogg]$ pwd
    2. /ogg
    3. [ogg@tdb1 ogg]$ ./ggsci
    4. GGSCIcreate subdirs

    ggsci报错处理:

    1. [ogg@tdb1 ogg]$ ggsci
    2. ggsci: error while loading shared libraries: libnnz11.so: cannot open shared object file: No such file or directory
    3. [ogg@tdb1 ogg]$ ldd ggsci
    4. linux-vdso.so.1 = (0x00007ffd3db73000)
    5. libdl.so.2 = /lib64/libdl.so.2 (0x00000035bbc00000)
    6. libgglog.so = /ogg/./libgglog.so (0x00007ff824130000)
    7. libggrepo.so = /ogg/./libggrepo.so (0x00007ff823fdc000)
    8. libdb-5.2.so = /ogg/./libdb-5.2.so (0x00007ff823d3b000)
    9. libicui18n.so.38 = /ogg/./libicui18n.so.38 (0x00007ff8239da000)
    10. libicuuc.so.38 = /ogg/./libicuuc.so.38 (0x00007ff8236a1000)
    11. libicudata.so.38 = /ogg/./libicudata.so.38 (0x00007ff8226c5000)
    12. libpthread.so.0 = /lib64/libpthread.so.0 (0x00000035bc400000)
    13. libxerces-c.so.28 = /ogg/./libxerces-c.so.28 (0x00007ff8221ad000)
    14. libantlr3c.so = /ogg/./libantlr3c.so (0x00007ff822097000)
    15. libnnz11.so = not found
    16. libclntsh.so.11.1 = not found
    17. libstdc++.so.6 = /usr/lib64/libstdc++.so.6 (0x00000035c7400000)
    18. libm.so.6 = /lib64/libm.so.6 (0x00000035bcc00000)
    19. libgcc_s.so.1 = /lib64/libgcc_s.so.1 (0x00000035c7000000)
    20. libc.so.6 = /lib64/libc.so.6 (0x00000035bc000000)
    21. /lib64/ld-linux-x86-64.so.2 (0x00000035bb800000)
    22. [oracle@tdb1 ~]$ cd $ORACLE_HOME/lib
    23. [oracle@tdb1 lib]$ ln -s libnnz10.so libnnz11.so
    24. [oracle@tdb1 lib]$ ln -s libclntsh.so libclntsh.so.11.1
    25. [oracle@tdb1 lib]$ ls -l libclntsh.so.11.1
    26. lrwxrwxrwx 1 oracle dba 12 Apr 11 22:33 libclntsh.so.11.1 - libclntsh.so
    27. [oracle@tdb1 lib]$ ls -l libnnz11.so
    28. lrwxrwxrwx 1 oracle dba 11 Apr 11 22:31 libnnz11.so - libnnz10.so

    c.打开hr.t1表级附加日志

    1. [ogg@tdb1 ogg]$ ./ggsci
    2. GGSCIDblogin userid ogg, password gg_888
    3. Add trandata hr.t1

    d.配置ogg manager

    1. [ogg@tdb1 ogg]$ ./ggsci
    2. GGSCIedit params mgr
    3. 内容如下,保存
    4. PORT 7809
    5. DYNAMICPORTLIST 7810-7860
    6. AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
    7. PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
    8. LAGREPORTHOURS 1
    9. LAGINFOMINUTES 30
    10. LAGCRITICALMINUTES 45
    11. 启动OGG manager
    12. GGSCIstart mgr
    13. 查看manager进程状态,正确的形态如下:
    14. GGSCI (tdb1) 1 info mgr
    15. Manager is running (IP port tdb1.7809).

    e.创建Extract

    1. GGSCIedit params ext_test
    2. 内容如下,保存
    3. EXTRACT ext_test
    4. Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
    5. USERID ogg, PASSWORD gg_888
    6. gettruncates
    7. DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 1024
    8. DBOPTIONS ALLOWUNUSEDCOLUMN
    9. REPORTCOUNT EVERY 1 MINUTES, RATE
    10. WARNLONGTRANS 2h,CHECKINTERVAL 3m
    11. FETCHOPTIONS NOUSESNAPSHOT
    12. TRANLOGOPTIONS CONVERTUCS2CLOBS
    13. EXTTRAIL ./dirdat/te
    14. WILDCARDRESOLVE DYNAMIC
    15. GETUPDATEBEFORES
    16. NOCOMPRESSUPDATES
    17. NOCOMPRESSDELETES
    18. dynamicresolution
    19. table hr.t1;
    20. 添加抽取进程组:
    21. GGSCIadd extract ext_test, TRANLOG, BEGIN NOW
    22. 定义trail文件:
    23. GGSCI ADD EXTTRAIL ./dirdat/te, EXTRACT ext_test, MEGABYTES 200

    f.pump extract进程

    1. GGSCIedit param dpe_test
    2. 内容如下,保存
    3. EXTRACT dpe_test
    4. PASSTHRU
    5. RMTHOST 192.168.60.89, MGRPORT 7809
    6. RMTTRAIL ./dirdat/te
    7. DYNAMICRESOLUTION
    8. TABLE hr.t1;
    9. 添加pump捕获组:
    10. GGSCI ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/te
    11. 定义pump trail文件:
    12. GGSCI ADD RMTTRAIL ./dirdat/te, EXTRACT dpe_test, MEGABYTES 200

    g.启动捕获进程

    1. GGSCI start extract ext_test;
    2. GGSCI start extract dpe_test;
    3. 看状态,如果如正就是对的:
    4. GGSCI info all
    5. Program Status Group Lag at Chkpt Time Since Chkpt
    6. MANAGER RUNNING
    7. EXTRACT RUNNING DPE_TEST 00:00:00 00:00:03
    8. EXTRACT RUNNING EXT_TEST 00:00:00 00:00:01

    4 接口机reps安装配置
    1)安装OGG for Big Data
    a.如源端类以,解压缩ogg for big data软件,放在$GG_HOME下
    b./etc/hosts文件

    1. [root@reps etc]# cd /etc
    2. [root@reps etc]# cat hosts
    3. 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
    4. ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
    5. 192.168.60.89 reps
    6. 192.168.60.91 kafka01
    7. 192.168.60.92 kafka02
    8. 192.168.60.93 kafka03
    9. [root@reps etc]#

    c.安装jdk 1.8及之以的版本
    ogg for big data 12.2.xx需要jdk 1.8以上的版本,我这里用的是1.8.0_121

    1. [ogg@reps ogg]$ java -version
    2. java version "1.8.0_121"
    3. Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
    4. Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

    d.创建ogg用户,配置环境变量,安装kafka软件

    1. [root@reps etc]# su - ogg
    2. [ogg@reps ~]$ id
    3. uid=500(ogg) gid=501(ogg) groups=501(ogg)
    4. [ogg@reps ~]$ cat .bash_profile
    5. # .bash_profile
    6. # Get the aliases and functions
    7. if [ -f ~/.bashrc ]; then
    8. . ~/.bashrc
    9. fi
    10. # User specific environment and startup programs
    11. PATH=$PATH:$HOME/bin
    12. export PATH
    13. export OGG_HOME=/ogg
    14. export PATH=$PATH:$GG_HOME
    15. export LD_LIBRARY_PATH=$OGG_HOME:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64
    16. [ogg@reps ~]$
    17. [ogg@reps ~]$ ls -l
    18. total 8
    19. drwxrwxr-x 2 ogg ogg 4096 Apr 11 22:56 install
    20. drwxr-xr-x 6 ogg ogg 4096 Feb 15 01:28 kafka --把kafka软件包解压到这,也可以从kafka主机拷贝这个目录
    21. [ogg@reps ~]$
    22. [ogg@reps ~]$ cd /ogg
    23. [ogg@reps ogg]$ ls -l ggsci
    24. -rwxr-x--- 1 ogg ogg 39120528 Oct 20 07:05 ggsci
    25. [ogg@reps ogg]$

    2)配置OGG for kafka
    a.启动ogg,并创建相关子目录

    1. ./ggsci
    2. GGSCIcreate subdirs

    b.复制example

    1. cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/

    c.配置manager

    1. GGSCIedit params mgr
    2. 内容如下:
    3. PORT 7809
    4. DYNAMICPORTLIST 7810-7860
    5. AUTORESTART REPLICAT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
    6. PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
    7. LAGREPORTHOURS 1
    8. LAGINFOMINUTES 30
    9. LAGCRITICALMINUTES 45

    d.配置kafka.props
    内容如下:

    1. [ogg@reps dirprm]$ cat kafka.props
    2. gg.handlerlist = kafkahandler
    3. gg.handler.kafkahandler.type = kafka
    4. gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
    5. gg.handler.kafkahandler.TopicName =oggtest
    6. gg.handler.kafkahandler.format =avro_op
    7. gg.handler.kafkahandler.format=delimitedtext
    8. gg.handler.kafkahandler.format.fieldDelimiter=|
    9. gg.handler.kafkahandler.SchemaTopicName=myoggtest
    10. gg.handler.kafkahandler.BlockingSend =false
    11. gg.handler.kafkahandler.includeTokens=false
    12. gg.handler.kafkahandler.mode =op
    13. #gg.handler.kafkahandler.maxGroupSize =100, 1Mb
    14. #gg.handler.kafkahandler.minGroupSize =50, 500Kb
    15. goldengate.userexit.timestamp=utc
    16. goldengate.userexit.writers=javawriter
    17. javawriter.stats.display=TRUE
    18. javawriter.stats.full=TRUE
    19. gg.log=log4j
    20. gg.log.level=INFO
    21. gg.report.time=30sec
    22. #Sample gg.classpath for Apache Kafka
    23. gg.classpath=dirprm/:/home/ogg/kafka/libs/*
    24. #Sample gg.classpath for HDP
    25. #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
    26. javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

    说明:
    gg.handler.kafkahandler.TopicName必须指定kafka端定义的topic
    gg.handler.kafkahandler.format下面配置使用文本,并用”|”相隔,kafka最终接收到如下格式的消息。
    gg.classpath须指定相应的lib路径
    e.配置custom_kafka_producer.properties
    内容如下:

    1. bootstrap.servers=kafka01:9092,kafka02:9092,kafka03:9092
    2. acks=1
    3. compression.type=gzip
    4. reconnect.backoff.ms=1000
    5. value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    6. key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    7. # 100KB per partition
    8. batch.size=102400
    9. linger.ms=10000

    3)表结构传递
    源端:

    1. GGSCI ggsci
    2. GGSCI edit param defgen
    3. 内容
    4. DEFSFILE dirdef/source.def, PURGE
    5. USERID ogg, PASSWORD gg_888
    6. TABLE hr.t1 ;
    7. [ogg@tdb1 ogg]$ defgen paramfile dirprm/defgen.prm --shell命令

    把defgen.prm放到接口机(reps)的/ogg/dirdef/下
    4)定义replication
    a.定义参数

    1. ./ggsci
    2. GGSCIedit params rep_test
    3. 输入如下内容:
    4. REPLICAT rep_test
    5. TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
    6. SOURCEDEFS dirdef/source.def
    7. REPORTCOUNT EVERY 1 MINUTES, RATE
    8. GROUPTRANSOPS 10000
    9. MAP hr.*, TARGET hr.*;

    b.指定Trail文件

    1. GGSCI add replicat rep_test, exttrail ./dirdat/te

    c.启动replicat进程,并检查状态

    1. GGSCI start replicat rep_test
    2. 检查状态,类似如下输出表示正常
    3. GGSCI (reps) 1 info all
    4. Program Status Group Lag at Chkpt Time Since Chkpt
    5. MANAGER RUNNING
    6. REPLICAT RUNNING REP_TEST 00:00:00 00:00:05

    其它:如果replicat进程启动不了,可以使用如下命令启动,以方便诊断问题:

    1. cd $OGG_HOME
    2. ./replicat paramfile dirprm/rep_test.prm

    5 测试验证
    1)启动kafka consumerconsole
    kafka任一结点:

    1. ./kafka-console-consumer.sh -zookeeper :2181 -topic oggtest -from-beginning

    2)在源端测试表中插入数据

    1. sqlplus hr/hr
    2. SQL insert into t1 values(5,'shihaifeng');
    3. 1 row created.
    4. SQL commit;
    5. Commit complete.

    3)查看kafka消费控制台是否接收到该行数据
    我的有如下显示:

    1. I|HR.T1|2017-04-13 03:31:03.835147|2017-04-13T11:31:08.973000|00000000000000001478|5|shihaifeng

    内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/52959.html

    (0)

    相关推荐

    • cordic的FPGA概念与算法推导是怎样的

      技术cordic的FPGA概念与算法推导是怎样的cordic的FPGA概念与算法推导是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一、CORDI

      攻略 2021年11月23日
    • 数据库迁移如此复杂的原因是什么

      技术数据库迁移如此复杂的原因是什么这篇文章主要介绍“数据库迁移如此复杂的原因是什么”,在日常操作中,相信很多人在数据库迁移如此复杂的原因是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”

      攻略 2021年10月23日
    • Redis中的bitmap是什么

      技术Redis中的bitmap是什么本篇内容主要讲解“Redis中的bitmap是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Redis中的bitmap是什么”吧!Re

      攻略 2021年12月3日
    • 远近闻名类似的词语,用下列词语编一个童话故事

      技术远近闻名类似的词语,用下列词语编一个童话故事卡尔是 远近闻名的制剑师摩尔的小儿子远近闻名类似的词语,他不仅剑制的的 好,剑术也是一流的,可以和国王的卫队相提并论。听说伯爵悬赏勇士去杀掉山上的恶龙,卡尔不假思索拿上最合

      生活 2021年10月25日
    • 方便的英语,最短的英文单词50个加分

      技术方便的英语,最短的英文单词50个加分am law bit jar
      bear jaw
      by lot
      blue

      生活 2021年10月22日
    • 什么是基于SQL2005的CLR存储过程

      技术基于SQL2005的CLR存储过程是怎样的基于SQL2005的CLR存储过程是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。在 S

      攻略 2021年12月19日