Skip to content

Commit

Permalink
Fix connect to PS on MacOS/X (#7885)
Browse files Browse the repository at this point in the history
## Problem

After [0e4f182] which introduce async
connect
Neon is not able to connect to page server.

## Summary of changes

Perform sync commit at MacOS/X

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
  • Loading branch information
knizhnik and Konstantin Knizhnik authored May 27, 2024
1 parent b2d34a8 commit d61e924
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 49 deletions.
89 changes: 45 additions & 44 deletions pgxn/neon/libpagestore.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,6 @@ typedef struct
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes_read;
/*---
* WaitEventSet containing:
* - WL_SOCKET_WRITABLE on 'conn'
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes_write;
} PageServer;

static PageServer page_servers[MAX_SHARDS];
Expand Down Expand Up @@ -336,11 +329,6 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
FreeWaitEventSet(shard->wes_read);
shard->wes_read = NULL;
}
if (shard->wes_write)
{
FreeWaitEventSet(shard->wes_write);
shard->wes_write = NULL;
}
if (shard->conn)
{
PQfinish(shard->conn);
Expand Down Expand Up @@ -436,22 +424,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
return false;
}

shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);

shard->wes_write = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_write, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_write, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_write, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
PQsocket(shard->conn),
NULL, NULL);

shard->state = PS_Connecting_Startup;
/* fallthrough */
}
Expand All @@ -460,13 +432,12 @@ pageserver_connect(shardno_t shard_no, int elevel)
char *pagestream_query;
int ps_send_query_ret;
bool connected = false;

int poll_result = PGRES_POLLING_WRITING;
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_Startup");

do
{
WaitEvent event;
int poll_result = PQconnectPoll(shard->conn);

switch (poll_result)
{
Expand Down Expand Up @@ -497,25 +468,45 @@ pageserver_connect(shardno_t shard_no, int elevel)
}
case PGRES_POLLING_READING:
/* Sleep until there's something to do */
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);

/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();

while (true)
{
int rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE,
PQsocket(shard->conn),
0,
PG_WAIT_EXTENSION);
elog(DEBUG5, "PGRES_POLLING_READING=>%d", rc);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
}
if (rc & WL_SOCKET_READABLE)
break;
}
/* PQconnectPoll() handles the socket polling state updates */

break;
case PGRES_POLLING_WRITING:
/* Sleep until there's something to do */
(void) WaitEventSetWait(shard->wes_write, -1L, &event, 1,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);

/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();

while (true)
{
int rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_WRITEABLE,
PQsocket(shard->conn),
0,
PG_WAIT_EXTENSION);
elog(DEBUG5, "PGRES_POLLING_WRITING=>%d", rc);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
}
if (rc & WL_SOCKET_WRITEABLE)
break;
}
/* PQconnectPoll() handles the socket polling state updates */

break;
Expand All @@ -524,12 +515,22 @@ pageserver_connect(shardno_t shard_no, int elevel)
connected = true;
break;
}
poll_result = PQconnectPoll(shard->conn);
elog(DEBUG5, "PQconnectPoll=>%d", poll_result);
}
while (!connected);

/* No more polling needed; connection succeeded */
shard->last_connect_time = GetCurrentTimestamp();

shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);


switch (neon_protocol_version)
{
case 2:
Expand Down
10 changes: 5 additions & 5 deletions pgxn/neon/pagestore_smgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,9 @@ prefetch_read(PrefetchRequest *slot)
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(slot->shard_no, ERROR,
"Incorrect prefetch read: status=%d response=%llx my=%llu receive=%llu",
slot->status, (size_t) (void *) slot->response,
slot->my_ring_index, MyPState->ring_receive);
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);

old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(slot->shard_no);
Expand All @@ -606,8 +606,8 @@ prefetch_read(PrefetchRequest *slot)
else
{
neon_shard_log(slot->shard_no, WARNING,
"No response from reading prefetch entry %llu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
slot->my_ring_index,
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
(long)slot->my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
slot->buftag.forkNum, slot->buftag.blockNum);
return false;
Expand Down

1 comment on commit d61e924

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3226 tests run: 3080 passed, 0 failed, 146 skipped (full report)


Code coverage* (full report)

  • functions: 31.4% (6449 of 20543 functions)
  • lines: 48.3% (49964 of 103355 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
d61e924 at 2024-05-27T14:20:03.230Z :recycle:

Please sign in to comment.