반응형
12-23 19:41
Today
Total
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
관리 메뉴

개발하는 고라니

[RabbitMQ] Tutorials (3) 본문

Open Source/RabbitMQ

[RabbitMQ] Tutorials (3)

조용한고라니 2021. 7. 28. 20:19
반응형

Publish / Subscribe

 

[RabbitMQ] Tutorials (2)

Work Queues - worker들에게 작업을 분배(경쟁 consumers 패턴) 저번 포스팅에서 우리는 이름붙인 큐로부터 메세지를 주고받고 하는 프로그램을 짰다. 이번 튜토리얼에서 우리는 시간이 많이 걸리는 작

dev-gorany.tistory.com

이전 튜토리얼에서 우리는 일할 Queue를 만들었다. Queue의 뒷배경이 되는 가정은 각 업무를 하나의 worker에게 정확히 전달해주는 것이다.

 

이번 튜토리얼에서 우리는 완벽히 다른 무언가를 할 것 이다. (우리는 하나의 메세지를 여러 Consumer들에게 넘겨줄 것이다.) 이 규칙(패턴)은 "publish / subscribe"로 알려져있다.

 

이 패턴을 설명하기 위해 우리는 간단한 로깅 시스템을 구축할 것이다. 2개의 프로그램으로 구성될 것이고, 하나는 log message들을 송출할 것이고, 나머지 하나는 그것들을 받아서 출력할 것이다.

 

 우리의 로그 시스템에서 받는 쪽 프로그램의 모든 실행 복사본은 메세지를 받는다. 그러면 하나의 받는 프로그램을 구동하고, 로그를 disk 저장소로 보낼 수 있을 것이다. 그리고 동시에 또다른 받는 프로그램을 실행해 화면에 로그를 출력할 수 있을 것이다.

 

근본적으로, 발송된 로그 메세지들은 모든 수신 프로그램에 송출되어질 것이다.

Exchanges

이전 튜토리얼의 부분 중, 우리는 Queue로 메세지를 보냈고, Queue로부터 메세지를 받았다. 이제 Rabbit의 전체 메세징 모델을 소개하도록 한다.

 

이전 튜토리얼에서 다뤘던 것을 빠르게 훑어보자.

  • Producer는 메세지를 전송하는 사용자 어플리케이션이다.
  • Queue는 메세지를 저장하는 버퍼이다.
  • Consumer는 메세지를 받는 사용자 어플리케이션이다.

RabbitMQ에서 메세징 모델의 핵심은, producer는 어떤 메세지든 절대 큐로 직접 보내질 않는다는 것이다. 사실, 매우 종종 producer는 메세지가 어떤 Queue로 전달될지 전혀 알지 못하는 경우가 많다.

 

대신에 producer는 '교환'을 위해서만 메세지를 보낼 수 있다. '교환'이란 매우 간단한 개념이다. 어느 한 쪽에서 producer들로부터 메세지를 받고, 다른 한 쪽에서는 그것들(메세지)을 Queue에 밀어넣는다. 교환해주는 녀석은 받은 메세지로 무엇을 해야하는지 정확히 알고있어야 한다. 특정 Queue에 추가해야하는가? 많은 Queue에 추가해야하는가? 그도 아니면 폐기되어야만 한다. 이 규칙은 교환기 타입에 의해 정의된다.

https://www.rabbitmq.com/tutorials/tutorial-three-java.html

몇몇 이용가능한 교환기 타입이 있다. (direct, topic, headers, fanout) 마지막 것을 보도록 하자. 이 타입으로 교환기를 만들고, 그것을 logs라 부르자.

 

channel.exchangeDeclare("logs", "fanout");
//Publish.java
public class Publish {

    private final static String QUEUE_NAME = "TASK_QUEUE";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        try(Connection conn = factory.newConnection();
            Channel channel = conn.createChannel()){

            //durable
            boolean durable = true;

            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

            channel.exchangeDeclare("logs", "fanout");

            String msg = String.join(" ", args);

            channel.basicPublish("logs", "", null, msg.getBytes());
            System.out.println("[x] Sent : " + msg);

        }
    }
}

fanout 교환기는 매우 간단하다. 이름에서 유추할 수 있듯이, 그것이 받은 모든 메세지를 알고있는 모든 Queue에 송출한다. 그리고 그건 우리가 정확히 원하는 logger이다.

이제 우리는 이름붙인 교환기에 발송할 수 있다.

 

Temporary Queues (임시 큐)

이전에 사용했던 Queue를 ("hello", "task_queue") 기억하는가? Queue의 이름을 붙일 수 있다는 것은 우리에게 매우 중요했었다. 우리는 worker들에게 동일한 Queue를 연결해줘야했다. producer와 consumer 사이에 Queue를 공유해야 할 때, Queue에 이름을 주는 것은 중요하다.

 

그러나 우리의 logger 예제에선 그렇지 않다. 우리는 모든 로그 메세지를 듣길 원하지, 일부를 원하지 않는다. 또한 과거의 메세지보다 현재 진행되는 메세지에 관심이 있다. 이를 해결하기 위해 2가지가 필요하다.

 

