Skip to content

Commit

Permalink
emqx demo
Browse files Browse the repository at this point in the history
  • Loading branch information
andreiionutdamian committed Feb 19, 2024
1 parent 000715d commit 20f65c0
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 27 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/basic-mqtt-listener.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Basic FastAPI Test container

on:
push:
paths:
- 'labs/labs-internal/emqx/in_cluster_reader/**' # Specify path to your directory

jobs:
build-and-push-py-test-image:
runs-on: ubuntu-latest

steps:
- name: Checkout Repository
uses: actions/checkout@v2

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1

- name: Login to Docker Hub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_USR }}
password: ${{ secrets.DOCKERHUB_PWD }}

- name: Build and Push
run: |
cd labs/labs-internal/emqx/in_cluster_reader/
chmod +x build.sh # Ensure the script is executable
./build.sh
35 changes: 35 additions & 0 deletions labs/labs-internal/emqx/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
###################################
# EMQX Deployment #
###################################
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down Expand Up @@ -44,3 +47,35 @@ spec:
port: 1883
targetPort: 1883
nodePort: 31883 # Optional: Kubernetes chooses a port if not specified
---

###################################
# App Deployment #
###################################

apiVersion: apps/v1
kind: Deployment
metadata:
name: in-cluster-reader
labels:
app: in-cluster-reader
spec:
replicas: 1
selector:
matchLabels:
app: in-cluster-reader
template:
metadata:
labels:
app: in-cluster-reader
spec:
containers:
- name: in-cluster-reader
image: aidamian/basic_mqtt_listener
resources:
requests:
memory: "256Mi" # 256Mi = 0.25GB
cpu: "500m" # 500m = 0.5 cores
limits:
memory: "1024Mi" # 1024Mi = 1GB
cpu: "2000m" # 2000m = 2 cores
82 changes: 58 additions & 24 deletions labs/labs-internal/emqx/external_writer/pubisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,71 @@

# Configuration
MQTT_HOST = '192.168.1.55'
MQTT_PORT = 1883
MQTT_PORT = 31883
TOPIC = '/topic_root/subtopic'
THREAD_COUNT = 5 # Number of threads to spawn
package = "1" * 1_000_000

def publish_messages(thread_id):
"""Publish messages with a specific structure."""
client = mqtt.Client()
client.connect(MQTT_HOST, MQTT_PORT, 60)
count = 0
while True:
count += 1
payload = json.dumps({
"sender": thread_id,
"data": package, # 1 million times '1'
"id": count
})
client.publish(TOPIC, payload)
time.sleep(1) # Adjust as necessary

def generate_thread_id():
"""Generate a unique 5 character thread ID."""
return ''.join(random.choices(string.ascii_letters + string.digits, k=5))

if __name__ == '__main__':
threads = []
for _ in range(THREAD_COUNT):
thread_id = generate_thread_id()
thread = threading.Thread(target=publish_messages, args=(thread_id,))
class Publisher:
def __init__(self):
self.thread_id = generate_thread_id()
self.client = mqtt.Client()
# Define callback for successful connection
self.client.on_connect = self.on_connect
# Define callback for disconnection
self.client.on_disconnect = self.on_disconnect
try:
print("Connecting to MQTT broker at {MQTT_HOST}:{MQTT_PORT}", flush=True)
self.client.connect(MQTT_HOST, MQTT_PORT, 60)
except Exception as e:
print(f"Failed to connect to MQTT broker at {MQTT_HOST}:{MQTT_PORT}, error: {e}")
exit(1) # Exiting if connection is not successful
self.count = 0

def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print(f"Connected successfully to {MQTT_HOST}:{MQTT_PORT}")
else:
print(f"Failed to connect, return code {rc}")
# Implement reconnection logic here if necessary

def on_disconnect(self, client, userdata, rc):
print(f"Disconnected from MQTT broker with return code {rc}")
# Implement reconnection logic here

def publish_messages(self):
"""Publish messages with a specific structure."""
while True:
self.count += 1
payload = json.dumps({
"sender": self.thread_id,
"data": package, # 1 million times '1'
"id": self.count
})
result = self.client.publish(TOPIC, payload)
# Check if publish was successful
if result.rc != mqtt.MQTT_ERR_SUCCESS:
print(f"Failed to publish message to {TOPIC}, error code: {result.rc}")
time.sleep(1) # Adjust as necessary

def run(self):
"""Run the publisher."""
thread = threading.Thread(target=self.publish_messages)
thread.start()
threads.append(thread)
return

for thread in threads:
thread.join()
if __name__ == '__main__':
publishers = []
for _ in range(THREAD_COUNT):
publisher = Publisher()
publishers.append(publisher)
publisher.run()

# Assuming you want to join threads, you need to store and join them correctly
for publisher in publishers:
# This would require storing the thread object in the Publisher class
publisher.thread.join() # Wait for all threads to finish
9 changes: 9 additions & 0 deletions labs/labs-internal/emqx/in_cluster_reader/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM python:3.11-slim

WORKDIR /app

RUN pip install paho-mqtt

COPY reader.py .

CMD ["python", "reader.py"]
2 changes: 2 additions & 0 deletions labs/labs-internal/emqx/in_cluster_reader/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
docker build -t aidamian/basic_mqtt_listener .
docker push aidamian/basic_mqtt_listener
28 changes: 25 additions & 3 deletions labs/labs-internal/emqx/in_cluster_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@

messages = defaultdict(int)

def on_connect(client, userdata, flags, rc):
"""Callback for when the client receives a CONNACK response from the server."""
if rc == 0:
print(f"Connected successfully to {MQTT_HOST}:{MQTT_PORT}")
client.subscribe(TOPIC) # Move subscription to here to ensure it's effective after a successful connect
else:
print(f"Failed to connect, return code {rc}")
# Implement reconnection logic here if necessary

def on_disconnect(client, userdata, rc):
"""Callback for when the client disconnects from the broker."""
if rc != 0:
print(f"Unexpected disconnection.")
# Implement reconnection logic here

def on_message(client, userdata, msg):
"""Handle incoming MQTT messages."""
payload = json.loads(msg.payload)
Expand All @@ -29,11 +44,18 @@ def display_stats():
for k, v in os.environ.items():
if k.startswith('MQTT') or k.startswith('EMQX'):
print(f"{k}={v}")

client = mqtt.Client()
# Assign the callbacks
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
client.connect(MQTT_HOST, MQTT_PORT, 60)
client.subscribe(TOPIC)

try:
client.connect(MQTT_HOST, MQTT_PORT, 60)
except Exception as e:
print(f"Failed to connect to MQTT broker at {MQTT_HOST}:{MQTT_PORT}, error: {e}")
exit(1) # Exiting if connection is not successful

stats_thread = threading.Thread(target=display_stats)
stats_thread.start()
Expand Down

0 comments on commit 20f65c0

Please sign in to comment.