Skip to content

Commit

Permalink
support collecting FsUsageMetrics for containerd
Browse files Browse the repository at this point in the history
  • Loading branch information
yyrdl committed May 21, 2021
1 parent 9c29bca commit 020b7ff
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 49 deletions.
121 changes: 80 additions & 41 deletions container/common/fsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/google/cadvisor/fs"

"k8s.io/klog/v2"
)

Expand All @@ -37,15 +36,18 @@ type FsUsage struct {
InodeUsage uint64
}

// FsUsageProvider abstracts how the filesystem usage of a container is
// measured, so that the fs handler does not depend on a particular backend.
type FsUsageProvider interface {
// Usage returns the current filesystem usage, or an error if it could not
// be determined.
Usage() (*FsUsage, error)
// Targets returns human-readable descriptions of what is being measured;
// the fs handler includes them in its slow-operation log message.
Targets() []string
}

type realFsHandler struct {
sync.RWMutex
lastUpdate time.Time
usage FsUsage
period time.Duration
minPeriod time.Duration
rootfs string
extraDir string
fsInfo fs.FsInfo
lastUpdate time.Time
usage FsUsage
period time.Duration
minPeriod time.Duration
usageProvider FsUsageProvider
// Tells the container to stop.
stopChan chan struct{}
}
Expand All @@ -58,51 +60,35 @@ const DefaultPeriod = time.Minute

var _ FsHandler = &realFsHandler{}

