Skip to content

Commit

Permalink
fix(sync): handle sync node going awol
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsujlangston committed Jan 27, 2019
1 parent 9082d3d commit 372b273
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions packages/bitcore-node/src/services/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class P2pWorker {
private invCacheLimits: any;
private initialSyncComplete: boolean;
private isSyncingNode: boolean;
private syncingNodeHeartBeat?: NodeJS.Timer;
private stopping?: boolean;
private blockModel: BlockModel;
constructor({ chain, network, chainConfig, blockModel = BlockStorage }) {
this.blockModel = blockModel;
Expand Down Expand Up @@ -389,43 +389,51 @@ export class P2pWorker {
this.syncing = false;
}

registerSyncingNode() {
this.syncingNodeHeartBeat = setInterval(async () => {
async registerSyncingNode() {
while(!this.stopping) {
const syncingNode = await StateStorage.getSyncingNode({ chain: this.chain, network: this.network });
if (!syncingNode) {
return StateStorage.selfNominateSyncingNode({
StateStorage.selfNominateSyncingNode({
chain: this.chain,
network: this.network,
lastHeartBeat: syncingNode
});
continue;
}
const [hostname, pid] = syncingNode.split(':');
const amSyncingNode = hostname === os.hostname() && pid === process.pid.toString();
const [hostname, pid, timestamp] = syncingNode.split(':');
const amSyncingNode = hostname === os.hostname() && pid === process.pid.toString() && Date.now() - parseInt(timestamp) < 1000;
if (amSyncingNode) {
StateStorage.selfNominateSyncingNode({ chain: this.chain, network: this.network, lastHeartBeat: syncingNode });
StateStorage.selfNominateSyncingNode({
chain: this.chain,
network: this.network,
lastHeartBeat: syncingNode
});
if (!this.isSyncingNode) {
logger.info(`This worker is now the syncing node for ${this.chain} ${this.network}`);
this.isSyncingNode = true;
this.sync();
}
} else {
setTimeout(() => {
StateStorage.selfNominateSyncingNode({
chain: this.chain,
network: this.network,
lastHeartBeat: syncingNode
});
}, 10000);
if (this.isSyncingNode) {
logger.info(`This worker is no longer syncing node for ${this.chain} ${this.network}`);
this.isSyncingNode = false;
await new Promise(resolve => setTimeout(resolve, 100000));
}
await new Promise(resolve => setTimeout(resolve, 10000));
StateStorage.selfNominateSyncingNode({
chain: this.chain,
network: this.network,
lastHeartBeat: syncingNode
});
}
}, 500);
await new Promise(resolve => setTimeout(resolve, 500));
}
}

async stop() {
this.stopping = true;
logger.debug(`Stopping worker for chain ${this.chain}`);
await this.disconnect();
if (this.syncingNodeHeartBeat) {
clearInterval(this.syncingNodeHeartBeat);
}
}

async start() {
Expand Down

0 comments on commit 372b273

Please sign in to comment.