From 87d7472f38b23339d76aadae94f4106f3855eed6 Mon Sep 17 00:00:00 2001 From: Lev <1187448+levb@users.noreply.github.com> Date: Wed, 25 Sep 2024 12:04:27 -0700 Subject: [PATCH] [ADDED, BREAKING] Services: queue group now configurable and can be disabled (#800) * [ADDED, BREAKING] Services: queue group now configurable and can be disabled. * Fixed the test It appears possible that the client exits before the (own) dispatcher thread finishes and quits. Waiting until the "Done" callback is invoked fixes it. Also, since a sub is (often? always?) freed from the own dispatcher thread, use natsThread_Detach rather than natsThread_Join when cleaning up own dispatcher. * PR feedback --- examples/micro-arithmetics.c | 43 +++++---- examples/micro-func.c | 45 +++++----- src/micro.c | 167 ++++++++++++++++++++++++++--------- src/micro_endpoint.c | 49 +++++++--- src/micro_monitoring.c | 10 ++- src/microp.h | 10 ++- src/nats.h | 82 +++++++++++++++-- src/sub.c | 8 +- test/list_test.txt | 3 +- test/test.c | 127 ++++++++++++++++++++++++-- 10 files changed, 420 insertions(+), 124 deletions(-) diff --git a/examples/micro-arithmetics.c b/examples/micro-arithmetics.c index c3e8f5ee4..6bc94522a 100644 --- a/examples/micro-arithmetics.c +++ b/examples/micro-arithmetics.c @@ -88,24 +88,6 @@ int main(int argc, char **argv) microGroup *g = NULL; char errorbuf[1024]; - microServiceConfig cfg = { - .Description = "Arithmetic operations - NATS microservice example in C", - .Name = "c-arithmetics", - .Version = "1.0.0", - }; - microEndpointConfig add_cfg = { - .Name = "add", - .Handler = handle_add, - }; - microEndpointConfig divide_cfg = { - .Name = "divide", - .Handler = handle_divide, - }; - microEndpointConfig multiply_cfg = { - .Name = "multiply", - .Handler = handle_multiply, - }; - // Connect to NATS server opts = parseArgs(argc, argv, ""); s = natsConnection_Connect(&conn, opts); @@ -118,17 +100,34 @@ int main(int argc, char **argv) } // Create the Microservice that listens on nc. + microServiceConfig cfg = { + .Description = "Arithmetic operations - NATS microservice example in C", + .Name = "c-arithmetics", + .Version = "1.0.0", + }; err = micro_AddService(&m, conn, &cfg); // Add the endpoints for the functions. if (err == NULL) - microService_AddGroup(&g, m, "op"); + { + microGroupConfig groupConfig = { .Prefix = "op" }; + err = microService_AddGroup(&g, m, &groupConfig); + } if (err == NULL) - err = microGroup_AddEndpoint(g, &add_cfg); + { + microEndpointConfig addConfig = { .Name = "add", .Handler = handle_add }; + err = microGroup_AddEndpoint(g, &addConfig); + } if (err == NULL) - err = microGroup_AddEndpoint(g, &multiply_cfg); + { + microEndpointConfig multiplyConfig = { .Name = "multiply", .Handler = handle_multiply }; + err = microGroup_AddEndpoint(g, &multiplyConfig); + } if (err == NULL) - err = microGroup_AddEndpoint(g, ÷_cfg); + { + microEndpointConfig divideConfig = { .Name = "divide", .Handler = handle_divide }; + err = microGroup_AddEndpoint(g, ÷Config); + } // Run the service, until stopped. if (err == NULL) diff --git a/examples/micro-func.c b/examples/micro-func.c index dfac1dad6..97b899999 100644 --- a/examples/micro-func.c +++ b/examples/micro-func.c @@ -170,24 +170,6 @@ int main(int argc, char **argv) microGroup *g = NULL; char errorbuf[1024]; - microServiceConfig cfg = { - .Description = "Functions - NATS microservice example in C", - .Name = "c-functions", - .Version = "1.0.0", - }; - microEndpointConfig factorial_cfg = { - .Name = "factorial", - .Handler = handle_factorial, - }; - microEndpointConfig fibonacci_cfg = { - .Name = "fibonacci", - .Handler = handle_fibonacci, - }; - microEndpointConfig power2_cfg = { - .Name = "power2", - .Handler = handle_power2, - }; - // Connect to NATS server opts = parseArgs(argc, argv, ""); s = natsConnection_Connect(&conn, opts); @@ -200,17 +182,34 @@ int main(int argc, char **argv) } // Create the Microservice that listens on nc. - err = micro_AddService(&m, conn, &cfg); + microServiceConfig serviceConfig = { + .Description = "Functions - NATS microservice example in C", + .Name = "c-functions", + .Version = "1.0.0", + }; + err = micro_AddService(&m, conn, &serviceConfig); // Add the endpoints for the functions. if (err == NULL) - err = microService_AddGroup(&g, m, "f"); + { + microGroupConfig groupConfig = { .Prefix = "f" }; + err = microService_AddGroup(&g, m, &groupConfig); + } if (err == NULL) - err = microGroup_AddEndpoint(g, &factorial_cfg); + { + microEndpointConfig factorialConfig = { .Name = "factorial", .Handler = handle_factorial }; + err = microGroup_AddEndpoint(g, &factorialConfig); + } if (err == NULL) - err = microGroup_AddEndpoint(g, &fibonacci_cfg); + { + microEndpointConfig fibonacciConfig = { .Name = "fibonacci", .Handler = handle_fibonacci }; + err = microGroup_AddEndpoint(g, &fibonacciConfig); + } if (err == NULL) - err = microGroup_AddEndpoint(g, &power2_cfg); + { + microEndpointConfig power2Config = { .Name = "power2", .Handler = handle_power2 }; + err = microGroup_AddEndpoint(g, &power2Config); + } // Run the service, until stopped. if (err == NULL) diff --git a/src/micro.c b/src/micro.c index 86a572739..354824184 100644 --- a/src/micro.c +++ b/src/micro.c @@ -39,6 +39,8 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c if ((new_m == NULL) || (nc == NULL) || (cfg == NULL) || !micro_is_valid_name(cfg->Name) || nats_IsStringEmpty(cfg->Version)) return micro_ErrorInvalidArg; + if ((cfg->QueueGroup != NULL) && nats_IsStringEmpty(cfg->QueueGroup)) + return micro_ErrorInvalidArg; // Make a microservice object, with a reference to a natsConnection. err = _new_service(&m, nc); @@ -68,7 +70,7 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c } microError * -micro_add_endpoint(microEndpoint **new_ep, microService *m, const char *prefix, microEndpointConfig *cfg, bool is_internal) +micro_add_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal) { microError *err = NULL; microEndpoint *ptr = NULL; @@ -81,7 +83,7 @@ micro_add_endpoint(microEndpoint **new_ep, microService *m, const char *prefix, if (cfg == NULL) return NULL; - err = micro_new_endpoint(&ep, m, prefix, cfg, is_internal); + err = micro_new_endpoint(&ep, m, g, cfg, is_internal); if (err != NULL) return microError_Wrapf(err, "failed to create endpoint %s", cfg->Name); @@ -129,9 +131,9 @@ micro_add_endpoint(microEndpoint **new_ep, microService *m, const char *prefix, if (prev_ep != NULL) { - // Rid of the previous endpoint with the same name, if any. If this + // Rid of the previous endpoint with the same subject, if any. If this // fails we can return the error, leave the newly added endpoint in the - // list, not started. A retry with the same name will clean it up. + // list, not started. A retry with the same subject will clean it up. if (err = micro_stop_endpoint(prev_ep), err != NULL) return err; micro_release_endpoint(prev_ep); @@ -165,7 +167,7 @@ microGroup_AddEndpoint(microGroup *g, microEndpointConfig *cfg) if (g == NULL) return micro_ErrorInvalidArg; - return micro_add_endpoint(NULL, g->m, g->prefix, cfg, false); + return micro_add_endpoint(NULL, g->m, g, cfg, false); } microError * @@ -406,23 +408,44 @@ _release_service(microService *m) _free_service(m); } -static void -_free_service(microService *m) +static inline void +_free_cloned_group_config(microGroupConfig *cfg) { - microGroup *next = NULL; + if (cfg == NULL) + return; + + // the strings are declared const for the public, but in a clone these need + // to be freed. + NATS_FREE((char *)cfg->Prefix); + NATS_FREE((char *)cfg->QueueGroup); + NATS_FREE(cfg); +} +static inline void +_free_group(microGroup *g) +{ + if (g == NULL) + return; + + _free_cloned_group_config(g->config); + NATS_FREE(g); +} + +static inline void +_free_service(microService *m) +{ if (m == NULL) return; // destroy all groups. if (m->groups != NULL) { - microGroup *g = m->groups; - while (g != NULL) + microGroup *next = NULL; + microGroup *g; + for (g = m->groups; g != NULL; g = next) { next = g->next; - NATS_FREE(g); - g = next; + _free_group(g); } } @@ -445,7 +468,7 @@ _clone_service_config(microServiceConfig **out, microServiceConfig *cfg) microError *err = NULL; microServiceConfig *new_cfg = NULL; - if (out == NULL || cfg == NULL) + if ((out == NULL) || (cfg == NULL)) return micro_ErrorInvalidArg; err = _new_service_config(&new_cfg); @@ -458,6 +481,7 @@ _clone_service_config(microServiceConfig **out, microServiceConfig *cfg) MICRO_CALL(err, micro_strdup((char **)&new_cfg->Name, cfg->Name)); MICRO_CALL(err, micro_strdup((char **)&new_cfg->Version, cfg->Version)); MICRO_CALL(err, micro_strdup((char **)&new_cfg->Description, cfg->Description)); + MICRO_CALL(err, micro_strdup((char **)&new_cfg->QueueGroup, cfg->QueueGroup)); MICRO_CALL(err, micro_ErrorFromStatus( nats_cloneMetadata(&new_cfg->Metadata, cfg->Metadata))); MICRO_CALL(err, micro_clone_endpoint_config(&new_cfg->Endpoint, cfg->Endpoint)); @@ -482,6 +506,7 @@ _free_cloned_service_config(microServiceConfig *cfg) NATS_FREE((char *)cfg->Name); NATS_FREE((char *)cfg->Version); NATS_FREE((char *)cfg->Description); + NATS_FREE((char *)cfg->QueueGroup); nats_freeMetadata(&cfg->Metadata); micro_free_cloned_endpoint_config(cfg->Endpoint); NATS_FREE(cfg); @@ -663,20 +688,86 @@ _wrap_connection_event_callbacks(microService *m) return microError_Wrapf(err, "failed to wrap connection event callbacks"); } -microError * -microService_AddGroup(microGroup **new_group, microService *m, const char *prefix) +static inline microError * +_new_group_config(microGroupConfig **ptr) { - if ((m == NULL) || (new_group == NULL) || (prefix == NULL)) + *ptr = NATS_CALLOC(1, sizeof(microGroupConfig)); + return (*ptr == NULL) ? micro_ErrorOutOfMemory : NULL; +} + +static inline microError * +_clone_group_config(microGroupConfig **out, microGroupConfig *in, microGroup *parent) +{ + microError *err = NULL; + microGroupConfig *new_cfg = NULL; + + if ((out == NULL) || (in == NULL)) return micro_ErrorInvalidArg; - *new_group = NATS_CALLOC(1, sizeof(microGroup) + - strlen(prefix) + 1); // "prefix\0" - if (new_group == NULL) + err = _new_group_config(&new_cfg); + if (err == NULL) + { + memcpy(new_cfg, in, sizeof(microGroupConfig)); + } + + // If the queue group is not explicitly set, copy from the parent. + if (err == NULL) { + if (in->NoQueueGroup) + new_cfg->QueueGroup = NULL; + else if (!nats_IsStringEmpty(in->QueueGroup)) + err = micro_strdup((char **)&new_cfg->QueueGroup, in->QueueGroup); + else if (parent != NULL) + { + new_cfg->NoQueueGroup = parent->config->NoQueueGroup; + err = micro_strdup((char **)&new_cfg->QueueGroup, parent->config->QueueGroup); + } + } + + // prefix = parent_prefix.prefix + if (err == NULL) + { + size_t prefixSize = strlen(in->Prefix) + 1; + if (parent != NULL) + prefixSize += strlen(parent->config->Prefix) + 1; + new_cfg->Prefix = NATS_CALLOC(1, prefixSize); + if (new_cfg->Prefix != NULL) + { + if (parent != NULL) + snprintf((char *)new_cfg->Prefix, prefixSize, "%s.%s", parent->config->Prefix, in->Prefix); + else + memcpy((char *)new_cfg->Prefix, in->Prefix, prefixSize); + } + else + err = micro_ErrorOutOfMemory; + } + + if (err != NULL) + { + _free_cloned_group_config(new_cfg); + return err; + } + + *out = new_cfg; + return NULL; +} + +static inline microError * +_add_group(microGroup **new_group, microService *m, microGroup *parent, microGroupConfig *config) +{ + + *new_group = NATS_CALLOC(1, sizeof(microGroup)); + if (new_group == NULL) return micro_ErrorOutOfMemory; + + microError *err = NULL; + err = _clone_group_config(&(*new_group)->config, config, parent); + if (err != NULL) + { + NATS_FREE(*new_group); + return err; } - memcpy((*new_group)->prefix, prefix, strlen(prefix) + 1); (*new_group)->m = m; (*new_group)->next = m->groups; m->groups = *new_group; @@ -685,33 +776,21 @@ microService_AddGroup(microGroup **new_group, microService *m, const char *prefi } microError * -microGroup_AddGroup(microGroup **new_group, microGroup *parent, const char *prefix) +microService_AddGroup(microGroup **new_group, microService *m, microGroupConfig *config) { - char *p; - size_t len; - - if ((parent == NULL) || (new_group == NULL) || (prefix == NULL)) + if ((m == NULL) || (new_group == NULL) || (config == NULL) || nats_IsStringEmpty(config->Prefix)) return micro_ErrorInvalidArg; - *new_group = NATS_CALLOC(1, sizeof(microGroup) + - strlen(parent->prefix) + 1 + // "parent_prefix." - strlen(prefix) + 1); // "prefix\0" - if (new_group == NULL) - { - return micro_ErrorOutOfMemory; - } + return _add_group(new_group, m, NULL, config); +} - p = (*new_group)->prefix; - len = strlen(parent->prefix); - memcpy(p, parent->prefix, len); - p[len] = '.'; - p += len + 1; - memcpy(p, prefix, strlen(prefix) + 1); - (*new_group)->m = parent->m; - (*new_group)->next = parent->m->groups; - parent->m->groups = *new_group; +microError * +microGroup_AddGroup(microGroup **new_group, microGroup *parent, microGroupConfig *config) +{ + if ((parent == NULL) || (new_group == NULL) || (config == NULL) || nats_IsStringEmpty(config->Prefix)) + return micro_ErrorInvalidArg; - return NULL; + return _add_group(new_group, parent->m, parent, config); } natsConnection * @@ -771,6 +850,7 @@ microService_GetInfo(microServiceInfo **new_info, microService *m) { MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Name, ep->name)); MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Subject, ep->subject)); + MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].QueueGroup, micro_queue_group_for_endpoint(ep))); MICRO_CALL(err, micro_ErrorFromStatus( nats_cloneMetadata(&info->Endpoints[len].Metadata, ep->config->Metadata))); if (err == NULL) @@ -805,6 +885,7 @@ void microServiceInfo_Destroy(microServiceInfo *info) { NATS_FREE((char *)info->Endpoints[i].Name); NATS_FREE((char *)info->Endpoints[i].Subject); + NATS_FREE((char *)info->Endpoints[i].QueueGroup); nats_freeMetadata(&info->Endpoints[i].Metadata); } NATS_FREE((char *)info->Endpoints); @@ -868,6 +949,7 @@ microService_GetStats(microServiceStats **new_stats, microService *m) MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Name, ep->name)); MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Subject, ep->subject)); + MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].QueueGroup, micro_queue_group_for_endpoint(ep))); if (err == NULL) { avg = (long double)ep->stats.ProcessingTimeSeconds * 1000000000.0 + (long double)ep->stats.ProcessingTimeNanoseconds; @@ -903,6 +985,7 @@ void microServiceStats_Destroy(microServiceStats *stats) { NATS_FREE((char *)stats->Endpoints[i].Name); NATS_FREE((char *)stats->Endpoints[i].Subject); + NATS_FREE((char *)stats->Endpoints[i].QueueGroup); } NATS_FREE(stats->Endpoints); NATS_FREE((char *)stats->Name); diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index c5149e57a..efe72f41d 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -16,7 +16,7 @@ #include "microp.h" #include "util.h" -static microError *_dup_with_prefix(char **dst, const char *prefix, const char *src); +static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src); static void _handle_request(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure); @@ -24,7 +24,7 @@ static void _retain_endpoint(microEndpoint *ep, bool lock); static void _release_endpoint(microEndpoint *ep); microError * -micro_new_endpoint(microEndpoint **new_ep, microService *m, const char *prefix, microEndpointConfig *cfg, bool is_internal) +micro_new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal) { microError *err = NULL; microEndpoint *ep = NULL; @@ -51,17 +51,42 @@ micro_new_endpoint(microEndpoint **new_ep, microService *m, const char *prefix, MICRO_CALL(err, micro_ErrorFromStatus(natsMutex_Create(&ep->endpoint_mu))); MICRO_CALL(err, micro_clone_endpoint_config(&ep->config, cfg)); MICRO_CALL(err, micro_strdup(&ep->name, cfg->Name)); - MICRO_CALL(err, _dup_with_prefix(&ep->subject, prefix, subj)); + MICRO_CALL(err, _subjectWithGroupPrefix(&ep->subject, g, subj)); if (err != NULL) { micro_free_endpoint(ep); return err; } + ep->group = g; *new_ep = ep; return NULL; } +const char * +micro_queue_group_for_endpoint(microEndpoint *ep) +{ + if (ep->config->NoQueueGroup) + return NULL; + else if (!nats_IsStringEmpty(ep->config->QueueGroup)) + return ep->config->QueueGroup; + + if (ep->group != NULL) + { + if(ep->group->config->NoQueueGroup) + return NULL; + else if (!nats_IsStringEmpty(ep->group->config->QueueGroup)) + return ep->group->config->QueueGroup; + } + + if (ep->m->cfg->NoQueueGroup) + return NULL; + else if(!nats_IsStringEmpty(ep->m->cfg->QueueGroup)) + return ep->m->cfg->QueueGroup; + + return MICRO_DEFAULT_QUEUE_GROUP; +} + microError * micro_start_endpoint(microEndpoint *ep) { @@ -75,10 +100,11 @@ micro_start_endpoint(microEndpoint *ep) // reset the stats. memset(&ep->stats, 0, sizeof(ep->stats)); - if (ep->is_monitoring_endpoint) + const char *queueGroup = micro_queue_group_for_endpoint(ep); + if (ep->is_monitoring_endpoint || (queueGroup == NULL)) s = natsConnection_Subscribe(&sub, ep->m->nc, ep->subject, _handle_request, ep); else - s = natsConnection_QueueSubscribe(&sub, ep->m->nc, ep->subject, MICRO_QUEUE_GROUP, _handle_request, ep); + s = natsConnection_QueueSubscribe(&sub, ep->m->nc, ep->subject, queueGroup, _handle_request, ep); if (s == NATS_OK) { @@ -324,6 +350,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) MICRO_CALL(err, micro_strdup((char **)&new_cfg->Name, cfg->Name)); MICRO_CALL(err, micro_strdup((char **)&new_cfg->Subject, cfg->Subject)); + MICRO_CALL(err, micro_strdup((char **)&new_cfg->QueueGroup, cfg->QueueGroup)); MICRO_CALL(err, micro_ErrorFromStatus( nats_cloneMetadata(&new_cfg->Metadata, cfg->Metadata))); @@ -407,23 +434,23 @@ bool micro_match_endpoint_subject(const char *ep_subject, const char *actual_sub } } -static microError *_dup_with_prefix(char **dst, const char *prefix, const char *src) +static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src) { size_t len = strlen(src) + 1; char *p; - if (!nats_IsStringEmpty(prefix)) - len += strlen(prefix) + 1; + if (g != NULL) + len += strlen(g->config->Prefix) + 1; *dst = NATS_CALLOC(1, len); if (*dst == NULL) return micro_ErrorOutOfMemory; p = *dst; - if (!nats_IsStringEmpty(prefix)) + if (g != NULL) { - len = strlen(prefix); - memcpy(p, prefix, len); + len = strlen(g->config->Prefix); + memcpy(p, g->config->Prefix, len); p[len] = '.'; p += len + 1; } diff --git a/src/micro_monitoring.c b/src/micro_monitoring.c index 80f1188d6..c8639392b 100644 --- a/src/micro_monitoring.c +++ b/src/micro_monitoring.c @@ -78,7 +78,7 @@ handle_info(microRequest *req) } static microError * -handle_stats_internal(microRequest *req) +handle_stats_default(microRequest *req) { microError *err = NULL; microService *m = microRequest_GetService(req); @@ -108,7 +108,7 @@ handle_stats(microRequest *req) if (m->cfg->StatsHandler != NULL) return m->cfg->StatsHandler(req); else - return handle_stats_internal(req); + return handle_stats_default(req); } static microError * @@ -188,7 +188,7 @@ add_internal_handler(microService *m, const char *verb, const char *kind, .Name = name, .Handler = handler, }; - err = micro_add_endpoint(NULL, m, "", &cfg, true); + err = micro_add_endpoint(NULL, m, NULL, &cfg, true); NATS_FREE(subj); return err; } @@ -272,6 +272,8 @@ marshal_info(natsBuffer **new_buf, microServiceInfo *info) IFOK_attr("name", info->Endpoints[i].Name, ""); IFOK(s, nats_marshalMetadata(buf, true, "metadata", info->Endpoints[i].Metadata)); IFOK(s, natsBuf_AppendByte(buf, ',')); + if (!nats_IsStringEmpty(info->Endpoints[i].QueueGroup)) + IFOK_attr("queue_group", info->Endpoints[i].QueueGroup, ","); IFOK_attr("subject", info->Endpoints[i].Subject, ""); IFOK(s, natsBuf_AppendByte(buf, '}')); // end endpoint if (i != info->EndpointsLen - 1) @@ -323,6 +325,8 @@ marshal_stats(natsBuffer **new_buf, microServiceStats *stats) IFOK(s, natsBuf_AppendByte(buf, '{')); IFOK_attr("name", ep->Name, ","); IFOK_attr("subject", ep->Subject, ","); + if (!nats_IsStringEmpty(ep->QueueGroup)) + IFOK_attr("queue_group", ep->QueueGroup, ","); IFOK(s, nats_marshalLong(buf, false, "num_requests", ep->NumRequests)); IFOK(s, nats_marshalLong(buf, true, "num_errors", ep->NumErrors)); IFOK(s, nats_marshalDuration(buf, true, "average_processing_time", ep->AverageProcessingTimeNanoseconds)); diff --git a/src/microp.h b/src/microp.h index 620d7a0e7..8184691c4 100644 --- a/src/microp.h +++ b/src/microp.h @@ -27,7 +27,7 @@ if ((__err) == NULL) \ __block; -#define MICRO_QUEUE_GROUP "q" +#define MICRO_DEFAULT_QUEUE_GROUP "q" #define MICRO_DEFAULT_ENDPOINT_NAME "default" @@ -58,6 +58,7 @@ struct micro_endpoint_s // Retained/released by the service that owns the endpoint to avoid race // conditions. microService *m; + microGroup *group; // Monitoring endpoints are different in a few ways. For now, express it as // a single flag but consider unbundling: @@ -88,9 +89,9 @@ struct micro_endpoint_s struct micro_group_s { + struct micro_group_config_s *config; struct micro_service_s *m; struct micro_group_s *next; - char prefix[]; }; struct micro_service_s @@ -138,12 +139,12 @@ struct micro_request_s microEndpoint *Endpoint; }; -microError *micro_add_endpoint(microEndpoint **new_ep, microService *m, const char *prefix, microEndpointConfig *cfg, bool is_internal); +microError *micro_add_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal); microError *micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg); microError *micro_init_monitoring(microService *m); microError *micro_is_error_message(natsStatus s, natsMsg *msg); microError *micro_new_control_subject(char **newSubject, const char *verb, const char *name, const char *id); -microError *micro_new_endpoint(microEndpoint **new_ep, microService *m, const char *prefix, microEndpointConfig *cfg, bool is_internal); +microError *micro_new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal); microError *micro_new_request(microRequest **new_request, microService *m, microEndpoint *ep, natsMsg *msg); microError *micro_start_endpoint(microEndpoint *ep); microError *micro_stop_endpoint(microEndpoint *ep); @@ -155,6 +156,7 @@ void micro_release_endpoint(microEndpoint *ep); void micro_release_on_endpoint_complete(void *closure); void micro_retain_endpoint(microEndpoint *ep); void micro_update_last_error(microEndpoint *ep, microError *err); +const char *micro_queue_group_for_endpoint(microEndpoint *ep); bool micro_is_valid_name(const char *name); bool micro_is_valid_subject(const char *subject); diff --git a/src/nats.h b/src/nats.h index e1961231b..3b0a30f2e 100644 --- a/src/nats.h +++ b/src/nats.h @@ -7635,6 +7635,14 @@ typedef struct micro_error_s microError; */ typedef struct micro_group_s microGroup; +/** + * @brief The Microservice endpoint *group* configuration object. + * + * @see micro_group_config_s for descriptions of the fields, + * micro_service_config_s, micro_service_config_s, microService_AddGroup + */ +typedef struct micro_group_config_s microGroupConfig; + /** * @brief a request received by a microservice endpoint. * @@ -7781,7 +7789,19 @@ struct micro_endpoint_config_s const char *Subject; /** - * @briefMetadata for the endpoint, a JSON-encoded user-provided object, + * @brief Overrides the default queue group for the service. + * + */ + const char *QueueGroup; + + /** + * @brief Disables the use of a queue group for the service. + * + */ + bool NoQueueGroup; + + /** + * @brief Metadata for the endpoint, a JSON-encoded user-provided object, * e.g. `{"key":"value"}` */ natsMetadata Metadata; @@ -7813,6 +7833,12 @@ struct micro_endpoint_info_s */ const char *Subject; + /** + * @brief Endpoint's actual queue group (the default "q", or one explicitly + * set by the user), or omitted if NoQueueGroup was applied. + */ + const char *QueueGroup; + /** * @briefMetadata for the endpoint, a JSON-encoded user-provided object, * e.g. `{"key":"value"}` @@ -7828,6 +7854,12 @@ struct micro_endpoint_stats_s const char *Name; const char *Subject; + /** + * @brief Endpoint's actual queue group (the default "q", or one explicitly + * set by the user), or omitted if NoQueueGroup was applied. + */ + const char *QueueGroup; + /** * @brief The number of requests received by the endpoint. */ @@ -7860,6 +7892,29 @@ struct micro_endpoint_stats_s char LastErrorString[2048]; }; +/** + * #brief The Microservice endpoint *group* configuration object. + */ +struct micro_group_config_s +{ + /** + * @brief The subject prefix for the group. + */ + const char *Prefix; + + /** + * @brief Overrides the default queue group for the service. + * + */ + const char *QueueGroup; + + /** + * @brief Disables the use of a queue group for the service. + * + */ + bool NoQueueGroup; +}; + /** * @brief The Microservice top-level configuration object. * @@ -7886,7 +7941,20 @@ struct micro_service_config_s const char *Description; /** - * @brief Metadata for the service, a JSON-encoded user-provided object, e.g. `{"key":"value"}` + * @brief Overrides the default queue group for the service ("q"). + * + */ + const char *QueueGroup; + + /** + * @brief Disables the use of a queue group for the service. + * + */ + bool NoQueueGroup; + + /** + * @brief Immutable metadata for the service, a JSON-encoded user-provided + * object, e.g. `{"key":"value"}` */ natsMetadata Metadata; @@ -8147,15 +8215,14 @@ microService_AddEndpoint(microService *m, microEndpointConfig *config); * @param new_group the location where to store the pointer to the new * #microGroup object. * @param m the #microService that the group will be added to. - * @param prefix a prefix to use on names and subjects of all endpoints in the - * group. + * @param config group parameters. * * @return a #microError if an error occurred. * * @see #microGroup_AddGroup, #microGroup_AddEndpoint */ NATS_EXTERN microError * -microService_AddGroup(microGroup **new_group, microService *m, const char *prefix); +microService_AddGroup(microGroup **new_group, microService *m, microGroupConfig *config); /** @brief Destroys a microservice, stopping it first if needed. * @@ -8283,15 +8350,14 @@ NATS_EXTERN microError *microService_Stop(microService *m); * @param new_group the location where to store the pointer to the new * #microGroup object. * @param parent the #microGroup that the new group will be added to. - * @param prefix a prefix to use on names and subjects of all endpoints in the - * group. + * @param config group parameters. * * @return a #microError if an error occurred. * * @see #microGroup_AddGroup, #microGroup_AddEndpoint */ NATS_EXTERN microError * -microGroup_AddGroup(microGroup **new_group, microGroup *parent, const char *prefix); +microGroup_AddGroup(microGroup **new_group, microGroup *parent, microGroupConfig *config); /** @brief Adds an endpoint to a #microGroup and starts listening for messages. * diff --git a/src/sub.c b/src/sub.c index d5ffe33e9..925c27a10 100644 --- a/src/sub.c +++ b/src/sub.c @@ -89,13 +89,14 @@ _initOwnDispatcher(natsSubscription *sub) return NATS_UPDATE_ERR_STACK(s); } -static inline void _cleanupOwnDispatcher(natsSubscription *sub) +static inline void +_cleanupOwnDispatcher(natsSubscription *sub) { nats_destroyQueuedMessages(&sub->ownDispatcher.queue); if (sub->ownDispatcher.thread != NULL) { - natsThread_Join(sub->ownDispatcher.thread); + natsThread_Detach(sub->ownDispatcher.thread); natsThread_Destroy(sub->ownDispatcher.thread); sub->ownDispatcher.thread = NULL; } @@ -103,7 +104,8 @@ static inline void _cleanupOwnDispatcher(natsSubscription *sub) natsCondition_Destroy(sub->ownDispatcher.cond); } -void _freeSub(natsSubscription *sub) +static inline void +_freeSub(natsSubscription *sub) { if (sub == NULL) return; diff --git a/test/list_test.txt b/test/list_test.txt index 04e9e930d..a7a2218a1 100644 --- a/test/list_test.txt +++ b/test/list_test.txt @@ -139,6 +139,7 @@ _test(MicroAsyncErrorHandlerMaxPendingMsgs) _test(MicroBasics) _test(MicroGroups) _test(MicroMatchEndpointSubject) +_test(MicroQueueGroupForEndpoint) _test(MicroServiceStopsOnClosedConn) _test(MicroServiceStopsWhenServerStops) _test(MicroStartStop) @@ -254,10 +255,10 @@ _test(SSLCertAndKeyFromMemory) _test(SSLCiphers) _test(SSLConnectVerboseOption) _test(SSLHandshakeFirst) -_test(SSLServerNameIndication) _test(SSLLoadCAFromMemory) _test(SSLMultithreads) _test(SSLReconnectWithAuthError) +_test(SSLServerNameIndication) _test(SSLSkipServerVerification) _test(SSLSocketLeakWithEventLoop) _test(SSLVerify) diff --git a/test/test.c b/test/test.c index d2bd91666..770eb8b22 100644 --- a/test/test.c +++ b/test/test.c @@ -33876,10 +33876,6 @@ void test_MicroGroups(void) microServiceInfo *info = NULL; int i; - microEndpointConfig ep1_cfg = { - .Name = "ep1", - .Handler = _microHandleRequest42, - }; microEndpointConfig ep2_cfg = { .Name = "ep2", .Handler = _microHandleRequest42, @@ -33917,16 +33913,19 @@ void test_MicroGroups(void) _startMicroservice(&m, nc, &cfg, NULL, 0, &arg); test("AddEndpoint 1 to service: "); + microEndpointConfig ep1_cfg = { .Name = "ep1", .Handler = _microHandleRequest42 }; testCond(NULL == microService_AddEndpoint(m, &ep1_cfg)); test("AddGroup g1: "); - testCond(NULL == microService_AddGroup(&g1, m, "g1")); + microGroupConfig g1_cfg = { .Prefix = "g1" }; + testCond(NULL == microService_AddGroup(&g1, m, &g1_cfg)); test("AddEndpoint 1 to g1: "); testCond(NULL == microGroup_AddEndpoint(g1, &ep1_cfg)); test("Add sub-Group g2: "); - testCond(NULL == microGroup_AddGroup(&g2, g1, "g2")); + microGroupConfig g2_cfg = { .Prefix = "g2" }; + testCond(NULL == microGroup_AddGroup(&g2, g1, &g2_cfg)); test("AddEndpoint 1 to g2: "); testCond(NULL == microGroup_AddEndpoint(g2, &ep1_cfg)); @@ -33969,6 +33968,118 @@ void test_MicroGroups(void) _stopServer(serverPid); } +void test_MicroQueueGroupForEndpoint(void) +{ + microError *err = NULL; + natsPid serverPid = NATS_INVALID_PID; + natsOptions *opts = NULL; + natsConnection *nc = NULL; + + microServiceConfig serviceConfig = { .Name = "testService", .Version = "1.0.0" }; + microGroupConfig group1Config = { .Prefix = "testGroup1" }; + microGroupConfig group2Config = { .Prefix = "testGroup2" }; + microEndpointConfig epConfig = { .Name = "testEP", .Handler = _microHandleRequest42 }; + + typedef struct { + const char *name; + + bool serviceNoQueueGroup; + const char *serviceQueueGroup; + bool group1NoQueueGroup; + const char *group1QueueGroup; + bool group2NoQueueGroup; + const char *group2QueueGroup; + bool epNoQueueGroup; + const char *epQueueGroup; + + const char *expectedServiceLevel; + const char *expectedGroup1Level; + const char *expectedGroup2Level; + } TC; + TC tcs[] = { + {.name="default", + .expectedServiceLevel=MICRO_DEFAULT_QUEUE_GROUP, .expectedGroup1Level=MICRO_DEFAULT_QUEUE_GROUP, .expectedGroup2Level=MICRO_DEFAULT_QUEUE_GROUP}, + {.name="service value override", .serviceQueueGroup="test", + .expectedServiceLevel="test", .expectedGroup1Level="test", .expectedGroup2Level="test"}, + {.name="G1 value override", .group1QueueGroup="G1", + .expectedServiceLevel=MICRO_DEFAULT_QUEUE_GROUP, .expectedGroup1Level="G1", .expectedGroup2Level="G1"}, + {.name="service and G1 value overrides", .serviceQueueGroup="S", .group1QueueGroup="G1", + .expectedServiceLevel="S", .expectedGroup1Level="G1", .expectedGroup2Level="G1"}, + {.name="service and G2 value overrides", .serviceQueueGroup="S", .group2QueueGroup="G2", + .expectedServiceLevel="S", .expectedGroup1Level="S", .expectedGroup2Level="G2"}, + {.name="disabled", .serviceNoQueueGroup=true, + .expectedServiceLevel=NULL, .expectedGroup1Level=NULL, .expectedGroup2Level=NULL}, + {.name="disabled for S, set for G1", .serviceNoQueueGroup=true, .group1QueueGroup="G1", + .expectedServiceLevel=NULL, .expectedGroup1Level="G1", .expectedGroup2Level="G1"}, + {.name="disabled for G1", .group1NoQueueGroup=true, + .expectedServiceLevel=MICRO_DEFAULT_QUEUE_GROUP, .expectedGroup1Level=NULL, .expectedGroup2Level=NULL}, + {.name="disabled for G1, set for G2", .group1NoQueueGroup=true, .group2QueueGroup="G2", + .expectedServiceLevel=MICRO_DEFAULT_QUEUE_GROUP, .expectedGroup1Level=NULL, .expectedGroup2Level="G2"}, + }; + + serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); + CHECK_SERVER_STARTED(serverPid); + + test("Connect to server: "); + testCond(NATS_OK == natsConnection_Connect(&nc, opts)); + + for (int i = 0; i < (int)(sizeof(tcs) / sizeof(tcs[0])); i++) + { + TC tc = tcs[i]; + struct threadArg arg; + microService *service = NULL; + microGroup *group1 = NULL, *group2 = NULL; + microServiceInfo *info = NULL; + microServiceStats *stats = NULL; + + if(_createDefaultThreadArgsForCbTests(&arg) != NATS_OK) + FAIL("Unable to setup test args"); + + testf("%s: ", tc.name); + + serviceConfig.NoQueueGroup = tc.serviceNoQueueGroup; + serviceConfig.QueueGroup = tc.serviceQueueGroup; + epConfig.NoQueueGroup = tc.epNoQueueGroup; + epConfig.QueueGroup = tc.epQueueGroup; + group1Config.NoQueueGroup = tc.group1NoQueueGroup; + group1Config.QueueGroup = tc.group1QueueGroup; + group2Config.NoQueueGroup = tc.group2NoQueueGroup; + group2Config.QueueGroup = tc.group2QueueGroup; + + err = _startMicroservice(&service, nc, &serviceConfig, NULL, 0, &arg); + MICRO_CALL(err, microService_AddEndpoint(service, &epConfig)); + MICRO_CALL(err, microService_AddGroup(&group1, service, &group1Config)); + MICRO_CALL(err, microGroup_AddEndpoint(group1, &epConfig)); + MICRO_CALL(err, microGroup_AddGroup(&group2, group1, &group2Config)); + MICRO_CALL(err, microGroup_AddEndpoint(group2, &epConfig)); + MICRO_CALL(err, microService_GetInfo(&info, service)); + MICRO_CALL(err, microService_GetStats(&stats, service)); + +#define _testQueueGroup(_expected, _actual) \ + (_expected) == NULL ? (_actual) == NULL : strcmp((_expected), (_actual)) == 0 + + testCond((err == NULL) && + (info != NULL) && (info->EndpointsLen == 3) && + (stats != NULL) && (stats->EndpointsLen == 3) && + (_testQueueGroup(tc.expectedServiceLevel, info->Endpoints[0].QueueGroup)) && + (_testQueueGroup(tc.expectedServiceLevel, stats->Endpoints[0].QueueGroup)) && + (_testQueueGroup(tc.expectedGroup1Level, stats->Endpoints[1].QueueGroup)) && + (_testQueueGroup(tc.expectedGroup1Level, info->Endpoints[1].QueueGroup)) && + (_testQueueGroup(tc.expectedGroup2Level, info->Endpoints[2].QueueGroup)) && + (_testQueueGroup(tc.expectedGroup2Level, stats->Endpoints[2].QueueGroup))); + + microService_Destroy(service); + _waitForMicroservicesAllDone(&arg); + microServiceInfo_Destroy(info); + microServiceStats_Destroy(stats); + _destroyDefaultThreadArgs(&arg); + } + + natsConnection_Destroy(nc); + natsOptions_Destroy(opts); + _stopServer(serverPid); +} + #define NUM_MICRO_SERVICES 5 void test_MicroBasics(void) @@ -34127,6 +34238,7 @@ void test_MicroBasics(void) testCond( (NATS_OK == nats_JSONGetStrPtr(array[0], "name", &str)) && (strcmp(str, "do") == 0) && (NATS_OK == nats_JSONGetStrPtr(array[0], "subject", &str)) && (strcmp(str, "svc.do") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[0], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) && (NATS_OK == nats_JSONGetObject(array[0], "metadata", &md)) && (md == NULL) ); @@ -34135,6 +34247,7 @@ void test_MicroBasics(void) testCond( (NATS_OK == nats_JSONGetStrPtr(array[1], "name", &str)) && (strcmp(str, "unused") == 0) && (NATS_OK == nats_JSONGetStrPtr(array[1], "subject", &str)) && (strcmp(str, "svc.unused") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[0], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) && (NATS_OK == nats_JSONGetObject(array[1], "metadata", &md)) && (NATS_OK == nats_JSONGetStrPtr(md, "key1", &str)) && (strcmp(str, "value1") == 0) && (NATS_OK == nats_JSONGetStrPtr(md, "key2", &str)) && (strcmp(str, "value2") == 0) @@ -34434,8 +34547,8 @@ void test_MicroServiceStopsWhenServerStops(void) natsMutex_Lock(arg.m); while ((s != NATS_TIMEOUT) && !arg.microAllDone) s = natsCondition_TimedWait(arg.c, arg.m, 1000); - natsMutex_Unlock(arg.m); testCond(arg.microAllDone); + natsMutex_Unlock(arg.m); test("Test microservice is not running: "); testCond(microService_IsStopped(m))