Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ES index aliases / rollover to the dependency store (Resolves #2143) #2144

Merged
merged 3 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/es-index-cleaner/app/index_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index {
// archive works only for rollover
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-span-archive-\\d{6}", i.IndexPrefix))
} else if i.Rollover {
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service)-\\d{6}", i.IndexPrefix))
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{6}", i.IndexPrefix))
} else {
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{4}%s\\d{2}%s\\d{2}", i.IndexPrefix, i.IndexDateSeparator, i.IndexDateSeparator))
}
Expand All @@ -58,7 +58,10 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index {
for _, in := range indices {
if reg.MatchString(in.Index) {
// index in write alias cannot be removed
if in.Aliases[i.IndexPrefix+"jaeger-span-write"] || in.Aliases[i.IndexPrefix+"jaeger-service-write"] || in.Aliases[i.IndexPrefix+"jaeger-span-archive-write"] {
if in.Aliases[i.IndexPrefix+"jaeger-span-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-service-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-span-archive-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-dependencies-write"] {
continue
}
filtered = append(filtered, in)
Expand Down
5 changes: 5 additions & 0 deletions cmd/es-rollover/app/index_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func RolloverIndices(archive bool, prefix string) []IndexOption {
Mapping: "jaeger-service",
indexType: "jaeger-service",
},
{
prefix: prefix,
Mapping: "jaeger-dependencies",
indexType: "jaeger-dependencies",
},
}
}

Expand Down
14 changes: 14 additions & 0 deletions cmd/es-rollover/app/index_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func TestRolloverIndices(t *testing.T) {
writeAliasName: "jaeger-service-write",
initialRolloverIndex: "jaeger-service-000001",
},
{
templateName: "jaeger-dependencies",
mapping: "jaeger-dependencies",
readAliasName: "jaeger-dependencies-read",
writeAliasName: "jaeger-dependencies-write",
initialRolloverIndex: "jaeger-dependencies-000001",
},
},
},
{
Expand Down Expand Up @@ -99,6 +106,13 @@ func TestRolloverIndices(t *testing.T) {
writeAliasName: "mytenant-jaeger-service-write",
initialRolloverIndex: "mytenant-jaeger-service-000001",
},
{
mapping: "jaeger-dependencies",
templateName: "mytenant-jaeger-dependencies",
readAliasName: "mytenant-jaeger-dependencies-read",
writeAliasName: "mytenant-jaeger-dependencies-write",
initialRolloverIndex: "mytenant-jaeger-dependencies-000001",
},
},
},
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/esmapping-generator/app/renderer/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

var supportedMappings = map[string]struct{}{
"jaeger-span": {},
"jaeger-service": {},
"jaeger-span": {},
"jaeger-service": {},
"jaeger-dependencies": {},
}

// GetMappingAsString returns rendered index templates as string
Expand Down
75 changes: 50 additions & 25 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/olivere/elastic"
Expand All @@ -32,38 +31,54 @@ import (
)

const (
dependencyType = "dependencies"
dependencyIndex = "jaeger-dependencies-"
dependencyType = "dependencies"
dependencyIndex = "jaeger-dependencies-"
indexPrefixSeparator = "-"
)

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
client es.Client
logger *zap.Logger
indexPrefix string
indexDateLayout string
maxDocCount int
client es.Client
logger *zap.Logger
dependencyIndexPrefix string
indexDateLayout string
maxDocCount int
useReadWriteAliases bool
}

