Skip to content

Commit

Permalink
Rework PageStream connection state handling: (#7611)
Browse files Browse the repository at this point in the history
 * Make PS connection startup use async APIs
   This allows for improved query cancellation when we start connections
 * Make PS connections have per-shard connection retry state.
   Previously they shared global backoff state, which is bad for quickly
   getting all connections started and/or back online.
 * Make sure we clean up most connection state on failed connections.
   Previously, we could technically leak some resources that we'd otherwise
   clean up. Now, the resources are correctly cleaned up.
 * pagestore_smgr.c now PANICs on unexpected response message types.
   Unexpected responses are likely a symptom of having a desynchronized
   view of the connection state. As a desynchronized connection state can
   cause corruption, we PANIC, as we don't know what data may have been
   written to buffers: the only solution is to fail fast & hope we didn't
   write wrong data.
 * Catch errors in sync pagestream request handling.
   Previously, if a query was cancelled after a message was sent to
   the pageserver, but before the data was received, the backend
   could forget that it sent the synchronous request, and let others
   deal with the repercussions. This could then lead to incorrect
   responses, or errors such as "unexpected response from page
   server with tag 0x68"
  • Loading branch information
MMeent authored May 23, 2024
1 parent ea2e830 commit 0e4f182
Show file tree
Hide file tree
Showing 4 changed files with 724 additions and 179 deletions.
11 changes: 11 additions & 0 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ async fn page_service_conn_main(
socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
let socket = std::pin::pin!(socket);

fail::fail_point!("ps::connection-start::pre-login");

// XXX: pgbackend.run() should take the connection_ctx,
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
Expand Down Expand Up @@ -603,6 +605,7 @@ impl PageServerHandler {
};

trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");

// Trace request if needed
if let Some(t) = tracer.as_mut() {
Expand All @@ -617,6 +620,7 @@ impl PageServerHandler {

let (response, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
Expand All @@ -626,6 +630,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::Nblocks(req) => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
Expand All @@ -635,6 +640,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::GetPage(req) => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
Expand All @@ -645,6 +651,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::DbSize(req) => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
Expand All @@ -654,6 +661,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
Expand Down Expand Up @@ -1505,6 +1513,7 @@ where
_pgb: &mut PostgresBackend<IO>,
_sm: &FeStartupPacket,
) -> Result<(), QueryError> {
fail::fail_point!("ps::connection-start::startup-packet");
Ok(())
}

Expand All @@ -1519,6 +1528,8 @@ where
Err(QueryError::SimulatedConnectionError)
});

fail::fail_point!("ps::connection-start::process-query");

let ctx = self.connection_ctx.attached_child();
debug!("process query {query_string:?}");
let parts = query_string.split_whitespace().collect::<Vec<_>>();
Expand Down
Loading

1 comment on commit 0e4f182

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3202 tests run: 3062 passed, 0 failed, 140 skipped (full report)


Code coverage* (full report)

  • functions: 31.4% (6445 of 20541 functions)
  • lines: 48.3% (49839 of 103239 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
0e4f182 at 2024-05-23T22:52:08.198Z :recycle:

Please sign in to comment.