Spring Messaging

1. JMS (Java Message Service)

JMS Configuration

/**
 * Spring configuration that declares the JMS infrastructure beans: a
 * {@link JmsTemplate} for sending and a listener container factory.
 */
@Configuration
public class JmsConfig {

    /**
     * Builds the template used for sending JMS messages.
     *
     * @param connectionFactory connection factory the template is bound to
     * @return a template whose default destination name is {@code "defaultQueue"}
     */
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate sender = new JmsTemplate();
        sender.setDefaultDestinationName("defaultQueue");
        sender.setConnectionFactory(connectionFactory);
        return sender;
    }

    /**
     * Builds the listener container factory.
     *
     * @param connectionFactory connection factory handed to the created containers
     * @return a factory configured with the concurrency setting {@code "3-10"}
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory listenerFactory =
            new DefaultJmsListenerContainerFactory();
        // NOTE(review): "3-10" is presumably a lower-upper bound on concurrent consumers — confirm
        listenerFactory.setConcurrency("3-10");
        listenerFactory.setConnectionFactory(connectionFactory);
        return listenerFactory;
    }
}

Message Producer

/**
 * Publishes {@code Order} messages to the {@code orderQueue} JMS destination.
 *
 * <p>{@code log} is not declared in this class; presumably it is supplied elsewhere
 * (for example by a logging annotation) — TODO confirm.
 */
@Service
public class OrderMessageProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Converts the order with the template's message converter and sends it to
     * {@code orderQueue}.
     *
     * @param order the order to publish
     * @throws MessageSendException if the template reports a {@code JmsException};
     *         the original exception is preserved as the cause
     */
    public void sendOrder(Order order) {
        try {
            jmsTemplate.convertAndSend("orderQueue", order);
            log.info("Order sent to queue: {}", order.getId());
        } catch (JmsException e) {
            log.error("Failed to send order: {}", e.getMessage());
            throw new MessageSendException("Failed to send order", e);
        }
    }

    /**
     * Sends the order to {@code orderQueue} as an object message with an explicit
     * JMS priority.
     *
     * <p>The priority is passed to {@code MessageProducer.send(...)} rather than set
     * on the message: {@code Message.setJMSPriority} is reserved for the provider,
     * which overwrites the header during send, so setting it beforehand has no effect.
     * Delivery mode and time-to-live are taken from the template's configuration.
     *
     * <p>NOTE(review): this path does not commit a locally transacted session; the
     * template is assumed to be non-transacted or externally managed — confirm.
     *
     * @param order    the order to publish
     * @param priority JMS priority, 0 (lowest) through 9 (highest)
     * @throws IllegalArgumentException if {@code priority} is outside 0..9
     */
    public void sendOrderWithPriority(Order order, int priority) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException(
                "JMS priority must be between 0 and 9 but was " + priority);
        }
        jmsTemplate.execute("orderQueue", (session, producer) -> {
            Message message = session.createObjectMessage(order);
            producer.send(
                message,
                jmsTemplate.getDeliveryMode(),
                priority,
                jmsTemplate.getTimeToLive());
            return null;
        });
    }
}

Message Consumer

@Service
public class OrderMessageConsumer {

    @Autowired
    private OrderService orderService;

    @JmsListener(destination = "orderQueue")
    public void receiveOrder(Order order) {
        try {
            log.info("Received order: {}", order.getId());
            orderService.processOrder(order);
        } catch (Exception e) {
            log.error("Error processing order: {}", e.getMessage());
            throw new MessageProcessingException("Failed to process order", e);
        }
    }

    @JmsListener(destination = "deadLetterQueue")
    public void handleFailedMessages(Message message) {
        try {
            log.warn("Processing dead letter: {}", message.getJMSMessageID());
            // Retry logic or error handling
        } catch (JMSException e) {
            log.error("Error handling dead letter: {}", e.getMessage());
        }
    }
}

2. RabbitMQ Integration

