Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support different snapshot for streaming jobs #15896

Merged
merged 47 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
f587d0d
feat(storage): register member table id at commit epoch
wenym1 Mar 21, 2024
c802026
fix
wenym1 Mar 21, 2024
1216fcf
fix compile
wenym1 Mar 21, 2024
ffa06a4
Merge branch 'main' into yiming/register-hummock-table-in-commit
wenym1 Mar 21, 2024
c1188df
unregister table id in barrier command
wenym1 Mar 22, 2024
c091e59
fix compile
wenym1 Mar 22, 2024
5ea467e
feat(storage): support different snapshot for streaming jobs
wenym1 Mar 23, 2024
5d291f0
unregister member table id at commit_epoch
wenym1 Mar 23, 2024
b84b211
impl apply_version_delta
wenym1 Mar 23, 2024
80ee13e
fill backward compatibility snapshot group
wenym1 Mar 23, 2024
ad6fa20
Merge branch 'yiming/register-hummock-table-in-commit' into yiming/hu…
wenym1 Mar 23, 2024
f1b47b2
register snapshot group in commit epoch
wenym1 Mar 24, 2024
4025d64
impl purge
wenym1 Mar 25, 2024
0e50f63
fix
wenym1 Mar 25, 2024
e0b0697
fix compile
wenym1 Mar 25, 2024
a493db6
fix
wenym1 Mar 25, 2024
a03260f
Merge branch 'main' into yiming/register-hummock-table-in-commit
wenym1 Mar 25, 2024
f89ecd1
Merge branch 'yiming/register-hummock-table-in-commit' into yiming/hu…
wenym1 Mar 25, 2024
dbdd475
fix comment
wenym1 Mar 25, 2024
8318f5f
Merge branch 'yiming/register-hummock-table-in-commit' into yiming/hu…
wenym1 Mar 25, 2024
beb78a5
rename
wenym1 Mar 25, 2024
af38f68
fix compile
wenym1 Mar 25, 2024
778080b
impl system table
wenym1 Mar 25, 2024
5860e14
no extra info in create command
wenym1 Mar 25, 2024
f4dd843
unregister table id separately
wenym1 Mar 26, 2024
f934ba8
Merge branch 'yiming/register-hummock-table-in-commit' into yiming/hu…
wenym1 Mar 26, 2024
a8bf79d
refine
wenym1 Mar 26, 2024
e94774c
Merge branch 'main' into yiming/register-hummock-table-in-commit
wenym1 Mar 26, 2024
7cdca7b
Merge branch 'yiming/register-hummock-table-in-commit' into yiming/hu…
wenym1 Mar 26, 2024
2a41e01
add more panic log
wenym1 Mar 26, 2024
ef3af37
Merge branch 'main' into yiming/hummock-snapshot-group
wenym1 Mar 29, 2024
6dbaebb
add prev id assertion
wenym1 Mar 29, 2024
b980553
Merge branch 'main' into yiming/hummock-snapshot-group
wenym1 Mar 29, 2024
8000937
fix
wenym1 Mar 29, 2024
d53a32d
rename
wenym1 Mar 29, 2024
fd993d3
Merge branch 'main' into yiming/hummock-snapshot-group
wenym1 Apr 8, 2024
fb87284
Merge branch 'main' into yiming/hummock-snapshot-group
wenym1 Apr 10, 2024
28c50b5
Merge branch 'main' into yiming/hummock-snapshot-group
wenym1 May 7, 2024
307340d
Merge commit 'dac892e9777d4c249da3ae81fdf7b2d3d1218ae7' into yiming/h…
wenym1 May 21, 2024
11e7198
Merge branch 'main' into yiming/hummock-snapshot-group
wenym1 May 21, 2024
e57ed35
Merge branch 'main' into yiming/hummock-snapshot-group
wenym1 May 24, 2024
37df492
per table snapshot
wenym1 May 28, 2024
ed2b4cc
extract state table info struct
wenym1 Jun 5, 2024
8735ebb
fmt
wenym1 Jun 5, 2024
fc60078
Merge branch 'main' into yiming/hummock-snapshot-group
wenym1 Jun 5, 2024
7c97d95
remove log
wenym1 Jun 5, 2024
b717e7c
fix compile
wenym1 Jun 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ message TableChangeLog {
repeated EpochNewChangeLog change_logs = 1;
}

