PostgreSQL Debezium/CDC Kafka Connector with Aurora RDS
This post uses a Confluent-based Kafka cluster and Kafka connectors. The same steps apply to Kafka Connect hosted by you or any other provider, though Confluent provides a nice visual interface.
I decided to write this post because I was unaware of some manual steps required to enable CDC, steps I hadn't needed for JDBC-based connectors.
Setting up Confluent's access to the Aurora database
Before you start creating your database:
- Create a custom cluster DB parameter group and set `rds.logical_replication` to 1.
- This config applies at the cluster level. Use it when creating your DB cluster. If you have already created a DB cluster, apply the parameter group to it. *Reboot the instances for this change to take effect.* Afterwards, verify that the `wal_level` is `logical` (using any PostgreSQL client); see the query after this list.
- Without this step, you will see issues like: `no pg_hba.conf entry ...`
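For a quick sanity check (a minimal example; any PostgreSQL client such as psql will do):
SHOW wal_level;
-- expected value once the parameter group is applied and the instances are rebooted: logical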
Next, ensure that the Aurora database is publicly accessible.
Then, set up its security group so that Confluent can reach it.
Get your cluster's egress IPs from Confluent and add them to your Aurora database's security group. This step is specific to AWS and may not map exactly to Azure or GCP.
Replication plugins and Replication Slot:
By default, Aurora PostgreSQL comes with plugin_name='pgoutput' and slot_name='debezium'. You can verify it like this:
select slot_name, plugin, slot_type from pg_replication_slots;
Output:
slot_name | plugin   | slot_type
debezium  | pgoutput | logical
[Optional] Depending on your PostgreSQL version, you may see more slots or none at all. In that case, there is a whole list of replication plugins (wal2json, test_decoding, …), many of them supported by PostgreSQL. Call the `pg_create_logical_replication_slot` function to create the replication slot. Later, this slot and its plugin type will be referenced in the Confluent connector settings.
SELECT * FROM pg_create_logical_replication_slot('my_slot_name', 'wal2json');
-- the wal2json_rds plugin did *NOT* work for me
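If you would rather use the built-in pgoutput plugin, a sketch of the equivalent call would be (the slot name `debezium` here is just an example label; use whichever name you plan to reference in the connector):
SELECT * FROM pg_create_logical_replication_slot('debezium', 'pgoutput');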
Create the CDC Connector
Create the PostgreSQL CDC connector in Confluent with the defaults.
Use a previously generated key/secret or generate new credentials for your Kafka cluster (which is also hosted on Confluent).
Database server name is a logical label. It can be different from your actual database or host name. Confluent will use it to prefix Kafka topics, Kafka Connect schemas, etc.
In the next section, you will specify the plugin type and slot name that were configured earlier in your database. If you choose to specify table include/exclude filters, add the schema before the table names, e.g. `public.events` instead of `events`.
Also, if the default replication plugin and slot existed in your case, you can use Plugin name=`pgoutput` and Slot name=`debezium`.
Next → Launch. You should notice the events flowing into your topics soon…
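To double-check from the database side, a simple query (assuming the slot name `debezium` from earlier) shows whether the connector is actively consuming the slot:
select slot_name, plugin, active, confirmed_flush_lsn from pg_replication_slots;
-- active = t means a client (here, the connector) is currently streaming from this slot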
EDIT 1: Nov 3, 2021
Confluent has introduced SMTs (Single Message Transforms), so I wanted to add an example here:
This is the schema in the database:
CREATE TABLE poc.outbox (
    id text,
    topic text,
    payload text,
    timestamp timestamp without time zone NOT NULL
);
CREATE INDEX outbox_pk_idx ON poc.outbox(id text_ops);
CREATE INDEX outbox_timestamp_idx ON poc.outbox(timestamp timestamp_ops);
Inserting these rows will produce events into the respective topics: prod.trace and prod.events.
insert into poc.outbox(id, topic, payload, timestamp) values('23', 'prod.trace', 'c 1', NOW());
insert into poc.outbox(id, topic, payload, timestamp) values('24', 'prod.trace', 'c 2', NOW());
insert into poc.outbox(id, topic, payload, timestamp) values('23', 'prod.events', 'c 1 event', NOW());
insert into poc.outbox(id, topic, payload, timestamp) values('24', 'prod.events', 'c 2 event', NOW());
After that, I was able to index these events into Elasticsearch with minimal configuration.
Footnotes
- Debezium’s documentation regarding replication plugins and replication slot.