fix(scrubber): more robust metadata consistency checks #8344

Merged · 17 commits · Jul 22, 2024 · Changes from 5 commits
11 changes: 11 additions & 0 deletions pageserver/src/tenant/storage_layer/layer_name.rs
@@ -248,6 +248,17 @@ impl LayerName {
Image(_) => "image",
}
}

/// Checks if the layer name might be an L0 layer.
///
/// **Implementation specific:** an L0 layer is a delta layer and its key range is from [`Key::MIN`] to [`Key::MAX`].
pub fn is_maybe_l0(&self) -> bool {
if let LayerName::Delta(delta) = &self {
delta.key_range == (Key::MIN..Key::MAX)
} else {
false
}
}
}

impl fmt::Display for LayerName {
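
Note the "maybe": a full-range delta is strong but not conclusive evidence of an L0 layer, which is why the scrubber below downgrades misses of such layers to warnings rather than errors. A minimal standalone sketch of the heuristic with simplified stand-in types (the real `Key` and `LayerName` live in the pageserver crate; the `u128` key representation here is purely illustrative):

use std::ops::Range;

// Simplified stand-ins for the pageserver types.
#[derive(Clone, Copy, PartialEq, Eq)]
struct Key(u128);

impl Key {
    const MIN: Key = Key(u128::MIN);
    const MAX: Key = Key(u128::MAX);
}

enum LayerName {
    Delta { key_range: Range<Key> },
    Image,
}

impl LayerName {
    // An L0 delta covers the entire keyspace; compaction produces
    // layers with strictly narrower key ranges.
    fn is_maybe_l0(&self) -> bool {
        matches!(self, LayerName::Delta { key_range } if *key_range == (Key::MIN..Key::MAX))
    }
}

fn main() {
    assert!(LayerName::Delta { key_range: Key::MIN..Key::MAX }.is_maybe_l0());
    assert!(!LayerName::Image.is_maybe_l0());
}
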
64 changes: 50 additions & 14 deletions storage_scrubber/src/checks.rs
@@ -41,7 +41,9 @@ impl TimelineAnalysis {
}
}

-pub(crate) fn branch_cleanup_and_check_errors(
+pub(crate) async fn branch_cleanup_and_check_errors(
+    s3_client: &Client,
+    root_target: &RootTarget,
id: &TenantShardTimelineId,
tenant_objects: &mut TenantObjectListing,
s3_active_branch: Option<&BranchData>,
@@ -85,15 +87,17 @@ pub(crate) fn branch_cleanup_and_check_errors(
}

if &index_part.version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
-        result.warnings.push(format!(
+        info!(
            "index_part.json version is not latest: {}",
            index_part.version()
-        ))
+        );
}

if index_part.metadata.disk_consistent_lsn()
!= index_part.duplicated_disk_consistent_lsn()
{
// Tech debt: let's get rid of one of these, they are redundant
// https://github.com/neondatabase/neon/issues/8343
result.errors.push(format!(
"Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
index_part.metadata.disk_consistent_lsn(),
@@ -102,8 +106,16 @@ pub(crate) fn branch_cleanup_and_check_errors(
}

if index_part.layer_metadata.is_empty() {
-        // not an error, can happen for branches with zero writes, but notice that
-        info!("index_part.json has no layers");
+        if index_part.metadata.ancestor_timeline().is_none() {
+            // The initial timeline with no ancestor should ALWAYS have layers.
+            result.errors.push(
+                "index_part.json has no layers (ancestor_timeline=None)"
+                    .to_string(),
+            );
+        } else {
+            // Not an error, can happen for branches with zero writes, but notice that
+            info!("index_part.json has no layers (ancestor_timeline exists)");
+        }
}

for (layer, metadata) in index_part.layer_metadata {
@@ -114,16 +126,40 @@ pub(crate) fn branch_cleanup_and_check_errors(
}

if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
-    // FIXME: this will emit false positives if an index was
-    // uploaded concurrently with our scan. To make this check
-    // correct, we need to try sending a HEAD request for the
-    // layer we think is missing.
-    result.errors.push(format!(
-        "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
-        layer,
-        metadata.generation.get_suffix(),
-        metadata.shard
-    ))
+    let timeline_root = root_target.timeline_root(id);
+    let remote_layer_path = format!(
+        "{}{}{}",
+        timeline_root.prefix_in_bucket,
+        layer,
+        metadata.generation.get_suffix()
+    );
+
+    // HEAD request used here to address a race condition when an index was uploaded concurrently
+    // with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
+    let response = s3_client
+        .head_object()
+        .bucket(timeline_root.bucket_name)
+        .key(remote_layer_path)
+        .send()
+        .await;
+
+    if response.is_err() {
+        // Object is not present.
+        let msg = format!(
+            "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_maybe_l0: {})",
+            layer,
+            metadata.generation.get_suffix(),
+            metadata.shard,
+            layer.is_maybe_l0(),
+        );
+
+        if layer.is_maybe_l0() {
+            result.warnings.push(msg);
+        } else {
+            result.errors.push(msg);
+        }
+    }
}
}
}
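
One caveat on the probe above: `response.is_err()` treats every HEAD failure as a missing object, so throttling or network errors could still produce a false report. A sketch of a stricter existence check, assuming aws-sdk-s3's generated `HeadObjectError::is_not_found` (this is not the PR's code):

use aws_sdk_s3::Client;

// Probe a key with HEAD, treating only a definitive 404 as "missing".
// Transient failures (throttling, networking, permissions) are surfaced
// to the caller instead of being conflated with an absent layer.
async fn object_exists(client: &Client, bucket: &str, key: &str) -> Result<bool, String> {
    match client.head_object().bucket(bucket).key(key).send().await {
        Ok(_) => Ok(true),
        // `as_service_error` is Some only when S3 itself answered the request.
        Err(err) if err.as_service_error().is_some_and(|e| e.is_not_found()) => Ok(false),
        Err(err) => Err(format!("HEAD s3://{bucket}/{key} failed: {err}")),
    }
}

Whether non-404 failures should be retried or reported is a policy choice; the point is only that "HEAD failed" and "object absent" are distinguishable.
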
32 changes: 27 additions & 5 deletions storage_scrubber/src/scan_pageserver_metadata.rs
@@ -235,7 +235,9 @@ pub async fn scan_metadata(
let mut tenant_objects = TenantObjectListing::default();
let mut tenant_timeline_results = Vec::new();

-    fn analyze_tenant(
+    async fn analyze_tenant(
+        s3_client: &Client,
+        target: &RootTarget,
tenant_id: TenantId,
summary: &mut MetadataSummary,
mut tenant_objects: TenantObjectListing,
@@ -259,8 +261,16 @@ pub async fn scan_metadata(

// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
-        let analysis =
-            branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
+        let analysis = branch_cleanup_and_check_errors(
+            s3_client,
+            target,
+            &ttid,
+            &mut tenant_objects,
+            None,
+            None,
+            Some(data),
+        )
+        .await;
summary.update_analysis(&ttid, &analysis);
}

@@ -317,9 +327,18 @@ pub async fn scan_metadata(
None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
-                    analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
+                    analyze_tenant(
+                        &s3_client,
+                        &target,
+                        prev_tenant_id,
+                        &mut summary,
+                        tenant_objects,
+                        timelines,
+                    )
+                    .await;
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
}
}
@@ -338,11 +357,14 @@ pub async fn scan_metadata(

if !tenant_timeline_results.is_empty() {
analyze_tenant(
&s3_client,
&target,
tenant_id.expect("Must be set if results are present"),
&mut summary,
tenant_objects,
tenant_timeline_results,
-        );
+        )
+        .await;
}

Ok(summary)
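
Beyond threading the client through, `scan_metadata` relies on a flush-on-key-change loop: listing results arrive grouped by tenant, per-tenant state accumulates, and `std::mem::take` hands the finished batch to `analyze_tenant` whenever the tenant id changes, with one final flush after the loop. A minimal sketch of that grouping pattern under simplified types:

// Group a key-sorted stream and flush each finished group to `analyze`.
fn group_and_analyze<K: PartialEq + Copy, V>(
    sorted: impl IntoIterator<Item = (K, V)>,
    mut analyze: impl FnMut(K, Vec<V>),
) {
    let mut current: Option<K> = None;
    let mut batch: Vec<V> = Vec::new();
    for (key, value) in sorted {
        if let Some(prev) = current {
            if prev != key {
                // Key changed: flush the finished group, start a new one.
                analyze(prev, std::mem::take(&mut batch));
            }
        }
        current = Some(key);
        batch.push(value);
    }
    if let Some(prev) = current {
        // Final flush for the last group.
        analyze(prev, batch);
    }
}

fn main() {
    group_and_analyze(
        [(1, "t1-shard0"), (1, "t1-shard1"), (2, "t2-shard0")],
        |tenant, timelines| println!("tenant {tenant}: {timelines:?}"),
    );
}
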
3 changes: 3 additions & 0 deletions test_runner/fixtures/common_types.py
@@ -143,6 +143,9 @@ class TimelineId(Id):
def __repr__(self) -> str:
return f'TimelineId("{self.id.hex()}")'

def __str__(self) -> str:
return self.id.hex()


# Workaround for compat with python 3.9, which does not have `typing.Self`
TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId")
28 changes: 28 additions & 0 deletions test_runner/fixtures/pageserver/utils.py
@@ -1,3 +1,4 @@
import json
import time
from typing import Any, Dict, List, Optional, Tuple, Union

@@ -10,6 +11,7 @@

from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.common_types import IndexPartDump
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.remote_storage import RemoteStorage, RemoteStorageKind, S3Storage
from fixtures.utils import wait_until
@@ -371,6 +373,32 @@ def list_prefix(
return response


def remote_storage_get_latest_index_key(index_keys: List[str]) -> str:
"""
Gets the latest index file key.

@param index_keys: A list of index keys of different generations.
"""

def parse_gen(index_key: str) -> int:
parts = index_key.split("index_part.json-")
return int(parts[-1], base=16) if len(parts) == 2 else -1

return max(index_keys, key=parse_gen)
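
The same generation-suffix rule, sketched in Rust for comparison: the `index_part.json-{hex generation}` key layout matches what `metadata.generation.get_suffix()` appends on the scrubber side, and the helper names here are hypothetical:

// Parse the hex generation suffix; keys without one (pre-generation
// uploads) sort before every generation.
fn parse_generation(index_key: &str) -> i64 {
    match index_key.rsplit_once("index_part.json-") {
        Some((_, suffix)) => i64::from_str_radix(suffix, 16).unwrap_or(-1),
        None => -1,
    }
}

fn latest_index_key<'a>(index_keys: &[&'a str]) -> Option<&'a str> {
    index_keys.iter().copied().max_by_key(|k| parse_generation(k))
}

fn main() {
    let keys = [
        "t/index_part.json",
        "t/index_part.json-00000002",
        "t/index_part.json-0000000a",
    ];
    assert_eq!(latest_index_key(&keys), Some("t/index_part.json-0000000a"));
}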


def remote_storage_download_index_part(remote: S3Storage, index_key: str) -> IndexPartDump:
"""
Downloads the index content from remote storage.

@param index_key: index key in remote storage.
"""
response = remote.client.get_object(Bucket=remote.bucket_name, Key=index_key)
body = response["Body"].read().decode("utf-8")
log.info(f"index_part.json: {body}")
return IndexPartDump.from_json(json.loads(body))


def remote_storage_delete_key(
remote: RemoteStorage,
key: str,
9 changes: 7 additions & 2 deletions test_runner/fixtures/remote_storage.py
@@ -12,7 +12,7 @@
import toml
from mypy_boto3_s3 import S3Client

-from fixtures.common_types import TenantId, TimelineId
+from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log

TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
@@ -265,9 +265,14 @@ def do_cleanup(self):
def tenants_path(self) -> str:
return f"{self.prefix_in_bucket}/tenants"

-    def tenant_path(self, tenant_id: TenantId) -> str:
+    def tenant_path(self, tenant_id: Union[TenantShardId, TenantId]) -> str:
return f"{self.tenants_path()}/{tenant_id}"

def timeline_path(
self, tenant_id: Union[TenantShardId, TenantId], timeline_id: TimelineId
) -> str:
return f"{self.tenant_path(tenant_id)}/timelines/{timeline_id}"

def heatmap_key(self, tenant_id: TenantId) -> str:
return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"

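
Together these helpers compose remote keys as {prefix_in_bucket}/tenants/{tenant_shard_id}/timelines/{timeline_id}, which is exactly the prefix the new test lists below. A one-function Rust mirror of the same composition (names hypothetical):

fn timeline_path(prefix_in_bucket: &str, tenant_shard_id: &str, timeline_id: &str) -> String {
    // {prefix}/tenants/{tenant_shard}/timelines/{timeline}
    format!("{prefix_in_bucket}/tenants/{tenant_shard_id}/timelines/{timeline_id}")
}
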
75 changes: 75 additions & 0 deletions test_runner/regress/test_storage_scrubber.py
@@ -1,13 +1,19 @@
import os
import pprint
import shutil
from typing import Optional

import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
StorageScrubber,
)
from fixtures.pageserver.utils import (
remote_storage_download_index_part,
remote_storage_get_latest_index_key,
)
from fixtures.remote_storage import S3Storage, s3_storage
from fixtures.workload import Workload

@@ -158,3 +164,72 @@ def test_scrubber_physical_gc(neon_env_builder: NeonEnvBuilder, shard_count: Opt
gc_summary = StorageScrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=1)
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == (expect_indices_per_shard - 2) * shard_count


@pytest.mark.parametrize("shard_count", [None, 4])
def test_scrubber_scan_pageserver_metadata(
neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]
):
"""
Create some layers. Delete an object listed in the index. Run the scrubber and check that it detects the missing layer.
"""

# Use s3_storage so we can exercise the scrubber.
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.num_pageservers = shard_count if shard_count is not None else 1
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)

# Create some layers.

workload = Workload(env, env.initial_tenant, env.initial_timeline)
workload.init()

for _ in range(3):
workload.write_rows(128)

for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()

for _ in range(3):
workload.write_rows(128)

# Get the latest index for a particular timeline.

tenant_shard_id = TenantShardId(env.initial_tenant, 0, shard_count if shard_count else 0)

assert isinstance(env.pageserver_remote_storage, S3Storage)
timeline_path = env.pageserver_remote_storage.timeline_path(
tenant_shard_id, env.initial_timeline
)

client = env.pageserver_remote_storage.client
bucket = env.pageserver_remote_storage.bucket_name
objects = client.list_objects_v2(Bucket=bucket, Prefix=f"{timeline_path}/", Delimiter="").get(
"Contents", []
)
keys = [obj["Key"] for obj in objects]
index_keys = list(filter(lambda s: s.startswith(f"{timeline_path}/index_part"), keys))
assert len(index_keys) > 0

latest_index_key = remote_storage_get_latest_index_key(index_keys)
log.info(f"{latest_index_key=}")

index = remote_storage_download_index_part(env.pageserver_remote_storage, latest_index_key)

assert len(index.layer_metadata) > 0
it = iter(index.layer_metadata.items())

# Delete a layer file that is listed in the index.
layer, metadata = next(it)
log.info(f"Deleting {timeline_path}/{layer.to_str()}")
delete_response = client.delete_object(
Bucket=bucket,
Key=f"{timeline_path}/{layer.to_str()}-{metadata.generation:08x}",
)
log.info(f"delete response: {delete_response}")

# Check the scan summary. The deleted layer is expected to be an L0 layer, so the scrubber should emit only warnings.
scan_summary = StorageScrubber(neon_env_builder).scan_metadata()
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0