Kafka's exactly-once semantics (EOS) requires three cooperative pieces: the idempotent producer (enable.idempotence=true) to deduplicate retried batches at the broker, producer transactions to atomically publish across partitions, and read_committed isolation on consumers. In Spring Boot, wire these up with KafkaTransactionManager and @Transactional. For dual-write safety between your database and Kafka, use the Transactional Outbox pattern. Monitor abort rates and consumer lag religiously in production.
Table of Contents
- Why Exactly-Once Matters
- Kafka Delivery Semantics Deep Dive
- Idempotent Producer (enable.idempotence=true)
- Producer Transactions in Kafka
- Spring Boot KafkaTransactionManager Configuration
- Transactional Outbox Pattern with Kafka
- Consumer Side: Read-Committed Isolation Level
- Offset Management Strategies
- Monitoring Transactions
- Common Pitfalls and Anti-Patterns
- Production Checklist
1. Why Exactly-Once Matters
In distributed event-driven systems, the guarantee you choose for message delivery has direct consequences on data correctness, system complexity, and business outcomes. Payment debits, inventory deductions, and audit-log entries are all examples where processing a message twice — or missing it entirely — causes real-world harm.
Consider an order-processing service that publishes an OrderCreated event to Kafka. If the producer retries after a broker timeout, the consumer might process two identical events, charging the customer twice. Conversely, if the producer gives up prematurely, the downstream fulfillment service never receives the event and the order silently drops.
Kafka's exactly-once semantics, introduced in version 0.11, gives you a transactional primitive at the broker level: combine it with careful application design and you eliminate entire categories of duplicates and data loss without resorting to two-phase commit.
2. Kafka Delivery Semantics Deep Dive
Kafka supports three delivery guarantees. Understanding each one's trade-offs is the foundation for choosing the right configuration.
| Guarantee | acks | Retries | Duplicates? | Use Case |
|---|---|---|---|---|
| At-Most-Once | 0 | 0 | No (data loss possible) | Metrics, IoT telemetry |
| At-Least-Once | 1 or all | > 0 | Yes (retries cause duplicates) | Most event pipelines |
| Exactly-Once | all | MAX_INT | No (idempotent + txn) | Payments, orders, inventory |
At-least-once is the default in most Kafka deployments and is perfectly fine when your consumers are idempotent by design (e.g., upsert operations with a natural key). Exactly-once adds complexity but removes the need for application-level deduplication entirely.
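To make the at-least-once-plus-idempotent-consumer option concrete, here is a minimal sketch of an upsert keyed on a natural ID. The table, column names, and PostgreSQL `ON CONFLICT` syntax are illustrative assumptions, not from this article; the point is that redelivering the same event is harmless because the second write lands on the same row.

```java
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical handler: processing the same OrderCreated event twice is safe
// because the write is an upsert keyed on the natural order ID.
public class IdempotentOrderWriter {

    private final Connection connection;

    public IdempotentOrderWriter(Connection connection) {
        this.connection = connection;
    }

    public void apply(String orderId, String customerId, BigDecimal total) throws SQLException {
        // PostgreSQL-style upsert; the natural key (order_id) deduplicates redeliveries.
        String sql = """
                INSERT INTO orders (order_id, customer_id, total_amount)
                VALUES (?, ?, ?)
                ON CONFLICT (order_id) DO UPDATE
                SET customer_id = EXCLUDED.customer_id,
                    total_amount = EXCLUDED.total_amount
                """;
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setString(1, orderId);
            ps.setString(2, customerId);
            ps.setBigDecimal(3, total);
            ps.executeUpdate();
        }
    }
}
```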
3. Idempotent Producer (enable.idempotence=true) — How It Works Internally
When you set enable.idempotence=true, the Kafka broker assigns the producer a Producer ID (PID) during initialization. Every batch of records is stamped with the PID and a per-partition sequence number that monotonically increases. The broker maintains the last committed sequence number per (PID, partition) pair and silently discards any batch whose sequence number is already seen — eliminating duplicates caused by producer retries transparently.
This requires acks=all (so the leader waits for all in-sync replicas) and constrains max.in.flight.requests.per.connection to ≤ 5 to preserve ordering even when batches are retried out of order.
```yaml
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        acks: all
        retries: 2147483647
        delivery.timeout.ms: 120000
```
With this configuration alone (no transactions), you get exactly-once delivery per topic-partition within a single producer session; a restarted producer receives a new PID, so deduplication does not span restarts. The idempotent producer is the mandatory building block for Kafka transactions: you cannot use transactions without enabling idempotence first.
| Producer Property | Required Value | Reason |
|---|---|---|
| `enable.idempotence` | `true` | Enables PID + sequence deduplication |
| `acks` | `all` | All ISR replicas must acknowledge |
| `retries` | `Integer.MAX_VALUE` | Retry indefinitely within `delivery.timeout.ms` |
| `max.in.flight.requests.per.connection` | ≤ 5 | Prevents out-of-order delivery on retry |
4. Producer Transactions in Kafka (beginTransaction, send, commitTransaction)
Producer transactions extend idempotence to span multiple partitions and topics atomically. The producer interacts with a Transaction Coordinator on the broker — a partition-leader of the internal __transaction_state topic. The flow is: initTransactions() → beginTransaction() → N × send() → commitTransaction() or abortTransaction(). All sends within one transaction are either all committed (visible to read_committed consumers) or all aborted (invisible).
Each transactional producer requires a unique transactional.id. If a producer with the same ID restarts, the broker fences the old instance by bumping the epoch, preventing zombie producers from committing stale transactions.
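For reference, the same flow with the raw kafka-clients API might look like the sketch below (the topic names, payloads, and error handling are illustrative assumptions). Spring Boot hides this boilerplate behind KafkaTemplate.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class RawTransactionalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Setting a transactional.id implicitly enables idempotence
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // register with the coordinator, bump the epoch
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("orders.created", "order-1", "{\"id\":\"order-1\"}"));
                producer.send(new ProducerRecord<>("notifications.email", "cust-42", "{\"id\":\"order-1\"}"));
                producer.commitTransaction(); // both records become visible atomically
            } catch (ProducerFencedException e) {
                throw e; // a newer instance with the same transactional.id took over; do not retry
            } catch (Exception e) {
                producer.abortTransaction(); // neither record is visible to read_committed consumers
            }
        }
    }
}
```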
```java
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        DefaultKafkaProducerFactory<String, Object> factory =
                new DefaultKafkaProducerFactory<>(config);
        // setTransactionIdPrefix enables transactions; Spring appends a unique
        // suffix per producer, so do not also set TRANSACTIONAL_ID_CONFIG directly.
        factory.setTransactionIdPrefix("order-tx-");
        return factory;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}
```
The setTransactionIdPrefix call is important: Spring Kafka appends a suffix (e.g., instance index) to ensure uniqueness across application replicas, preventing producer fencing in multi-instance deployments.
5. Spring Boot KafkaTransactionManager Configuration
Spring Kafka's KafkaTransactionManager integrates with Spring's @Transactional infrastructure. Annotating a service method with @Transactional("kafkaTransactionManager") causes Spring to call beginTransaction() before the method and commitTransaction() (or abortTransaction() on exception) after it — just like a JPA transaction manager, but for Kafka.
```java
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderService {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;

    @Transactional("kafkaTransactionManager")
    public void processOrder(Order order) {
        // Note: the DB save runs here, but only the two sends are covered
        // by the Kafka transaction
        orderRepository.save(order);
        OrderCreatedEvent event = OrderCreatedEvent.builder()
                .orderId(order.getId())
                .customerId(order.getCustomerId())
                .totalAmount(order.getTotalAmount())
                .timestamp(Instant.now())
                .build();
        kafkaTemplate.send("orders.created", order.getId().toString(), event);
        kafkaTemplate.send("notifications.email", order.getCustomerId(), event);
        log.info("Order {} published transactionally", order.getId());
    }
}
```
Both send() calls are wrapped in a single Kafka transaction. If either the DB save or the second send() throws, Spring aborts the transaction and neither message becomes visible to downstream consumers.
Important: KafkaTransactionManager manages only the Kafka transaction. To also wrap the JPA save in the same logical unit of work, use ChainedKafkaTransactionManager (deprecated in Spring Kafka 3.x) or the Transactional Outbox pattern (Section 6) which avoids the dual-write problem entirely.
```yaml
spring:
  kafka:
    producer:
      transaction-id-prefix: order-tx-
      properties:
        transaction.timeout.ms: 60000   # broker-side transaction timeout
        max.block.ms: 10000             # max wait for send() metadata
    listener:
      ack-mode: manual_immediate
      concurrency: 3
```
6. Transactional Outbox Pattern with Kafka
The dual-write problem: you want to update a database record and publish a Kafka event atomically. A native Kafka transaction only covers Kafka — it cannot coordinate with a JPA/JDBC transaction. If the application crashes between the DB commit and the Kafka send, the event is lost; if it crashes after the Kafka send but before the DB commit, you have a phantom event.
The Transactional Outbox pattern resolves this by writing the event payload to an outbox_events table within the same DB transaction as the business entity. A background publisher polls for PENDING rows and publishes them to Kafka, then marks them PUBLISHED. Alternatively, use Debezium CDC to stream outbox rows directly from the database changelog — eliminating the polling overhead.
```java
@Entity
@Table(name = "outbox_events")
@Data
@Builder
@NoArgsConstructor  // JPA requires a no-arg constructor
@AllArgsConstructor // required for @Builder alongside @NoArgsConstructor
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private String id;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Column(nullable = false)
    @Enumerated(EnumType.STRING)
    private OutboxStatus status; // PENDING, PUBLISHED, FAILED

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;

    private int retryCount;
}
```
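The write side is what makes the pattern atomic: the business entity and its outbox row are inserted in the same DB transaction. A minimal sketch (the `ObjectMapper` bean, service name, and repository names are assumptions, not from this article):

```java
@Service
@RequiredArgsConstructor
public class OrderOutboxService {

    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxRepo;
    private final ObjectMapper objectMapper; // assumed Jackson bean for payload serialization

    @Transactional // a single JPA transaction covers both inserts
    public void createOrder(Order order) throws JsonProcessingException {
        orderRepository.save(order);
        outboxRepo.save(OutboxEvent.builder()
                .aggregateType("Order")
                .aggregateId(order.getId().toString())
                .eventType("OrderCreated")
                .payload(objectMapper.writeValueAsString(order))
                .status(OutboxStatus.PENDING)
                .createdAt(Instant.now())
                .build());
        // If either save fails, the DB rolls back both rows: no phantom events,
        // no lost events. The scheduled publisher then delivers the row to Kafka.
    }
}
```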
```java
@Component
@Slf4j
@RequiredArgsConstructor
public class OutboxPublisher {

    private final OutboxEventRepository outboxRepo;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void publishPendingEvents() {
        List<OutboxEvent> events = outboxRepo
                .findTop100ByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
        events.forEach(event -> {
            try {
                // Block on the send future so broker-side failures are caught
                // here rather than failing silently in an async callback
                kafkaTemplate.send(
                        resolveTopicName(event.getAggregateType()),
                        event.getAggregateId(),
                        event.getPayload()
                ).get(10, TimeUnit.SECONDS);
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
            } catch (Exception e) {
                event.setRetryCount(event.getRetryCount() + 1);
                if (event.getRetryCount() >= 5) {
                    event.setStatus(OutboxStatus.FAILED);
                }
                log.error("Failed to publish outbox event {}", event.getId(), e);
            }
        });
        outboxRepo.saveAll(events);
    }
}
```
The outbox publisher guarantees at-least-once delivery to Kafka. Combine it with the idempotent producer on the Kafka side and an idempotent consumer (e.g., checking event_id in a processed-events table) to achieve end-to-end exactly-once semantics without Kafka transactions.
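The consumer-side check reduces to a tiny guard: record the event ID before processing, and skip anything already seen. This in-memory sketch stands in for the processed-events table described above; in production the set would be a DB table written in the same transaction as the business update.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for a processed_events table keyed by event ID.
public class ProcessedEventGuard {

    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Runs the action only if eventId has not been seen; returns true if it ran. */
    public boolean processOnce(String eventId, Runnable action) {
        if (!processed.add(eventId)) {
            return false; // duplicate delivery: skip
        }
        action.run();
        return true;
    }

    public static void main(String[] args) {
        ProcessedEventGuard guard = new ProcessedEventGuard();
        int[] charges = {0};
        guard.processOnce("evt-1", () -> charges[0]++);
        guard.processOnce("evt-1", () -> charges[0]++); // redelivery is a no-op
        System.out.println(charges[0]); // prints 1
    }
}
```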
7. Consumer Side: Read-Committed Isolation Level
Producer transactions are only half the story. A Kafka consumer using the default read_uncommitted isolation will read messages from aborted transactions, defeating the purpose of transactions entirely. Set isolation.level=read_committed so consumers only see records from committed transactions: the consumer fetches no further than the Last Stable Offset, and the client filters out records belonging to aborted transactions.
```java
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```
With read_committed isolation, the consumer's Last Stable Offset (LSO) — the highest offset safe to read — is bounded by the earliest open transaction. If a slow or stuck producer holds an open transaction, consumer lag will grow even if many newer committed records exist. Monitor LSO lag separately from high-watermark lag in production.
8. Offset Management Strategies (auto vs manual commit)
Kafka consumers track progress by committing offsets. Choosing the wrong commit strategy breaks your delivery guarantees even if the producer side is perfectly configured.
| Strategy | Config | Risk / Behavior | Best For |
|---|---|---|---|
| Auto Commit | `enable.auto.commit=true` | Commits before processing completes (at-most-once) | Metrics / analytics |
| Manual Immediate | `AckMode.MANUAL_IMMEDIATE` | Must explicitly call `ack.acknowledge()` | EOS critical paths |
| Batch Manual | `AckMode.MANUAL` | Commit after batch (at-least-once) | Bulk DB inserts |
| Record | `AckMode.RECORD` | Commit per record, higher overhead | Low-throughput critical events |
```java
@Component
@Slf4j
public class OrderConsumer {

    @KafkaListener(
            topics = "orders.created",
            groupId = "order-processor",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeOrder(
            ConsumerRecord<String, OrderCreatedEvent> record,
            Acknowledgment ack) {
        try {
            OrderCreatedEvent event = record.value();
            log.info("Processing order {} from partition {} offset {}",
                    event.getOrderId(), record.partition(), record.offset());
            processOrderEvent(event);
            ack.acknowledge(); // Commit offset only on success
        } catch (NonRetryableException e) {
            log.error("Non-retryable error, sending to DLQ", e);
            ack.acknowledge(); // Acknowledge to skip poison pill
        } catch (Exception e) {
            log.error("Retryable error processing order", e);
            // Do NOT acknowledge - message will be redelivered
            throw e;
        }
    }
}
```
For retryable errors, Spring Kafka's DefaultErrorHandler with exponential backoff handles re-delivery gracefully. Configure a Dead Letter Topic (DLT) for messages that exhaust all retries to prevent a single poison-pill message from stalling the entire consumer group.
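Wiring this up might look as follows. The retry intervals and bean shape are illustrative; `DeadLetterPublishingRecoverer` routes exhausted records to `<topic>.DLT` by default in Spring Kafka.

```java
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    // After retries are exhausted, publish the failed record to <topic>.DLT
    DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(kafkaTemplate);

    // Exponential backoff: 1s initial delay, doubling, capped at 10s per attempt,
    // giving up (and dead-lettering) after 30s of total retrying
    ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
    backOff.setMaxInterval(10_000L);
    backOff.setMaxElapsedTime(30_000L);

    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
    // Skip retries entirely for errors that can never succeed
    handler.addNotRetryableExceptions(NonRetryableException.class);
    return handler;
}
```

Register it on the listener container factory with `factory.setCommonErrorHandler(errorHandler)`.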
9. Monitoring Transactions (Producer Metrics, Broker Metrics, Kafka UI)
Production reliability of EOS depends on visibility into transaction health. Instrument both the producer and broker, and build alerting rules before going live.
```java
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterRegistryCustomizer<MeterRegistry> kafkaMetrics(
            KafkaTemplate<String, Object> kafkaTemplate) {
        return registry -> {
            // Transaction success/failure counters
            Counter.builder("kafka.transactions.committed")
                    .description("Number of committed Kafka transactions")
                    .register(registry);
            Counter.builder("kafka.transactions.aborted")
                    .description("Number of aborted Kafka transactions")
                    .register(registry);
            // Producer retry gauge; getProducerMetric is a small helper (not shown)
            // that reads the named metric from KafkaTemplate#metrics()
            Gauge.builder("kafka.producer.record.retry.total",
                    kafkaTemplate, t -> getProducerMetric(t, "record-retry-total"))
                    .description("Total producer record retries")
                    .register(registry);
        };
    }
}
```
The most critical metrics to watch in production are:
| Metric | Source | Alert Threshold |
|---|---|---|
| `record-error-rate` | Producer JMX | > 0 for > 30 s |
| `txn-abort-rate` | Producer JMX | > 1% of txn rate |
| Consumer lag (LSO) | Broker / Kafka UI | Growing lag |
| `request-latency-avg` | Producer JMX | p99 > 500 ms |
| `record-retry-total` | Producer JMX | Spike above baseline |
```yaml
management:
  endpoints:
    web:
      exposure:
        include: health, metrics, prometheus
  metrics:
    tags:
      application: ${spring.application.name}
    export:
      prometheus:
        enabled: true
  endpoint:
    health:
      show-details: always
```
10. Common Pitfalls and Anti-Patterns
Even with idempotence and transactions configured correctly, these common mistakes cause subtle exactly-once violations in production:
- **Sharing a transactional producer across threads**: A `KafkaProducer` instance is not thread-safe for transactions. Spring Kafka handles this via thread-bound transaction contexts; never inject the raw producer directly.
- **Using `read_uncommitted` on critical consumers**: The default isolation level reads records from aborted transactions. Always set `isolation.level=read_committed` when using producer transactions.
- **Catching and swallowing exceptions inside `@Transactional`**: If you catch an exception without rethrowing it, Spring considers the transaction successful and commits, even though your business logic failed.
- **Non-unique `transactional.id` across replicas**: Two producer instances with the same ID will fence each other. Use Spring Kafka's `setTransactionIdPrefix` combined with a unique instance suffix (e.g., pod ordinal in Kubernetes).
- **Long-running transactions**: Transactions that exceed `transaction.timeout.ms` are aborted by the broker. Keep transactions as short as possible; do not perform remote HTTP calls inside a transaction boundary.
- **Ignoring LSO-based consumer lag**: With `read_committed`, consumer lag is measured against the LSO, not the high watermark. Tools that report HWM-based lag will underreport real lag when open transactions exist.
```java
// BAD - transaction commits even though processing failed
@Transactional("kafkaTransactionManager")
public void badMethod(Order order) {
    try {
        kafkaTemplate.send("orders.created", order.getId().toString(), order);
        riskyBusinessLogic(order); // throws RuntimeException
    } catch (Exception e) {
        log.error("Something went wrong", e);
        // Exception is swallowed - Kafka transaction WILL commit!
    }
}

// GOOD - let the exception propagate to trigger rollback
@Transactional("kafkaTransactionManager")
public void goodMethod(Order order) {
    kafkaTemplate.send("orders.created", order.getId().toString(), order);
    riskyBusinessLogic(order); // exception propagates, transaction aborted
}
```
11. Production Checklist
Before deploying Kafka exactly-once semantics to production, verify every item in this checklist:
| Area | Checklist Item | Config / Action |
|---|---|---|
| Producer | `enable.idempotence=true` | Verify via `/actuator/metrics` |
| Producer | Unique `transactional.id` per replica | `setTransactionIdPrefix` + pod ordinal |
| Broker | `min.insync.replicas` ≥ 2 | Topic-level or broker default |
| Broker | Replication factor ≥ 3 | Critical topics only |
| Consumer | `isolation.level=read_committed` | All EOS consumers |
| Consumer | `enable.auto.commit=false` | `MANUAL_IMMEDIATE` ack mode |
| Consumer | Dead Letter Topic configured | `DefaultErrorHandler` + DLT |
| Monitoring | LSO-based consumer lag alert | Grafana + Prometheus alert rule |
| Monitoring | Transaction abort rate alert | `txn-abort-rate` > 0 for 5 min |
| Testing | Integration test with Testcontainers Kafka | Simulate broker failures and retries |
```java
@SpringBootTest
@Testcontainers
class OrderServiceEosIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
            .withKraft();

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Autowired
    OrderService orderService;

    @Test
    void shouldPublishExactlyOnceOnRetry() throws Exception {
        Order order = Order.builder()
                .id(UUID.randomUUID())
                .customerId("cust-42")
                .totalAmount(new BigDecimal("99.99"))
                .build();

        orderService.processOrder(order);

        // Consume from orders.created and assert exactly one record
        ConsumerRecords<String, ?> records = pollKafka("orders.created", 1);
        assertThat(records.count()).isEqualTo(1);
    }
}
```