- Today
- Total
개발하는 고라니
[RabbitMQ] Tutorials (2) 본문
Work Queues
- worker들에게 작업을 분배(경쟁 consumers 패턴)
저번 포스팅에서 우리는 이름붙인 큐로부터 메세지를 주고받고 하는 프로그램을 짰다. 이번 튜토리얼에서 우리는 시간이 많이 걸리는 작업을 여러 작업자에게 분산할 'Work Queue'를 만들어 볼 것 이다.
Task Queues라고 불리는 Work Queues의 핵심은 자원이 많이드는 일을 즉시 하는 것과, 그것을 완료하기 위해 기다려야만 하는 것을 피하는 것이다.
대신에 우리는 그 일을 나중에 처리하도록 조정해준다. 우리는 업무를 하나의 메세지로서 캡슐화하고, Queue에 보내버린다. 그러면 뒤에서 동작하는 worker가 업무들을 표시할 것이고 결국 일을 실행할 것이다.
당신이 많은 worker들을 구동할 때 업무들은 그것들 사이에서 공유될 것이다.
이 개념은 특히 짧은 HTTP Request 동안 복잡한 업무를 처리가 안되는 Web 어플리케이션에서 유용하다.
준비
이전 포스팅에서 우리는 "Hello World"를 메세지로 보냈다. 이제 우리는 복잡한 업무를 위해 여러 문자열들을 보낼 것 이다. 우리는 이미지를 편집하거나, PDF 파일을 띄우는 복잡한 실세계의 작업은 없으므로, 우리가 바쁜 것 처럼 속이기 위해 Thread.sleep()을 사용한다. 우리는 문자열에 점(.)들을 이용해 복잡성을 나타낸다. 각 점은 '1초'가 걸린다.
예를 들어, "Hello..."는 3초가 걸린다.
우리는 임의적인 메세지들을 Command Line으로 보내기 위해 이전 포스팅의 Send.java를 약간 수정할 것이다. 이 프로그램은 우리의 worker들을 조율할 것이고, NewTask.java로 명명하자.
NewTask.java
public class NewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection conn = factory.newConnection();
Channel channel = conn.createChannel()){
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//Tutorial 2.
String msg = String.join(" ", args);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("[x] Sent : " + msg);
}
}
}
우리의 이전 코드 Recv.java 역시 약간의 변화가 있다. (메세지 body의 점들은 몇 초의 지연이 필요하다.) 이는 전달된 메세지들을 다루고 업무를 수행할 것이다. 이 클래스를 Worker.java라고 명명하자.
Worker.java
public class Worker {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("[*] Waiting for msgs. (Press CTRL + C to exit)");
DeliverCallback callback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received : " + msg);
try{
doWork(msg);
} catch (Exception e){
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, callback, consumerTag -> {});
}
private static void doWork(String task) throws InterruptedException{
//점 1개당 1초 지연
for(char ch: task.toCharArray())
if(ch == '.') Thread.sleep(1000);
}
}
이제 CLI에서 이 튜토리얼의 코드를 컴파일하자. (.jav 파일들을 환경변수 CP와 함께)
javac -cp $CP NewTask.java Worker.java
이게 위의 커맨드가 무슨 말이냐면...CLI에서 커맨드로 NewTask.java와 Worker.java를 컴파일하는데, 필요한 라이브러리가 있으니 -cp옵션을 주고 그 라이브러리를 올려주면 된다. 나같은 경우,
NewTask.java와 Worker.java가 각각 다른 프로젝트에 있으므로, 인텔리제이의 베이스 경로에 그 라이브러리 파일을 저장해두고 다음과 같이 커맨드 입력을 했다.
//CP를 변수로 지정해둔다. 현재 디렉토리에 해당 파일들이 있어야한다.
set CP=.;amqp-client-5.13.0.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
//window 기준
javac -cp %CP% rabbitMQRecvTest\src\main\java\Worker.java rabbitMQTest\src\main\java\NewTask.java
//Worker 실행
java -cp %CP% main\java\Worker.java
//NewTask 실행
java -cp %CP% rabbitMQTest\src\main\java\NewTask.java [message]
파일은 아래 있다. amqp-client 뿐 아니라, slf4j 라이브러리도 받아야 하는데, 이는 링크 에서 받아도 되고 밑에 첨부하겠다.
순차적으로 발송하기 (Round-Robin Dispatching)
Task Queue을 이용함으로써 장점 중 하나는, 쉽게 업무를 평행하게 놓을 수 있다는 것 이다. 만약 우리가 산더미 같은 일에 처해있다면, 우리는 더 많은 worker들을 추가하는 것 만으로 쉽게 확장할 수 있다.
먼저 2개의 worker를 동시에 구동시켜보도록 하자. 2개의 worker모두 Queue로부터 메세지를 받을 것 이지만, 얼마나 정확할까?
당신은 3개의 console이 필요하다. 2개는 worker들이고, 이 console들은 2개의 Consumer가 될 것이다. (C1, C2라 하자)
java -cp %CP% rabbitMQRecvTest\src\main\java\Worker.java
나머지 세번째는 새로운 업무들을 생산해서 보낼것이다. 일단 Consumer를 시작하기만 하면 당신은 약간의 메세지를 보낼 수 있다.
//Workers
기본적으로 RabbitMQ는 각 메세지를 각 Consumer에게 순서대로 보낸다. 평균적으로 모든 Consumer는 같은 양의 메세지들을 받게될 것 이다. 이런 메세지 분배 방법을 Round-Robin 이라 부른다. 이 방법으로 3개 이상의 worker를 둬보는걸 시도해보라.
메세지 Acknowledgment(승인)
업무를 처리하는 것은 약간의 시간이 걸릴 수 있다. 만약 어떤 Consumer가 긴 시간이 걸리는 업무를 시작했고, 그것을 부분적으로 처리하다 뻑났다면 어떻게 될 지 궁금할 것 이다. 현재까지 우리의 코드로는, 일단 RabbitMQ가 메세지를 consumer에게 전달하면, 그것은 즉시 제거된다. 이런 경우에 만약 당신이 worker를 죽인다면, 우리는 처리되던 메세지를 잃을 것 이다. 우리는 또한 이 특정 worker에게 발송되었지만 아직 처리되지 않은 모든 메세지를 잃게 된다.
그러나 우리는 어떤 업무도 잃길 원하지 않는다. 만약 worker가 뻑나더라도 우리는 그 업무가 다른 worker에게 전달되어지길 원한다.
메세지를 절대 잃지 않기 위해서 RabbitMQ는 Message Acknowledgments를 지원한다. Consumer는 특정 메세지가 수신 처리 되었으며, RabbitMQ가 이를 제거할 수 있음을 RabbitMQ에게 알리기 위해 승인을 다시 보낸다.
만약 Consumer가 승인 보내는 것 없이 죽는다면(채널이 닫이거나, 연결이 끊기거나, TCP 연결을 잃거나), RabbitMQ는 그 메세지가 완벽히 처리되지 않았다고 이해할 것이고, 다시 Queue에 넣을 것 이다. 만약 다른 Consumer들이 동시에 온라인 상태라면, 재빨리 그들중 하나에게 재전송할 것 이다. 이 방법은 worker들이 때때로 죽더라도 당신에게 메세지 분실이 안되게 하는 것에 확신을 줄 수 있다.
메세지에 시간 초과는 없다; RabbitMQ는 Consumer가 죽었을 때 메세지를 재전송할 것이다. 심지어 메세지 처리에 매우매우 긴 시간이 걸리더라도 문제없다.
Manual message acknowledgments는 기본적으로 on 되어있다. 이전 예제들에서 우리는 분명히 autoAck = true 값을 줌으로써 그것들을 해제했다. 이제 그 값을 false로 둘 시간이다. 그리고 우리가 업무를 마친다면 적절한 승인을 worker로부터 받을 시간이다.
//Worker.java
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("[*] Waiting for msgs. (Press CTRL + C to exit)");
DeliverCallback callback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received : " + msg);
try{
doWork(msg);
} catch (Exception e){
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//boolean autoAck = true;
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, callback, consumerTag -> {});
}
이 코드를 사용한다는 것은 우리의 worker가 업무를 처리하던 중, CTRL+C로 죽이더라도 아무것도 잃지 않는다는 것을 확신하게 되는 것이다. worker가 죽은 직후 승인되지 않은 메세지가 다시 전달된다.
승인은 반드시 동일한 주고받는 채널에 보내져야만 한다. 승인을 다른 채널에 보내려 시도하는 것은 'channel-level protocol' 예외가 발생할 것 이다. 자세한건 https://www.rabbitmq.com/confirms.html참조
Message Durability(메세지 유지)
우리는 worker가 죽더라도 업무를 잃지 않으려면 어떻게 해야하는지 배워왔다. 그러나 우리의 업무는 RabbitMQ 서버가 멈춘다면 모두 멈출 것 이다.
RabbitMQ가 종료 또는 망가질 때 Queue와 메세지들을 잃어버리게 될 것이다. 만약 당신이 RabbitMQ에게 말하지 않는다면. 메세지 손실 방지를 위해 요구되는 것은 2가지가 있다. (우리는 Queue와 메세지들이 유지되도록 표시해야한다)
먼저, RabbitMQ가 재시작되어도 Queue가 살아남는 것에 대한 확신이 필요하다. 그러기 위해서 우리는 그것을 유지되도록 선언하는게 필요하다.
//Worker.java의 일부
//NewTask.java에도 durable을 지정해줘야함
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//durable
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
이 명령은 그 자체로 분명하지만, 현제 우리의 설정상 동작하진 않는다. 우리는 이미 "hello"라고 불리는 Queue를 유지되지 않도록 선언했기 때문이다. RabbitMQ는 이미 존재하는 Queue를 다른 인자로 재정의하도록 허락하지 않는다. 그리고 이를 시도하려는 어떤 프로그램이든 에러를 발생할 것이다. 그러나 빠른 해결 방법이 있다. 바로 Queue를 다른 이름으로 정의하는 것이다. 예를 들어 "TASK_QUEUE"라고 하자.
private final static String QUEUE_NAME = "TASK_QUEUE";
이 수정사항은 producer(NewTask.java)와 consumer(Worker.java) 모두 적용되어야 한다.
이제 우리는 RabbitMQ 서버가 재시작되어도 TASK_QUEUE 큐를 잃지 않도록 보증받을 수 있다. 지금부터 우리 메세지를 영속화되는 것으로써 표시해야한다. (BasicProperties를 구현한) MessageProperties를 PERSISTENT_TEXT_PLAIN 값으로 설정함으로써!
//NewTask.java
public class NewTask {
private final static String QUEUE_NAME = "TASK_QUEUE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection conn = factory.newConnection();
Channel channel = conn.createChannel()){
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = String.join(" ", args);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
System.out.println("[x] Sent : " + msg);
// channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// System.out.println("[x] Sent : " + msg);
}
}
}
Fair Dispatch (공정한 발송)
우리가 원하는대로 정확하게 발송 작업이 잘 안되는 걸 당신은 알아챘을 지도 모른다. 예를 들어 두 개의 worker가 있는 상황에서 하나는 무겁고 하나는 심히 가벼운 메세지가 있을 때, 하나의 worker는 지속적으로 바쁘고 다른 하나는 거의 작업을 수행하지 않는다. RabbitMQ는 그것에 대해 알지 못하고 여전히 균등하게 발송할 것이다.
이 현상은 RabbitMQ가 메세지가 큐에 들어올 때 발송하기 때문에 나타난다. consumer의 승인되지 않은 메세지 수는 확인하지 않는다. 모든 n번째 메세지를 n번째 소비자에게 맹목적으로 발송한다.
이를 방지하기 위해 우리는 prefetchCount = 1 설정을 하는 basicQos 메서드를 사용할 수 있다. 이는 RabbitMQ에게 한번에 worker에게 하나 이상의 메세지를 주지 않도록 지시한다. 즉, worker가 이전 메세지를 처리하고 승인하기 전까지 worker에게 새로운 메세지를 발송하지 않는다. 대신에 바쁘지 않은 worker에게 보낼 것 이다.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//Worker.java
public class Worker {
private final static String QUEUE_NAME = "TASK_QUEUE";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//durable
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
System.out.println("[*] Waiting for msgs. (Press CTRL + C to exit)");
int prefetchCount = 1;
channel.basicQos(prefetchCount);
DeliverCallback callback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received : " + msg);
try{
doWork(msg);
} catch (Exception e){
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//boolean autoAck = true;
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, callback, consumerTag -> {});
}
private static void doWork(String task) throws InterruptedException{
for(char ch: task.toCharArray())
if(ch == '.') Thread.sleep(1000);
}
}
이제 Message Acknowledgments와 prefetchCount를 설정한 work Queue(worker)를 사용할 수 있다. 이 유지 옵션은 업무들이 RabbitMQ가 재시작되더라도 살아있도록 해준다.
Channel 메서드와 MessageProperties에 관한 정보를 더 얻고 싶다면, 다음 링크를 참조하라.
https://rabbitmq.github.io/rabbitmq-java-client/api/current/
이제 우리는 다음 튜토리얼로 이동해서 동일한 메세지를 많은 worker에게 보내는 것을 배울 수 있다.
Result
실제로 RabbitMQ 서버를 껏다 켰는데 Queue에 그대로 작업물이 남아있음을 확인하였다. 하지만 왜인지 Queue 하나당 하나의 message를 받고 더이상 메세지를 받지 않는다... 뭔가 설정을 바꿔주어야 할 것 같다.
문제점 발견)
- Worker.java의 finally 부분에 빠트린 것이 있다.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
위의 코드에는 반영해두었다.
이제 ..............................................................................을 무수히 많이 찍어 시간이 오래걸리도록 하나를 보내고, 뒤이어 메세지들 을 보내면 다른 Queue로 메세지가 간다. 이는 prefetchCount 설정과, ack 설정을 해줘서 가능한 일이다.
#Reference
'Open Source > RabbitMQ' 카테고리의 다른 글
[RabbitMQ] Tutorials (5) (0) | 2021.07.29 |
---|---|
[RabbitMQ] Tutorials (4) (0) | 2021.07.29 |
[RabbitMQ] Tutorials (3) (0) | 2021.07.28 |
[RabbitMQ] Tutorials (1) (0) | 2021.07.20 |
[RabbitMQ] install (0) | 2021.07.11 |