Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pausable channels #2667

Merged
merged 1 commit into from
Jun 28, 2024
Merged

Pausable channels #2667

merged 1 commit into from
Jun 28, 2024

Conversation

ozangunalp
Copy link
Collaborator

@ozangunalp ozangunalp commented Jun 26, 2024

Adds a new decorator for pausing and accumulating downstream requests to connector channels.
A channel needs to set the pausable flag to true. The pause/resume is controlled by the PausableChannel that is accessible from ChannelRegistry by channel name.

Closes #1649

import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.PausableChannel;

public class PausableMulti<T> extends MultiOperator<T, T> implements PausableChannel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to have @jponge review on this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll check that

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that could be done is passing that operator through the Reactive Streams (Flow) TCK. I don't think there's anything incorrect in that implementation, but you know... 🤣

cescoffier
cescoffier previously approved these changes Jun 27, 2024
jponge
jponge previously approved these changes Jun 27, 2024
Copy link
Member

@jponge jponge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, I just have a question on unbounded subscriptions and the expectations on it.

}

@Override
public void request(long numberOfItems) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when Long.MAX value is requested? (unbounded subscription)

My understanding is that the pausing mechanism delays upstream requests when paused (aka a kind of passive pausing).

Also this should be documented as per reactive streams expectations.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is much like demand capping. pause/resume won't work if the subscription is unbounded.
How would you define active pausing? To buffer items until resumed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but TBH in this case what you have totally makes sense. Just make sure you document the case of unbounded requests, or even "large" requests 😉 (e.g., Long.MAX_VALUE - 100)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we could've contributed this operator to Mutiny itself, but I couldn't find a Mutiny-like API to give the control of pause/resume. Would you've any ideas on that ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd need an extended subscription to support these additional operations I guess.

import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.PausableChannel;

public class PausableMulti<T> extends MultiOperator<T, T> implements PausableChannel {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that could be done is passing that operator through the Reactive Streams (Flow) TCK. I don't think there's anything incorrect in that implementation, but you know... 🤣

@ozangunalp
Copy link
Collaborator Author

Pushed a small change to pass the flow publisher tck.

```

!!!warning
Pausable channels only work with back-pressure aware subscribers, with bounded downstream requests.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@ozangunalp ozangunalp merged commit 81ccb67 into smallrye:main Jun 28, 2024
4 checks passed
@ozangunalp ozangunalp added this to the 4.22.0 milestone Jul 5, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Provide a way to pause/resume inbound and outbound traffic (connector-based)
3 participants