RabbitMQ Configuration

@Configuration
public class RabbitConfig {

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("orderQueue")
            .withArgument("x-dead-letter-exchange", "deadLetterExchange")
            .withArgument("x-dead-letter-routing-key", "deadLetter")
            .build();
    }

    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange("orderExchange");
    }

    @Bean
    public Binding binding(Queue orderQueue, TopicExchange orderExchange) {
        return BindingBuilder
            .bind(orderQueue)
            .to(orderExchange)
            .with("order.*");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
}

RabbitMQ Producer

/**
 * Publishes {@code Order} messages to {@code orderExchange} with routing key
 * {@code order.created}.
 *
 * <p>{@code log} is not declared in this class; presumably it is supplied elsewhere
 * (for example by a logging annotation) — TODO confirm.
 */
@Service
public class RabbitOrderProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts and sends the order.
     *
     * @param order the order to publish
     * @throws MessageSendException if the template reports an {@code AmqpException};
     *         the original exception is preserved as the cause
     */
    public void sendOrder(Order order) {
        try {
            rabbitTemplate.convertAndSend("orderExchange", "order.created", order);
            log.info("Order sent to RabbitMQ: {}", order.getId());
        } catch (AmqpException e) {
            log.error("Failed to send order to RabbitMQ: {}", e.getMessage());
            throw new MessageSendException("Failed to send order", e);
        }
    }

    /**
     * Converts and sends the order, adding the given entries to the message headers.
     *
     * @param order   the order to publish
     * @param headers extra headers to attach; {@code null} or empty means none
     */
    public void sendOrderWithHeaders(Order order, Map<String, Object> headers) {
        rabbitTemplate.convertAndSend("orderExchange", "order.created", order, message -> {
            // Tolerate a missing header map instead of failing inside the post-processor.
            if (headers != null) {
                message.getMessageProperties().getHeaders().putAll(headers);
            }
            return message;
        });
    }
}

RabbitMQ Consumer

@Service
public class RabbitOrderConsumer {

    @Autowired
    private OrderService orderService;

    @RabbitListener(queues = "orderQueue")
    public void handleOrder(Order order) {
        try {
            log.info("Received order from RabbitMQ: {}", order.getId());
            orderService.processOrder(order);
        } catch (Exception e) {
            log.error("Error processing order from RabbitMQ: {}", e.getMessage());
            throw new AmqpRejectAndDontRequeueException("Failed to process order", e);
        }
    }

    @RabbitListener(queues = "#{orderQueue.name}")
    public void handleOrderWithAck(Order order, Channel channel, 
                                 @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            orderService.processOrder(order);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            try {
                channel.basicNack(tag, false, true);
            } catch (IOException ex) {
                log.error("Error sending nack", ex);
            }
        }
    }
}

3. Kafka Integration

Kafka Configuration

/**
 * Kafka producer/consumer configuration for {@code Order} messages keyed by
 * {@code String}, using JSON for the value.
 */
@Configuration
public class KafkaConfig {

