Outbox/Inbox/Idempotency Implementation - Audit Trail Platform (ATP)
Exactly-once intent in an at-least-once world — ATP implements transactional outbox for atomic state + event persistence, inbox pattern for consumer deduplication, and idempotency gates for duplicate request handling, ensuring reliable message processing without dual-write problems or duplicate side effects.
📋 Documentation Generation Plan
This document will be generated in 16 cycles. Current progress:
| Cycle | Topics | Estimated Lines | Status |
|---|---|---|---|
| Cycle 1 | Outbox/Inbox/Idempotency Philosophy (1-2) | ~3,000 | ⏳ Not Started |
| Cycle 2 | Transactional Outbox Pattern (3-4) | ~4,000 | ⏳ Not Started |
| Cycle 3 | Outbox Table Schema & Implementation (5-6) | ~3,500 | ⏳ Not Started |
| Cycle 4 | Outbox Relay Worker (7-8) | ~3,500 | ⏳ Not Started |
| Cycle 5 | Inbox Pattern & Deduplication (9-10) | ~3,500 | ⏳ Not Started |
| Cycle 6 | Inbox Table Schema & Implementation (11-12) | ~3,000 | ⏳ Not Started |
| Cycle 7 | Idempotency Gate for HTTP APIs (13-14) | ~4,000 | ⏳ Not Started |
| Cycle 8 | Idempotent Message Consumers (15-16) | ~3,500 | ⏳ Not Started |
| Cycle 9 | MassTransit Outbox Integration (17-18) | ~3,000 | ⏳ Not Started |
| Cycle 10 | MassTransit Inbox Integration (19-20) | ~3,000 | ⏳ Not Started |
| Cycle 11 | Idempotency Key Generation & Storage (21-22) | ~3,000 | ⏳ Not Started |
| Cycle 12 | Conflict Detection & Resolution (23-24) | ~3,000 | ⏳ Not Started |
| Cycle 13 | Distributed Idempotency (25-26) | ~2,500 | ⏳ Not Started |
| Cycle 14 | Testing Outbox/Inbox/Idempotency (27-28) | ~2,500 | ⏳ Not Started |
| Cycle 15 | Monitoring & Observability (29-30) | ~3,000 | ⏳ Not Started |
| Cycle 16 | Best Practices & Troubleshooting (31-32) | ~3,000 | ⏳ Not Started |
Total Estimated Lines: ~51,000 (sum of the per-cycle estimates above)
Purpose & Scope
This document provides the complete implementation guide for transactional outbox, inbox deduplication, and idempotency patterns in the Audit Trail Platform (ATP), covering database schemas, C# implementation, MassTransit integration, relay workers, duplicate detection, conflict resolution, and operational best practices.
Why Outbox/Inbox/Idempotency for ATP?
- Exactly-Once Processing: Each message takes effect once, even though the broker only guarantees at-least-once delivery
- No Dual-Write Problem: Atomic persistence of domain state + events (transactional outbox)
- No Duplicate Side Effects: Idempotent consumers via inbox deduplication
- Reliable Event Publishing: Events persisted before publishing (no event loss)
- Deterministic Replay: Safely replay events from DLQ or event store
- Audit Trail Integrity: All operations idempotent (critical for compliance)
- Performance: Minimal latency overhead (<10ms for idempotency checks)
The Dual-Write Problem (Why Outbox?)
// ❌ ANTI-PATTERN: Dual-write (not atomic)
await _repository.SaveAsync(auditEvent); // Database write
await _bus.PublishAsync(eventPublished); // Message publish
Problems:
1. If the database write succeeds but the publish fails → event lost
2. If the publish succeeds but the database write is rolled back → phantom event (subscribers see an event for state that was never persisted)
3. No atomicity guarantee across systems
Solution: Transactional Outbox
// ✅ PATTERN: Transactional Outbox (atomic)
using var transaction = await _unitOfWork.BeginTransactionAsync();
await _repository.SaveAsync(auditEvent); // Database write
await _outbox.SaveAsync(eventPublished); // Outbox write (same tx)
await transaction.CommitAsync(); // Atomic commit
// Separate process (relay worker) publishes from the outbox after the commit
await _outboxRelay.PublishPendingAsync();
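The atomicity argument can be illustrated with a small in-memory sketch. `InMemoryStore` and `TryCommit` are hypothetical names used only for illustration, not ATP types, and the lists stand in for the domain table and the outbox table. Staged writes become visible only if the whole unit of work succeeds.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for one database with a single transaction scope.
public sealed class InMemoryStore
{
    public List<string> AuditRecords { get; } = new();
    public List<string> Outbox { get; } = new();

    // Runs the unit of work against staging lists and applies both lists all-or-nothing.
    public bool TryCommit(Action<List<string>, List<string>> work)
    {
        var stagedRecords = new List<string>();
        var stagedOutbox = new List<string>();
        try
        {
            work(stagedRecords, stagedOutbox);
        }
        catch (Exception)
        {
            return false; // Rollback: nothing staged becomes visible
        }

        AuditRecords.AddRange(stagedRecords);
        Outbox.AddRange(stagedOutbox);
        return true;
    }
}
```

Because the state row and the outbox row share one commit, a failure leaves neither behind, which is the property the dual-write version lacks.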
ATP Implementation Components
- Outbox Table: Persists unpublished events with domain state
- Outbox Relay Worker: Background service polling outbox, publishing to Service Bus
- Inbox Table: Tracks processed message IDs to prevent duplicate handling
- Idempotency Gate: HTTP middleware checking for duplicate requests
- Idempotent Handlers: Message consumers that safely handle duplicates
- MassTransit Outbox/Inbox: Built-in support for Entity Framework or custom stores
- Redis Idempotency Cache: Fast duplicate detection (optional)
Detailed Cycle Plan
CYCLE 1: Outbox/Inbox/Idempotency Philosophy (~3,000 lines)
Topic 1: The Dual-Write Problem & Outbox Pattern
What will be covered:
- Dual-Write Problem Explained
  - Scenario: Save to database + publish to message broker
  - Problem: Two separate transactions (not atomic)
  - Failure Cases:
    1. DB commit succeeds, publish fails → Event lost
    2. Publish succeeds, DB commit fails → Phantom event (published for state that was never persisted)
    3. DB and publish succeed, but app crashes before commit confirmation → Uncertainty (the caller cannot tell whether a retry is safe)
- Transactional Outbox Solution
- Outbox Benefits
  - Atomicity: State + event committed together (no dual-write)
  - Reliability: Events never lost (persisted before publishing)
  - Resilience: Relay worker can fail and restart (events queued)
  - Ordering: Events published in commit order
  - Audit Trail: All events persisted (even if publish fails)
- Outbox Trade-Offs
  - Latency: Event publishing delayed by relay polling interval
  - Complexity: Additional table, background worker, monitoring
  - Storage: Outbox table grows (requires cleanup job)
- When to Use Outbox
  - Critical events that must not be lost (audit events, financial transactions)
  - When domain state and events must be atomic (consistency)
  - When event ordering matters
  - When events must be auditable (persistent record)
Code Examples: - Dual-write problem code (anti-pattern) - Transactional outbox pattern (solution) - Outbox vs. direct publish comparison
Diagrams: - Dual-write problem failure scenarios - Transactional outbox flow - Atomicity guarantee
Deliverables: - Dual-write problem analysis - Outbox pattern overview - Trade-off analysis
Topic 2: Inbox Pattern & Idempotency
What will be covered:
- At-Least-Once Delivery Problem
  - Message brokers guarantee: Message delivered at least once
  - Problem: Message can be delivered multiple times (network retries, broker fail-over)
  - Failure Cases:
    1. Consumer processes message, crashes before ACK → Re-delivery, duplicate processing
    2. Consumer ACKs message, but ACK lost → Re-delivery, already processed
    3. Broker duplicates message (rare but possible) → Duplicate processing
- Inbox Pattern Solution
- Inbox Benefits
  - Exactly-Once Processing: Despite at-least-once delivery
  - Idempotent Consumers: Safe to retry without side effects
  - Deterministic Replay: Can replay events from DLQ or event store
  - Audit Trail: Track all processed messages
- Idempotency Principle
- Idempotent Operations
  - Naturally Idempotent: SET x = 5, DELETE WHERE id = 123
  - Idempotent with Key: UPSERT (INSERT or UPDATE based on key)
  - Not Idempotent: x = x + 1, INSERT without key check
- Making Operations Idempotent
// ❌ NOT Idempotent
public async Task ProcessEvent(OrderCreated evt)
{
    var order = new Order { Id = evt.OrderId, Total = evt.Total };
    await _repository.AddAsync(order); // Duplicate key error on retry
    await _emailService.SendAsync($"Order {evt.OrderId} created"); // Duplicate email
}

// ✅ Idempotent
public async Task ProcessEvent(OrderCreated evt)
{
    // Check inbox first
    if (await _inbox.HasProcessedAsync(evt.MessageId))
    {
        return; // Already processed
    }

    // Upsert (idempotent)
    var order = new Order { Id = evt.OrderId, Total = evt.Total };
    await _repository.UpsertAsync(order);

    // Idempotent email (check if already sent)
    if (!await _emailLog.HasSentAsync(evt.OrderId))
    {
        await _emailService.SendAsync($"Order {evt.OrderId} created");
        await _emailLog.MarkSentAsync(evt.OrderId);
    }

    // Record in inbox
    await _inbox.MarkProcessedAsync(evt.MessageId);
}
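The three operation categories can be shown side by side in a minimal sketch. `IdempotencyDemo` and its method names are illustrative only; the dictionary and hash set stand in for a state table and an inbox.

```csharp
using System.Collections.Generic;

public static class IdempotencyDemo
{
    // Not idempotent: every redelivery changes the result again (x = x + 1).
    public static void Increment(Dictionary<string, int> counters, string key)
    {
        counters.TryGetValue(key, out var current);
        counters[key] = current + 1;
    }

    // Naturally idempotent: repeating the same assignment leaves the same state (SET x = 5).
    public static void Set(Dictionary<string, int> state, string key, int value)
    {
        state[key] = value;
    }

    // Idempotent with key: the increment is applied at most once per message ID.
    public static bool IncrementOnce(
        Dictionary<string, int> counters, HashSet<string> processedIds, string messageId, string key)
    {
        if (!processedIds.Add(messageId))
        {
            return false; // Duplicate delivery, skip the side effect
        }

        Increment(counters, key);
        return true;
    }
}
```

The third method shows how a non-idempotent operation becomes safe to retry once a message ID gates it, which is the role the inbox plays for consumers.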
Code Examples: - At-least-once delivery scenarios - Inbox pattern implementation - Idempotent vs. non-idempotent code - Making operations idempotent
Diagrams: - At-least-once delivery problem - Inbox pattern flow - Idempotency principle
Deliverables: - Inbox pattern overview - Idempotency fundamentals - Operation patterns
CYCLE 2: Transactional Outbox Pattern (~4,000 lines)
Topic 3: Outbox Pattern Architecture
What will be covered: - Outbox Components
┌─────────────────────────────────────┐
│ Use Case / Command Handler │
├─────────────────────────────────────┤
│ Domain Model (Aggregate) │
├─────────────────────────────────────┤
│ Repository + Outbox │
│ ┌────────────┬──────────────────┐ │
│ │ Aggregates │ Outbox Table │ │
│ │ Table │ (Events) │ │
│ └────────────┴──────────────────┘ │
│ Same Transaction │
├─────────────────────────────────────┤
│ Outbox Relay Worker │
│ (Background Service) │
├─────────────────────────────────────┤
│ MassTransit + Azure Service Bus │
└─────────────────────────────────────┘
- Outbox Workflow
sequenceDiagram
    participant API as REST API
    participant UC as Use Case
    participant AGG as Aggregate
    participant REPO as Repository
    participant DB as Database
    participant OB as Outbox Table
    participant RL as Relay Worker
    participant BUS as Service Bus
    participant SUB as Subscribers
    API->>UC: Execute Command
    UC->>AGG: Apply Business Logic
    AGG->>AGG: Raise Domain Event
    UC->>REPO: Save Aggregate
    Note over REPO,OB: Begin Transaction
    REPO->>DB: UPDATE/INSERT Aggregate
    REPO->>OB: INSERT Event(s)
    Note over REPO,OB: Commit Transaction
    RL->>OB: Poll Unpublished Events
    RL->>BUS: Publish Event
    BUS-->>RL: Confirm Published
    RL->>OB: Mark as Published
    BUS->>SUB: Deliver Event (at-least-once)
    SUB->>SUB: Process (idempotent)
- ATP Outbox Strategy
  - Per-Service Outbox: Each service has its own outbox table
  - Shared Database: Outbox in same database as domain tables
  - Polling Interval: 100-500ms (configurable)
  - Batch Publishing: Publish multiple events in batch (performance)
  - Retry Strategy: Exponential backoff for failed publishes
  - Event Ordering: FIFO per partition key (tenantId)
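The retry strategy above can be sketched as a capped exponential backoff. `OutboxRelaySettings`, `RelayBackoff`, and all default values are assumptions for illustration (only the 100-500ms polling range comes from the plan); the real options type may look different.

```csharp
using System;

// Hypothetical settings type; names and defaults are illustrative only.
public sealed record OutboxRelaySettings(
    int PollingIntervalMs = 250,   // Within the 100-500ms range stated in the plan
    int BatchSize = 100,
    int BaseBackoffMs = 500,
    int MaxBackoffMs = 60_000);

public static class RelayBackoff
{
    // Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped at MaxBackoffMs.
    public static TimeSpan DelayFor(int attempt, OutboxRelaySettings settings)
    {
        if (attempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(attempt));
        }

        var exponent = Math.Min(attempt - 1, 30); // Bound the shift so the multiplication cannot overflow
        var delayMs = Math.Min((long)settings.BaseBackoffMs << exponent, settings.MaxBackoffMs);
        return TimeSpan.FromMilliseconds(delayMs);
    }
}
```

With the assumed defaults the delays run 500ms, 1s, 2s, and so on until they reach the 60s cap. Adding jitter is a common refinement that this sketch leaves out.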
Code Examples: - Outbox architecture diagram - Complete workflow sequence - ATP outbox configuration
Diagrams: - Outbox component architecture - Detailed outbox flow - Transaction boundaries
Deliverables: - Outbox architecture overview - Workflow documentation - Configuration guide
Topic 4: Outbox vs. Direct Publishing
What will be covered:
- Comparison Matrix

| Aspect | Direct Publishing | Transactional Outbox |
|---|---|---|
| Atomicity | ❌ Two separate transactions | ✅ Single transaction |
| Event Loss | ⚠️ Possible if publish fails | ✅ Never (persisted first) |
| Phantom / Duplicate Events | ⚠️ Phantom event possible if the DB rolls back after publish | ⚠️ Duplicates possible when the relay retries (at-least-once); absorbed by consumer inbox |
| Latency | ✅ Immediate (0-10ms) | ⚠️ Delayed by relay poll (100-500ms) |
| Complexity | ✅ Simple (one call) | ⚠️ Complex (table, worker, monitoring) |
| Event Ordering | ⚠️ Not guaranteed | ✅ Guaranteed (commit order) |
| Audit Trail | ❌ No persistence | ✅ Persistent record |

- When to Use Each
  - Direct Publishing: Non-critical events, notifications, fire-and-forget
  - Transactional Outbox: Critical events (audit, financial), ordered events, high reliability
- ATP Decision
  - ✅ Outbox for: audit.appended, projection.updated, integrity.verified, export.completed
  - ⚠️ Direct for: Internal telemetry events, low-priority notifications (if any)
- Hybrid Approach
  - Critical path: Outbox
  - Non-critical path: Direct publishing
  - Feature flag to control (UseOutboxForAllEvents)
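One way to picture the hybrid approach is a small router that decides per event type. `PublishRouter` and `PublishRoute` are hypothetical names; the four critical event types and the `UseOutboxForAllEvents` flag come from the plan, while the prefix matching (so that versioned names such as `audit.appended.v1` still match) is an assumption.

```csharp
using System;

public enum PublishRoute { Outbox, Direct }

// Hypothetical router, not an ATP type.
public sealed class PublishRouter
{
    // Event types the plan lists for the outbox path.
    private static readonly string[] CriticalPrefixes =
    {
        "audit.appended", "projection.updated", "integrity.verified", "export.completed"
    };

    private readonly bool _useOutboxForAllEvents;

    public PublishRouter(bool useOutboxForAllEvents)
    {
        _useOutboxForAllEvents = useOutboxForAllEvents;
    }

    public PublishRoute RouteFor(string eventType)
    {
        if (_useOutboxForAllEvents)
        {
            return PublishRoute.Outbox; // Feature flag forces every event through the outbox
        }

        foreach (var prefix in CriticalPrefixes)
        {
            if (eventType.StartsWith(prefix, StringComparison.Ordinal))
            {
                return PublishRoute.Outbox;
            }
        }

        return PublishRoute.Direct;
    }
}
```

Defaulting unknown event types to the direct path keeps latency low but risks silently downgrading a new critical event; flipping the default to `Outbox` would be the more conservative choice.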
Code Examples: - Direct publish code example - Outbox publish code example - Comparison benchmarks - Feature flag configuration
Diagrams: - Direct vs. outbox comparison - Decision tree (when to use each)
Deliverables: - Comparison matrix - Decision criteria - Hybrid implementation guide
CYCLE 3: Outbox Table Schema & Implementation (~3,500 lines)
Topic 5: Outbox Database Schema
What will be covered: - Outbox Table Schema (SQL Server)
CREATE TABLE audit.Outbox (
Id BIGINT IDENTITY(1,1) PRIMARY KEY,
EventId VARCHAR(50) NOT NULL, -- ULID/UUIDv7; uniqueness enforced by UQ_Outbox_EventId below
AggregateType VARCHAR(64) NOT NULL,
AggregateId VARCHAR(64) NOT NULL,
TenantId VARCHAR(64) NOT NULL,
EventType VARCHAR(128) NOT NULL, -- audit.appended.v1
SchemaVersion INT NOT NULL DEFAULT 1,
PayloadJson NVARCHAR(MAX) NOT NULL, -- Event payload
Headers NVARCHAR(MAX) NULL, -- Message headers (JSON)
IdempotencyKey NVARCHAR(128) NULL,
CorrelationId VARCHAR(64) NULL,
CausationId VARCHAR(64) NULL,
Traceparent NVARCHAR(64) NULL,
CreatedAtUtc DATETIME2(3) NOT NULL DEFAULT SYSUTCDATETIME(),
PublishedAtUtc DATETIME2(3) NULL,
Attempts INT NOT NULL DEFAULT 0,
LastAttemptAtUtc DATETIME2(3) NULL,
LastError NVARCHAR(MAX) NULL,
CONSTRAINT UQ_Outbox_EventId UNIQUE (EventId),
INDEX IX_Outbox_Unpublished (PublishedAtUtc, TenantId) WHERE PublishedAtUtc IS NULL,
INDEX IX_Outbox_Tenant (TenantId, CreatedAtUtc)
);
- Schema Design Principles
  - EventId: Unique identifier (ULID for sortable, time-ordered IDs)
  - Partition Key: TenantId for Azure Service Bus partitioning
  - PublishedAtUtc: NULL = unpublished, NOT NULL = published
  - Attempts & Error: Retry tracking and debugging
  - Headers: Correlation, causation, trace context
- MongoDB Schema (Alternative)
public class OutboxMessage
{
    public string Id { get; set; } // EventId (ULID)
    public string AggregateType { get; set; }
    public string AggregateId { get; set; }
    public string TenantId { get; set; }
    public string EventType { get; set; }
    public int SchemaVersion { get; set; }
    public BsonDocument Payload { get; set; }
    public Dictionary<string, string> Headers { get; set; }
    public string IdempotencyKey { get; set; }
    public string CorrelationId { get; set; }
    public string CausationId { get; set; }
    public string Traceparent { get; set; }
    public DateTime CreatedAtUtc { get; set; }
    public DateTime? PublishedAtUtc { get; set; }
    public int Attempts { get; set; }
    public DateTime? LastAttemptAtUtc { get; set; }
    public string LastError { get; set; }
}
- Outbox Retention & Cleanup
  - Retain published events for 7 days (debugging, replay)
  - Automatic cleanup job (daily)
  - Archive to blob storage if needed (compliance audit trail)
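The retention rule can be expressed as a simple selection predicate. This is a sketch over in-memory rows: `OutboxRow` and `OutboxCleanup` are hypothetical names, and only the 7-day window comes from the plan. The important detail is that unpublished rows are never eligible, however old they are.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical projection of the outbox columns that matter for cleanup.
public sealed record OutboxRow(string EventId, DateTime CreatedAtUtc, DateTime? PublishedAtUtc);

public static class OutboxCleanup
{
    public static readonly TimeSpan Retention = TimeSpan.FromDays(7);

    // Returns rows published more than Retention ago; rows with no publish timestamp are always kept.
    public static IReadOnlyList<OutboxRow> SelectExpired(IEnumerable<OutboxRow> rows, DateTime nowUtc)
    {
        var threshold = nowUtc - Retention;
        return rows
            .Where(r => r.PublishedAtUtc.HasValue && r.PublishedAtUtc.Value < threshold)
            .ToList();
    }
}
```

A daily job would apply the same predicate as a batched `DELETE` (or archive-then-delete) against the table rather than loading rows into memory.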
Code Examples: - Complete outbox table schema (SQL + MongoDB) - FluentMigrator migration for outbox table - Cleanup job implementation
Diagrams: - Outbox table schema diagram - Retention policy flow
Deliverables: - Outbox table schemas (SQL + MongoDB) - Migration scripts - Cleanup job
Topic 6: Outbox Repository Implementation
What will be covered: - IOutboxRepository Interface
public interface IOutboxRepository
{
Task SaveAsync(OutboxMessage message);
Task<IReadOnlyList<OutboxMessage>> GetUnpublishedAsync(int batchSize = 100);
Task MarkAsPublishedAsync(string eventId, DateTime publishedAtUtc);
Task IncrementAttemptAsync(string eventId, string error);
Task<int> DeletePublishedOlderThanAsync(DateTime threshold);
}
- NHibernate Implementation
public class OutboxRepository : IOutboxRepository
{
    private readonly IUnitOfWork _unitOfWork;

    public OutboxRepository(IUnitOfWork unitOfWork)
    {
        _unitOfWork = unitOfWork;
    }

    public async Task SaveAsync(OutboxMessage message)
    {
        var session = _unitOfWork.GetSession();
        await session.SaveAsync(message); // Flushed as part of unit of work commit
    }

    public async Task<IReadOnlyList<OutboxMessage>> GetUnpublishedAsync(int batchSize = 100)
    {
        var session = _unitOfWork.GetSession();
        return await session.Query<OutboxMessage>()
            .Where(m => m.PublishedAtUtc == null)
            .OrderBy(m => m.CreatedAtUtc) // FIFO
            .ThenBy(m => m.TenantId) // Partition ordering
            .Take(batchSize)
            .ToListAsync();
    }

    public async Task MarkAsPublishedAsync(string eventId, DateTime publishedAtUtc)
    {
        var session = _unitOfWork.GetSession();
        await session.Query<OutboxMessage>()
            .Where(m => m.EventId == eventId)
            .UpdateAsync(m => new OutboxMessage { PublishedAtUtc = publishedAtUtc });
    }

    public async Task IncrementAttemptAsync(string eventId, string error)
    {
        var session = _unitOfWork.GetSession();
        // EventId is a unique column, not the primary key (Id), so look it up by query rather than GetAsync
        var message = await session.Query<OutboxMessage>()
            .Where(m => m.EventId == eventId)
            .SingleOrDefaultAsync();
        if (message == null)
        {
            return; // Already cleaned up or never persisted
        }
        message.Attempts++;
        message.LastAttemptAtUtc = DateTime.UtcNow;
        message.LastError = error;
        await session.UpdateAsync(message);
    }

    public async Task<int> DeletePublishedOlderThanAsync(DateTime threshold)
    {
        var session = _unitOfWork.GetSession();
        // Only published rows are eligible; unpublished rows are kept regardless of age
        return await session.Query<OutboxMessage>()
            .Where(m => m.PublishedAtUtc != null && m.PublishedAtUtc < threshold)
            .DeleteAsync();
    }
}
- FluentNHibernate Mapping
public class OutboxMessageMap : ClassMap<OutboxMessage>
{
    public OutboxMessageMap()
    {
        Schema("audit");
        Table("Outbox");
        Id(x => x.Id).GeneratedBy.Identity();
        Map(x => x.EventId).Not.Nullable().Length(50).UniqueKey("UQ_Outbox_EventId"); // Name matches the SQL constraint
        Map(x => x.AggregateType).Not.Nullable().Length(64);
        Map(x => x.AggregateId).Not.Nullable().Length(64);
        Map(x => x.TenantId).Not.Nullable().Length(64).Index("IX_Outbox_Tenant");
        Map(x => x.EventType).Not.Nullable().Length(128);
        Map(x => x.SchemaVersion).Not.Nullable();
        Map(x => x.PayloadJson).CustomSqlType("NVARCHAR(MAX)").Not.Nullable();
        Map(x => x.Headers).CustomSqlType("NVARCHAR(MAX)").Nullable();
        Map(x => x.IdempotencyKey).Length(128).Nullable();
        Map(x => x.CorrelationId).Length(64).Nullable();
        Map(x => x.CausationId).Length(64).Nullable();
        Map(x => x.Traceparent).Length(64).Nullable();
        Map(x => x.CreatedAtUtc).Not.Nullable();
        Map(x => x.PublishedAtUtc).Nullable().Index("IX_Outbox_Unpublished");
        Map(x => x.Attempts).Not.Nullable();
        Map(x => x.LastAttemptAtUtc).Nullable();
        Map(x => x.LastError).CustomSqlType("NVARCHAR(MAX)").Nullable();
    }
}
Code Examples: - Complete IOutboxRepository implementation - NHibernate mapping - MongoDB repository (alternative) - Unit tests for repository
Diagrams: - Repository architecture - Data access flow
Deliverables: - Outbox repository (SQL + MongoDB) - FluentNHibernate mappings - Unit tests
CYCLE 4: Outbox Relay Worker (~3,500 lines)
Topic 7: Relay Worker Implementation
What will be covered: - Relay Worker Architecture
public class OutboxRelayWorker : BackgroundService
{
    private readonly IOutboxRepository _outboxRepo;
    private readonly IBus _bus;
    private readonly ILogger<OutboxRelayWorker> _logger;
    private readonly OutboxRelayOptions _options;

    public OutboxRelayWorker(
        IOutboxRepository outboxRepo,
        IBus bus,
        ILogger<OutboxRelayWorker> logger,
        OutboxRelayOptions options)
    {
        _outboxRepo = outboxRepo;
        _bus = bus;
        _logger = logger;
        _options = options;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Outbox Relay Worker starting");
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessBatchAsync(stoppingToken);
                // Wait before next poll
                await Task.Delay(_options.PollingIntervalMs, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break; // Normal shutdown, not an error
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Outbox relay worker error");
                await Task.Delay(_options.ErrorBackoffMs, stoppingToken);
            }
        }
    }

    private async Task ProcessBatchAsync(CancellationToken cancellationToken)
    {
        // Get unpublished events (FIFO, batched)
        var messages = await _outboxRepo.GetUnpublishedAsync(_options.BatchSize);
        if (!messages.Any())
        {
            return; // No work
        }
        _logger.LogInformation("Processing {Count} outbox messages", messages.Count);
        foreach (var message in messages)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }
            await PublishMessageAsync(message, cancellationToken);
        }
    }

    private async Task PublishMessageAsync(OutboxMessage message, CancellationToken cancellationToken)
    {
        try
        {
            // Deserialize payload
            var eventPayload = JsonSerializer.Deserialize(message.PayloadJson, GetEventType(message.EventType));
            // Resolve the endpoint and set message headers
            var sendEndpoint = await _bus.GetSendEndpoint(new Uri($"queue:{message.EventType}"));
            await sendEndpoint.Send(eventPayload, ctx =>
            {
                // EventId may be a ULID (not parseable by Guid.Parse) or a UUIDv7 string
                ctx.MessageId = Guid.TryParse(message.EventId, out var eventGuid)
                    ? eventGuid
                    : Ulid.Parse(message.EventId).ToGuid();
                // CorrelationId is nullable in the outbox schema
                if (Guid.TryParse(message.CorrelationId, out var correlationId))
                {
                    ctx.CorrelationId = correlationId;
                }
                ctx.Headers.Set("TenantId", message.TenantId);
                ctx.Headers.Set("IdempotencyKey", message.IdempotencyKey);
                ctx.Headers.Set("SchemaVersion", message.SchemaVersion);
                if (!string.IsNullOrEmpty(message.Traceparent))
                {
                    ctx.Headers.Set("traceparent", message.Traceparent);
                }
            }, cancellationToken);
            // Mark as published
            await _outboxRepo.MarkAsPublishedAsync(message.EventId, DateTime.UtcNow);
            _logger.LogInformation("Published event {EventId} of type {EventType}",
                message.EventId, message.EventType);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish event {EventId}", message.EventId);
            // Increment attempt counter
            await _outboxRepo.IncrementAttemptAsync(message.EventId, ex.Message);
            // Check max attempts
            if (message.Attempts + 1 >= _options.MaxAttempts)
            {
                _logger.LogError("Event {EventId} exceeded max attempts ({MaxAttempts}), moving to DLQ",
                    message.EventId, _options.MaxAttempts);
                await MoveToDeadLetterAsync(message);
            }
        }
    }
}
- Relay Worker Configuration
- Graceful Shutdown
  - Wait for current batch to complete
  - Don't start new batches during shutdown
  - Maximum shutdown wait: 30 seconds
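The three shutdown rules can be sketched with plain tasks and a cancellation token. `DrainingBatchLoop` is a hypothetical class written against the base class library only; in a hosted `BackgroundService` the host supplies the token and its own shutdown timeout, so this is an illustration of the behaviour rather than the worker itself.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical polling loop that drains the current batch before stopping.
public sealed class DrainingBatchLoop
{
    public int CompletedBatches { get; private set; }

    public async Task RunAsync(Func<Task> processBatch, TimeSpan pollInterval, CancellationToken stopping)
    {
        while (!stopping.IsCancellationRequested) // No new batch once shutdown is requested
        {
            await processBatch(); // The token is deliberately not passed in, so the batch runs to completion
            CompletedBatches++;
            try
            {
                await Task.Delay(pollInterval, stopping);
            }
            catch (OperationCanceledException)
            {
                break; // Shutdown requested while idle between polls
            }
        }
    }

    // Requests shutdown and waits up to maxWait; returns false if the loop did not drain in time.
    public static async Task<bool> StopAsync(Task loop, CancellationTokenSource cts, TimeSpan maxWait)
    {
        cts.Cancel();
        var finished = await Task.WhenAny(loop, Task.Delay(maxWait));
        return finished == loop;
    }
}
```

Calling `StopAsync(loop, cts, TimeSpan.FromSeconds(30))` mirrors the 30-second maximum above; a `false` result means the batch is still running and the caller has to decide whether to abandon it.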
Code Examples: - Complete OutboxRelayWorker implementation - Batch processing logic - Error handling and retry - Graceful shutdown
Diagrams: - Relay worker architecture - Batch processing flow - Error handling flow
Deliverables: - Relay worker implementation - Configuration guide - Operational procedures
Topic 8: Outbox Event Publishing
What will be covered: - Saving Events to Outbox
public class AuditEventService
{
private readonly IUnitOfWork _unitOfWork;
private readonly IAuditRepository _auditRepo;
private readonly IOutboxRepository _outboxRepo;
public async Task<string> AppendAuditEventAsync(AuditEvent auditEvent)
{
using var transaction = await _unitOfWork.BeginTransactionAsync();
try
{
// 1. Save domain entity
var auditRecord = auditEvent.ToAuditRecord();
await _auditRepo.SaveAsync(auditRecord);
// 2. Save event to outbox (same transaction)
var outboxMessage = new OutboxMessage
{
EventId = Ulid.NewUlid().ToString(),
AggregateType = "AuditEvent",
AggregateId = auditRecord.Id,
TenantId = auditEvent.TenantId,
EventType = "audit.appended.v1",
SchemaVersion = 1,
PayloadJson = JsonSerializer.Serialize(new AuditAppendedEvent
{
AuditRecordId = auditRecord.Id,
TenantId = auditEvent.TenantId,
OccurredAtUtc = auditEvent.OccurredAtUtc,
EventType = auditEvent.EventType
}),
IdempotencyKey = auditEvent.IdempotencyKey,
CorrelationId = auditEvent.CorrelationId,
Traceparent = Activity.Current?.Id,
CreatedAtUtc = DateTime.UtcNow
};
await _outboxRepo.SaveAsync(outboxMessage);
// 3. Commit transaction (atomic)
await transaction.CommitAsync();
return auditRecord.Id;
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
- Event Serialization
- Use System.Text.Json (performance)
- Include type information for deserialization
- Compress large payloads (>10KB)
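A possible shape for the serialization rules is sketched below. `EncodedPayload` and `OutboxPayloadCodec` are hypothetical names, and GZip plus Base64 is one assumed encoding among several; only System.Text.Json, the carried type information, and the 10KB threshold come from the list above.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Text.Json;

// Hypothetical envelope: EventType carries the type information needed to pick a CLR type on read.
public sealed record EncodedPayload(string EventType, string Body, bool IsCompressed);

public static class OutboxPayloadCodec
{
    public const int CompressionThresholdBytes = 10 * 1024;

    public static EncodedPayload Encode<T>(string eventType, T evt)
    {
        var json = JsonSerializer.Serialize(evt);
        var utf8 = Encoding.UTF8.GetBytes(json);
        if (utf8.Length <= CompressionThresholdBytes)
        {
            return new EncodedPayload(eventType, json, false); // Small payloads stay as plain JSON
        }

        using var buffer = new MemoryStream();
        using (var gzip = new GZipStream(buffer, CompressionLevel.Optimal, leaveOpen: true))
        {
            gzip.Write(utf8, 0, utf8.Length);
        } // Disposing the GZipStream flushes the final block into the buffer
        return new EncodedPayload(eventType, Convert.ToBase64String(buffer.ToArray()), true);
    }

    public static T Decode<T>(EncodedPayload payload)
    {
        if (!payload.IsCompressed)
        {
            return JsonSerializer.Deserialize<T>(payload.Body);
        }

        using var input = new MemoryStream(Convert.FromBase64String(payload.Body));
        using var gzip = new GZipStream(input, CompressionMode.Decompress);
        using var output = new MemoryStream();
        gzip.CopyTo(output);
        return JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(output.ToArray()));
    }
}
```

Base64 keeps the compressed body storable in the `NVARCHAR(MAX)` payload column at the cost of roughly a third more characters; a `VARBINARY` column would avoid that overhead but changes the schema.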
Code Examples: - Complete event saving workflow - Transaction management - Event serialization patterns
Diagrams: - Event saving flow - Transaction boundaries
Deliverables: - Event saving implementation - Transaction patterns - Serialization guide
CYCLE 5: Inbox Pattern & Deduplication (~3,500 lines)
Topic 9: Inbox Pattern Architecture
What will be covered: - Inbox Pattern Overview
Message Consumption Flow:
1. Receive message from broker
2. Check Inbox: Has MessageId been processed?
3. If YES → ACK message, skip processing (duplicate)
4. If NO → Process message
5. Record MessageId in Inbox
6. ACK message
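The six steps above can be sketched against an in-memory inbox. `InMemoryInbox` and `InboxFlow` are hypothetical names; a production inbox would record the message ID in the same transaction as the handler's side effects, which this sketch does not attempt.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory inbox keyed by (consumer, message ID).
public sealed class InMemoryInbox
{
    private readonly ConcurrentDictionary<(string Consumer, string MessageId), DateTime> _processed = new();

    public bool HasProcessed(string consumer, string messageId)
    {
        return _processed.ContainsKey((consumer, messageId));
    }

    public void MarkProcessed(string consumer, string messageId, DateTime processedAtUtc)
    {
        _processed.TryAdd((consumer, messageId), processedAtUtc);
    }
}

public static class InboxFlow
{
    // Returns true when the handler ran and false when the message was skipped as a duplicate.
    // In both cases the caller ACKs the message (steps 3 and 6).
    public static bool Handle(InMemoryInbox inbox, string consumer, string messageId, Action handler)
    {
        if (inbox.HasProcessed(consumer, messageId))
        {
            return false; // Steps 2-3: duplicate, skip processing
        }

        handler(); // Step 4: an exception here leaves the inbox untouched, so redelivery retries the work
        inbox.MarkProcessed(consumer, messageId, DateTime.UtcNow); // Step 5
        return true;
    }
}
```

Because the key includes the consumer name, two different consumers can each process the same message once, matching the per-consumer scope described for ATP.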
- Inbox Deduplication Scope
  - Global Inbox: All messages across all consumers (single table)
  - Per-Consumer Inbox: Separate table per consumer (ATP uses this)
  - Per-Tenant Inbox: Partition by tenantId (performance)
- Inbox TTL
  - Default: 7 days (covers typical retry windows)
  - Extended: 30 days for compliance (audit trail)
  - Automatic Cleanup: Background job removes expired entries
- Inbox vs. Outbox

| Aspect | Outbox (Producer) | Inbox (Consumer) |
|---|---|---|
| Purpose | Atomic publish guarantee | Duplicate detection |
| Scope | Per-producer service | Per-consumer subscription |
| Key | EventId (generated) | MessageId (from broker) |
| Lifetime | Until published + 7 days | Until TTL expires |
| Cleanup | Delete published events | Delete expired entries |
Code Examples: - Inbox pattern workflow - Scope comparison - TTL configuration
Diagrams: - Inbox pattern flow - Inbox vs. outbox comparison
Deliverables: - Inbox pattern overview - Scope decision guide - TTL policies
Topic 10: Inbox Consumer Pattern
What will be covered: - Idempotent Consumer Base Class
public abstract class IdempotentConsumer<TMessage> : IConsumer<TMessage>
    where TMessage : class
{
    private readonly IInboxRepository _inbox;
    private readonly IConsumerMetrics _metrics; // Assumed metrics abstraction; the type name is a placeholder
    private readonly ILogger _logger;

    protected IdempotentConsumer(IInboxRepository inbox, IConsumerMetrics metrics, ILogger logger)
    {
        _inbox = inbox;
        _metrics = metrics;
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<TMessage> context)
    {
        var messageId = context.MessageId?.ToString();
        var consumerName = GetType().Name;
        if (string.IsNullOrEmpty(messageId))
        {
            _logger.LogWarning("Message received without MessageId, generating fallback");
            messageId = ComputeMessageIdFallback(context.Message);
        }
        // Check inbox for duplicate
        if (await _inbox.HasProcessedAsync(consumerName, messageId))
        {
            _logger.LogInformation("Duplicate message {MessageId} detected, skipping", messageId);
            // Track duplicate metric
            _metrics.IncrementDuplicateCount(consumerName);
            return; // Idempotent: ACK without processing
        }
        try
        {
            // Process message (implemented by derived class)
            await ProcessMessageAsync(context);
            // Mark as processed in inbox; two concurrent deliveries can both pass the check above,
            // so the (ConsumerName, MessageId) primary key is the final guard
            await _inbox.MarkProcessedAsync(consumerName, messageId, DateTime.UtcNow);
            _logger.LogInformation("Successfully processed message {MessageId}", messageId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message {MessageId}", messageId);
            throw; // Re-throw for MassTransit retry/DLQ handling
        }
    }

    protected abstract Task ProcessMessageAsync(ConsumeContext<TMessage> context);

    private string ComputeMessageIdFallback(TMessage message)
    {
        // Generate deterministic ID from message content
        var json = JsonSerializer.Serialize(message);
        var hash = SHA256.HashData(Encoding.UTF8.GetBytes(json));
        return Convert.ToHexString(hash);
    }
}
- Concrete Consumer Example
public class AuditAppendedConsumer : IdempotentConsumer<AuditAppendedEvent>
{
    private readonly IProjectionRepository _projectionRepo;

    public AuditAppendedConsumer(
        IInboxRepository inbox,
        IConsumerMetrics metrics,
        IProjectionRepository projectionRepo,
        ILogger<AuditAppendedConsumer> logger)
        : base(inbox, metrics, logger)
    {
        _projectionRepo = projectionRepo;
    }

    protected override async Task ProcessMessageAsync(ConsumeContext<AuditAppendedEvent> context)
    {
        var evt = context.Message;
        // Idempotent upsert (safe to call multiple times)
        await _projectionRepo.UpsertAuditRecordViewAsync(new AuditRecordView
        {
            AuditRecordId = evt.AuditRecordId,
            TenantId = evt.TenantId,
            OccurredAtUtc = evt.OccurredAtUtc,
            EventType = evt.EventType,
            Actor = evt.Actor,
            Resource = evt.Resource
        });
    }
}
Code Examples: - IdempotentConsumer base class (complete) - Concrete consumer implementations (all ATP consumers) - Fallback MessageId generation - Duplicate tracking
Diagrams: - Idempotent consumer flow - Inheritance hierarchy
Deliverables: - IdempotentConsumer base class - Consumer implementations - Usage patterns
CYCLE 6: Inbox Table Schema & Implementation (~3,000 lines)
Topic 11: Inbox Database Schema
What will be covered: - Inbox Table Schema (SQL Server)
CREATE TABLE audit.Inbox (
ConsumerName VARCHAR(128) NOT NULL,
MessageId VARCHAR(50) NOT NULL, -- ULID/UUIDv7 from broker
TenantId VARCHAR(64) NULL, -- For tenant-scoped cleanup
ProcessedAtUtc DATETIME2(3) NOT NULL DEFAULT SYSUTCDATETIME(),
ExpiresAtUtc DATETIME2(3) NOT NULL, -- TTL window (7-30 days)
ResultJson NVARCHAR(MAX) NULL, -- Optional: cache result for idempotent response
CONSTRAINT PK_Inbox PRIMARY KEY (ConsumerName, MessageId),
INDEX IX_Inbox_Expires (ExpiresAtUtc),
INDEX IX_Inbox_Tenant (TenantId, ProcessedAtUtc)
);
- MongoDB Schema (Alternative)
public class InboxEntry
{
    public string Id { get; set; } // ConsumerName:MessageId
    public string ConsumerName { get; set; }
    public string MessageId { get; set; }
    public string TenantId { get; set; }
    public DateTime ProcessedAtUtc { get; set; }
    public DateTime ExpiresAtUtc { get; set; }
    public BsonDocument Result { get; set; }
}

// Compound index
collection.Indexes.CreateOne(
    Builders<InboxEntry>.IndexKeys
        .Ascending(x => x.ConsumerName)
        .Ascending(x => x.MessageId),
    new CreateIndexOptions { Unique = true }
);

// TTL index (a zero ExpireAfter removes each document once its ExpiresAtUtc has passed)
collection.Indexes.CreateOne(
    Builders<InboxEntry>.IndexKeys.Ascending(x => x.ExpiresAtUtc),
    new CreateIndexOptions { ExpireAfter = TimeSpan.Zero }
);
- Inbox Partitioning Strategy
  - Partition by ConsumerName (each consumer isolated)
  - Sub-partition by TenantId (performance for multi-tenant cleanup)
  - Composite primary key (ConsumerName, MessageId)
Code Examples: - Complete inbox table schema (SQL + MongoDB) - FluentMigrator migration - Partitioning configuration
Diagrams: - Inbox table schema - Partitioning strategy
Deliverables: - Inbox table schemas - Migration scripts - Partitioning guide
Topic 12: Inbox Repository Implementation
What will be covered: - IInboxRepository Interface
public interface IInboxRepository
{
Task<bool> HasProcessedAsync(string consumerName, string messageId);
Task MarkProcessedAsync(string consumerName, string messageId, DateTime processedAtUtc);
Task MarkProcessedWithResultAsync(string consumerName, string messageId, object result, DateTime processedAtUtc);
Task<TResult> GetResultAsync<TResult>(string consumerName, string messageId);
Task<int> DeleteExpiredAsync();
}
- NHibernate Implementation
public class InboxRepository : IInboxRepository
{
    private readonly IUnitOfWork _unitOfWork;
    private readonly InboxOptions _options;

    public async Task<bool> HasProcessedAsync(string consumerName, string messageId)
    {
        var session = _unitOfWork.GetSession();
        var count = await session.Query<InboxEntry>()
            .Where(e => e.ConsumerName == consumerName && e.MessageId == messageId)
            .CountAsync();
        return count > 0;
    }

    public async Task MarkProcessedAsync(string consumerName, string messageId, DateTime processedAtUtc)
    {
        var session = _unitOfWork.GetSession();
        var entry = new InboxEntry
        {
            ConsumerName = consumerName,
            MessageId = messageId,
            ProcessedAtUtc = processedAtUtc,
            ExpiresAtUtc = processedAtUtc.AddDays(_options.TtlDays)
        };
        await session.SaveAsync(entry);
    }

    public async Task<int> DeleteExpiredAsync()
    {
        var session = _unitOfWork.GetSession();
        return await session.Query<InboxEntry>()
            .Where(e => e.ExpiresAtUtc < DateTime.UtcNow)
            .DeleteAsync();
    }
}
- Redis-Based Inbox (Performance)
public class RedisInboxRepository : IInboxRepository
{
    private readonly IConnectionMultiplexer _redis;
    private readonly InboxOptions _options;

    public async Task<bool> HasProcessedAsync(string consumerName, string messageId)
    {
        var db = _redis.GetDatabase();
        var key = $"inbox:{consumerName}:{messageId}";
        return await db.KeyExistsAsync(key);
    }

    public async Task MarkProcessedAsync(string consumerName, string messageId, DateTime processedAtUtc)
    {
        var db = _redis.GetDatabase();
        var key = $"inbox:{consumerName}:{messageId}";
        var ttl = TimeSpan.FromDays(_options.TtlDays);
        await db.StringSetAsync(key, processedAtUtc.ToString("O"), ttl);
    }
}
Code Examples: - Complete IInboxRepository implementations (SQL + Redis) - FluentNHibernate mapping - Cleanup job - Performance comparison
Diagrams: - Inbox repository architecture - SQL vs. Redis comparison
Deliverables: - Inbox repository implementations - Performance benchmarks - Selection guide
CYCLE 7: Idempotency Gate for HTTP APIs (~4,000 lines)
Topic 13: Idempotency-Key Header Middleware
What will be covered: - Idempotency-Key Header Standard
POST /api/v1/events HTTP/1.1
Content-Type: application/json
Idempotency-Key: acme-corp:app-123:evt-2025-10-30-001
X-Tenant-Id: acme-corp
{
"eventType": "user.login",
"occurredAt": "2025-10-30T12:34:56Z",
...
}
- Key Format: {tenantId}:{sourceAppId}:{uniqueSequence}
- Scope: Scoped to tenant (different tenants can use same key)
- TTL: 24 hours (idempotency window)
- Optional for GET/DELETE (both are idempotent by HTTP semantics, though only GET is a safe method); required for POST/PUT/PATCH
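The key format lends itself to a small validator. `IdempotencyKeyParser` is a hypothetical helper; the 128-character limit is an assumption taken from the `IdempotencyKey NVARCHAR(128)` column in the outbox schema, and requiring the first segment to match the caller's tenant is one reading of "scoped to tenant", not a stated rule.

```csharp
using System;

public sealed record IdempotencyKeyParts(string TenantId, string SourceAppId, string UniqueSequence);

// Hypothetical validator for the {tenantId}:{sourceAppId}:{uniqueSequence} format.
public static class IdempotencyKeyParser
{
    public const int MaxLength = 128; // Assumption: matches the IdempotencyKey column width

    public static bool TryParse(string key, string expectedTenantId, out IdempotencyKeyParts parts)
    {
        parts = null;
        if (string.IsNullOrWhiteSpace(key) || key.Length > MaxLength)
        {
            return false;
        }

        // Split into at most three segments so the sequence part may itself contain ':'
        var segments = key.Split(':', 3);
        if (segments.Length != 3 || Array.Exists(segments, string.IsNullOrWhiteSpace))
        {
            return false;
        }

        if (!string.Equals(segments[0], expectedTenantId, StringComparison.Ordinal))
        {
            return false; // Key claims a different tenant than the authenticated one
        }

        parts = new IdempotencyKeyParts(segments[0], segments[1], segments[2]);
        return true;
    }
}
```

For the request shown above, `TryParse("acme-corp:app-123:evt-2025-10-30-001", "acme-corp", out var parts)` succeeds, and a middleware could answer 400 when it returns `false`.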
- Idempotency Middleware
public class IdempotencyMiddleware
{
    private readonly RequestDelegate _next;
    private readonly IIdempotencyGate _gate;
    private readonly ILogger<IdempotencyMiddleware> _logger;

    public IdempotencyMiddleware(RequestDelegate next, IIdempotencyGate gate, ILogger<IdempotencyMiddleware> logger)
    {
        _next = next;
        _gate = gate;
        _logger = logger;
    }

    public async Task InvokeAsync(HttpContext context)
    {
        // Only apply to mutating operations
        if (!IsMutatingMethod(context.Request.Method))
        {
            await _next(context);
            return;
        }
        var tenantId = context.GetTenantId();
        var idempotencyKey = context.Request.Headers["Idempotency-Key"].ToString();
        if (string.IsNullOrEmpty(idempotencyKey))
        {
            // Idempotency-Key required for POST/PUT/PATCH
            context.Response.StatusCode = 400;
            await context.Response.WriteAsJsonAsync(new
            {
                Error = "IdempotencyKeyRequired",
                Message = "Idempotency-Key header is required for mutating operations"
            });
            return;
        }
        // Buffer the request so the gate can hash the body and downstream handlers can still read it
        // (the gate must rewind Request.Body after hashing)
        context.Request.EnableBuffering();
        // Check idempotency gate
        var gateResult = await _gate.TryReserveAsync(tenantId, idempotencyKey, context);
        if (gateResult.IsDuplicate)
        {
            _logger.LogInformation("Duplicate request detected for key {Key}", idempotencyKey);
            // Return cached result
            context.Response.StatusCode = gateResult.StatusCode;
            await context.Response.WriteAsync(gateResult.CachedResponse);
            return;
        }
        if (gateResult.IsConflict)
        {
            _logger.LogWarning("Idempotency conflict for key {Key}", idempotencyKey);
            // Return 409 Conflict
            context.Response.StatusCode = 409;
            await context.Response.WriteAsJsonAsync(new
            {
                Error = "IdempotencyConflict",
                Message = "Same idempotency key used with different request body"
            });
            return;
        }
        // First-time request, proceed
        // Wrap response stream to capture result for caching
        var originalBodyStream = context.Response.Body;
        using var responseBody = new MemoryStream();
        context.Response.Body = responseBody;
        try
        {
            await _next(context);
            // Cache successful result
            if (context.Response.StatusCode >= 200 && context.Response.StatusCode < 300)
            {
                responseBody.Seek(0, SeekOrigin.Begin);
                var responseText = await new StreamReader(responseBody).ReadToEndAsync();
                await _gate.StoreResultAsync(tenantId, idempotencyKey, context.Response.StatusCode, responseText);
            }
            // Copy response back to original stream
            responseBody.Seek(0, SeekOrigin.Begin);
            await responseBody.CopyToAsync(originalBodyStream);
        }
        finally
        {
            // Restore the real stream even if the pipeline throws
            context.Response.Body = originalBodyStream;
        }
    }

    private bool IsMutatingMethod(string method)
    {
        return method == "POST" || method == "PUT" || method == "PATCH";
    }
}
- Idempotency Gate
public interface IIdempotencyGate
{
    Task<IdempotencyGateResult> TryReserveAsync(string tenantId, string idempotencyKey, HttpContext context);
    Task StoreResultAsync(string tenantId, string idempotencyKey, int statusCode, string responseBody);
}

public class IdempotencyGateResult
{
    public bool IsDuplicate { get; set; }
    public bool IsConflict { get; set; }
    public int StatusCode { get; set; }
    public string CachedResponse { get; set; }
}
Code Examples: - Complete idempotency middleware - Idempotency gate implementation - Response caching - Conflict detection
Diagrams: - Middleware flow - Gate decision logic
Deliverables: - Idempotency middleware - Gate implementation - Integration guide
Topic 14: Idempotency Gate Implementation¶
What will be covered: - Idempotency Table Schema
CREATE TABLE audit.Idempotency (
TenantId VARCHAR(64) NOT NULL,
KeyHash CHAR(64) NOT NULL, -- SHA-256 of idempotency key
RecordId VARCHAR(26) NOT NULL, -- ULID of created resource
PayloadHash CHAR(64) NOT NULL, -- SHA-256 of request body
StatusCode INT NOT NULL,
ResponseBody NVARCHAR(MAX) NULL, -- Cached response
CreatedAtUtc DATETIME2(3) NOT NULL DEFAULT SYSUTCDATETIME(),
ExpiresAtUtc DATETIME2(3) NOT NULL, -- 24-hour TTL
CONSTRAINT PK_Idempotency PRIMARY KEY (TenantId, KeyHash),
INDEX IX_Idempotency_Expires (ExpiresAtUtc)
);
- Gate Implementation
    public class SqlIdempotencyGate : IIdempotencyGate
    {
        private readonly IDbConnection _connection;
        private readonly ILogger<SqlIdempotencyGate> _logger;

        public SqlIdempotencyGate(IDbConnection connection, ILogger<SqlIdempotencyGate> logger)
        {
            _connection = connection;
            _logger = logger;
        }

        public async Task<IdempotencyGateResult> TryReserveAsync(
            string tenantId, string idempotencyKey, HttpContext context)
        {
            var keyHash = ComputeHash(tenantId, idempotencyKey);
            var payloadHash = await ComputePayloadHashAsync(context.Request);

            // MERGE for atomic check-and-insert. HOLDLOCK closes the race between the
            // existence check and the insert. SQL Server does not accept two
            // WHEN MATCHED ... UPDATE clauses, so one clause with CASE is used instead.
            var sql = @"
                MERGE audit.Idempotency WITH (HOLDLOCK) AS target
                USING (SELECT @TenantId AS TenantId, @KeyHash AS KeyHash) AS source
                    ON (target.TenantId = source.TenantId AND target.KeyHash = source.KeyHash)
                WHEN NOT MATCHED THEN
                    INSERT (TenantId, KeyHash, RecordId, PayloadHash, StatusCode, ExpiresAtUtc)
                    VALUES (@TenantId, @KeyHash, @RecordId, @PayloadHash, 0, DATEADD(day, 1, SYSUTCDATETIME()))
                WHEN MATCHED THEN
                    UPDATE SET ExpiresAtUtc = CASE
                        WHEN target.PayloadHash = @PayloadHash
                            THEN DATEADD(day, 1, SYSUTCDATETIME()) -- Exact duplicate: refresh TTL
                        ELSE target.ExpiresAtUtc                   -- Conflict: leave the row unchanged
                    END
                OUTPUT $action AS Action, inserted.RecordId, inserted.PayloadHash,
                       inserted.StatusCode, inserted.ResponseBody;";

            var result = await _connection.QuerySingleAsync(sql, new
            {
                TenantId = tenantId,
                KeyHash = keyHash,
                RecordId = Ulid.NewUlid().ToString(),
                PayloadHash = payloadHash
            });

            if (result.Action == "INSERT")
            {
                // First-time request
                return new IdempotencyGateResult { IsDuplicate = false, IsConflict = false };
            }
            else if (result.Action == "UPDATE" && result.PayloadHash == payloadHash)
            {
                // Exact duplicate (StatusCode is still 0 while the original request is in flight)
                return new IdempotencyGateResult
                {
                    IsDuplicate = true,
                    StatusCode = result.StatusCode,
                    CachedResponse = result.ResponseBody
                };
            }
            else
            {
                // Conflict (same key, different payload)
                return new IdempotencyGateResult { IsConflict = true };
            }
        }

        // Minimal completion so the class satisfies IIdempotencyGate
        public Task StoreResultAsync(string tenantId, string idempotencyKey, int statusCode, string responseBody)
        {
            return _connection.ExecuteAsync(
                @"UPDATE audit.Idempotency
                  SET StatusCode = @StatusCode, ResponseBody = @ResponseBody
                  WHERE TenantId = @TenantId AND KeyHash = @KeyHash",
                new
                {
                    TenantId = tenantId,
                    KeyHash = ComputeHash(tenantId, idempotencyKey),
                    StatusCode = statusCode,
                    ResponseBody = responseBody
                });
        }

        private string ComputeHash(string tenantId, string key)
        {
            // Tenant-scoped hash (prevents cross-tenant collisions)
            var input = $"{tenantId}:{key}";
            var hash = SHA256.HashData(Encoding.UTF8.GetBytes(input));
            return Convert.ToHexString(hash);
        }

        private async Task<string> ComputePayloadHashAsync(HttpRequest request)
        {
            request.EnableBuffering(); // Allow multiple reads
            using var reader = new StreamReader(request.Body, leaveOpen: true);
            var body = await reader.ReadToEndAsync();
            request.Body.Position = 0; // Reset for next reader
            var hash = SHA256.HashData(Encoding.UTF8.GetBytes(body));
            return Convert.ToHexString(hash);
        }
    }
Code Examples: - Complete idempotency gate implementation (SQL + Redis) - MERGE query for atomic operations - Hash computation - Conflict detection
Diagrams: - Gate SQL MERGE flow - Hash computation
Deliverables: - Idempotency gate implementation - Table schema - Testing guide
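The three outcomes the gate distinguishes (first request, exact duplicate, conflict) can be illustrated without a database. The following is a minimal in-memory sketch, not the ATP implementation: the names `InMemoryIdempotencyGate` and `GateOutcome` are hypothetical, TTL handling is left out, and the request body is passed in as a string rather than read from an `HttpContext`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;

// Hypothetical names for illustration only; not part of ATP.
public enum GateOutcome { FirstRequest, Duplicate, Conflict }

public sealed class InMemoryIdempotencyGate
{
    // Key: SHA-256 of "tenantId:idempotencyKey"; value: SHA-256 of the request body.
    private readonly ConcurrentDictionary<string, string> _reservations = new();

    public GateOutcome TryReserve(string tenantId, string idempotencyKey, string requestBody)
    {
        var keyHash = Sha256Hex($"{tenantId}:{idempotencyKey}");
        var payloadHash = Sha256Hex(requestBody);

        // TryAdd is atomic: only one caller can win the reservation for a key.
        if (_reservations.TryAdd(keyHash, payloadHash))
        {
            return GateOutcome.FirstRequest;
        }

        // Key already reserved: same payload is a duplicate, anything else a conflict.
        return _reservations[keyHash] == payloadHash
            ? GateOutcome.Duplicate
            : GateOutcome.Conflict;
    }

    private static string Sha256Hex(string input) =>
        Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(input)));
}
```

The atomic `TryAdd` plays the role that the single MERGE statement plays in SQL: the existence check and the insert cannot be interleaved by a concurrent request.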
CYCLE 8: Idempotent Message Consumers (~3,500 lines)¶
Topic 15: Idempotent Handler Patterns¶
What will be covered: - Upsert Pattern (Naturally Idempotent)
public class ProjectionUpdatedConsumer : IdempotentConsumer<ProjectionUpdatedEvent>
{
protected override async Task ProcessMessageAsync(ConsumeContext<ProjectionUpdatedEvent> context)
{
var evt = context.Message;
// Upsert is idempotent (same result if called multiple times)
await _repository.UpsertAsync(new AuditRecordView
{
Id = evt.AuditRecordId, // Primary key
TenantId = evt.TenantId,
EventType = evt.EventType,
OccurredAtUtc = evt.OccurredAtUtc,
UpdatedAtUtc = DateTime.UtcNow
});
}
}
- Conditional Side-Effect Pattern
    public class ExportCompletedConsumer : IdempotentConsumer<ExportCompletedEvent>
    {
        protected override async Task ProcessMessageAsync(ConsumeContext<ExportCompletedEvent> context)
        {
            var evt = context.Message;

            // Update export job status (idempotent: a redelivery must not move CompletedAtUtc)
            var job = await _exportRepo.GetAsync(evt.ExportJobId);
            if (job.Status != ExportStatus.Completed)
            {
                job.Status = ExportStatus.Completed;
                job.CompletedAtUtc = DateTime.UtcNow;
                await _exportRepo.UpdateAsync(job);
            }

            // Send webhook (check if already sent).
            // A crash between SendAsync and MarkSentAsync can still cause a resend,
            // so the webhook receiver should deduplicate on ExportJobId.
            if (!await _webhookLog.HasSentAsync(evt.ExportJobId))
            {
                await _webhookService.SendAsync(new ExportCompletedWebhook
                {
                    ExportJobId = evt.ExportJobId,
                    DownloadUrl = evt.DownloadUrl
                });
                await _webhookLog.MarkSentAsync(evt.ExportJobId);
            }
        }
    }
- Version-Based Idempotency (Optimistic Concurrency)
    public class PolicyUpdatedConsumer : IdempotentConsumer<PolicyUpdatedEvent>
    {
        protected override async Task ProcessMessageAsync(ConsumeContext<PolicyUpdatedEvent> context)
        {
            var evt = context.Message;

            // Load entity with version
            var policy = await _policyRepo.GetAsync(evt.PolicyId);

            // Check if already applied (version-based idempotency)
            if (policy.Version >= evt.PolicyVersion)
            {
                _logger.LogInformation("Policy {PolicyId} already at version {Version}, skipping",
                    evt.PolicyId, policy.Version);
                return; // Already processed
            }

            // Apply update
            policy.ApplyUpdate(evt);
            policy.Version = evt.PolicyVersion;
            await _policyRepo.UpdateAsync(policy);
        }
    }
Code Examples: - All idempotent handler patterns (upsert, conditional, version-based) - ATP-specific consumer implementations - Side-effect tracking
Diagrams: - Idempotent handler patterns - Version-based flow
Deliverables: - Idempotent handler library - Pattern catalog - Best practices
Topic 16: Inbox + Handler Integration¶
What will be covered: - Complete Idempotent Consumer Flow
public class AuditAppendedConsumer : IConsumer<AuditAppendedEvent>
{
private readonly IInboxRepository _inbox;
private readonly IProjectionRepository _projection;
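private readonly IUnitOfWork _unitOfWork;
private readonly ILogger<AuditAppendedConsumer> _logger; // used below for duplicate and failure logging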
public async Task Consume(ConsumeContext<AuditAppendedEvent> context)
{
var messageId = context.MessageId?.ToString() ?? throw new InvalidOperationException("MessageId required");
var consumerName = GetType().Name;
var evt = context.Message;
using var transaction = await _unitOfWork.BeginTransactionAsync();
try
{
// 1. Check inbox (duplicate detection)
if (await _inbox.HasProcessedAsync(consumerName, messageId))
{
_logger.LogInformation("Duplicate message {MessageId}, skipping", messageId);
return; // Idempotent return
}
// 2. Process message (idempotent operations)
await _projection.UpsertAuditRecordViewAsync(new AuditRecordView
{
AuditRecordId = evt.AuditRecordId,
TenantId = evt.TenantId,
OccurredAtUtc = evt.OccurredAtUtc,
EventType = evt.EventType
});
// 3. Mark as processed in inbox
await _inbox.MarkProcessedAsync(consumerName, messageId, DateTime.UtcNow);
// 4. Commit transaction (processing + inbox = atomic)
await transaction.CommitAsync();
_logger.LogInformation("Successfully processed message {MessageId}", messageId);
}
catch (Exception ex)
{
await transaction.RollbackAsync();
_logger.LogError(ex, "Failed to process message {MessageId}", messageId);
throw; // MassTransit will retry
}
}
}
- Transaction Scope
- Inbox record + domain changes in single transaction
- If transaction fails, message redelivered (safe retry)
- If transaction succeeds, inbox prevents duplicate processing
Code Examples: - Complete idempotent consumer with inbox - Transaction management - Error handling
Diagrams: - Complete consumer flow - Transaction boundaries
Deliverables: - Complete consumer template - Transaction patterns - Error handling guide
CYCLE 9: MassTransit Outbox Integration (~3,000 lines)¶
Topic 17: MassTransit Outbox Configuration¶
What will be covered: - MassTransit Built-In Outbox
services.AddMassTransit(cfg =>
{
    // Register consumers, sagas
    cfg.AddConsumer<AuditAppendedConsumer>();
    cfg.AddSagaStateMachine<ExportSaga, ExportSagaState>();

    // Enable outbox (Entity Framework).
    // In MassTransit v8 the outbox is registered on the bus registration
    // configurator, not inside the transport callback.
    cfg.AddEntityFrameworkOutbox<AuditDbContext>(outbox =>
    {
        outbox.UseSqlServer(); // Or UsePostgres()
        // Query delay (polling interval)
        outbox.QueryDelay = TimeSpan.FromMilliseconds(500);
        // Batch size
        outbox.QueryMessageLimit = 100;
        // Isolation level
        outbox.IsolationLevel = IsolationLevel.ReadCommitted;
        // Database query timeout
        outbox.QueryTimeout = TimeSpan.FromSeconds(30);
        // Route Publish/Send calls made outside consumers through the outbox
        outbox.UseBusOutbox();
    });

    cfg.UsingAzureServiceBus((context, busCfg) =>
    {
        busCfg.Host("atp-prod.servicebus.windows.net");
        busCfg.ConfigureEndpoints(context);
    });
});
- DbContext Configuration
    public class AuditDbContext : DbContext
    {
        public DbSet<AuditRecord> AuditRecords { get; set; }
        public DbSet<OutboxMessage> OutboxMessages { get; set; }
        public DbSet<OutboxState> OutboxStates { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);

            // MassTransit inbox/outbox tables
            modelBuilder.AddInboxStateEntity();
            modelBuilder.AddOutboxStateEntity();
            modelBuilder.AddOutboxMessageEntity();
        }
    }
- Publishing with Outbox
    public class AuditService
    {
        private readonly AuditDbContext _dbContext;
        private readonly IPublishEndpoint _publishEndpoint;

        public async Task<string> AppendAuditEventAsync(AuditEvent auditEvent)
        {
            using var transaction = await _dbContext.Database.BeginTransactionAsync();
            try
            {
                // 1. Stage domain entity (Id is assumed to be assigned client-side, e.g. a ULID)
                var auditRecord = auditEvent.ToAuditRecord();
                _dbContext.AuditRecords.Add(auditRecord);

                // 2. Publish event (staged in the outbox through the DbContext)
                await _publishEndpoint.Publish(new AuditAppendedEvent
                {
                    AuditRecordId = auditRecord.Id,
                    TenantId = auditEvent.TenantId,
                    OccurredAtUtc = auditEvent.OccurredAtUtc
                });

                // 3. Save after Publish: the outbox rows are only written by SaveChangesAsync
                await _dbContext.SaveChangesAsync();

                // 4. Commit (domain + outbox atomic)
                await transaction.CommitAsync();
                return auditRecord.Id;
            }
            catch
            {
                await transaction.RollbackAsync();
                throw;
            }
        }
    }
Code Examples: - MassTransit outbox configuration (complete) - DbContext setup - Publishing with outbox - Migration for outbox tables
Diagrams: - MassTransit outbox architecture - DbContext + outbox integration
Deliverables: - MassTransit outbox setup - DbContext configuration - Publishing patterns
Topic 18: Custom Outbox Implementation¶
What will be covered:
- When to Use Custom Outbox
  - Using NHibernate (not Entity Framework)
  - Custom serialization requirements
  - Advanced partition strategies
  - Integration with Orleans or other frameworks
- Custom Outbox Service
    public class CustomOutboxService : IOutboxService
    {
        private readonly IOutboxRepository _outboxRepo;
        private readonly IUnitOfWork _unitOfWork;

        public async Task PublishWithOutboxAsync<TEvent>(
            TEvent domainEvent,
            Func<Task> domainOperation) where TEvent : IDomainEvent
        {
            using var transaction = await _unitOfWork.BeginTransactionAsync();
            try
            {
                // 1. Execute domain operation (e.g., save aggregate)
                await domainOperation();

                // 2. Save event to outbox
                var outboxMessage = new OutboxMessage
                {
                    EventId = domainEvent.EventId ?? Ulid.NewUlid().ToString(),
                    AggregateType = domainEvent.AggregateType,
                    AggregateId = domainEvent.AggregateId,
                    TenantId = domainEvent.TenantId,
                    EventType = domainEvent.GetType().Name,
                    PayloadJson = JsonSerializer.Serialize(domainEvent),
                    CorrelationId = Activity.Current?.TraceId.ToString(),
                    CreatedAtUtc = DateTime.UtcNow
                };
                await _outboxRepo.SaveAsync(outboxMessage);

                // 3. Commit transaction
                await transaction.CommitAsync();
            }
            catch
            {
                await transaction.RollbackAsync();
                throw;
            }
        }
    }
Code Examples: - Custom outbox service implementation - NHibernate integration - Orleans integration - Comparison with MassTransit outbox
Diagrams: - Custom vs. MassTransit outbox - Integration patterns
Deliverables: - Custom outbox implementation - Framework integration guides - Selection criteria
CYCLE 10: MassTransit Inbox Integration (~3,000 lines)¶
Topic 19: MassTransit Inbox Configuration¶
What will be covered: - MassTransit Built-In Inbox
services.AddMassTransit(cfg =>
{
    cfg.AddConsumer<AuditAppendedConsumer>();

    // Inbox settings are part of the Entity Framework outbox registration
    cfg.AddEntityFrameworkOutbox<AuditDbContext>(inbox =>
    {
        inbox.UseSqlServer();
        inbox.DuplicateDetectionWindow = TimeSpan.FromDays(7);
    });

    cfg.UsingAzureServiceBus((context, busCfg) =>
    {
        busCfg.Host("atp-prod.servicebus.windows.net");
        busCfg.ReceiveEndpoint("projection-queue", e =>
        {
            // Enable inbox (deduplication) on this endpoint
            e.UseEntityFrameworkOutbox<AuditDbContext>(context);
            e.ConfigureConsumer<AuditAppendedConsumer>(context);
        });
    });
});
- Inbox State Table (MassTransit)
    CREATE TABLE InboxState (
        Id BIGINT IDENTITY(1,1) PRIMARY KEY,
        MessageId UNIQUEIDENTIFIER NOT NULL,
        ConsumerId UNIQUEIDENTIFIER NOT NULL,
        LockId UNIQUEIDENTIFIER NOT NULL,
        RowVersion TIMESTAMP,
        Received DATETIME2 NOT NULL,
        ReceiveCount INT NOT NULL,
        ExpirationTime DATETIME2 NULL,
        Consumed DATETIME2 NULL,
        Delivered DATETIME2 NULL,
        LastSequenceNumber BIGINT NULL,
        UNIQUE (MessageId, ConsumerId)
    );
Code Examples: - MassTransit inbox configuration - DbContext setup - Consumer with inbox
Diagrams: - MassTransit inbox architecture - Inbox state table
Deliverables: - MassTransit inbox setup - Configuration guide - Integration patterns
Topic 20: Custom Inbox Implementation¶
What will be covered: - Custom Inbox for NHibernate
public class NHibernateInboxRepository : IInboxRepository
{
private readonly ISessionFactory _sessionFactory;
public async Task<bool> HasProcessedAsync(string consumerName, string messageId)
{
using var session = _sessionFactory.OpenSession();
var count = await session.Query<InboxEntry>()
.Where(e => e.ConsumerName == consumerName && e.MessageId == messageId)
.CountAsync();
return count > 0;
}
public async Task MarkProcessedAsync(string consumerName, string messageId, DateTime processedAtUtc)
{
using var session = _sessionFactory.OpenSession();
using var transaction = session.BeginTransaction();
try
{
var entry = new InboxEntry
{
ConsumerName = consumerName,
MessageId = messageId,
ProcessedAtUtc = processedAtUtc,
ExpiresAtUtc = processedAtUtc.AddDays(7)
};
await session.SaveAsync(entry);
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
Code Examples: - Custom inbox for NHibernate - Custom inbox for MongoDB - Integration with MassTransit
Diagrams: - Custom inbox architecture
Deliverables: - Custom inbox implementations - Integration guide - Performance comparison
CYCLE 11: Idempotency Key Generation & Storage (~3,000 lines)¶
Topic 21: Idempotency Key Generation¶
What will be covered: - Key Generation Strategies
// 1. Client-Generated Keys (Preferred)
var idempotencyKey = $"{tenantId}:{sourceAppId}:{Ulid.NewUlid()}";
// 2. Sequence-Based Keys
var idempotencyKey = $"{tenantId}:{sourceAppId}:seq-{sequenceNumber}";
// 3. Content-Based Keys (Deterministic Hash)
var canonicalJson = JsonSerializer.Serialize(request, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
WriteIndented = false
});
var contentHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(canonicalJson)));
var idempotencyKey = $"{tenantId}:{sourceAppId}:content-{contentHash}"; // matches the {tenantId}:{sourceAppId}:{uniquePart} format
// 4. Time-Based Keys (ULID - sortable, time-ordered)
var ulid = Ulid.NewUlid(); // Encodes timestamp + randomness
var idempotencyKey = $"{tenantId}:{sourceAppId}:{ulid}";
- ATP Key Format Standard
Format: {tenantId}:{sourceAppId}:{uniquePart}
Examples:
- acme-corp:erp-system:01HZX123456789ABCDEFGHJ
- contoso:mobile-app:seq-12345
- fabrikam:webhook:content-a1b2c3d4...
Rules:
- Max length: 128 characters
- Allowed chars: a-zA-Z0-9:-_
- Case-sensitive
- Tenant-scoped (same key different tenants = different operations)
- Key Security
- Never expose internal IDs (use ULIDs or hashes)
- Hash keys before storage (prevent enumeration)
- Tenant-scoped (cross-tenant key reuse safe)
- TTL-based expiration (no infinite retention)
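The format rules above (three colon-separated parts, 128 characters at most, a restricted character set) can be checked mechanically. A minimal validator sketch follows; the class name `IdempotencyKeyFormat` is hypothetical, and it assumes that `tenantId` and `sourceAppId` never contain a colon while `uniquePart` may, which the rules do not state explicitly.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper for illustration; not part of ATP.
public static class IdempotencyKeyFormat
{
    public const int MaxLength = 128;

    // Assumption: tenantId and sourceAppId contain no ':'; uniquePart may.
    // \z anchors at the true end of input (unlike $, which tolerates a trailing newline).
    private static readonly Regex Pattern = new Regex(
        @"^[A-Za-z0-9_-]+:[A-Za-z0-9_-]+:[A-Za-z0-9:_-]+\z",
        RegexOptions.CultureInvariant);

    public static bool IsValid(string key)
    {
        return !string.IsNullOrEmpty(key)
            && key.Length <= MaxLength
            && Pattern.IsMatch(key);
    }
}
```

Validation is case-sensitive by construction: the key is never normalized, so `Acme:app:1` and `acme:app:1` remain distinct keys.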
Code Examples: - Key generation strategies (all) - ATP key format validator - Key hashing implementation - Security best practices
Diagrams: - Key generation strategies - Key format standard
Deliverables: - Key generation library - Format validator - Security guide
Topic 22: Idempotency Key Storage¶
What will be covered: - Storage Options

| Storage | Pros | Cons | ATP Usage |
|---|---|---|---|
| Redis | Fast (<5ms), TTL native, distributed | Volatile (data loss on restart) | Inbox cache (layer 1) |
| SQL Database | Durable, transactional, queryable | Slower (~10-50ms) | Idempotency gate (authoritative) |
| MongoDB | Fast, flexible schema, TTL index | Eventual consistency | Alternative to SQL |
| Cosmos DB | Global distribution, low latency | Cost, complexity | Multi-region ATP |
- Hybrid Approach (ATP Recommended)
    public class HybridIdempotencyGate : IIdempotencyGate
    {
        private readonly IDistributedCache _cache;     // Redis (L1)
        private readonly IIdempotencyRepository _db;   // SQL (L2, authoritative)

        public async Task<IdempotencyGateResult> TryReserveAsync(
            string tenantId, string idempotencyKey, HttpContext context)
        {
            var cacheKey = $"idem:{tenantId}:{idempotencyKey}";

            // 1. Check Redis cache (fast path).
            // A cache hit skips the payload comparison, so a conflicting payload is
            // only detected here if the payload hash is made part of the cache key.
            var cachedResult = await _cache.GetStringAsync(cacheKey);
            if (cachedResult != null)
            {
                var result = JsonSerializer.Deserialize<IdempotencyGateResult>(cachedResult);
                return result; // Cache hit (~2ms)
            }

            // 2. Check database (authoritative, slower)
            var dbResult = await _db.TryReserveAsync(tenantId, idempotencyKey, context);

            // 3. Cache only completed duplicates in Redis (24-hour TTL).
            // Caching the first-time result would make every retry look like a new request.
            if (dbResult.IsDuplicate && dbResult.StatusCode != 0)
            {
                var ttl = TimeSpan.FromHours(24);
                await _cache.SetStringAsync(cacheKey, JsonSerializer.Serialize(dbResult),
                    new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = ttl });
            }

            return dbResult; // Database result (~15ms)
        }

        // StoreResultAsync (required by IIdempotencyGate) is omitted in this outline.
    }
Code Examples: - Storage comparison benchmarks - Hybrid implementation (Redis + SQL) - TTL configuration - Cache invalidation
Diagrams: - Storage options comparison - Hybrid architecture
Deliverables: - Storage selection guide - Hybrid implementation - Performance benchmarks
CYCLE 12: Conflict Detection & Resolution (~3,000 lines)¶
Topic 23: Idempotency Conflict Detection¶
What will be covered: - Conflict Types
1. Exact Duplicate
- Same idempotency key
- Same request payload (hash match)
- Response: 200 OK with cached result
2. Semantic Conflict
- Same idempotency key
- Different request payload (hash mismatch)
- Response: 409 Conflict
3. Expired Key Reuse
- Same idempotency key (TTL expired)
- May be exact duplicate or conflict
- Response: Treat as new request or 409 (configurable)
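The three conflict types reduce to a small decision over the stored record, the incoming payload hash, and the current time. The sketch below shows that decision as a pure function; `StoredKey`, `RequestClass`, and `ConflictClassifier` are hypothetical names, and expired-key reuse is returned as its own class because the outline leaves its handling configurable.

```csharp
#nullable enable
using System;

// Hypothetical types for illustration; not part of ATP.
public enum RequestClass { New, ExactDuplicate, SemanticConflict, ExpiredKeyReuse }

// What was stored when the idempotency key was first seen.
public sealed record StoredKey(string PayloadHash, DateTime ExpiresAtUtc);

public static class ConflictClassifier
{
    public static RequestClass Classify(StoredKey? stored, string payloadHash, DateTime nowUtc)
    {
        // Key never seen before
        if (stored is null) return RequestClass.New;

        // Key seen, but its TTL has elapsed: the caller decides (new request or 409)
        if (stored.ExpiresAtUtc <= nowUtc) return RequestClass.ExpiredKeyReuse;

        // Hex hashes are compared case-insensitively
        return string.Equals(stored.PayloadHash, payloadHash, StringComparison.OrdinalIgnoreCase)
            ? RequestClass.ExactDuplicate
            : RequestClass.SemanticConflict;
    }
}
```

Passing the clock in as `nowUtc` keeps the function deterministic, which makes the expiry branch straightforward to cover in unit tests.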
- Payload Hash Comparison
    public class PayloadHasher
    {
        // Async because ASP.NET Core rejects synchronous request-body reads by default
        public async Task<string> ComputeCanonicalHashAsync(HttpRequest request)
        {
            // Read request body
            request.EnableBuffering();
            using var reader = new StreamReader(request.Body, leaveOpen: true);
            var body = await reader.ReadToEndAsync();
            request.Body.Position = 0;

            // Parse JSON
            using var jsonDoc = JsonDocument.Parse(body);

            // Remove non-material fields (correlation IDs, timestamps)
            var materialFields = ExtractMaterialFields(jsonDoc);

            // Serialize canonically (top-level keys sorted, no whitespace)
            var canonical = JsonSerializer.Serialize(materialFields, new JsonSerializerOptions
            {
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
                WriteIndented = false,
                DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
                Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping
            });

            // Compute SHA-256 hash
            var hash = SHA256.HashData(Encoding.UTF8.GetBytes(canonical));
            return Convert.ToHexString(hash);
        }

        private SortedDictionary<string, JsonElement> ExtractMaterialFields(JsonDocument doc)
        {
            // SortedDictionary gives a stable key order; nested objects keep their original order
            var material = new SortedDictionary<string, JsonElement>(StringComparer.Ordinal);
            foreach (var property in doc.RootElement.EnumerateObject())
            {
                // Skip non-material fields
                if (IsNonMaterialField(property.Name))
                {
                    continue;
                }
                material[property.Name] = property.Value;
            }
            return material;
        }

        private bool IsNonMaterialField(string fieldName)
        {
            // Fields that don't affect business meaning
            var nonMaterialFields = new[]
            {
                "correlationId", "traceId", "traceparent", "requestId",
                "timestamp", "requestedAt", "clientVersion"
            };
            return nonMaterialFields.Contains(fieldName, StringComparer.OrdinalIgnoreCase);
        }
    }
- Conflict Response
    HTTP/1.1 409 Conflict
    Content-Type: application/problem+json

    {
      "type": "https://audit.connectsoft.io/problems/idempotency-conflict",
      "title": "Idempotency Conflict",
      "status": 409,
      "detail": "The same idempotency key was used with different request content",
      "idempotencyKey": "acme-corp:app-123:evt-001",
      "originalRequestHash": "a1b2c3d4...",
      "currentRequestHash": "e5f6a7b8...",
      "firstSeenAt": "2025-10-30T12:00:00Z"
    }
Code Examples: - Payload hash computation (complete) - Canonical JSON serialization - Conflict detection logic - Conflict response formatting
Diagrams: - Conflict detection flow - Hash computation process
Deliverables: - Payload hasher implementation - Conflict detection logic - Response standards
Topic 24: Conflict Resolution Strategies¶
What will be covered: - Resolution Strategies
Strategy 1: Strict (ATP Default)
- Reject conflicts with 409
- Client must use different idempotency key
- Preserves audit trail integrity
Strategy 2: Last-Write-Wins
- Accept conflict, overwrite with latest
- Risk: Lost updates
- Use case: Non-critical operations
Strategy 3: Content-Based
- If content identical (hash match), treat as exact duplicate
- If content differs, reject with 409
- ATP uses this for content-hash fallback keys
Strategy 4: Warn-But-Allow
- Log conflict warning
- Process request (generate new ID)
- Return warning in response
- Use case: Development/testing only
- ATP Conflict Policy
- Default: Strict (reject with 409)
- Content Hash Keys: Content-based (hash match = duplicate)
- Development: Warn-but-allow (feature flag)
Code Examples: - All resolution strategies - ATP policy configuration - Feature flag for strategy selection
Diagrams: - Resolution strategy decision tree - Policy comparison
Deliverables: - Resolution strategy implementations - Policy configuration - Selection guide
CYCLE 13: Distributed Idempotency (~2,500 lines)¶
Topic 25: Cross-Service Idempotency¶
What will be covered:
- Distributed Idempotency Challenges
  - Multiple services processing same request
  - Each service needs idempotency (not just first service)
  - Consistent key propagation across services
- Idempotency Key Propagation
    // Service A (Ingestion)
    // tenantId and recordId are assumed to be resolved elsewhere in the action (elided here)
    [HttpPost("api/v1/events")]
    public async Task<IActionResult> IngestEvent([FromHeader(Name = "Idempotency-Key")] string idempotencyKey)
    {
        // Check idempotency gate
        var gateResult = await _gate.TryReserveAsync(tenantId, idempotencyKey, HttpContext);
        if (gateResult.IsDuplicate)
        {
            return Ok(gateResult.CachedResponse);
        }

        // Process and publish event
        await _bus.Publish(new AuditAppendedEvent
        {
            AuditRecordId = recordId,
            IdempotencyKey = idempotencyKey, // Propagate key
            TenantId = tenantId
        });
        return Created();
    }

    // Service B (Projection)
    public class AuditAppendedConsumer : IConsumer<AuditAppendedEvent>
    {
        public async Task Consume(ConsumeContext<AuditAppendedEvent> context)
        {
            var evt = context.Message;

            // Deduplicate on the broker MessageId; fall back to the propagated idempotency key
            var dedupKey = context.MessageId?.ToString() ?? evt.IdempotencyKey;

            // Check Service B's inbox
            if (await _inbox.HasProcessedAsync("AuditAppendedConsumer", dedupKey))
            {
                return; // Duplicate in Service B
            }

            // Process...
        }
    }
- Correlation-Based Idempotency
- Use CorrelationId for end-to-end deduplication
- Each service tracks (ConsumerName, CorrelationId)
- Allows replay with same CorrelationId
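Tracking the pair (ConsumerName, CorrelationId) means the same correlation ID is processed once per consumer, not once globally. A minimal in-memory sketch of that scoping follows; `CorrelationInbox` is a hypothetical name, and a real inbox would persist the pair in the same transaction as the domain change.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for a per-consumer inbox; not part of ATP.
public sealed class CorrelationInbox
{
    private readonly ConcurrentDictionary<(string Consumer, string CorrelationId), DateTime> _seen = new();

    // Returns true when this consumer sees the correlation ID for the first time.
    public bool TryBegin(string consumerName, string correlationId, DateTime nowUtc)
    {
        return _seen.TryAdd((consumerName, correlationId), nowUtc);
    }
}
```

A replayed message with the same CorrelationId is therefore skipped by a consumer that has already handled it, while a different consumer still processes it.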
Code Examples: - Cross-service idempotency patterns - Key propagation - Correlation-based deduplication
Diagrams: - Distributed idempotency flow - Key propagation across services
Deliverables: - Cross-service patterns - Propagation guide - Correlation strategies
Topic 26: Saga Idempotency¶
What will be covered: - Saga State Idempotency
public class ExportSaga : MassTransitStateMachine<ExportSagaState>
{
public ExportSaga()
{
InstanceState(x => x.CurrentState);
Event(() => ExportRequested, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));
Event(() => ExportCompleted, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));
Initially(
When(ExportRequested)
.Then(ctx =>
{
// Idempotent state transition
ctx.Saga.ExportJobId = ctx.Message.ExportJobId;
ctx.Saga.TenantId = ctx.Message.TenantId;
ctx.Saga.StartedAt = DateTime.UtcNow;
})
.TransitionTo(Processing)
.PublishAsync(ctx => ctx.Init<StartExport>(new
{
ctx.Message.ExportJobId,
ctx.Message.TenantId
}))
);
During(Processing,
When(ExportCompleted)
.Then(ctx =>
{
ctx.Saga.CompletedAt = DateTime.UtcNow;
})
.TransitionTo(Completed)
.Finalize()
);
}
}
- Saga Idempotency Guarantees
- State transitions idempotent (same event = same state)
- Correlation by business key (ExportJobId)
- Version-based optimistic concurrency
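- Duplicate events must be ignored explicitly: MassTransit faults on an event that has no handler in the current state, so declare duplicates as ignored, for example During(Processing, Ignore(ExportRequested))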
Code Examples: - Idempotent saga implementation - State transition patterns - Correlation configuration
Diagrams: - Saga state machine with idempotency - Correlation flow
Deliverables: - Saga idempotency guide - State transition patterns - Correlation strategies
CYCLE 14: Testing Outbox/Inbox/Idempotency (~2,500 lines)¶
Topic 27: Testing Strategies¶
What will be covered: - Unit Testing Outbox
[TestClass]
public class OutboxRepositoryTests
{
[TestMethod]
public async Task Should_SaveEventToOutbox()
{
// Arrange
var outboxRepo = CreateOutboxRepository();
var message = new OutboxMessage
{
EventId = Ulid.NewUlid().ToString(),
EventType = "test.event.v1",
PayloadJson = "{\"test\":true}"
};
// Act
await outboxRepo.SaveAsync(message);
// Assert
var unpublished = await outboxRepo.GetUnpublishedAsync();
Assert.AreEqual(1, unpublished.Count);
Assert.AreEqual(message.EventId, unpublished[0].EventId);
}
[TestMethod]
public async Task Should_MarkAsPublished()
{
// Arrange
var outboxRepo = CreateOutboxRepository();
var message = new OutboxMessage { EventId = Ulid.NewUlid().ToString(), ... };
await outboxRepo.SaveAsync(message);
// Act
await outboxRepo.MarkAsPublishedAsync(message.EventId, DateTime.UtcNow);
// Assert
var unpublished = await outboxRepo.GetUnpublishedAsync();
Assert.AreEqual(0, unpublished.Count);
}
}
- Integration Testing Inbox
    [TestClass]
    public class InboxIntegrationTests
    {
        [TestMethod]
        public async Task Should_DetectDuplicate()
        {
            // Arrange
            var inbox = CreateInboxRepository();
            var messageId = Ulid.NewUlid().ToString();

            // Act - First delivery
            var firstCheck = await inbox.HasProcessedAsync("TestConsumer", messageId);
            Assert.IsFalse(firstCheck);
            await inbox.MarkProcessedAsync("TestConsumer", messageId, DateTime.UtcNow);

            // Act - Duplicate delivery
            var duplicateCheck = await inbox.HasProcessedAsync("TestConsumer", messageId);
            Assert.IsTrue(duplicateCheck);
        }
    }
- End-to-End Idempotency Test
    [TestMethod]
    public async Task Should_HandleDuplicateRequests_Idempotently()
    {
        // Arrange
        var client = CreateTestClient();
        var idempotencyKey = $"test-tenant:test-app:{Ulid.NewUlid()}";
        var request = new { eventType = "test.event", data = "test" };

        // PostAsJsonAsync has no overload that accepts headers,
        // so the request message is built explicitly.
        Task<HttpResponseMessage> PostAsync()
        {
            var message = new HttpRequestMessage(HttpMethod.Post, "/api/v1/events")
            {
                Content = JsonContent.Create(request)
            };
            message.Headers.Add("Idempotency-Key", idempotencyKey);
            return client.SendAsync(message);
        }

        // Act - First request
        var response1 = await PostAsync();
        var result1 = await response1.Content.ReadFromJsonAsync<CreateEventResponse>();

        // Act - Duplicate request (same key, same payload)
        var response2 = await PostAsync();
        var result2 = await response2.Content.ReadFromJsonAsync<CreateEventResponse>();

        // Assert
        Assert.AreEqual(201, (int)response1.StatusCode);
        Assert.AreEqual(200, (int)response2.StatusCode); // Duplicate returns 200
        Assert.AreEqual(result1.AuditRecordId, result2.AuditRecordId); // Same resource

        // Verify only one event persisted
        var events = await _repository.GetAllAsync();
        Assert.AreEqual(1, events.Count);
    }
Code Examples: - Complete test suites (outbox, inbox, idempotency gate) - Duplicate detection tests - Conflict detection tests - End-to-end idempotency tests
Diagrams: - Test architecture - Test data flow
Deliverables: - Complete test suite - Test helpers - Testing guide
Topic 28: Chaos Testing for Idempotency¶
What will be covered: - Fault Injection Tests
[TestMethod]
public async Task Should_HandleDatabaseFailure_DuringOutboxSave()
{
// Arrange
var dbMock = CreateFailingDatabase(); // Simulates transient failure
var service = new AuditService(dbMock, ...);
// Act & Assert - First attempt fails
await Assert.ThrowsExceptionAsync<DbException>(async () =>
{
await service.AppendAuditEventAsync(auditEvent);
});
// Verify: No event persisted, no event published
Assert.AreEqual(0, await _repository.CountAsync());
Assert.AreEqual(0, await _outbox.GetUnpublishedCountAsync());
// Act - Retry with working database
dbMock.EnableSuccess();
var recordId = await service.AppendAuditEventAsync(auditEvent);
// Assert - Event saved, outbox contains event
Assert.IsNotNull(recordId);
Assert.AreEqual(1, await _repository.CountAsync());
Assert.AreEqual(1, await _outbox.GetUnpublishedCountAsync());
}
[TestMethod]
public async Task Should_HandleMessageRedelivery()
{
// Simulate message delivered twice
var message = CreateTestMessage();
// First delivery
await _consumer.Consume(CreateContext(message));
var count1 = await _repository.CountAsync();
// Duplicate delivery (simulated redelivery)
await _consumer.Consume(CreateContext(message));
var count2 = await _repository.CountAsync();
// Assert: Processed only once
Assert.AreEqual(1, count1);
Assert.AreEqual(1, count2);
}
Code Examples: - Fault injection tests - Redelivery simulation - Chaos scenarios (network failures, crashes)
Diagrams: - Chaos test scenarios
Deliverables: - Chaos test suite - Fault injection utilities - Resilience validation
CYCLE 15: Monitoring & Observability (~3,000 lines)¶
Topic 29: Outbox/Inbox Metrics¶
What will be covered: - Outbox Metrics
public class OutboxMetrics
{
// Outbox backlog (unpublished events); a gauge, because the backlog shrinks as events are published
[Gauge("outbox_backlog_total", "Unpublished events in outbox")]
public void RecordBacklog(int count, string tenantId);
// Outbox publish rate
[Counter("outbox_published_total", "Events published from outbox")]
public void RecordPublished(string eventType, string tenantId);
// Outbox publish latency
[Histogram("outbox_publish_duration_ms", "Time from event creation to publish")]
public void RecordPublishLatency(double durationMs, string eventType);
// Outbox publish failures
[Counter("outbox_publish_failed_total", "Failed outbox publishes")]
public void RecordPublishFailure(string eventType, string error);
}
- Inbox Metrics
    public class InboxMetrics
    {
        // Duplicate detection rate
        [Counter("inbox_duplicates_total", "Duplicate messages detected")]
        public void RecordDuplicate(string consumerName, string tenantId);

        // Inbox check latency
        [Histogram("inbox_check_duration_ms", "Time to check inbox for duplicates")]
        public void RecordCheckLatency(double durationMs);

        // Inbox size
        [Gauge("inbox_entries_total", "Total inbox entries")]
        public void RecordInboxSize(long count);
    }
- Idempotency Gate Metrics
    public class IdempotencyMetrics
    {
        // Duplicate requests
        [Counter("idempotency_duplicates_total", "Duplicate HTTP requests")]
        public void RecordDuplicate(string endpoint, string tenantId);

        // Conflicts
        [Counter("idempotency_conflicts_total", "Idempotency key conflicts")]
        public void RecordConflict(string endpoint, string tenantId);

        // Cache hit rate
        [Counter("idempotency_cache_hits_total", "Idempotency cache hits")]
        public void RecordCacheHit();

        [Counter("idempotency_cache_misses_total", "Idempotency cache misses")]
        public void RecordCacheMiss();
    }
Code Examples: - Complete metrics collection (all patterns) - Prometheus exporters - Application Insights integration
Diagrams: - Metrics architecture - Dashboard layout
Deliverables: - Metrics collection library - Dashboard templates - Query library
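The metric classes in Topic 29 declare recording methods without bodies. As a rough illustration of one way the outbox recorders could be backed, the sketch below uses .NET's built-in `System.Diagnostics.Metrics` API. The class name `OutboxMetricsSketch`, the meter name `Atp.Outbox`, the tag keys, and the `BacklogFor` helper are assumptions made for this example, not part of ATP. The backlog is exposed as an observable gauge because it is a level rather than a running total.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Linq;

// Sketch only: meter name, tag keys, and class name are hypothetical.
public sealed class OutboxMetricsSketch : IDisposable
{
    private readonly Meter _meter = new("Atp.Outbox");
    private readonly Counter<long> _published;
    private readonly Counter<long> _failed;
    private readonly Histogram<double> _latencyMs;
    // Latest backlog per tenant; read by the gauge callback at collection time.
    private readonly ConcurrentDictionary<string, long> _backlogByTenant = new();

    public OutboxMetricsSketch()
    {
        _published = _meter.CreateCounter<long>(
            "outbox_published_total", description: "Events published from outbox");
        _failed = _meter.CreateCounter<long>(
            "outbox_publish_failed_total", description: "Failed outbox publishes");
        _latencyMs = _meter.CreateHistogram<double>(
            "outbox_publish_duration_ms", unit: "ms",
            description: "Time from event creation to publish");
        // One measurement per tenant, tagged with the tenant id.
        _meter.CreateObservableGauge<long>(
            "outbox_backlog_total",
            () => _backlogByTenant.Select(kv =>
                new Measurement<long>(kv.Value, Tag("tenant_id", kv.Key))),
            description: "Unpublished events in outbox");
    }

    public void RecordBacklog(long count, string tenantId) =>
        _backlogByTenant[tenantId] = count;

    public long BacklogFor(string tenantId) =>
        _backlogByTenant.TryGetValue(tenantId, out var count) ? count : 0;

    public void RecordPublished(string eventType, string tenantId) =>
        _published.Add(1, Tag("event_type", eventType), Tag("tenant_id", tenantId));

    public void RecordPublishLatency(double durationMs, string eventType) =>
        _latencyMs.Record(durationMs, Tag("event_type", eventType));

    public void RecordPublishFailure(string eventType, string error) =>
        _failed.Add(1, Tag("event_type", eventType), Tag("error", error));

    public void Dispose() => _meter.Dispose();

    private static KeyValuePair<string, object?> Tag(string key, object? value) =>
        new(key, value);
}
```

Exporting these instruments to Prometheus or Application Insights is a separate step and is not shown here. Free-form error text makes a poor tag value because every distinct string creates a new time series, so a real implementation would likely map errors to a small set of categories.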
Topic 30: Application Insights Queries
What will be covered: - Outbox Monitoring Queries
// Outbox backlog over time
customMetrics
| where name == "outbox_backlog_total"
| extend TenantId = tostring(customDimensions.TenantId)
| summarize AvgBacklog = avg(value), MaxBacklog = max(value) by bin(timestamp, 5m), TenantId
| order by timestamp desc
// Outbox publish latency (P50, P95, P99)
customMetrics
| where name == "outbox_publish_duration_ms"
| extend EventType = tostring(customDimensions.EventType)
| summarize
P50 = percentile(value, 50),
P95 = percentile(value, 95),
P99 = percentile(value, 99)
by EventType
// Outbox publish failures
customMetrics
| where name == "outbox_publish_failed_total"
| extend EventType = tostring(customDimensions.EventType)
| extend Error = tostring(customDimensions.Error)
| summarize FailureCount = sum(value) by EventType, Error
| order by FailureCount desc
- Inbox Monitoring Queries
// Duplicate detection rate
customMetrics
| where name == "inbox_duplicates_total"
| extend ConsumerName = tostring(customDimensions.ConsumerName)
| summarize DuplicateCount = sum(value) by bin(timestamp, 5m), ConsumerName
| order by timestamp desc
// Inbox deduplication effectiveness
// toscalar() turns each single-cell result into a scalar so it can be used in arithmetic
let total = toscalar(customMetrics
| where name == "inbox_messages_processed_total"
| summarize sum(value));
let duplicates = toscalar(customMetrics
| where name == "inbox_duplicates_total"
| summarize sum(value));
print DuplicatePercentage = (duplicates * 100.0) / total
Code Examples: - Complete query library (Kusto) - Alert rules (Azure Monitor) - Dashboard JSON
Diagrams: - Dashboard layouts (mockups)
Deliverables: - Query library (complete) - Alert rules - Dashboard templates
CYCLE 16: Best Practices & Troubleshooting (~3,000 lines)
Topic 31: Best Practices
What will be covered:
- Outbox Best Practices
    - ✅ Always use outbox for critical events
    - ✅ Batch publish from outbox (performance)
    - ✅ Monitor outbox backlog (alert if >1000)
    - ✅ Cleanup published events regularly (7-day retention)
    - ✅ Use partition keys (TenantId) for ordering
    - ✅ Set max retry attempts (5-10)
    - ✅ Move to DLQ after max attempts
- Inbox Best Practices
    - ✅ Always check inbox before processing
    - ✅ Record inbox entry in same transaction as processing
    - ✅ Use Redis for performance (SQL for durability)
    - ✅ Set appropriate TTL (7-30 days)
    - ✅ Cleanup expired entries daily
    - ✅ Monitor duplicate detection rate
- Idempotency Best Practices
    - ✅ Use client-generated keys (ULIDs)
    - ✅ Require Idempotency-Key header for mutations
    - ✅ Cache idempotency results (24-hour TTL)
    - ✅ Return cached result for exact duplicates
    - ✅ Return 409 for conflicts
    - ✅ Hash payloads for conflict detection
    - ✅ Exclude non-material fields from hash
    - ✅ Monitor conflict rate (alert if >1%)
- Anti-Patterns to Avoid
    - ❌ Direct publish without outbox (dual-write)
    - ❌ No inbox deduplication (duplicate side effects)
    - ❌ Non-idempotent operations (x = x + 1)
    - ❌ No idempotency key validation
    - ❌ Infinite key retention (no TTL)
    - ❌ No conflict detection (silent overwrites)
    - ❌ No monitoring (can't detect issues)
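The `x = x + 1` anti-pattern is worth a concrete illustration: under at-least-once delivery, a redelivered message applies the increment twice unless the consumer checks an inbox first. The sketch below is an in-memory stand-in only. The class `InMemoryInboxSketch` and its `TryProcess` method are hypothetical names, and the lock plays the role that a database transaction (inbox row plus state change committed together) would play in a real consumer.

```csharp
using System;
using System.Collections.Generic;

// Sketch only: an in-memory inbox keyed by (consumer, message id).
public sealed class InMemoryInboxSketch
{
    private readonly HashSet<(string Consumer, string MessageId)> _seen = new();
    private readonly object _gate = new();

    // Returns true when the handler ran, false when the message was a duplicate.
    public bool TryProcess(string consumer, string messageId, Action handler)
    {
        lock (_gate)
        {
            if (_seen.Contains((consumer, messageId)))
            {
                return false; // already processed: skip the side effect
            }

            // If the handler throws, the entry is never recorded,
            // so a later redelivery is free to retry the work.
            handler();
            _seen.Add((consumer, messageId));
            return true;
        }
    }
}
```

With this in place, calling `TryProcess` twice with the same consumer name and message id runs the increment once. The second call returns `false` and leaves the state untouched.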
Code Examples: - Best practice implementations - Anti-pattern examples - Code review checklist
Diagrams: - Best practices checklist
Deliverables: - Best practices handbook - Anti-pattern catalog - Code review checklist
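Several of the idempotency practices listed under Topic 31 (hash payloads, exclude non-material fields, return the cached result for exact duplicates, return 409 for conflicts) combine into a single decision. A minimal sketch of that decision follows, assuming a flat string payload and an in-memory store. `IdempotencyGateSketch`, `GateOutcome`, and the canonical `key=value` encoding are illustrative choices, and TTL expiry and response caching are left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

public enum GateOutcome { Execute, Replay, Conflict }

// Sketch only: stores the payload hash seen for each idempotency key.
public sealed class IdempotencyGateSketch
{
    private readonly ConcurrentDictionary<string, string> _hashByKey = new();

    // Hashes the material fields in a stable (ordinal key) order so that
    // field ordering and non-material fields do not change the result.
    // The newline-joined encoding assumes values contain no newlines.
    public static string HashPayload(
        IReadOnlyDictionary<string, string> payload,
        ISet<string> nonMaterialFields)
    {
        var canonical = string.Join("\n", payload
            .Where(kv => !nonMaterialFields.Contains(kv.Key))
            .OrderBy(kv => kv.Key, StringComparer.Ordinal)
            .Select(kv => $"{kv.Key}={kv.Value}"));
        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(canonical)));
    }

    public GateOutcome Check(string idempotencyKey, string payloadHash)
    {
        if (_hashByKey.TryAdd(idempotencyKey, payloadHash))
        {
            return GateOutcome.Execute; // first time this key is seen
        }

        return _hashByKey.TryGetValue(idempotencyKey, out var stored) && stored == payloadHash
            ? GateOutcome.Replay    // exact duplicate: return the cached result
            : GateOutcome.Conflict; // same key, different payload: respond 409
    }
}
```

In an HTTP middleware, `Replay` would map to returning the stored response and `Conflict` to a 409 status. The point of the sketch is only that the comparison runs on a hash of the material fields, so a retried request that differs in a timestamp or trace field is still recognised as the same request.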
Topic 32: Troubleshooting
What will be covered:
- Common Problems
    - Problem: Outbox backlog growing
        - Cause: Relay worker not running, publish failures
        - Solution: Check relay worker health, review publish errors
        - Debug: Query outbox for unpublished events, check error messages
    - Problem: Duplicate messages still processed
        - Cause: Inbox not checked, transaction not committed
        - Solution: Verify inbox check before processing, ensure atomic transaction
        - Debug: Check inbox table, review consumer logs
    - Problem: 409 Conflict errors
        - Cause: Same idempotency key with different payload
        - Solution: Client bug (key reuse), investigate client logic
        - Debug: Compare payload hashes, review client code
    - Problem: Slow idempotency checks
        - Cause: Database bottleneck, no caching
        - Solution: Add Redis cache layer, index optimization
        - Debug: Query execution plans, add Redis
Code Examples: - Troubleshooting queries - Debug utilities - Common fixes
Diagrams: - Troubleshooting decision tree
Deliverables: - Troubleshooting guide - Debug tools - Common problems catalog
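For the "outbox backlog growing" problem, the debug step is to look at the unpublished rows and their error messages. The sketch below shows the kind of summary a debug utility might produce from rows already loaded into memory. The record shapes `OutboxRowSketch` and `BacklogReport` and their field names are assumptions for illustration and do not reflect the actual ATP outbox schema.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch only: hypothetical projection of an outbox row.
public sealed record OutboxRowSketch(
    string EventType,
    DateTimeOffset CreatedAt,
    DateTimeOffset? PublishedAt,
    int Attempts,
    string? LastError);

public sealed record BacklogReport(
    int Unpublished,
    int NeverAttempted,
    TimeSpan OldestAge,
    IReadOnlyList<(string Error, int Count)> TopErrors);

public static class OutboxDiagnosticsSketch
{
    public static BacklogReport Summarize(IEnumerable<OutboxRowSketch> rows, DateTimeOffset now)
    {
        var pending = rows.Where(r => r.PublishedAt is null).ToList();

        // Age of the oldest unpublished event; zero when nothing is pending.
        var oldestAge = pending.Count == 0
            ? TimeSpan.Zero
            : now - pending.Min(r => r.CreatedAt);

        // Most frequent publish errors first.
        var topErrors = pending
            .Where(r => r.LastError is not null)
            .GroupBy(r => r.LastError!)
            .Select(g => (Error: g.Key, Count: g.Count()))
            .OrderByDescending(e => e.Count)
            .ToList();

        return new BacklogReport(
            pending.Count,
            pending.Count(r => r.Attempts == 0),
            oldestAge,
            topErrors);
    }
}
```

Read against the causes listed for this problem: many pending rows with zero attempts point toward a relay worker that is not running, while pending rows dominated by one error message point toward publish failures.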
Summary of Deliverables
Across all 16 cycles, this documentation will provide:
- Fundamentals
    - Dual-write problem and outbox solution
    - At-least-once delivery and inbox solution
    - Idempotency principles and patterns
- Outbox Implementation
    - Table schema (SQL + MongoDB)
    - Repository implementation
    - Relay worker (background service)
    - Event publishing with outbox
- Inbox Implementation
    - Table schema (SQL + MongoDB + Redis)
    - Repository implementation
    - Deduplication logic
    - Idempotent consumer patterns
- Idempotency Gates
    - HTTP middleware (Idempotency-Key header)
    - Idempotency table schema
    - Conflict detection and resolution
    - Response caching
- MassTransit Integration
    - Built-in outbox/inbox configuration
    - Entity Framework integration
    - Custom outbox/inbox for NHibernate
- Key Management
    - Key generation strategies (ULID, sequence, content-hash)
    - ATP key format standard
    - Key storage and hashing
    - TTL policies
- Advanced Topics
    - Distributed idempotency (cross-service)
    - Saga idempotency
    - Correlation-based deduplication
- Operations
    - Testing strategies (unit, integration, E2E, chaos)
    - Monitoring and metrics
    - Application Insights queries
    - Best practices and troubleshooting
Related Documentation
- Idempotency Patterns: Contract-level idempotency patterns
- Messaging Implementation: MassTransit and Azure Service Bus setup
- Message Schemas: Event schemas and contracts
- REST APIs: Idempotency-Key header specification
- Webhooks: Webhook idempotency patterns
- Persistence: Repository and Unit of Work patterns
- Testing Strategy: Idempotency testing approaches
- Observability: Metrics and monitoring
This documentation plan covers the complete implementation of transactional outbox, inbox deduplication, and idempotency patterns for ATP: database schemas, repository implementations, MassTransit integration, relay workers, conflict detection, testing strategies, monitoring, and operational best practices. Together these patterns give effectively-once processing (each side effect applied once) in a distributed, at-least-once delivery environment.