Skip to content

Commit

Permalink
[NOJIRA] Use the ParsePath function to gather the cluster name and ru…
Browse files Browse the repository at this point in the history
…n ID (#248)

* Use the ParsePath function to gather the cluster name and run ID automatically

* removing duplicate for run_id and cluster in config struct

* Update pkg/cmd/dump.go

Co-authored-by: Simon Maréchal <66471981+Minosity-VR@users.noreply.github.com>

---------

Co-authored-by: jt-dd <julien.terriac@datadoghq.com>
Co-authored-by: jt-dd <112463504+jt-dd@users.noreply.github.com>
Co-authored-by: Simon Maréchal <66471981+Minosity-VR@users.noreply.github.com>
  • Loading branch information
4 people committed Sep 11, 2024
1 parent fdafcbe commit 3f4ca04
Show file tree
Hide file tree
Showing 18 changed files with 238 additions and 51 deletions.
10 changes: 9 additions & 1 deletion cmd/kubehound/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"github.com/spf13/viper"
)

var (
runID string
)

var (
ingestCmd = &cobra.Command{
Use: "ingest",
Expand All @@ -23,6 +27,8 @@ var (
Long: `Run an ingestion locally using a previous dump (directory or tar.gz)`,
Args: cobra.ExactArgs(1),
PreRunE: func(cobraCmd *cobra.Command, args []string) error {
cmd.BindFlagCluster(cobraCmd)

return cmd.InitializeKubehoundConfig(cobraCmd.Context(), "", true, true)
},
RunE: func(cobraCmd *cobra.Command, args []string) error {
Expand All @@ -41,6 +47,7 @@ var (
Short: "Ingest data remotely on a KHaaS instance",
Long: `Run an ingestion on KHaaS from a bucket to build the attack path, by default it will rehydrate the latest snapshot previously dumped on a KHaaS instance from all clusters`,
PreRunE: func(cobraCmd *cobra.Command, args []string) error {
cmd.BindFlagCluster(cobraCmd)
viper.BindPFlag(config.IngestorAPIEndpoint, cobraCmd.Flags().Lookup("khaas-server")) //nolint: errcheck
viper.BindPFlag(config.IngestorAPIInsecure, cobraCmd.Flags().Lookup("insecure")) //nolint: errcheck

Expand All @@ -62,7 +69,7 @@ var (
return core.CoreClientGRPCRehydrateLatest(khCfg.Ingestor)
}

return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Ingestor.ClusterName, khCfg.Ingestor.RunID)
return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Dynamic.ClusterName, runID)
},
}
)
Expand All @@ -81,6 +88,7 @@ func init() {

ingestCmd.AddCommand(remoteIngestCmd)
cmd.InitRemoteIngestCmd(remoteIngestCmd, true)
remoteIngestCmd.Flags().StringVar(&runID, "run_id", "", "KubeHound run id to ingest (e.g.: 01htdgjj34mcmrrksw4bjy2e94)")

rootCmd.AddCommand(ingestCmd)
}
5 changes: 5 additions & 0 deletions cmd/kubehound/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ var (
return fmt.Errorf("get config: %w", err)
}

err = core.CoreInitLive(cobraCmd.Context(), khCfg)
if err != nil {
return err
}

err = core.CoreLive(cobraCmd.Context(), khCfg)
if err != nil {
return err
Expand Down
9 changes: 0 additions & 9 deletions pkg/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,7 @@ func InitializeKubehoundConfig(ctx context.Context, configPath string, generateR
viper.Set(config.DynamicRunID, config.NewRunID())
}

// This code is also used for file ingestion (dump), so it is not needed in this case. So, we can continue if it fails.
clusterName, err := config.GetClusterName(ctx)
if err == nil {
viper.Set(config.DynamicClusterName, clusterName)
} else {
log.I.Errorf("collector cluster info: %v", err)
}

khCfg := config.NewKubehoundConfig(configPath, inline)

// Activate debug mode if needed
if khCfg.Debug {
log.I.Info("Debug mode activated")
Expand Down
25 changes: 16 additions & 9 deletions pkg/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,30 @@ func InitRemoteDumpCmd(cmd *cobra.Command) {
viper.BindPFlag(config.CollectorFileBlobRegion, cmd.Flags().Lookup("region")) //nolint: errcheck
}

func InitLocalIngestCmd(cmd *cobra.Command) {
InitCluster(cmd)
cmd.Flags().MarkDeprecated(flagCluster, "Since v1.4.1, KubeHound dump archive contains a metadata file holding the clustername") //nolint: errcheck
}

func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) {

cmd.PersistentFlags().String("khaas-server", "", "GRPC endpoint exposed by KubeHound as a Service (KHaaS) server (e.g.: localhost:9000)")
cmd.PersistentFlags().Bool("insecure", config.DefaultIngestorAPIInsecure, "Allow insecure connection to the KHaaS server grpc endpoint")

// IngestorAPIEndpoint
if standalone {
cmd.Flags().String("run_id", "", "KubeHound run id to ingest (e.g.: 01htdgjj34mcmrrksw4bjy2e94)")
viper.BindPFlag(config.IngestorRunID, cmd.Flags().Lookup("run_id")) //nolint: errcheck

cmd.Flags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)")
viper.BindPFlag(config.IngestorClusterName, cmd.Flags().Lookup("cluster")) //nolint: errcheck
InitCluster(cmd)
}
}

func InitLocalIngestCmd(cmd *cobra.Command) {
cmd.PersistentFlags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)")
viper.BindPFlag(config.IngestorClusterName, cmd.PersistentFlags().Lookup("cluster")) //nolint: errcheck
cmd.MarkFlagRequired("cluster") //nolint: errcheck
const (
flagCluster = "cluster"
)

func InitCluster(cmd *cobra.Command) {
cmd.Flags().String(flagCluster, "", "Cluster name to ingest (e.g.: my-cluster-1)")
}

func BindFlagCluster(cmd *cobra.Command) {
viper.BindPFlag(config.DynamicClusterName, cmd.Flags().Lookup(flagCluster)) //nolint: errcheck
}
18 changes: 10 additions & 8 deletions pkg/collector/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ const (

// FileCollector implements a collector based on local K8s API json files generated outside the KubeHound application via e.g kubectl.
type FileCollector struct {
cfg *config.FileCollectorConfig
log *log.KubehoundLogger
tags collectorTags
cfg *config.FileCollectorConfig
log *log.KubehoundLogger
tags collectorTags
clusterName string
}

// NewFileCollector creates a new instance of the file collector from the provided application config.
Expand All @@ -73,9 +74,10 @@ func NewFileCollector(ctx context.Context, cfg *config.KubehoundConfig) (Collect
l.Infof("Creating file collector from directory %s", cfg.Collector.File.Directory)

return &FileCollector{
cfg: cfg.Collector.File,
log: l,
tags: newCollectorTags(),
cfg: cfg.Collector.File,
log: l,
tags: newCollectorTags(),
clusterName: cfg.Dynamic.ClusterName,
}, nil
}

Expand All @@ -98,7 +100,7 @@ func (c *FileCollector) HealthCheck(_ context.Context) (bool, error) {
return false, fmt.Errorf("file collector base path is not a directory: %s", file.Name())
}

if c.cfg.ClusterName == "" {
if c.clusterName == "" {
return false, errors.New("file collector cluster name not provided")
}

Expand All @@ -107,7 +109,7 @@ func (c *FileCollector) HealthCheck(_ context.Context) (bool, error) {

func (c *FileCollector) ClusterInfo(ctx context.Context) (*config.ClusterInfo, error) {
return &config.ClusterInfo{
Name: c.cfg.ClusterName,
Name: c.clusterName,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/collector/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func TestFileCollector_HealthCheck(t *testing.T) {

c = &FileCollector{
cfg: &config.FileCollectorConfig{
Directory: "testdata/test-cluster/",
ClusterName: "test-cluster",
Directory: "testdata/test-cluster/",
},
clusterName: "test-cluster",
}

ok, err = c.HealthCheck(context.Background())
Expand Down
3 changes: 2 additions & 1 deletion pkg/collector/testdata/kubehound-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ collector:
type: file-collector
file:
directory: testdata/test-cluster/
cluster_name: test-cluster
dynamic:
cluster_name: test-cluster
7 changes: 3 additions & 4 deletions pkg/config/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ type K8SAPICollectorConfig struct {

// FileCollectorConfig configures the file collector.
type FileCollectorConfig struct {
ClusterName string `mapstructure:"cluster_name"` // Target cluster (must be specified in config as not present in JSON files)
Directory string `mapstructure:"directory"` // Base directory holding the K8s data JSON files
Archive *FileArchiveConfig `mapstructure:"archive"` // Archive configuration
Blob *BlobConfig `mapstructure:"blob"` // Blob storage configuration
Directory string `mapstructure:"directory"` // Base directory holding the K8s data JSON files
Archive *FileArchiveConfig `mapstructure:"archive"` // Archive configuration
Blob *BlobConfig `mapstructure:"blob"` // Blob storage configuration
}

type FileArchiveConfig struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ func TestMustLoadConfig(t *testing.T) {
Collector: CollectorConfig{
Type: CollectorTypeFile,
File: &FileCollectorConfig{
Directory: "cluster-data/",
ClusterName: "test-cluster",
Directory: "cluster-data/",
Archive: &FileArchiveConfig{
NoCompress: DefaultArchiveNoCompress,
},
Expand Down Expand Up @@ -89,6 +88,9 @@ func TestMustLoadConfig(t *testing.T) {
ArchiveName: "archive.tar.gz",
MaxArchiveSize: DefaultMaxArchiveSize,
},
Dynamic: DynamicConfig{
ClusterName: "test-cluster",
},
},
wantErr: false,
},
Expand Down
12 changes: 12 additions & 0 deletions pkg/config/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ type DynamicConfig struct {
ClusterName string `mapstructure:"cluster_name"`
}

func (d *DynamicConfig) HealthCheck() error {
if d.ClusterName == "" {
return fmt.Errorf("missing cluster name")
}

if d.RunID == nil {
return fmt.Errorf("missing run id")
}

return nil
}

// DynamicOption is a functional option for configuring the dynamic config.
type DynamicOption func() (func(*DynamicConfig), error)

Expand Down
2 changes: 0 additions & 2 deletions pkg/config/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ type IngestorConfig struct {
TempDir string `mapstructure:"temp_dir"`
ArchiveName string `mapstructure:"archive_name"`
MaxArchiveSize int64 `mapstructure:"max_archive_size"`
ClusterName string `mapstructure:"cluster_name"`
RunID string `mapstructure:"run_id"`
}

type IngestorAPIConfig struct {
Expand Down
5 changes: 3 additions & 2 deletions pkg/config/testdata/kubehound-file-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ collector:
type: file-collector
file:
directory: cluster-data/
cluster_name: test-cluster
dynamic:
cluster_name: test-cluster
mongodb:
url: "mongodb://localhost:27017"
telemetry:
statsd:
url: "127.0.0.1:8125"
url: "127.0.0.1:8125"
40 changes: 40 additions & 0 deletions pkg/dump/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"fmt"
"os"
"path/filepath"
"regexp"

"github.com/DataDog/KubeHound/pkg/collector"
"github.com/DataDog/KubeHound/pkg/config"
"github.com/DataDog/KubeHound/pkg/dump/pipeline"
"github.com/DataDog/KubeHound/pkg/dump/writer"
"github.com/DataDog/KubeHound/pkg/telemetry/log"
"github.com/DataDog/KubeHound/pkg/telemetry/span"
"github.com/DataDog/KubeHound/pkg/telemetry/tag"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
Expand Down Expand Up @@ -103,3 +105,41 @@ func (d *DumpIngestor) Close(ctx context.Context) error {

return d.writer.Close(ctx)
}

// Backward Compatibility: Extracting the metadata from the path
const (
DumpResultFilenameRegex = DumpResultPrefix + DumpResultClusterNameRegex + "_" + DumpResultRunIDRegex + DumpResultExtensionRegex
DumpResultPathRegex = DumpResultClusterNameRegex + "/" + DumpResultFilenameRegex
)

func ParsePath(path string) (*DumpResult, error) {
log.I.Warnf("[Backward Compatibility] Extracting the metadata from the path: %s", path)

// ./<clusterName>/kubehound_<clusterName>_<run_id>[.tar.gz]
// re := regexp.MustCompile(`([a-z0-9\.\-_]+)/kubehound_([a-z0-9\.-_]+)_([a-z0-9]{26})\.?([a-z0-9\.]+)?`)
re := regexp.MustCompile(DumpResultPathRegex)
if !re.MatchString(path) {
return nil, fmt.Errorf("Invalid path provided: %q", path)
}

matches := re.FindStringSubmatch(path)
// The cluster name should match (parent dir and in the filename)
if matches[1] != matches[2] {
return nil, fmt.Errorf("Cluster name does not match in the path provided: %q", path)
}

clusterName := matches[1]
runID := matches[3]
extension := matches[4]

isCompressed := false
if extension != "" {
isCompressed = true
}
result, err := NewDumpResult(clusterName, runID, isCompressed)
if err != nil {
return nil, err
}

return result, nil
}
Loading

0 comments on commit 3f4ca04

Please sign in to comment.