message StateTableInfo {
uint64 committed_epoch = 1;
uint64 safe_epoch = 2;
}

message StateTableInfoDelta {
uint64 committed_epoch = 1;
uint64 safe_epoch = 2;
}

message HummockVersion {
message Levels {
repeated Level levels = 1;
Expand All @@ -166,6 +176,7 @@ message HummockVersion {
uint64 safe_epoch = 4;
map<uint32, TableWatermarks> table_watermarks = 5;
map<uint32, TableChangeLog> table_change_logs = 6;
map<uint32, StateTableInfo> state_table_info = 7;
}

message HummockVersionDelta {
Expand All @@ -191,6 +202,7 @@ message HummockVersionDelta {
uint64 truncate_epoch = 2;
}
map<uint32, ChangeLogDelta> change_log_delta = 10;
map<uint32, StateTableInfoDelta> state_table_info_delta = 11;
}

message HummockVersionDeltas {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,27 @@ async fn read_hummock_table_watermarks(
})
.collect())
}

#[derive(Fields)]
struct RwHummockSnapshot {
#[primary_key]
table_id: i32,
safe_epoch: i64,
committed_epoch: i64,
}

#[system_catalog(table, "rw_catalog.rw_hummock_snapshot")]
async fn read_hummock_snapshot_groups(
reader: &SysCatalogReaderImpl,
) -> Result<Vec<RwHummockSnapshot>> {
let version = reader.meta_client.get_hummock_current_version().await?;
Ok(version
.state_table_info
.iter()
.map(|(table_id, info)| RwHummockSnapshot {
table_id: table_id.table_id as _,
committed_epoch: info.committed_epoch as _,
safe_epoch: info.safe_epoch as _,
})
.collect())
}
5 changes: 5 additions & 0 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,11 @@ pub async fn start_service_as_election_leader(
.unwrap(),
);

hummock_manager
.may_fill_backward_state_table_info()
.await
.unwrap();

// Initialize services.
let backup_manager = BackupManager::new(
env.clone(),
Expand Down
33 changes: 31 additions & 2 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
Expand All @@ -27,7 +27,9 @@ use risingwave_hummock_sdk::{
use risingwave_pb::hummock::compact_task::{self};
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::hummock_version_delta::ChangeLogDelta;
use risingwave_pb::hummock::{GroupDelta, GroupMetaChange, HummockSnapshot, IntraLevelDelta};
use risingwave_pb::hummock::{
GroupDelta, GroupMetaChange, HummockSnapshot, IntraLevelDelta, StateTableInfoDelta,
};

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
Expand Down Expand Up @@ -140,8 +142,11 @@ impl HummockManager {
.latest_version()
.build_compaction_group_info();

let mut new_table_ids = None;

// Add new table
if let Some(new_fragment_table_info) = new_table_fragment_info {
let new_table_ids = new_table_ids.insert(HashSet::new());
if !new_fragment_table_info.internal_table_ids.is_empty() {
if let Some(levels) = new_version_delta
.latest_version()
Expand Down Expand Up @@ -178,6 +183,7 @@ impl HummockManager {
for table_id in &new_fragment_table_info.internal_table_ids {
table_compaction_group_mapping
.insert(*table_id, StaticCompactionGroupId::StateDefault as u64);
new_table_ids.insert(*table_id);
}
}

Expand Down Expand Up @@ -208,6 +214,7 @@ impl HummockManager {
});
let _ = table_compaction_group_mapping
.insert(table_id, StaticCompactionGroupId::MaterializedView as u64);
new_table_ids.insert(table_id);
}
}

Expand Down Expand Up @@ -329,6 +336,28 @@ impl HummockManager {
group_deltas.push(group_delta);
}

// update state table info
new_version_delta.with_latest_version(|version, delta| {
for table_id in new_table_ids
.into_iter()
.flat_map(|ids| ids.into_iter().map(|table_id| table_id.table_id))
.chain(
version
.levels
.values()
.flat_map(|group| group.member_table_ids.iter().cloned()),
)
{
delta.state_table_info_delta.insert(
TableId::new(table_id),
StateTableInfoDelta {
committed_epoch: epoch,
safe_epoch: version.safe_epoch,
},
);
}
});

new_version_delta.pre_apply();

// Apply stats changes.
Expand Down
18 changes: 17 additions & 1 deletion src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use risingwave_pb::hummock::subscribe_compaction_event_response::{
use risingwave_pb::hummock::{
compact_task, CompactStatus as PbCompactStatus, CompactTask, CompactTaskAssignment,
CompactionConfig, GroupDelta, InputLevel, IntraLevelDelta, Level, SstableInfo,
SubscribeCompactionEventRequest, TableOption, TableSchema,
StateTableInfoDelta, SubscribeCompactionEventRequest, TableOption, TableSchema,
};
use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -171,6 +171,22 @@ impl<'a> HummockVersionTransaction<'a> {
version_delta.latest_version().safe_epoch,
compact_task.watermark,
);
if version_delta.latest_version().safe_epoch < version_delta.safe_epoch {
version_delta.state_table_info_delta = version_delta
.latest_version()
.state_table_info
.iter()
.map(|(table_id, info)| {
(
*table_id,
StateTableInfoDelta {
committed_epoch: info.committed_epoch,
safe_epoch: version_delta.safe_epoch,
},
)
})
.collect();
}
version_delta.pre_apply();
}
}
Expand Down
40 changes: 24 additions & 16 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct,
GroupDelta, GroupDestroy, GroupMetaChange,
GroupDelta, GroupDestroy, GroupMetaChange, StateTableInfoDelta,
};
use thiserror_ext::AsReport;
use tokio::sync::OnceCell;

