Event-Driven Architecture trong Java

1. Nguyên tắc Cơ bản

1.1 Event

@Data
@AllArgsConstructor
public class Event<T> {
    private String eventId;
    private String eventType;
    private T data;
    private LocalDateTime timestamp;
    private String source;
}

@Data
public class OrderCreatedEvent {
    private String orderId;
    private String customerId;
    private BigDecimal totalAmount;
    private List<OrderItem> items;
    private LocalDateTime createdAt;
}

1.2 Event Publisher

@Service
@Slf4j
public class EventPublisher {
    private final KafkaTemplate<String, Event> kafkaTemplate;

    public void publish(String topic, Event event) {
        try {
            kafkaTemplate.send(topic, event).get();
            log.info("Event published successfully: {}", event);
        } catch (Exception e) {
            log.error("Failed to publish event: {}", event, e);
            throw new EventPublishException("Failed to publish event", e);
        }
    }
}

2. Message Brokers

2.1 Apache Kafka Configuration

@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                  StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                  JsonSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, 3);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public ConsumerFactory<String, Event> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                  StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                  JsonDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return new DefaultKafkaConsumerFactory<>(config);
    }
}

2.2 RabbitMQ Configuration

@Configuration
public class RabbitConfig {
    @Bean
    public Queue orderQueue() {
        return new Queue("order-queue", true);
    }

    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange("order-exchange");
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with("order.*");
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

3. Event Processing

3.1 Event Handlers

@Service
@Slf4j
public class OrderEventHandler {
    private final OrderService orderService;
    private final NotificationService notificationService;

    @KafkaListener(topics = "order-events", groupId = "order-service")
    public void handleOrderEvent(OrderCreatedEvent event) {
        log.info("Received order event: {}", event);

        try {
            // Process order
            orderService.processOrder(event);

            // Send notification
            notificationService.notifyCustomer(event.getCustomerId());
        } catch (Exception e) {
            log.error("Error processing order event: {}", event, e);
            // Handle error (retry, dead letter queue, etc.)
        }
    }
}

3.2 Error Handling

@Configuration
public class KafkaErrorHandler {
    @Bean
    public ConsumerAwareListenerErrorHandler kafkaErrorHandler() {
        return (message, exception, consumer) -> {
            log.error("Error processing message: {}", message, exception);

            // Retry logic
            if (shouldRetry(exception)) {
                return retryMessage(message);
            }

            // Dead letter queue
            sendToDeadLetterQueue(message);

            return null;
        };
    }
}

4. Event Sourcing

4.1 Event Store

@Entity
@Table(name = "event_store")
public class EventEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String aggregateId;
    private String eventType;
    private String eventData;
    private LocalDateTime timestamp;
    private Long version;
}

@Repository
public interface EventStore extends JpaRepository<EventEntity, Long> {
    List<EventEntity> findByAggregateIdOrderByVersionAsc(String aggregateId);
}

4.2 Event Sourcing Service

@Service
@Transactional
public class OrderEventSourcingService {
    private final EventStore eventStore;
    private final EventPublisher eventPublisher;

    public Order reconstructOrder(String orderId) {
        List<EventEntity> events = eventStore.findByAggregateIdOrderByVersionAsc(orderId);

        Order order = new Order();
        for (EventEntity event : events) {
            order.apply(deserialize(event));
        }

        return order;
    }

    public void saveEvent(String orderId, Event event) {
        EventEntity eventEntity = new EventEntity();
        eventEntity.setAggregateId(orderId);
        eventEntity.setEventType(event.getEventType());
        eventEntity.setEventData(serialize(event));
        eventEntity.setTimestamp(LocalDateTime.now());

        eventStore.save(eventEntity);
        eventPublisher.publish("order-events", event);
    }
}

5. CQRS (Command Query Responsibility Segregation)

5.1 Command Side

public interface OrderCommand {
    String getOrderId();
}

public class CreateOrderCommand implements OrderCommand {
    private final String orderId;
    private final String customerId;
    private final List<OrderItem> items;
}

@Service
public class OrderCommandHandler {
    private final OrderRepository orderRepository;
    private final EventPublisher eventPublisher;

    @Transactional
    public void handle(CreateOrderCommand command) {
        Order order = new Order(command);
        orderRepository.save(order);

        eventPublisher.publish("order-events", 
            new OrderCreatedEvent(order));
    }
}

5.2 Query Side

@Service
public class OrderQueryService {
    private final OrderReadRepository orderReadRepository;

    public OrderDTO getOrder(String orderId) {
        return orderReadRepository.findById(orderId)
            .map(this::toDTO)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    public List<OrderDTO> getOrdersByCustomer(String customerId) {
        return orderReadRepository.findByCustomerId(customerId)
            .stream()
            .map(this::toDTO)
            .collect(Collectors.toList());
    }
}

6. Saga Pattern

6.1 Choreography-based Saga

@Service
public class OrderSaga {
    @Transactional
    @KafkaListener(topics = "payment-completed")
    public void handlePaymentCompleted(PaymentCompletedEvent event) {
        // Update order status
        orderService.updateStatus(event.getOrderId(), OrderStatus.PAID);

        // Trigger inventory reservation
        eventPublisher.publish("inventory-events",
            new ReserveInventoryCommand(event.getOrderId()));
    }

