반응형
01-09 06:04
Today
Total
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
관리 메뉴

개발하는 고라니

[Spring Boot] WebSocket과 채팅 (4) - RabbitMQ 본문

Framework/Spring Boot

[Spring Boot] WebSocket과 채팅 (4) - RabbitMQ

조용한고라니 2021. 7. 30. 18:11
반응형

들어가기 앞서

 

[Spring Boot] WebSocket과 채팅 (3) - STOMP

[Spring Boot] WebSocket과 채팅 (2) - SockJS [Spring Boot] WebSocket과 채팅 (1) 일전에 WebSocket(웹소켓)과 SockJS를 사용해 Spring 프레임워크 환경에서 간단한 하나의 채팅방을 구현해본 적이 있다. [Sprin..

dev-gorany.tistory.com

이전에 Websocket + SockJS + STOMP로 클라이언트간 채팅을 간단하게 구현을 해보았다. 먼저 왜 이를 썻는지에 대한 이유를 요약하고 넘어가보자.

 

Websocket

  • 페이지의 refresh 없이 나 또는 다른 사람이 보낸 채팅을 받을 수 있어야 한다.
  • 즉, 연결이 끊기지 않아야 한다.

SockJS

  • 브라우저에서 Websocket을 지원하지 않거나, 네트워크 Proxy 제약 등으로 인한 Websocket을 사용할 수 없을 경우 fallback option을 제공하는데, 이는 SockJS Protocol에 기반으로 Websocket API를 사용할 수 있도록 한다.

STOMP

  • 웹소켓만 사용했을 땐 직접 세션을 관리해서, 해당 세션으로 채팅 데이터를 전송해야했다면, STOMP를 사용함으로써 publish/subscribe (발행/구독) 구조로 간단하게 메세지를 선택적으로 수신할 수 있었다.

 

이처럼 채팅을 구현하는데 왜 위와 같은 것들을 사용했는지 아주 간단하게 짚어보았다. 이제 조금 나아가 RabbitMQ라는 메세지 브로커를 사용해보고자 한다.

 

RabbitMQ는 Erlang이라는 언어로 구성된 오픈소스 소프트웨어이고, AMQP라는 프로토콜(AMQP가 뭔지 아직 제대로 모름)을 구현한 메세지 브로커이다. 자세한 설명과 설치 방법은 다음 링크에서 참고할 수 있다. 로컬 스토리지에 설치하는 방법과 Docker를 이용해 끌어오는 방법이 있는데, 나는 로컬에 설치하는 방법으로 진행하였다. (사실 RabbitMQ를 범용적으로 사용하기 위해서는 Docker라는 것을 이용해 외부에 두고 사용하는 법을 아는 것이 더 좋을 듯 하다.)

 

 

[RabbitMQ] install

RabbitMQ는 오픈소스 메세지 브로커이다. 메세지를 많은 사용자에게 전달해야하거나, 처리 시간이 긴 문제가 있을 경우, 빠른 응답을 만들기 위해 사용한다. Downloading and Installing RabbitMQ — RabbitMQ Do

dev-gorany.tistory.com

 

Messaging that just works — RabbitMQ

Developer Experience Deploy with BOSH, Chef, Docker and Puppet. Develop cross-language messaging with favorite programming languages such as: Java, .NET, PHP, Python, JavaScript, Ruby, Go, and many others.

www.rabbitmq.com

 

여기서 RabbitMQ가 무엇인지, 어떻게 설치하고 동작 방식에 대해 따로 기술하진 않으므로 RabbitMQ 공식 사이트를 보는 것이 가장 바람직하고, 다른 블로그나 본인 블로그에 튜토리얼을 번역한 것이 있으므로 참고하면 좋을 것 같다.

 

양해 글 : 이전 포스팅에서 사용하던 ChatMessageDTO, ChatRoomDTO , Javascript 및 몇몇 클래스의 코드가 일부 변경되었습니다. 이점 양해 부탁드리며, 적용법에 중점을 두었으면 좋겠습니다!

STOMP + RabbitMQ

혹시 이전의 포스팅을 보신적이 있거나 구현을 해보았다면, 혹은 STOMP를 적용한 채팅을 만들어보셨다면, 대부분 STOMP가 갖고있는(내장하고 있는)  SimpleBroker 라는 것을 사용했을 것 이다. 이것만으로도 채팅을 구현하는 것에 문제는 없다고 생각된다.

 

 실제로 진행했던 프로젝트에서도 이것을 사용해서 구현하였고 성능에 관한 문제 또한 없었다고 자신한다. 하지만 이는 이용자 수가 적을 때의 얘기이다. 만일 이용자 수가 증가하여 처리해야하는 데이터가 많아진다면, 내장되어있는 SimpleBroker는 철저하게 Spring Boot가 실행되는 (정확하게는 채팅 서버) 곳의 메모리를 잡아먹는다. 따라서 다른 많은 비즈니스 로직과 채팅에 대한 부담까지 '하나의 서버'가 떠안게 된다.

 

그래서 조금 더 개선시켜 보고자 한 것이 이 방향이다. "채팅 관리 따로 빼서 서버의 부담을 줄여주자." 실제로 Spring docs의 Websocket에 관한 부분을 보면 다음과 같은 설명이 있다.

 

You can (optionally) use message brokers (such as RabbitMQ, ActiveMQ, and others) to manage subscriptions and broadcast messages.

구독을 관리하고 메세지를 전달하는데에 선택적으로 메세지 브로커(RabbitMQ나 ActiveMQ 같은)를 사용할 수 있다.

 

다음은 외부 메세지 브로커를 사용했을 때의 Message Flow에 대한 그림이다.

출처: https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#spring-web

이전 포스팅에 있는 그림과 비교해보면 똑같으면서도 다르다. 노란색 원기둥이 추가가 되었다. 이전의 그림과 가장 큰 차이점은,

 

1. TCP를 통해 외부 STOMP 브로커로 메세지를 전달하고 브로커에서 구독 Client로 메세지를 전달하기 위해 "Broker relay"를 사용한다는 점이다.

2. WebSocket Connection으로부터 메세지를 받았을 때, STOMP 메세지 프레임으로 decode되고, Spring Message 표현으로 변한다. 그리고 나머지 처리를 위해 'clientInboundChannel'로 보내진다.

3. (해석귀찮) For example, STOMP messages whose destination headers start with /app may be routed to @MessageMapping methods in annotated controllers, while /topic and /queue messages may be routed directly to the message broker.

 

사실 읽어도 지금은 무슨 말인지 잘 모른다. 1주일을 영어로된 RabbitMQ docs, Spring docs 및 몇몇 블로그를 보아도 여전히 다 이해하기는 힘든 것 같다. 공부해보며 느낀거지만, RabbitMQ는 주로 클라우드 환경에서 서버간 메세징에 적합하고, 어떻게 구성하느냐에 따라 성능이 크게 차이나며 역할도 잘 잡아야 하는 것 같다. 그래서 시니어 개발자 분들은 메세징을 할 때(여기서 메세징은 채팅이 아님) 대부분 RabbitMQ, Kafka 그리고 가끔 Redis 을 고려하시는 것을 보았다.

어쨋든,, 한 두번 읽어서 알 수 있는 스케일의 소프트웨어가 아니므로 관심이 있다면 천천히 공부해보면 좋을 것 같다.

 

각설하고 이를 채팅에 적용하기 위해 준비해야할 것들을 보자.

Dependency

/* Gradle */
//외부 브로커를 사용하기 위해
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-reactor-netty', version: '2.4.6'

//jackson2json에서 LocalDateTime을 handling 하기 위해
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.4'

implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-websocket'
testImplementation 'org.springframework.amqp:spring-rabbit-test'

/* ---아래는 선택--- */
// https://mvnrepository.com/artifact/org.webjars/sockjs-client
implementation group: 'org.webjars', name: 'sockjs-client', version: '1.1.2'

// https://mvnrepository.com/artifact/org.webjars/stomp-websocket
implementation group: 'org.webjars', name: 'stomp-websocket', version: '2.3.3-1'
/* Maven */
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-reactor-netty -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-reactor-netty</artifactId>
    <version>2.4.6</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
    <version>2.12.4</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-websocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

/* ---아래는 선택--- */
<!-- https://mvnrepository.com/artifact/org.webjars/sockjs-client -->
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>sockjs-client</artifactId>
    <version>1.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.webjars/stomp-websocket -->
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>stomp-websocket</artifactId>
    <version>2.3.3-1</version>
</dependency>

이번 글에서 진행할 내용에 추가된 디펜던시들이고, 전체 디펜던시는 다음과 같다.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-jdbc'
    implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-websocket'
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    runtimeOnly 'com.h2database:h2'
    runtimeOnly 'mysql:mysql-connector-java'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'

    //외부 브로커를 사용하기 위해
    // https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-reactor-netty
    implementation group: 'org.springframework.boot', name: 'spring-boot-starter-reactor-netty', version: '2.4.6'

    //jackson2json에서 LocalDateTime을 handling 하기 위해
    // https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310
    implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.4'

    // https://mvnrepository.com/artifact/org.webjars/sockjs-client
    implementation group: 'org.webjars', name: 'sockjs-client', version: '1.1.2'

    // https://mvnrepository.com/artifact/org.webjars/stomp-websocket
    implementation group: 'org.webjars', name: 'stomp-websocket', version: '2.3.3-1'

}

