-
Notifications
You must be signed in to change notification settings - Fork 0
/
join_protocol.go
98 lines (78 loc) · 2.88 KB
/
join_protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package raft
import (
"fmt"
"log"
"github.com/hashicorp/raft"
)
type joinProtocol struct {
raft *raft.Raft
node LocalNode
token string
addr string
Logger *log.Logger
}
func (j *joinProtocol) GetSharedState() SharedState {
return nil
}
func (j *joinProtocol) GetTypeTranslator() MessageTypeTranslator {
var m = make(map[uint8]MessagePtrFactory)
m[rpcJoinCluster] = func() interface{} {
return &joinClusterRequest{}
}
return m
}
func (j *joinProtocol) OnMessageReceive(u uint8, req interface{}) (interface{}, error) {
// check if the req can be parsed to a JoinClusterRequest pointer
if join, ok := req.(*joinClusterRequest); u == rpcJoinCluster && ok {
j.Logger.Printf("[INFO] received join request from remote node %s at %s", join.NodeID, join.RemoteAddr)
// compare tokens; in case this node is a leader or a
// follower, the node will have to correct token locally
if join.Token != j.token {
err := fmt.Errorf("[ERR] join request contained wrong join token %s", join.Token)
j.Logger.Printf("%s", err)
return nil, err
}
// only leaders can add new nodes to the cluster
if leader := j.raft.Leader(); string(leader) != j.addr {
j.Logger.Printf("[INFO] this is not a leader, join request will be forwarded to leader at %s", leader)
var res = &joinClusterResponse{}
err := j.node.RemoteProcedureCall(&remoteNode{Address: leader}, rpcJoinCluster, req, res)
return res, err
}
// obtain configuration about registered nodes
configFuture := j.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
j.Logger.Printf("[ERR] failed to get raft configuration: %v", err)
return nil, err
}
for _, srv := range configFuture.Configuration().Servers {
if srv.ID == raft.ServerID(join.NodeID) || srv.Address == raft.ServerAddress(join.RemoteAddr) {
if srv.Address == raft.ServerAddress(join.RemoteAddr) && srv.ID == raft.ServerID(join.NodeID) {
j.Logger.Printf("[WARN] node %s at %s already member of cluster, ignoring join request", join.NodeID, join.RemoteAddr)
} else {
future := j.raft.RemoveServer(srv.ID, 0, 0)
if err := future.Error(); err != nil {
err = fmt.Errorf("[ERR] error removing existing node %s at %s: %s", join.NodeID, join.RemoteAddr, err)
j.Logger.Printf("%s", err)
return nil, err
}
}
}
}
future := j.raft.AddVoter(raft.ServerID(join.NodeID), raft.ServerAddress(join.RemoteAddr), 0, 0)
if err := future.Error(); err != nil {
err = fmt.Errorf("[ERR] failed to add new voter %s at %s: %s", join.NodeID, join.RemoteAddr, err)
j.Logger.Printf("%s", err)
return nil, err
}
j.Logger.Printf("[INFO] node %s at %s joined successfully", join.NodeID, join.RemoteAddr)
return &joinClusterResponse{
LeaderAddr: string(j.raft.Leader()),
LastIndex: j.raft.LastIndex(),
}, nil
}
return nil, nil
}
func (j *joinProtocol) SetLocalNode(node LocalNode) {
j.node = node
}