修复从 Oracle 发往亚马逊 Aurora PostgreSQL 或适用于 PostgreSQL 的亚马逊 RDS 的对象变更通知

从 Oracle 到 Amazon Aurora PostgreSQL 兼容版 或用 于 PostgreSQL 的 迁移到 亚马逊关系数据库服务 (Amazon RDS) 是一个多阶段的过程,涉及不同的技术和技能,从评估阶段到切换阶段。有关数据库迁移过程的更多信息,请参阅以下文章:

  • 数据库迁移——在开始之前你需要知道什么?
  • 迁移过程和基础架构注意事项
  • 源数据库注意事项
  • PostgreSQL 环境的目标数据库注意事项

对于 Oracle 数据库,如果注册了查询以获取 对象更改通知 (OCN) ,则无论查询结果是否发生变化,只要事务更改对象,数据库都会通知应用程序该查询引用和提交。

如果为查询注册了 查询结果更改通知 (QRCN), 则每当事务更改查询和提交的结果时,数据库都会通知应用程序。

数据库向内部队列添加一条描述更改的消息。客户端应用程序可以监听这些通知、转换数据并发布到其他下游应用程序。此功能是 Oracle 的内置产品;PostgreSQL 没有类似的内置解决方案。这是一个常见的问题,不能仅通过迁移来解决。

在这篇文章中,我们讨论了修复 从 Oracle 到 Amazon Aurora PostgreSQL 兼容版的 对象变更通知 的选项。

先决条件

要测试此解决方案,您需要满足以下先决条件:

  • 一个 亚马逊云科技 账户
  • 兼容 Amazon Aurora PostgreSQL 的版本实例
  • 具有安装 p g_recvlogical Postg reSQL 客户端实用程序的管理员权限的 亚马逊弹性计算云(亚马逊 EC2) 实例

解决方案概述

在考虑解决方案时,您需要考虑以下方面:

  • 交易中需要处理的变更和消息量
  • 客户端应用程序的高可用性,它连接到数据库服务器以使用消息/事件,以及数据持久性
  • 更改客户端应用程序以支持现代化

基于这些方面,考虑以下解决方案:

  • 带有队列表的数据库触发器,用于保存变更数据
  • 监听/通知功能
  • 辑解码 输出插

在以下部分中,我们将更详细地讨论每种解决方案。

带有队列表的数据库触发器,用于保存变更数据

数据库触发器是一种程序代码,它会自动运行以响应数据库中特定表或视图上的某些事件。通过触发方法,我们可以跟踪表列表上的 DML(插入、更新和删除)更改,并保留在数据库表(我们可以将其称为队列表)中,以实现灵活性和数据持久性。此队列表可以使用以下结构构造。

以下是用于跟踪表中的变更数据并保存到队列表的示例代码。

创建测试表和队列表:

CREATE TABLE EMPLOYEE (EMPID INTEGER, FNAME VARCHAR(50), LNAME VARCHAR(50), MGRID INTEGER );

CREATE TABLE MESSAGEQUEUE (MSGID  SERIAL, OPERATION VARCHAR(6), TABLENAME VARCHAR(63), CHANGEDATA JSON, MSGDATE TIMESTAMP, CONSUMED BOOLEAN DEFAULT FALSE NOT NULL );

创建触发器函数以将更改流式传输到队列表:

CREATE OR REPLACE FUNCTION TG_EMPLOYEE() RETURNS trigger AS
$$
BEGIN
IF (TG_OP = 'DELETE') THEN
        INSERT INTO MESSAGEQUEUE  (OPERATION,TABLENAME,CHANGEDATA,MSGDATE,CONSUMED)
                        VALUES( TG_OP,TG_TABLE_NAME,row_to_json(OLD.*),NOW(), 'NO');
           RETURN NEW;
       ELSIF (TG_OP = 'UPDATE') THEN
          INSERT INTO MESSAGEQUEUE  (OPERATION,TABLENAME,CHANGEDATA,MSGDATE,CONSUMED)
                        VALUES( TG_OP,TG_TABLE_NAME,row_to_json(NEW.*),NOW(), 'NO');
           RETURN NEW;
       ELSIF (TG_OP = 'INSERT') THEN
           INSERT INTO MESSAGEQUEUE  (OPERATION,TABLENAME,CHANGEDATA,MSGDATE,CONSUMED)
                        VALUES( TG_OP,TG_TABLE_NAME,row_to_json(NEW.*),NOW(), 'NO');
           RETURN NEW;
       END IF;
 RETURN NULL;
END;
$$
LANGUAGE plpgsql;

CREATE TRIGGER TG_EMPLOYEE
AFTER INSERT OR UPDATE OR DELETE ON EMPLOYEE
   FOR EACH ROW EXECUTE FUNCTION TG_EMPLOYEE();

执行 DML 操作:

