Advice on increasing consumer concurrency & throughput #372

Open
fabianwikstrom opened this issue Apr 1, 2024 · 0 comments

Hello, we need some advice on how to increase throughput in our Pulsar consumers. Here are some details:

  • We run 1 consumer per pod in a Kubernetes cluster
  • We use the Shared subscription type; strict ordering does not matter to us
  • To keep debugging simple, we have not enabled batching
  • We've been using the listener pattern

We've found that our messages are processed sequentially, which leads to poor throughput, and we need to speed things up. What is the recommended way to do so? The two options we are considering are listed here, with example code for each further down:

  1. Increase the number of consumers per pod and keep the listener pattern
  2. Use the receive pattern (consumer.receive()) with multiple workers on a single consumer

We'd like to understand what the community considers best practice and why. Thank you :)
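
For a rough sense of what is at stake, here is a back-of-envelope sketch. It assumes every message takes the same time to handle and that messages are spread evenly across workers, which real workloads will not do exactly. The function name `estimateDrainMs` is made up for this illustration and is not part of any library.

```typescript
// Rough estimate of how long it takes to work through a backlog when up to
// `concurrency` messages are handled at the same time.
// Assumption: every message takes exactly `handlerMs` to process.
function estimateDrainMs(
  messageCount: number,
  handlerMs: number,
  concurrency: number,
): number {
  if (concurrency < 1) {
    throw new RangeError('concurrency must be at least 1');
  }
  // Messages are processed in "rounds" of at most `concurrency` at a time.
  const rounds = Math.ceil(messageCount / concurrency);
  return rounds * handlerMs;
}

// 10 messages with a 1-second handler, as in the test scripts in this issue:
console.log(estimateDrainMs(10, 1000, 1)); // 10000 (sequential)
console.log(estimateDrainMs(10, 1000, 5)); // 2000 (5 at a time)
```

Under these assumptions both options should give roughly the same speed-up for the same concurrency, so the question is mostly about which one is the more robust pattern.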

Running multiple consumers per pod

import { faker } from '@faker-js/faker';
import Pulsar from 'pulsar-client';

process.env.ENVIRONMENT = 'development';
process.env.PULSAR_SERVICE_URL = 'pulsar://localhost:6650';
const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`;

const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`;

const CONCURRENCY = 5;
const SEND_NUMBER = 10;

async function handleMessage(
  message: Pulsar.Message,
  consumer: Pulsar.Consumer,
): Promise<void> {
  console.log('Received message: ', message.getData().toString());
  await new Promise((resolve) => setTimeout(resolve, 1000));
  await consumer.acknowledge(message);
}

async function main() {
  const client = new Pulsar.Client({
    serviceUrl: process.env.PULSAR_SERVICE_URL as string,
    log: logconfig(),
    messageListenerThreads: CONCURRENCY,
  });

  console.log('Topic: ', PULSAR_TOPIC);
  console.log('Subscription: ', PULSAR_SUBSCRIPTION);

  // Create CONCURRENCY consumers on the same Shared subscription
  const consumers: Pulsar.Consumer[] = [];
  const counter = new Map<string, number>();
  const subscriptionType = 'Shared';
  const ackTimeoutMs = 10_000;
  const nAckRedeliverTimeoutMs = 2_000;
  const batchIndexAckEnabled = false;

  for (let i = 0; i < CONCURRENCY; i += 1) {
    const consumer = await client.subscribe({
      topic: PULSAR_TOPIC,
      subscription: PULSAR_SUBSCRIPTION,
      subscriptionType,
      ackTimeoutMs,
      nAckRedeliverTimeoutMs,
      receiverQueueSize: 10,
      batchIndexAckEnabled,
      listener: (message, consumer) => handleMessage(message, consumer),
    });
    consumers.push(consumer);
  }

  // Send messages
  const producer = await client.createProducer({ topic: PULSAR_TOPIC });

  for (let i = 0; i < SEND_NUMBER; i += 1) {
    const msg = `test-message-${i}`;
    counter.set(msg, 0);
    await producer.send({ data: Buffer.from(msg) });
  }

  // Sleep 50 seconds to wait for the messages to be processed
  await new Promise((resolve) => setTimeout(resolve, 50000));

  await producer.close();
  for (const consumer of consumers) {
    await consumer.close();
  }
  process.exit(0);
}

void main();

function logconfig() {
  return (level: any, _file: any, _line: any, message: any) => {
    switch (level) {
      case Pulsar.LogLevel.DEBUG:
        console.debug(message);
        break;
      case Pulsar.LogLevel.INFO:
        console.info(message);
        break;
      case Pulsar.LogLevel.WARN:
        console.warn(message);
        break;
      case Pulsar.LogLevel.ERROR:
        console.error(message);
        break;
    }
  };
}

Increasing concurrency per consumer

import { faker } from '@faker-js/faker';
import Pulsar from 'pulsar-client';

import logger from '../../utils/logger';

process.env.ENVIRONMENT = 'development';
process.env.PULSAR_SERVICE_URL = 'pulsar://localhost:6650';
const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`;

const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`;

const CONCURRENCY = 5;
const SEND_NUMBER = 10;

async function handleMessage(
  message: Pulsar.Message,
  consumer: Pulsar.Consumer,
): Promise<void> {
  console.log('Received message: ', message.getData().toString());
  await new Promise((resolve) => setTimeout(resolve, 1000));
  await consumer.acknowledge(message);
}

async function main() {
  const client = new Pulsar.Client({
    serviceUrl: process.env.PULSAR_SERVICE_URL as string,
    log: logconfig()
  });

  console.log('Topic: ', PULSAR_TOPIC);
  console.log('Subscription: ', PULSAR_SUBSCRIPTION);

  // Create a single consumer; concurrency comes from the workers in listen()
  const counter = new Map<string, number>();
  const subscriptionType = 'Shared';
  const ackTimeoutMs = 10_000;
  const nAckRedeliverTimeoutMs = 2_000;
  const batchIndexAckEnabled = false;

  const consumer = await client.subscribe({
    topic: PULSAR_TOPIC,
    subscription: PULSAR_SUBSCRIPTION,
    subscriptionType,
    ackTimeoutMs,
    nAckRedeliverTimeoutMs,
    receiverQueueSize: 10,
    batchIndexAckEnabled,
  });

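  // Start the workers without awaiting: listen() loops forever, so awaiting
  // it here would block main() before any message is sent.
  void listen(
    consumer,
    async (consumer, message) => handleMessage(message, consumer),
    CONCURRENCY,
  );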

  // Send messages
  const producer = await client.createProducer({ topic: PULSAR_TOPIC });

  for (let i = 0; i < SEND_NUMBER; i += 1) {
    const msg = `test-message-${i}`;
    counter.set(msg, 0);
    await producer.send({ data: Buffer.from(msg) });
  }

  // Sleep 50 seconds to wait for the messages to be processed
  await new Promise((resolve) => setTimeout(resolve, 50000));

  await producer.close();
  await consumer.close();
  process.exit(0);
}

void main();

/**
 * Receive messages from a Pulsar consumer and process them concurrently.
 *
 * @param consumer - Pulsar consumer to receive messages from.
 * @param listener - Message handler function.
 * @param concurrency - Maximum number of messages to process at a time.
 */
export async function listen(
  consumer: Pulsar.Consumer,
  listener: (
    consumer: Pulsar.Consumer,
    message: Pulsar.Message,
  ) => Promise<void>,
  concurrency = 1,
): Promise<void> {
  const workers = new Array<Promise<void>>();
  for (let i = 0; i < concurrency; i++) {
    const worker = async () => {
      for (;;) {
        try {
          const message = await consumer.receive();
          await listener(consumer, message);
        } catch (err: any) {
          logger.error(`Message processing error: ${err.message}`);
        }
      }
    };
    workers.push(worker());
  }
  await Promise.all(workers);
}

function logconfig() {
  return (level: any, _file: any, _line: any, message: any) => {
    switch (level) {
      case Pulsar.LogLevel.DEBUG:
        console.debug(message);
        break;
      case Pulsar.LogLevel.INFO:
        console.info(message);
        break;
      case Pulsar.LogLevel.WARN:
        console.warn(message);
        break;
      case Pulsar.LogLevel.ERROR:
        console.error(message);
        break;
    }
  };
}
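
One thing the `listen` helper above does not cover is shutdown: its workers loop forever, so there is no way to drain them before closing the consumer. Below is a minimal sketch of a stoppable variant. `MessageSource`, `WorkerPool` and `startWorkers` are hypothetical names for this illustration only. The sketch assumes `receive` accepts a timeout in milliseconds and rejects when it elapses, which is how I understand `Pulsar.Consumer.receive(timeout)` to behave, but treat that as an assumption to verify.

```typescript
// Minimal shape the pool needs from a consumer (hypothetical interface).
// Assumption: receive() rejects when the timeout elapses with no message.
interface MessageSource<M> {
  receive(timeoutMs?: number): Promise<M>;
}

interface WorkerPool {
  // Ask the workers to stop and wait for in-flight messages to finish.
  stop(): Promise<void>;
}

function startWorkers<M>(
  source: MessageSource<M>,
  handler: (message: M) => Promise<void>,
  concurrency: number,
  receiveTimeoutMs = 1000,
): WorkerPool {
  let running = true;

  const worker = async (): Promise<void> => {
    while (running) {
      // Simplification: any receive error (including a timeout) is treated
      // as "no message" so the loop can re-check the running flag. A real
      // implementation should tell timeouts apart from other failures and
      // back off on the latter to avoid a tight retry loop.
      const message = await source
        .receive(receiveTimeoutMs)
        .catch(() => undefined);
      if (message === undefined) {
        continue;
      }
      try {
        await handler(message);
      } catch (err) {
        console.error('Message processing error:', err);
      }
    }
  };

  // Each worker pulls one message at a time, so at most `concurrency`
  // messages are being handled at once.
  const workers = Array.from({ length: concurrency }, () => worker());

  return {
    stop: async () => {
      running = false;
      await Promise.all(workers);
    },
  };
}
```

With something like this, the test script could call `await pool.stop()` before `consumer.close()`, so no worker is still calling `receive()` on a closed consumer.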