Skip to content

Commit

Permalink
Merge pull request #294 from VitalyGushin/add-dual-stack-support
Browse files Browse the repository at this point in the history
Add dual stack support.
  • Loading branch information
denis-tingaikin committed Apr 18, 2024
2 parents c453f4a + b08c41e commit a85f519
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 157 deletions.
69 changes: 0 additions & 69 deletions internal/imports/imports_linux.go

This file was deleted.

182 changes: 94 additions & 88 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/ipcontext/vl3"
"github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/mtu/vl3mtu"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"

registryclientinfo "github.com/networkservicemesh/sdk/pkg/registry/common/clientinfo"
registrysendfd "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd"
Expand Down Expand Up @@ -112,7 +113,7 @@ type Config struct {
UnregisterItself bool `default:"true" desc:"if true then NSE unregister itself when it completes working" split_words:"true"`
OpenTelemetryEndpoint string `default:"otel-collector.observability.svc.cluster.local:4317" desc:"OpenTelemetry Collector Endpoint"`
MetricsExportInterval time.Duration `default:"10s" desc:"interval between mertics exports" split_words:"true"`
PrefixServerURL url.URL `default:"vl3-ipam:5006" desc:"URL to VL3 IPAM server" split_words:"true"`
PrefixServerURL []url.URL `default:"vl3-ipam:5006" desc:"URL to VL3 IPAM server(s)" split_words:"true"`
DNSTemplates []string `default:"{{ index .Labels \"podName\" }}.{{ .NetworkService }}." desc:"Represents domain naming templates in go-template format. It is using for generating the domain name for each nse/nsc in the vl3 network" split_words:"true"`
LogLevel string `default:"INFO" desc:"Log level" split_words:"true"`
dnsServerAddr net.IP
Expand All @@ -132,70 +133,63 @@ func (c *Config) Process() error {
}

func startListenPrefixes(ctx context.Context, c *Config, tlsClientConfig *tls.Config, subscriptions []chan *ipam.PrefixResponse) {
var previousResponse *ipam.PrefixResponse
go func() {
var cc *grpc.ClientConn
var err error
for ctx.Err() == nil {
// Close the previous clientConn
if cc != nil {
_ = cc.Close()
}
dialCtx, dialCtxCancel := context.WithTimeout(ctx, time.Millisecond*200)
cc, err = grpc.DialContext(dialCtx,
grpcutils.URLToTarget(&c.PrefixServerURL),
grpc.WithBlock(),
grpc.WithTransportCredentials(
credentials.NewTLS(
tlsClientConfig,
for i := range c.PrefixServerURL {
prefixServerURL := &c.PrefixServerURL[i]
go func(i int, prefixServerURL *url.URL) {
log.FromContext(ctx).Infof("Start listening prefix server %d: %s", i, prefixServerURL)
var previousResponse *ipam.PrefixResponse
var cc *grpc.ClientConn
var err error
for ctx.Err() == nil {
// Close the previous clientConn
if cc != nil {
_ = cc.Close()
}
dialCtx, dialCtxCancel := context.WithTimeout(ctx, time.Millisecond*200)
cc, err = grpc.DialContext(dialCtx,
grpcutils.URLToTarget(prefixServerURL),
grpc.WithBlock(),
grpc.WithTransportCredentials(
credentials.NewTLS(
tlsClientConfig,
),
),
),
)
// It is safe to cancel dial ctx after DialContext if WithBlock() option is used
dialCtxCancel()
if err != nil {
logrus.Error(err.Error())
continue
}
)
// It is safe to cancel dial ctx after DialContext if WithBlock() option is used
dialCtxCancel()
if err != nil {
logrus.Error(err.Error())
continue
}

managePrefixClient, err := ipam.NewIPAMClient(cc).ManagePrefixes(ctx)
if err != nil {
logrus.Error(err.Error())
continue
}
managePrefixClient, err := ipam.NewIPAMClient(cc).ManagePrefixes(ctx)
if err != nil {
logrus.Error(err.Error())
continue
}

request := &ipam.PrefixRequest{
Type: ipam.Type_ALLOCATE,
Prefix: previousResponse.GetPrefix(),
}
request := &ipam.PrefixRequest{
Type: ipam.Type_ALLOCATE,
Prefix: previousResponse.GetPrefix(),
}

err = managePrefixClient.Send(request)
err = managePrefixClient.Send(request)

if err != nil {
continue
}
if err != nil {
continue
}

for resp, recvErr := managePrefixClient.Recv(); recvErr == nil; resp, recvErr = managePrefixClient.Recv() {
if !proto.Equal(previousResponse, resp) {
previousResponse = resp
for _, sub := range subscriptions {
select {
case sub <- resp:
default:
}
for resp, recvErr := managePrefixClient.Recv(); recvErr == nil; resp, recvErr = managePrefixClient.Recv() {
if !proto.Equal(previousResponse, resp) {
previousResponse = resp
subscriptions[i] <- resp
}
}
}
}
}()
}(i, prefixServerURL)
}
}

const (
serverSubscriptionIdx = iota
clientSubscriptionIdx
totalSubscriptions
)

func main() {
// ********************************************************************************
// setup context to catch signals
Expand Down Expand Up @@ -374,21 +368,22 @@ func main() {
listenOn := &(url.URL{Scheme: "unix", Path: filepath.Join(tmpDir, config.ListenOn)})

// ********************************************************************************
log.FromContext(ctx).Infof("executing phase 6.2: create and register nse with nsm")
log.FromContext(ctx).Infof("executing phase 6: create and register nse with nsm")
// ********************************************************************************

var subscribedChannels []chan *ipam.PrefixResponse
for i := 0; i < totalSubscriptions; i++ {
subscribedChannels = append(subscribedChannels, make(chan *ipam.PrefixResponse, 1))
prefixServerCount := len(config.PrefixServerURL)
var subscriptions []chan *ipam.PrefixResponse
for i := 0; i < prefixServerCount; i++ {
subscriptions = append(subscriptions, make(chan *ipam.PrefixResponse, 1))
}
var closeSubscribedChannels = func() {
for i := 0; i < totalSubscriptions; i++ {
close(subscribedChannels[i])
for i := 0; i < prefixServerCount; i++ {
close(subscriptions[i])
}
}
startListenPrefixes(ctx, config, tlsClientConfig, subscribedChannels)
startListenPrefixes(ctx, config, tlsClientConfig, subscriptions)
ipams := extractIPAMList(ctx, subscriptions)

server := createVl3Endpoint(ctx, cancel, config, vppConn, tlsServerConfig, source, loopOptions, vrfOptions, subscribedChannels[serverSubscriptionIdx])
server := createVl3Endpoint(ctx, cancel, config, vppConn, tlsServerConfig, source, loopOptions, vrfOptions, ipams)

srvErrCh := grpcutils.ListenAndServe(ctx, listenOn, server)
exitOnErr(ctx, cancel, srvErrCh)
Expand Down Expand Up @@ -448,7 +443,7 @@ func main() {
config.dnsServerAddr = conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP
config.dnsServerAddrCh <- conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP

vl3Client := createVl3Client(ctx, config, vppConn, tlsClientConfig, source, loopOptions, vrfOptions, subscribedChannels[clientSubscriptionIdx], clientAdditionalFunctionality...)
vl3Client := createVl3Client(ctx, config, vppConn, tlsClientConfig, source, loopOptions, vrfOptions, ipams, clientAdditionalFunctionality...)
for _, nse := range nseList {
if nse.Name == config.Name {
continue
Expand Down Expand Up @@ -498,8 +493,31 @@ func main() {
closeSubscribedChannels()
}

func extractIPAMList(ctx context.Context, subscriptions []chan *ipam.PrefixResponse) []*vl3.IPAM {
ipams := make([]*vl3.IPAM, len(subscriptions))

for i := range ipams {
ipams[i] = new(vl3.IPAM)
}

handleChannel := func(prefixCh <-chan *ipam.PrefixResponse, ipam *vl3.IPAM) {
for prefix := range prefixCh {
err := ipam.Reset(prefix.Prefix, prefix.ExcludePrefixes...)
if err != nil {
log.FromContext(ctx).Errorf("failed to reset vl3 IPAM pool: %s", err.Error())
}
}
}

for i, ch := range subscriptions {
go handleChannel(ch, ipams[i])
}

return ipams
}

func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Connection, tlsClientConfig *tls.Config, source x509svid.Source,
loopOpts []loopback.Option, vrfOpts []vrf.Option, prefixCh <-chan *ipam.PrefixResponse, clientAdditionalFunctionality ...networkservice.NetworkServiceClient) networkservice.NetworkServiceClient {
loopOpts []loopback.Option, vrfOpts []vrf.Option, ipams []*vl3.IPAM, clientAdditionalFunctionality ...networkservice.NetworkServiceClient) networkservice.NetworkServiceClient {
dialOptions := append(tracing.WithTracingDial(),
grpcfd.WithChainStreamInterceptor(),
grpcfd.WithChainUnaryInterceptor(),
Expand All @@ -516,24 +534,14 @@ func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Conn
),
)

var clientIpam vl3.IPAM
go func() {
for prefix := range prefixCh {
err := clientIpam.Reset(prefix.Prefix, prefix.ExcludePrefixes...)
if err != nil {
log.FromContext(ctx).Errorf("failed to reset vl3 client IPAM pool: %s", err.Error())
}
}
}()

c := client.NewClient(
ctx,
client.WithClientURL(&config.ConnectTo),
client.WithName(config.Name),
client.WithAdditionalFunctionality(
append(
clientAdditionalFunctionality,
vl3.NewClient(ctx, &clientIpam),
newMultiIPAMClient(ctx, ipams),
vl3dns.NewClient(config.dnsServerAddr, &config.dnsConfigs),
up.NewClient(ctx, vppConn, up.WithLoadSwIfIndex(loopback.Load)),
ipaddress.NewClient(vppConn, ipaddress.WithLoadSwIfIndex(loopback.Load)),
Expand All @@ -556,18 +564,16 @@ func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Conn
return retry.NewClient(c)
}

func createVl3Endpoint(ctx context.Context, cancel context.CancelFunc, config *Config, vppConn vpphelper.Connection, tlsServerConfig *tls.Config,
source x509svid.Source, loopOpts []loopback.Option, vrfOpts []vrf.Option, prefixCh <-chan *ipam.PrefixResponse) *grpc.Server {
var serverIpam vl3.IPAM
go func() {
for prefix := range prefixCh {
err := serverIpam.Reset(prefix.Prefix, prefix.ExcludePrefixes...)
if err != nil {
log.FromContext(ctx).Errorf("failed to reset vl3 server IPAM pool: %s", err.Error())
}
}
}()
func newMultiIPAMClient(ctx context.Context, ipams []*vl3.IPAM) networkservice.NetworkServiceClient {
var clients []networkservice.NetworkServiceClient
for _, ipam := range ipams {
clients = append(clients, vl3.NewClient(ctx, ipam))
}
return next.NewNetworkServiceClient(clients...)
}

func createVl3Endpoint(ctx context.Context, cancel context.CancelFunc, config *Config, vppConn vpphelper.Connection, tlsServerConfig *tls.Config,
source x509svid.Source, loopOpts []loopback.Option, vrfOpts []vrf.Option, ipams []*vl3.IPAM) *grpc.Server {
vl3Endpoint := endpoint.NewServer(ctx,
spiffejwt.TokenGeneratorFunc(source, config.MaxTokenLifetime),
endpoint.WithName(config.Name),
Expand All @@ -580,7 +586,7 @@ func createVl3Endpoint(ctx context.Context, cancel context.CancelFunc, config *C
vl3dns.WithConfigs(&config.dnsConfigs),
),
vl3mtu.NewServer(),
strictvl3ipam.NewServer(ctx, vl3.NewServer, &serverIpam),
strictvl3ipam.NewServer(ctx, vl3.NewServer, ipams...),
up.NewServer(ctx, vppConn, up.WithLoadSwIfIndex(loopback.Load)),
ipaddress.NewServer(vppConn, ipaddress.WithLoadSwIfIndex(loopback.Load)),
unnumbered.NewServer(vppConn, loopback.Load),
Expand Down

0 comments on commit a85f519

Please sign in to comment.