diff --git a/.github/workflows/basic-mqtt-listener.yml b/.github/workflows/basic-mqtt-listener.yml index 2ee1134..5f42ac4 100644 --- a/.github/workflows/basic-mqtt-listener.yml +++ b/.github/workflows/basic-mqtt-listener.yml @@ -1,4 +1,4 @@ -name: Basic FastAPI Test container +name: Basic EMQX/MQTT Test container on: push: @@ -6,7 +6,7 @@ on: - 'labs/labs-internal/emqx/in_cluster_reader/**' # Specify path to your directory jobs: - build-and-push-py-test-image: + build-and-push-py-mqtt-test-image: runs-on: ubuntu-latest steps: diff --git a/labs/labs-internal/emqx/delete.sh b/labs/labs-internal/emqx/delete.sh new file mode 100755 index 0000000..d5d1729 --- /dev/null +++ b/labs/labs-internal/emqx/delete.sh @@ -0,0 +1 @@ +kubectl delete -f deployment.yaml \ No newline at end of file diff --git a/labs/labs-internal/emqx/deployment.yaml b/labs/labs-internal/emqx/deployment.yaml index e0b7b17..e7e1424 100644 --- a/labs/labs-internal/emqx/deployment.yaml +++ b/labs/labs-internal/emqx/deployment.yaml @@ -5,6 +5,7 @@ apiVersion: apps/v1 kind: Deployment metadata: name: emqx + namespace: mqtt-test labels: app: emqx spec: @@ -37,6 +38,7 @@ spec: apiVersion: v1 kind: Service metadata: + namespace: mqtt-test name: emqx-service spec: type: NodePort @@ -56,21 +58,22 @@ spec: apiVersion: apps/v1 kind: Deployment metadata: - name: in-cluster-reader + name: basic-mqtt-read + namespace: mqtt-test labels: - app: in-cluster-reader + app: basic-mqtt-read spec: replicas: 1 selector: matchLabels: - app: in-cluster-reader + app: basic-mqtt-read template: metadata: labels: - app: in-cluster-reader + app: basic-mqtt-read spec: containers: - - name: in-cluster-reader + - name: basic-mqtt-read image: aidamian/basic_mqtt_listener resources: requests: diff --git a/labs/labs-internal/emqx/external_writer/pubisher.py b/labs/labs-internal/emqx/external_writer/publisher.py similarity index 100% rename from labs/labs-internal/emqx/external_writer/pubisher.py rename to labs/labs-internal/emqx/external_writer/publisher.py diff --git a/labs/labs-internal/emqx/in_cluster_reader/reader.py b/labs/labs-internal/emqx/in_cluster_reader/reader.py index 338ad0c..411d78d 100644 --- a/labs/labs-internal/emqx/in_cluster_reader/reader.py +++ b/labs/labs-internal/emqx/in_cluster_reader/reader.py @@ -1,3 +1,4 @@ +import paho import paho.mqtt.client as mqtt import json from collections import defaultdict @@ -5,59 +6,82 @@ import time import os +from uuid import uuid4 + +print("Using paho-mqtt version", paho.__version__, flush=True) + # Configuration -MQTT_HOST = 'emqx-service' -MQTT_PORT = 1883 +MQTT_HOST = '192.168.1.55' #'emqx-service' +MQTT_PORT = 31883 #1883 TOPIC = '/topic_root/subtopic' -messages = defaultdict(int) - -def on_connect(client, userdata, flags, rc): - """Callback for when the client receives a CONNACK response from the server.""" - if rc == 0: - print(f"Connected successfully to {MQTT_HOST}:{MQTT_PORT}") - client.subscribe(TOPIC) # Move subscription to here to ensure it's effective after a successful connect - else: - print(f"Failed to connect, return code {rc}") - # Implement reconnection logic here if necessary - -def on_disconnect(client, userdata, rc): - """Callback for when the client disconnects from the broker.""" - if rc != 0: - print(f"Unexpected disconnection.") - # Implement reconnection logic here - -def on_message(client, userdata, msg): - """Handle incoming MQTT messages.""" - payload = json.loads(msg.payload) - sender = payload['sender'] - messages[sender] += int(payload['data'][0]) # Increment by the first character converted to int - -def display_stats(): - """Periodically display message stats.""" - while True: - for sender, sum_ in messages.items(): - print(f"{sender} -> sum -> {sum_}") - time.sleep(10) # Adjust as necessary + + +class SimpleListener: + def __init__(self): + self.listener_id = str(uuid4())[:5] + self.messages = defaultdict(int) + self.client = mqtt.Client( + client_id=self.listener_id, + clean_session=True + ) + # Assign the callbacks + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect + self.client.on_message = self.on_message + return + + def on_connect(self, client, userdata, flags, rc): + """Callback for when the client receives a CONNACK response from the server.""" + if rc == 0: + print(f"Connected successfully to {MQTT_HOST}:{MQTT_PORT}") + client.subscribe(TOPIC) # Move subscription to here to ensure it's effective after a successful connect + else: + print(f"Failed to connect, return code {rc}") + # Implement reconnection logic here if necessary + return + + def on_disconnect(self, client, userdata, rc): + """Callback for when the client disconnects from the broker.""" + if rc != 0: + print(f"Unexpected disconnection.") + # Implement reconnection logic here + return + + def on_message(self, client, userdata, msg): + """Handle incoming MQTT messages.""" + payload = json.loads(msg.payload) + sender = payload['sender'] + self.messages[sender] += int(payload['data'][0]) # Increment by the first character converted to int + return + + def display_stats(self, ): + """Periodically display message stats.""" + while True: + for sender, sum_ in self.messages.items(): + print(f"{sender} -> sum -> {sum_}") + time.sleep(10) # Adjust as necessary + return + + def run(self): + try: + self.client.connect(MQTT_HOST, MQTT_PORT, 60) + except Exception as e: + print(f"Failed to connect to MQTT broker at {MQTT_HOST}:{MQTT_PORT}, error: {e}") + exit(1) # Exiting if connection is not successful + + stats_thread = threading.Thread(target=self.display_stats) + stats_thread.start() + + self.client.loop_forever() + return + if __name__ == '__main__': for k, v in os.environ.items(): if k.startswith('MQTT') or k.startswith('EMQX'): print(f"{k}={v}") - client = mqtt.Client() - # Assign the callbacks - client.on_connect = on_connect - client.on_disconnect = on_disconnect - client.on_message = on_message - - try: - client.connect(MQTT_HOST, MQTT_PORT, 60) - except Exception as e: - print(f"Failed to connect to MQTT broker at {MQTT_HOST}:{MQTT_PORT}, error: {e}") - exit(1) # Exiting if connection is not successful - - stats_thread = threading.Thread(target=display_stats) - stats_thread.start() - - client.loop_forever() + + listener = SimpleListener() + listener.run() \ No newline at end of file diff --git a/labs/labs-internal/emqx/log.sh b/labs/labs-internal/emqx/log.sh new file mode 100755 index 0000000..8c223a5 --- /dev/null +++ b/labs/labs-internal/emqx/log.sh @@ -0,0 +1,3 @@ +NAMESPACE=mqtt-test +POD_NAME=$(kubectl get pods -n $NAMESPACE -o jsonpath="{.items[0].metadata.name}") +kubectl logs -n $NAMESPACE $POD_NAME -f \ No newline at end of file diff --git a/labs/labs-internal/emqx/ns.yaml b/labs/labs-internal/emqx/ns.yaml new file mode 100644 index 0000000..8ce647c --- /dev/null +++ b/labs/labs-internal/emqx/ns.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: mqtt-test \ No newline at end of file diff --git a/labs/labs-internal/emqx/show.sh b/labs/labs-internal/emqx/show.sh new file mode 100755 index 0000000..f4bf0b4 --- /dev/null +++ b/labs/labs-internal/emqx/show.sh @@ -0,0 +1 @@ +kubectl get all -n mqtt-test diff --git a/labs/labs-internal/emqx/start.sh b/labs/labs-internal/emqx/start.sh new file mode 100755 index 0000000..b0c10d4 --- /dev/null +++ b/labs/labs-internal/emqx/start.sh @@ -0,0 +1 @@ +kubectl apply -f deployment.yaml \ No newline at end of file