Skip to content

Commit

Permalink
feat(streaming): separate BarrierRecv executor (risingwavelabs#8595)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Mar 17, 2023
1 parent 78ddbce commit 1b208bb
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 5 deletions.
49 changes: 47 additions & 2 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ message StreamSource {
string source_name = 8;
}

// The executor only for receiving barrier from the meta service. It always resides in the leaves
// of the streaming graph.
message BarrierRecvNode {}

message SourceNode {
// The source node can contain either a stream source or nothing. So here we extract all
// information about stream source to a message, and here it will be an `Option` in Rust.
Expand Down Expand Up @@ -545,6 +549,7 @@ message StreamNode {
NowNode now = 129;
GroupTopNNode append_only_group_top_n = 130;
TemporalJoinNode temporal_join = 131;
BarrierRecvNode barrier_recv = 132;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down Expand Up @@ -630,8 +635,9 @@ enum FragmentTypeFlag {
SOURCE = 1;
MVIEW = 2;
SINK = 4;
NOW = 8;
NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead.
CHAIN_NODE = 16;
BARRIER_RECV = 32;
}

// The environment associated with a stream plan
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ pub struct Schema {
}

impl Schema {
pub fn empty() -> &'static Self {
static EMPTY: Schema = Schema { fields: Vec::new() };
&EMPTY
}

pub fn len(&self) -> usize {
self.fields.len()
}
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ fn build_fragment(
) -> Result<StreamNode> {
// Update current fragment based on the node we're visiting.
match stream_node.get_node_body()? {
NodeBody::BarrierRecv(_) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32
}

NodeBody::Source(src) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32;
// Note: For creating table with connector, the source id is left with placeholder and
Expand All @@ -248,7 +252,6 @@ fn build_fragment(

NodeBody::TopN(_) => current_fragment.requires_singleton = true,

// FIXME: workaround for single-fragment mview on singleton upstream mview.
NodeBody::Chain(node) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::ChainNode as u32;
// memorize table id for later use
Expand All @@ -259,6 +262,7 @@ fn build_fragment(
}

NodeBody::Now(_) => {
// TODO: Remove this and insert a `BarrierRecv` instead.
current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
current_fragment.requires_singleton = true;
}
Expand Down
5 changes: 4 additions & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,10 @@ impl TableFragments {
/// Returns barrier inject actor ids.
pub fn barrier_inject_actor_ids(&self) -> Vec<ActorId> {
Self::filter_actor_ids(self, |fragment_type_mask| {
(fragment_type_mask & (FragmentTypeFlag::Source as u32 | FragmentTypeFlag::Now as u32))
(fragment_type_mask
& (FragmentTypeFlag::Source as u32
| FragmentTypeFlag::Now as u32
| FragmentTypeFlag::BarrierRecv as u32))
!= 0
})
}
Expand Down
107 changes: 107 additions & 0 deletions src/stream/src/executor/barrier_recv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::StreamExt;
use risingwave_common::catalog::Schema;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_stream::wrappers::UnboundedReceiverStream;

use super::{
ActorContext, ActorContextRef, Barrier, BoxedMessageStream, Executor, Message, PkIndicesRef,
StreamExecutorError,
};

/// The executor only for receiving barrier from the meta service. It always resides in the leaves
/// of the streaming graph.
pub struct BarrierRecvExecutor {
_ctx: ActorContextRef,
identity: String,

/// The barrier receiver registered in the local barrier manager.
barrier_receiver: UnboundedReceiver<Barrier>,
}

impl BarrierRecvExecutor {
pub fn new(
ctx: ActorContextRef,
barrier_receiver: UnboundedReceiver<Barrier>,
executor_id: u64,
) -> Self {
Self {
_ctx: ctx,
identity: format!("BarrierRecvExecutor {:X}", executor_id),
barrier_receiver,
}
}

pub fn for_test(barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
Self::new(ActorContext::create(0), barrier_receiver, 0)
}
}

impl Executor for BarrierRecvExecutor {
fn execute(self: Box<Self>) -> BoxedMessageStream {
UnboundedReceiverStream::new(self.barrier_receiver)
.map(|barrier| Ok(Message::Barrier(barrier)))
.chain(futures::stream::once(async {
// We do not use the stream termination as the control message, and this line should
// never be reached in normal cases. So we just return an error here.
Err(StreamExecutorError::channel_closed("barrier receiver"))
}))
.boxed()
}

fn schema(&self) -> &Schema {
Schema::empty()
}

fn pk_indices(&self) -> PkIndicesRef<'_> {
&[]
}

fn identity(&self) -> &str {
&self.identity
}
}

#[cfg(test)]
mod tests {
use futures::pin_mut;
use tokio::sync::mpsc;

use super::*;
use crate::executor::test_utils::StreamExecutorTestExt;

#[tokio::test]
async fn test_barrier_recv() {
let (barrier_tx, barrier_rx) = mpsc::unbounded_channel();

let barrier_recv = BarrierRecvExecutor::for_test(barrier_rx).boxed();
let stream = barrier_recv.execute();
pin_mut!(stream);

barrier_tx.send(Barrier::new_test_barrier(114)).unwrap();
barrier_tx.send(Barrier::new_test_barrier(514)).unwrap();

let barrier_1 = stream.next_unwrap_ready_barrier().unwrap();
assert_eq!(barrier_1.epoch.curr, 114);
let barrier_2 = stream.next_unwrap_ready_barrier().unwrap();
assert_eq!(barrier_2.epoch.curr, 514);

stream.next_unwrap_pending();

drop(barrier_tx);
assert!(stream.next_unwrap_ready().is_err());
}
}
2 changes: 2 additions & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub mod monitor;

pub mod agg_common;
pub mod aggregation;
mod barrier_recv;
mod batch_query;
mod chain;
mod dispatch;
Expand Down Expand Up @@ -103,6 +104,7 @@ mod test_utils;
pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
Expand Down
49 changes: 49 additions & 0 deletions src/stream/src/from_proto/barrier_recv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::stream_plan::BarrierRecvNode;
use tokio::sync::mpsc::unbounded_channel;

use super::*;
use crate::executor::BarrierRecvExecutor;

pub struct BarrierRecvExecutorBuilder;

#[async_trait::async_trait]
impl ExecutorBuilder for BarrierRecvExecutorBuilder {
type Node = BarrierRecvNode;

async fn new_boxed_executor(
params: ExecutorParams,
_node: &Self::Node,
_store: impl StateStore,
stream: &mut LocalStreamManagerCore,
) -> StreamResult<BoxedExecutor> {
assert!(
params.input.is_empty(),
"barrier receiver should not have input"
);

let (sender, barrier_receiver) = unbounded_channel();
stream
.context
.lock_barrier_manager()
.register_sender(params.actor_context.id, sender);

Ok(
BarrierRecvExecutor::new(params.actor_context, barrier_receiver, params.executor_id)
.boxed(),
)
}
}
3 changes: 3 additions & 0 deletions src/stream/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Build executor from protobuf.

mod agg_common;
mod barrier_recv;
mod batch_query;
mod chain;
mod dml;
Expand Down Expand Up @@ -51,6 +52,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{StreamNode, TemporalJoinNode};
use risingwave_storage::StateStore;

use self::barrier_recv::*;
use self::batch_query::*;
use self::chain::*;
use self::dml::*;
Expand Down Expand Up @@ -152,5 +154,6 @@ pub async fn create_executor(
NodeBody::RowIdGen => RowIdGenExecutorBuilder,
NodeBody::Now => NowExecutorBuilder,
NodeBody::TemporalJoin => TemporalJoinExecutorBuilder,
NodeBody::BarrierRecv => BarrierRecvExecutorBuilder,
}
}

0 comments on commit 1b208bb

Please sign in to comment.