티스토리 뷰

1. STOMP 개요

1.1.  STOMP란?

STOMP(Simple/Streaming Text Oriented Messaging Protocol) 텍스트 기반의 메세지 프로토콜이다.

STOMP는 클라이언트와 서버 간 전송할 메시지의 유형, 형식, 내용들을 정의한 규칙(FRAME)으로 TCP 또는 WebSocket과 같은 양방향 네트워크 프로토콜 기반으로 동작한다.

HTTP와 같은 프로토콜에서 사용되는 request-response 패턴과는 다르게, STOMP는 브로커와 연결된 클라이언트 간에 메시지를 교환하는 방식으로 동작한다.

STOMP는 기본적으로 Publish-Subscribe 구조로 되어있으며, 이 구조는 메시지를 공급하는 주체와 소비하는 주체를 분리해 제공하는 메시징 프로토콜이다.

1.2. STOMP 사용 이점

  • 메시지의 형식(Frame)을 정의할 수 있어, 클라이언트와 서버 간의 통신에서 일관성을 유지할 수 있다.
  • pub/sub 구조로 되어있어 메시지를 전송하고 메시지를 받아 처리하는 부분이 확실히 정해져 있기 때문에 개발자 입장에서 명확하게 인지하고 개발할 수 있다.
  • 메시지 브로커를 사용할 수 있어, 메시지 전송 시 중계 역할을 하는 서버를 따로 두어 메시지 전송의 안정성과 확장성을 높일 수 있다.
  • WebSocket 기반으로 각 Connection(연결)마다 WebSocketHandler를 구현하는 것 보다 @Controller 된 객체를 이용해 조직적으로 관리할 수 있다.
    • 메세지는 STOMP의 "destination" 헤더를 기반으로 @Controller 객체의 @MethodMapping 메서드로 라우팅 된다.
  • STOMP의 "destination" 및 Message Type을 기반으로 메세지를 보호하기 위해 Spring Security를 사용할 수 있다.

1.3. STOMP Frame 구조

COMMAND
header1 : value1
header2 : value2
 
Body^@
  • COMMAND : 메세지의 타입을 나타내는 문자열이다. SEND, SUBSCRIBE, UNSUBSCRIBE 등이 있다.
  • header1, header2 : 추가 정보를 제공하는 헤더이다.
    • destination : 이 헤더로 메세지를 보내거나(SEND), 구독(SUBSCRIBE)할 수 있다.
  • Body : 메세지의 내용이다.
  • ^@ : NULL 문자이다. Body의 끝을 나타낸다.

클라이언트는 메세지를 전송하기 위해 SEND,SUBSCRIBE COMMAND를 사용할 수 있다.

또한, SEND,SUBSCRIBE COMMAND 요청 Frame에는 메세지가 무엇이고, 누가 받아서 처리할지에 대한 Header 정보가 포함되어 있다.

이런 명령어들은 "Destination" 헤더를 요구하는데 이것이 어디에 전송할지, 혹은 어디에서 메세지를 구독할 것 인지를 나타낸다.

1.4. STOMP Destination 종류

1.4.1.  /exchange (Multicast)

  • SUBSCRIBE 프레임에서는 /exchange/<name>[/<pattern>] 형식의 목적지를 사용할 수 있다.
    • <pattern>이 제공된 경우, 큐를 <name> Exchange에 <pattern>으로 바인딩하고 큐 구독을 등록한다.
  • SEND 프레임에서는 /exchange/<name>[/<routing-key>]의 형식을 사용한다.
    • • <routing-key>를 사용하여 Exchange <name>로 전송한다.
  • 각 구독자마다 새 큐가 생성되어 지정된 Exchange에 라우팅 키를 사용하여 바인딩된다. 기존 큐를 사용하려면 /amq/queue 목적지를 사용해야 한다.

1.4.2. /topic (Broadcast)

  • /topic은 STOMP 클라이언트에서 가장 일반적으로 사용되는 목적지 유형이며, /topic/<name> 형식이다. 이것은 구독자 패턴에 대한 토픽 매칭을 수행하며, 메시지를 여러 구독자에게 라우팅할 수 있다.