    @Transactional
    @KafkaListener(topics = "inventory-reserved")
    public void handleInventoryReserved(InventoryReservedEvent event) {
        // Update order status
        orderService.updateStatus(event.getOrderId(), OrderStatus.CONFIRMED);

        // Trigger shipping
        eventPublisher.publish("shipping-events",
            new CreateShipmentCommand(event.getOrderId()));
    }

    @Transactional
    @KafkaListener(topics = "compensation-needed")
    public void handleCompensation(CompensationEvent event) {
        // Rollback changes
        orderService.compensate(event.getOrderId());
    }
}

6.2 Orchestration-based Saga

@Service
public class OrderSagaOrchestrator {
    private final OrderService orderService;
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final ShippingService shippingService;

    @Transactional
    public void createOrder(CreateOrderCommand command) {
        String sagaId = UUID.randomUUID().toString();

        try {
            // Create order
            Order order = orderService.createOrder(command);

            // Process payment
            Payment payment = paymentService.processPayment(
                new ProcessPaymentCommand(order));

            // Reserve inventory
            inventoryService.reserveInventory(
                new ReserveInventoryCommand(order));

            // Create shipment
            shippingService.createShipment(
                new CreateShipmentCommand(order));

        } catch (Exception e) {
            compensate(sagaId, command);
            throw e;
        }
    }

    private void compensate(String sagaId, CreateOrderCommand command) {
        // Implement compensation logic
    }
}

7. Event Monitoring và Debugging

7.1 Event Logging

@Aspect
@Component
@Slf4j
public class EventLoggingAspect {
    @Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public Object logEvent(ProceedingJoinPoint joinPoint) throws Throwable {
        Object[] args = joinPoint.getArgs();
        String methodName = joinPoint.getSignature().getName();

        log.info("Processing event: method={}, args={}", methodName, args);

        try {
            Object result = joinPoint.proceed();
            log.info("Event processed successfully: method={}", methodName);
            return result;
        } catch (Exception e) {
            log.error("Error processing event: method={}, error={}", 
                methodName, e.getMessage());
            throw e;
        }
    }
}

7.2 Event Monitoring

@Configuration
public class EventMetricsConfig {
    @Bean
    public MeterRegistry meterRegistry() {
        return new SimpleMeterRegistry();
    }
}

@Component
public class EventMetrics {
    private final MeterRegistry registry;

    public void recordEventProcessing(String eventType, long duration) {
        registry.timer("event.processing",
            "type", eventType)
            .record(duration, TimeUnit.MILLISECONDS);
    }

    public void incrementEventCounter(String eventType) {
        registry.counter("event.count",
            "type", eventType)
            .increment();
    }
}

8. Testing Event-Driven Systems

8.1 Unit Testing

@ExtendWith(MockitoExtension.class)
public class OrderEventHandlerTest {
    @Mock
    private OrderService orderService;

    @Mock
    private NotificationService notificationService;

    @InjectMocks
    private OrderEventHandler eventHandler;

    @Test
    void shouldHandleOrderCreatedEvent() {
        // Given
        OrderCreatedEvent event = new OrderCreatedEvent();
        event.setOrderId("123");

        // When
        eventHandler.handleOrderEvent(event);

        // Then
        verify(orderService).processOrder(event);
        verify(notificationService).notifyCustomer(event.getCustomerId());
    }
}

8.2 Integration Testing

@SpringBootTest
@TestcontainersTest
public class OrderEventIntegrationTest {
    @Container
    static KafkaContainer kafka = new KafkaContainer();

    @Autowired
    private EventPublisher eventPublisher;

    @Autowired
    private OrderService orderService;

    @Test
    void shouldProcessOrderEvent() {
        // Given
        OrderCreatedEvent event = new OrderCreatedEvent();

        // When
        eventPublisher.publish("order-events", event);

        // Then
        await()
            .atMost(5, TimeUnit.SECONDS)
            .until(() -> orderService.getOrder(event.getOrderId()) != null);
    }
}

9. Best Practices

9.1 Event Design Guidelines

  • Keep events immutable
  • Include all necessary context
  • Version your events
  • Use meaningful event names
  • Include timestamp and correlation ID

9.2 Error Handling Guidelines

  • Implement retry mechanisms
  • Use dead letter queues
  • Log failed events
  • Implement compensation logic
  • Monitor event processing

10. References

  • Enterprise Integration Patterns by Gregor Hohpe
  • Building Event-Driven Microservices by Adam Bellemare
  • Designing Event-Driven Systems by Ben Stopford
  • Domain-Driven Design by Eric Evans