From d962b4d05f2419ef25fbd329b29cabf2b433b2f3 Mon Sep 17 00:00:00 2001 From: Anusha Date: Tue, 19 Sep 2023 18:02:06 -0700 Subject: [PATCH 1/6] OverMaxNeeded --- warp/aggregator/aggregation_job_test.go | 57 +++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/warp/aggregator/aggregation_job_test.go b/warp/aggregator/aggregation_job_test.go index a184138489..e4e9fd7936 100644 --- a/warp/aggregator/aggregation_job_test.go +++ b/warp/aggregator/aggregation_job_test.go @@ -211,6 +211,63 @@ func TestAggregateThresholdSignatures(t *testing.T) { }) } +func TestAggregateThresholdSignaturesOverMaxNeeded(t *testing.T) { + ctx := context.Background() + aggregationJob := newSignatureAggregationJob( + &mockFetcher{ + fetch: func(_ context.Context, nodeID ids.NodeID, _ *avalancheWarp.UnsignedMessage) (*bls.Signature, error) { + // Allow bls signatures from all nodes even though we only need 3/5 + for i, matchingNodeID := range nodeIDs { + if matchingNodeID == nodeID { + return blsSignatures[i], nil + } + } + return nil, errors.New("what do we say to the god of death") + }, + }, + pChainHeight, + subnetID, + 60, + 60, + 100, + &validators.TestState{ + GetSubnetIDF: getSubnetIDF, + GetCurrentHeightF: getCurrentHeightF, + GetValidatorSetF: func(ctx context.Context, height uint64, subnetID ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + res := make(map[ids.NodeID]*validators.GetValidatorOutput) + for i := 0; i < 5; i++ { + res[nodeIDs[i]] = &validators.GetValidatorOutput{ + NodeID: nodeIDs[i], + PublicKey: blsPublicKeys[i], + Weight: 100, + } + } + return res, nil + }, + }, + unsignedMsg, + ) + + signature := &avalancheWarp.BitSetSignature{ + Signers: set.NewBits(0, 1, 2).Bytes(), + } + signedMessage, err := avalancheWarp.NewMessage(unsignedMsg, signature) + require.NoError(t, err) + aggregateSignature, err := bls.AggregateSignatures(blsSignatures) + require.NoError(t, err) + copy(signature.Signature[:], bls.SignatureToBytes(aggregateSignature)) + expectedRes := &AggregateSignatureResult{ + SignatureWeight: 300, + TotalWeight: 500, + Message: signedMessage, + } + executeSignatureAggregationTest(t, signatureAggregationTest{ + ctx: ctx, + job: aggregationJob, + expectedRes: expectedRes, + }) +} + func TestAggregateThresholdSignaturesInsufficientWeight(t *testing.T) { ctx := context.Background() aggregationJob := newSignatureAggregationJob( From 263b0adfd3d79825c5ff683cae465a98b5889523 Mon Sep 17 00:00:00 2001 From: Anusha Date: Tue, 19 Sep 2023 18:09:20 -0700 Subject: [PATCH 2/6] Testing if VerifyWeight cancels properly --- warp/aggregator/aggregation_job_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/warp/aggregator/aggregation_job_test.go b/warp/aggregator/aggregation_job_test.go index e4e9fd7936..2485e7e4a4 100644 --- a/warp/aggregator/aggregation_job_test.go +++ b/warp/aggregator/aggregation_job_test.go @@ -217,10 +217,16 @@ func TestAggregateThresholdSignaturesOverMaxNeeded(t *testing.T) { &mockFetcher{ fetch: func(_ context.Context, nodeID ids.NodeID, _ *avalancheWarp.UnsignedMessage) (*bls.Signature, error) { // Allow bls signatures from all nodes even though we only need 3/5 - for i, matchingNodeID := range nodeIDs { - if matchingNodeID == nodeID { - return blsSignatures[i], nil + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + for i, matchingNodeID := range nodeIDs { + if matchingNodeID == nodeID { + return blsSignatures[i], nil + } } + } return nil, errors.New("what do we say to the god of death") }, @@ -261,6 +267,7 @@ func TestAggregateThresholdSignaturesOverMaxNeeded(t *testing.T) { TotalWeight: 500, Message: signedMessage, } + // Why does this test get 500/500 signature weight? Shouldn't it cancel the context after we hit 300/500 or 60/100? executeSignatureAggregationTest(t, signatureAggregationTest{ ctx: ctx, job: aggregationJob, From dbd6d69ffd0c2a2423b5fac570fd2682bd35ba14 Mon Sep 17 00:00:00 2001 From: Anusha Date: Tue, 19 Sep 2023 18:12:14 -0700 Subject: [PATCH 3/6] fix lint --- warp/aggregator/aggregation_job_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/warp/aggregator/aggregation_job_test.go b/warp/aggregator/aggregation_job_test.go index 2485e7e4a4..c559fce961 100644 --- a/warp/aggregator/aggregation_job_test.go +++ b/warp/aggregator/aggregation_job_test.go @@ -216,17 +216,16 @@ func TestAggregateThresholdSignaturesOverMaxNeeded(t *testing.T) { aggregationJob := newSignatureAggregationJob( &mockFetcher{ fetch: func(_ context.Context, nodeID ids.NodeID, _ *avalancheWarp.UnsignedMessage) (*bls.Signature, error) { - // Allow bls signatures from all nodes even though we only need 3/5 select { case <-ctx.Done(): return nil, ctx.Err() default: + // Allow bls signatures from all nodes even though we only need 3/5 for i, matchingNodeID := range nodeIDs { if matchingNodeID == nodeID { return blsSignatures[i], nil } } - } return nil, errors.New("what do we say to the god of death") }, @@ -267,7 +266,8 @@ func TestAggregateThresholdSignaturesOverMaxNeeded(t *testing.T) { TotalWeight: 500, Message: signedMessage, } - // Why does this test get 500/500 signature weight? Shouldn't it cancel the context after we hit 300/500 or 60/100? + // This test is failing even though it shoudn't be. + // Why does this test get 500/500 signature weight? Shouldn't it the child context be canceled by VerifyWeight after we hit 300/500 or 60/100? executeSignatureAggregationTest(t, signatureAggregationTest{ ctx: ctx, job: aggregationJob, From 30322ef85492e1f5236918db2f6cf6cdb3957fba Mon Sep 17 00:00:00 2001 From: Anusha Date: Tue, 19 Sep 2023 18:14:26 -0700 Subject: [PATCH 4/6] pass in proper ctx --- warp/aggregator/aggregation_job_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warp/aggregator/aggregation_job_test.go b/warp/aggregator/aggregation_job_test.go index c559fce961..ec8003556a 100644 --- a/warp/aggregator/aggregation_job_test.go +++ b/warp/aggregator/aggregation_job_test.go @@ -215,7 +215,7 @@ func TestAggregateThresholdSignaturesOverMaxNeeded(t *testing.T) { ctx := context.Background() aggregationJob := newSignatureAggregationJob( &mockFetcher{ - fetch: func(_ context.Context, nodeID ids.NodeID, _ *avalancheWarp.UnsignedMessage) (*bls.Signature, error) { + fetch: func(ctx context.Context, nodeID ids.NodeID, _ *avalancheWarp.UnsignedMessage) (*bls.Signature, error) { select { case <-ctx.Done(): return nil, ctx.Err() From 3eb5d1a7c60d4ff3078d5df1894394c23206684f Mon Sep 17 00:00:00 2001 From: Anusha Date: Tue, 19 Sep 2023 18:16:15 -0700 Subject: [PATCH 5/6] comment --- warp/aggregator/aggregation_job_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warp/aggregator/aggregation_job_test.go b/warp/aggregator/aggregation_job_test.go index ec8003556a..b27daf7ea6 100644 --- a/warp/aggregator/aggregation_job_test.go +++ b/warp/aggregator/aggregation_job_test.go @@ -267,7 +267,7 @@ func TestAggregateThresholdSignaturesOverMaxNeeded(t *testing.T) { Message: signedMessage, } // This test is failing even though it shoudn't be. - // Why does this test get 500/500 signature weight? Shouldn't it the child context be canceled by VerifyWeight after we hit 300/500 or 60/100? + // Why does this test get 500/500 signature weight? Shouldn't the child context be canceled by VerifyWeight after we hit 300/500 or 60/100? executeSignatureAggregationTest(t, signatureAggregationTest{ ctx: ctx, job: aggregationJob, From 30c56bb97310ebf601d0737117081bbe3f6c191e Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Thu, 28 Sep 2023 14:21:12 -0700 Subject: [PATCH 6/6] TestAggregateThresholdSignaturesOverMaxNeeded nits --- warp/aggregator/aggregation_job.go | 20 ++++++++++++++------ warp/aggregator/aggregation_job_test.go | 6 ++---- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/warp/aggregator/aggregation_job.go b/warp/aggregator/aggregation_job.go index d08120668b..e0c84ef72e 100644 --- a/warp/aggregator/aggregation_job.go +++ b/warp/aggregator/aggregation_job.go @@ -65,7 +65,8 @@ func newSignatureAggregationJob( // Execute aggregates signatures for the requested message func (a *signatureAggregationJob) Execute(ctx context.Context) (*AggregateSignatureResult, error) { - log.Info("Fetching signature", "subnetID", a.subnetID, "height", a.height) + msgID := a.msg.ID() + log.Info("Fetching signature", "msgID", msgID, "subnetID", a.subnetID, "height", a.height) validators, totalWeight, err := avalancheWarp.GetCanonicalValidatorSet(ctx, a.state, a.height, a.subnetID) if err != nil { return nil, fmt.Errorf("failed to get validator set: %w", err) @@ -95,24 +96,31 @@ func (a *signatureAggregationJob) Execute(ctx context.Context) (*AggregateSignat wg.Add(1) go func() { defer wg.Done() - log.Info("Fetching warp signature", "nodeID", signatureJob.nodeID, "index", i) + log.Info("Fetching warp signature", "msgID", msgID, "nodeID", signatureJob.nodeID, "index", i) blsSignature, err := signatureJob.Execute(signatureFetchCtx) if err != nil { log.Info("Failed to fetch signature at index %d: %s", i, signatureJob) return } - log.Info("Retrieved warp signature", "nodeID", signatureJob.nodeID, "index", i, "signature", hexutil.Bytes(bls.SignatureToBytes(blsSignature))) - // Add the signature and check if we've reached the requested threshold + log.Info("Retrieved warp signature", "msgID", msgID, "nodeID", signatureJob.nodeID, "index", i, "signature", hexutil.Bytes(bls.SignatureToBytes(blsSignature))) + + // Obtain signatureLock for aggregating the signature weight signatureLock.Lock() defer signatureLock.Unlock() + // Exit early if context was cancelled + if err := signatureFetchCtx.Err(); err != nil { + return + } + + // Add the signature and check if we've reached the requested threshold blsSignatures = append(blsSignatures, blsSignature) bitSet.Add(i) - log.Info("Updated weight", "totalWeight", signatureWeight+signatureJob.weight, "addedWeight", signatureJob.weight) signatureWeight += signatureJob.weight + log.Info("Updated weight", "msgID", msgID, "signatureWeight", signatureWeight, "addedWeight", signatureJob.weight) // If the signature weight meets the requested threshold, cancel signature fetching if err := avalancheWarp.VerifyWeight(signatureWeight, totalWeight, a.maxNeededQuorumNum, a.quorumDen); err == nil { - log.Info("Verify weight passed, exiting aggregation early", "maxNeededQuorumNum", a.maxNeededQuorumNum, "totalWeight", totalWeight, "signatureWeight", signatureWeight) + log.Info("Verify weight passed, exiting aggregation early", "msgID", msgID, "maxNeededQuorumNum", a.maxNeededQuorumNum, "totalWeight", totalWeight, "signatureWeight", signatureWeight) signatureFetchCancel() } }() diff --git a/warp/aggregator/aggregation_job_test.go b/warp/aggregator/aggregation_job_test.go index b27daf7ea6..54fe8625d5 100644 --- a/warp/aggregator/aggregation_job_test.go +++ b/warp/aggregator/aggregation_job_test.go @@ -34,8 +34,8 @@ func executeSignatureAggregationTest(t testing.TB, test signatureAggregationTest t.Helper() res, err := test.job.Execute(test.ctx) + require.ErrorIs(t, err, test.expectedErr) if test.expectedErr != nil { - require.ErrorIs(t, err, test.expectedErr) return } @@ -240,7 +240,7 @@ func TestAggregateThresholdSignaturesOverMaxNeeded(t *testing.T) { GetCurrentHeightF: getCurrentHeightF, GetValidatorSetF: func(ctx context.Context, height uint64, subnetID ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { res := make(map[ids.NodeID]*validators.GetValidatorOutput) - for i := 0; i < 5; i++ { + for i := 0; i < len(nodeIDs); i++ { res[nodeIDs[i]] = &validators.GetValidatorOutput{ NodeID: nodeIDs[i], PublicKey: blsPublicKeys[i], @@ -266,8 +266,6 @@ func TestAggregateThresholdSignaturesOverMaxNeeded(t *testing.T) { TotalWeight: 500, Message: signedMessage, } - // This test is failing even though it shoudn't be. - // Why does this test get 500/500 signature weight? Shouldn't the child context be canceled by VerifyWeight after we hit 300/500 or 60/100? executeSignatureAggregationTest(t, signatureAggregationTest{ ctx: ctx, job: aggregationJob,