Room 기록서 2021.06.07

2021. 6. 8. 00:45

실시간 통신을 위한 Socket Server 프로젝트 생성

룸의 실시간 통신을 위한 서버 프로젝트 생성했다.

Webflux와 Websocket을 이용해서 실시간 통신을 해볼 생각이다.

 

Spring Webflux를 처음 사용해보았다. 

Reactive Stream을 제공한다고 한다. 아직 정확히 어떤건지 모르겠다. 이벤트가 발생했을 때 퍼블리셔가 생성 컨슈머가 소비 하는 느낌인가보다.

기존의 mvc방식도 제공하지만 reactive방식을 사용해보았다. 

대충 controller -> router, service -> handler 느낌인 것 같다.

 

그리고 또 생소했던 것이 Mono와 Flux이다. Mono는 단일, Flux는 여러개의 데이터를 전달한다는 데 아직 잘 모르겠다.

 

간단한 example을 통해서 webflux + websocket 익히기

https://developpaper.com/realization-of-websocket-chat-function-based-on-webflux-1/ 해당 사이트를 참고해서 example을 수행했다.

EchoHandler를 만들어보자.

@Component
public class EchoHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<WebSocketMessage> messageFlux = session.receive().map(message -> {
            String payload = message.getPayloadAsText();
            return "Received: " + payload;
        }).map(session::textMessage);
        return session.send(messageFlux);
    }
}

간단한 받은 문자열을 그대로 리턴하는 EchoHandler를 구현했다.

여기서 흥미로운 것은 `receive()`는 inbound stream이며 send()메소드는 outbound stream이다.

 

이제 handler를 url매핑할 router를 만들어보자.

@Configuration
public class WebSocketConfig {
    @Bean
    public HandlerMapping webSocketMapping() {
        SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping();
        Map<String, WebSocketHandler> handlerMap = new LinkedHashMap<>();
        handlerMap.put("/ws/echo", new EchoHandler());
        simpleUrlHandlerMapping.setUrlMap(handlerMap);
        simpleUrlHandlerMapping.setOrder(-1);
        return simpleUrlHandlerMapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

LinkedHashMap으로 url과 handler를 매핑한다.

 

Simple Chat을 만들어보자.

이제는 chat을 연습해보자.

@Data
public class Message {
    private String targetId;
    private String messageText;
    private String userId;
}

먼저 Chat 자신, 상대방 식별자와 메세지를 저장할 VO를 만들어주었다.

 

Handler를 만들어보자.

@Component
public class ChatHandler implements WebSocketHandler {
    private static final Map<String, WebSocketSession> userMap = new ConcurrentHashMap<>();
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String query = session.getHandshakeInfo().getUri().getQuery();
        Map<String, String> queryMap = getQueryMap(query);
        String userId = queryMap.getOrDefault("id", "");
        userMap.put(userId, session);
        return session.receive().flatMap(webSocketMessage -> {
            String payload = webSocketMessage.getPayloadAsText();
            Message message;
            try {
                message = objectMapper.readValue(payload, Message.class);
                String targetId = message.getTargetId();
                if (userMap.containsKey(targetId)) {
                    WebSocketSession targetSession = userMap.get(targetId);
                    if (null != targetSession) {
                        WebSocketMessage textMessage = targetSession.textMessage(message.getMessageText());
                        return targetSession.send(Mono.just(textMessage));
                    }
                }
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return session.send(Mono.just(session.textMessage(e.getMessage())));
            }
            return  session.send ( Mono.just ( session.textMessage (the "target user is not online"));
        }).then().doFinally(signal ->  userMap.remove (userid)); // delete the connection after the user closes the connection
    }

    private Map<String, String> getQueryMap(String queryStr) {
        Map<String, String> queryMap = new HashMap<>();
        if (!StringUtils.isEmpty(queryStr)) {
            String[] queryParam = queryStr.split("&");
            Arrays.stream(queryParam).forEach(s -> {
                String[] kv = s.split("=", 2);
                String value = kv.length == 2 ? kv[1] : "";
                queryMap.put(kv[0], value);
            });
        }
        return queryMap;
    }
}

먼저 websocket 연결 세션을 가지고 있을 collection인 ConcurrentHashMap을 생성했다. 여러 요청의 동시성을 고려해서 Concurrent로 만들어야 Thread Block이 최소로 일어난다. 

코드의 동작과정을 적어보면, 처음 해당 handler로 websocket 연결을 하면 handle 메소드가 실행된다.

String query부터 return session.receive() 까지 실행되며,

메세지가 올 때마다 receive().flatMap() 내부가 실행된다.

이후 doFinally는 websocket 연결이 끈길때 실행된다. 

 

router는 기존의 EchoHandler에서 만든 router에 url과 handler만 추가로 매핑 해주었다.

@Configuration
public class WebSocketConfig {
    @Bean
    public HandlerMapping webSocketMapping() {
        ...
        handlerMap.put("/ws/echo", new EchoHandler());
        handlerMap.put("/ws/chat", new ChatHandler());
        ...
    }
    ...
}

 

 

구현한 기능 테스트 해보기

해당 https://www.websocket.org/echo.html 사이트에서 테스트를 진행했다.

1. EchoHandler

 ws://localhost:8081/ws/echo로 접속했고 hello world를 보내고 hello world를 전송받았다!

 

2. Simple Chat

1번의 식별자로 chat서버에 접속했다! 2번에게 hello를 전송했고 2번에게서 world를 받았다.

2번의 식별자로 chat서버에 접속했다! 1번에게서 hello를 받았고 1번에게 world를 전송했다.

 

생각

간단하게 Webflux + WebSocket으로 실시간 Chat서버를 만들었다. Room프로젝트에서 필요로하는 기능은 이번 example을 활용하면 구현가능할 것으로 보인다. 

새로운 것을 해보는 것은 언제든지 즐거운 것 같다. 

'Projects > Room' 카테고리의 다른 글

Room 기록서 '21.06.21 Chat Tab 개발  (0) 2021.06.21
Room 기록서 '21.06.09  (0) 2021.06.09

BELATED ARTICLES

more