Skip to content

Commit

Permalink
jobsprofiler: introduce collection of job bundles
Browse files Browse the repository at this point in the history
Similar to statement bundles this change introduces the
infrastructure to collect and read job profiler bundles.
Right now, a job profiler bundle will only contain the
latest DSP diagram for a job, but going forward this will
give us a place to dump raw files such as:
- cluster-wide job traces
- cpu profiles
- trace-driven aggregated stats
- raw payload and progress protos

Downloading this bundle will be exposed in a future patch in
all of the places where statement bundles are today:
- DBConsole
- CLI shell
- SQL shell

This change introduces a builtin that constructs
and writes the bundle for a job to the system.job_info
table. It also introduces a new endpoint on the status
server to read this constructed bundle. The next set of
PRs will add the necessary components to allow downloading
the bundle from the DBConsole.

Informs: #105076

Release note: None
  • Loading branch information
adityamaru committed Jun 22, 2023
1 parent b539561 commit ec46b6b
Show file tree
Hide file tree
Showing 18 changed files with 642 additions and 9 deletions.
45 changes: 45 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -5078,6 +5078,51 @@ Support status: [reserved](#support-status)



## GetJobProfilerBundle

`GET /_status/job_profiler_bundle/{job_id}/{bundle_id}`



Support status: [reserved](#support-status)

#### Request Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| bundle_id | [string](#cockroach.server.serverpb.GetJobProfilerBundleRequest-string) | | | [reserved](#support-status) |
| job_id | [int64](#cockroach.server.serverpb.GetJobProfilerBundleRequest-int64) | | | [reserved](#support-status) |







#### Response Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| bundle | [bytes](#cockroach.server.serverpb.GetJobProfilerBundleResponse-bytes) | | | [reserved](#support-status) |







## RequestCA

`GET /_join/v1/ca`
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3251,6 +3251,8 @@ active for the current transaction.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.repair_ttl_table_scheduled_job"></a><code>crdb_internal.repair_ttl_table_scheduled_job(oid: oid) &rarr; void</code></td><td><span class="funcdesc"><p>Repairs the scheduled job for a TTL table if it is missing.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.request_job_profiler_bundle"></a><code>crdb_internal.request_job_profiler_bundle(jobID: <a href="int.html">int</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Used to request a job profiler bundle for a given job ID</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.request_statement_bundle"></a><code>crdb_internal.request_statement_bundle(stmtFingerprint: <a href="string.html">string</a>, samplingProbability: <a href="float.html">float</a>, minExecutionLatency: <a href="interval.html">interval</a>, expiresAfter: <a href="interval.html">interval</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Used to request statement bundle for a given statement fingerprint
that has execution latency greater than the ‘minExecutionLatency’. If the
‘expiresAfter’ argument is empty, then the statement bundle request never
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ GO_TARGETS = [
"//pkg/jobs/jobspb:jobspb",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprofiler/profilerconstants:profilerconstants",
"//pkg/jobs/jobsprofiler/profilerpb:profilerpb",
"//pkg/jobs/jobsprofiler:jobsprofiler",
"//pkg/jobs/jobsprofiler:jobsprofiler_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
Expand Down
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ PROTOBUF_SRCS = [
"//pkg/gossip:gossip_go_proto",
"//pkg/inspectz/inspectzpb:inspectzpb_go_proto",
"//pkg/jobs/jobspb:jobspb_go_proto",
"//pkg/jobs/jobsprofiler/profilerpb:profilerpb_go_proto",
"//pkg/keyvisualizer/keyvispb:keyvispb_go_proto",
"//pkg/kv/bulk/bulkpb:bulkpb_go_proto",
"//pkg/kv/kvnemesis:kvnemesis_go_proto",
Expand Down
5 changes: 1 addition & 4 deletions pkg/jobs/jobsprofiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,8 @@ func StorePerNodeProcessorProgressFraction(
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, jobID)
for componentID, fraction := range perComponentProgress {
key, err := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(),
key := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(),
componentID.SQLInstanceID.String(), componentID.ID)
if err != nil {
return errors.Wrap(err, "failed to construct progress info key")
}
fractionBytes := []byte(fmt.Sprintf("%f", fraction))
return infoStorage.Write(ctx, key, fractionBytes)
}
Expand Down
32 changes: 28 additions & 4 deletions pkg/jobs/jobsprofiler/profilerconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,35 @@ const NodeProcessorProgressInfoKeyPrefix = "~node-processor-progress-"

// MakeNodeProcessorProgressInfoKey returns the info_key used for rows that
// store the per node, per processor progress for a job.
func MakeNodeProcessorProgressInfoKey(
flowID string, instanceID string, processorID int32,
) (string, error) {
func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processorID int32) string {
// The info key is of the form: <prefix>-<flowID>,<instanceID>,<processorID>.
return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID), nil
return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID)
}

// ProfilerBundleChunkKeyPrefix is the prefix of the info key used for rows that
// store chunks of a job's profiler bundle.
const ProfilerBundleChunkKeyPrefix = "~profiler-bundle-chunk-"

// MakeProfilerBundleChunkKeyPrefix is the prefix of the info key used to store all
// chunks of a given profiler bundle.
func MakeProfilerBundleChunkKeyPrefix(bundleID string) string {
return fmt.Sprintf("%s%s-", ProfilerBundleChunkKeyPrefix, bundleID)
}

// MakeProfilerBundleChunkKey is the info key used to store a chunk of a job's
// profiler bundle.
func MakeProfilerBundleChunkKey(bundleID string, unixNanos int64) string {
return fmt.Sprintf("%s%s-%d", ProfilerBundleChunkKeyPrefix, bundleID, unixNanos)
}

// ProfilerBundleMetadataKeyPrefix is the prefix of the info key used for rows
// that store the metadata entry for a profiler bundle.
const ProfilerBundleMetadataKeyPrefix = "~profiler-bundle-metadata-"

// MakeProfilerBundleMetadataKey is the info key used for rows that store the
// metadata entry for a profiler bundle.
func MakeProfilerBundleMetadataKey(bundleID string) string {
return fmt.Sprintf("%s%s", ProfilerBundleMetadataKeyPrefix, bundleID)
}

// GetNodeProcessorProgressInfoKeyParts deconstructs the passed in info key and
Expand Down
33 changes: 33 additions & 0 deletions pkg/jobs/jobsprofiler/profilerpb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")

proto_library(
name = "profilerpb_proto",
srcs = ["profiler.proto"],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = [
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@com_google_protobuf//:timestamp_proto",
],
)

go_proto_library(
name = "profilerpb_go_proto",
compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"],
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb",
proto = ":profilerpb_proto",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
],
)

go_library(
name = "profilerpb",
embed = [":profilerpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb",
visibility = ["//visibility:public"],
)
27 changes: 27 additions & 0 deletions pkg/jobs/jobsprofiler/profilerpb/profiler.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

syntax = "proto3";
package cockroach.jobs.jobsprofiler.profilerpb;
option go_package = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb";

import "google/protobuf/timestamp.proto";
import "gogoproto/gogo.proto";

message ProfilerBundleMetadata {
bytes bundle_id = 1 [
(gogoproto.customname) = "BundleID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
(gogoproto.nullable) = false
];
int32 num_chunks = 2;
google.protobuf.Timestamp collected_at = 3
[ (gogoproto.nullable) = false, (gogoproto.stdtime) = true ];
}
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ go_library(
"//pkg/inspectz/inspectzpb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler/profilerconstants",
"//pkg/jobs/jobsprofiler/profilerpb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/keyvisualizer",
Expand Down
17 changes: 17 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1995,6 +1995,16 @@ message CriticalNodesResponse {
roachpb.SpanConfigConformanceReport report = 2 [(gogoproto.nullable) = false];
}

message GetJobProfilerBundleRequest {
string bundle_id = 1;
int64 job_id = 2;
}

message GetJobProfilerBundleResponse {
bytes bundle = 1;
}


service Status {
// Certificates retrieves a copy of the TLS certificates.
rpc Certificates(CertificatesRequest) returns (CertificatesResponse) {
Expand Down Expand Up @@ -2457,4 +2467,11 @@ service Status {
// ListExecutionInsights returns potentially problematic statements cluster-wide,
// along with actions we suggest the application developer might take to remedy them.
rpc ListExecutionInsights(ListExecutionInsightsRequest) returns (ListExecutionInsightsResponse) {}


rpc GetJobProfilerBundle(GetJobProfilerBundleRequest) returns (GetJobProfilerBundleResponse) {
option (google.api.http) = {
get: "/_status/job_profiler_bundle/{job_id}/{bundle_id}"
};
}
}
58 changes: 58 additions & 0 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvisstorage"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -62,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
Expand All @@ -74,6 +77,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -3916,3 +3920,57 @@ func (s *statusServer) TransactionContentionEvents(

return resp, nil
}

// GetJobProfilerBundle reads all the chunks of a profiler bundle for a given
// job and bundle ID.
func (s *statusServer) GetJobProfilerBundle(
ctx context.Context, req *serverpb.GetJobProfilerBundleRequest,
) (*serverpb.GetJobProfilerBundleResponse, error) {
ctx = s.AnnotateCtx(ctx)
err := s.privilegeChecker.requireViewDebugPermission(ctx)
if err != nil {
return nil, err
}

bundleID := req.BundleId
jobID := jobspb.JobID(req.JobId)

execCfg := s.sqlServer.execCfg
buf := bytes.NewBuffer([]byte{})
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// Reset the buf inside the txn closure to guard against txn retries.
buf.Reset()
jobInfo := jobs.InfoStorageForJob(txn, jobID)
mdKey := profilerconstants.MakeProfilerBundleMetadataKey(bundleID)
md, exists, err := jobInfo.Get(ctx, mdKey)
if err != nil {
return errors.Wrapf(err, "failed to get bundle metadata key for job %d", jobID)
}
if !exists {
return nil
}
profilerMetadata := &profilerpb.ProfilerBundleMetadata{}
if err := protoutil.Unmarshal(md, profilerMetadata); err != nil {
return err
}

chunkKeyPrefix := profilerconstants.MakeProfilerBundleChunkKeyPrefix(bundleID)
var numChunks int
if err := jobInfo.Iterate(ctx, chunkKeyPrefix, func(infoKey string, value []byte) error {
buf.WriteString(string(value))
numChunks++
return nil
}); err != nil {
return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID)
}

if numChunks != int(profilerMetadata.NumChunks) {
return errors.AssertionFailedf("number of chunks read %d is less than expected number of chunks %d",
numChunks, profilerMetadata.NumChunks)
}
return nil
}); err != nil {
return nil, err
}
return &serverpb.GetJobProfilerBundleResponse{Bundle: buf.Bytes()}, nil
}
4 changes: 4 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ go_library(
"job_exec_context_test_util.go",
"jobs_collection.go",
"jobs_execution_details.go",
"jobs_profiler_bundle.go",
"join.go",
"join_predicate.go",
"join_token.go",
Expand Down Expand Up @@ -312,6 +313,7 @@ go_library(
"//pkg/jobs/jobsauth",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler/profilerconstants",
"//pkg/jobs/jobsprofiler/profilerpb",
"//pkg/keys",
"//pkg/keyvisualizer",
"//pkg/kv",
Expand Down Expand Up @@ -647,6 +649,7 @@ go_test(
"instrumentation_test.go",
"internal_test.go",
"jobs_execution_details_test.go",
"jobs_profiler_bundle_test.go",
"join_token_test.go",
"main_test.go",
"materialized_view_test.go",
Expand Down Expand Up @@ -739,6 +742,7 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
"//pkg/jobs/jobsprofiler/profilerconstants",
"//pkg/jobs/jobsprofiler/profilerpb",
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/keyvisualizer",
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/jobs_execution_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func constructBackupExecutionDetails(
}
key, err := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
if err != nil {
return errors.Wrap(err, "failed to construct DSP info key")
return err
}
if err := infoStorage.Write(ctx, key, []byte(annotatedURL.String())); err != nil {
return err
Expand Down
Loading

0 comments on commit ec46b6b

Please sign in to comment.