diff --git a/api/process.capnp b/api/process.capnp index 15d60b40..67970299 100644 --- a/api/process.capnp +++ b/api/process.capnp @@ -9,13 +9,13 @@ $Go.import("github.com/wetware/pkg/api/process"); interface Executor { # Executor has the ability to create and run WASM processes given the # WASM bytecode. - exec @0 (bytecode :Data, bootstrapClient :Capability) -> (process :Process); + exec @0 (bytecode :Data, ppid :UInt32, bootstrapClient :Capability) -> (process :Process); # Exec creates an runs a process from the provided bytecode. Optionally, a # capability can be passed through the `cap` parameter. This capability will # be available at the process bootContext. # # The Process capability is associated to the created process. - execCached @1 (cid :Text, bootstrapClient :Capability) -> (process :Process); + execCached @1 (cid :Text, ppid :UInt32, bootstrapClient :Capability) -> (process :Process); # Same as Exec, but the bytecode is directly from the BytecodeRegistry. # Provides a significant performance improvement for medium to large # WASM streams. 
diff --git a/api/process/process.capnp.go b/api/process/process.capnp.go index ee30a868..9fd3d540 100644 --- a/api/process/process.capnp.go +++ b/api/process/process.capnp.go @@ -27,7 +27,7 @@ func (c Executor) Exec(ctx context.Context, params func(Executor_exec_Params) er }, } if params != nil { - s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 2} + s.ArgsSize = capnp.ObjectSize{DataSize: 8, PointerCount: 2} s.PlaceArgs = func(s capnp.Struct) error { return params(Executor_exec_Params(s)) } } @@ -47,7 +47,7 @@ func (c Executor) ExecCached(ctx context.Context, params func(Executor_execCache }, } if params != nil { - s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 2} + s.ArgsSize = capnp.ObjectSize{DataSize: 8, PointerCount: 2} s.PlaceArgs = func(s capnp.Struct) error { return params(Executor_execCached_Params(s)) } } @@ -229,12 +229,12 @@ type Executor_exec_Params capnp.Struct const Executor_exec_Params_TypeID = 0xf20b3dea95929312 func NewExecutor_exec_Params(s *capnp.Segment) (Executor_exec_Params, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 2}) + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 2}) return Executor_exec_Params(st), err } func NewRootExecutor_exec_Params(s *capnp.Segment) (Executor_exec_Params, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 2}) + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 2}) return Executor_exec_Params(st), err } @@ -283,6 +283,14 @@ func (s Executor_exec_Params) SetBytecode(v []byte) error { return capnp.Struct(s).SetData(0, v) } +func (s Executor_exec_Params) Ppid() uint32 { + return capnp.Struct(s).Uint32(0) +} + +func (s Executor_exec_Params) SetPpid(v uint32) { + capnp.Struct(s).SetUint32(0, v) +} + func (s Executor_exec_Params) BootstrapClient() capnp.Client { p, _ := capnp.Struct(s).Ptr(1) return p.Interface().Client() @@ -306,7 +314,7 @@ type 
Executor_exec_Params_List = capnp.StructList[Executor_exec_Params] // NewExecutor_exec_Params creates a new list of Executor_exec_Params. func NewExecutor_exec_Params_List(s *capnp.Segment, sz int32) (Executor_exec_Params_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 2}, sz) + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 8, PointerCount: 2}, sz) return capnp.StructList[Executor_exec_Params](l), err } @@ -412,12 +420,12 @@ type Executor_execCached_Params capnp.Struct const Executor_execCached_Params_TypeID = 0xb9b9c4df47b44962 func NewExecutor_execCached_Params(s *capnp.Segment) (Executor_execCached_Params, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 2}) + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 2}) return Executor_execCached_Params(st), err } func NewRootExecutor_execCached_Params(s *capnp.Segment) (Executor_execCached_Params, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 2}) + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 2}) return Executor_execCached_Params(st), err } @@ -471,6 +479,14 @@ func (s Executor_execCached_Params) SetCid(v string) error { return capnp.Struct(s).SetText(0, v) } +func (s Executor_execCached_Params) Ppid() uint32 { + return capnp.Struct(s).Uint32(0) +} + +func (s Executor_execCached_Params) SetPpid(v uint32) { + capnp.Struct(s).SetUint32(0, v) +} + func (s Executor_execCached_Params) BootstrapClient() capnp.Client { p, _ := capnp.Struct(s).Ptr(1) return p.Interface().Client() @@ -494,7 +510,7 @@ type Executor_execCached_Params_List = capnp.StructList[Executor_execCached_Para // NewExecutor_execCached_Params creates a new list of Executor_execCached_Params. 
func NewExecutor_execCached_Params_List(s *capnp.Segment, sz int32) (Executor_execCached_Params_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 2}, sz) + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 8, PointerCount: 2}, sz) return capnp.StructList[Executor_execCached_Params](l), err } @@ -1808,64 +1824,66 @@ func (f BytecodeCache_has_Results_Future) Struct() (BytecodeCache_has_Results, e return BytecodeCache_has_Results(p.Struct()), err } -const schema_9a51e53177277763 = "x\xda\xacT]\x88\x1bU\x14>\xe7\xde;\x99\x88I" + - "\xc3\xcd\xd4\xadYZ\xb51\x8b\xba\xb41a\x15lQ" + - "\x13w)\xcb.\x82\xb9\xc5}Yiq:\x19\x9a`" + - "\xb6\x09\x99\x09\xd9\x82\"\x0aR|\xf0A\xad\x08ZD" + - "\x84\xd2}\xf0\xa7\xaeU\xd8J}P\xab\xaf\xb2U\x04" + - "\xa5\x88\xd5\xf5A\xb1\xf8\xd7\x85\x828ro2\xc9\xa4" + - "It\xd1\xbe\x84\x0c\xf7\xbb\xdf\xf9\xcew\xbe{2\x16" + - "\xc9\xb3l4\xa7\x03\x11\x0fj!\xef\x83{\xd3\xe3\xb1" + - "\xf8{\xcf\x02\xdf\x8a\x00\x1a\xea\x00\x13O\xd1\xdd\x08h" + - "y\xae\xc5^@\x14a\xaa\x01t\x0cG?j<" + - ";\x0e\x84\x8f\xe9\xd8\x8d'\xfa\xcf\x82'\xe6\x81p\xae" + - "\xc7\xa4\xb8\xc5" + - "j\xd5u\xdc\xba\x89\xb5\xa9J\xd9>\xe4\x02\xc6\x19\x05" + - "\xc4x\xa0>\x1dP_5\xa4W\xdc\xff1\x9c\x81i" + - "*\x99\xce\xa04%\xbb\xcc=\xf2\xaf\xe0*\xb4?\x1f" + - ")W*-\xcb\xa9\xeb\x0c\x035\xcd\xb2\xdb\x01\x0d\x9b" + - "\x8b\xbdXv\xa7\xdas\x09\x03\xc1p\x7f\xba\x0a\xf5\x1b" + - "\xd5w7\\\xfeKB\x7f\xd7\x04\xc2\xe5\xef\x00\xf47" + - "\x14O\xc8\xb3\xa8\x1e\x93z\xf2\x18\x93\xda{CE\xae" + - "4J\xb7J\xb6\xac\x16Q\xd5\xfc\xed\x8e\xfe\x8e\xe0\"" + - "\x09\x84\xef\x91\xd5\xfc\xad\x80\xfeN\xe6\xbb\xe4\xd9N\x1d" + - "Ig\x87\xa2\xbf\x19\xf8vyv\x9d\xae\xd7\x1an\x1e" + - "\xf5\x83\xb6\xfc-\x99N\xaf\x9aAV\xab\x81\xa1\xf3\xcf" + - "\x89\x91 \xba\xe0\x04\xa3*\x9d\xbe\x95\xa2\xb8#\x10\xd5" + - "\xac\x8cj\x86\xa2\xb8{\xc8\xb3\xd8Hd\xd9\xb0M5" + - "\xe8\x1dn0\\\xfd\x8f\xfa?\x04\x95\x0d\x0b\xfd\xbf\x09" + - "+\x99\x0e\"\x10\xc4\xe1\xa9W\x81n\x8f\xe2\xef\x00\x00" + - 
"\x00\xff\xff\xee;)\xf5" +const schema_9a51e53177277763 = "x\xda\xacU]h\x1cU\x18\xfd\xce\xbd3;\x11\xb3" + + "]\xeeNLL\xa9?\x8d\x09b\xd05K\xf0\xc1\x82" + + "fM\x0c1!\xc5\xbd\xa5\x01\x8d\xb6\xb8\x9d\x1d\xba\x8b" + + "\xdb\xee\xb23KRP\xa4\x82\x14\x1f|\x88VD\xad" + + "\"\x824\x82\xd86\x16!\x15}P\xab\xaf\x92\x0a\xa2" + + "\"\x92j\x05\x15\x8a\xf5'\x10\x10G\xee\xec\xce\xeel" + + "\xb2\xebo_\x96\x1d\xee\x99s\xcfw\xbe\xf3}3t" + + "\x98\xa5\xb4dt\xc4 &w\xeb\x11\xef\xbd;\x13\x83" + + "\xb1\xf8\xdb\x0b$\xb6\x81H\x87A4\xfc\x04\xdf\x01\x82" + + "\xf9\x14\x1f!x/\x1f\xfae\xcb\xec\xd1\xad\xaf\x92\xe8" + + "\xa9\x03N\xf1]\x0a\xf0\xae\x0fX\xa9\x1c~\xed\xcc\xde" + + "\xc4\x09\x12[\xb8g\xcd\xdd8\x97\xbc _ \x82\xf9" + + "\x15?c^\xe0\x06\x91\xb9\xca\x8f\x98\xb7i\x06\x91\xf7" + + "+w\x1f8{\xe9\xa3\xa5\xf0u\xd7h\xa3\x8am@" + + "Sl\xfb&OO|\xfd\xe1\xf22\xc9\x1e(\x04S" + + "\x88ImJ!f\xb4\x13\x04\xef\xc5OV\x8e\x9f\xec" + + "\xbe\xf7\x9d\xb0\xa0KZ\x9f\x02\xac\xfb\x143\x7f\x1c\x9b" + + "\\\xec\x99}?|G\xaf\xee\x97\xb4]W\x80\x8b\xcf" + + "}\xea,\xe5\x92gIt\x83H\xe9\x1a\xbeK\xdf\x0a" + + "\xd2\xbc7&\x86\xfa_9=\xf8\x19\xc9n\x04G7" + + "\xa9#\x98I\xff\xd5\xe1\xc8\xc0\xe2\xb9\x9fn\xf8bS" + + "\xb13\xfa\x92\xb9GW\xc5\xde\xaf\x1f1_W\xff<" + + "\x17W\xac\xf1o\xa7\xbf\xdb\x04^\xd0\xcf\x9b/\xf9\xe0" + + "\xe7\xf5\x8fM\x11Q\xe0\x89=\xb7\x9e\xec}\xf3\xf8\xc5" + + "\x90\xa8u=\xaeD\xc5\x9fy\xfa\xd9\x1f\xee\xb8\xf2\xe7" + + "&KV\xab\xaa\xbe\xd7\x95%\xb9\xe5\xeb\x7f?\xf7\xe8" + + "\xb5\xbf\x85+~2\xe2\xbb\xba\x10Q\xb2\x1fY\xfc\xb1" + + "r,~t-\x0c8\x15\xf1-Y\xf6\x01\xb1\xbdk" + + "\x1f\xac\xdc\xfc\xd0:\xc9m\xf5\xc2?\xaf2\xac\xfa\x80" + + "o\xde\xfa\xb2\xe3\xfcT~=$\x0fF\x1c4\xe4\x95" + + "\xcaE\xcbv\x9c\x04\xb72\xa5\x83\xa5\x1d\xa3\x87\\\xdb" + + "*f\xed\xb1\x8c\x95\xb3\x13\xa5\x8a\xdb?\x92\xce\x943" + + "\x07\x1c\xa9q\x8dH\x03\x91\x88N\x11\xc9N\x0ey5" + + "\x83\xb7\xaf\xf6\x02\x11!J\x0cQB\x9dS\xabr\x8e" + + "\xcf\xdbV\xc5-\x96\x13\xf6\xbcm\xf9\xc4\xd9\xfe]\xb6" + + 
"S)\xb8h\xa2\x1d%\x92\x1d\x1c\xb2\x8b\xe1\xb1\x1a\x05" + + "D\xa3g\x04\x88\x109\x02\xf2\x91*{\x1a\x90\x1d\\" + + "'\xaa;\x8e k\"9HL\x0c\x18h\x04\x14\xc1" + + "`\x88\xdeYbB\x181%.\x05/\xd0H\xdc\xce" + + "\xa6\x90\xc6\xa6j\x9a\x1d\xdao\xbb\xb5Z\x1c\xba\xcc\x16" + + "U}\xf7y\x02\xda\xf1>\"\x99\xe2\x90\xd3\x0c@\x97" + + "\xea\xb5\x98\x1c$\x92ws\xc84\x83`\xe8\x02#\x12" + + ";\x1f'\x92\xd3\x1c\xf2>\x06\xc3\xcag\xd1I\x0c\x9d" + + "\x84X\xa9\x94\xcf\xa2\x83\x18:\xd4\xb4\x16\x8b\xae\xe3\x96" + + "3(\x8d\x15\xf2\xf6A\x97\x10\xd78\x01\xf1\x90@\xde" + + "B\xa0_\xb1Qp\xffG\xf7Z\xc6-\x97qZ\xc5" + + "\xad\xaf\xc1\x1c\xaee#W\xba\xf6\xf8p\xbeP\xa8\xf6" + + "\x84\xbbN;\xd0\\&\xef\xd6A\xed\x1ag\xcf\xe7\xdd" + + "\xb1Z\xe3\x02\xcf6\xc4/]\xbe\xce\x7fn\xa4/\x18" + + "5\x04\xdb(\x94\xbe`K \xd8a\xa2W\x9dE\x8d" + + "\x98\xd2\x93BLioN\x1d\xdbh\x94a\xe5lu" + + "[\xa7\x7f[\xf0\x01@\xb0D\x84\xec#&\xc6\xd5m" + + "\xc1\xda@\xb0\xb6\xc5\xed\xea\xec\x16\x03\xac\xbee\x11\xac" + + "\x0e\xb1]\x9d]e\x18\xa5\x8a\x9b\x82\xb1\xdfV\xbf\xb9" + + "\x8c\xd3\xac\xa6\x95\xd5~\xc3\xe0\xfcub\x14\x88\x1fp" + + "\xc2Y\x9ej\xe46\xc8\xf2N\x95\xe5{8\xe4\xeeP" + + "\x96\xa5\xcar\x9aC>\xd8z\x96\xfe}\xa6\xb5v\xbb" + + "\xae\xd5$\xff\xc3\xf4m^\x0b\xff!\xc9Z\xbb\xa9\xf8" + + ";a\xb9\x8c\x03\x10\x03\xda\x8f\x85\x9f\xf8Z\xaf\xfe\x0c" + + "\x00\x00\xff\xff1m5_" func RegisterSchema(reg *schemas.Registry) { reg.Register(&schemas.Schema{ diff --git a/cap/csp/executor.go b/cap/csp/executor.go index c3d4dc9a..990cbcc2 100644 --- a/cap/csp/executor.go +++ b/cap/csp/executor.go @@ -35,9 +35,33 @@ func (ex Executor) Release() { capnp.Client(ex).Release() } -func (ex Executor) Exec(ctx context.Context, src []byte) (Proc, capnp.ReleaseFunc) { - f, release := api.Executor(ex).Exec(ctx, func(ps api.Executor_exec_Params) error { - return ps.SetBytecode(src) - }) +// Exec spawns a new process from WASM bytecode bc. 
If the caller is a WASM process +// spawned in this same executor, it should use its PID as ppid to mark the +// new process as a subprocess. +func (ex Executor) Exec(ctx context.Context, bc []byte, ppid uint32, client capnp.Client) (Proc, capnp.ReleaseFunc) { + f, release := api.Executor(ex).Exec(ctx, + func(ps api.Executor_exec_Params) error { + if err := ps.SetBytecode(bc); err != nil { + return err + } + + ps.SetPpid(ppid) + return ps.SetBootstrapClient(client) + }) + return Proc(f.Process()), release +} + +// ExecFromCache behaves the same way as Exec, but expects the bytecode to be already +// cached at the executor. +func (ex Executor) ExecFromCache(ctx context.Context, cid string, ppid uint32, client capnp.Client) (Proc, capnp.ReleaseFunc) { + f, release := api.Executor(ex).ExecCached(ctx, + func(ps api.Executor_execCached_Params) error { + if err := ps.SetCid(cid); err != nil { + return err + } + + ps.SetPpid(ppid) + return ps.SetBootstrapClient(client) + }) return Proc(f.Process()), release } diff --git a/cap/csp/proc.go b/cap/csp/process.go similarity index 66% rename from cap/csp/proc.go rename to cap/csp/process.go index 0bed21eb..ce6e7321 100644 --- a/cap/csp/proc.go +++ b/cap/csp/process.go @@ -25,12 +25,26 @@ func (p Proc) Release() { capnp.Client(p).Release() } -// func (p Proc) Kill(ctx context.Context) error { -// f, release := api.Process(p).Kill(ctx, nil) -// defer release() +// Kill a process and any sub processes it might have spawned. 
+func (p Proc) Kill(ctx context.Context) error { + f, release := api.Process(p).Kill(ctx, nil) + defer release() + + select { + case <-f.Done(): + case <-ctx.Done(): + } + + if ctx.Err() != nil { + return ctx.Err() + } -// return casm.Future(f).Await(ctx) -// } + _, err := f.Struct() + if err != nil { + return err + } + return nil +} func (p Proc) Wait(ctx context.Context) error { f, release := api.Process(p).Wait(ctx, nil) diff --git a/cap/csp/server/executor.go b/cap/csp/server/executor.go index 46764049..8bb3aee1 100644 --- a/cap/csp/server/executor.go +++ b/cap/csp/server/executor.go @@ -26,8 +26,11 @@ import ( // Runtime is the main Executor implementation. It spawns WebAssembly- // based processes. The zero-value Runtime panics. type Runtime struct { - Runtime wazero.Runtime - Cache BytecodeCache + Runtime wazero.Runtime + Cache BytecodeCache + Tree ProcTree + + // HostModule is unused for now. HostModule *wazergo.ModuleInstance[*proc.Module] } @@ -51,8 +54,14 @@ func (r Runtime) Exec(ctx context.Context, call api.Executor_exec) error { r.Cache.put(bc) client := call.Args().BootstrapClient() + ppid := r.Tree.PpidOrInit(call.Args().Ppid()) + args := procArgs{ + bc: bc, + client: client, + ppid: ppid, + } - p, err := r.mkproc(ctx, bc, client) + p, err := r.mkproc(ctx, args) if err != nil { return err } @@ -77,8 +86,14 @@ func (r Runtime) ExecCached(ctx context.Context, call api.Executor_execCached) e } client := call.Args().BootstrapClient() + ppid := r.Tree.PpidOrInit(call.Args().Ppid()) + args := procArgs{ + bc: bc, + client: client, + ppid: ppid, + } - p, err := r.mkproc(ctx, bc, client) + p, err := r.mkproc(ctx, args) if err != nil { return err } @@ -86,8 +101,10 @@ func (r Runtime) ExecCached(ctx context.Context, call api.Executor_execCached) e return res.SetProcess(api.Process_ServerToClient(p)) } -func (r Runtime) mkproc(ctx context.Context, bc []byte, client capnp.Client) (*process, error) { - mod, err := r.mkmod(ctx, bc, client) +func (r Runtime) 
mkproc(ctx context.Context, args procArgs) (*process, error) { + pid := r.Tree.NextPid() + + mod, err := r.mkmod(ctx, args) if err != nil { return nil, err } @@ -97,19 +114,21 @@ func (r Runtime) mkproc(ctx context.Context, bc []byte, client capnp.Client) (*p return nil, errors.New("ww: missing export: _start") } - done, cancel := r.spawn(fn) - return &process{ - done: done, - cancel: cancel, - }, nil + proc := r.spawn(fn, pid) + + // Register new process. + r.Tree.Insert(proc.pid, args.ppid) + r.Tree.AddToMap(proc.pid, proc) + + return proc, nil } -func (r Runtime) mkmod(ctx context.Context, bc []byte, client capnp.Client) (wasm.Module, error) { - name := csp.ByteCode(bc).String() +func (r Runtime) mkmod(ctx context.Context, args procArgs) (wasm.Module, error) { + name := csp.ByteCode(args.bc).String() // TODO(perf): cache compiled modules so that we can instantiate module // instances for concurrent use. - compiled, err := r.Runtime.CompileModule(ctx, bc) + compiled, err := r.Runtime.CompileModule(ctx, args.bc) if err != nil { return nil, err } @@ -146,32 +165,46 @@ func (r Runtime) mkmod(ctx context.Context, bc []byte, client capnp.Client) (was return nil, err } - go ServeModule(addr, client) + go ServeModule(addr, args.client) return mod, nil } -func (r Runtime) spawn(fn wasm.Function) (<-chan execResult, context.CancelFunc) { - out := make(chan execResult, 1) +func (r Runtime) spawn(fn wasm.Function, pid uint32) *process { + done := make(chan execResult, 1) // NOTE: we use context.Background instead of the context obtained from the // rpc handler. This ensures that a process can continue to run after // the rpc handler has returned. Note also that this context is bound // to the application lifetime, so processes cannot block a shutdown. 
ctx, cancel := context.WithCancel(context.Background()) + killFunc := r.Tree.Kill + proc := &process{ + pid: pid, + killFunc: killFunc, + done: done, + cancel: cancel, + } go func() { - defer close(out) - defer cancel() + defer close(done) + defer proc.killFunc(proc.pid) + + vs, err := fn.Call(ctx) - vs, err := fn.Call(wazergo.WithModuleInstance(ctx, r.HostModule)) - out <- execResult{ + done <- execResult{ Values: vs, Err: err, } }() - return out, cancel + return proc +} + +type procArgs struct { + bc []byte + client capnp.Client + ppid uint32 } // ServeModule ensures the host side of the TCP connection with addr=addr diff --git a/cap/csp/server/process.go b/cap/csp/server/process.go index 4a83b09f..e3674115 100644 --- a/cap/csp/server/process.go +++ b/cap/csp/server/process.go @@ -9,17 +9,20 @@ import ( // process is the main implementation of the Process capability. type process struct { - done <-chan execResult - cancel context.CancelFunc - result execResult + pid uint32 + done <-chan execResult + killFunc func(uint32) // killFunc must call cancel() + cancel context.CancelFunc + result execResult } -func (p process) Kill(context.Context, api.Process_kill) error { - p.cancel() +func (p *process) Kill(ctx context.Context, call api.Process_kill) error { + p.killFunc(p.pid) return nil } func (p *process) Wait(ctx context.Context, call api.Process_wait) error { + call.Go() select { case res, ok := <-p.done: if ok { diff --git a/cap/csp/server/proctree.go b/cap/csp/server/proctree.go new file mode 100644 index 00000000..10ff0780 --- /dev/null +++ b/cap/csp/server/proctree.go @@ -0,0 +1,316 @@ +package csp_server + +import ( + "context" + "fmt" + "sync/atomic" + + api "github.com/wetware/pkg/api/process" +) + +const INIT_PID = 1 + +// ProcTree represents the process tree of an executor. 
+// It is represented a binary tree, in which the left branch of a node +// represents a child process, while the right branch represents a +// sibling process (shares the same parent). +// TODO: thread safety. +type ProcTree struct { + // TODO move context out of tree + Ctx context.Context + // PIDC is a couter that increases to assign new PIDs. + PIDC AtomicCounter + // TPC keeps track of the number of processes in the tree. + TPC AtomicCounter + // Root of the process tree. + Root *ProcNode + // Map of processes associated to their PIDs. MUST be initialized. + Map map[uint32]api.Process_Server +} + +// NewProcTree is the default constuctor for ProcTree, but it may +// also be maually constructed. +func NewProcTree(ctx context.Context) ProcTree { + return ProcTree{ + Ctx: ctx, + PIDC: NewAtomicCounter(INIT_PID), + TPC: NewAtomicCounter(1), + Root: &ProcNode{Pid: INIT_PID}, + Map: make(map[uint32]api.Process_Server), + } +} + +// ppidOrInit checks for a process with pid=ppid and returns +// ppid if found, INIT_PID otherwise. +func (pt *ProcTree) PpidOrInit(ppid uint32) uint32 { + if ppid == 0 { + return INIT_PID + } else { + // Default INIT_PID as a parent. + if _, ok := pt.Map[ppid]; !ok { + return INIT_PID + } + } + return ppid +} + +// NextPid returns the next avaiable PID and ensures it does not collide +// with any existing processes. +func (pt *ProcTree) NextPid() uint32 { + pid := pt.PIDC.Inc() + _, col := pt.Map[pid] + for col { + pid := pt.PIDC.Inc() + _, col = pt.Map[pid] + } + return pid +} + +// Kill recursively kills a process and it's children +func (pt *ProcTree) Kill(pid uint32) { + // Can't kill root process. + if pid == pt.Root.Pid { + return + } + + n := pt.Pop(pid) + p, ok := pt.Map[pid] + if ok && p != nil { + pt.TPC.Dec() + stop(pt.Ctx, p) + delete(pt.Map, pid) + } + + // Kill all subprocesses. 
+ if n != nil { + pt.kill(n.Left) + } +} + +// kill recursively kills process n, its siblings and children +func (pt *ProcTree) kill(n *ProcNode) { + if n == nil { + return + } + p, ok := pt.Map[n.Pid] + if ok && p != nil { + pt.TPC.Dec() + stop(pt.Ctx, p) + delete(pt.Map, n.Pid) + } + pt.kill(n.Left) + pt.kill(n.Right) +} + +// stop a process in a specific way based on its implementation type. +func stop(ctx context.Context, p api.Process_Server) { + // *process p calls this function from its Kill implementation + // thus we must avoid infinite recursivity. The process is + // killed with p.cancel() instead. + if ps, ok := p.(*process); ok { + fmt.Printf("killing process %d\n", p.(*process).pid) + ps.cancel() + } else { + // Generic implementation. + p.Kill(ctx, api.Process_kill{}) + } +} + +// Pop removes the node with PID=pid and replaces it with a sibling +// in the process tree. +func (pt ProcTree) Pop(pid uint32) *ProcNode { + // Root proc. + if pid == pt.Root.Pid { + return nil + } + + // Find the parent. + parent := pt.FindParent(pid) + + // Orphaned node. + if parent == nil { + return pt.Find(pid) + } + + child := parent.Left + // This case should never occur if FindParent is correct. + if child == nil { + return nil + } + + // Child is immediate left branch. + if child.Pid == pid { + result := child + parent.Left = child.Right + return result + } + + // Descend throught the rightest branch. + sibling := child.Right + for sibling != nil && sibling.Pid != pid { + child, sibling = sibling, sibling.Right + } + + // Bridge left and right siblings. + if sibling != nil { + child.Right = sibling.Right + } + + return sibling +} + +// Find returns a node in the process tree with PID=pid. nil if not found. +func (pt ProcTree) Find(pid uint32) *ProcNode { + return find(pt.Root, pid) +} + +// FindParent returns the parent of the process with PID=pid. nil if not found. 
+func (pt ProcTree) FindParent(pid uint32) *ProcNode { + n, _ := findParent(pt.Root, pid) + return n +} + +// Insert creates a node with PID=pid as a child of PID=ppid. +func (pt ProcTree) Insert(pid, ppid uint32) error { + err := insert(pt.Root, pid, ppid) + if err == nil { + pt.TPC.Inc() + } + return err +} + +// find performs an In-Order Depth First Search of the tree. +func find(n *ProcNode, pid uint32) *ProcNode { + // Corner case. + if n == nil || n.Pid == pid { + return n + } + + // Explore left branch. + x := find(n.Left, pid) + if x != nil { + return x + } + + // Explore right branch. + return find(n.Right, pid) +} + +// findParent does a Depth First Search of the tree until +// finding the node with PID=pid, then returns it's parent node. +func findParent(n *ProcNode, pid uint32) (*ProcNode, bool) { + // Corner case, defaults to being the right-branch node. + if n == nil || n.Pid == pid { + return nil, n != nil + } + + // Explore left branch. + if n.Left != nil { + x, childInRight := findParent(n.Left, pid) + // Node was a children or grandchildren. + if x != nil { + return x, false + } else { + // Node was a sibling of right. + if childInRight { + return n, false + } + } + } + + // Explore immediate sibling. + return findParent(n.Right, pid) +} + +// Insert adds a new node PID to root as a child of PPID. +// If PPID has no children PID will be the immediate child. +// Otherwise it will iterate over the siblings and add it at the end of the chain. 
// ProcNode represents a process in the process tree.
type ProcNode struct {
	// Pid contains the process ID.
	Pid uint32
	// Left contains a child process.
	Left *ProcNode
	// Right contains a sibling process.
	Right *ProcNode
}

// String renders the node as "{pid=P, left=L, right=R}", where L and R are
// the PIDs of the linked nodes or "nil" when the link is empty. A nil
// receiver renders as "nil" instead of panicking, so the method is safe to
// call on an empty branch.
func (n *ProcNode) String() string {
	if n == nil {
		return "nil"
	}
	return fmt.Sprintf("{pid=%d, left=%s, right=%s}", n.Pid, pidLabel(n.Left), pidLabel(n.Right))
}

// pidLabel returns the PID of n as text, or "nil" when n is nil.
func pidLabel(n *ProcNode) string {
	if n == nil {
		return "nil"
	}
	return fmt.Sprint(n.Pid)
}
+func (p AtomicCounter) Get() uint32 { + return atomic.LoadUint32(p.n) +} diff --git a/cap/csp/server/proctree_test.go b/cap/csp/server/proctree_test.go new file mode 100644 index 00000000..d6c7594f --- /dev/null +++ b/cap/csp/server/proctree_test.go @@ -0,0 +1,254 @@ +package csp_server_test + +import ( + "context" + "math" + "testing" + + api "github.com/wetware/pkg/api/process" + csp "github.com/wetware/pkg/cap/csp/server" +) + +type testProc struct { + pid uint32 + alive bool +} + +func (p *testProc) Kill(context.Context, api.Process_kill) error { + p.alive = false + return nil +} + +func (p *testProc) Wait(ctx context.Context, call api.Process_wait) error { + return nil +} + +func testProcTree() csp.ProcTree { + /* + 0 + | + 1 + / \ + 2 10 + / \ / + 3 6 11 + \ \ + 4 7 + / / \ + 5 8 9 + */ + root := &csp.ProcNode{ + Pid: 0, + Left: &csp.ProcNode{ + Pid: 1, + Left: &csp.ProcNode{ + Pid: 2, + Left: &csp.ProcNode{ + Pid: 3, + Right: &csp.ProcNode{ + Pid: 4, + Left: &csp.ProcNode{ + Pid: 5, + }, + }, + }, + Right: &csp.ProcNode{ + Pid: 6, + Right: &csp.ProcNode{ + Pid: 7, + Left: &csp.ProcNode{ + Pid: 8, + }, + Right: &csp.ProcNode{ + Pid: 9, + }, + }, + }, + }, + Right: &csp.ProcNode{ + Pid: 10, + Left: &csp.ProcNode{ + Pid: 11, + }, + }, + }, + } + + procMap := make(map[uint32]api.Process_Server) + for pid := uint32(0); pid <= 11; pid++ { + procMap[pid] = &testProc{pid: pid, alive: true} + } + + return csp.ProcTree{ + Ctx: context.Background(), + PIDC: csp.NewAtomicCounter(10), + TPC: csp.NewAtomicCounter(10), + Root: root, + Map: procMap, + } +} + +func TestProcTree_Find(t *testing.T) { + pt := testProcTree() + for i := uint32(0); i <= 11; i++ { + n := pt.Find(i) + if n == nil { + t.Fatalf("failed to find node %d", i) + } + if n.Pid != i { + t.Fatalf("found node %d instead of %d", n.Pid, i) + } + } +} + +func TestProcTree_FindParent(t *testing.T) { + // child, parent + matches := [6][2]uint32{ + {8, 7}, + {9, 1}, + {11, 10}, + {3, 2}, + {4, 2}, + {5, 4}, + } + 
pt := testProcTree() + for _, match := range matches { + c := match[0] + p := pt.FindParent(c) + if p == nil { + t.Fatalf("nil parent for %d", c) + } + e := match[1] + if p.Pid != e { + t.Fatalf("found parent %d for %d but expected %d", p.Pid, c, e) + } + } + c := uint32(math.MaxUint32) + p := pt.FindParent(c) + if p != nil { + t.Fatalf("found parent %d for %d but expected no parent", p.Pid, c) + } +} + +func TestProcTree_Insert(t *testing.T) { + // child, parent, branchof, 0=left 1=right + matches := [4][4]uint32{ + {12, 5, 5, 0}, + {13, 12, 12, 0}, + {13, 1, 9, 1}, + {14, 7, 8, 1}, + } + pt := testProcTree() + for _, match := range matches { + pid, ppid, expectedId, side := match[0], match[1], match[2], match[3] + pt.Insert(pid, ppid) + n := pt.Find(expectedId) + if side == 0 { + if n.Left == nil || n.Left.Pid != pid { + t.Fatalf("failet to insert %d at %d (branch %s)", pid, ppid, n) + } + } else { + if n.Right == nil || n.Right.Pid != pid { + t.Fatalf("failet to insert %d at %d (branch %s)", pid, ppid, n) + } + } + } + c, e := pt.TPC.Get(), uint32(14) + if c != e { + t.Fatalf("expected a process count of %d, got %d", e, c) + } +} + +func TestProcTree_Pop(t *testing.T) { + pt := testProcTree() + parent := pt.FindParent(6) + sibling := pt.Find(2) + child := pt.Find(6) + popped := pt.Pop(6) + if popped.Pid != child.Pid { + t.Fatalf("popped item with PID %d instead of %d", popped.Pid, child.Pid) + } + if sibling.Right.Pid != 7 { + t.Fatalf("new right branch of %d should be 7, not %d", parent.Pid, sibling.Right.Pid) + } + // this test makes me dizzy + parent = sibling.Right + child = parent.Left + if child.Pid != 8 { + t.Fatalf("expected pid 8 got %d", child.Pid) + } + popped = pt.Pop(child.Pid) + if popped.Pid != child.Pid { + t.Fatalf("popped item with PID %d instead of %d", popped.Pid, child.Pid) + } + if parent.Left != nil { + t.Fatalf("left branch of %d should be nil, not %d", sibling.Pid, sibling.Left.Pid) + } +} + +func TestProcTree_Kill(t *testing.T) { + pt 
:= testProcTree() + + mapCopy := make(map[uint32]api.Process_Server) + for k, v := range pt.Map { + mapCopy[k] = v + } + + pt.Kill(1) + if pt.Root.Left == nil || pt.Root.Left.Pid != 10 { + t.Fatalf("expected to find 10 at the left of 0, found %s", pt.Root.Left) + } + killedProcs := []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9} + aliveProcs := []uint32{10, 11} + + for _, pid := range killedProcs { + if _, found := pt.Map[pid]; found { + t.Fatalf("found process %d in map but it should have been deleted", pid) + } + p := mapCopy[pid] + if tp, _ := p.(*testProc); tp.alive { + t.Fatalf("killed process %d is still alive", pid) + } + } + + for _, pid := range aliveProcs { + if _, found := pt.Map[pid]; !found { + t.Fatalf("failed to find process %d in map", pid) + } + p := mapCopy[pid] + if tp, _ := p.(*testProc); !tp.alive { + t.Fatalf("killed process %d should still be alive", pid) + } + } + c, e := pt.TPC.Get(), uint32(10-len(killedProcs)) + if c != e { + t.Fatalf("expected a process count of %d, got %d", e, c) + } +} + +func TestProcTree_Trim(t *testing.T) { + pt := testProcTree() + pt.Pop(1) + pt.Trim(context.TODO()) + + mapCopy := make(map[uint32]api.Process_Server) + for k, v := range pt.Map { + mapCopy[k] = v + } + + killedProcs := []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9} + aliveProcs := []uint32{10, 11} + for _, pid := range aliveProcs { + if _, found := pt.Map[pid]; !found { + t.Fatalf("failed to find process %d in map", pid) + } + p := mapCopy[pid] + if tp, _ := p.(*testProc); !tp.alive { + t.Fatalf("killed process %d should still be alive", pid) + } + } + c, e := pt.TPC.Get(), uint32(10-len(killedProcs)) + if c != e { + t.Fatalf("expected a process count of %d, got %d", e, c) + } +} diff --git a/cmd/ww/cluster/cluster.go b/cmd/ww/cluster/cluster.go new file mode 100644 index 00000000..6660765b --- /dev/null +++ b/cmd/ww/cluster/cluster.go @@ -0,0 +1,116 @@ +package cluster + +import ( + "path" + "runtime" + "time" + + "github.com/libp2p/go-libp2p" + local 
"github.com/libp2p/go-libp2p/core/host" + quic "github.com/libp2p/go-libp2p/p2p/transport/quic" + "github.com/libp2p/go-libp2p/p2p/transport/tcp" + "github.com/urfave/cli/v2" + + "github.com/wetware/pkg/cap/host" + "github.com/wetware/pkg/system" +) + +var ( + h host.Host + releases *[]func() + closes *[]func() error +) + +var flags = []cli.Flag{ + &cli.StringSliceFlag{ + Name: "addr", + Aliases: []string{"a"}, + Usage: "static bootstrap `ADDR`", + EnvVars: []string{"WW_ADDR"}, + }, + &cli.StringFlag{ + Name: "discover", + Aliases: []string{"d"}, + Usage: "bootstrap discovery `ADDR`", + Value: bootstrapAddr(), + EnvVars: []string{"WW_DISCOVER"}, + }, + &cli.StringFlag{ + Name: "ns", + Usage: "cluster namespace", + Value: "ww", + EnvVars: []string{"WW_NS"}, + }, + &cli.DurationFlag{ + Name: "timeout", + Usage: "dial timeout", + Value: time.Second * 15, + EnvVars: []string{"WW_CLIENT_TIMEOUT"}, + }, +} + +func Command() *cli.Command { + return &cli.Command{ + Name: "cluster", + Usage: "cli client for wetware clusters", + Aliases: []string{"client"}, // TODO(soon): deprecate + Flags: flags, + Subcommands: []*cli.Command{ + run(), + }, + } +} + +func setup() cli.BeforeFunc { + return func(c *cli.Context) (err error) { + *releases = make([]func(), 0) + *closes = make([]func() error, 0) + + h, err := clientHost(c) + if err != nil { + return err + } + *closes = append(*closes, h.Close) + + host, err := system.Boot[host.Host](c, h) + if err != nil { + return err + } + *releases = append(*releases, host.Release) + + return nil + } +} + +func teardown() cli.AfterFunc { + return func(c *cli.Context) (err error) { + for _, close := range *closes { + defer close() + } + for _, release := range *releases { + defer release() + } + return nil + } +} + +func clientHost(c *cli.Context) (local.Host, error) { + return libp2p.New( + libp2p.NoTransports, + libp2p.NoListenAddrs, + libp2p.Transport(tcp.NewTCPTransport), + libp2p.Transport(quic.NewTransport)) +} + +func bootstrapAddr() 
string { + return path.Join("/ip4/228.8.8.8/udp/8822/multicast", loopback()) +} + +func loopback() string { + switch runtime.GOOS { + case "darwin": + return "lo0" + default: + return "lo" + } +} diff --git a/cmd/ww/cluster/run.go b/cmd/ww/cluster/run.go new file mode 100644 index 00000000..8e1d8575 --- /dev/null +++ b/cmd/ww/cluster/run.go @@ -0,0 +1,70 @@ +package cluster + +import ( + "context" + "io" + "os" + "time" + + capnp "capnproto.org/go/capnp/v3" + "github.com/urfave/cli/v2" +) + +const killTimeout = 30 * time.Second + +func run() *cli.Command { + return &cli.Command{ + Name: "run", + Usage: "run a WASM module on a cluster node", + ArgsUsage: " (defaults to stdin)", + Before: setup(), + After: teardown(), + Action: runAction(), + } +} + +func runAction() cli.ActionFunc { + return func(c *cli.Context) error { + ctx := c.Context + + // Load the name of the entry function and the WASM file containing the module to run + src, err := bytecode(c) + if err != nil { + return err + } + + // Obtain an executor and spawn a process + executor, release := h.Executor(ctx) + defer release() + + client := capnp.Client(h.AddRef()) + proc, release := executor.Exec(ctx, src, 0, client) + defer release() + + waitChan := make(chan error, 1) + go func() { + waitChan <- proc.Wait(ctx) + }() + select { + case err = <-waitChan: + return err + case <-ctx.Done(): + killChan := make(chan error, 1) + go func() { killChan <- proc.Kill(context.Background()) }() + select { + case err = <-killChan: + return err + case <-time.After(killTimeout): + return err + } + } + } +} + +func bytecode(c *cli.Context) ([]byte, error) { + if c.Args().Len() > 0 { + return os.ReadFile(c.Args().First()) // file path + } + + return io.ReadAll(c.App.Reader) // stdin +} diff --git a/cmd/ww/main.go b/cmd/ww/main.go index 0c0ffdfb..4e9abebe 100644 --- a/cmd/ww/main.go +++ b/cmd/ww/main.go @@ -20,7 +20,7 @@ import ( "golang.org/x/exp/slog" ww "github.com/wetware/pkg" - + "github.com/wetware/pkg/cmd/ww/cluster" 
"github.com/wetware/pkg/cmd/ww/ls" "github.com/wetware/pkg/cmd/ww/run" "github.com/wetware/pkg/cmd/ww/start" @@ -92,6 +92,7 @@ func main() { ls.Command(), run.Command(), start.Command(), + cluster.Command(), }, } diff --git a/rom/internal/main.wasm b/rom/internal/main.wasm index a119dd5b..8e9ce22e 100755 Binary files a/rom/internal/main.wasm and b/rom/internal/main.wasm differ diff --git a/server/executor.go b/server/executor.go index 9bd7d49c..5c455570 100644 --- a/server/executor.go +++ b/server/executor.go @@ -24,5 +24,6 @@ func (cfg Config) newExecutor(ctx context.Context, ec executorConfig) (csp_serve return csp_server.Runtime{ Runtime: r, Cache: ec.Cache, + Tree: csp_server.NewProcTree(ctx), }, nil }