반응형
11-15 20:53
Today
Total
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
관리 메뉴

개발하는 고라니

[RabbitMQ] Tutorials (6) 본문

Open Source/RabbitMQ

[RabbitMQ] Tutorials (6)

조용한고라니 2021. 7. 29. 18:28
반응형

Remote procedure call (RPC)

 

요청 / 응답 패턴 예시

 

저번 2번째 튜토리얼에서 여러 worker들 사이에 시간이 걸리는 업무를 분배하는 법을 배웠다.

 

그러나 원격 컴퓨터에서 기능을 동작해야하고 결과를 기다려야한다면 어떨까? 이는 좀 다른 얘기이다. 이 패턴은 보편적으로 Remote procedure call 또는 RPC라고 알려져있다.

 

이번 튜토리얼에서 우리는 RPC 시스템을 구축하기 위해 RabbitMQ를 사용할 것이다. (하나의 client와 확장가능한 RPC서버를). 분배할 가치가 있는 어떤 업무도 갖고있지 않을 때, 피보나치 수열을 반환하는 더미 RPC 서비스를 만들 것이다.

Client Interface

RPC 서버 사용법을 설명하기 위해 간단한 client 클래스를 만들 것이다. RPC요청을 보내고 답을 받을 때 까지 차단하는, call이라는 이름을 가진 메서드를 노출시킨다.

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) is " + result);

 

RPC 참고 사항

비록 컴퓨팅에서 RPC가 일반적으로 괜찮은 패턴이지만, 종종 비판받는다. 이 문제는 프로그래머가 함수 호출이 로컬인지, 느린 RPC인지 인지하지 못할 때 발생한다. 그러한 혼란은 예측할 수 없는 시스템을 초래하고, 디버깅에 불필요한 복잡성을 추가한다. 소프트웨어를 단순화하는 대신 RPC를 잘못 사용하면 유지보수할 수 없는 스파게티 코드가 생성될 수 있다.

다음 조언을 고려하라.
- 어떤 함수 호출이 로컬이고 어떤 것이 원격인지 확실하게 한다.
- 시스템을 문서화해서 구성 요소간의 종속성을 명확히한다.
- RPC 서버가 오랜기간 다운되었을 때 클라이언트는 어떻게 반응을 해야하는지 오류 사례를 처리한다.

확실하지 않으면 RPC를 지양한다. 가능하면 비동기식 파이프라인을 사용해야한다. RPC와 같은 차단 대신 결과가 다음 계산 단계로 비동기식으로 푸시된다.

Callback Queue

일반적으로 RabbitMQ를 통해 RPC를 사용하는 것은 쉽다. Client는 요청 메세지를 보내고 Server는 응답 메세지로 응답한다. 응답을 받기위해 우리는 요청에 Callback Queue 주소를 담아 보내야한다. 우리는 Java Client에 배타적인 기본 Queue를 사용할 수 있다.

private final static String RPC_QUEUE_NAME = "rpc_queue";

...

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties.builder().replyTo(callbackQueueName).build();

channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes));

 

메세지 속성(property)

AMQP 0-9-1 protocol은 메세지에 포함된 14개의 속성 세트를 사전 정의한다. 대부분의 속성들은 드믈게 사용되는데, 다음의 속성은 자주 쓰인다.

deliveryMode
- 메세지를 지속적으로 (value = 2) 또는 일시적 (다른 값) 으로 표시한다. 2번째 튜토리얼에서 기억할 수 있을 것이다.

contentType
- 인코딩 타입을 기술하기 위해 사용된다. 예를 들어 종종 사용되는 JSON 인코딩을 사용하기 위해 이 속성을 'application/json'으로 설정한다.

replyTo
- 일반적으로 콜백 Queue의 이름을 사용한다.

correlationId
- RPC 응답을 요청과 연관시키는데 유용하다.

Correlation Id

위의 제시된 메서드에서는 모든 RPC 요청에 대해 callback Queue를 만드는 것을 제안한다. 뭔가 비효율적이지만, 다행히도 좋은 방안이 있다. 바로 한 client 당 하나의 callback Queue를 만드는 것이다.

 

