Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate lsn lease into synthetic size #8208

Closed
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ pub struct TimelineCreateRequest {
pub pg_version: Option<u32>,
}

/// JSON request body for obtaining an LSN lease on a timeline.
///
/// The pageserver's LSN lease HTTP handler deserializes the request body into
/// this struct and reads `lsn` from it.
#[derive(Serialize, Deserialize, Clone)]
pub struct LsnLeaseRequest {
/// The LSN for which the lease is requested.
pub lsn: Lsn,
}

#[derive(Serialize, Deserialize)]
pub struct TenantShardSplitRequest {
pub new_shard_count: u8,
Expand Down
4 changes: 2 additions & 2 deletions libs/tenant_size_model/src/calculation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ struct SegmentSize {
}

struct SizeAlternatives {
// cheapest alternative if parent is available.
/// cheapest alternative if parent is available.
incremental: SegmentSize,

// cheapest alternative if parent node is not available
/// cheapest alternative if parent node is not available
non_incremental: Option<SegmentSize>,
}

Expand Down
40 changes: 30 additions & 10 deletions libs/tenant_size_model/src/svg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ const SVG_WIDTH: f32 = 500.0;
struct SvgDraw<'a> {
storage: &'a StorageModel,
branches: &'a [String],
seg_to_branch: &'a [usize],
seg_to_branch: &'a [(usize, bool)],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find a good way to propagate the information about whether a point is a lease.

sizes: &'a [SegmentSizeResult],

// layout
xscale: f32,
min_lsn: u64,
seg_coordinates: Vec<(f32, f32)>,
seg_coordinates: Vec<(f32, f32, bool)>,
}

fn draw_legend(result: &mut String) -> anyhow::Result<()> {
Expand Down Expand Up @@ -42,13 +42,18 @@ fn draw_legend(result: &mut String) -> anyhow::Result<()> {
"<line x1=\"5\" y1=\"70\" x2=\"15\" y2=\"70\" stroke-width=\"1\" stroke=\"gray\" />"
)?;
writeln!(result, "<text x=\"20\" y=\"75\">WAL not retained</text>")?;
writeln!(
result,
"<line x1=\"10\" y1=\"85\" x2=\"10\" y2=\"95\" stroke-width=\"3\" stroke=\"blue\" />"
)?;
writeln!(result, "<text x=\"20\" y=\"95\">LSN lease</text>")?;
Ok(())
}

