diff --git a/filebeat/input/filestream/internal/input-logfile/harvester_test.go b/filebeat/input/filestream/internal/input-logfile/harvester_test.go index 91615ae0381..d8800c85996 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester_test.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester_test.go @@ -31,9 +31,9 @@ import ( "github.com/elastic/beats/v7/filebeat/input/filestream/internal/task" input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/tests/resources" - "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock" "github.com/elastic/elastic-agent-libs/logp" ) @@ -393,7 +393,7 @@ func TestDefaultHarvesterGroup(t *testing.T) { func testDefaultHarvesterGroup(t *testing.T, mockHarvester Harvester) *defaultHarvesterGroup { return &defaultHarvesterGroup{ readers: newReaderGroup(), - pipeline: &pipelinemock.MockPipelineConnector{}, + pipeline: &MockPipeline{}, harvester: mockHarvester, store: testOpenStore(t, "test", nil), identifier: &sourceIdentifier{"filestream::.global::"}, @@ -465,3 +465,71 @@ func (tl *testLogger) Errorf(format string, args ...interface{}) { func (tl *testLogger) String() string { return (*strings.Builder)(tl).String() } + +// MockClient is a mock implementation of the beat.Client interface. +type MockClient struct { + published []beat.Event // Slice to store published events + + closed bool // Flag to indicate if the client is closed + mu sync.Mutex // Mutex to synchronize access to the published events slice +} + +// GetEvents returns all the events published by the mock client. +func (m *MockClient) GetEvents() []beat.Event { + m.mu.Lock() + defer m.mu.Unlock() + + return m.published +} + +// Publish publishes a single event. +func (m *MockClient) Publish(e beat.Event) { + es := make([]beat.Event, 1) + es = append(es, e) + + m.PublishAll(es) +} + +// PublishAll publishes multiple events. +func (m *MockClient) PublishAll(es []beat.Event) { + m.mu.Lock() + defer m.mu.Unlock() + + m.published = append(m.published, es...) +} + +// Close closes the mock client. +func (m *MockClient) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + + if m.closed { + return fmt.Errorf("mock already closed") + } + + m.closed = true + return nil +} + +// MockPipeline is a mock implementation of the beat.Pipeline interface. +type MockPipeline struct { + c beat.Client // Client used by the pipeline + mu sync.Mutex // Mutex to synchronize access to the client +} + +// ConnectWith connects the mock pipeline with a client using the provided configuration. +func (mp *MockPipeline) ConnectWith(config beat.ClientConfig) (beat.Client, error) { + mp.mu.Lock() + defer mp.mu.Unlock() + + c := &MockClient{} + + mp.c = c + + return c, nil +} + +// Connect connects the mock pipeline with a client using the default configuration. +func (mp *MockPipeline) Connect() (beat.Client, error) { + return mp.ConnectWith(beat.ClientConfig{}) +}