[PUB/SUB] Subscriber slow motion video (queue keeps growing) #27
Thanks for your testing of the PUB/SUB pattern and reporting the slowdown on the receiver. This is the first I've heard of this problem. Frankly, I don't know what could be causing it. And it is counterintuitive that re-instantiating the Hub fixes it. I'm going to leave this issue open to see if anyone else responds to it.
No problem, I will also do some more testing to see if I can figure something out and let you know the results.
Hi Jeff,
Maybe ZMQ_CONFLATE, set somewhere or somehow, would solve the issue?
I am not a user of the PUB/SUB pattern, but I did see this on StackOverflow, which might be helpful:
If what you are trying to accomplish is to have the image sending program be robust to a server restart or similar glitch, I handle that in my imagenode program using the REQ/REP pattern and a signal timer to detect a non-response. This is what I do in my imagenode program:

```python
while not node.send_q:
    node.read_cameras()
while len(node.send_q) > 0:  # send until send_q is empty
    try:
        with Patience(settings.patience):
            text, image = node.send_q.popleft()
            hub_reply = node.send_frame(text, image)
    except Patience.Timeout:  # if no timely response from hub
        log.info('No imagehub reply for '
                 + str(int(settings.patience)) + ' seconds')
        hub_reply = node.fix_comm_link()
    node.process_hub_reply(hub_reply)
```

The Patience() class, when instantiated, starts a timer using the system SIGALRM signal. It is used in a `with` clause to allow a blocking task to be interrupted if it does not return in a specified number of seconds (settings.patience is the number of seconds to wait before giving up and fixing the comm link). The event loop code is here:
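The Patience class itself is not shown in this thread. A minimal sketch of such a SIGALRM-based timeout context manager could look like the following (a hypothetical reconstruction, not the actual imagenode code; it assumes a Unix system, since SIGALRM is unavailable on Windows, and that the `with` body runs in the main thread):

```python
import signal


class Patience:
    """Hypothetical sketch of a SIGALRM timeout context manager like the
    one described above; the real imagenode implementation may differ."""

    class Timeout(Exception):
        """Raised when the block does not finish within the time limit."""
        pass

    def __init__(self, seconds):
        self.seconds = int(seconds)

    def _handler(self, signum, frame):
        # Called by the OS when the alarm fires; abort the blocked call.
        raise Patience.Timeout()

    def __enter__(self):
        # Install the handler and start the countdown.
        signal.signal(signal.SIGALRM, self._handler)
        signal.alarm(self.seconds)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        signal.alarm(0)  # cancel any pending alarm
        return False     # let Patience.Timeout propagate to the except clause
```

With a sketch like this, any blocking call inside the `with` block (such as a ZMQ recv waiting on a dead hub) is interrupted after the given number of seconds.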
I am having this same issue with ImageZMQ. In a receiver script with no significant image processing, the video is displayed with no increasing lag. However, when the receiving script processes the image for face recognition, the processed frame rate is about two per second, but it is processing queued frames rather than just grabbing whatever frame is ready when it is ready to chew on a new one. As a result, the queue grows until the system eventually freezes. I am resizing frames to 400px on the TX side and, as I mentioned earlier, receiving this and displaying the window without the processing is quick. If I set a frame send delay on the TX side of .5 (2 frames/second) the receiver keeps up, but anything more and it bombs out eventually.

What I'd like is to have the TX side maintain its framerate, and have the RX side NOT queue images, but just grab the next frame coming over the link when it's ready. In this way, other nodes in my system that need to receive and process the image can do so at their own rate, while the slower scripts can crunch along at whatever frame rate they need and not drown in queued frames. Is this possible? Some way to have ImageZMQ only take every 5th frame or something? Alternately, is there a way I can clear the queue in each loop, so it gets frame, processes frame, displays frame, clears queue, gets new frame, repeat...? Thanks.

I have created a video that shows clearly the difference between streamed video without significant processing and streamed video that is live-processed, both using ImageZMQ. Link is

Thanks for your attention.
Hi @DaveXanatos, I really appreciate your video and the time you took to make it. I don't believe that the imagezmq library is actually the root cause of the frames backing up. I believe that ZMQ is backing up the frames at the SUB end of its PUB / SUB protocol. It is probably related to the size of the frames (not that your frames are too large, but most ZMQ PUB / SUB usage is for small message packets).

As a first step, would you be willing to modify your program to use the REQ / REP protocol? It shouldn't be too hard to create a version of your programs that use REQ / REP. If that fixes the problem with the queue backing up, then that at least narrows it down to the ZMQ PUB / SUB protocol. If you are willing to try that, let me know if it works. It may be we need to file an issue with ZMQ. I don't know of a way to get ZMQ to flush the SUB queue on the receiving end (though there may be one). Let me know what your performance is like with REQ / REP and we'll proceed from there.
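For reference, the REQ / REP flow that imagezmq wraps can be sketched in plain pyzmq (a sketch only; imagezmq adds numpy/JPEG serialization on top of this, and the in-process address and message bytes here are placeholders):

```python
import zmq


def make_req_rep_pair(address="inproc://frames"):
    """Sketch of the lock-step REQ/REP pair: the sender blocks until the
    hub replies, so frames can never queue up on the receiving side."""
    ctx = zmq.Context.instance()
    hub = ctx.socket(zmq.REP)      # the receiving hub
    hub.bind(address)
    sender = ctx.socket(zmq.REQ)   # the camera-side sender
    sender.connect(address)
    return sender, hub


sender, hub = make_req_rep_pair()
sender.send(b"frame-bytes")        # stand-in for an encoded image
frame = hub.recv()                 # hub processes the frame here...
hub.send(b"OK")                    # ...then acknowledges
reply = sender.recv()              # sender unblocks only after the ack
```

The lock-step design is exactly why REQ / REP cannot back up: the sender cannot send frame N+1 until the receiver has acknowledged frame N.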
Maybe ZMQ_SNDHWM will be a solution?
Thanks very much Jeff. I will have a video for you this evening using REQ/REP. I agree that it's the SUB side that's backing up. My intent is to stream the video frames in a manner where fast digesters (like the frame displays upper-left in my initial video) can process without delays induced by slower digesters (like face recognition) operating on the same frame stream. Several Raspberry Pis will be watching the video streams, all with several scripts looking for and processing different aspects of the stream to create a robust set of interpretations of what the robot is "seeing" (face recognition, object detection, OCR processing with Tesseract, etc.). ImageZMQ seems like the perfect functionality here if we can solve the SUB-side backups. I appreciate your working with me, very much.
Hi jeffbass,

1: When running REQ/REP, the images do NOT queue up, and frame processing runs around 3 FPS. I tried changing start orders, etc., but with REQ/REP mode a "one to many" function seems to not be possible; however, the "one" that is connected runs at an acceptable frame rate and does not queue up frames leading to a system crash.

2: With the PUB/SUB mode, a "one to many" scenario can be established, but the frame backup on any slow SUB leads to a system crash (not to mention uselessly slow operation of the frame processing script).

I feel frustratingly close to getting this to function perfectly :) Each piece offers partially correct function, but neither allows for fully correct function: multiple scripts watching the stream, and slow scripts that neither hang the system nor operate on old queued-up frames that are useless. Truly, honestly appreciate your time and thought on this issue. Let me know whatever you need me to do to help figure this out. Thank you! Dave

VIDEO LINK: https://youtu.be/jQGz_nnZm7M
Hi @DaveXanatos,

What this workaround is doing is fixing the SUB latency queuing problem by restarting the SUB receiver. An easy way to do this is to re-instantiate the imagehub SUB receiver before the queue gets "bad". It only needs to be done in the program that is actually experiencing the lag, not the other programs. This is definitely NOT the longer-term way to solve this problem.

I suspect that it may be better to set the HWM (High Water Mark) option in the slow subscriber to ZMQ_RCVHWM=1, forcing the SUB receiver to drop any inbound frames while the queue is still full from the last one. You would only set this option on the slow subscriber. Another possibility is to use the ZMQ_CONFLATE option, which is a flag to keep only the last frame, discarding all others (it was mentioned by @zarar7576 above). This is not likely to work with imagezmq because imagezmq sends compound messages composed of 2 parts (text and image). Setting ZMQ_SNDHWM (mentioned by @hjinlee88 above) is less likely to help because it would have to be set on the PUB side, which is not where you are having a problem. In all the reading I have done, this is a problem with ZMQ slow subscribers, not with ZMQ publishers. So you would want to change HWM on the SUB side. Some options, like HWM, are available as attributes in the imagezmq classes. For example, once imagehub is instantiated, `print(imagehub.zmq_socket.hwm)` prints "1000", the default HWM size.

I suggest you try the steps above. That will answer the question of whether re-instantiating the imagehub instance every 3 (or more or fewer) frames on the slow SUB subscriber prevents the queue lagging problem. Let me know how that works out.
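For anyone trying the ZMQ_RCVHWM idea, a minimal pyzmq sketch (a bare SUB socket, not the imagezmq wrapper; the address is an example). The key detail, which comes up later in this thread, is that the option must be set before connect() to take effect:

```python
import zmq

# Sketch only: a bare SUB socket with a tiny receive high water mark.
ctx = zmq.Context.instance()
sub = ctx.socket(zmq.SUB)
print(sub.get_hwm())                 # default high water mark is 1000
sub.setsockopt(zmq.RCVHWM, 1)        # allow at most 1 queued inbound message
sub.setsockopt(zmq.SUBSCRIBE, b'')   # subscribe to all topics
# HWM changes apply only to connects made AFTER the option is set:
sub.connect('tcp://127.0.0.1:5555')  # example address
```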
Hi @jeffbass, I'm uploading the video of tonight's testing now. It should be up by about 8:45pm EDT. THANK YOU for your detailed and thoughtful reply above. Much for me to work with. Spoilers: HWM did nothing, but putting the instantiation of image_hub inside the loop does make it workable for now, even if it is utterly repulsive aesthetically :) My hope is that ZMQ makes it so that the SUB side's HWM setting ignores incoming frames if it's still processing one... I'll post when the video link is ready. Dave
The video is ready at https://youtu.be/WmLbb9kVUng As always, thank you for being so helpful with getting this issue resolved. Dave
Thanks again for your video. Moving the instantiation inside the frame loop is a very inefficient way to start with a new queue each frame (and throw away the frames that would have caused the queue to become slow), but having a new SUB queue for each frame actually seems to work. Each re-instantiation uses the same port, so it is not likely to be causing memory or port leaks. The only purpose of the test was to see if eliminating the SUB queue on your slow SUB would fix the problem. It looks like it did. I am still hopeful that there is a more elegant and effective way to keep the SUB queue from growing, but I think the test has shown that the slow SUB receiver is indeed the problem. While you did set the HWM correctly in your other test program, I'm pretty sure it only takes effect if you close the connection and reopen it. Let me check on that and get back to you.
I think setting the HWM in the slow SUB image receiver is worth trying again. From the documentation on PyZMQ:
That means that when you tried to set the HWM in your test, it did not actually affect the socket since it was not a "subsequent socket connect". So, try instantiation before the loop (and don't have any instantiation inside the frame loop) with something like this:

```python
imagehub = imagezmq.ImageHub(open_port='tcp://127.0.0.1:5555', REQ_REP=False)
imagehub.zmq_socket.disconnect('tcp://127.0.0.1:5555')
imagehub.zmq_socket.set_hwm(2)  # try the value 2 or 4 or 8; imagezmq uses 2 messages per send
imagehub.zmq_socket.connect('tcp://127.0.0.1:5555')
```

It is important to specify the address explicitly if you are going to disconnect and reconnect in this way; defaults won't behave correctly. It would be OK to have the address in a string variable. Let me know what happens when you try this.
Thanks again for these great replies. What you're saying makes sense and I'll have another video (hopefully shorter and not as rambling lol) this evening. If this HWM test is successful it will also mean that almost everyone I've seen trying to use it is doing it wrong (i.e., like you saw in my code test). HWM would be a much better way to do this than eternal re-instantiation. We should know by this evening. Thanks again!
Video complete. HWM did not fix the issue, unfortunately. I should have the video up in an hour or so (dinner time... :) ) Thanks again!
Here is the video link with the demo, code, commentary, etc. https://youtu.be/WTDTK8tX7mQ Dave
@DaveXanatos, thanks again for your video and willingness to experiment. I was hoping HWM on the SUB side would solve the problem. Sorry that it did not.

Where to go from here? As a temporary and very ugly workaround, you can use the re-instantiation inside the frame loop. If you do that, I would not re-instantiate the image hub every frame, but would use a frame counter and an if statement that does the re-instantiation every 2 or 4 or 6 or ? frames. How many would be determined by experiment.

Other than that, I have to be honest that I am out of ideas to try. I don't have enough PUB / SUB experience to be helpful with deeper testing in the PUB / SUB arena. I do not think the core ZMQ developers see this as a bug they want to spend time fixing because they have already produced recommended workarounds: the Suicidal Snail description in the ZMQ Guide makes their position clear: http://zguide.zeromq.org/php:chapter5#toc4 . It offers a number of recommended workarounds that all involve having the application code restart itself in some way, or having the publisher throttle its speed of sending messages. I have also not seen any better solutions on stackoverflow or elsewhere.

You might want to consider setting up a deque with a small maxlen that receives frames at full speed in a separate thread. The frames would be received into the deque at full speed and be thrown away when the deque reaches maxlen. Then your slow subscriber would read the last frame from the deque whenever it is ready for a new frame. I use the Python deque in my own projects in this way.

My best current understanding of this issue is: imagezmq PUB / SUB is not a good solution when there are slow subscribers because the underlying ZMQ library is not a good solution when there are slow subscribers. I did not know that when I did my own testing of the PUB / SUB option because I did not test it with any slow subscribers.

The PUB / SUB option in imagezmq was contributed by @bigdaddymax, who did a great job with the code contribution. We both did testing, but did not encounter the slow subscriber issue. This issue thread is evidence that you and others have found a significant issue with slow SUB receivers. I am going to leave this issue open so that others can be aware of the problem, and hope that another user more experienced with PUB / SUB will have a suggestion for a solution. I will also add a description of this problem to the imagezmq PUB / SUB documentation.

If you come up with a better PUB / SUB solution as you continue development of your project, please comment back in this thread. If it involves a ZMQ alternative like MQTT or RabbitMQ, please let us know how that works out. If it involves a deque in a thread, let us know that, too. Thanks again for all your work on this. Sorry we don't have a good solution yet.
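The deque-in-a-thread idea can be sketched with only the standard library. This is a hypothetical helper, not imagezmq API; `recv_frame` stands in for whatever blocking receive call your subscriber uses:

```python
import threading
from collections import deque


class LatestFrameBuffer:
    """Sketch of the deque idea above: a receiver thread appends frames at
    full speed into a small-maxlen deque, so old frames fall off
    automatically and the slow consumer always reads the most recent one."""

    def __init__(self, recv_frame, maxlen=2):
        self._frames = deque(maxlen=maxlen)  # old frames are discarded here
        self._recv_frame = recv_frame
        self._stop = False
        self._thread = threading.Thread(target=self._fill, daemon=True)
        self._thread.start()

    def _fill(self):
        # Runs at full network speed, independent of the slow consumer.
        while not self._stop:
            self._frames.append(self._recv_frame())

    def latest(self):
        """Return the newest frame received so far, or None if none yet."""
        try:
            return self._frames[-1]
        except IndexError:
            return None

    def close(self):
        self._stop = True
```

The slow subscriber then calls `latest()` whenever it is ready for a new frame; deque append and indexing are thread-safe, so no explicit lock is needed for this simple read-latest pattern.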
Well crap. :) This is one of the saddest things I've read in a while... I'll keep playing around and see what I can do. On the bright side, glad I got to know you a little. My wife & I have a small organic farm here, and I have a recent background in biotech. Hopefully we can catch up on FB. Thanks again for walking through this with me; it was a truly valuable experience, and at least I know I'm not alone in this. It seems like the core ZMQ folks could solve this easily... but perhaps they are satisfied with inelegant workarounds... They probably have bigger fish to fry... Dave
Looking forward to catching up on FB. Love my small organic farm (2 acres in Southern California).
I have added the PUB / SUB slow subscriber issue discussed in this thread to the imagezmq documentation. I am hopeful that someone will come up with a solution. Leaving this issue open until we find a better solution than just restarting the slow subscriber.
Hi Jeff, I have added another video with some further behavioral oddities related to the differences between the original Raspberry Pi CM3 running Stretch and the newer Pi 4 (4GB) running Buster. While not strictly related only to ImageZMQ, it does affect it to a great degree. The video and complete text description of the observed issues is up at https://youtu.be/mTo28NhR-aQ If you have time to review and can offer any suggestions or observations, I would be very appreciative. I hope you and all your loved ones are staying healthy & safe!

UPDATE: I fixed the imshow window by using these lines (primarily the cv2.WINDOW_GUI_NORMAL flag), although it had no appreciable effect on the delay vs. the CM3 running Stretch:

Still trying to figure out what's making the Pi 4 slower than the Pi 3. Makes no sense.
Hi Dave,
Hello, I'm not sure about all the details, but reading this thread I feel like you should try to offload receiving the messages on the Pi to a different thread and then access the received frames from another thread to display them. Lock a mutex, of course. This way the Pi will probably not display every image and will skip a few (it might not be capable of that anyway), but it won't become a "slow subscriber". I'll put together a few lines of code later; gotta do it anyway for a different project.

I think it's crucial to know that even if the Pi does not slow down the PUB/SUB after this fix, a slow connection between server and Pi might still lead to similar behaviour...
This should do the trick:

```python
import threading

import zmq
from imagezmq import SerializingContext  # adjust import path to your imagezmq version


class VideoStreamSubscriber:
    def __init__(self, hostname, port):
        self.hostname = hostname
        self.port = port
        self._stop = False
        self._data_ready = threading.Event()
        self._thread = threading.Thread(target=self._run, args=())
        self._thread.daemon = True
        self._thread.start()

    def receive(self, timeout=15.0):
        flag = self._data_ready.wait(timeout=timeout)
        if not flag:
            raise TimeoutError(
                f"Timeout while reading from subscriber tcp://{self.hostname}:{self.port}")
        self._data_ready.clear()
        return self._data

    def _run(self):
        self.zmq_context = SerializingContext()
        self.zmq_socket = self.zmq_context.socket(zmq.SUB)
        self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.zmq_socket.connect(f"tcp://{self.hostname}:{self.port}")
        while not self._stop:
            self._data = self.zmq_socket.recv_array()
            self._data_ready.set()
        self.zmq_socket.close()

    def close(self):
        self._stop = True
```

As @jeffbass correctly pointed out though:
So this might not be the ultimate solution for a Pi, but maybe worth a try?
Thanks @philipp-schmidt!
Hey, put together a minimal example and docs, pull request #34. Let me know what you think. Best,
Hello @BrandonYuen, @oggyjack, @hjinlee88, @zarar7576, @DaveXanatos, |
Closing this one for now. Philipp's fix is documented and mentioned in the examples and FAQs. Thanks again for the great contribution, @philipp-schmidt!
To prevent the queue from growing in the PUB/SUB pattern with a slow receiver, the CONFLATE=1 option keeps only the latest message. My solution here was to override the init_pubsub method of ImageSender and ImageHub to add some extra parameters that keep the queue short, using the RCVHWM and SNDHWM options.
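A minimal pyzmq sketch of the two options being compared here (plain sockets, not the imagezmq wrapper; the addresses are placeholders, with an inproc bind so the snippet runs without needing a free TCP port):

```python
import zmq

ctx = zmq.Context.instance()

# CONFLATE=1 on the SUB side keeps only the newest inbound message, but it
# does not support the multi-part (text + image) messages imagezmq sends.
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.CONFLATE, 1)      # must be set before connect()
sub.setsockopt(zmq.SUBSCRIBE, b'')
sub.connect('tcp://127.0.0.1:5555')  # example address

# A small SNDHWM on the PUB side drops new messages once the send queue
# for a slow subscriber is full; this is compatible with multi-part messages.
pub = ctx.socket(zmq.PUB)
pub.setsockopt(zmq.SNDHWM, 2)        # must be set before bind()
pub.bind('inproc://pub-demo')        # placeholder; a real publisher binds tcp
```

The trade-off: CONFLATE always delivers the latest frame but breaks multi-part sends, while a small SNDHWM preserves multi-part messages at the cost of dropping whole messages when the queue fills.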
Hi, can you explain in more detail how the conflate option works? How exactly does it replace the data in the receive and send buffers? If this works as you briefly described, there might not even be a need for a separate IO thread the way I implemented it.
So the way I understand it: for each client, zmq will overwrite the "server"-side send buffer if the number of queued messages exceeds the limit set by zmq.SNDHWM? There were some discussions and experiments earlier in this thread and they failed to solve the issue.
Hi @philipp-schmidt. However, in the imagezmq library, every image is sent with some metadata in a multi-part message, which is not supported by the conflate option. So my solution was intended to control the size of the sending queues: if the limit has been reached, the socket drops the new message.

You could even limit the size of the receiving queue, but I have no idea of the outcome in case of several senders...

I have tested it with a receiver slower than the sender; it gives correct results with a small delay between the two.
Hi there,
Thanks for your work on ImageZMQ, it's been very useful for me!
I'm using a project setup which has:
I've successfully implemented the REQ/REP pattern in my project and it works well. The only issue for me is that the REQ/REP pattern blocks the server from processing as many images as it can, because it waits for the receiver's OK reply at every frame.
This is when I started trying the PUB/SUB pattern. For the server this works great. However, when I use PUB/SUB the video plays in slow motion on the receiver. By slow motion I mean that it queues all the frames it gets, but probably isn't fast enough to display them all. This creates an ever-growing queue of images. I've also tried it on a stronger machine (a MacBook), but the result is the same.
Any tips or ideas on how I could solve my issue? Any help is much appreciated!
Edit
I've changed my code a bit by re-instantiating the ImageHub object every loop iteration (instead of just once before the while(True) loop), and it seems to get rid of the queue problem. It doesn't play in slow motion anymore! However, I wonder if this is really the best solution, because re-instantiating ImageHub every loop doesn't seem the most efficient approach.
Before (queue/latency growing):
After (steady latency):