Configuration

채팅을 위해 STOMPConfig, RabbitConfig를 설정해야하고, properties에도 몇 가지 정보를 추가해야한다.

properties

//application.properties

#RabbitMQ
spring.rabbitmq.username=guest //default ID
spring.rabbitmq.password=guest //default Password
spring.rabbitmq.host=localhost //default host
spring.rabbitmq.port=5672      //default port

StompConfig

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp/chat")
                .setAllowedOriginPatterns("http://*.*.*.*:8081", "http://*:8081") //안해도 무관
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        
        registry.setPathMatcher(new AntPathMatcher(".")); // url을 chat/room/3 -> chat.room.3으로 참조하기 위한 설정
        registry.setApplicationDestinationPrefixes("/pub");
        
        //registry.enableSimpleBroker("/sub");
        registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue");
    }

}

RabbitConfig

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;

@Configuration
@EnableRabbit
public class RabbitConfig {

    private static final String CHAT_QUEUE_NAME = "chat.queue";
    private static final String CHAT_EXCHANGE_NAME = "chat.exchange";
    private static final String ROUTING_KEY = "room.*";

    //Queue 등록
    @Bean
    public Queue queue(){ return new Queue(CHAT_QUEUE_NAME, true); }

    //Exchange 등록
    @Bean
    public TopicExchange exchange(){ return new TopicExchange(CHAT_EXCHANGE_NAME); }

