Skip to content

Commit

Permalink
feat: remove group chat via websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
borjom1 committed May 26, 2024
1 parent 25642e2 commit fb01edb
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ public interface ChatService {
void updateGroupChat(Long initiatorId, String chatId, @NonNull UpdateGroupChat updateGroupChat)
throws ChatMemberPermissionsDenied;

void removeGroupChat(Long initiatorId, String chatId) throws ChatMemberPermissionsDenied;

/**
* Returns group chat details if initiator has access to it.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class ChatMemberDetailsDto {

private String username;
private String name;
private String avatarPath;
private boolean isAvatarAvailable;
private boolean isOnline;

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ default void removeMember(U userId, C chatId) {

boolean chatExists(String chatId);

void deleteChatWithMembers(C chatId, Set<U> members);

Set<String> getChatMembersSessions(C chatId);

int getUnreadMessages(C chatId, U userId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ public boolean chatExists(String chatId) {
return TRUE.equals(redisTemplate.hasKey(chatKey(chatId)));
}

@Override
public void deleteChatWithMembers(String chatId, Set<Long> members) {
executeInTxn(redisTemplate, ops -> {
ops.delete(chatKey(chatId));
members.stream()
.map(this::userChatsKey)
.forEach(userChatsKey -> ops.opsForHash().delete(userChatsKey, chatId));
});
}

@Override
public void saveSession(Long userId, String sessionId) {
redisTemplate.opsForSet().add(userKey(userId), sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public enum Action {
// Messages
MESSAGE, FILE, BIND, UPD_MESSAGE,
READ, UNREAD_MESSAGES,
REMOVE, CLEAR_HISTORY,
REMOVE, CLEAR_HISTORY, CHAT_DELETED,

// Error
ERROR
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.linkwave.ws.websocket.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

@NoArgsConstructor
@Getter
@JsonIgnoreProperties(ignoreUnknown = true)
@SuperBuilder
public class ChatMessage extends StatusMessage {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.linkwave.ws.websocket.route;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.linkwave.ws.api.ApiErrorException;
import org.linkwave.ws.api.chat.ChatMemberDto;
Expand All @@ -14,10 +16,15 @@
import org.linkwave.ws.websocket.routing.bpp.Broadcast;
import org.linkwave.ws.websocket.routing.bpp.Endpoint;
import org.linkwave.ws.websocket.routing.bpp.WebSocketRoute;
import org.linkwave.ws.websocket.routing.broadcast.WebSocketMessageBroadcast;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.NonNull;
import org.springframework.web.bind.annotation.PathVariable;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.linkwave.shared.utils.Bearers.append;
import static org.linkwave.ws.websocket.routing.Box.error;
Expand All @@ -29,7 +36,15 @@
public class GroupChatRoutes {

private final ChatRepository<Long, String> chatRepository;
private final WebSocketMessageBroadcast messageBroadcast;
private final ChatServiceClient chatClient;
private final ObjectMapper objectMapper;

@Value("${server.instances.list}")
private String[] instances;

@Value("${server.instances.enabled}")
private boolean isMibEnabled;

@Endpoint(value = "/create", disabled = true)
public Box<GroupChatDto> createChat(@NonNull UserPrincipal principal,
Expand Down Expand Up @@ -274,4 +289,61 @@ public Box<ChatRoleMessage> changeMemberRole(@PathVariable String id,
.build());
}

@SuppressWarnings("unchecked")
@SneakyThrows
@Endpoint("/{id}/remove")
public Box<Void> removeChat(@PathVariable String id,
@NonNull UserPrincipal sender,
@NonNull String path) {

final Long senderId = sender.token().userId();

if (!chatRepository.isMember(id, senderId)) {
return error(ErrorMessage.create("You are not member of chat", path));
}

// make an api call to delete chat
try {
chatClient.removeGroupChat(append(sender.rawAccessToken()), id);
} catch (ApiErrorException e) {
return error(ErrorMessage.create(e.getMessage(), path));
}

final Set<Long> members = chatRepository.getMembers(id);
final Set<String> membersSessions = members.stream()
.map(chatRepository::getUserSessions)
.flatMap(Set::stream)
.collect(Collectors.toSet());

final String serializedMessage = objectMapper.writeValueAsString(
ChatMessage.builder()
.senderId(senderId)
.chatId(id)
.action(Action.CHAT_DELETED)
.build()
);

boolean isSharedCompletely = false;
try {
isSharedCompletely = messageBroadcast.share(membersSessions, serializedMessage);
} catch (IOException e) {
log.error("removeChat(): {}", e.getMessage());
}

if (!isSharedCompletely && isMibEnabled) {

final var content = objectMapper.readValue(serializedMessage, Map.class);
content.put("members", members);
final String newSerializedMessage = objectMapper.writeValueAsString(content);

for (String instanceId : instances) {
chatRepository.shareWithConsumer(instanceId, newSerializedMessage);
}
}

chatRepository.deleteChatWithMembers(id, members);

return Box.ok();
}

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package org.linkwave.ws.websocket.routing.broadcast.instances;

import org.linkwave.ws.websocket.dto.*;
import org.linkwave.ws.repository.ChatRepository;
import org.linkwave.ws.websocket.routing.broadcast.WebSocketMessageBroadcast;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.linkwave.ws.repository.ChatRepository;
import org.linkwave.ws.websocket.dto.Action;
import org.linkwave.ws.websocket.dto.BaseMessage;
import org.linkwave.ws.websocket.dto.ChatMessage;
import org.linkwave.ws.websocket.dto.StatusMessage;
import org.linkwave.ws.websocket.routing.broadcast.WebSocketMessageBroadcast;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
Expand All @@ -20,6 +27,10 @@
@RequiredArgsConstructor
public class MessageDelegateImpl implements MessageDelegate {

private static final TypeReference<Map<String, Object>> MESSAGE_CONTENT_TYPE = new TypeReference<>() {};
private static final String HIDDEN_CHATS_KEY = "chats";
private static final String HIDDEN_MEMBERS_KEY = "members";

private final ChatRepository<Long, String> chatRepository;
private final WebSocketMessageBroadcast messageBroadcast;
private final ObjectMapper mapper;
Expand Down Expand Up @@ -51,7 +62,7 @@ public void handleMessage(@NonNull String message) {

if (action.equals(Action.OFFLINE)) {
userChats = new HashSet<>(
(List<String>) mapper.readValue(message, Map.class).get("chats")
(List<String>) mapper.readValue(message, MESSAGE_CONTENT_TYPE).get(HIDDEN_CHATS_KEY)
);
message = mapper.writeValueAsString(statusMessage); // remove chats property
} else {
Expand All @@ -65,6 +76,26 @@ public void handleMessage(@NonNull String message) {
.flatMap(Stream::distinct)
.collect(toSet());
}
case CHAT_DELETED -> {
final var content = mapper.readValue(message, MESSAGE_CONTENT_TYPE);

// get ids from json
final Set<Long> membersIds = new HashSet<>((
((List<Integer>) content.get(HIDDEN_MEMBERS_KEY)).stream()
.map(Integer::longValue)
.collect(toSet()))
);

// remove members property
content.remove(HIDDEN_MEMBERS_KEY);
message = mapper.writeValueAsString(content);

// collect members' sessions
members = membersIds.stream()
.map(chatRepository::getUserSessions)
.flatMap(Set::stream)
.collect(toSet());
}
case BIND, ERROR -> throw new IllegalStateException("Unsupported message action");
default -> {
var chatMessage = mapper.readValue(message, ChatMessage.class);
Expand Down

0 comments on commit fb01edb

Please sign in to comment.