Skip to content

Commit

Permalink
Merge pull request #290 from mmerkes/unitexists
Browse files Browse the repository at this point in the history
NewSystemd handles UnitExists when starting units
  • Loading branch information
AkihiroSuda authored Jul 16, 2023
2 parents 8fa6424 + 9008873 commit a0ae1c2
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 8 deletions.
63 changes: 55 additions & 8 deletions cgroup2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,14 +870,7 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
newSystemdProperty("TasksMax", uint64(resources.Pids.Max)))
}

statusChan := make(chan string, 1)
if _, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan); err == nil {
select {
case <-statusChan:
case <-time.After(time.Second):
logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
}
} else if !isUnitExists(err) {
if err := startUnit(conn, group, properties, pid == -1); err != nil {
return &Manager{}, err
}

Expand All @@ -886,6 +879,60 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
}, nil
}

func startUnit(conn *systemdDbus.Conn, group string, properties []systemdDbus.Property, ignoreExists bool) error {
ctx := context.TODO()

statusChan := make(chan string, 1)
defer close(statusChan)

retry := true
started := false

for !started {
if _, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan); err != nil {
if !isUnitExists(err) {
return err
}

if ignoreExists {
return nil
}

if retry {
retry = false
// When a unit of the same name already exists, it may be a leftover failed unit.
// If we reset it once, systemd can try to remove it.
attemptFailedUnitReset(conn, group)
continue
}

return err
} else {
started = true
}
}

select {
case s := <-statusChan:
if s != "done" {
attemptFailedUnitReset(conn, group)
return fmt.Errorf("error creating systemd unit `%s`: got `%s`", group, s)
}
case <-time.After(30 * time.Second):
logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
}

return nil
}

func attemptFailedUnitReset(conn *systemdDbus.Conn, group string) {
err := conn.ResetFailedUnitContext(context.TODO(), group)

if err != nil {
logrus.Warnf("Unable to reset failed unit: %v", err)
}
}

func LoadSystemd(slice, group string) (*Manager, error) {
if slice == "" {
slice = defaultSlice
Expand Down
43 changes: 43 additions & 0 deletions cgroup2/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,49 @@ import (
"go.uber.org/goleak"
)

func setupForNewSystemd(t *testing.T) (cmd *exec.Cmd, group string) {
cmd = exec.Command("cat")
err := cmd.Start()
require.NoError(t, err, "failed to start cat process")
proc := cmd.Process
require.NotNil(t, proc, "process was nil")

group = fmt.Sprintf("testing-watcher-%d.scope", proc.Pid)

return
}

func TestErrorsWhenUnitAlreadyExists(t *testing.T) {
checkCgroupMode(t)

cmd, group := setupForNewSystemd(t)
proc := cmd.Process

_, err := NewSystemd("", group, proc.Pid, &Resources{})
require.NoError(t, err, "Failed to init new cgroup manager")

_, err = NewSystemd("", group, proc.Pid, &Resources{})
if err == nil {
t.Fatal("Expected recreating cgroup manager should fail")
} else if !isUnitExists(err) {
t.Fatalf("Failed to init cgroup manager with unexpected error: %s", err)
}
}

// kubelet relies on this behavior to make sure a slice exists
func TestIgnoreUnitExistsWhenPidNegativeOne(t *testing.T) {
checkCgroupMode(t)

cmd, group := setupForNewSystemd(t)
proc := cmd.Process

_, err := NewSystemd("", group, proc.Pid, &Resources{})
require.NoError(t, err, "Failed to init new cgroup manager")

_, err = NewSystemd("", group, -1, &Resources{})
require.NoError(t, err, "Expected to be able to recreate cgroup manager")
}

//nolint:staticcheck // Staticcheck false positives for nil pointer deference after t.Fatal
func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
checkCgroupMode(t)
Expand Down

0 comments on commit a0ae1c2

Please sign in to comment.