1.4.3. /queue (Unicast)

  • /queue는 간단한 대기열을 처리하기 위해 사용된다. /queue/<name> 형식을 사용하면 좋다. 큐 목적지는 각 메시지를 최대 한 명의 구독자에게 전달하며, 구독자가 없으면 구독자가 큐에 연결할 때까지 큐에 대기한다
    • SUBSCRIBE 프레임에서는 <name>의 공유 큐를 생성하고 현재 STOMP 세션에 대해 <name>의 공유 큐를 구독한다.

2.  실습 준비

2.1.  Rabbit MQ 사전 설치

2.1.1.  Docker Image Pull

docker pull rabbitmq:3.11.10-management

2.1.2.  STOMP 플러그인 설치 및 실행

rabbitmq-plugins enable rabbitmq_stomp
  • Rabbit MQ Docker 실행시 61613포트(STOMP 기본 포트)도 열어준다
docker run -d -p 15672:15672 -p 5672:5672 -p 61613:61613 --name rabbitmq rabbitmq:3.11.10-management

2.1.3.  Spring Boot 프로젝트 구성

  • Spring Boot 구성시 아래 의존성을 추가해준다.
implementation("org.springframework.boot:spring-boot-starter-amqp")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-reactor-netty")
implementation("org.springframework.boot:spring-boot-starter-websocket")
implementation("org.springframework.boot:spring-boot-starter-thymeleaf")

2.1.4.  Stomp MessageBroker 설정

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    /**
     * 클라이언트에서 websocket에 접속하는 endpoint를 등록한다.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
            .setAllowedOrigins("*");
    }
 
    /**
     * Stomp사용을 위한 Message Broker 설정을 해주는 메소드
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/pub") // 메세지를 보낼(publish) 경로를 설정
            .setUserDestinationPrefix("/users")  // 특정 사용자에게 메시지 전송시 사용할 주소
            .enableStompBrokerRelay("/queue", "/topic", "/exchange") // 메세지 수신(Subscribe), 경로를 설정해주는 메서드
            .setRelayHost("localhost")
            .setVirtualHost("/")
            .setRelayPort(61613) // RabbitMQ STOMP 기본 포트
            .setSystemLogin("guest")
            .setSystemPasscode("guest")
            .setClientLogin("guest")
            .setClientPasscode("guest");
    }
}
  • registerStompEndpoints - 클라이언트에서 websocket에 접속하는 endpoint를 등록하는 메서드
  • configureMessageBroker - Stomp 사용을 위한 Message Broker 설정을 해주는 메소드
    • setApplicationDestinationPrefixes
      • 메세지를 보낼 때, 관련 경로를 설정해주는 함수이다.
      • 클라이언트가 메세지를 보낼 떄, 경로 앞에 “/pub”이 붙어있으면 Broker로 보내진다.
    • enableStompBrokerRelay
      • STOMP 브로커 릴레이를 사용 설정하고 메시지 브로커에서 지원하는 대상 접두사를 구성
    • setUserDestinationPrefix
      • 특정 사용자에게 메시지 전송시 사용할 주소

3. 실습

3.1.  Client 구독1 - Topic

3.1.1.  Topic?

  • client에서 /topic 구독시 rabbitmq의 기본 내장 exchange인 amq.topic(Topic Exchange)를 사용하여 구독한다.
  • Topic Exchange는 메시지를 여러 개의 Queue로 라우팅하기 위한 유연한 방법을 제공한다.

3.1.2. Controller (View)

@Controller
public class ViewController {
    @GetMapping("/topic-view")
    public String topicView() {
        return "topic/index";
    }
}

3.1.3. Controller (Publish Message RESTAPI)

@Controller
@RequestMapping("/stomp")
@RequiredArgsConstructor
public class StompController {
 
    // 간단한 메시징 프로토콜(예: STOMP)에 사용하기 위한 메서드가 포함된 MessageSendingOperations의 구현체
    private final SimpMessageSendingOperations simpMessageSendingOperations;
 
    /**
     * /topic/wiki로 데이터를 publish한다.
     */
    @GetMapping("/topic")
    @ResponseBody
    public String publishTopicMessage() {
        Map<String, String> data = new HashMap<>();
        data.put("key1", "value1");
        data.put("key2", "value2");
 
        simpMessageSendingOperations.convertAndSend("/topic/wiki", data);
 
        return "OK";
    }
}