func NewFsHandler(period time.Duration, rootfs, extraDir string, fsInfo fs.FsInfo) FsHandler {
func NewFsHandler(period time.Duration, provider FsUsageProvider) FsHandler {
return &realFsHandler{
lastUpdate: time.Time{},
usage: FsUsage{},
period: period,
minPeriod: period,
rootfs: rootfs,
extraDir: extraDir,
fsInfo: fsInfo,
stopChan: make(chan struct{}, 1),
lastUpdate: time.Time{},
usage: FsUsage{},
period: period,
minPeriod: period,
usageProvider: provider,
stopChan: make(chan struct{}, 1),
}
}

func (fh *realFsHandler) update() error {
var (
rootUsage, extraUsage fs.UsageInfo
rootErr, extraErr error
)
// TODO(vishh): Add support for external mounts.
if fh.rootfs != "" {
rootUsage, rootErr = fh.fsInfo.GetDirUsage(fh.rootfs)
}

if fh.extraDir != "" {
extraUsage, extraErr = fh.fsInfo.GetDirUsage(fh.extraDir)
usage, err := fh.usageProvider.Usage()

if err != nil {
return err
}

// Wait to handle errors until after all operartions are run.
// Wait to handle errors until after all operations are run.
// An error in one will not cause an early return, skipping others
fh.Lock()
defer fh.Unlock()
fh.lastUpdate = time.Now()
if fh.rootfs != "" && rootErr == nil {
fh.usage.InodeUsage = rootUsage.Inodes
fh.usage.BaseUsageBytes = rootUsage.Bytes
fh.usage.TotalUsageBytes = rootUsage.Bytes
}
if fh.extraDir != "" && extraErr == nil {
fh.usage.TotalUsageBytes += extraUsage.Bytes
}

// Combine errors into a single error to return
if rootErr != nil || extraErr != nil {
return fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr)
}
fh.usage.InodeUsage = usage.InodeUsage
fh.usage.BaseUsageBytes = usage.BaseUsageBytes
fh.usage.TotalUsageBytes = usage.TotalUsageBytes

return nil
}

Expand All @@ -125,7 +111,8 @@ func (fh *realFsHandler) trackUsage() {
// if the long duration is persistent either because of slow
// disk or lots of containers.
longOp = longOp + time.Second
klog.V(2).Infof("fs: disk usage and inodes count on following dirs took %v: %v; will not log again for this container unless duration exceeds %v", duration, []string{fh.rootfs, fh.extraDir}, longOp)
klog.V(2).Infof(`fs: disk usage and inodes count on targets took %v: %v; `+
`will not log again for this container unless duration exceeds %v`, duration, fh.usageProvider.Targets(), longOp)
}
select {
case <-fh.stopChan:
Expand All @@ -148,3 +135,55 @@ func (fh *realFsHandler) Usage() FsUsage {
defer fh.RUnlock()
return fh.usage
}

// fsUsageProvider is the directory-based FsUsageProvider: it reports usage by
// querying fsInfo for the directories it was configured with.
type fsUsageProvider struct {
// fsInfo is used to compute per-directory usage (bytes and inodes).
fsInfo fs.FsInfo
// rootFs is the directory whose usage is reported as the base usage;
// an empty string means it is not measured.
rootFs string
// extraDir is an additional directory whose bytes are added to the total
// usage; an empty string means it is not measured.
extraDir string
}

// NewGeneralFsUsageProvider returns an FsUsageProvider that measures usage by
// asking fsInfo for the directory usage of rootFs and extraDir. Either
// directory may be the empty string, in which case it is not measured.
func NewGeneralFsUsageProvider(fsInfo fs.FsInfo, rootFs, extraDir string) FsUsageProvider {
	provider := new(fsUsageProvider)
	provider.fsInfo = fsInfo
	provider.rootFs = rootFs
	provider.extraDir = extraDir
	return provider
}

// Targets returns the two configured directories, root filesystem first and
// extra directory second. Unset (empty) directories are included as-is.
func (f *fsUsageProvider) Targets() []string {
	targets := make([]string, 0, 2)
	targets = append(targets, f.rootFs, f.extraDir)
	return targets
}

// Usage measures the configured directories and returns the combined result.
//
// The root filesystem contributes the inode count, the base usage and the
// starting value of the total usage; the extra directory only adds its bytes
// to the total. Both directories are always measured before any error is
// reported, and if either measurement fails a single combined error is
// returned together with a nil usage.
func (f *fsUsageProvider) Usage() (*FsUsage, error) {
	// A directory that is not configured keeps its zero-valued UsageInfo and
	// nil error, so it contributes nothing to the sums below.
	var (
		root, extra       fs.UsageInfo
		rootErr, extraErr error
	)

	if f.rootFs != "" {
		root, rootErr = f.fsInfo.GetDirUsage(f.rootFs)
	}
	if f.extraDir != "" {
		extra, extraErr = f.fsInfo.GetDirUsage(f.extraDir)
	}

	// Combine errors into a single error to return.
	if rootErr != nil || extraErr != nil {
		return nil, fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr)
	}

	return &FsUsage{
		BaseUsageBytes:  root.Bytes,
		TotalUsageBytes: root.Bytes + extra.Bytes,
		InodeUsage:      root.Inodes,
	}, nil
}
20 changes: 20 additions & 0 deletions container/containerd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"time"

containersapi "github.com/containerd/containerd/api/services/containers/v1"
snaptshotapi "github.com/containerd/containerd/api/services/snapshots/v1"
tasksapi "github.com/containerd/containerd/api/services/tasks/v1"
versionapi "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/pkg/dialer"
ptypes "github.com/gogo/protobuf/types"
"github.com/google/cadvisor/container/common"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)
Expand All @@ -36,12 +38,14 @@ type client struct {
containerService containersapi.ContainersClient
taskService tasksapi.TasksClient
versionService versionapi.VersionClient
snapshotsService snaptshotapi.SnapshotsClient
}

type ContainerdClient interface {
LoadContainer(ctx context.Context, id string) (*containers.Container, error)
TaskPid(ctx context.Context, id string) (uint32, error)
Version(ctx context.Context) (string, error)
ContainerFsUsage(ctx context.Context, snapshotter, snapshotkey string) (*common.FsUsage, error)
}

var once sync.Once
Expand Down Expand Up @@ -92,6 +96,7 @@ func Client(address, namespace string) (ContainerdClient, error) {
containerService: containersapi.NewContainersClient(conn),
taskService: tasksapi.NewTasksClient(conn),
versionService: versionapi.NewVersionClient(conn),
snapshotsService: snaptshotapi.NewSnapshotsClient(conn),
}
})
return ctrdClient, retErr
Expand Down Expand Up @@ -125,6 +130,21 @@ func (c *client) Version(ctx context.Context) (string, error) {
return response.Version, nil
}

