Skip to content

Commit

Permalink
fluent-bit: multi-instance support
Browse files Browse the repository at this point in the history
To run multiple plugin instances at the same time, the plugin instance
must be registered and later retrieved.

Additionally, a list of registered plugin instances must be stored for
proper disposal during fluent-bit shutdown.

Based on the [out_multiinstance example] in the upstream repository.

[out_multiinstance example]: https://github.com/fluent/fluent-bit-go/blob/fc386d263885e50387dd0081a77adf4072e8e4b6/examples/out_multiinstance/out.go

Signed-off-by: Jens Erat <email@jenserat.de>
  • Loading branch information
JensErat committed Dec 27, 2019
1 parent dafb9d8 commit 8e5abdd
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 24 deletions.
4 changes: 4 additions & 0 deletions cmd/fluent-bit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ To configure the Loki output plugin add this section to fluent-bit.conf
```
A full [example configuration file](fluent-bit.conf) is also available in this repository.

### Running multiple plugin instances

You can run multiple plugin instances in the same fluent-bit process, for example if you want to push to different Loki servers or route logs into different Loki tenant IDs. To do so, add additional `[Output]` sections.

## Building

## Prerequisites
Expand Down
68 changes: 44 additions & 24 deletions cmd/fluent-bit/out_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
"github.com/weaveworks/common/logging"
)

var plugin *loki
var logger log.Logger
var (
// registered loki plugin instances, required for disposal during shutdown
plugins []*loki
logger log.Logger
)

func init() {
var logLevel logging.Level
Expand All @@ -40,38 +43,53 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
// (fluentbit will call this)
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

conf, err := parseConfig(&pluginConfig{ctx: ctx})
if err != nil {
level.Error(logger).Log("[flb-go]", "failed to launch", "error", err)
return output.FLB_ERROR
}
logger = newLogger(conf.logLevel)

// numeric plugin ID, only used for user-facing purpose (logging, ...)
id := len(plugins)
logger := log.With(newLogger(conf.logLevel), "id", id)

level.Info(logger).Log("[flb-go]", "Starting fluent-bit-go-loki", "version", version.Info())
level.Info(logger).Log("[flb-go]", "provided parameter", "URL", conf.clientConfig.URL)
level.Info(logger).Log("[flb-go]", "provided parameter", "TenantID", conf.clientConfig.TenantID)
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchWait", conf.clientConfig.BatchWait)
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchSize", conf.clientConfig.BatchSize)
level.Info(logger).Log("[flb-go]", "provided parameter", "Labels", conf.clientConfig.ExternalLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "LogLevel", conf.logLevel)
level.Info(logger).Log("[flb-go]", "provided parameter", "AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat)
level.Info(logger).Log("[flb-go]", "provided parameter", "DropSingleKey", conf.dropSingleKey)
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

plugin, err = newPlugin(conf, logger)
paramLogger := log.With(logger, "[flb-go]", "provided parameter")
level.Info(paramLogger).Log("URL", conf.clientConfig.URL)
level.Info(paramLogger).Log("TenantID", conf.clientConfig.TenantID)
level.Info(paramLogger).Log("BatchWait", conf.clientConfig.BatchWait)
level.Info(paramLogger).Log("BatchSize", conf.clientConfig.BatchSize)
level.Info(paramLogger).Log("Labels", conf.clientConfig.ExternalLabels)
level.Info(paramLogger).Log("LogLevel", conf.logLevel.String())
level.Info(paramLogger).Log("AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(paramLogger).Log("RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(paramLogger).Log("LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(paramLogger).Log("LineFormat", conf.lineFormat)
level.Info(paramLogger).Log("DropSingleKey", conf.dropSingleKey)
level.Info(paramLogger).Log("LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

plugin, err := newPlugin(conf, logger)
if err != nil {
level.Error(logger).Log("newPlugin", err)
return output.FLB_ERROR
}

// register plugin instance, to be retrievable when sending logs
output.FLBPluginSetContext(ctx, plugin)
// remember plugin instance, required to cleanly dispose when fluent-bit is shutting down
plugins = append(plugins, plugin)

return output.FLB_OK
}

//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, _ *C.char) int {
plugin := output.FLBPluginGetContext(ctx).(*loki)
if plugin == nil {
level.Error(logger).Log("[flb-go]", "plugin not initialized")
return output.FLB_ERROR
}

var ret int
var ts interface{}
var record map[interface{}]interface{}
Expand All @@ -92,13 +110,13 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
case uint64:
timestamp = time.Unix(int64(t), 0)
default:
level.Warn(logger).Log("msg", "timestamp isn't known format. Use current time.")
level.Warn(plugin.logger).Log("msg", "timestamp isn't known format. Use current time.")
timestamp = time.Now()
}

err := plugin.sendRecord(record, timestamp)
if err != nil {
level.Error(logger).Log("msg", "error sending record to Loki", "error", err)
level.Error(plugin.logger).Log("msg", "error sending record to Loki", "error", err)
return output.FLB_ERROR
}
}
Expand All @@ -113,8 +131,10 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {

//export FLBPluginExit
func FLBPluginExit() int {
if plugin.client != nil {
plugin.client.Stop()
for _, plugin := range plugins {
if plugin.client != nil {
plugin.client.Stop()
}
}
return output.FLB_OK
}
Expand Down

0 comments on commit 8e5abdd

Please sign in to comment.