Pulsar blocks vertx thread when blockIfQueueFull is true #2548
Do you have a reproducer?
I guess that we would need to dispatch writes on a separate thread as we do for Kafka.
Hey @cescoffier thank you for the quick response. To reproduce we just need to enable blockIfQueueFull, following the Quarkus Pulsar guide: https://quarkus.io/guides/pulsar. By default Pulsar throws an exception if the send queue is full. I think that dispatching writes on an executor could be the right solution. So to sum up, to reproduce the issue one needs to set blockIfQueueFull=true on the producer and send messages faster than the broker can acknowledge them, so that the send queue fills up.
We did not use a sender thread because the Pulsar producer client has the asynchronous sendAsync API, which does not block the caller. The connector sender honors the maxPendingMessages configuration to limit the number of in-flight messages. Is there a specific behavior you are looking for when enabling blockIfQueueFull?
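For illustration, a minimal sketch of that asynchronous send path (names and error handling are illustrative, not the connector's actual code):

```java
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;

class NonBlockingSendExample {
    // sendAsync() enqueues the message and returns a future immediately, so no
    // dedicated sender thread is needed as long as enqueueing itself does not block.
    static CompletableFuture<MessageId> send(Producer<byte[]> producer, byte[] payload) {
        return producer.sendAsync(payload)
                .whenComplete((messageId, failure) -> {
                    if (failure != null) {
                        System.err.println("send failed: " + failure); // a connector would nack here
                    }
                });
    }
}
```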
The real issue is that you are seeing the event loop being blocked. We have ad-hoc "performance" tests for the Pulsar producer here: https://github.com/smallrye/smallrye-reactive-messaging/blob/3c2663b8e91f71251f43fa7c0e808e7add6ea644/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/perf/PerformanceProducerTest.java It would be great if you could reproduce it with a similar test.
Thanks for the response. The whole code is:
So I was wrong, there is no emitter. Does that change anything? We noticed this error under load. We can work on a reproducible test, but not in the coming days, as we are at a conference this week.
No, I don't think so. Looking at the code I see that the manual call to the ack is erroneous.
Does this case actually happen or is it there for safeguarding? If you want to continue using the Message type, make sure the acknowledgement is propagated correctly instead of calling ack manually.
Or you can use the simpler signatures with the payload directly and inject the metadata.
In your processor definitions you can simply inject the metadata after the payload:
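For example, a minimal sketch of such a processor, assuming the connector's PulsarIncomingMessageMetadata class and illustrative channel names:

```java
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessageMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class RelayProcessor {

    // The payload comes first; the Pulsar metadata is injected as an extra parameter,
    // so there is no need to unwrap Message<T> or call ack() manually.
    @Incoming("requests")
    @Outgoing("responses")
    public String process(String payload, PulsarIncomingMessageMetadata metadata) {
        return metadata.getKey() + ":" + payload;
    }
}
```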
It looks like by default Pulsar controls the memory using MemoryLimitController. The canEnqueueRequest method of the producer looks like this:

```java
private boolean canEnqueueRequest(SendCallback callback, long sequenceId, int payloadSize) {
    try {
        if (conf.isBlockIfQueueFull()) {
            if (semaphore.isPresent()) {
                semaphore.get().acquire(); // blocks the calling thread when the queue is full
            }
            client.getMemoryLimitController().reserveMemory(payloadSize); // blocks when the memory limit is reached
        } else {
            if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
                callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError(
                        "Producer send queue is full", sequenceId));
                return false;
            }
            if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
                semaphore.ifPresent(Semaphore::release);
                callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError(
                        "Client memory buffer is full", sequenceId));
                return false;
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        callback.sendComplete(new PulsarClientException(e, sequenceId));
        return false;
    }
    return true;
}
```

So before, we only limited the number of pending messages, not the memory. After some calculations, I think we could reach the limit of 64MB. The solution could be to create a SenderProcessor based on MemoryLimitController? We can access the memory limit controller:

```java
PulsarClientImpl clientImpl = (PulsarClientImpl) client;
MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
SenderProcessor processor = new MemoryAwareSenderProcessor(memoryLimitController);
```

And use something like (forgive me the pseudo code):

```java
if (memoryLimitController.isMemoryLimited()) {
    semaphore.acquire();
    memoryLimitController.tryReserveMemory(messageSize); // there may be a more elegant way to check it
    memoryLimitController.releaseMemory(messageSize);    // there may be a more elegant way to check it
    sendMessage(message);
    semaphore.release();
}
```

The takeaways: the connector currently limits only the number of pending messages, while the Pulsar client also enforces a memory limit, so its back-pressure should take the MemoryLimitController into account.
What do you think @marekczajkowski @ozangunalp? Of course I may be wrong, I haven't tested that yet.
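For what it's worth, a slightly fuller sketch of the same idea, assuming the memory reservation should only be released once the send completes (all names other than the MemoryLimitController and Producer APIs shown above are hypothetical):

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MemoryLimitController;

// Hypothetical helper, not connector code: reserve memory before dispatching and
// release it when the send completes, so the reservation bounds the in-flight bytes.
class MemoryAwareSender {
    private final MemoryLimitController memoryLimitController;

    MemoryAwareSender(MemoryLimitController memoryLimitController) {
        this.memoryLimitController = memoryLimitController;
    }

    boolean tryDispatch(Producer<byte[]> producer, byte[] payload) {
        long size = payload.length;
        if (!memoryLimitController.tryReserveMemory(size)) {
            return false; // signal back-pressure to the caller instead of blocking
        }
        producer.sendAsync(payload)
                .whenComplete((messageId, failure) -> memoryLimitController.releaseMemory(size));
        return true;
    }
}
```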
Regarding the memory limit, I encountered the error while using default settings and a basic relay function. By sending several thousand messages of approximately 30KB each to a Pulsar topic, the relay began to reject messages after consuming around 1.6k messages due to insufficient memory. The error message received was "Client memory buffer is full" (MemoryBufferIsFullError). It's essential that backpressure takes into account memory constraints, not just the number of messages, as the size of individual messages can vary unpredictably. As for the original issue, I've been making efforts to reproduce the 'queue full' error, but haven't encountered it yet.
@michalcukierman @marekczajkowski Thanks for your inputs. I'll spend more time next week to reproduce this issue.
I've been looking into this and couldn't find a real problem with the connector code. Here are my observations:
The conclusion is this happens when the broker cannot ingest messages at the rate your app produces them. The proper way is to adjust the send buffer, both in terms of memory and in terms of messages pending for send confirms. That way the back-pressure can be applied to slow the new message ingestion without filling the producer send queue.
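For illustration, these are the underlying Pulsar settings involved, shown against the plain Pulsar client API (the values are placeholders, and the connector may expose equivalents as channel attributes):

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;

class TunedProducerExample {
    static Producer<byte[]> create() throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder broker URL
                .memoryLimit(128, SizeUnit.MEGA_BYTES)   // client-wide buffer for pending sends (default 64 MB)
                .build();
        return client.newProducer()
                .topic("my-topic")                       // placeholder topic
                .maxPendingMessages(500)                 // bound the per-producer send queue
                .blockIfQueueFull(false)                 // fail fast instead of blocking the calling thread
                .create();
    }
}
```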
Note for partitioned topics: it is not possible for the connector to apply per-partition back-pressure to the upstream. And the maxPendingMessages limit is applied per partition producer, not to the topic as a whole. If someone is willing to contribute a MemoryLimitAwareSendProcessor, I'd be glad to review it. Hope all this helps for your further investigation.
@ozangunalp thanks a lot for your input. Here are my thoughts on this. The bottleneck arises when setting maxPendingMessages on a partitioned topic, because the limit applies to each partition producer rather than to the channel as a whole. Given the dynamic nature of partition changes at runtime, I believe there are limited options available to address this issue effectively. Consequently, I suggest updating the documentation to emphasize the importance of carefully configuring maxPendingMessages and maxPendingMessagesAcrossPartitions. As for the MemoryLimitAwareSendProcessor, I will raise another issue; we are willing to contribute, but it's not the highest priority at the moment.
@marekczajkowski you are right, the producer queue size is somewhat related but different from the client memory full error. Maybe the best course of action is to rename the Pulsar connector attribute that controls back-pressure. Initially, I named the attribute the same as the producer configuration to be able to set both together, but I was unaware of this partitioned topic issue. If we rename the connector attribute, the back-pressure limit would be decoupled from the producer's maxPendingMessages setting. WDYT?
I believe separating the backpressure configuration from the producer configuration is a beneficial approach. This way, the default mechanism can rely on SmallRye configuration, allowing producers to operate without limitations and providing room for more sophisticated custom configurations when used in conjunction with parameters like maxPendingMessages and maxPendingMessagesAcrossPartitions. Additionally, I suggest updating the documentation to include detailed explanations of these configurations.
When blockIfQueueFull is set to true for the Pulsar producer, it might happen that when the queue limit is reached the producer is blocked, and this blocks the event loop.
Sending messages should be done in a different thread, as is done e.g. for Kafka.
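A minimal sketch of that suggestion, offloading the potentially blocking send to a dedicated executor (names are illustrative and this is not the connector's implementation):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;

class OffloadedSendExample {
    // A dedicated worker thread absorbs the blocking behaviour of send() when
    // blockIfQueueFull=true, keeping the Vert.x event loop free.
    private static final ExecutorService SENDER = Executors.newSingleThreadExecutor();

    static CompletableFuture<MessageId> send(Producer<byte[]> producer, byte[] payload) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return producer.send(payload); // may block until queue space is available
            } catch (PulsarClientException e) {
                throw new CompletionException(e);
            }
        }, SENDER);
    }
}
```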