카테고리 없음

Redis pub/sub을 활용하여 채팅서버 구성하기

Lahezy 2023. 10. 31.
728x90

레디스를 적용했던 이유 

기존에 다른 시스템에서 채팅서버를 구성하면서 spring에 내장되어 있는 SimpleBroker(message broker)을 활용한 경험이 있습니다.

하지만 해당 방법은 하나의 서버에 비즈니스 로직과 채팅 로직 모두 동작하여 성능을 낮출 수 있습니다. 또한 만약 서버가 확장된다면 한 서버에 있는 메시지 브로커에서만 동일한 채팅 구독정보를 가지고 있어 확장성에 안 좋다는 문제가 있습니다.

 

이를 해결하기 위해 저희 팀은 redis pub/sub구조를 활용하여 채팅서버를 사용하였습니다. (근데 만약 레디스 서버에 문제가 생기면  메세지가 유실되는 문제가 발생될 수 있습니다. 또한 지금까지의 구독기록 모두 날아가버리는 일이 발생할 수 있습니다. )

 

일단 저희팀은 rabbitMQ와 Redis 중 고민하다 간단한 방식인 redis의 pub/sub을 이용하는 방식을 선택하여 구현하였습니다.

 

구현

1. yml 파일 (local 환경에서는 local 레디스를 사용할 수 있도록 하였습니다)

spring:
  config:
    activate:
      on-profile: local

  data:
    redis:
      host: localhost
      port: 6379

2. Redis 사용 설정 

저희 팀의 경우 레디스를 이용하여 채팅을 저장하기 때문에 redisTemplate을 추가로 설정하였습니다.

또한 Redis Client는 Lettuce를 이용하여 연결 하도록 하였습니다. 

https://jojoldu.tistory.com/418

 

Jedis 보다 Lettuce 를 쓰자

Java의 Redis Client는 크게 2가지가 있습니다. Jedis Lettuce 둘 모두 몇천개의 Star를 가질만큼 유명한 오픈소스입니다. 이번 시간에는 둘 중 어떤것을 사용해야할지에 대해 성능 테스트 결과를 공유하

jojoldu.tistory.com

@Configuration
@RequiredArgsConstructor
@Slf4j
public class RedisConfig {
    private final RedisProperties properties;
    @Value("${spring.data.redis.host}")
    private String host;
    @Value("${spring.data.redis.port}")
    private int port;

    /**
     * yml 파일에 의해 포트와, 호스트가 자동으로 지정된다.
     * 연결시에 추가로 지정해야하는 경우는 @Value 읽어 지정하면 된다.
     * 사실 아래 빈 파일이 없어도 자동으로 레디스 커넥션을 만들어서 지정해준다.</p>
     *
     */
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration();
        redisConfiguration.setHostName(host);
        redisConfiguration.setPort(port);

        LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisConfiguration);
        lettuceConnectionFactory.afterPropertiesSet();//yml세팅으로 init
        return lettuceConnectionFactory;
    }

    @Bean
    @Primary
    public StringRedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

    /**
     * 레디스 서버와의 상호작용을 위한 텤플릿
     *
     * @param redisConnectionFactory
     * @return
     */
    @Bean
    public StringRedisTemplate chatRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        //채팅을 불러오는 템플릿
        return new StringRedisTemplate(redisConnectionFactory);
    }

    /**
     * 실제 메세지를 처리하는 비즈니스 로직
     */
    @Bean
    MessageListenerAdapter messageListener(RedisMessageSubscriber redisMessageSubscriber) {
        return new MessageListenerAdapter(redisMessageSubscriber);
    }

    /**
     * 발행된 메세지 처리를 위한 리스너를 설정한다.
     */
    @Bean
    RedisMessageListenerContainer redisContainer(
            RedisConnectionFactory redisConnectionFactory,
            MessageListenerAdapter messageListener
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(messageListener, topic());
        return container;
    }

    @Bean
    ChannelTopic topic() {
        return new ChannelTopic("MESSAGE");
    }
}

 

채팅을 위해서는 Stomp를 활용하였습니다. 다음과 같이 스톰프 메시지 브로커에 의해 메시지를 분리합니다.

