헬스팅

Redis + STOMP 발행 성공

myeongjaechoi 2025. 3. 8. 17:09

문제점

  • [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 1]
  • 역직렬화에 실패하는 문제가 있었다.

해결

  • 시간과, 방 번호를 String으로 받았다.
    • LocalDateTime -> String, Long -> String

프로세스 흐름

package org.zeorck.fitme.common.config.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.zeorck.fitme.common.application.RedisSubscriber;
import org.zeorck.fitme.websocket.dto.ChatDto;

@Configuration
public class RedisConfig {

    @Value("${spring.data.redis.host}")
    private String redisHost;

    @Value("${spring.data.redis.port}")
    private int redisPort;

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(redisHost, redisPort);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));

        return redisTemplate;
    }

    @Bean
    public RedisTemplate<String, ChatDto> redisTemplateMessage(RedisConnectionFactory connectionFactory){
        RedisTemplate<String, ChatDto> redisTemplateMessage = new RedisTemplate<>();
        redisTemplateMessage.setConnectionFactory(connectionFactory);
        redisTemplateMessage.setKeySerializer(new StringRedisSerializer());
        redisTemplateMessage.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));

        return redisTemplateMessage;
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, ChannelTopic.of("chatroom"));
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
        return new MessageListenerAdapter(subscriber, "onMessage");
    }

}
  • RedisConnectionFactory
    • Redis 서버와의 연결을 관리하는 객체로, LetteuceConnectionFactory를 사용하여 Redis 서버의 host와 port를 설정한다.
  • RedisTemplate
    • Redis에 데이터를 저장하거나 조회할 때 사용하는 객체
    • Key는 StringRedisSerializer로 직렬화하고, Value는 Jackson2JsonRedisSerizalizer를 사용하여 JSON 형태로 저장.
    • 하나는 일반 객체, 하나는 채팅 메시지 전용
  • RedisMessageListenerContainer
    • Redis에서 메시지를 구독(subscribe)하고 수신하는 컨테이너
    • 특정 채널(topic)에 메시지가 발행(publish)되면 이를 감지하여 처리할 messageListenerAdapter를 등록
  • MessageListenerAdapter
    • Redis에서 메시지를 수신했을 때 호출되는 메서드를 지정. 여기서는 RedisSubscriber 클래스의 onMessage 메서드 호출
@Service
@RequiredArgsConstructor
public class RedisPublisher {

    private final RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic topic, ChatDto chatDto){
        redisTemplate.convertAndSend(topic.getTopic(), chatDto);
    }

}
  • 사용자가 웹소켓을 통해 메시지를 보내면, MessageController에서 RedisPublisher의 publish() 메서드를 호출
  • 이때 특정 채널(topic)에 메시지를 발행하며, RedisTemplate의 convertAndSend() 메서드가 사용
  • 발행한 메시지는 Redis 서버에 전달되어 해당 채널을 구독 중인 모든 리스너에게 전파
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final SimpMessagingTemplate messagingTemplate;
    private final RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // Redis 메시지 역직렬화
            String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());

            ChatDto chatMessage = objectMapper.readValue(publishMessage, ChatDto.class);

            // WebSocket 구독자에게 메시지 전달
            messagingTemplate.convertAndSend("/sub/room/" + chatMessage.getRoomNo(), chatMessage);
        } catch (Exception e) {
            log.error("Error processing message: {}", e.getMessage());
            log.info("Message 의 직렬화 전 : " + new String(message.getBody()));
        }
    }

}
  • Redis 서버의 특정 채널로부터 메시지가 도착하면, 앞서 설정한 MessageListenerAdapter에 의해 onMessage() 메서드가 호출
  • 수신된 메시지는 역직렬화 과정을 거쳐 다시 ChatDto로 변환
  • 변환된 객체는 SimpMessagingTemplate을 통해 웹소켓 구독자에게 실시간으로 전달
@RestController
@RequiredArgsConstructor
@Slf4j
public class MessageController {

    private final ChatService chatService;
    private final RedisPublisher redisPublisher;

    @MessageMapping("/room/{roomNo}")
    public void sendMessage(ChatDto message, SimpMessageHeaderAccessor headerAccessor)  {
        Long memberId = (Long) headerAccessor.getSessionAttributes().get("AUTHENTICATED_MEMBER_ID");
        if (memberId == null) {
            throw new RuntimeException("User is not authenticated");
        }

        log.info("Authenticated memberId: {}", memberId);
        chatService.enterMessageRoom(message.getRoomNo());

        redisPublisher.publish(chatService.getTopic(message.getRoomNo()),message);
        // 메시지 처리
        chatService.createMessage(message, memberId);
    }

}
  • RedisPublisher를 통해 Redis에 메시지를 발행
  • DB에도 메시지를 저장하는 로직이 수행
@RequiredArgsConstructor
@Service
@Slf4j
public class ChatService {

    private final SimpMessagingTemplate simpMessagingTemplate;
    private final RedisTemplate<String, ChatDto> redisTemplateChat;
    private final RedisMessageListenerContainer redisMessageListener;
    private final RedisSubscriber redisSubscriber;
    private final ChatRepository chatRepository;
    private final ObjectMapper objectMapper;
    private final UserRepository userRepository;
    private Map<String, ChannelTopic> topics;
    final String DEFAULT_URL = "/sub/room/";


    @Transactional
    public void createMessage(ChatDto chatDto, Long memberId) {
        Optional<User> optionalUser = userRepository.findById(memberId);
        if (optionalUser.isEmpty()) {
            throw new RuntimeException("User not found");
        }
        User user = optionalUser.get();

        redisTemplateChat.setValueSerializer(new Jackson2JsonRedisSerializer<>(Chat.class));
        // Redis 에 메시지 발행
        redisTemplateChat.opsForList().rightPush(chatDto.getRoomNo(), chatDto);

        Chat entity = objectMapper.convertValue(chatDto, Chat.class);

        simpMessagingTemplate.convertAndSend(DEFAULT_URL + chatDto.getRoomNo(), chatDto);
        chatRepository.save(entity);

    }

    @PostConstruct
    private void init() {
        topics = new HashMap<>();
    }

    public ChannelTopic getTopic(String roomNo) {
        if(topics == null){
            topics = new HashMap<>();
        }
        return topics.get(roomNo);
    }

    public void enterMessageRoom(String roomId) {
        ChannelTopic topic = topics.get(roomId);

        if (topic == null) {
            topic = new ChannelTopic(roomId);
            redisMessageListener.addMessageListener(redisSubscriber, topic);
            topics.put(roomId, topic);
        }
    }

}
  • 채팅방 입장 시, 해당 방 번호(roomNo)를 기준으로 새로운 ChannelTopic을 생성하고 RedisMessageListenerContainer에 리스너를 등록
  • 채팅 메시지가 발행되면 RedisTemplate을 통해 Redis 리스트 자료구조에 저장되고, 동시에 DB에도 영구적으로 저장
  • SimpMessagingTemplate을 통해 웹소켓으로 연결된 클라이언트에게 실시간으로 전달

전체 흐름 요약

  • 사용자 -> 웹소켓 -> 컨트롤러 -> 서비스 -> RedisPublisher -> Redis 서버(publish) -> RedisSubscriber -> 웹소켓 SimpMessagingTemplate -> 클라이언트

흐름을 정리해보니, chatService와 onMessage()에서 두 번 메시지를 발행하는 것을 알았다. 그래서 chatService에서 

//simpMessagingTemplate.convertAndSend(DEFAULT_URL + chatDto.getRoomNo(), chatDto);

주석처리를 하였다.

이제 조회를 공부해보자.