Skip to content

Commit

Permalink
ebpf: per target configuration with labels (#2977)
Browse files Browse the repository at this point in the history
  • Loading branch information
korniltsev committed Feb 5, 2024
1 parent 5f808d3 commit 3713fae
Show file tree
Hide file tree
Showing 17 changed files with 458 additions and 124 deletions.
56 changes: 38 additions & 18 deletions ebpf/cmd/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"connectrpc.com/connect"
"github.com/go-kit/log"
"github.com/grafana/pyroscope/ebpf/cpp/demangle"
ebpfmetrics "github.com/grafana/pyroscope/ebpf/metrics"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
Expand All @@ -28,7 +29,6 @@ import (
"github.com/grafana/pyroscope/ebpf/pprof"
"github.com/grafana/pyroscope/ebpf/sd"
"github.com/grafana/pyroscope/ebpf/symtab"
"github.com/grafana/pyroscope/ebpf/symtab/elf"
"github.com/prometheus/client_golang/prometheus"
commonconfig "github.com/prometheus/common/config"
)
Expand All @@ -43,11 +43,35 @@ var (
session ebpfspy.Session
)

type splitLog struct {
err log.Logger
rest log.Logger
}

func (s splitLog) Log(keyvals ...interface{}) error {
if len(keyvals)%2 != 0 {
return s.err.Log(keyvals...)
}
for i := 0; i < len(keyvals); i += 2 {
if keyvals[i] == "level" {
vv := keyvals[i+1]
vvs, ok := vv.(fmt.Stringer)
if ok && vvs.String() == "error" {
return s.err.Log(keyvals...)
}
}
}
return s.rest.Log(keyvals...)
}

func main() {
config = getConfig()
metrics = ebpfmetrics.New(prometheus.DefaultRegisterer)

logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = &splitLog{
err: log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)),
rest: log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)),
}

