diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index 8589cecfcd92..692e49e9fc21 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -693,8 +693,16 @@ export interface ChainNode { tableId: number; /** The schema of input stream, which will be used to build a MergeNode */ upstreamFields: Field[]; - /** Which columns from upstream are used in this Chain node. */ + /** + * The columns from the upstream table to output. + * TODO: rename this field. + */ upstreamColumnIndices: number[]; + /** + * The columns from the upstream table that'll be internally required by this chain node. + * TODO: This is currently only used by backfill table scan. We should also apply it to the upstream dispatcher (#4529). + */ + upstreamColumnIds: number[]; /** * Generally, the barrier needs to be rearranged during the MV creation process, so that data can * be flushed to shared buffer periodically, instead of making the first epoch from batch query extra @@ -3111,6 +3119,7 @@ function createBaseChainNode(): ChainNode { tableId: 0, upstreamFields: [], upstreamColumnIndices: [], + upstreamColumnIds: [], chainType: ChainType.CHAIN_UNSPECIFIED, isSingleton: false, tableDesc: undefined, @@ -3127,6 +3136,9 @@ export const ChainNode = { upstreamColumnIndices: Array.isArray(object?.upstreamColumnIndices) ? object.upstreamColumnIndices.map((e: any) => Number(e)) : [], + upstreamColumnIds: Array.isArray(object?.upstreamColumnIds) + ? object.upstreamColumnIds.map((e: any) => Number(e)) + : [], chainType: isSet(object.chainType) ? chainTypeFromJSON(object.chainType) : ChainType.CHAIN_UNSPECIFIED, isSingleton: isSet(object.isSingleton) ? Boolean(object.isSingleton) : false, tableDesc: isSet(object.tableDesc) ? StorageTableDesc.fromJSON(object.tableDesc) : undefined, @@ -3146,6 +3158,11 @@ export const ChainNode = { } else { obj.upstreamColumnIndices = []; } + if (message.upstreamColumnIds) { + obj.upstreamColumnIds = message.upstreamColumnIds.map((e) => Math.round(e)); + } else { + obj.upstreamColumnIds = []; + } message.chainType !== undefined && (obj.chainType = chainTypeToJSON(message.chainType)); message.isSingleton !== undefined && (obj.isSingleton = message.isSingleton); message.tableDesc !== undefined && @@ -3158,6 +3175,7 @@ export const ChainNode = { message.tableId = object.tableId ?? 0; message.upstreamFields = object.upstreamFields?.map((e) => Field.fromPartial(e)) || []; message.upstreamColumnIndices = object.upstreamColumnIndices?.map((e) => e) || []; + message.upstreamColumnIds = object.upstreamColumnIds?.map((e) => e) || []; message.chainType = object.chainType ?? ChainType.CHAIN_UNSPECIFIED; message.isSingleton = object.isSingleton ?? false; message.tableDesc = (object.tableDesc !== undefined && object.tableDesc !== null) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 0adea7046f81..e4b849786bf2 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -389,9 +389,11 @@ message ChainNode { uint32 table_id = 1; // The schema of input stream, which will be used to build a MergeNode repeated plan_common.Field upstream_fields = 2; - // Which columns from upstream are used in this Chain node. + // The columns from the upstream table to output. + // TODO: rename this field. repeated uint32 upstream_column_indices = 3; - // TODO + // The columns from the upstream table that'll be internally required by this chain node. + // TODO: This is currently only used by backfill table scan. We should also apply it to the upstream dispatcher (#4529). repeated int32 upstream_column_ids = 8; // Generally, the barrier needs to be rearranged during the MV creation process, so that data can // be flushed to shared buffer periodically, instead of making the first epoch from batch query extra