Skip to content

Commit

Permalink
Add test_uploads_and_deletions test (#7758)
Browse files Browse the repository at this point in the history
Adds a test that is a reproducer for many tiered compaction bugs,
both ones that have since been fixed as well as still unfxied ones:
* (now fixed) #7296 
* #7707 
* #7759
* Likely also #7244 but I haven't tried that.

The key ordering bug can be reproduced by switching to
`merge_delta_keys` instead of `merge_delta_keys_buffered`, so reverting
a big part of #7661, although it only sometimes reproduces (30-50% of
cases).

part of #7554
  • Loading branch information
arpad-m authored and a-masterov committed May 20, 2024
1 parent 5353da7 commit 09e6629
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 82 deletions.
78 changes: 78 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
Expand All @@ -79,6 +80,7 @@
allure_attach_from_dir,
assert_no_errors,
get_self_dir,
print_gc_result,
subprocess_capture,
wait_until,
)
Expand Down Expand Up @@ -4419,3 +4421,79 @@ def parse_project_git_version_output(s: str) -> str:
return commit

raise ValueError(f"unable to parse --version output: '{s}'")


def generate_uploads_and_deletions(
env: NeonEnv,
*,
init: bool = True,
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
data: Optional[str] = None,
pageserver: NeonPageserver,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
that results in some uploads and some deletions to remote storage.
"""

if tenant_id is None:
tenant_id = env.initial_tenant
assert tenant_id is not None

if timeline_id is None:
timeline_id = env.initial_timeline
assert timeline_id is not None

ps_http = pageserver.http_client()

with env.endpoints.create_start(
"main", tenant_id=tenant_id, pageserver_id=pageserver.id
) as endpoint:
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)

def churn(data):
endpoint.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 200) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
# to ensure that GC can actually remove some layers
"VACUUM foo",
]
)
assert tenant_id is not None
assert timeline_id is not None
# We are waiting for uploads as well as local flush, in order to avoid leaving the system
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)

# Compaction should generate some GC-elegible layers
for i in range(0, 2):
churn(f"{i if data is None else data}")

gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0

# Stop endpoint and flush all data to pageserver, then checkpoint it: this
# ensures that the pageserver is in a fully idle state: there will be no more
# background ingest, no more uploads pending, and therefore no non-determinism
# in subsequent actions like pageserver restarts.
final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Finish uploads
wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn)
# Finish all remote writes (including deletions)
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
62 changes: 61 additions & 1 deletion test_runner/regress/test_compaction.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import enum
import json
import os
from typing import Optional

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, generate_uploads_and_deletions
from fixtures.pageserver.http import PageserverApiException
from fixtures.workload import Workload

AGGRESIVE_COMPACTION_TENANT_CONF = {
Expand Down Expand Up @@ -190,3 +192,61 @@ def test_sharding_compaction(

# Assert that everything is still readable
workload.validate()


class CompactionAlgorithm(str, enum.Enum):
LEGACY = "Legacy"
TIERED = "Tiered"


@pytest.mark.parametrize(
"compaction_algorithm", [CompactionAlgorithm.LEGACY, CompactionAlgorithm.TIERED]
)
def test_uploads_and_deletions(
neon_env_builder: NeonEnvBuilder,
compaction_algorithm: CompactionAlgorithm,
):
"""
:param compaction_algorithm: the compaction algorithm to use.
"""

tenant_conf = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}),
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

# TODO remove these allowed errors
# https://github.com/neondatabase/neon/issues/7707
# https://github.com/neondatabase/neon/issues/7759
allowed_errors = [
".*duplicated L1 layer.*",
".*delta layer created with.*duplicate values.*",
".*assertion failed: self.lsn_range.start <= lsn.*",
".*HTTP request handler task panicked: task.*panicked.*",
]
if compaction_algorithm == CompactionAlgorithm.TIERED:
env.pageserver.allowed_errors.extend(allowed_errors)

try:
generate_uploads_and_deletions(env, pageserver=env.pageserver)
except PageserverApiException as e:
log.info(f"Obtained PageserverApiException: {e}")

# The errors occur flakily and no error is ensured to occur,
# however at least one of them occurs.
if compaction_algorithm == CompactionAlgorithm.TIERED:
found_allowed_error = any(env.pageserver.log_contains(e) for e in allowed_errors)
if not found_allowed_error:
raise Exception("None of the allowed_errors occured in the log")
83 changes: 2 additions & 81 deletions test_runner/regress/test_pageserver_generations.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,21 @@
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PgBin,
S3Scrubber,
flush_ep_to_pageserver,
last_flush_lsn_upload,
generate_uploads_and_deletions,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_tenant_state,
list_prefix,
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import (
RemoteStorageKind,
)
from fixtures.utils import print_gc_result, wait_until
from fixtures.utils import wait_until
from fixtures.workload import Workload

# A tenant configuration that is convenient for generating uploads and deletions
Expand All @@ -59,82 +56,6 @@
}


def generate_uploads_and_deletions(
env: NeonEnv,
*,
init: bool = True,
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
data: Optional[str] = None,
pageserver: NeonPageserver,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
that results in some uploads and some deletions to remote storage.
"""

if tenant_id is None:
tenant_id = env.initial_tenant
assert tenant_id is not None

if timeline_id is None:
timeline_id = env.initial_timeline
assert timeline_id is not None

ps_http = pageserver.http_client()

with env.endpoints.create_start(
"main", tenant_id=tenant_id, pageserver_id=pageserver.id
) as endpoint:
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)

def churn(data):
endpoint.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 200) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
# to ensure that GC can actually remove some layers
"VACUUM foo",
]
)
assert tenant_id is not None
assert timeline_id is not None
# We are waiting for uploads as well as local flush, in order to avoid leaving the system
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)

# Compaction should generate some GC-elegible layers
for i in range(0, 2):
churn(f"{i if data is None else data}")

gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0

# Stop endpoint and flush all data to pageserver, then checkpoint it: this
# ensures that the pageserver is in a fully idle state: there will be no more
# background ingest, no more uploads pending, and therefore no non-determinism
# in subsequent actions like pageserver restarts.
final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Finish uploads
wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn)
# Finish all remote writes (including deletions)
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)


def read_all(
env: NeonEnv, tenant_id: Optional[TenantId] = None, timeline_id: Optional[TimelineId] = None
):
Expand Down

0 comments on commit 09e6629

Please sign in to comment.