Codeit/프로젝트

[모두의 플리] 고도화 - redis를 이용한 분산환경 고려 (pub/sub)

leejunkim 2025. 11. 26. 15:37

AWS ECS 등을 활용해 서버를 2대 이상 증설하는 분산 환경(Scale-out)에서는 기존 WebSocket 구현 방식에 문제가 발생한다. 이를 해결하기 위해 Redis Pub/Sub를 도입한 과정을 정리한다.

 

현재 WebSocketConfig:

@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  private final AuthChannelInterceptor authChannelInterceptor;

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/ws") // 최초 연결
        .setAllowedOriginPatterns("*") // CORS 허용
        .withSockJS(); // SockJS fallback
  }

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.setApplicationDestinationPrefixes("/pub"); // client -> server
    registry.enableSimpleBroker("/sub"); // server -> client
  }

  @Override
  public void configureClientInboundChannel(ChannelRegistration registry) {
    registry.interceptors(authChannelInterceptor);
  }
}

 

현재 WebSocketConfig의 configureMessageBroker 메서드 안에 registry.enableSimpleBroker는 In Memory Message Broker이다.

 

즉, 문제 상황을 시나리오로 정리해보자면:

사용자 A는 서버1, 사용자 B는 서버2에 붙어 있음

  • 사용자 A가 채팅을 보냄 -> 서버 1의 SimpleBroker가 받음
  • 서버1은 자기한테 붙은 사용자에게만 메세지를 보냄
  • 서버2에 붙은 사용자 B는 메세지를 못받음

그래서 redis를 이용하기로 했다.

명세서에는 DB에 저장하지 않고, 휘발성과 즉시성 (latency)가 중요하기도 해서 내린 결정이였다. Kafka를 고려해보기도 했지만, kafka는 비동기 이벤트를 서버간 보낼때 더 적합하고 지금 용도는 메세지를 즉시 뿌리는 용도이기 때문에 고르기도 했다.

 

Redis의 Pub/Sub를 활용하기로 했다:

일단 서버 1,2는 Redis를 구독한다.

  • 사용자 A가 채팅 보냄 -> 서버 1이 받음
  • 서버 1은 SimplerBroker 대신, Redis의 특정 Topic(채팅방 Id)에 publish 함
  • Redis는 이 topic을 구독하고 있는 모든 서버 (서버 1,2)에게 메세지를 전달함
  • 서버 2는 Redis로부터 메세지를 수신 (subscribe)하고, messagingTemplate을 통해 자기한테 붙은 사용자 B에게 전송

Redis의 Pub/Sub 기능

Redis의 Pub/Sub 기능을 사용하면, 모든 서버가 하나의 토픽을 구독하게 함으로써 메세지를 동기화할 수 있다.

  1. 발행(Publish): 사용자 A가 메시지를 전송하면, 연결된 서버 1이 이를 받아 Redis의 특정 Topic에 발행한다.
  2. 중계(Broker): Redis는 해당 Topic을 구독 중인 모든 서버(서버 1, 서버 2)에게 메시지를 전파한다.
  3. 구독(Subscribe): 서버 2는 Redis로부터 메시지를 수신한 뒤, 자신의 메모리(Session)에 연결된 사용자 B에게 WebSocket(SimpMessagingTemplate)을 통해 최종 전송한다.

수정된 ChatController

// Individual server -> Redis database
// pushes to topic
@Slf4j
@Service
@RequiredArgsConstructor
public class RedisPublisher {

  private final RedisTemplate<String, Object> redisTemplate;
  private final ChannelTopic topic;

  // Object -> contentChatDto, WatchingSessionChange
  public void convertAndSend(String destination, Object object) {
    log.info("[웹소켓 (Redis)] Topic publish 시작, destination = {}, topic = {}",
        topic.getTopic(), destination);
    redisTemplate.convertAndSend(
        topic.getTopic(),
        new MessagePayload(destination, object)
    );
    log.info("[웹소켓 (Redis)] Topic publish 완료, destination = {}, topic = {}",
        topic.getTopic(), destination);
  }

RedisPublisher (메시지 발행)

WebSocket을 통해 들어온 메시지를 핸들링하는 컨트롤러 서비스 계층(ChatController)에서 호출된다. RedisTemplate을 사용하여 메시지를 Redis Topic으로 전송한다. (topic 설정은 뒤에 가서 설명하겠다)

@Slf4j
@Service
@RequiredArgsConstructor
public class RedisPublisher {

  private final RedisTemplate<String, Object> redisTemplate;
  private final ChannelTopic topic;

  // Object -> contentChatDto, WatchingSessionChange
  public void convertAndSend(String destination, Object object) {
    log.info("[웹소켓 (Redis)] Topic publish 시작, destination = {}, topic = {}",
        topic.getTopic(), destination);
    redisTemplate.convertAndSend(
        topic.getTopic(),
        new MessagePayload(destination, object)
    );
    log.info("[웹소켓 (Redis)] Topic publish 완료, destination = {}, topic = {}",
        topic.getTopic(), destination);
  }
}
  • 메세지 전송 요청을 Redis에 전송하는 역할을 한다

RedisSubscriber

Redis로부터 메시지가 도착했을 때 실행되는 리스너다. 수신된 JSON 데이터를 객체로 역직렬화한 뒤, SimpMessagingTemplate을 사용하여 실제 해당 서버에 연결된 WebSocket 클라이언트들에게 메시지를 뿌린다.

@Slf4j
@Service
@RequiredArgsConstructor
public class RedisSubscriber {