    /** Broker address shared by the producer and consumer factories. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /**
     * Producer factory with a string key serializer and a JSON value serializer.
     *
     * @return the producer factory
     */
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    /**
     * Consumer factory for group {@code order-group}.
     *
     * <p>The value deserializer is supplied as a typed instance rather than by
     * class name: a {@code JsonDeserializer} configured only by class has no target
     * type and does not trust the package of {@code Order}, so it cannot reliably
     * turn the JSON back into an {@code Order}.
     *
     * @return the consumer factory
     */
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");

        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Order> valueDeserializer = new JsonDeserializer<>(Order.class);
        return new DefaultKafkaConsumerFactory<>(config, keyDeserializer, valueDeserializer);
    }

    /**
     * Template built on {@link #producerFactory()}.
     *
     * @return the Kafka template
     */
    @Bean
    public KafkaTemplate<String, Order> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Listener container factory built on {@link #consumerFactory()}.
     *
     * @return the listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Kafka Producer

/**
 * Publishes {@code Order} records to {@code order-topic}, keyed by the order id.
 *
 * <p>{@code log} is not declared in this class; presumably it is supplied elsewhere
 * (for example by a logging annotation) — TODO confirm.
 */
@Service
public class KafkaOrderProducer {

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    /**
     * Sends the order and logs the send result from the callback.
     *
     * @param order the order to publish
     * @throws MessageSendException if building or handing off the record fails
     *         before the send result callback takes over; the original exception
     *         is preserved as the cause
     */
    public void sendOrder(Order order) {
        try {
            publish(newRecord(order), order);
        } catch (Exception e) {
            log.error("Error sending order to Kafka: {}", e.getMessage());
            throw new MessageSendException("Failed to send order", e);
        }
    }

    /**
     * Sends the order with additional record headers.
     *
     * <p>Header values are encoded as UTF-8 so the bytes do not depend on the
     * platform default charset. A {@code null} value is added as a header with no
     * value instead of failing.
     *
     * @param order   the order to publish
     * @param headers header names and values to attach; {@code null} means none
     */
    public void sendOrderWithHeaders(Order order, Map<String, String> headers) {
        ProducerRecord<String, Order> record = newRecord(order);

        if (headers != null) {
            headers.forEach((key, value) -> record.headers().add(
                key,
                value == null
                    ? null
                    : value.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        }

        // Log the send result here too, so a failed send is not silently ignored.
        publish(record, order);
    }

    /** Builds the record for {@code order-topic} keyed by the order id. */
    private ProducerRecord<String, Order> newRecord(Order order) {
        return new ProducerRecord<>(
            "order-topic",
            order.getId().toString(),
            order
        );
    }

    /** Hands the record to the template and logs the send result. */
    private void publish(ProducerRecord<String, Order> record, Order order) {
        kafkaTemplate.send(record)
            .addCallback(
                result -> log.info("Order sent to Kafka: {}", order.getId()),
                ex -> log.error("Failed to send order to Kafka: {}", ex.getMessage())
            );
    }
}

Kafka Consumer

@Service
public class KafkaOrderConsumer {

    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void handleOrder(Order order) {
        try {
            log.info("Received order from Kafka: {}", order.getId());
            orderService.processOrder(order);
        } catch (Exception e) {
            log.error("Error processing order from Kafka: {}", e.getMessage());
            // Error handling strategy
        }
    }

    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void handleOrderWithAck(ConsumerRecord<String, Order> record,
                                 Acknowledgment ack) {
        try {
            Order order = record.value();
            orderService.processOrder(order);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Error processing order", e);
            // Retry logic or dead letter queue
        }
    }
}

4. WebSocket Support

WebSocket Configuration

/**
 * STOMP-over-WebSocket configuration: broker destinations, the {@code /ws}
 * endpoint, and transport limits.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Enables the simple broker for {@code /topic} and sets {@code /app} as the
     * application destination prefix.
     *
     * @param config the broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    /**
     * Registers the {@code /ws} STOMP endpoint with SockJS enabled.
     *
     * @param registry the endpoint registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // NOTE(review): "*" accepts any origin — confirm this is intended outside development
        registry
            .addEndpoint("/ws")
            .setAllowedOrigins("*")
            .withSockJS();
    }

    /**
     * Applies the message size limit, send buffer size limit and send time limit.
     *
     * @param registration the transport registration to configure
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(8192);
        registration.setSendBufferSizeLimit(8192);
        registration.setSendTimeLimit(10000);
    }
}

WebSocket Controller

/**
 * STOMP message controller for orders.
 *
 * <p>{@code log} is not declared in this class; presumably it is supplied elsewhere
 * (for example by a logging annotation) — TODO confirm.
 */
@Controller
public class OrderWebSocketController {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    // processOrder(...) calls this service, so it has to be declared and injected here.
    @Autowired
    private OrderService orderService;

    /**
     * Processes an order sent to the {@code /orders} mapping and broadcasts the
     * resulting status to {@code /topic/orders}.
     *
     * @param order the incoming order
     * @return the status returned by the order service, or an
     *         {@code OrderState.ERROR} status carrying the exception message when
     *         processing fails
     */
    @MessageMapping("/orders")
    @SendTo("/topic/orders")
    public OrderStatus processOrder(Order order) {
        try {
            return orderService.processOrder(order);
        } catch (Exception e) {
            return new OrderStatus(OrderState.ERROR, e.getMessage());
        }
    }

    /**
     * Sends a status update to {@code /topic/orders/{orderId}}.
     *
     * @param orderId id of the order, appended to the destination
     * @param status  the status to send
     */
    public void notifyOrderUpdate(String orderId, OrderStatus status) {
        messagingTemplate.convertAndSend(
            "/topic/orders/" + orderId,
            status
        );
    }

    /**
     * Logs a subscription to {@code /orders/updates}.
     *
     * @param principal the subscribing user; may be {@code null} when the session
     *                  has no authenticated user, in which case "anonymous" is logged
     */
    @SubscribeMapping("/orders/updates")
    public void subscribeToUpdates(Principal principal) {
        String userName = (principal != null) ? principal.getName() : "anonymous";
        log.info("User {} subscribed to order updates", userName);
    }
}

5. Best Practices

1. Error Handling

@Component
public class MessageErrorHandler {

    @Autowired
    private DeadLetterService deadLetterService;

    public void handleError(Message<?> message, Exception e) {
        MessageHeaders headers = message.getHeaders();
        String messageId = headers.getId().toString();

        log.error("Error processing message {}: {}", messageId, e.getMessage());

        DeadLetter deadLetter = DeadLetter.builder()
            .messageId(messageId)
            .payload(message.getPayload())
            .headers(headers)
            .error(e.getMessage())
            .timestamp(Instant.now())
            .build();

        deadLetterService.save(deadLetter);
    }
}

2. Message Transformation

/**
 * Converts message payloads to another type with the injected {@code ObjectMapper}.
 */
@Component
public class MessageTransformer {

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Builds a new message whose payload is the original payload converted to
     * {@code targetClass}. The original headers are copied and a
     * {@code "transformed"} header is set to {@code true}.
     *
     * @param message     the message to transform
     * @param targetClass the payload type to convert to
     * @param <T>         the target payload type
     * @return the message carrying the converted payload
     * @throws MessageTransformationException if conversion or message building
     *         fails; the original exception is preserved as the cause
     */
    public <T> Message<T> transform(Message<?> message, Class<T> targetClass) {
        try {
            Object sourcePayload = message.getPayload();
            T converted = objectMapper.convertValue(sourcePayload, targetClass);

            MessageBuilder<T> builder = MessageBuilder.withPayload(converted);
            builder.copyHeaders(message.getHeaders());
            builder.setHeader("transformed", true);
            return builder.build();
        } catch (Exception failure) {
            String reason = "Failed to transform message to " + targetClass.getSimpleName();
            throw new MessageTransformationException(reason, failure);
        }
    }
}

3. Message Routing

/**
 * Routes messages to a channel chosen by their {@code "type"} header.
 */
@Component
public class MessageRouter {

    private final Map<String, MessageChannel> routes = new ConcurrentHashMap<>();

    /**
     * Registers (or replaces) the channel for a message type.
     *
     * @param type    the value of the {@code "type"} header to match; must not be null
     * @param channel the channel to route matching messages to; must not be null
     */
    public void registerRoute(String type, MessageChannel channel) {
        routes.put(type, channel);
    }

    /**
     * Looks up the channel registered for the message's {@code "type"} header.
     *
     * @param message the message to route
     * @return the channel registered for the message's type
     * @throws MessageRoutingException if the header is missing or no channel is
     *         registered for its value
     */
    @Router(inputChannel = "inputChannel")
    public MessageChannel route(Message<?> message) {
        String type = message.getHeaders().get("type", String.class);

        // ConcurrentHashMap rejects null keys, so a message without a "type" header
        // must be treated as unroutable rather than looked up.
        MessageChannel channel = (type == null) ? null : routes.get(type);
        if (channel == null) {
            throw new MessageRoutingException(message, "No route found for type: " + type);
        }

        return channel;
    }
}

4. Message Monitoring

/**
 * Counts sent messages per destination and periodically logs the totals.
 *
 * <p>{@code log} is not declared in this class; presumably it is supplied elsewhere
 * (for example by a logging annotation) — TODO confirm.
 */
@Component
public class MessageMonitor {

    @Autowired
    private MeterRegistry meterRegistry;

    private final Map<String, Counter> counters = new ConcurrentHashMap<>();

    /**
     * Increments the counter named {@code "message.sent." + destination}, where
     * the destination is read from the message's {@code "destination"} header.
     * The counter is registered on first use.
     *
     * @param event the event carrying the sent message
     */
    @EventListener
    public void onMessageSent(MessageSentEvent event) {
        String destination = event.getMessage().getHeaders()
            .get("destination", String.class);
        String meterName = "message.sent." + destination;

        Counter sentCounter = counters.computeIfAbsent(
            meterName, name -> registerCounter(name, destination));
        sentCounter.increment();
    }

    /** Logs every tracked counter name with its current count. */
    @Scheduled(fixedRate = 60000)
    public void reportMetrics() {
        for (Map.Entry<String, Counter> tracked : counters.entrySet()) {
            log.info("{}: {}", tracked.getKey(), tracked.getValue().count());
        }
    }

    /** Registers a counter with the given name in the meter registry. */
    private Counter registerCounter(String name, String destination) {
        return Counter.builder(name)
            .description("Messages sent to " + destination)
            .register(meterRegistry);
    }
}

5. Message Persistence

```java
@Service
public class MessagePersistenceService {

// Repository used by persistMessage to store message entities.
@Autowired
private MessageRepository messageRepository;

// JSON mapper used by convertPayloadToString; it was referenced there but never declared.
@Autowired
private ObjectMapper objectMapper;

/**
 * Saves a {@code MessageEntity} built from the message's id, its payload as a
 * string, its headers as a string map, and the current time.
 *
 * <p>The id header is dereferenced unconditionally, so the message is assumed to
 * carry one — TODO confirm.
 *
 * @param message the message to persist
 */
@Transactional
public void persistMessage(Message<?> message) {
    MessageHeaders messageHeaders = message.getHeaders();

    String id = messageHeaders.getId().toString();
    String payloadText = convertPayloadToString(message.getPayload());
    Map<String, String> headerText = convertHeadersToMap(messageHeaders);

    MessageEntity entity = MessageEntity.builder()
        .messageId(id)
        .payload(payloadText)
        .headers(headerText)
        .timestamp(Instant.now())
        .build();
    messageRepository.save(entity);
}

/**
 * Serializes the payload with {@code objectMapper.writeValueAsString}.
 *
 * @param payload the payload to serialize
 * @return the serialized payload
 * @throws MessagePersistenceException if serialization fails; the original
 *         exception is preserved as the cause
 */
private String convertPayloadToString(Object payload) {
    final String serialized;
    try {
        serialized = objectMapper.writeValueAsString(payload);
    } catch (JsonProcessingException failure) {
        throw new MessagePersistenceException("Failed to convert payload", failure);
    }
    return serialized;
}

/**
 * Copies the headers into a map from header name to {@code String.valueOf}
 * of the header value.
 *
 * @param headers the headers to convert
 * @return a map with one string entry per header
 */
private Map<String, String> convertHeadersToMap(MessageHeaders headers) {
    return headers.entrySet()
        .stream()
        .collect(Collectors.toMap(
            header -> header.getKey(),
            header -> String.valueOf(header.getValue())));
}

}