Using OGG Classic Architecture to Sync PostgreSQL to Kafka (Full + Incremental)
Environment Preparation
PG Environment
    # Create a dedicated Docker network
    docker network create --subnet=172.72.6.0/24 pg-network

    # PostgreSQL container
    docker rm -f lhrpg
    docker run -d --name lhrpg -h lhrpg \
      -p 64320:5432 --net=pg-network --ip 172.72.6.54 \
      -e POSTGRES_PASSWORD=lhr \
      -e TZ=Asia/Shanghai \
      postgres:14.2

    psql -U postgres -h 192.168.1.35 -p 64320
    create database lhrdb;
    \c lhrdb
    create table t1(id int primary key);
    create table t2(id int primary key);
    create schema ogg;

    -- Changing wal_level requires a database restart (done below)
    alter system set wal_level='logical';
    select pg_reload_conf();

    docker restart lhrpg

    # Generate test tables with sysbench
    sysbench /usr/share/sysbench/oltp_common.lua --db-driver=pgsql \
      --pgsql-host=172.72.6.54 --pgsql-port=5432 \
      --pgsql-user=postgres --pgsql-password=lhr --pgsql-db=lhrdb \
      --table-size=100 --tables=10 --threads=10 \
      --events=999999999 --time=60 prepare

    psql -U postgres -h 192.168.1.35 -p 64320 -d lhrdb

    lhrdb=# \dt
              List of relations
     Schema |   Name   | Type  |  Owner
    --------+----------+-------+----------
     public | sbtest1  | table | postgres
     public | sbtest10 | table | postgres
     public | sbtest2  | table | postgres
     public | sbtest3  | table | postgres
     public | sbtest4  | table | postgres
     public | sbtest5  | table | postgres
     public | sbtest6  | table | postgres
     public | sbtest7  | table | postgres
     public | sbtest8  | table | postgres
     public | sbtest9  | table | postgres
     public | t1       | table | postgres
     public | t2       | table | postgres
    (12 rows)
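Because `wal_level` only takes effect after the restart, it can be worth confirming the value before configuring OGG. The following is a minimal sketch; `check_wal_level` is a hypothetical helper name introduced here for illustration, and the connection details in the comment are the ones used in this article.

```shell
#!/usr/bin/env bash
# Sketch: confirm that wal_level is 'logical' after the container restart.
# check_wal_level is a hypothetical helper, not part of PostgreSQL or OGG.
check_wal_level() {
  local actual="$1"   # value printed by: SHOW wal_level;
  if [ "$actual" = "logical" ]; then
    echo "ok"
  else
    echo "wal_level is '$actual', expected 'logical'" >&2
    return 1
  fi
}

# Intended use against the environment above (assumes psql is on the PATH):
#   check_wal_level "$(PGPASSWORD=lhr psql -U postgres -h 192.168.1.35 -p 64320 -d lhrdb -Atc 'show wal_level;')"
```

The check is kept separate from the `psql` call so that the same function can be reused for other instances.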
Target Kafka Environment
    docker pull lhrbest/kafka:3.2.0

    docker rm -f lhrkafka
    docker run -itd --name lhrkafka -h lhrkafka \
      --net=pg-network --ip 172.72.6.55 \
      -p 9092:9092 -p 2181:2181 \
      -v /sys/fs/cgroup:/sys/fs/cgroup \
      --privileged=true lhrbest/kafka:3.2.0 \
      /usr/sbin/init

    docker exec -it lhrkafka bash

    # Start ZooKeeper and Kafka (both are already started by default)
    /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
    /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

    [root@lhrkafka /]# jps
    161 QuorumPeerMain
    162 Kafka
    1127 Jps

    # In the output below, the long -cp classpath (all jars under /usr/local/kafka/bin/../libs) is abbreviated with "..."
    [root@lhrkafka /]# ps -ef|grep java
    root 161 1 7 14:20 ? 00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:...:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/kafka/config/zookeeper.properties
    root 162 1 30 14:20 ? 00:00:14 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999 -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:...:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar kafka.Kafka /usr/local/kafka/config/server.properties
    root 1167 961 0 14:20 pts/1 00:00:00 grep --color=auto java

    [root@lhrkafka /]# netstat -tulnp | grep java
    tcp        0      0 0.0.0.0:2181     0.0.0.0:*     LISTEN      161/java
    tcp        0      0 0.0.0.0:9999     0.0.0.0:*     LISTEN      162/java
    tcp        0      0 0.0.0.0:37691    0.0.0.0:*     LISTEN      161/java
    tcp        0      0 0.0.0.0:40831    0.0.0.0:*     LISTEN      162/java
    tcp        0      0 0.0.0.0:38977    0.0.0.0:*     LISTEN      162/java
    tcp        0      0 0.0.0.0:9092     0.0.0.0:*     LISTEN      162/java
Kafka listens on port 9092 by default, and ZooKeeper on port 2181.
Kafka log:
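The port check can also be scripted rather than read off the `netstat` output by eye. This is only a sketch: `is_listening` is a hypothetical helper name, and it assumes the usual `netstat -tulnp` column layout, with the local address in column 4 and the state in column 6.

```shell
#!/usr/bin/env bash
# is_listening is a hypothetical helper: it reads `netstat -tulnp` output on stdin
# and succeeds if some socket is in LISTEN state on the given port.
is_listening() {
  local port="$1"
  awk -v suffix=":${port}" '
    $6 == "LISTEN" && $4 ~ (suffix "$") { found = 1 }
    END { exit !found }
  '
}

# Intended use inside the lhrkafka container:
#   netstat -tulnp | is_listening 9092 && echo "Kafka is listening"
#   netstat -tulnp | is_listening 2181 && echo "ZooKeeper is listening"
```

Matching on `:<port>` at the end of the local address avoids false positives such as port 19092 matching a search for 9092.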
    tail -f /usr/local/kafka/logs/server.log
As a quick test, create a topic named test on the server and produce a few messages:
    # Producer
    /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
    >hello
    >world

    # In another terminal session, start a console consumer on the test topic; the messages arrive
    /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
    hello
    world
Source-side OGG for PG Environment
OGG download page: https://www.oracle.com/middleware/technologies/goldengate-downloads.html
    # OGG machine; the image contains both OGG for PG and OGG for Big Data
    docker rm -f lhrogg21all
    docker run -d --name lhrogg21all -h lhrogg21all \
      --net=pg-network --ip 172.72.6.50 \
      -p 39391:3389 -p 37809-37819:7809-7819 \
      -v /sys/fs/cgroup:/sys/fs/cgroup \
      --privileged=true lhrbest/ogg21all:v6.0 \
      /usr/sbin/init

    docker exec -it lhrogg21all bash
    su - pg
    ogg

    # ODBC configuration (run in the shell as user pg, not inside GGSCI).
    # The entry under [ODBC Data Sources] must match the DSN section name, PGDSN.
    cat > /oggpg/odbc.ini <<"EOF"
    [ODBC Data Sources]
    PGDSN=DataDirect 13 PostgreSQL Wire Protocol

    [ODBC]
    IANAAppCodePage=106
    InstallDir=/oggpg

    [PGDSN]
    Driver=/oggpg/lib/GGpsql25.so
    #Driver=/usr/lib64/psqlodbcw.so
    Description=DataDirect 13 PostgreSQL Wire Protocol
    Database=lhrdb
    HostName=172.72.6.54
    PortNumber=5432
    LogonID=postgres
    Password=lhr
    EOF

    -- mgr (GGSCI commands from here on)
    edit params mgr
    port 7809

    start mgr

    DBLOGIN SOURCEDB PGDSN USERID postgres PASSWORD lhr
    ADD TRANDATA public.*
Session output:
    GGSCI (lhrogg21all) 2> DBLOGIN SOURCEDB PGDSN USERID postgres PASSWORD lhr

    2022-07-22 17:08:36  INFO    OGG-03036  Database character set identified as UTF-8. Locale: en_US.utf8.

    2022-07-22 17:08:36  INFO    OGG-03037  Session character set identified as UTF-8.
    Successfully logged into database.

    GGSCI (lhrogg21all as postgres@PGDSN) 3> ADD TRANDATA public.*

    Logging of supplemental log data is already enabled for table public.sbtest1 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.sbtest10 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.sbtest2 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.sbtest3 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.sbtest4 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.sbtest5 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.sbtest6 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.sbtest7 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.sbtest8 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.sbtest9 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.t1 with REPLICA IDENTITY set to DEFAULT
    Logging of supplemental log data is already enabled for table public.t2 with REPLICA IDENTITY set to DEFAULT

    GGSCI (lhrogg21all as postgres@PGDSN) 4>
Target-side OGG for Big Data Environment
OGG download page: https://www.oracle.com/middleware/technologies/goldengate-downloads.html
    docker exec -it lhrogg21all bash
    su - bigdata
    ogg

    -- Fixes OGG-01201: Error reported by MGR : Access denied
    edit params mgr
    port 8809
    ACCESSRULE, PROG *, IPADDR *, ALLOW
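Full Synchronization
Note: during this phase, application writes on the source must be stopped so that no new data is generated.
PG-side Configuration
The OGG initial load can either send data directly to the target, or first extract it to local files and then load those files into the target. Here we load directly into Kafka on the target side, as follows: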
    -- PG side
    edit params ext0

    EXTRACT ext0
    SETENV(PGCLIENTENCODING = "UTF8" )
    SETENV(ODBCINI="/oggpg/odbc.ini" )
    SOURCEDB PGDSN, userid postgres, password lhr
    rmthost 127.0.0.1,mgrport 8809
    rmttask replicat,group rep0
    TABLE public.*;

    add extract ext0 ,sourceistable

    -- Start mgr
    start mgr
- SOURCEISTABLE instructs Extract to read complete records directly from the source tables.
Kafka-side Configuration
    edit params rep0

    REPLICAT rep0
    targetdb libfile libggjava.so set property=./dirprm/kafka.props
    map public.*, target public.*;

    add replicat rep0 ,specialrun

    start mgr

    # Configure the Kafka handler properties (run in the shell)
    vi /oggbigdata/dirprm/kafka.props
    gg.handler.kafkahandler.schemaTopicName=LHR_OGG

    vi /oggbigdata/dirprm/custom_kafka_producer.properties
    bootstrap.servers=172.72.6.55:9092
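SPECIALRUN: runs Replicat as a one-time task; no checkpoints are needed.
END RUNTIME: terminates Replicat once the load completes.
gg.handler.kafkahandler.topicMappingTemplate: maps records to Kafka topic names. It can be a fixed topic name or a placeholder template such as ${tableName}, which gives each table its own topic.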
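To illustrate the topic mapping described above, here is a sketch that writes a minimal `kafka.props` and producer file with one topic per table. It is not the article's full configuration: the `OGG_DIRPRM` variable, the `gg.classpath` location of the Kafka client jars, and the choice of `json` format in `op` mode are assumptions to adjust for the actual installation. Only `schemaTopicName` and `bootstrap.servers` come from the settings shown earlier.

```shell
#!/usr/bin/env bash
# Sketch: write a minimal Kafka handler configuration with one topic per table.
# OGG_DIRPRM is a hypothetical variable; in this article the directory is /oggbigdata/dirprm.
OGG_DIRPRM="${OGG_DIRPRM:-./dirprm}"
mkdir -p "$OGG_DIRPRM"

# The quoted "EOF" keeps ${tableName} literal instead of letting the shell expand it.
cat > "$OGG_DIRPRM/kafka.props" <<"EOF"
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.kafkaProducerConfigFile=custom_kafka_producer.properties
# One topic per source table; use a fixed name here to send all tables to one topic.
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.schemaTopicName=LHR_OGG
# Assumed output format and mode.
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
# Assumed location of the Kafka client jars on the OGG host.
gg.classpath=dirprm/:/usr/local/kafka/libs/*
EOF

cat > "$OGG_DIRPRM/custom_kafka_producer.properties" <<"EOF"
bootstrap.servers=172.72.6.55:9092
acks=1
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
EOF
```

With `${tableName}` as the template, the twelve tables in `public` would each map to a topic of their own, for example `sbtest1` or `t1`.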