diff --git a/cmd/fluent-bit/README.md b/cmd/fluent-bit/README.md index 873c5e375394..b2891d9a678a 100644 --- a/cmd/fluent-bit/README.md +++ b/cmd/fluent-bit/README.md @@ -106,6 +106,10 @@ To configure the Loki output plugin add this section to fluent-bit.conf ``` A full [example configuration file](fluent-bit.conf) is also available in this repository. +### Running multiple plugin instances + +You can run multiple plugin instances in the same fluent-bit process, for example if you want to push to different Loki servers or route logs into different Loki tenant IDs. To do so, add additional `[Output]` sections. + ## Building ## Prerequisites diff --git a/cmd/fluent-bit/out_loki.go b/cmd/fluent-bit/out_loki.go index 3d2cd09d680f..1a1aac65e7b9 100644 --- a/cmd/fluent-bit/out_loki.go +++ b/cmd/fluent-bit/out_loki.go @@ -14,8 +14,11 @@ import ( "github.com/weaveworks/common/logging" ) -var plugin *loki -var logger log.Logger +var ( + // registered loki plugin instances, required for disposal during shutdown + plugins []*loki + logger log.Logger +) func init() { var logLevel logging.Level @@ -40,38 +43,53 @@ func FLBPluginRegister(ctx unsafe.Pointer) int { // (fluentbit will call this) // ctx (context) pointer to fluentbit context (state/ c code) func FLBPluginInit(ctx unsafe.Pointer) int { - conf, err := parseConfig(&pluginConfig{ctx: ctx}) if err != nil { level.Error(logger).Log("[flb-go]", "failed to launch", "error", err) return output.FLB_ERROR } - logger = newLogger(conf.logLevel) + + // numeric plugin ID, only used for user-facing purpose (logging, ...) + id := len(plugins) + logger := log.With(newLogger(conf.logLevel), "id", id) + level.Info(logger).Log("[flb-go]", "Starting fluent-bit-go-loki", "version", version.Info()) - level.Info(logger).Log("[flb-go]", "provided parameter", "URL", conf.clientConfig.URL) - level.Info(logger).Log("[flb-go]", "provided parameter", "TenantID", conf.clientConfig.TenantID) - level.Info(logger).Log("[flb-go]", "provided parameter", "BatchWait", conf.clientConfig.BatchWait) - level.Info(logger).Log("[flb-go]", "provided parameter", "BatchSize", conf.clientConfig.BatchSize) - level.Info(logger).Log("[flb-go]", "provided parameter", "Labels", conf.clientConfig.ExternalLabels) - level.Info(logger).Log("[flb-go]", "provided parameter", "LogLevel", conf.logLevel) - level.Info(logger).Log("[flb-go]", "provided parameter", "AutoKubernetesLabels", conf.autoKubernetesLabels) - level.Info(logger).Log("[flb-go]", "provided parameter", "RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys)) - level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys)) - level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat) - level.Info(logger).Log("[flb-go]", "provided parameter", "DropSingleKey", conf.dropSingleKey) - level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labelMap)) - - plugin, err = newPlugin(conf, logger) + paramLogger := log.With(logger, "[flb-go]", "provided parameter") + level.Info(paramLogger).Log("URL", conf.clientConfig.URL) + level.Info(paramLogger).Log("TenantID", conf.clientConfig.TenantID) + level.Info(paramLogger).Log("BatchWait", conf.clientConfig.BatchWait) + level.Info(paramLogger).Log("BatchSize", conf.clientConfig.BatchSize) + level.Info(paramLogger).Log("Labels", conf.clientConfig.ExternalLabels) + level.Info(paramLogger).Log("LogLevel", conf.logLevel.String()) + level.Info(paramLogger).Log("AutoKubernetesLabels", conf.autoKubernetesLabels) + level.Info(paramLogger).Log("RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys)) + level.Info(paramLogger).Log("LabelKeys", fmt.Sprintf("%+v", conf.labelKeys)) + level.Info(paramLogger).Log("LineFormat", conf.lineFormat) + level.Info(paramLogger).Log("DropSingleKey", conf.dropSingleKey) + level.Info(paramLogger).Log("LabelMapPath", fmt.Sprintf("%+v", conf.labelMap)) + + plugin, err := newPlugin(conf, logger) if err != nil { level.Error(logger).Log("newPlugin", err) return output.FLB_ERROR } + // register plugin instance, to be retrievable when sending logs + output.FLBPluginSetContext(ctx, plugin) + // remember plugin instance, required to cleanly dispose when fluent-bit is shutting down + plugins = append(plugins, plugin) + return output.FLB_OK } -//export FLBPluginFlush -func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { +//export FLBPluginFlushCtx +func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, _ *C.char) int { + plugin := output.FLBPluginGetContext(ctx).(*loki) + if plugin == nil { + level.Error(logger).Log("[flb-go]", "plugin not initialized") + return output.FLB_ERROR + } + var ret int var ts interface{} var record map[interface{}]interface{} @@ -92,13 +110,13 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { case uint64: timestamp = time.Unix(int64(t), 0) default: - level.Warn(logger).Log("msg", "timestamp isn't known format. Use current time.") + level.Warn(plugin.logger).Log("msg", "timestamp isn't known format. Use current time.") timestamp = time.Now() } err := plugin.sendRecord(record, timestamp) if err != nil { - level.Error(logger).Log("msg", "error sending record to Loki", "error", err) + level.Error(plugin.logger).Log("msg", "error sending record to Loki", "error", err) return output.FLB_ERROR } } @@ -113,8 +131,10 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { //export FLBPluginExit func FLBPluginExit() int { - if plugin.client != nil { - plugin.client.Stop() + for _, plugin := range plugins { + if plugin.client != nil { + plugin.client.Stop() + } } return output.FLB_OK }