Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of the Pub/Sub Ordering keys client #4962

Merged
merged 1 commit into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,30 +234,30 @@ public ApiFuture<String> publish(PubsubMessage message) {
messagesBatchLock.lock();
try {
// Check if the next message makes the current batch exceed the max batch byte size.
MessagesBatch batch = messagesBatches.get(orderingKey);
if (batch == null) {
batch = new MessagesBatch(orderingKey);
messagesBatches.put(orderingKey, batch);
MessagesBatch messageBatch = messagesBatches.get(orderingKey);
if (messageBatch == null) {
messageBatch = new MessagesBatch(orderingKey);
messagesBatches.put(orderingKey, messageBatch);
}
if (!batch.isEmpty()
if (!messageBatch.isEmpty()
&& hasBatchingBytes()
&& batch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
batchToSend = batch.popOutstandingBatch();
&& messageBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
batchToSend = messageBatch.popOutstandingBatch();
}

// Border case if the message to send is greater or equals to the max batch size then can't
// be included in the current batch and instead sent immediately.
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
batch.addMessage(outstandingPublish, messageSize);
messageBatch.addMessage(outstandingPublish, messageSize);
// If after adding the message we have reached the batch max messages then we have a batch
// to send.
if (batch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = batch.popOutstandingBatch();
if (messageBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = messageBatch.popOutstandingBatch();
}
}

// Setup the next duration based delivery alarm if there are messages batched.
if (!batch.isEmpty()) {
if (!messageBatch.isEmpty()) {
setupDurationBasedPublishAlarm();
} else {
messagesBatches.remove(orderingKey);
Expand Down Expand Up @@ -405,7 +405,9 @@ public ApiFuture call() {
}
};
ApiFutures.addCallback(
sequentialExecutor.submit(outstandingBatch.orderingKey, func), futureCallback);
sequentialExecutor.submit(outstandingBatch.orderingKey, func),
futureCallback,
directExecutor());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void cancel(Throwable e) {
}

/** Runs synchronous {@code Runnable} tasks sequentially. */
public void submit(final String key, final Runnable runnable) {
void submit(final String key, final Runnable runnable) {
autoSequentialExecutor.execute(key, runnable);
}

Expand All @@ -128,7 +128,7 @@ enum TaskCompleteAction {
* Creates a AutoSequentialExecutor which executes the next queued task automatically when the
* previous task has completed.
*/
public static SequentialExecutor newAutoSequentialExecutor(Executor executor) {
static SequentialExecutor newAutoSequentialExecutor(Executor executor) {
return new SequentialExecutor(executor, TaskCompleteAction.EXECUTE_NEXT_TASK);
}

Expand All @@ -147,7 +147,7 @@ private SequentialExecutor(Executor executor, TaskCompleteAction taskCompleteAct
this.tasksByKey = new HashMap<>();
}

public void execute(final String key, Runnable task) {
void execute(final String key, Runnable task) {
Deque<Runnable> newTasks;
synchronized (tasksByKey) {
newTasks = tasksByKey.get(key);
Expand Down Expand Up @@ -182,7 +182,7 @@ public void run() {
}

/** Cancels every task in the queue assoicated with {@code key}. */
public void cancelQueuedTasks(final String key, Throwable e) {
void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
// so that no more tasks are scheduled.
synchronized (tasksByKey) {
Expand All @@ -204,7 +204,7 @@ public void cancelQueuedTasks(final String key, Throwable e) {
}

/** Executes the next queued task associated with {@code key}. */
public void resume(final String key) {
void resume(final String key) {
if (taskCompleteAction.equals(TaskCompleteAction.EXECUTE_NEXT_TASK)) {
// resume() is no-op since tasks are executed automatically.
return;
Expand Down