헬스팅
Redis + STOMP 발행 성공
myeongjaechoi
2025. 3. 8. 17:09
문제점
- [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 1]
- 역직렬화에 실패하는 문제가 있었다.
해결
- 시간과, 방 번호를 String으로 받았다.
- LocalDateTime -> String, Long -> String
프로세스 흐름
package org.zeorck.fitme.common.config.redis;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.zeorck.fitme.common.application.RedisSubscriber;
import org.zeorck.fitme.websocket.dto.ChatDto;
@Configuration
public class RedisConfig {
@Value("${spring.data.redis.host}")
private String redisHost;
@Value("${spring.data.redis.port}")
private int redisPort;
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(redisHost, redisPort);
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
return redisTemplate;
}
@Bean
public RedisTemplate<String, ChatDto> redisTemplateMessage(RedisConnectionFactory connectionFactory){
RedisTemplate<String, ChatDto> redisTemplateMessage = new RedisTemplate<>();
redisTemplateMessage.setConnectionFactory(connectionFactory);
redisTemplateMessage.setKeySerializer(new StringRedisSerializer());
redisTemplateMessage.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
return redisTemplateMessage;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, ChannelTopic.of("chatroom"));
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "onMessage");
}
}
- RedisConnectionFactory
- Redis 서버와의 연결을 관리하는 객체로, LetteuceConnectionFactory를 사용하여 Redis 서버의 host와 port를 설정한다.
- RedisTemplate
- Redis에 데이터를 저장하거나 조회할 때 사용하는 객체
- Key는 StringRedisSerializer로 직렬화하고, Value는 Jackson2JsonRedisSerizalizer를 사용하여 JSON 형태로 저장.
- 하나는 일반 객체, 하나는 채팅 메시지 전용
- RedisMessageListenerContainer
- Redis에서 메시지를 구독(subscribe)하고 수신하는 컨테이너
- 특정 채널(topic)에 메시지가 발행(publish)되면 이를 감지하여 처리할 messageListenerAdapter를 등록
- MessageListenerAdapter
- Redis에서 메시지를 수신했을 때 호출되는 메서드를 지정. 여기서는 RedisSubscriber 클래스의 onMessage 메서드 호출
@Service
@RequiredArgsConstructor
public class RedisPublisher {
private final RedisTemplate<String, Object> redisTemplate;
public void publish(ChannelTopic topic, ChatDto chatDto){
redisTemplate.convertAndSend(topic.getTopic(), chatDto);
}
}
- 사용자가 웹소켓을 통해 메시지를 보내면, MessageController에서 RedisPublisher의 publish() 메서드를 호출
- 이때 특정 채널(topic)에 메시지를 발행하며, RedisTemplate의 convertAndSend() 메서드가 사용
- 발행한 메시지는 Redis 서버에 전달되어 해당 채널을 구독 중인 모든 리스너에게 전파
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final SimpMessagingTemplate messagingTemplate;
private final RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
try {
// Redis 메시지 역직렬화
String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
ChatDto chatMessage = objectMapper.readValue(publishMessage, ChatDto.class);
// WebSocket 구독자에게 메시지 전달
messagingTemplate.convertAndSend("/sub/room/" + chatMessage.getRoomNo(), chatMessage);
} catch (Exception e) {
log.error("Error processing message: {}", e.getMessage());
log.info("Message 의 직렬화 전 : " + new String(message.getBody()));
}
}
}
- Redis 서버의 특정 채널로부터 메시지가 도착하면, 앞서 설정한 MessageListenerAdapter에 의해 onMessage() 메서드가 호출
- 수신된 메시지는 역직렬화 과정을 거쳐 다시 ChatDto로 변환
- 변환된 객체는 SimpMessagingTemplate을 통해 웹소켓 구독자에게 실시간으로 전달
@RestController
@RequiredArgsConstructor
@Slf4j
public class MessageController {
private final ChatService chatService;
private final RedisPublisher redisPublisher;
@MessageMapping("/room/{roomNo}")
public void sendMessage(ChatDto message, SimpMessageHeaderAccessor headerAccessor) {
Long memberId = (Long) headerAccessor.getSessionAttributes().get("AUTHENTICATED_MEMBER_ID");
if (memberId == null) {
throw new RuntimeException("User is not authenticated");
}
log.info("Authenticated memberId: {}", memberId);
chatService.enterMessageRoom(message.getRoomNo());
redisPublisher.publish(chatService.getTopic(message.getRoomNo()),message);
// 메시지 처리
chatService.createMessage(message, memberId);
}
}
- RedisPublisher를 통해 Redis에 메시지를 발행
- DB에도 메시지를 저장하는 로직이 수행
@RequiredArgsConstructor
@Service
@Slf4j
public class ChatService {
private final SimpMessagingTemplate simpMessagingTemplate;
private final RedisTemplate<String, ChatDto> redisTemplateChat;
private final RedisMessageListenerContainer redisMessageListener;
private final RedisSubscriber redisSubscriber;
private final ChatRepository chatRepository;
private final ObjectMapper objectMapper;
private final UserRepository userRepository;
private Map<String, ChannelTopic> topics;
final String DEFAULT_URL = "/sub/room/";
@Transactional
public void createMessage(ChatDto chatDto, Long memberId) {
Optional<User> optionalUser = userRepository.findById(memberId);
if (optionalUser.isEmpty()) {
throw new RuntimeException("User not found");
}
User user = optionalUser.get();
redisTemplateChat.setValueSerializer(new Jackson2JsonRedisSerializer<>(Chat.class));
// Redis 에 메시지 발행
redisTemplateChat.opsForList().rightPush(chatDto.getRoomNo(), chatDto);
Chat entity = objectMapper.convertValue(chatDto, Chat.class);
simpMessagingTemplate.convertAndSend(DEFAULT_URL + chatDto.getRoomNo(), chatDto);
chatRepository.save(entity);
}
@PostConstruct
private void init() {
topics = new HashMap<>();
}
public ChannelTopic getTopic(String roomNo) {
if(topics == null){
topics = new HashMap<>();
}
return topics.get(roomNo);
}
public void enterMessageRoom(String roomId) {
ChannelTopic topic = topics.get(roomId);
if (topic == null) {
topic = new ChannelTopic(roomId);
redisMessageListener.addMessageListener(redisSubscriber, topic);
topics.put(roomId, topic);
}
}
}
- 채팅방 입장 시, 해당 방 번호(roomNo)를 기준으로 새로운 ChannelTopic을 생성하고 RedisMessageListenerContainer에 리스너를 등록
- 채팅 메시지가 발행되면 RedisTemplate을 통해 Redis 리스트 자료구조에 저장되고, 동시에 DB에도 영구적으로 저장
- SimpMessagingTemplate을 통해 웹소켓으로 연결된 클라이언트에게 실시간으로 전달
전체 흐름 요약
- 사용자 -> 웹소켓 -> 컨트롤러 -> 서비스 -> RedisPublisher -> Redis 서버(publish) -> RedisSubscriber -> 웹소켓 SimpMessagingTemplate -> 클라이언트
흐름을 정리해보니, chatService와 onMessage()에서 두 번 메시지를 발행하는 것을 알았다. 그래서 chatService에서
//simpMessagingTemplate.convertAndSend(DEFAULT_URL + chatDto.getRoomNo(), chatDto);
주석처리를 하였다.
이제 조회를 공부해보자.