Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fluent-bit: multi-instance support #1294

Merged
merged 1 commit into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/fluent-bit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ To configure the Loki output plugin add this section to fluent-bit.conf
```
A full [example configuration file](fluent-bit.conf) is also available in this repository.

### Running multiple plugin instances

You can run multiple plugin instances in the same fluent-bit process, for example if you want to push to different Loki servers or route logs into different Loki tenant IDs. To do so, add additional `[Output]` sections.

## Building

## Prerequisites
Expand Down
68 changes: 44 additions & 24 deletions cmd/fluent-bit/out_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
"github.com/weaveworks/common/logging"
)

var plugin *loki
var logger log.Logger
var (
// registered loki plugin instances, required for disposal during shutdown
plugins []*loki
logger log.Logger
)

func init() {
var logLevel logging.Level
Expand All @@ -40,38 +43,53 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
// (fluentbit will call this)
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

conf, err := parseConfig(&pluginConfig{ctx: ctx})
if err != nil {
level.Error(logger).Log("[flb-go]", "failed to launch", "error", err)
return output.FLB_ERROR
}
logger = newLogger(conf.logLevel)

// numeric plugin ID, only used for user-facing purpose (logging, ...)
id := len(plugins)
logger := log.With(newLogger(conf.logLevel), "id", id)

level.Info(logger).Log("[flb-go]", "Starting fluent-bit-go-loki", "version", version.Info())
level.Info(logger).Log("[flb-go]", "provided parameter", "URL", conf.clientConfig.URL)
level.Info(logger).Log("[flb-go]", "provided parameter", "TenantID", conf.clientConfig.TenantID)
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchWait", conf.clientConfig.BatchWait)
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchSize", conf.clientConfig.BatchSize)
level.Info(logger).Log("[flb-go]", "provided parameter", "Labels", conf.clientConfig.ExternalLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "LogLevel", conf.logLevel)
level.Info(logger).Log("[flb-go]", "provided parameter", "AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat)
level.Info(logger).Log("[flb-go]", "provided parameter", "DropSingleKey", conf.dropSingleKey)
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

plugin, err = newPlugin(conf, logger)
paramLogger := log.With(logger, "[flb-go]", "provided parameter")
level.Info(paramLogger).Log("URL", conf.clientConfig.URL)
level.Info(paramLogger).Log("TenantID", conf.clientConfig.TenantID)
level.Info(paramLogger).Log("BatchWait", conf.clientConfig.BatchWait)
level.Info(paramLogger).Log("BatchSize", conf.clientConfig.BatchSize)
level.Info(paramLogger).Log("Labels", conf.clientConfig.ExternalLabels)
level.Info(paramLogger).Log("LogLevel", conf.logLevel.String())
level.Info(paramLogger).Log("AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(paramLogger).Log("RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(paramLogger).Log("LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(paramLogger).Log("LineFormat", conf.lineFormat)
level.Info(paramLogger).Log("DropSingleKey", conf.dropSingleKey)
level.Info(paramLogger).Log("LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

plugin, err := newPlugin(conf, logger)
if err != nil {
level.Error(logger).Log("newPlugin", err)
return output.FLB_ERROR
}

// register plugin instance, to be retrievable when sending logs
output.FLBPluginSetContext(ctx, plugin)
// remember plugin instance, required to cleanly dispose when fluent-bit is shutting down
plugins = append(plugins, plugin)

return output.FLB_OK
}

//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, _ *C.char) int {
plugin := output.FLBPluginGetContext(ctx).(*loki)
if plugin == nil {
level.Error(logger).Log("[flb-go]", "plugin not initialized")
return output.FLB_ERROR
}

var ret int
var ts interface{}
var record map[interface{}]interface{}
Expand All @@ -92,13 +110,13 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
case uint64:
timestamp = time.Unix(int64(t), 0)
default:
level.Warn(logger).Log("msg", "timestamp isn't known format. Use current time.")
level.Warn(plugin.logger).Log("msg", "timestamp isn't known format. Use current time.")
timestamp = time.Now()
}

err := plugin.sendRecord(record, timestamp)
if err != nil {
level.Error(logger).Log("msg", "error sending record to Loki", "error", err)
level.Error(plugin.logger).Log("msg", "error sending record to Loki", "error", err)
return output.FLB_ERROR
}
}
Expand All @@ -113,8 +131,10 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {

//export FLBPluginExit
func FLBPluginExit() int {
if plugin.client != nil {
plugin.client.Stop()
for _, plugin := range plugins {
if plugin.client != nil {
plugin.client.Stop()
}
}
return output.FLB_OK
}
Expand Down