Skip to content

Commit

Permalink
fix: unknown block sync to subscribe/unsubscribe to network events (#…
Browse files Browse the repository at this point in the history
…5654)

* fix: unknown block sync to subscribe/unsubscribe to network events

* fix: UnknownBlockSync sim test

* fix: correct the condition to subscribe unknownBlock to network
  • Loading branch information
twoeths authored Jun 18, 2023
1 parent 4313726 commit 483aad7
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 35 deletions.
5 changes: 5 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ export function createLodestarMetrics(
},

syncUnknownBlock: {
switchNetworkSubscriptions: register.gauge<"action">({
name: "lodestar_sync_unknown_block_network_subscriptions_count",
help: "Switch network subscriptions on/off",
labelNames: ["action"],
}),
requests: register.gauge<"type">({
name: "lodestar_sync_unknown_block_requests_total",
help: "Total number of unknown block events or requests",
Expand Down
63 changes: 39 additions & 24 deletions packages/beacon-node/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ export class BeaconSync implements IBeaconSync {

// Subscribe to RangeSync completing a SyncChain and recompute sync state
if (!opts.disableRangeSync) {
// prod code
this.logger.debug("RangeSync enabled.");
this.rangeSync.on(RangeSyncEvent.completedChain, this.updateSyncState);
this.network.events.on(NetworkEvent.peerConnected, this.addPeer);
this.network.events.on(NetworkEvent.peerDisconnected, this.removePeer);
} else {
// test code, this is needed for Unknown block sync sim test
this.unknownBlockSync.subscribeToNetwork();
this.logger.debug("RangeSync disabled.");
}

Expand Down Expand Up @@ -196,36 +199,48 @@ export class BeaconSync implements IBeaconSync {
const state = this.state; // Don't run the getter twice

// We have become synced, subscribe to all the gossip core topics
if (
state === SyncState.Synced &&
!this.network.isSubscribedToGossipCoreTopics() &&
this.chain.clock.currentEpoch >= MIN_EPOCH_TO_START_GOSSIP
) {
this.network
.subscribeGossipCoreTopics()
.then(() => {
this.metrics?.syncSwitchGossipSubscriptions.inc({action: "subscribed"});
this.logger.info("Subscribed gossip core topics");
})
.catch((e) => {
this.logger.error("Error subscribing to gossip core topics", {}, e);
});
if (state === SyncState.Synced && this.chain.clock.currentEpoch >= MIN_EPOCH_TO_START_GOSSIP) {
if (!this.network.isSubscribedToGossipCoreTopics()) {
this.network
.subscribeGossipCoreTopics()
.then(() => {
this.metrics?.syncSwitchGossipSubscriptions.inc({action: "subscribed"});
this.logger.info("Subscribed gossip core topics");
})
.catch((e) => {
this.logger.error("Error subscribing to gossip core topics", {}, e);
});
}

// also start searching for unknown blocks
if (!this.unknownBlockSync.isSubscribedToNetwork()) {
this.unknownBlockSync.subscribeToNetwork();
this.metrics?.syncUnknownBlock.switchNetworkSubscriptions.inc({action: "subscribed"});
}
}

// If we stopped being synced and falled significantly behind, stop gossip
else if (state !== SyncState.Synced && this.network.isSubscribedToGossipCoreTopics()) {
else if (state !== SyncState.Synced) {
const syncDiff = this.chain.clock.currentSlot - this.chain.forkChoice.getHead().slot;
if (syncDiff > this.slotImportTolerance * 2) {
this.logger.warn(`Node sync has fallen behind by ${syncDiff} slots`);
this.network
.unsubscribeGossipCoreTopics()
.then(() => {
this.metrics?.syncSwitchGossipSubscriptions.inc({action: "unsubscribed"});
this.logger.info("Un-subscribed gossip core topics");
})
.catch((e) => {
this.logger.error("Error unsubscribing to gossip core topics", {}, e);
});
if (this.network.isSubscribedToGossipCoreTopics()) {
this.network
.unsubscribeGossipCoreTopics()
.then(() => {
this.metrics?.syncSwitchGossipSubscriptions.inc({action: "unsubscribed"});
this.logger.info("Un-subscribed gossip core topics");
})
.catch((e) => {
this.logger.error("Error unsubscribing to gossip core topics", {}, e);
});
}

// also stop searching for unknown blocks
if (this.unknownBlockSync.isSubscribedToNetwork()) {
this.unknownBlockSync.unsubscribeFromNetwork();
this.metrics?.syncUnknownBlock.switchNetworkSubscriptions.inc({action: "unsubscribed"});
}
}
}
};
Expand Down
38 changes: 27 additions & 11 deletions packages/beacon-node/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,17 @@ export class UnknownBlockSync {
private readonly knownBadBlocks = new Set<RootHex>();
private readonly proposerBoostSecWindow: number;
private readonly maxPendingBlocks;
private subscribedToNetworkEvents = false;

constructor(
private readonly config: ChainForkConfig,
private readonly network: INetwork,
private readonly chain: IBeaconChain,
private readonly logger: Logger,
private readonly metrics: Metrics | null,
opts?: SyncOptions
private readonly opts?: SyncOptions
) {
if (!opts?.disableUnknownBlockSync) {
this.logger.debug("UnknownBlockSync enabled.");
this.network.events.on(NetworkEvent.unknownBlock, this.onUnknownBlock);
this.network.events.on(NetworkEvent.unknownBlockParent, this.onUnknownParent);
this.network.events.on(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch);
} else {
this.logger.debug("UnknownBlockSync disabled.");
}
this.maxPendingBlocks = opts?.maxPendingBlocks ?? MAX_PENDING_BLOCKS;

this.proposerBoostSecWindow = this.config.SECONDS_PER_SLOT / INTERVALS_PER_SLOT;

if (metrics) {
Expand All @@ -61,12 +53,36 @@ export class UnknownBlockSync {
}
}

close(): void {
subscribeToNetwork(): void {
if (!this.opts?.disableUnknownBlockSync) {
// cannot chain to the above if or the log will be incorrect
if (!this.subscribedToNetworkEvents) {
this.logger.debug("UnknownBlockSync enabled.");
this.network.events.on(NetworkEvent.unknownBlock, this.onUnknownBlock);
this.network.events.on(NetworkEvent.unknownBlockParent, this.onUnknownParent);
this.network.events.on(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch);
this.subscribedToNetworkEvents = true;
}
} else {
this.logger.debug("UnknownBlockSync disabled.");
}
}

unsubscribeFromNetwork(): void {
this.network.events.off(NetworkEvent.unknownBlock, this.onUnknownBlock);
this.network.events.off(NetworkEvent.unknownBlockParent, this.onUnknownParent);
this.network.events.off(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch);
}

close(): void {
this.unsubscribeFromNetwork();
// add more in the future if needed
}

isSubscribedToNetwork(): boolean {
return this.subscribedToNetworkEvents;
}

/**
* Process an unknownBlock event and register the block in `pendingBlocks` Map.
*/
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/test/unit/sync/unknownBlock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ describe("sync / UnknownBlockSync", () => {
...defaultSyncOptions,
maxPendingBlocks,
});
syncService.subscribeToNetwork();
if (event === NetworkEvent.unknownBlockParent) {
network.events?.emit(NetworkEvent.unknownBlockParent, {
blockInput: getBlockInput.preDeneb(config, blockC, BlockSource.gossip),
Expand Down Expand Up @@ -220,6 +221,8 @@ describe("sync / UnknownBlockSync", () => {
"Wrong blocks in mock ForkChoice"
);
}

syncService.close();
});
}
});

0 comments on commit 483aad7

Please sign in to comment.