合 使用OGG微服务将Oracle同步到kafka(全量+增量)
Tags: Oracle, OGG, 数据迁移, OGG微服务, Kafka, 数据同步
类似可以参考:使用OGG传统模式将Oracle同步到kafka(全量+增量)
环境准备
Oracle环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | -- 创建专用网络 docker network create --subnet=172.72.7.0/24 ora-network -- oracle 压测工具 docker pull lhrbest/lhrdbbench:1.0 docker rm -f lhrdbbench docker run -d --name lhrdbbench -h lhrdbbench \ --net=ora-network --ip 172.72.7.33 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/lhrdbbench:1.0 \ /usr/sbin/init -- Oracle 12c docker rm -f lhrora1221 docker run -itd --name lhrora1221 -h lhrora1221 \ --net=ora-network --ip 172.72.7.34 \ -p 1526:1521 -p 3396:3389 \ --privileged=true \ lhrbest/oracle_12cr2_ee_lhr_12.2.0.1:2.0 init -- oracle数据库配置 1.开启数据库归档--如果没有开启 2.开启数据库级别附加日志--如果没有开启最小附加日志 3.开启强制日志--如果没有开启强制日志 4.设置ENABLE_GOLDENGATE_REPLICATION参数为TRUE 5.创建OGG用户包括源端用户、目标端用户以及OGG抽取用户 alter database add supplemental log data; alter database add supplemental log data (all) columns; alter database force logging; alter system set enable_goldengate_replication=TRUE; select name,supplemental_log_data_min , force_logging, log_mode from v$database; alter system set streams_pool_size = 128M; alter system set sga_max_size = 2g scope=spfile; alter system set sga_target = 2g scope=spfile; alter system set pga_aggregate_target=1g; startup force -- OGG管理用户 CREATE USER ogg identified by lhr; GRANT DBA to ogg; grant SELECT ANY DICTIONARY to ogg; GRANT EXECUTE ON SYS.DBMS_LOCK TO ogg; grant select any transaction to ogg; grant select any table to ogg; grant flashback any table to ogg; grant alter any table to ogg; exec dbms_goldengate_auth.grant_admin_privilege('OGG','*',TRUE); -- 业务用户 CREATE USER lhr identified by lhr; alter user lhr identified by lhr; GRANT DBA to lhr ; grant SELECT ANY DICTIONARY to lhr; GRANT EXECUTE ON SYS.DBMS_LOCK TO lhr; -- 启动监听 vi /u01/app/oracle/product/12.2.0.1/dbhome_1/network/admin/listener.ora lsnrctl start lsnrctl status |
Oracle数据初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 | -- 源端数据初始化 /usr/local/swingbench/bin/oewizard -s -create -c /usr/local/swingbench/wizardconfigs/oewizard.xml -create \ -version 2.0 -cs //172.72.7.34/lhrsdb -dba "sys as sysdba" -dbap lhr -dt thin \ -ts users -u lhr -p lhr -allindexes -scale 0.0001 -tc 16 -v -cl col TABLE_NAME format a30 SELECT a.table_name,a.num_rows FROM dba_tables a where a.OWNER='LHR' ; select object_type,count(*) from dba_objects where owner='LHR' group by object_type; select object_type,status,count(*) from dba_objects where owner='LHR' group by object_type,status; select sum(bytes)/1024/1024 from dba_segments where owner='LHR'; -- 检查键是否正确:https://www.dbaup.com/ogg-01296-biaoyouzhujianhuoweiyijiandanshirengranshiyongquanbulielaijiexixing.html -- 否则OGG启动后,会报错:OGG-01296、OGG-06439、OGG-01169 Encountered an update where all key columns for target table LHR.ORDER_ITEMS are not present. 
select owner, constraint_name, constraint_type, status, validated from dba_constraints where owner='LHR' and VALIDATED='NOT VALIDATED'; select 'alter table lhr.'||TABLE_NAME||' enable validate constraint '||CONSTRAINT_NAME||';' from dba_constraints where owner='LHR' and VALIDATED='NOT VALIDATED'; -- 删除外键 SELECT 'ALTER TABLE LHR.'|| D.TABLE_NAME ||' DROP constraint '|| D.CONSTRAINT_NAME||';' FROM DBA_constraints d where owner='LHR' and d.CONSTRAINT_TYPE='R'; sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb @/oggoracle/demo_ora_create.sql @/oggoracle/demo_ora_insert.sql SQL> select * from tcustmer; CUST NAME CITY ST ---- ------------------------------ -------------------- -- WILL BG SOFTWARE CO. SEATTLE WA JANE ROCKY FLYER INC. DENVER CO -- 创建2个clob和blob类型的表 sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb @/oggoracle/demo_ora_lob_create.sql exec testing_lobs; select * from lhr.TSRSLOB; drop table IMAGE_LOB; CREATE TABLE IMAGE_LOB ( T_ID VARCHAR2 (5) NOT NULL, T_IMAGE BLOB, T_CLOB CLOB ); -- 插入blob文件 CREATE OR REPLACE DIRECTORY D1 AS '/home/oracle/'; grant all on DIRECTORY D1 TO PUBLIC; CREATE OR REPLACE NONEDITIONABLE PROCEDURE IMG_INSERT(TID VARCHAR2, FILENAME VARCHAR2, name VARCHAR2) AS F_LOB BFILE; B_LOB BLOB; BEGIN INSERT INTO IMAGE_LOB (T_ID, T_IMAGE,T_CLOB) VALUES (TID, EMPTY_BLOB(),name) RETURN T_IMAGE INTO B_LOB; F_LOB := BFILENAME('D1', FILENAME); DBMS_LOB.FILEOPEN(F_LOB, DBMS_LOB.FILE_READONLY); DBMS_LOB.LOADFROMFILE(B_LOB, F_LOB, DBMS_LOB.GETLENGTH(F_LOB)); DBMS_LOB.FILECLOSE(F_LOB); COMMIT; END; / BEGIN IMG_INSERT('1','1.jpg','dbaup.com'); IMG_INSERT('2','2.jpg','www.dbaup.com'); END; / select * from IMAGE_LOB; ----- oracle所有表 SQL> select * from tab; TNAME TABTYPE CLUSTERID ------------------------------ ------- ---------- ADDRESSES TABLE CARD_DETAILS TABLE CUSTOMERS TABLE IMAGE_LOB TABLE INVENTORIES TABLE LOGON TABLE ORDERENTRY_METADATA TABLE ORDERS TABLE ORDER_ITEMS TABLE PRODUCTS VIEW PRODUCT_DESCRIPTIONS TABLE PRODUCT_INFORMATION TABLE PRODUCT_PRICES VIEW TCUSTMER 
TABLE TCUSTORD TABLE TSRSLOB TABLE TTRGVAR TABLE WAREHOUSES TABLE 18 rows selected. SELECT COUNT(*) FROM LHR.ADDRESSES UNION ALL SELECT COUNT(*) FROM LHR.CARD_DETAILS UNION ALL SELECT COUNT(*) FROM LHR.CUSTOMERS UNION ALL SELECT COUNT(*) FROM LHR.IMAGE_LOB UNION ALL SELECT COUNT(*) FROM LHR.INVENTORIES UNION ALL SELECT COUNT(*) FROM LHR.LOGON UNION ALL SELECT COUNT(*) FROM LHR.ORDERENTRY_METADATA UNION ALL SELECT COUNT(*) FROM LHR.ORDERS UNION ALL SELECT COUNT(*) FROM LHR.ORDER_ITEMS UNION ALL SELECT COUNT(*) FROM LHR.PRODUCT_DESCRIPTIONS UNION ALL SELECT COUNT(*) FROM LHR.PRODUCT_INFORMATION UNION ALL SELECT COUNT(*) FROM LHR.TCUSTMER UNION ALL SELECT COUNT(*) FROM LHR.TCUSTORD UNION ALL SELECT COUNT(*) FROM LHR.TSRSLOB UNION ALL SELECT COUNT(*) FROM LHR.TTRGVAR UNION ALL SELECT COUNT(*) FROM LHR.WAREHOUSES ; COUNT(*) ---------- 150 150 100 2 900724 239 4 143 773 1000 1000 2 2 1 0 1000 16 rows selected. |
最终,Oracle端共有16张表和2个视图(PRODUCTS、PRODUCT_PRICES),其中TSRSLOB和IMAGE_LOB这2张表包含blob和clob字段。
目标端kafka环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | docker pull lhrbest/kafka:3.2.0 docker rm -f lhrkafka docker run -itd --name lhrkafka -h lhrkafka \ --net=ora-network --ip 172.72.7.44 \ -p 9092:9092 -p 2181:2181 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/kafka:3.2.0 \ /usr/sbin/init docker exec -it lhrkafka bash -- 启动(默认已启动) /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & [root@lhrkafka /]# jps 161 QuorumPeerMain 162 Kafka 1127 Jps [root@lhrkafka /]# ps -ef|grep java root 161 1 7 14:20 ? 00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp 
/usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.ja
r:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:
/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/kafka/config/zookeeper.properties root 162 1 30 14:20 ? 
00:00:14 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999 -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/b
in/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kaf
ka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.1
3-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar kafka.Kafka /usr/local/kafka/config/server.properties root 1167 961 0 14:20 pts/1 00:00:00 grep --color=auto java [root@lhrkafka /]# netstat -tulnp | grep java tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 161/java tcp 0 0 0.0.0.0:9999 0.0.0.0:* LISTEN 162/java tcp 0 0 0.0.0.0:37691 0.0.0.0:* LISTEN 161/java tcp 0 0 0.0.0.0:40831 0.0.0.0:* LISTEN 162/java tcp 0 0 0.0.0.0:38977 0.0.0.0:* LISTEN 162/java tcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN 162/java |
Kafka默认占用9092端口,ZooKeeper(ZK)默认占用2181端口。
kafka日志:
1 | tailf /usr/local/kafka/logs/server.log |
测试一下:在Kafka服务器上向名为test的topic生产几条消息,然后用消费者验证能否收到:
1 2 3 4 5 6 7 8 9 10 11 12 13 | -- 生产者 /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test >hello >world -- 在另一台机器上,开启消费者控制台,监听test的topic,发现可以收到数据 /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning hello world -- 查看当前服务器中的所有 topic /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 |
源端OGG for Oracle微服务环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | -- OGG机器 docker pull lhrbest/ogg213maoracle:v1.0 docker rm -f lhrogg213maoracle docker run -d --name lhrogg213maoracle -h lhrogg213maoracle \ --net=ora-network --ip 172.72.7.100 \ -p 9391:3389 -p 29000-29005:9000-9005 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/ogg213maoracle:v1.0 \ /usr/sbin/init docker exec -it lhrogg213maoracle bash su - oracle adminclient CONNECT http://127.0.0.1:9000 deployment deploy213 as oggadmin password lhr |
访问:http://192.168.1.35:29001 ,用户名:oggadmin,密码:lhr
创建身份证明、添加trandata
1 | ogg@172.72.7.34/lhrsdb |
目标端OGG for bigdata微服务环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | docker pull lhrbest/ogg214mabigdata:v1.0 docker rm -f lhrogg214mabigdata docker run -d --name lhrogg214mabigdata -h lhrogg214mabigdata \ --net=ora-network --ip 172.72.7.101 \ -p 9191:3389 -p 9000-9005:9000-9005 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/ogg214mabigdata:v1.0 \ /usr/sbin/init docker exec -it lhrogg214mabigdata bash -- 配置kafka参数 vi /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props gg.handler.kafkahandler.schemaTopicName=LHR_OGG vi /ogg214c/ogg_deploy/etc/conf/ogg/custom_kafka_producer.properties bootstrap.servers=172.72.7.44:9092 |
访问:http://192.168.1.35:9001 ,用户名:oggadmin,密码:lhr
全量同步
注意:在此阶段,源端需要停业务,不能产生新数据。
源端创建初始化加载
1 2 3 4 5 6 7 | EXTRACT ext0 USERIDALIAS ora12c domain OGGMA rmthost 172.72.7.101,mgrport 9003 rmtfile ./dirdat/e0 tableexclude LHR.PRODUCTS; tableexclude LHR.PRODUCT_PRICES; TABLE LHR.*; |
查询报告,说明数据已经传输到目标端了,如下:
进入OS查询:
1 2 3 4 5 6 7 8 9 | [root@lhrogg214mabigdata dirdat]# pwd /ogg214c/ogg_deploy/var/lib/data/dirdat [root@lhrogg214mabigdata dirdat]# ll total 84236 -rw-r----- 1 oracle oinstall 86256166 Jul 22 12:52 e0000000 [root@lhrogg214mabigdata dirdat]# [root@lhrogg214mabigdata dirdat]# ll -h total 83M -rw-r----- 1 oracle oinstall 83M Jul 22 12:52 e0000000 |
目标端kafka数据全量初始化
1 2 3 4 | REPLICAT rep0 targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props end runtime map LHR.*, target LHR.*; |
运行完后,自动停止:
全量同步结果检查
1 2 3 4 5 6 7 8 9 10 | -- 查看所有历史数据 /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic LHR_OGG --from-beginning -- 查看当前服务器中的所有 topic /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 -- topic详情 /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic LHR_OGG |
全量同步完成后,Kafka中每张表对应一个topic(主题),如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | [root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 __consumer_offsets test [root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 ADDRESSES CARD_DETAILS CUSTOMERS EMP IMAGE_LOB INVENTORIES LHR_OGG LOGON ORDERENTRY_METADATA ORDERS ORDER_ITEMS PRODUCT_DESCRIPTIONS PRODUCT_INFORMATION TCUSTMER TCUSTORD TSRSLOB WAREHOUSES __consumer_offsets test [root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | wc -l 19 [root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic WAREHOUSES Topic: WAREHOUSES TopicId: HR3273rMTK6JsQt8OTjKNA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: WAREHOUSES Partition: 0 Leader: 0 Replicas: 0 Isr: 0 [root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic WAREHOUSES --from-beginning | wc -l ^CProcessed a total of 1000 messages |
支持Markdown