3.1.4.  View 페이지

<!doctype html>
<html lang="ko">
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Agent</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0-alpha1/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-GLhlTQ8iRABdZLl6O3oVMWSktQOp6b7In1Zl3/Jr59b6EGGoI1aFkw7cmDA6j6gD" crossorigin="anonymous">
    <script src="https://code.jquery.com/jquery-2.2.4.min.js" integrity="sha256-BbhdlvQf/xTY9gja0Dq3HiwQF8LaCRTXxZKRutelT44=" crossorigin="anonymous"></script>
    <script src="https://unpkg.com/axios/dist/axios.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs@7.0.0/bundles/stomp.umd.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.4.0/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.js"></script>
 
    <script type="text/javascript">
        // registerStompEndpoints에서 설정한 endpoint 주소를 입력한다 (socket 연결)
        const stompClient = Stomp.client('ws://localhost:8080/ws');
        stompClient.connect({}, stompConnectHandler, stompErrorHandler);
 
        // STOMP Socket 접속 성공시 실행되는 콜백 함수
        function stompConnectHandler() {
            console.log('connected!');
 
            // STOMP Socket 접속 성공시 '/topic/wiki' 메시지 구독을 한다.
            stompClient.subscribe('/topic/wiki', (data) => {
                // /topic/wiki로 메시지가 유입되면 해당 메시지를 가져와 실행되는 콜백 함수
                console.log('topic wiki subscribe data - ', JSON.parse(data.body));
                // 받은 데이터를 body에 출력한다.
                $('body').append(data.body + '<br/>');
            })
        }
 
        function stompErrorHandler(e) {
            console.error('stomp connect error - ', e);
        }
    </script>
</head>
 
<body>
 
</body>
</html>

3.1.5.  Client Socket 접속 및 구독 성공시 콘솔 출력

3.1.6.  Client구독시 구독 Queue 자동 생성

  • amq.topic exchange를 사용하여 Routing Key가 wiki로 바인딩 된다. (/topic/wiki)

3.1.7.  Server에서 메시지 publish 후 구독 메시지 출력

3.1.8.  메시지 전송 흐름도

3.2.  Client 구독1 - Queue

  • /queue는 특정 사용자에게 메시지를 전송할수 있게 해준다.
  • 특정 사용자를 구분하기 위해서 사용자 정보를 설정해줘야한다.

3.2.1.  Stomp Connect시 유저 역할 저장

  • WebSocketConfig내 configureClientInboundChannel추가
@Configuration
@EnableWebSocketMessageBroker
@Slf4j
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
        /**
     * 각 유저에 대한 세션키(socket id)를 저장할 용도의 map
     */
    private static final Map<String, String> sessionKeys = new HashMap<>();
    /**
     * RabbitMQ Datasource
     */
    private final RabbitTemplate rabbitTemplate;
 
        @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // 메시지채널에서 주고받는 메시지를 확인 및/또는 수정할 수 있는 인터셉터 추가
        registration.interceptors(new ChannelInterceptor() {
            // 메시지가 실제로 채널로 전송되기 전에 호출됨
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor headerAccessor =
                    MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
 
                // 헤더에서 유저명을 가져온다.
                List<String> usernames = Optional
                    .ofNullable(headerAccessor.getNativeHeader("username"))
                    .orElseGet(Collections::emptyList);
 
                // 처음 접속 시도시 유저 데이터를 심어준다.
                if (StompCommand.CONNECT.equals(headerAccessor.getCommand()) && !usernames.isEmpty()) {
                    headerAccessor.setUser(new SimpleUsernamePrincipal(usernames.get(0)));
                    sessionKeys.put(usernames.get(0), headerAccessor.getSessionId());
                }
 
                // 사용자 접속 해제시 사용자 큐를 삭제한다.
                if (StompCommand.DISCONNECT.equals(headerAccessor.getCommand())) {
                    String sessionKey = sessionKeys.get(headerAccessor.getUser().getName());
                                        // message-user는 생성되는 큐의 접두사
                                        // /users/queue/message의 message를 가져와 -user를 붙여만든다.
                    new RabbitAdmin(rabbitTemplate).deleteQueue("message-user" + sessionKey);
                    sessionKeys.remove(sessionKey);
                }
 
                return message;
            }
        });
    }
 
