반응형
01-11 11:16
Today
Total
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
관리 메뉴

개발하는 고라니

[RabbitMQ] Tutorials (4) 본문

Open Source/RabbitMQ

[RabbitMQ] Tutorials (4)

조용한고라니 2021. 7. 29. 03:13
반응형

Routing

 

[RabbitMQ] Tutorials (3)

Publish / Subscribe [RabbitMQ] Tutorials (2) Work Queues - worker들에게 작업을 분배(경쟁 consumers 패턴) 저번 포스팅에서 우리는 이름붙인 큐로부터 메세지를 주고받고 하는 프로그램을 짰다. 이번 튜토리..

dev-gorany.tistory.com

이전 튜토리얼에서 간단한 logging System을 만들어보았다. 많은 수신자들에게 로그 메세지를 송출할 수 있었다.

 

이번 튜토리얼에서는 특징을 추가할 것 이다. (메세지들 중 일부만 subscribe하는 것이 가능하게 할 것이다.) 예를 들어, 콘솔에서 모든 로그 메세지를 계속 출력하는 동안, 디스크 공간을 절약하기 위해 중요한 오류 메세지만 로그 파일로 보낼 수 있을 것이다.

Bindings

이전 예제에서 이미 바인딩을 만들었다, 그 코드를 다음과 같이 가져와도 된다.

channel.queueBind(queueName, EXCHANGE_NAME, "");

binding은 교환자와 Queue 사이의 관계라고 했다. 다시 말해, Queue는 교환자로부터의 메세지에 관심이 있다.

 

binding은 추가 routingKey 파라미터를 취할 수 있다. basic_publish 파라미터와의 혼란을 피하기 위해 우리는 이를 binding key라고 부를 것 이다. 다음은 어떻게 key와 바인딩할 수 있는가 이다.

channel.queueBind(queueName, EXCHANGE_NAME, "black");

이 binding key(여기서 black)의 의미는 교환자 타입에 따라 다르다. 이전에 사용했던 fanout 타입 교환자는 이 값을 간단히 무시해버린다.

Direct exchange

우리의 이전 튜토리얼 logging system은 모든 consumer에게 모든 메세지를 송출한다. 이번엔 심각도에 따라 메세지를 필터링할 수 있도록 확장하고자 한다. 예를 들어, 심각한 에러만 수신하고, 경고나 정보 로그 메세지를 디스크에 기록하는 프로그램을 원할 수 있다.

 

우린 우리에게 유연성을 제공하지 않는 fanout 타입 교환자를 이용해왔다. 이는 단지 무심코 송출만 할 수 있을 뿐이다.

 

이를 대신해 direct 타입의 교환자를 사용할 것이다. direct 타입 교환자가 갖는 라우팅 알고리즘은 간단하다.

-메세지의 routing key는 binding key가 정확히 일치하는 Queue로 간다. 도식화 한다면 다음과 같다.

이 설정에서, direct 타입 교환자 X를 볼 수 있고, 2개의 Queue가 그에 바인딩된 것도 확인할 수 있다. 첫번쨰 Queue는 binding key = orange로 바인딩됬고, 두번째는 black과 green 으로 바인딩 된 것을 알 수 있다.

 

이와 같은 설정에서, orange 라는 routing key를 갖는 교환자에 발송된 메세지는 Q1 (Queue)에 라우트될 것이다. routing key black또는 green을 갖는 메세지는 Q2로 갈 것 이다. 이를 제외한 모든 메세지는 버려진다.

 

(즉, 라우팅 키가 orange면 Q1으로 가고, 라우팅 키가 black이나 green이면 Q2로 간다. 이는 Q1와 교환자가 바인딩 키 orange로 바인드 되어있고, Q2와 교환자가 black/green으로 바인드 되어있기 때문이다.)

Multiple bindings

