반응형
01-23 02:53
Today
Total
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
관리 메뉴

개발하는 고라니

[RabbitMQ] Tutorials (5) 본문

Open Source/RabbitMQ

[RabbitMQ] Tutorials (5)

조용한고라니 2021. 7. 29. 03:48
반응형

Topics

패턴(주제)에 기반한 메세지만 받아보기

 

이전 튜토리얼에서 logging System을 개선했다.

* 요약

 

- before : 하나의 producer -> 하나의 exchange (fanout) -> 여러개의 Queue -> 각 Queue에 모든 메세지 push -> 모든 consumer 동일한 메세지 수신

 

- after : 하나의 producer -> 하나의 exchange (direct) -> 여러개의 Queue -> 각 Queue마다 원하는 severity의 메세지 push -> 각 consumer가 받아보는 메세지가 다를 수 있음

 

'fanout' 타입의 교환자는 오직 가짜 방송을 할 수 있는 대신, 우리는 'direct' 타입의 교환자를 사용했고, 선택적으로 log를 수신하는 가능성을 얻었다.

 

비록 'direct' 타입 교환자는 우리 시스템을 개선시켰지만, 여전히 한계는 존재한다. (여러 기준에 따라 라우팅을 할 수 없다.)

 

우리의 logging System에서 우리는 심각도에 따른 것 뿐만 아니라, 방출되는 log의 source에 따라 구독하고 싶을지도 모른다. 당신은 이 개념을 Unix tool에서 심각도(info/warn/crit...)와 기능(auth/cron/kern...)을 기반으로 로그를 라우팅하는  "syslog"로부터 알고있을지도 모른다.

 

이는 우리에게 많은 유연성을 제공한다. 우리는 'cron'으로부터 오는 심각한 에러메세지 뿐 아니라 'kern'으로부터의 모든 log도 받아보고싶을 수 있다.

 

우리의 logging System에 구현하기 위해 우리는 더 복잡한 topic 타입 교환자에 대해 배울 필요가 있다.

Topic Exchange

'topic' 타입 교환자로 보내진 메세지들은 임의의 routing key를 가질 수 없다. 그것은 점(.)으로 구분된 단어들의 목록이어야만 한다. 단어들은 어떤 것도 될 수 있지만, 보통 메세지에 연결된 일부 기능을 지정한다. 몇몇 검증된 routing key 예로, "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". 등이 있다. 당신이 원하는 만큼, 최대 255byte 까지 이룰 수 있다.

 

binding key 또한 같은 형식이어야 한다. 'topic' 교환자 뒤의 로직은 'direct' 교환자와 유사하다. 특정 routing key와 보내진 메세지는 동일한 binding key로 매칭된 모든 Queue로 전달되는 것. 그러나 binding key에 있어 두 가지의 특히 중요한 경우가 있다.

  • * (star)는 한 단어를 대체할 수 있다.
  • # (hash)는 0개 이상의 단어들을 대체할 수 있다.

다음 예시를 보면 쉽다.

예시에서, 동물을 묘사한 메세지들을 보낼 것 이다. 메세지들은 2개의 점과 3개의 단어로 구성되어있는 routing key와 함께 보내질 것이다. routing key의 첫번째 단어는 속도를, 두번째는 색깔을, 세번째는 종을 묘사한다. (speed, color, species)

 

우리는 3개의 바인딩을 만든다.

Q1 : *.orange.* 라는 바인딩 키

Q2 : *.*.rabbit 라는 바인딩 키

Q3 : laze.# 라는 바인딩 키

이 바인딩들은 다음과 같이 요약할 수 있다.

 

  • Q1은 오랜지색 모든 동물에 관심있다.
  • Q2는 토끼에 대한 모든 것을 듣고싶어한다.
  • Q3는 게으른 동물의 모든 것을 듣고싶어한다.

(quick.orange.rabbit) 이라는 routing key를 갖는 메세지는 두 개의 Queue에 전달될 것이다.

(lazy.orange.elephant) 메세지 역시 둘 다에게 전달될 것이다.

반면에 (quick.orange.fox)는 오직 첫번째 Queue에만 전달될 것이다.

(lazy.brown.fox)는 오직 두번째 Queue에만 간다.

(lazy.pink.rabbit)은 2개의 바인딩에 매칭되지만, 두번째 Queue에 한번만 전달된다.

(quick.brown.fox)는 아무대도 바인딩 매칭 안되므로 버려진다.

 

만약 우리의 계약을 깨고 (orange)나 (quick.orange.male.rabbit) 처럼 1개 또는 4개의 단어들로 메세지를 보내면 어떻게 될까? 음.. 이 메세지들은 바인딩이 매칭되지 않을 것 이므로 버려질 것이다.

 

반면에 (lazy.orange.male.rabbit)은 비록 4개의 단어를 가졌더라도 마지막 binding에 매칭되므로 두번째 Queue에 전달될 것이다.

Putting it all together

우리는 logging system에 'topic' 교환자를 사용할 것이다. 우리는 log의 routing key는 두 단어를 가진다는 전제 하에 시작할 것이다. (<기능>, <심각도>)

 

코드는 대체로 이전 튜토리얼과 동일하다.

//TopicConsumer.java
import com.rabbitmq.client.*;

//Tutorials (5)
public class TopicConsumer {

    private final static String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{

        //1. ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        //2. setting Host
        factory.setHost("localhost");
        //3. get Connection
        Connection conn = factory.newConnection();
        //4. get Channel
        Channel channel = conn.createChannel();

        //validation
        if(args.length < 1){
            System.err.println("It must take at least one Binding Key");
            System.exit(0);
        }

        //exchange -> direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //create Queue
        String queueName = channel.queueDeclare().getQueue();
        //binding
        for(String bindingKey:args)
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback callback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] Recevied = " + delivery.getEnvelope().getRoutingKey() + "':'" + msg + "'");
        };

        boolean acknowledge = true;
        channel.basicConsume(queueName,acknowledge, callback, consumerTag -> { });
    }
}
//TopicProducer.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

//Tutorials (5)
public class TopicProducer {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{

        //1. create Factory
        ConnectionFactory factory = new ConnectionFactory();
        //2 set Host
        factory.setHost("localhost");
        try(//3. get Connection
            Connection connection = factory.newConnection();
            //4. get Channel
            Channel channel = connection.createChannel();) {

            //Exchange Declare
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String routingKey = getRoutingKey(args);
            String message = getMessage(args);

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));

            System.out.println("[x] Sent = [" + routingKey + "] : " + message);
        }
    }

    private static String getRoutingKey(String[] strings) {
        if (strings.length < 1)
            return "anonymous.info";
        return strings[0];
    }

    private static String getMessage(String[] args){

        return args.length < 2? "Hello World" : joinStrings(args, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length <= startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

Result

command 창 5개를 띄워놓았다.

1. rabbitMQ server

2. topicProducer

3. topicConsumer (#)

4. topicConsumer (kern.*)

5. topicConsumer (kern.#)

log를 emit할 때 패턴을 정하고 보냈는데, 해당 topic으로 바인딩된 큐에만 들어가는 것을 확인하였다.

반응형

'Open Source > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] Spring Boot + RabbitMQ (basic)  (0) 2021.07.29
[RabbitMQ] Tutorials (6)  (0) 2021.07.29
[RabbitMQ] Tutorials (4)  (0) 2021.07.29
[RabbitMQ] Tutorials (3)  (0) 2021.07.28
[RabbitMQ] Tutorials (2)  (0) 2021.07.23
Comments