이건 새로운 이슈를 떠오르게 한다. 해당 Queue에서 응답을 받았으나, 응답이 속한 요청이 명확하지 않다. 바로 이때 correlateionId 속성이 사용된다. 우리는 이를 매 요청마다 유일한 값으로 설정할 것이다. 이후에, callback Queue에서 메세지를 받을 때 이 속성을 살펴보고, 이를 기반으로 요청과 응답을 일치시킬 수 있다. 이는 요청에 귀속되지 않는다.

 

아마 의문이 들 수 있다. 왜 에러를 동반해 실패하는 것 보다 callback Queue의 불분명한 메세지들을 무시해야만 하는가? 서버 측의 경쟁 조건 가능성 때문이다. 가능성은 낮지만 RPC 서버가 응답을 보낸 직후 요청에 대한 확인 메세지(acknowledge message)를 보내기 전에 죽을 수 있다. 그렇게 되면 RPC 서버는 그 요청을 다시 처리할 것이다. 그렇기 때문에 클라이언트에서 중복 응답을 정상적으로 처리해야하고, RPC는 이상적으로 멱등적이어야 한다.

Summary

우리의 RPC는 이와 같이 동작할 것이다.

  • 한 RPC 요청에 대해, Client는 2개의 속성과 메세지를 보낸다. replyTo는 단지 요청을 위해 만들어진 익명의 배타적인 Queue로 설정되어있다. correlationId는 매 요청에 유일한 값으로 설정되어있다.
  • 요청은 rpc_queue Queue로 보내진다.
  • RPC worker(aka. server)는 Queue에서의 요청을 기다린다. 요청이 나타났을 때 업무를 수행하고 Client에게 결과와 메세지를 보낸다. replyTo 속성의 Queue를 사용해서.
  • client는 응답 Queue에서 데이터를 기다리고. 메세지가 나타나면 correlationId를 확인한다. 만약 요청에서 값이 일치하면 어플리케이션으로 응답을 반환한다.

Putting it all together

