Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advanced PUB/SUB example with multithreaded fast subscribers for realtime processing #34

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ Additional Documentation
- `Using imageZMQ in distributed computer vision projects <docs/imagezmq-uses.rst>`_
- `REQ/REP versus PUB/SUB Messaging Patterns <docs/req-vs-pub.rst>`_
- `Advanced example using both messaging patterns in an HTTP streaming application <docs/advanced-pub-sub.rst>`_
- `Advanced PUB/SUB example with multithreaded fast subscribers for realtime processing <docs/fast-pub-sub.rst>`_
- How **imageZMQ** is used in my own projects connecting multiple
Raspberry Pi **imagenodes** to an **imagehub**:

Expand All @@ -380,15 +381,17 @@ Contributors
============
Thanks for all contributions big and small. Some significant ones:

+------------------------+---------------+--------------------------------------------------+
| **Contribution** | **Name** | **GitHub** |
+------------------------+---------------+--------------------------------------------------+
| Initial code & docs | Jeff Bass | `@jeffbass <https://github.com/jeffbass>`_ |
+------------------------+---------------+--------------------------------------------------+
| Added PUB / SUB option | Maksym | `@bigdaddymax <https://github.com/bigdaddymax>`_ |
+------------------------+---------------+--------------------------------------------------+
| HTTP Streaming example | Maksym | `@bigdaddymax <https://github.com/bigdaddymax>`_ |
+------------------------+---------------+--------------------------------------------------+
+------------------------+-----------------+----------------------------------------------------------+
| **Contribution** | **Name** | **GitHub** |
+------------------------+-----------------+----------------------------------------------------------+
| Initial code & docs | Jeff Bass | `@jeffbass <https://github.com/jeffbass>`_ |
+------------------------+-----------------+----------------------------------------------------------+
| Added PUB / SUB option | Maksym | `@bigdaddymax <https://github.com/bigdaddymax>`_ |
+------------------------+-----------------+----------------------------------------------------------+
| HTTP Streaming example | Maksym | `@bigdaddymax <https://github.com/bigdaddymax>`_ |
+------------------------+-----------------+----------------------------------------------------------+
| Fast PUB / SUB example | Philipp Schmidt | `@philipp-schmidt <https://github.com/philipp-schmidt>`_ |
+------------------------+-----------------+----------------------------------------------------------+

Helpful Forks of imageZMQ
=========================
Expand Down
69 changes: 69 additions & 0 deletions docs/fast-pub-sub.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
===============================================================================
PUB/SUB Multithreaded Fast Subscribers for Realtime Processing
===============================================================================

When using the PUB/SUB pattern, the receiver of the frames will always receive
all frames of the publisher. This works as long as the receiver can keep up
with the incoming data. If the receiver needs to do some processing work on the
frames (motion detection, edge detection, maybe even object detection using CNNs)
it can fall behind and will not process the most recent frames from the publisher,
but whatever is still in the receive queue of the zmq socket.

To make sure such a receiver always processes the most recent frames from the publisher,
one could connect, receive a frame and disconnect immediately, to ensure its the most recent frame.
However, this might neither be viable nor elegant, as every connect will introduce an additional delay (e.g. TCP handshake roundtriptime).

A better approach (if network bandwidth is not most concerning) is to keep the socket open,
receive every frame in a dedicated IO thread, but only process the most recent one in a processing thread.

Fast Pub Sub Subscriber
=======================

.. code-block:: python
:number-lines:

class VideoStreamSubscriber:

def __init__(self, hostname, port):
self.hostname = hostname
self.port = port
self._stop = False
self._data_ready = threading.Event()
self._thread = threading.Thread(target=self._run, args=())
self._thread.daemon = True
self._thread.start()

def receive(self, timeout=15.0):
flag = self._data_ready.wait(timeout=timeout)
if not flag:
raise TimeoutError(
f"Timeout while reading from subscriber tcp://{self.hostname}:{self.port}")
self._data_ready.clear()
return self._data

def _run(self):
receiver = imagezmq.ImageHub(f"tcp://{self.hostname}:{self.port}", REQ_REP=False)
while not self._stop:
self._data = receiver.recv_jpg()
self._data_ready.set()
# Close socket here, not implemented in ImageHub :(
# zmq_socket.close()