    //Exchange와 Queue 바인딩
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

    /* messageConverter를 커스터마이징 하기 위해 Bean 새로 등록 */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        rabbitTemplate.setRoutingKey(CHAT_QUEUE_NAME);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer container(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(CHAT_QUEUE_NAME);
        container.setMessageListener(null);
        return container;
    }

    //Spring에서 자동생성해주는 ConnectionFactory는 SimpleConnectionFactory인가? 그건데
    //여기서 사용하는 건 CachingConnectionFacotry라 새로 등록해줌
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter(){
        //LocalDateTime serializable을 위해
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true);
        objectMapper.registerModule(dateTimeModule());

        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(objectMapper);

        return converter;
    }

    @Bean
    public Module dateTimeModule(){
        return new JavaTimeModule();
    }
}

DTO

사실 지금 Object들의 스펙은 크게 상관이 없다. 동작을 테스트하는데 사용한 컬럼은 ChatRoomId와 message 정도이니..

ChatDTO

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class ChatDTO {

    private Long id;
    private Long chatRoomId;
    private Long memberId;

    private String message;
    private String region;

    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    private LocalDateTime regDate;
}

ChatRoomDTO

/* Not yet */

Controller

컨트롤러는 간단하게 채팅방을 주는 컨트롤러와, 전송한 메세지를 받아 처리하는 컨트롤러만 있으면 된다.

ChatRoomController

@Controller
@RequestMapping(value = "/chat")
public class ChatRoomController {

    @GetMapping("/rooms")
    public String getRooms(){
        return "chat/rooms";
    }

    @GetMapping(value = "/room")
    public String getRoom(Long chatRoomId, String nickname, Model model){

        model.addAttribute("chatRoomId", chatRoomId);
        model.addAttribute("nickname", nickname);

        return "chat/room";
    }
}

StompRabbitController

import com.gorany.ichatu.dto.ChatDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

import java.time.LocalDateTime;

@Controller
@RequiredArgsConstructor
@Log4j2
public class StompRabbitController {

    private final RabbitTemplate template;

    private final static String CHAT_EXCHANGE_NAME = "chat.exchange";
    private final static String CHAT_QUEUE_NAME = "chat.queue";