@Configuration
@EnableWebSocketMessageBroker
public class webSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Autowired
    private StompHandler stompHandler; // jwt 인증

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();//추후에 도메인한정으로 수정해야한다.
        registry.addEndpoint("/ws").setAllowedOriginPatterns("*");//추후에 도메인한정으로 수정해야한다.
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/sub");
        registry.setApplicationDestinationPrefixes("/pub","/sub"); 
        // pub로 오는 메세지를 sub를 구독한 모든 사람들에게 준다
        // sub을 추가한 이유는 방을 구독하는 시점에도 해당 방에 있던 메세지를 전송해주기 위해 추가하였습니다.
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompHandler);
    }

}

 

참고로 저의 경우는 스프링 시큐리티를 사용하여 jwt 토큰을 헤더에서 추출하여 확인하는 과정을 가지고 있습니다. 또한 스프링 시큐리티에서는 소켓을 통신하는 /ws를 열어주어야 합니다.

만약 jwt 토큰 확인을 하지 않거나 스프링 시큐리티를 사용하지 않는다면 해당 과정은 생략가능합니다. 

import java.util.Objects;

@Component
@RequiredArgsConstructor
@Slf4j
public class StompHandler implements ChannelInterceptor {

    private final JwtTokenProvider jwtTokenProvider;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        //jwt
        if (!StompCommand.DISCONNECT.equals(accessor.getCommand())) {
            String token = Objects.requireNonNull(accessor.getFirstNativeHeader("Authorization"))
                    .substring(7);

            JwtTokenProvider.JWTInfo jwtInfo = null;
            if (StringUtils.hasText(token) && jwtTokenProvider.validateToken(token)) {
                jwtInfo = jwtTokenProvider.decodeToken(token);
            } else {
                log.info("[소켓] 유효한 JWT토큰이 없습니다.");
            }

            // WebSocket 세션에 사용자 정보 저장
            accessor.getSessionAttributes().put("username", jwtInfo.getUsername());
        }
        return message;
    }
}

 

실제 채팅을 받는 공간입니다. 처음 채팅을 구독하면(sub) 이후에 발행된(pub) 채팅 정보를 구독한 사람들에게 보내줍니다.

@RestController
@Slf4j
@RequiredArgsConstructor
public class ChatController {

    private final ChatService chatService;
    private final ChatroomService chatroomService;

    /**
     * /pub/chat/message 으로 오는 메세지를 여기서 받아서 처리한다( -> chatservice -> redisMessageSubscriber에서 모두에게 전송)
     */
    @MessageMapping("/chat/message")
    public void message(
            ChatRequestDto chatRequestDto,
            SimpMessageHeaderAccessor accessor
    ) {
        // WebSocket 세션에서 사용자 정보 가져오기(만약 앞에서 시큐리티 설정을 하지 않았다면 dto에서 받아올 수 있습니다)
        String username = (String)accessor.getSessionAttributes().get("username");
        chatService.sendMessage(chatRequestDto, username);
    }

    /**
     * /sub/chat/room/{roomApiId} 방에 입장하면 이전 메세지를 출력해주는 기능
     */
    @SubscribeMapping("/chat/room/{roomApiId}")
    public List<ChatResponseDto> sendGreet(
            @DestinationVariable("roomApiId") String roomApiId
    ) {
        log.info("chatController: new subscription to {}", roomApiId);
        List<ChatResponseDto> messages = chatService.getLastMessages(roomApiId);
        return messages;

    }
}

 

 

레디스의 메시지 전송부분입니다. 구독한 메세지를 이곳에서 다시 전송합니다. 

