Spring Messaging
1. JMS (Java Message Service)
Cấu hình JMS
@Configuration
public class JmsConfig {
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory);
template.setDefaultDestinationName("defaultQueue");
return template;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("3-10");
return factory;
}
}
Message Producer
@Service
public class OrderMessageProducer {
@Autowired
private JmsTemplate jmsTemplate;
public void sendOrder(Order order) {
try {
jmsTemplate.convertAndSend("orderQueue", order);
log.info("Order sent to queue: {}", order.getId());
} catch (JmsException e) {
log.error("Failed to send order: {}", e.getMessage());
throw new MessageSendException("Failed to send order", e);
}
}
public void sendOrderWithPriority(Order order, int priority) {
jmsTemplate.send("orderQueue", session -> {
Message message = session.createObjectMessage(order);
message.setJMSPriority(priority);
return message;
});
}
}
Message Consumer
@Service
public class OrderMessageConsumer {
@Autowired
private OrderService orderService;
@JmsListener(destination = "orderQueue")
public void receiveOrder(Order order) {
try {
log.info("Received order: {}", order.getId());
orderService.processOrder(order);
} catch (Exception e) {
log.error("Error processing order: {}", e.getMessage());
throw new MessageProcessingException("Failed to process order", e);
}
}
@JmsListener(destination = "deadLetterQueue")
public void handleFailedMessages(Message message) {
try {
log.warn("Processing dead letter: {}", message.getJMSMessageID());
// Retry logic or error handling
} catch (JMSException e) {
log.error("Error handling dead letter: {}", e.getMessage());
}
}
}
2. RabbitMQ Integration
RabbitMQ Configuration
@Configuration
public class RabbitConfig {
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("orderQueue")
.withArgument("x-dead-letter-exchange", "deadLetterExchange")
.withArgument("x-dead-letter-routing-key", "deadLetter")
.build();
}
@Bean
public TopicExchange orderExchange() {
return new TopicExchange("orderExchange");
}
@Bean
public Binding binding(Queue orderQueue, TopicExchange orderExchange) {
return BindingBuilder
.bind(orderQueue)
.to(orderExchange)
.with("order.*");
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}
RabbitMQ Producer
@Service
public class RabbitOrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ObjectMapper objectMapper;
public void sendOrder(Order order) {
try {
rabbitTemplate.convertAndSend("orderExchange", "order.created", order);
log.info("Order sent to RabbitMQ: {}", order.getId());
} catch (AmqpException e) {
log.error("Failed to send order to RabbitMQ: {}", e.getMessage());
throw new MessageSendException("Failed to send order", e);
}
}
public void sendOrderWithHeaders(Order order, Map<String, Object> headers) {
rabbitTemplate.convertAndSend("orderExchange", "order.created", order, message -> {
message.getMessageProperties().getHeaders().putAll(headers);
return message;
});
}
}
RabbitMQ Consumer
@Service
public class RabbitOrderConsumer {
@Autowired
private OrderService orderService;
@RabbitListener(queues = "orderQueue")
public void handleOrder(Order order) {
try {
log.info("Received order from RabbitMQ: {}", order.getId());
orderService.processOrder(order);
} catch (Exception e) {
log.error("Error processing order from RabbitMQ: {}", e.getMessage());
throw new AmqpRejectAndDontRequeueException("Failed to process order", e);
}
}
@RabbitListener(queues = "#{orderQueue.name}")
public void handleOrderWithAck(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
orderService.processOrder(order);
channel.basicAck(tag, false);
} catch (Exception e) {
try {
channel.basicNack(tag, false, true);
} catch (IOException ex) {
log.error("Error sending nack", ex);
}
}
}
}
3. Kafka Integration
Kafka Configuration
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, Order> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public ConsumerFactory<String, Order> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Order> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Order> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Kafka Producer
@Service
public class KafkaOrderProducer {
@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;
public void sendOrder(Order order) {
try {
ProducerRecord<String, Order> record = new ProducerRecord<>(
"order-topic",
order.getId().toString(),
order
);
kafkaTemplate.send(record)
.addCallback(
result -> log.info("Order sent to Kafka: {}", order.getId()),
ex -> log.error("Failed to send order to Kafka: {}", ex.getMessage())
);
} catch (Exception e) {
log.error("Error sending order to Kafka: {}", e.getMessage());
throw new MessageSendException("Failed to send order", e);
}
}
public void sendOrderWithHeaders(Order order, Map<String, String> headers) {
ProducerRecord<String, Order> record = new ProducerRecord<>(
"order-topic",
order.getId().toString(),
order
);
headers.forEach((key, value) -> record.headers().add(key, value.getBytes()));
kafkaTemplate.send(record);
}
}
Kafka Consumer
@Service
public class KafkaOrderConsumer {
@Autowired
private OrderService orderService;
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void handleOrder(Order order) {
try {
log.info("Received order from Kafka: {}", order.getId());
orderService.processOrder(order);
} catch (Exception e) {
log.error("Error processing order from Kafka: {}", e.getMessage());
// Error handling strategy
}
}
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void handleOrderWithAck(ConsumerRecord<String, Order> record,
Acknowledgment ack) {
try {
Order order = record.value();
orderService.processOrder(order);
ack.acknowledge();
} catch (Exception e) {
log.error("Error processing order", e);
// Retry logic or dead letter queue
}
}
}
4. WebSocket Support
WebSocket Configuration
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(8192)
.setSendBufferSizeLimit(8192)
.setSendTimeLimit(10000);
}
}
WebSocket Controller
@Controller
public class OrderWebSocketController {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@MessageMapping("/orders")
@SendTo("/topic/orders")
public OrderStatus processOrder(Order order) {
try {
OrderStatus status = orderService.processOrder(order);
return status;
} catch (Exception e) {
return new OrderStatus(OrderState.ERROR, e.getMessage());
}
}
public void notifyOrderUpdate(String orderId, OrderStatus status) {
messagingTemplate.convertAndSend(
"/topic/orders/" + orderId,
status
);
}
@SubscribeMapping("/orders/updates")
public void subscribeToUpdates(Principal principal) {
log.info("User {} subscribed to order updates", principal.getName());
}
}
5. Best Practices
1. Error Handling
@Component
public class MessageErrorHandler {
@Autowired
private DeadLetterService deadLetterService;
public void handleError(Message<?> message, Exception e) {
MessageHeaders headers = message.getHeaders();
String messageId = headers.getId().toString();
log.error("Error processing message {}: {}", messageId, e.getMessage());
DeadLetter deadLetter = DeadLetter.builder()
.messageId(messageId)
.payload(message.getPayload())
.headers(headers)
.error(e.getMessage())
.timestamp(Instant.now())
.build();
deadLetterService.save(deadLetter);
}
}
2. Message Transformation
@Component
public class MessageTransformer {
@Autowired
private ObjectMapper objectMapper;
public <T> Message<T> transform(Message<?> message, Class<T> targetClass) {
try {
T payload = objectMapper.convertValue(message.getPayload(), targetClass);
return MessageBuilder
.withPayload(payload)
.copyHeaders(message.getHeaders())
.setHeader("transformed", true)
.build();
} catch (Exception e) {
throw new MessageTransformationException(
"Failed to transform message to " + targetClass.getSimpleName(),
e
);
}
}
}
3. Message Routing
@Component
public class MessageRouter {
private final Map<String, MessageChannel> routes = new ConcurrentHashMap<>();
public void registerRoute(String type, MessageChannel channel) {
routes.put(type, channel);
}
@Router(inputChannel = "inputChannel")
public MessageChannel route(Message<?> message) {
String type = message.getHeaders().get("type", String.class);
MessageChannel channel = routes.get(type);
if (channel == null) {
throw new MessageRoutingException(message, "No route found for type: " + type);
}
return channel;
}
}
4. Message Monitoring
@Component
public class MessageMonitor {
@Autowired
private MeterRegistry meterRegistry;
private final Map<String, Counter> counters = new ConcurrentHashMap<>();
@EventListener
public void onMessageSent(MessageSentEvent event) {
String destination = event.getMessage().getHeaders()
.get("destination", String.class);
Counter counter = counters.computeIfAbsent(
"message.sent." + destination,
id -> Counter.builder(id)
.description("Messages sent to " + destination)
.register(meterRegistry)
);
counter.increment();
}
@Scheduled(fixedRate = 60000)
public void reportMetrics() {
counters.forEach((key, counter) ->
log.info("{}: {}", key, counter.count())
);
}
}
5. Message Persistence
```java @Service public class MessagePersistenceService {
@Autowired
private MessageRepository messageRepository;
@Transactional
public void persistMessage(Message<?> message) {
MessageEntity entity = MessageEntity.builder()
.messageId(message.getHeaders().getId().toString())
.payload(convertPayloadToString(message.getPayload()))
.headers(convertHeadersToMap(message.getHeaders()))
.timestamp(Instant.now())
.build();
messageRepository.save(entity);
}
private String convertPayloadToString(Object payload) {
try {
return objectMapper.writeValueAsString(payload);
} catch (JsonProcessingException e) {
throw new MessagePersistenceException("Failed to convert payload", e);
}
}
private Map<String, String> convertHeadersToMap(MessageHeaders headers) {
return headers.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> String.valueOf(e.getValue())
));
}
}