mydb3=> insert into employee values ('99999','Ranga','C',10);
INSERT 0 1
mydb3=> insert into employee values ('11111','Nava','T',10);
INSERT 0 1
mydb3=> select * from employee;
 empid | fname | lname | mgrid
-------+-------+-------+-------
 99999 | Ranga | C     |    10
 11111 | Nava  | C     |    10
(2 rows)

查询 消息队列表。

mydb3=> select * from messagequeue;
 msgid | operation | tablename |                       changedata                       |          msgdate           | consumed
-------+-----------+-----------+--------------------------------------------------------+----------------------------+----------
     1 | INSERT    | employee  | {"empid":99999,"fname":"Ranga","lname":"C","mgrid":10} | 2022-09-23 03:17:08.552784 | f
     2 | INSERT    | employee  | {"empid":11111,"fname":"Nava","lname":"T","mgrid":10}  | 2022-09-23 03:17:22.350557 | f
(2 rows)

客户端应用程序轮询队列表中的变更数据,提取数据,然后将其发布到下游应用程序进行进一步处理。在这种情况下,客户端应用程序负责提取自上次检查点以来的数据。

注意事项

在考虑此解决方案时,请记住以下几点:

  • 仅跟踪选定表上的数据更改,而不是整个数据库
  • 以单笔交易中的数据变化量为例。如果更改很大,可以考虑使用语句级触发器
  • 将此解决方案用于基于民意调查的方法

该解决方案的一个好处是,变更数据会保留在队列表中,以保持耐久性和容错性。这样,即使出现故障,客户端应用程序也可以在最后一个检查点之后恢复和重放更改。但是,这种方法有以下缺点:

  • 触发器价格昂贵,并且会减慢交易速度,因为它们是同步的
  • 排队桌需要定期维护

听/通知

使用 LISTEN/NOTIFY ,多个听众可以观看单个频道的消息,并在父进程发出通知 时立即收到消息。 pg_not ify 是 PostgreSQL 中的一个内置函数,用于生成异步通知,客户端应用程序使用 LISTEN 来处理这些事件通知。您可以创建触发器来跟踪变更数据,并调用 pg_notify 函数以及 JSON 负载来生成事件通知

以下是用于跟踪表中的变更数据并保存到队列表的示例代码。

创建测试表:

CREATE TABLE EMPLOYEE (EMPID INTEGER, FNAME VARCHAR(50), LNAME VARCHAR(50), MGRID INTEGER );

创建触发函数以向频道发布更改:

CREATE OR REPLACE FUNCTION TG_EMPLOYEE() RETURNS trigger AS
$$
BEGIN
IF (TG_OP = 'DELETE') THEN
PERFORM PG_NOTIFY('EMPLOYEE'::TEXT, (SELECT ROW_TO_JSON(OLD.*)::TEXT));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
PERFORM PG_NOTIFY('EMPLOYEE'::TEXT, (SELECT ROW_TO_JSON(NEW.*)::TEXT));
RETURN NEW;
ELSIF (TG_OP = 'INSERT') THEN
PERFORM PG_NOTIFY('EMPLOYEE'::TEXT, (SELECT ROW_TO_JSON(NEW.*)::TEXT));
RETURN NEW;
END IF;
RETURN NULL;
END;
$$
LANGUAGE plpgsql;

CREATE TRIGGER TG_EMPLOYEE
AFTER INSERT OR UPDATE OR DELETE ON EMPLOYEE
FOR EACH ROW EXECUTE FUNCTION TG_EMPLOYEE();

执行 DML 操作:

mydb3=> insert into employee values ('99999','Ranga','C',10);
INSERT 0 1
mydb3=> insert into employee values ('11111','Nava','T',10);
INSERT 0 1
mydb3=> select * from employee;
empid | fname | lname | mgrid
-------+-------+-------+-------
99999 | Ranga | C     |    10
11111 | Nava  | T     |    10
(2 rows)

客户端应用程序在轮询时持续接收消息(对于这篇文章,我们使用 psql 客户端从频道中提取消息):

mydb3=> listen employee;
LISTEN
Asynchronous notification "employee" with payload "{"msgid":1,"operation":"INSERT","tablename":"employee","changedata":{"empid":99999,"fname":"Ranga","lname":"C","mgrid":10},"msgdate":"2022-09-23T03:57:50.301805"}" received from server process with PID 30191.
Asynchronous notification "employee" with payload "{"msgid":2,"operation":"INSERT","tablename":"employee","changedata":{"empid":11111,"fname":"Nava","lname":"T","mgrid":10},"msgdate":"2022-09-23T03:57:53.407565"}" received from server process with PID 30191.

注意事项

在考虑此解决方案时,请记住以下几点:

  • 仅跟踪选定表上的数据更改,而不是整个数据库
  • 以单笔交易中的数据变化量为例。如果更改很大,可以考虑使用语句级触发器
  • 使用此解决方案来实现基于推送的方法

