Skip to content

Commit

Permalink
Pulsar transactions batch eop fix
Browse files Browse the repository at this point in the history
Fixes #2245
  • Loading branch information
ozangunalp committed Jul 25, 2024
1 parent 8a7ce52 commit 6ed59de
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public PulsarTransactionsImpl(EmitterConfiguration config, long defaultBufferSiz
this.pulsarClient = pulsarClientService.getClient(config.name());
}

private static boolean isConflict(Throwable throwable) {
return throwable instanceof PulsarClientException.TransactionConflictException
|| throwable.getCause() instanceof PulsarClientException.TransactionConflictException;
}

@Override
public <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
return new PulsarTransactionEmitter<R>().execute(work);
Expand All @@ -57,17 +62,14 @@ public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmit
@Override
public <R> Uni<R> withTransaction(Duration txnTimeout, Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
return new PulsarTransactionEmitter<R>(txnTimeout,
txn -> Uni.createFrom().completionStage(message.ack()),
txn -> Uni.createFrom().completionStage(message.ack())
.onFailure(PulsarTransactionsImpl::isConflict).recoverWithUni(throwable -> VOID_UNI),
PulsarTransactionsImpl::defaultAfterCommit,
(txn, throwable) -> {
if (!(throwable.getCause() instanceof PulsarClientException.TransactionConflictException)) {
// If TransactionConflictException is not thrown,
// you need to redeliver or negativeAcknowledge this message,
// or else this message will not be received again.
return Uni.createFrom().completionStage(() -> message.nack(throwable));
} else {
return VOID_UNI;
}
// If TransactionConflictException is not thrown,
// you need to redeliver or negativeAcknowledge this message,
// or else this message will not be received again.
return isConflict(throwable) ? VOID_UNI : Uni.createFrom().completionStage(message.nack(throwable));
},
PulsarTransactionsImpl::defaultAfterAbort)
.execute(e -> {
Expand Down Expand Up @@ -200,7 +202,8 @@ private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> wo
.onCancellation().call(() -> abort(new RuntimeException("Transaction cancelled")))
// when there was no exception,
// commit or rollback the transaction
.call(() -> abort ? abort(new RuntimeException("Transaction aborted")) : commit())
.call(() -> abort ? abort(new RuntimeException("Transaction aborted"))
: commit().onFailure().call(throwable -> abort(throwable)))
.onFailure().recoverWithUni(throwable -> afterAbort.apply(throwable))
.onItem().transformToUni(result -> afterCommit.apply(result));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.2.2");
public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.3.0");

public static final String STARTER_SCRIPT = "/run_pulsar.sh";

Expand Down Expand Up @@ -45,6 +45,8 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole
command += "export PULSAR_PREFIX_advertisedListeners=" + advertisedListeners + " \n";
command += "export PULSAR_PREFIX_transactionCoordinatorEnabled=true\n";
command += "export PULSAR_PREFIX_systemTopicEnabled=true\n";
command += "export PULSAR_PREFIX_brokerDeduplicationEnabled=true\n";
command += "export PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled=true\n";
command += "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss";
copyFileToContainer(
Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarConnector;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.TestTags;
import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

Expand Down Expand Up @@ -104,8 +101,6 @@ Uni<Void> process(PulsarIncomingBatchMessage<Integer> batch) {
* There are still duplicate items delivered to the consumer batch after an transaction abort.
*/
@Test
@Tag(TestTags.FLAKY)
@Disabled
void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException, PulsarClientException {
addBeans(ConsumerConfig.class);
this.inTopic = UUID.randomUUID().toString();
Expand Down Expand Up @@ -137,10 +132,6 @@ void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException,
.topic(this.inTopic)
.create(), numberOfRecords, (i, producer) -> producer.newMessage().sequenceId(i).value(i).key("k-" + i));

await().untilAsserted(() -> assertThat(app.getProcessed())
.containsAll(IntStream.range(0, numberOfRecords).boxed().collect(Collectors.toList()))
.doesNotHaveDuplicates());

await().untilAsserted(() -> assertThat(list)
.containsAll(IntStream.range(0, numberOfRecords).boxed().collect(Collectors.toList()))
.doesNotHaveDuplicates());
Expand All @@ -166,7 +157,8 @@ private MapBasedConfig consumerConfig() {
.with("mp.messaging.incoming.exactly-once-consumer.enableTransaction", true)
.with("mp.messaging.incoming.exactly-once-consumer.negativeAckRedeliveryDelayMicros", 5000)
.with("mp.messaging.incoming.exactly-once-consumer.schema", "INT32")
.with("mp.messaging.incoming.exactly-once-consumer.batchReceive", true);
.with("mp.messaging.incoming.exactly-once-consumer.batchReceive", true)
.with("mp.messaging.incoming.exactly-once-consumer.batchIndexAckEnable", true);
}

@ApplicationScoped
Expand Down

0 comments on commit 6ed59de

Please sign in to comment.