@Service
@Slf4j
@RequiredArgsConstructor
public class RedisMessageSubscriber implements MessageListener {
    private final ObjectMapper objectMapper;
    private final RedisTemplate redisTemplate;
    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * 여기서 메세지를 다시 구독자들에게 전송합니다.(레디스 pub/sub)
     * @param message message must not be {@literal null}.
     * @param pattern pattern matching the channel (if specified) - can be {@literal null}.
     */
    @Override
    public void onMessage(final Message message, final byte[] pattern) {
        try {
            String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
            ChatResponseDto chatResponseDto = objectMapper.readValue(publishMessage, ChatResponseDto.class);

            log.info("redisMessageSubscriber: data {}", chatResponseDto);
            log.info("redisMessageSubscriber: data to {}", "/sub/chat/room/" + chatResponseDto.getChatroomApiId());
            messagingTemplate.convertAndSend("/sub/chat/room/" + chatResponseDto.getChatroomApiId(), chatResponseDto);

        } catch (Exception e) {
            throw new BusinessException(ErrorCode.CHAT_NOT_FOUND);
        }
    }
}

 

채팅 메세지를 저장합니다.

@Service
@Slf4j
@RequiredArgsConstructor
@Transactional(readOnly = true)
public class ChatService {
    private final UserModuleService userModuleService;
    private final ChatroomService chatroomService;
    private final ChatRedisRepository chatRedisRepository;
    private final ChannelTopic channelTopic;
    private final RedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;

    /**
     * 들어온 채팅을 관리합니다.
     */
    @Transactional
    public void sendMessage(ChatRequestDto chatRequestDto, String currentUsername) {
        Gson gson = new Gson();
        User user = userModuleService.getByUsername(currentUsername);
        Chatroom chatroom = chatroomService.getByApiId(chatRequestDto.getRoomApiId());
        
        //채팅 저장
        ChatRedis chatRedis = ChatRedis.of(chatroom, user, chatRequestDto.getMessage());
        chatRedisRepository.saveMessage(chatRedis);

        //반환 정보
        ChatResponseDto chatResponseDto = ChatResponseDto.fromEntity(chatRedis, chatroom.getApiId());
        String json = gson.toJson(chatResponseDto);

        //json으로 반환된 객체를 다시 전달한다.(방을 구독한 사람들에게 전달하기 위해서)
        redisTemplate.convertAndSend(channelTopic.getTopic(), json);
    }

    /**
     * 이전 메세지를 불러오는 기능(레디스 정보를 파싱해서 가져옵니다)
     */
    public List<ChatResponseDto> getLastMessages(String roomApiId) {

        List<ChatResponseDto> responseDtos = new ArrayList<>();
        if (chatroomService.getByApiId(roomApiId) != null) {

            int offset = 0;
            int size = 200;
            Set<String> values = chatRedisRepository.getMessages(roomApiId, offset, size);
            for (String value : values) {
                ChatResponseDto chatResponseDto = null;
                try {
                    ChatRedis chat = objectMapper.readValue(value, ChatRedis.class);
                    chatResponseDto = ChatResponseDto.fromEntity(chat, roomApiId);
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
                responseDtos.add(chatResponseDto);
            }
        } 
        
        return responseDtos;
    }
}

 

문제점 파악 

redis의 경우에는 메세지 브로커 이기 때문에 구독시점에 구독자가 없으면 채팅이 보내지지 않고 바로 삭제된다는 특징이 있습니다.

또한 스톰프 형식을 지원하지 않아서 별도의 처리를 진행해 주어야 한다는 단점이 있습니다. 또한 참고자료에 따르면 구독자가 늘어다는 경우 구독자를 순회하면서 메세지를 발행해서 선형적으로 발행속도가 지연된다는 단점이 있었습니다.

 

이로 인해 현재 저희 팀은 프로젝트의 채팅 부분을 카프카로 바꿔 채팅을 제공해보려고 하고 있습니다

 

 

 

 

 

참고

https://docs.spring.io/spring-data/redis/docs/current/reference/html/#preface

https://www.youtube.com/watch?v=H_DaPyUOeTo&ab_channel=%EB%8D%B0%EB%B8%8C%EC%9B%90%EC%98%81DVWY

https://kakaoentertainment-tech.tistory.com/109

https://inpa.tistory.com/entry/REDIS-%F0%9F%93%9A-PUBSUB-%EA%B8%B0%EB%8A%A5-%EC%86%8C%EA%B0%9C-%EC%B1%84%ED%8C%85-%EA%B5%AC%EB%8F%85-%EC%95%8C%EB%A6%BC

 

728x90

댓글