def close(self):
self._stop = True

This helper class creates a sub socket in a dedicated IO thread and signals new data via an event.
The main thread can read the most recent frame by calling receive().

A timeout can be configured, after which the connection must be considered down.
Keep in mind that in line with the zmq socket behaviour, there is no way of checking whether the connection was
established successfully. If the first call to receive creates a timeout, the connection might not have been established
or the pusblisher is not sending frames (...fast enough?).

The event synchronisation in this class makes sure a single frame will never be read twice.

Please note that this class is not threadsafe, as there is only a single event for new data.

For a full example see `pub_sub_receive.py <../examples/pub_sub_receive.py>`_ and `pub_sub_broadcast.py <../examples/pub_sub_broadcast.py>`_

`Return to main documentation page README.rst <../README.rst>`_
92 changes: 39 additions & 53 deletions examples/pub_sub_broadcast.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,48 @@
"""pub_sub_broadcast.py -- broadcast PiCamera image stream using PUB SUB.

A Raspberry Pi program ...
does some interesting stuf.... Images are jpg compressed before sending.

This program requires .... Brief
test instructions are in the /docs/pub_sub_broadcast.rst file

This program can ... and it can ...
"""

# NOTE: all the code below is just an example for formatting; replace it
# with your *broadcast.py code
"""pub_sub_broadcast.py -- broadcast OpenCV stream using PUB SUB."""

import sys

import socket
import time
import traceback
import cv2
from imutils.video import VideoStream
import imagezmq
import RPi.GPIO as GPIO

# use either of the formats below to specifiy address of display computer
sender = imagezmq.ImageSender(connect_to='tcp://jeff-macbook:5555')
# sender = imagezmq.ImageSender(connect_to='tcp://192.168.1.190:5555')

# optionally, turn on the LED area lighting
use_led = False # set to True or False as needed
# optionally, filp the image vertically
flip = True # set to True of False as needed

if use_led:
GPIO.setmode(GPIO.BCM)
GPIO.setup(18, GPIO.OUT)
GPIO.output(18, True) # turn on LEDs

rpi_name = socket.gethostname() # send RPi hostname with each image
picam = VideoStream(usePiCamera=True).start()
time.sleep(2.0) # allow camera sensor to warm up
jpeg_quality = 95 # 0 to 100, higher is better quality, 95 is cv2 default
try:
while True: # send images as stream until Ctrl-C
image = picam.read()
if flip:
image = cv2.flip(image, -1)
ret_code, jpg_buffer = cv2.imencode(
".jpg", image, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality])
sender.send_jpg(rpi_name, jpg_buffer)
except (KeyboardInterrupt, SystemExit):
pass # Ctrl-C was pressed to end program
except Exception as ex:
print('Python error with no Exception handler:')
print('Traceback error:', ex)
traceback.print_exc()
finally:
if use_led:
GPIO.output(18, False) # turn off LEDs
GPIO.cleanup() # close GPIO channel and release it
picam.stop()
sys.exit()
if __name__ == "__main__":
# Publish on port
port = 5555
sender = imagezmq.ImageSender(f"tcp://*:{port}", REQ_REP=False)

# Open input stream
# First available cam in this case for simplicity, you can use any stream url (rtsp, mjpg, etc...)
capture = VideoStream()
capture.start()
print("Input stream opened")

# JPEG quality, 0 - 100
jpeg_quality = 95

# Send RPi hostname with each image
# This might be unnecessary in this pub sub mode, as the receiver will already need to know our address
# and can therefore distinguish streams
# Keeping it anyway in case you wanna send a meaningful tag or something (or have a many to many setup)
rpi_name = socket.gethostname()

