From cadb16687742212921fb542717146075da2ed080 Mon Sep 17 00:00:00 2001
From: Ozan Gunalp
Date: Thu, 21 Sep 2023 12:43:21 +0200
Subject: [PATCH] Replace synchronized blocks in kafka connector with reentrant
 lock

---
 .../impl/KafkaRecordStreamSubscription.java   |  10 +-
 .../messaging/kafka/impl/RecordQueue.java     |  28 ++++-
 .../transactions/KafkaTransactionsImpl.java   | 109 ++++++++++--------
 3 files changed, 95 insertions(+), 52 deletions(-)

diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStreamSubscription.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStreamSubscription.java
index b4104e6b27..aa4de81e4b 100644
--- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStreamSubscription.java
+++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStreamSubscription.java
@@ -8,6 +8,7 @@
 import java.util.concurrent.Flow;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.UnaryOperator;
 
@@ -64,6 +65,8 @@ public class KafkaRecordStreamSubscription<K, V, T> implements Flow.Subscription
     private final RecordQueue<T> queue;
     private final long retries;
 
+    private final ReentrantLock queueLock = new ReentrantLock();
+
     public KafkaRecordStreamSubscription(
             ReactiveKafkaConsumer<K, V> client,
             RuntimeKafkaSourceConfiguration config,
@@ -255,8 +258,9 @@ boolean isCancelled() {
      * @param mapFunction
      */
     void rewriteQueue(UnaryOperator<T> mapFunction) {
-        ArrayDeque<T> replacementQueue = new ArrayDeque<>();
-        synchronized (queue) {
+        queueLock.lock();
+        try {
+            ArrayDeque<T> replacementQueue = new ArrayDeque<>();
             queue
                     .stream()
                     .map(mapFunction)
@@ -265,6 +269,8 @@ void rewriteQueue(UnaryOperator<T> mapFunction) {
 
             queue.clear();
             queue.addAll((Iterable<T>) replacementQueue);
-        }
+        } finally {
+            queueLock.unlock();
+        }
     }
 }
diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/RecordQueue.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/RecordQueue.java
index d33144668b..c41b8b2a1b 100644
--- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/RecordQueue.java
+++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/RecordQueue.java
@@ -2,6 +2,7 @@
 
 import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Stores the records coming from Kafka.
@@ -12,6 +13,8 @@
  */
 public class RecordQueue<T> extends ArrayDeque<T> {
 
+    private final ReentrantLock lock = new ReentrantLock();
+
     public RecordQueue(int capacityHint) {
         super(capacityHint);
     }
@@ -22,10 +25,13 @@ public boolean addAll(Collection<? extends T> c) {
     }
 
     public void addAll(Iterable<T> iterable) {
-        synchronized (this) {
+        lock.lock();
+        try {
             for (T record : iterable) {
                 super.offer(record);
             }
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -36,16 +42,22 @@ public boolean add(T item) {
 
     @Override
     public boolean offer(T item) {
-        synchronized (this) {
+        lock.lock();
+        try {
             return super.offer(item);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
     public T poll() {
         T record;
-        synchronized (this) {
+        lock.lock();
+        try {
             record = super.poll();
+        } finally {
+            lock.unlock();
         }
         return record;
     }
@@ -62,15 +74,21 @@ public boolean remove(Object o) {
 
     @Override
     public int size() {
-        synchronized (this) {
+        lock.lock();
+        try {
             return super.size();
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
     public void clear() {
-        synchronized (this) {
+        lock.lock();
+        try {
             super.clear();
+        } finally {
+            lock.unlock();
         }
     }
 }
diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.java
index 1528263016..dae1a7948e 100644
--- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.java
+++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.java
@@ -5,6 +5,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -35,6 +36,8 @@ public class KafkaTransactionsImpl<T> extends MutinyEmitterImpl<T> implements Ka
 
     private volatile Transaction currentTransaction;
 
+    private final ReentrantLock lock = new ReentrantLock();
+
     public KafkaTransactionsImpl(EmitterConfiguration config, long defaultBufferSize, KafkaClientService clientService) {
         super(config, defaultBufferSize);
         this.clientService = clientService;
@@ -42,63 +45,79 @@ public KafkaTransactionsImpl(EmitterConfiguration config, long defaultBufferSize
     }
 
     @Override
-    public synchronized boolean isTransactionInProgress() {
-        return currentTransaction != null;
+    public boolean isTransactionInProgress() {
+        lock.lock();
+        try {
+            return currentTransaction != null;
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
     @CheckReturnValue
-    public synchronized <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
-        if (currentTransaction == null) {
-            return new Transaction().execute(work);
+    public <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
+        lock.lock();
+        try {
+            if (currentTransaction == null) {
+                return new Transaction().execute(work);
+            }
+            throw KafkaExceptions.ex.transactionInProgress(name);
+        } finally {
+            lock.unlock();
         }
-        throw KafkaExceptions.ex.transactionInProgress(name);
     }
 
     @SuppressWarnings("rawtypes")
     @Override
     @CheckReturnValue
-    public synchronized <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
-        String channel;
-        Map<TopicPartition, OffsetAndMetadata> offsets;
-
-        Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message.getMetadata(IncomingKafkaRecordBatchMetadata.class);
-        Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
-        if (batchMetadata.isPresent()) {
-            IncomingKafkaRecordBatchMetadata metadata = batchMetadata.get();
-            channel = metadata.getChannel();
-            offsets = metadata.getOffsets().entrySet().stream()
-                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1)));
-        } else if (recordMetadata.isPresent()) {
-            IncomingKafkaRecordMetadata metadata = recordMetadata.get();
-            channel = metadata.getChannel();
-            offsets = new HashMap<>();
-            offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()),
-                    new OffsetAndMetadata(metadata.getOffset() + 1));
-        } else {
-            throw KafkaExceptions.ex.noKafkaMetadataFound(message);
-        }
+    public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
+        lock.lock();
+        try {
+            String channel;
+            Map<TopicPartition, OffsetAndMetadata> offsets;
+
+            Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message
+                    .getMetadata(IncomingKafkaRecordBatchMetadata.class);
+            Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
+            if (batchMetadata.isPresent()) {
+                IncomingKafkaRecordBatchMetadata metadata = batchMetadata.get();
+                channel = metadata.getChannel();
+                offsets = metadata.getOffsets().entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1)));
+            } else if (recordMetadata.isPresent()) {
+                IncomingKafkaRecordMetadata metadata = recordMetadata.get();
+                channel = metadata.getChannel();
+                offsets = new HashMap<>();
+                offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()),
+                        new OffsetAndMetadata(metadata.getOffset() + 1));
+            } else {
+                throw KafkaExceptions.ex.noKafkaMetadataFound(message);
+            }
 
-        List<KafkaConsumer<Object, Object>> consumers = clientService.getConsumers(channel);
-        if (consumers.isEmpty()) {
-            throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
-        } else if (consumers.size() > 1) {
-            throw KafkaExceptions.ex.exactlyOnceProcessingNotSupported(channel);
-        }
-        KafkaConsumer<Object, Object> consumer = consumers.get(0);
-        if (currentTransaction == null) {
-            return new Transaction(
-                    /* before commit */
-                    consumer.consumerGroupMetadata()
-                            .chain(groupMetadata -> producer.sendOffsetsToTransaction(offsets, groupMetadata)),
-                    r -> Uni.createFrom().item(r),
-                    VOID_UNI,
-                    /* after abort */
-                    t -> consumer.resetToLastCommittedPositions()
-                            .chain(() -> Uni.createFrom().failure(t)))
-                    .execute(work);
+            List<KafkaConsumer<Object, Object>> consumers = clientService.getConsumers(channel);
+            if (consumers.isEmpty()) {
+                throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
+            } else if (consumers.size() > 1) {
+                throw KafkaExceptions.ex.exactlyOnceProcessingNotSupported(channel);
+            }
+            KafkaConsumer<Object, Object> consumer = consumers.get(0);
+            if (currentTransaction == null) {
+                return new Transaction(
+                        /* before commit */
+                        consumer.consumerGroupMetadata()
+                                .chain(groupMetadata -> producer.sendOffsetsToTransaction(offsets, groupMetadata)),
+                        r -> Uni.createFrom().item(r),
+                        VOID_UNI,
+                        /* after abort */
+                        t -> consumer.resetToLastCommittedPositions()
+                                .chain(() -> Uni.createFrom().failure(t)))
+                        .execute(work);
+            }
+            throw KafkaExceptions.ex.transactionInProgress(name);
+        } finally {
+            lock.unlock();
         }
-        throw KafkaExceptions.ex.transactionInProgress(name);
     }
 
     private static final Uni<Void> VOID_UNI = Uni.createFrom().voidItem();
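
Note: all three files apply the same mechanical rewrite, so the pattern is worth spelling out once. Below is a minimal sketch using a hypothetical Counter class that is not part of this patch: each synchronized block or method becomes an explicit ReentrantLock acquisition, with the guarded code moved into a try block and unlock() in a finally clause, so the lock is released even when the guarded code throws or returns early. A common motivation for this swap, though not stated in the commit message, is that ReentrantLock tends to cooperate better with virtual threads, which may pin their carrier thread inside synchronized blocks.

    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical illustration of the rewrite applied throughout this patch.
    class Counter {
        private final ReentrantLock lock = new ReentrantLock();
        private long value;

        // Before: public synchronized void increment() { value++; }
        public void increment() {
            lock.lock();       // acquire before the try: if lock() fails, there is nothing to unlock
            try {
                value++;       // the code the synchronized block used to guard
            } finally {
                lock.unlock(); // always runs, even on exception or early return
            }
        }
    }

Like synchronized, ReentrantLock is reentrant and provides mutual exclusion, so the rewrite changes how the lock is implemented rather than what it guarantees.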