private static int fib(int n) {
    
    if(n == 0) return 0;
    if(n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

피보나치 함수를 선언했고, 양수만 입력이 가능하다고 가정하자. 

 

RPC server의 코드는 RPCServer에 있다.

서버 코드는 비교적 간단하고 직관적이다.

  • 늘 그랬듯, 연결하고 시작하고, 채널과 큐를 선언한다.
  • 우리는 1개보다 많은 서버 프로세스가 동작하길 원할 수 있다. load를 여러 서버에 균등하게 분산하려면 channel.basicQos에서 prefetchCount를 설정해야 한다. (QOS = Quality of service)
  • 우리는 basicConsume 메서드를 사용해 작업을 수행하고 응답을 다시 보낼 객체 DelieverCallback 형태의 콜백을 제공하는 Queue에 접근한다.

 

RPC client 코드는 RPCClient에서 찾을 수 있다.

클라이언트 코드는 약간 더 복잡하다.

  • 역시 연결과 채널을 세운다
  • call메서드는 실제 RPC 요청을 만든다.
  • 먼저 유일한 correleationId 번호를 만들고, 저장한다. 우리의 Consumer callback은 이 값을 적절한 응답에 매칭해 사용할 것 이다.
  • 그리고난 후, 그 correleationId에 응답하고 구독하는 배타적으로 사용되는 Queue를 만든다.
  • 그 다음은, 2개의 속성(replyTo 및 correlationId)값과 함께 요청 메세지를 발송(pub)한다.
  • 이때 적절한 응답이 도착할 때 까지 가만히 앉아 편히 기다릴 수 있다.
  • Consumer로 전달 처리가 별도의 쓰레드에서 발생하기 때문에 응답이 도착하기 전 main 쓰레드를 일시 중단할 무언가가 필요하다. BlockingQueue의 사용은 그렇게 하는 한 가지 가능한 솔루션이다. 여기에서는 하나의 응답만 기다려야 하므로 1로 설정된 ArrayBlockingQueue를 생성한다. (??)
  • Consumer는 정말 간단한 작업을 한다. 만약 correlationId가 우리가 찾던 것 인지 ,모든 소비된 응답 메세지에 대해 그것을 확인한다.
  • 동시에 main 쓰레드는 BlockingQueue로부터 응답을 기다린다.
  • 마침내 user에게 응답을 반환한다.

Making the Client Request

RPCClient fibonacciRpc = new RPCClient();

System.out.println("[x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();
//RPCServer.java
import com.rabbitmq.client.*;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n){
        if(n == 0) return 0;
        if(n == 1) return 1;

        return fib(n-1) + fib(n-2);
    }

    public static void main(String[] args) throws Exception{

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localHost");

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.queuePurge(RPC_QUEUE_NAME);

        int prefetchCount = 1;

        channel.basicQos(prefetchCount);

        System.out.println(" [x] Awaiting RPC requests");

        Object monitor = new Object();
        DeliverCallback callback = (consumerTag, delivery) -> {
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(delivery.getProperties().getCorrelationId())
                    .build();

            String response = "";

            try{
                String msg = new String(delivery.getBody(), "UTF-8");
                int n = Integer.parseInt(msg);

                System.out.println(" [.] fib(" + msg + ")");
                response += fib(n);
            } catch (RuntimeException e){
                System.out.println(" [.] " + e.toString());
            } finally {
                channel.basicPublish("", delivery.getProperties().getReplyTo(), props, response.getBytes("UTF-8"));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

                synchronized (monitor){
                    monitor.notify();
                }
            }
        };
        channel.basicConsume(RPC_QUEUE_NAME, false, callback, (consumerTag -> { }));
        // Wait and be prepared to consume the message from RPC client.
        while (true) {
            synchronized (monitor) {
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
//RPCClient.java
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

public class RPCClient implements AutoCloseable{

    private Connection conn;
    private Channel channel;

    private final static String RPC_QUEUE_NAME = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        conn = factory.newConnection();
        channel = conn.createChannel();
    }

    public static void main(String[] args) {

        //AutoCloseable을 구현 -> try(...)에 선언 가능
        try(RPCClient fibonacciRPC = new RPCClient()){

            IntStream.range(0, 32).forEach(i -> {
                String i_str = Integer.toString(i);

                System.out.println(" [x] Requesting fib(" + i_str + ")");

                String response = null;

                try {
                    response = fibonacciRPC.call(i_str);
                } catch (Exception e) {
                    e.printStackTrace();
                }

                System.out.println(" [.] Got '" + response + "'");
            });
        } catch (Exception e){
            e.printStackTrace();
        }
    }

    public String call(String msg) throws Exception{

        final String correlationId = UUID.randomUUID().toString();

        //unique Queue create
        String replyQueueName = channel.queueDeclare().getQueue();

        //properties setting
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(correlationId)
                .replyTo(replyQueueName)
                .build();

        //Send Request
        channel.basicPublish("", RPC_QUEUE_NAME, props, msg.getBytes("UTF-8"));

        //차단하는 Queue...? 추측: correlationId 가 다르면 차단
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        DeliverCallback callback = (consumerTag, delivery) -> {

            //되받은 메세지에 담긴 correlationId를 요청보낸 correlationId와 대조
            if(delivery.getProperties().getCorrelationId().equals(correlationId))
                response.offer(new String(delivery.getBody(), "UTF-8"));
        };

        String ctag = channel.basicConsume(replyQueueName, true, callback, consumerTag -> {});
        String result = response.take();
        channel.basicCancel(ctag);

        return result;
    }

    public void close() throws IOException{ this.conn.close(); }
}

 

#Reference

 

RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ

Remote procedure call (RPC) (using the Java client) Prerequisites This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). In case you use a different host, port or credentials, connections settings would require ad

www.rabbitmq.com

 

반응형

'Open Source > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] Spring Boot + RabbitMQ (basic)  (0) 2021.07.29
[RabbitMQ] Tutorials (5)  (0) 2021.07.29
[RabbitMQ] Tutorials (4)  (0) 2021.07.29
[RabbitMQ] Tutorials (3)  (0) 2021.07.28
[RabbitMQ] Tutorials (2)  (0) 2021.07.23
Comments