Skip to content

Commit

Permalink
fix: failed to start timer due to controller isn't ready
Browse files Browse the repository at this point in the history
  • Loading branch information
hwjiangkai committed Dec 11, 2022
1 parent 8236094 commit fe65285
Show file tree
Hide file tree
Showing 6 changed files with 775 additions and 624 deletions.
9 changes: 9 additions & 0 deletions internal/controller/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func NewController(cfg Config, member embedetcd.Member) *controller {
eventBusMap: map[string]*metadata.Eventbus{},
member: member,
isLeader: false,
isReady: false,
readyNotify: make(chan error, 1),
stopNotify: make(chan error, 1),
}
Expand All @@ -83,6 +84,7 @@ type controller struct {
cancelFunc context.CancelFunc
membershipMutex sync.Mutex
isLeader bool
isReady bool
readyNotify chan error
stopNotify chan error
mutex sync.Mutex
Expand Down Expand Up @@ -457,6 +459,12 @@ func (ctrl *controller) Ping(_ context.Context, _ *emptypb.Empty) (*ctrlpb.PingR
}, nil
}

func (ctrl *controller) IsReady(_ context.Context, _ *emptypb.Empty) (*ctrlpb.IsReadyResponse, error) {
return &ctrlpb.IsReadyResponse{
IsReady: ctrl.isReady,
}, nil
}

func (ctrl *controller) ReportSegmentLeader(ctx context.Context,
req *ctrlpb.ReportSegmentLeaderRequest) (*emptypb.Empty, error) {
err := ctrl.eventLogMgr.UpdateSegmentReplicas(ctx, vanus.NewIDFromUint64(req.LeaderId), req.Term)
Expand Down Expand Up @@ -495,6 +503,7 @@ func (ctrl *controller) membershipChangedProcessor(ctx context.Context, event em
ctrl.stop(ctx, err)
return err
}
ctrl.isReady = true
case embedetcd.EventBecomeFollower:
if !ctrl.isLeader {
return nil
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/eventbus/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/google/uuid"
"github.com/huandu/skiplist"
ctrlerrors "github.com/linkall-labs/vanus/internal/controller/errors"
"github.com/linkall-labs/vanus/internal/controller/eventbus/block"
"github.com/linkall-labs/vanus/internal/controller/eventbus/errors"
"github.com/linkall-labs/vanus/internal/controller/eventbus/metadata"
Expand Down Expand Up @@ -174,6 +175,9 @@ func (mgr *eventlogManager) AcquireEventLog(ctx context.Context, eventbusID vanu
})
return nil, err
}
if mgr.kvClient == nil {
return nil, ctrlerrors.ErrNotLeader
}
elMD := &metadata.Eventlog{
ID: id,
EventbusID: eventbusID,
Expand Down
20 changes: 19 additions & 1 deletion internal/timer/timingwheel/timingwheel.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/linkall-labs/vanus/pkg/controller"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/apimachinery/pkg/util/wait"
)

Expand Down Expand Up @@ -154,8 +155,25 @@ func (tw *timingWheel) Start(ctx context.Context) error {
"leader": tw.leader,
})

// here is to wait for the leader to complete the creation of all eventbus
// here is to wait for controller is ready
waitCtx, cancel := context.WithCancel(ctx)
wait.Until(func() {
resp, err := tw.ctrlCli.IsReady(ctx, &emptypb.Empty{})
if err != nil {
log.Error(ctx, "get eventbus controller state failed", map[string]interface{}{
log.KeyError: err,
})
return
}
if resp.IsReady {
cancel()
} else {
log.Info(ctx, "wait for the controller to be ready", nil)
}
}, time.Second, waitCtx.Done())

// here is to wait for the leader to complete the creation of all eventbus
waitCtx, cancel = context.WithCancel(ctx)
wait.Until(func() {
if tw.IsLeader() || tw.IsDeployed(ctx) {
cancel()
Expand Down
9 changes: 9 additions & 0 deletions pkg/controller/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ func (ec *eventbusClient) Close() error {
return ec.cc.close()
}

func (ec *eventbusClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ctrlpb.IsReadyResponse, error) {
out := new(ctrlpb.IsReadyResponse)
err := ec.cc.invoke(ctx, "/linkall.vanus.controller.EventBusController/IsReady", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

func (ec *eventbusClient) CreateEventBus(ctx context.Context, in *ctrlpb.CreateEventBusRequest, opts ...grpc.CallOption) (*metapb.EventBus, error) {
out := new(metapb.EventBus)
err := ec.cc.invoke(ctx, "/linkall.vanus.controller.EventBusController/CreateEventBus", in, out, opts...)
Expand Down
Loading

0 comments on commit fe65285

Please sign in to comment.