    @MessageMapping("chat.enter.{chatRoomId}")
    public void enter(ChatDTO chat, @DestinationVariable String chatRoomId){
        
        chat.setMessage("입장하셨습니다.");
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend(CHAT_EXCHANGE_NAME, "room." + chatRoomId, chat); // exchange
        //template.convertAndSend("room." + chatRoomId, chat); //queue
        //template.convertAndSend("amq.topic", "room." + chatRoomId, chat); //topic
    }

    @MessageMapping("chat.message.{chatRoomId}")
    public void send(ChatDTO chat, @DestinationVariable String chatRoomId){
        
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend(CHAT_EXCHANGE_NAME, "room." + chatRoomId, chat);
        //template.convertAndSend( "room." + chatRoomId, chat);
        //template.convertAndSend("amq.topic", "room." + chatRoomId, chat);
    }

    //receive()는 단순히 큐에 들어온 메세지를 소비만 한다. (현재는 디버그용도)
    @RabbitListener(queues = CHAT_QUEUE_NAME)
    public void receive(ChatDTO chat){

        System.out.println("received : " + chat.getMessage());
    }
}

이 컨트롤러는 주의깊게 봐야한다. StompController와 비교했을 때 달라진 부분이 많다.

  • SimpMessagingTemplate -> RabbitTemplate
  • @MessageMapping()의 '/' -> '.'
  • @DestinationVariable은 Restfult API에서 @PathVariable과 비슷한 용도이다.
  • @RabbitListener(queues = CHAT_QUEUE_NAME) 어노테이션은, 'chat.queue'라는 Queue을 구독해 그 큐에 들어온 메세지를 소비하는 "소비자"가 되겠다는 어노테이션이다. 물론 속성은 더 많고 다르게 쓰일 수 있지만 여기선 그렇다.
  • convertAndSend([exchange 이름], routing-key, 전송하고자 하는 것)

RabbitMQ

RabbitMQ를 정상적으로 설치를 마쳤다면 (docker로 끌어온 경우는 잘 모르겠다...) Command창에서 RabbitMQ 서버를 실행하고.

$ rabbitmq-server start

Browser에 http://localhost:15672를 치면 다음과 같은 창이 나온다. 여기에 기본적으로 ID:guest PW:guest를 입력하면 접속이 가능하다.

접속하면 위와같이 현재 RabbitMQ 서버에 대한 개요를 볼 수 있으니 나중에 유용하게 써먹도록 하자.

 

아 그리고 STOMP를 적용하기 위해 플러그인을 설치해주어야 하는데, Command 창을 켜고 다음 명령어를 입력한다

$ rabbitmq-plugins enable rabbitmq_stomp

HTML

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>Title</title>

    <style>
        .chats{
            display: flex;
            flex-direction: column;
            
            gap: 10px;
        }
        .mine{
            background: #e9e9e9;
            border-radius: 5px;
        }
        .yours{
            background: #efef87;
            border-radius: 5px;
        }
        .nickname{
            font-size: 16px;
            font-weight: bold;
        }
        .message{
            font-size: 14px;
        }
    </style>
</head>
<body>

<h1>CHAT ROOM</h1>
<h2 th:text="'Room No. ' + ${chatRoomId}"></h2>
<h2 th:text="'Nickname = ' + ${nickname}"></h2>

<form>
    <input type="text" id="message">
    <input type="submit" value="전송" class="btn-send">
</form>

<div class="chats">
    <div class="mine">

    </div>
</div>

<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</body>
</html>

Javascript -> SockJS Client / Stomp Client

자바스크립트로 클라이언트 코드를 짰는데, 구독(sub)를 어떻게 하느냐에 따라 Rabbit의 동작이 천차만별로 달라지니 틀만 만들어 두고, 밑에서 조금 자세하게 풀어보고자 한다.