  private final SimpMessagingTemplate template;
  private final ObjectMapper mapper;

  public void onMessage(String message) throws JsonProcessingException {
    MessagePayload payload = mapper.readValue(message, MessagePayload.class);
    String destination = payload.destination();
    log.info("[웹소켓 (Redis)] 메세지 수신, destination = {}", destination);
    template.convertAndSend(payload.destination(), payload.content());
  }

}

RedisPubSubConfiguration (설정)

  • RedisMessageListenerContainer: Redis 채널(Topic)로부터 메시지를 비동기적으로 수신하는 컨테이너
  • MessageListenerAdapter: 들어온 메시지를 RedisSubscriber의 onMessage 메서드에 위임(Delegate)하는 어댑터
    • RedisSubscriber에 onMessage 메서드가 들어있다.
  • RedisTemplate: 직렬화(Serializer) 설정을 통해 객체를 JSON 문자열로 변환하여 통신한다
@Configuration
@RequiredArgsConstructor
public class RedisPubSubConfiguration {

  private final RedisConnectionFactory connectionFactory;
  private final RedisSubscriber redisSubscriber;

  // topic
  @Bean
  public ChannelTopic chatTopic() {
    return new ChannelTopic("websocket-events");
  }

  // Delegates the handling of messages to target listener methods
  // onMessage() is in RedisSubscriber service
  @Bean
  public MessageListenerAdapter listenerAdapter() {
    return new MessageListenerAdapter(redisSubscriber, "onMessage");
  }

  // Container that handles the low level details of
  // listening, converting and message dispatching
  @Bean
  public RedisMessageListenerContainer redisContainer() {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listenerAdapter(), chatTopic());
    return container;
  }

  @Bean
  public RedisTemplate<String, Object> websocketChatRedisTemplate() {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(connectionFactory);

    // 키를 위한 직렬화 설정 (StringRedisSerializer)
    template.setKeySerializer(new StringRedisSerializer());
    template.setHashKeySerializer(new StringRedisSerializer());

    // 값을 위한 긱렬화 설정 (Jackson2JsonRedisSerializer)
    Jackson2JsonRedisSerializer<MessagePayload> serializer = new Jackson2JsonRedisSerializer<>(MessagePayload.class);
    template.setValueSerializer(serializer);
    template.setHashValueSerializer(serializer);

    return template;
  }
}

 

로그

2025-11-26T16:05:45.764+09:00 INFO 6628 --- [mopl] [nio-8080-exec-9] c.c.m.d.w.service.RedisPublisher : [웹소켓 (Redis)] Topic publish 완료, destination = websocket-events, topic = /sub/contents/ccf6489c-aac2-4081-a6ca-067dec2287e0/watch

2025-11-26T16:05:45.784+09:00 INFO 6628 --- [mopl] [edisContainer-1] c.c.m.d.w.service.RedisSubscriber : [웹소켓 (Redis)] 메세지 수신, destination = /sub/contents/ccf6489c-aac2-4081-a6ca-067dec2287e0/watch

2025-11-26T16:05:51.338+09:00 INFO 6628 --- [mopl] [boundChannel-10] c.c.m.d.w.controller.ChatController : [실시간 채팅] Chat Controller - 유저 정보 받음. UserDto = UserDto[id=663c345e-9553-4e81-a14a-a9859b9a0e2d, createdAt=2025-11-26T16:05:17.020623, email=test@test.com, name=test, profileImageUrl=null, role=USER, locked=false]

2025-11-26T16:05:51.353+09:00 INFO 6628 --- [mopl] [boundChannel-10] c.c.m.d.w.service.RedisPublisher : [웹소켓 (Redis)] Topic publish 시작, destination = websocket-events, topic = /sub/contents/ccf6489c-aac2-4081-a6ca-067dec2287e0/chat

2025-11-26T16:05:51.359+09:00 INFO 6628 --- [mopl] [boundChannel-10] c.c.m.d.w.service.RedisPublisher : [웹소켓 (Redis)] Topic publish 완료, destination = websocket-events, topic = /sub/contents/ccf6489c-aac2-4081-a6ca-067dec2287e0/chat

2025-11-26T16:05:51.359+09:00 INFO 6628 --- [mopl] [boundChannel-10] c.c.m.d.w.controller.ChatController : [실시간 채팅] Chat Controller - 채팅 보냄. destination = /sub/contents/ccf6489c-aac2-4081-a6ca-067dec2287e0/chat

2025-11-26T16:05:51.359+09:00 INFO 6628 --- [mopl] [edisContainer-2] c.c.m.d.w.service.RedisSubscriber : [웹소켓 (Redis)] 메세지 수신, destination = /sub/contents/ccf6489c-aac2-4081-a6ca-067dec2287e0/chat

2025-11-26T16:06:03.385+09:00 INFO 6628 --- [mopl] [MessageBroker-2] o.s.w.s.c.WebSocketMessageBrokerStats : WebSocketSession[1 current WS(1)-HttpStream(0)-HttpPoll(0), 1 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], stompSubProtocol[processed CONNECT(1)-CONNECTED(1)-DISCONNECT(0)], stompBrokerRelay[null], inboundChannel[pool size = 12, active threads = 0, queued tasks = 0, completed tasks = 12], outboundChannel[pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 3], sockJsScheduler[pool size = 8, active threads = 1, queued tasks = 5, completed tasks = 9]

자료