// ContainerFsUsage asks the containerd snapshots service for the usage of the
// snapshot identified by snapshotter and snapshotkey. The reported size is
// used for both the base and the total usage. Any error from the service is
// returned unchanged with a nil usage.
func (c *client) ContainerFsUsage(ctx context.Context, snapshotter, snapshotkey string) (*common.FsUsage, error) {
	req := &snaptshotapi.UsageRequest{
		Snapshotter: snapshotter,
		Key:         snapshotkey,
	}

	resp, err := c.snapshotsService.Usage(ctx, req)
	if err != nil {
		return nil, err
	}

	size := uint64(resp.Size_)
	return &common.FsUsage{
		BaseUsageBytes:  size,
		TotalUsageBytes: size,
		InodeUsage:      uint64(resp.Inodes),
	}, nil
}

func containerFromProto(containerpb containersapi.Container) *containers.Container {
var runtime containers.RuntimeInfo
if containerpb.Runtime != nil {
Expand Down
6 changes: 6 additions & 0 deletions container/containerd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/containerd/containerd/containers"
"github.com/google/cadvisor/container/common"
)

type containerdClientMock struct {
Expand All @@ -45,6 +46,11 @@ func (c *containerdClientMock) TaskPid(ctx context.Context, id string) (uint32,
return 2389, nil
}

// ContainerFsUsage is the mock implementation: it ignores its arguments and
// always returns a zero-valued FsUsage with a nil error.
func (c *containerdClientMock) ContainerFsUsage(ctx context.Context, snapshotter, snapshotkey string) (*common.FsUsage, error) {
	var empty common.FsUsage
	return &empty, nil
}

func mockcontainerdClient(cntrs map[string]*containers.Container, returnErr error) ContainerdClient {
return &containerdClientMock{
cntrs: cntrs,
Expand Down
58 changes: 52 additions & 6 deletions container/containerd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

// Handler for containerd containers.

package containerd

import (
Expand All @@ -38,6 +39,7 @@ type containerdContainerHandler struct {
// (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test")
cgroupPaths map[string]string
fsInfo fs.FsInfo
fsHandler common.FsHandler
// Metadata associated with the container.
reference info.ContainerReference
envs map[string]string
Expand Down Expand Up @@ -122,9 +124,15 @@ func newContainerdContainerHandler(
libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootfs, int(taskPid), includedMetrics)

handler := &containerdContainerHandler{
machineInfoFactory: machineInfoFactory,
cgroupPaths: cgroupPaths,
fsInfo: fsInfo,
machineInfoFactory: machineInfoFactory,
cgroupPaths: cgroupPaths,
fsInfo: fsInfo,
fsHandler: common.NewFsHandler(common.DefaultPeriod, &fsUsageProvider{
ctx: ctx,
client: client,
snapshotter: cntr.Snapshotter,
snapshotkey: cntr.SnapshotKey,
}),
envs: make(map[string]string),
labels: cntr.Labels,
includedMetrics: includedMetrics,
Expand Down Expand Up @@ -169,9 +177,7 @@ func (h *containerdContainerHandler) needNet() bool {
}

func (h *containerdContainerHandler) GetSpec() (info.ContainerSpec, error) {
// TODO: Since we dont collect disk usage stats for containerd, we set hasFilesystem
// to false. Revisit when we support disk usage stats for containerd
hasFilesystem := false
hasFilesystem := true
spec, err := common.GetSpec(h.cgroupPaths, h.machineInfoFactory, h.needNet(), hasFilesystem)
spec.Labels = h.labels
spec.Envs = h.envs
Expand All @@ -189,6 +195,25 @@ func (h *containerdContainerHandler) getFsStats(stats *info.ContainerStats) erro
if h.includedMetrics.Has(container.DiskIOMetrics) {
common.AssignDeviceNamesToDiskStats((*common.MachineInfoNamer)(mi), &stats.DiskIo)
}
if !h.includedMetrics.Has(container.DiskUsageMetrics) {
return nil
}

// TODO(yyrdl): for overlay, the 'upperPath' is
// `${containerd.Config.Root}/io.containerd.snapshotter.v1.overlayfs/snapshots/${snapshots.ID}/fs`;
// for other snapshotter plugins, the path layout can likewise be derived from containerd's source code.

// Device, fsType, fsLimits and other information are not supported yet, unless there is a way to
// know the ID of the snapshot, or the `Stat` (snapshotsClient.Stat) method returns this information directly.

fsStat := info.FsStats{}
usage := h.fsHandler.Usage()
fsStat.BaseUsage = usage.BaseUsageBytes
fsStat.Usage = usage.TotalUsageBytes
fsStat.Inodes = usage.InodeUsage

stats.Filesystem = append(stats.Filesystem, fsStat)

return nil
}

Expand Down Expand Up @@ -239,12 +264,33 @@ func (h *containerdContainerHandler) Type() container.ContainerType {
}

// Start starts the filesystem usage handler, if one is configured.
func (h *containerdContainerHandler) Start() {
	if h.fsHandler == nil {
		return
	}
	h.fsHandler.Start()
}

// Cleanup stops the filesystem usage handler, if one is configured.
func (h *containerdContainerHandler) Cleanup() {
	if h.fsHandler == nil {
		return
	}
	h.fsHandler.Stop()
}

// GetContainerIPAddress always returns the empty string for containerd
// containers.
func (h *containerdContainerHandler) GetContainerIPAddress() string {
// containerd doesn't take care of networking, so it doesn't maintain
// networking state and there is no IP address to report.
return ""
}

// fsUsageProvider reports a container's filesystem usage by querying the
// containerd client for the usage of the container's snapshot.
type fsUsageProvider struct {
// ctx is passed to every ContainerFsUsage call made by Usage.
// NOTE(review): storing a context in a struct is discouraged in Go;
// confirm its lifetime covers the whole life of the fs handler.
ctx context.Context
// snapshotter is the name of the snapshotter that holds the snapshot.
snapshotter string
// snapshotkey is the key of the snapshot within that snapshotter.
snapshotkey string
// client is the containerd client used to query snapshot usage.
client ContainerdClient
}

// Usage returns the usage of the configured snapshot as reported by the
// containerd client, passing through the client's result and error unchanged.
func (f *fsUsageProvider) Usage() (*common.FsUsage, error) {
	usage, err := f.client.ContainerFsUsage(f.ctx, f.snapshotter, f.snapshotkey)
	return usage, err
}

// Targets returns a single human-readable entry naming the snapshotter and
// snapshot key that Usage measures.
func (f *fsUsageProvider) Targets() []string {
	target := fmt.Sprintf("snapshotter(%s) with key (%s)", f.snapshotter, f.snapshotkey)
	return []string{target}
}
3 changes: 2 additions & 1 deletion container/crio/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ func newCrioContainerHandler(

// we optionally collect disk usage metrics
if includedMetrics.Has(container.DiskUsageMetrics) {
handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, storageLogDir, fsInfo)
handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider(
fsInfo, rootfsStorageDir, storageLogDir))
}
// TODO for env vars we wanted to show from container.Config.Env from whitelist
//for _, exposedEnv := range metadataEnvs {
Expand Down
3 changes: 2 additions & 1 deletion container/docker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ func newDockerContainerHandler(

if includedMetrics.Has(container.DiskUsageMetrics) {
handler.fsHandler = &dockerFsHandler{
fsHandler: common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, otherStorageDir, fsInfo),
fsHandler: common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider(
fsInfo, rootfsStorageDir, otherStorageDir)),
thinPoolWatcher: thinPoolWatcher,
zfsWatcher: zfsWatcher,
deviceID: ctnr.GraphDriver.Data["DeviceId"],
Expand Down

0 comments on commit 020b7ff

Please sign in to comment.