Skip to content

Commit

Permalink
[Auditbeat] User dataset: Numerous fixes to error handling (elastic#1…
Browse files Browse the repository at this point in the history
…0942)

Fixes a number of issues in the `user` dataset discovered while testing on Fedora 29:

* When calling C functions via cgo, don't rely on errno as an indicator of success - check the return value first.
* Set errno to 0 before calling getpwent(3) or getspent(3).
* Make C function calls thread-safe.
* Address a systemd bug where it can return ENOENT even when there is no error.
* Don't fail on every error.
* Fail system test on WARN/ERROR messages in the log.

(cherry picked from commit 8ce5440)
  • Loading branch information
Christoph Wurm committed Mar 11, 2019
1 parent c4a5799 commit 01ba0ae
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 89 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di
- Package: Disable librpm signal handlers. {pull}10694[10694]
- Login: Handle different bad login UTMP types. {pull}10865[10865]
- System module: Fix and unify bucket closing logic. {pull}10897[10897]
- User dataset: Numerous fixes to error handling. {pull}10942[10942]

*Filebeat*

Expand Down
153 changes: 85 additions & 68 deletions x-pack/auditbeat/module/system/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/OneOfOne/xxhash"
"github.com/gofrs/uuid"
"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/auditbeat/datastore"
Expand Down Expand Up @@ -288,44 +289,55 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) {

// reportState reports all existing users on the system.
func (ms *MetricSet) reportState(report mb.ReporterV2) error {
var errs multierror.Errors
ms.lastState = time.Now()

users, err := GetUsers(ms.config.DetectPasswordChanges)
if err != nil {
return errors.Wrap(err, "failed to get users")
errs = append(errs, errors.Wrap(err, "error while getting users"))
}

ms.log.Debugf("Found %v users", len(users))
if len(users) > 0 {
stateID, err := uuid.NewV4()
if err != nil {
errs = append(errs, errors.Wrap(err, "error generating state ID"))
}

stateID, err := uuid.NewV4()
if err != nil {
return errors.Wrap(err, "error generating state ID")
}
for _, user := range users {
event := ms.userEvent(user, eventTypeState, eventActionExistingUser)
event.RootFields.Put("event.id", stateID.String())
report.Event(event)
}
for _, user := range users {
event := ms.userEvent(user, eventTypeState, eventActionExistingUser)
event.RootFields.Put("event.id", stateID.String())
report.Event(event)
}

if ms.cache != nil {
// This will initialize the cache with the current processes
ms.cache.DiffAndUpdateCache(convertToCacheable(users))
}
if ms.cache != nil {
// This will initialize the cache with the current processes
ms.cache.DiffAndUpdateCache(convertToCacheable(users))
}

// Save time so we know when to send the state again (config.StatePeriod)
timeBytes, err := ms.lastState.MarshalBinary()
if err != nil {
return err
}
err = ms.bucket.Store(bucketKeyStateTimestamp, timeBytes)
if err != nil {
return errors.Wrap(err, "error writing state timestamp to disk")
// Save time so we know when to send the state again (config.StatePeriod)
timeBytes, err := ms.lastState.MarshalBinary()
if err != nil {
errs = append(errs, err)
} else {
err = ms.bucket.Store(bucketKeyStateTimestamp, timeBytes)
if err != nil {
errs = append(errs, errors.Wrap(err, "error writing state timestamp to disk"))
}
}

err = ms.saveUsersToDisk(users)
if err != nil {
errs = append(errs, err)
}
}

return ms.saveUsersToDisk(users)
return errs.Err()
}

// reportChanges detects and reports any changes to users on this system since the last call.
func (ms *MetricSet) reportChanges(report mb.ReporterV2) error {
var errs multierror.Errors
currentTime := time.Now()

// If this is not the first call to Fetch/reportChanges,
Expand All @@ -343,70 +355,75 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error {

users, err := GetUsers(ms.config.DetectPasswordChanges)
if err != nil {
return errors.Wrap(err, "failed to get users")
errs = append(errs, errors.Wrap(err, "error while getting users"))
}
ms.log.Debugf("Found %v users", len(users))

newInCache, missingFromCache := ms.cache.DiffAndUpdateCache(convertToCacheable(users))
if len(users) > 0 {
newInCache, missingFromCache := ms.cache.DiffAndUpdateCache(convertToCacheable(users))

if len(newInCache) > 0 && len(missingFromCache) > 0 {
// Check for changes to users
missingUserMap := make(map[string](*User))
for _, missingUser := range missingFromCache {
missingUserMap[missingUser.(*User).UID] = missingUser.(*User)
}
if len(newInCache) > 0 && len(missingFromCache) > 0 {
// Check for changes to users
missingUserMap := make(map[string](*User))
for _, missingUser := range missingFromCache {
missingUserMap[missingUser.(*User).UID] = missingUser.(*User)
}

for _, userFromCache := range newInCache {
newUser := userFromCache.(*User)
oldUser, found := missingUserMap[newUser.UID]
for _, userFromCache := range newInCache {
newUser := userFromCache.(*User)
oldUser, found := missingUserMap[newUser.UID]

if found {
// Report password change separately
if ms.config.DetectPasswordChanges && newUser.PasswordType != detectionDisabled &&
oldUser.PasswordType != detectionDisabled {
if found {
// Report password change separately
if ms.config.DetectPasswordChanges && newUser.PasswordType != detectionDisabled &&
oldUser.PasswordType != detectionDisabled {

passwordChanged := newUser.PasswordChanged.Before(oldUser.PasswordChanged) ||
!bytes.Equal(newUser.PasswordHashHash, oldUser.PasswordHashHash) ||
newUser.PasswordType != oldUser.PasswordType
passwordChanged := newUser.PasswordChanged.Before(oldUser.PasswordChanged) ||
!bytes.Equal(newUser.PasswordHashHash, oldUser.PasswordHashHash) ||
newUser.PasswordType != oldUser.PasswordType

if passwordChanged {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionPasswordChanged))
if passwordChanged {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionPasswordChanged))
}
}
}

// Hack to check if only the password changed
oldUser.PasswordChanged = newUser.PasswordChanged
oldUser.PasswordHashHash = newUser.PasswordHashHash
oldUser.PasswordType = newUser.PasswordType
if newUser.Hash() != oldUser.Hash() {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserChanged))
// Hack to check if only the password changed
oldUser.PasswordChanged = newUser.PasswordChanged
oldUser.PasswordHashHash = newUser.PasswordHashHash
oldUser.PasswordType = newUser.PasswordType
if newUser.Hash() != oldUser.Hash() {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserChanged))
}

delete(missingUserMap, oldUser.UID)
} else {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserAdded))
}
}

delete(missingUserMap, oldUser.UID)
} else {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserAdded))
for _, missingUser := range missingUserMap {
report.Event(ms.userEvent(missingUser, eventTypeEvent, eventActionUserRemoved))
}
} else {
// No changes to users
for _, user := range newInCache {
report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserAdded))
}
}