targetFinder, err := sd.NewTargetFinder(os.DirFS("/"), logger, convertTargetOptions())
if err != nil {
Expand Down Expand Up @@ -76,16 +100,11 @@ func main() {
}

func collectProfiles(profiles chan *pushv1.PushRequest) {
builders := pprof.NewProfileBuilders(int64(config.SampleRate))
err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32, aggregation ebpfspy.SampleAggregation) {
labelsHash, labels := target.Labels()
builder := builders.BuilderForTarget(labelsHash, labels)
if aggregation == ebpfspy.SampleAggregated {
builder.CreateSample(stack, value)
} else {
builder.CreateSampleOrAddValue(stack, value)
}
builders := pprof.NewProfileBuilders(pprof.BuildersOptions{
SampleRate: int64(config.SampleRate),
PerPIDProfile: true,
})
err := pprof.Collect(builders, session)

if err != nil {
panic(err)
Expand Down Expand Up @@ -163,6 +182,7 @@ func convertSessionOptions() ebpfspy.SessionOptions {
PythonEnabled: config.PythonEnabled,
Metrics: metrics,
CacheOptions: config.CacheOptions,
VerifierLogSize: 1024 * 1024 * 20,
}
}

Expand Down Expand Up @@ -191,12 +211,13 @@ var defaultConfig = Config{
UnknownSymbolModuleOffset: true,
UnknownSymbolAddress: true,
PythonEnabled: true,
SymbolOptions: symtab.SymbolOptions{
GoTableFallback: true,
PythonFullFilePath: false,
DemangleOptions: demangle.DemangleFull,
},
CacheOptions: symtab.CacheOptions{
SymbolOptions: symtab.SymbolOptions{
GoTableFallback: true,
PythonFullFilePath: false,
DemangleOptions: elf.DemangleFull,
},

PidCacheOptions: symtab.GCacheOptions{
Size: 239,
KeepRounds: 8,
Expand All @@ -223,6 +244,7 @@ type Config struct {
UnknownSymbolModuleOffset bool
UnknownSymbolAddress bool
PythonEnabled bool
SymbolOptions symtab.SymbolOptions
CacheOptions symtab.CacheOptions
SampleRate int
TargetsOnly bool
Expand Down Expand Up @@ -296,7 +318,6 @@ func getProcessTargets() []sd.DiscoveryTarget {
"__meta_process_comm": string(comm),
"__meta_process_cgroup": string(cgroup),
}
_ = level.Debug(logger).Log("msg", "process target", "target", target.DebugString())
res = append(res, target)
}
return res
Expand Down Expand Up @@ -326,7 +347,6 @@ func relabelProcessTargets(targets []sd.DiscoveryTarget, cfg []*RelabelConfig) [
continue
}
tt := sd.DiscoveryTarget(lbls.Map())
_ = level.Debug(logger).Log("msg", "relabelled process", "target", tt.DebugString())
res = append(res, tt)
}
return res
Expand Down
24 changes: 24 additions & 0 deletions ebpf/cpp/demangle/demangle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package demangle

import "github.com/ianlancetaylor/demangle"

var DemangleUnspecified []demangle.Option = nil
var DemangleNoneSpecified []demangle.Option = make([]demangle.Option, 0)
var DemangleSimplified = []demangle.Option{demangle.NoParams, demangle.NoEnclosingParams, demangle.NoTemplateParams}
var DemangleTemplates = []demangle.Option{demangle.NoParams, demangle.NoEnclosingParams}
var DemangleFull = []demangle.Option{demangle.NoClones}

func ConvertDemangleOptions(o string) []demangle.Option {
switch o {
case "none":
return DemangleNoneSpecified
case "simplified":
return DemangleSimplified
case "templates":
return DemangleTemplates
case "full":
return DemangleFull
default:
return DemangleUnspecified
}
}
140 changes: 115 additions & 25 deletions ebpf/pprof/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/cespare/xxhash/v2"
"github.com/google/pprof/profile"
"github.com/grafana/pyroscope/ebpf/sd"
"github.com/klauspost/compress/gzip"
"github.com/prometheus/prometheus/model/labels"
)
Expand All @@ -26,21 +27,96 @@ var (
}
)

type SampleType uint32

var SampleTypeCpu = SampleType(0)
var SampleTypeMem = SampleType(1)

type SampleAggregation bool

var (
// SampleAggregated mean samples are accumulated in ebpf, no need to dedup these
SampleAggregated = SampleAggregation(true)
// SampleNotAggregated mean values are not accumulated in ebpf, but streamed to userspace with value=1
// TODO make consider aggregating python in ebpf as well
SampleNotAggregated = SampleAggregation(false)
)

type CollectProfilesCallback func(sample ProfileSample)

type SamplesCollector interface {
CollectProfiles(callback CollectProfilesCallback) error
}

type ProfileSample struct {
Target *sd.Target
Pid uint32
SampleType SampleType
Aggregation SampleAggregation
Stack []string
Value uint64
Value2 uint64
}

type BuildersOptions struct {
SampleRate int64
PerPIDProfile bool
}

type builderHashKey struct {
labelsHash uint64
pid uint32
sampleType SampleType
}

type ProfileBuilders struct {
Builders map[uint64]*ProfileBuilder
SampleRate int64
Builders map[builderHashKey]*ProfileBuilder
opt BuildersOptions
}

func NewProfileBuilders(sampleRate int64) *ProfileBuilders {
return &ProfileBuilders{Builders: make(map[uint64]*ProfileBuilder), SampleRate: sampleRate}
func NewProfileBuilders(options BuildersOptions) *ProfileBuilders {
return &ProfileBuilders{Builders: make(map[builderHashKey]*ProfileBuilder), opt: options}
}

func (b ProfileBuilders) BuilderForTarget(hash uint64, labels labels.Labels) *ProfileBuilder {
res := b.Builders[hash]
func Collect(builders *ProfileBuilders, collector SamplesCollector) error {
return collector.CollectProfiles(func(sample ProfileSample) {
builders.AddSample(&sample)
})
}

func (b *ProfileBuilders) AddSample(sample *ProfileSample) {
bb := b.BuilderForSample(sample)
if sample.Aggregation == SampleAggregated {
bb.CreateSample(sample)
} else {
bb.CreateSampleOrAddValue(sample)
}
}

func (b *ProfileBuilders) BuilderForSample(sample *ProfileSample) *ProfileBuilder {
labelsHash, labels := sample.Target.Labels()

k := builderHashKey{labelsHash: labelsHash, sampleType: sample.SampleType}
if b.opt.PerPIDProfile {
k.pid = sample.Pid
}
res := b.Builders[k]
if res != nil {
return res
}

var sampleType []*profile.ValueType
var periodType *profile.ValueType
var period int64
if sample.SampleType == SampleTypeCpu {
sampleType = []*profile.ValueType{{Type: "cpu", Unit: "nanoseconds"}}
periodType = &profile.ValueType{Type: "cpu", Unit: "nanoseconds"}
period = time.Second.Nanoseconds() / b.opt.SampleRate
} else {
sampleType = []*profile.ValueType{{Type: "alloc_objects", Unit: "count"}, {Type: "alloc_space", Unit: "bytes"}}
periodType = &profile.ValueType{Type: "space", Unit: "bytes"}
period = 512 * 1024 // todo
}
builder := &ProfileBuilder{
locations: make(map[string]*profile.Location),
functions: make(map[string]*profile.Function),
Expand All @@ -52,16 +128,16 @@ func (b ProfileBuilders) BuilderForTarget(hash uint64, labels labels.Labels) *Pr
ID: 1,
},
},
SampleType: []*profile.ValueType{{Type: "cpu", Unit: "nanoseconds"}},
Period: time.Second.Nanoseconds() / b.SampleRate,
PeriodType: &profile.ValueType{Type: "cpu", Unit: "nanoseconds"},
SampleType: sampleType,
Period: period,
PeriodType: periodType,
TimeNanos: time.Now().UnixNano(),
},
tmpLocationIDs: make([]uint64, 0, 128),
tmpLocations: make([]*profile.Location, 0, 128),
}
res = builder
b.Builders[hash] = res
b.Builders[k] = res
return res
}

Expand All @@ -76,36 +152,31 @@ type ProfileBuilder struct {
tmpLocationIDs []uint64
}

func (p *ProfileBuilder) CreateSample(stacktrace []string, value uint64) {
sample := &profile.Sample{
Value: []int64{int64(value) * p.Profile.Period},
}
for _, s := range stacktrace {
loc := p.addLocation(s)
sample.Location = append(sample.Location, loc)
func (p *ProfileBuilder) CreateSample(inputSample *ProfileSample) {
sample := p.newSample(inputSample)
p.addValue(inputSample, sample)
for i, s := range inputSample.Stack {
sample.Location[i] = p.addLocation(s)
}
p.Profile.Sample = append(p.Profile.Sample, sample)
}

func (p *ProfileBuilder) CreateSampleOrAddValue(stacktrace []string, value uint64) {
scaledValue := int64(value) * p.Profile.Period
func (p *ProfileBuilder) CreateSampleOrAddValue(inputSample *ProfileSample) {
p.tmpLocations = p.tmpLocations[:0]
p.tmpLocationIDs = p.tmpLocationIDs[:0]
for _, s := range stacktrace {
for _, s := range inputSample.Stack {
loc := p.addLocation(s)
p.tmpLocations = append(p.tmpLocations, loc)
p.tmpLocationIDs = append(p.tmpLocationIDs, loc.ID)
}
h := xxhash.Sum64(uint64Bytes(p.tmpLocationIDs))
sample := p.sampleHashToSample[h]
if sample != nil {
sample.Value[0] += scaledValue
p.addValue(inputSample, sample)
return
}
sample = &profile.Sample{
Location: make([]*profile.Location, len(p.tmpLocations)),
Value: []int64{scaledValue},
}
sample = p.newSample(inputSample)
p.addValue(inputSample, sample)
copy(sample.Location, p.tmpLocations)
p.sampleHashToSample[h] = sample
p.Profile.Sample = append(p.Profile.Sample, sample)
Expand Down Expand Up @@ -177,3 +248,22 @@ func uint64Bytes(s []uint64) []byte {
hdr.Data = uintptr(unsafe.Pointer(&s[0]))
return bs
}
func (p *ProfileBuilder) newSample(inputSample *ProfileSample) *profile.Sample {
sample := new(profile.Sample)
if inputSample.SampleType == SampleTypeCpu {
sample.Value = []int64{0}
} else {
sample.Value = []int64{0, 0}
}
sample.Location = make([]*profile.Location, len(inputSample.Stack))
return sample
}

func (p *ProfileBuilder) addValue(inputSample *ProfileSample, sample *profile.Sample) {
if inputSample.SampleType == SampleTypeCpu {
sample.Value[0] += int64(inputSample.Value) * p.Profile.Period
} else {
sample.Value[0] += int64(inputSample.Value)
sample.Value[1] += int64(inputSample.Value2)
}
}
Loading

0 comments on commit 3713fae

Please sign in to comment.