Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: add ability to remove request trace logs (#…
Browse files Browse the repository at this point in the history
…39969)

The previous configuration system did not allow users to remove trace
logs from agents after they are no longer needed. This is a potential
security risk as it leaves potentially sensitive information on the file
system beyond its required lifetime. The mechanism for communicating to
the input whether to write logs is not currently powerful enough to
indicate that existing logs should be removed without deleting logs from
other instances. So add an enabled configuration option to allow the
target name to be sent independently of whether the files should be
written or removed.

The new option is optional, defaulting to the previous behaviour so
that it can be opted into via progressive repair in the client
integrations.
  • Loading branch information
efd6 committed Jun 21, 2024
1 parent b0dcd95 commit 8b89f79
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add request trace support for Okta and EntraID entity analytics providers. {pull}39821[39821]
- Fix handling of infinite rate values in CEL rate limit handling logic. {pull}39940[39940]
- Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929]
- Add ability to remove request trace logs from CEL input. {pull}39969[39969]

*Auditbeat*

Expand Down
18 changes: 12 additions & 6 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -675,17 +675,23 @@ The value of the response that specifies the maximum overall resource request ra
The maximum burst size. Burst is the maximum number of resource requests that can be made above the overall rate limit.

[float]
==== `resource.tracer.filename`
==== `resource.tracer.enabled`

It is possible to log HTTP requests and responses in a CEL program to a local file-system for debugging configurations.
This option is enabled by setting the `resource.tracer.filename` value. Additional options are available to
tune log rotation behavior.

To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id.
For Example, `http-request-trace-*.ndjson`.
This option is enabled by setting `resource.tracer.enabled` to true and setting the `resource.tracer.filename` value.
Additional options are available to tune log rotation behavior. To delete existing logs, set `resource.tracer.enabled`
to false without unsetting the filename option.

Enabling this option compromises security and should only be used for debugging.

[float]
==== `resource.tracer.filename`

To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the
filename and will be replaced with the input instance id. For example, `http-request-trace-*.ndjson`. Setting
`resource.tracer.filename` with `resource.tracer.enabled` set to false will cause any existing trace logs matching
the filename option to be deleted.

[float]
==== `resource.tracer.maxsize`

Expand Down
11 changes: 10 additions & 1 deletion x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,16 @@ type ResourceConfig struct {

Transport httpcommon.HTTPTransportSettings `config:",inline"`

Tracer *lumberjack.Logger `config:"tracer"`
Tracer *tracerConfig `config:"tracer"`
}

// tracerConfig is the request trace logging configuration. It pairs the
// lumberjack log file and rotation options with an optional toggle, so that
// a trace file name can be configured independently of whether tracing is
// switched on.
type tracerConfig struct {
// Enabled is a pointer so that an unset option can be told apart from an
// explicit false; the enabled method treats unset as enabled.
Enabled *bool `config:"enabled"`
// The embedded lumberjack.Logger fields are tagged inline, so they are
// configured at the same level as the enabled option.
lumberjack.Logger `config:",inline"`
}

// enabled reports whether request tracing should be active for t.
//
// A nil receiver means no tracer was configured and yields false. A
// configured tracer with no explicit Enabled value yields true; otherwise
// the explicit Enabled value is returned.
func (t *tracerConfig) enabled() bool {
	// No tracer configuration at all: tracing is off.
	if t == nil {
		return false
	}
	// The toggle was left unset: a configured tracer counts as enabled.
	if t.Enabled == nil {
		return true
	}
	// Honour the explicit setting.
	return *t.Enabled
}

type urlConfig struct {
Expand Down
28 changes: 27 additions & 1 deletion x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
"regexp"
Expand Down Expand Up @@ -717,6 +719,11 @@ func getLimit(which string, rateLimit map[string]interface{}, log *logp.Logger)
return limit, true
}

// lumberjackTimestamp is a glob expression matching the time format string used
// by lumberjack when rolling over logs, "2006-01-02T15-04-05.000".
// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39
const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]"

func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry) (*http.Client, *httplog.LoggingRoundTripper, error) {
if !wantClient(cfg) {
return nil, nil, nil
Expand All @@ -740,7 +747,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin
}

var trace *httplog.LoggingRoundTripper
if cfg.Resource.Tracer != nil {
if cfg.Resource.Tracer.enabled() {
w := zapcore.AddSync(cfg.Resource.Tracer)
go func() {
// Close the logger when we are done.
Expand All @@ -758,6 +765,25 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin
maxSize := cfg.Resource.Tracer.MaxSize * 1e6
trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger, max(0, maxSize-margin), log)
c.Transport = trace
} else if cfg.Resource.Tracer != nil {
// We have a trace log name, but we are not enabled,
// so remove all trace logs we own.
err = os.Remove(cfg.Resource.Tracer.Filename)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", cfg.Resource.Tracer.Filename, "error", err)
}
ext := filepath.Ext(cfg.Resource.Tracer.Filename)
base := strings.TrimSuffix(cfg.Resource.Tracer.Filename, ext)
paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext)
if err != nil {
log.Errorw("failed to collect request trace log path names", "error", err)
}
for _, p := range paths {
err = os.Remove(p)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", p, "error", err)
}
}
}

