- Today
- Total
개발하는 고라니
[RabbitMQ] Tutorials (4) 본문
Routing
이전 튜토리얼에서 간단한 logging System을 만들어보았다. 많은 수신자들에게 로그 메세지를 송출할 수 있었다.
이번 튜토리얼에서는 특징을 추가할 것 이다. (메세지들 중 일부만 subscribe하는 것이 가능하게 할 것이다.) 예를 들어, 콘솔에서 모든 로그 메세지를 계속 출력하는 동안, 디스크 공간을 절약하기 위해 중요한 오류 메세지만 로그 파일로 보낼 수 있을 것이다.
Bindings
이전 예제에서 이미 바인딩을 만들었다, 그 코드를 다음과 같이 가져와도 된다.
channel.queueBind(queueName, EXCHANGE_NAME, "");
binding은 교환자와 Queue 사이의 관계라고 했다. 다시 말해, Queue는 교환자로부터의 메세지에 관심이 있다.
binding은 추가 routingKey 파라미터를 취할 수 있다. basic_publish 파라미터와의 혼란을 피하기 위해 우리는 이를 binding key라고 부를 것 이다. 다음은 어떻게 key와 바인딩할 수 있는가 이다.
channel.queueBind(queueName, EXCHANGE_NAME, "black");
이 binding key(여기서 black)의 의미는 교환자 타입에 따라 다르다. 이전에 사용했던 fanout 타입 교환자는 이 값을 간단히 무시해버린다.
Direct exchange
우리의 이전 튜토리얼 logging system은 모든 consumer에게 모든 메세지를 송출한다. 이번엔 심각도에 따라 메세지를 필터링할 수 있도록 확장하고자 한다. 예를 들어, 심각한 에러만 수신하고, 경고나 정보 로그 메세지를 디스크에 기록하는 프로그램을 원할 수 있다.
우린 우리에게 유연성을 제공하지 않는 fanout 타입 교환자를 이용해왔다. 이는 단지 무심코 송출만 할 수 있을 뿐이다.
이를 대신해 direct 타입의 교환자를 사용할 것이다. direct 타입 교환자가 갖는 라우팅 알고리즘은 간단하다.
-메세지의 routing key는 binding key가 정확히 일치하는 Queue로 간다. 도식화 한다면 다음과 같다.
이 설정에서, direct 타입 교환자 X를 볼 수 있고, 2개의 Queue가 그에 바인딩된 것도 확인할 수 있다. 첫번쨰 Queue는 binding key = orange로 바인딩됬고, 두번째는 black과 green 으로 바인딩 된 것을 알 수 있다.
이와 같은 설정에서, orange 라는 routing key를 갖는 교환자에 발송된 메세지는 Q1 (Queue)에 라우트될 것이다. routing key black또는 green을 갖는 메세지는 Q2로 갈 것 이다. 이를 제외한 모든 메세지는 버려진다.
(즉, 라우팅 키가 orange면 Q1으로 가고, 라우팅 키가 black이나 green이면 Q2로 간다. 이는 Q1와 교환자가 바인딩 키 orange로 바인드 되어있고, Q2와 교환자가 black/green으로 바인드 되어있기 때문이다.)
Multiple bindings
동일한 binding key(바인딩 키)로 여러 Queue에 바인딩 하는 것은 완벽하게 합법이다. 우리 예제에서 우리는 교환자 X와 Q1 Queue 사이에 바인딩 키 black으로 바인딩을 추가할 수 있다. 이 경우에, direct 교환자는 fanout처럼 행동할 것이고, 모든 매칭된 Queue들에게 메세지를 송출할 것 이다. routing key가 black인 메세지는 Q1과 Q2에 모두 전달될 것 이다.
Emitting logs
이 모델을 우리의 logging system에 적용할 것 이다. fanout 대신에 우리는 메세지를 direct 타입의 교환자에 보낼 것 이다. log의 심각도를 routing key로 사용할 것 이다. 이 방법에서 수신하는 프로그램(consumer)는 받기 원하는 심각도를 설정할 수 있을 것이다. 먼저 log를 방출하는 거에 집중하자.
늘 그랬듯이, 우리는 교환자를 먼저 만들어야한다.
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
이어서 메세지를 보낼 준비를 한다.
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
간단한 예제를 위해 심각도는 info / warning / error만 있다고 가정하자.
Subscribing
메세지를 받는 것은 한가지 제외하고 이전 튜토리얼 처럼 작동할 것이다. 바로 우리가 관심있는 심각도에만 새 바인딩을 만드는 것이다.
String queueName = channel.queueDeclare().getQueue();
for(String severity : args) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
//console에서 실행할 때 인자로 info / warning / error 중 하나를 넣는다.
Putting it all together
import com.rabbitmq.client.*;
//Tutorials (4)
public class SubscribeSeverity {
private final static String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
//1. ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
//2. setting Host
factory.setHost("localhost");
//3. get Connection
Connection conn = factory.newConnection();
//4. get Channel
Channel channel = conn.createChannel();
//validation
if(args.length < 1){
System.err.println("It must take at least one Severity[info/warning/error]");
System.exit(0);
}
//exchange -> direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//create Queue
String queueName = channel.queueDeclare().getQueue();
//binding
for(String severity:args)
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback callback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Recevied = " + delivery.getEnvelope().getRoutingKey() + "':'" + msg + "'");
};
boolean acknowledge = true;
channel.basicConsume(queueName,acknowledge, callback, consumerTag -> { });
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
//Tutorials (4)
public class PublishSeverity{
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
//1. create Factory
ConnectionFactory factory = new ConnectionFactory();
//2 set Host
factory.setHost("localhost");
try(//3. get Connection
Connection connection = factory.newConnection();
//4. get Channel
Channel channel = connection.createChannel();) {
//Exchange Declare
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String severity = getSeverity(args);
String message = getMessage(args);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("[x] Sent = [" + severity + "] : " + message);
}
}
private static String getSeverity(String[] args){
return args.length < 1? "info" : args[0];
}
private static String getMessage(String[] args){
return args.length < 2? "Hello World" : joinStrings(args, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0) return "";
if (length <= startIndex) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
Result
SubscribeSeverity를 구동시킬 때, info warning error를 준 것과 error만 준 것을 각각 구동시켰다. 그 후 PublishSeverity에서 message를 보내며, severity를 info/warning/error 중 하나를 동봉해서 보냈다. error를 보내면 두 consumer에서 모두 출력되지만, 그 외의 것을 보내면 하나의 consumer에서만 출력됨을 확인하였다.
(Subscribe -> 특정한 것만 구독 -> 정해진 프로토콜(?) 혹은 패턴(?)에 해당하는 메세지만 받아볼 수 있음)
이제 다음 튜토리얼에서는 패턴에 기반한 메세지를 어떻게 듣는지 찾아보도록 하자.
'Open Source > RabbitMQ' 카테고리의 다른 글
[RabbitMQ] Tutorials (6) (0) | 2021.07.29 |
---|---|
[RabbitMQ] Tutorials (5) (0) | 2021.07.29 |
[RabbitMQ] Tutorials (3) (0) | 2021.07.28 |
[RabbitMQ] Tutorials (2) (0) | 2021.07.23 |
[RabbitMQ] Tutorials (1) (0) | 2021.07.20 |