Skip to content

Commit

Permalink
Merge branch 'master' into dev-reset-local-users-password
Browse files Browse the repository at this point in the history
  • Loading branch information
qiluo-msft authored Oct 2, 2024
2 parents 0541aed + 898aa5d commit fa8f09f
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .azure-pipelines/test-docker-sonic-vs-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
# run pytests in sets of 20
all_tests=$(ls test_*.py)
all_tests="${all_tests} p4rt"
all_tests="${all_tests} p4rt dash"
test_set=()
for test in ${all_tests}; do
test_set+=("${test}")
Expand Down
15 changes: 11 additions & 4 deletions common/events_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,23 @@ struct serialization
more = 0;
zmq_msg_init(&msg);
int rc = zmq_msg_recv(&msg, sock, flag);
if (rc != -1) {
if (rc == 1) {
char control_character = *(char*)zmq_msg_data(&msg);
if (control_character == 0x01 || control_character == 0x00) {
SWSS_LOG_INFO("Received subscription/unsubscription message when XSUB connect to XPUB: %c", control_character);
} else {
SWSS_LOG_DEBUG("Received non subscription based control character: %c", control_character);
}
rc = 0;
} else if (rc != -1) {
size_t more_size = sizeof (more);

zmq_getsockopt (sock, ZMQ_RCVMORE, &more, &more_size);

rc = zmsg_to_map(msg, data);
RET_ON_ERR(rc == 0, "Failed to deserialize part rc=%d", rc);
/* read more flag if message read fails to de-serialize */
}
else {
} else {
/* override with zmq err */
rc = zmq_errno();
if (rc != 11) {
Expand All @@ -332,7 +339,7 @@ struct serialization
return rc;
}


template<typename DT>
int
zmq_send_part(void *sock, int flag, const DT &data)
Expand Down
6 changes: 6 additions & 0 deletions common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ namespace swss {
#define CHASSIS_APP_DB 12
#define CHASSIS_STATE_DB 13
#define APPL_STATE_DB 14
#define DPU_APPL_DB 15
#define DPU_APPL_STATE_DB 16
#define DPU_STATE_DB 17
#define DPU_COUNTERS_DB 18
#define EVENT_DB 19
#define BMP_STATE_DB 20

Expand Down Expand Up @@ -213,6 +217,7 @@ namespace swss {
#define COUNTERS_DEBUG_NAME_SWITCH_STAT_MAP "COUNTERS_DEBUG_NAME_SWITCH_STAT_MAP"
#define COUNTERS_TUNNEL_TYPE_MAP "COUNTERS_TUNNEL_TYPE_MAP"
#define COUNTERS_TUNNEL_NAME_MAP "COUNTERS_TUNNEL_NAME_MAP"
#define COUNTERS_ENI_NAME_MAP "COUNTERS_ENI_NAME_MAP"
#define COUNTERS_ROUTE_NAME_MAP "COUNTERS_ROUTE_NAME_MAP"
#define COUNTERS_ROUTE_TO_PATTERN_MAP "COUNTERS_ROUTE_TO_PATTERN_MAP"
#define COUNTERS_FABRIC_QUEUE_NAME_MAP "COUNTERS_FABRIC_QUEUE_NAME_MAP"
Expand Down Expand Up @@ -250,6 +255,7 @@ namespace swss {
#define QUEUE_COUNTER_ID_LIST "QUEUE_COUNTER_ID_LIST"
#define QUEUE_ATTR_ID_LIST "QUEUE_ATTR_ID_LIST"
#define BUFFER_POOL_COUNTER_ID_LIST "BUFFER_POOL_COUNTER_ID_LIST"
#define ENI_COUNTER_ID_LIST "ENI_COUNTER_ID_LIST"
#define PFC_WD_STATE_TABLE "PFC_WD_STATE_TABLE"
#define PFC_WD_PORT_COUNTER_ID_LIST "PORT_COUNTER_ID_LIST"
#define PFC_WD_QUEUE_COUNTER_ID_LIST "QUEUE_COUNTER_ID_LIST"
Expand Down
15 changes: 13 additions & 2 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ using namespace std;
namespace swss {

ZmqClient::ZmqClient(const std::string& endpoint)
:ZmqClient(endpoint, "")
{
initialize(endpoint);
}

ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
{
initialize(endpoint, vrf);
}

ZmqClient::~ZmqClient()
Expand All @@ -39,12 +44,13 @@ ZmqClient::~ZmqClient()
}
}

void ZmqClient::initialize(const std::string& endpoint)
void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)
{
m_connected = false;
m_endpoint = endpoint;
m_context = nullptr;
m_socket = nullptr;
m_vrf = vrf;

connect();
}
Expand Down Expand Up @@ -89,6 +95,11 @@ void ZmqClient::connect()
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

SWSS_LOG_NOTICE("connect to zmq endpoint: %s", m_endpoint.c_str());
int rc = zmq_connect(m_socket, m_endpoint.c_str());
if (rc != 0)
Expand Down
6 changes: 4 additions & 2 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ZmqClient
{
public:
ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
~ZmqClient();

bool isConnected();
Expand All @@ -24,11 +25,12 @@ class ZmqClient
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);
private:
void initialize(const std::string& endpoint);

void initialize(const std::string& endpoint, const std::string& vrf);

std::string m_endpoint;

std::string m_vrf;

void* m_context;

void* m_socket;
Expand Down
13 changes: 12 additions & 1 deletion common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ using namespace std;
namespace swss {

ZmqServer::ZmqServer(const std::string& endpoint)
: m_endpoint(endpoint)
: ZmqServer(endpoint, "")
{
}

ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
{
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
Expand Down Expand Up @@ -92,6 +98,11 @@ void ZmqServer::mqPollThread()
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
Expand Down
3 changes: 3 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class ZmqServer
static constexpr int DEFAULT_POP_BATCH_SIZE = 128;

ZmqServer(const std::string& endpoint);
ZmqServer(const std::string& endpoint, const std::string& vrf);
~ZmqServer();

void registerMessageHandler(
Expand All @@ -53,6 +54,8 @@ class ZmqServer

std::string m_endpoint;

std::string m_vrf;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
46 changes: 44 additions & 2 deletions tests/events_common_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,51 @@ TEST(events_common, send_recv)
zmq_ctx_term(zmq_ctx);
}

TEST(events_common, send_recv_control_character)
{
#if 0
{
/* Direct log messages to stdout */
string dummy, op("STDOUT");
swss::Logger::swssOutputNotify(dummy, op);
swss::Logger::setMinPrio(swss::Logger::SWSS_DEBUG);
}
#endif

char *path = "tcp://127.0.0.1:5570";
void *zmq_ctx = zmq_ctx_new();
void *sock_p0 = zmq_socket (zmq_ctx, ZMQ_PAIR);
EXPECT_EQ(0, zmq_connect (sock_p0, path));

void *sock_p1 = zmq_socket (zmq_ctx, ZMQ_PAIR);
EXPECT_EQ(0, zmq_bind (sock_p1, path));

string source;
map<string, string> m;

// Subscription based control character test
zmq_msg_t sub_msg;
zmq_msg_init_size(&sub_msg, 1);
*(char*)zmq_msg_data(&sub_msg) = 0x01;
EXPECT_EQ(1, zmq_msg_send(&sub_msg, sock_p0, 0));
zmq_msg_close(&sub_msg);
// First part will be read only and will return as 0, but will not be deserialized event
EXPECT_EQ(0, zmq_message_read(sock_p1, 0, source, m));
EXPECT_EQ("", source);
EXPECT_EQ(0, m.size());

// Non-subscription based control character test
zmq_msg_t ctrl_msg;
zmq_msg_init_size(&ctrl_msg, 1);
*(char*)zmq_msg_data(&ctrl_msg) = 0x07;
EXPECT_EQ(1, zmq_msg_send(&ctrl_msg, sock_p0, 0));
zmq_msg_close(&ctrl_msg);
// First part will be read only and will return as 0, but will not be deserialized event
EXPECT_EQ(0, zmq_message_read(sock_p1, 0, source, m));
EXPECT_EQ("", source);
EXPECT_EQ(0, m.size());



zmq_close(sock_p0);
zmq_close(sock_p1);
zmq_ctx_term(zmq_ctx);
}

0 comments on commit fa8f09f

Please sign in to comment.