Skip to content

Commit

Permalink
fix: various
Browse files Browse the repository at this point in the history
  • Loading branch information
andreiionutdamian committed Feb 19, 2024
1 parent a60558a commit 7c03222
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 167 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/basic-mqtt-listener.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Basic EMQX/MQTT Test container
on:
push:
paths:
- 'labs/labs-internal/emqx/in_cluster_reader/**' # Specify path to your directory
- 'labs/labs-internal/emqx/app/**' # Specify path to your directory

jobs:
build-and-push-py-mqtt-test-image:
Expand All @@ -24,6 +24,6 @@ jobs:

- name: Build and Push
run: |
cd labs/labs-internal/emqx/in_cluster_reader/
cd labs/labs-internal/emqx/app/
chmod +x build.sh # Ensure the script is executable
./build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ WORKDIR /app

RUN pip install paho-mqtt

COPY reader.py .
ADD src/reader.py .
ADD src/base.py .

CMD ["python", "reader.py"]
File renamed without changes.
Binary file not shown.
114 changes: 114 additions & 0 deletions labs/labs-internal/emqx/app/src/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import threading
import random
import string


from paho.mqtt import client as mqttc
from paho.mqtt import __version__ as mqtt_version

def generate_thread_id():
"""Generate a unique 5 character thread ID."""
return ''.join(random.choices(string.ascii_letters + string.digits, k=5))


class Printer:
def __init__(self):
self.lock = threading.Lock()
return

def print_message(self, message):
self.lock.acquire()
print(message, flush=True)
self.lock.release()
return


class BaseMQTT:
def __init__(self, logger, host, port, topic=None):
self.logger = logger
self.P(f"Using paho-mqtt version {mqtt_version}")
self.host = host
self.port = port
self.topic = topic
self.thread_id = generate_thread_id()
self.connected = self.__connect()
if not self.connected:
raise Exception("Failed to connect to MQTT broker")
return


def P(self, msg):
self.logger.print_message(msg)
return

def __connect(self):
result = False
try:
self.create_client()
self.P(f"Connecting to MQTT broker at {self.host}:{self.port}")
self.client.connect(self.host, self.port, 60)
result = True
except Exception as e:
self.P(f"Failed to connect to MQTT broker at {self.host}:{self.port}, error: {e}")
return result# Exiting if connection is not successful

# Paho MQTT client creation v1 vs v2
def create_client(self):
if mqtt_version.startswith('2'):
kwargs = dict(
callback_api_version=mqttc.CallbackAPIVersion.VERSION2,
client_id=self.thread_id,
clean_session=True,
)
else:
kwargs = mqttc.Client(
client_id=self.listener_id,
clean_session=True,
)
#endif v2 vs v1
self.client = mqttc.Client(**kwargs)
self.client.on_message = self.on_message
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
return


def publish(self, payload, topic):
"""Publish a message to the broker."""
res = self.client.publish(topic, payload)
# Check if publish was successful
if res.rc != mqttc.MQTT_ERR_SUCCESS:
self.P(f"Failed to publish message to {topic}, error code: {res.rc}")
result = True
else:
result = False
return result


def on_message(self, client, userdata, msg, *args, **kwargs):
"""Handle incoming MQTT messages."""
return


def on_connect(self, client, userdata, flags, reason_code, *args, **kwargs):
"""Callback for when the client receives a CONNACK response from the server."""
if reason_code == 0:
self.P(f"Connected successfully to {self.host}:{self.port}")
if self.topic:
# Move subsription to here to ensure it's effective after a successful connect
client.subscribe(self.topic)
else:
self.P(f"Failed to connect, return code {reason_code}")
# Implement reconnection logic here if necessary
return


def on_disconnect(self, client, userdata, reason_code, *args, **kwargs):
"""Callback for when the client disconnects from the broker."""
if reason_code != 0:
self.P(f"Unexpected disconnection. Code: {reason_code}")
# Implement reconnection logic here
else:
self.P(f"Disconnected from {self.host}:{self.port}")
return

61 changes: 61 additions & 0 deletions labs/labs-internal/emqx/app/src/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import threading
import json
import time

from paho.mqtt import client as mqttc

from base import BaseMQTT, Printer



# Configuration
MQTT_HOST = '192.168.1.55'
MQTT_PORT = 31883
TOPIC = '/topic_root/subtopic'
THREAD_COUNT = 5 # Number of threads to spawn
package = "1" * 1_000_000


class Publisher(BaseMQTT):
def __init__(self, *args, **kwargs):
super(Publisher, self).__init__(*args, **kwargs)
self.count = 0
return

def publish_messages(self):
"""Publish messages with a specific structure."""
while True:
self.count += 1
payload = json.dumps({
"sender": self.thread_id,
"data": package, # 1 million times '1'
"id": self.count
})
self.publish(payload, TOPIC)
time.sleep(1) # Adjust as necessary
#endwhile
return

def run(self):
"""Run the publisher."""
thread = threading.Thread(target=self.publish_messages)
thread.start()
return

if __name__ == '__main__':
logger = Printer()

publishers = []
for _ in range(THREAD_COUNT):
publisher = Publisher(
logger=logger,
host=MQTT_HOST,
port=MQTT_PORT,
)
publishers.append(publisher)
publisher.run()

# Assuming you want to join threads, you need to store and join them correctly
for publisher in publishers:
# This would require storing the thread object in the Publisher class
publisher.thread.join() # Wait for all threads to finish
61 changes: 61 additions & 0 deletions labs/labs-internal/emqx/app/src/reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import json
from collections import defaultdict
import threading
import time
import os

from uuid import uuid4

from base import BaseMQTT, Printer


# Configuration
MQTT_HOST = '192.168.1.55' #'emqx-service'
MQTT_PORT = 31883 #1883
TOPIC = '/topic_root/subtopic'



class SimpleListener(BaseMQTT):
def __init__(self, *args, **kwargs):
super(SimpleListener, self).__init__(*args, **kwargs)
self.messages = defaultdict(int)
self.counters = {}
return

def on_message(self, client, userdata, msg, *args, **kwargs):
"""Handle incoming MQTT messages."""
payload = json.loads(msg.payload)
sender = payload['sender']
self.messages[sender] += int(payload['data'][0]) # Increment by the first character converted to int
self.counters[sender] = payload['id']
return

def display_stats(self, ):
"""Periodically display message stats."""
while True:
for sender, sum_ in self.messages.items():
self.P(f"{sender} -> sum -> {sum_} (id: {self.counters[sender]})")
time.sleep(10) # Adjust as necessary
return

def run(self):
stats_thread = threading.Thread(target=self.display_stats)
stats_thread.start()

self.client.loop_forever()
return


if __name__ == '__main__':
for k, v in os.environ.items():
if k.startswith('MQTT') or k.startswith('EMQX'):
print(f"{k}={v}")

listener = SimpleListener(
logger=Printer(),
host=MQTT_HOST,
port=MQTT_PORT,
topic=TOPIC,
)
listener.run()
77 changes: 0 additions & 77 deletions labs/labs-internal/emqx/external_writer/publisher.py

This file was deleted.

Loading

0 comments on commit 7c03222

Please sign in to comment.