Testing Event-Driven Services — Kafka, RabbitMQ

In a synchronous microservices system, the Order Service calls the Payment Service and waits for a response. In an event-driven system, the Order Service publishes an order.placed event to a Kafka topic and carries on — it never calls Payment Service directly. Payment Service, Notification Service, and Inventory Service all subscribe to that topic and react independently. This makes the system more resilient and decoupled, but it makes testing fundamentally different: there's no HTTP response to assert on, no single call to intercept, and no immediate result to check.

The three event test patterns

Pattern 1 — Producer test: verify that when a business action occurs, the expected event is published to the correct topic with the correct payload. This tests the output side of a service.

Pattern 2 — Consumer test: verify that when a service receives a specific event, it performs the correct business action. This tests the input side of a service.

Pattern 3 — End-to-end event test: trigger a business action in one service and verify the downstream effect in another service, after the event has been produced and consumed. This is the integration test for async systems.

Setting up Kafka with Testcontainers

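Add the Testcontainers Kafka module to your Maven dependencies, together with the JUnit 5 integration module that provides the @Testcontainers and @Container annotations used below:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.8</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.19.8</version>
    <scope>test</scope>
</dependency>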

Create a shared base class so every Kafka test gets a real broker wired into Spring automatically:

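import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

// Abstract so JUnit never tries to run the base class as a test on its own
@Testcontainers
abstract class KafkaBaseTest {

    // Static: one broker is started per test class and shared by its test methods
    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));

    // Points Spring's Kafka clients at the container's mapped port
    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }
}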

Producer test: verify events are published

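import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.testcontainers.junit.jupiter.Testcontainers;

@SpringBootTest
@Testcontainers
class OrderEventProducerTest extends KafkaBaseTest {

    @Autowired
    private OrderService orderService;

    // Assumes Spring Boot's Jackson auto-configuration is on the classpath
    @Autowired
    private ObjectMapper objectMapper;

    @Test
    void shouldPublishOrderPlacedEventWhenOrderCreated() throws Exception {
        // try-with-resources closes the consumer even when an assertion fails
        try (KafkaConsumer<String, String> testConsumer = createTestConsumer(
                kafka.getBootstrapServers(), "orders.placed")) {

            CreateOrderRequest request = new CreateOrderRequest(42L, 100L, 2);
            Order order = orderService.createOrder(request);

            // Poll for the event, giving the producer up to 10 seconds
            ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofSeconds(10));
            assertThat(records.count()).isEqualTo(1);

            OrderPlacedEvent event = objectMapper.readValue(
                records.iterator().next().value(), OrderPlacedEvent.class);

            assertThat(event.getOrderId()).isEqualTo(order.getId());
            assertThat(event.getUserId()).isEqualTo(42L);
            assertThat(event.getProductId()).isEqualTo(100L);
            assertThat(event.getStatus()).isEqualTo("PLACED");
            assertThat(event.getEventType()).isEqualTo("order.placed");
        }
    }

    private KafkaConsumer<String, String> createTestConsumer(
            String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Unique group: no committed offsets, so "earliest" below takes effect
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of(topic));
        return consumer;
    }
}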

Key pattern: the test consumer uses a unique group ID every run ("test-" + UUID), so it has no committed offsets, and auto.offset.reset=earliest then makes it start from the beginning of the topic. As a result it never misses an event published before the subscription was set up.
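A consumer that starts from the beginning of a topic also sees any older records that happen to be there, so one option is to keep polling until the record for this test's order shows up. The following is a minimal, broker-free sketch of that loop. RecordPoller and pollUntilMatch are hypothetical names, and the Supplier stands in for a call to testConsumer.poll(...) that copies the record values into a list.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper: these names are illustrative, not part of Kafka or Spring.
final class RecordPoller {

    // Calls pollOnce repeatedly until a record matches or the timeout elapses.
    // A real pollOnce would block inside consumer.poll(...); the fake one used
    // in main returns immediately, so this loop spins until the deadline.
    static <R> Optional<R> pollUntilMatch(Supplier<List<R>> pollOnce,
                                          Predicate<R> matches,
                                          Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        do {
            for (R record : pollOnce.get()) {
                if (matches.test(record)) {
                    return Optional.of(record);
                }
            }
        } while (System.nanoTime() < deadline);
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Simulated batches: a leftover event, an empty poll, then this test's event.
        Queue<List<String>> batches = new ArrayDeque<>();
        batches.add(List.of("order-1"));
        batches.add(List.<String>of());
        batches.add(List.of("order-2"));
        Supplier<List<String>> fakePoll =
            () -> batches.isEmpty() ? List.<String>of() : batches.remove();

        Optional<String> found =
            pollUntilMatch(fakePoll, r -> r.equals("order-2"), Duration.ofSeconds(2));
        System.out.println(found.orElse("not found")); // prints "order-2"
    }
}
```

Matching on a value unique to the test, such as the order ID, keeps the assertion stable even when the topic is not empty.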

Consumer test: verify event processing

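import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.junit.jupiter.Testcontainers;

@SpringBootTest
@Testcontainers
class NotificationEventConsumerTest extends KafkaBaseTest {

    @Autowired
    private NotificationRepository notificationRepository;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Assumes Spring Boot's Jackson auto-configuration is on the classpath
    @Autowired
    private ObjectMapper objectMapper;

    @Test
    void shouldSendEmailWhenOrderPlacedEventReceived() throws Exception {
        // A fresh order ID per test keeps the assertion independent of older events
        String orderId = UUID.randomUUID().toString();
        OrderPlacedEvent event = new OrderPlacedEvent(orderId, 42L, "alice@test.com");

        // get() blocks until the broker has acknowledged the record
        kafkaTemplate.send("orders.placed", objectMapper.writeValueAsString(event)).get();

        // Wait for consumer to process the event and trigger notification
        await().atMost(15, SECONDS).pollInterval(500, MILLISECONDS).untilAsserted(() -> {
            Optional<Notification> notification =
                notificationRepository.findByOrderId(orderId);
            assertThat(notification).isPresent();
            assertThat(notification.get().getRecipient()).isEqualTo("alice@test.com");
            assertThat(notification.get().getType()).isEqualTo("ORDER_CONFIRMATION");
        });
    }
}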

The untilAsserted pattern retries the assertion block until it passes or the timeout expires. Unlike until(() -> condition), which requires a boolean, untilAsserted accepts a lambda containing AssertJ or JUnit assertions. That makes it much more expressive for event consumer tests, where you want to assert on several fields at once rather than on a single boolean condition.
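To make the retry behaviour concrete, here is a small stand-alone sketch of the idea using only the JDK. It is an illustration of the retry-until-the-assertions-pass loop, not Awaitility's actual implementation, and RetryingAssert is a hypothetical name. One simplification to note: this version rethrows the last AssertionError on timeout, whereas Awaitility reports a timeout through its own exception type.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the retry loop; this is NOT Awaitility's implementation.
final class RetryingAssert {

    // Runs the assertion block until it stops throwing AssertionError or the
    // timeout elapses. On timeout, the most recent failure is rethrown.
    static void untilAsserted(Runnable assertions, Duration atMost, Duration pollInterval) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertions.run();
                return; // every assertion in the block passed
            } catch (AssertionError failure) {
                if (System.nanoTime() >= deadline) {
                    throw failure;
                }
                try {
                    Thread.sleep(pollInterval.toMillis());
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw failure;
                }
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        untilAsserted(() -> {
            // Simulates a consumer that has only finished its work by the third check.
            if (attempts.incrementAndGet() < 3) {
                throw new AssertionError("notification not stored yet");
            }
        }, Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println("passed after " + attempts.get() + " attempts");
        // prints "passed after 3 attempts"
    }
}
```

Because the whole block is re-run on every attempt, it should only read state (for example, a repository lookup) and never publish events or modify data.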

Testing with RabbitMQ

The Testcontainers setup for RabbitMQ follows the same structure as Kafka — swap the container type and the property names:

@Container
static RabbitMQContainer rabbitmq = new RabbitMQContainer("rabbitmq:3.12-management");
 
@DynamicPropertySource
static void rabbitProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.rabbitmq.host", rabbitmq::getHost);
    registry.add("spring.rabbitmq.port", rabbitmq::getAmqpPort);
}

The test patterns are identical: publish a message, assert the consumer processes it. The infrastructure is swapped; the structure is the same.

Topic and queue hygiene

Event tests that don't clean up their topics leave residual messages that cause the next test run to process old events. Three strategies help:

  • Create a unique topic name per test run: "orders.placed." + testRunId
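  • Use a unique consumer group ID together with AUTO_OFFSET_RESET = "earliest", so the consumer has no committed offsets and starts from the beginning of the topic. Because that includes any leftover messages, assert on a value unique to the test, such as a freshly generated orderId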
  • Delete topics in @AfterEach — though Testcontainers creates a fresh Kafka per test class anyway, so this is mainly relevant when reusing a long-lived container across classes
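The first strategy can be sketched with a tiny helper that appends a per-run suffix to a base topic name. TestTopics and forRun are hypothetical names, and the approach assumes the service under test reads its topic name from configuration (for example, a property registered through @DynamicPropertySource) rather than hard-coding it.

```java
import java.util.UUID;
import java.util.regex.Pattern;

// Hypothetical helper for per-run topic names; not part of Kafka or Testcontainers.
final class TestTopics {

    // Kafka topic names may contain ASCII letters, digits, '.', '_' and '-',
    // and are limited to 249 characters.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]{1,249}");

    // Generated once per JVM, so every test in the same run shares one suffix.
    private static final String RUN_ID = UUID.randomUUID().toString().substring(0, 8);

    // Returns baseTopic + "." + run ID, rejecting names Kafka would not accept.
    static String forRun(String baseTopic) {
        String name = baseTopic + "." + RUN_ID;
        if (!LEGAL.matcher(name).matches()) {
            throw new IllegalArgumentException("Illegal Kafka topic name: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        // Prints the base name followed by an 8-character hex suffix.
        System.out.println(forRun("orders.placed"));
    }
}
```

The same suffixed name has to be used by both the test consumer and the application's producer, otherwise the test will poll a topic that nothing writes to.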

Testcontainers starts Kafka

A real Kafka container starts before the test class. Spring Boot configures its KafkaTemplate and consumer listeners to point at the containerised broker.

⚠️ Common mistakes

  • Using a shared consumer group ID across test runs. On a broker that outlives a single run, a reused group resumes from whatever offsets the previous run committed, and two overlapping runs split the topic's partitions between their consumers, so a test may never see the event it just published. Always generate a unique group ID per test or per test run.
  • Asserting immediately after publishing without waiting. Kafka consumers process messages asynchronously. Calling assertThat(notificationRepository.findByOrderId(id)).isPresent() immediately after publishing the event will almost always fail because the consumer hasn't processed it yet. Always use await().untilAsserted(...).
  • Not verifying the event schema, only the side effect. The consumer test verifies that a notification was created — but if the producer sends a malformed event (wrong field names, wrong types), the consumer might handle it incorrectly or silently skip it. Add event schema assertions to both producer and consumer tests.
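A schema assertion of the kind the last point describes could look roughly like the sketch below. It checks a payload that has already been parsed into a Map (in a real test this might come from objectMapper.readValue(json, Map.class)) against a list of required fields and types. EventSchemaCheck, ORDER_PLACED, and the chosen field set are assumptions for illustration, based on the fields asserted in the producer test.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical schema check; the field list is an assumed subset of order.placed.
final class EventSchemaCheck {

    // Required field name -> expected Java type after JSON parsing.
    // LinkedHashMap keeps the reporting order deterministic.
    static final Map<String, Class<?>> ORDER_PLACED = new LinkedHashMap<>();
    static {
        ORDER_PLACED.put("orderId", String.class);
        ORDER_PLACED.put("userId", Number.class);
        ORDER_PLACED.put("status", String.class);
        ORDER_PLACED.put("eventType", String.class);
    }

    // Returns one message per missing or wrongly typed field; empty means valid.
    static List<String> violations(Map<String, ?> payload, Map<String, Class<?>> schema) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
            Object value = payload.get(field.getKey());
            if (value == null) {
                problems.add("missing field: " + field.getKey());
            } else if (!field.getValue().isInstance(value)) {
                problems.add("wrong type for " + field.getKey() + ": "
                    + value.getClass().getSimpleName());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // A malformed event that uses order_id instead of orderId.
        Map<String, Object> malformed = Map.of(
            "order_id", "o-1", "userId", 42L,
            "status", "PLACED", "eventType", "order.placed");
        System.out.println(violations(malformed, ORDER_PLACED));
        // prints "[missing field: orderId]"
    }
}
```

In a producer test the assertion would be that the list is empty for the published event; in a consumer test the same check can confirm that a deliberately malformed fixture really is malformed before it is sent.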

🎯 Practice task

  1. Add the Testcontainers Kafka module (org.testcontainers:kafka) to your project. Write a KafkaBaseTest class with a static KafkaContainer and a @DynamicPropertySource method that wires the bootstrap servers into Spring.
  2. Write the producer test: trigger an order creation, create a test consumer with a unique group ID, and poll the orders.placed topic. Assert on at least three fields of the event: orderId, eventType, and status.
  3. Write the consumer test: publish a hand-crafted OrderPlacedEvent directly to the topic using KafkaTemplate. Use await().atMost(15, SECONDS).untilAsserted(...) to verify the consumer created the expected notification record.
  4. Deliberately publish a malformed event (swap orderId for order_id). Run the consumer test. What happens? Does the consumer silently skip it, throw an exception, or dead-letter it? Add an assertion to verify the correct behaviour.
  5. Research Kafka's dead-letter topic pattern. Write a test that publishes a poison message (one that will always cause a deserialization error) and asserts that after the configured number of retries, the message lands in the dead-letter topic.

In the next lesson you'll learn how to test eventual consistency — how to assert that data propagated across services arrives within a defined SLA rather than asserting on an instantaneous state that may not yet exist.
