-
Notifications
You must be signed in to change notification settings - Fork 81
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Support postgres db for store and forward (#1605)
* feat: Support postgres db for store and forward Support postgres db for store and forward. Signed-off-by: Lindsey Cheng <beckysocute@gmail.com> * fix: Add postgres store methods unit tests - Add postgres store methods unit tests. - Revert redigo package version. Signed-off-by: Lindsey Cheng <beckysocute@gmail.com> * fix: Modify NewClient testcase Relates to #1603. Modify NewClient testcase. Signed-off-by: Lindsey Cheng <beckysocute@gmail.com> * fix: Update RetrieveFromStore ordered by created timestamp Update RetrieveFromStore to retrieve data ordered by created timestamp. Signed-off-by: Lindsey Cheng <beckysocute@gmail.com> --------- Signed-off-by: Lindsey Cheng <beckysocute@gmail.com>
- Loading branch information
1 parent
5062475
commit ac7762b
Showing
15 changed files
with
944 additions
and
96 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
// | ||
// Copyright (C) 2024 IOTech Ltd | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package postgres | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/url" | ||
"os" | ||
"sync" | ||
|
||
bootstrapConfig "github.com/edgexfoundry/go-mod-bootstrap/v3/config" | ||
|
||
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger" | ||
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors" | ||
|
||
"github.com/jackc/pgx/v5/pgxpool" | ||
) | ||
|
||
const defaultDBName = "edgex_db" | ||
|
||
var once sync.Once | ||
var dc *Client | ||
|
||
type Client struct { | ||
connPool *pgxpool.Pool | ||
loggingClient logger.LoggingClient | ||
} | ||
|
||
// NewClient returns a pointer to the Postgres client | ||
func NewClient(ctx context.Context, config bootstrapConfig.Database, credentials bootstrapConfig.Credentials, baseScriptPath, extScriptPath string, lc logger.LoggingClient) (*Client, errors.EdgeX) { | ||
// Get the database name from the environment variable | ||
databaseName := os.Getenv("EDGEX_DBNAME") | ||
if databaseName == "" { | ||
databaseName = defaultDBName | ||
} | ||
|
||
var edgeXerr errors.EdgeX | ||
once.Do(func() { | ||
// use url encode to prevent special characters in the connection string | ||
connectionStr := "postgres://" + fmt.Sprintf("%s:%s@%s:%d/%s", url.PathEscape(credentials.Username), url.PathEscape(credentials.Password), url.PathEscape(config.Host), config.Port, url.PathEscape(databaseName)) | ||
dbPool, err := pgxpool.New(ctx, connectionStr) | ||
if err != nil { | ||
edgeXerr = wrapDBError("fail to create pg connection pool", err) | ||
} | ||
|
||
dc = &Client{ | ||
connPool: dbPool, | ||
loggingClient: lc, | ||
} | ||
}) | ||
if edgeXerr != nil { | ||
return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to create a Postgres client", edgeXerr) | ||
} | ||
|
||
// invoke Ping to test the connectivity to the DB | ||
if err := dc.connPool.Ping(ctx); err != nil { | ||
return nil, wrapDBError("failed to acquire a connection from database connection pool", err) | ||
} | ||
|
||
lc.Info("Successfully connect to Postgres database") | ||
|
||
// execute base DB scripts | ||
if edgeXerr = executeDBScripts(ctx, dc.connPool, baseScriptPath); edgeXerr != nil { | ||
return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute Postgres base DB scripts", edgeXerr) | ||
} | ||
if baseScriptPath != "" { | ||
lc.Info("Successfully execute Postgres base DB scripts") | ||
} | ||
|
||
// execute extension DB scripts | ||
if edgeXerr = executeDBScripts(ctx, dc.connPool, extScriptPath); edgeXerr != nil { | ||
return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute Postgres extension DB scripts", edgeXerr) | ||
} | ||
|
||
return dc, nil | ||
} | ||
|
||
// CloseSession closes the connections to postgres | ||
func (c Client) CloseSession() { | ||
c.connPool.Close() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
//go:build postgresRunning | ||
// +build postgresRunning | ||
|
||
/******************************************************************************* | ||
* Copyright (C) 2024 IOTech Ltd | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*******************************************************************************/ | ||
|
||
// This test will only be executed if the tag postgresRunning is added when running | ||
// the tests with a command like: | ||
// go test -tags postgresRunning | ||
|
||
package postgres | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/store/db" | ||
|
||
bootstrapConfig "github.com/edgexfoundry/go-mod-bootstrap/v3/config" | ||
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
const ( | ||
TestHost = "localhost" | ||
TestPort = 5432 | ||
TestTimeout = "5s" | ||
TestMaxIdle = 5000 | ||
TestBatchSize = 1337 | ||
|
||
TestRetryCount = 100 | ||
TestPipelinePosition = 1337 | ||
TestVersion = "your" | ||
TestCorrelationID = "test" | ||
TestPipelineId = "test-pipeline" | ||
) | ||
|
||
var TestValidNoAuthConfig = bootstrapConfig.Database{ | ||
Type: db.Postgres, | ||
Host: TestHost, | ||
Port: TestPort, | ||
Timeout: TestTimeout, | ||
} | ||
|
||
var TestCredential = bootstrapConfig.Credentials{Username: "postgres", Password: "mysecretpassword"} | ||
|
||
func TestClient_NewClient(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
config bootstrapConfig.Database | ||
expectedError bool | ||
}{ | ||
{"Success, no auth", TestValidNoAuthConfig, false}, | ||
} | ||
|
||
lc := logger.NewMockClient() | ||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
_, err := NewClient(context.Background(), test.config, TestCredential, "", "", lc) | ||
|
||
if test.expectedError { | ||
require.Error(t, err) | ||
} else { | ||
require.NoError(t, err) | ||
} | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
// | ||
// Copyright (C) 2024 IOTech Ltd | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package postgres | ||
|
||
import ( | ||
"fmt" | ||
) | ||
|
||
// Constants for column names in the database table | ||
const ( | ||
contentCol = "content" | ||
idCol = "id" | ||
) | ||
|
||
// sqlInsertContent returns the SQL statement for inserting a new row with id, name, and content into the table | ||
func sqlInsertContent(table string) string { | ||
return fmt.Sprintf("INSERT INTO %s(%s, %s) VALUES ($1, $2)", table, idCol, contentCol) | ||
} | ||
|
||
// sqlQueryCountByJSONField returns the SQL statement for selecting content column in the table by the given JSON query string ordered by created column | ||
func sqlQueryContentByJSONField(table string) string { | ||
return fmt.Sprintf("SELECT content FROM %s WHERE content @> $1::jsonb ORDER BY created", table) | ||
} | ||
|
||
// sqlCheckExistsById returns the SQL statement for checking if a row exists in the table by id | ||
func sqlCheckExistsById(table string) string { | ||
return fmt.Sprintf("SELECT EXISTS(SELECT 1 FROM %s WHERE %s = $1)", table, idCol) | ||
} | ||
|
||
// sqlUpdateContentById returns the SQL statement for updating the content of a row in the table by id | ||
func sqlUpdateContentById(table string) string { | ||
return fmt.Sprintf("UPDATE %s SET %s = $1 WHERE %s = $2", table, contentCol, idCol) | ||
} | ||
|
||
// sqlDeleteById returns the SQL statement for deleting a row from the table by id | ||
func sqlDeleteById(table string) string { | ||
return fmt.Sprintf("DELETE FROM %s WHERE %s = $1", table, idCol) | ||
} |
Oops, something went wrong.