Skip to content

Commit

Permalink
support for gRPC-based p2p backend (EXPERIMENTAL) (#222)
Browse files Browse the repository at this point in the history
gRPC-based point-to-point backend is implemented.
For point-to-point backend, metaserver is also implemented.
The metaserver is a broker that provides a set of endpoints to each
worker so that workers can communicate with each other.

The p2p backend is implemented with asyncio gRPC.
  • Loading branch information
myungjin committed Sep 1, 2022
1 parent 75bf57b commit 9af0480
Show file tree
Hide file tree
Showing 25 changed files with 1,808 additions and 75 deletions.
100 changes: 100 additions & 0 deletions cmd/metaserver/app/metastore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2022 Cisco Systems, Inc. and its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package app

import (
"time"

pbMeta "github.com/cisco-open/flame/pkg/proto/meta"
"go.uber.org/zap"
)

const (
HEART_BEAT_DURATION = 30 * time.Second
TIMEOUT_STALE_ENDPOINT = 2 * HEART_BEAT_DURATION
)

type job struct {
channels map[string]*channel
}

type channel struct {
roles map[string]*metaInfo
}

type metaInfo struct {
endpoints map[string]chan bool
}

func (j *job) register(mi *pbMeta.MetaInfo) error {
ch, ok := j.channels[mi.ChName]
if !ok {
ch = &channel{roles: make(map[string]*metaInfo)}
j.channels[mi.ChName] = ch
}

if err := ch.register(mi.Me, mi.Endpoint); err != nil {
return err
}

return nil
}

func (j *job) search(chName string, role string) map[string]chan bool {
ch, ok := j.channels[chName]
if !ok {
return nil
}

mi := ch.search(role)

if mi == nil {
return nil
}

return mi.endpoints
}

func (ch *channel) register(role string, endpoint string) error {
mi, ok := ch.roles[role]
if !ok {
mi = &metaInfo{endpoints: make(map[string]chan bool)}
ch.roles[role] = mi
}

_, ok = mi.endpoints[endpoint]
if ok {
zap.S().Infof("endpoint %s already registered", endpoint)
return nil
}

// registering for the first time, set heart beat channel nil
mi.endpoints[endpoint] = nil

zap.S().Infof("done calling ch.register() for endpoint %s", endpoint)

return nil
}

func (ch *channel) search(role string) *metaInfo {
mi, ok := ch.roles[role]
if !ok {
return nil
}

return mi
}
159 changes: 159 additions & 0 deletions cmd/metaserver/app/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2022 Cisco Systems, Inc. and its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package app

import (
"context"
"fmt"
"net"
"sync"
"time"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

pbMeta "github.com/cisco-open/flame/pkg/proto/meta"
)

// meta server is a service to facilitate flame's data plane operations
type metaServer struct {
mutex sync.Mutex
jobs map[string]*job

pbMeta.UnimplementedMetaRouteServer
}

func Start(portNo uint16) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", portNo))
if err != nil {
zap.S().Errorf("failed to listen grpc server: %v", err)
}

// create grpc server
s := grpc.NewServer()
server := &metaServer{
jobs: make(map[string]*job),
}

pbMeta.RegisterMetaRouteServer(s, server)
reflection.Register(s)

zap.S().Infof("MetaServer listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
zap.S().Fatalf("failed to serve: %s", err)
}
}

func (s *metaServer) RegisterMetaInfo(ctx context.Context, in *pbMeta.MetaInfo) (*pbMeta.MetaResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

zap.S().Infof("jobId: %s, ChName: %s, Me: %s, Other: %s, Endpoint: %s", in.JobId, in.ChName, in.Me, in.Other, in.Endpoint)
j, ok := s.jobs[in.JobId]
if !ok {
j = &job{channels: make(map[string]*channel)}
s.jobs[in.JobId] = j
}

err := j.register(in)
if err != nil {
zap.S().Errorf("failed to register: %v", err)
return nil, fmt.Errorf("failed to register: %v", err)
}

err = s.setupEndpointTimeout(in)
if err != nil {
zap.S().Errorf("failed to setup endpoint timeout: %v", err)
return nil, err
}

resp := &pbMeta.MetaResponse{
Status: pbMeta.MetaResponse_SUCCESS,
Endpoints: make([]string, 0),
}
endpoints := j.search(in.ChName, in.Other)
for endpoint := range endpoints {
resp.Endpoints = append(resp.Endpoints, endpoint)
}

return resp, nil
}

