From 692f967d4fb2d55593267663a722826186ddf0d1 Mon Sep 17 00:00:00 2001 From: ganglv <88995770+ganglyu@users.noreply.github.com> Date: Fri, 17 May 2024 15:48:12 +0800 Subject: [PATCH 1/4] Update pipeline to check memory leak (#228) Why I did it We have found some memory leak introduced by cgo, and we need tools to detect memory leak. How I did it Run unit test with Address Santilizer to detect memory leak, and add a new step in pipeline to check memory leak. How to verify it Run sonic-gnmi pipeline. --- Makefile | 14 ++++++++++++++ azure-pipelines.yml | 11 +++++++++-- gnmi_server/server_test.go | 5 +++++ sonic_data_client/client_test.go | 6 ++++++ sonic_db_config/db_config_test.go | 5 +++++ telemetry/telemetry_test.go | 6 ++++++ test_utils/memcheck_disable.go | 5 +++++ test_utils/memcheck_enable.go | 15 +++++++++++++++ 8 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 test_utils/memcheck_disable.go create mode 100644 test_utils/memcheck_enable.go diff --git a/Makefile b/Makefile index c4fbd406..f5ff0f26 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,8 @@ export GOBIN := $(abspath $(BUILD_DIR)) export PATH := $(PATH):$(GOBIN):$(shell dirname $(GO)) export CGO_LDFLAGS := -lswsscommon -lhiredis export CGO_CXXFLAGS := -I/usr/include/swss -w -Wall -fpermissive +export MEMCHECK_CGO_LDFLAGS := $(CGO_LDFLAGS) -fsanitize=address +export MEMCHECK_CGO_CXXFLAGS := $(CGO_CXXFLAGS) -fsanitize=leak ifeq ($(ENABLE_TRANSLIB_WRITE),y) BLD_TAGS := gnmi_translib_write @@ -26,6 +28,11 @@ ifneq ($(BLD_TAGS),) BLD_FLAGS := -tags "$(strip $(BLD_TAGS))" endif +MEMCHECK_TAGS := $(BLD_TAGS) gnmi_memcheck +ifneq ($(MEMCHECK_TAGS),) +MEMCHECK_FLAGS := -tags "$(strip $(MEMCHECK_TAGS))" +endif + ENABLE_DIALOUT_VALUE := 1 ifeq ($(ENABLE_DIALOUT),n) ENABLE_DIALOUT_VALUE = 0 @@ -133,6 +140,13 @@ endif gocov convert coverage-*.txt | gocov-xml -source $(shell pwd) > coverage.xml rm -rf coverage-*.txt +check_memleak: $(DBCONFG) $(ENVFILE) + sudo CGO_LDFLAGS="$(MEMCHECK_CGO_LDFLAGS)" CGO_CXXFLAGS="$(MEMCHECK_CGO_CXXFLAGS)" $(GO) test -mod=vendor $(MEMCHECK_FLAGS) -v github.com/sonic-net/sonic-gnmi/telemetry + sudo CGO_LDFLAGS="$(MEMCHECK_CGO_LDFLAGS)" CGO_CXXFLAGS="$(MEMCHECK_CGO_CXXFLAGS)" $(GO) test $(MEMCHECK_FLAGS) -v github.com/sonic-net/sonic-gnmi/sonic_db_config + sudo CGO_LDFLAGS="$(MEMCHECK_CGO_LDFLAGS)" CGO_CXXFLAGS="$(MEMCHECK_CGO_CXXFLAGS)" $(GO) test -mod=vendor $(MEMCHECK_FLAGS) -v github.com/sonic-net/sonic-gnmi/gnmi_server -run TestGNMINative + sudo CGO_LDFLAGS="$(MEMCHECK_CGO_LDFLAGS)" CGO_CXXFLAGS="$(MEMCHECK_CGO_CXXFLAGS)" $(GO) test -mod=vendor $(MEMCHECK_FLAGS) -v github.com/sonic-net/sonic-gnmi/sonic_data_client + + clean: $(RM) -r build $(RM) -r vendor diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 1b99822d..6553044b 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -23,6 +23,8 @@ variables: value: $(System.PullRequest.TargetBranch) ${{ else }}: value: $(Build.SourceBranchName) + - name: UNIT_TEST_FLAG + value: 'ENABLE_TRANSLIB_WRITE=y' resources: repositories: @@ -161,8 +163,13 @@ stages: - script: | pushd sonic-gnmi - make check_gotest ENABLE_TRANSLIB_WRITE=y - displayName: "Test" + make all && make check_memleak $(UNIT_TEST_FLAG) + displayName: "Check memory leak" + + - script: | + pushd sonic-gnmi + make all && make check_gotest $(UNIT_TEST_FLAG) + displayName: "Run unit test" - publish: $(Build.ArtifactStagingDirectory)/ artifact: sonic-gnmi diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index e4098404..dd380df3 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -4173,3 +4173,8 @@ func init() { // Inform gNMI server to use redis tcp localhost connection sdc.UseRedisLocalTcpPort = true } + +func TestMain(m *testing.M) { + defer test_utils.MemLeakCheck() + m.Run() +} diff --git a/sonic_data_client/client_test.go b/sonic_data_client/client_test.go index 4c199660..86bc9025 100644 --- a/sonic_data_client/client_test.go +++ b/sonic_data_client/client_test.go @@ -13,6 +13,7 @@ import ( "github.com/jipanyang/gnxi/utils/xpath" "github.com/sonic-net/sonic-gnmi/swsscommon" + "github.com/sonic-net/sonic-gnmi/test_utils" gnmipb "github.com/openconfig/gnmi/proto/gnmi" ) @@ -624,3 +625,8 @@ func TestGetZmqClient(t *testing.T) { swsscommon.DeleteZmqClient(client) } } + +func TestMain(m *testing.M) { + defer test_utils.MemLeakCheck() + m.Run() +} diff --git a/sonic_db_config/db_config_test.go b/sonic_db_config/db_config_test.go index f03b3f92..56afb550 100644 --- a/sonic_db_config/db_config_test.go +++ b/sonic_db_config/db_config_test.go @@ -351,3 +351,8 @@ func TestGetDbMultiInstance(t *testing.T) { } }) } + +func TestMain(m *testing.M) { + defer test_utils.MemLeakCheck() + m.Run() +} diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index 03bcd1fc..cab194cc 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -17,6 +17,7 @@ import ( "testing" "flag" gnmi "github.com/sonic-net/sonic-gnmi/gnmi_server" + "github.com/sonic-net/sonic-gnmi/test_utils" "github.com/agiledragon/gomonkey/v2" "os" "syscall" @@ -858,3 +859,8 @@ func testHandlerSyscall(t *testing.T, signal os.Signal) { func sendSignal(serverControlSignal chan<- ServerControlValue, value ServerControlValue) { serverControlSignal <- value } + +func TestMain(m *testing.M) { + defer test_utils.MemLeakCheck() + m.Run() +} diff --git a/test_utils/memcheck_disable.go b/test_utils/memcheck_disable.go new file mode 100644 index 00000000..157699c6 --- /dev/null +++ b/test_utils/memcheck_disable.go @@ -0,0 +1,5 @@ +// +build !gnmi_memcheck + +package test_utils + +func MemLeakCheck() {} diff --git a/test_utils/memcheck_enable.go b/test_utils/memcheck_enable.go new file mode 100644 index 00000000..976f8e8b --- /dev/null +++ b/test_utils/memcheck_enable.go @@ -0,0 +1,15 @@ +// +build gnmi_memcheck + +package test_utils + +// int __lsan_do_recoverable_leak_check(void); +import "C" +import "fmt" + +func MemLeakCheck() { + ret := int(C.__lsan_do_recoverable_leak_check()) + if ret != 0 { + panic(fmt.Errorf("Detect memory leak!")) + } +} + From 81174c0510c143035680e55489275ddcda98f3a5 Mon Sep 17 00:00:00 2001 From: ganglv <88995770+ganglyu@users.noreply.github.com> Date: Tue, 21 May 2024 15:53:56 +0800 Subject: [PATCH 2/4] Fix full config update (#240) Why I did it GNMI full config end to end test is broken. How I did it Fix yang validation part to send file name to python code. How to verify it Run gnmi unit test and end to end test. --- sonic_data_client/mixed_db_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sonic_data_client/mixed_db_client.go b/sonic_data_client/mixed_db_client.go index f748a576..3de666df 100644 --- a/sonic_data_client/mixed_db_client.go +++ b/sonic_data_client/mixed_db_client.go @@ -1353,7 +1353,7 @@ func (c *MixedDbClient) SetFullConfig(delete []*gnmipb.Path, replace []*gnmipb.U return err } - PyCodeInGo := fmt.Sprintf(PyCodeForYang, ietf_json_val) + PyCodeInGo := fmt.Sprintf(PyCodeForYang, fileName) err = RunPyCode(PyCodeInGo) if err != nil { return fmt.Errorf("Yang validation failed!") From 585f4419f983b8d931121fdcb8acfe549bb06fba Mon Sep 17 00:00:00 2001 From: Ryan Lucus Date: Tue, 21 May 2024 16:10:45 -0700 Subject: [PATCH 3/4] Add SaveOnSet (#108) * Add SaveOnSet * Add SaveOnSet * use trans_util for save on set * use trans_util for save on set * use trans_util for save on set * use trans_util for save on set * use trans_util for save on set * use trans_util for save on set * change save-on-set to sonic-services-client * change save-on-set to sonic-services-client * Update telemetry.go formating * fix merge * add coverage * add coverage * checking test * mistake * re-enable unit test --------- Co-authored-by: Shashank Neelam <77703519+sneelam20@users.noreply.github.com> --- gnmi_server/server.go | 58 +- gnmi_server/server_test.go | 1380 ++++++++++++++++++------------------ telemetry/telemetry.go | 54 +- 3 files changed, 774 insertions(+), 718 deletions(-) diff --git a/gnmi_server/server.go b/gnmi_server/server.go index 4ac4f3a0..b080fc11 100644 --- a/gnmi_server/server.go +++ b/gnmi_server/server.go @@ -4,12 +4,18 @@ import ( "bytes" "errors" "fmt" + "net" + "strings" + "sync" + "github.com/Azure/sonic-mgmt-common/translib" "github.com/sonic-net/sonic-gnmi/common_utils" spb "github.com/sonic-net/sonic-gnmi/proto" spb_gnoi "github.com/sonic-net/sonic-gnmi/proto/gnoi" spb_jwt_gnoi "github.com/sonic-net/sonic-gnmi/proto/gnoi/jwt" sdc "github.com/sonic-net/sonic-gnmi/sonic_data_client" + ssc "github.com/sonic-net/sonic-gnmi/sonic_service_client" + log "github.com/golang/glog" "github.com/golang/protobuf/proto" gnmipb "github.com/openconfig/gnmi/proto/gnmi" @@ -21,9 +27,6 @@ import ( "google.golang.org/grpc/peer" "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" - "net" - "strings" - "sync" ) var ( @@ -39,6 +42,10 @@ type Server struct { config *Config cMu sync.Mutex clients map[string]*Client + // SaveStartupConfig points to a function that is called to save changes of + // configuration to a file. By default it points to an empty function - + // the configuration is not saved to a file. + SaveStartupConfig func() error // ReqFromMaster point to a function that is called to verify if the request // comes from a master controller. ReqFromMaster func(req *gnmipb.SetRequest, masterEID *uint128) error @@ -50,14 +57,14 @@ type AuthTypes map[string]bool type Config struct { // Port for the Server to listen on. If 0 or unset the Server will pick a port // for this Server. - Port int64 - LogLevel int - Threshold int - UserAuth AuthTypes + Port int64 + LogLevel int + Threshold int + UserAuth AuthTypes EnableTranslibWrite bool - EnableNativeWrite bool - ZmqPort string - IdleConnDuration int + EnableNativeWrite bool + ZmqPort string + IdleConnDuration int } var AuthLock sync.Mutex @@ -139,9 +146,10 @@ func NewServer(config *Config, opts []grpc.ServerOption) (*Server, error) { reflection.Register(s) srv := &Server{ - s: s, - config: config, - clients: map[string]*Client{}, + s: s, + config: config, + clients: map[string]*Client{}, + SaveStartupConfig: saveOnSetDisabled, // ReqFromMaster point to a function that is called to verify if // the request comes from a master controller. ReqFromMaster: ReqFromMasterDisabledMA, @@ -160,7 +168,7 @@ func NewServer(config *Config, opts []grpc.ServerOption) (*Server, error) { if srv.config.EnableTranslibWrite || srv.config.EnableNativeWrite { gnoi_system_pb.RegisterSystemServer(srv.s, srv) } - if srv.config.EnableTranslibWrite { + if srv.config.EnableTranslibWrite { spb_gnoi.RegisterSonicServiceServer(srv.s, srv) } spb_gnoi.RegisterDebugServer(srv.s, srv) @@ -400,6 +408,25 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe return &gnmipb.GetResponse{Notification: notifications}, nil } +// saveOnSetEnabled saves configuration to a file +func SaveOnSetEnabled() error { + sc, err := ssc.NewDbusClient() + if err != nil { + log.V(0).Infof("Saving startup config failed to create dbus client: %v", err) + return err + } + if err := sc.ConfigSave("/etc/sonic/config_db.json"); err != nil { + log.V(0).Infof("Saving startup config failed: %v", err) + return err + } else { + log.V(1).Infof("Success! Startup config has been saved!") + } + return nil +} + +// SaveOnSetDisabeld does nothing. +func saveOnSetDisabled() error { return nil } + func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetResponse, error) { e := s.ReqFromMaster(req, &s.masterEID) if e != nil { @@ -503,6 +530,7 @@ func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetRe common_utils.IncCounter(common_utils.GNMI_SET_FAIL) } + s.SaveStartupConfig() return &gnmipb.SetResponse{ Prefix: req.GetPrefix(), Response: results, @@ -600,7 +628,7 @@ func ReqFromMasterEnabledMA(req *gnmipb.SetRequest, masterEID *uint128) error { // Role will be implemented later. return status.Errorf(codes.Unimplemented, "MA: Role is not implemented") } - + reqEID = uint128{High: ma.ElectionId.High, Low: ma.ElectionId.Low} // Use the election ID that is in the last extension, so, no 'break' here. } diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index dd380df3..27e75741 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -5,26 +5,31 @@ package gnmi import ( "crypto/tls" "encoding/json" - "path/filepath" "flag" "fmt" -"sync" - "strings" - "unsafe" - - testcert "github.com/sonic-net/sonic-gnmi/testdata/tls" - "github.com/go-redis/redis" - "github.com/golang/protobuf/proto" - "io/ioutil" "os" "os/exec" "os/user" + "path/filepath" "reflect" + "runtime" + "strings" + "sync" "testing" "time" - "runtime" + "unsafe" + + spb "github.com/sonic-net/sonic-gnmi/proto" + sgpb "github.com/sonic-net/sonic-gnmi/proto/gnoi" + sdc "github.com/sonic-net/sonic-gnmi/sonic_data_client" + sdcfg "github.com/sonic-net/sonic-gnmi/sonic_db_config" + ssc "github.com/sonic-net/sonic-gnmi/sonic_service_client" + "github.com/sonic-net/sonic-gnmi/test_utils" + testcert "github.com/sonic-net/sonic-gnmi/testdata/tls" + "github.com/go-redis/redis" + "github.com/golang/protobuf/proto" "github.com/kylelemons/godebug/pretty" "github.com/openconfig/gnmi/client" pb "github.com/openconfig/gnmi/proto/gnmi" @@ -36,26 +41,20 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/status" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" // Register supported client types. - spb "github.com/sonic-net/sonic-gnmi/proto" - sgpb "github.com/sonic-net/sonic-gnmi/proto/gnoi" - gnmipb "github.com/openconfig/gnmi/proto/gnmi" - sdc "github.com/sonic-net/sonic-gnmi/sonic_data_client" - sdcfg "github.com/sonic-net/sonic-gnmi/sonic_db_config" - "github.com/Workiva/go-datastructures/queue" - linuxproc "github.com/c9s/goprocinfo/linux" - "github.com/sonic-net/sonic-gnmi/common_utils" - "github.com/sonic-net/sonic-gnmi/test_utils" - gclient "github.com/jipanyang/gnmi/client/gnmi" - "github.com/jipanyang/gnxi/utils/xpath" - gnoi_system_pb "github.com/openconfig/gnoi/system" + "github.com/Workiva/go-datastructures/queue" "github.com/agiledragon/gomonkey/v2" + linuxproc "github.com/c9s/goprocinfo/linux" "github.com/godbus/dbus/v5" + gclient "github.com/jipanyang/gnmi/client/gnmi" + "github.com/jipanyang/gnxi/utils/xpath" cacheclient "github.com/openconfig/gnmi/client" - + gnmipb "github.com/openconfig/gnmi/proto/gnmi" + gnoi_system_pb "github.com/openconfig/gnoi/system" + "github.com/sonic-net/sonic-gnmi/common_utils" ) var clientTypes = []string{gclient.Type} @@ -163,7 +162,7 @@ func createRejectServer(t *testing.T, port int64) *Server { } opts := []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))} - cfg := &Config{Port: port, EnableTranslibWrite: true, Threshold: 2} + cfg := &Config{Port: port, EnableTranslibWrite: true, Threshold: 2} s, err := NewServer(cfg, opts) if err != nil { t.Fatalf("Failed to create gNMI server: %v", err) @@ -356,7 +355,7 @@ func runTestSet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTa runTestSetRaw(t, ctx, gClient, req, wantRetCode) } -func runTestSetRaw(t *testing.T, ctx context.Context, gClient pb.GNMIClient, req *pb.SetRequest, +func runTestSetRaw(t *testing.T, ctx context.Context, gClient pb.GNMIClient, req *pb.SetRequest, wantRetCode codes.Code) { t.Helper() @@ -799,7 +798,7 @@ func createEventsQuery(t *testing.T, paths ...string) client.Query { func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query { return createQueryOrFail(t, - pb.SubscriptionList_STREAM, + pb.SubscriptionList_STREAM, "STATE_DB", []subscriptionQuery{ { @@ -962,9 +961,9 @@ func TestGnmiSet(t *testing.T) { // operation: Update, //}, { - desc: "Set OC Interface IP", - pathTarget: "OC_YANG", - textPbPath: pathToPb("/openconfig-interfaces:interfaces/interface[name=Ethernet4]/subinterfaces/subinterface[index=0]/openconfig-if-ip:ipv4"), + desc: "Set OC Interface IP", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-interfaces:interfaces/interface[name=Ethernet4]/subinterfaces/subinterface[index=0]/openconfig-if-ip:ipv4"), attributeData: "../testdata/set_interface_ipv4.json", wantRetCode: codes.OK, operation: Update, @@ -991,9 +990,9 @@ func TestGnmiSet(t *testing.T) { valTest: false, }, { - desc: "Set OC Interface IPv6 (unprefixed path)", - pathTarget: "OC_YANG", - textPbPath: pathToPb("/interfaces/interface[name=Ethernet0]/subinterfaces/subinterface[index=0]/ipv6/addresses/address"), + desc: "Set OC Interface IPv6 (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/interfaces/interface[name=Ethernet0]/subinterfaces/subinterface[index=0]/ipv6/addresses/address"), attributeData: `{"address": [{"ip": "150::1","config": {"ip": "150::1","prefix-length": 80}}]}`, wantRetCode: codes.OK, operation: Update, @@ -1006,13 +1005,13 @@ func TestGnmiSet(t *testing.T) { operation: Delete, }, { - desc: "Create ACL (unprefixed path)", - pathTarget: "OC_YANG", - textPbPath: pathToPb("/acl/acl-sets/acl-set"), + desc: "Create ACL (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/acl/acl-sets/acl-set"), attributeData: `{"acl-set": [{"name": "A001", "type": "ACL_IPV4", "config": {"name": "A001", "type": "ACL_IPV4", "description": "hello, world!"}}]}`, - wantRetCode: codes.OK, - operation: Update, + wantRetCode: codes.OK, + operation: Update, }, { desc: "Verify Create ACL", @@ -1059,7 +1058,7 @@ func TestGnmiSet(t *testing.T) { t.Run(td.desc, func(t *testing.T) { runTestGet(t, ctx, gClient, td.pathTarget, td.textPbPath, td.wantRetCode, td.wantRespVal, td.valTest) }) - t.Run(td.desc + " (unprefixed path)", func(t *testing.T) { + t.Run(td.desc+" (unprefixed path)", func(t *testing.T) { p := removeModulePrefixFromPathPb(t, td.textPbPath) runTestGet(t, ctx, gClient, td.pathTarget, p, td.wantRetCode, td.wantRespVal, td.valTest) }) @@ -1412,9 +1411,9 @@ func runGnmiTestGet(t *testing.T, namespace string) { wantRetCode: codes.OK, wantRespVal: []byte(`{"test_field": "test_value"}`), }, { - desc: "Invalid DBKey of length 1", - pathTarget: stateDBPath, - textPbPath: ``, + desc: "Invalid DBKey of length 1", + pathTarget: stateDBPath, + textPbPath: ``, valTest: true, wantRetCode: codes.NotFound, }, @@ -1823,7 +1822,7 @@ func runTestSubscribe(t *testing.T, namespace string) { generateIntervals bool } - tests := []TestExec { + tests := []TestExec{ { desc: "stream query for table COUNTERS_PORT_NAME_MAP with new test_field field", q: createCountersDbQueryOnChangeMode(t, "COUNTERS_PORT_NAME_MAP"), @@ -2666,7 +2665,7 @@ func runTestSubscribe(t *testing.T, namespace string) { mutexGotNoti.Unlock() return nil } - go func(t2 TestExec) { + go func(t2 TestExec) { defer wg.Done() err := c.Subscribe(context.Background(), q) if t2.wantSubErr != nil && t2.wantSubErr.Error() != err.Error() { @@ -3029,7 +3028,7 @@ func TestBulkSet(t *testing.T) { t.Run("Invalid Replace Path", func(t *testing.T) { req := &pb.SetRequest{ - Delete: []*pb.Path{aclPath1, aclPath2}, + Delete: []*pb.Path{aclPath1, aclPath2}, Replace: []*pb.Update{ newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`), }} @@ -3051,23 +3050,23 @@ func newPbUpdate(path, value string) *pb.Update { v := &pb.TypedValue_JsonIetfVal{JsonIetfVal: extractJSON(value)} return &pb.Update{ Path: p, - Val: &pb.TypedValue{Value: v}, + Val: &pb.TypedValue{Value: v}, } } type loginCreds struct { - Username, Password string + Username, Password string } func (c *loginCreds) GetRequestMetadata(context.Context, ...string) (map[string]string, error) { - return map[string]string{ - "username": c.Username, - "password": c.Password, - }, nil + return map[string]string{ + "username": c.Username, + "password": c.Password, + }, nil } func (c *loginCreds) RequireTransportSecurity() bool { - return true + return true } func TestAuthCapabilities(t *testing.T) { @@ -3107,659 +3106,659 @@ func TestAuthCapabilities(t *testing.T) { } func TestTableKeyOnDeletion(t *testing.T) { - s := createKeepAliveServer(t, 8081) - go runServer(t, s) - defer s.Stop() - - fileName := "../testdata/NEIGH_STATE_TABLE_MAP.txt" - neighStateTableByte, err := ioutil.ReadFile(fileName) - if err != nil { - t.Fatalf("read file %v err: %v", fileName, err) - } - var neighStateTableJson interface{} - json.Unmarshal(neighStateTableByte, &neighStateTableJson) - - fileName = "../testdata/NEIGH_STATE_TABLE_key_deletion_57.txt" - neighStateTableDeletedByte57, err := ioutil.ReadFile(fileName) - if err != nil { - t.Fatalf("read file %v err: %v", fileName, err) - } - var neighStateTableDeletedJson57 interface{} - json.Unmarshal(neighStateTableDeletedByte57, &neighStateTableDeletedJson57) - - fileName = "../testdata/NEIGH_STATE_TABLE_MAP_2.txt" - neighStateTableByteTwo, err := ioutil.ReadFile(fileName) - if err != nil { - t.Fatalf("read file %v err: %v", fileName, err) - } - var neighStateTableJsonTwo interface{} - json.Unmarshal(neighStateTableByteTwo, &neighStateTableJsonTwo) - - fileName = "../testdata/NEIGH_STATE_TABLE_key_deletion_59.txt" - neighStateTableDeletedByte59, err := ioutil.ReadFile(fileName) - if err != nil { - t.Fatalf("read file %v err: %v", fileName, err) - } - var neighStateTableDeletedJson59 interface{} - json.Unmarshal(neighStateTableDeletedByte59, &neighStateTableDeletedJson59) - - fileName = "../testdata/NEIGH_STATE_TABLE_key_deletion_61.txt" - neighStateTableDeletedByte61, err := ioutil.ReadFile(fileName) - if err != nil { - t.Fatalf("read file %v err: %v", fileName, err) - } - var neighStateTableDeletedJson61 interface{} - json.Unmarshal(neighStateTableDeletedByte61, &neighStateTableDeletedJson61) - - namespace, _ := sdcfg.GetDbDefaultNamespace() - rclient := getRedisClientN(t, 6, namespace) - defer rclient.Close() - prepareStateDb(t, namespace) - - tests := []struct { - desc string - q client.Query - wantNoti []client.Notification - paths []string - }{ - { - desc: "Testing deletion of NEIGH_STATE_TABLE:10.0.0.57", - q: createStateDbQueryOnChangeMode(t, "NEIGH_STATE_TABLE"), - wantNoti: []client.Notification { - client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableJson}, - client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson57}, - }, - paths: []string { - "NEIGH_STATE_TABLE|10.0.0.57", - }, - }, - { - desc: "Testing deletion of NEIGH_STATE_TABLE:10.0.0.59 and NEIGH_STATE_TABLE 10.0.0.61", - q: createStateDbQueryOnChangeMode(t, "NEIGH_STATE_TABLE"), - wantNoti: []client.Notification { - client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableJsonTwo}, - client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson59}, - client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson61}, - }, - paths: []string { - "NEIGH_STATE_TABLE|10.0.0.59", - "NEIGH_STATE_TABLE|10.0.0.61", - }, - }, - } - - var mutexNoti sync.RWMutex - var mutexPaths sync.Mutex - for _, tt := range tests { - t.Run(tt.desc, func(t *testing.T) { - q := tt.q - q.Addrs = []string{"127.0.0.1:8081"} - c := client.New() - defer c.Close() - var gotNoti []client.Notification - q.NotificationHandler = func(n client.Notification) error { - if nn, ok := n.(client.Update); ok { - nn.TS = time.Unix(0, 200) - mutexNoti.Lock() - currentNoti := gotNoti - mutexNoti.Unlock() - - mutexNoti.RLock() - gotNoti = append(currentNoti, nn) - mutexNoti.RUnlock() - } - return nil - } - - go func() { - c.Subscribe(context.Background(), q) - }() - - time.Sleep(time.Millisecond * 500) // half a second for subscribe request to sync - - mutexPaths.Lock() - paths := tt.paths - mutexPaths.Unlock() - - rclient.Del(paths...) - - time.Sleep(time.Millisecond * 1500) - - mutexNoti.Lock() - if diff := pretty.Compare(tt.wantNoti, gotNoti); diff != "" { - t.Log("\n Want: \n", tt.wantNoti) - t.Log("\n Got : \n", gotNoti) - t.Errorf("unexpected updates:\n%s", diff) - } - mutexNoti.Unlock() - }) - } + s := createKeepAliveServer(t, 8081) + go runServer(t, s) + defer s.Stop() + + fileName := "../testdata/NEIGH_STATE_TABLE_MAP.txt" + neighStateTableByte, err := ioutil.ReadFile(fileName) + if err != nil { + t.Fatalf("read file %v err: %v", fileName, err) + } + var neighStateTableJson interface{} + json.Unmarshal(neighStateTableByte, &neighStateTableJson) + + fileName = "../testdata/NEIGH_STATE_TABLE_key_deletion_57.txt" + neighStateTableDeletedByte57, err := ioutil.ReadFile(fileName) + if err != nil { + t.Fatalf("read file %v err: %v", fileName, err) + } + var neighStateTableDeletedJson57 interface{} + json.Unmarshal(neighStateTableDeletedByte57, &neighStateTableDeletedJson57) + + fileName = "../testdata/NEIGH_STATE_TABLE_MAP_2.txt" + neighStateTableByteTwo, err := ioutil.ReadFile(fileName) + if err != nil { + t.Fatalf("read file %v err: %v", fileName, err) + } + var neighStateTableJsonTwo interface{} + json.Unmarshal(neighStateTableByteTwo, &neighStateTableJsonTwo) + + fileName = "../testdata/NEIGH_STATE_TABLE_key_deletion_59.txt" + neighStateTableDeletedByte59, err := ioutil.ReadFile(fileName) + if err != nil { + t.Fatalf("read file %v err: %v", fileName, err) + } + var neighStateTableDeletedJson59 interface{} + json.Unmarshal(neighStateTableDeletedByte59, &neighStateTableDeletedJson59) + + fileName = "../testdata/NEIGH_STATE_TABLE_key_deletion_61.txt" + neighStateTableDeletedByte61, err := ioutil.ReadFile(fileName) + if err != nil { + t.Fatalf("read file %v err: %v", fileName, err) + } + var neighStateTableDeletedJson61 interface{} + json.Unmarshal(neighStateTableDeletedByte61, &neighStateTableDeletedJson61) + + namespace, _ := sdcfg.GetDbDefaultNamespace() + rclient := getRedisClientN(t, 6, namespace) + defer rclient.Close() + prepareStateDb(t, namespace) + + tests := []struct { + desc string + q client.Query + wantNoti []client.Notification + paths []string + }{ + { + desc: "Testing deletion of NEIGH_STATE_TABLE:10.0.0.57", + q: createStateDbQueryOnChangeMode(t, "NEIGH_STATE_TABLE"), + wantNoti: []client.Notification{ + client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableJson}, + client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson57}, + }, + paths: []string{ + "NEIGH_STATE_TABLE|10.0.0.57", + }, + }, + { + desc: "Testing deletion of NEIGH_STATE_TABLE:10.0.0.59 and NEIGH_STATE_TABLE 10.0.0.61", + q: createStateDbQueryOnChangeMode(t, "NEIGH_STATE_TABLE"), + wantNoti: []client.Notification{ + client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableJsonTwo}, + client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson59}, + client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson61}, + }, + paths: []string{ + "NEIGH_STATE_TABLE|10.0.0.59", + "NEIGH_STATE_TABLE|10.0.0.61", + }, + }, + } + + var mutexNoti sync.RWMutex + var mutexPaths sync.Mutex + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + q := tt.q + q.Addrs = []string{"127.0.0.1:8081"} + c := client.New() + defer c.Close() + var gotNoti []client.Notification + q.NotificationHandler = func(n client.Notification) error { + if nn, ok := n.(client.Update); ok { + nn.TS = time.Unix(0, 200) + mutexNoti.Lock() + currentNoti := gotNoti + mutexNoti.Unlock() + + mutexNoti.RLock() + gotNoti = append(currentNoti, nn) + mutexNoti.RUnlock() + } + return nil + } + + go func() { + c.Subscribe(context.Background(), q) + }() + + time.Sleep(time.Millisecond * 500) // half a second for subscribe request to sync + + mutexPaths.Lock() + paths := tt.paths + mutexPaths.Unlock() + + rclient.Del(paths...) + + time.Sleep(time.Millisecond * 1500) + + mutexNoti.Lock() + if diff := pretty.Compare(tt.wantNoti, gotNoti); diff != "" { + t.Log("\n Want: \n", tt.wantNoti) + t.Log("\n Got : \n", gotNoti) + t.Errorf("unexpected updates:\n%s", diff) + } + mutexNoti.Unlock() + }) + } } func TestCPUUtilization(t *testing.T) { - mock := gomonkey.ApplyFunc(sdc.PollStats, func() { - var i uint64 - for i = 0; i < 3000; i++ { - sdc.WriteStatsToBuffer(&linuxproc.Stat{}) - } - }) - - defer mock.Reset() - s := createServer(t, 8081) - go runServer(t, s) - defer s.Stop() - - tests := []struct { - desc string - q client.Query - want []client.Notification - poll int - }{ - { - desc: "poll query for CPU Utilization", - poll: 10, - q: client.Query{ - Target: "OTHERS", - Type: client.Poll, - Queries: []client.Path{{"platform", "cpu"}}, - TLS: &tls.Config{InsecureSkipVerify: true}, - }, - want: []client.Notification{ - client.Connected{}, - client.Sync{}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.desc, func(t *testing.T) { - q := tt.q - q.Addrs = []string{"127.0.0.1:8081"} - c := client.New() - var gotNoti []client.Notification - q.NotificationHandler = func(n client.Notification) error { - if nn, ok := n.(client.Update); ok { - nn.TS = time.Unix(0, 200) - gotNoti = append(gotNoti, nn) - } else { - gotNoti = append(gotNoti, n) - } - return nil - } - - wg := new(sync.WaitGroup) - wg.Add(1) - - go func() { - defer wg.Done() - if err := c.Subscribe(context.Background(), q); err != nil { - t.Errorf("c.Subscribe(): got error %v, expected nil", err) - } - }() - - wg.Wait() - - for i := 0; i < tt.poll; i++ { - if err := c.Poll(); err != nil { - t.Errorf("c.Poll(): got error %v, expected nil", err) - } - } - - if len(gotNoti) == 0 { - t.Errorf("expected non zero notifications") - } - - c.Close() - }) - } + mock := gomonkey.ApplyFunc(sdc.PollStats, func() { + var i uint64 + for i = 0; i < 3000; i++ { + sdc.WriteStatsToBuffer(&linuxproc.Stat{}) + } + }) + + defer mock.Reset() + s := createServer(t, 8081) + go runServer(t, s) + defer s.Stop() + + tests := []struct { + desc string + q client.Query + want []client.Notification + poll int + }{ + { + desc: "poll query for CPU Utilization", + poll: 10, + q: client.Query{ + Target: "OTHERS", + Type: client.Poll, + Queries: []client.Path{{"platform", "cpu"}}, + TLS: &tls.Config{InsecureSkipVerify: true}, + }, + want: []client.Notification{ + client.Connected{}, + client.Sync{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + q := tt.q + q.Addrs = []string{"127.0.0.1:8081"} + c := client.New() + var gotNoti []client.Notification + q.NotificationHandler = func(n client.Notification) error { + if nn, ok := n.(client.Update); ok { + nn.TS = time.Unix(0, 200) + gotNoti = append(gotNoti, nn) + } else { + gotNoti = append(gotNoti, n) + } + return nil + } + + wg := new(sync.WaitGroup) + wg.Add(1) + + go func() { + defer wg.Done() + if err := c.Subscribe(context.Background(), q); err != nil { + t.Errorf("c.Subscribe(): got error %v, expected nil", err) + } + }() + + wg.Wait() + + for i := 0; i < tt.poll; i++ { + if err := c.Poll(); err != nil { + t.Errorf("c.Poll(): got error %v, expected nil", err) + } + } + + if len(gotNoti) == 0 { + t.Errorf("expected non zero notifications") + } + + c.Close() + }) + } } func TestClientConnections(t *testing.T) { - s := createRejectServer(t, 8081) - go runServer(t, s) - defer s.Stop() - - tests := []struct { - desc string - q client.Query - want []client.Notification - poll int - }{ - { - desc: "Reject OTHERS/proc/uptime", - poll: 10, - q: client.Query{ - Target: "OTHERS", - Type: client.Poll, - Queries: []client.Path{{"proc", "uptime"}}, - TLS: &tls.Config{InsecureSkipVerify: true}, - }, - want: []client.Notification{ - client.Connected{}, - client.Sync{}, - }, - }, - { - desc: "Reject COUNTERS/Ethernet*", - poll: 10, - q: client.Query{ - Target: "COUNTERS_DB", - Type: client.Poll, - Queries: []client.Path{{"COUNTERS", "Ethernet*"}}, - TLS: &tls.Config{InsecureSkipVerify: true}, - }, - want: []client.Notification{ - client.Connected{}, - client.Sync{}, - }, - }, - { - desc: "Reject COUNTERS/Ethernet68", - poll: 10, - q: client.Query{ - Target: "COUNTERS_DB", - Type: client.Poll, - Queries: []client.Path{{"COUNTERS", "Ethernet68"}}, - TLS: &tls.Config{InsecureSkipVerify: true}, - }, - want: []client.Notification{ - client.Connected{}, - client.Sync{}, - }, - }, - } - - var clients []*cacheclient.CacheClient - - for i, tt := range tests { - t.Run(tt.desc, func(t *testing.T) { - q := tt.q - q.Addrs = []string{"127.0.0.1:8081"} - var gotNoti []client.Notification - q.NotificationHandler = func(n client.Notification) error { - if nn, ok := n.(client.Update); ok { - nn.TS = time.Unix(0, 200) - gotNoti = append(gotNoti, nn) - } else { - gotNoti = append(gotNoti, n) - } - return nil - } - - wg := new(sync.WaitGroup) - wg.Add(1) - - go func() { - defer wg.Done() - c := client.New() - clients = append(clients, c) - err := c.Subscribe(context.Background(), q) - if err == nil && i == len(tests) - 1 { // reject third - t.Errorf("Expecting rejection message as no connections are allowed") - } - if err != nil && i < len(tests) - 1 { // accept first two - t.Errorf("Expecting accepts for first two connections") - } - }() - - wg.Wait() - }) - } - - for _, cacheClient := range(clients) { - cacheClient.Close() - } + s := createRejectServer(t, 8081) + go runServer(t, s) + defer s.Stop() + + tests := []struct { + desc string + q client.Query + want []client.Notification + poll int + }{ + { + desc: "Reject OTHERS/proc/uptime", + poll: 10, + q: client.Query{ + Target: "OTHERS", + Type: client.Poll, + Queries: []client.Path{{"proc", "uptime"}}, + TLS: &tls.Config{InsecureSkipVerify: true}, + }, + want: []client.Notification{ + client.Connected{}, + client.Sync{}, + }, + }, + { + desc: "Reject COUNTERS/Ethernet*", + poll: 10, + q: client.Query{ + Target: "COUNTERS_DB", + Type: client.Poll, + Queries: []client.Path{{"COUNTERS", "Ethernet*"}}, + TLS: &tls.Config{InsecureSkipVerify: true}, + }, + want: []client.Notification{ + client.Connected{}, + client.Sync{}, + }, + }, + { + desc: "Reject COUNTERS/Ethernet68", + poll: 10, + q: client.Query{ + Target: "COUNTERS_DB", + Type: client.Poll, + Queries: []client.Path{{"COUNTERS", "Ethernet68"}}, + TLS: &tls.Config{InsecureSkipVerify: true}, + }, + want: []client.Notification{ + client.Connected{}, + client.Sync{}, + }, + }, + } + + var clients []*cacheclient.CacheClient + + for i, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + q := tt.q + q.Addrs = []string{"127.0.0.1:8081"} + var gotNoti []client.Notification + q.NotificationHandler = func(n client.Notification) error { + if nn, ok := n.(client.Update); ok { + nn.TS = time.Unix(0, 200) + gotNoti = append(gotNoti, nn) + } else { + gotNoti = append(gotNoti, n) + } + return nil + } + + wg := new(sync.WaitGroup) + wg.Add(1) + + go func() { + defer wg.Done() + c := client.New() + clients = append(clients, c) + err := c.Subscribe(context.Background(), q) + if err == nil && i == len(tests)-1 { // reject third + t.Errorf("Expecting rejection message as no connections are allowed") + } + if err != nil && i < len(tests)-1 { // accept first two + t.Errorf("Expecting accepts for first two connections") + } + }() + + wg.Wait() + }) + } + + for _, cacheClient := range clients { + cacheClient.Close() + } } func TestConnectionDataSet(t *testing.T) { - s := createServer(t, 8081) - go runServer(t, s) - defer s.Stop() - - tests := []struct { - desc string - q client.Query - want []client.Notification - poll int - }{ - { - desc: "poll query for COUNTERS/Ethernet*", - poll: 10, - q: client.Query{ - Target: "COUNTERS_DB", - Type: client.Poll, - Queries: []client.Path{{"COUNTERS", "Ethernet*"}}, - TLS: &tls.Config{InsecureSkipVerify: true}, - }, - want: []client.Notification{ - client.Connected{}, - client.Sync{}, - }, - }, - } - namespace, _ := sdcfg.GetDbDefaultNamespace() - rclient := getRedisClientN(t, 6, namespace) - defer rclient.Close() - - for _, tt := range tests { - prepareStateDb(t, namespace) - t.Run(tt.desc, func(t *testing.T) { - q := tt.q - q.Addrs = []string{"127.0.0.1:8081"} - c := client.New() - - wg := new(sync.WaitGroup) - wg.Add(1) - - go func() { - defer wg.Done() - if err := c.Subscribe(context.Background(), q); err != nil { - t.Errorf("c.Subscribe(): got error %v, expected nil", err) - } - }() - - wg.Wait() - - resultMap, err := rclient.HGetAll("TELEMETRY_CONNECTIONS").Result() - - if resultMap == nil { - t.Errorf("result Map is nil, expected non nil, err: %v", err) - } - if len(resultMap) != 1 { - t.Errorf("result for TELEMETRY_CONNECTIONS should be 1") - } - - for key, _ := range resultMap { - if !strings.Contains(key, "COUNTERS_DB|COUNTERS|Ethernet*") { - t.Errorf("key is expected to contain correct query, received: %s", key) - } - } - - c.Close() - }) - } + s := createServer(t, 8081) + go runServer(t, s) + defer s.Stop() + + tests := []struct { + desc string + q client.Query + want []client.Notification + poll int + }{ + { + desc: "poll query for COUNTERS/Ethernet*", + poll: 10, + q: client.Query{ + Target: "COUNTERS_DB", + Type: client.Poll, + Queries: []client.Path{{"COUNTERS", "Ethernet*"}}, + TLS: &tls.Config{InsecureSkipVerify: true}, + }, + want: []client.Notification{ + client.Connected{}, + client.Sync{}, + }, + }, + } + namespace, _ := sdcfg.GetDbDefaultNamespace() + rclient := getRedisClientN(t, 6, namespace) + defer rclient.Close() + + for _, tt := range tests { + prepareStateDb(t, namespace) + t.Run(tt.desc, func(t *testing.T) { + q := tt.q + q.Addrs = []string{"127.0.0.1:8081"} + c := client.New() + + wg := new(sync.WaitGroup) + wg.Add(1) + + go func() { + defer wg.Done() + if err := c.Subscribe(context.Background(), q); err != nil { + t.Errorf("c.Subscribe(): got error %v, expected nil", err) + } + }() + + wg.Wait() + + resultMap, err := rclient.HGetAll("TELEMETRY_CONNECTIONS").Result() + + if resultMap == nil { + t.Errorf("result Map is nil, expected non nil, err: %v", err) + } + if len(resultMap) != 1 { + t.Errorf("result for TELEMETRY_CONNECTIONS should be 1") + } + + for key, _ := range resultMap { + if !strings.Contains(key, "COUNTERS_DB|COUNTERS|Ethernet*") { + t.Errorf("key is expected to contain correct query, received: %s", key) + } + } + + c.Close() + }) + } } func TestConnectionsKeepAlive(t *testing.T) { - s := createKeepAliveServer(t, 8081) - go runServer(t, s) - defer s.Stop() - - tests := []struct { - desc string - q client.Query - want []client.Notification - poll int - }{ - { - desc: "Testing KeepAlive with goroutine count", - poll: 3, - q: client.Query{ - Target: "COUNTERS_DB", - Type: client.Poll, - Queries: []client.Path{{"COUNTERS", "Ethernet*"}}, - TLS: &tls.Config{InsecureSkipVerify: true}, - }, - want: []client.Notification{ - client.Connected{}, - client.Sync{}, - }, - }, - } - for _, tt := range(tests) { - for i := 0; i < 5; i++ { - t.Run(tt.desc, func(t *testing.T) { - q := tt.q - q.Addrs = []string{"127.0.0.1:8081"} - c := client.New() - wg := new(sync.WaitGroup) - wg.Add(1) - - go func() { - defer wg.Done() - if err := c.Subscribe(context.Background(), q); err != nil { - t.Errorf("c.Subscribe(): got error %v, expected nil", err) - } - }() - - wg.Wait() - after_subscribe := runtime.NumGoroutine() - t.Logf("Num go routines after client subscribe: %d", after_subscribe) - time.Sleep(10 * time.Second) - after_sleep := runtime.NumGoroutine() - t.Logf("Num go routines after sleep, should be less, as keepalive should close idle connections: %d", after_sleep) - if after_sleep > after_subscribe { - t.Errorf("Expecting goroutine after sleep to be less than or equal to after subscribe, after_subscribe: %d, after_sleep: %d", after_subscribe, after_sleep) - } - }) - } - } + s := createKeepAliveServer(t, 8081) + go runServer(t, s) + defer s.Stop() + + tests := []struct { + desc string + q client.Query + want []client.Notification + poll int + }{ + { + desc: "Testing KeepAlive with goroutine count", + poll: 3, + q: client.Query{ + Target: "COUNTERS_DB", + Type: client.Poll, + Queries: []client.Path{{"COUNTERS", "Ethernet*"}}, + TLS: &tls.Config{InsecureSkipVerify: true}, + }, + want: []client.Notification{ + client.Connected{}, + client.Sync{}, + }, + }, + } + for _, tt := range tests { + for i := 0; i < 5; i++ { + t.Run(tt.desc, func(t *testing.T) { + q := tt.q + q.Addrs = []string{"127.0.0.1:8081"} + c := client.New() + wg := new(sync.WaitGroup) + wg.Add(1) + + go func() { + defer wg.Done() + if err := c.Subscribe(context.Background(), q); err != nil { + t.Errorf("c.Subscribe(): got error %v, expected nil", err) + } + }() + + wg.Wait() + after_subscribe := runtime.NumGoroutine() + t.Logf("Num go routines after client subscribe: %d", after_subscribe) + time.Sleep(10 * time.Second) + after_sleep := runtime.NumGoroutine() + t.Logf("Num go routines after sleep, should be less, as keepalive should close idle connections: %d", after_sleep) + if after_sleep > after_subscribe { + t.Errorf("Expecting goroutine after sleep to be less than or equal to after subscribe, after_subscribe: %d, after_sleep: %d", after_subscribe, after_sleep) + } + }) + } + } } func TestClient(t *testing.T) { - var mutexDeInit sync.RWMutex - var mutexHB sync.RWMutex - var mutexIdx sync.RWMutex - - // sonic-host:device-test-event is a test event. - // Events client will drop it on floor. - events := [] sdc.Evt_rcvd { - { "test0", 7, 777 }, - { "test1", 6, 677 }, - { "{\"sonic-host:device-test-event\"", 5, 577 }, - { "test2", 5, 577 }, - { "test3", 4, 477 }, - } - - HEARTBEAT_SET := 5 - heartbeat := 0 - event_index := 0 - rcv_timeout := sdc.SUBSCRIBER_TIMEOUT - deinit_done := false - - mock1 := gomonkey.ApplyFunc(sdc.C_init_subs, func(use_cache bool) unsafe.Pointer { - return nil - }) - defer mock1.Reset() - - mock2 := gomonkey.ApplyFunc(sdc.C_recv_evt, func(h unsafe.Pointer) (int, sdc.Evt_rcvd) { - rc := (int)(0) - var evt sdc.Evt_rcvd - mutexIdx.Lock() - current_index := event_index - mutexIdx.Unlock() - if current_index < len(events) { - evt = events[current_index] - mutexIdx.RLock() - event_index = current_index + 1 - mutexIdx.RUnlock() - } else { - time.Sleep(time.Millisecond * time.Duration(rcv_timeout)) - rc = -1 - } - return rc, evt - }) - defer mock2.Reset() - - mock3 := gomonkey.ApplyFunc(sdc.Set_heartbeat, func(val int) { - mutexHB.RLock() - heartbeat = val - mutexHB.RUnlock() - }) - defer mock3.Reset() - - mock4 := gomonkey.ApplyFunc(sdc.C_deinit_subs, func(h unsafe.Pointer) { - mutexDeInit.RLock() - deinit_done = true - mutexDeInit.RUnlock() - }) - defer mock4.Reset() - - mock5 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Put", func(pq *queue.PriorityQueue, item ...queue.Item) error { - return fmt.Errorf("Queue error") - }) - defer mock5.Reset() - - mock6 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Len", func(pq *queue.PriorityQueue) int { - return 150000 // Max size for pending events in PQ is 102400 - }) - defer mock6.Reset() - - s := createServer(t, 8081) - go runServer(t, s) - - qstr := fmt.Sprintf("all[heartbeat=%d]", HEARTBEAT_SET) - q := createEventsQuery(t, qstr) - q.Addrs = []string{"127.0.0.1:8081"} - - tests := []struct { - desc string - pub_data []string - wantErr bool - wantNoti []client.Notification - pause int - poll int - } { - { - desc: "dropped event", - poll: 3, - }, - { - desc: "queue error", - poll: 3, - }, - { - desc: "base client create", - poll: 3, - }, - } - - sdc.C_init_subs(true) - - var mutexNoti sync.RWMutex - - for testNum, tt := range tests { - mutexHB.RLock() - heartbeat = 0 - mutexHB.RUnlock() - - mutexIdx.RLock() - event_index = 0 - mutexIdx.RUnlock() - - mutexDeInit.RLock() - deinit_done = false - mutexDeInit.RUnlock() - - t.Run(tt.desc, func(t *testing.T) { - c := client.New() - defer c.Close() - - var gotNoti []string - q.NotificationHandler = func(n client.Notification) error { - if nn, ok := n.(client.Update); ok { - nn.TS = time.Unix(0, 200) - str := fmt.Sprintf("%v", nn.Val) - - mutexNoti.Lock() - currentNoti := gotNoti - mutexNoti.Unlock() - - mutexNoti.RLock() - gotNoti = append(currentNoti, str) - mutexNoti.RUnlock() - } - return nil - } - - go func() { - c.Subscribe(context.Background(), q) - }() - - // wait for half second for subscribeRequest to sync - // and to receive events via notification handler. - - time.Sleep(time.Millisecond * 2000) - - if testNum > 1 { - mutexNoti.Lock() - // -1 to discount test event, which receiver would drop. - if (len(events) - 1) != len(gotNoti) { - t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)-1) - } - - mutexHB.Lock() - if (heartbeat != HEARTBEAT_SET) { - t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET) - } - mutexHB.Unlock() - - fmt.Printf("DONE: Expect events:%d - 1 gotNoti=%d\n", len(events), len(gotNoti)) - mutexNoti.Unlock() - } - }) - - if testNum == 0 { - mock6.Reset() - } - - if testNum == 1 { - mock5.Reset() - } - time.Sleep(time.Millisecond * 1000) - - mutexDeInit.Lock() - if deinit_done == false { - t.Errorf("Events client deinit *NOT* called.") - } - mutexDeInit.Unlock() - // t.Log("END of a TEST") - } - - s.Stop() + var mutexDeInit sync.RWMutex + var mutexHB sync.RWMutex + var mutexIdx sync.RWMutex + + // sonic-host:device-test-event is a test event. + // Events client will drop it on floor. + events := []sdc.Evt_rcvd{ + {"test0", 7, 777}, + {"test1", 6, 677}, + {"{\"sonic-host:device-test-event\"", 5, 577}, + {"test2", 5, 577}, + {"test3", 4, 477}, + } + + HEARTBEAT_SET := 5 + heartbeat := 0 + event_index := 0 + rcv_timeout := sdc.SUBSCRIBER_TIMEOUT + deinit_done := false + + mock1 := gomonkey.ApplyFunc(sdc.C_init_subs, func(use_cache bool) unsafe.Pointer { + return nil + }) + defer mock1.Reset() + + mock2 := gomonkey.ApplyFunc(sdc.C_recv_evt, func(h unsafe.Pointer) (int, sdc.Evt_rcvd) { + rc := (int)(0) + var evt sdc.Evt_rcvd + mutexIdx.Lock() + current_index := event_index + mutexIdx.Unlock() + if current_index < len(events) { + evt = events[current_index] + mutexIdx.RLock() + event_index = current_index + 1 + mutexIdx.RUnlock() + } else { + time.Sleep(time.Millisecond * time.Duration(rcv_timeout)) + rc = -1 + } + return rc, evt + }) + defer mock2.Reset() + + mock3 := gomonkey.ApplyFunc(sdc.Set_heartbeat, func(val int) { + mutexHB.RLock() + heartbeat = val + mutexHB.RUnlock() + }) + defer mock3.Reset() + + mock4 := gomonkey.ApplyFunc(sdc.C_deinit_subs, func(h unsafe.Pointer) { + mutexDeInit.RLock() + deinit_done = true + mutexDeInit.RUnlock() + }) + defer mock4.Reset() + + mock5 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Put", func(pq *queue.PriorityQueue, item ...queue.Item) error { + return fmt.Errorf("Queue error") + }) + defer mock5.Reset() + + mock6 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Len", func(pq *queue.PriorityQueue) int { + return 150000 // Max size for pending events in PQ is 102400 + }) + defer mock6.Reset() + + s := createServer(t, 8081) + go runServer(t, s) + + qstr := fmt.Sprintf("all[heartbeat=%d]", HEARTBEAT_SET) + q := createEventsQuery(t, qstr) + q.Addrs = []string{"127.0.0.1:8081"} + + tests := []struct { + desc string + pub_data []string + wantErr bool + wantNoti []client.Notification + pause int + poll int + }{ + { + desc: "dropped event", + poll: 3, + }, + { + desc: "queue error", + poll: 3, + }, + { + desc: "base client create", + poll: 3, + }, + } + + sdc.C_init_subs(true) + + var mutexNoti sync.RWMutex + + for testNum, tt := range tests { + mutexHB.RLock() + heartbeat = 0 + mutexHB.RUnlock() + + mutexIdx.RLock() + event_index = 0 + mutexIdx.RUnlock() + + mutexDeInit.RLock() + deinit_done = false + mutexDeInit.RUnlock() + + t.Run(tt.desc, func(t *testing.T) { + c := client.New() + defer c.Close() + + var gotNoti []string + q.NotificationHandler = func(n client.Notification) error { + if nn, ok := n.(client.Update); ok { + nn.TS = time.Unix(0, 200) + str := fmt.Sprintf("%v", nn.Val) + + mutexNoti.Lock() + currentNoti := gotNoti + mutexNoti.Unlock() + + mutexNoti.RLock() + gotNoti = append(currentNoti, str) + mutexNoti.RUnlock() + } + return nil + } + + go func() { + c.Subscribe(context.Background(), q) + }() + + // wait for half second for subscribeRequest to sync + // and to receive events via notification handler. + + time.Sleep(time.Millisecond * 2000) + + if testNum > 1 { + mutexNoti.Lock() + // -1 to discount test event, which receiver would drop. + if (len(events) - 1) != len(gotNoti) { + t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)-1) + } + + mutexHB.Lock() + if heartbeat != HEARTBEAT_SET { + t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET) + } + mutexHB.Unlock() + + fmt.Printf("DONE: Expect events:%d - 1 gotNoti=%d\n", len(events), len(gotNoti)) + mutexNoti.Unlock() + } + }) + + if testNum == 0 { + mock6.Reset() + } + + if testNum == 1 { + mock5.Reset() + } + time.Sleep(time.Millisecond * 1000) + + mutexDeInit.Lock() + if deinit_done == false { + t.Errorf("Events client deinit *NOT* called.") + } + mutexDeInit.Unlock() + // t.Log("END of a TEST") + } + + s.Stop() } func TestTableData2MsiUseKey(t *testing.T) { - tblPath := sdc.CreateTablePath("STATE_DB", "NEIGH_STATE_TABLE", "|", "10.0.0.57") - newMsi := make(map[string]interface{}) - sdc.TableData2Msi(&tblPath, true, nil, &newMsi) - newMsiData, _ := json.MarshalIndent(newMsi, "", " ") - t.Logf(string(newMsiData)) - expectedMsi := map[string]interface{} { - "10.0.0.57": map[string]interface{} { - "peerType": "e-BGP", - "state": "Established", - }, - } - expectedMsiData, _ := json.MarshalIndent(expectedMsi, "", " ") - t.Logf(string(expectedMsiData)) - - if !reflect.DeepEqual(newMsi, expectedMsi) { - t.Errorf("Msi data does not match for use key = true") - } + tblPath := sdc.CreateTablePath("STATE_DB", "NEIGH_STATE_TABLE", "|", "10.0.0.57") + newMsi := make(map[string]interface{}) + sdc.TableData2Msi(&tblPath, true, nil, &newMsi) + newMsiData, _ := json.MarshalIndent(newMsi, "", " ") + t.Logf(string(newMsiData)) + expectedMsi := map[string]interface{}{ + "10.0.0.57": map[string]interface{}{ + "peerType": "e-BGP", + "state": "Established", + }, + } + expectedMsiData, _ := json.MarshalIndent(expectedMsi, "", " ") + t.Logf(string(expectedMsiData)) + + if !reflect.DeepEqual(newMsi, expectedMsi) { + t.Errorf("Msi data does not match for use key = true") + } } func TestRecoverFromJSONSerializationPanic(t *testing.T) { - panicMarshal := func(v interface{}) ([]byte, error) { - panic("json.Marshal panics and is unable to serialize JSON") - } - mock := gomonkey.ApplyFunc(json.Marshal, panicMarshal) - defer mock.Reset() + panicMarshal := func(v interface{}) ([]byte, error) { + panic("json.Marshal panics and is unable to serialize JSON") + } + mock := gomonkey.ApplyFunc(json.Marshal, panicMarshal) + defer mock.Reset() - tblPath := sdc.CreateTablePath("STATE_DB", "NEIGH_STATE_TABLE", "|", "10.0.0.57") - msi := make(map[string]interface{}) - sdc.TableData2Msi(&tblPath, true, nil, &msi) + tblPath := sdc.CreateTablePath("STATE_DB", "NEIGH_STATE_TABLE", "|", "10.0.0.57") + msi := make(map[string]interface{}) + sdc.TableData2Msi(&tblPath, true, nil, &msi) - typedValue, err := sdc.Msi2TypedValue(msi) - if typedValue != nil && err != nil { - t.Errorf("Test should recover from panic and have nil TypedValue/Error after attempting JSON serialization") - } + typedValue, err := sdc.Msi2TypedValue(msi) + if typedValue != nil && err != nil { + t.Errorf("Test should recover from panic and have nil TypedValue/Error after attempting JSON serialization") + } } func TestGnmiSetBatch(t *testing.T) { - mockCode := -` + mockCode := + ` print('No Yang validation for test mode...') print('%s') ` @@ -3930,7 +3929,7 @@ func TestServerPort(t *testing.T) { } func TestNilServerStop(t *testing.T) { - // Create a server with nil grpc server, such that s.Stop is called with nil value + // Create a server with nil grpc server, such that s.Stop is called with nil value t.Log("Expecting s.Stop to log error as server is nil") s := &Server{} s.Stop() @@ -4165,6 +4164,29 @@ func TestMasterArbitration(t *testing.T) { }) }*/ +func TestSaveOnSet(t *testing.T) { + // Fail client creation + fakeDBC := gomonkey.ApplyFuncReturn(ssc.NewDbusClient, nil, fmt.Errorf("Fail Create")) + if err := SaveOnSetEnabled(); err == nil { + t.Error("Expected Client Failure") + } + fakeDBC.Reset() + + // Successful Dbus call + goodDbus := gomonkey.ApplyFuncReturn(ssc.DbusApi, nil) + if err := SaveOnSetEnabled(); err != nil { + t.Error("Unexpected DBUS failure") + } + goodDbus.Reset() + + // Fail Dbus call + badDbus := gomonkey.ApplyFuncReturn(ssc.DbusApi, fmt.Errorf("Fail Send")) + defer badDbus.Reset() + if err := SaveOnSetEnabled(); err == nil { + t.Error("Expected DBUS failure") + } +} + func init() { // Enable logs at UT setup flag.Lookup("v").Value.Set("10") diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 2cf0284f..febee1d3 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -1,37 +1,39 @@ package main import ( + "crypto/sha512" "crypto/tls" "crypto/x509" - "crypto/sha512" "encoding/hex" "flag" + "fmt" "io" "io/ioutil" - "path/filepath" - "time" "os" "os/signal" + "path/filepath" "strings" - "syscall" "sync" "sync/atomic" - log "github.com/golang/glog" + "syscall" + "time" + + gnmi "github.com/sonic-net/sonic-gnmi/gnmi_server" + testcert "github.com/sonic-net/sonic-gnmi/testdata/tls" + "github.com/fsnotify/fsnotify" + log "github.com/golang/glog" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" - "fmt" - gnmi "github.com/sonic-net/sonic-gnmi/gnmi_server" - testcert "github.com/sonic-net/sonic-gnmi/testdata/tls" ) type ServerControlValue int const ( - ServerStop ServerControlValue = iota // 0 - ServerStart ServerControlValue = iota // 1 - ServerRestart ServerControlValue = iota // 2 + ServerStop ServerControlValue = iota // 0 + ServerStart ServerControlValue = iota // 1 + ServerRestart ServerControlValue = iota // 2 ) type TelemetryConfig struct { @@ -52,6 +54,7 @@ type TelemetryConfig struct { GnmiNativeWrite *bool Threshold *int WithMasterArbitration *bool + WithSaveOnSet *bool IdleConnDuration *int } @@ -100,9 +103,9 @@ func runTelemetry(args []string) error { return nil } -func getGlogFlagsMap() map[string] bool { +func getGlogFlagsMap() map[string]bool { // glog flags: https://pkg.go.dev/github.com/golang/glog - return map[string]bool { + return map[string]bool{ "-alsologtostderr": true, "-log_backtrace_at": true, "-log_dir": true, @@ -140,7 +143,7 @@ func parseOSArgs() ([]string, []string) { } func setupFlags(fs *flag.FlagSet) (*TelemetryConfig, *gnmi.Config, error) { - telemetryCfg := &TelemetryConfig { + telemetryCfg := &TelemetryConfig{ UserAuth: gnmi.AuthTypes{"password": false, "cert": false, "jwt": false}, Port: fs.Int("port", -1, "port to listen on"), LogLevel: fs.Int("v", 2, "log level of process"), @@ -158,6 +161,7 @@ func setupFlags(fs *flag.FlagSet) (*TelemetryConfig, *gnmi.Config, error) { GnmiNativeWrite: fs.Bool("gnmi_native_write", gnmi.ENABLE_NATIVE_WRITE, "Enable gNMI native write"), Threshold: fs.Int("threshold", 100, "max number of client connections"), WithMasterArbitration: fs.Bool("with-master-arbitration", false, "Enables master arbitration policy."), + WithSaveOnSet: fs.Bool("with-save-on-set", false, "Enables save-on-set."), IdleConnDuration: fs.Int("idle_conn_duration", 5, "Seconds before server closes idle connections"), } @@ -258,9 +262,8 @@ func iNotifyCertMonitoring(watcher *fsnotify.Watcher, telemetryCfg *TelemetryCon for { select { case event := <-watcher.Events: - if event.Name != "" && ( - filepath.Ext(event.Name) == ".cert" || filepath.Ext(event.Name) == ".crt" || - filepath.Ext(event.Name) == ".cer" || filepath.Ext(event.Name) == ".pem" || + if event.Name != "" && (filepath.Ext(event.Name) == ".cert" || filepath.Ext(event.Name) == ".crt" || + filepath.Ext(event.Name) == ".cer" || filepath.Ext(event.Name) == ".pem" || filepath.Ext(event.Name) == ".key") { log.V(1).Infof("Inotify watcher has received event: %v", event) if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create { @@ -271,7 +274,7 @@ func iNotifyCertMonitoring(watcher *fsnotify.Watcher, telemetryCfg *TelemetryCon } if event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename { log.V(1).Infof("Cert file has been deleted: %s", event.Name) - serverControlSignal <- ServerRestart // let server know that a remove/rename event occurred + serverControlSignal <- ServerRestart // let server know that a remove/rename event occurred if atomic.LoadInt32(certLoaded) == 1 { // Should continue monitoring if certs are not present done <- true return @@ -361,13 +364,13 @@ func startGNMIServer(telemetryCfg *TelemetryConfig, cfg *gnmi.Config, serverCont } } - tlsCfg := &tls.Config { - ClientAuth: tls.RequireAndVerifyClientCert, - Certificates: []tls.Certificate{certificate}, - MinVersion: tls.VersionTLS12, - CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256}, + tlsCfg := &tls.Config{ + ClientAuth: tls.RequireAndVerifyClientCert, + Certificates: []tls.Certificate{certificate}, + MinVersion: tls.VersionTLS12, + CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256}, PreferServerCipherSuites: true, - CipherSuites: []uint16 { + CipherSuites: []uint16{ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, @@ -438,6 +441,9 @@ func startGNMIServer(telemetryCfg *TelemetryConfig, cfg *gnmi.Config, serverCont log.Errorf("Failed to create gNMI server: %v", err) return } + if *telemetryCfg.WithSaveOnSet { + s.SaveStartupConfig = gnmi.SaveOnSetEnabled + } if *telemetryCfg.WithMasterArbitration { s.ReqFromMaster = gnmi.ReqFromMasterEnabledMA From 3ea4fe59420b4d726d68dd91d1bae2b4c2e3296d Mon Sep 17 00:00:00 2001 From: Zain Budhwani <99770260+zbud-msft@users.noreply.github.com> Date: Wed, 29 May 2024 15:53:09 -0700 Subject: [PATCH 4/4] Revert "Account for GLOBAL key in PFC_WD (#178)" (#221) --- sonic_data_client/virtual_db.go | 6 ++---- testdata/CONFIG_PFCWD_PORTS.txt | 5 +---- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/sonic_data_client/virtual_db.go b/sonic_data_client/virtual_db.go index ca2fbb0c..c3ded154 100644 --- a/sonic_data_client/virtual_db.go +++ b/sonic_data_client/virtual_db.go @@ -149,10 +149,8 @@ func getPfcwdMap() (map[string]map[string]string, error) { } for _, key := range resp { - if strings.Contains(key, "Ethernet") { // Account for non interface ports in PFC_WD such as GLOBAL - name := key[7:] - pfcwdName_map[name] = make(map[string]string) - } + name := key[7:] + pfcwdName_map[name] = make(map[string]string) } // Get Queue indexes that are enabled with PFC-WD diff --git a/testdata/CONFIG_PFCWD_PORTS.txt b/testdata/CONFIG_PFCWD_PORTS.txt index d30d9cc5..6629fb68 100644 --- a/testdata/CONFIG_PFCWD_PORTS.txt +++ b/testdata/CONFIG_PFCWD_PORTS.txt @@ -165,10 +165,7 @@ "PFC_WD|Ethernet9": { "action": "drop" }, - "PFC_WD|GLOBAL": { - "action": "drop" - }, - "PORT_QOS_MAP|Ethernet0,Ethernet1,Ethernet2,Ethernet3,Ethernet4,Ethernet5,Ethernet6,Ethernet7,Ethernet8,Ethernet9,Ethernet10,Ethernet11,Ethernet12,Ethernet13,Ethernet14,Ethernet15,Ethernet16,Ethernet17,Ethernet18,Ethernet19,Ethernet20,Ethernet21,Ethernet22,Ethernet23,Ethernet24,Ethernet25,Ethernet26,Ethernet27,Ethernet28,Ethernet29,Ethernet30,Ethernet31,Ethernet32,Ethernet33,Ethernet34,Ethernet35,Ethernet36,Ethernet37,Ethernet38,Ethernet39,Ethernet40,Ethernet41,Ethernet42,Ethernet43,Ethernet44,Ethernet45,Ethernet46,Ethernet47,Ethernet48,Ethernet52,Ethernet56,Ethernet60,Ethernet64,Ethernet68,global": { + "PORT_QOS_MAP|Ethernet0,Ethernet1,Ethernet2,Ethernet3,Ethernet4,Ethernet5,Ethernet6,Ethernet7,Ethernet8,Ethernet9,Ethernet10,Ethernet11,Ethernet12,Ethernet13,Ethernet14,Ethernet15,Ethernet16,Ethernet17,Ethernet18,Ethernet19,Ethernet20,Ethernet21,Ethernet22,Ethernet23,Ethernet24,Ethernet25,Ethernet26,Ethernet27,Ethernet28,Ethernet29,Ethernet30,Ethernet31,Ethernet32,Ethernet33,Ethernet34,Ethernet35,Ethernet36,Ethernet37,Ethernet38,Ethernet39,Ethernet40,Ethernet41,Ethernet42,Ethernet43,Ethernet44,Ethernet45,Ethernet46,Ethernet47,Ethernet48,Ethernet52,Ethernet56,Ethernet60,Ethernet64,Ethernet68": { "pfc_enable": "3,4" } }