// DependencyStoreParams holds constructor parameters for NewDependencyStore
type DependencyStoreParams struct {
Client es.Client
Logger *zap.Logger
IndexPrefix string
IndexDateLayout string
MaxDocCount int
UseReadWriteAliases bool
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix, indexDateLayout string, maxDocCount int) *DependencyStore {
var prefix string
if indexPrefix != "" && !strings.HasSuffix(indexPrefix, "-") {
prefix = indexPrefix + "-"
}
func NewDependencyStore(p DependencyStoreParams) *DependencyStore {
return &DependencyStore{
client: client,
logger: logger,
indexPrefix: prefix + dependencyIndex,
indexDateLayout: indexDateLayout,
maxDocCount: maxDocCount,
client: p.Client,
logger: p.Logger,
dependencyIndexPrefix: prefixIndexName(p.IndexPrefix, dependencyIndex),
indexDateLayout: p.IndexDateLayout,
maxDocCount: p.MaxDocCount,
useReadWriteAliases: p.UseReadWriteAliases,
}
}

func prefixIndexName(prefix, index string) string {
if prefix != "" {
return prefix + indexPrefixSeparator + index
}
return index
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
indexName := indexWithDate(s.indexPrefix, s.indexDateLayout, ts)
s.writeDependencies(indexName, ts, dependencies)
writeIndexName := s.getWriteIndex(ts)
s.writeDependencies(writeIndexName, ts, dependencies)
return nil
}

Expand All @@ -85,7 +100,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := getIndices(s.indexPrefix, s.indexDateLayout, endTs, lookback)
indices := s.getReadIndices(endTs, lookback)
searchResult, err := s.client.Search(indices...).
Size(s.maxDocCount).
Query(buildTSQuery(endTs, lookback)).
Expand All @@ -112,18 +127,28 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query {
return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs)
}

func getIndices(prefix, dateLayout string, ts time.Time, lookback time.Duration) []string {
func (s *DependencyStore) getReadIndices(ts time.Time, lookback time.Duration) []string {
if s.useReadWriteAliases {
frittentheke marked this conversation as resolved.
Show resolved Hide resolved
return []string{s.dependencyIndexPrefix + "read"}
}
var indices []string
firstIndex := indexWithDate(prefix, dateLayout, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, dateLayout, ts)
firstIndex := indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts.Add(-lookback))
currentIndex := indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
ts = ts.Add(-24 * time.Hour)
currentIndex = indexWithDate(prefix, dateLayout, ts)
currentIndex = indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts)
}
return append(indices, firstIndex)
}

func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string {
return indexNamePrefix + date.UTC().Format(indexDateLayout)
}

func (s *DependencyStore) getWriteIndex(ts time.Time) string {
if s.useReadWriteAliases {
return s.dependencyIndexPrefix + "write"
}
return indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts)
}
83 changes: 68 additions & 15 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun
client: client,
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(client, logger, indexPrefix, indexDateLayout, maxDocCount),
storage: NewDependencyStore(DependencyStoreParams{
Client: client,
Logger: logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
MaxDocCount: maxDocCount,
}),
}
fn(r)
}
Expand All @@ -69,8 +75,15 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
}
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, "2006-01-02", defaultMaxDocCount)
assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix)
r := NewDependencyStore(DependencyStoreParams{
Client: client,
Logger: zap.NewNop(),
IndexPrefix: testCase.prefix,
IndexDateLayout: "2006-01-02",
MaxDocCount: defaultMaxDocCount,
})

assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix)
}
}

Expand Down Expand Up @@ -200,36 +213,76 @@ func createSearchResult(dependencyLink string) *elastic.SearchResult {
return searchResult
}

func TestGetIndices(t *testing.T) {
func TestGetReadIndices(t *testing.T) {
fixedTime := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC)
testCases := []struct {
expected []string
indices []string
lookback time.Duration
prefix string
params DependencyStoreParams
}{
{
expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))},
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: true},
lookback: 23 * time.Hour,
indices: []string{
dependencyIndex + "read",
},
},
{
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02"},
lookback: 23 * time.Hour,
prefix: "",
indices: []string{
dependencyIndex + fixedTime.Format("2006-01-02"),
dependencyIndex + fixedTime.Add(-23*time.Hour).Format("2006-01-02"),
},
},
{
expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))},
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02"},
lookback: 13 * time.Hour,
prefix: "",
indices: []string{
dependencyIndex + fixedTime.UTC().Format("2006-01-02"),
dependencyIndex + fixedTime.Add(-13*time.Hour).Format("2006-01-02"),
},
},
{
expected: []string{indexWithDate("foo:", "2006-01-02", fixedTime)},
params: DependencyStoreParams{IndexPrefix: "foo:", IndexDateLayout: "2006-01-02"},
lookback: 1 * time.Hour,
prefix: "foo:",
indices: []string{
"foo:" + indexPrefixSeparator + dependencyIndex + fixedTime.Format("2006-01-02"),
},
},
{
expected: []string{indexWithDate("foo-", "2006-01-02", fixedTime)},
params: DependencyStoreParams{IndexPrefix: "foo-", IndexDateLayout: "2006-01-02"},
lookback: 0,
prefix: "foo-",
indices: []string{
"foo-" + indexPrefixSeparator + dependencyIndex + fixedTime.Format("2006-01-02"),
},
},
}
for _, testCase := range testCases {
s := NewDependencyStore(testCase.params)
assert.EqualValues(t, testCase.indices, s.getReadIndices(fixedTime, testCase.lookback))
}
}