...
}

3.2.2. 클라이언트 페이지 개발

<!doctype html>
<html lang="ko">
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Agent</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0-alpha1/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-GLhlTQ8iRABdZLl6O3oVMWSktQOp6b7In1Zl3/Jr59b6EGGoI1aFkw7cmDA6j6gD" crossorigin="anonymous">
    <script src="https://code.jquery.com/jquery-2.2.4.min.js" integrity="sha256-BbhdlvQf/xTY9gja0Dq3HiwQF8LaCRTXxZKRutelT44=" crossorigin="anonymous"></script>
    <script src="https://unpkg.com/axios/dist/axios.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs@7.0.0/bundles/stomp.umd.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.4.0/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.js"></script>
 
    <script type="text/javascript">
        const username = prompt('유저명 입력: ');
        // registerStompEndpoints에서 설정한 endpoint 주소를 입력한다 (socket 연결)
        const stompClient = Stomp.client('ws://localhost:8080/ws');
        // 접속시 헤더에 유저명을 같이 넘겨준다.
        stompClient.connect({username: username}, stompConnectHandler, stompErrorHandler);
 
        // STOMP Socket 접속 성공시 실행되는 콜백 함수
        function stompConnectHandler() {
            console.log('connected!');
 
            // setUserDestinationPrefix에서 설정한 prefix를 적어준다. (/users)
            stompClient.subscribe('/users/queue/message', (data) => {
                const body = JSON.parse(data.body);
                $('body').append(`전송자: ${body.sender}, 메시지: ${body.message}<br/>`);
            });
        }
 
        function stompErrorHandler(e) {
            console.error('stomp connect error - ', e);
        }
 
        $(function () {
            // 특정 유저에게 메시지 전송 버튼 클릭시
            $('#sendBtn').click(function () {
                // 상대방 유저 명
                const targetUsername = $('#targetUsername').val();
                // 메시지
                const message = $('#message').val();
 
                // setApplicationDestinationPrefixes에서 설정한 접두어를 사용한다.
                // 메시지 발행
                stompClient.send('/pub/send/message', {}, JSON.stringify({
                    targetUsername,
                    message,
                    sender: username,
                }));
            });
        });
    </script>
</head>
 
<body>
    보낼 유저명: <input type="text" id="targetUsername"/>
    보낼 메시지: <textarea id="message"></textarea>
    <button id="sendBtn">전송</button>
    <br/>
</body>
</html>

3.2.3. /users/queue/message 구독시 사용자 각자의 큐 생성

  • {{url suffix}}/user-{{socket id}} 형식으로 사용자마다 큐가 생성된다.

3.2.4. 메시지 발행 컨트롤러 개발

@Controller
@RequestMapping("/stomp")
@RequiredArgsConstructor
@Slf4j
public class StompController {
     /**
     * /pub/send/message주소로 받은 메시지를 처리한다.
     * @param params
     */
    @MessageMapping("/send/message")
    public void pubSendMessage(Map<String, String> params) {
        String targetUsername = params.get("targetUsername");
        String message = params.get("message");
        String sender = params.get("sender");
 
        // 전송자와 메시지 내용을 데이터로 보낸다.
        Map<String, String> data = new HashMap<>();
        data.put("message", message);
        data.put("sender", sender);
 
        // convertAndSendToUser로 특정 유저의 큐에 데이터를 넣어준다.
        this.simpMessageSendingOperations.convertAndSendToUser(
            targetUsername, "/queue/message",  data
        );
    }
}

3.2.5.  실행 결과

  • 전송자

  • 수신자

3.2.6.  흐름


참고

https://velog.io/@hoyun7443/WebSocket의-Stomp

https://zamezzz.tistory.com/319

https://dev-gorany.tistory.com/325#comment12946227

https://velog.io/@jkijki12/STOMP-Spring-Boot#configuremessagebroker

'프레임워크 > 스프링 & 스프링 부트' 카테고리의 다른 글

Spring Boot Admin  (1) 2019.12.30
Spring boot actuator  (0) 2019.12.30
Spring Boot - ORM(Object-relational mapping)  (0) 2019.08.10
Spring Boot - MVC  (0) 2019.08.10
Spring Boot - @SpringBootApplication  (0) 2019.08.10
댓글