첫째로, 우리가 Rabbit에 연결할 때마다, 싱싱하고 빈 Queue가 필요하다. 이를 위해 우리는 랜덤한 이름의 Queue를, 또는 더 나은 이름의 Queue를 만들 수 있다. (서버가 알아서 하도록 두자)

 

둘째, consumer와 연결을 끊으면 자동적으로 Queue가 삭제되어야 한다.

 

Java Client에서 queueDeclare()에 아무런 인자를 넣지 않을 때, 우리는 비영구적이고, 배타적이며 자동으로 제거되는 Queue를 이름과 함께 만든다.

String queueName = channel.queueDeclare().getQueue();

다음 링크에서 exclusive flag와 다른 Queue 속성에 대해 더 배울 수 있다.

https://www.rabbitmq.com/queues.html

 

queueName이 랜덤한 이름을 부여받을 때, 예를 들어 amq.gen-JzTY20BRgKO-HjmUJj0wLg처럼 보일 수 있다.

Bindings

이미 fanout 교환기를 만들었고, Queue도 만들었다. 이제 우린 교환기에게 우리 Queue에 메세지를 보내라고 말해야한다. 교환기와 Queue 사이의 관계를 binding이라 한다.

channel.queueBind(queueName, "logs", "");
//Publish.java
public class Publish {

    private final static String QUEUE_NAME = "TASK_QUEUE";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        try(Connection conn = factory.newConnection();
            Channel channel = conn.createChannel()){

            //durable
            boolean durable = true;

            String queueName = channel.queueDeclare().getQueue(); //random Queue

            channel.exchangeDeclare("logs", "fanout"); //exchange
            
            channel.queueBind(queueName, "logs", ""); //binding

            String msg = String.join(" ", args);

            channel.basicPublish("logs", "", null, msg.getBytes());
            System.out.println("[x] Sent : " + msg);

        }
    }
}

이제부터 log 교환기는 우리 Queue에 메세지를 추가해줄 것 이다.

Putting it all together

log메세지를 방출하는 producer 프로그램은 이전 예제와 별반 다를게 없어보인다. 가장 중요한 변화는, 이제 우리는 이름없는 Queue 대신에 우리의 log 교환기에 메세지를 발송하길 원한다. 우리는 발송할 때 routingKey를 공급해야한다. 그러나 fanout 교환기에서 그 값은 무시된다. 다음은 Publish.java의 코드이다.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Publish {

    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        try(Connection conn = factory.newConnection();
            Channel channel = conn.createChannel()){

            String queueName = channel.queueDeclare().getQueue(); //random Queue

            channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //exchange

            channel.queueBind(queueName, "logs", ""); //binding

            String msg = args.length < 1? "info: Hello World!" : String.join(" ", args);

            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
            System.out.println("[x] Sent : " + msg);
        }
    }
}

 

보다시피, 연결한 뒤 우리는 교환기를 선언했다. 존재하지 않는 교환기에 발송하는 것은 금지되어 있으므로 이 단계가 필요하다.

 

만약 아직 교환기에 바인딩된 Queue가 없으면 메세지가 손실되나, 괜찮다. 듣고있는 consumer가 없으면 우리는 메세지를 안전하게 제거할 수 있다. 다음은 Subscribe.java 의 코드이다.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Subscribe {

    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        //exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //create queue
        String queueName = channel.queueDeclare().getQueue();
        //binding
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("[*] Waiting for msgs. (Press CTRL + C to exit)");

        DeliverCallback callback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] Received : " + msg);
        };

        channel.basicConsume(queueName, true, callback, consumerTag -> { });
    }
}

 

이전과 같이 컴파일하면 완료된다.

set CP=.\amqp-client-5.13.0.jar;.\slf4j-api-1.7.26.jar;.\slf4j-simple-1.7.26.jar

javac -cp %CP% rabbitMQRecvTest\src\main\java\Subscribe.java rabbitMQTest\src\main\java\Publish.java

//receiver
java -cp %CP% rabbitMQRecvTest\src\main\java\Subscribe.java

//sender
java -cp %CP% rabbitMQTest\src\main\java\Publish.java [message]

 

만약 log를 파일로 저장하고 싶음, 콘솔을 열고 타이핑 쳐라

java -cp %CP% rabbitMQRecvTest\src\main\java\Subscribe.java > logs_from_rabbit.log

Result

Publish.java를 실행하고, args에 메세지를 치면 위와 같이 Subscribe.java에게 동일한 메세지가 전송이 된다.

반응형

'Open Source > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] Tutorials (5)  (0) 2021.07.29
[RabbitMQ] Tutorials (4)  (0) 2021.07.29
[RabbitMQ] Tutorials (2)  (0) 2021.07.23
[RabbitMQ] Tutorials (1)  (0) 2021.07.20
[RabbitMQ] install  (0) 2021.07.11
Comments