Skip to content

Commit

Permalink
fix: add missing k8s job submission times to allocations (#9028)
Browse files Browse the repository at this point in the history
Task stats used to report Go's zero time value as their start time on k8s.
```
determined> select
     allocation_id, start_time,
     end_time - start_time as duration
  from task_stats
  where event_type = 'QUEUED'
```
  • Loading branch information
hamidzr authored Apr 4, 2024
1 parent b8bf396 commit f03a8a8
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 82 deletions.
16 changes: 6 additions & 10 deletions master/internal/db/postgres_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/o1egl/paseto"
log "github.com/sirupsen/logrus"

"github.com/pkg/errors"
"github.com/uptrace/bun"

Expand Down Expand Up @@ -336,22 +338,16 @@ func CloseOpenAllocations(ctx context.Context, exclude []model.AllocationID) err

// RecordTaskStats record stats for tasks.
func RecordTaskStats(ctx context.Context, stats *model.TaskStats) error {
return RecordTaskStatsBun(ctx, stats)
}
if stats.StartTime == nil || stats.StartTime.IsZero() {
log.Warnf("task stats %+v has no start time", stats)
}

// RecordTaskStatsBun record stats for tasks with bun.
func RecordTaskStatsBun(ctx context.Context, stats *model.TaskStats) error {
_, err := Bun().NewInsert().Model(stats).Exec(context.TODO())
return err
return errors.Wrap(err, "recording task stats")
}

// RecordTaskEndStats record end stats for tasks.
func RecordTaskEndStats(ctx context.Context, stats *model.TaskStats) error {
return RecordTaskEndStatsBun(ctx, stats)
}

// RecordTaskEndStatsBun record end stats for tasks with bun.
func RecordTaskEndStatsBun(ctx context.Context, stats *model.TaskStats) error {
query := Bun().NewUpdate().Model(stats).Column("end_time").
Where("allocation_id = ?", stats.AllocationID).
Where("event_type = ?", stats.EventType).
Expand Down
4 changes: 2 additions & 2 deletions master/internal/db/postgres_tasks_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ func TestRecordAndEndTaskStats(t *testing.T) {
if i == 0 {
taskStats.ContainerID = nil
}
require.NoError(t, RecordTaskStatsBun(ctx, taskStats))
require.NoError(t, RecordTaskStats(ctx, taskStats))

taskStats.EndTime = ptrs.Ptr(time.Now().Truncate(time.Millisecond))
require.NoError(t, RecordTaskEndStatsBun(ctx, taskStats))
require.NoError(t, RecordTaskEndStats(ctx, taskStats))
expected = append(expected, taskStats)
}

Expand Down
4 changes: 2 additions & 2 deletions master/internal/rm/agentrm/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,9 @@ func (a *agent) HandleIncomingWebsocketMessage(msg *aproto.MasterMessage) {
if a.taskNeedsRecording(msg.ContainerStatsRecord) {
var err error
if msg.ContainerStatsRecord.EndStats {
err = db.RecordTaskEndStatsBun(context.TODO(), msg.ContainerStatsRecord.Stats)
err = db.RecordTaskEndStats(context.TODO(), msg.ContainerStatsRecord.Stats)
} else {
err = db.RecordTaskStatsBun(context.TODO(), msg.ContainerStatsRecord.Stats)
err = db.RecordTaskStats(context.TODO(), msg.ContainerStatsRecord.Stats)
}
if err != nil {
a.syslog.Errorf("error recording task stats %s", err)
Expand Down
34 changes: 1 addition & 33 deletions master/internal/rm/agentrm/agent_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,39 +814,7 @@ func (a *ResourceManager) createResourcePoolSummary(
func (a *ResourceManager) fetchAvgQueuedTime(pool string) (
[]*jobv1.AggregateQueueStats, error,
) {
aggregates := []model.ResourceAggregates{}
err := db.Bun().NewSelect().Model(&aggregates).
Where("aggregation_type = ?", "queued").
Where("aggregation_key = ?", pool).
Where("date >= CURRENT_TIMESTAMP - interval '30 days'").
Order("date ASC").Scan(context.TODO())
if err != nil {
return nil, err
}
res := make([]*jobv1.AggregateQueueStats, 0)
for _, record := range aggregates {
res = append(res, &jobv1.AggregateQueueStats{
PeriodStart: record.Date.Format("2006-01-02"),
Seconds: record.Seconds,
})
}
today := float32(0)
subq := db.Bun().NewSelect().TableExpr("allocations").Column("allocation_id").
Where("resource_pool = ?", pool).
Where("start_time >= CURRENT_DATE")
err = db.Bun().NewSelect().TableExpr("task_stats").ColumnExpr(
"avg(extract(epoch FROM end_time - start_time))",
).Where("event_type = ?", "QUEUED").
Where("end_time >= CURRENT_DATE AND allocation_id IN (?) ", subq).
Scan(context.TODO(), &today)
if err != nil {
return nil, err
}
res = append(res, &jobv1.AggregateQueueStats{
PeriodStart: time.Now().Format("2006-01-02"),
Seconds: today,
})
return res, nil
return rm.FetchAvgQueuedTime(pool)
}

// mostly for tests.
Expand Down
51 changes: 51 additions & 0 deletions master/internal/rm/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package rm

import (
"context"
"time"

"github.com/pkg/errors"

"github.com/determined-ai/determined/master/internal/db"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/proto/pkg/jobv1"
)

// FetchAvgQueuedTime fetches the average queued time for a resource pool.
//
// The returned slice contains one entry per resource_aggregates row whose
// aggregation type is model.AggregationTypeQueued and whose aggregation key is
// pool, restricted to the last 30 days and ordered by ascending date. A final
// entry for the current day is always appended; its Seconds value is the
// average of (end_time - start_time), in seconds, over QUEUED task_stats rows
// that ended today and belong to allocations in the pool that started today.
//
// Any database error is returned wrapped, together with a nil slice.
func FetchAvgQueuedTime(pool string) (
	[]*jobv1.AggregateQueueStats, error,
) {
	// No caller-supplied context is available through this signature.
	ctx := context.TODO()

	aggregates := []model.ResourceAggregates{}
	err := db.Bun().NewSelect().Model(&aggregates).
		Where("aggregation_type = ?", model.AggregationTypeQueued).
		Where("aggregation_key = ?", pool).
		Where("date >= CURRENT_TIMESTAMP - interval '30 days'").
		Order("date ASC").Scan(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "error fetching aggregates")
	}

	// The final length is known up front: one entry per stored aggregate plus
	// the entry for today, so allocate the backing array once.
	res := make([]*jobv1.AggregateQueueStats, 0, len(aggregates)+1)
	for _, record := range aggregates {
		res = append(res, &jobv1.AggregateQueueStats{
			PeriodStart: record.Date.Format("2006-01-02"),
			Seconds:     record.Seconds,
		})
	}

	// Today's value has no stored aggregate to read here, so it is computed
	// directly from task_stats.
	today := float32(0)
	subq := db.Bun().NewSelect().TableExpr("allocations").Column("allocation_id").
		Where("resource_pool = ?", pool).
		Where("start_time >= CURRENT_DATE")
	err = db.Bun().NewSelect().TableExpr("task_stats").ColumnExpr(
		"avg(extract(epoch FROM end_time - start_time))",
	).Where("event_type = ?", "QUEUED").
		Where("end_time >= CURRENT_DATE AND allocation_id IN (?) ", subq).
		Scan(ctx, &today)
	if err != nil {
		return nil, errors.Wrap(err, "error fetching average queued time")
	}

	// NOTE(review): PeriodStart uses this process's clock and time zone while
	// the query uses the database's CURRENT_DATE — confirm the two agree.
	res = append(res, &jobv1.AggregateQueueStats{
		PeriodStart: time.Now().Format("2006-01-02"),
		Seconds:     today,
	})
	return res, nil
}
34 changes: 1 addition & 33 deletions master/internal/rm/kubernetesrm/kubernetes_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,39 +582,7 @@ func (k *ResourceManager) createResourcePoolSummary(
func (k *ResourceManager) fetchAvgQueuedTime(pool string) (
[]*jobv1.AggregateQueueStats, error,
) {
aggregates := []model.ResourceAggregates{}
err := db.Bun().NewSelect().Model(&aggregates).
Where("aggregation_type = ?", "queued").
Where("aggregation_key = ?", pool).
Where("date >= CURRENT_TIMESTAMP - interval '30 days'").
Order("date ASC").Scan(context.TODO())
if err != nil {
return nil, err
}
res := make([]*jobv1.AggregateQueueStats, 0)
for _, record := range aggregates {
res = append(res, &jobv1.AggregateQueueStats{
PeriodStart: record.Date.Format("2006-01-02"),
Seconds: record.Seconds,
})
}
today := float32(0)
subq := db.Bun().NewSelect().TableExpr("allocations").Column("allocation_id").
Where("resource_pool = ?", pool).
Where("start_time >= CURRENT_DATE")
err = db.Bun().NewSelect().TableExpr("task_stats").ColumnExpr(
"avg(extract(epoch FROM end_time - start_time))",
).Where("event_type = ?", "QUEUED").
Where("end_time >= CURRENT_DATE AND allocation_id IN (?) ", subq).
Scan(context.TODO(), &today)
if err != nil {
return nil, err
}
res = append(res, &jobv1.AggregateQueueStats{
PeriodStart: time.Now().Format("2006-01-02"),
Seconds: today,
})
return res, nil
return rm.FetchAvgQueuedTime(pool)
}

func (k *ResourceManager) getPoolJobStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"strconv"
"testing"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
Expand All @@ -21,6 +22,8 @@ import (
"github.com/determined-ai/determined/master/internal/config"
"github.com/determined-ai/determined/master/internal/mocks"
"github.com/determined-ai/determined/master/internal/rm/tasklist"
"github.com/determined-ai/determined/master/internal/sproto"
"github.com/determined-ai/determined/master/pkg/cproto"
"github.com/determined-ai/determined/master/pkg/device"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/ptrs"
Expand Down Expand Up @@ -445,6 +448,38 @@ func TestGetSlot(t *testing.T) {
}
}

func TestAssignResourcesTime(t *testing.T) {
taskList := tasklist.New()
groups := make(map[model.JobID]*tasklist.Group)
allocateReq := sproto.AllocateRequest{
JobID: model.JobID("test-job"),
JobSubmissionTime: time.Now(),
SlotsNeeded: 0,
}
groups[allocateReq.JobID] = &tasklist.Group{
JobID: allocateReq.JobID,
}
mockPods := createMockPodsService(make(map[string]*k8sV1.Node), device.CUDA, true)
poolRef := &kubernetesResourcePool{
poolConfig: &config.ResourcePoolConfig{PoolName: "cpu-pool"},
podsService: mockPods,
reqList: taskList,
groups: groups,
allocationIDToContainerID: map[model.AllocationID]cproto.ID{},
containerIDtoAllocationID: map[string]model.AllocationID{},
jobIDToAllocationID: map[model.JobID]model.AllocationID{},
allocationIDToJobID: map[model.AllocationID]model.JobID{},
slotsUsedPerGroup: map[*tasklist.Group]int{},
allocationIDToRunningPods: map[model.AllocationID]int{},
syslog: logrus.WithField("component", "k8s-rp"),
}

poolRef.assignResources(&allocateReq)
resourcesAllocated := poolRef.reqList.Allocation(allocateReq.AllocationID)
require.NotNil(t, resourcesAllocated)
require.False(t, resourcesAllocated.JobSubmissionTime.IsZero())
}

func TestGetResourcePools(t *testing.T) {
expectedName := "testname"
expectedMetadata := map[string]string{"x": "y*y"}
Expand Down
6 changes: 5 additions & 1 deletion master/internal/rm/kubernetesrm/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,11 @@ func (k *kubernetesResourcePool) assignResources(
k.containerIDtoAllocationID[rs.containerID.String()] = req.AllocationID
}

assigned := sproto.ResourcesAllocated{ID: req.AllocationID, Resources: allocations}
assigned := sproto.ResourcesAllocated{
ID: req.AllocationID,
Resources: allocations,
JobSubmissionTime: req.JobSubmissionTime,
}
k.reqList.AddAllocationRaw(req.AllocationID, &assigned)
rmevents.Publish(req.AllocationID, assigned.Clone())

Expand Down
4 changes: 4 additions & 0 deletions master/internal/rm/tasklist/task_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"

"github.com/emirpasic/gods/sets/treeset"
"github.com/sirupsen/logrus"

"github.com/determined-ai/determined/master/internal/sproto"
"github.com/determined-ai/determined/master/pkg/model"
Expand Down Expand Up @@ -93,6 +94,9 @@ func (l *TaskList) AddAllocation(id model.AllocationID, assigned *sproto.Resourc
// AddAllocationRaw stores assigned under id in the list's allocations without
// modifying the sproto.AllocateRequest's sproto.SchedulingState.
//
// A warning is logged when assigned is non-nil but has a zero job submission
// time; the allocation is stored regardless.
func (l *TaskList) AddAllocationRaw(id model.AllocationID, assigned *sproto.ResourcesAllocated) {
	missingSubmissionTime := assigned != nil && assigned.JobSubmissionTime.IsZero()
	if missingSubmissionTime {
		logrus.Warnf("added allocation %s without a job submission time", id)
	}

	l.allocations[id] = assigned
}

Expand Down
5 changes: 4 additions & 1 deletion master/pkg/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
TaskTypeGeneric TaskType = "GENERIC"
// GlobalAccessScopeID represents global permission access.
GlobalAccessScopeID AccessScopeID = 0
// AggregationTypeQueued is the type of aggregation for queued tasks.
AggregationTypeQueued = "queued"
)

// TaskLogVersion is the version for our log-storing scheme. Useful because changing designs
Expand Down Expand Up @@ -200,7 +202,8 @@ type TaskStats struct {

// ResourceAggregates is the model for resource_aggregates in the database.
type ResourceAggregates struct {
Date *time.Time
Date *time.Time
// AggregationType is the type of aggregation. E.g. "total", "queued", "resource_pool", "username"
AggregationType string
AggregationKey string
Seconds float32
Expand Down

0 comments on commit f03a8a8

Please sign in to comment.