pub fn draw_svg(
storage: &StorageModel,
branches: &[String],
seg_to_branch: &[usize],
seg_to_branch: &[(usize, bool)],
sizes: &SizeResult,
) -> anyhow::Result<String> {
let mut draw = SvgDraw {
Expand Down Expand Up @@ -100,7 +105,7 @@ impl<'a> SvgDraw<'a> {

// Layout the timelines on Y dimension.
// TODO
let mut y = 100.0;
let mut y = 120.0;
let mut branch_y_coordinates = Vec::new();
for _branch in self.branches {
branch_y_coordinates.push(y);
Expand All @@ -109,10 +114,10 @@ impl<'a> SvgDraw<'a> {

// Calculate coordinates for each point
let seg_coordinates = std::iter::zip(segments, self.seg_to_branch)
.map(|(seg, branch_id)| {
.map(|(seg, (branch_id, is_lease_point))| {
let x = (seg.lsn - min_lsn) as f32 / xscale;
let y = branch_y_coordinates[*branch_id];
(x, y)
(x, y, *is_lease_point)
})
.collect();

Expand Down Expand Up @@ -140,8 +145,8 @@ impl<'a> SvgDraw<'a> {
SegmentMethod::Skipped => "stroke-width=\"1\" stroke=\"gray\"",
};
if let Some(parent_id) = seg.parent {
let (x1, y1) = self.seg_coordinates[parent_id];
let (x2, y2) = self.seg_coordinates[seg_id];
let (x1, y1, _) = self.seg_coordinates[parent_id];
let (x2, y2, _) = self.seg_coordinates[seg_id];

writeln!(
result,
Expand All @@ -154,7 +159,7 @@ impl<'a> SvgDraw<'a> {
writeln!(result, "</line>")?;
} else {
// draw a little dash to mark the starting point of this branch
let (x, y) = self.seg_coordinates[seg_id];
let (x, y, _) = self.seg_coordinates[seg_id];
let (x1, y1) = (x, y - 5.0);
let (x2, y2) = (x, y + 5.0);

Expand All @@ -174,7 +179,22 @@ impl<'a> SvgDraw<'a> {
let seg = &self.storage.segments[seg_id];

// draw a snapshot point if it's needed
let (coord_x, coord_y) = self.seg_coordinates[seg_id];
let (coord_x, coord_y, is_lease_point) = self.seg_coordinates[seg_id];

if is_lease_point {
let (x1, y1) = (coord_x, coord_y - 10.0);
let (x2, y2) = (coord_x, coord_y + 10.0);

let style = "stroke-width=\"3\" stroke=\"blue\"";

writeln!(
result,
"<line x1=\"{x1}\" y1=\"{y1}\" x2=\"{x2}\" y2=\"{y2}\" {style}>",
)?;
writeln!(result, " <title>leased lsn at {}</title>", seg.lsn)?;
writeln!(result, "</line>")?;
}

if self.sizes[seg_id].method == SegmentMethod::SnapshotHere {
writeln!(
result,
Expand Down
8 changes: 8 additions & 0 deletions libs/tenant_size_model/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ impl ScenarioBuilder {
let storage_model = StorageModel {
segments: self.segments.clone(),
};

let segs = storage_model
.segments
.iter()
.enumerate()
.collect::<Vec<_>>();
eprintln!("segs before: {:#?}", segs);

let size_result = storage_model.calculate();
(storage_model, size_result)
}
Expand Down
17 changes: 11 additions & 6 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use pageserver_api::models::ListAuxFilesRequest;
use pageserver_api::models::LocationConfig;
use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::LsnLease;
use pageserver_api::models::LsnLeaseRequest;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantLocationConfigResponse;
Expand Down Expand Up @@ -67,6 +68,7 @@ use crate::tenant::remote_timeline_client::download_index_part;
use crate::tenant::remote_timeline_client::list_remote_tenant_shards;
use crate::tenant::remote_timeline_client::list_remote_timelines;
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::LsnKind;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::storage_layer::LayerName;
Expand Down Expand Up @@ -1191,10 +1193,15 @@ fn synthetic_size_html_response(
timeline_map.insert(ti.timeline_id, index);
timeline_ids.push(ti.timeline_id.to_string());
}
let seg_to_branch: Vec<usize> = inputs
let seg_to_branch: Vec<(usize, bool)> = inputs
.segments
.iter()
.map(|seg| *timeline_map.get(&seg.timeline_id).unwrap())
.map(|seg| {
(
*timeline_map.get(&seg.timeline_id).unwrap(),
seg.kind == LsnKind::LeasePoint,
)
})
.collect();

let svg =
Expand Down Expand Up @@ -1527,15 +1534,13 @@ async fn handle_tenant_break(

// Obtains an lsn lease on the given timeline.
async fn lsn_lease_handler(
request: Request<Body>,
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;

let lsn: Lsn = parse_query_param(&request, "lsn")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
let lsn = json_request::<LsnLeaseRequest>(&mut request).await?.lsn;

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

Expand Down
44 changes: 43 additions & 1 deletion pageserver/src/tenant/size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ impl SegmentMeta {
LsnKind::BranchPoint => true,
LsnKind::GcCutOff => true,
LsnKind::BranchEnd => false,
LsnKind::LeasePoint => true,
LsnKind::LeaseStart => false,
LsnKind::LeaseEnd => false,
}
}
}
Expand All @@ -103,6 +106,9 @@ pub enum LsnKind {
GcCutOff,
/// Last record LSN
BranchEnd,
LeasePoint,
LeaseStart,
LeaseEnd,
}

/// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
Expand Down Expand Up @@ -248,6 +254,15 @@ pub(super) async fn gather_inputs(
.map(|lsn| (lsn, LsnKind::BranchPoint))
.collect::<Vec<_>>();

lsns.extend(
gc_info
.leases
.keys()
.filter(|&&lsn| lsn > ancestor_lsn)
.copied()
.map(|lsn| (lsn, LsnKind::LeasePoint)),
);

drop(gc_info);

// Add branch points we collected earlier, just in case there were any that were
Expand Down Expand Up @@ -296,6 +311,7 @@ pub(super) async fn gather_inputs(
if kind == LsnKind::BranchPoint {
branchpoint_segments.insert((timeline_id, lsn), segments.len());
}

segments.push(SegmentMeta {
segment: Segment {
parent: Some(parent),
Expand All @@ -306,7 +322,33 @@ pub(super) async fn gather_inputs(
timeline_id: timeline.timeline_id,
kind,
});
parent += 1;

parent = segments.len() - 1;

if kind == LsnKind::LeasePoint {
let mut lease_parent = parent;
segments.push(SegmentMeta {
segment: Segment {
parent: Some(lease_parent),
lsn: lsn.0,
size: None,
needed: next_gc_cutoff <= lsn,
},
timeline_id: timeline.timeline_id,
kind: LsnKind::LeaseStart,
});
lease_parent += 1;
segments.push(SegmentMeta {
segment: Segment {
parent: Some(lease_parent),
lsn: lsn.0,
size: None,
needed: true,
},
timeline_id: timeline.timeline_id,
kind: LsnKind::LeaseEnd,
});
}
}

// Current end of the timeline
Expand Down
16 changes: 16 additions & 0 deletions test_runner/fixtures/pageserver/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,22 @@ def timeline_get_lsn_by_timestamp(
res_json = res.json()
return res_json

def timeline_lsn_lease(
    self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, lsn: Lsn
):
    """
    Request an LSN lease on a timeline.

    POSTs a JSON body of the form ``{"lsn": "<lsn>"}`` to the timeline's
    ``lsn_lease`` endpoint, passes the response through ``verbose_error``,
    and returns the decoded JSON response body.
    """
    # The LSN is sent in its string form inside the JSON body.
    payload = {"lsn": str(lsn)}

    log.info(f"Requesting lsn lease for {lsn=}, {tenant_id=}, {timeline_id=}")
    url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/lsn_lease"
    response = self.post(url, json=payload)
    self.verbose_error(response)
    return response.json()

def timeline_get_timestamp_of_lsn(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, lsn: Lsn
):
Expand Down
100 changes: 100 additions & 0 deletions test_runner/regress/test_tenant_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,3 +710,103 @@ def mask_model_inputs(x):
return newlist
else:
return x


@pytest.mark.parametrize("zero_gc", [True, False])
def test_lsn_lease_size(neon_env_builder: NeonEnvBuilder, test_output_dir: Path, zero_gc: bool):
    """
    Check that an LSN lease and a read-only branch affect synthetic size equally.

    One tenant takes a lease and another creates a read-only branch under the
    same tenant config; the sizes reported for the two are then compared
    pairwise with `assert_size_approx_equal`.
    """

    # `zero_gc` selects a zero PITR interval; otherwise one hour is used.
    pitr_interval = "0s" if zero_gc else "3600s"
    tenant_conf = {"pitr_interval": pitr_interval}

    env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

    # Scenario 1: lease taken on the initial tenant's timeline.
    sizes_with_lease = insert_and_acquire_lease(
        env, env.initial_tenant, env.initial_timeline, test_output_dir
    )

    # Scenario 2: read-only branch on a freshly created tenant with the same config.
    ro_tenant, ro_timeline = env.neon_cli.create_tenant(conf=tenant_conf)
    sizes_with_ro_branch = insert_and_create_ro_branch(
        env, ro_tenant, ro_timeline, test_output_dir
    )

    for lease_size, branch_size in zip(sizes_with_lease, sizes_with_ro_branch):
        assert_size_approx_equal(lease_size, branch_size)


def insert_and_acquire_lease(
    env: NeonEnv, tenant: TenantId, timeline: TimelineId, test_output_dir: Path
) -> list[int]:
    """
    Insert data, take an LSN lease at the flushed LSN, insert more data, and
    report the tenant's synthetic size.

    Returns a list with the tenant size measured after the lease was taken and
    the additional tables were created. The synthetic size debug page is also
    written to ``size_debug_lease.html`` under `test_output_dir`.
    """
    sizes = []
    client = env.pageserver.http_client()
    with env.endpoints.create_start("main", tenant_id=tenant) as ep:
        initial_size = client.tenant_size(tenant)
        log.info(f"initial size: {initial_size}")

        with ep.cursor() as cur:
            cur.execute(
                "CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
            )
        # Lease the LSN reached after the first table has been flushed.
        last_flush_lsn = wait_for_last_flush_lsn(env, ep, tenant, timeline)
        res = client.timeline_lsn_lease(tenant, timeline, last_flush_lsn)
        log.info(f"result from lsn_lease api: {res}")

        # Generate more WAL after the leased LSN.
        with ep.cursor() as cur:
            for table in ("t1", "t2", "t3"):
                cur.execute(
                    f"CREATE TABLE {table} AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
                )

        # Only the wait matters here; the returned LSN is not used again.
        wait_for_last_flush_lsn(env, ep, tenant, timeline)

        size_after_lease_and_insert = client.tenant_size(tenant)
        sizes.append(size_after_lease_and_insert)
        log.info(f"size_after_lease_and_insert: {size_after_lease_and_insert}")

    # Fetch the debug page before opening the file so a failed request does not
    # leave an empty file behind, and close the handle deterministically.
    size_debug = client.tenant_size_debug(tenant)
    with open(test_output_dir / "size_debug_lease.html", "w") as size_debug_file:
        size_debug_file.write(size_debug)
    return sizes


def insert_and_create_ro_branch(
    env: NeonEnv, tenant: TenantId, timeline: TimelineId, test_output_dir: Path
) -> list[int]:
    """
    Insert data, create a branch named ``ro_branch`` at that point, insert more
    data, and report the tenant's synthetic size.

    Returns a list with the tenant size measured after branching and creating
    the additional tables. The synthetic size debug page is also written to
    ``size_debug_ro_branch.html`` under `test_output_dir`.
    """
    sizes = []
    client = env.pageserver.http_client()
    with env.endpoints.create_start("main", tenant_id=tenant) as ep:
        initial_size = client.tenant_size(tenant)
        log.info(f"initial size: {initial_size}")

        with ep.cursor() as cur:
            cur.execute(
                "CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
            )
        # Wait for the first table to be flushed before branching.
        wait_for_last_flush_lsn(env, ep, tenant, timeline)
        ro_branch = env.neon_cli.create_branch("ro_branch", tenant_id=tenant)
        log.info(f"{ro_branch=}")

        # Generate more WAL on the main branch after the branch point.
        with ep.cursor() as cur:
            for table in ("t1", "t2", "t3"):
                cur.execute(
                    f"CREATE TABLE {table} AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
                )
        wait_for_last_flush_lsn(env, ep, tenant, timeline)

        size_after_branching = client.tenant_size(tenant)
        sizes.append(size_after_branching)
        log.info(f"size_after_branching: {size_after_branching}")

    # Fetch the debug page before opening the file so a failed request does not
    # leave an empty file behind, and close the handle deterministically.
    size_debug = client.tenant_size_debug(tenant)
    with open(test_output_dir / "size_debug_ro_branch.html", "w") as size_debug_file:
        size_debug_file.write(size_debug)
    return sizes
Loading