Event-Driven Architecture trong Java
1. Nguyên tắc Cơ bản
1.1 Event
@Data
@AllArgsConstructor
public class Event<T> {
private String eventId;
private String eventType;
private T data;
private LocalDateTime timestamp;
private String source;
}
@Data
public class OrderCreatedEvent {
private String orderId;
private String customerId;
private BigDecimal totalAmount;
private List<OrderItem> items;
private LocalDateTime createdAt;
}
1.2 Event Publisher
@Service
@Slf4j
public class EventPublisher {
private final KafkaTemplate<String, Event> kafkaTemplate;
public void publish(String topic, Event event) {
try {
kafkaTemplate.send(topic, event).get();
log.info("Event published successfully: {}", event);
} catch (Exception e) {
log.error("Failed to publish event: {}", event, e);
throw new EventPublishException("Failed to publish event", e);
}
}
}
2. Message Brokers
2.1 Apache Kafka Configuration
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, Event> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.RETRIES_CONFIG, 3);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public ConsumerFactory<String, Event> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(config);
}
}
2.2 RabbitMQ Configuration
@Configuration
public class RabbitConfig {
@Bean
public Queue orderQueue() {
return new Queue("order-queue", true);
}
@Bean
public TopicExchange orderExchange() {
return new TopicExchange("order-exchange");
}
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("order.*");
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
}
3. Event Processing
3.1 Event Handlers
@Service
@Slf4j
public class OrderEventHandler {
private final OrderService orderService;
private final NotificationService notificationService;
@KafkaListener(topics = "order-events", groupId = "order-service")
public void handleOrderEvent(OrderCreatedEvent event) {
log.info("Received order event: {}", event);
try {
// Process order
orderService.processOrder(event);
// Send notification
notificationService.notifyCustomer(event.getCustomerId());
} catch (Exception e) {
log.error("Error processing order event: {}", event, e);
// Handle error (retry, dead letter queue, etc.)
}
}
}
3.2 Error Handling
@Configuration
public class KafkaErrorHandler {
@Bean
public ConsumerAwareListenerErrorHandler kafkaErrorHandler() {
return (message, exception, consumer) -> {
log.error("Error processing message: {}", message, exception);
// Retry logic
if (shouldRetry(exception)) {
return retryMessage(message);
}
// Dead letter queue
sendToDeadLetterQueue(message);
return null;
};
}
}
4. Event Sourcing
4.1 Event Store
@Entity
@Table(name = "event_store")
public class EventEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String aggregateId;
private String eventType;
private String eventData;
private LocalDateTime timestamp;
private Long version;
}
@Repository
public interface EventStore extends JpaRepository<EventEntity, Long> {
List<EventEntity> findByAggregateIdOrderByVersionAsc(String aggregateId);
}
4.2 Event Sourcing Service
@Service
@Transactional
public class OrderEventSourcingService {
private final EventStore eventStore;
private final EventPublisher eventPublisher;
public Order reconstructOrder(String orderId) {
List<EventEntity> events = eventStore.findByAggregateIdOrderByVersionAsc(orderId);
Order order = new Order();
for (EventEntity event : events) {
order.apply(deserialize(event));
}
return order;
}
public void saveEvent(String orderId, Event event) {
EventEntity eventEntity = new EventEntity();
eventEntity.setAggregateId(orderId);
eventEntity.setEventType(event.getEventType());
eventEntity.setEventData(serialize(event));
eventEntity.setTimestamp(LocalDateTime.now());
eventStore.save(eventEntity);
eventPublisher.publish("order-events", event);
}
}
5. CQRS (Command Query Responsibility Segregation)
5.1 Command Side
public interface OrderCommand {
String getOrderId();
}
public class CreateOrderCommand implements OrderCommand {
private final String orderId;
private final String customerId;
private final List<OrderItem> items;
}
@Service
public class OrderCommandHandler {
private final OrderRepository orderRepository;
private final EventPublisher eventPublisher;
@Transactional
public void handle(CreateOrderCommand command) {
Order order = new Order(command);
orderRepository.save(order);
eventPublisher.publish("order-events",
new OrderCreatedEvent(order));
}
}
5.2 Query Side
@Service
public class OrderQueryService {
private final OrderReadRepository orderReadRepository;
public OrderDTO getOrder(String orderId) {
return orderReadRepository.findById(orderId)
.map(this::toDTO)
.orElseThrow(() -> new OrderNotFoundException(orderId));
}
public List<OrderDTO> getOrdersByCustomer(String customerId) {
return orderReadRepository.findByCustomerId(customerId)
.stream()
.map(this::toDTO)
.collect(Collectors.toList());
}
}
6. Saga Pattern
6.1 Choreography-based Saga
@Service
public class OrderSaga {
@Transactional
@KafkaListener(topics = "payment-completed")
public void handlePaymentCompleted(PaymentCompletedEvent event) {
// Update order status
orderService.updateStatus(event.getOrderId(), OrderStatus.PAID);
// Trigger inventory reservation
eventPublisher.publish("inventory-events",
new ReserveInventoryCommand(event.getOrderId()));
}
@Transactional
@KafkaListener(topics = "inventory-reserved")
public void handleInventoryReserved(InventoryReservedEvent event) {
// Update order status
orderService.updateStatus(event.getOrderId(), OrderStatus.CONFIRMED);
// Trigger shipping
eventPublisher.publish("shipping-events",
new CreateShipmentCommand(event.getOrderId()));
}
@Transactional
@KafkaListener(topics = "compensation-needed")
public void handleCompensation(CompensationEvent event) {
// Rollback changes
orderService.compensate(event.getOrderId());
}
}
6.2 Orchestration-based Saga
@Service
public class OrderSagaOrchestrator {
private final OrderService orderService;
private final PaymentService paymentService;
private final InventoryService inventoryService;
private final ShippingService shippingService;
@Transactional
public void createOrder(CreateOrderCommand command) {
String sagaId = UUID.randomUUID().toString();
try {
// Create order
Order order = orderService.createOrder(command);
// Process payment
Payment payment = paymentService.processPayment(
new ProcessPaymentCommand(order));
// Reserve inventory
inventoryService.reserveInventory(
new ReserveInventoryCommand(order));
// Create shipment
shippingService.createShipment(
new CreateShipmentCommand(order));
} catch (Exception e) {
compensate(sagaId, command);
throw e;
}
}
private void compensate(String sagaId, CreateOrderCommand command) {
// Implement compensation logic
}
}
7. Event Monitoring và Debugging
7.1 Event Logging
@Aspect
@Component
@Slf4j
public class EventLoggingAspect {
@Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
public Object logEvent(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
String methodName = joinPoint.getSignature().getName();
log.info("Processing event: method={}, args={}", methodName, args);
try {
Object result = joinPoint.proceed();
log.info("Event processed successfully: method={}", methodName);
return result;
} catch (Exception e) {
log.error("Error processing event: method={}, error={}",
methodName, e.getMessage());
throw e;
}
}
}
7.2 Event Monitoring
@Configuration
public class EventMetricsConfig {
@Bean
public MeterRegistry meterRegistry() {
return new SimpleMeterRegistry();
}
}
@Component
public class EventMetrics {
private final MeterRegistry registry;
public void recordEventProcessing(String eventType, long duration) {
registry.timer("event.processing",
"type", eventType)
.record(duration, TimeUnit.MILLISECONDS);
}
public void incrementEventCounter(String eventType) {
registry.counter("event.count",
"type", eventType)
.increment();
}
}
8. Testing Event-Driven Systems
8.1 Unit Testing
@ExtendWith(MockitoExtension.class)
public class OrderEventHandlerTest {
@Mock
private OrderService orderService;
@Mock
private NotificationService notificationService;
@InjectMocks
private OrderEventHandler eventHandler;
@Test
void shouldHandleOrderCreatedEvent() {
// Given
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId("123");
// When
eventHandler.handleOrderEvent(event);
// Then
verify(orderService).processOrder(event);
verify(notificationService).notifyCustomer(event.getCustomerId());
}
}
8.2 Integration Testing
@SpringBootTest
@TestcontainersTest
public class OrderEventIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer();
@Autowired
private EventPublisher eventPublisher;
@Autowired
private OrderService orderService;
@Test
void shouldProcessOrderEvent() {
// Given
OrderCreatedEvent event = new OrderCreatedEvent();
// When
eventPublisher.publish("order-events", event);
// Then
await()
.atMost(5, TimeUnit.SECONDS)
.until(() -> orderService.getOrder(event.getOrderId()) != null);
}
}
9. Best Practices
9.1 Event Design Guidelines
- Keep events immutable
- Include all necessary context
- Version your events
- Use meaningful event names
- Include timestamp and correlation ID
9.2 Error Handling Guidelines
- Implement retry mechanisms
- Use dead letter queues
- Log failed events
- Implement compensation logic
- Monitor event processing
10. References
- Enterprise Integration Patterns by Gregor Hohpe
- Building Event-Driven Microservices by Adam Bellemare
- Designing Event-Driven Systems by Ben Stopford
- Domain-Driven Design by Eric Evans