반응형
01-23 02:53
Today
Total
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
관리 메뉴

개발하는 고라니

[RabbitMQ] Tutorials (1) 본문

Open Source/RabbitMQ

[RabbitMQ] Tutorials (1)

조용한고라니 2021. 7. 20. 03:24
반응형
 

[RabbitMQ] install

RabbitMQ는 오픈소스 메세지 브로커이다. 메세지를 많은 사용자에게 전달해야하거나, 처리 시간이 긴 문제가 있을 경우, 빠른 응답을 만들기 위해 사용한다. Downloading and Installing RabbitMQ — RabbitMQ Do

dev-gorany.tistory.com

RabbitMQ는 메세지 브로커(Message Broker)다. 이는 메세지들을 받아주고 전달한다. 여러분은 RabbitMQ를 우체국으로 생각할 수 있다. 당신이 전달하고 싶은 편지를 우체통에 넣을 때, 우체부가 결국 당신 편지의 수취인에게 전달해줄 것을 확신할 수 있다. 이 비유에서, RabbitMQ는 우체통이고, 우체국이자 우체부이다.

 

RabbiqMQ와 우체국 간의 주요한 차이점은 종이를 다루지 않는다는 대신에 메세지의 이진 blobs 데이터를 받아주고, 저장하고, 전달한다.

 

RabbitMQ는 및 일반적인 메세징은 부분적으로 전문 용어를 사용한다.

 

  • Producing은 전송 그 이상의 의미를 갖지 않는다. 메세지를 보내는 프로그램은 '생산자'이다.

출처: https://www.rabbitmq.com/tutorials/tutorial-one-java.html

  • Queue는 RabbitMQ안에 살고있는 우체통이다. 비록 메세지가 RabbitMQ와 당신의 어플리케이션을 통해 흐른다 해도, 그것들은 Queue에만 저장될 수 있다. Queue는 host의 메모리 및 디스크에 긴밀하게 관련되어있다. Queue는 근본적으로 커다란 메세지 버퍼(Message Buffer)이다. 많은 생산자들은 하나의 큐에 가도록 메세지를 전송할 수 있고, 많은 소비자들은 하나의 큐로부터 데이터를 받도록 시도할 수 있다. 다음은 RabbitMQ가 Queue를 나타낸 것이다.

출처: https://www.rabbitmq.com/tutorials/tutorial-one-java.html

  • Consuming은 받는 것(수신)을 의미하는 것과 유사하다. 소비자는 대게 메세지를 받으려고 대기하는 프로그램이다.

출처: https://www.rabbitmq.com/tutorials/tutorial-one-java.html

Producer, Consumer 그리고 브로커는 같은 host에 존재해야만 할 필요는 없다는 것을 기억하라. 실제로 대부분의 응용프로그램에서 그렇지 않다. 하나의 어플리케이션은 소비자 및 생산자를 다 할 수 있다.

In Java Client

이 튜토리얼에서는 2개의 자바프로그램을 사용할 것이다. 하나는 단일 메세지를 전송하는 생산자(Producer), 하나는 메세지들을 받고 출력해주는 소비자(Consumer)이다. 우리는 Java API의 세부사항을 살펴보고 시작하기 위해 매우 간단한 것에 집중할 것이다.

 

아래의 diagram에서, "P"는 producer이고 "C"는 consumer이다. 중앙의 box는 Queue이다. (RabbitMQ가 consumer를 대신해 저장하는 메세지 버퍼)

출처: https://www.rabbitmq.com/tutorials/tutorial-one-java.html

Java Client Libs

RabbitMQ는 많은 프로토콜을 언급한다. 이 튜토리얼은 메세징을 위해 오픈되고 대중적인 AMQP o-g-1. 프로토콜을 사용한다. 무수히 많은 언어들로 만들어진 RabbitMQ를 위한 클라이언트들이 있다. 지금은 RabbitMQ에 의해 제공된 자바 클라이언트를 사용할 것이다.

Client JAR파일 다운로드

SLF4J JAR파일 다운로드

(Maven Repository에도 있음)

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.26</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

이제 자바 클라이언트와 디펜던시들을 가졌으니, 코드를 작성해보자.

Sending