use crate::hummock::compaction::compaction_config::{
Expand Down Expand Up @@ -120,7 +119,7 @@ impl HummockManager {
CompactionGroupId::from(StaticCompactionGroupId::StateDefault),
));
}
self.register_table_ids(&pairs).await?;
self.register_table_ids_for_test(&pairs).await?;
Ok(pairs.iter().map(|(table_id, ..)| *table_id).collect_vec())
}

Expand All @@ -130,13 +129,14 @@ impl HummockManager {
&self,
table_fragments: &[crate::model::TableFragments],
) {
self.unregister_table_ids_fail_fast(
self.unregister_table_ids(
&table_fragments
.iter()
.flat_map(|t| t.all_table_ids())
.collect_vec(),
)
.await;
.await
.unwrap();
}

/// Unregisters stale members and groups
Expand All @@ -155,7 +155,10 @@ impl HummockManager {
}

/// The implementation acquires `versioning` lock.
pub async fn register_table_ids(
///
/// The method name is temporarily added with a `_for_test` prefix to mark
/// that it's currently only used in test.
pub async fn register_table_ids_for_test(
&self,
pairs: &[(StateTableId, CompactionGroupId)],
) -> Result<()> {
Expand Down Expand Up @@ -185,6 +188,10 @@ impl HummockManager {
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let (committed_epoch, safe_epoch) = {
let version = new_version_delta.latest_version();
(version.max_committed_epoch, version.safe_epoch)
};

for (table_id, raw_group_id) in pairs {
let mut group_id = *raw_group_id;
Expand Down Expand Up @@ -235,6 +242,16 @@ impl HummockManager {
..Default::default()
})),
});
assert!(new_version_delta
.state_table_info_delta
.insert(
TableId::new(*table_id),
StateTableInfoDelta {
committed_epoch,
safe_epoch,
}
)
.is_none());
}
new_version_delta.pre_apply();
commit_multi_var!(self.meta_store_ref(), version)?;
Expand Down Expand Up @@ -290,7 +307,7 @@ impl HummockManager {
);
new_version_delta
.removed_table_ids
.push(TableId::new(*table_id));
.insert(TableId::new(*table_id));
}

let groups_to_remove = modified_groups
Expand Down Expand Up @@ -337,15 +354,6 @@ impl HummockManager {
Ok(())
}

/// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
pub async fn unregister_table_ids_fail_fast(&self, table_ids: &[StateTableId]) {
self.unregister_table_ids(table_ids)
.await
.unwrap_or_else(|e| {
panic!("unregister table ids fail: {table_ids:?} {}", e.as_report())
});
}

pub async fn update_compaction_config(
&self,
compaction_group_ids: &[CompactionGroupId],
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ impl HummockManager {
.init_compaction_config_for_replay(group.id, group_config)
.await
.unwrap();
self.register_table_ids(&pairs).await?;
self.register_table_ids_for_test(&pairs).await?;
tracing::info!("Registered table ids {:?}", pairs);
}

Expand Down
Loading
Loading