From 55ebeac1f48c7e8cb64df373fb28e6a0c50bdafe Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 9 Aug 2021 09:46:07 +0300 Subject: [PATCH] [walproposer] [contrib/zenith] [refer #395] Do no align sart replication position in wal_proppser to segment boundary --- contrib/zenith/pagestore_smgr.c | 4 + src/backend/replication/walproposer.c | 359 +++++++++++++------------- 2 files changed, 181 insertions(+), 182 deletions(-) diff --git a/contrib/zenith/pagestore_smgr.c b/contrib/zenith/pagestore_smgr.c index 858a67841ea..3d24cb79f5f 100644 --- a/contrib/zenith/pagestore_smgr.c +++ b/contrib/zenith/pagestore_smgr.c @@ -21,6 +21,7 @@ #include "storage/relfilenode.h" #include "storage/smgr.h" #include "access/xlogdefs.h" +#include "postmaster/interrupt.h" #include "storage/bufmgr.h" #include "fmgr.h" #include "miscadmin.h" @@ -243,6 +244,9 @@ zenith_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, { XLogRecPtr lsn = PageGetLSN(buffer); + if (ShutdownRequestPending) + return; + /* * If the page was not WAL-logged before eviction then we can lose its modification. * PD_WAL_LOGGED bit is used to mark pages which are wal-logged. diff --git a/src/backend/replication/walproposer.c b/src/backend/replication/walproposer.c index 240e3769a6a..837b751a24c 100644 --- a/src/backend/replication/walproposer.c +++ b/src/backend/replication/walproposer.c @@ -326,11 +326,6 @@ static void WalProposerStartStreaming(XLogRecPtr startpos) { StartReplicationCmd cmd; - /* - * Always start streaming at the beginning of a segment - */ - startpos -= XLogSegmentOffset(startpos, serverInfo.walSegSize); - cmd.slotname = WAL_PROPOSER_SLOT_NAME; cmd.timeline = serverInfo.timeline; cmd.startpoint = startpos; @@ -656,226 +651,227 @@ WalProposerPoll(void) WalKeeper* wk = (WalKeeper*) event.user_data; int i = (int)(wk - walkeeper); - /* communication with walkeepers */ - if (event.events & WL_SOCKET_READABLE) + if (rc != 0) { - switch (wk->state) + /* communication with walkeepers */ + if (event.events & WL_SOCKET_READABLE) { - case SS_HANDSHAKE: - /* Receive walkeeper node state */ - rc = ReadSocketAsync(wk->sock, - (char*)&wk->info + wk->asyncOffs, - sizeof(wk->info) - wk->asyncOffs); - if (rc < 0) - { - ResetConnection(i); - } - else if ((wk->asyncOffs += rc) == sizeof(wk->info)) - { - /* WalKeeper response completely received */ - - /* Check protocol version */ - if (wk->info.server.protocolVersion != SK_PROTOCOL_VERSION) + switch (wk->state) + { + case SS_HANDSHAKE: + /* Receive walkeeper node state */ + rc = ReadSocketAsync(wk->sock, + (char*)&wk->info + wk->asyncOffs, + sizeof(wk->info) - wk->asyncOffs); + if (rc < 0) { - elog(WARNING, "WalKeeper has incompatible protocol version %d vs. %d", - wk->info.server.protocolVersion, SK_PROTOCOL_VERSION); ResetConnection(i); } - else + else if ((wk->asyncOffs += rc) == sizeof(wk->info)) { - wk->state = SS_VOTING; - wk->feedback.flushLsn = restartLsn; - wk->feedback.hs.ts = 0; + /* WalKeeper response completely received */ - /* Check if we have quorum */ - if (++n_connected >= quorum) + /* Check protocol version */ + if (wk->info.server.protocolVersion != SK_PROTOCOL_VERSION) { - if (n_connected == quorum) - StartElection(); + elog(WARNING, "WalKeeper has incompatible protocol version %d vs. %d", + wk->info.server.protocolVersion, SK_PROTOCOL_VERSION); + ResetConnection(i); + } + else + { + wk->state = SS_VOTING; + wk->feedback.flushLsn = restartLsn; + wk->feedback.hs.ts = 0; - /* Now send max-node-id to everyone participating in voting and wait their responses */ - for (int j = 0; j < n_walkeepers; j++) + /* Check if we have quorum */ + if (++n_connected >= quorum) { - if (walkeeper[j].state == SS_VOTING) + if (n_connected == quorum) + StartElection(); + + /* Now send max-node-id to everyone participating in voting and wait their responses */ + for (int j = 0; j < n_walkeepers; j++) { - if (!WriteSocket(walkeeper[j].sock, &prop, sizeof(prop))) - { - ResetConnection(j); - } - else + if (walkeeper[j].state == SS_VOTING) { - walkeeper[j].asyncOffs = 0; - walkeeper[j].state = SS_WAIT_VERDICT; + if (!WriteSocket(walkeeper[j].sock, &prop, sizeof(prop))) + { + ResetConnection(j); + } + else + { + walkeeper[j].asyncOffs = 0; + walkeeper[j].state = SS_WAIT_VERDICT; + } } } } } } - } - break; - - case SS_WAIT_VERDICT: - /* Receive walkeeper response for our candidate */ - rc = ReadSocketAsync(wk->sock, - (char*)&wk->info.server.nodeId + wk->asyncOffs, - sizeof(wk->info.server.nodeId) - wk->asyncOffs); - if (rc < 0) - { - ResetConnection(i); - } - else if ((wk->asyncOffs += rc) == sizeof(wk->info.server.nodeId)) - { - /* Response completely received */ - - /* If server accept our candidate, then it returns it in response */ - if (CompareNodeId(&wk->info.server.nodeId, &prop.nodeId) != 0) + break; + + case SS_WAIT_VERDICT: + /* Receive walkeeper response for our candidate */ + rc = ReadSocketAsync(wk->sock, + (char*)&wk->info.server.nodeId + wk->asyncOffs, + sizeof(wk->info.server.nodeId) - wk->asyncOffs); + if (rc < 0) { - elog(FATAL, "WalKeeper %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", - wk->host, wk->port, - wk->info.server.nodeId.term, prop.nodeId.term); + ResetConnection(i); } - else + else if ((wk->asyncOffs += rc) == sizeof(wk->info.server.nodeId)) { - /* Handshake completed, do we have quorum? */ - wk->state = SS_IDLE; - if (++n_votes == quorum) - { - elog(LOG, "Successfully established connection with %d nodes, VCL %X/%X", - quorum, - (uint32) (prop.VCL >> 32), (uint32) (prop.VCL) - ); + /* Response completely received */ - /* Check if not all safekeepers are up-to-date, we need to download WAL needed to synchronize them */ - if (restartLsn != prop.VCL) - { - /* Perform recovery */ - if (!WalProposerRecovery(leader, serverInfo.timeline, restartLsn, prop.VCL)) - elog(FATAL, "Failed to recover state"); - } - WalProposerStartStreaming(prop.VCL); - /* Should not return here */ + /* If server accept our candidate, then it returns it in response */ + if (CompareNodeId(&wk->info.server.nodeId, &prop.nodeId) != 0) + { + elog(FATAL, "WalKeeper %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", + wk->host, wk->port, + wk->info.server.nodeId.term, prop.nodeId.term); } else { - /* We are already streaming WAL: send all pending messages to the attached walkeeper */ - SendMessageToNode(i, msgQueueHead); + /* Handshake completed, do we have quorum? */ + wk->state = SS_IDLE; + if (++n_votes == quorum) + { + elog(LOG, "Successfully established connection with %d nodes, VCL %X/%X", + quorum, + (uint32) (prop.VCL >> 32), (uint32) (prop.VCL) + ); + + /* Check if not all safekeepers are up-to-date, we need to download WAL needed to synchronize them */ + if (restartLsn != prop.VCL) + { + /* Perform recovery */ + if (!WalProposerRecovery(leader, serverInfo.timeline, restartLsn, prop.VCL)) + elog(FATAL, "Failed to recover state"); + } + WalProposerStartStreaming(prop.VCL); + /* Should not return here */ + } + else + { + /* We are already streaming WAL: send all pending messages to the attached walkeeper */ + SendMessageToNode(i, msgQueueHead); + } } } - } - break; - - case SS_RECV_FEEDBACK: - /* Read walkeeper response with flushed WAL position */ - rc = ReadSocketAsync(wk->sock, - (char*)&wk->feedback + wk->asyncOffs, - sizeof(wk->feedback) - wk->asyncOffs); - if (rc < 0) - { - ResetConnection(i); - } - else if ((wk->asyncOffs += rc) == sizeof(wk->feedback)) - { - WalMessage* next = wk->currMsg->next; - Assert(wk->feedback.flushLsn == wk->currMsg->req.endLsn); - wk->currMsg->ackMask |= 1 << i; /* this walkeeper confirms receiving of this message */ - wk->state = SS_IDLE; - wk->asyncOffs = 0; - wk->currMsg = NULL; - HandleWalKeeperResponse(); - SendMessageToNode(i, next); - - /* - * Also send the new VCL to all the walkeepers. - * - * FIXME: This is redundant for walkeepers that have other outbound messages - * pending. - */ - if (true) + break; + + case SS_RECV_FEEDBACK: + /* Read walkeeper response with flushed WAL position */ + rc = ReadSocketAsync(wk->sock, + (char*)&wk->feedback + wk->asyncOffs, + sizeof(wk->feedback) - wk->asyncOffs); + if (rc < 0) { - XLogRecPtr minQuorumLsn = GetAcknowledgedByQuorumWALPosition(); - WalMessage *vclUpdateMsg; - - if (minQuorumLsn > lastSentVCLLsn) + ResetConnection(i); + } + else if ((wk->asyncOffs += rc) == sizeof(wk->feedback)) + { + WalMessage* next = wk->currMsg->next; + Assert(wk->feedback.flushLsn == wk->currMsg->req.endLsn); + wk->currMsg->ackMask |= 1 << i; /* this walkeeper confirms receiving of this message */ + wk->state = SS_IDLE; + wk->asyncOffs = 0; + wk->currMsg = NULL; + HandleWalKeeperResponse(); + SendMessageToNode(i, next); + + /* + * Also send the new VCL to all the walkeepers. + * + * FIXME: This is redundant for walkeepers that have other outbound messages + * pending. + */ + if (true) { - vclUpdateMsg = CreateMessageVCLOnly(); - if (vclUpdateMsg) - BroadcastMessage(vclUpdateMsg); - lastSentVCLLsn = minQuorumLsn; + XLogRecPtr minQuorumLsn = GetAcknowledgedByQuorumWALPosition(); + WalMessage *vclUpdateMsg; + + if (minQuorumLsn > lastSentVCLLsn) + { + vclUpdateMsg = CreateMessageVCLOnly(); + if (vclUpdateMsg) + BroadcastMessage(vclUpdateMsg); + lastSentVCLLsn = minQuorumLsn; + } } } - } - break; - - case SS_IDLE: - elog(WARNING, "WalKeeper %s:%s drops connection", wk->host, wk->port); - ResetConnection(i); - break; + break; + case SS_IDLE: + elog(WARNING, "WalKeeper %s:%s drops connection", wk->host, wk->port); + ResetConnection(i); + break; - default: - elog(FATAL, "Unexpected walkeeper %s:%s read state %d", wk->host, wk->port, wk->state); + default: + elog(FATAL, "Unexpected walkeeper %s:%s read state %d", wk->host, wk->port, wk->state); + } } - } - else if (event.events & WL_SOCKET_WRITEABLE) - { - switch (wk->state) + else if (event.events & WL_SOCKET_WRITEABLE) { - case SS_CONNECTING: + switch (wk->state) { - int optval = 0; - ACCEPT_TYPE_ARG3 optlen = sizeof(optval); - if (getsockopt(wk->sock, SOL_SOCKET, SO_ERROR, (char *) &optval, &optlen) < 0 || optval != 0) + case SS_CONNECTING: { - elog(WARNING, "Failed to connect to node '%s:%s': %s", - wk->host, wk->port, - strerror(optval)); - closesocket(wk->sock); - wk->sock = PGINVALID_SOCKET; - wk->state = SS_OFFLINE; - ResetWalProposerEventSet(); - } - else - { - uint32 len = 0; - ModifyWaitEvent(waitEvents, wk->eventPos, WL_SOCKET_READABLE, NULL); - /* - * Start handshake: send information about server. - * First of all send 0 as package size: it allows walkeeper to distinguish - * wal_proposer's connection from standard replication connection from pagers. - */ - if (WriteSocket(wk->sock, &len, sizeof len) - && WriteSocket(wk->sock, &serverInfo, sizeof serverInfo)) + int optval = 0; + ACCEPT_TYPE_ARG3 optlen = sizeof(optval); + if (getsockopt(wk->sock, SOL_SOCKET, SO_ERROR, (char *) &optval, &optlen) < 0 || optval != 0) { - wk->state = SS_HANDSHAKE; - wk->asyncOffs = 0; + elog(WARNING, "Failed to connect to node '%s:%s': %s", + wk->host, wk->port, + strerror(optval)); + closesocket(wk->sock); + wk->sock = PGINVALID_SOCKET; + wk->state = SS_OFFLINE; + ResetWalProposerEventSet(); } else { - ResetConnection(i); + uint32 len = 0; + ModifyWaitEvent(waitEvents, wk->eventPos, WL_SOCKET_READABLE, NULL); + /* + * Start handshake: send information about server. + * First of all send 0 as package size: it allows walkeeper to distinguish + * wal_proposer's connection from standard replication connection from pagers. + */ + if (WriteSocket(wk->sock, &len, sizeof len) + && WriteSocket(wk->sock, &serverInfo, sizeof serverInfo)) + { + wk->state = SS_HANDSHAKE; + wk->asyncOffs = 0; + } + else + { + ResetConnection(i); + } } + break; } - break; - } - case SS_SEND_WAL: - rc = WriteSocketAsync(wk->sock, (char*)&wk->currMsg->req + wk->asyncOffs, wk->currMsg->size - wk->asyncOffs); - if (rc < 0) - { - ResetConnection(i); - } - else if ((wk->asyncOffs += rc) == wk->currMsg->size) - { - /* WAL block completely sent */ - wk->state = SS_RECV_FEEDBACK; - wk->asyncOffs = 0; - ModifyWaitEvent(waitEvents, wk->eventPos, WL_SOCKET_READABLE, NULL); - } - break; + case SS_SEND_WAL: + rc = WriteSocketAsync(wk->sock, (char*)&wk->currMsg->req + wk->asyncOffs, wk->currMsg->size - wk->asyncOffs); + if (rc < 0) + { + ResetConnection(i); + } + else if ((wk->asyncOffs += rc) == wk->currMsg->size) + { + /* WAL block completely sent */ + wk->state = SS_RECV_FEEDBACK; + wk->asyncOffs = 0; + ModifyWaitEvent(waitEvents, wk->eventPos, WL_SOCKET_READABLE, NULL); + } + break; - default: - elog(FATAL, "Unexpected write state %d", wk->state); + default: + elog(FATAL, "Unexpected write state %d", wk->state); + } } } - ReconnectWalKeepers(); /* @@ -883,12 +879,11 @@ WalProposerPoll(void) * each wal flush), then exit loop. (no need for pm death check due to * WL_EXIT_ON_PM_DEATH) */ - if (event.events & WL_LATCH_SET) + if (rc != 0 && (event.events & WL_LATCH_SET)) { ResetLatch(MyLatch); break; } - } }