try:
counter = 0
while True:
frame = capture.read()
ret_code, jpg_buffer = cv2.imencode(
".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality])
sender.send_jpg(rpi_name, jpg_buffer)
print(f"Sent frame {counter}")
counter = counter + 1
except (KeyboardInterrupt, SystemExit):
print('Exit due to keyboard interrupt')
except Exception as ex:
print('Python error with no Exception handler:')
print('Traceback error:', ex)
traceback.print_exc()
finally:
capture.stop()
sys.exit()
124 changes: 69 additions & 55 deletions examples/pub_sub_receive.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,77 @@
"""pub_sub_receive.py -- receive PiCamera image stream using PUB SUB.

An image receiving program ...
does some interesting stuf.... Images are jpg compressed, so this program
decompresses them after receipt.

This program requires .... Brief
test instructions are in the /docs/pub_sub_broadcast.rst file

This program can ... and it can ...
"""

# NOTE: all the code below is just an example for formatting; replace it
# with your *receive.py code
"""pub_sub_receive.py -- receive OpenCV stream using PUB SUB."""

import sys

import socket
import time
import traceback
import cv2
from imutils.video import VideoStream
import imagezmq
import RPi.GPIO as GPIO

# use either of the formats below to specifiy address of display computer
sender = imagezmq.ImageSender(connect_to='tcp://jeff-macbook:5555')
# sender = imagezmq.ImageSender(connect_to='tcp://192.168.1.190:5555')

# optionally, turn on the LED area lighting
use_led = False # set to True or False as needed
# optionally, filp the image vertically
flip = True # set to True of False as needed

if use_led:
GPIO.setmode(GPIO.BCM)
GPIO.setup(18, GPIO.OUT)
GPIO.output(18, True) # turn on LEDs

rpi_name = socket.gethostname() # send RPi hostname with each image
picam = VideoStream(usePiCamera=True).start()
time.sleep(2.0) # allow camera sensor to warm up
jpeg_quality = 95 # 0 to 100, higher is better quality, 95 is cv2 default
try:
while True: # send images as stream until Ctrl-C
image = picam.read()
if flip:
image = cv2.flip(image, -1)
ret_code, jpg_buffer = cv2.imencode(
".jpg", image, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality])
sender.send_jpg(rpi_name, jpg_buffer)
except (KeyboardInterrupt, SystemExit):
pass # Ctrl-C was pressed to end program
except Exception as ex:
print('Python error with no Exception handler:')
print('Traceback error:', ex)
traceback.print_exc()
finally:
if use_led:
GPIO.output(18, False) # turn off LEDs
GPIO.cleanup() # close GPIO channel and release it
picam.stop()
sys.exit()
import threading
import numpy as np
from time import sleep

# Helper class implementing an IO deamon thread
class VideoStreamSubscriber:

def __init__(self, hostname, port):
self.hostname = hostname
self.port = port
self._stop = False
self._data_ready = threading.Event()
self._thread = threading.Thread(target=self._run, args=())
self._thread.daemon = True
self._thread.start()

def receive(self, timeout=15.0):
flag = self._data_ready.wait(timeout=timeout)
if not flag:
raise TimeoutError(
f"Timeout while reading from subscriber tcp://{self.hostname}:{self.port}")
self._data_ready.clear()
return self._data

def _run(self):
receiver = imagezmq.ImageHub(f"tcp://{self.hostname}:{self.port}", REQ_REP=False)
while not self._stop:
self._data = receiver.recv_jpg()
self._data_ready.set()
# Close socket here, not implemented in ImageHub :(
# zmq_socket.close()

def close(self):
self._stop = True

# Simulating heavy processing load
def limit_to_2_fps():
sleep(0.5)

if __name__ == "__main__":
# Receive from broadcast
hostname = "127.0.0.1"
port = 5555
receiver = VideoStreamSubscriber(hostname, port)

try:
while True:
msg, frame = receiver.receive()
image = cv2.imdecode(np.frombuffer(frame, dtype='uint8'), -1)

# Due to the IO thread constantly fetching images, we can do any amount
# of processing here and the next call to receive() will still give us
# the most recent frame (more or less realtime behaviour)

# Uncomment this to simulate processing load
# limit_to_2_fps()

cv2.imshow("Pub Sub Receive", image)
cv2.waitKey(1)
except (KeyboardInterrupt, SystemExit):
print('Exit due to keyboard interrupt')
except Exception as ex:
print('Python error with no Exception handler:')
print('Traceback error:', ex)
traceback.print_exc()
finally:
receiver.close()
sys.exit()