From 24669d6ad2a9d2f6f4fb2a2cb2a7f574ac1105fa Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Fri, 5 Apr 2024 09:51:53 -0700 Subject: [PATCH] Auditbeat Processor to enrich auditd events with session view information (#37640) This adds a add_session_metadata processor to auditbeat which will enrich auditd process events with information needed to enable the Kibana session viewer on the events. In this implementation, eBPF is used to collect information on all processes running on the system, which are added to a process database. When a process event is run through the processor, the DB will be read to retrieve information on the processes related to the process in the event (the processes's parent, session leader,process group leader and entry leader will be retrieved). Then the event will be enriched with the metadata on these related processes, which can enable use of session view on the data --- CHANGELOG.next.asciidoc | 3 + x-pack/auditbeat/cmd/root.go | 3 + .../sessionmd/add_session_metadata.go | 215 +++ .../sessionmd/add_session_metadata_test.go | 378 +++++ .../auditbeat/processors/sessionmd/config.go | 22 + x-pack/auditbeat/processors/sessionmd/doc.go | 7 + .../processors/sessionmd/processdb/db.go | 668 +++++++++ .../processors/sessionmd/processdb/db_test.go | 24 + .../sessionmd/processdb/entry_leader_test.go | 1244 +++++++++++++++++ .../processors/sessionmd/procfs/mock.go | 42 + .../processors/sessionmd/procfs/procfs.go | 252 ++++ .../provider/ebpf_provider/ebpf_provider.go | 157 +++ .../processors/sessionmd/provider/provider.go | 15 + .../processors/sessionmd/timeutils/time.go | 77 + .../sessionmd/timeutils/time_test.go | 24 + .../processors/sessionmd/types/events.go | 94 ++ .../processors/sessionmd/types/process.go | 456 ++++++ 17 files changed, 3681 insertions(+) create mode 100644 x-pack/auditbeat/processors/sessionmd/add_session_metadata.go create mode 100644 x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go create mode 100644 x-pack/auditbeat/processors/sessionmd/config.go create mode 100644 x-pack/auditbeat/processors/sessionmd/doc.go create mode 100644 x-pack/auditbeat/processors/sessionmd/processdb/db.go create mode 100644 x-pack/auditbeat/processors/sessionmd/processdb/db_test.go create mode 100644 x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go create mode 100644 x-pack/auditbeat/processors/sessionmd/procfs/mock.go create mode 100644 x-pack/auditbeat/processors/sessionmd/procfs/procfs.go create mode 100644 x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go create mode 100644 x-pack/auditbeat/processors/sessionmd/provider/provider.go create mode 100644 x-pack/auditbeat/processors/sessionmd/timeutils/time.go create mode 100644 x-pack/auditbeat/processors/sessionmd/timeutils/time_test.go create mode 100644 x-pack/auditbeat/processors/sessionmd/types/events.go create mode 100644 x-pack/auditbeat/processors/sessionmd/types/process.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f0ad6e6a5c11..af97c40c0ace 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -138,6 +138,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Auditbeat* +- Added `add_session_metadata` processor, which enables session viewer on Auditbeat data. {pull}37640[37640] +- Add linux capabilities to processes in the system/process. {pull}37453[37453] +- Add opt-in eBPF backend for file_integrity module. {pull}37223[37223] - Add linux capabilities to processes in the system/process. {pull}37453[37453] - Add opt-in eBPF backend for file_integrity module. {pull}37223[37223] - Add process data to file events (Linux only, eBPF backend). {pull}38199[38199] diff --git a/x-pack/auditbeat/cmd/root.go b/x-pack/auditbeat/cmd/root.go index 60382602060e..4a9b32b56f14 100644 --- a/x-pack/auditbeat/cmd/root.go +++ b/x-pack/auditbeat/cmd/root.go @@ -20,6 +20,9 @@ import ( // Register Auditbeat x-pack modules. _ "github.com/elastic/beats/v7/x-pack/auditbeat/include" _ "github.com/elastic/beats/v7/x-pack/libbeat/include" + + // Import processors + _ "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd" ) // Name of the beat diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go new file mode 100644 index 000000000000..50636f9d476c --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -0,0 +1,215 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package sessionmd + +import ( + "context" + "fmt" + "reflect" + "strconv" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/processdb" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/procfs" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider" + cfg "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +const ( + processorName = "add_session_metadata" + logName = "processor." + processorName +) + +func init() { + processors.RegisterPlugin(processorName, New) +} + +type addSessionMetadata struct { + config config + logger *logp.Logger + db *processdb.DB + provider provider.Provider +} + +func New(cfg *cfg.C) (beat.Processor, error) { + c := defaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err) + } + + logger := logp.NewLogger(logName) + + ctx := context.Background() + reader := procfs.NewProcfsReader(*logger) + db, err := processdb.NewDB(reader, *logger) + if err != nil { + return nil, fmt.Errorf("failed to create DB: %w", err) + } + + backfilledPIDs := db.ScrapeProcfs() + logger.Debugf("backfilled %d processes", len(backfilledPIDs)) + + switch c.Backend { + case "auto": + // "auto" always uses ebpf, as it's currently the only backend + fallthrough + case "ebpf": + p, err := ebpf_provider.NewProvider(ctx, logger, db) + if err != nil { + return nil, fmt.Errorf("failed to create ebpf provider: %w", err) + } + return &addSessionMetadata{ + config: c, + logger: logger, + db: db, + provider: p, + }, nil + default: + return nil, fmt.Errorf("unknown backend configuration") + } +} + +func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { + _, err := ev.GetValue(p.config.PIDField) + if err != nil { + // Do not attempt to enrich events without PID; it's not a supported event + return ev, nil //nolint:nilerr // Running on events without PID is expected + } + + err = p.provider.UpdateDB(ev) + if err != nil { + return ev, err + } + + result, err := p.enrich(ev) + if err != nil { + return ev, fmt.Errorf("enriching event: %w", err) + } + return result, nil +} + +func (p *addSessionMetadata) String() string { + return fmt.Sprintf("%v=[backend=%s, pid_field=%s, replace_fields=%t]", + processorName, p.config.Backend, p.config.PIDField, p.config.ReplaceFields) +} + +func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) { + pidIf, err := ev.GetValue(p.config.PIDField) + if err != nil { + return nil, err + } + pid, err := pidToUInt32(pidIf) + if err != nil { + return nil, fmt.Errorf("cannot parse pid field '%s': %w", p.config.PIDField, err) + } + + fullProcess, err := p.db.GetProcess(pid) + if err != nil { + return nil, fmt.Errorf("pid %v not found in db: %w", pid, err) + } + + processMap := fullProcess.ToMap() + + if b, err := ev.Fields.HasKey("process"); !b || err != nil { + return nil, fmt.Errorf("no process field in event") + } + m, ok := tryToMapStr(ev.Fields["process"]) + if !ok { + return nil, fmt.Errorf("process field type not supported") + } + + result := ev.Clone() + err = mapstr.MergeFieldsDeep(m, processMap, true) + if err != nil { + return nil, fmt.Errorf("merging enriched fields with event: %w", err) + } + result.Fields["process"] = m + + if p.config.ReplaceFields { + if err := p.replaceFields(result); err != nil { + return nil, fmt.Errorf("replace fields: %w", err) + } + } + return result, nil +} + +// pidToUInt32 converts PID value to uint32 +func pidToUInt32(value interface{}) (pid uint32, err error) { + switch v := value.(type) { + case string: + nr, err := strconv.Atoi(v) + if err != nil { + return 0, fmt.Errorf("error converting string to integer: %w", err) + } + pid = uint32(nr) + case uint32: + pid = v + case int, int8, int16, int32, int64: + pid64 := reflect.ValueOf(v).Int() + if pid = uint32(pid64); int64(pid) != pid64 { + return 0, fmt.Errorf("integer out of range: %d", pid64) + } + case uint, uintptr, uint8, uint16, uint64: + pidu64 := reflect.ValueOf(v).Uint() + if pid = uint32(pidu64); uint64(pid) != pidu64 { + return 0, fmt.Errorf("integer out of range: %d", pidu64) + } + default: + return 0, fmt.Errorf("not an integer or string, but %T", v) + } + return pid, nil +} + +// replaceFields replaces event fields with values suitable user with the session viewer in Kibana +// The current version of session view in Kibana expects different values than what are used by auditbeat +// for some fields. This function converts these field to have values that will work with session view. +// +// This function is temporary, and can be removed when this Kibana issue is completed: https://github.com/elastic/kibana/issues/179396. +func (p *addSessionMetadata) replaceFields(ev *beat.Event) error { + kind, err := ev.Fields.GetValue("event.kind") + if err != nil { + return err + } + isAuditdEvent, err := ev.Fields.HasKey("auditd") + if err != nil { + return err + } + if kind == "event" && isAuditdEvent { + // process start + syscall, err := ev.Fields.GetValue("auditd.data.syscall") + if err != nil { + return nil //nolint:nilerr // processor can be called on unsupported events; not an error + } + switch syscall { + case "execveat", "execve": + ev.Fields.Put("event.action", []string{"exec", "fork"}) + ev.Fields.Put("event.type", []string{"start"}) + + case "exit_group": + ev.Fields.Put("event.action", []string{"end"}) + ev.Fields.Put("event.type", []string{"end"}) + ev.Fields.Put("process.end", time.Now()) + } + } + return nil +} + +func tryToMapStr(v interface{}) (mapstr.M, bool) { + switch m := v.(type) { + case mapstr.M: + return m, true + case map[string]interface{}: + return mapstr.M(m), true + default: + return nil, false + } +} diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go new file mode 100644 index 000000000000..4890505aac48 --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go @@ -0,0 +1,378 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package sessionmd + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/processdb" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/procfs" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +var ( + enrichTests = []struct { + testName string + mockProcesses []types.ProcessExecEvent + config config + input beat.Event + expected beat.Event + expect_error bool + }{ + { + testName: "enrich process", + config: config{ + ReplaceFields: false, + PIDField: "process.pid", + }, + mockProcesses: []types.ProcessExecEvent{ + { + PIDs: types.PIDInfo{ + Tid: uint32(100), + Tgid: uint32(100), + Ppid: uint32(50), + Pgid: uint32(100), + Sid: uint32(40), + }, + CWD: "/", + Filename: "/bin/ls", + }, + { + PIDs: types.PIDInfo{ + Tid: uint32(50), + Tgid: uint32(50), + Ppid: uint32(40), + Sid: uint32(40), + }, + }, + { + PIDs: types.PIDInfo{ + Tid: uint32(40), + Tgid: uint32(40), + Ppid: uint32(1), + Sid: uint32(1), + }, + }, + }, + input: beat.Event{ + Fields: mapstr.M{ + "process": mapstr.M{ + "pid": uint32(100), + }, + }, + }, + expected: beat.Event{ + Fields: mapstr.M{ + "process": mapstr.M{ + "executable": "/bin/ls", + "working_directory": "/", + "pid": uint32(100), + "parent": mapstr.M{ + "pid": uint32(50), + }, + "session_leader": mapstr.M{ + "pid": uint32(40), + }, + "group_leader": mapstr.M{ + "pid": uint32(100), + }, + }, + }, + }, + expect_error: false, + }, + { + testName: "no PID field in event", + config: config{ + ReplaceFields: false, + PIDField: "process.pid", + }, + input: beat.Event{ + Fields: mapstr.M{ + "process": mapstr.M{ + "executable": "ls", + "working_directory": "/", + "parent": mapstr.M{ + "pid": uint32(100), + }, + }, + }, + }, + expect_error: true, + }, + { + testName: "PID not number", + config: config{ + ReplaceFields: false, + PIDField: "process.pid", + }, + input: beat.Event{ + Fields: mapstr.M{ + "process": mapstr.M{ + "pid": "xyz", + "executable": "ls", + "working_directory": "/", + "parent": mapstr.M{ + "pid": uint32(50), + }, + }, + }, + }, + expect_error: true, + }, + { + testName: "PID not in DB", + config: config{ + ReplaceFields: false, + PIDField: "process.pid", + }, + input: beat.Event{ + Fields: mapstr.M{ + "process": mapstr.M{ + "pid": "100", + "executable": "ls", + "working_directory": "/", + "parent": mapstr.M{ + "pid": uint32(100), + }, + }, + }, + }, + expect_error: true, + }, + { + testName: "process field not in event", + // This event, without a "process" field, is not supported by enrich, it should be handled gracefully + config: config{ + ReplaceFields: false, + PIDField: "action.pid", + }, + input: beat.Event{ + Fields: mapstr.M{ + "action": mapstr.M{ + "pid": "1010", + }, + }, + }, + expect_error: true, + }, + { + testName: "process field not mapstr", + // Unsupported process field type should be handled gracefully + config: config{ + ReplaceFields: false, + PIDField: "action.pid", + }, + input: beat.Event{ + Fields: mapstr.M{ + "action": mapstr.M{ + "pid": "100", + }, + "process": map[int]int{ + 10: 100, + 20: 200, + }, + }, + }, + expect_error: true, + }, + { + testName: "enrich event with map[string]any process field", + config: config{ + ReplaceFields: false, + PIDField: "process.pid", + }, + mockProcesses: []types.ProcessExecEvent{ + { + PIDs: types.PIDInfo{ + Tid: uint32(100), + Tgid: uint32(100), + Ppid: uint32(50), + Pgid: uint32(100), + Sid: uint32(40), + }, + CWD: "/", + Filename: "/bin/ls", + }, + { + PIDs: types.PIDInfo{ + Tid: uint32(50), + Tgid: uint32(50), + Ppid: uint32(40), + Sid: uint32(40), + }, + }, + { + PIDs: types.PIDInfo{ + Tid: uint32(40), + Tgid: uint32(40), + Ppid: uint32(1), + Sid: uint32(1), + }, + }, + }, + input: beat.Event{ + Fields: map[string]any{ + "process": map[string]any{ + "pid": uint32(100), + }, + }, + }, + expected: beat.Event{ + Fields: mapstr.M{ + "process": mapstr.M{ + "executable": "/bin/ls", + "working_directory": "/", + "pid": uint32(100), + "parent": mapstr.M{ + "pid": uint32(50), + }, + "session_leader": mapstr.M{ + "pid": uint32(40), + }, + "group_leader": mapstr.M{ + "pid": uint32(100), + }, + }, + }, + }, + expect_error: false, + }, + } + + filterTests = []struct { + testName string + mx mapstr.M + my mapstr.M + expected bool + }{ + { + testName: "equal", + mx: mapstr.M{ + "key1": "A", + "key2": mapstr.M{ + "key2_2": 2.0, + }, + "key3": 1, + }, + my: mapstr.M{ + "key1": "A", + "key2": mapstr.M{ + "key2_2": 2.0, + }, + "key3": 1, + }, + expected: true, + }, + { + testName: "mismatched values", + mx: mapstr.M{ + "key1": "A", + "key2": "B", + "key3": "C", + }, + my: mapstr.M{ + "key1": "A", + "key2": "X", + "key3": "C", + }, + expected: false, + }, + { + testName: "ignore key only in 2nd map", + mx: mapstr.M{ + "key1": "A", + "key2": "B", + }, + my: mapstr.M{ + "key1": "A", + "key2": "B", + "key3": "C", + }, + expected: true, + }, + { + testName: "nested mismatch", + mx: mapstr.M{ + "key1": "A", + "key2": mapstr.M{ + "key2_2": "B", + }, + }, + my: mapstr.M{ + "key1": "A", + "key2": mapstr.M{ + "key2_2": 2.0, + }, + "key3": 1, + }, + expected: false, + }, + } + + logger = logp.NewLogger("add_session_metadata_test") +) + +func TestEnrich(t *testing.T) { + for _, tt := range enrichTests { + reader := procfs.NewMockReader() + db, err := processdb.NewDB(reader, *logger) + assert.Nil(t, err) + + for _, ev := range tt.mockProcesses { + db.InsertExec(ev) + } + s := addSessionMetadata{ + logger: logger, + db: db, + config: tt.config, + } + + // avoid taking address of loop variable + i := tt.input + actual, err := s.enrich(&i) + if tt.expect_error { + assert.Error(t, err, "%s: error unexpectedly nil", tt.testName) + } else { + assert.Nil(t, err, "%s: enrich error: %w", tt.testName, err) + assert.NotNil(t, actual, "%s: returned nil event", tt.testName) + + //Validate output + if diff := cmp.Diff(tt.expected.Fields, actual.Fields, ignoreMissingFrom(tt.expected.Fields)); diff != "" { + t.Errorf("field mismatch:\n%s", diff) + } + } + } +} + +// IgnoreMissingFrom returns a filter that will ignore all fields missing from m +func ignoreMissingFrom(m mapstr.M) cmp.Option { + return cmp.FilterPath(func(p cmp.Path) bool { + mi, ok := p.Index(-1).(cmp.MapIndex) + if !ok { + return false + } + vx, _ := mi.Values() + return !vx.IsValid() + }, cmp.Ignore()) +} + +// TestFilter ensures `ignoreMissingFrom` filter is working as expected +// Note: This validates test code only +func TestFilter(t *testing.T) { + for _, tt := range filterTests { + if eq := cmp.Equal(tt.mx, tt.my, ignoreMissingFrom(tt.mx)); eq != tt.expected { + t.Errorf("%s: unexpected comparator result", tt.testName) + } + } +} diff --git a/x-pack/auditbeat/processors/sessionmd/config.go b/x-pack/auditbeat/processors/sessionmd/config.go new file mode 100644 index 000000000000..31c07c9065f1 --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/config.go @@ -0,0 +1,22 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package sessionmd + +// Config for add_session_metadata processor. +type config struct { + Backend string `config:"backend"` + ReplaceFields bool `config:"replace_fields"` + PIDField string `config:"pid_field"` +} + +func defaultConfig() config { + return config{ + Backend: "auto", + ReplaceFields: false, + PIDField: "process.pid", + } +} diff --git a/x-pack/auditbeat/processors/sessionmd/doc.go b/x-pack/auditbeat/processors/sessionmd/doc.go new file mode 100644 index 000000000000..6067081c82cb --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/doc.go @@ -0,0 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// sessionmd provides a Beat processor that can enrich process event documents with +// additional session metadata for the processes. +package sessionmd diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go new file mode 100644 index 000000000000..6b2de897973b --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -0,0 +1,668 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package processdb + +import ( + "encoding/base64" + "errors" + "fmt" + "os" + "path" + "slices" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/elastic/beats/v7/libbeat/common/capabilities" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/procfs" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/timeutils" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types" + "github.com/elastic/elastic-agent-libs/logp" +) + +type TTYType int + +const ( + TTYUnknown TTYType = iota + Pts + TTY + TTYConsole +) + +type EntryType string + +const ( + Init EntryType = "init" + Sshd EntryType = "sshd" + Ssm EntryType = "ssm" + Container EntryType = "container" + Terminal EntryType = "terminal" + EntryConsole EntryType = "console" + EntryUnknown EntryType = "unknown" +) + +var containerRuntimes = [...]string{ + "containerd-shim", + "runc", + "conmon", +} + +// "filtered" executables are executables that relate to internal +// implementation details of entry mechanisms. The set of circumstances under +// which they can become an entry leader are reduced compared to other binaries +// (see implementation and unit tests). +var filteredExecutables = [...]string{ + "runc", + "containerd-shim", + "calico-node", + "check-status", + "conmon", +} + +const ( + ptsMinMajor = 136 + ptsMaxMajor = 143 + ttyMajor = 4 + consoleMaxMinor = 63 + ttyMaxMinor = 255 +) + +type Process struct { + PIDs types.PIDInfo + Creds types.CredInfo + CTTY types.TTYDev + Argv []string + Cwd string + Env map[string]string + Filename string +} + +var ( + bootID string + pidNsInode uint64 + initError error + once sync.Once +) + +func readBootID() (string, error) { + bootID, err := os.ReadFile("/proc/sys/kernel/random/boot_id") + if err != nil { + panic(fmt.Sprintf("could not read /proc/sys/kernel/random/boot_id: %v", err)) + } + + return strings.TrimRight(string(bootID), "\n"), nil +} + +func readPIDNsInode() (uint64, error) { + var ret uint64 + + pidNsInodeRaw, err := os.Readlink("/proc/self/ns/pid") + if err != nil { + panic(fmt.Sprintf("could not read /proc/self/ns/pid: %v", err)) + } + + if _, err = fmt.Sscanf(pidNsInodeRaw, "pid:[%d]", &ret); err != nil { + panic(fmt.Sprintf("could not parse contents of /proc/self/ns/pid (%s): %v", pidNsInodeRaw, err)) + } + + return ret, nil +} + +func pidInfoFromProto(p types.PIDInfo) types.PIDInfo { + return types.PIDInfo{ + StartTimeNS: p.StartTimeNS, + Tid: p.Tid, + Tgid: p.Tgid, + Vpid: p.Vpid, + Ppid: p.Ppid, + Pgid: p.Pgid, + Sid: p.Sid, + } +} + +func credInfoFromProto(p types.CredInfo) types.CredInfo { + return types.CredInfo{ + Ruid: p.Ruid, + Rgid: p.Rgid, + Euid: p.Euid, + Egid: p.Egid, + Suid: p.Suid, + Sgid: p.Sgid, + CapPermitted: p.CapPermitted, + CapEffective: p.CapEffective, + } +} + +func ttyTermiosFromProto(p types.TTYTermios) types.TTYTermios { + return types.TTYTermios{ + CIflag: p.CIflag, + COflag: p.COflag, + CLflag: p.CLflag, + CCflag: p.CCflag, + } +} + +func ttyWinsizeFromProto(p types.TTYWinsize) types.TTYWinsize { + return types.TTYWinsize{ + Rows: p.Rows, + Cols: p.Cols, + } +} + +func ttyDevFromProto(p types.TTYDev) types.TTYDev { + return types.TTYDev{ + Major: p.Major, + Minor: p.Minor, + Winsize: ttyWinsizeFromProto(p.Winsize), + Termios: ttyTermiosFromProto(p.Termios), + } +} + +func initialize() { + var err error + bootID, err = readBootID() + if err != nil { + initError = err + return + } + pidNsInode, err = readPIDNsInode() + if err != nil { + initError = err + } +} + +type DB struct { + mutex sync.RWMutex + logger *logp.Logger + processes map[uint32]Process + entryLeaders map[uint32]EntryType + entryLeaderRelationships map[uint32]uint32 + procfs procfs.Reader +} + +func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) { + once.Do(initialize) + if initError != nil { + return &DB{}, initError + } + return &DB{ + logger: logp.NewLogger("processdb"), + processes: make(map[uint32]Process), + entryLeaders: make(map[uint32]EntryType), + entryLeaderRelationships: make(map[uint32]uint32), + procfs: reader, + }, nil +} + +func (db *DB) calculateEntityIDv1(pid uint32, startTime time.Time) string { + return base64.StdEncoding.EncodeToString( + []byte( + fmt.Sprintf("%d__%s__%d__%d", + pidNsInode, + bootID, + uint64(pid), + uint64(startTime.Unix()), + ), + ), + ) +} + +// `path.Base` returns a '.' for empty strings, this just special cases that +// situation to return an empty string +func basename(pathStr string) string { + if pathStr == "" { + return "" + } + + return path.Base(pathStr) +} + +func (db *DB) InsertFork(fork types.ProcessForkEvent) { + db.mutex.Lock() + defer db.mutex.Unlock() + + pid := fork.ChildPIDs.Tgid + ppid := fork.ParentPIDs.Tgid + if entry, ok := db.processes[ppid]; ok { + entry.PIDs = pidInfoFromProto(fork.ChildPIDs) + entry.Creds = credInfoFromProto(fork.Creds) + db.processes[pid] = entry + if entryPID, ok := db.entryLeaderRelationships[ppid]; ok { + db.entryLeaderRelationships[pid] = entryPID + } + } else { + db.processes[pid] = Process{ + PIDs: pidInfoFromProto(fork.ChildPIDs), + Creds: credInfoFromProto(fork.Creds), + } + } +} + +func (db *DB) insertProcess(process Process) { + pid := process.PIDs.Tgid + db.processes[pid] = process + entryLeaderPID := db.evaluateEntryLeader(process) + if entryLeaderPID != nil { + db.entryLeaderRelationships[pid] = *entryLeaderPID + db.logger.Debugf("%v name: %s, entry_leader: %d, entry_type: %s", process.PIDs, process.Filename, *entryLeaderPID, string(db.entryLeaders[*entryLeaderPID])) + } else { + db.logger.Debugf("%v name: %s, NO ENTRY LEADER", process.PIDs, process.Filename) + } +} + +func (db *DB) InsertExec(exec types.ProcessExecEvent) { + db.mutex.Lock() + defer db.mutex.Unlock() + + proc := Process{ + PIDs: pidInfoFromProto(exec.PIDs), + Creds: credInfoFromProto(exec.Creds), + CTTY: ttyDevFromProto(exec.CTTY), + Argv: exec.Argv, + Cwd: exec.CWD, + Env: exec.Env, + Filename: exec.Filename, + } + + db.processes[exec.PIDs.Tgid] = proc + entryLeaderPID := db.evaluateEntryLeader(proc) + if entryLeaderPID != nil { + db.entryLeaderRelationships[exec.PIDs.Tgid] = *entryLeaderPID + } +} + +func (db *DB) createEntryLeader(pid uint32, entryType EntryType) { + db.entryLeaders[pid] = entryType + db.logger.Debugf("created entry leader %d: %s, name: %s", pid, string(entryType), db.processes[pid].Filename) +} + +// pid returned is a pointer type because its possible for no +func (db *DB) evaluateEntryLeader(p Process) *uint32 { + pid := p.PIDs.Tgid + + // init never has an entry leader or meta type + if p.PIDs.Tgid == 1 { + db.logger.Debugf("entry_eval %d: process is init, no entry type", p.PIDs.Tgid) + return nil + } + + // kernel threads also never have an entry leader or meta type kthreadd + // (always pid 2) is the parent of all kernel threads, by filtering pid == + // 2 || ppid == 2, we get rid of all of them + if p.PIDs.Tgid == 2 || p.PIDs.Ppid == 2 { + db.logger.Debugf("entry_eval %d: kernel threads never an entry type (parent is pid 2)", p.PIDs.Tgid) + return nil + } + + // could be an entry leader + if p.PIDs.Tgid == p.PIDs.Sid { + ttyType := getTTYType(p.CTTY.Major, p.CTTY.Minor) + + procBasename := basename(p.Filename) + switch { + case ttyType == TTY: + db.createEntryLeader(pid, Terminal) + db.logger.Debugf("entry_eval %d: entry type is terminal", p.PIDs.Tgid) + return &pid + case ttyType == TTYConsole && procBasename == "login": + db.createEntryLeader(pid, EntryConsole) + db.logger.Debugf("entry_eval %d: entry type is console", p.PIDs.Tgid) + return &pid + case p.PIDs.Ppid == 1: + db.createEntryLeader(pid, Init) + db.logger.Debugf("entry_eval %d: entry type is init", p.PIDs.Tgid) + return &pid + case !isFilteredExecutable(procBasename): + if parent, ok := db.processes[p.PIDs.Ppid]; ok { + parentBasename := basename(parent.Filename) + if ttyType == Pts && parentBasename == "ssm-session-worker" { + db.createEntryLeader(pid, Ssm) + db.logger.Debugf("entry_eval %d: entry type is ssm", p.PIDs.Tgid) + return &pid + } else if parentBasename == "sshd" && procBasename != "sshd" { + // TODO: get ip from env vars + db.createEntryLeader(pid, Sshd) + db.logger.Debugf("entry_eval %d: entry type is sshd", p.PIDs.Tgid) + return &pid + } else if isContainerRuntime(parentBasename) { + db.createEntryLeader(pid, Container) + db.logger.Debugf("entry_eval %d: entry type is container", p.PIDs.Tgid) + return &pid + } + } + default: + db.logger.Debugf("entry_eval %d: is a filtered executable: %s", p.PIDs.Tgid, procBasename) + } + } + + // if not a session leader or was not determined to be an entry leader, get + // it via parent, session leader, group leader (in that order) + relations := []struct { + pid uint32 + name string + }{ + { + pid: p.PIDs.Ppid, + name: "parent", + }, + { + pid: p.PIDs.Sid, + name: "session_leader", + }, + { + pid: p.PIDs.Pgid, + name: "group_leader", + }, + } + + for _, relation := range relations { + if entry, ok := db.entryLeaderRelationships[relation.pid]; ok { + entryType := db.entryLeaders[entry] + db.logger.Debugf("entry_eval %d: got entry_leader: %d (%s), from relative: %d (%s)", p.PIDs.Tgid, entry, string(entryType), relation.pid, relation.name) + return &entry + } else { + db.logger.Debugf("entry_eval %d: failed to find relative: %d (%s)", p.PIDs.Tgid, relation.pid, relation.name) + } + } + + // if it's a session leader, then make it its own entry leader with unknown + // entry type + if p.PIDs.Tgid == p.PIDs.Sid { + db.createEntryLeader(pid, EntryUnknown) + db.logger.Debugf("entry_eval %d: this is a session leader and no relative has an entry leader. entry type is unknown", p.PIDs.Tgid) + return &pid + } + + db.logger.Debugf("entry_eval %d: this is not a session leader and no relative has an entry leader, entry_leader will be unset", p.PIDs.Tgid) + return nil +} + +func (db *DB) InsertSetsid(setsid types.ProcessSetsidEvent) { + db.mutex.Lock() + defer db.mutex.Unlock() + + if entry, ok := db.processes[setsid.PIDs.Tgid]; ok { + entry.PIDs = pidInfoFromProto(setsid.PIDs) + db.processes[setsid.PIDs.Tgid] = entry + } else { + db.processes[setsid.PIDs.Tgid] = Process{ + PIDs: pidInfoFromProto(setsid.PIDs), + } + } +} + +func (db *DB) InsertExit(exit types.ProcessExitEvent) { + db.mutex.Lock() + defer db.mutex.Unlock() + + pid := exit.PIDs.Tgid + delete(db.processes, pid) + delete(db.entryLeaders, pid) + delete(db.entryLeaderRelationships, pid) +} + +func interactiveFromTTY(tty types.TTYDev) bool { + return TTYUnknown != getTTYType(tty.Major, tty.Minor) +} + +func fullProcessFromDBProcess(p Process) types.Process { + reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(p.PIDs.StartTimeNS) + interactive := interactiveFromTTY(p.CTTY) + + ret := types.Process{ + PID: p.PIDs.Tgid, + Start: timeutils.TimeFromNsSinceBoot(reducedPrecisionStartTime), + Name: basename(p.Filename), + Executable: p.Filename, + Args: p.Argv, + WorkingDirectory: p.Cwd, + Interactive: &interactive, + } + + euid := p.Creds.Euid + egid := p.Creds.Egid + ret.User.ID = strconv.FormatUint(uint64(euid), 10) + ret.Group.ID = strconv.FormatUint(uint64(egid), 10) + ret.Thread.Capabilities.Permitted, _ = capabilities.FromUint64(p.Creds.CapPermitted) + ret.Thread.Capabilities.Effective, _ = capabilities.FromUint64(p.Creds.CapEffective) + + return ret +} + +func fillParent(process *types.Process, parent Process) { + reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(parent.PIDs.StartTimeNS) + + interactive := interactiveFromTTY(parent.CTTY) + euid := parent.Creds.Euid + egid := parent.Creds.Egid + process.Parent.PID = parent.PIDs.Tgid + process.Parent.Start = timeutils.TimeFromNsSinceBoot(reducedPrecisionStartTime) + process.Parent.Name = basename(parent.Filename) + process.Parent.Executable = parent.Filename + process.Parent.Args = parent.Argv + process.Parent.WorkingDirectory = parent.Cwd + process.Parent.Interactive = &interactive + process.Parent.User.ID = strconv.FormatUint(uint64(euid), 10) + process.Parent.Group.ID = strconv.FormatUint(uint64(egid), 10) +} + +func fillGroupLeader(process *types.Process, groupLeader Process) { + reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(groupLeader.PIDs.StartTimeNS) + + interactive := interactiveFromTTY(groupLeader.CTTY) + euid := groupLeader.Creds.Euid + egid := groupLeader.Creds.Egid + process.GroupLeader.PID = groupLeader.PIDs.Tgid + process.GroupLeader.Start = timeutils.TimeFromNsSinceBoot(reducedPrecisionStartTime) + process.GroupLeader.Name = basename(groupLeader.Filename) + process.GroupLeader.Executable = groupLeader.Filename + process.GroupLeader.Args = groupLeader.Argv + process.GroupLeader.WorkingDirectory = groupLeader.Cwd + process.GroupLeader.Interactive = &interactive + process.GroupLeader.User.ID = strconv.FormatUint(uint64(euid), 10) + process.GroupLeader.Group.ID = strconv.FormatUint(uint64(egid), 10) +} + +func fillSessionLeader(process *types.Process, sessionLeader Process) { + reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(sessionLeader.PIDs.StartTimeNS) + + interactive := interactiveFromTTY(sessionLeader.CTTY) + euid := sessionLeader.Creds.Euid + egid := sessionLeader.Creds.Egid + process.SessionLeader.PID = sessionLeader.PIDs.Tgid + process.SessionLeader.Start = timeutils.TimeFromNsSinceBoot(reducedPrecisionStartTime) + process.SessionLeader.Name = basename(sessionLeader.Filename) + process.SessionLeader.Executable = sessionLeader.Filename + process.SessionLeader.Args = sessionLeader.Argv + process.SessionLeader.WorkingDirectory = sessionLeader.Cwd + process.SessionLeader.Interactive = &interactive + process.SessionLeader.User.ID = strconv.FormatUint(uint64(euid), 10) + process.SessionLeader.Group.ID = strconv.FormatUint(uint64(egid), 10) +} + +func fillEntryLeader(process *types.Process, entryType EntryType, entryLeader Process) { + reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(entryLeader.PIDs.StartTimeNS) + + interactive := interactiveFromTTY(entryLeader.CTTY) + euid := entryLeader.Creds.Euid + egid := entryLeader.Creds.Egid + process.EntryLeader.PID = entryLeader.PIDs.Tgid + process.EntryLeader.Start = timeutils.TimeFromNsSinceBoot(reducedPrecisionStartTime) + process.EntryLeader.Name = basename(entryLeader.Filename) + process.EntryLeader.Executable = entryLeader.Filename + process.EntryLeader.Args = entryLeader.Argv + process.EntryLeader.WorkingDirectory = entryLeader.Cwd + process.EntryLeader.Interactive = &interactive + process.EntryLeader.User.ID = strconv.FormatUint(uint64(euid), 10) + process.EntryLeader.Group.ID = strconv.FormatUint(uint64(egid), 10) + + process.EntryLeader.EntryMeta.Type = string(entryType) +} + +func (db *DB) setEntityID(process *types.Process) { + if process.PID != 0 && process.Start != nil { + process.EntityID = db.calculateEntityIDv1(process.PID, *process.Start) + } + + if process.Parent.PID != 0 && process.Parent.Start != nil { + process.Parent.EntityID = db.calculateEntityIDv1(process.Parent.PID, *process.Parent.Start) + } + + if process.GroupLeader.PID != 0 && process.GroupLeader.Start != nil { + process.GroupLeader.EntityID = db.calculateEntityIDv1(process.GroupLeader.PID, *process.GroupLeader.Start) + } + + if process.SessionLeader.PID != 0 && process.SessionLeader.Start != nil { + process.SessionLeader.EntityID = db.calculateEntityIDv1(process.SessionLeader.PID, *process.SessionLeader.Start) + } + + if process.EntryLeader.PID != 0 && process.EntryLeader.Start != nil { + process.EntryLeader.EntityID = db.calculateEntityIDv1(process.EntryLeader.PID, *process.EntryLeader.Start) + } +} + +func setSameAsProcess(process *types.Process) { + if process.GroupLeader.PID != 0 && process.GroupLeader.Start != nil { + sameAsProcess := process.PID == process.GroupLeader.PID + process.GroupLeader.SameAsProcess = &sameAsProcess + } + + if process.SessionLeader.PID != 0 && process.SessionLeader.Start != nil { + sameAsProcess := process.PID == process.SessionLeader.PID + process.SessionLeader.SameAsProcess = &sameAsProcess + } + + if process.EntryLeader.PID != 0 && process.EntryLeader.Start != nil { + sameAsProcess := process.PID == process.EntryLeader.PID + process.EntryLeader.SameAsProcess = &sameAsProcess + } +} + +func (db *DB) GetProcess(pid uint32) (types.Process, error) { + db.mutex.RLock() + defer db.mutex.RUnlock() + + process, ok := db.processes[pid] + if !ok { + return types.Process{}, errors.New("process not found") + } + + ret := fullProcessFromDBProcess(process) + + if parent, ok := db.processes[process.PIDs.Ppid]; ok { + fillParent(&ret, parent) + } + + if groupLeader, ok := db.processes[process.PIDs.Pgid]; ok { + fillGroupLeader(&ret, groupLeader) + } + + if sessionLeader, ok := db.processes[process.PIDs.Sid]; ok { + fillSessionLeader(&ret, sessionLeader) + } + + if entryLeaderPID, foundEntryLeaderPID := db.entryLeaderRelationships[process.PIDs.Tgid]; foundEntryLeaderPID { + if entryLeader, foundEntryLeader := db.processes[entryLeaderPID]; foundEntryLeader { + // if there is an entry leader then there is a matching member in the entryLeaders table + fillEntryLeader(&ret, db.entryLeaders[entryLeaderPID], entryLeader) + } else { + db.logger.Errorf("failed to find entry leader entry %d for %d (%s)", entryLeaderPID, pid, db.processes[pid].Filename) + } + } else { + db.logger.Errorf("failed to find entry leader for %d (%s)", pid, db.processes[pid].Filename) + } + + db.setEntityID(&ret) + setSameAsProcess(&ret) + + return ret, nil +} + +func (db *DB) GetEntryType(pid uint32) (EntryType, error) { + db.mutex.RLock() + defer db.mutex.RUnlock() + + if entryType, ok := db.entryLeaders[pid]; ok { + return entryType, nil + } + return EntryUnknown, nil +} + +func (db *DB) ScrapeProcfs() []uint32 { + db.mutex.Lock() + defer db.mutex.Unlock() + + procs, err := db.procfs.GetAllProcesses() + if err != nil { + db.logger.Errorf("failed to get processes from procfs: %v", err) + return make([]uint32, 0) + } + + // sorting the slice to make sure that parents, session leaders, group + // leaders come first in the queue + sort.Slice(procs, func(i, j int) bool { + return procs[i].PIDs.Tgid == procs[j].PIDs.Ppid || + procs[i].PIDs.Tgid == procs[j].PIDs.Sid || + procs[i].PIDs.Tgid == procs[j].PIDs.Pgid + }) + + pids := make([]uint32, 0) + for _, procInfo := range procs { + process := Process{ + PIDs: pidInfoFromProto(procInfo.PIDs), + Creds: credInfoFromProto(procInfo.Creds), + CTTY: ttyDevFromProto(procInfo.CTTY), + Argv: procInfo.Argv, + Cwd: procInfo.Cwd, + Env: procInfo.Env, + Filename: procInfo.Filename, + } + + db.insertProcess(process) + pids = append(pids, process.PIDs.Tgid) + } + + return pids +} + +func stringStartsWithEntryInList(str string, list []string) bool { + for _, entry := range list { + if strings.HasPrefix(str, entry) { + return true + } + } + + return false +} + +func isContainerRuntime(executable string) bool { + return slices.ContainsFunc(containerRuntimes[:], func(s string) bool { + return strings.HasPrefix(executable, s) + }) +} + +func isFilteredExecutable(executable string) bool { + return stringStartsWithEntryInList(executable, filteredExecutables[:]) +} + +func getTTYType(major uint16, minor uint16) TTYType { + if major >= ptsMinMajor && major <= ptsMaxMajor { + return Pts + } + + if ttyMajor == major { + if minor <= consoleMaxMinor { + return TTYConsole + } else if minor > consoleMaxMinor && minor <= ttyMaxMinor { + return TTY + } + } + + return TTYUnknown +} diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db_test.go b/x-pack/auditbeat/processors/sessionmd/processdb/db_test.go new file mode 100644 index 000000000000..5e8001a68e54 --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db_test.go @@ -0,0 +1,24 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package processdb + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" +) + +var logger = logp.NewLogger("processdb") + +func TestGetTTYType(t *testing.T) { + assert.Equal(t, TTYConsole, getTTYType(4, 0)) + assert.Equal(t, Pts, getTTYType(136, 0)) + assert.Equal(t, TTY, getTTYType(4, 64)) + assert.Equal(t, TTYUnknown, getTTYType(1000, 1000)) +} diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go b/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go new file mode 100644 index 000000000000..15f98250f55d --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go @@ -0,0 +1,1244 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package processdb + +import ( + "path" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/procfs" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types" +) + +const ( + containerdShimPath = "/bin/containerd-shim-runc-v2" + containerdPath = "/bin/containerd" + sshdPath = "/usr/bin/sshd" + lsPath = "/usr/bin/ls" + bashPath = "/usr/bin/bash" + grepPath = "/usr/bin/grep" +) + +// Entry evaluation tests +// +// The entry leader isn't an entirely rigorous conceptual framework but that +// shortcoming is outweighted by the large and immediate value it provides. +// +// The idea is to assign two pieces of data to each process, the "entry meta" +// and "entry leader", the former of which describes how the user or system +// that was ultimately responsible for executing this process got into to the +// box (e.g. ssh, ssm, kubectl exec) and the latter of which describes the +// process associated with the user or system's initial entry into the "box" +// (be it a container, VM or otherwise). +// +// Generally speaking, the first session leader in a process lineage of an +// interactive session is an entry leader having an entry meta type depending +// on its lineage. For example, in the following process tree, "bash" is an +// entry leader with entry meta type "sshd": +// +// systemd (pid 1 sid 1) +// \___ sshd (pid 100 sid 100) +// \___ bash (pid 1000 sid 1000) +// \___ vim (pid 1001 sid 1000) +// +// Further entry meta types exist for ssm, container runtimes, serial consoles +// and other ways to get into a "box" (be it a container or actual machine). +// The entry meta type "init" is assigned to system processes created by the +// init service (e.g. rsyslogd, sshd). +// +// As should probably be apparent, the code to assign an entry meta type to a +// process is essentially a large amount of conditional logic with a ton of +// edge cases. It's something we "bolt on" to the linux process model, and thus +// finicky and highly subject to bugs. +// +// Thankfully, writing unit tests for entry leader evaluation is rather +// straightforward as it's basically a pure function that requires no external +// infrastructure to test (just create a mock process event with your desired +// fields set and pass it in). +// +// These tests should effectively serve as the spec for how we assign entry +// leaders. When further entry meta types or cases are added, tests should be + +func requireProcess(t *testing.T, db *DB, pid uint32, processPath string) { + t.Helper() + process, err := db.GetProcess(pid) + require.Nil(t, err) + require.Equal(t, pid, process.PID) + require.Equal(t, processPath, process.Executable) + if processPath == "" { + require.Equal(t, "", process.Name) + } else { + require.Equal(t, path.Base(processPath), process.Name) + } +} + +func requireParent(t *testing.T, db *DB, pid uint32, ppid uint32) { + t.Helper() + process, err := db.GetProcess(pid) + require.Nil(t, err) + require.Equal(t, ppid, process.Parent.PID) +} + +func requireParentUnset(t *testing.T, process types.Process) { + t.Helper() + require.Equal(t, "", process.Parent.EntityID) + require.Equal(t, uint32(0), process.Parent.PID) + require.Nil(t, process.Parent.Start) +} + +func requireSessionLeader(t *testing.T, db *DB, pid uint32, sid uint32) { + t.Helper() + process, err := db.GetProcess(pid) + require.Nil(t, err) + require.Equal(t, sid, process.SessionLeader.PID) + require.NotNil(t, process.SessionLeader.SameAsProcess) + require.Equal(t, pid == sid, *process.SessionLeader.SameAsProcess) +} + +func requireSessionLeaderUnset(t *testing.T, process types.Process) { + t.Helper() + require.Equal(t, "", process.SessionLeader.EntityID) + require.Equal(t, uint32(0), process.SessionLeader.PID) + require.Nil(t, process.SessionLeader.Start) +} + +func requireGroupLeader(t *testing.T, db *DB, pid uint32, pgid uint32) { + t.Helper() + process, err := db.GetProcess(pid) + require.Nil(t, err) + require.Equal(t, pgid, process.GroupLeader.PID) + require.NotNil(t, process.GroupLeader.SameAsProcess) + require.Equal(t, pid == pgid, *process.GroupLeader.SameAsProcess) +} + +func requireEntryLeader(t *testing.T, db *DB, pid uint32, entryPID uint32, expectedEntryType EntryType) { + t.Helper() + process, err := db.GetProcess(pid) + require.Nil(t, err) + require.Equal(t, entryPID, process.EntryLeader.PID) + require.NotNil(t, process.EntryLeader.SameAsProcess) + require.Equal(t, pid == entryPID, *process.EntryLeader.SameAsProcess) + + entryType, err := db.GetEntryType(entryPID) + require.Nil(t, err) + require.Equal(t, expectedEntryType, entryType) +} + +func requireEntryLeaderUnset(t *testing.T, process types.Process) { + t.Helper() + require.Equal(t, "", process.EntryLeader.EntityID) + require.Equal(t, uint32(0), process.EntryLeader.PID) + require.Nil(t, process.EntryLeader.Start) +} + +// tries to construct fork event from what's in the db +func insertForkAndExec(t *testing.T, db *DB, exec types.ProcessExecEvent) { + t.Helper() + var fork types.ProcessForkEvent + fork.ChildPIDs = exec.PIDs + parent, err := db.GetProcess(exec.PIDs.Ppid) + if err != nil { + fork.ParentPIDs = exec.PIDs + fork.ParentPIDs.Tgid = exec.PIDs.Ppid + fork.ParentPIDs.Ppid = 0 + fork.ParentPIDs.Pgid = 0 + + fork.ChildPIDs.Pgid = exec.PIDs.Ppid + + // if the exec makes itself a session and the parent is no where to be + // found we'll make the parent its own session + if exec.PIDs.Tgid == exec.PIDs.Sid { + fork.ParentPIDs.Sid = exec.PIDs.Ppid + } + } else { + fork.ParentPIDs.Tgid = parent.PID + fork.ParentPIDs.Ppid = parent.Parent.PID + fork.ParentPIDs.Sid = parent.SessionLeader.PID + + // keep group leader the same for now + fork.ParentPIDs.Pgid = exec.PIDs.Pgid + } + + if fork.ParentPIDs.Tgid != 0 { + db.InsertFork(fork) + } + + db.InsertExec(exec) +} + +var systemdPath = "/sbin/systemd" + +func populateProcfsWithInit(reader *procfs.MockReader) { + reader.AddEntry(1, procfs.ProcessInfo{ + PIDs: types.PIDInfo{ + Tid: 1, + Tgid: 1, + Pgid: 0, + Sid: 1, + }, + Filename: systemdPath, + }) +} + +func TestSingleProcessSessionLeaderEntryTypeTerminal(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + pid := uint32(1234) + procPath := "/bin/noproc" + db.InsertExec(types.ProcessExecEvent{ + Filename: procPath, + PIDs: types.PIDInfo{ + Tgid: pid, + Sid: pid, + }, + CTTY: types.TTYDev{ + Major: 4, + Minor: 64, + }, + }) + + requireProcess(t, db, 1234, procPath) + requireEntryLeader(t, db, 1234, 1234, Terminal) +} + +func TestSingleProcessSessionLeaderLoginProcess(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + pid := uint32(1234) + loginPath := "/bin/login" + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: loginPath, + PIDs: types.PIDInfo{ + Tgid: pid, + Sid: pid, + }, + CTTY: types.TTYDev{ + Major: 4, + Minor: 62, + }, + }) + + process, err := db.GetProcess(1234) + require.Nil(t, err) + requireParentUnset(t, process) + + requireProcess(t, db, pid, "/bin/login") + requireSessionLeader(t, db, pid, pid) + requireEntryLeader(t, db, pid, pid, EntryConsole) +} + +func TestSingleProcessSessionLeaderChildOfInit(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + pid := uint32(100) + rsyslogdPath := "/bin/rsyslogd" + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: rsyslogdPath, + PIDs: types.PIDInfo{ + Tgid: pid, + Sid: pid, + Ppid: 1, + }, + CTTY: types.TTYDev{ + Major: 136, + Minor: 62, + }, + }) + + process, err := db.GetProcess(1234) + require.NotNil(t, err) + requireParentUnset(t, process) + + requireProcess(t, db, pid, rsyslogdPath) + requireSessionLeader(t, db, pid, pid) + requireEntryLeader(t, db, pid, pid, Init) +} + +func TestSingleProcessSessionLeaderChildOfSsmSessionWorker(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + ssmPID := uint32(999) + bashPID := uint32(1000) + ssmPath := "/usr/bin/ssm-session-worker" + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: ssmPath, + PIDs: types.PIDInfo{ + Tgid: ssmPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: ssmPID, + }, + CTTY: types.TTYDev{ + Major: 136, + Minor: 62, + }, + }) + + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, ssmPID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Ssm) +} + +func TestSingleProcessSessionLeaderChildOfSshd(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshdPID := uint32(999) + bashPID := uint32(1000) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshdPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshdPID, + }, + CTTY: types.TTYDev{ + Major: 136, + Minor: 62, + }, + }) + + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshdPID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) +} + +func TestSingleProcessSessionLeaderChildOfContainerdShim(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + containerdShimPID := uint32(999) + bashPID := uint32(1000) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: containerdShimPath, + PIDs: types.PIDInfo{ + Tgid: containerdShimPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: containerdShimPID, + }, + CTTY: types.TTYDev{ + Major: 136, + Minor: 62, + }, + }) + + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, containerdShimPID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Container) +} + +func TestSingleProcessSessionLeaderChildOfRunc(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + runcPID := uint32(999) + bashPID := uint32(1000) + runcPath := "/bin/runc" + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: runcPath, + PIDs: types.PIDInfo{ + Tgid: runcPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: runcPID, + }, + CTTY: types.TTYDev{ + Major: 136, + Minor: 62, + }, + }) + + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, runcPID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Container) +} + +func TestSingleProcessEmptyProcess(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + // No information in proc at all, entry type should be "unknown" + // and entry leader pid should be unset (since pid is not set) + pid := uint32(1000) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: pid, + Sid: pid, + }, + CTTY: types.TTYDev{ + Major: 136, + Minor: 62, + }, + }) + + process, err := db.GetProcess(pid) + require.Nil(t, err) + requireParentUnset(t, process) + + requireProcess(t, db, pid, bashPath) + requireSessionLeader(t, db, pid, pid) + requireEntryLeader(t, db, pid, pid, EntryUnknown) +} + +// Entry evaluation code should overwrite an old EntryLeaderPID and +// EntryLeaderEntryMetaType +func TestSingleProcessOverwriteOldEntryLeader(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + ssmPID := uint32(999) + bashPID := uint32(1000) + ssmPath := "/usr/bin/ssm-session-worker" + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: ssmPath, + PIDs: types.PIDInfo{ + Tgid: ssmPID, + Sid: ssmPID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: ssmPID, + Ppid: ssmPID, + }, + CTTY: types.TTYDev{ + Major: 136, + Minor: 62, + }, + }) + + // bash is not a session leader so it shouldn't be an entry leader. Its + // entry leader should be ssm, which is an init entry leader + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, ssmPID) + requireSessionLeader(t, db, bashPID, ssmPID) + requireEntryLeader(t, db, bashPID, ssmPID, Init) + + // skiping setsid event and assuming the pids will be updated in this exec + db.InsertExec(types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: ssmPID, + }, + CTTY: types.TTYDev{ + Major: 136, + Minor: 62, + }, + }) + + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, ssmPID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Ssm) +} + +// / (pid, sid, entry meta, entry leader) +// +// systemd (1, 1, none, none) +// +// \___ sshd (100, 100, "init", 100) +// \___ bash (1000, 1000, "sshd", 1000) +// \___ ls (1001, 1000, "sshd", 1000) +// +// This is unrealistic, sshd usually forks a bunch of sshd children before +// exec'ing bash (see subsequent tests) but is theoretically possible and +// thus something we should handle. +func TestInitSshdBashLs(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshdPID := uint32(100) + bashPID := uint32(1000) + lsPID := uint32(1001) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshdPID, + Sid: sshdPID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshdPID, + Pgid: bashPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: lsPath, + PIDs: types.PIDInfo{ + Tgid: lsPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: lsPID, + }, + }) + + // systemd + systemd, err := db.GetProcess(1) + require.Nil(t, err) + requireParentUnset(t, systemd) + requireEntryLeaderUnset(t, systemd) + + requireProcess(t, db, 1, systemdPath) + requireSessionLeader(t, db, 1, 1) + + // sshd + requireProcess(t, db, sshdPID, sshdPath) + requireParent(t, db, sshdPID, 1) + requireSessionLeader(t, db, sshdPID, sshdPID) + requireEntryLeader(t, db, sshdPID, sshdPID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshdPID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) + requireGroupLeader(t, db, bashPID, bashPID) + + // ls + requireProcess(t, db, lsPID, lsPath) + requireParent(t, db, lsPID, bashPID) + requireSessionLeader(t, db, lsPID, bashPID) + requireEntryLeader(t, db, lsPID, bashPID, Sshd) + requireGroupLeader(t, db, lsPID, lsPID) +} + +// / (pid, sid, entry meta, entry leader) +// +// systemd (1, 1, none, none) +// +// \___ sshd (100, 100, "init", 100) +// \___ sshd (101, 101, "init", 100) +// \___ bash (1000, 1000, "sshd", 1000) +// \___ ls (1001, 1000, "sshd", 1000) +// +// sshd will usually fork a bunch of sshd children before invoking a shell +// usually 2 if it's a root shell, or 3 if it's a non-root shell. All +// "intermediate" sshd's should have entry meta "init" and an entry leader +// pid of the topmost sshd. +func TestInitSshdSshdBashLs(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshd0PID := uint32(100) + sshd1PID := uint32(101) + bashPID := uint32(1000) + lsPID := uint32(1001) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd1PID, + Pgid: bashPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: lsPath, + PIDs: types.PIDInfo{ + Tgid: lsPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: lsPID, + }, + }) + + // systemd + systemd, err := db.GetProcess(1) + require.Nil(t, err) + requireParentUnset(t, systemd) + requireEntryLeaderUnset(t, systemd) + + requireProcess(t, db, 1, systemdPath) + requireSessionLeader(t, db, 1, 1) + + // sshd0 + requireProcess(t, db, sshd0PID, sshdPath) + requireParent(t, db, sshd0PID, 1) + requireSessionLeader(t, db, sshd0PID, sshd0PID) + requireEntryLeader(t, db, sshd0PID, sshd0PID, Init) + + // sshd1 + requireProcess(t, db, sshd1PID, sshdPath) + requireParent(t, db, sshd1PID, sshd0PID) + requireSessionLeader(t, db, sshd1PID, sshd1PID) + requireEntryLeader(t, db, sshd1PID, sshd0PID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshd1PID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) + + // ls + requireProcess(t, db, lsPID, lsPath) + requireParent(t, db, lsPID, bashPID) + requireSessionLeader(t, db, lsPID, bashPID) + requireEntryLeader(t, db, lsPID, bashPID, Sshd) +} + +// / (pid, sid, entry meta, entry leader) +// systemd (1, 1, none, none) +// +// \___ sshd (100, 100, "init", 100) +// \___ sshd (101, 101, "init", 100) +// \___ sshd (102, 101, "init", 100) +// \___ bash (1000, 1000, "sshd", 1000) +// \___ ls (1001, 1000, "sshd", 1000) +func TestInitSshdSshdSshdBashLs(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshd0PID := uint32(100) + sshd1PID := uint32(101) + sshd2PID := uint32(102) + bashPID := uint32(1000) + lsPID := uint32(1001) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd2PID, + Sid: sshd1PID, + Ppid: sshd1PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd2PID, + Pgid: bashPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: lsPath, + PIDs: types.PIDInfo{ + Tgid: lsPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: lsPID, + }, + }) + + // systemd + systemd, err := db.GetProcess(1) + require.Nil(t, err) + requireParentUnset(t, systemd) + requireEntryLeaderUnset(t, systemd) + + requireProcess(t, db, 1, systemdPath) + requireSessionLeader(t, db, 1, 1) + + // sshd0 + requireProcess(t, db, sshd0PID, sshdPath) + requireParent(t, db, sshd0PID, 1) + requireSessionLeader(t, db, sshd0PID, sshd0PID) + requireEntryLeader(t, db, sshd0PID, sshd0PID, Init) + + // sshd1 + requireProcess(t, db, sshd1PID, sshdPath) + requireParent(t, db, sshd1PID, sshd0PID) + requireSessionLeader(t, db, sshd1PID, sshd1PID) + requireEntryLeader(t, db, sshd1PID, sshd0PID, Init) + + // sshd2 + requireProcess(t, db, sshd2PID, sshdPath) + requireParent(t, db, sshd2PID, sshd1PID) + requireSessionLeader(t, db, sshd2PID, sshd1PID) + requireEntryLeader(t, db, sshd2PID, sshd0PID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshd2PID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) + + // ls + requireProcess(t, db, lsPID, lsPath) + requireParent(t, db, lsPID, bashPID) + requireSessionLeader(t, db, lsPID, bashPID) + requireEntryLeader(t, db, lsPID, bashPID, Sshd) +} + +// / (pid, sid, entry meta, entry leader) +// +// systemd +// +// \___ containerd (100, 100, "init", 100) +// \___ containerd-shim-runc-v2 (1000, 100, "init", 100) +// +// containerd-shim-runc-v2 will reparent itself to init just prior to +// executing the containerized process. +func TestInitContainerdContainerdShim(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + containerdPID := uint32(100) + containerdShimPID := uint32(1000) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: containerdPath, + PIDs: types.PIDInfo{ + Tgid: containerdPID, + Sid: containerdPID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: containerdShimPath, + PIDs: types.PIDInfo{ + Tgid: containerdShimPID, + Sid: containerdPID, + Ppid: containerdPID, + }, + }) + + // containerd + requireProcess(t, db, containerdPID, containerdPath) + requireParent(t, db, containerdPID, 1) + requireSessionLeader(t, db, containerdPID, containerdPID) + requireEntryLeader(t, db, containerdPID, containerdPID, Init) + + // containerd-shim-runc-v2 + requireProcess(t, db, containerdShimPID, containerdShimPath) + requireParent(t, db, containerdShimPID, containerdPID) + requireSessionLeader(t, db, containerdShimPID, containerdPID) + requireEntryLeader(t, db, containerdShimPID, containerdPID, Init) +} + +// / (pid, sid, entry meta, entry leader) +// +// systemd +// +// \___ containerd (100, 100, "init", 100) +// | +// \___ containerd-shim-runc-v2 (1000, 100, "init", 100) +// \___ bash (1001, 1001, "container", 1000) +// +// Note that containerd originally forks and exec's +// containerd-shim-runc-v2, which then forks such that it is reparented to +// init. +func TestInitContainerdShimBashContainerdShimIsReparentedToInit(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + containerdPID := uint32(100) + containerdShimPID := uint32(1000) + bashPID := uint32(1001) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: containerdPath, + PIDs: types.PIDInfo{ + Tgid: containerdPID, + Sid: containerdPID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: containerdShimPath, + PIDs: types.PIDInfo{ + Tgid: containerdShimPID, + Sid: containerdPID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: containerdShimPID, + }, + }) + + // containerd + requireProcess(t, db, containerdPID, containerdPath) + requireParent(t, db, containerdPID, 1) + requireSessionLeader(t, db, containerdPID, containerdPID) + requireEntryLeader(t, db, containerdPID, containerdPID, Init) + + // containerd-shim-runc-v2 + requireProcess(t, db, containerdShimPID, containerdShimPath) + requireParent(t, db, containerdShimPID, 1) + requireSessionLeader(t, db, containerdShimPID, containerdPID) + requireEntryLeader(t, db, containerdShimPID, containerdPID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, containerdShimPID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Container) +} + +// / (pid, sid, entry meta, entry leader) +// +// systemd +// +// \___ containerd (100, 100, "init", 100) +// | +// \___ containerd-shim-runc-v2 (1000, 100, "init", 100) +// \___ pause (1001, 1001, "container", 1001) +// +// The pause binary is a Kubernetes internal binary that is exec'd in a +// container by the container runtime. It is responsible for holding +// open the pod sandbox while other containers start and stop +func TestInitContainerdShimPauseContainerdShimIsReparentedToInit(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + containerdPID := uint32(100) + containerdShimPID := uint32(1000) + pausePID := uint32(1001) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: containerdPath, + PIDs: types.PIDInfo{ + Tgid: containerdPID, + Sid: containerdPID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: containerdShimPath, + PIDs: types.PIDInfo{ + Tgid: containerdShimPID, + Sid: containerdPID, + Ppid: 1, + }, + }) + + pausePath := "/usr/bin/pause" + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: pausePath, + PIDs: types.PIDInfo{ + Tgid: pausePID, + Sid: pausePID, + Ppid: containerdShimPID, + }, + }) + + // containerd + requireProcess(t, db, containerdPID, containerdPath) + requireParent(t, db, containerdPID, 1) + requireSessionLeader(t, db, containerdPID, containerdPID) + requireEntryLeader(t, db, containerdPID, containerdPID, Init) + + // containerd-shim-runc-v2 + requireProcess(t, db, containerdShimPID, containerdShimPath) + requireParent(t, db, containerdShimPID, 1) + requireSessionLeader(t, db, containerdShimPID, containerdPID) + requireEntryLeader(t, db, containerdShimPID, containerdPID, Init) + + // pause + requireProcess(t, db, pausePID, pausePath) + requireParent(t, db, pausePID, containerdShimPID) + requireSessionLeader(t, db, pausePID, pausePID) + requireEntryLeader(t, db, pausePID, pausePID, Container) +} + +// / (pid, sid, entry meta, entry leader) +// +// systemd (1, 1, none, none) +// +// \___ sshd (100, 100, "init", 100) +// \___ bash (1000, 1000, "sshd", 1000) +// \___ ls (1001, 1000, "sshd", 1000) +// | +// \___ grep (1002, 1000, "sshd", 1000) /* ppid/sid data is missing */ +// +// Grep does not have ppid or sid set, only pgid. Entry evaluation code +// should fallback to grabbing entry leader data from ls, the process group +// leader. +func TestInitSshdBashLsAndGrepGrepOnlyHasGroupLeader(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshdPID := uint32(100) + bashPID := uint32(1000) + lsPID := uint32(1001) + grepPID := uint32(1002) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshdPID, + Sid: sshdPID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshdPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: lsPath, + PIDs: types.PIDInfo{ + Tgid: lsPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: lsPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: grepPID, + Pgid: lsPID, + }, + }) + + // sshd + requireProcess(t, db, sshdPID, sshdPath) + requireParent(t, db, sshdPID, 1) + requireSessionLeader(t, db, sshdPID, sshdPID) + requireEntryLeader(t, db, sshdPID, sshdPID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshdPID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) + + // ls + requireProcess(t, db, lsPID, lsPath) + requireParent(t, db, lsPID, bashPID) + requireSessionLeader(t, db, lsPID, bashPID) + requireEntryLeader(t, db, lsPID, bashPID, Sshd) + + // grep + grep, err := db.GetProcess(grepPID) + require.Nil(t, err) + requireParentUnset(t, grep) + + requireProcess(t, db, grepPID, grepPath) + requireEntryLeader(t, db, grepPID, bashPID, Sshd) +} + +// / (pid, sid, entry meta, entry leader) +// +// systemd (1, 1, none, none) +// +// \___ sshd (100, 100, "init", 100) +// \___ bash (1000, 1000, "sshd", 1000) +// \___ ls (1001, 1000, "sshd", 1000) +// | +// \___ grep (1002, 1000, "sshd", 1000) /* ppid/pgid data is missing */ +// +// Grep does not have ppid or pgid set, ppid. Entry evaluation code should +// fallback to grabbing entry leader data from sshd, the session leader. +func TestInitSshdBashLsAndGrepGrepOnlyHasSessionLeader(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshdPID := uint32(100) + bashPID := uint32(1000) + lsPID := uint32(1001) + grepPID := uint32(1002) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshdPID, + Sid: sshdPID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshdPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: lsPath, + PIDs: types.PIDInfo{ + Tgid: lsPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: lsPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: grepPID, + Sid: bashPID, + }, + }) + + // sshd + requireProcess(t, db, sshdPID, sshdPath) + requireParent(t, db, sshdPID, 1) + requireSessionLeader(t, db, sshdPID, sshdPID) + requireEntryLeader(t, db, sshdPID, sshdPID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshdPID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) + + // ls + requireProcess(t, db, lsPID, lsPath) + requireParent(t, db, lsPID, bashPID) + requireSessionLeader(t, db, lsPID, bashPID) + requireEntryLeader(t, db, lsPID, bashPID, Sshd) + + // grep + grep, err := db.GetProcess(grepPID) + require.Nil(t, err) + requireParentUnset(t, grep) + + requireProcess(t, db, grepPID, grepPath) + requireSessionLeader(t, db, grepPID, bashPID) + requireEntryLeader(t, db, grepPID, bashPID, Sshd) +} + +// / (pid, sid, entry meta, entry leader) +// +// grep (1001, 1000, "unknown", 1001) +// +// No parent, session leader, or process group leader exists to draw +// on to get an entry leader for grep, fallback to assigning it an +// entry meta type of "unknown" and making it an entry leader. +func TestGrepInIsolation(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + grepPID := uint32(1001) + + db.InsertExec(types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: grepPID, + Ppid: 1000, + Sid: grepPID, + }, + }) + + process, err := db.GetProcess(grepPID) + require.Nil(t, err) + requireParentUnset(t, process) + + requireProcess(t, db, grepPID, grepPath) + requireSessionLeader(t, db, grepPID, grepPID) + requireEntryLeader(t, db, grepPID, grepPID, EntryUnknown) +} + +// / (pid, sid, entry meta, entry leader) +// +// kthreadd (2, 0, , ) +// +// \___ rcu_gp (3, 0, , ) +// +// Kernel threads should never have an entry meta type or entry leader set. +func TestKernelThreads(t *testing.T) { + reader := procfs.NewMockReader() + db, err := NewDB(reader, *logger) + require.Nil(t, err) + + kthreaddPID := uint32(2) + rcuGpPID := uint32(3) + + kthreaddPath := "kthreadd" + rcuGpPath := "rcu_gp" + + db.InsertExec(types.ProcessExecEvent{ + Filename: kthreaddPath, + PIDs: types.PIDInfo{ + Tgid: kthreaddPID, + Ppid: 1, + Sid: 0, + }, + }) + + db.InsertExec(types.ProcessExecEvent{ + Filename: rcuGpPath, + PIDs: types.PIDInfo{ + Tgid: rcuGpPID, + Ppid: kthreaddPID, + Sid: 0, + }, + }) + + // kthreadd + kthreadd, err := db.GetProcess(kthreaddPID) + require.Nil(t, err) + requireParentUnset(t, kthreadd) + requireSessionLeaderUnset(t, kthreadd) + requireEntryLeaderUnset(t, kthreadd) + + requireProcess(t, db, kthreaddPID, kthreaddPath) + + // rcu_gp + rcuGp, err := db.GetProcess(rcuGpPID) + require.Nil(t, err) + requireSessionLeaderUnset(t, rcuGp) + requireEntryLeaderUnset(t, rcuGp) + + requireProcess(t, db, rcuGpPID, rcuGpPath) + requireParent(t, db, rcuGpPID, kthreaddPID) +} diff --git a/x-pack/auditbeat/processors/sessionmd/procfs/mock.go b/x-pack/auditbeat/processors/sessionmd/procfs/mock.go new file mode 100644 index 000000000000..1689873044ec --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/procfs/mock.go @@ -0,0 +1,42 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package procfs + +import ( + "fmt" +) + +type MockReader struct { + entries map[uint32]ProcessInfo +} + +func NewMockReader() *MockReader { + return &MockReader{ + entries: make(map[uint32]ProcessInfo), + } +} + +func (r *MockReader) AddEntry(pid uint32, entry ProcessInfo) { + r.entries[pid] = entry +} + +func (r *MockReader) GetProcess(pid uint32) (ProcessInfo, error) { + entry, ok := r.entries[pid] + if !ok { + return ProcessInfo{}, fmt.Errorf("not found") + } + return entry, nil +} + +func (r *MockReader) GetAllProcesses() ([]ProcessInfo, error) { + ret := make([]ProcessInfo, 0, len(r.entries)) + + for _, entry := range r.entries { + ret = append(ret, entry) + } + return ret, nil +} diff --git a/x-pack/auditbeat/processors/sessionmd/procfs/procfs.go b/x-pack/auditbeat/processors/sessionmd/procfs/procfs.go new file mode 100644 index 000000000000..f3a7d41d1d71 --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/procfs/procfs.go @@ -0,0 +1,252 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package procfs + +import ( + "fmt" + "strconv" + "strings" + + "github.com/prometheus/procfs" + "golang.org/x/sys/unix" + + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/timeutils" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types" + "github.com/elastic/elastic-agent-libs/logp" +) + +func MajorTTY(ttyNr uint32) uint16 { + return uint16((ttyNr >> 8) & 0xf) +} + +func MinorTTY(ttyNr uint32) uint16 { + return uint16(((ttyNr & 0xfff00000) >> 20) | (ttyNr & 0xff)) +} + +// this interface exists so that we can inject a mock procfs reader for deterministic testing +type Reader interface { + GetProcess(pid uint32) (ProcessInfo, error) + GetAllProcesses() ([]ProcessInfo, error) +} + +type ProcfsReader struct { + logger logp.Logger +} + +func NewProcfsReader(logger logp.Logger) ProcfsReader { + return ProcfsReader{ + logger: logger, + } +} + +type Stat procfs.ProcStat + +type ProcessInfo struct { + PIDs types.PIDInfo + Creds types.CredInfo + CTTY types.TTYDev + Argv []string + Cwd string + Env map[string]string + Filename string + CGroupPath string +} + +func credsFromProc(proc procfs.Proc) (types.CredInfo, error) { + status, err := proc.NewStatus() + if err != nil { + return types.CredInfo{}, err + } + + ruid, err := strconv.Atoi(status.UIDs[0]) + if err != nil { + return types.CredInfo{}, err + } + + euid, err := strconv.Atoi(status.UIDs[1]) + if err != nil { + return types.CredInfo{}, err + } + + suid, err := strconv.Atoi(status.UIDs[2]) + if err != nil { + return types.CredInfo{}, err + } + + rgid, err := strconv.Atoi(status.GIDs[0]) + if err != nil { + return types.CredInfo{}, err + } + + egid, err := strconv.Atoi(status.GIDs[1]) + if err != nil { + return types.CredInfo{}, err + } + + sgid, err := strconv.Atoi(status.GIDs[2]) + if err != nil { + return types.CredInfo{}, err + } + + // procfs library doesn't grab CapEff or CapPrm, make the direct syscall + hdr := unix.CapUserHeader{ + Version: unix.LINUX_CAPABILITY_VERSION_3, + Pid: int32(proc.PID), + } + var data [2]unix.CapUserData + err = unix.Capget(&hdr, &data[0]) + if err != nil { + return types.CredInfo{}, err + } + permitted := uint64(data[1].Permitted) << 32 + permitted += uint64(data[0].Permitted) + effective := uint64(data[1].Effective) << 32 + effective += uint64(data[0].Effective) + + return types.CredInfo{ + Ruid: uint32(ruid), + Euid: uint32(euid), + Suid: uint32(suid), + Rgid: uint32(rgid), + Egid: uint32(egid), + Sgid: uint32(sgid), + CapPermitted: permitted, + CapEffective: effective, + }, nil +} + +func (r ProcfsReader) getProcessInfo(proc procfs.Proc) (ProcessInfo, error) { + pid := uint32(proc.PID) + // All other info can be best effort, but failing to get pid info and + // start time is needed to register the process in the database + stat, err := proc.Stat() + if err != nil { + return ProcessInfo{}, fmt.Errorf("failed to read /proc/%d/stat: %w", pid, err) + } + + argv, err := proc.CmdLine() + if err != nil { + argv = []string{} + } + + exe, err := proc.Executable() + if err != nil { + if len(argv) > 0 { + r.logger.Debugf("pid %d: got executable from cmdline: %s", pid, argv[0]) + exe = argv[0] + } else { + r.logger.Debugf("pid %d: failed to get executable path: %v", pid, err) + exe = "" + } + } + + environ, err := r.getEnviron(pid) + if err != nil { + environ = nil + } + + cwd, err := proc.Cwd() + if err != nil { + cwd = "" + } + + creds, err := credsFromProc(proc) + if err != nil { + creds = types.CredInfo{} + } + + cGroupPath := "" + cgroups, err := proc.Cgroups() + if err == nil { + out: + // Find the cgroup path from the PID controller. + // NOTE: This does not support the unified hierarchy from cgroup v2, as bpf also does not currently support it. + // When support is added for unified hierarchies, it should be added in bpf and userspace at the same time. + // (Currently all supported cgroup v2 systems (GKE) are working as they send backwards compatible v1 hierarchies as well) + for _, cgroup := range cgroups { + for _, controller := range cgroup.Controllers { + if controller == "pids" { + cGroupPath = cgroup.Path + break out + } + } + } + } + + startTimeNs := timeutils.TicksToNs(stat.Starttime) + return ProcessInfo{ + PIDs: types.PIDInfo{ + StartTimeNS: startTimeNs, + Tid: pid, + Tgid: pid, + Ppid: uint32(stat.PPID), + Pgid: uint32(stat.PGRP), + Sid: uint32(stat.Session), + }, + Creds: creds, + CTTY: types.TTYDev{ + Major: MajorTTY(uint32(stat.TTY)), + Minor: MinorTTY(uint32(stat.TTY)), + }, + Cwd: cwd, + Argv: argv, + Env: environ, + Filename: exe, + CGroupPath: cGroupPath, + }, nil +} + +func (r ProcfsReader) GetProcess(pid uint32) (ProcessInfo, error) { + proc, err := procfs.NewProc(int(pid)) + if err != nil { + return ProcessInfo{}, err + } + return r.getProcessInfo(proc) +} + +// returns empty slice on error +func (r ProcfsReader) GetAllProcesses() ([]ProcessInfo, error) { + procs, err := procfs.AllProcs() + if err != nil { + return nil, err + } + + ret := make([]ProcessInfo, 0) + for _, proc := range procs { + process_info, err := r.getProcessInfo(proc) + if err != nil { + r.logger.Warnf("failed to read process info for %v", proc.PID) + } + ret = append(ret, process_info) + } + + return ret, nil +} + +func (r ProcfsReader) getEnviron(pid uint32) (map[string]string, error) { + proc, err := procfs.NewProc(int(pid)) + if err != nil { + return nil, err + } + + flatEnviron, err := proc.Environ() + if err != nil { + return nil, err + } + + ret := make(map[string]string) + for _, entry := range flatEnviron { + index := strings.Index(entry, "=") + if index == -1 { + continue + } + + ret[entry[0:index]] = entry[index:] + } + + return ret, nil +} diff --git a/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go new file mode 100644 index 000000000000..2b9b540e037c --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go @@ -0,0 +1,157 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package ebpf_provider + +import ( + "context" + "fmt" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/ebpf" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/processdb" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider" + "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types" + "github.com/elastic/ebpfevents" + "github.com/elastic/elastic-agent-libs/logp" +) + +const ( + name = "add_session_metadata" + eventMask = ebpf.EventMask(ebpfevents.EventTypeProcessFork | ebpfevents.EventTypeProcessExec | ebpfevents.EventTypeProcessExit) +) + +type prvdr struct { + ctx context.Context + logger *logp.Logger + db *processdb.DB +} + +func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (provider.Provider, error) { + p := prvdr{ + ctx: ctx, + logger: logger, + db: db, + } + + w, err := ebpf.GetWatcher() + if err != nil { + return nil, fmt.Errorf("get ebpf watcher: %w", err) + } + + records := w.Subscribe(name, eventMask) + + go func(logger logp.Logger) { + for { + r := <-records + if r.Error != nil { + logger.Warnw("received error from the ebpf subscription", "error", err) + continue + } + if r.Event == nil { + continue + } + ev := r.Event + switch ev.Type { + case ebpfevents.EventTypeProcessFork: + body, ok := ev.Body.(*ebpfevents.ProcessFork) + if !ok { + logger.Errorf("unexpected event body, got %T", ev.Body) + continue + } + pe := types.ProcessForkEvent{ + ParentPIDs: types.PIDInfo{ + Tid: body.ParentPids.Tid, + Tgid: body.ParentPids.Tgid, + Ppid: body.ParentPids.Ppid, + Pgid: body.ParentPids.Pgid, + Sid: body.ParentPids.Sid, + StartTimeNS: body.ParentPids.StartTimeNs, + }, + ChildPIDs: types.PIDInfo{ + Tid: body.ChildPids.Tid, + Tgid: body.ChildPids.Tgid, + Ppid: body.ChildPids.Ppid, + Pgid: body.ChildPids.Pgid, + Sid: body.ChildPids.Sid, + StartTimeNS: body.ChildPids.StartTimeNs, + }, + Creds: types.CredInfo{ + Ruid: body.Creds.Ruid, + Rgid: body.Creds.Rgid, + Euid: body.Creds.Euid, + Egid: body.Creds.Egid, + Suid: body.Creds.Suid, + Sgid: body.Creds.Sgid, + CapPermitted: body.Creds.CapPermitted, + CapEffective: body.Creds.CapEffective, + }, + } + p.db.InsertFork(pe) + case ebpfevents.EventTypeProcessExec: + body, ok := ev.Body.(*ebpfevents.ProcessExec) + if !ok { + logger.Errorf("unexpected event body") + continue + } + pe := types.ProcessExecEvent{ + PIDs: types.PIDInfo{ + Tid: body.Pids.Tid, + Tgid: body.Pids.Tgid, + Ppid: body.Pids.Ppid, + Pgid: body.Pids.Pgid, + Sid: body.Pids.Sid, + StartTimeNS: body.Pids.StartTimeNs, + }, + Creds: types.CredInfo{ + Ruid: body.Creds.Ruid, + Rgid: body.Creds.Rgid, + Euid: body.Creds.Euid, + Egid: body.Creds.Egid, + Suid: body.Creds.Suid, + Sgid: body.Creds.Sgid, + CapPermitted: body.Creds.CapPermitted, + CapEffective: body.Creds.CapEffective, + }, + CTTY: types.TTYDev{ + Major: body.CTTY.Major, + Minor: body.CTTY.Minor, + }, + CWD: body.Cwd, + Argv: body.Argv, + Env: body.Env, + Filename: body.Filename, + } + p.db.InsertExec(pe) + case ebpfevents.EventTypeProcessExit: + body, ok := ev.Body.(*ebpfevents.ProcessExit) + if !ok { + logger.Errorf("unexpected event body") + continue + } + pe := types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tid: body.Pids.Tid, + Tgid: body.Pids.Tgid, + Ppid: body.Pids.Ppid, + Pgid: body.Pids.Pgid, + Sid: body.Pids.Sid, + StartTimeNS: body.Pids.StartTimeNs, + }, + ExitCode: body.ExitCode, + } + p.db.InsertExit(pe) + } + } + }(*p.logger) + + return &p, nil +} + +func (s prvdr) UpdateDB(ev *beat.Event) error { + // no-op for ebpf, DB is updated from pushed ebpf events + return nil +} diff --git a/x-pack/auditbeat/processors/sessionmd/provider/provider.go b/x-pack/auditbeat/processors/sessionmd/provider/provider.go new file mode 100644 index 000000000000..e3fa1547806c --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/provider/provider.go @@ -0,0 +1,15 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package provider + +import ( + "github.com/elastic/beats/v7/libbeat/beat" +) + +type Provider interface { + UpdateDB(*beat.Event) error +} diff --git a/x-pack/auditbeat/processors/sessionmd/timeutils/time.go b/x-pack/auditbeat/processors/sessionmd/timeutils/time.go new file mode 100644 index 000000000000..5c8dd7450df0 --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/timeutils/time.go @@ -0,0 +1,77 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package timeutils + +import ( + "fmt" + "sync" + "time" + + "github.com/prometheus/procfs" + "github.com/tklauser/go-sysconf" +) + +var ( + getBootTimeOnce = sync.OnceValues(getBootTime) + getTicksPerSecondOnce = sync.OnceValues(getTicksPerSecond) +) + +func getBootTime() (time.Time, error) { + fs, err := procfs.NewDefaultFS() + if err != nil { + return time.Time{}, fmt.Errorf("could not get procfs: %w", err) + } + + stat, err := fs.Stat() + if err != nil { + return time.Time{}, fmt.Errorf("could not read /proc/stat: %w", err) + } + return time.Unix(int64(stat.BootTime), 0), nil +} + +func getTicksPerSecond() (uint64, error) { + tps, err := sysconf.Sysconf(sysconf.SC_CLK_TCK) + if err != nil { + return 0, fmt.Errorf("sysconf(SC_CLK_TCK) failed: %w", err) + } + return uint64(tps), nil +} + +func TicksToNs(ticks uint64) uint64 { + ticksPerSecond, err := getTicksPerSecondOnce() + if err != nil { + return 0 + } + return ticks * uint64(time.Second.Nanoseconds()) / ticksPerSecond +} + +func TimeFromNsSinceBoot(t time.Duration) *time.Time { + bootTime, err := getBootTimeOnce() + if err != nil { + return nil + } + timestamp := bootTime.Add(t) + return ×tamp +} + +// When generating an `entity_id` in ECS we need to reduce the precision of a +// process's start time to that of procfs. Process start times can come from either +// BPF (high precision) or procfs (lower precision). We must reduce them all to the +// lowest common denominator such that entity ID's generated are always consistent. +// +// - Timestamps we get from the kernel are in nanosecond precision. +// - Timestamps we get from procfs are typically 1/100th second precision. We +// get this precision from `sysconf()` +// - We store timestamps as nanoseconds, but reduce the precision to 1/100th +// second +func ReduceTimestampPrecision(timeNs uint64) time.Duration { + ticksPerSecond, err := getTicksPerSecondOnce() + if err != nil { + return 0 + } + return time.Duration(timeNs).Truncate(time.Second / time.Duration(ticksPerSecond)) +} diff --git a/x-pack/auditbeat/processors/sessionmd/timeutils/time_test.go b/x-pack/auditbeat/processors/sessionmd/timeutils/time_test.go new file mode 100644 index 000000000000..1aa5abdf469a --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/timeutils/time_test.go @@ -0,0 +1,24 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package timeutils + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestReduceTimestampPrecision(t *testing.T) { + oneSecond := time.Second.Nanoseconds() + result1 := ReduceTimestampPrecision(uint64(oneSecond)) + require.Equal(t, time.Duration(oneSecond), result1) + + oneSecondWithDelay := oneSecond + 10 + result2 := ReduceTimestampPrecision(uint64(oneSecondWithDelay)) + require.Equal(t, time.Duration(oneSecond), result2) +} diff --git a/x-pack/auditbeat/processors/sessionmd/types/events.go b/x-pack/auditbeat/processors/sessionmd/types/events.go new file mode 100644 index 000000000000..5f8d67d763f1 --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/types/events.go @@ -0,0 +1,94 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package types + +//go:generate stringer -linecomment=true -type=Type,HookPoint,Field -output=gen_types_string.go + +type Type uint64 + +const ( + ProcessFork Type = iota + ProcessExec + ProcessExit + ProcessSetsid +) + +type ( + Field uint32 +) + +const ( + CWD Field = iota + 1 + Argv + Env + Filename +) + +type PIDInfo struct { + StartTimeNS uint64 + Tid uint32 + Tgid uint32 + Vpid uint32 + Ppid uint32 + Pgid uint32 + Sid uint32 +} + +type CredInfo struct { + Ruid uint32 + Rgid uint32 + Euid uint32 + Egid uint32 + Suid uint32 + Sgid uint32 + CapPermitted uint64 + CapEffective uint64 +} + +type TTYWinsize struct { + Rows uint16 + Cols uint16 +} + +type TTYTermios struct { + CIflag uint32 + COflag uint32 + CLflag uint32 + CCflag uint32 +} + +type TTYDev struct { + Minor uint16 + Major uint16 + Winsize TTYWinsize + Termios TTYTermios +} + +type ProcessForkEvent struct { + ParentPIDs PIDInfo + ChildPIDs PIDInfo + Creds CredInfo +} + +type ProcessExecEvent struct { + PIDs PIDInfo + Creds CredInfo + CTTY TTYDev + + // varlen fields + CWD string + Argv []string + Env map[string]string + Filename string +} + +type ProcessExitEvent struct { + PIDs PIDInfo + ExitCode int32 +} + +type ProcessSetsidEvent struct { + PIDs PIDInfo +} diff --git a/x-pack/auditbeat/processors/sessionmd/types/process.go b/x-pack/auditbeat/processors/sessionmd/types/process.go new file mode 100644 index 000000000000..e5a07d099876 --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/types/process.go @@ -0,0 +1,456 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package types + +import ( + "time" + + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// These fields contain information about a process. +// These fields can help you correlate metrics information with a process id/name from a log message. The `process.pid` often stays in the metric itself and is copied to the global field for correlation. +type Process struct { + // Unique identifier for the process. + // The implementation of this is specified by the data source, but some examples of what could be used here are a process-generated UUID, Sysmon Process GUIDs, or a hash of some uniquely identifying components of a process. + // Constructing a globally unique identifier is a common practice to mitigate PID reuse as well as to identify a specific process over time, across multiple monitored hosts. + EntityID string `json:"entity_id,omitempty"` + + // Absolute path to the process executable. + Executable string `json:"executable,omitempty"` + + // Process name. + // Sometimes called program name or similar. + Name string `json:"name,omitempty"` + + // The time the process started. + Start *time.Time `json:"start,omitempty"` + + // The time the process ended. + End *time.Time `json:"end,omitempty"` + + // The exit code of the process, if this is a termination event. + // The field should be absent if there is no exit code for the event (e.g. process start). + ExitCode *int64 `json:"exit_code,omitempty"` + + // Whether the process is connected to an interactive shell. + // Process interactivity is inferred from the processes file descriptors. If the character device for the controlling tty is the same as stdin and stderr for the process, the process is considered interactive. + // Note: A non-interactive process can belong to an interactive session and is simply one that does not have open file descriptors reading the controlling TTY on FD 0 (stdin) or writing to the controlling TTY on FD 2 (stderr). A backgrounded process is still considered interactive if stdin and stderr are connected to the controlling TTY. + Interactive *bool `json:"interactive,omitempty"` + + // The working directory of the process. + WorkingDirectory string `json:"working_directory,omitempty"` + + // The effective user (euid). + User struct { + // Unique identifier of the user. + ID string `json:"id,omitempty"` + + // Short name or login of the user. + Name string `json:"name,omitempty"` + } `json:"user,omitempty"` + + // The effective group (egid). + Group struct { + // Unique identifier for the group on the system/platform. + ID string `json:"id,omitempty"` + + // Name of the group. + Name string `json:"name,omitempty"` + } `json:"group,omitempty"` + + // Process id. + PID uint32 `json:"pid,omitempty"` + + Vpid uint32 `json:"vpid,omitempty"` + + // Array of process arguments, starting with the absolute path to the executable. + // May be filtered to protect sensitive information. + Args []string `json:"args,omitempty"` + + // An array of previous executions for the process, including the initial fork. Only executable and args are set. + Previous []struct { + // Absolute path to the process executable. + Executable string `json:"executable,omitempty"` + + // Array of process arguments, starting with the absolute path to the executable. + // May be filtered to protect sensitive information. + Args []string `json:"args,omitempty"` + } `json:"previous,omitempty"` + + Thread struct { + Capabilities struct { + Permitted []string `json:"permitted,omitempty"` + + Effective []string `json:"effective,omitempty"` + } `json:"capabilities,omitempty"` + } `json:"thread,omitempty"` + + // Information about the parent process. + Parent struct { + // Unique identifier for the process. + // The implementation of this is specified by the data source, but some examples of what could be used here are a process-generated UUID, Sysmon Process GUIDs, or a hash of some uniquely identifying components of a process. + // Constructing a globally unique identifier is a common practice to mitigate PID reuse as well as to identify a specific process over time, across multiple monitored hosts. + EntityID string `json:"entity_id,omitempty"` + + // Absolute path to the process executable. + Executable string `json:"executable,omitempty"` + + // Whether the process is connected to an interactive shell. + // Process interactivity is inferred from the processes file descriptors. If the character device for the controlling tty is the same as stdin and stderr for the process, the process is considered interactive. + // Note: A non-interactive process can belong to an interactive session and is simply one that does not have open file descriptors reading the controlling TTY on FD 0 (stdin) or writing to the controlling TTY on FD 2 (stderr). A backgrounded process is still considered interactive if stdin and stderr are connected to the controlling TTY. + Interactive *bool `json:"interactive,omitempty"` + + // Process name. + // Sometimes called program name or similar. + Name string `json:"name,omitempty"` + + // The time the process started. + Start *time.Time `json:"start,omitempty"` + + // The working directory of the process. + WorkingDirectory string `json:"working_directory,omitempty"` + + // The effective user (euid). + User struct { + // Unique identifier of the user. + ID string `json:"id,omitempty"` + + // Short name or login of the user. + Name string `json:"name,omitempty"` + } `json:"user,omitempty"` + + // The effective group (egid). + Group struct { + // Unique identifier for the group on the system/platform. + ID string `json:"id,omitempty"` + + // Name of the group. + Name string `json:"name,omitempty"` + } `json:"group,omitempty"` + + // Process id. + PID uint32 `json:"pid,omitempty"` + + // Array of process arguments, starting with the absolute path to the executable. + // May be filtered to protect sensitive information. + Args []string `json:"args,omitempty"` + + Thread struct { + Capabilities struct { + Permitted []string `json:"permitted,omitempty"` + + Effective []string `json:"effective,omitempty"` + } `json:"capabilities,omitempty"` + } `json:"thread,omitempty"` + } `json:"parent,omitempty"` + + // Information about the process group leader. In some cases this may be the same as the top level process. + GroupLeader struct { + // Unique identifier for the process. + // The implementation of this is specified by the data source, but some examples of what could be used here are a process-generated UUID, Sysmon Process GUIDs, or a hash of some uniquely identifying components of a process. + // Constructing a globally unique identifier is a common practice to mitigate PID reuse as well as to identify a specific process over time, across multiple monitored hosts. + EntityID string `json:"entity_id,omitempty"` + + // Absolute path to the process executable. + Executable string `json:"executable,omitempty"` + + // Whether the process is connected to an interactive shell. + // Process interactivity is inferred from the processes file descriptors. If the character device for the controlling tty is the same as stdin and stderr for the process, the process is considered interactive. + // Note: A non-interactive process can belong to an interactive session and is simply one that does not have open file descriptors reading the controlling TTY on FD 0 (stdin) or writing to the controlling TTY on FD 2 (stderr). A backgrounded process is still considered interactive if stdin and stderr are connected to the controlling TTY. + Interactive *bool `json:"interactive,omitempty"` + + // Process name. + // Sometimes called program name or similar. + Name string `json:"name,omitempty"` + + // The time the process started. + Start *time.Time `json:"start,omitempty"` + + // The working directory of the process. + WorkingDirectory string `json:"working_directory,omitempty"` + + // The effective user (euid). + User struct { + // Unique identifier of the user. + ID string `json:"id,omitempty"` + + // Short name or login of the user. + Name string `json:"name,omitempty"` + } `json:"user,omitempty"` + + // The effective group (egid). + Group struct { + // Unique identifier for the group on the system/platform. + ID string `json:"id,omitempty"` + + // Name of the group. + Name string `json:"name,omitempty"` + } `json:"group,omitempty"` + + // Process id. + PID uint32 `json:"pid,omitempty"` + + // Array of process arguments, starting with the absolute path to the executable. + // May be filtered to protect sensitive information. + Args []string `json:"args,omitempty"` + + // This boolean is used to identify if a leader process is the same as the top level process. + // For example, if `process.group_leader.same_as_process = true`, it means the process event in question is the leader of its process group. Details under `process.*` like `pid` would be the same under `process.group_leader.*` The same applies for both `process.session_leader` and `process.entry_leader`. + // This field exists to the benefit of EQL and other rule engines since it's not possible to compare equality between two fields in a single document. e.g `process.entity_id` = `process.group_leader.entity_id` (top level process is the process group leader) OR `process.entity_id` = `process.entry_leader.entity_id` (top level process is the entry session leader) + // Instead these rules could be written like: `process.group_leader.same_as_process: true` OR `process.entry_leader.same_as_process: true` + // Note: This field is only set on `process.entry_leader`, `process.session_leader` and `process.group_leader`. + SameAsProcess *bool `json:"same_as_process,omitempty"` + } `json:"group_leader,omitempty"` + + // Often the same as entry_leader. When it differs, it represents a session started within another session. e.g. using tmux + SessionLeader struct { + // Unique identifier for the process. + // The implementation of this is specified by the data source, but some examples of what could be used here are a process-generated UUID, Sysmon Process GUIDs, or a hash of some uniquely identifying components of a process. + // Constructing a globally unique identifier is a common practice to mitigate PID reuse as well as to identify a specific process over time, across multiple monitored hosts. + EntityID string `json:"entity_id,omitempty"` + + // Absolute path to the process executable. + Executable string `json:"executable,omitempty"` + + // Whether the process is connected to an interactive shell. + // Process interactivity is inferred from the processes file descriptors. If the character device for the controlling tty is the same as stdin and stderr for the process, the process is considered interactive. + // Note: A non-interactive process can belong to an interactive session and is simply one that does not have open file descriptors reading the controlling TTY on FD 0 (stdin) or writing to the controlling TTY on FD 2 (stderr). A backgrounded process is still considered interactive if stdin and stderr are connected to the controlling TTY. + Interactive *bool `json:"interactive,omitempty"` + + // Process name. + // Sometimes called program name or similar. + Name string `json:"name,omitempty"` + + // The time the process started. + Start *time.Time `json:"start,omitempty"` + + // The working directory of the process. + WorkingDirectory string `json:"working_directory,omitempty"` + + // The effective user (euid). + User struct { + // Unique identifier of the user. + ID string `json:"id,omitempty"` + + // Short name or login of the user. + Name string `json:"name,omitempty"` + } `json:"user,omitempty"` + + // The effective group (egid). + Group struct { + // Unique identifier for the group on the system/platform. + ID string `json:"id,omitempty"` + + // Name of the group. + Name string `json:"name,omitempty"` + } `json:"group,omitempty"` + + // Process id. + PID uint32 `json:"pid,omitempty"` + + // Array of process arguments, starting with the absolute path to the executable. + // May be filtered to protect sensitive information. + Args []string `json:"args,omitempty"` + + // This boolean is used to identify if a leader process is the same as the top level process. + // For example, if `process.group_leader.same_as_process = true`, it means the process event in question is the leader of its process group. Details under `process.*` like `pid` would be the same under `process.group_leader.*` The same applies for both `process.session_leader` and `process.entry_leader`. + // This field exists to the benefit of EQL and other rule engines since it's not possible to compare equality between two fields in a single document. e.g `process.entity_id` = `process.group_leader.entity_id` (top level process is the process group leader) OR `process.entity_id` = `process.entry_leader.entity_id` (top level process is the entry session leader) + // Instead these rules could be written like: `process.group_leader.same_as_process: true` OR `process.entry_leader.same_as_process: true` + // Note: This field is only set on `process.entry_leader`, `process.session_leader` and `process.group_leader`. + SameAsProcess *bool `json:"same_as_process,omitempty"` + } `json:"session_leader,omitempty"` + + // First process from terminal or remote access via SSH, SSM, etc OR a service directly started by the init process. + EntryLeader struct { + // Unique identifier for the process. + // The implementation of this is specified by the data source, but some examples of what could be used here are a process-generated UUID, Sysmon Process GUIDs, or a hash of some uniquely identifying components of a process. + // Constructing a globally unique identifier is a common practice to mitigate PID reuse as well as to identify a specific process over time, across multiple monitored hosts. + EntityID string `json:"entity_id,omitempty"` + + // Absolute path to the process executable. + Executable string `json:"executable,omitempty"` + + // Whether the process is connected to an interactive shell. + // Process interactivity is inferred from the processes file descriptors. If the character device for the controlling tty is the same as stdin and stderr for the process, the process is considered interactive. + // Note: A non-interactive process can belong to an interactive session and is simply one that does not have open file descriptors reading the controlling TTY on FD 0 (stdin) or writing to the controlling TTY on FD 2 (stderr). A backgrounded process is still considered interactive if stdin and stderr are connected to the controlling TTY. + Interactive *bool `json:"interactive,omitempty"` + + // Process name. + // Sometimes called program name or similar. + Name string `json:"name,omitempty"` + + // The time the process started. + Start *time.Time `json:"start,omitempty"` + + // The working directory of the process. + WorkingDirectory string `json:"working_directory,omitempty"` + + EntryMeta struct { + // The entry type for the entry session leader. Values include: init(e.g systemd), sshd, ssm, kubelet, teleport, terminal, console + // Note: This field is only set on process.session_leader. + Type string `json:"type,omitempty"` + } `json:"entry_meta,omitempty"` + + // The effective user (euid). + User struct { + // Unique identifier of the user. + ID string `json:"id,omitempty"` + + // Short name or login of the user. + Name string `json:"name,omitempty"` + } `json:"user,omitempty"` + + // The effective group (egid). + Group struct { + // Unique identifier for the group on the system/platform. + ID string `json:"id,omitempty"` + + // Name of the group. + Name string `json:"name,omitempty"` + } `json:"group,omitempty"` + + // Process id. + PID uint32 `json:"pid,omitempty"` + + // Array of process arguments, starting with the absolute path to the executable. + // May be filtered to protect sensitive information. + Args []string `json:"args,omitempty"` + + // This boolean is used to identify if a leader process is the same as the top level process. + // For example, if `process.group_leader.same_as_process = true`, it means the process event in question is the leader of its process group. Details under `process.*` like `pid` would be the same under `process.group_leader.*` The same applies for both `process.session_leader` and `process.entry_leader`. + // This field exists to the benefit of EQL and other rule engines since it's not possible to compare equality between two fields in a single document. e.g `process.entity_id` = `process.group_leader.entity_id` (top level process is the process group leader) OR `process.entity_id` = `process.entry_leader.entity_id` (top level process is the entry session leader) + // Instead these rules could be written like: `process.group_leader.same_as_process: true` OR `process.entry_leader.same_as_process: true` + // Note: This field is only set on `process.entry_leader`, `process.session_leader` and `process.group_leader`. + SameAsProcess *bool `json:"same_as_process,omitempty"` + } `json:"entry_leader,omitempty"` +} + +func (p *Process) ToMap() mapstr.M { + process := mapstr.M{ + "entity_id": p.EntityID, + "executable": p.Executable, + "name": p.Name, + "exit_code": p.ExitCode, + "interactive": p.Interactive, + "working_directory": p.WorkingDirectory, + "user": mapstr.M{ + "id": p.User.ID, + "name": p.User.Name, + }, + "group": mapstr.M{ + "id": p.Group.ID, + "name": p.Group.Name, + }, + "pid": p.PID, + "vpid": p.Vpid, + "args": p.Args, + "thread": mapstr.M{ + "capabilities": mapstr.M{ + "permitted": p.Thread.Capabilities.Permitted, + "effective": p.Thread.Capabilities.Effective, + }, + }, + "parent": mapstr.M{ + "entity_id": p.Parent.EntityID, + "executable": p.Parent.Executable, + "name": p.Parent.Name, + "interactive": p.Parent.Interactive, + "working_directory": p.Parent.WorkingDirectory, + "user": mapstr.M{ + "id": p.Parent.User.ID, + "name": p.Parent.User.Name, + }, + "group": mapstr.M{ + "id": p.Parent.Group.ID, + "name": p.Parent.Group.Name, + }, + "pid": p.Parent.PID, + "args": p.Parent.Args, + "thread": mapstr.M{ + "capabilities": mapstr.M{ + "permitted": p.Parent.Thread.Capabilities.Permitted, + "effective": p.Parent.Thread.Capabilities.Effective, + }, + }, + }, + "group_leader": mapstr.M{ + "entity_id": p.GroupLeader.EntityID, + "executable": p.GroupLeader.Executable, + "name": p.GroupLeader.Name, + "interactive": p.GroupLeader.Interactive, + "working_directory": p.GroupLeader.WorkingDirectory, + "user": mapstr.M{ + "id": p.GroupLeader.User.ID, + "name": p.GroupLeader.User.Name, + }, + "group": mapstr.M{ + "id": p.GroupLeader.Group.ID, + "name": p.GroupLeader.Group.Name, + }, + "pid": p.GroupLeader.PID, + "args": p.GroupLeader.Args, + "same_as_process": p.GroupLeader.SameAsProcess, + }, + "session_leader": mapstr.M{ + "entity_id": p.SessionLeader.EntityID, + "executable": p.SessionLeader.Executable, + "name": p.SessionLeader.Name, + "interactive": p.SessionLeader.Interactive, + "working_directory": p.SessionLeader.WorkingDirectory, + "user": mapstr.M{ + "id": p.SessionLeader.User.ID, + "name": p.SessionLeader.User.Name, + }, + "group": mapstr.M{ + "id": p.SessionLeader.Group.ID, + "name": p.SessionLeader.Group.Name, + }, + "pid": p.SessionLeader.PID, + "args": p.SessionLeader.Args, + "same_as_process": p.SessionLeader.SameAsProcess, + }, + "entry_leader": mapstr.M{ + "entity_id": p.EntryLeader.EntityID, + "executable": p.EntryLeader.Executable, + "name": p.EntryLeader.Name, + "interactive": p.EntryLeader.Interactive, + "working_directory": p.EntryLeader.WorkingDirectory, + "entry_meta": mapstr.M{ + "type": p.EntryLeader.EntryMeta.Type, + }, + "user": mapstr.M{ + "id": p.EntryLeader.User.ID, + "name": p.EntryLeader.User.Name, + }, + "group": mapstr.M{ + "id": p.EntryLeader.Group.ID, + "name": p.EntryLeader.Group.Name, + }, + "pid": p.EntryLeader.PID, + "args": p.EntryLeader.Args, + "same_as_process": p.EntryLeader.SameAsProcess, + }, + } + + // nil timestamps will cause a panic within the publisher, only add the mapping if it exists + if p.Start != nil { + process.Put("start", p.Start) + } + if p.Parent.Start != nil { + process.Put("parent.start", p.Parent.Start) + } + if p.GroupLeader.Start != nil { + process.Put("group_leader.start", p.GroupLeader.Start) + } + if p.SessionLeader.Start != nil { + process.Put("session_leader.start", p.SessionLeader.Start) + } + if p.EntryLeader.Start != nil { + process.Put("entry_leader.start", p.EntryLeader.Start) + } + + return process +}