
Pulsar blocks vertx thread when blockIfQueueFull is true #2548

Closed
marekczajkowski opened this issue Mar 21, 2024 · 15 comments · Fixed by #2579

@marekczajkowski

When blockIfQueueFull is set to true for the Pulsar producer, it can happen that once the queue limit is reached the producer blocks, and this blocks the event loop.

Sending messages should be done on a different thread, as is done e.g. for Kafka.

@cescoffier
Contributor

Do you have a reproducer?

@cescoffier
Contributor

I guess that we would need to dispatch writes on a separate thread as we do for Kafka.

@michalcukierman
Contributor

michalcukierman commented Mar 21, 2024

Hey @cescoffier, thank you for the quick response. To reproduce, we just need to enable blockIfQueueFull and produce more messages than Pulsar is able to receive.

https://quarkus.io/guides/pulsar

By default, Pulsar throws an exception if the maxPendingMessages limit is exceeded. This is sometimes expected, but by default it leads to nacks and puts even more pressure on the service. After enabling blockIfQueueFull, the event loop gets blocked.

I think that dispatching writes on an executor could be the right solution.

So to sum up, to reproduce the issue one needs to:

  1. Set a relatively small maxPendingMessages (e.g. 10)
  2. Enable blockIfQueueFull
  3. Send more messages than Pulsar can write (e.g. 1000 using an emitter)

This will block the event loop; a minimal sketch follows.
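For illustration, a rough sketch of step 3, assuming a hypothetical channel named outgoing-pages configured with a small maxPendingMessages and blockIfQueueFull=true (this is not our actual application code):

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

@ApplicationScoped
public class LoadGenerator {

  // Hypothetical channel; its config is assumed to set maxPendingMessages=10 and
  // blockIfQueueFull=true. The emitter buffer is enlarged so the emitter itself
  // does not overflow before the producer queue fills up.
  @Inject
  @Channel("outgoing-pages")
  @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 2000)
  Emitter<String> emitter;

  // Calling this from an event-loop thread (e.g. a reactive endpoint) ends up
  // blocking that thread once the Pulsar producer queue is full.
  public void flood() {
    for (int i = 0; i < 1000; i++) {
      emitter.send("payload-" + i);
    }
  }
}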

@ozangunalp
Collaborator

We did not use a sender thread because the Pulsar producer client has the maxPendingMessages property.

The connector sender honors the maxPendingMessages and effectively applies back-pressure by not requesting more messages to process from the upstream.
So, in theory, you would not need to use blockIfQueueFull=true. As the name suggests, and as you've noticed, it blocks the client and is therefore not compatible with event loop threads.
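For reference, a minimal configuration sketch that relies on that back-pressure instead of blocking (hypothetical channel name; the attributes are passed through to the Pulsar producer configuration as described in the Quarkus guide linked above, so double-check them against the connector documentation):

# The connector stops requesting messages from upstream once this many are in flight
mp.messaging.outgoing.outgoing-pages.maxPendingMessages=1000
# Leave blockIfQueueFull at its default (false) so the event loop is never blocked
mp.messaging.outgoing.outgoing-pages.blockIfQueueFull=false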

Is there a specific behavior you are looking for when enabling blockIfQueueFull?

@ozangunalp
Collaborator

The real issue is that you are seeing the maxPendingMessages limit exceeded. Normally that should not happen.

We have ad-hoc "performance" tests for the Pulsar producer here: https://github.com/smallrye/smallrye-reactive-messaging/blob/3c2663b8e91f71251f43fa7c0e808e7add6ea644/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/perf/PerformanceProducerTest.java

It would be great if you could reproduce it with a similar test.

@michalcukierman
Contributor

Thanks for the response,

The whole code is:

@ApplicationScoped
public class Relay {

  public static final String INCOMING_PAGES_CHANNEL = "incoming-pages";
  public static final String OUTGOING_PAGES_CHANNEL = "outgoing-pages";
  public static final String INCOMING_PAGE_FRAGMENTS_CHANNEL = "incoming-page-fragments";
  public static final String OUTGOING_PAGE_FRAGMENTS_CHANNEL = "outgoing-page-fragments";
  public static final String INCOMING_ASSETS_CHANNEL = "incoming-assets";
  public static final String OUTGOING_ASSETS_CHANNEL = "outgoing-assets";
  public static final String INCOMING_WEB_RESOURCES_CHANNEL = "incoming-web-resources";
  public static final String OUTGOING_WEB_RESOURCES_CHANNEL = "outgoing-web-resources";

  @Inject
  Logger log;

  @Incoming(INCOMING_PAGES_CHANNEL)
  @Outgoing(OUTGOING_PAGES_CHANNEL)
  public Message<Page> relayPage(Message<Page> incoming) {
    return relay(incoming);
  }

  @Incoming(INCOMING_PAGE_FRAGMENTS_CHANNEL)
  @Outgoing(OUTGOING_PAGE_FRAGMENTS_CHANNEL)
  public Message<PageFragment> relayPageFragment(Message<PageFragment> incoming) {
    return relay(incoming);
  }

  @Incoming(INCOMING_ASSETS_CHANNEL)
  @Outgoing(OUTGOING_ASSETS_CHANNEL)
  public Message<Asset> relayAsset(Message<Asset> incoming) {
    return relay(incoming);
  }

  @Incoming(INCOMING_WEB_RESOURCES_CHANNEL)
  @Outgoing(OUTGOING_WEB_RESOURCES_CHANNEL)
  public Message<WebResource> relayWebResource(Message<WebResource> incoming) {
    return relay(incoming);
  }

  private <T> Message<T> relay(Message<T> incoming) {
    String key = MetadataUtils.extractKey(incoming);
    Action action = MetadataUtils.extractAction(incoming);
    Long eventTime = MetadataUtils.extractEventTime(incoming);

    Message<T> outgoing;
    if (key == null || action == null || eventTime == null) {
      log.trace("Skipping relaying of message without required metadata");
      incoming.ack();

      outgoing = null;
    } else {
      log.tracef("Relaying: key %s, action %s, event time %s",
          key, action, eventTime);
      outgoing = incoming;
    }
    return outgoing;
  }
}

So I was wrong, there is no emitter. Does it change anything? We noticed this error under load.
The settings are:

  • maxPendingMessages: 16
  • blockIfQueueFull: true

We can work on a reproducible test, but not in the upcoming days, as we are at a conference this week.

@ozangunalp
Collaborator

So I was wrong, there is no emitter. Does it change anything?

No, I don't think so.

Looking at the code, I see that the manual call to ack is erroneous,
but I can't be sure whether that's the reason for the incorrect application of the back-pressure:

    if (key == null || action == null || eventTime == null) {
      log.trace("Skipping relaying of message without required metadata");
      incoming.ack();
      outgoing = null;
    } else {

Does this case actually happen, or is it there for safeguarding?
Because incoming.ack() is an async operation, you need to subscribe to it; otherwise you will never call the ack of the underlying Pulsar message.

If you want to continue using the Message<T> you can do the following:


    private <T> Uni<Message<T>> relay(Message<T> incoming) {
        String key = MetadataUtils.extractKey(incoming);
        Action action = MetadataUtils.extractAction(incoming);
        Long eventTime = MetadataUtils.extractEventTime(incoming);

        if (key == null || action == null || eventTime == null) {
            log.trace("Skipping relaying of message without required metadata");
            return Uni.createFrom().completionStage(incoming::ack)
                    .replaceWith(() -> null);
        } else {
            log.tracef("Relaying: key %s, action %s, event time %s",
                    key, action, eventTime);
            return Uni.createFrom().item(incoming);
        }
    }

Or you can use the simpler signatures with the payload directly and inject the PulsarIncomingMessageMetadata into the incoming methods, extracting key, action, and eventTime from the metadata. The handling of acks, also in the case of returning null, is then done automatically.

  private <T> T relay(T incomingPayload, PulsarIncomingMessageMetadata metadata) {
    String key = MetadataUtils.extractKey(metadata);
    Action action = MetadataUtils.extractAction(metadata);
    Long eventTime = MetadataUtils.extractEventTime(metadata);

    if (key == null || action == null || eventTime == null) {
      log.trace("Skipping relaying of message without required metadata");
      return null;
    } else {
      log.tracef("Relaying: key %s, action %s, event time %s",
          key, action, eventTime);
      return incomingPayload;
    }
  }

In your processor definitions you can simply inject the metadata after the payload:

  @Incoming(INCOMING_ASSETS_CHANNEL)
  @Outgoing(OUTGOING_ASSETS_CHANNEL)
  public Asset relayAsset(Asset incoming, PulsarIncomingMessageMetadata metadata) {
    return relay(incoming, metadata);
  }

@marekczajkowski
Author

It's only there for safeguarding; this block of code should 'never' be reached. Anyway, I do not think it has anything to do with acknowledging the incoming message. There might be something wrong with back-pressure on Pulsar.

Originally we faced the issue with the Pulsar producer queue being full.
[Screenshot: 2024-04-04 13:50]

As you can see, we have two services (2 replicas each) that read from the 'pages' channel and try to send messages to a Pulsar topic via a SmallRye channel.

No custom settings for maxPendingMessages or waitForWriteCompletion have been introduced.

At first it seemed like a good idea to set blockIfQueueFull to true to wait until Pulsar consumes the pending messages, but as you mentioned, it's not.

@michalcukierman
Contributor

michalcukierman commented Apr 4, 2024

It looks like by default Pulsar controls the memory using MemoryLimitController
https://github.com/apache/pulsar/pull/13344/files#diff-dada7e51eed6861aa3ebc13dcf1571dc0b6998f881a61d0cf4ced7e10b06628a

The ProducerQueueIsFullError is raised when we can't reserve memory (ProducerImpl.java in Pulsar):

    private boolean canEnqueueRequest(SendCallback callback, long sequenceId, int payloadSize) {
        try {
            if (conf.isBlockIfQueueFull()) {
                if (semaphore.isPresent()) {
                    semaphore.get().acquire();
                }
                client.getMemoryLimitController().reserveMemory(payloadSize);
            } else {
                if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
                    callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError(
                            "Producer send queue is full", sequenceId));
                    return false;
                }

                if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
                    semaphore.ifPresent(Semaphore::release);
                    callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError(
                            "Client memory buffer is full", sequenceId));
                    return false;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            callback.sendComplete(new PulsarClientException(e, sequenceId));
            return false;
        }

        return true;
    }

So before we limited maxPendingMessages and enabled blockIfQueueFull, we were getting errors because of the memory limit.

Some calculations:

  • 6 partitions = 6000 messages that Pulsar can enqueue
  • SmallRye allows sending 1000 in-flight messages (SmallRye does not take partitions into consideration)
  • 64 MB memory limit (default)
  • we were sending 30 KB messages

I think we could reach the 64 MB limit.

Could the solution be to create a SenderProcessor based on the MemoryLimitController?

We can access the memory limit controller:

PulsarClientImpl clientImpl = (PulsarClientImpl) client;
MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
SenderProcessor processor = new MemoryAwareSenderProcessor(memoryLimitController);

And use something like this (forgive the pseudo-code):

        if (memoryLimitController.isMemoryLimited()) {
            semaphore.acquire();
            // Probe memory availability: reserve the payload size and release it right away,
            // so the producer can reserve it again on the actual send. The boolean result of
            // tryReserveMemory would still need to be handled; there may be a more elegant
            // way of checking this.
            memoryLimitController.tryReserveMemory(messageSize);
            memoryLimitController.releaseMemory(messageSize);
            sendMessage(message);
            semaphore.release();
        }

The takeaways:

  • Creating a MemoryLimitController-aware SenderProcessor
  • Another potential issue: if the client is not memory limited, the partitions are not taken into consideration when calculating in-flight messages in SenderProcessor.

What do you think @marekczajkowski @ozangunalp ?

Of course I may be wrong, I haven't tested this yet.

@marekczajkowski
Author

Regarding the memory limit, I encountered the error while using the default settings and a basic relay function. After sending several thousand messages of approximately 30 KB each to a Pulsar topic, the relay began to reject messages after consuming around 1.6k of them, due to insufficient memory.

The error message received was:
SRMSG19024: Failed to process a message sent to channel incoming-pages, proceeding with nack, failure is: Client memory buffer is full.

It's essential that backpressure takes into account memory constraints, not just the number of messages, as the size of individual messages can vary unpredictably.

As for the original issue, I've been making efforts to reproduce the 'queue full' error, but haven't encountered it yet.

@ozangunalp
Collaborator

@michalcukierman @marekczajkowski Thanks for your inputs.
I'd assume the memory management would be aligned with the max in-flight messages setting, but apparently we need to look into this a bit deeper.

I'll spend more time next week to reproduce this issue.

@ozangunalp
Collaborator

I've been looking into this and couldn't find a real problem in the connector code.
I've been using the EndToEndPayloadPerfTest (it was disabled on main for a stupid reason). Initially it runs with 10 KB payload sizes, but I've increased that up to 50 KB for testing.

Here are my observations:

  • It is both hard and easy for me to trigger the Client memory buffer is full exception: the PulsarContainer used in tests sets up a tmpfs for the data directory. It was there to make the tests run a bit faster but didn't have a big impact in the end. However, when you hammer the broker with big payloads it fills up the bookie disk space quickly and kills the container if it's a tmpfs. Just before getting killed, the broker stops acking sent messages and the client memory fills up.
  • When I remove the tmpfs, it is not really possible for me to reproduce the Client memory buffer is full error. For further tests, we would need to set up latency with a toxiproxy like we do for the Kafka tests. Contributions are welcome!
  • So I continued my tests with the tmpfs enabled. I pushed 30k payloads of 50 KB each and processed them with a relay (in-out). I noticed that I filled the disk space at around 7k relayed messages, which is coherent with 2 GB of tmpfs disk space (50 KB * 37k = 1.85 GB).
  • In this scenario the client should process and relay messages and eventually fill the bookie's disk without hitting Client memory buffer is full. It should stop processing more messages as it "detects" that no more message sends are getting confirmed (the maxPendingMessages in SenderProcessor working as intended).
  • I noticed that the MemoryLimitController indeed tracks message payload sizes effectively. However, it maxes out at half of the configured size, e.g. for the 64 MB (default) memoryLimitBytes it maxes out at about 650 pending 50 KB messages awaiting send confirmation.
  • Initially, I thought this was because of received messages, but it is the case even when using different PulsarClient instances for the in and out channels. (Tip: the Pulsar connector shares Pulsar client instances between different channels if the client configurations are the same. If you want to force different clients, you can set the description attribute.)
  • But it turns out it was due to producer batching. When producer batching is enabled, the same payload counts twice against the memory limit: once for send queueing and once for the batch allocation. Batching is enabled by default with a 128 KB size limit, but I guess that when the broker is not accepting any more sends, batches also stack up.

The conclusion is that this happens when the broker cannot ingest messages at the rate your app produces them. The proper fix is to adjust the send buffer, both in terms of memory and in terms of messages pending send confirmation, so that back-pressure can slow down new message ingestion without filling the client memory buffer. Several courses of action are possible (a configuration sketch follows the list):

  • You can adjust the memoryLimitBytes (defaults to 64 MB)
  • You can disable batching if payloads are already very big, so that batches do not count against the memory limit
  • You can adjust the maxPendingMessages (set by the connector to 1000): for example, 500 * 50 KB stays under a 30 MB budget
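As an illustration only, a configuration sketch of these adjustments, assuming a hypothetical channel named outgoing-pages (the attribute names are passed through to the Pulsar client/producer configuration, so double-check them against the connector documentation):

# Raise the client memory limit from the 64 MB default to 128 MB
mp.messaging.outgoing.outgoing-pages.memoryLimitBytes=134217728
# Large payloads: disable batching so batches do not count against the memory limit
mp.messaging.outgoing.outgoing-pages.batchingEnabled=false
# Keep fewer messages pending for send confirmation (500 * 50 KB is about 25 MB)
mp.messaging.outgoing.outgoing-pages.maxPendingMessages=500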

Note for partitioned topics:
The memory limiter lives on the Pulsar client and is shared by all producers/consumers created with that client. Therefore, for a partitioned producer the limit is the same 64 MB default.

It is not possible for the connector to apply per-partition back-pressure to the upstream. And since the maxPendingMessagesAcrossPartitions config is deprecated, I've chosen to honor only the maxPendingMessages and not send more than that to the producer.
Currently, there is an issue with how the client configs are merged, so when specifying maxPendingMessages you also need to specify maxPendingMessagesAcrossPartitions, but that is easy to fix.
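Until that is fixed, a hedged sketch of setting the two together (hypothetical channel name; the values assume 6 partitions, i.e. maxPendingMessagesAcrossPartitions = partitions * maxPendingMessages):

mp.messaging.outgoing.outgoing-pages.maxPendingMessages=500
mp.messaging.outgoing.outgoing-pages.maxPendingMessagesAcrossPartitions=3000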

If someone is willing to contribute a MemoryLimitAwareSendProcessor, I'd be glad to review it.

Hope all this helps for your further investigation.

@marekczajkowski
Author

marekczajkowski commented Apr 10, 2024

@ozangunalp thanks a lot for your input. Here are my thoughts on this

The bottleneck arises when setting maxPendingMessages for partitioned topics. When maxPendingMessages is configured, SmallRye applies backpressure, preventing the sending of more messages to the broker than the specified limit. However, with partitioned topics, each partition has its own producer under the hood, and each producer has its maxPendingMessages set.
The challenge lies in the fact that setting maxPendingMessages is applied individually for each producer. Consider a scenario where there are 3 partitions and the default maxPendingMessages is set to 1000. From Pulsar's perspective, brokers can accept 3 x 1000 = 3000 messages. However, SmallRye does not honor this setting. This discrepancy can be addressed by using the deprecated maxPendingMessagesAcrossPartitions setting, which should be set to the value of numOfPartitions x maxPendingMessages.
Unfortunately, the maxPendingMessagesAcrossPartitions setting has no effect in the SmallRye connector. When maxPendingMessagesAcrossPartitions is set to a lower number than numOfPartitions x maxPendingMessages, SmallRye will send messages via the producer up to the limit of maxPendingMessages. Consequently, individual producers may have a lower queue size (maxPendingMessagesAcrossPartitions / numOfPartitions) than the SmallRye sender. This mismatch can lead to errors, as SmallRye may send more messages to an individual producer than it can accept, potentially resulting in a ProducerQueueIsFullError or MemoryBufferIsFullError.

Example:

  • numberOfPartitions = 2
  • maxPendingMessages = 100
  • maxPendingMessagesAcrossPartitions = 102
  • SmallRye back-pressure in-flight limit = maxPendingMessages (100)
  • maxPendingMessages per individual partition producer in the Pulsar client = 51

So SmallRye can send 100 messages while each producer can accept only 51.

Given the dynamic nature of partition changes at runtime, I believe there are limited options available to address this issue effectively. Consequently, I suggest updating the documentation to emphasize the importance of carefully configuring maxPendingMessagesAcrossPartitions, taking the aforementioned scenario into account. Additionally, it would be beneficial to include a note in the SmallRye log messages indicating that, if a ProducerQueueIsFull error is encountered, users should verify their maxPendingMessagesAcrossPartitions settings.

As for the MemoryLimitAwareSendProcessor, I will raise another issue. We are willing to contribute, but it's not the highest priority at the moment.

@ozangunalp
Collaborator

@marekczajkowski you are right, the producer queue size is somewhat related but different from the client memory full error.

Maybe the best course of action is to rename the Pulsar connector attribute maxPendingMessages to something else, like max.inflight.messages as in Kafka.

Initially, I named the attribute the same so that the producer config could be set together, but I was unaware of this partitioned-topic issue.

If we rename the connector attribute to max.inflight.messages with a default of 1000, maxPendingMessages and maxPendingMessagesAcrossPartitions can be set independently, both defaulting to 0.

WDYT?

@marekczajkowski
Author

I believe separating the backpressure configuration from the producer configuration is a beneficial approach. This way, the default mechanism can rely on SmallRye configuration, allowing producers to operate without limitations and providing room for more sophisticated custom configurations when used in conjunction with parameters like maxPendingMessages and maxPendingMessagesAcrossPartitions. Additionally, I suggest updating the documentation to include detailed explanations of these configurations.

ozangunalp added a commit to ozangunalp/smallrye-reactive-messaging that referenced this issue Apr 12, 2024
ozangunalp added a commit to ozangunalp/smallrye-reactive-messaging that referenced this issue Apr 12, 2024