这个解决方案的一个好处是 pg_notify 是异步的。但是,它有以下缺点:

  • 触发器价格昂贵,并且会减慢交易速度,因为它们是同步的
  • 如果客户端应用程序不监听,通知就会丢失

逻辑解码输出插件

逻辑解码 是将数据库表的持续更改提取为一种连贯、易于理解的格式的过程,这种格式无需详细了解数据库的内部状态即可进行解释。

在 PostgreSQL 中,逻辑解码是通过将描述存储级别变化的预写日志的内容解码为特定于应用程序的形式(例如元组流或 SQL 语句)来实现的。在逻辑复制的环境中,插槽代表了一系列更改,这些更改可以按照在源服务器上进行的顺序重播到客户端应用程序。每个插槽从单个数据库流式传输一系列更改。输出插件将数据从预写日志的内部表示形式转换为复制槽使用者所需的格式。传输这些更改的格式由使用的输出插件决定。

在此示例中,我们使用 wal2json 插件以 JSON 格式 流式传输表格的更改。 有关逻辑解码和输出插件的更多信息,请参阅 了解逻辑复制和逻辑解码和逻辑解码 插件。

创建测试表:

CREATE TABLE EMPLOYEE (EMPID INTEGER, FNAME VARCHAR(50), LNAME VARCHAR(50), MGRID INTEGER );

先决条件:

  • 兼容 Aurora PostgreSQL 的版本数据库集群
  • 使用 PostgreSQL 源代码的亚马逊 EC2 实例编译后获得 pg_recvlogical 二进制文件
  • 用于创建槽位的 rds_superuser 角色

pg_recvlog ical 是一个客户端实用程序,可以持续读取插槽中的内容并对其进行流式传输。客户有责任在进一步使用所需事件之前对其进行过滤。

使用 pg_recvlog ical 实用程序创建复制槽。

pg_recvlogical -h mydb3.c24eylcynj2u.us-east-1.rds.amazonaws.com -d mydb3 -U postgres -p 5432 -S employee_slot —create-slot -P wal2json

mydb3=> select * from pg_replication_slots;
slot_name   |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
---------------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------
employee_slot | wal2json | logical   |  16397 | mydb3    | f         | t      |      31892 |      |         1247 | 5/10000028  | 5/10000060          | reserved   |               | f
(1 row)

在终端 1 中运行 pg_recvlogic al P ostgreSQL 客户端实用程序,然后在终 端 2 中运行 DML 语句 并验证流式传输更改。

mydb3=> insert into employee values ( 9999 , 'Ranga','C',10);
INSERT 0 1

sh-4.2$ ./pg_recvlogical -h mydb3.c24eylcynj2u.us-east-1.rds.amazonaws.com -d mydb3 -U postgres -p 5432 -S employee_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
Password:
{
"change": [
]
}
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "employee",
"columnnames": ["empid", "fname", "lname", "mgrid"],
"columntypes": ["integer", "character varying(50)", "character varying(50)", "integer"],
"columnvalues": [9999, "Ranga", "C", 10]
}
]
}

注意事项

在考虑此解决方案时,请记住以下几点:

  • 客户端应用程序需要筛选相关表的 DML 更改事件,并进一步传输这些 DML 更改事件
  • 当触发器导致的额外开销不可接受时,可以考虑使用此解决方案

此解决方案的一个好处是,您可以按照数据的生成顺序流式传输数据更改。但是,它有以下缺点:

  • 应为无法在单个表上配置的数据库集群启用逻辑复制
  • 孤立复制槽可能会消耗您的磁盘空间
  • 每个时段都需要过滤消费者所需的事件
  • 从复制槽中消耗更改后无法重新读取更改

清理

为了避免将来产生费用并删除在测试此用例时创建的组件,请完成以下步骤:

  1. 在 Amazon RDS 控制台上,选择您设置的数据库,然后在 “ 操作 ” 菜单上选择 “ 删除 ” 。
  2. 在 Amazon EC2 控制台上,选择您使用的 EC2 实例,然后在 操作 菜单上选择 终止

摘要

在这篇文章中,我们介绍了修复从甲骨文到亚马逊 Aurora PostgreSQL 兼容版的 Oracle 对象变更通知的多种解决方案。根据您的业务需求,例如要通知的表的数量、变更数据量以及构建客户端应用程序所涉及的工作量,您可以选择最可行的解决方案。

如果您有任何问题或建议,请发表评论。


作者简介

Ranga Cherukuri 是一名云数据库架构师,在 亚马逊云科技 拥有专业服务团队。Ranga 专注于帮助客户构建高度可用、经济实惠的数据库解决方案,并将其大规模 SQL Server 数据库迁移到 亚马逊云科技。他对数据库和分析充满热情。

Navakanth Talluri 是一名数据库迁移架构师,在 亚马逊云科技 拥有专业服务团队。他与亚马逊内部和外部客户合作,为数据库项目提供指导和技术支持,使他们能够从商用数据库引擎迁移到 Amazon RDS。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。