for _, missingUser := range missingUserMap {
report.Event(ms.userEvent(missingUser, eventTypeEvent, eventActionUserRemoved))
}
} else {
// No changes to users
for _, user := range newInCache {
report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserAdded))
for _, user := range missingFromCache {
report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserRemoved))
}
}

for _, user := range missingFromCache {
report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserRemoved))
if len(newInCache) > 0 || len(missingFromCache) > 0 {
err = ms.saveUsersToDisk(users)
if err != nil {
errs = append(errs, err)
}
}
}

if len(newInCache) > 0 || len(missingFromCache) > 0 {
return ms.saveUsersToDisk(users)
}

return nil
return errs.Err()
}

func (ms *MetricSet) userEvent(user *User, eventType string, action eventAction) mb.Event {
Expand Down
9 changes: 6 additions & 3 deletions x-pack/auditbeat/module/system/user/user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ func TestData(t *testing.T) {

func getConfig() map[string]interface{} {
return map[string]interface{}{
"module": "system",
"metricsets": []string{"user"},
"user.detect_password_changes": true,
"module": "system",
"metricsets": []string{"user"},

// Would require root access to /etc/shadow
// which we usually don't have when testing.
"user.detect_password_changes": false,
}
}
76 changes: 60 additions & 16 deletions x-pack/auditbeat/module/system/user/users_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@

package user

// #include <errno.h>
// #include <sys/types.h>
// #include <pwd.h>
// #include <shadow.h>
//
// void clearErrno() {
// errno = 0;
// }
import "C"

import (
"crypto/sha512"
"os/user"
"runtime"
"strconv"
"strings"
"syscall"
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"
)

Expand All @@ -28,24 +36,34 @@ var (
// GetUsers retrieves a list of users using information from
// /etc/passwd, /etc/group, and - if configured - /etc/shadow.
func GetUsers(readPasswords bool) ([]*User, error) {
users, err := readPasswdFile(readPasswords)
if err != nil {
return nil, err
}
var errs multierror.Errors

err = enrichWithGroups(users)
// We are using a number of thread sensitive C functions in
// this file, most importantly setpwent/getpwent/endpwent and
// setspent/getspent/endspent. And we set errno (which is thread-local).
runtime.LockOSThread()
defer runtime.UnlockOSThread()

users, err := readPasswdFile(readPasswords)
if err != nil {
return nil, err
errs = append(errs, err)
}

if readPasswords {
err = enrichWithShadow(users)
if len(users) > 0 {
err = enrichWithGroups(users)
if err != nil {
return nil, err
errs = append(errs, err)
}

if readPasswords {
err = enrichWithShadow(users)
if err != nil {
errs = append(errs, err)
}
}
}

return users, nil
return users, errs.Err()
}

func readPasswdFile(readPasswords bool) ([]*User, error) {
Expand All @@ -54,9 +72,22 @@ func readPasswdFile(readPasswords bool) ([]*User, error) {
C.setpwent()
defer C.endpwent()

for passwd, err := C.getpwent(); passwd != nil; passwd, err = C.getpwent() {
if err != nil {
return nil, errors.Wrap(err, "error getting user")
for {
// Setting errno to 0 before calling getpwent().
// See return value section of getpwent(3).
C.clearErrno()

passwd, err := C.getpwent()

if passwd == nil {
// getpwent() can return ENOENT even when there is no error,
// see https://github.com/systemd/systemd/issues/9585.
if err != nil && err != syscall.ENOENT {
return users, errors.Wrap(err, "error getting user")
}

// No more entries
break
}

// passwd is C.struct_passwd
Expand Down Expand Up @@ -161,9 +192,22 @@ func readShadowFile() (map[string]shadowFileEntry, error) {
defer C.endspent()

shadowEntries := make(map[string]shadowFileEntry)
for spwd, err := C.getspent(); spwd != nil; spwd, err = C.getspent() {
if err != nil {
return nil, errors.Wrap(err, "error while reading shadow file")

for {
// While getspnam(3) does not explicitly call out the need for setting errno to 0
// as getpwent(3) does, at least glibc uses the same code for both, and so it
// probably makes sense to do the same for both.
C.clearErrno()

spwd, err := C.getspent()

if spwd == nil {
if err != nil {
return shadowEntries, errors.Wrap(err, "error while reading shadow file")
}

// No more entries
break
}

shadow := shadowFileEntry{
Expand Down
3 changes: 1 addition & 2 deletions x-pack/auditbeat/tests/system/test_metricsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,4 @@ def test_metricset_user(self):

fields = ["user.entity_id", "system.audit.user.name"]

# Metricset is beta and that generates a warning, TODO: remove later
self.check_metricset("system", "user", COMMON_FIELDS + fields, warnings_allowed=True)
self.check_metricset("system", "user", COMMON_FIELDS + fields)

0 comments on commit 01ba0ae

Please sign in to comment.