Skip to content

Commit

Permalink
Support multi instances for fluent-bit go loki plugin
Browse files Browse the repository at this point in the history
In the previous implementation, this fluent-bit go loki plugin does not
support multi instances.

Because global variable which is `var plugin *loki` is used for plugin
instance management.
So, it is always overrided when loki plugin is used.

This implementation is based for fluent-plugin-go-s3 plugin multi
instances support code.

Signed-off-by: Hiroshi Hatake <cosmo0920.oucc@gmail.com>
  • Loading branch information
cosmo0920 committed Dec 28, 2019
1 parent dafb9d8 commit 78275a2
Showing 1 changed file with 46 additions and 14 deletions.
60 changes: 46 additions & 14 deletions cmd/fluent-bit/out_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/weaveworks/common/logging"
)

var plugin *loki
var plugins []*loki
var logger log.Logger

func init() {
Expand All @@ -36,18 +36,15 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
return output.FLBPluginRegister(ctx, "loki", "Ship fluent-bit logs to Grafana Loki")
}

//export FLBPluginInit
// (fluentbit will call this)
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

func newLokiOutput(ctx unsafe.Pointer, lokiID int) (*loki, error) {
conf, err := parseConfig(&pluginConfig{ctx: ctx})
if err != nil {
level.Error(logger).Log("[flb-go]", "failed to launch", "error", err)
return output.FLB_ERROR
return nil, err
}
logger = newLogger(conf.logLevel)
level.Info(logger).Log("[flb-go]", "Starting fluent-bit-go-loki", "version", version.Info())
level.Info(logger).Log("[flb-go]", "instance ID", lokiID)
level.Info(logger).Log("[flb-go]", "provided parameter", "URL", conf.clientConfig.URL)
level.Info(logger).Log("[flb-go]", "provided parameter", "TenantID", conf.clientConfig.TenantID)
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchWait", conf.clientConfig.BatchWait)
Expand All @@ -61,7 +58,34 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
level.Info(logger).Log("[flb-go]", "provided parameter", "DropSingleKey", conf.dropSingleKey)
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

plugin, err = newPlugin(conf, logger)
plugin, err := newPlugin(conf, logger)
return plugin, err
}

func addLokiOutput(ctx unsafe.Pointer) error {
lokiID := len(plugins)
// Set the context to point to any Go variable
output.FLBPluginSetContext(ctx, lokiID)
plugin, err := newLokiOutput(ctx, lokiID)
if err != nil {
return err
}

plugins = append(plugins, plugin)
return nil
}

func getLokiOutput(ctx unsafe.Pointer) (*loki, int) {
lokiID := output.FLBPluginGetContext(ctx).(int)
return plugins[lokiID], lokiID
}

//export FLBPluginInit
// (fluentbit will call this)
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

err := addLokiOutput(ctx)
if err != nil {
level.Error(logger).Log("newPlugin", err)
return output.FLB_ERROR
Expand All @@ -70,8 +94,14 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
return output.FLB_OK
}

//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
plugin, lokiID := getLokiOutput(ctx)
if plugin == nil {
level.Error(logger).Log("[flb-go]", "plugin not initialized", "lokiID", lokiID)
return output.FLB_ERROR
}

var ret int
var ts interface{}
var record map[interface{}]interface{}
Expand All @@ -92,13 +122,13 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
case uint64:
timestamp = time.Unix(int64(t), 0)
default:
level.Warn(logger).Log("msg", "timestamp isn't known format. Use current time.")
level.Warn(plugin.logger).Log("msg", "timestamp isn't known format. Use current time.")
timestamp = time.Now()
}

err := plugin.sendRecord(record, timestamp)
if err != nil {
level.Error(logger).Log("msg", "error sending record to Loki", "error", err)
level.Error(plugin.logger).Log("msg", "error sending record to Loki", "error", err)
return output.FLB_ERROR
}
}
Expand All @@ -113,8 +143,10 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {

//export FLBPluginExit
func FLBPluginExit() int {
if plugin.client != nil {
plugin.client.Stop()
for _, plugin := range plugins {
if plugin.client != nil {
plugin.client.Stop()
}
}
return output.FLB_OK
}
Expand Down

0 comments on commit 78275a2

Please sign in to comment.