Kafka JDBC Connector with Aurora PostgreSQL
The JDBC Connector is pretty straightforward to configure and relies on standard SQL, so the following steps should apply to almost all database types and cloud providers. However, I am using an AWS Aurora PostgreSQL database cluster with Confluent-hosted Kafka and Kafka Connect clusters, which required some extra steps to create a network path between the two environments.
Setting up Confluent’s access to the Aurora database
Your Aurora database should be publicly accessible.
Next, you need to set up its security group to allow inbound traffic from Confluent.
Get your cluster’s egress IPs from Confluent, and add them as inbound rules in your Aurora database’s security group. This step is specific to AWS and may not map exactly to Azure or GCP.
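As a rough sketch of the security group step, the snippet below turns a list of egress IPs into a single inbound rule for the PostgreSQL port. The addresses are placeholders from the documentation range, the function name `build_ingress_rule` is hypothetical, and port 5432 is an assumption (the PostgreSQL default). It only handles IPv4 addresses.

```python
import ipaddress

POSTGRES_PORT = 5432  # assumption: default PostgreSQL port; change if yours differs


def build_ingress_rule(egress_ips, port=POSTGRES_PORT):
    """Build one inbound rule allowing TCP on `port` from each IPv4 address."""
    ip_ranges = []
    for ip in egress_ips:
        # ip_network validates the address and turns a bare IPv4 into a /32 CIDR.
        cidr = ipaddress.ip_network(ip.strip())
        ip_ranges.append({"CidrIp": str(cidr), "Description": "Confluent egress"})
    return {
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        "IpRanges": ip_ranges,
    }


# Placeholder addresses, NOT real Confluent egress IPs.
example_ips = ["203.0.113.10", "203.0.113.11"]
rule = build_ingress_rule(example_ips)
print(rule)
```

The resulting dictionary has the shape that boto3's `authorize_security_group_ingress` accepts in its `IpPermissions` list, so it could be passed along with your security group ID. Adding the same rule by hand in the AWS console works just as well.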
Create a table with a timestamp and a sequence-based ID
Using your PostgreSQL client, create a table to test your setup. The timestamp column will be needed in your Connector setup.
CREATE TABLE events (
    id SERIAL PRIMARY KEY,
    topic text,
    payload text,
    timestamp timestamp NOT NULL,
    key text
);
-- Indices
CREATE INDEX timestamp_idx ON events(timestamp);
Set up the Connector
Use a previously generated key/secret or generate new credentials for your Kafka cluster (which is also hosted on Confluent). The Kafka topics will be created with the specified topic prefix followed by the database table name.
In the Database details section, specify your table and its timestamp column. Initially, you can also experiment with leaving the Table names field blank.
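For reference, here is a minimal sketch of how these settings relate to each other. It uses the property names of the self-managed JDBC source connector; the Confluent UI may label or name them differently, so treat them as assumptions. The prefix `aurora_`, the placeholder connection URL, and the helper names are all hypothetical.

```python
import json


def build_jdbc_source_config(topic_prefix, table, timestamp_column):
    """Assemble a timestamp-mode JDBC source config as a plain dictionary."""
    return {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        # Placeholder URL: fill in your own Aurora endpoint and database name.
        "connection.url": "jdbc:postgresql://<aurora-endpoint>:5432/<database>",
        "mode": "timestamp",
        "table.whitelist": table,
        "timestamp.column.name": timestamp_column,
        "topic.prefix": topic_prefix,
    }


def topic_name(config):
    """Topic name is the prefix followed directly by the table name."""
    return config["topic.prefix"] + config["table.whitelist"]


config = build_jdbc_source_config("aurora_", "events", "timestamp")
print(json.dumps(config, indent=2))
print(topic_name(config))
```

With these example values, rows from the `events` table would land in a topic named `aurora_events`.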
NOTE: For me, including the schema name before the table name did NOT work for the JDBC connector; it kept complaining that the table did not exist. For the CDC connector, on the other hand, omitting the schema name did not work. So if your connector complains that the table could not be found, experiment with this. The discrepancy is most probably due to how each connector is implemented and how it interacts with different databases, and is not specific to Confluent.
PS: Some connector settings do not get updated after the connector is first created. So it is wise to start afresh if your config changes do not seem to be taking effect.
You should consider specifying a *Delay Interval* for a real system. The connector resumes reading from its last checkpoint, so once it has seen a certain timestamp, it will not retrieve any rows with an earlier timestamp. Such rows can appear when transactions commit out of order. To work around this issue, we can make the connector read only the rows whose timestamp is earlier than the current time minus the Delay Interval.
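The effect of the Delay Interval can be illustrated with a small simulation. This is a simplified model of timestamp-based polling, not the connector's actual code: the `poll` and `run` functions, the row layout, and the timings are all made up for the example. Row 1 receives the earlier timestamp but commits after row 2.

```python
from datetime import datetime, timedelta


def poll(rows, last_seen, now, delay):
    """Return rows with last_seen < timestamp < now - delay, oldest first."""
    upper = now - delay
    batch = [r for r in rows if last_seen < r["timestamp"] < upper]
    return sorted(batch, key=lambda r: r["timestamp"])


t0 = datetime(2024, 1, 1, 12, 0, 0)
row_a = {"id": 1, "timestamp": t0 + timedelta(seconds=1)}  # commits late
row_b = {"id": 2, "timestamp": t0 + timedelta(seconds=2)}  # commits first


def run(delay):
    """Run two polls and return the ids the simulated connector picked up."""
    seen, last_seen = [], t0
    polls = [
        (t0 + timedelta(seconds=3), [row_b]),          # only row 2 is visible yet
        (t0 + timedelta(seconds=10), [row_a, row_b]),  # row 1 has committed too
    ]
    for now, visible in polls:
        batch = poll(visible, last_seen, now, delay)
        if batch:
            last_seen = batch[-1]["timestamp"]  # advance the checkpoint
            seen += [r["id"] for r in batch]
    return seen


no_delay = run(timedelta(0))
with_delay = run(timedelta(seconds=5))
print(no_delay)    # [2]: row 1 is skipped for good
print(with_delay)  # [1, 2]: both rows arrive, in order
```

Without a delay, the checkpoint moves past row 1 before it becomes visible. With a 5-second delay, the first poll returns nothing and the second one picks up both rows.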
Timezone issue
I wasted almost eight hours repeatedly checking my setup and relaxing the network and database access rules.
The connector was running without errors, but my rows did not show up in the topics until 7 hours later. The issue was that my database was running in the UTC timezone, but I had selected “America/Los Angeles” in the JDBC connector. This perfectly explained the 7-hour delay, as Pacific Daylight Time lags UTC by 7 hours. Thanks, Sami.
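The 7-hour gap can be reproduced with a few lines of arithmetic. This is only a sketch of one plausible reading of the symptom: it assumes the connector takes the stored wall-clock value and interprets it in the configured timezone. The sample date and the fixed UTC-7 offset standing in for Pacific Daylight Time are assumptions for the example.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
PDT = timezone(timedelta(hours=-7), "PDT")  # fixed offset for Pacific Daylight Time

# Wall-clock value as a UTC database stores it in a `timestamp` column.
stored = datetime(2024, 7, 1, 12, 0, 0)

actual_instant = stored.replace(tzinfo=UTC)   # what the database meant
misread_instant = stored.replace(tzinfo=PDT)  # what a Pacific-time setting assumes

# The misread instant lies in the future, so the row looks "not yet due".
lag = misread_instant - actual_instant
print(lag)  # 7:00:00
```

Setting the connector's timezone to match the database (UTC in my case) removes the gap.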