<script th:inline="javascript">
    const chats = document.querySelector('.chats');
    const messageContent = document.querySelector('#message');
    const btnSend = document.querySelector('.btn-send');

    const chatRoomId = [[${chatRoomId}]];
    const nickname = [[${nickname}]];

    const sockJS = new SockJS("/stomp/chat");
    const stomp = Stomp.over(sockJS);

    stomp.heartbeat.outgoing = 0; //Rabbit에선 heartbeat 안먹힌다고 함
    stomp.heartbeat.incoming = 0; //Rabbit에선 heartbeat 안먹힌다고 함

    function onError(e) {
        console.log("STOMP ERROR", e);
    }

    function onDebug(m) {
        console.log("STOMP DEBUG", m);
    }

    stomp.debug = onDebug;

    stomp.connect('guest', 'guest', function (frame) {

        console.log('STOMP Connected');
        
        /* subscribe 설정에 따라 rabbit의 Exchange, Queue가 상당히 많이 바뀜 */
        stomp.subscribe(``, function (content) {

            const payload = JSON.parse(content.body);

            let className = payload.nickname == nickname? 'mine' : 'yours';

            const html = `<div class="${className}">
                            <div class="nickname">${payload.nickname}</div>
                            <div class="message">${payload.message}</div>
                        </div>`

            chats.insertAdjacentHTML('beforeend', html);
            
            //밑의 인자는 Queue 생성 시 주는 옵션
            //auto-delete : Consumer가 없으면 스스로 삭제되는 Queue
            //durable : 서버와 연결이 끊겨도 메세지를 저장하고 있음
            //exclusive : 동일한 이름의 Queue 생길 수 있음
        },{'auto-delete':true, 'durable':false, 'exclusive':false});

        //입장 메세지 전송
        stomp.send(`/pub/chat.enter.${chatRoomId}`, {}, JSON.stringify({
            memberId: 1,
            nickname: nickname
        }));

    }, onError, '/');

    //메세지 전송 버튼 click
    btnSend.addEventListener('click', (e) => {
        e.preventDefault();

        const message = messageContent.value;
        messageContent.value = '';

        stomp.send(`/pub/chat.message.${chatRoomId}`, {}, JSON.stringify({
            message: message,
            memberId: 1,
            nickname: nickname
        }));
    });
</script>

Prepare Clients

1. localhost:8081/chat/room?chatRoomId=1&nickname=gorany

 

2. localhost:8081/chat/room?chatRoomId=1&nickname=deer

 

3 localhost:8081/chat/room?chatRoomId=1&nickname=guinea

 

3 localhost:8081/chat/room?chatRoomId=1&nickname=cat

 

총 4개의 Client를 준비했다. 2개를 해도 상관은 없다. 그리고 동작만을 확인하기 위해 코드를 완성시킨 상태가 아니기에, QueryString으로 채팅방의 번호와 Nickname을 받았다.

