Skip to content

Commit

Permalink
feat(compute): extracts module config from req
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Oct 10, 2024
1 parent 46ea8c6 commit 515b739
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 36 deletions.
51 changes: 18 additions & 33 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ import (
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

const (
CapabilityIDCompute = "custom_compute@1.0.0"

binaryKey = "binary"
configKey = "config"
binaryKey = "binary"
configKey = "config"
maxMemoryMBsKey = "maxMemoryMBs"
timeoutKey = "timeout"
tickIntervalKey = "tickInterval"
)

var (
Expand Down Expand Up @@ -65,6 +67,8 @@ type Compute struct {
log logger.Logger
registry coretypes.CapabilitiesRegistry
modules *moduleCache

transformer ConfigTransformer
}

func (c *Compute) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand All @@ -91,34 +95,29 @@ func copyRequest(req capabilities.CapabilityRequest) capabilities.CapabilityRequ
func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
copied := copyRequest(request)

binary, err := c.popBytesValue(copied.Config, binaryKey)
if err != nil {
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: %w", err)
}

config, err := c.popBytesValue(copied.Config, configKey)
cfg, err := c.transformer.Transform(copied.Config, WithLogger(c.log))
if err != nil {
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: %w", err)
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: could not transform config: %w", err)
}

id := generateID(binary)
id := generateID(cfg.Binary)

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, binary, request.Metadata.WorkflowID, request.Metadata.ReferenceID)
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.ReferenceID)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

m = mod
}

return c.executeWithModule(m.module, config, request)
return c.executeWithModule(m.module, cfg.Config, request)
}

func (c *Compute) initModule(id string, binary []byte, workflowID, referenceID string) (*module, error) {
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, referenceID string) (*module, error) {
initStart := time.Now()
mod, err := host.NewModule(&host.ModuleConfig{Logger: c.log}, binary)
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
}
Expand All @@ -133,21 +132,6 @@ func (c *Compute) initModule(id string, binary []byte, workflowID, referenceID s
return m, nil
}

func (c *Compute) popBytesValue(m *values.Map, key string) ([]byte, error) {
v, ok := m.Underlying[key]
if !ok {
return nil, fmt.Errorf("could not find %q in map", key)
}

vb, ok := v.(*values.Bytes)
if !ok {
return nil, fmt.Errorf("value is not bytes: %q", key)
}

delete(m.Underlying, key)
return vb.Underlying, nil
}

func (c *Compute) executeWithModule(module *host.Module, config []byte, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
executeStart := time.Now()
capReq := capabilitiespb.CapabilityRequestToProto(req)
Expand Down Expand Up @@ -204,9 +188,10 @@ func (c *Compute) Close() error {

func NewAction(log logger.Logger, registry coretypes.CapabilitiesRegistry) *Compute {
compute := &Compute{
log: logger.Named(log, "CustomCompute"),
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
log: logger.Named(log, "CustomCompute"),
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(),
}
return compute
}
6 changes: 3 additions & 3 deletions core/capabilities/compute/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (t *transformer) Transform(in *values.Map, opts ...func(*ParsedConfig)) (*P
return nil, NewInvalidRequestError(err)
}

maxMemoryMBs, err := popOptionalValue[int64](in, "maxMemoryMBs")
maxMemoryMBs, err := popOptionalValue[int64](in, maxMemoryMBsKey)
if err != nil {
return nil, NewInvalidRequestError(err)
}
Expand All @@ -44,7 +44,7 @@ func (t *transformer) Transform(in *values.Map, opts ...func(*ParsedConfig)) (*P
MaxMemoryMBs: maxMemoryMBs,
}

timeout, err := popOptionalValue[string](in, "timeout")
timeout, err := popOptionalValue[string](in, timeoutKey)
if err != nil {
return nil, NewInvalidRequestError(err)
}
Expand All @@ -58,7 +58,7 @@ func (t *transformer) Transform(in *values.Map, opts ...func(*ParsedConfig)) (*P
mc.Timeout = &td
}

tickInterval, err := popOptionalValue[string](in, "tickInterval")
tickInterval, err := popOptionalValue[string](in, tickIntervalKey)
if err != nil {
return nil, NewInvalidRequestError(err)
}
Expand Down

0 comments on commit 515b739

Please sign in to comment.