合 PG逻辑复制插件之pglogical使用说明
简介
参考:
https://gitee.com/mirrors/pglogical
https://github.com/2ndQuadrant/pglogical
https://www.dbaup.com/pgluojifuzhichajianzhipglogicalguanfangshuoming.html
Logical Replication extension for PostgreSQL 14、13, 12, 11, 10, 9.6, 9.5, 9.4 (Postgres), providing much faster replication than Slony, Bucardo or Londiste, as well as cross-version upgrades.
pglogical 是 PostgreSQL 的拓展模块, 为 PostgreSQL 数据库提供了逻辑流复制发布和订阅的功能。pglogical 重用了 BDR 项目中的一部分相关技术。pglogical 是一个完全作为PostgreSQL 扩展实现的逻辑复制系统。完全集成,它不需要触发器或外部程序。这种物理复制的替代方法是使用发布/订阅模型复制数据以进行选择性复制的一种高效方法。支持 PG14、13、12、11、10、9.6、9.5、9.4 ,提供比 Slony、Bucardo 或 Londiste 更快的复制速度,以及跨版本升级。
我们使用的下列术语来描述节点和数据流之间的关系,重用了一些早期的Slony技术中的术语:
- 节点 - PostgreSQL数据库实例
- 发布者和订阅者 - 节点的角色名称
- 复制集 - 关系表的集合
pglogical是新技术组件,使用了最新的PostgreSQL 数据库中的一些核心功能,所以存在一些数据库版本限制:
- 数据源发布和订阅节点需要运行 PostgreSQL 9.4 +
- 复制源过滤和冲突检测需要 PostgreSQL 9.5 +
支持的使用场景:
- 主版本数据库之间的升级(存在上述的版本限制)
- 完整的数据库复制
- 利用复制集,选择性的筛选的关系表
- 可从多个上游服务器,做数据的聚集和合并
架构上的细节︰
- pglogical 工作在每个数据库层面上,而不是像物理流复制一样工作在整个数据库集簇实例级别
- 一个发布程序提供给多个订阅者不会引起额外的磁盘写开销
- 一个订阅服务器可以从几个起源的更改合并和检测与自动和可配置冲突决议 (一些,但并不是所有方面所需的多主机)的更改之间的冲突。
- 级联复制是在变更集转发的过程中实现的。
必要条件
- 要使用pglogical,提供发布和订阅服务器必须运行PostgreSQL 9.4或更高版本。
- pglogical扩展必须同时安装在提供发布和订阅服务器上。您必须同时创建扩展“CREATE EXTENSION pglogical;”。
- 提供发布和订阅服务器上的表必须具有相同的名称并位于相同的schema中。将来的修订版可能会添加映射功能。
- 提供发布和订阅服务器上的表必须具有相同的列,每列中的数据类型相同。订阅服务器上的CHECK约束、NOT NULL约束等必须与提供发布相同或较弱(更宽松)。
- 表必须具有相同的主键。不建议添加主键以外的其他唯一约束。
安装插件
安装包安装
yum安装
1、安装yum源:
- PostgreSQL 9.4:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/9.4/rpm | bash
- PostgreSQL 9.5:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/9.5/rpm | bash
- PostgreSQL 9.6:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/9.6/rpm | bash
- PostgreSQL 10:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/10/rpm | bash
- PostgreSQL 11:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/11/rpm | bash
- PostgreSQL 12:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/12/rpm | bash
- PostgreSQL 13:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/13/rpm | bash
- PostgreSQL 14:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/14/rpm | bash
2、安装插件
- PostgreSQL 9.4:
yum install postgresql94-pglogical
- PostgreSQL 9.5:
yum install postgresql95-pglogical
- PostgreSQL 9.6:
yum install postgresql96-pglogical
- PostgreSQL 10:
yum install postgresql10-pglogical
- PostgreSQL 11:
yum install postgresql11-pglogical
- PostgreSQL 12:
yum install postgresql12-pglogical
- PostgreSQL 13:
yum install postgresql13-pglogical
- PostgreSQL 14:
yum install postgresql14-pglogical
APT安装
1、安装源:
1 | curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/deb | bash |
2、安装插件
- PostgreSQL 9.4:
sudo apt-get install postgresql-9.4-pglogical
- PostgreSQL 9.5:
sudo apt-get install postgresql-9.5-pglogical
- PostgreSQL 9.6:
sudo apt-get install postgresql-9.6-pglogical
- PostgreSQL 10:
sudo apt-get install postgresql-10-pglogical
- PostgreSQL 11:
sudo apt-get install postgresql-11-pglogical
- PostgreSQL 12:
sudo apt-get install postgresql-12-pglogical
- PostgreSQL 13:
sudo apt-get install postgresql-13-pglogical
- PostgreSQL 14:
sudo apt-get install postgresql-14-pglogical
编译安装
https://github.com/2ndQuadrant/pglogical/releases
Source code installs are the same as for any other PostgreSQL extension built using PGXS.
Make sure the directory containing pg_config
from the PostgreSQL release is listed in your PATH
environment variable. You might have to install a -dev
or -devel
package for your PostgreSQL release from your package manager if you don't have pg_config
.
Then run make
to compile, and make install
to install. You might need to use sudo
for the install step.
1 2 3 4 5 6 7 8 9 | wget https://codeload.github.com/2ndQuadrant/pglogical/tar.gz/refs/tags/REL2_4_1 -O pglogical-REL2_4_1.tar.gz tar -zxvf pglogical-REL2_4_1.tar.gz cd pglogical-REL2_4_1 which pg_config USE_PGXS=1 make clean USE_PGXS=1 make USE_PGXS=1 make install |
安装完之后,查看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | postgres=# select * from pg_available_extensions where name='pglogical'; name | default_version | installed_version | comment -----------+-----------------+-------------------+-------------------------------- pglogical | 2.4.1 | | PostgreSQL Logical Replication (1 row) postgres=# \dx List of installed extensions Name | Version | Schema | Description --------------------+---------+------------+------------------------------------------------------------------------ pageinspect | 1.8 | public | inspect the contents of database pages at a low level pg_recovery | 1.0 | public | recovery table data of update/delete/rollback rows and drop columns pg_stat_statements | 1.8 | public | track planning and execution statistics of all SQL statements executed plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language (4 rows) |
使用配置
配置参数
首先 PostgreSQL服务器必须正确配置才能够支持逻辑解码︰
1 2 3 4 5 6 7 8 | wal_level = 'logical' # one per database needed on (provider/subscriber)provider node max_worker_processes = 10 # one per node needed on provider node max_replication_slots = 10 # one per node needed on provider node max_wal_senders = 10 shared_preload_libraries = 'pglogical' |
配置:
1 2 3 4 5 6 | alter system set shared_preload_libraries = 'pglogical'; alter system set wal_level = 'logical'; select pg_reload_conf(); -- 由于都是postmaster类型的参数,所以需要重启库 |
操作过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart --------------------------+---------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+---------+---------+---------+---------------------------+----------+-----------+------------+------------+----------------- max_replication_slots | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously defined replication slots. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_wal_senders | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously running WAL sender processes. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_worker_processes | 8 | | Resource Usage / Asynchronous Behavior | Maximum number of concurrent worker processes. | | postmaster | integer | default | 0 | 262143 | | 8 | 8 | | | f shared_preload_libraries | | | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server. | | postmaster | string | default | | | | | | | | f wal_level | replica | | Write-Ahead Log / Settings | Set the level of information written to the WAL. | | postmaster | enum | default | | | {minimal,replica,logical} | replica | replica | | | f (5 rows) postgres=# alter system set shared_preload_libraries = 'pglogical'; ALTER SYSTEM postgres=# alter system set wal_level = 'logical'; ALTER SYSTEM postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart --------------------------+---------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+---------+---------+---------+---------------------------+----------+-----------+------------+------------+----------------- max_replication_slots | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously defined replication slots. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_wal_senders | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously running WAL sender processes. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_worker_processes | 8 | | Resource Usage / Asynchronous Behavior | Maximum number of concurrent worker processes. | | postmaster | integer | default | 0 | 262143 | | 8 | 8 | | | f shared_preload_libraries | | | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server. | | postmaster | string | default | | | | | | | | f wal_level | replica | | Write-Ahead Log / Settings | Set the level of information written to the WAL. | | postmaster | enum | default | | | {minimal,replica,logical} | replica | replica | | | f (5 rows) postgres=# select pg_reload_conf(); pg_reload_conf ---------------- t (1 row) postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart --------------------------+---------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+---------+---------+---------+---------------------------+----------+-----------+------------+------------+----------------- max_replication_slots | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously defined replication slots. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_wal_senders | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously running WAL sender processes. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_worker_processes | 8 | | Resource Usage / Asynchronous Behavior | Maximum number of concurrent worker processes. | | postmaster | integer | default | 0 | 262143 | | 8 | 8 | | | f shared_preload_libraries | | | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server. | | postmaster | string | default | | | | | | | | t wal_level | replica | | Write-Ahead Log / Settings | Set the level of information written to the WAL. | | postmaster | enum | default | | | {minimal,replica,logical} | replica | replica | | | t (5 rows) [pg13@lhrpgall ~]$ pg_ctl restart waiting for server to shut down.... done server stopped waiting for server to start....2022-01-14 11:06:07.103 CST [1272] LOG: redirecting log output to logging collector process 2022-01-14 11:06:07.103 CST [1272] HINT: Future log output will appear in directory "pg_log". done server started postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart --------------------------+-----------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+--------------------+---------+---------+---------------------------+----------+-----------+-----------------------------------+------------+----------------- max_replication_slots | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously defined replication slots. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_wal_senders | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously running WAL sender processes. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_worker_processes | 8 | | Resource Usage / Asynchronous Behavior | Maximum number of concurrent worker processes. | | postmaster | integer | default | 0 | 262143 | | 8 | 8 | | | f shared_preload_libraries | pglogical | | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server. | | postmaster | string | configuration file | | | | | pglogical | /pg13/pgdata/postgresql.auto.conf | 3 | f wal_level | logical | | Write-Ahead Log / Settings | Set the level of information written to the WAL. | | postmaster | enum | configuration file | | | {minimal,replica,logical} | replica | logical | /pg13/pgdata/postgresql.auto.conf | 4 | f (5 rows) |
如果你想要处理解决与上一次/第一次更新之间的冲突 wins(参阅冲突章节), 你的数据库版本需要为PostgreSQL 9.5+ (在9.4中无效) 您可以向 PostgreSQL.conf 添加此额外的选项:
1 2 | track_commit_timestamp = on # needed for last/first update wins conflict resolution # property available in PostgreSQL 9.5+ |
配置pg_hba.conf
pg_hba.conf 需要配置成允许从本地主机复制,用户拥有复制权限,连接权限。
1 | host replication postgres 网段ip/24 trust |
创建扩展
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | postgres=# \dx List of installed extensions Name | Version | Schema | Description --------------------+---------+------------+------------------------------------------------------------------------ pageinspect | 1.8 | public | inspect the contents of database pages at a low level pg_recovery | 1.0 | public | recovery table data of update/delete/rollback rows and drop columns pg_stat_statements | 1.8 | public | track planning and execution statistics of all SQL statements executed plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language (4 rows) postgres=# create EXTENSION pglogical; CREATE EXTENSION postgres=# \dx List of installed extensions Name | Version | Schema | Description --------------------+---------+------------+------------------------------------------------------------------------ pageinspect | 1.8 | public | inspect the contents of database pages at a low level pg_recovery | 1.0 | public | recovery table data of update/delete/rollback rows and drop columns pg_stat_statements | 1.8 | public | track planning and execution statistics of all SQL statements executed pglogical | 2.4.1 | pglogical | PostgreSQL Logical Replication plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language (5 rows) postgres=# \dx+ pglogical Objects in extension "pglogical" Object description -------------------------------------------------------------------------------------------------- function pglogical.alter_node_add_interface(name,name,text) function pglogical.alter_node_drop_interface(name,name) function pglogical.alter_replication_set(name,boolean,boolean,boolean,boolean) function pglogical.alter_subscription_add_replication_set(name,name) function pglogical.alter_subscription_disable(name,boolean) function pglogical.alter_subscription_enable(name,boolean) function pglogical.alter_subscription_interface(name,name) function pglogical.alter_subscription_remove_replication_set(name,name) function pglogical.alter_subscription_resynchronize_table(name,regclass,boolean) function pglogical.alter_subscription_synchronize(name,boolean) function pglogical.create_node(name,text) function pglogical.create_replication_set(name,boolean,boolean,boolean,boolean) function pglogical.create_subscription(name,text,text[],boolean,boolean,text[],interval,boolean) function pglogical.drop_node(name,boolean) function pglogical.drop_replication_set(name,boolean) function pglogical.drop_subscription(name,boolean) function pglogical.pglogical_gen_slot_name(name,name,name) function pglogical.pglogical_max_proto_version() function pglogical.pglogical_min_proto_version() function pglogical.pglogical_node_info() function pglogical.pglogical_version() function pglogical.pglogical_version_num() function pglogical.queue_truncate() function pglogical.replicate_ddl_command(text,text[]) function pglogical.replication_set_add_all_sequences(name,text[],boolean) function pglogical.replication_set_add_all_tables(name,text[],boolean) function pglogical.replication_set_add_sequence(name,regclass,boolean) function pglogical.replication_set_add_table(name,regclass,boolean,text[],text) function pglogical.replication_set_remove_sequence(name,regclass) function pglogical.replication_set_remove_table(name,regclass) function pglogical.show_repset_table_info(regclass,text[]) function pglogical.show_subscription_status(name) function pglogical.show_subscription_table(name,regclass) function pglogical.synchronize_sequence(regclass) function pglogical.table_data_filtered(anyelement,regclass,text[]) function pglogical.wait_for_subscription_sync_complete(name) function pglogical.wait_for_table_sync_complete(name,regclass) function pglogical.wait_slot_confirm_lsn(name,pg_lsn) function pglogical.xact_commit_timestamp_origin(xid) table pglogical.depend table pglogical.local_node table pglogical.local_sync_status table pglogical.node table pglogical.node_interface table pglogical.queue table pglogical.replication_set table pglogical.replication_set_seq table pglogical.replication_set_table table pglogical.sequence_state table pglogical.subscription view pglogical.tables (51 rows) |
其它
数据字典
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | select * from pglogical.depend ; select * from pglogical.local_node ; select * from pglogical.local_sync_status ; select * from pglogical.node ; select * from pglogical.node_interface ; select * from pglogical.queue ; select * from pglogical.replication_set ; select * from pglogical.replication_set_seq ; select * from pglogical.replication_set_table ; select * from pglogical.sequence_state ; select * from pglogical.subscription ; select * from pglogical.tables ; select * from pglogical.show_subscription_status(); |
复制集
在复制集default中: update/delete/truncate 操作也是同步复制。
复制集 | INSERT | UPDATE | DELETE | TRUNCATE |
---|---|---|---|---|
default | √ | √ | √ | √ |
default_insert_only | √ | × | × | √ |
ddl_sql | √ | × | × | × |
1 2 3 4 5 6 7 | lhrdb=# select * from pglogical.replication_set ; set_id | set_nodeid | set_name | replicate_insert | replicate_update | replicate_delete | replicate_truncate ------------+------------+---------------------+------------------+------------------+------------------+-------------------- 3808633458 | 345938905 | default | t | t | t | t 1988720057 | 345938905 | default_insert_only | t | f | f | t 742713876 | 345938905 | ddl_sql | t | f | f | f (3 rows) |
复制特性扩展
延迟复制
1 | pglogical.create_subscription(subscription_name name, provider_dsn text, replication_sets text[], synchronize_structure boolean, synchronize_data boolean, forward_origins text[], apply_delay interval) |
参数:
- subscription_name - 订阅的名称,必须是唯一的
- provider_dsn - 提供者的连接字符串
- replication_sets - 要订阅的复制集数组,这些必须已存在,默认为“{default,default_insert_only,ddl_sql}”
- synchronize_structure - 指定是否将提供者与订阅者之间的结构同步,默认为false
- synchronize_data - 指定是否将数据从提供者同步到订阅者,默认为true
- forward_origins - 要转发的原始名称数组,当前只支持的值是空数组,意味着不转发任何不是源自提供者节点的更改,或“{all}”这意味着复制所有更改,无论它们的来源是什么,默认是全部}”
- apply_delay - 延迟复制多少,默认为0秒
示例:数据表结构同步;且延迟复制1分钟
1 2 3 4 5 6 | SELECT pglogical.create_subscription( subscription_name := 'subscription1', provider_dsn := 'host=192.168.1.221 port=5432 dbname=lhrdb', synchronize_structure := true, apply_delay := '00:01:00'::interval ); |
对源端进行 行/列 过滤
过滤机制需要 PostgreSQL 9.5 +
1 | pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text [],row_filter text) |
参数:
- set_name - 现有复制集的名称
- relation - 要添加到集合中的表的名称或OID
- synchronize_data - 如果为true,则表数据将在订阅给定复制集的所有订户上同步,默认为false
- columns - 要复制的列的列表。通常,当应复制所有列时,这将设置为NULL,这是默认值
- row_filter - 行过滤表达式,默认为NULL(无过滤),有关详细信息,请参阅(行过滤)。警告:在使用有效行筛选器同步数据时要小心。使用synchronize_data=true有效row_filter就像对表的一次性操作。使用修改后再次执行它将row_filter不会将数据同步到订户。订阅者可能需要pglogical.alter_subscription_resynchronize_table()来修复它。
示例:对表tbl_lottu02中字段{id, name, job} 字段列过滤;且对条件 ‘id > 10’ 进行行过滤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | # provider 节点 创建表并插入测试数据 create table tbl_lottu02 (id int primary key, name text, job text, reg_time timestamp ); insert into tbl_lottu02 select generate_series(1,20) id,'lottu'||generate_series(1,20),'pg', now(); # subscriber节点创建表; 可以只创建复制的列的数据表 create table tbl_lottu02 (id int primary key, name text, job text, reg_time timestamp ); # or create table tbl_lottu02 (id int primary key, name text, job text); #provider 节点 将表加入复制集中;并同步记录 lottu=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'tbl_lottu02', synchronize_data := true, columns := '{id, name, job}',row_filter := 'id < 10'); replication_set_add_table --------------------------- t (1 row) # subscriber节点查看表tbl_lottu02记录 lottu=# select * from tbl_lottu02; id | name | job ----+--------+----- 1 | lhrdb1 | pg 2 | lhrdb2 | pg 3 | lhrdb3 | pg 4 | lhrdb4 | pg 5 | lhrdb5 | pg 6 | lhrdb6 | pg 7 | lhrdb7 | pg 8 | lhrdb8 | pg 9 | lhrdb9 | pg (9 rows) |
为新表自动分配复制集
事件触发器工具可用于描述为新创建的表定义复制集的规则。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | CREATE OR REPLACE FUNCTION pglogical_assign_repset() RETURNS event_trigger AS $$ DECLARE obj record; BEGIN FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands() LOOP IF obj.object_type = 'table' THEN IF obj.schema_name = 'config' THEN PERFORM pglogical.replication_set_add_table('configuration', obj.objid); ELSIF NOT obj.in_extension THEN PERFORM pglogical.replication_set_add_table('default', obj.objid); END IF; END IF; END LOOP; END; $$ LANGUAGE plpgsql; CREATE EVENT TRIGGER pglogical_assign_repset_trg ON ddl_command_end WHEN TAG IN ('CREATE TABLE', 'CREATE TABLE AS') EXECUTE PROCEDURE pglogical_assign_repset(); |
冲突检测
冲突检测需要 PostgreSQL 9.5 +
如果节点订阅多个提供程序,或当本地写入在订阅服务器上发生,可能会发生冲突,尤其是对传入的变化。这些都自动检测,并可以就此采取行动取决于配置。
解决冲突的办法是通过配置 pglogical.conflict_resolution 参数。
pglogical.conflict_resolution 支持的配置参数选项为︰
- error - 复制将停止上错误如果检测到冲突和手动操作需要解决
- apply_remote - 总是应用与本地数据有冲突的更改,这是默认值
- keep_local - 保留数据的本地版本,并忽略来自远程节点相互冲突的更改
- last_update_wins - 时间戳为提交最新的版本(newest commit timestamp)的数据将会被保存(这可以是本地或远程版本)
- first_update_wins - 时间戳为最旧的版本(oldest timestamp)的数据将会被保存(这可以是本地或远程版本)
当参数track_commit_timestamp被禁用时,唯一允许的配置值是 apply_remote。 PostgreSQL 9.4 不支持 track_commit_timestamp 配置参数只能配置参数apply_remote(该参数是默认值)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | # 在 订阅者 节点配置;我们保留最新的数据 track_commit_timestamp = on pglogical.conflict_resolution = 'last_update_wins' # 在 订阅者 节点创建测试表tbl_lhrdb03 lottu=# create table tbl_lhrdb03(id int primary key, name text); CREATE TABLE lottu=# insert into tbl_lhrdb03 values (1001,'subscriber'); INSERT 0 1 # 在 发布者 节点 创建测试表 create table tbl_lhrdb03(id int primary key, name text); select pglogical.replication_set_add_table( set_name := 'default', relation := 'tbl_lhrdb03',synchronize_data := true); insert into tbl_lhrdb03 values (1001,'provider'); # 在 订阅者 节点 查看数据 lottu=# select * from tbl_lottu03; id | name ------+---------- 1001 | provider |
后记: 在订阅者的表需要主键约束;不然检测不到冲突;是否需要主键约束当然这个也是根据需求而定。
示例一:1个发布端,2个订阅端
现有实验环境
数据库版本 | 操作系统 | IP | 数据库 | 角色 |
---|---|---|---|---|
PostgreSQL 13.4 | Debian GNU/Linux 11 | 172.72.6.30 | lhrdb | provider |
PostgreSQL 13.4 | Debian GNU/Linux 11 | 172.72.6.31 | lhrdb | subscriber |
PostgreSQL 12.8 | Debian GNU/Linux 11 | 172.72.6.32 | lhrdb | subscriber |
环境准备
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | -- 创建专用网络 docker network create --subnet=172.72.6.0/24 pg-network -- PG13 docker rm -f lhrpg30 docker run -d --name lhrpg30 -h lhrpg30 \ -p 64330:5432 --net=pg-network --ip 172.72.6.30 \ -e POSTGRES_PASSWORD=lhr \ -e TZ=Asia/Shanghai \ postgres:13.4 docker rm -f lhrpg31 docker run -d --name lhrpg31 -h lhrpg31 \ -p 64331:5432 --net=pg-network --ip 172.72.6.31 \ -e POSTGRES_PASSWORD=lhr \ -e TZ=Asia/Shanghai \ postgres:13.4 -- PG12 docker rm -f lhrpg32 docker run -d --name lhrpg32 -h lhrpg32 \ -p 64332:5432 --net=pg-network --ip 172.72.6.32 \ -e POSTGRES_PASSWORD=lhr \ -e TZ=Asia/Shanghai \ postgres:12.8 -- 安装插件 -- PG12 apt update apt install -y curl curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/deb | bash apt-get install postgresql-12-pglogical -- PG13 apt update apt install -y curl curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/deb | bash apt-get install postgresql-13-pglogical -- 常用包 cp /etc/apt/sources.list /etc/apt/sources.list_bk cat > /etc/apt/sources.list <<"EOF" deb http://mirrors.aliyun.com/debian/ buster main non-free contrib deb http://mirrors.aliyun.com/debian-security buster/updates main deb http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib deb http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib deb-src http://mirrors.aliyun.com/debian-security buster/updates main deb-src http://mirrors.aliyun.com/debian/ buster main non-free contrib deb-src http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib deb-src http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib EOF apt-get install -y aptitude aptitude install -y curl wget iputils-ping procps net-tools lsb-release build-essential sysstat telnet aptitude install -y vim bzip2 gnupg2 libtinfo5 -- 3个PG库需要配置 psql -U postgres -h 192.168.66.35 -p 64330 psql -U postgres -h 192.168.66.35 -p 64331 psql -U postgres -h 192.168.66.35 -p 64332 alter system set shared_preload_libraries = 'pglogical'; alter system set wal_level = 'logical'; select pg_reload_conf(); select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); docker restart lhrpg30 lhrpg31 lhrpg32 create database lhrdb; \c lhrdb create EXTENSION pglogical; cat > /var/lib/postgresql/data/pg_hba.conf <<"EOF" local all all trust host all all 127.0.0.1/32 trust host all all ::1/128 trust local replication all trust host replication all 127.0.0.1/32 trust host replication all ::1/128 trust host replication postgres 172.72.6.0/24 trust host all all all md5 EOF select * from pg_hba_file_rules; -- 查看日志 docker logs -n 10 -f lhrpg30 docker logs -n 10 -f lhrpg31 docker logs -n 10 -f lhrpg32 |
提供者节点(发布端)配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | -- 1、创建节点:在数据库PG13里创建提供者节点 -- SELECT pglogical.drop_node(node_name := 'provider30'); SELECT pglogical.create_node( node_name := 'provider30', dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb' ); -- 查看 select * from pglogical.node_interface ; -- 2、创建复制集:将public架构中的所有表添加到default复制集中,复制集default的表都必需要primary key SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']); -- 创建完订阅者后,才会有复制槽 select * from pg_replication_slots ; |
运行过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | lhrdb=# SELECT pglogical.create_node( lhrdb(# node_name := 'provider30', lhrdb(# dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb' lhrdb(# ); create_node ------------- 2019914661 (1 row) lhrdb=# select * from pglogical.local_node ; node_id | node_local_interface ------------+---------------------- 2019914661 | 1955427756 (1 row) lhrdb=# select * from pglogical.node_interface ; if_id | if_name | if_nodeid | if_dsn ------------+------------+------------+----------------------------------------- 1955427756 | provider30 | 2019914661 | host=172.72.6.30 port=5432 dbname=lhrdb (1 row) lhrdb=# select * from pg_replication_slots ; slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size -----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------- (0 rows) lhrdb=# lhrdb=# SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']); replication_set_add_all_tables -------------------------------- t (1 row) |
订阅者PG13节点配置
pglogical 可以同步表/序列结构;在创建订阅者 'pglogical.create_subscription' ; 里面参数synchronize_structure - 指定是否将提供者与订阅者之间的结构同步,默认为false。可以同步表/序列/索引,该功能仅限于同版本,不同版本会报错,具体可以根据情况来测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | -- 1、创建节点,在数据库PG13创建订阅者节点 SELECT pglogical.create_node( node_name := 'subnode31', dsn := 'host=172.72.6.31 port=5432 dbname=lhrdb user=postgres password=lhr' ); -- 查询 select * from pglogical.node_interface ; -- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程 SELECT pglogical.create_subscription( subscription_name := 'sub31', provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr', synchronize_structure := true, synchronize_data := true ); -- 查询 select * from pglogical.node_interface ; select * from pglogical.subscription ; select * from pglogical.show_subscription_status(); -- 创建订阅这个命令执行完成后,会在“发布端”创建一个复制槽,这个很重要,一个订阅者一个复制槽 select * from pg_replication_slots ; -- 删除: 若要删除,则先删除订阅者,再删除节点 SELECT pglogical.drop_subscription(subscription_name := 'sub31'); SELECT pglogical.drop_node(node_name := 'subnode31'); |
过程:
订阅端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | lhrdb=# SELECT pglogical.create_node( lhrdb(# node_name := 'subnode31', lhrdb(# dsn := 'host=172.72.6.31 port=5432 dbname=lhrdb user=postgres password=lhr' lhrdb(# ); create_node ------------- 3125886449 (1 row) lhrdb=# lhrdb=# SELECT pglogical.create_subscription( lhrdb(# subscription_name := 'sub31', lhrdb(# provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr', lhrdb(# synchronize_structure := true, lhrdb(# synchronize_data := true lhrdb(# ); create_subscription --------------------- 1804764769 (1 row) lhrdb=# select * from pglogical.local_node ; node_id | node_local_interface ------------+---------------------- 3125886449 | 450827623 (1 row) lhrdb=# select * from pglogical.node ; node_id | node_name ------------+------------ 3125886449 | subnode31 2019914661 | provider30 (2 rows) lhrdb=# select * from pglogical.node_interface ; if_id | if_name | if_nodeid | if_dsn ------------+------------+------------+--------------------------------------------------------------------- 450827623 | subnode31 | 3125886449 | host=172.72.6.31 port=5432 dbname=lhrdb user=postgres password=lhr 1955427756 | provider30 | 2019914661 | host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr (2 rows) lhrdb=# select * from pglogical.subscription ; sub_id | sub_name | sub_origin | sub_target | sub_origin_if | sub_target_if | sub_enabled | sub_slot_name | sub_replication_sets | sub_forward_origins | sub_apply_delay | sub_force_text_transfer ------------+----------+------------+------------+---------------+---------------+-------------+----------------------------+---------------------------------------+---------------------+-----------------+------------------------- 1804764769 | sub31 | 2019914661 | 3125886449 | 1955427756 | 450827623 | t | pgl_lhrdb_provider30_sub31 | {default,default_insert_only,ddl_sql} | {all} | 00:00:00 | f (1 row) lhrdb=# lhrdb=# select * from pglogical.show_subscription_status(); subscription_name | status | provider_node | provider_dsn | slot_name | replication_sets | forward_origins -------------------+-------------+---------------+---------------------------------------------------------------------+----------------------------+---------------------------------------+----------------- sub31 | replicating | provider30 | host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr | pgl_lhrdb_provider30_sub31 | {default,default_insert_only,ddl_sql} | {all} (1 row) |
发布端查询:
1 2 3 4 5 | lhrdb=# select * from pg_replication_slots ; slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size ----------------------------+------------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------- pgl_lhrdb_provider30_sub31 | pglogical_output | logical | 16384 | lhrdb | f | t | 80 | | 489 | 0/166D078 | 0/166D0B0 | reserved | (1 row) |
发布端告警日志:
1 2 3 4 5 6 7 8 9 10 11 | 2022-01-14 20:24:05.544 CST [77] LOG: logical decoding found consistent point at 0/166D078 2022-01-14 20:24:05.544 CST [77] DETAIL: There are no running transactions. 2022-01-14 20:24:05.544 CST [77] STATEMENT: CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub31" LOGICAL pglogical_output 2022-01-14 20:24:05.544 CST [77] LOG: exported logical decoding snapshot: "00000007-00000002-1" with 0 transaction IDs 2022-01-14 20:24:05.544 CST [77] STATEMENT: CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub31" LOGICAL pglogical_output 2022-01-14 20:24:06.352 CST [80] LOG: starting logical decoding for slot "pgl_lhrdb_provider30_sub31" 2022-01-14 20:24:06.352 CST [80] DETAIL: Streaming transactions committing after 0/166D0B0, reading WAL from 0/166D078. 2022-01-14 20:24:06.352 CST [80] STATEMENT: START_REPLICATION SLOT "pgl_lhrdb_provider30_sub31" LOGICAL 0/166D0B0 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1300', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '0', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '130005', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '108') 2022-01-14 20:24:06.353 CST [80] LOG: logical decoding found consistent point at 0/166D078 2022-01-14 20:24:06.353 CST [80] DETAIL: There are no running transactions. 2022-01-14 20:24:06.353 CST [80] STATEMENT: START_REPLICATION SLOT "pgl_lhrdb_provider30_sub31" LOGICAL 0/166D0B0 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1300', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '0', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '130005', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '108') |
订阅端告警日志:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | 2022-01-14 20:23:49.062 CST [74] LOG: apply worker [74] at slot 2 generation 1 detaching cleanly 2022-01-14 20:23:49.067 CST [97] LOG: manager worker [97] at slot 1 generation 4 detaching cleanly 2022-01-14 20:23:49.075 CST [98] LOG: manager worker [98] at slot 1 generation 5 detaching cleanly 2022-01-14 20:23:49.290 CST [67] LOG: manager worker [67] at slot 0 generation 4 detaching cleanly 2022-01-14 20:23:49.296 CST [99] LOG: manager worker [99] at slot 1 generation 6 detaching cleanly 2022-01-14 20:23:49.299 CST [100] LOG: starting pglogical database manager for database lhrdb 2022-01-14 20:23:49.299 CST [100] LOG: manager worker [100] at slot 0 generation 5 detaching cleanly 2022-01-14 20:23:49.304 CST [101] LOG: manager worker [101] at slot 1 generation 7 detaching cleanly 2022-01-14 20:23:55.379 CST [102] LOG: manager worker [102] at slot 0 generation 6 detaching cleanly 2022-01-14 20:23:55.387 CST [103] LOG: starting pglogical database manager for database lhrdb 2022-01-14 20:23:56.390 CST [104] LOG: manager worker [104] at slot 1 generation 8 detaching cleanly 2022-01-14 20:24:05.077 CST [107] LOG: manager worker [107] at slot 1 generation 9 detaching cleanly 2022-01-14 20:24:05.077 CST [108] LOG: starting apply for subscription sub31 2022-01-14 20:24:05.085 CST [109] LOG: manager worker [109] at slot 1 generation 10 detaching cleanly |
订阅者PG12节点配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | -- 1、创建节点,在数据库PG12创建订阅者节点 SELECT pglogical.create_node( node_name := 'subnode32', dsn := 'host=172.72.6.32 port=5432 dbname=lhrdb user=postgres password=lhr' ); -- 查询 select * from pglogical.node_interface ; -- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程 SELECT pglogical.create_subscription( subscription_name := 'sub32', provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr' ); -- 查询 select * from pglogical.node_interface ; select * from pglogical.subscription ; select * from pglogical.show_subscription_status(); -- 创建订阅这个命令执行完成后,会在发布端创建一个复制槽,这个很重要,一个订阅者一个复制槽 select * from pg_replication_slots ; -- 删除: 若要删除,则先删除订阅者,再删除节点 SELECT pglogical.drop_subscription(subscription_name := 'sub32'); SELECT pglogical.drop_node(node_name := 'subnode32'); |
订阅端: