Skip to content

Commit

Permalink
Support more complex compositions
Browse files Browse the repository at this point in the history
  • Loading branch information
ajnavarro committed Sep 22, 2022
1 parent 2256a55 commit ef48926
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 94 deletions.
137 changes: 66 additions & 71 deletions routing/delegated.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,58 +28,27 @@ import (
var log = logging.Logger("routing/delegated")

func Parse(routers config.Routers, methods config.Methods, extraDHT *ExtraDHTParams, extraReframe *ExtraReframeParams) (routing.Routing, error) {
createdRouters := make(map[string]routing.Routing)
processLater := make(config.Routers)
log.Info("starting to parse ", len(routers), " routers")
for k, r := range routers {
if !r.Enabled.WithDefault(true) {
continue
}

if r.Type == config.RouterTypeSequential ||
r.Type == config.RouterTypeParallel {
processLater[k] = r
continue
}
log.Info("creating router ", k)
router, err := routingFromConfig(r.Router, extraDHT, extraReframe, nil, nil)
if err != nil {
return nil, err
}

log.Info("router ", k, " created with params ", r.Parameters)

createdRouters[k] = router
}

// using the createdRouters, instantiate all parallel and sequential routers
for k, r := range processLater {
crp, ok := r.Router.Parameters.(*config.ComposableRouterParams)
if !ok {
return nil, fmt.Errorf("problem getting composable router Parameters from router %q", k)
}

log.Info("creating router helper ", k)
router, err := routingFromConfig(r.Router, extraDHT, extraReframe, crp, createdRouters)
if err != nil {
return nil, err
}

createdRouters[k] = router

log.Info("router ", k, " created with params ", r.Parameters)
}

if err := methods.Check(); err != nil {
return nil, err
}

createdRouters := make(map[string]routing.Routing)
finalRouter := &Composer{}

// Create all needed routers from method names
for mn, m := range methods {
router, ok := createdRouters[m.RouterName]
if !ok {
return nil, fmt.Errorf("router with name %q not found for method %q", m.RouterName, mn)
r, ok := createdRouters[m.RouterName]
var router routing.Routing
if ok {
router = r
} else {
r, err := parse(make(map[string]bool), createdRouters, m.RouterName, routers, extraDHT, extraReframe)
if err != nil {
return nil, err
}
router = r
}

switch mn {
case config.MethodNamePutIPNS:
finalRouter.PutValueRouter = router
Expand All @@ -99,32 +68,49 @@ func Parse(routers config.Routers, methods config.Methods, extraDHT *ExtraDHTPar
return finalRouter, nil
}

func routingFromConfig(conf config.Router,
func parse(visited map[string]bool,
createdRouters map[string]routing.Routing,
routerName string,
routersCfg config.Routers,
extraDHT *ExtraDHTParams,
extraReframe *ExtraReframeParams,
extraComposableParams *config.ComposableRouterParams,
routers map[string]routing.Routing,
) (routing.Routing, error) {
// check if we already created it
r, ok := createdRouters[routerName]
if ok {
return r, nil
}

// check if we are in a dep loop
if visited[routerName] {
return nil, fmt.Errorf("dependency loop creating router with name %q", routerName)
}

// set node as visited
visited[routerName] = true

cfg, ok := routersCfg[routerName]
if !ok {
return nil, fmt.Errorf("config for router with name %q not found", routerName)
}

var router routing.Routing
var err error
switch conf.Type {
switch cfg.Type {
case config.RouterTypeReframe:
router, err = reframeRoutingFromConfig(conf, extraReframe)
router, err = reframeRoutingFromConfig(cfg.Router, extraReframe)
case config.RouterTypeDHT:
router, err = dhtRoutingFromConfig(conf, extraDHT)
router, err = dhtRoutingFromConfig(cfg.Router, extraDHT)
case config.RouterTypeParallel:
if extraComposableParams == nil || routers == nil {
err = fmt.Errorf("missing params needed to create a composable router")
break
}
crp := cfg.Parameters.(*config.ComposableRouterParams)
var pr []*routinghelpers.ParallelRouter
for _, cr := range extraComposableParams.Routers {
ri, ok := routers[cr.RouterName]
for _, cr := range crp.Routers {
ri, ok := createdRouters[cr.RouterName]
if !ok {
err = fmt.Errorf("router with name %q not found. If you have a router with this name, "+
"check routers order in configuration. Take into account that nested parallel and/or sequential "+
"routers are not supported", cr.RouterName)
break
ri, err = parse(visited, createdRouters, cr.RouterName, routersCfg, extraDHT, extraReframe)
if err != nil {
return nil, err
}
}

pr = append(pr, &routinghelpers.ParallelRouter{
Expand All @@ -133,35 +119,44 @@ func routingFromConfig(conf config.Router,
Timeout: cr.Timeout.Duration,
ExecuteAfter: cr.ExecuteAfter.WithDefault(0),
})

}

router = routinghelpers.NewComposableParallel(pr)
case config.RouterTypeSequential:
if extraComposableParams == nil || routers == nil {
err = fmt.Errorf("missing params needed to create a composable router")
break
}
crp := cfg.Parameters.(*config.ComposableRouterParams)
var sr []*routinghelpers.SequentialRouter
for _, cr := range extraComposableParams.Routers {
ri, ok := routers[cr.RouterName]
for _, cr := range crp.Routers {
ri, ok := createdRouters[cr.RouterName]
if !ok {
err = fmt.Errorf("router with name %q not found", cr.RouterName)
break
ri, err = parse(visited, createdRouters, cr.RouterName, routersCfg, extraDHT, extraReframe)
if err != nil {
return nil, err
}
}

sr = append(sr, &routinghelpers.SequentialRouter{
Router: ri,
IgnoreError: cr.IgnoreErrors,
Timeout: cr.Timeout.Duration,
})

}

router = routinghelpers.NewComposableSequential(sr)
default:
return nil, fmt.Errorf("unknown router type %q", conf.Type)
return nil, fmt.Errorf("unknown router type %q", cfg.Type)
}

if err != nil {
return nil, err
}

return router, err
createdRouters[routerName] = router

log.Info("created router ", routerName, " with params ", cfg.Parameters)

return router, nil
}

type ExtraReframeParams struct {
Expand Down
162 changes: 147 additions & 15 deletions routing/delegated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,23 @@ import (
"github.com/stretchr/testify/require"
)

func TestRoutingFromConfig(t *testing.T) {
func TestReframeRoutingFromConfig(t *testing.T) {
require := require.New(t)

r, err := routingFromConfig(config.Router{
Type: "unknown",
}, nil, nil, nil, nil)

require.Nil(r)
require.EqualError(err, "unknown router type \"unknown\"")

r, err = routingFromConfig(config.Router{
r, err := reframeRoutingFromConfig(config.Router{
Type: config.RouterTypeReframe,
Parameters: &config.ReframeRouterParams{},
}, nil, nil, nil, nil)
}, nil)

require.Nil(r)
require.EqualError(err, "configuration param 'Endpoint' is needed for reframe delegated routing types")

r, err = routingFromConfig(config.Router{
r, err = reframeRoutingFromConfig(config.Router{
Type: config.RouterTypeReframe,
Parameters: &config.ReframeRouterParams{
Endpoint: "test",
},
}, nil, nil, nil, nil)
}, nil)

require.NoError(err)
require.NotNil(r)
Expand All @@ -47,16 +40,16 @@ func TestRoutingFromConfig(t *testing.T) {
privM, err := crypto.MarshalPrivateKey(priv)
require.NoError(err)

r, err = routingFromConfig(config.Router{
r, err = reframeRoutingFromConfig(config.Router{
Type: config.RouterTypeReframe,
Parameters: &config.ReframeRouterParams{
Endpoint: "test",
},
}, nil, &ExtraReframeParams{
}, &ExtraReframeParams{
PeerID: id.String(),
Addrs: []string{"/ip4/0.0.0.0/tcp/4001"},
PrivKeyB64: base64.StdEncoding.EncodeToString(privM),
}, nil, nil)
})

require.NotNil(r)
require.NoError(err)
Expand Down Expand Up @@ -114,3 +107,142 @@ func TestParser(t *testing.T) {
require.Equal(comp.FindPeersRouter, comp.FindProvidersRouter)
require.Equal(comp.ProvideRouter, comp.PutValueRouter)
}

func TestParserRecursive(t *testing.T) {
require := require.New(t)

router, err := Parse(config.Routers{
"reframe1": config.RouterParser{
Router: config.Router{
Type: config.RouterTypeReframe,
Enabled: config.True,
Parameters: &config.ReframeRouterParams{
Endpoint: "testEndpoint1",
},
},
},
"reframe2": config.RouterParser{
Router: config.Router{
Type: config.RouterTypeReframe,
Enabled: config.True,
Parameters: &config.ReframeRouterParams{
Endpoint: "testEndpoint2",
},
},
},
"reframe3": config.RouterParser{
Router: config.Router{
Type: config.RouterTypeReframe,
Enabled: config.True,
Parameters: &config.ReframeRouterParams{
Endpoint: "testEndpoint3",
},
},
},
"composable1": config.RouterParser{
Router: config.Router{
Type: config.RouterTypeSequential,
Enabled: config.True,
Parameters: &config.ComposableRouterParams{
Routers: []config.ConfigRouter{
{
RouterName: "reframe1",
},
{
RouterName: "reframe2",
},
},
},
},
},
"composable2": config.RouterParser{
Router: config.Router{
Type: config.RouterTypeParallel,
Enabled: config.True,
Parameters: &config.ComposableRouterParams{
Routers: []config.ConfigRouter{
{
RouterName: "composable1",
},
{
RouterName: "reframe3",
},
},
},
},
},
}, config.Methods{
config.MethodNameFindPeers: config.Method{
RouterName: "composable2",
},
config.MethodNameFindProviders: config.Method{
RouterName: "composable2",
},
config.MethodNameGetIPNS: config.Method{
RouterName: "composable2",
},
config.MethodNamePutIPNS: config.Method{
RouterName: "composable2",
},
config.MethodNameProvide: config.Method{
RouterName: "composable2",
},
}, &ExtraDHTParams{}, nil)

require.NoError(err)

_, ok := router.(*Composer)
require.True(ok)

}

func TestParserRecursiveLoop(t *testing.T) {
require := require.New(t)

_, err := Parse(config.Routers{
"composable1": config.RouterParser{
Router: config.Router{
Type: config.RouterTypeSequential,
Enabled: config.True,
Parameters: &config.ComposableRouterParams{
Routers: []config.ConfigRouter{
{
RouterName: "composable2",
},
},
},
},
},
"composable2": config.RouterParser{
Router: config.Router{
Type: config.RouterTypeParallel,
Enabled: config.True,
Parameters: &config.ComposableRouterParams{
Routers: []config.ConfigRouter{
{
RouterName: "composable1",
},
},
},
},
},
}, config.Methods{
config.MethodNameFindPeers: config.Method{
RouterName: "composable2",
},
config.MethodNameFindProviders: config.Method{
RouterName: "composable2",
},
config.MethodNameGetIPNS: config.Method{
RouterName: "composable2",
},
config.MethodNamePutIPNS: config.Method{
RouterName: "composable2",
},
config.MethodNameProvide: config.Method{
RouterName: "composable2",
},
}, &ExtraDHTParams{}, nil)

require.ErrorContains(err, "dependency loop creating router with name \"composable2\"")
}
Loading

0 comments on commit ef48926

Please sign in to comment.