Add New Source

Chris Lu edited this page Oct 24, 2018 · 4 revisions

Data Source API

Here is how the new plugin system works. Each data source needs to implement an interface with a single method:

type Sourcer interface {
	Generate(*Flow) *Dataset
}

Simple enough?

How it works

Here is an example implementation for the Cassandra data source.

func (s *CassandraSource) Generate(f *flow.Flow) *flow.Dataset {
	return s.genShardInfos(f).RoundRobin(s.Concurrency).Mapper(MapperReadShard)
}

genShardInfos(f) generates an initial dataset containing a list of CassandraShardInfo objects. The CassandraShardInfo objects are distributed across a small number of executors (the RoundRobin(s.Concurrency) step). Each executor takes a CassandraShardInfo object, connects to the source, and reads the corresponding Cassandra shard.
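The flow described above can be sketched without any Gleam dependency. This is only an illustration of the idea, not the library's implementation: ShardInfo, roundRobin, readShard, and runExecutors are hypothetical stand-ins, and goroutines play the role of remote executors.

```go
package main

import (
	"fmt"
	"sync"
)

// ShardInfo is a hypothetical stand-in for CassandraShardInfo.
type ShardInfo struct {
	ID int
}

// roundRobin deals the shards out to n buckets in turn,
// one bucket per executor. It assumes n > 0.
func roundRobin(shards []ShardInfo, n int) [][]ShardInfo {
	buckets := make([][]ShardInfo, n)
	for i, s := range shards {
		buckets[i%n] = append(buckets[i%n], s)
	}
	return buckets
}

// readShard stands in for a mapper such as MapperReadShard. A real
// mapper would connect to the source and read the rows of the shard.
func readShard(s ShardInfo) string {
	return fmt.Sprintf("rows-of-shard-%d", s.ID)
}

// runExecutors processes every bucket concurrently. Each goroutine
// writes only to its own slot in results, so no locking is needed.
func runExecutors(buckets [][]ShardInfo) [][]string {
	results := make([][]string, len(buckets))
	var wg sync.WaitGroup
	for i, bucket := range buckets {
		wg.Add(1)
		go func(i int, bucket []ShardInfo) {
			defer wg.Done()
			for _, s := range bucket {
				results[i] = append(results[i], readShard(s))
			}
		}(i, bucket)
	}
	wg.Wait()
	return results
}

func main() {
	shards := []ShardInfo{{0}, {1}, {2}, {3}, {4}}
	for i, r := range runExecutors(roundRobin(shards, 2)) {
		fmt.Printf("executor %d read %v\n", i, r)
	}
}
```

With five shards and a concurrency of two, the first executor ends up with three shards and the second with two.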

Because CassandraShardInfo objects are sent to remote executors, they must be serializable and deserializable.
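As a rough illustration of what "serializable" means in practice, here is a round trip through Go's standard encoding/gob package. The ShardInfo fields below are made up for the example, and gob is just a convenient choice here; the encoding actually used between executors may differ.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// ShardInfo is a hypothetical shard descriptor. The fields are
// exported because gob only encodes exported fields.
type ShardInfo struct {
	Hosts      []string
	Keyspace   string
	Table      string
	StartToken int64
	EndToken   int64
}

// encodeShard turns a ShardInfo into bytes that could be sent over the wire.
func encodeShard(s ShardInfo) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(s); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeShard rebuilds a ShardInfo from the bytes produced by encodeShard.
func decodeShard(data []byte) (ShardInfo, error) {
	var s ShardInfo
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&s)
	return s, err
}

func main() {
	in := ShardInfo{
		Hosts:      []string{"localhost"},
		Keyspace:   "demo",
		Table:      "events",
		StartToken: 0,
		EndToken:   1000,
	}
	data, err := encodeShard(in)
	if err != nil {
		panic(err)
	}
	out, err := decodeShard(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("decoded shard: %+v\n", out)
}
```

Anything that cannot survive such a round trip, for example an open connection or an unexported field, should not be stored in the shard info. Keep only the plain data an executor needs to open its own connection.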

With this simple Sourcer interface, an actual data source can be implemented in pure Go in any way you want.

Notes

  1. MapperReadShard is a pure Go function.

  2. CassandraShardInfo objects should be serializable and deserializable.

  3. Data partitioning is determined by the number of CassandraShardInfo objects generated from genShardInfos(f). One CassandraShardInfo object corresponds to one data partition.
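To make note 3 concrete, here is a minimal sketch of a shard generator: it splits a numeric range into n contiguous pieces, so asking for n shards yields n partitions. genShards and its ShardInfo type are hypothetical and much simpler than a real Cassandra token range split; see cassandra_source.go for how the actual source does it.

```go
package main

import "fmt"

// ShardInfo describes one partition as a half-open range [Start, End).
// It is a hypothetical stand-in for CassandraShardInfo.
type ShardInfo struct {
	Start, End int64
}

// genShards splits [start, end) into n contiguous shards.
// It assumes n > 0 and that end-start does not overflow int64.
func genShards(start, end int64, n int) []ShardInfo {
	shards := make([]ShardInfo, 0, n)
	width := (end - start) / int64(n)
	for i := 0; i < n; i++ {
		lo := start + int64(i)*width
		hi := lo + width
		if i == n-1 {
			hi = end // the last shard absorbs the remainder
		}
		shards = append(shards, ShardInfo{Start: lo, End: hi})
	}
	return shards
}

func main() {
	// Three shards in, three partitions out.
	for i, s := range genShards(0, 1000, 3) {
		fmt.Printf("partition %d: [%d, %d)\n", i, s.Start, s.End)
	}
}
```

Generating more, smaller shards gives finer-grained partitions. How many of them are read in parallel is still bounded by the concurrency setting.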

See cassandra_source.go for the example.