How do I keep all my settings after restarting a machine running a Confluent Kafka docker-compose setup?

Here’s the docker-compose file I am using for the Kafka and ksqlDB setup:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.2.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'      

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081


  connect:
    image: confluentinc/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile
      extra_hosts:
        host.docker.internal: host-gateway
    extra_hosts:
      - "host.docker.internal:host-gateway"
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR


  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
      

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.2.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'


  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:6.2.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    
    
  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.2.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

Here’s the Kafka Connect Dockerfile I used for the custom SMT operations:

FROM confluentinc/cp-kafka-connect-base:6.2.0

RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.5.1 \
   && confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.5.0 \
   && confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.0 \
   && confluent-hub install --no-prompt debezium/debezium-connector-mongodb:1.5.0 \
   && confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.0.6 \
   && confluent-hub install --no-prompt confluentinc/connect-transforms:latest \
   && confluent-hub install --no-prompt redhatinsights/expandjsonsmt:0.0.7
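
As a sanity check that these confluent-hub installs actually end up in the image (an optional verification step, not part of my pipeline), the Connect worker's REST API can list the plugins it has loaded once the connect container is up:

# list the connector plugins loaded by the connect worker
curl -s http://localhost:8083/connector-plugins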

I have added MySQL as my source connector and Elasticsearch as my sink connector, and I use ksqlDB for all the transformations. Here are the commands I ran in ksqlDB.

After docker-compose up --build succeeds, I run this command to open the ksqlDB CLI:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
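
(If the CLI cannot connect, a quick check from the host that the ksqlDB server is actually up is to hit its /info endpoint:)

curl -s http://localhost:8088/info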

Once I’m in the ksqlDB CLI, I run the following statements in the same order as shown here.

CREATE SOURCE CONNECTOR `source_mysql_connector` WITH( 
    "connector.class"= 'io.debezium.connector.mysql.MySqlConnector',
    "tasks.max"= '1',  
    "database.hostname"= '11.111.111.11',  
    "database.port"= '3306',
    "database.user"= 'secret',
    "database.password"= '#r&jWSQz',
    "database.server.id"= '1234',  
    "database.server.name"= 'test',  
    "database.include.list"= 'RF',
    "database.history.store.only.monitored.tables.ddl"='true',
    "include.schema.changes"= 'false',
    "time.precision.mode"= 'connect',
    "table.include.list"= 'RF.candidate',
    "column.include.list"= 'RF.candidate.organizationID,RF.candidate.first_name,RF.candidate.last_name,RF.candidate.experience,RF.candidate.candidateID,RF.candidate.source_name,RF.candidate.location,RF.candidate.title,RF.candidate.phone,RF.candidate.email,RF.candidate.status,RF.candidate.isActive,RF.candidate.immigration_status,RF.candidate.recUpdateDt',
    "database.history.kafka.bootstrap.servers"= 'broker:29092',  
    "database.history.kafka.topic"= 'schema-changes.RF',
    "database.allowPublicKeyRetrieval"='true',
    'transforms' = 'unwrap,Cast,RenameField',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Key',
    'transforms.RenameField.renames' = 'candidateID:_id',
    'transforms.Cast.type'= 'org.apache.kafka.connect.transforms.Cast$Value',
    'transforms.Cast.spec'= 'organizationID:string',
    'key.converter'= 'org.apache.kafka.connect.json.JsonConverter',
    'key.converter.schemas.enable'= 'false',
    'value.converter'= 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'= 'http://schema-registry:8081'
);
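
To confirm the connector was registered and its task is running before moving on (an optional check, not part of the transformation logic), ksqlDB can report this directly:

-- list connectors known to the Connect cluster
SHOW CONNECTORS;

-- show the connector's state, tasks, and any errors
DESCRIBE CONNECTOR `source_mysql_connector`;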

-- read from the earliest offset
SET 'auto.offset.reset' = 'earliest';

-- create the input stream
CREATE STREAM candidate_input WITH (KAFKA_TOPIC='test.RF.candidate', KEY_FORMAT='JSON', VALUE_FORMAT='AVRO');
        
-- create the target stream
CREATE STREAM candidate_target WITH (KAFKA_TOPIC='test.RF.candidate.target', KEY_FORMAT='JSON', VALUE_FORMAT='AVRO') AS
    SELECT organizationID AS "organizationID", CONCAT(first_name, ' ', last_name) AS "name", first_name AS "first_name", last_name AS "last_name", experience AS "experience", candidateID AS "candidateID", source_name AS "source_name", location AS "location", title AS "title", phone AS "phone", email AS "email", status AS "status", isActive AS "isActive", immigration_status AS "immigration_status", recUpdateDt AS "recUpdateDt"
    FROM candidate_input;
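
To spot-check that transformed rows are actually flowing into the target stream (again, just a verification step), a transient query with a LIMIT stops on its own:

-- quick peek at the transformed rows
SELECT * FROM candidate_target EMIT CHANGES LIMIT 5;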
    
-- read from the earliest offset
SET 'auto.offset.reset' = 'earliest';

-- sink connector
CREATE SINK CONNECTOR `sink_elasticsearch` WITH (
'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'topics'                              = 'test.RF.candidate.target',
'key.converter'                       = 'org.apache.kafka.connect.json.JsonConverter',
'key.converter.schemas.enable'        = 'false',
'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'connection.url'                      = 'http://11.11.11.11:9200',
'connection.username'                 = 'secret',
'connection.password'                 = '0XWc=',
'transforms'                          = 'ValueFieldExample,expand,Cast,createKey,extractInt',
'transforms.Cast.type'                = 'org.apache.kafka.connect.transforms.Cast$Value',
'transforms.Cast.spec'                = 'phone:string',
'transforms.ValueFieldExample.type'   = 'io.confluent.connect.transforms.ExtractTopic$Value',
'transforms.ValueFieldExample.field'  = 'organizationID',
'transforms.createKey.type'           = 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.createKey.fields'         = 'candidateID',
'transforms.extractInt.type'          = 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractInt.field'         = 'candidateID',
'transforms.expand.type'              = 'com.redhat.insights.expandjsonsmt.ExpandJSON$Value',
'transforms.expand.sourceFields'      = 'location,experience',      
'schema.ignore'                       = 'true'
);
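
As with the source side, the Connect REST API can report the sink connector's status, which is handy when the ksqlDB CLI is not open (optional check):

# connector and task state for the Elasticsearch sink
curl -s http://localhost:8083/connectors/sink_elasticsearch/status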

Let's say I have configured all of this on my machine and everything is working. I can see data being transferred from the MySQL source to the Elasticsearch sink.
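
(For what it's worth, one quick way to verify this is to list the indices and document counts on the Elasticsearch host; curl prompts for the password when only the username is given:)

# list indices and document counts on the Elasticsearch host
curl -s -u secret 'http://11.11.11.11:9200/_cat/indices?v'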

Now the problem: if I turn off my PC and then turn it back on, all the containers are stopped.

So I ran docker-compose up to bring all the services back up.

But when I went to Control Center and checked, all the topics that had been created, the ksqlDB streams, and the connectors were gone.

Is there a way I can keep everything intact even after bringing the machine down?
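
My guess is that nothing in the compose file persists data outside the containers, so the topic data, schemas, and connector configs live only in the containers themselves. Would adding named volumes for the ZooKeeper and broker data directories be the right fix? A minimal sketch of what I have in mind, assuming the default data paths used by the Confluent images (which I have not verified):

  zookeeper:
    # ...everything as above, plus:
    volumes:
      - zk-data:/var/lib/zookeeper/data     # assumed default dataDir in cp-zookeeper
      - zk-txn-logs:/var/lib/zookeeper/log  # assumed default dataLogDir

  broker:
    # ...everything as above, plus:
    volumes:
      - kafka-data:/var/lib/kafka/data      # assumed default log.dirs in cp-server

volumes:
  zk-data:
  zk-txn-logs:
  kafka-data:

I'm also unsure whether a docker-compose down at some point (which removes the containers, unlike docker-compose stop) could have wiped everything even without volumes.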
