diff --git a/internal/pkg/service/buffer/worker/distribution/listener_test.go b/internal/pkg/service/buffer/worker/distribution/listener_test.go index e52b66aba2..ab2304ae62 100644 --- a/internal/pkg/service/buffer/worker/distribution/listener_test.go +++ b/internal/pkg/service/buffer/worker/distribution/listener_test.go @@ -22,7 +22,7 @@ func TestOnChangeListener(t *testing.T) { t.Parallel() clk := clock.NewMock() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() var node1 *Node @@ -51,20 +51,20 @@ func TestOnChangeListener(t *testing.T) { _, d2 = createNode(t, clk, nil, etcdNamespace, "node2") assert.Eventually(t, func() bool { return strings.Contains(listenerLogs.String(), `found a new node "node2"`) - }, time.Second, 10*time.Millisecond, "timeout") + }, 10*time.Second, 10*time.Millisecond, "timeout") // Add node 3 _, d3 = createNode(t, clk, nil, etcdNamespace, "node3") assert.Eventually(t, func() bool { return strings.Contains(listenerLogs.String(), `found a new node "node3"`) - }, time.Second, 10*time.Millisecond, "timeout") + }, 10*time.Second, 10*time.Millisecond, "timeout") // Stop node 2 d2.Process().Shutdown(errors.New("test")) d2.Process().WaitForShutdown() assert.Eventually(t, func() bool { return strings.Contains(listenerLogs.String(), `the node "node2" gone`) - }, time.Second, 10*time.Millisecond, "timeout") + }, 10*time.Second, 10*time.Millisecond, "timeout") // Stop listener listener.Stop() diff --git a/internal/pkg/service/buffer/worker/service/conditions_test.go b/internal/pkg/service/buffer/worker/service/conditions_test.go index 152c8fb2d5..75a2185859 100644 --- a/internal/pkg/service/buffer/worker/service/conditions_test.go +++ b/internal/pkg/service/buffer/worker/service/conditions_test.go @@ -108,18 +108,16 @@ func TestConditionsChecker(t *testing.T) { // Check conditions checker logs wildcards.Assert(t, ` %A -[service][conditions]INFO checked "1" opened slices | %s -[service][conditions]INFO checked "1" opened slices | %s -[service][conditions]INFO checked "1" opened slices | %s [service][conditions]INFO closing slice "00000123/my-receiver-B/my-export-2/0001-01-01T00:00:02.000Z/0001-01-01T00:00:02.000Z": time threshold met, opened at: 0001-01-01T00:00:02.000Z, passed: 1m30s threshold: 1m0s %A `, strhelper.FilterLines(`^(\[service\]\[conditions\])`, workerDeps1.DebugLogger().AllMessages())) wildcards.Assert(t, ` %A -[service][conditions]INFO checked "1" opened slices | %s -[service][conditions]INFO checked "1" opened slices | %s [service][conditions]INFO closing slice "00000123/my-receiver-A/my-export-1/0001-01-01T00:00:02.000Z/0001-01-01T00:00:02.000Z": time threshold met, opened at: 0001-01-01T00:00:02.000Z, passed: 1m0s threshold: 1m0s -[service][conditions]INFO checked "1" opened slices | %s +%A +`, strhelper.FilterLines(`^(\[service\]\[conditions\])`, workerDeps2.DebugLogger().AllMessages())) + wildcards.Assert(t, ` +%A [service][conditions]INFO closing file "00000123/my-receiver-A/my-export-1/0001-01-01T00:00:02.000Z": size threshold met, received: 250KB, threshold: 200KB %A `, strhelper.FilterLines(`^(\[service\]\[conditions\])`, workerDeps2.DebugLogger().AllMessages())) diff --git a/internal/pkg/service/buffer/worker/task/node_test.go b/internal/pkg/service/buffer/worker/task/node_test.go index a740a59b76..cbe29b5a8f 100644 --- a/internal/pkg/service/buffer/worker/task/node_test.go +++ b/internal/pkg/service/buffer/worker/task/node_test.go @@ -177,7 +177,7 @@ task/00000123/my-receiver/my-export-1/%s func TestFailedTask(t *testing.T) { t.Parallel() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() exportKey := exportKeyForTest() diff --git a/internal/pkg/service/common/etcdclient/etcdclient.go b/internal/pkg/service/common/etcdclient/etcdclient.go index 37b8372095..e76b6af543 100644 --- a/internal/pkg/service/common/etcdclient/etcdclient.go +++ b/internal/pkg/service/common/etcdclient/etcdclient.go @@ -168,6 +168,7 @@ func New(ctx context.Context, proc *servicectx.Process, tracer trace.Tracer, end Username: conf.username, // optional Password: conf.password, // optional Logger: etcdLogger, + PermitWithoutStream: true, // always send keep-alive pings DialOptions: []grpc.DialOption{ grpc.WithBlock(), // wait for the connection grpc.WithReturnConnectionError(), diff --git a/provisioning/common/etcd/values.yaml b/provisioning/common/etcd/values.yaml index a36913ae4c..b9ac05316e 100644 --- a/provisioning/common/etcd/values.yaml +++ b/provisioning/common/etcd/values.yaml @@ -62,7 +62,7 @@ auth: allowNoneAuthentication: false token: type: jwt - ttl: 10m + ttl: 10080m # temporary: https://github.com/etcd-io/etcd/pull/14995 # Only internal communication is allowed: no public IP service: