Merge branch 'main' into add-compute-command-metrics
hlinnaka committed Jun 26, 2024
2 parents fac383b + 47e5bf3 commit 2e7b2f9
Showing 32 changed files with 668 additions and 259 deletions.
2 changes: 1 addition & 1 deletion .github/actions/download/action.yml
@@ -26,7 +26,7 @@ runs:
TARGET: ${{ inputs.path }}
ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst
SKIP_IF_DOES_NOT_EXIST: ${{ inputs.skip-if-does-not-exist }}
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}', github.run_id, github.run_attempt) }}
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}/{2}', github.event.pull_request.head.sha || github.sha, github.run_id, github.run_attempt) }}
run: |
BUCKET=neon-github-public-dev
FILENAME=$(basename $ARCHIVE)
4 changes: 2 additions & 2 deletions .github/actions/upload/action.yml
@@ -8,7 +8,7 @@ inputs:
description: "A directory or file to upload"
required: true
prefix:
description: "S3 prefix. Default is '${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
description: "S3 prefix. Default is '${GITHUB_SHA}/${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false

runs:
@@ -45,7 +45,7 @@ runs:
env:
SOURCE: ${{ inputs.path }}
ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}', github.run_id, github.run_attempt) }}
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}/{2}', github.event.pull_request.head.sha || github.sha, github.run_id , github.run_attempt) }}
run: |
BUCKET=neon-github-public-dev
FILENAME=$(basename $ARCHIVE)
3 changes: 2 additions & 1 deletion .neon_clippy_args
@@ -1,4 +1,5 @@
# * `-A unknown_lints` – do not warn about unknown lint suppressions
# that people with newer toolchains might use
# * `-D warnings` - fail on any warnings (`cargo` returns non-zero exit status)
export CLIPPY_COMMON_ARGS="--locked --workspace --all-targets -- -A unknown_lints -D warnings"
# * `-D clippy::todo` - don't let `todo!()` slip into `main`
export CLIPPY_COMMON_ARGS="--locked --workspace --all-targets -- -A unknown_lints -D warnings -D clippy::todo"
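
For context, a minimal sketch (not part of this commit) of what the new -D clippy::todo flag rejects: any todo!() left in the workspace now fails the clippy run, which is why libs/walproposer opts out below with #![allow(clippy::todo)].

// Illustrative only: with `-D clippy::todo`, clippy reports an error on the
// placeholder below and CI fails; without the flag it compiles and only panics
// if the function is ever called.
fn apply_settings() {
    todo!("not implemented yet")
}

fn main() {
    apply_settings();
}
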
6 changes: 0 additions & 6 deletions compute_tools/src/config.rs
@@ -83,12 +83,6 @@ pub fn write_postgres_conf(
ComputeMode::Replica => {
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;

// Inform the replica about the primary state
// Default is 'false'
if let Some(primary_is_running) = spec.primary_is_running {
writeln!(file, "neon.primary_is_running={}", primary_is_running)?;
}
}
}

1 change: 0 additions & 1 deletion control_plane/src/endpoint.rs
@@ -592,7 +592,6 @@ impl Endpoint {
remote_extensions,
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
primary_is_running: None,
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
6 changes: 0 additions & 6 deletions libs/compute_api/src/spec.rs
@@ -96,12 +96,6 @@ pub struct ComputeSpec {
// Stripe size for pageserver sharding, in pages
#[serde(default)]
pub shard_stripe_size: Option<usize>,

// When we are starting a new replica in hot standby mode,
// we need to know if the primary is running.
// This is used to determine if replica should wait for
// RUNNING_XACTS from primary or not.
pub primary_is_running: Option<bool>,
}

/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
7 changes: 4 additions & 3 deletions libs/pageserver_api/src/key.rs
@@ -160,8 +160,9 @@ impl Key {
key
}

/// Convert a 18B slice to a key. This function should not be used for metadata keys because field2 is handled differently.
/// Use [`Key::from_i128`] instead if you want to handle 16B keys (i.e., metadata keys).
/// Convert a 18B slice to a key. This function should not be used for 16B metadata keys because `field2` is handled differently.
/// Use [`Key::from_i128`] instead if you want to handle 16B keys (i.e., metadata keys). There are some restrictions on `field2`,
/// and therefore not all 18B slices are valid page server keys.
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
@@ -173,7 +174,7 @@
}
}

/// Convert a key to a 18B slice. This function should not be used for metadata keys because field2 is handled differently.
/// Convert a key to a 18B slice. This function should not be used for getting a 16B metadata key because `field2` is handled differently.
/// Use [`Key::to_i128`] instead if you want to get a 16B key (i.e., metadata keys).
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
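
The updated doc comments hinge on the difference between an 18-byte slice form and a 16-byte i128 form of a key, with field2 being the part that does not survive both paths unchanged. A standalone toy sketch, assuming nothing about the real Key layout beyond field1 living in byte 0 (the wider field2 placement here is illustrative):

// Toy illustration, not the real pageserver `Key`: an 18-byte on-disk form whose
// second field is stored at full u32 width. A 16-byte packed form would have to
// restrict that field's range, which is the restriction the doc comment above
// alludes to.
#[derive(Debug, PartialEq)]
struct ToyKey {
    field1: u8,
    field2: u32,
    rest: [u8; 13],
}

impl ToyKey {
    fn write_to_byte_slice(&self, buf: &mut [u8; 18]) {
        buf[0] = self.field1;
        buf[1..5].copy_from_slice(&self.field2.to_be_bytes());
        buf[5..].copy_from_slice(&self.rest);
    }

    fn from_slice(buf: &[u8; 18]) -> Self {
        ToyKey {
            field1: buf[0],
            field2: u32::from_be_bytes(buf[1..5].try_into().unwrap()),
            rest: buf[5..].try_into().unwrap(),
        }
    }
}

fn main() {
    let key = ToyKey { field1: 1, field2: 0x0102_0304, rest: [7u8; 13] };
    let mut buf = [0u8; 18];
    key.write_to_byte_slice(&mut buf);
    assert_eq!(ToyKey::from_slice(&buf), key);
}
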
2 changes: 2 additions & 0 deletions libs/walproposer/src/walproposer.rs
@@ -1,3 +1,5 @@
#![allow(clippy::todo)]

use std::ffi::CString;

use crate::{
8 changes: 8 additions & 0 deletions pageserver/src/http/routes.rs
@@ -1652,6 +1652,14 @@ async fn timeline_compact_handler(
if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
flags |= CompactFlags::ForceImageLayerCreation;
}
if Some(true) == parse_query_param::<_, bool>(&request, "enhanced_gc_bottom_most_compaction")? {
if !cfg!(feature = "testing") {
return Err(ApiError::InternalServerError(anyhow!(
"enhanced_gc_bottom_most_compaction is only available in testing mode"
)));
}
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
}
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);

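
A short hedged note on the guard added above: cfg!(feature = "testing") expands to a compile-time boolean, so both branches still compile in release builds and the handler simply rejects the new enhanced_gc_bottom_most_compaction query parameter at runtime when the testing feature is off. A minimal standalone sketch of the same pattern (names are illustrative, not the pageserver's):

// Sketch only: reject a test-only flag unless built with `--features testing`.
// `cfg!` yields a plain bool, unlike `#[cfg]`, which removes code entirely.
fn parse_test_only_flag(requested: bool) -> Result<bool, String> {
    if requested && !cfg!(feature = "testing") {
        return Err("only available in testing mode".to_string());
    }
    Ok(requested)
}

fn main() {
    // Ok(false) in any build; Ok(true) only in a build with the testing feature.
    println!("{:?}", parse_test_only_flag(false));
}
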
14 changes: 12 additions & 2 deletions pageserver/src/tenant.rs
@@ -7069,6 +7069,16 @@ mod tests {
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
),
(
get_key(3),
Lsn(0x28),
Value::WalRecord(NeonWalRecord::wal_append("@0x28")),
),
(
get_key(3),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
),
(
get_key(3),
Lsn(0x40),
@@ -7128,7 +7138,7 @@
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10@0x40"),
Bytes::from_static(b"value 3@0x10@0x28@0x30@0x40"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
@@ -7141,7 +7151,7 @@
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10"),
Bytes::from_static(b"value 3@0x10@0x28@0x30"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
1 change: 1 addition & 0 deletions pageserver/src/tenant/block_io.rs
@@ -160,6 +160,7 @@ impl<'a> BlockCursor<'a> {
///
/// The file is assumed to be immutable. This doesn't provide any functions
/// for modifying the file, nor for invalidating the cache if it is modified.
#[derive(Clone)]
pub struct FileBlockReader<'a> {
pub file: &'a VirtualFile,

32 changes: 22 additions & 10 deletions pageserver/src/tenant/disk_btree.rs
@@ -212,6 +212,7 @@ impl<'a, const L: usize> OnDiskNode<'a, L> {
///
/// Public reader object, to search the tree.
///
#[derive(Clone)]
pub struct DiskBtreeReader<R, const L: usize>
where
R: BlockReader,
@@ -259,27 +260,38 @@
Ok(result)
}

pub fn iter<'a>(
&'a self,
start_key: &'a [u8; L],
ctx: &'a RequestContext,
) -> DiskBtreeIterator<'a> {
pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
where
R: 'a,
{
DiskBtreeIterator {
stream: Box::pin(self.get_stream_from(start_key, ctx)),
stream: Box::pin(self.into_stream(start_key, ctx)),
}
}

/// Return a stream which yields all key, value pairs from the index
/// starting from the first key greater or equal to `start_key`.
///
/// Note that this is a copy of [`Self::visit`].
/// Note 1: that this is a copy of [`Self::visit`].
/// TODO: Once the sequential read path is removed this will become
/// the only index traversal method.
pub fn get_stream_from<'a>(
&'a self,
///
/// Note 2: this function used to take `&self` but it now consumes `self`. This is due to
/// the lifetime constraints of the reader and the stream / iterator it creates. Using `&self`
/// requires the reader to be present when the stream is used, and this creates a lifetime
/// dependency between the reader and the stream. Now if we want to create an iterator that
/// holds the stream, someone will need to keep a reference to the reader, which is inconvenient
/// to use from the image/delta layer APIs.
///
/// Feel free to add the `&self` variant back if it's necessary.
pub fn into_stream<'a>(
self,
start_key: &'a [u8; L],
ctx: &'a RequestContext,
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
where
R: 'a,
{
try_stream! {
let mut stack = Vec::new();
stack.push((self.root_blk, None));
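
The into_stream doc comment above explains the switch from &self to self in lifetime terms. A minimal standalone sketch of that trade-off with toy types (not the pageserver's reader or stream): a borrowing iterator cannot be returned on its own, while a consuming one can.

// Toy types only: contrast a borrowing iterator with a consuming one.
struct Reader {
    data: Vec<u64>,
}

impl Reader {
    // Borrowing variant: the returned iterator is tied to `&self`, so whoever
    // holds the iterator must also keep the Reader alive beside it.
    fn iter(&self) -> impl Iterator<Item = u64> + '_ {
        self.data.iter().copied()
    }

    // Consuming variant (the shape `into_stream` now has): the iterator owns
    // what it walks, so it can be stored or returned without the Reader.
    fn into_iter_owned(self) -> impl Iterator<Item = u64> {
        self.data.into_iter()
    }
}

fn owned_iterator() -> impl Iterator<Item = u64> {
    let reader = Reader { data: vec![1, 2, 3] };
    reader.into_iter_owned()
    // Returning `reader.iter()` here would not compile: `reader` is dropped at
    // the end of this function and the borrowing iterator would outlive it.
}

fn main() {
    assert_eq!(owned_iterator().sum::<u64>(), 6);
}
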
29 changes: 14 additions & 15 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -928,7 +928,6 @@ impl DeltaLayerInner {
}

/// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
#[cfg(test)]
pub(super) async fn load_key_values(
&self,
ctx: &RequestContext,
@@ -941,7 +940,7 @@
);
let mut result = Vec::new();
let mut stream =
Box::pin(self.stream_index_forwards(&index_reader, &[0; DELTA_KEY_SIZE], ctx));
Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let cursor = block_reader.block_cursor();
let mut buf = Vec::new();
@@ -976,7 +975,7 @@
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>>
where
Reader: BlockReader,
Reader: BlockReader + Clone,
{
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
@@ -986,7 +985,7 @@
let mut range_end_handled = false;

let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
let mut index_stream = std::pin::pin!(index_stream);

while let Some(index_entry) = index_stream.next().await {
@@ -1241,7 +1240,7 @@ impl DeltaLayerInner {
block_reader,
);

let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
// put in a sentinel value for getting the end offset for last item, and not having to
// repeat the whole read part
@@ -1300,7 +1299,7 @@ impl DeltaLayerInner {
offsets.start.pos(),
offsets.end.pos(),
meta,
max_read_size,
Some(max_read_size),
))
}
} else {
@@ -1459,17 +1458,17 @@ impl DeltaLayerInner {

fn stream_index_forwards<'a, R>(
&'a self,
reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
start: &'a [u8; DELTA_KEY_SIZE],
ctx: &'a RequestContext,
) -> impl futures::stream::Stream<
Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
> + 'a
where
R: BlockReader,
R: BlockReader + 'a,
{
use futures::stream::TryStreamExt;
let stream = reader.get_stream_from(start, ctx);
let stream = reader.into_stream(start, ctx);
stream.map_ok(|(key, value)| {
let key = DeltaKey::from_slice(&key);
let (key, lsn) = (key.key(), key.lsn());
@@ -1857,7 +1856,7 @@ mod test {
.finish(entries_meta.key_range.end, &timeline, &ctx)
.await?;

let inner = resident.as_delta(&ctx).await?;
let inner = resident.get_as_delta(&ctx).await?;

let file_size = inner.file.metadata().await?.len();
tracing::info!(
@@ -2044,11 +2043,11 @@

let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();

copied_layer.as_delta(ctx).await.unwrap();
copied_layer.get_as_delta(ctx).await.unwrap();

assert_keys_and_values_eq(
new_layer.as_delta(ctx).await.unwrap(),
copied_layer.as_delta(ctx).await.unwrap(),
new_layer.get_as_delta(ctx).await.unwrap(),
copied_layer.get_as_delta(ctx).await.unwrap(),
truncate_at,
ctx,
)
@@ -2073,7 +2072,7 @@
source.index_root_blk,
&source_reader,
);
let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
let source_stream = source_stream.filter(|res| match res {
Ok((_, lsn, _)) => ready(lsn < &truncated_at),
_ => ready(true),
@@ -2086,7 +2085,7 @@
truncated.index_root_blk,
&truncated_reader,
);
let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
let mut truncated_stream = std::pin::pin!(truncated_stream);

let mut scratch_left = Vec::new();