※ 현재 stomp.subscribe(``, function(content) { ... 처럼 되어있기 때문에 위와 같이 되지 않을 것 이다. (메세지가 오지 않을 것) 저 캡쳐는 설명을 위해 해둔 것 이므로 잠시기다려보자!

Destination

RabbitMQ를 사용해

Producer(Client) -> Message(+routing_key) -> Rabbit Broker[Exchange + Queues] -> Consumer(Client)

의 구성은 이미 다 짜놨었지만, 문제가 있었다. 애당초 예상했던 모델링은 다음과 같았다.

 

1. 클라이언트(Producer)가 어디로 가야할 지에 대한 Routing_key와 함께 메세지 내용을 담아 서버로 보낸다.

2. 서버는 이를 Rabbit Broker로 위임한다. 

3. Rabbit Broker는 특정 Exchange로 보내고, 메세지에 있는 routing_key와 매칭된(바인딩된) Queue로 메세지를 보낸다. (사실 진짜 보내는 것이 아닌 참조를 보낸다.)

4. Queue에 push된 메세지들은 그 Queue를 구독하고 있는 클라이언트(Consumer)들에게 소비된다.

 

이렇게 되면 하나의 Topic Exchange, 채팅방 N개에 대한 Queue가 N개가 존재하는 그림이다.

 

하지만 아무리 해봐도 위와같은 그림이 나오기 힘들다. 내가 아직 잘 몰라서 그럴수도 있는데... 좀 더 깊게,길게 공부해보면 답이 나올 수 도 있지 않을까 싶다.

 

그럼 어떻게 했길래 안되었다고 하는건지 설명할 차례이다. RabbitMQ docs에 기술된 stomp의 destination을 보자.

 

STOMP Plugin — RabbitMQ

STOMP Plugin RabbitMQ supports STOMP via a plugin that ships in the core distribution. The plugin supports STOMP versions 1.0 through 1.2 with some extensions and restrictions. STOMP clients can interoperate with other protocols. All the functionality in t

www.rabbitmq.com

  • /exchange
    • SUBSCRIBE - 임의의 바인딩 패턴으로 구독
    • SEND - 임의의 라우팅키와 함께 전송
  • /queue
    • SUBSCRIBE - STOMP에서 관리하는 Queue 구독
    • SEND - STOMP에서 관리하는 Queue로 전송
  • /amq/queue
    • SUBSCRIBE - STOMP gateway 바깥에서 만들어진 Queue 구독
    • SEND - STOMP gateway 바깥에서 만들어진 Queue로 전송
  • /topic
    • SUBSCRIBE - 일시적이고 지속되는 topic으로 구독
    • SEND - 일시적이고 지속되는 topic으로 전송
  • /temp-queue
    • SUBSCRIBE - 안함
    • SEND - 안함

 

※ (AMQP 0-9-1 의미론적)

 

MESSAGE 프레임의 'destination' 헤더는 마치 SEND 프레임에 있는 것 처럼 설정되있다.

  • destination을 '/queue/<name>'으로 설정하면 메세지는 default exchange로 발행된다. (중요)
  • destination을 '/topic/<routing_key>'로 설정하면 메세지는 'amq.topic'이라는 exchange로 발행된다. (중요)
  • 나머지 destination에 대해 /exchange/<exchange_name>/[routing_key]의 exchange로 메세지가 발행된다.

Exchange Destination

SUBSCRIBE 프레임에 대해 목적지로 '/exchange/<name>/[pattern]'이 사용될 수 있다.

  • 배타적이고, 자동으로 삭제되는 <name>으로 명명된 exchange가 생긴다.
  • 만약 [pattern]이 있으면, exchange와 queue를 패턴으로 바인딩한다.
  • 현재 STOMP 세션에서 그 queue에 대해 구독한다.

 

이 형태로 테스트해보고자 한다. 우리는 RabbitConfig에서, 'chat.exchange'라는 exchange와 'chat.queue'라는 queue를 생성했고, 'room.*'이라는 바인딩 키로 바인딩 서로를 바인딩 시켜놨다. 고로 현재 exchange는 다음과 같이 존재한다.

chat.exchange는 우리가 만든 것이고, 위에 나머지는 rabbit에서 기본으로 만들어준 것들이다. 아래서 몇 가지 exchange를 사용해볼 기회가 있을 것이다.

 

아까 비워놨던 자바스크립트 코드에 가서,

stomp.subscribe(`/exchange/chat.exchange/room.${chatRoomId}`, function (content) {
...

//chatRoomId가 1일 때
//chat.exchange라는 exchange에 room.1이라는 패턴을 가진 Queue를 생성 후
//바인딩하고 그 Queue를 구독한다.

이때 StompRabbitController의 코드이다.

public class StompRabbitController {

    private final RabbitTemplate template;

    private final static String CHAT_EXCHANGE_NAME = "chat.exchange";
    private final static String CHAT_QUEUE_NAME = "chat.queue";

    @MessageMapping("chat.enter.{chatRoomId}")
    public void enter(ChatDTO chat, @DestinationVariable String chatRoomId){
        chat.setMessage("입장하셨습니다.");
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend(CHAT_EXCHANGE_NAME, "room." + chatRoomId, chat); // exchange
    }

    @MessageMapping("chat.message.{chatRoomId}")
    public void send(ChatDTO chat, @DestinationVariable String chatRoomId){
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend(CHAT_EXCHANGE_NAME, "room." + chatRoomId, chat);
    }

    @RabbitListener(queues = CHAT_QUEUE_NAME)
    public void receive(ChatDTO chat){
        System.out.println("received : " + chat.getMessage());
    }
}

 

위와같이 설정하고 아까처럼 4개의 클라이언트를 켜보면 다음과 같이 4개의 Queue가 생긴다.

즉, 각 클라이언트마다 하나의 큐를 생성하고, exchange에게서 메세지를 받아오는 것 이다.

Queue를 자세히 보면 다음과 같이 바인딩 된 것을 알 수 있다.

바인딩

 

동작 예시

잘 보이진 않겠지만... 제대로 동작 한다.


Queue Destination

Controller는 다음과 같다.

public class StompRabbitController {

    private final RabbitTemplate template;

    private final static String CHAT_EXCHANGE_NAME = "chat.exchange";
    private final static String CHAT_QUEUE_NAME = "chat.queue";

    @MessageMapping("chat.enter.{chatRoomId}")
    public void enter(ChatDTO chat, @DestinationVariable String chatRoomId){
        chat.setMessage("입장하셨습니다.");
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend("room." + chatRoomId, chat); //queue
    }

    @MessageMapping("chat.message.{chatRoomId}")
    public void send(ChatDTO chat, @DestinationVariable String chatRoomId){
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend("room." + chatRoomId, chat);
    }

    @RabbitListener(queues = CHAT_QUEUE_NAME)
    public void receive(ChatDTO chat){

        System.out.println("received : " + chat.getMessage());
    }
}
stomp.subscribe(`/queue/room.${chatRoomId}`, function (content) {
...

//chatRoomId = 1일 때
//room.1이라는 이름의 Queue를 생성하고 구독한다.
//단, RabbitMQ의 default Exchange(AMQP Default)와 바인딩 된다.
//바인딩 키는 queue의 이름과 동일하다.

//이거하면 채팅이 안된다. 4개의 클라이언트 중 1개의 클라이언트로 밖에 메세지가 안감.
//이유는 AMQP Default의 type이 direct이기 때문이다.

Run을 해보자.

예상대로 room.1이라는 이름의 Queue가 생기고, 1번 채팅방의 모두가 Consumer가 된 것을 알 수 있다.

 

바인딩도 디폴트 익스체인지로 된 것을 알 수 있다.

 

하지만 채팅을 해보면?

각 채팅이 모든 클라이언트에게 전달되어야하는데, 한 클라이언트에게 밖에 가지 않는다. 채팅에는 어울리지 않는 exchange type이다.


Topic Destination

public class StompRabbitController {

    private final RabbitTemplate template;

    private final static String CHAT_EXCHANGE_NAME = "chat.exchange";
    private final static String CHAT_QUEUE_NAME = "chat.queue";

    @MessageMapping("chat.enter.{chatRoomId}")
    public void enter(ChatDTO chat, @DestinationVariable String chatRoomId){
        chat.setMessage("입장하셨습니다.");
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend("amq.topic", "room." + chatRoomId, chat); //topic
    }

    @MessageMapping("chat.message.{chatRoomId}")
    public void send(ChatDTO chat, @DestinationVariable String chatRoomId){
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend("amq.topic", "room." + chatRoomId, chat);
    }

    @RabbitListener(queues = CHAT_QUEUE_NAME)
    public void receive(ChatDTO chat){

        System.out.println("received : " + chat.getMessage());
    }
}

 

stomp.subscribe(`/topic/room.${chatRoomId}`, function (content) {
...

// /topic/<name> 의 형태
//if: chatRoomId = 1
//'amq.topic'이라는 Rabbit이 준비해둔 Exchange에 바인딩되는데, 바인딩 되는 Queue는 임의적인
//이름을 가지며, Binding_key는 room.1이다.

//exchange와 마찬가지로 클라이언트 당 1개의 Queue가 생긴다.
//이 때 생성되는 Queue는 auto_deleted하고, durable하며 이름은 subscription-xxx...와 같이 생성된다

이제 마찬가지로 Run 해보자.

"amq.topic"이라는 exchange를 상세히 보면 다음과 같이 4개의 Queue에 대해 'room.1'이라는 라우팅 키로 바인딩 된 것을 확인할 수 있다.

/exchange때처럼 subscription-xxx... 형태로 Queue가 1개씩 생김을 확인하였다.


Amq/queue Destination

- /amq/queue/<name> 의 형태

- <name>이라는 queue가 존재해야만 한다 (존재하지 않을 시 예외 발생)

 

amq/queue 형태의 경우, 미리 만들어둔 Queue를 사용하기 때문에 따로 바인딩을 생각할 필요는 없다. 예를 들어 아까 우리가 만들어둔 [chat.queue]와 [chat.exchange]는 [room.*]이라는 라우팅 키로 바인딩을 시켜놓았고, 이 chat.queue라는 큐를 쓸 때 /amq/queue/chat.queue라고 써주면 된다.

public class StompRabbitController {

    private final RabbitTemplate template;

    private final static String CHAT_EXCHANGE_NAME = "chat.exchange";
    private final static String CHAT_QUEUE_NAME = "chat.queue";

    @MessageMapping("chat.enter.{chatRoomId}")
    public void enter(ChatDTO chat, @DestinationVariable String chatRoomId){
        chat.setMessage("입장하셨습니다.");
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend(CHAT_EXCHANGE_NAME, "room." + chatRoomId, chat); // exchange
    }

    @MessageMapping("chat.message.{chatRoomId}")
    public void send(ChatDTO chat, @DestinationVariable String chatRoomId){
        chat.setRegDate(LocalDateTime.now());

        template.convertAndSend(CHAT_EXCHANGE_NAME, "room." + chatRoomId, chat);
    }

    @RabbitListener(queues = CHAT_QUEUE_NAME)
    public void receive(ChatDTO chat){
        System.out.println("received : " + chat.getMessage());
    }
}
stomp.subscribe(`/amq/queue/room.${chatRoomId}`, function (content) {
...

마치며

철이 없었죠... STOMP 문서를 보며 외부에 메세지 브로커를 둘 수 있다는 말을 듣고 거기에 꽂혀 RabbitMQ를 공부했다는 것이.

RabbitMQ를 깊게 공부한 것은 아니지만, 공부해보면서 또 기존의 채팅에 적용해보면서 많은 것을 배웠다. STOMP의 pub/sub 구조도 확실히 이해하고 RabbitMQ의 대강 동작하는 방법과 이점, 설정에 따른 역할들이 어떻게 되는지 조금은 알 수 있던 것 같다.

 

물론 다른 것들과 마찬가지로 문서가 전부 영어다 보니 읽고 해석하고 이해하는데 시간이 많이 들었고, 이를 다루는 블로그는 많지만 내가 찾던 내용은 별로 없어서 애좀 먹었다. 코드로 계속 부딪혀보다가 하나 되고 왜 되는거지 하고 디버깅해보고 주의깊게 보았던 것이 포스팅을 마칠 때 쯤 되니 주마등이 스쳐지나가듯 떠오른다.

 

근데 막상 공부해보니,, 채팅에 굳이 RabbitMQ를 적용할 필요가 있을까? 하는 생각이 계속 든다. 물론 이점은 있겠지만, 내가 처음에 예상했던 모델링과 다르게 나와버리니까 효율을 계속 따지게 되고 방법을 찾다가 현재로선 방법이 없는 것 같음을 깨달았을 때야 어쩔 수 없이 만족하게 되었다.

 

아 그래서 나는 위의 /amq/queue, /topic, /queue, /exchange 중에 /topic을 쓸 생각이다. 정말 싫어하는 모델링이 나오겠지만...우연히 다른 분 깃허브 코드를 보니 그분도 'topic'으로 구독을 관리하셔서 일단 나도 그걸로 할 생각이다.

 

위의 써놓은 코드는 결코 완성되지 않고 단지 동작만 테스트하고, Rabbit이 어떻게 돌아가는지 확인하기 위한 코드이므로, 계속 살을 붙여나가거나, 다음 포스팅에서 JPA로 채팅 내역을 영속화하고, 불러오는 것을 하지 않을까? 생각중이다.

 

----- (+ 추가) -----

채팅 서버를 구현하면서 외부의 메세지 브로커를 두어야 하는 이유를 찾았다.

 

1. 로드밸런싱 시, 채팅 서버는 스프링 프로젝트에 종속적이기 때문에 같은 채팅방에 있다 하더라도 같은 내용의 채팅을 볼 수 없다.

2. 동시접속자가 특정 범위를 초과하면(컨테이너가 수용할 수 있는 수를 초과하면) 다른 컨테이너의 서버로 켜지기 때문에 동일한 채팅을 볼 수 없다.

 

정도가 되겠다. 나머지 이점도 있겠지만, 우연히 글을 보며 문득 떠오른 생각을 얼른 적으러 왔다.

 

 #References 

https://brunch.co.kr/@springboot/298

https://m.blog.naver.com/writer0713/221615276956

https://medium.com/@rameez.s.shaikh/build-a-chat-application-using-spring-boot-websocket-rabbitmq-2b82c142f85a

https://dev-gorany.tistory.com/324

https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket-stomp%20

https://skibis.tistory.com/308

https://coding-start.tistory.com/371?category=790331

https://velog.io/@hellozin/Spring-Boot%EC%99%80-RabbitMQ-%EC%B4%88%EA%B0%84%EB%8B%A8-%EC%84%A4%EB%AA%85%EC%84%9C

https://heowc.tistory.com/36

https://minholee93.tistory.com/entry/RabbitMQ-Jackson2JsonMessageConvertor

반응형
Comments