
[CLN] Refactor log materializer to not need to pass in offset id explicitly for readers #2354

Merged · 4 commits · Jun 18, 2024
2 changes: 1 addition & 1 deletion rust/worker/src/compactor/compaction_manager.rs
@@ -112,7 +112,7 @@ impl CompactionManager {
dispatcher.clone(),
None,
None,
- Arc::new(AtomicU32::new(1)),
Collaborator:
is this correct?

Contributor Author:

Yes. This is the default value we pass when the segment is uninitialized. Previously we passed the next_offset_id (as opposed to curr_max_offset_id); I changed it to pass curr_max_offset_id now.
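
A minimal sketch of the arithmetic (illustrative, not part of the diff; it mirrors the bump-then-assign logic added in rust/worker/src/segment/types.rs below): starting the counter at 0 and letting the materializer bump it yields the same ids the old `AtomicU32::new(1)` scheme produced.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

fn main() {
    // Uninitialized segment: the caller now passes curr_max_offset_id = 0.
    let curr_max_offset_id = Arc::new(AtomicU32::new(0));
    // materialize() first bumps curr_max -> next before assigning ids...
    curr_max_offset_id.fetch_add(1, Ordering::SeqCst); // counter now holds 1
    // ...then each newly inserted record takes the current value and advances it.
    let first_offset = curr_max_offset_id.fetch_add(1, Ordering::SeqCst);
    assert_eq!(first_offset, 1); // same first id as the old scheme
}
```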

+ Arc::new(AtomicU32::new(0)),
);

match orchestrator.run().await {
8 changes: 1 addition & 7 deletions rust/worker/src/execution/operators/brute_force_knn.rs
@@ -136,13 +136,7 @@ impl Operator<BruteForceKnnOperatorInput, BruteForceKnnOperatorOutput> for Brute
}
}
};
- // We use this really confusing pattern found in the rest of the code
- // Where we create an effectively unused offset id for the read path
- // This is very odd and confusing, and should be refactored, for now
- // we just replicate the pattern here.
- let offset_id = Arc::new(AtomicU32::new(1));
- let log_materializer =
-     LogMaterializer::new(record_segment_reader, input.log.clone(), offset_id);
+ let log_materializer = LogMaterializer::new(record_segment_reader, input.log.clone(), None);
let logs = match log_materializer.materialize().await {
Ok(logs) => logs,
Err(e) => {
4 changes: 1 addition & 3 deletions rust/worker/src/execution/operators/count_records.rs
@@ -282,9 +282,7 @@ mod tests {
};
}
};
- let curr_max_offset_id = Arc::new(AtomicU32::new(1));
- let materializer =
-     LogMaterializer::new(record_segment_reader, data, curr_max_offset_id);
+ let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
.await
16 changes: 3 additions & 13 deletions rust/worker/src/execution/operators/merge_metadata_results.rs
@@ -164,16 +164,8 @@ impl Operator<MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorOut
};

// Step 1: Materialize the logs.
- let mut offset_id = Arc::new(AtomicU32::new(1));
- match record_segment_reader.as_ref() {
-     Some(reader) => {
-         offset_id = reader.get_current_max_offset_id();
-         offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-     }
-     None => {}
- };
let materializer =
-     LogMaterializer::new(record_segment_reader, input.filtered_log.clone(), offset_id);
+     LogMaterializer::new(record_segment_reader, input.filtered_log.clone(), None);
Collaborator:
Why were we loading it before?

Contributor Author:

We still need to read the curr_max_offset_id from the record segment. For readers, the onus of doing that has shifted to the materializer. Writers still need to pass it explicitly, since all the writer threads must share the same curr_max_offset_id.
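
Illustrative sketch only (not part of the diff): the two call patterns this implies. RecordSegmentReader, Chunk, LogRecord, and LogMaterializer are the crate's own types; the two helper functions are hypothetical.

```rust
use std::sync::atomic::AtomicU32;
use std::sync::Arc;

// Reader path: pass None and let the materializer resolve
// curr_max_offset_id from the record segment itself.
fn reader_materializer<'me>(
    record_segment_reader: Option<RecordSegmentReader<'me>>,
    logs: Chunk<LogRecord>,
) -> LogMaterializer<'me> {
    LogMaterializer::new(record_segment_reader, logs, None)
}

// Writer path: keep passing an explicit counter, because every writer
// partition must draw offset ids from the same shared atomic.
fn writer_materializer<'me>(
    record_segment_reader: Option<RecordSegmentReader<'me>>,
    logs: Chunk<LogRecord>,
    shared_offset_id: Arc<AtomicU32>,
) -> LogMaterializer<'me> {
    LogMaterializer::new(record_segment_reader, logs, Some(shared_offset_id))
}
```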

let mat_records = match materializer.materialize().await {
Ok(records) => records,
Err(e) => {
@@ -490,8 +482,7 @@ mod test {
};
}
};
- let materializer =
-     LogMaterializer::new(record_segment_reader, data, Arc::new(AtomicU32::new(1)));
+ let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
.await
@@ -779,8 +770,7 @@ mod test {
};
}
};
- let materializer =
-     LogMaterializer::new(record_segment_reader, data, Arc::new(AtomicU32::new(1)));
+ let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
.await
19 changes: 4 additions & 15 deletions rust/worker/src/execution/operators/metadata_filtering.rs
@@ -152,16 +152,8 @@ impl Operator<MetadataFilteringInput, MetadataFilteringOutput> for MetadataFilte
}
};
// Step 1: Materialize the logs.
- let mut offset_id = Arc::new(AtomicU32::new(1));
- match record_segment_reader.as_ref() {
-     Some(reader) => {
-         offset_id = reader.get_current_max_offset_id();
-         offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-     }
-     None => {}
- };
let materializer =
-     LogMaterializer::new(record_segment_reader, input.log_record.clone(), offset_id);
+     LogMaterializer::new(record_segment_reader, input.log_record.clone(), None);
let mat_records = match materializer.materialize().await {
Ok(records) => records,
Err(e) => {
@@ -851,8 +843,7 @@ mod test {
};
}
};
- let materializer =
-     LogMaterializer::new(record_segment_reader, data, Arc::new(AtomicU32::new(1)));
+ let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
.await
@@ -1064,8 +1055,7 @@ mod test {
};
}
};
- let materializer =
-     LogMaterializer::new(record_segment_reader, data, Arc::new(AtomicU32::new(1)));
+ let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
.await
@@ -1248,8 +1238,7 @@ mod test {
};
}
};
- let materializer =
-     LogMaterializer::new(record_segment_reader, data, Arc::new(AtomicU32::new(1)));
+ let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
.await
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/write_segments.rs
@@ -135,7 +135,7 @@ impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator
let materializer = LogMaterializer::new(
record_segment_reader,
input.chunk.clone(),
-     input.offset_id.clone(),
+     Some(input.offset_id.clone()),
);
// Materialize the logs.
let res = match materializer.materialize().await {
6 changes: 3 additions & 3 deletions rust/worker/src/execution/orchestration/compact.rs
@@ -390,10 +390,10 @@ impl CompactOrchestrator {
match RecordSegmentReader::from_segment(record_segment, &self.blockfile_provider).await {
Ok(reader) => {
self.curr_max_offset_id = reader.get_current_max_offset_id();
- self.curr_max_offset_id
-     .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
- Err(_) => {}
+ Err(_) => {
+     self.curr_max_offset_id = Arc::new(AtomicU32::new(0));
+ }
};
self.record_segment = Some(record_segment.clone()); // auto deref.

45 changes: 33 additions & 12 deletions rust/worker/src/segment/types.rs
@@ -260,24 +260,48 @@ pub(crate) struct LogMaterializer<'me> {
// Is None when record segment is uninitialized.
pub(crate) record_segment_reader: Option<RecordSegmentReader<'me>>,
pub(crate) logs: Chunk<LogRecord>,
- pub(crate) offset_id: Arc<AtomicU32>,
+ // Is None for readers. In that case, the materializer reads
+ // the current maximum from the record segment and uses that
+ // for materializing. Writers pass this value to the materializer
+ // because they need to share this across all log partitions.
+ pub(crate) curr_offset_id: Option<Arc<AtomicU32>>,
}

impl<'me> LogMaterializer<'me> {
pub(crate) fn new(
record_segment_reader: Option<RecordSegmentReader<'me>>,
logs: Chunk<LogRecord>,
- offset_id: Arc<AtomicU32>,
+ curr_offset_id: Option<Arc<AtomicU32>>,
) -> Self {
Self {
record_segment_reader,
logs,
- offset_id,
+ curr_offset_id,
}
}
pub(crate) async fn materialize(
&'me self,
) -> Result<Chunk<MaterializedLogRecord<'me>>, LogMaterializerError> {
+ let next_offset_id;
+ match self.curr_offset_id.as_ref() {
+     Some(curr_offset_id) => {
+         next_offset_id = curr_offset_id.clone();
+         next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+     }
+     None => {
+         match self.record_segment_reader.as_ref() {
+             Some(reader) => {
+                 next_offset_id = reader.get_current_max_offset_id();
+                 next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+             }
+             // This means that the segment is uninitialized so counting starts
+             // from 1.
+             None => {
+                 next_offset_id = Arc::new(AtomicU32::new(1));
+             }
+         };
+     }
+ }
// Populate entries that are present in the record segment.
let mut existing_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
let mut new_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
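
Summarizing the resolution order the new materialize() uses — a simplified, self-contained sketch (not the PR code; reader_max stands in for RecordSegmentReader::get_current_max_offset_id()):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

fn resolve_next_offset_id(
    curr_offset_id: Option<Arc<AtomicU32>>, // Some(..) only on the write path
    reader_max: Option<Arc<AtomicU32>>,     // None when the segment is uninitialized
) -> Arc<AtomicU32> {
    match curr_offset_id {
        // Writer: bump the shared counter from curr_max to next.
        Some(shared) => {
            shared.fetch_add(1, Ordering::SeqCst);
            shared
        }
        None => match reader_max {
            // Reader over an initialized segment: read curr_max, bump to next.
            Some(max) => {
                max.fetch_add(1, Ordering::SeqCst);
                max
            }
            // Uninitialized segment: offset ids start at 1.
            None => Arc::new(AtomicU32::new(1)),
        },
    }
}

fn main() {
    let next = resolve_next_offset_id(None, None);
    assert_eq!(next.load(Ordering::SeqCst), 1);
}
```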
@@ -327,9 +351,8 @@ impl<'me> LogMaterializer<'me> {
if !existing_id_to_materialized.contains_key(log_record.record.id.as_str())
&& !new_id_to_materialized.contains_key(log_record.record.id.as_str())
{
- let next_offset_id = self
-     .offset_id
-     .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ let next_offset_id =
+     next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let materialized_record = match MaterializedLogRecord::try_from((
&log_record.record,
next_offset_id,
@@ -463,9 +486,8 @@ impl<'me> LogMaterializer<'me> {
record_from_map.final_operation = Operation::Add;
} else {
// Insert.
- let next_offset_id = self
-     .offset_id
-     .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ let next_offset_id =
+     next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let materialized_record = match MaterializedLogRecord::try_from((
&log_record.record,
next_offset_id,
@@ -619,8 +641,7 @@ mod tests {
};
}
};
- let materializer =
-     LogMaterializer::new(record_segment_reader, data, Arc::new(AtomicU32::new(1)));
+ let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
.await
@@ -685,7 +706,7 @@
let materializer = LogMaterializer {
record_segment_reader: Some(reader),
logs: data,
- offset_id: Arc::new(AtomicU32::new(3)),
+ curr_offset_id: None,
};
let res = materializer
.materialize()