func (s *metaServer) HeartBeat(ctx context.Context, in *pbMeta.MetaInfo) (*pbMeta.MetaResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

err := s.setupEndpointTimeout(in)
if err != nil {
return nil, err
}

resp := &pbMeta.MetaResponse{
Status: pbMeta.MetaResponse_SUCCESS,
Endpoints: make([]string, 0),
}
return resp, nil
}

func (s *metaServer) setupEndpointTimeout(in *pbMeta.MetaInfo) error {
j, ok := s.jobs[in.JobId]
if !ok {
return fmt.Errorf("job id %s not found", in.JobId)
}

endpoints := j.search(in.ChName, in.Me)
if endpoints == nil {
return fmt.Errorf("no endpoint found for role %s", in.Me)
}

heartbeat, ok := endpoints[in.Endpoint]
if !ok {
return fmt.Errorf("no cancel channel found for endpoint %s", in.Endpoint)
}

if heartbeat != nil {
heartbeat <- true
} else {
heartbeat = make(chan bool)
endpoints[in.Endpoint] = heartbeat
go s.cleanStaleEndpiont(endpoints, heartbeat, in.Endpoint)
}

return nil
}

func (s *metaServer) cleanStaleEndpiont(endpoints map[string]chan bool, heartbeat chan bool, endpoint string) {
timer := time.NewTimer(TIMEOUT_STALE_ENDPOINT)

for {
select {
case <-timer.C:
zap.S().Infof("timer fired for endpoint %s", endpoint)

s.mutex.Lock()
delete(endpoints, endpoint)
s.mutex.Unlock()
return

case <-heartbeat:
// reset timer
timer.Reset(TIMEOUT_STALE_ENDPOINT)
zap.S().Infof("timer reset for endpoint %s", endpoint)
}
}
}
49 changes: 49 additions & 0 deletions cmd/metaserver/cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022 Cisco Systems, Inc. and its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"github.com/spf13/cobra"

"github.com/cisco-open/flame/cmd/metaserver/app"
"github.com/cisco-open/flame/pkg/util"
)

var rootCmd = &cobra.Command{
Use: util.MetaServer,
Short: util.ProjectName + " " + util.MetaServer,
RunE: func(cmd *cobra.Command, args []string) error {
flags := cmd.Flags()

port, err := flags.GetUint16("port")
if err != nil {
return err
}

app.Start(port)

return nil
},
}

func init() {
rootCmd.Flags().Uint16P("port", "p", util.MetaServerPort, "service port")
}

func Execute() error {
return rootCmd.Execute()
}
36 changes: 36 additions & 0 deletions cmd/metaserver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2022 Cisco Systems, Inc. and its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package main

import (
"os"

"go.uber.org/zap"

"github.com/cisco-open/flame/cmd/metaserver/cmd"
"github.com/cisco-open/flame/pkg/util"
)

func main() {
loggerMgr := util.InitZapLog(util.MetaServer)
zap.ReplaceGlobals(loggerMgr)
defer loggerMgr.Sync()

if err := cmd.Execute(); err != nil {
os.Exit(1)
}
}
4 changes: 1 addition & 3 deletions fiab/helm-chart/control/templates/controller-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ data:
notifier: {{ .Values.frontDoorUrl.notifier }}:443
jobParams:
image: {{ .Values.workerImageName }}:{{ .Values.workerImageTag }}
brokers:
- sort: {{ .Values.broker.sort }}
host: {{ .Values.broker.host }}
brokers: {{ toYaml .Values.broker | nindent 8 }}
registry:
sort: {{ .Values.registry.sort }}
uri: {{ .Values.registry.uri }}
40 changes: 40 additions & 0 deletions fiab/helm-chart/control/templates/metaserver-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2022 Cisco Systems, Inc. and its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

---
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-metaserver
namespace: {{ .Release.Namespace }}

spec:
replicas: {{ .Values.replicas }}
selector:
matchLabels:
app: {{ .Release.Name }}-metaserver
template:
metadata:
labels:
app: {{ .Release.Name }}-metaserver
spec:
containers:
- command: ["/usr/bin/metaserver"]
image: {{ .Values.imageName }}:{{ .Values.imageTag }}
imagePullPolicy: IfNotPresent
name: {{ .Release.Name }}-metaserver
ports:
- containerPort: {{ .Values.servicePort.metaserver }}
Loading

0 comments on commit 9af0480

Please sign in to comment.