if reg != nil {
Expand Down
106 changes: 106 additions & 0 deletions x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var inputTests = []struct {
wantCursor []map[string]interface{}
wantErr error
wantFile string
wantNoFile string
}{
// Autonomous tests (no FS or net dependency).
{
Expand Down Expand Up @@ -1141,6 +1142,102 @@ var inputTests = []struct {
},
wantFile: filepath.Join("logs", "http-request-trace-test_id_tracer_filename_sanitization.ndjson"),
},
{
name: "tracer_filename_sanitization_enabled",
server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
server := httptest.NewServer(h)
config["resource.url"] = server.URL
t.Cleanup(server.Close)
},
config: map[string]interface{}{
"interval": 1,
"resource.tracer.enabled": true,
"resource.tracer.filename": "logs/http-request-trace-*.ndjson",
"state": map[string]interface{}{
"fake_now": "2002-10-02T15:00:00Z",
},
"program": `
// Use terse non-standard check for presence of timestamp. The standard
// alternative is to use has(state.cursor) && has(state.cursor.timestamp).
(!is_error(state.cursor.timestamp) ?
state.cursor.timestamp
:
timestamp(state.fake_now)-duration('10m')
).as(time_cursor,
string(state.url).parse_url().with_replace({
"RawQuery": {"$filter": ["alertCreationTime ge "+string(time_cursor)]}.format_query()
}).format_url().as(url, bytes(get(url).Body)).decode_json().as(event, {
"events": [event],
// Get the timestamp from the event if it exists, otherwise advance a little to break a request loop.
// Due to the name of the @timestamp field, we can't use has(), so use is_error().
"cursor": [{"timestamp": !is_error(event["@timestamp"]) ? event["@timestamp"] : time_cursor+duration('1s')}],
// Just for testing, cycle this back into the next state.
"fake_now": state.fake_now
}))
`,
},
handler: dateCursorHandler(),
want: []map[string]interface{}{
{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
{"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"},
{"@timestamp": "2002-10-02T15:00:02Z", "foo": "bar"},
},
wantCursor: []map[string]interface{}{
{"timestamp": "2002-10-02T15:00:00Z"},
{"timestamp": "2002-10-02T15:00:01Z"},
{"timestamp": "2002-10-02T15:00:02Z"},
},
wantFile: filepath.Join("logs", "http-request-trace-test_id_tracer_filename_sanitization_enabled.ndjson"),
},
{
name: "tracer_filename_sanitization_disabled",
server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
server := httptest.NewServer(h)
config["resource.url"] = server.URL
t.Cleanup(server.Close)
},
config: map[string]interface{}{
"interval": 1,
"resource.tracer.enabled": false,
"resource.tracer.filename": "logs/http-request-trace-*.ndjson",
"state": map[string]interface{}{
"fake_now": "2002-10-02T15:00:00Z",
},
"program": `
// Use terse non-standard check for presence of timestamp. The standard
// alternative is to use has(state.cursor) && has(state.cursor.timestamp).
(!is_error(state.cursor.timestamp) ?
state.cursor.timestamp
:
timestamp(state.fake_now)-duration('10m')
).as(time_cursor,
string(state.url).parse_url().with_replace({
"RawQuery": {"$filter": ["alertCreationTime ge "+string(time_cursor)]}.format_query()
}).format_url().as(url, bytes(get(url).Body)).decode_json().as(event, {
"events": [event],
// Get the timestamp from the event if it exists, otherwise advance a little to break a request loop.
// Due to the name of the @timestamp field, we can't use has(), so use is_error().
"cursor": [{"timestamp": !is_error(event["@timestamp"]) ? event["@timestamp"] : time_cursor+duration('1s')}],
// Just for testing, cycle this back into the next state.
"fake_now": state.fake_now
}))
`,
},
handler: dateCursorHandler(),
want: []map[string]interface{}{
{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
{"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"},
{"@timestamp": "2002-10-02T15:00:02Z", "foo": "bar"},
},
wantCursor: []map[string]interface{}{
{"timestamp": "2002-10-02T15:00:00Z"},
{"timestamp": "2002-10-02T15:00:01Z"},
{"timestamp": "2002-10-02T15:00:02Z"},
},
wantNoFile: filepath.Join("logs", "http-request-trace-test_id_tracer_filename_sanitization_disabled*"),
},
{
name: "pagination_cursor_object",
server: newTestServer(httptest.NewServer),
Expand Down Expand Up @@ -1625,6 +1722,15 @@ func TestInput(t *testing.T) {
t.Errorf("expected log file not found: %v", err)
}
}
if test.wantNoFile != "" {
paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile))
if err != nil {
t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err)
}
if len(paths) != 0 {
t.Errorf("unexpected files found: %v", paths)
}
}
})
}
}
Expand Down

0 comments on commit 8b89f79

Please sign in to comment.