Skip to content

Commit

Permalink
session pool supports multiple nodes (#78)
Browse files Browse the repository at this point in the history
Co-authored-by: fuliwen <fuliw@yonyou.com>
  • Loading branch information
fuliwen and fuliwen authored Mar 9, 2023
1 parent ec57c6c commit d859381
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 36 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ If there is no available connections and the pool reaches its max size, the all
The PutBack method must be called after use

### New sessionPool
standalone

```golang

Expand All @@ -97,6 +98,18 @@ config := &client.PoolConfig{
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)

```
cluster or doubleLive

```golang

config := &client.PoolConfig{
UserName: user,
Password: password,
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)

```

### Get session through sessionPool, putback after use
Expand Down
15 changes: 15 additions & 0 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ go run session_example.go

### 创建sessionPool

单实例
```golang

config := &client.PoolConfig{
Expand All @@ -81,6 +82,20 @@ sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)

```

分布式或双活

```golang

config := &client.PoolConfig{
UserName: user,
Password: password,
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)

```


### 使用sessionPool获取session,使用完手动调用PutBack

例1:设置存储组
Expand Down
36 changes: 19 additions & 17 deletions client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,12 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
}

type ClusterConfig struct {
NodeUrls []string //ip:port
UserName string
Password string
FetchSize int32
TimeZone string
NodeUrls []string //ip:port
UserName string
Password string
FetchSize int32
TimeZone string
ConnectRetryMax int
}

type ClusterSession struct {
Expand Down Expand Up @@ -975,12 +976,12 @@ func NewSession(config *Config) Session {
return Session{config: config}
}

func NewClusterSession(ClusterConfig *ClusterConfig) Session {
func NewClusterSession(clusterConfig *ClusterConfig) Session {
session := Session{}
node := endPoint{}
for i := 0; i < len(ClusterConfig.NodeUrls); i++ {
node.Host = strings.Split(ClusterConfig.NodeUrls[i], ":")[0]
node.Port = strings.Split(ClusterConfig.NodeUrls[i], ":")[1]
for i := 0; i < len(clusterConfig.NodeUrls); i++ {
node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
endPointList.PushBack(node)
}
var err error
Expand All @@ -996,7 +997,7 @@ func NewClusterSession(ClusterConfig *ClusterConfig) Session {
log.Println(err)
} else {
session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone)
clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
break
}
}
Expand Down Expand Up @@ -1052,14 +1053,15 @@ func (s *Session) initClusterConn(node endPoint) error {

}

func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string) *Config {
func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int) *Config {
return &Config{
Host: host,
Port: port,
UserName: userName,
Password: passWord,
FetchSize: fetchSize,
TimeZone: timeZone,
Host: host,
Port: port,
UserName: userName,
Password: passWord,
FetchSize: fetchSize,
TimeZone: timeZone,
ConnectRetryMax: connectRetryMax,
}
}

Expand Down
31 changes: 25 additions & 6 deletions client/sessionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (spool *SessionPool) GetSession() (session Session, err error) {
if ok {
return session, nil
} else {
log.Println("sessionpool has closed")
log.Println("sessionPool has closed")
return session, errPoolClosed
}
default:
Expand All @@ -93,11 +93,19 @@ func (spool *SessionPool) GetSession() (session Session, err error) {
}
}

func (spool *SessionPool) ConstructSession(config *PoolConfig) (Session, error) {
session := NewSession(getSessionConfig(config))
if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
log.Print(err)
return session, err
func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) {
if len(config.NodeUrls) > 0 {
session = NewClusterSession(getClusterSessionConfig(config))
if err := session.OpenCluster(spool.enableCompression); err != nil {
log.Print(err)
return session, err
}
} else {
session = NewSession(getSessionConfig(config))
if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
log.Print(err)
return session, err
}
}
return session, nil
}
Expand All @@ -114,6 +122,17 @@ func getSessionConfig(config *PoolConfig) *Config {
}
}

func getClusterSessionConfig(config *PoolConfig) *ClusterConfig {
return &ClusterConfig{
NodeUrls: config.NodeUrls,
UserName: config.UserName,
Password: config.Password,
FetchSize: config.FetchSize,
TimeZone: config.TimeZone,
ConnectRetryMax: config.ConnectRetryMax,
}
}

func (spool *SessionPool) PutBack(session Session) {
if session.trans.IsOpen() {
spool.ch <- session
Expand Down
35 changes: 22 additions & 13 deletions example/session_pool/session_pool_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"log"
"math/rand"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -60,8 +61,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
setStorageGroup(fmt.Sprintf("root.ln%d", j))
deleteStorageGroup(fmt.Sprintf("root.ln%d", j))
setStorageGroup(fmt.Sprintf("root.ln-%d", j))
deleteStorageGroup(fmt.Sprintf("root.ln-%d", j))

}()

Expand Down Expand Up @@ -134,17 +135,6 @@ func main() {
insertAlignedTablets()
deleteTimeseries("root.ln.device1.*")
executeQueryStatement("show timeseries root.**")
for i := 0; i < 10000; i++ {
var j = i
wg.Add(1)
go func() {
defer wg.Done()
setStorageGroup(fmt.Sprintf("root.ln%d", j))
deleteStorageGroup(fmt.Sprintf("root.ln%d", j))

}()

}
wg.Wait()

}
Expand Down Expand Up @@ -773,3 +763,22 @@ func checkError(status *rpc.TSStatus, err error) {
}
}
}

// If your IotDB is a cluster version or doubleLive, you can use the following code for session pool connection
func useSessionPool() {

config := &client.PoolConfig{
UserName: user,
Password: password,
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
log.Print(err)
return
}

}

0 comments on commit d859381

Please sign in to comment.