From 25363e1ad1b361c06dbac353589da1ae500f5716 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 21 Apr 2024 13:52:12 +0300 Subject: [PATCH 01/10] feat(SessionRepository): get user sessions by custom key --- .../linkwave/ws/repository/RedisChatRepository.java | 12 +++++++++++- .../linkwave/ws/repository/SessionRepository.java | 2 ++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/repository/RedisChatRepository.java b/backend/ws-server/src/main/java/org/linkwave/ws/repository/RedisChatRepository.java index f58c9549..c2ffef33 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/repository/RedisChatRepository.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/repository/RedisChatRepository.java @@ -171,6 +171,12 @@ public Set getUserSessions(Long userId) { return members == null ? emptySet() : members; } + @Override + public Set getUserSessions(String customKey) { + final Set members = redisTemplate.opsForSet().members(customKey); + return members == null ? emptySet() : members; + } + @Override public boolean isMember(String chatId, Long userId) { return TRUE.equals( @@ -215,7 +221,11 @@ public void shareWithConsumer(String consumerId, String jsonMessage) { } private String userKey(Long userId) { - return "user:%d".formatted(userId); + return userKey(String.valueOf(userId)); + } + + private String userKey(String userId) { + return "user:%s".formatted(userId); } private String chatKey(String chatId) { diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/repository/SessionRepository.java b/backend/ws-server/src/main/java/org/linkwave/ws/repository/SessionRepository.java index d7b01009..f23f6b07 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/repository/SessionRepository.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/repository/SessionRepository.java @@ -5,6 +5,8 @@ public interface SessionRepository { Set getUserSessions(T userId); + Set getUserSessions(String customKey); + void saveSession(T userId, String sessionId); void removeSession(T userId, String sessionId); From 13935f5f90b0f900a90cea581ceb85b177b89315 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 21 Apr 2024 13:53:05 +0300 Subject: [PATCH 02/10] feat: support repeatable broadcast annotation --- .../ws/websocket/routing/bpp/Broadcast.java | 8 ++-- .../ws/websocket/routing/bpp/Broadcasts.java | 18 +++++++++ .../bpp/WebSocketRouterBeanPostProcessor.java | 40 +++++++++++-------- 3 files changed, 44 insertions(+), 22 deletions(-) create mode 100644 backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/Broadcasts.java diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/Broadcast.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/Broadcast.java index 1521b08b..ce42de4c 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/Broadcast.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/Broadcast.java @@ -2,19 +2,17 @@ import org.linkwave.ws.websocket.routing.broadcast.BroadcastManager; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +import java.lang.annotation.*; /** - * This annotation is used to enable message broadcast for route handler + * This annotation is used to enable message broadcast for route handler. * * @see BroadcastManager * @see SubRoute */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) +@Repeatable(Broadcasts.class) public @interface Broadcast { /** diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/Broadcasts.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/Broadcasts.java new file mode 100644 index 00000000..78d95bc1 --- /dev/null +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/Broadcasts.java @@ -0,0 +1,18 @@ +package org.linkwave.ws.websocket.routing.bpp; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Container for {@link Broadcast} annotations in order to + * support broadcast for different destinations. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Broadcasts { + + Broadcast[] value() default {}; + +} diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/WebSocketRouterBeanPostProcessor.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/WebSocketRouterBeanPostProcessor.java index 25a7d0b1..a7153d5e 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/WebSocketRouterBeanPostProcessor.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/bpp/WebSocketRouterBeanPostProcessor.java @@ -68,7 +68,7 @@ public Object postProcessAfterInitialization(@NonNull Object bean, } // check broadcast options - verifyBroadcast(method); + final boolean broadcast = verifyBroadcast(method); method.setAccessible(true); routes.put(combinedPath, new RouteComponent(entry.getValue(), method)); @@ -77,7 +77,7 @@ public Object postProcessAfterInitialization(@NonNull Object bean, sb.setLength(0); sb.append(rootPath); - log.debug("Route [{}], broadcast: {}", combinedPath, method.isAnnotationPresent(Broadcast.class)); + log.debug("Route [{}], broadcast: {}", combinedPath, broadcast); } } sb.setLength(0); @@ -87,29 +87,35 @@ public Object postProcessAfterInitialization(@NonNull Object bean, return bean; } - private void verifyBroadcast(@NonNull Method routeHandler) { - if (!routeHandler.isAnnotationPresent(Broadcast.class)) { - return; + private boolean verifyBroadcast(@NonNull Method routeHandler) { + final Broadcast[] annotations = routeHandler.getAnnotationsByType(Broadcast.class); + if (annotations.length == 0) { + return false; } if (routeHandler.getReturnType().equals(void.class)) { throw new RuntimeException( - format("Route handler \"%s\" with broadcast has return type void", routeHandler.getName()) + format( + "Route handler \"%s\" marked as broadcast has return type void", + "%s.%s".formatted(routeHandler.getDeclaringClass().getName(), routeHandler.getName()) + ) ); } - String[] keyComponents = routeHandler.getAnnotation(Broadcast.class) - .value() - .trim() - .split(BroadcastManager.KEY_SEPARATOR); - - if (keyComponents.length < 2) { - String errMsg = format( - "Broadcast annotation value incorrect format at route handler \"%s\"", - routeHandler.getName() - ); - throw new RuntimeException(errMsg); + for (Broadcast annotation : annotations) { + final String[] keyComponents = annotation.value() + .trim() + .split(BroadcastManager.KEY_SEPARATOR); + + if (keyComponents.length < 2) { + String errMsg = format( + "Broadcast annotation value incorrect format at route handler \"%s\"", + "%s.%s".formatted(routeHandler.getDeclaringClass().getName(), routeHandler.getName()) + ); + throw new RuntimeException(errMsg); + } } + return true; } private Field getRoutesMapField(@NonNull Class cls) { From 16ebf404e9e8049276bf875d6544849054c6b760 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 21 Apr 2024 13:53:25 +0300 Subject: [PATCH 03/10] feat: implement broadcast repository resolver --- .../BroadcastRepositoryResolver.java | 21 +++++++ .../BroadcastRepositoryResolverImpl.java | 59 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolver.java create mode 100644 backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolverImpl.java diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolver.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolver.java new file mode 100644 index 00000000..e318bc1d --- /dev/null +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolver.java @@ -0,0 +1,21 @@ +package org.linkwave.ws.websocket.routing.broadcast; + +import org.linkwave.ws.websocket.routing.bpp.Broadcast; + +import java.util.Set; + +/** + * Defines which sessions should be retrieved that corresponds to
+ * key-pattern set in {@link Broadcast#value()}. + */ +public interface BroadcastRepositoryResolver { + + /** + * Retrieves a set of sessions ids based on key-pattern. + * @param broadcastKeyPattern key-pattern set in {@link Broadcast#value()} + * @param resolvedKeyPattern key-pattern with resolved key variables + * @return set of sessions ids that matched the specified criteria + */ + Set resolve(String broadcastKeyPattern, String resolvedKeyPattern); + +} diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolverImpl.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolverImpl.java new file mode 100644 index 00000000..563e6f00 --- /dev/null +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolverImpl.java @@ -0,0 +1,59 @@ +package org.linkwave.ws.websocket.routing.broadcast; + +import lombok.RequiredArgsConstructor; +import org.linkwave.ws.repository.ChatRepository; +import org.springframework.lang.NonNull; + +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; + +import static org.linkwave.ws.utils.RouteUtils.isPathVariable; +import static org.linkwave.ws.websocket.routing.broadcast.BroadcastManager.KEY_SEPARATOR; + +@RequiredArgsConstructor +public class BroadcastRepositoryResolverImpl implements BroadcastRepositoryResolver { + + private final ChatRepository chatRepository; + + /** + * If the field type seems a bit complicated, this is an example how this looks like: + *
 {@code
+     * return new BroadcastRepositoryResolverImpl(
+     *       Map.of(
+     *             "user:{}", SessionRepository::getUserSessions,
+     *             "chat:{}", ChatRepository::getSessions
+     *       )
+     * );
+     * }
+ */ + private final Map< + String, + BiFunction, String, Set> + > repositoryResolvers; + + @Override + public Set resolve(String broadcastKeyPattern, String resolvedKeyPattern) { + return repositoryResolvers + .get(eraseKey(broadcastKeyPattern)) + .apply(chatRepository, resolvedKeyPattern); + } + + /** + * Erases key variables names from passed key-pattern.
+ * Example: For key-pattern {@code "chat:{id}"} it returns {@code "chat:{}"}. + * + * @param keyPattern non-null string that contains key pattern + * @return key pattern without its variables names + */ + @NonNull + private String eraseKey(@NonNull String keyPattern) { + final String[] components = keyPattern.trim().split(KEY_SEPARATOR); + final var sb = new StringBuilder(); + for (String part : components) { + sb.append(isPathVariable(part) ? "{}" : part).append(KEY_SEPARATOR); + } + return sb.substring(0, sb.length() - 1); + } + +} From aa55cfe04fef198864ce0376a69fafedafd84d9d Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 21 Apr 2024 13:54:33 +0300 Subject: [PATCH 04/10] feat: handle repeatable annotations --- .../websocket/routing/RoutingAutoConfig.java | 24 ++++++ .../broadcast/FlexBroadcastManager.java | 81 +++++++++++++++++++ .../broadcast/SimpleBroadcastManager.java | 21 ++--- 3 files changed, 116 insertions(+), 10 deletions(-) create mode 100644 backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/FlexBroadcastManager.java diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/RoutingAutoConfig.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/RoutingAutoConfig.java index 5847a3a5..e4fa7715 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/RoutingAutoConfig.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/RoutingAutoConfig.java @@ -1,22 +1,46 @@ package org.linkwave.ws.websocket.routing; import com.fasterxml.jackson.databind.ObjectMapper; +import org.linkwave.ws.repository.ChatRepository; +import org.linkwave.ws.repository.SessionRepository; import org.linkwave.ws.websocket.jwt.UserPrincipal; import org.linkwave.ws.websocket.routing.args.ArgumentResolverStrategy; import org.linkwave.ws.websocket.routing.args.PathVariableResolverStrategy; import org.linkwave.ws.websocket.routing.args.PayloadResolverStrategy; +import org.linkwave.ws.websocket.routing.broadcast.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.WebSocketSession; import java.security.Principal; import java.util.List; +import java.util.Map; @Configuration public class RoutingAutoConfig { public static final String PATH_PARAM_NAME = "path"; + @Bean + public BroadcastRepositoryResolver broadcastRepositoryResolver( + ChatRepository chatRepository + ) { + return new BroadcastRepositoryResolverImpl( + chatRepository, + Map.of( + "user:{}", SessionRepository::getUserSessions, + "chat:{}", ChatRepository::getSessions + ) + ); + } + + @Bean + public BroadcastManager broadcastManager(WebSocketMessageBroadcast messageBroadcast, + ChatRepository chatRepository, + BroadcastRepositoryResolver repositoryResolver) { + return new FlexBroadcastManager(messageBroadcast, chatRepository, repositoryResolver); + } + // argument resolvers registry @Bean public List argumentResolverStrategies(ObjectMapper objectMapper) { diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/FlexBroadcastManager.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/FlexBroadcastManager.java new file mode 100644 index 00000000..857b4db9 --- /dev/null +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/FlexBroadcastManager.java @@ -0,0 +1,81 @@ +package org.linkwave.ws.websocket.routing.broadcast; + +import lombok.extern.slf4j.Slf4j; +import org.linkwave.ws.repository.ChatRepository; +import org.linkwave.ws.websocket.routing.bpp.Broadcast; +import org.springframework.lang.NonNull; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; + +@Slf4j +public class FlexBroadcastManager extends SimpleBroadcastManager { + + private final BroadcastRepositoryResolver repositoryResolver; + + public FlexBroadcastManager(WebSocketMessageBroadcast messageBroadcast, + ChatRepository chatRepository, + BroadcastRepositoryResolver repositoryResolver) { + super(messageBroadcast, chatRepository); + this.repositoryResolver = repositoryResolver; + } + + @Override + public void process(@NonNull Method routeHandler, @NonNull Map pathVariables, + @NonNull Object message, @NonNull String serializedMessage) { + + // if it's necessary to broadcast message + if (!isBroadcast(routeHandler)) { + return; + } + + log.debug("-> process(): routeHandler=[{}.{}]", + routeHandler.getDeclaringClass().getSimpleName(), + routeHandler.getName() + ); + + Broadcast[] broadcasts = routeHandler.getAnnotationsByType(Broadcast.class); + + // remove duplicate key-patterns + if (broadcasts.length > 1) { + final Map broadcastMap = Arrays + .stream(broadcasts) + .collect(toMap(Broadcast::value, identity())); + + if (broadcastMap.size() != broadcasts.length) { + log.warn("-> process(): found duplicate key-patterns"); + broadcasts = broadcastMap.values().toArray(new Broadcast[0]); + } + } + + for (Broadcast broadcastAnn : broadcasts) { + + final String broadcastKeyPattern = broadcastAnn.value(); + final String resolveBroadcastKey = resolveKey( + broadcastKeyPattern, + pathVariables, + broadcastAnn.analyzeMessage() ? message : null + ); + + // resolve sessions ids + final Set members = repositoryResolver.resolve(broadcastKeyPattern, resolveBroadcastKey); + + if (members.isEmpty()) { + log.debug("-> process({}): sessions not found", broadcastKeyPattern); + continue; + } + + broadcast(broadcastAnn, members, serializedMessage); + } + } + + @Override + public boolean isBroadcast(@NonNull Method routeHandler) { + return routeHandler.getAnnotationsByType(Broadcast.class).length != 0; + } +} diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/SimpleBroadcastManager.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/SimpleBroadcastManager.java index 58620ff3..005b6abb 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/SimpleBroadcastManager.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/SimpleBroadcastManager.java @@ -6,7 +6,6 @@ import org.linkwave.ws.websocket.routing.bpp.Broadcast; import org.springframework.beans.factory.annotation.Value; import org.springframework.lang.NonNull; -import org.springframework.stereotype.Component; import java.io.IOException; import java.lang.reflect.Field; @@ -20,18 +19,17 @@ import static org.springframework.util.ReflectionUtils.*; @Slf4j -@Component @RequiredArgsConstructor public class SimpleBroadcastManager implements BroadcastManager { @Value("${server.instances.value}") - private String instances; + protected String instances; @Value("${server.instances.separator}") - private String separator; + protected String separator; - private final WebSocketMessageBroadcast messageBroadcast; - private final ChatRepository chatRepository; + protected final WebSocketMessageBroadcast messageBroadcast; + protected final ChatRepository chatRepository; @Override public void process(@NonNull Method routeHandler, @NonNull Map pathVariables, @@ -60,19 +58,22 @@ public void process(@NonNull Method routeHandler, @NonNull Map p return; } + broadcast(broadcastAnn, members, serializedMessage); + } + + protected void broadcast(Broadcast broadcast, Set sessionsIds, String serializedMessage) { boolean isSharedCompletely; try { - isSharedCompletely = messageBroadcast.share(members, serializedMessage); + isSharedCompletely = messageBroadcast.share(sessionsIds, serializedMessage); } catch (IOException e) { log.error("-> process(): {}", e.getMessage()); isSharedCompletely = false; } - if (!isSharedCompletely && broadcastAnn.multiInstances()) { + if (!isSharedCompletely && broadcast.multiInstances()) { log.debug("-> process(): multi-instance broadcast is required"); for (String instanceId : instances.split(separator)) { - // here we should pass message with session id too chatRepository.shareWithConsumer(instanceId, serializedMessage); } } @@ -84,7 +85,7 @@ public boolean isBroadcast(@NonNull Method routeHandler) { } @NonNull - private String resolveKey(@NonNull String keyPattern, @NonNull Map pathVariables, Object message) { + protected String resolveKey(@NonNull String keyPattern, @NonNull Map pathVariables, Object message) { // parse broadcast value pattern final String[] components = keyPattern.trim().split(KEY_SEPARATOR); From 3fc18f37489df21f2287a3b783390f7ccc0166cf Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 21 Apr 2024 13:55:15 +0300 Subject: [PATCH 05/10] feat(GroupChatRoutes): add repeatable annotations --- .../java/org/linkwave/ws/websocket/route/GroupChatRoutes.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/route/GroupChatRoutes.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/route/GroupChatRoutes.java index e3a65517..82ae9e9e 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/route/GroupChatRoutes.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/route/GroupChatRoutes.java @@ -112,6 +112,7 @@ public Box join(@PathVariable String id, } @Broadcast + @Broadcast(value = "user:{senderId}", analyzeMessage = true) @SubRoute("/{id}/leave") public Box leaveChat(@PathVariable String id, @NonNull UserPrincipal principal, @@ -191,6 +192,7 @@ public Box addMember(@PathVariable String id, } @Broadcast + @Broadcast("user:{memberId}") @SubRoute("/{id}/kick/{memberId}") public Box removeMember(@PathVariable String id, @PathVariable Long memberId, From 36c57a4ccad260bc0d463d9592c9a895423df2fe Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 21 Apr 2024 13:55:31 +0300 Subject: [PATCH 06/10] docs(README.MD): update docs --- backend/ws-server/README.MD | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/ws-server/README.MD b/backend/ws-server/README.MD index 90b0e9ef..c19b26a1 100644 --- a/backend/ws-server/README.MD +++ b/backend/ws-server/README.MD @@ -91,10 +91,10 @@ In other case you will receive message with `ERROR` action: ```json { - "action": "ERROR", - "timestamp": 1710622280.309209600, - "error": "You are not member of chat", - "path": "/chat/group/65fca83ea94fa31e8a726f9956/leave" + "action": "LEAVE", + "timestamp": 1713443892.540455500, + "senderId": 34, + "chatId": "6621128c182be4335fc0a6e5" } ``` From ce361425d42c2df99e8595964461b7719e6ce29b Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 21 Apr 2024 18:51:31 +0300 Subject: [PATCH 07/10] feat: create tests for flex broadcast manager --- .../ws/unit/FlexBroadcastManagerTest.java | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 backend/ws-server/src/test/java/org/linkwave/ws/unit/FlexBroadcastManagerTest.java diff --git a/backend/ws-server/src/test/java/org/linkwave/ws/unit/FlexBroadcastManagerTest.java b/backend/ws-server/src/test/java/org/linkwave/ws/unit/FlexBroadcastManagerTest.java new file mode 100644 index 00000000..89e1d96d --- /dev/null +++ b/backend/ws-server/src/test/java/org/linkwave/ws/unit/FlexBroadcastManagerTest.java @@ -0,0 +1,145 @@ +package org.linkwave.ws.unit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.linkwave.ws.websocket.routing.bpp.Broadcast; +import org.linkwave.ws.websocket.routing.bpp.SubRoute; +import org.linkwave.ws.websocket.routing.broadcast.BroadcastRepositoryResolver; +import org.linkwave.ws.websocket.routing.broadcast.FlexBroadcastManager; +import org.linkwave.ws.websocket.routing.broadcast.WebSocketMessageBroadcast; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.web.bind.annotation.PathVariable; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static java.lang.Boolean.TRUE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.platform.commons.util.ReflectionUtils.getRequiredMethod; +import static org.linkwave.ws.unit.SessionTestUtils.generateSessionIds; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class FlexBroadcastManagerTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Mock + private BroadcastRepositoryResolver repositoryResolver; + + @Mock + private WebSocketMessageBroadcast messageBroadcast; + + @InjectMocks + private FlexBroadcastManager broadcastManager; + + @Test + void Should_DetectRouteHandler_WhenItAnnotatedWithBroadcast() { + final Method send = getRequiredMethod(Routes.class, "send", String.class); + final Method sendWithBroadcast = getRequiredMethod(Routes.class, "sendWithBroadcast"); + + assertThat(broadcastManager.isBroadcast(send)).isTrue(); + assertThat(broadcastManager.isBroadcast(sendWithBroadcast)).isFalse(); + } + + @Test + @SneakyThrows + void Should_PerformMultipleBroadcast_When_RouteHandlerHasSeveralAnnotations() { + + final Method send = getRequiredMethod(Routes.class, "send", String.class); + final var pathVariables = Map.of("id", "xyz"); + final var message = new Message(pathVariables.get("id"), 1L); + final String serializedMessage = objectMapper.writeValueAsString(message); + + final Set firstBroadcastSessions = generateSessionIds(5L); + final Set secondBroadcastSessions = generateSessionIds(10L); + + // sessions for first broadcast + when(repositoryResolver.resolve("chat:{id}", "chat:%s".formatted(pathVariables.get("id")))) + .thenReturn(firstBroadcastSessions); + + // sessions for second broadcast + when(repositoryResolver.resolve("user:{senderId}", "user:%s".formatted(message.senderId()))) + .thenReturn(secondBroadcastSessions); + + when(messageBroadcast.share(anySet(), eq(serializedMessage))).thenReturn(TRUE); + + broadcastManager.process(send, pathVariables, message, serializedMessage); + + verify(messageBroadcast, times(1)).share(firstBroadcastSessions, serializedMessage); + verify(messageBroadcast, times(1)).share(secondBroadcastSessions, serializedMessage); + } + + @Test + @SneakyThrows + void Should_PerformSingleBroadcast_When_DuplicateKeysDetected() { + + final Method sendDuplicates = getRequiredMethod(Routes.class, "sendDuplicates", String.class); + final var pathVariables = Map.of("id", "xyz"); + final var message = new Message(pathVariables.get("id"), 1L); + final String serializedMessage = objectMapper.writeValueAsString(message); + + final Set resolvedSessions = generateSessionIds(5L); + + when(repositoryResolver.resolve("chat:{id}", "chat:%s".formatted(pathVariables.get("id")))) + .thenReturn(resolvedSessions); + + when(messageBroadcast.share(anySet(), eq(serializedMessage))).thenReturn(TRUE); + + broadcastManager.process(sendDuplicates, pathVariables, message, serializedMessage); + + verify(messageBroadcast, times(1)).share(anySet(), anyString()); + } + + @Test + @SneakyThrows + void Should_SkipBroadcast_When_SessionsNotFound() { + final Method send = getRequiredMethod(Routes.class, "send", String.class); + final var pathVariables = Map.of("id", "xyz"); + final var message = new Message(pathVariables.get("id"), 1L); + final String serializedMessage = objectMapper.writeValueAsString(message); + + // sessions for first broadcast + when(repositoryResolver.resolve("chat:{id}", "chat:%s".formatted(pathVariables.get("id")))) + .thenReturn(Collections.emptySet()); + + // sessions for second broadcast + when(repositoryResolver.resolve("user:{senderId}", "user:%s".formatted(message.senderId()))) + .thenReturn(Collections.emptySet()); + + broadcastManager.process(send, pathVariables, message, serializedMessage); + + verify(messageBroadcast, never()).share(any(), any()); + } + + static class Routes { + + @Broadcast + @Broadcast(value = "user:{senderId}", analyzeMessage = true) + @SubRoute("/chat/{id}") + Message send(@PathVariable String id) { + return new Message(id, 1L); + } + + @Broadcast + @Broadcast + @SubRoute("/chat/{id}") + Message sendDuplicates(@PathVariable String id) { + return new Message(id, 1L); + } + + void sendWithBroadcast() { + + } + + } + + record Message(String id, Long senderId) {} + +} From 3eb3fcb8193b7f9bb3b821c497905745a78580b6 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 21 Apr 2024 18:52:16 +0300 Subject: [PATCH 08/10] fix(FlexBroadcastManager): merge duplicates --- .../ws/websocket/routing/broadcast/FlexBroadcastManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/FlexBroadcastManager.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/FlexBroadcastManager.java index 857b4db9..d6be678f 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/FlexBroadcastManager.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/FlexBroadcastManager.java @@ -41,11 +41,11 @@ public void process(@NonNull Method routeHandler, @NonNull Map p Broadcast[] broadcasts = routeHandler.getAnnotationsByType(Broadcast.class); - // remove duplicate key-patterns if (broadcasts.length > 1) { + // remove duplicate key-patterns final Map broadcastMap = Arrays .stream(broadcasts) - .collect(toMap(Broadcast::value, identity())); + .collect(toMap(Broadcast::value, identity(), (b1, b2) -> b1)); if (broadcastMap.size() != broadcasts.length) { log.warn("-> process(): found duplicate key-patterns"); From 65dcd993446e515e5b8a2f9a8f555a26fac59b33 Mon Sep 17 00:00:00 2001 From: Igor Date: Mon, 22 Apr 2024 13:06:52 +0300 Subject: [PATCH 09/10] feat(BroadcastRepositoryResolver): create tests --- .../routing/broadcast/BroadcastRepositoryResolver.java | 3 ++- .../broadcast/BroadcastRepositoryResolverImpl.java | 10 +++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolver.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolver.java index e318bc1d..e8340db2 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolver.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolver.java @@ -1,6 +1,7 @@ package org.linkwave.ws.websocket.routing.broadcast; import org.linkwave.ws.websocket.routing.bpp.Broadcast; +import org.springframework.lang.NonNull; import java.util.Set; @@ -16,6 +17,6 @@ public interface BroadcastRepositoryResolver { * @param resolvedKeyPattern key-pattern with resolved key variables * @return set of sessions ids that matched the specified criteria */ - Set resolve(String broadcastKeyPattern, String resolvedKeyPattern); + Set resolve(@NonNull String broadcastKeyPattern, @NonNull String resolvedKeyPattern); } diff --git a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolverImpl.java b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolverImpl.java index 563e6f00..79af70f7 100644 --- a/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolverImpl.java +++ b/backend/ws-server/src/main/java/org/linkwave/ws/websocket/routing/broadcast/BroadcastRepositoryResolverImpl.java @@ -5,6 +5,7 @@ import org.springframework.lang.NonNull; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.function.BiFunction; @@ -33,7 +34,14 @@ public class BroadcastRepositoryResolverImpl implements BroadcastRepositoryResol > repositoryResolvers; @Override - public Set resolve(String broadcastKeyPattern, String resolvedKeyPattern) { + public Set resolve(@NonNull String broadcastKeyPattern, @NonNull String resolvedKeyPattern) { + Objects.requireNonNull(broadcastKeyPattern); + Objects.requireNonNull(resolvedKeyPattern); + + if (broadcastKeyPattern.equals(resolvedKeyPattern)) { + throw new IllegalArgumentException("Invalid broadcast keys"); + } + return repositoryResolvers .get(eraseKey(broadcastKeyPattern)) .apply(chatRepository, resolvedKeyPattern); From f37cd5aa35dd28b93e40b53faef6d677381003a1 Mon Sep 17 00:00:00 2001 From: Igor Date: Mon, 22 Apr 2024 13:10:27 +0300 Subject: [PATCH 10/10] feat(BroadcastRepositoryResolver): add tests --- .../unit/BroadcastRepositoryResolverTest.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 backend/ws-server/src/test/java/org/linkwave/ws/unit/BroadcastRepositoryResolverTest.java diff --git a/backend/ws-server/src/test/java/org/linkwave/ws/unit/BroadcastRepositoryResolverTest.java b/backend/ws-server/src/test/java/org/linkwave/ws/unit/BroadcastRepositoryResolverTest.java new file mode 100644 index 00000000..dedb6c53 --- /dev/null +++ b/backend/ws-server/src/test/java/org/linkwave/ws/unit/BroadcastRepositoryResolverTest.java @@ -0,0 +1,79 @@ +package org.linkwave.ws.unit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.linkwave.ws.repository.ChatRepository; +import org.linkwave.ws.repository.SessionRepository; +import org.linkwave.ws.websocket.routing.broadcast.BroadcastRepositoryResolverImpl; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.linkwave.ws.unit.SessionTestUtils.generateSessionIds; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class BroadcastRepositoryResolverTest { + + @Mock + private ChatRepository chatRepository; + + private BroadcastRepositoryResolverImpl repositoryResolver; + + @BeforeEach + void setUp() { + repositoryResolver = new BroadcastRepositoryResolverImpl( + chatRepository, + Map.of( + "user:{}", SessionRepository::getUserSessions, + "chat:{}", ChatRepository::getSessions + ) + ); + } + + @Test + void Should_ResolveChatSessions_When_KeysValid() { + final String broadcastKeyPattern = "chat:{id}"; + final String resolvedKeyPattern = "chat:xyz"; + final Set expectedSessionIds = generateSessionIds(5L); + + when(chatRepository.getSessions(resolvedKeyPattern)).thenReturn(expectedSessionIds); + + final Set actualSessions = repositoryResolver.resolve(broadcastKeyPattern, resolvedKeyPattern); + + verify(chatRepository, only()).getSessions(resolvedKeyPattern); + assertThat(actualSessions).isNotNull(); + assertThat(actualSessions).isNotEmpty(); + assertThat(actualSessions).isEqualTo(expectedSessionIds); + } + + @Test + void Should_ResolveUserSessions_When_KeysValid() { + final String broadcastKeyPattern = "user:{id}"; + final String resolvedKeyPattern = "user:1"; + final Set expectedSessionIds = generateSessionIds(5L); + + when(chatRepository.getUserSessions(resolvedKeyPattern)).thenReturn(expectedSessionIds); + + final Set actualSessions = repositoryResolver.resolve(broadcastKeyPattern, resolvedKeyPattern); + + verify(chatRepository, only()).getUserSessions(resolvedKeyPattern); + assertThat(actualSessions).isNotNull(); + assertThat(actualSessions).isNotEmpty(); + assertThat(actualSessions).isEqualTo(expectedSessionIds); + } + + @Test + void Should_ThrowException_When_KeysInvalid() { + assertThrows(NullPointerException.class, () -> repositoryResolver.resolve(null, null)); + assertThrows(NullPointerException.class, () -> repositoryResolver.resolve("chat:{id}", null)); + assertThrows(NullPointerException.class, () -> repositoryResolver.resolve(null, "chat:xyz")); + assertThrows(IllegalArgumentException.class, () -> repositoryResolver.resolve("chat:{id}", "chat:{id}")); + } + +}