Replace some synchronized blocks #2289

Merged · 2 commits · Sep 21, 2023
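Every change in this PR applies the same mechanical pattern: a synchronized method or block becomes an explicit java.util.concurrent.locks.ReentrantLock, acquired before the critical section and released in a finally block so the lock is freed even when the body throws. A common motivation for this kind of change is virtual-thread friendliness: a thread that blocks inside a synchronized block can pin its carrier thread, while a ReentrantLock parks without pinning. A minimal sketch of the before/after shape; the Counter class is illustrative, not part of this PR:

import java.util.concurrent.locks.ReentrantLock;

public class Counter {

    private final ReentrantLock lock = new ReentrantLock();
    private long value;

    // Before: public synchronized void increment() { value++; }
    public void increment() {
        lock.lock();
        try {
            value++; // critical section, guarded exactly as the synchronized version was
        } finally {
            lock.unlock(); // released on every path, including exceptions
        }
    }

    // Before: public synchronized long get() { return value; }
    public long get() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }
}

Like an intrinsic monitor, ReentrantLock is exclusive and reentrant, so the locking semantics are unchanged; only the mechanism differs.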
KafkaRecordStreamSubscription.java
@@ -8,6 +8,7 @@
 import java.util.concurrent.Flow;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.UnaryOperator;

@@ -64,6 +65,8 @@ public class KafkaRecordStreamSubscription<K, V, T> implements Flow.Subscription
     private final RecordQueue<T> queue;
     private final long retries;
 
+    private final ReentrantLock queueLock = new ReentrantLock();
+
     public KafkaRecordStreamSubscription(
             ReactiveKafkaConsumer<K, V> client,
             RuntimeKafkaSourceConfiguration config,
@@ -255,8 +258,9 @@ boolean isCancelled() {
      * @param mapFunction
      */
     void rewriteQueue(UnaryOperator<T> mapFunction) {
-        ArrayDeque<T> replacementQueue = new ArrayDeque<>();
-        synchronized (queue) {
+        queueLock.lock();
+        try {
+            ArrayDeque<T> replacementQueue = new ArrayDeque<>();
             queue
                     .stream()
                     .map(mapFunction)
@@ -265,6 +269,8 @@ void rewriteQueue(UnaryOperator<T> mapFunction) {
 
             queue.clear();
             queue.addAll((Iterable<T>) replacementQueue);
+        } finally {
+            queueLock.unlock();
         }
     }
 }
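The one behavioral nuance in this file: the allocation of replacementQueue moves inside the locked region. That is harmless, since building an empty deque touches no shared state. The rewrite-under-lock shape as a standalone sketch; the QueueRewriter class and its field names are illustrative:

import java.util.ArrayDeque;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;

class QueueRewriter<T> {

    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final ReentrantLock queueLock = new ReentrantLock();

    // Atomically replaces every queued element with mapFunction applied to it;
    // no other holder of queueLock can observe the queue half-rewritten.
    void rewriteQueue(UnaryOperator<T> mapFunction) {
        queueLock.lock();
        try {
            ArrayDeque<T> replacement = new ArrayDeque<>(queue.size());
            for (T item : queue) {
                replacement.add(mapFunction.apply(item));
            }
            queue.clear();
            queue.addAll(replacement);
        } finally {
            queueLock.unlock();
        }
    }
}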
RecordQueue.java
@@ -2,6 +2,7 @@
 
 import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Stores the records coming from Kafka.
@@ -12,6 +13,8 @@
  */
 public class RecordQueue<T> extends ArrayDeque<T> {
 
+    private final ReentrantLock lock = new ReentrantLock();
+
     public RecordQueue(int capacityHint) {
         super(capacityHint);
     }
@@ -22,10 +25,13 @@ public boolean addAll(Collection<? extends T> c) {
     }
 
     public void addAll(Iterable<T> iterable) {
-        synchronized (this) {
+        lock.lock();
+        try {
             for (T record : iterable) {
                 super.offer(record);
             }
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -36,16 +42,22 @@ public boolean add(T item) {
 
     @Override
     public boolean offer(T item) {
-        synchronized (this) {
+        lock.lock();
+        try {
             return super.offer(item);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
     public T poll() {
         T record;
-        synchronized (this) {
+        lock.lock();
+        try {
             record = super.poll();
+        } finally {
+            lock.unlock();
         }
         return record;
     }
@@ -62,15 +74,21 @@ public boolean remove(Object o) {
 
     @Override
     public int size() {
-        synchronized (this) {
+        lock.lock();
+        try {
             return super.size();
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
     public void clear() {
-        synchronized (this) {
+        lock.lock();
+        try {
             super.clear();
+        } finally {
+            lock.unlock();
         }
     }
 }
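Each overridden operation is atomic on its own under the queue's private lock, but compound sequences, say calling size() and then poll(), are not atomic as a pair; callers that need that must synchronize externally, as KafkaRecordStreamSubscription does with queueLock above. A small usage sketch, assuming RecordQueue is imported from this module:

public class RecordQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        RecordQueue<String> queue = new RecordQueue<>(16);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 1_000; i++) {
                queue.offer("record-" + i); // lock-guarded per call
            }
        });
        Thread consumer = new Thread(() -> {
            int drained = 0;
            while (drained < 1_000) {
                if (queue.poll() != null) { // lock-guarded; null while the producer is behind
                    drained++;
                }
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        System.out.println("remaining: " + queue.size()); // 0
    }
}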
KafkaTransactionsImpl.java
@@ -5,6 +5,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -35,70 +36,88 @@ public class KafkaTransactionsImpl<T> extends MutinyEmitterImpl<T> implements Ka
 
     private volatile Transaction<?> currentTransaction;
 
+    private final ReentrantLock lock = new ReentrantLock();
+
     public KafkaTransactionsImpl(EmitterConfiguration config, long defaultBufferSize, KafkaClientService clientService) {
         super(config, defaultBufferSize);
         this.clientService = clientService;
         this.producer = clientService.getProducer(config.name());
     }
 
     @Override
-    public synchronized boolean isTransactionInProgress() {
-        return currentTransaction != null;
+    public boolean isTransactionInProgress() {
+        lock.lock();
+        try {
+            return currentTransaction != null;
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
     @CheckReturnValue
-    public synchronized <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
-        if (currentTransaction == null) {
-            return new Transaction<R>().execute(work);
+    public <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
+        lock.lock();
+        try {
+            if (currentTransaction == null) {
+                return new Transaction<R>().execute(work);
+            }
+            throw KafkaExceptions.ex.transactionInProgress(name);
+        } finally {
+            lock.unlock();
        }
-        throw KafkaExceptions.ex.transactionInProgress(name);
     }

     @SuppressWarnings("rawtypes")
     @Override
     @CheckReturnValue
-    public synchronized <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
-        String channel;
-        Map<TopicPartition, OffsetAndMetadata> offsets;
-
-        Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message.getMetadata(IncomingKafkaRecordBatchMetadata.class);
-        Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
-        if (batchMetadata.isPresent()) {
-            IncomingKafkaRecordBatchMetadata<?, ?> metadata = batchMetadata.get();
-            channel = metadata.getChannel();
-            offsets = metadata.getOffsets().entrySet().stream()
-                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1)));
-        } else if (recordMetadata.isPresent()) {
-            IncomingKafkaRecordMetadata<?, ?> metadata = recordMetadata.get();
-            channel = metadata.getChannel();
-            offsets = new HashMap<>();
-            offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()),
-                    new OffsetAndMetadata(metadata.getOffset() + 1));
-        } else {
-            throw KafkaExceptions.ex.noKafkaMetadataFound(message);
-        }
+    public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
+        lock.lock();
+        try {
+            String channel;
+            Map<TopicPartition, OffsetAndMetadata> offsets;
+
+            Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message
+                    .getMetadata(IncomingKafkaRecordBatchMetadata.class);
+            Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
+            if (batchMetadata.isPresent()) {
+                IncomingKafkaRecordBatchMetadata<?, ?> metadata = batchMetadata.get();
+                channel = metadata.getChannel();
+                offsets = metadata.getOffsets().entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1)));
+            } else if (recordMetadata.isPresent()) {
+                IncomingKafkaRecordMetadata<?, ?> metadata = recordMetadata.get();
+                channel = metadata.getChannel();
+                offsets = new HashMap<>();
+                offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()),
+                        new OffsetAndMetadata(metadata.getOffset() + 1));
+            } else {
+                throw KafkaExceptions.ex.noKafkaMetadataFound(message);
+            }
 
-        List<KafkaConsumer<Object, Object>> consumers = clientService.getConsumers(channel);
-        if (consumers.isEmpty()) {
-            throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
-        } else if (consumers.size() > 1) {
-            throw KafkaExceptions.ex.exactlyOnceProcessingNotSupported(channel);
-        }
-        KafkaConsumer<Object, Object> consumer = consumers.get(0);
-        if (currentTransaction == null) {
-            return new Transaction<R>(
-                    /* before commit */
-                    consumer.consumerGroupMetadata()
-                            .chain(groupMetadata -> producer.sendOffsetsToTransaction(offsets, groupMetadata)),
-                    r -> Uni.createFrom().item(r),
-                    VOID_UNI,
-                    /* after abort */
-                    t -> consumer.resetToLastCommittedPositions()
-                            .chain(() -> Uni.createFrom().failure(t)))
-                    .execute(work);
+            List<KafkaConsumer<Object, Object>> consumers = clientService.getConsumers(channel);
+            if (consumers.isEmpty()) {
+                throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
+            } else if (consumers.size() > 1) {
+                throw KafkaExceptions.ex.exactlyOnceProcessingNotSupported(channel);
+            }
+            KafkaConsumer<Object, Object> consumer = consumers.get(0);
+            if (currentTransaction == null) {
+                return new Transaction<R>(
+                        /* before commit */
+                        consumer.consumerGroupMetadata()
+                                .chain(groupMetadata -> producer.sendOffsetsToTransaction(offsets, groupMetadata)),
+                        r -> Uni.createFrom().item(r),
+                        VOID_UNI,
+                        /* after abort */
+                        t -> consumer.resetToLastCommittedPositions()
+                                .chain(() -> Uni.createFrom().failure(t)))
+                        .execute(work);
+            }
+            throw KafkaExceptions.ex.transactionInProgress(name);
+        } finally {
+            lock.unlock();
         }
-        throw KafkaExceptions.ex.transactionInProgress(name);
     }
 
     private static final Uni<Void> VOID_UNI = Uni.createFrom().voidItem();
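Both withTransaction overloads share the same guarded check-then-act: take the lock, test currentTransaction, and either start a Transaction or throw transactionInProgress. Note that the throw moved inside the try block; it still runs only when currentTransaction is non-null, so behavior is unchanged. A reduced sketch of the shape; the ExclusiveRunner class is illustrative and, unlike the real method, which returns a Uni that completes later, it runs the work synchronously:

import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

class ExclusiveRunner {

    private final ReentrantLock lock = new ReentrantLock();
    private volatile Object current; // non-null while a unit of work is in flight

    <R> R withWork(Supplier<R> work) {
        lock.lock();
        try {
            if (current == null) {
                current = new Object(); // claim the slot under the lock
                try {
                    return work.get();
                } finally {
                    current = null; // release the slot
                }
            }
            throw new IllegalStateException("work already in progress");
        } finally {
            lock.unlock();
        }
    }
}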
AbstractEmitter.java
@@ -4,6 +4,7 @@
 
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
 import org.eclipse.microprofile.reactive.messaging.Message;
@@ -28,6 +29,8 @@ public abstract class AbstractEmitter<T> implements MessagePublisherProvider<T>
     protected final AtomicReference<Throwable> synchronousFailure = new AtomicReference<>();
     private final OnOverflow.Strategy overflow;
 
+    private final ReentrantLock lock = new ReentrantLock();
+
     @SuppressWarnings("unchecked")
     public AbstractEmitter(EmitterConfiguration config, long defaultBufferSize) {
         this.name = config.name();
@@ -55,24 +58,34 @@ public AbstractEmitter(EmitterConfiguration config, long defaultBufferSize) {
         }
     }
 
-    public synchronized void complete() {
-        MultiEmitter<? super Message<? extends T>> emitter = verify();
-        if (emitter != null) {
-            emitter.complete();
+    public void complete() {
+        lock.lock();
+        try {
+            MultiEmitter<? super Message<? extends T>> emitter = verify();
+            if (emitter != null) {
+                emitter.complete();
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
-    public synchronized void error(Exception e) {
+    public void error(Exception e) {
         if (e == null) {
             throw ex.illegalArgumentForException("null");
         }
-        MultiEmitter<? super Message<? extends T>> emitter = verify();
-        if (emitter != null) {
-            emitter.fail(e);
+        lock.lock();
+        try {
+            MultiEmitter<? super Message<? extends T>> emitter = verify();
+            if (emitter != null) {
+                emitter.fail(e);
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
-    public synchronized boolean isCancelled() {
+    public boolean isCancelled() {
         MultiEmitter<? super Message<? extends T>> emitter = internal.get();
         return emitter == null || emitter.isCancelled();
     }
@@ -139,29 +152,33 @@ public Publisher<Message<? extends T>> getPublisher() {
         return publisher;
     }
 
-    protected synchronized void emit(Message<? extends T> message) {
+    protected void emit(Message<? extends T> message) {
         if (message == null) {
             throw ex.illegalArgumentForNullValue();
         }
 
-        MultiEmitter<? super Message<? extends T>> emitter = verify();
-        if (emitter == null) {
-            if (overflow == OnOverflow.Strategy.DROP) {
-                // There are no subscribers, but because we use the DROP strategy, just ignore the event.
-                // However, nack the message, so the sender can be aware of the rejection.
-                message.nack(NO_SUBSCRIBER_EXCEPTION);
+        lock.lock();
+        try {
+            MultiEmitter<? super Message<? extends T>> emitter = verify();
+            if (emitter == null) {
+                if (overflow == OnOverflow.Strategy.DROP) {
+                    // There are no subscribers, but because we use the DROP strategy, just ignore the event.
+                    // However, nack the message, so the sender can be aware of the rejection.
+                    message.nack(NO_SUBSCRIBER_EXCEPTION);
+                }
+                return;
             }
-            return;
-        }
-        if (synchronousFailure.get() != null) {
-            throw ex.incomingNotFoundForEmitter(synchronousFailure.get());
-        }
-        if (emitter.isCancelled()) {
-            throw ex.illegalStateForDownstreamCancel();
-        }
-        emitter.emit(message);
-        if (synchronousFailure.get() != null) {
-            throw ex.illegalStateForEmitterWhileEmitting(synchronousFailure.get());
+            if (synchronousFailure.get() != null) {
+                throw ex.incomingNotFoundForEmitter(synchronousFailure.get());
+            }
+            if (emitter.isCancelled()) {
+                throw ex.illegalStateForDownstreamCancel();
+            }
+            emitter.emit(message);
+            if (synchronousFailure.get() != null) {
+                throw ex.illegalStateForEmitterWhileEmitting(synchronousFailure.get());
+            }
+        } finally {
+            lock.unlock();
         }
     }
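Two details here: the null checks in error and emit now run before the lock is taken, since rejecting bad input needs no mutual exclusion, and isCancelled loses synchronized without gaining a lock because it only reads an AtomicReference, which is already safe. The validate-then-lock shape in isolation; the Sink class is illustrative:

import java.util.concurrent.locks.ReentrantLock;

class Sink {

    private final ReentrantLock lock = new ReentrantLock();
    private final StringBuilder buffer = new StringBuilder(); // shared and not thread-safe

    void append(String item) {
        if (item == null) {
            throw new IllegalArgumentException("item must not be null"); // fail fast, no lock held
        }
        lock.lock();
        try {
            buffer.append(item); // shared state touched only under the lock
        } finally {
            lock.unlock();
        }
    }
}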

EmitterImpl.java
@@ -23,7 +23,7 @@ public EmitterImpl(EmitterConfiguration config, long defaultBufferSize) {
     }
 
     @Override
-    public synchronized CompletionStage<Void> send(T payload) {
+    public CompletionStage<Void> send(T payload) {
         if (payload == null) {
             throw ex.illegalArgumentForNullValue();
         }
@@ -40,7 +40,7 @@ public synchronized CompletionStage<Void> send(T payload) {
     }
 
     @Override
-    public synchronized <M extends Message<? extends T>> void send(M msg) {
+    public <M extends Message<? extends T>> void send(M msg) {
         if (msg == null) {
             throw ex.illegalArgumentForNullValue();
         }
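In EmitterImpl, synchronized is dropped with no replacement lock at all. The send overloads validate their argument and then, judging by the collapsed bodies and the class hierarchy, delegate to the emit machinery in AbstractEmitter, which is now lock-guarded itself, so another lock here would only be redundant. The delegation shape, reusing the illustrative Sink from the previous sketch:

class SendFacade {

    private final Sink sink = new Sink(); // already thread-safe internally

    void send(String payload) {
        if (payload == null) {
            throw new IllegalArgumentException("payload must not be null");
        }
        sink.append(payload); // thread safety lives in the guarded helper, not here
    }
}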