들어가면서
Pub/Sub을 지원하는 도구라 하면 보통 RabbitMQ, Apache ActiveMQ, Amazon SQS 와 Apache Kafka를 생각할 것이다. 하지만 Redis도 NoSQL 기능에 더해 Pub/Sub을 지원한다.
이번 글에서는 Redis Pub/Sub과 Spring Data Redis가 어떻게 Redis Pub/Sub을 지원하는지 알아보겠다.
목차
1. Redis Pub/Sub
2. Spring Data Redis로 Redis Pub/Sub 사용하기
3. Spring Data Redis 코드 파헤치기
Redis Pub/Sub
Redis의 Pub/Sub 지원은 일반적인 Pub/Sub 기술들과 유사하다.
1. 구독자는 토픽을 구독한다.
2. 토픽에 메시지가 발행될때 메시지가 구독자들에게 전달된다.
하지만 기억해야 할 한 가지 사실이 있다. Redis는 메시지를 따로 저장하지 않기 때문에 전송된 메시지를 구독자가 제대로 받지 못했을 경우, 메시지를 복구 할 수 없다. 따라서 메시지 손실에 민감한 서비스는 Redis Streams를 추가로 도입해야한다.
좀 더 자세한 내용을 보고 싶다면 공식문서를 참조하기 바란다.
실습
- 1번 : "firstChannel"을 구독한다.
- 2번 : 구독한 채널에서 메시지를 받는다.
- firstChannel로 "helloWorld!" 메시지를 발행한다.
Spring Data Redis로 Redis Pub/Sub 사용하기
Spring 공식 문서의 예제 코드를 참조해 Spring Data Redis로 Redis Pub/Sub 기능을 사용해보자.
dependency
plugins {
id("org.springframework.boot") version "3.1.1"
id("io.spring.dependency-management") version "1.1.0"
kotlin("jvm") version "1.8.22"
kotlin("plugin.spring") version "1.8.22"
}
group = "kr.elvis"
version = "0.0.1-SNAPSHOT"
java {
sourceCompatibility = JavaVersion.VERSION_17
}
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-redis")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("io.github.oshai:kotlin-logging-jvm:4.0.0")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs += "-Xjsr305=strict"
jvmTarget = "17"
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
Receiver (Redis에게 받은 메시지를 처리하는 리스너)
private val logger = KotlinLogging.logger { }
class Receiver {
private val counter = AtomicInteger()
fun receiveMessage(message: String) {
logger.info { "Received < $message >" }
counter.incrementAndGet()
}
fun getCount() = counter.get()
}
PlaygroundSpringDataRedisPubsubApplication (스프링 설정 & 시작)
private val logger = KotlinLogging.logger { }
@SpringBootApplication
class PlaygroundSpringDataRedisPubsubApplication {
@Bean
fun container(
connectionFactory: RedisConnectionFactory,
listenerAdapter: MessageListenerAdapter,
): RedisMessageListenerContainer {
val container = RedisMessageListenerContainer()
container.setConnectionFactory(connectionFactory)
return container
}
@Bean
fun listenerAdapter(receiver: Receiver) = MessageListenerAdapter(receiver, "receiveMessage")
@Bean
fun receiver() = Receiver()
@Bean
fun template(connectionFactory: RedisConnectionFactory) = StringRedisTemplate(connectionFactory)
}
fun main(args: Array<String>) {
val ctx = runApplication<PlaygroundSpringDataRedisPubsubApplication>(*args)
val container = ctx.getBean(RedisMessageListenerContainer::class.java)
val listenerAdapter = ctx.getBean(MessageListenerAdapter::class.java)
// adding message listener before container's listener initialization. (subscribe redis)
container.addMessageListener(listenerAdapter, ChannelTopic("first chat"))
// adding message listener after container's listener initialization. (subscribe redis)
container.addMessageListener(listenerAdapter, ChannelTopic("second chat"))
val template = ctx.getBean(StringRedisTemplate::class.java)
val receiver = ctx.getBean(Receiver::class.java)
while (receiver.getCount() == 0) {
logger.info { "Sending message..." }
// publish message to redis
template.convertAndSend("first chat", "Hello from Redis!")
Thread.sleep(500L)
}
exitProcess(0)
}
- 발행
- StringRedisTemplate의 convertAndSend API를 이용하면 Redis의 특정 채널에 메시지를 발행할 수 있다. (첫 번째 인자가 채널, 두 번째 인자가 메시지다.)
- 구독
- Redis의 메시지는 MessageListener에 의해 처리된다.
- 위 코드의 Recevier는 POJO이기 때문에 MessageListenerAdapter로 Wrapping해 RedisMessageListenerContainer에 등록했다.
- RedisMessageListenerContainer는 MessageListener들을 관리한다.
- Redis의 메시지는 MessageListener에 의해 처리된다.
실행 결과
코드 설명
1. "first chat" 과 "second chat" 채널에 발행된 메시지를 받는 리스너를 RedisMessageListenerContainer에 등록했다.
2. StringRedisTemplate을 이용해 "first chat" 채널에 메시지를 발행한다.
3. RedisMessageListenerContainer에 등록된 MessageListener로 Redis에서 받은 메시지를 처리한다.
자세한 코드는 GitHub을 참고하기 바란다.
Spring Data Redis의 추상화 덕분에 Low-Level API를 사용하지 않고 Redis의 Pub/Sub 기능을 구현했다. 하지만 내부에선 어떤 API가 사용될까? 디버거를 통해 내부 코드를 분석해보겠다.
Spring Data Redis 코드 파헤치기
발행
- 메시지 발행을 위해 StringRedisTemplate의 convertAndSend 호출
- LettuceConnection의 publish API 호출
- Lettuce의 publish API를 통해 Redis에 메시지 발행
RedisTemplate의 convertAndSend API는 Redis Client(Lettuce)의 API를 이용해 Redis에게 메시지를 발행한다.
구독
구독 로직은 발행 로직보다 조금 복잡하다. 로직은 크게 2가지 케이스로 나뉜다.
- RedisMessageListenerContainer의 listener가 초기화 되기 전
- RedisMessageListenerContainer의 listener 초기화 된 후
RedisMessageListenerContainer의 listener 초기화 되기 전
- "first chat" 채널에서 오는 메시지를 처리하기 위한 MessageListener 등록
- Container의 Message Listener 지연 초기화 메서드 호출
- Message Listener가 초기화 돼지 않아, 초기화
- Message Listener 초기화 과정에서 Redis 채널 구독.
- Redis 구독을 위한 메소드 호출
- Redis 구독을 위한 메소드 호출
- Redis 구독을 위한 메소드 호출..
- Redis Client(Lettuce)의 subscribe API로 Redis 구독
RedisMessageListenerContainer의 addMessageListener를 호출하면 리스너들이 초기화 되는 과정에 Redis Client(Lettuce)를 이용해 Redis 채널을 구독한다.
RedisMessageListenerContainer가 초기화 된 후
- RedisMessageListenerContainer이용해 새로운 채널에 MessageListener 등록
- lazyListen 메서드를 호출하지만 이미 초기화 돼있어 종료
- 추가된 리스너의 채널을 구독하기 위해 subscribeChannel 호출
- Redis 채널 구독을 위해 doWithSubscription 호출
- Redis 채널 구독을 위한 메서드 호출
- Redis Client(Lettuce) API로 Redis 구독
RedisMessageListenerContainer의 MessageListener 들이 초기화 된 후 MessageListener를 추가하면, 추가된 MessageListener가 처리하는 채널만 Redis Client(Lettuce)를 이용해 구독한다.
정리
이번 글을 통해 Redis의 Pub/Sub 기능과 Spring Data Redis로 Redis Pub/Sub 기능을 사용하는 방법에대해 정리했다.
Redis를 사용하면 캐싱 기능뿐 아니라 Pub/Sub 기능도 개발할 수 있기 때문에 간단한 서비스에서 유용해 보인다. 단, 메시지가 복구되지 않으니 메시지 손실이 치명적인 서비스는 추가적인 작업을 해야한다.
Spring Data Redis로 Redis Pub/Sub 사용해봤다. Spring의 추상화의 덕분에 적은 량의 코드로 Pub/Sub 기능을 구현했다. 하지만 항상 내부 동작을 이해하고 사용하자.
'Spring' 카테고리의 다른 글
Spring @Transactional 동작원리 완벽 정리 (0) | 2024.03.02 |
---|---|
Spring MVC에서 MappingJacksonValue 활용하기 (0) | 2024.03.01 |
Spring's STOMP support 파헤치기 (0) | 2023.06.02 |
Spring Websocket Trobleshooting!! (feat. Decorator Pattern) (0) | 2023.05.21 |
Spring Cloud GateWay의 non-blocking server (0) | 2022.12.31 |