Skip to content

Commit

Permalink
Merge pull request #226 from jpinsonneau/334
Browse files Browse the repository at this point in the history
NETOBSERV-334 concurrent map iteration & write fix
  • Loading branch information
jpinsonneau committed Jun 13, 2022
2 parents 532816c + 2648b82 commit ddba882
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 6 deletions.
2 changes: 0 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"github.com/sirupsen/logrus"
)

type GenericMap map[string]interface{}

var (
Opt = Options{}
PipeLine []Stage
Expand Down
31 changes: 31 additions & 0 deletions pkg/config/generic_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package config

type GenericMap map[string]interface{}

// Copy will create a flat copy of GenericMap
func (m GenericMap) Copy() GenericMap {
result := GenericMap{}

for k, v := range m {
result[k] = v
}

return result
}
11 changes: 7 additions & 4 deletions pkg/pipeline/write/write_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
}

func (l *Loki) ProcessRecord(record config.GenericMap) error {
// copy record before process to avoid alteration on parallel stages
recordCopy := record.Copy()

// Get timestamp from record (default: TimeFlowStart)
timestamp := l.extractTimestamp(record)
timestamp := l.extractTimestamp(recordCopy)

labels := model.LabelSet{}

Expand All @@ -119,15 +122,15 @@ func (l *Loki) ProcessRecord(record config.GenericMap) error {
labels[k] = v
}

l.addNonStaticLabels(record, labels)
l.addNonStaticLabels(recordCopy, labels)

// Remove labels and configured ignore list from record
ignoreList := append(l.apiConfig.IgnoreList, l.apiConfig.Labels...)
for _, label := range ignoreList {
delete(record, label)
delete(recordCopy, label)
}

js, err := jsonIter.ConfigCompatibleWithStandardLibrary.Marshal(record)
js, err := jsonIter.ConfigCompatibleWithStandardLibrary.Marshal(recordCopy)
if err != nil {
return err
}
Expand Down

0 comments on commit ddba882

Please sign in to comment.