Merge pull request #105 from mariomac/async-pipeline-1
Asynchronous pipeline, initial integration
Mario Macias committed Mar 3, 2022
2 parents 13cbe75 + ffc605d commit e0d95f2
Showing 12 changed files with 468 additions and 451 deletions.
go.mod (2 additions & 0 deletions)
@@ -9,6 +9,7 @@ require (
github.com/ip2location/ip2location-go/v9 v9.2.0
github.com/json-iterator/go v1.1.12
github.com/mitchellh/mapstructure v1.4.3
+ github.com/netobserv/gopipes v0.1.0
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9
github.com/netsampler/goflow2 v1.0.4
github.com/prometheus/client_golang v1.12.1
@@ -74,6 +75,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.43.0 // indirect
+ gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
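The one new direct dependency is github.com/netobserv/gopipes, which supplies the channel-based plumbing for the asynchronous pipeline. Its own API is not visible in this diff, so the sketch below uses only plain goroutines and channels to illustrate the kind of stage wiring the rest of the commit adopts; every name in it is illustrative, not taken from gopipes or from this repository.

```go
package main

import (
	"fmt"
	"sync"
)

// ingest is a start stage: it emits batches of entries on its output channel
// and closes the channel when it has nothing more to send.
func ingest(out chan<- []interface{}) {
	defer close(out)
	out <- []interface{}{"flow-1", "flow-2"}
	out <- []interface{}{"flow-3"}
}

// write is a terminal stage: it consumes batches until the channel is closed.
func write(in <-chan []interface{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for batch := range in {
		fmt.Printf("writing %d entries\n", len(batch))
	}
}

func main() {
	ch := make(chan []interface{})
	var wg sync.WaitGroup
	wg.Add(1)
	go ingest(ch) // both stages run concurrently, connected only by the channel
	go write(ch, &wg)
	wg.Wait()
}
```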
go.sum (11 additions & 115 deletions)

Large diffs are not rendered by default.

pkg/config/config.go (2 additions & 0 deletions)
@@ -66,6 +66,8 @@ type Ingest struct {

type File struct {
Filename string
+ Loop bool
+ Chunks int
}

type Aws struct {
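config.File gains Loop and Chunks next to Filename. The configuration file format itself is not part of this diff; since go.mod already lists github.com/mitchellh/mapstructure, here is a hedged sketch of how such values could be decoded into the extended struct. The key names and the decoding path are assumptions, not something this commit shows.

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// File mirrors the extended config.File struct from this commit.
type File struct {
	Filename string
	Loop     bool
	Chunks   int
}

func main() {
	// Hypothetical raw configuration values; the real key names may differ.
	raw := map[string]interface{}{
		"filename": "/tmp/flows.json",
		"loop":     true,
		"chunks":   100,
	}
	var f File
	if err := mapstructure.Decode(raw, &f); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", f) // {Filename:/tmp/flows.json Loop:true Chunks:100}
}
```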
pkg/pipeline/ingest/ingest.go (1 addition & 3 deletions)
@@ -17,10 +17,8 @@

package ingest

- type ProcessFunction func(entries []interface{})

type Ingester interface {
- Ingest(ProcessFunction)
+ Ingest(out chan<- []interface{})
}
type IngesterNone struct {
}
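This interface change is the heart of the commit: instead of handing the ingester a ProcessFunction callback, the pipeline hands it a write-only channel, and each ingester pushes its batches there. A minimal sketch of the new contract, with a stub ingester that is purely illustrative and not one of the implementations in this repository:

```go
package ingest

// Ingester is the new contract from this commit.
type Ingester interface {
	Ingest(out chan<- []interface{})
}

// stubIngester emits a fixed set of batches; the real ingesters below
// (collector, file, kafka) produce theirs from the network, a file or a topic.
type stubIngester struct {
	batches [][]interface{}
}

// Ingest writes every prepared batch to the supplied channel. The send blocks
// until the next stage receives, so an unbuffered channel gives natural
// backpressure. Unlike this stub, the collector and kafka ingesters never return.
func (s *stubIngester) Ingest(out chan<- []interface{}) {
	for _, b := range s.batches {
		out <- b
	}
}

var _ Ingester = (*stubIngester)(nil) // compile-time interface check
```

The remaining files in the diff are the mechanical consequence: each Ingest implementation swaps process(...) for a send on out.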
pkg/pipeline/ingest/ingest_collector.go (5 additions & 5 deletions)
@@ -91,16 +91,15 @@ func (w *TransportWrapper) Send(_, data []byte) error {
}

// Ingest ingests entries from a network collector using goflow2 library (https://github.com/netsampler/goflow2)
- func (ingestC *ingestCollector) Ingest(process ProcessFunction) {
+ func (ingestC *ingestCollector) Ingest(out chan<- []interface{}) {
ctx := context.Background()
ingestC.in = make(chan map[string]interface{}, channelSize)

// initialize background listeners (a.k.a.netflow+legacy collector)
ingestC.initCollectorListener(ctx)

// forever process log lines received by collector
- ingestC.processLogLines(process)
-
+ ingestC.processLogLines(out)
}

func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) {
@@ -140,7 +139,7 @@ func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) {

}

- func (ingestC *ingestCollector) processLogLines(process ProcessFunction) {
+ func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) {
var records []interface{}
for {
select {
@@ -153,7 +152,8 @@ func (ingestC *ingestCollector) processLogLines(process ProcessFunction) {
case <-time.After(time.Millisecond * batchMaxTimeInMilliSecs): // Maximum batch time for each batch
// Process batch of records (if not empty)
if len(records) > 0 {
- process(records)
+ log.Debugf("ingestCollector sending %d entries", len(records))
+ out <- records
}
records = []interface{}{}
}
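The collector keeps its batching loop; only the hand-off changes from process(records) to a channel send. The same accumulate-then-flush shape reappears in the Kafka ingester below, so here is a generic sketch of it, with hypothetical maxBatchLen and batchTimeout parameters (the real code flushes on a per-ingester timeout and, in parts of the loop not shown in this diff, on batch size):

```go
package ingest

import "time"

// batchAndForward drains single records from in and forwards them to out in
// batches, flushing either when the batch is full or when the timeout fires.
// Illustrative helper only; it is not part of this commit.
func batchAndForward(in <-chan interface{}, out chan<- []interface{},
	maxBatchLen int, batchTimeout time.Duration) {
	var batch []interface{}
	for {
		select {
		case rec := <-in:
			batch = append(batch, rec)
			if len(batch) >= maxBatchLen {
				out <- batch
				batch = nil
			}
		case <-time.After(batchTimeout):
			if len(batch) > 0 {
				out <- batch
				batch = nil
			}
		}
	}
}
```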
pkg/pipeline/ingest/ingest_file.go (28 additions & 8 deletions)
@@ -29,15 +29,19 @@ import (
)

type IngestFile struct {
- params      config.Ingest
- exitChan    chan bool
- PrevRecords []interface{}
+ params       config.Ingest
+ exitChan     chan bool
+ PrevRecords  []interface{}
+ TotalRecords int
}

- const delaySeconds = 10
+ const (
+ delaySeconds = 10
+ chunkLines   = 100
+ )

// Ingest ingests entries from a file and resends the same data every delaySeconds seconds
- func (ingestF *IngestFile) Ingest(process ProcessFunction) {
+ func (ingestF *IngestFile) Ingest(out chan<- []interface{}) {
lines := make([]interface{}, 0)
file, err := os.Open(ingestF.params.File.Filename)
if err != nil {
@@ -53,11 +57,14 @@ func (ingestF *IngestFile) Ingest(process ProcessFunction) {
log.Debugf("%s", text)
lines = append(lines, text)
}

log.Debugf("Ingesting %d log lines from %s", len(lines), ingestF.params.File.Filename)
switch ingestF.params.Type {
case "file":
ingestF.PrevRecords = lines
- process(lines)
+ ingestF.TotalRecords = len(lines)
+ log.Debugf("ingestFile sending %d lines", len(lines))
+ out <- lines
case "file_loop":
// loop forever
ticker := time.NewTicker(time.Duration(delaySeconds) * time.Second)
@@ -67,9 +74,22 @@ func (ingestF *IngestFile) Ingest(process ProcessFunction) {
log.Debugf("exiting ingestFile because of signal")
return
case <-ticker.C:
log.Debugf("ingestFile; for loop; before process")
ingestF.PrevRecords = lines
process(lines)
ingestF.TotalRecords += len(lines)
log.Debugf("ingestFile sending %d lines", len(lines))
out <- lines
}
}
case "file_chunks":
// sends the lines in chunks. Useful for testing parallelization
ingestF.TotalRecords = len(lines)
for len(lines) > 0 {
if len(lines) > chunkLines {
out <- lines[:chunkLines]
lines = lines[chunkLines:]
} else {
out <- lines
lines = nil
}
}
}
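The new file_chunks mode slices the ingested lines into batches of chunkLines (100) and sends each chunk separately, which is what makes it handy for exercising parallel stages; TotalRecords tracks how many lines were handed to the pipeline in total. A hedged sketch of a consumer checking that the chunks add up (illustrative only, not a test from this commit):

```go
package main

import "fmt"

func main() {
	out := make(chan []interface{})
	lines := make([]interface{}, 250) // stand-in for 250 ingested log lines

	// Stand-in for IngestFile.Ingest in "file_chunks" mode.
	go func() {
		const chunkLines = 100
		for len(lines) > 0 {
			if len(lines) > chunkLines {
				out <- lines[:chunkLines]
				lines = lines[chunkLines:]
			} else {
				out <- lines
				lines = nil
			}
		}
		close(out) // the real ingester does not close the channel; done here so the example ends
	}()

	total := 0
	for chunk := range out {
		total += len(chunk) // chunks arrive as 100, 100, 50
	}
	fmt.Println("received", total, "records") // received 250 records
}
```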
pkg/pipeline/ingest/ingest_kafka.go (5 additions & 5 deletions)
@@ -46,13 +46,12 @@ const channelSizeKafka = 1000
const defaultBatchReadTimeout = int64(100)

// Ingest ingests entries from kafka topic
- func (ingestK *ingestKafka) Ingest(process ProcessFunction) {
+ func (ingestK *ingestKafka) Ingest(out chan<- []interface{}) {
// initialize background listener
ingestK.kafkaListener()

// forever process log lines received by collector
- ingestK.processLogLines(process)
-
+ ingestK.processLogLines(out)
}

// background thread to read kafka messages; place received items into ingestKafka input channel
@@ -79,7 +78,7 @@ func (ingestK *ingestKafka) kafkaListener() {
}

// read items from ingestKafka input channel, pool them, and send down the pipeline
- func (ingestK *ingestKafka) processLogLines(process ProcessFunction) {
+ func (ingestK *ingestKafka) processLogLines(out chan<- []interface{}) {
var records []interface{}
duration := time.Duration(ingestK.kafkaParams.BatchReadTimeout) * time.Millisecond
for {
@@ -92,7 +91,8 @@ func (ingestK *ingestKafka) processLogLines(process ProcessFunction) {
case <-time.After(duration): // Maximum batch time for each batch
// Process batch of records (if not empty)
if len(records) > 0 {
- process(records)
+ log.Debugf("ingestKafka sending %d records", len(records))
+ out <- records
ingestK.prevRecords = records
log.Debugf("prevRecords = %v", ingestK.prevRecords)
}
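The Kafka ingester follows exactly the same pattern as the collector. One side effect of switching from a callback to a channel is back-pressure: with an unbuffered channel, out <- records blocks until the downstream stage receives, which throttles the reader, while a buffered channel would let it absorb short bursts instead. A tiny illustration with an arbitrary capacity of 2 (nothing in this commit chooses a buffer size for the stage-to-stage channels):

```go
package main

import "fmt"

func main() {
	out := make(chan []interface{}, 2) // capacity chosen only for the example

	out <- []interface{}{"batch-1"} // does not block
	out <- []interface{}{"batch-2"} // does not block
	// A third send would block here until a consumer received a batch.

	fmt.Println(len(out), "batches queued") // 2 batches queued
}
```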
pkg/pipeline/ingest/ingest_kafka_test.go (6 additions & 14 deletions)
@@ -95,22 +95,14 @@ func Test_NewIngestKafka2(t *testing.T) {
require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
}

- var receivedEntries []interface{}
- var dummyChan chan bool
-
- func dummyProcessFunction(entries []interface{}) {
- receivedEntries = entries
- dummyChan <- true
- }
-
func Test_IngestKafka(t *testing.T) {
- dummyChan = make(chan bool)
newIngest := initNewIngestKafka(t, testConfig1)
ingestKafka := newIngest.(*ingestKafka)
+ ingestOutput := make(chan []interface{})

// run Ingest in a separate thread
go func() {
- ingestKafka.Ingest(dummyProcessFunction)
+ ingestKafka.Ingest(ingestOutput)
}()
// wait a second for the ingest pipeline to come up
time.Sleep(time.Second)
@@ -126,7 +118,7 @@ func Test_IngestKafka(t *testing.T) {
inChan <- record3

// wait for the data to have been processed
- <-dummyChan
+ receivedEntries := <-ingestOutput

require.Equal(t, 3, len(receivedEntries))
require.Equal(t, record1, receivedEntries[0])
@@ -167,7 +159,7 @@ func (f *fakeKafkaReader) Config() kafkago.ReaderConfig {
}

func Test_KafkaListener(t *testing.T) {
- dummyChan = make(chan bool)
+ ingestOutput := make(chan []interface{})
newIngest := initNewIngestKafka(t, testConfig1)
ingestKafka := newIngest.(*ingestKafka)

@@ -177,11 +169,11 @@ func Test_KafkaListener(t *testing.T) {

// run Ingest in a separate thread
go func() {
- ingestKafka.Ingest(dummyProcessFunction)
+ ingestKafka.Ingest(ingestOutput)
}()

// wait for the data to have been processed
- <-dummyChan
+ receivedEntries := <-ingestOutput

require.Equal(t, 1, len(receivedEntries))
require.Equal(t, string(fakeRecord), receivedEntries[0])
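The tests now read the ingested batches directly from the output channel instead of going through the package-level receivedEntries/dummyChan pair. One optional hardening, not part of this commit: wrap the receive in a timeout so a broken ingester fails the test instead of hanging it.

```go
package ingest

import (
	"testing"
	"time"
)

// receiveWithTimeout is an illustrative test helper, not part of this commit.
func receiveWithTimeout(t *testing.T, out <-chan []interface{}) []interface{} {
	t.Helper()
	select {
	case entries := <-out:
		return entries
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for ingested entries")
		return nil
	}
}
```

The assertions above would then read receivedEntries := receiveWithTimeout(t, ingestOutput).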
(Diffs for the remaining changed files are not rendered here.)
