Running MQTT on a custom Docker network

I am experimenting with custom Docker networks and Paho MQTT in Python. My goal is to create local pipelines linking some custom services together that run with/without a network connection and use the Docker DNS to find the MQTT broker. I mocked up a prototype.

In the code below, I provide a Dockerfile for the dependencies, docker commands and example source code with my logging outputs.

Create a new Docker network

docker network create --driver bridge my_pipeline

Start up an MQTT container on that network

# https://hub.docker.com/r/ncarlier/mqtt
docker run -d -P --name MQTT_Broker --network my_pipeline ncarlier/mqtt
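
Before involving MQTT at all, it can help to confirm that Docker's DNS resolves the broker's container name. The following is a minimal sketch, not part of my original code: `resolve_broker` is a hypothetical helper, and it assumes it is run from inside a container attached to `my_pipeline`, with the broker listening on the default MQTT port 1883.

```python
import socket


def resolve_broker(hostname, port=1883):
    """Return the sorted, de-duplicated IP addresses that `hostname` resolves to.

    Returns an empty list when the name cannot be resolved.
    """
    try:
        infos = socket.getaddrinfo(hostname, port, type=socket.SOCK_STREAM)
    except socket.gaierror:
        return []
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP.
    return sorted({info[4][0] for info in infos})


# Example (inside a container on my_pipeline):
#   resolve_broker('MQTT_Broker')
# An empty list would suggest the container is not on the same network as the broker.
```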

Create a custom Docker pipeline node template with a PyCharm development environment for debugging (Dockerfile).

FROM ubuntu:16.04
RUN apt-get update \
 && apt-get -y upgrade \
 && apt-get install -y software-properties-common build-essential zlib1g-dev libncurses5-dev libgdbm-dev libnss3-dev \
 libssl-dev libreadline-dev libffi-dev wget nano
# TODO Add versions to minimize dependency issues later
ENV DEPS_DIR=/tmp
ENV AVAILABLE_PROCESSORS=8
# ===========================================================================================
# Install Python 3.7.2
# - https://websiteforstudents.com/installing-the-latest-python-3-7-on-ubuntu-16-04-18-04/
# ===========================================================================================
WORKDIR ${DEPS_DIR}/python
RUN wget https://www.python.org/ftp/python/3.7.2/Python-3.7.2.tar.xz
RUN tar -xf Python-3.7.2.tar.xz
WORKDIR ${DEPS_DIR}/python/Python-3.7.2
RUN ./configure --enable-shared
RUN make -j ${AVAILABLE_PROCESSORS} \
 && make altinstall
# https://linuxconfig.org/how-to-change-from-default-to-alternative-python-version-on-debian-linux
#RUN update-alternatives --install /usr/bin/python python /usr/local/bin/python3.7 1
#RUN update-alternatives --set python /usr/local/bin/python3.7
RUN ln -sf /usr/local/bin/python3.7 /usr/bin/python
RUN apt-get install -y python3-pip
RUN pip3 install --upgrade pip
RUN python -m pip install paho-mqtt numpy

# ===========================================================================================
RUN apt-get install -y git
RUN echo "---------------------------------------------------" \
 && echo "Internal Docker Container Pycharm Support" \
 && echo "---------------------------------------------------"
RUN apt-get install -y software-properties-common
RUN add-apt-repository -y ppa:openjdk-r/ppa
RUN apt-get update \
 && apt-get install -y openjdk-8-jdk \
 && apt-get install -y gnome-keyring
ARG GIT_USER='<my_name>'
ARG GIT_EMAIL='<my_email>'
RUN git config --global user.name ${GIT_USER} \
 && git config --global user.email ${GIT_EMAIL}
# Renamed the install file within the docker context to minimize script changes
ARG PYCHARM_INSTALLATION_FILE_NAME='pycharm-community-2019.1.1.tar.gz'
ARG PYCHARM_INSTALLATION_DIR='pycharm-community-2019.1.1'
COPY ./common/applications/${PYCHARM_INSTALLATION_FILE_NAME} /home/docker_context/apps/${PYCHARM_INSTALLATION_FILE_NAME}
WORKDIR /home/docker_context/apps
RUN tar xvfz ${PYCHARM_INSTALLATION_FILE_NAME} \
 && rm ${PYCHARM_INSTALLATION_FILE_NAME}
#RUN echo '\nalias pycharm="/home/docker_context/apps/${PYCHARM_INSTALLATION_DIR}/bin/pycharm.sh &"' >> /etc/bash.bashrc
RUN echo '\nalias pycharm="/home/docker_context/apps/pycharm-community-2019.1.1/bin/pycharm.sh &"' >> /etc/bash.bashrc
CMD [ "/bin/bash" ]

If you choose to run PyCharm from within the Docker container, allow local connections to the host's X server so the UI can be forwarded:

host_os> xhost +local:

Start a Docker container with my custom dependencies and forward the UI for easy debugging.

docker run -it --rm --name mqtt_test \
 -v /home/me/config:/home/docker_context/config:ro \
 -v /home/me/python_projects/Debug_Network_Communication/subproject_source_code:/home/docker_context/code:rw \
 -v /home/me/temp/logs:/home/docker_context/logs:rw \
 -e PYTHONPATH="/usr/local/lib/python3.7:/usr/local/lib/python3.7/site-packages:/home/docker_context/code" \
 -v /tmp/.X11-unix:/tmp/.X11-unix:rw \
 -e "DISPLAY=$DISPLAY" \
 --network my_pipeline \
 my_docker_image_with_pycharm_python_and_mqtt_deps

For the source code, I built a small class, which I call an MQTT Relay, that manages MQTT connections so that senders and receivers are configured consistently. It lives in the module MQTT_Relay_Default.

# Assuming:
# - Python >= 3
# - Paho-Mqtt version = 1.4.0
import logging
import sys
import paho.mqtt.client as mqtt
class MqttRelay(object):
    def __init__(self):
        self.topic = None
        self.client = None
        self.log = logging.getLogger(__name__)
    def set_config(self, config):
        self.client = mqtt.Client()
        self.topic = config['topic']
        port = config['port']
        mqtt_broker_ip = config['MQTT_BROKER_IP_ADDRESS']
        self.log.info('Topic: {}'.format(self.topic))
        self.log.info('Port: {}'.format(port))
        self.log.info('MQTT_BROKER_IP_ADDRESS: {}'.format(mqtt_broker_ip))
        # CALLBACK: ON SUCCESSFUL MQTT CONNECTION
        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        # https://pypi.python.org/pypi/paho-mqtt/1.1#installation
        # ------------------------------------------------
        def on_mqtt_connect(client, userdata, flags, rc):
            if rc == 0:
                print('Connected to MQTT: {}'.format(rc))
                self.log.info('Connected to MQTT: {}'.format(rc))
                self.log.info('Using QoS=0')
                client.subscribe(topic=self.topic, qos=0)
        self.client.on_connect = on_mqtt_connect
        # SUBSCRIBE CALLBACK: MESSAGE RECEIVED BY THIS OBJECT (LISTENER)
        # - User registers a callback that decides what to do with the message and how to interpret it
        # -------------------------------------------------------
        on_msg_callback = config.get('callback_on_receive', None)
        if on_msg_callback is None:
            def on_mqtt_message_receive(client, userdata, msg):
                # print('Topic: {}'.format(msg.topic))
                pass
            self.client.on_message = on_mqtt_message_receive
        else:
            self.client.on_message = on_msg_callback
        # CONNECT TO MQTT
        # --------------------
        self.log.debug('Attempting MQTT connection to broker at: {}'.format(mqtt_broker_ip))
        try:
            self.client.connect(
                host=mqtt_broker_ip,
                port=port,
                keepalive=60
            )
        except Exception as exc:
            self.log.error('Unable to connect to MQTT: broker: {}: {}'.format(mqtt_broker_ip, exc))
            sys.exit(1)
        # READ EVENTS FROM MQTT QUEUE
        # -------------------------------
        self.log.info('Starting event loop')
        self.client.loop_start()
        # WAIT FOR SUCCESSFUL CONNECTION TO MQTT BROKER
        # - http://www.steves-internet-guide.com/client-connections-python-mqtt/
        # ------------------------------------------------
        # while not self.client.connected_flag:
        #     print('Waiting to connect to MQTT....')
        #     time.sleep(1)
    # PUBLISH
    # - Sends a "network_msg" over a predefined or custom topic.
    # - The network_msg is assumed to be rendered in a form such as primitive, JSON, or Pickle
    def send(self, network_msg, topic_override=None):
        if topic_override is None:
            self.log.debug('Send results: Output topic: {}'.format(self.topic))
            self.client.publish(topic=self.topic,
                                payload=network_msg)
        else:
            self.log.debug('Send results: Output topic: {}'.format(topic_override))
            self.client.publish(topic=topic_override,
                                payload=network_msg)
    def stop(self):
        self.client.disconnect()
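
For reference, `set_config()` reads three required keys ('topic', 'port', 'MQTT_BROKER_IP_ADDRESS') and one optional key ('callback_on_receive'). A minimal sketch of assembling such a dict is shown here; `build_relay_config` is a hypothetical helper for illustration only and is not part of the relay module.

```python
def build_relay_config(topic, port, broker_host, on_receive=None):
    """Assemble the config dict in the shape that MqttRelay.set_config() reads."""
    config = {
        'topic': topic,                         # topic subscribed to in on_connect
        'port': port,                           # broker port, e.g. 1883
        'MQTT_BROKER_IP_ADDRESS': broker_host,  # IP address or Docker DNS name
    }
    if on_receive is not None:
        # Signature expected by Paho: on_receive(client, userdata, msg)
        config['callback_on_receive'] = on_receive
    return config
```

When 'callback_on_receive' is omitted, the relay installs a no-op message handler.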

Here’s my MQTT JSON Publisher:

import MQTT_Relay_Default as rly
import time
import json
import logging
import sys
import os
if __name__ == '__main__':
    # INITIALIZE SYSTEM SETTINGS
    # --------------------------
    docker_container_name = os.environ.get('DOCKER_CONTAINER_NAME', None)
    log_prefix = docker_container_name if docker_container_name is not None else 'UNKNOWN'
    sys_tuning_path = os.environ.get('SYSTEM_TUNING_DICT_JSON', None)
    python_script = os.path.basename(__file__)
    docker_logs = '/home/docker_context/logs'
    log_file = os.path.join(docker_logs, '{}_{}.log'.format(log_prefix, python_script))
    print('Writing: {}'.format(log_file))
    logging.basicConfig(filename=log_file,
                        filemode='a',
                        level=logging.DEBUG,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        datefmt='%d-%b-%y %H:%M:%S')
    log = logging.getLogger(__name__)
    log.info('------START------')
    log.info('Script: {}'.format(os.path.realpath(__file__)))
    log.info('Docker Env Var: DOCKER_CONTAINER_NAME: {}'.format(docker_container_name))
    log.info('Docker Env Var: SYSTEM_TUNING_DICT_JSON: {}'.format(sys_tuning_path))
    if docker_container_name is None or sys_tuning_path is None:
        log.error('Unable to find required Env vars. Exiting.')
        sys.exit(1)
    log.info('Loading system tuning: {}'.format(sys_tuning_path))
    with open(sys_tuning_path, 'r') as fin:
        sys_tuning = json.load(fin)
    # log.info('Initialize MQTT')

    default_topic = 'debug'
    relay = rly.MqttRelay()
    relay.set_config(
        config={
            'topic': default_topic,
            'port': sys_tuning['mqtt']['port'],
            'MQTT_BROKER_IP_ADDRESS': 'MQTT_Broker',

        }
    )
    for idx in range(100):
        time.sleep(1)
        data = {
            'Packet_id': idx
        }
        network_msg = json.dumps(data)
        log.debug('SEND: {}'.format(network_msg))
        relay.send(network_msg)
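
The tuning file referenced by SYSTEM_TUNING_DICT_JSON is not shown in this post. Judging only from the `sys_tuning['mqtt']['port']` lookup and the port 1883 reported in the logs, a minimal stand-in could be produced as sketched here. The helper names are hypothetical, and the real file may contain additional keys.

```python
import json


def write_minimal_tuning(path, port=1883):
    """Write a tuning file containing only the key the scripts read."""
    tuning = {'mqtt': {'port': port}}
    with open(path, 'w') as fout:
        json.dump(tuning, fout, indent=2)
    return tuning


def load_mqtt_port(path):
    """Read the broker port back, mirroring the lookup done by the scripts."""
    with open(path, 'r') as fin:
        return json.load(fin)['mqtt']['port']
```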

Here’s the corresponding MQTT JSON Subscriber:

import json
import MQTT_Relay_Default as rly
import time
import logging
import sys
import os
class MQTTReceiver(object):
    def __init__(self, send_topic):
        self.relay = rly.MqttRelay()
        self.log = logging.getLogger(__name__)
        self.send_topic = send_topic
    def start(self, config):
        config['callback_on_receive'] = self.receive_transaction
        self.relay.set_config(config=config)
    # MQTT to MQTT
    def receive_transaction(self, client, userdata, msg):
        network_msg = msg.payload
        self.log.debug('RCV: {}'.format(network_msg))
        if self.send_topic is not None:
            self.relay.send(network_msg, topic_override=self.send_topic)
if __name__ == '__main__':
    # INITIALIZE SYSTEM SETTINGS
    # --------------------------
    docker_container_name = os.environ.get('DOCKER_CONTAINER_NAME', None)
    log_prefix = docker_container_name if docker_container_name is not None else 'UNKNOWN'
    sys_tuning_path = os.environ.get('SYSTEM_TUNING_DICT_JSON', None)
    python_script = os.path.basename(__file__)
    docker_logs = '/home/docker_context/logs'
    log_file = os.path.join(docker_logs, '{}_{}.log'.format(log_prefix, python_script))
    print('Writing: {}'.format(log_file))
    logging.basicConfig(filename=log_file,
                        filemode='a',
                        level=logging.DEBUG,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        datefmt='%d-%b-%y %H:%M:%S')
    log = logging.getLogger(__name__)
    log.info('------START------')
    log.info('Script: {}'.format(os.path.realpath(__file__)))
    log.info('Docker Env Var: DOCKER_CONTAINER_NAME: {}'.format(docker_container_name))
    log.info('Docker Env Var: SYSTEM_TUNING_DICT_JSON: {}'.format(sys_tuning_path))
    if docker_container_name is None or sys_tuning_path is None:
        log.error('Unable to find required Env vars. Exiting.')
        sys.exit(1)
    log.info('Loading system tuning: {}'.format(sys_tuning_path))
    with open(sys_tuning_path, 'r') as fin:
        sys_tuning = json.load(fin)
    # log.info('Initialize MQTT')

    default_topic = 'debug'
    config = {
        'topic': default_topic,
        'port': sys_tuning['mqtt']['port'],
        'MQTT_BROKER_IP_ADDRESS': 'MQTT_Broker',
    }
    mq_rcv = MQTTReceiver(send_topic=default_topic)

    mq_rcv.start(config=config)
    while True:
        # do some blocking computation
        time.sleep(0.1)

My publisher produces JSON messages with an auto-incrementing Packet_id. Each time the subscriber queries the MQTT broker, I would expect it to observe a new, distinct Packet_id.

Here we can see a snippet of the publisher log:

12-Aug-19 13:10:14 - MQTT_Relay_Default - INFO - Topic: debug
12-Aug-19 13:10:14 - MQTT_Relay_Default - INFO - Port: 1883
12-Aug-19 13:10:14 - MQTT_Relay_Default - INFO - MQTT_BROKER_IP_ADDRESS: MQTT_Broker
12-Aug-19 13:10:14 - MQTT_Relay_Default - DEBUG - Attempting MQTT connection to broker at: MQTT_Broker
12-Aug-19 13:10:15 - MQTT_Relay_Default - INFO - Starting event loop
12-Aug-19 13:10:15 - MQTT_Relay_Default - INFO - Connected to MQTT: 0
12-Aug-19 13:10:15 - MQTT_Relay_Default - INFO - Using QoS=0
12-Aug-19 13:10:16 - __main__ - DEBUG - SEND: {"Packet_id": 0}
12-Aug-19 13:10:16 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:17 - __main__ - DEBUG - SEND: {"Packet_id": 1}
12-Aug-19 13:10:17 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:18 - __main__ - DEBUG - SEND: {"Packet_id": 2}
12-Aug-19 13:10:18 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:19 - __main__ - DEBUG - SEND: {"Packet_id": 3}
12-Aug-19 13:10:19 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:20 - __main__ - DEBUG - SEND: {"Packet_id": 4}
12-Aug-19 13:10:20 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:21 - __main__ - DEBUG - SEND: {"Packet_id": 5}
12-Aug-19 13:10:21 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:22 - __main__ - DEBUG - SEND: {"Packet_id": 6}

Here is a snippet of the Subscriber log:

12-Aug-19 13:10:07 - MQTT_Relay_Default - INFO - Topic: debug
12-Aug-19 13:10:07 - MQTT_Relay_Default - INFO - Port: 1883
12-Aug-19 13:10:07 - MQTT_Relay_Default - INFO - MQTT_BROKER_IP_ADDRESS: MQTT_Broker
12-Aug-19 13:10:07 - MQTT_Relay_Default - DEBUG - Attempting MQTT connection to broker at: MQTT_Broker
12-Aug-19 13:10:07 - MQTT_Relay_Default - INFO - Starting event loop
12-Aug-19 13:10:07 - MQTT_Relay_Default - INFO - Connected to MQTT: 0
12-Aug-19 13:10:07 - MQTT_Relay_Default - INFO - Using QoS=0
12-Aug-19 13:10:16 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
12-Aug-19 13:10:16 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:16 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
12-Aug-19 13:10:16 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:16 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
12-Aug-19 13:10:16 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:16 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
12-Aug-19 13:10:16 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:16 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
12-Aug-19 13:10:16 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:10:16 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
12-Aug-19 13:10:16 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug

Eventually, you see this in the subscriber log:

12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 2}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 3}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 4}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 1}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 2}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 3}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 4}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 1}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 2}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
12-Aug-19 13:12:04 - __main__ - DEBUG - RCV: b'{"Packet_id": 3}'
12-Aug-19 13:12:04 - MQTT_Relay_Default - DEBUG - Send results: Output topic: debug
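
To quantify the duplication rather than eyeball it, the subscriber log can be summarized per Packet_id. This is a minimal sketch that assumes the `RCV: b'...'` line format shown in the subscriber log above; `count_packet_ids` is a hypothetical helper, not part of my original code.

```python
import json
import re
from collections import Counter

# Matches the payload in lines such as:
#   12-Aug-19 13:10:16 - __main__ - DEBUG - RCV: b'{"Packet_id": 0}'
RCV_PATTERN = re.compile(r"RCV: b'(\{.*\})'")


def count_packet_ids(log_lines):
    """Count how many times each Packet_id appears in RCV log lines."""
    counts = Counter()
    for line in log_lines:
        match = RCV_PATTERN.search(line)
        if match:
            payload = json.loads(match.group(1))
            counts[payload['Packet_id']] += 1
    return counts
```

Any Packet_id with a count above 1 reached the subscriber's on_message callback more than once.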

Does anyone have any idea why I might be seeing this behavior? Does MQTT think that I have a new subscriber each time the broker is queried?

Source: StackOverflow