func TestGetWriteIndex(t *testing.T) {
fixedTime := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC)
testCases := []struct {
writeIndex string
lookback time.Duration
params DependencyStoreParams
}{
{
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: true},
writeIndex: dependencyIndex + "write",
},
{
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: false},
writeIndex: dependencyIndex + fixedTime.Format("2006-01-02"),
},
}
for _, testCase := range testCases {
assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, "2006-01-02", fixedTime, testCase.lookback))
s := NewDependencyStore(testCase.params)
assert.EqualValues(t, testCase.writeIndex, s.getWriteIndex(fixedTime))
}
}

Expand Down
20 changes: 17 additions & 3 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(),
f.primaryConfig.GetIndexDateLayoutDependencies(), f.primaryConfig.GetMaxDocCount())
return reader, nil
return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we can just return esDepStore.NewDependencyStore(...).

I can see the intention here is to maintain consistency, though I think CreateSpanReader/Writer extract the function for reuse with its Archive equivalent. The DependencyStore doesn't support archiving AFAIK, so there doesn't seem to be much benefit of extracting the function for the cost of the additional indirection, unless you had other reasons?

What do you think?

Copy link
Contributor Author

@frittentheke frittentheke Feb 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I understand your point and while I could argue there might arise the need to add archiving, I actually just would favor the consistency. As for performance, the compiler will optimize the additional call away and it's not like this is a hot path by any means.

In short, if you insist I will certainly change this, but I believe the code is just more approachable if similar things, as in creating readers for three types of stores, are not handled any differently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can leave this as is, I'm not strongly attached to my suggestion. 😄

}

// CreateArchiveSpanReader implements storage.ArchiveFactory
Expand Down Expand Up @@ -210,6 +208,22 @@ func createSpanWriter(
return writer, nil
}

func createDependencyReader(
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
) (dependencystore.Reader, error) {

reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
Logger: logger,
IndexPrefix: cfg.GetIndexPrefix(),
MaxDocCount: cfg.GetMaxDocCount(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
})
return reader, nil
}

var _ io.Closer = (*Factory)(nil)

// Close closes the resources held by the factory
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
{
"index_patterns": "*jaeger-dependencies-*",
"aliases": {
"test-jaeger-dependencies-read" : {}
},
"settings":{
"index.number_of_shards": 3,
"index.number_of_replicas": 3,
"index.mapping.nested_fields.limit":50,
"index.requests.cache.enable":true
,"lifecycle": {
"name": "jaeger-test-policy",
"rollover_alias": "test-jaeger-dependencies-write"
}
},
"mappings":{}
}
11 changes: 11 additions & 0 deletions plugin/storage/es/mappings/jaeger-dependencies-7.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
{
"index_patterns": "*jaeger-dependencies-*",
{{- if .UseILM }}
"aliases": {
"{{ .IndexPrefix }}jaeger-dependencies-read" : {}
},
{{- end }}
"settings":{
"index.number_of_shards": {{ .Shards }},
"index.number_of_replicas": {{ .Replicas }},
"index.mapping.nested_fields.limit":50,
"index.requests.cache.enable":true
{{- if .UseILM }}
,"lifecycle": {
"name": "{{ .ILMPolicyName }}",
"rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write"
}
{{- end }}
},
"mappings":{}
}
Loading