Multiple Debezium source connectors not working at the same time

  apache-kafka, apache-kafka-connect, debezium, docker, mysql

Docker Compose

kafka:
  image: confluentinc/cp-enterprise-kafka:6.0.0
  container_name: kafka
  depends_on:
    - zookeeper
  ports:
    - 9092:9092
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
    CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'true'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    KAFKA_LOG_RETENTION_MS: 100000000
    KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 5000
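
The Kafka Connect worker that I call on localhost:8083 below runs in the same Compose file. I have not pasted the exact service here, but it is roughly the following sketch (the image tag, service name, group id and topic names are approximate and may differ from what I actually run):

connect:
  image: confluentinc/cp-kafka-connect:6.0.0
  container_name: connect
  depends_on:
    - kafka
    - schema-registry
  ports:
    - 8083:8083
  environment:
    # group id and internal topic names below are approximate, not my exact values
    CONNECT_BOOTSTRAP_SERVERS: kafka:29092
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_GROUP_ID: connect-cluster
    CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: _connect-status
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components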

Connector 1: Debezium source connector (I need 8 connectors like this, one for each of 8 source tables)

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
    {
        "name": "mysql5-mdmembers",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "10",
            "database.hostname": "13.232.63.40",
            "database.port": "3307",
            "database.user": "root",
            "database.password": "secret",
            "database.server.id": "11",
            "database.server.name": "dbserver",
            "database.whitelist": "indianmo_imc_new",
            "table.whitelist": "indianmo_imc_new.md_members_cdc",
            "database.history.kafka.bootstrap.servers": "kafka:29092",
            "database.history.kafka.topic": "mysql5_md_members",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "transforms": "unwrap,dropTopicPrefix,selectFields,renameFields,addTopicPrefix,convertTS",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.dropTopicPrefix.regex":"dbserver.indianmo_imc_new.(.*)",
            "transforms.dropTopicPrefix.replacement":"$1",
            "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.selectFields.whitelist": "mem_id,mem_mobile,mem_fname,mem_email,mem_gender,mem_dob,mem_marital_status,mem_state,mem_city,mem_zip,mem_primary_lang,mem_created_on,mem_updated_on,mem_tot_ttt,transfer_count,first_transfer_date,last_transfer_date,dnd_status",
            "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.renameFields.renames": "mem_fname:mem_name,mem_primary_lang:mem_primary_language,mem_tot_ttt:mem_total_talktime,transfer_count:mem_transfer_count,first_transfer_date:mem_first_transferred_on,last_transfer_date:mem_last_transferred_on,dnd_status:mem_dnd_status",
            "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.addTopicPrefix.regex":"(.*)",
            "transforms.addTopicPrefix.replacement":"mdt_$1",
            "transforms.convertTS.type"       : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.convertTS.field"      : "mem_created_on,first_transfer_date,last_transfer_date",
            "transforms.convertTS.format"     : "YYYY-MM-dd H:mm:ss",
            "transforms.convertTS.target.type": "unix"
        }
    }'
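
After registering it, I verify the connector and its tasks with the Connect REST status endpoint (shown here for reference; the name matches the connector registered above):

curl -s localhost:8083/connectors/mysql5-mdmembers/status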

Connector 2: Debezium source connector for the same database

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
    {
        "name": "mysql5-connector2",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "2",
            "database.hostname": "13.232.63.40",
            "database.port": "3307",
            "database.user": "root",
            "database.password": "secret",
            "database.server.id": "11",
            "database.server.name": "dbserver",
            "database.whitelist": "indianmo_imc_new",
            "table.whitelist": "indianmo_imc_new.associate_leads_cdc",
            "database.history.kafka.bootstrap.servers": "kafka:29092",
            "database.history.kafka.topic": "mysql5table",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "transforms": "unwrap,dropTopicPrefix,selectFields,renameFields,addTopicPrefix,convertTS",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.dropTopicPrefix.regex":"dbserver.indianmo_imc_new.(.*)",
            "transforms.dropTopicPrefix.replacement":"$1",
            "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.selectFields.whitelist": "AL_Id,A_Id,L_Id,cityID,rtitle,lead_price,selling_price,lead_trans_type,R_Id,rank,bought_by,bought_ip,leadSentBy,pushed_date,pushed_on,tbSendDate,tbSendOn",
            "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.renameFields.renames": "AL_Id:al_id,A_Id:associate_id,L_Id:mem_id,cityID:city_id,rtitle:product_id,lead_price:lead_actual_price,selling_price:lead_selling_price,lead_trans_type:lead_type_msql5,R_Id:requirement_id,rank:lead_rating,bought_by:lead_bought_by,bought_ip:lead_bought_ip,leadSentBy:lead_sent_by,pushed_date:lead_pushed_date,pushed_on:lead_pushed_on,tbSendDate:lead_sold_date,tbSendOn:lead_sold_on",
            "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.addTopicPrefix.regex":"(.*)",
            "transforms.addTopicPrefix.replacement":"ldt_lm_$1",
            "transforms.convertTS.type"       : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.convertTS.field"      : "pushed_on,tbSendOn",
            "transforms.convertTS.format"     : "YYYY-MM-dd H:mm:ss",
            "transforms.convertTS.target.type": "unix"
        }
    }'

Sink connector (I need 8 sink connectors like this, one for each target table)

curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
    -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://65.0.213.250:3306/demo",
        "topics": "ldt_lm_associate_leads_cdc",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "connection.user": "root",
        "connection.password": "secret",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "upsert",
        "delete.enabled": true,
        "pk.mode": "record_key",
        "pk.fields": "al_id",
        "transforms": "RenameKey",
        "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
        "transforms.RenameKey.renames": "AL_Id:al_id"
    }'
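
To see which connectors are still registered at any point, I list them (for reference):

curl -s localhost:8083/connectors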

The error I am getting from Kafka Connect is

[2021-02-10 13:14:45,326] INFO [mysql5-connector2|task-0] Connector task finished all work and is now shutdown (io.debezium.connector.mysql.MySqlConnectorTask:496)
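
That INFO line is all I can see for the stopped connector. To dig further I tail the Connect worker logs with something like this (the container name is an assumption; it is whatever the Connect service is called in the Compose file):

docker logs -f connect    # "connect" is assumed; use the actual Connect container name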

I have a single broker here.
If I use multiple brokers, will that solve my problem?
What would the Docker Compose configuration look like for multiple brokers?
I have multiple tables in the source database and I want to sink all of them, which is why I need multiple connectors. But whenever I run multiple source connectors, the previously running connector stops (the problem already shows up with just two source connectors, and I ultimately need at least 8 source and 8 sink connectors). What should I do? Please help. Thanks in advance!