동일한 binding key(바인딩 키)로 여러 Queue에 바인딩 하는 것은 완벽하게 합법이다. 우리 예제에서 우리는 교환자 X와 Q1 Queue 사이에 바인딩 키 black으로 바인딩을 추가할 수 있다. 이 경우에, direct 교환자는 fanout처럼 행동할 것이고, 모든 매칭된 Queue들에게 메세지를 송출할 것 이다. routing key가 black인 메세지는 Q1과 Q2에 모두 전달될 것 이다.

Emitting logs

이 모델을 우리의 logging system에 적용할 것 이다. fanout 대신에 우리는 메세지를 direct 타입의 교환자에 보낼 것 이다. log의 심각도를 routing key로 사용할 것 이다. 이 방법에서 수신하는 프로그램(consumer)는 받기 원하는 심각도를 설정할 수 있을 것이다. 먼저 log를 방출하는 거에 집중하자.

 

늘 그랬듯이, 우리는 교환자를 먼저 만들어야한다.

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

이어서 메세지를 보낼 준비를 한다.

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

간단한 예제를 위해 심각도는 info / warning / error만 있다고 가정하자.

Subscribing

메세지를 받는 것은 한가지 제외하고 이전 튜토리얼 처럼 작동할 것이다. 바로 우리가 관심있는 심각도에만 새 바인딩을 만드는 것이다.

String queueName = channel.queueDeclare().getQueue();

for(String severity : args) {
    channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
//console에서 실행할 때 인자로 info / warning / error 중 하나를 넣는다.

Putting it all together

import com.rabbitmq.client.*;

//Tutorials (4)
public class SubscribeSeverity {

    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{

        //1. ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        //2. setting Host
        factory.setHost("localhost");
        //3. get Connection
        Connection conn = factory.newConnection();
        //4. get Channel
        Channel channel = conn.createChannel();

        //validation
        if(args.length < 1){
            System.err.println("It must take at least one Severity[info/warning/error]");
            System.exit(0);
        }

        //exchange -> direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //create Queue
        String queueName = channel.queueDeclare().getQueue();
        //binding
        for(String severity:args)
            channel.queueBind(queueName, EXCHANGE_NAME, severity);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback callback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] Recevied = " + delivery.getEnvelope().getRoutingKey() + "':'" + msg + "'");
        };

        boolean acknowledge = true;
        channel.basicConsume(queueName,acknowledge, callback, consumerTag -> { });
    }
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

//Tutorials (4)
public class PublishSeverity{

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{

        //1. create Factory
        ConnectionFactory factory = new ConnectionFactory();
        //2 set Host
        factory.setHost("localhost");
        try(//3. get Connection
            Connection connection = factory.newConnection();
            //4. get Channel
            Channel channel = connection.createChannel();) {

            //Exchange Declare
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String severity = getSeverity(args);
            String message = getMessage(args);

            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8));

            System.out.println("[x] Sent = [" + severity + "] : " + message);
        }
    }

    private static String getSeverity(String[] args){

        return args.length < 1? "info" : args[0];
    }

    private static String getMessage(String[] args){

        return args.length < 2? "Hello World" : joinStrings(args, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length <= startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

 

Result

SubscribeSeverity를 구동시킬 때, info warning error를 준 것과 error만 준 것을 각각 구동시켰다. 그 후 PublishSeverity에서 message를 보내며, severity를 info/warning/error 중 하나를 동봉해서 보냈다. error를 보내면 두 consumer에서 모두 출력되지만, 그 외의 것을 보내면 하나의 consumer에서만 출력됨을 확인하였다.

(Subscribe -> 특정한 것만 구독 -> 정해진 프로토콜(?) 혹은 패턴(?)에 해당하는 메세지만 받아볼 수 있음)

 

이제 다음 튜토리얼에서는 패턴에 기반한 메세지를 어떻게 듣는지 찾아보도록 하자.

반응형

'Open Source > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] Tutorials (6)  (0) 2021.07.29
[RabbitMQ] Tutorials (5)  (0) 2021.07.29
[RabbitMQ] Tutorials (3)  (0) 2021.07.28
[RabbitMQ] Tutorials (2)  (0) 2021.07.23
[RabbitMQ] Tutorials (1)  (0) 2021.07.20
Comments