메세지를 보내주는 클래스를 (sender) 'SEND' 라고 부를 것이고, 메세지를 소비해주는 클래스를 (recevier) 'RECV'라고 부를 것이다. sender는 RabbitMQ에 연결할 것이고, 단일 메세지를 보내고 연결을 끊는다.

 

우선 다음과 같은 틀을 만들어놓자.

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {

    private final static String QUEUE_NAME = "hello";
    
    public static void main(String[] args) {
        
    }
}

그 후, 서버에 연결하는 코드를 작성한다.

    public static void main(String[] args) {
        
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        
        try(Connection conn = factory.newConnection();
            Channel channel = conn.createChannel()){
            
        }
    }

connection은 추상적인 소켓 연결이고 프로토콜 version을 관리한다. 그리고 우리를 위한 인증 등을 관리한다.

 

다음으로 작업을 완료하기 위한 대부분의 API가 있는 채널을 만든다. Connection과 Channel 모두 java.io.Closeable을 구현하기 때문에 try-with-resources문을 사용할 수 있다. 이렇게하면 코드 블럭에서 명시적으로 닫아줄 필요가 없다.

 

전송하기 위해 우리가 누구에게 보낼지 Queue를 선언해주어야 한다. 그 후, Queue에 메세지를 전송할 수 있다.

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        try(Connection conn = factory.newConnection();
            Channel channel = conn.createChannel()){

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "HELLO WORLD!";

            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            System.out.println("[x] Sent : " + msg);
        }
    }
}

어플리케이션을 실행시키면 RabbitMQ의 관리페이지에 다음과 같은 화면을 볼 수 있다.

메세지가 들어와서 ready에 누적된 것을 확인할 수 있다. 나는 2번 실행해서 2개가 쌓여있다.

Receiving

우리의 소비자(Consumer)는 RabbitMQ로부터의 메세지를 받아주므로 단일메세지를 전송하는 publisher와 달리 소비자가 메세지를 수신하고 출력하도록 계속 실행한다.

 

Send 클래스와 마찬가지로 틀을 준비해두자.

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{

    }
}

추가된 DeliverCallback 인터페이스는 서버에 의해 들어온 메세지들을 담을 버퍼로 사용할 것이다. 나머지 설정은 SEND클래스와 동일하며, 연결과 채널을 열고 메세지를 받을 Queue를 선언한다. 이는 Send에서 사용한 Queue의 이름과 같다.

 

Recv 또한 Queue를 선언해줘야한다. publisher보다 먼서 consumer를 시작할 수 있으므로 Queue에서 메세지를 받아오기 전에 Queue가 존재하는지 확인하려고 한다.

 

채널과의 연결을 자동으로 닫기 위해 try-with-resources 문을 사용하지 않아도 되는 이유는 다음과 같다. 우리는 단순히 프로그램을 계속 진행하고 모든 것을 닫고 종료할 것이다.

 

이제 우리는 서버에게 '우리 Queue에게 메세지를 전달해줘' 라고 말할 준비가 되었다. 우리에게 메세지들을 비동기적으로 넣어줄 것이기 때문에, 우리는 callback을 제공한다. 그것이 DeliverCallback이 하는 일이다.

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        System.out.println("[*] Waiting for msgs. (Press CTRL + C to exit)");
        
        DeliverCallback callback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] Received : " + msg);
        };
        
        /*
        public abstract String basicConsume(String s,
                                    boolean b,
                                    com.rabbitmq.client.DeliverCallback deliverCallback,
                                    com.rabbitmq.client.CancelCallback cancelCallback)
        */
        channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});
    }
}

Recv 클래스를 실행시키면 다음과 같은 콘솔을 볼 수 있다.

그리고 RabbitMQ 관리 페이지에서는 비어있는 ready 쪽을 확인할 수 있다.

 

# References

https://www.rabbitmq.com/tutorials/tutorial-one-java.html

반응형

'Open Source > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] Tutorials (5)  (0) 2021.07.29
[RabbitMQ] Tutorials (4)  (0) 2021.07.29
[RabbitMQ] Tutorials (3)  (0) 2021.07.28
[RabbitMQ] Tutorials (2)  (0) 2021.07.23
[RabbitMQ] install  (0) 2021.07.11
Comments