Using Debezium, but cannot see changes in sink Postgres Docker container

I am learning Debezium and am also new to Docker. I want to replicate changes from a source Postgres database to an initially empty destination Postgres database. Each database runs in its own Docker container, and I am using Debezium to replicate the changes. When I run the Docker stack and make changes in the source Postgres, I can see those changes with the Kafka console consumer, but nothing happens in the destination Postgres.

I am using docker-compose to set up all the containers, as shown below:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:1.6
    container_name: zookeeper
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: debezium/kafka:1.6
    container_name: kafka
    ports:
      - 9092:9092
      - 29092:29092
    links:
      - zookeeper:zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
  postgres:
    image: postgres:11
    container_name: postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - APP_DB_NAME=studentdb
    volumes:
      - ./postgresdb:/docker-entrypoint-initdb.d/
      - ./postgresconf/postgresql.conf.sample:/usr/share/postgresql/postgresql.conf.sample
  
  postgres-sink:
    image: postgres:11
    container_name: postgres-sink
    ports:
      - 5431:5431
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres

  connect:
    #image: debezium/connect:1.6
    #image: debezium/connect-jdbc-es:1.6
    build: .
    container_name: connect
    ports:
      - 8083:8083
      - 5005:5005
    links:
      - zookeeper:zookeeper
      - kafka:kafka
      - postgres:postgres
      - postgres-sink:postgres-sink
    environment:
      - GROUP_ID=1
      - BOOTSTRAP_SERVERS=kafka:9092
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

All the containers start up fine.
My database in the source Postgres is named ‘studentdb’, and its ‘students’ table has just two columns, ‘id’ and ‘name’. The ‘students’ table is in the default schema, ‘public’.
Next, I register the source connector with the following configuration:

{
    "name": "student-connector-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "studentdb",
        "database.server.name": "dbserver1",
        "schema.include": "public",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "database.history.kafka.bootstrap.servers": "kafka:29092",
        "database.history.kafka.topic": "schema-changes.students_profiles",
        "plugin.name": "pgoutput",
        "publication.autocreate.mode": "all_tables",
        "publication.name": "my_publication",
        "snapshot.mode": "always"
    }
}
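
For reference, registering a connector like the one above amounts to a POST of that JSON to the Kafka Connect REST API. The following is only a minimal sketch, assuming the REST endpoint is reachable at http://localhost:8083 (the port published by the connect service); the function names are hypothetical and not part of Debezium or Kafka Connect.

```python
import json
import urllib.request

# Assumption: Kafka Connect's REST API is published on localhost:8083,
# as in the docker-compose file above.
CONNECT_URL = "http://localhost:8083/connectors"


def build_register_request(connector: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def register(connector: dict, url: str = CONNECT_URL) -> dict:
    """Send the request and return the decoded JSON response from Kafka Connect."""
    with urllib.request.urlopen(build_register_request(connector, url)) as resp:
        return json.load(resp)
```

Calling register() with the parsed contents of the JSON above should return the same kind of body as the "201 Created" responses listed under the additional info.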

The sink connector has the following configuration:

{
    "name": "jdbc-sink",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "dbserver1.public.students",
      "dialect.name": "PostgreSqlDatabaseDialect",
      "table.name.format": "students",
      "connection.url": "jdbc:postgresql://postgres-sink:5431/studentdb?user=postgres&password=postgres",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "auto.create": "true",
      "insert.mode": "upsert",
      "pk.fields": "id",
      "pk.mode": "record_key",
      "delete.enabled": "true"
    }
}
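
To make explicit which host, port, and database the sink connector is pointed at, the "connection.url" value can be split into its parts. This is a small illustrative sketch using only the Python standard library; parse_jdbc_url is a hypothetical helper, not something provided by the JDBC sink connector.

```python
from urllib.parse import urlsplit, parse_qs


def parse_jdbc_url(jdbc_url: str) -> dict:
    """Split a jdbc:postgresql://host:port/db?params URL into its components."""
    prefix = "jdbc:"
    if not jdbc_url.startswith(prefix):
        raise ValueError("not a JDBC URL: " + jdbc_url)
    # After stripping "jdbc:", the remainder is an ordinary URL.
    parts = urlsplit(jdbc_url[len(prefix):])
    return {
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
        "params": {key: values[0] for key, values in parse_qs(parts.query).items()},
    }


target = parse_jdbc_url(
    "jdbc:postgresql://postgres-sink:5431/studentdb?user=postgres&password=postgres"
)
print(target["host"], target["port"], target["database"])
```

The host and port here are resolved from inside the connect container, so they refer to the postgres-sink container itself rather than to any port published on the Docker host.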

Both connectors are created without any errors, and when I query Kafka Connect for the registered connectors, both names show up.
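
A connector can be registered and listed while its task has failed, so the per-connector status endpoint of the Kafka Connect REST API (GET /connectors/<name>/status) is worth checking as well. The sketch below assumes the same http://localhost:8083 endpoint; the helper names are hypothetical.

```python
import json
import urllib.request


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Fetch the status document for one connector from the Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def non_running(status: dict) -> list:
    """Return (component, state, trace) for every part that is not RUNNING."""
    problems = []
    connector = status.get("connector", {})
    if connector.get("state") != "RUNNING":
        problems.append(("connector", connector.get("state"), connector.get("trace")))
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            # Failed tasks carry the Java stack trace in the "trace" field.
            problems.append((f"task {task.get('id')}", task.get("state"), task.get("trace")))
    return problems
```

Running non_running(fetch_status("jdbc-sink")) and the same for "student-connector-source" gives an empty list when everything is running; otherwise the returned trace usually names the underlying exception.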

Next, I start the Kafka console consumer to check whether Kafka is receiving the data in its topic. It shows the table rows, and when I make changes, I see them immediately. So I know that the data changes are being streamed to the Kafka topic without any issue.

So, my problem is that those changes are not getting streamed into the destination Postgres.

Any advice would be much appreciated!

------- ADDITIONAL INFO -------

The output when I register the source connector:

HTTP/1.1 201 Created
Date: Thu, 07 Oct 2021 23:30:42 GMT
Location: http://localhost:8083/connectors/student-connector-source
Content-Type: application/json
Content-Length: 855
Server: Jetty(9.4.38.v20210224)

{"name":"student-connector-source","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"studentdb","database.server.name":"dbserver1","schema.include":"public","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter.schemas.enable":"false","database.history.kafka.bootstrap.servers":"kafka:29092","database.history.kafka.topic":"schema-changes.students_profiles","plugin.name":"pgoutput","publication.autocreate.mode":"all_tables","publication.name":"my_publication","snapshot.mode":"always","name":"student-connector-source"},"tasks":[],"type":"source"}

The output when I register the sink connector:

HTTP/1.1 201 Created
Date: Thu, 07 Oct 2021 23:30:45 GMT
Location: http://localhost:8083/connectors/jdbc-sink
Content-Type: application/json
Content-Length: 609
Server: Jetty(9.4.38.v20210224)

{"name":"jdbc-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"dbserver1.public.students","dialect.name":"PostgreSqlDatabaseDialect","table.name.format":"students","connection.url":"jdbc:postgresql://postgres-sink:5431/postgres?user=postgres&password=postgres","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones":"false","auto.create":"true","insert.mode":"upsert","pk.fields":"id","pk.mode":"record_key","delete.enabled":"true","name":"jdbc-sink"},"tasks":[],"type":"sink"}

The "students" table in my source database looks like this:

studentdb=# select * from students;
 id  |      name
-----+-----------------
 100 | John Doe
 101 | Barney Cobbles
 103 | Fred Flintstone
(3 rows)

When I add a new row, I get the following output in the Kafka console consumer:

{"id":"104"}    {"before":null,"after":{"id":"104","name":"Jane Doe"},"source":{"version":"1.6.2.Final","connector":"postgresql","name":"dbserver1","ts_ms":1633653724508,"snapshot":"false","db":"studentdb","sequence":"[\"24608536\",\"24608536\"]","schema":"public","table":"students","txId":574,"lsn":24608592,"xmin":null},"op":"c","ts_ms":1633653724813,"transaction":null}
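
The sink configuration applies the ExtractNewRecordState transform to this envelope before the JDBC sink sees it. As a rough illustration of the intent only, and not Debezium's actual implementation, the transform flattens the change event down to its "after" state. The sketch ignores options such as delete handling, and the event dict is abbreviated from the output above.

```python
from typing import Optional


def unwrap(envelope: Optional[dict]) -> Optional[dict]:
    """Simplified stand-in for ExtractNewRecordState: keep only the new row state."""
    if envelope is None:
        # A tombstone (null value) is passed through unchanged in this sketch.
        return None
    return envelope.get("after")


# Abbreviated copy of the change event shown above.
event = {
    "before": None,
    "after": {"id": "104", "name": "Jane Doe"},
    "source": {"db": "studentdb", "schema": "public", "table": "students"},
    "op": "c",
}

row = unwrap(event)
print(row)
```

The flattened row is what the sink would be expected to upsert into the "students" table, keyed by the "id" field from the record key.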

Querying the destination Postgres gives the following output:

postgres=# \l
                                 List of databases
   Name    |  Owner   | Encoding |  Collate   |   Ctype    |   Access privileges
-----------+----------+----------+------------+------------+-----------------------
 postgres  | postgres | UTF8     | en_US.utf8 | en_US.utf8 |
 template0 | postgres | UTF8     | en_US.utf8 | en_US.utf8 | =c/postgres          +
           |          |          |            |            | postgres=CTc/postgres
 template1 | postgres | UTF8     | en_US.utf8 | en_US.utf8 | =c/postgres          +
           |          |          |            |            | postgres=CTc/postgres
(3 rows)

postgres=# \c postgres
You are now connected to database "postgres" as user "postgres".
postgres=# \dt
Did not find any relations.

So, the ‘studentdb’ database is not getting replicated, nor is there anything inside the default ‘postgres’ database.

Source: Docker Questions
