Skip to content

Commit

Permalink
labs work
Browse files Browse the repository at this point in the history
  • Loading branch information
andreiionutdamian committed Feb 19, 2024
1 parent 4f991a0 commit a60558a
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 54 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/basic-mqtt-listener.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
name: Basic FastAPI Test container
name: Basic EMQX/MQTT Test container

on:
push:
paths:
- 'labs/labs-internal/emqx/in_cluster_reader/**' # Specify path to your directory

jobs:
build-and-push-py-test-image:
build-and-push-py-mqtt-test-image:
runs-on: ubuntu-latest

steps:
Expand Down
1 change: 1 addition & 0 deletions labs/labs-internal/emqx/delete.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kubectl delete -f deployment.yaml
13 changes: 8 additions & 5 deletions labs/labs-internal/emqx/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: emqx
namespace: mqtt-test
labels:
app: emqx
spec:
Expand Down Expand Up @@ -37,6 +38,7 @@ spec:
apiVersion: v1
kind: Service
metadata:
namespace: mqtt-test
name: emqx-service
spec:
type: NodePort
Expand All @@ -56,21 +58,22 @@ spec:
apiVersion: apps/v1
kind: Deployment
metadata:
name: in-cluster-reader
name: basic-mqtt-read
namespace: mqtt-test
labels:
app: in-cluster-reader
app: basic-mqtt-read
spec:
replicas: 1
selector:
matchLabels:
app: in-cluster-reader
app: basic-mqtt-read
template:
metadata:
labels:
app: in-cluster-reader
app: basic-mqtt-read
spec:
containers:
- name: in-cluster-reader
- name: basic-mqtt-read
image: aidamian/basic_mqtt_listener
resources:
requests:
Expand Down
118 changes: 71 additions & 47 deletions labs/labs-internal/emqx/in_cluster_reader/reader.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,87 @@
import paho
import paho.mqtt.client as mqtt
import json
from collections import defaultdict
import threading
import time
import os

from uuid import uuid4

print("Using paho-mqtt version", paho.__version__, flush=True)

# Configuration
MQTT_HOST = 'emqx-service'
MQTT_PORT = 1883
MQTT_HOST = '192.168.1.55' #'emqx-service'
MQTT_PORT = 31883 #1883
TOPIC = '/topic_root/subtopic'

messages = defaultdict(int)

def on_connect(client, userdata, flags, rc):
"""Callback for when the client receives a CONNACK response from the server."""
if rc == 0:
print(f"Connected successfully to {MQTT_HOST}:{MQTT_PORT}")
client.subscribe(TOPIC) # Move subscription to here to ensure it's effective after a successful connect
else:
print(f"Failed to connect, return code {rc}")
# Implement reconnection logic here if necessary

def on_disconnect(client, userdata, rc):
"""Callback for when the client disconnects from the broker."""
if rc != 0:
print(f"Unexpected disconnection.")
# Implement reconnection logic here

def on_message(client, userdata, msg):
"""Handle incoming MQTT messages."""
payload = json.loads(msg.payload)
sender = payload['sender']
messages[sender] += int(payload['data'][0]) # Increment by the first character converted to int

def display_stats():
"""Periodically display message stats."""
while True:
for sender, sum_ in messages.items():
print(f"{sender} -> sum -> {sum_}")
time.sleep(10) # Adjust as necessary


class SimpleListener:
def __init__(self):
self.listener_id = str(uuid4())[:5]
self.messages = defaultdict(int)
self.client = mqtt.Client(
client_id=self.listener_id,
clean_session=True
)
# Assign the callbacks
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
return

def on_connect(self, client, userdata, flags, rc):
"""Callback for when the client receives a CONNACK response from the server."""
if rc == 0:
print(f"Connected successfully to {MQTT_HOST}:{MQTT_PORT}")
client.subscribe(TOPIC) # Move subscription to here to ensure it's effective after a successful connect
else:
print(f"Failed to connect, return code {rc}")
# Implement reconnection logic here if necessary
return

def on_disconnect(self, client, userdata, rc):
"""Callback for when the client disconnects from the broker."""
if rc != 0:
print(f"Unexpected disconnection.")
# Implement reconnection logic here
return

def on_message(self, client, userdata, msg):
"""Handle incoming MQTT messages."""
payload = json.loads(msg.payload)
sender = payload['sender']
self.messages[sender] += int(payload['data'][0]) # Increment by the first character converted to int
return

def display_stats(self, ):
"""Periodically display message stats."""
while True:
for sender, sum_ in self.messages.items():
print(f"{sender} -> sum -> {sum_}")
time.sleep(10) # Adjust as necessary
return

def run(self):
try:
self.client.connect(MQTT_HOST, MQTT_PORT, 60)
except Exception as e:
print(f"Failed to connect to MQTT broker at {MQTT_HOST}:{MQTT_PORT}, error: {e}")
exit(1) # Exiting if connection is not successful

stats_thread = threading.Thread(target=self.display_stats)
stats_thread.start()

self.client.loop_forever()
return


if __name__ == '__main__':
for k, v in os.environ.items():
if k.startswith('MQTT') or k.startswith('EMQX'):
print(f"{k}={v}")

client = mqtt.Client()
# Assign the callbacks
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message

try:
client.connect(MQTT_HOST, MQTT_PORT, 60)
except Exception as e:
print(f"Failed to connect to MQTT broker at {MQTT_HOST}:{MQTT_PORT}, error: {e}")
exit(1) # Exiting if connection is not successful

stats_thread = threading.Thread(target=display_stats)
stats_thread.start()

client.loop_forever()

listener = SimpleListener()
listener.run()
3 changes: 3 additions & 0 deletions labs/labs-internal/emqx/log.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
NAMESPACE=mqtt-test
POD_NAME=$(kubectl get pods -n $NAMESPACE -o jsonpath="{.items[0].metadata.name}")
kubectl logs -n $NAMESPACE $POD_NAME -f
4 changes: 4 additions & 0 deletions labs/labs-internal/emqx/ns.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: mqtt-test
1 change: 1 addition & 0 deletions labs/labs-internal/emqx/show.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kubectl get all -n mqtt-test
1 change: 1 addition & 0 deletions labs/labs-internal/emqx/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kubectl apply -f deployment.yaml

0 comments on commit a60558a

Please sign in to comment.