Skip to content

Commit

Permalink
refactor: Remove state reporter for local mode (#8477)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Mar 10, 2023
1 parent eb24cda commit 70f46f1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 32 deletions.
3 changes: 1 addition & 2 deletions src/batch/src/rpc/service/task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ impl TaskService for BatchServiceImpl {
let task = BatchTaskExecution::new(&task_id, plan, context, epoch, self.mgr.runtime())?;
let task = Arc::new(task);
let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
let state_reporter = StateReporter::new_with_local_sender(tx.clone());
if let Err(e) = task.clone().async_execute(state_reporter).await {
if let Err(e) = task.clone().async_execute(None).await {
error!(
"failed to build executors and trigger execution of Task {:?}: {}",
task_id, e
Expand Down
46 changes: 17 additions & 29 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ use task_stats_alloc::{TaskLocalBytesAllocated, BYTES_ALLOCATED};
use tokio::runtime::Runtime;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio_metrics::TaskMonitor;
use tonic::Status;

use crate::error::BatchError::{Aborted, SenderError};
use crate::error::{to_rw_error, BatchError, Result as BatchResult};
use crate::executor::{BoxedExecutor, ExecutorBuilder};
use crate::rpc::service::exchange::ExchangeWriter;
use crate::rpc::service::task_service::{GetDataResponseResult, TaskInfoResponseResult};
use crate::rpc::service::task_service::TaskInfoResponseResult;
use crate::task::channel::{create_output_channel, ChanReceiverImpl, ChanSenderImpl};
use crate::task::BatchTaskContext;

Expand Down Expand Up @@ -96,33 +95,18 @@ where
/// effect. The local sender only reports Failed updates; the distributed sender also reports
/// Finished/Pending/Starting/Aborted etc.
pub enum StateReporter {
Local(tokio::sync::mpsc::Sender<GetDataResponseResult>),
Distributed(tokio::sync::mpsc::Sender<TaskInfoResponseResult>),
Mock(),
}

impl StateReporter {
pub async fn send(&mut self, val: TaskInfoResponse) -> BatchResult<()> {
match self {
Self::Local(s) => {
// A hack here to convert task failure message to data error
match val.task_status() {
TaskStatus::Failed => s
.send(Err(Status::internal(val.error_message)))
.await
.map_err(|_| SenderError),
_ => Ok(()),
}
}
Self::Distributed(s) => s.send(Ok(val)).await.map_err(|_| SenderError),
Self::Mock() => Ok(()),
}
}

pub fn new_with_local_sender(s: tokio::sync::mpsc::Sender<GetDataResponseResult>) -> Self {
Self::Local(s)
}

pub fn new_with_dist_sender(s: tokio::sync::mpsc::Sender<TaskInfoResponseResult>) -> Self {
Self::Distributed(s)
}
Expand Down Expand Up @@ -355,7 +339,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
/// hash partitioned across multiple channels.
/// To obtain the result, one must pick one of the channels to consume via [`TaskOutputId`]. As
/// such, parallel consumers are able to consume the result independently.
pub async fn async_execute(self: Arc<Self>, state_tx: StateReporter) -> Result<()> {
pub async fn async_execute(self: Arc<Self>, state_tx: Option<StateReporter>) -> Result<()> {
let mut state_tx = state_tx;
trace!(
"Prepare executing plan [{:?}]: {}",
Expand All @@ -382,7 +366,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
// After we init the output receivers, it must be safe to schedule the next stage, so we are
// able to send TaskStatus::Running here.
// Init the state receivers. Swap out later.
self.change_state_notify(TaskStatus::Running, &mut state_tx, None)
self.change_state_notify(TaskStatus::Running, state_tx.as_mut(), None)
.await?;

// Clone `self` to make compiler happy because of the move block.
Expand All @@ -398,7 +382,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
let task = |task_id: TaskId| async move {
// We should only pass a reference to the sender to the execution, because we should only
// close it after the task error has been set.
t_1.run(exec, sender, shutdown_rx, &mut state_tx)
t_1.run(exec, sender, shutdown_rx, state_tx.as_mut())
.in_span({
let mut span = Span::enter_with_local_parent("batch_execute");
span.add_property(|| ("task_id", task_id.task_id.to_string()));
Expand Down Expand Up @@ -470,18 +454,22 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
pub async fn change_state_notify(
&self,
task_status: TaskStatus,
state_tx: &mut StateReporter,
state_tx: Option<&mut StateReporter>,
err_str: Option<String>,
) -> BatchResult<()> {
self.change_state(task_status);
// Notify the frontend of the task status.
state_tx
.send(TaskInfoResponse {
task_id: Some(self.task_id.to_prost()),
task_status: task_status.into(),
error_message: err_str.unwrap_or("".to_string()),
})
.await
if let Some(reporter) = state_tx {
reporter
.send(TaskInfoResponse {
task_id: Some(self.task_id.to_prost()),
task_status: task_status.into(),
error_message: err_str.unwrap_or("".to_string()),
})
.await
} else {
Ok(())
}
}

pub fn change_state(&self, task_status: TaskStatus) {
Expand All @@ -493,7 +481,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
root: BoxedExecutor,
mut sender: ChanSenderImpl,
mut shutdown_rx: Receiver<String>,
state_tx: &mut StateReporter,
state_tx: Option<&mut StateReporter>,
) {
let mut data_chunk_stream = root.execute();
let mut state;
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl BatchManager {
))
.into())
};
task.async_execute(state_reporter).await?;
task.async_execute(Some(state_reporter)).await?;
ret
}

Expand Down

0 comments on commit 70f46f1

Please sign in to comment.