Skip to content

Outbox/Inbox/Idempotency Implementation - Audit Trail Platform (ATP)

Exactly-once intent in an at-least-once world — ATP implements transactional outbox for atomic state + event persistence, inbox pattern for consumer deduplication, and idempotency gates for duplicate request handling, ensuring reliable message processing without dual-write problems or duplicate side effects.


📋 Documentation Generation Plan

This document will be generated in 16 cycles. Current progress:

Cycle Topics Estimated Lines Status
Cycle 1 Outbox/Inbox/Idempotency Philosophy (1-2) ~3,000 ⏳ Not Started
Cycle 2 Transactional Outbox Pattern (3-4) ~4,000 ⏳ Not Started
Cycle 3 Outbox Table Schema & Implementation (5-6) ~3,500 ⏳ Not Started
Cycle 4 Outbox Relay Worker (7-8) ~3,500 ⏳ Not Started
Cycle 5 Inbox Pattern & Deduplication (9-10) ~3,500 ⏳ Not Started
Cycle 6 Inbox Table Schema & Implementation (11-12) ~3,000 ⏳ Not Started
Cycle 7 Idempotency Gate for HTTP APIs (13-14) ~4,000 ⏳ Not Started
Cycle 8 Idempotent Message Consumers (15-16) ~3,500 ⏳ Not Started
Cycle 9 MassTransit Outbox Integration (17-18) ~3,000 ⏳ Not Started
Cycle 10 MassTransit Inbox Integration (19-20) ~3,000 ⏳ Not Started
Cycle 11 Idempotency Key Generation & Storage (21-22) ~3,000 ⏳ Not Started
Cycle 12 Conflict Detection & Resolution (23-24) ~3,000 ⏳ Not Started
Cycle 13 Distributed Idempotency (25-26) ~2,500 ⏳ Not Started
Cycle 14 Testing Outbox/Inbox/Idempotency (27-28) ~2,500 ⏳ Not Started
Cycle 15 Monitoring & Observability (29-30) ~3,000 ⏳ Not Started
Cycle 16 Best Practices & Troubleshooting (31-32) ~3,000 ⏳ Not Started

Total Estimated Lines: ~52,000


Purpose & Scope

This document provides the complete implementation guide for transactional outbox, inbox deduplication, and idempotency patterns in the Audit Trail Platform (ATP), covering database schemas, C# implementation, MassTransit integration, relay workers, duplicate detection, conflict resolution, and operational best practices.

Why Outbox/Inbox/Idempotency for ATP?

  1. Exactly-Once Semantics: Despite at-least-once delivery guarantees, achieve exactly-once processing
  2. No Dual-Write Problem: Atomic persistence of domain state + events (transactional outbox)
  3. No Duplicate Side Effects: Idempotent consumers via inbox deduplication
  4. Reliable Event Publishing: Events persisted before publishing (no event loss)
  5. Deterministic Replay: Safely replay events from DLQ or event store
  6. Audit Trail Integrity: All operations idempotent (critical for compliance)
  7. Performance: Minimal latency overhead (<10ms for idempotency checks)

The Dual-Write Problem (Why Outbox?)

// ❌ ANTI-PATTERN: Dual-write (not atomic)
await _repository.SaveAsync(auditEvent);  // Database write
await _bus.PublishAsync(eventPublished);  // Message publish

Problems:
1. If database succeeds but publish fails → event lost
2. If publish succeeds but database rollback → duplicate event
3. No atomicity guarantee across systems

Solution: Transactional Outbox

// ✅ PATTERN: Transactional Outbox (atomic)
using var transaction = await _unitOfWork.BeginTransactionAsync();
await _repository.SaveAsync(auditEvent);      // Database write
await _outbox.SaveAsync(eventPublished);      // Outbox write (same tx)
await transaction.CommitAsync();              // Atomic commit

// Separate process (relay worker) publishes from outbox
_outboxRelay.PublishPendingAsync();

ATP Implementation Components

  • Outbox Table: Persists unpublished events with domain state
  • Outbox Relay Worker: Background service polling outbox, publishing to Service Bus
  • Inbox Table: Tracks processed message IDs to prevent duplicate handling
  • Idempotency Gate: HTTP middleware checking for duplicate requests
  • Idempotent Handlers: Message consumers that safely handle duplicates
  • MassTransit Outbox/Inbox: Built-in support for Entity Framework or custom stores
  • Redis Idempotency Cache: Fast duplicate detection (optional)

Detailed Cycle Plan

CYCLE 1: Outbox/Inbox/Idempotency Philosophy (~3,000 lines)

Topic 1: The Dual-Write Problem & Outbox Pattern

What will be covered: - Dual-Write Problem Explained - Scenario: Save to database + publish to message broker - Problem: Two separate transactions (not atomic) - Failure Cases: 1. DB commit succeeds, publish fails → Event lost 2. Publish succeeds, DB commit fails → Duplicate event 3. DB and publish succeed, but app crashes before commit confirmation → Uncertainty

  • Transactional Outbox Solution

    Single Transaction:
    1. Save domain state (aggregate/entity) to database
    2. Save event to Outbox table (same transaction)
    3. Commit atomically
    
    Separate Process:
    4. Relay worker reads unpublished events from Outbox
    5. Publishes to message broker
    6. Marks as published in Outbox (idempotent)
    

  • Outbox Benefits

  • Atomicity: State + event committed together (no dual-write)
  • Reliability: Events never lost (persisted before publishing)
  • Resilience: Relay worker can fail and restart (events queued)
  • Ordering: Events published in commit order
  • Audit Trail: All events persisted (even if publish fails)

  • Outbox Trade-Offs

  • Latency: Event publishing delayed by relay polling interval
  • Complexity: Additional table, background worker, monitoring
  • Storage: Outbox table grows (requires cleanup job)

  • When to Use Outbox

  • Critical events that must not be lost (audit events, financial transactions)
  • When domain state and events must be atomic (consistency)
  • When event ordering matters
  • When events must be auditable (persistent record)

Code Examples: - Dual-write problem code (anti-pattern) - Transactional outbox pattern (solution) - Outbox vs. direct publish comparison

Diagrams: - Dual-write problem failure scenarios - Transactional outbox flow - Atomicity guarantee

Deliverables: - Dual-write problem analysis - Outbox pattern overview - Trade-off analysis


Topic 2: Inbox Pattern & Idempotency

What will be covered: - At-Least-Once Delivery Problem - Message brokers guarantee: Message delivered at least once - Problem: Message can be delivered multiple times (network retries, broker fail-over) - Failure Cases: 1. Consumer processes message, crashes before ACK → Re-delivery, duplicate processing 2. Consumer ACKs message, but ACK lost → Re-delivery, already processed 3. Broker duplicates message (rare but possible) → Duplicate processing

  • Inbox Pattern Solution

    On Message Receipt:
    1. Check Inbox: Has this messageId been processed?
    2. If YES → ACK and skip (idempotent)
    3. If NO → Process message
    4. Insert messageId into Inbox
    5. Commit transaction (processing + inbox record)
    6. ACK message
    

  • Inbox Benefits

  • Exactly-Once Processing: Despite at-least-once delivery
  • Idempotent Consumers: Safe to retry without side effects
  • Deterministic Replay: Can replay events from DLQ or event store
  • Audit Trail: Track all processed messages

  • Idempotency Principle

    f(x) = y
    f(f(x)) = f(x) = y
    
    Applying operation multiple times = same result as applying once
    

  • Idempotent Operations

  • Naturally Idempotent: SET x = 5, DELETE WHERE id = 123
  • Idempotent with Key: UPSERT (INSERT or UPDATE based on key)
  • Not Idempotent: x = x + 1, INSERT without key check

  • Making Operations Idempotent

    // ❌ NOT Idempotent
    public async Task ProcessEvent(OrderCreated evt)
    {
        var order = new Order { Id = evt.OrderId, Total = evt.Total };
        await _repository.AddAsync(order);  // Duplicate key error on retry
        await _emailService.SendAsync($"Order {evt.OrderId} created");  // Duplicate email
    }
    
    // ✅ Idempotent
    public async Task ProcessEvent(OrderCreated evt)
    {
        // Check inbox first
        if (await _inbox.HasProcessedAsync(evt.MessageId))
        {
            return;  // Already processed
        }
    
        // Upsert (idempotent)
        var order = new Order { Id = evt.OrderId, Total = evt.Total };
        await _repository.UpsertAsync(order);
    
        // Idempotent email (check if already sent)
        if (!await _emailLog.HasSentAsync(evt.OrderId))
        {
            await _emailService.SendAsync($"Order {evt.OrderId} created");
            await _emailLog.MarkSentAsync(evt.OrderId);
        }
    
        // Record in inbox
        await _inbox.MarkProcessedAsync(evt.MessageId);
    }
    

Code Examples: - At-least-once delivery scenarios - Inbox pattern implementation - Idempotent vs. non-idempotent code - Making operations idempotent

Diagrams: - At-least-once delivery problem - Inbox pattern flow - Idempotency principle

Deliverables: - Inbox pattern overview - Idempotency fundamentals - Operation patterns


CYCLE 2: Transactional Outbox Pattern (~4,000 lines)

Topic 3: Outbox Pattern Architecture

What will be covered: - Outbox Components

┌─────────────────────────────────────┐
│  Use Case / Command Handler         │
├─────────────────────────────────────┤
│  Domain Model (Aggregate)           │
├─────────────────────────────────────┤
│  Repository + Outbox                │
│  ┌────────────┬──────────────────┐  │
│  │ Aggregates │  Outbox Table    │  │
│  │ Table      │  (Events)        │  │
│  └────────────┴──────────────────┘  │
│         Same Transaction            │
├─────────────────────────────────────┤
│  Outbox Relay Worker                │
│  (Background Service)               │
├─────────────────────────────────────┤
│  MassTransit + Azure Service Bus    │
└─────────────────────────────────────┘

  • Outbox Workflow

    sequenceDiagram
        participant API as REST API
        participant UC as Use Case
        participant AGG as Aggregate
        participant REPO as Repository
        participant DB as Database
        participant OB as Outbox Table
        participant RL as Relay Worker
        participant BUS as Service Bus
        participant SUB as Subscribers
    
        API->>UC: Execute Command
        UC->>AGG: Apply Business Logic
        AGG->>AGG: Raise Domain Event
        UC->>REPO: Save Aggregate
    
        Note over REPO,OB: Begin Transaction
        REPO->>DB: UPDATE/INSERT Aggregate
        REPO->>OB: INSERT Event(s)
        Note over REPO,OB: Commit Transaction
    
        RL->>OB: Poll Unpublished Events
        RL->>BUS: Publish Event
        BUS-->>RL: Confirm Published
        RL->>OB: Mark as Published
    
        BUS->>SUB: Deliver Event (at-least-once)
        SUB->>SUB: Process (idempotent)
    Hold "Alt" / "Option" to enable pan & zoom

  • ATP Outbox Strategy

  • Per-Service Outbox: Each service has its own outbox table
  • Shared Database: Outbox in same database as domain tables
  • Polling Interval: 100-500ms (configurable)
  • Batch Publishing: Publish multiple events in batch (performance)
  • Retry Strategy: Exponential backoff for failed publishes
  • Event Ordering: FIFO per partition key (tenantId)

Code Examples: - Outbox architecture diagram - Complete workflow sequence - ATP outbox configuration

Diagrams: - Outbox component architecture - Detailed outbox flow - Transaction boundaries

Deliverables: - Outbox architecture overview - Workflow documentation - Configuration guide


Topic 4: Outbox vs. Direct Publishing

What will be covered: - Comparison Matrix | Aspect | Direct Publishing | Transactional Outbox | |--------|-------------------|----------------------| | Atomicity | ❌ Two separate transactions | ✅ Single transaction | | Event Loss | ⚠️ Possible if publish fails | ✅ Never (persisted first) | | Duplicate Events | ⚠️ Possible if DB rollback | ✅ Prevented by relay deduplication | | Latency | ✅ Immediate (0-10ms) | ⚠️ Delayed by relay poll (100-500ms) | | Complexity | ✅ Simple (one call) | ⚠️ Complex (table, worker, monitoring) | | Event Ordering | ⚠️ Not guaranteed | ✅ Guaranteed (commit order) | | Audit Trail | ❌ No persistence | ✅ Persistent record |

  • When to Use Each
  • Direct Publishing: Non-critical events, notifications, fire-and-forget
  • Transactional Outbox: Critical events (audit, financial), ordered events, high reliability

  • ATP Decision

  • Outbox for: audit.appended, projection.updated, integrity.verified, export.completed
  • ⚠️ Direct for: Internal telemetry events, low-priority notifications (if any)

  • Hybrid Approach

  • Critical path: Outbox
  • Non-critical path: Direct publishing
  • Feature flag to control (UseOutboxForAllEvents)

Code Examples: - Direct publish code example - Outbox publish code example - Comparison benchmarks - Feature flag configuration

Diagrams: - Direct vs. outbox comparison - Decision tree (when to use each)

Deliverables: - Comparison matrix - Decision criteria - Hybrid implementation guide


CYCLE 3: Outbox Table Schema & Implementation (~3,500 lines)

Topic 5: Outbox Database Schema

What will be covered: - Outbox Table Schema (SQL Server)

CREATE TABLE audit.Outbox (
    Id                 BIGINT IDENTITY(1,1) PRIMARY KEY,
    EventId            VARCHAR(50) NOT NULL UNIQUE,        -- ULID/UUIDv7
    AggregateType      VARCHAR(64) NOT NULL,
    AggregateId        VARCHAR(64) NOT NULL,
    TenantId           VARCHAR(64) NOT NULL,

    EventType          VARCHAR(128) NOT NULL,             -- audit.appended.v1
    SchemaVersion      INT NOT NULL DEFAULT 1,

    PayloadJson        NVARCHAR(MAX) NOT NULL,            -- Event payload
    Headers            NVARCHAR(MAX) NULL,                -- Message headers (JSON)

    IdempotencyKey     NVARCHAR(128) NULL,
    CorrelationId      VARCHAR(64) NULL,
    CausationId        VARCHAR(64) NULL,
    Traceparent        NVARCHAR(64) NULL,

    CreatedAtUtc       DATETIME2(3) NOT NULL DEFAULT SYSUTCDATETIME(),
    PublishedAtUtc     DATETIME2(3) NULL,

    Attempts           INT NOT NULL DEFAULT 0,
    LastAttemptAtUtc   DATETIME2(3) NULL,
    LastError          NVARCHAR(MAX) NULL,

    CONSTRAINT UQ_Outbox_EventId UNIQUE (EventId),
    INDEX IX_Outbox_Unpublished (PublishedAtUtc, TenantId) WHERE PublishedAtUtc IS NULL,
    INDEX IX_Outbox_Tenant (TenantId, CreatedAtUtc)
);

  • Schema Design Principles
  • EventId: Unique identifier (ULID for sortable, time-ordered IDs)
  • Partition Key: TenantId for Azure Service Bus partitioning
  • PublishedAtUtc: NULL = unpublished, NOT NULL = published
  • Attempts & Error: Retry tracking and debugging
  • Headers: Correlation, causation, trace context

  • MongoDB Schema (Alternative)

    public class OutboxMessage
    {
        public string Id { get; set; }  // EventId (ULID)
        public string AggregateType { get; set; }
        public string AggregateId { get; set; }
        public string TenantId { get; set; }
    
        public string EventType { get; set; }
        public int SchemaVersion { get; set; }
    
        public BsonDocument Payload { get; set; }
        public Dictionary<string, string> Headers { get; set; }
    
        public string IdempotencyKey { get; set; }
        public string CorrelationId { get; set; }
        public string CausationId { get; set; }
        public string Traceparent { get; set; }
    
        public DateTime CreatedAtUtc { get; set; }
        public DateTime? PublishedAtUtc { get; set; }
    
        public int Attempts { get; set; }
        public DateTime? LastAttemptAtUtc { get; set; }
        public string LastError { get; set; }
    }
    

  • Outbox Retention & Cleanup

  • Retain published events for 7 days (debugging, replay)
  • Automatic cleanup job (daily)
  • Archive to blob storage if needed (compliance audit trail)

Code Examples: - Complete outbox table schema (SQL + MongoDB) - FluentMigrator migration for outbox table - Cleanup job implementation

Diagrams: - Outbox table schema diagram - Retention policy flow

Deliverables: - Outbox table schemas (SQL + MongoDB) - Migration scripts - Cleanup job


Topic 6: Outbox Repository Implementation

What will be covered: - IOutboxRepository Interface

public interface IOutboxRepository
{
    Task SaveAsync(OutboxMessage message);
    Task<IReadOnlyList<OutboxMessage>> GetUnpublishedAsync(int batchSize = 100);
    Task MarkAsPublishedAsync(string eventId, DateTime publishedAtUtc);
    Task IncrementAttemptAsync(string eventId, string error);
    Task<int> DeletePublishedOlderThanAsync(DateTime threshold);
}

  • NHibernate Implementation

    public class OutboxRepository : IOutboxRepository
    {
        private readonly IUnitOfWork _unitOfWork;
    
        public async Task SaveAsync(OutboxMessage message)
        {
            var session = _unitOfWork.GetSession();
            await session.SaveAsync(message);
            // Flushed as part of unit of work commit
        }
    
        public async Task<IReadOnlyList<OutboxMessage>> GetUnpublishedAsync(int batchSize = 100)
        {
            var session = _unitOfWork.GetSession();
    
            return await session.Query<OutboxMessage>()
                .Where(m => m.PublishedAtUtc == null)
                .OrderBy(m => m.CreatedAtUtc)  // FIFO
                .ThenBy(m => m.TenantId)       // Partition ordering
                .Take(batchSize)
                .ToListAsync();
        }
    
        public async Task MarkAsPublishedAsync(string eventId, DateTime publishedAtUtc)
        {
            var session = _unitOfWork.GetSession();
    
            await session.Query<OutboxMessage>()
                .Where(m => m.EventId == eventId)
                .UpdateAsync(m => new OutboxMessage 
                { 
                    PublishedAtUtc = publishedAtUtc 
                });
        }
    
        public async Task IncrementAttemptAsync(string eventId, string error)
        {
            var session = _unitOfWork.GetSession();
    
            var message = await session.GetAsync<OutboxMessage>(eventId);
            message.Attempts++;
            message.LastAttemptAtUtc = DateTime.UtcNow;
            message.LastError = error;
    
            await session.UpdateAsync(message);
        }
    }
    

  • FluentNHibernate Mapping

    public class OutboxMessageMap : ClassMap<OutboxMessage>
    {
        public OutboxMessageMap()
        {
            Schema("audit");
            Table("Outbox");
    
            Id(x => x.Id).GeneratedBy.Identity();
            Map(x => x.EventId).Not.Nullable().UniqueKey("UQ_EventId");
            Map(x => x.AggregateType).Not.Nullable().Length(64);
            Map(x => x.AggregateId).Not.Nullable().Length(64);
            Map(x => x.TenantId).Not.Nullable().Length(64).Index("IX_Outbox_Tenant");
    
            Map(x => x.EventType).Not.Nullable().Length(128);
            Map(x => x.SchemaVersion).Not.Nullable();
    
            Map(x => x.PayloadJson).CustomSqlType("NVARCHAR(MAX)").Not.Nullable();
            Map(x => x.Headers).CustomSqlType("NVARCHAR(MAX)").Nullable();
    
            Map(x => x.IdempotencyKey).Length(128).Nullable();
            Map(x => x.CorrelationId).Length(64).Nullable();
            Map(x => x.CausationId).Length(64).Nullable();
            Map(x => x.Traceparent).Length(64).Nullable();
    
            Map(x => x.CreatedAtUtc).Not.Nullable();
            Map(x => x.PublishedAtUtc).Nullable().Index("IX_Outbox_Unpublished");
    
            Map(x => x.Attempts).Not.Nullable();
            Map(x => x.LastAttemptAtUtc).Nullable();
            Map(x => x.LastError).CustomSqlType("NVARCHAR(MAX)").Nullable();
        }
    }
    

Code Examples: - Complete IOutboxRepository implementation - NHibernate mapping - MongoDB repository (alternative) - Unit tests for repository

Diagrams: - Repository architecture - Data access flow

Deliverables: - Outbox repository (SQL + MongoDB) - FluentNHibernate mappings - Unit tests


CYCLE 4: Outbox Relay Worker (~3,500 lines)

Topic 7: Relay Worker Implementation

What will be covered: - Relay Worker Architecture

public class OutboxRelayWorker : BackgroundService
{
    private readonly IOutboxRepository _outboxRepo;
    private readonly IBus _bus;
    private readonly ILogger<OutboxRelayWorker> _logger;
    private readonly OutboxRelayOptions _options;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Outbox Relay Worker starting");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessBatchAsync(stoppingToken);

                // Wait before next poll
                await Task.Delay(_options.PollingIntervalMs, stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Outbox relay worker error");
                await Task.Delay(_options.ErrorBackoffMs, stoppingToken);
            }
        }
    }

    private async Task ProcessBatchAsync(CancellationToken cancellationToken)
    {
        // Get unpublished events (FIFO, batched)
        var messages = await _outboxRepo.GetUnpublishedAsync(_options.BatchSize);

        if (!messages.Any())
        {
            return;  // No work
        }

        _logger.LogInformation("Processing {Count} outbox messages", messages.Count);

        foreach (var message in messages)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            await PublishMessageAsync(message, cancellationToken);
        }
    }

    private async Task PublishMessageAsync(OutboxMessage message, CancellationToken cancellationToken)
    {
        try
        {
            // Deserialize payload
            var eventPayload = JsonSerializer.Deserialize(message.PayloadJson, GetEventType(message.EventType));

            // Set message headers
            var sendContext = await _bus.GetSendEndpoint(new Uri($"queue:{message.EventType}"));
            await sendContext.Send(eventPayload, ctx =>
            {
                ctx.MessageId = Guid.Parse(message.EventId);
                ctx.CorrelationId = Guid.Parse(message.CorrelationId);
                ctx.Headers.Set("TenantId", message.TenantId);
                ctx.Headers.Set("IdempotencyKey", message.IdempotencyKey);
                ctx.Headers.Set("SchemaVersion", message.SchemaVersion);

                if (!string.IsNullOrEmpty(message.Traceparent))
                {
                    ctx.Headers.Set("traceparent", message.Traceparent);
                }
            }, cancellationToken);

            // Mark as published
            await _outboxRepo.MarkAsPublishedAsync(message.EventId, DateTime.UtcNow);

            _logger.LogInformation("Published event {EventId} of type {EventType}", 
                message.EventId, message.EventType);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish event {EventId}", message.EventId);

            // Increment attempt counter
            await _outboxRepo.IncrementAttemptAsync(message.EventId, ex.Message);

            // Check max attempts
            if (message.Attempts + 1 >= _options.MaxAttempts)
            {
                _logger.LogError("Event {EventId} exceeded max attempts ({MaxAttempts}), moving to DLQ",
                    message.EventId, _options.MaxAttempts);

                await MoveToDeadLetterAsync(message);
            }
        }
    }
}

  • Relay Worker Configuration

    {
      "OutboxRelay": {
        "Enabled": true,
        "PollingIntervalMs": 500,
        "BatchSize": 100,
        "MaxAttempts": 5,
        "ErrorBackoffMs": 5000,
        "ParallelWorkers": 1
      }
    }
    

  • Graceful Shutdown

  • Wait for current batch to complete
  • Don't start new batches during shutdown
  • Maximum shutdown wait: 30 seconds

Code Examples: - Complete OutboxRelayWorker implementation - Batch processing logic - Error handling and retry - Graceful shutdown

Diagrams: - Relay worker architecture - Batch processing flow - Error handling flow

Deliverables: - Relay worker implementation - Configuration guide - Operational procedures


Topic 8: Outbox Event Publishing

What will be covered: - Saving Events to Outbox

public class AuditEventService
{
    private readonly IUnitOfWork _unitOfWork;
    private readonly IAuditRepository _auditRepo;
    private readonly IOutboxRepository _outboxRepo;

    public async Task<string> AppendAuditEventAsync(AuditEvent auditEvent)
    {
        using var transaction = await _unitOfWork.BeginTransactionAsync();

        try
        {
            // 1. Save domain entity
            var auditRecord = auditEvent.ToAuditRecord();
            await _auditRepo.SaveAsync(auditRecord);

            // 2. Save event to outbox (same transaction)
            var outboxMessage = new OutboxMessage
            {
                EventId = Ulid.NewUlid().ToString(),
                AggregateType = "AuditEvent",
                AggregateId = auditRecord.Id,
                TenantId = auditEvent.TenantId,
                EventType = "audit.appended.v1",
                SchemaVersion = 1,
                PayloadJson = JsonSerializer.Serialize(new AuditAppendedEvent
                {
                    AuditRecordId = auditRecord.Id,
                    TenantId = auditEvent.TenantId,
                    OccurredAtUtc = auditEvent.OccurredAtUtc,
                    EventType = auditEvent.EventType
                }),
                IdempotencyKey = auditEvent.IdempotencyKey,
                CorrelationId = auditEvent.CorrelationId,
                Traceparent = Activity.Current?.Id,
                CreatedAtUtc = DateTime.UtcNow
            };

            await _outboxRepo.SaveAsync(outboxMessage);

            // 3. Commit transaction (atomic)
            await transaction.CommitAsync();

            return auditRecord.Id;
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

  • Event Serialization
  • Use System.Text.Json (performance)
  • Include type information for deserialization
  • Compress large payloads (>10KB)

Code Examples: - Complete event saving workflow - Transaction management - Event serialization patterns

Diagrams: - Event saving flow - Transaction boundaries

Deliverables: - Event saving implementation - Transaction patterns - Serialization guide


CYCLE 5: Inbox Pattern & Deduplication (~3,500 lines)

Topic 9: Inbox Pattern Architecture

What will be covered: - Inbox Pattern Overview

Message Consumption Flow:
1. Receive message from broker
2. Check Inbox: Has MessageId been processed?
3. If YES → ACK message, skip processing (duplicate)
4. If NO → Process message
5. Record MessageId in Inbox
6. ACK message

  • Inbox Deduplication Scope
  • Global Inbox: All messages across all consumers (single table)
  • Per-Consumer Inbox: Separate table per consumer (ATP uses this)
  • Per-Tenant Inbox: Partition by tenantId (performance)

  • Inbox TTL

  • Default: 7 days (covers typical retry windows)
  • Extended: 30 days for compliance (audit trail)
  • Automatic Cleanup: Background job removes expired entries

  • Inbox vs. Outbox | Aspect | Outbox (Producer) | Inbox (Consumer) | |--------|-------------------|------------------| | Purpose | Atomic publish guarantee | Duplicate detection | | Scope | Per-producer service | Per-consumer subscription | | Key | EventId (generated) | MessageId (from broker) | | Lifetime | Until published + 7 days | Until TTL expires | | Cleanup | Delete published events | Delete expired entries |

Code Examples: - Inbox pattern workflow - Scope comparison - TTL configuration

Diagrams: - Inbox pattern flow - Inbox vs. outbox comparison

Deliverables: - Inbox pattern overview - Scope decision guide - TTL policies


Topic 10: Inbox Consumer Pattern

What will be covered: - Idempotent Consumer Base Class

public abstract class IdempotentConsumer<TMessage> : IConsumer<TMessage>
    where TMessage : class
{
    private readonly IInboxRepository _inbox;
    private readonly ILogger _logger;

    protected IdempotentConsumer(IInboxRepository inbox, ILogger logger)
    {
        _inbox = inbox;
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<TMessage> context)
    {
        var messageId = context.MessageId?.ToString();
        var consumerName = GetType().Name;

        if (string.IsNullOrEmpty(messageId))
        {
            _logger.LogWarning("Message received without MessageId, generating fallback");
            messageId = ComputeMessageIdFallback(context.Message);
        }

        // Check inbox for duplicate
        if (await _inbox.HasProcessedAsync(consumerName, messageId))
        {
            _logger.LogInformation("Duplicate message {MessageId} detected, skipping", messageId);

            // Track duplicate metric
            _metrics.IncrementDuplicateCount(consumerName);

            return;  // Idempotent: ACK without processing
        }

        try
        {
            // Process message (implemented by derived class)
            await ProcessMessageAsync(context);

            // Mark as processed in inbox
            await _inbox.MarkProcessedAsync(consumerName, messageId, DateTime.UtcNow);

            _logger.LogInformation("Successfully processed message {MessageId}", messageId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message {MessageId}", messageId);
            throw;  // Re-throw for MassTransit retry/DLQ handling
        }
    }

    protected abstract Task ProcessMessageAsync(ConsumeContext<TMessage> context);

    private string ComputeMessageIdFallback(TMessage message)
    {
        // Generate deterministic ID from message content
        var json = JsonSerializer.Serialize(message);
        var hash = SHA256.HashData(Encoding.UTF8.GetBytes(json));
        return Convert.ToHexString(hash);
    }
}

  • Concrete Consumer Example
    public class AuditAppendedConsumer : IdempotentConsumer<AuditAppendedEvent>
    {
        private readonly IProjectionRepository _projectionRepo;
    
        public AuditAppendedConsumer(
            IInboxRepository inbox,
            IProjectionRepository projectionRepo,
            ILogger<AuditAppendedConsumer> logger)
            : base(inbox, logger)
        {
            _projectionRepo = projectionRepo;
        }
    
        protected override async Task ProcessMessageAsync(ConsumeContext<AuditAppendedEvent> context)
        {
            var evt = context.Message;
    
            // Idempotent upsert (safe to call multiple times)
            await _projectionRepo.UpsertAuditRecordViewAsync(new AuditRecordView
            {
                AuditRecordId = evt.AuditRecordId,
                TenantId = evt.TenantId,
                OccurredAtUtc = evt.OccurredAtUtc,
                EventType = evt.EventType,
                Actor = evt.Actor,
                Resource = evt.Resource
            });
        }
    }
    

Code Examples: - IdempotentConsumer base class (complete) - Concrete consumer implementations (all ATP consumers) - Fallback MessageId generation - Duplicate tracking

Diagrams: - Idempotent consumer flow - Inheritance hierarchy

Deliverables: - IdempotentConsumer base class - Consumer implementations - Usage patterns


CYCLE 6: Inbox Table Schema & Implementation (~3,000 lines)

Topic 11: Inbox Database Schema

What will be covered: - Inbox Table Schema (SQL Server)

CREATE TABLE audit.Inbox (
    ConsumerName       VARCHAR(128) NOT NULL,
    MessageId          VARCHAR(50) NOT NULL,              -- ULID/UUIDv7 from broker

    TenantId           VARCHAR(64) NULL,                  -- For tenant-scoped cleanup

    ProcessedAtUtc     DATETIME2(3) NOT NULL DEFAULT SYSUTCDATETIME(),
    ExpiresAtUtc       DATETIME2(3) NOT NULL,             -- TTL window (7-30 days)

    ResultJson         NVARCHAR(MAX) NULL,                -- Optional: cache result for idempotent response

    CONSTRAINT PK_Inbox PRIMARY KEY (ConsumerName, MessageId),
    INDEX IX_Inbox_Expires (ExpiresAtUtc),
    INDEX IX_Inbox_Tenant (TenantId, ProcessedAtUtc)
);

  • MongoDB Schema (Alternative)

    public class InboxEntry
    {
        public string Id { get; set; }  // ConsumerName:MessageId
        public string ConsumerName { get; set; }
        public string MessageId { get; set; }
        public string TenantId { get; set; }
        public DateTime ProcessedAtUtc { get; set; }
        public DateTime ExpiresAtUtc { get; set; }
        public BsonDocument Result { get; set; }
    }
    
    // Compound index
    collection.Indexes.CreateOne(
        Builders<InboxEntry>.IndexKeys
            .Ascending(x => x.ConsumerName)
            .Ascending(x => x.MessageId),
        new CreateIndexOptions { Unique = true }
    );
    
    // TTL index
    collection.Indexes.CreateOne(
        Builders<InboxEntry>.IndexKeys.Ascending(x => x.ExpiresAtUtc),
        new CreateIndexOptions { ExpireAfter = TimeSpan.Zero }
    );
    

  • Inbox Partitioning Strategy

  • Partition by ConsumerName (each consumer isolated)
  • Sub-partition by TenantId (performance for multi-tenant cleanup)
  • Composite primary key (ConsumerName, MessageId)

Code Examples: - Complete inbox table schema (SQL + MongoDB) - FluentMigrator migration - Partitioning configuration

Diagrams: - Inbox table schema - Partitioning strategy

Deliverables: - Inbox table schemas - Migration scripts - Partitioning guide


Topic 12: Inbox Repository Implementation

What will be covered: - IInboxRepository Interface

public interface IInboxRepository
{
    Task<bool> HasProcessedAsync(string consumerName, string messageId);
    Task MarkProcessedAsync(string consumerName, string messageId, DateTime processedAtUtc);
    Task MarkProcessedWithResultAsync(string consumerName, string messageId, object result, DateTime processedAtUtc);
    Task<TResult> GetResultAsync<TResult>(string consumerName, string messageId);
    Task<int> DeleteExpiredAsync();
}

  • NHibernate Implementation

    public class InboxRepository : IInboxRepository
    {
        private readonly IUnitOfWork _unitOfWork;
        private readonly InboxOptions _options;
    
        public async Task<bool> HasProcessedAsync(string consumerName, string messageId)
        {
            var session = _unitOfWork.GetSession();
    
            var count = await session.Query<InboxEntry>()
                .Where(e => e.ConsumerName == consumerName && e.MessageId == messageId)
                .CountAsync();
    
            return count > 0;
        }
    
        public async Task MarkProcessedAsync(string consumerName, string messageId, DateTime processedAtUtc)
        {
            var session = _unitOfWork.GetSession();
    
            var entry = new InboxEntry
            {
                ConsumerName = consumerName,
                MessageId = messageId,
                ProcessedAtUtc = processedAtUtc,
                ExpiresAtUtc = processedAtUtc.AddDays(_options.TtlDays)
            };
    
            await session.SaveAsync(entry);
        }
    
        public async Task<int> DeleteExpiredAsync()
        {
            var session = _unitOfWork.GetSession();
    
            return await session.Query<InboxEntry>()
                .Where(e => e.ExpiresAtUtc < DateTime.UtcNow)
                .DeleteAsync();
        }
    }
    

  • Redis-Based Inbox (Performance)

    public class RedisInboxRepository : IInboxRepository
    {
        private readonly IConnectionMultiplexer _redis;
        private readonly InboxOptions _options;
    
        public async Task<bool> HasProcessedAsync(string consumerName, string messageId)
        {
            var db = _redis.GetDatabase();
            var key = $"inbox:{consumerName}:{messageId}";
    
            return await db.KeyExistsAsync(key);
        }
    
        public async Task MarkProcessedAsync(string consumerName, string messageId, DateTime processedAtUtc)
        {
            var db = _redis.GetDatabase();
            var key = $"inbox:{consumerName}:{messageId}";
            var ttl = TimeSpan.FromDays(_options.TtlDays);
    
            await db.StringSetAsync(key, processedAtUtc.ToString("O"), ttl);
        }
    }
    

Code Examples: - Complete IInboxRepository implementations (SQL + Redis) - FluentNHibernate mapping - Cleanup job - Performance comparison

Diagrams: - Inbox repository architecture - SQL vs. Redis comparison

Deliverables: - Inbox repository implementations - Performance benchmarks - Selection guide


CYCLE 7: Idempotency Gate for HTTP APIs (~4,000 lines)

Topic 13: Idempotency-Key Header Middleware

What will be covered: - Idempotency-Key Header Standard

POST /api/v1/events HTTP/1.1
Content-Type: application/json
Idempotency-Key: acme-corp:app-123:evt-2025-10-30-001
X-Tenant-Id: acme-corp

{
  "eventType": "user.login",
  "occurredAt": "2025-10-30T12:34:56Z",
  ...
}

  • Key Format: {tenantId}:{sourceAppId}:{uniqueSequence}
  • Scope: Scoped to tenant (different tenants can use same key)
  • TTL: 24 hours (idempotency window)
  • Optional: For GET/DELETE (safe methods), required for POST/PUT/PATCH

  • Idempotency Middleware

    public class IdempotencyMiddleware
    {
        private readonly RequestDelegate _next;
        private readonly IIdempotencyGate _gate;
        private readonly ILogger<IdempotencyMiddleware> _logger;
    
        public async Task InvokeAsync(HttpContext context)
        {
            // Only apply to mutating operations
            if (!IsMutatingMethod(context.Request.Method))
            {
                await _next(context);
                return;
            }
    
            var tenantId = context.GetTenantId();
            var idempotencyKey = context.Request.Headers["Idempotency-Key"].ToString();
    
            if (string.IsNullOrEmpty(idempotencyKey))
            {
                // Idempotency-Key required for POST/PUT/PATCH
                context.Response.StatusCode = 400;
                await context.Response.WriteAsJsonAsync(new
                {
                    Error = "IdempotencyKeyRequired",
                    Message = "Idempotency-Key header is required for mutating operations"
                });
                return;
            }
    
            // Check idempotency gate
            var gateResult = await _gate.TryReserveAsync(tenantId, idempotencyKey, context);
    
            if (gateResult.IsDuplicate)
            {
                _logger.LogInformation("Duplicate request detected for key {Key}", idempotencyKey);
    
                // Return cached result
                context.Response.StatusCode = gateResult.StatusCode;
                await context.Response.WriteAsync(gateResult.CachedResponse);
                return;
            }
    
            if (gateResult.IsConflict)
            {
                _logger.LogWarning("Idempotency conflict for key {Key}", idempotencyKey);
    
                // Return 409 Conflict
                context.Response.StatusCode = 409;
                await context.Response.WriteAsJsonAsync(new
                {
                    Error = "IdempotencyConflict",
                    Message = "Same idempotency key used with different request body"
                });
                return;
            }
    
            // First-time request, proceed
            // Wrap response stream to capture result for caching
            var originalBodyStream = context.Response.Body;
            using var responseBody = new MemoryStream();
            context.Response.Body = responseBody;
    
            await _next(context);
    
            // Cache successful result
            if (context.Response.StatusCode >= 200 && context.Response.StatusCode < 300)
            {
                responseBody.Seek(0, SeekOrigin.Begin);
                var responseText = await new StreamReader(responseBody).ReadToEndAsync();
    
                await _gate.StoreResultAsync(tenantId, idempotencyKey, 
                    context.Response.StatusCode, responseText);
            }
    
            // Copy response back to original stream
            responseBody.Seek(0, SeekOrigin.Begin);
            await responseBody.CopyToAsync(originalBodyStream);
        }
    
        private bool IsMutatingMethod(string method)
        {
            return method == "POST" || method == "PUT" || method == "PATCH";
        }
    }
    

  • Idempotency Gate

    public interface IIdempotencyGate
    {
        Task<IdempotencyGateResult> TryReserveAsync(string tenantId, string idempotencyKey, HttpContext context);
        Task StoreResultAsync(string tenantId, string idempotencyKey, int statusCode, string responseBody);
    }
    
    public class IdempotencyGateResult
    {
        public bool IsDuplicate { get; set; }
        public bool IsConflict { get; set; }
        public int StatusCode { get; set; }
        public string CachedResponse { get; set; }
    }
    

Code Examples: - Complete idempotency middleware - Idempotency gate implementation - Response caching - Conflict detection

Diagrams: - Middleware flow - Gate decision logic

Deliverables: - Idempotency middleware - Gate implementation - Integration guide


Topic 14: Idempotency Gate Implementation

What will be covered: - Idempotency Table Schema

CREATE TABLE audit.Idempotency (
    TenantId           VARCHAR(64) NOT NULL,
    KeyHash            CHAR(64) NOT NULL,                 -- SHA-256 of idempotency key

    RecordId           VARCHAR(26) NOT NULL,              -- ULID of created resource
    PayloadHash        CHAR(64) NOT NULL,                 -- SHA-256 of request body

    StatusCode         INT NOT NULL,
    ResponseBody       NVARCHAR(MAX) NULL,                -- Cached response

    CreatedAtUtc       DATETIME2(3) NOT NULL DEFAULT SYSUTCDATETIME(),
    ExpiresAtUtc       DATETIME2(3) NOT NULL,             -- 24-hour TTL

    CONSTRAINT PK_Idempotency PRIMARY KEY (TenantId, KeyHash),
    INDEX IX_Idempotency_Expires (ExpiresAtUtc)
);

  • Gate Implementation
    public class SqlIdempotencyGate : IIdempotencyGate
    {
        private readonly IDbConnection _connection;
        private readonly ILogger<SqlIdempotencyGate> _logger;
    
        public async Task<IdempotencyGateResult> TryReserveAsync(
            string tenantId, 
            string idempotencyKey, 
            HttpContext context)
        {
            var keyHash = ComputeHash(tenantId, idempotencyKey);
            var payloadHash = await ComputePayloadHashAsync(context.Request);
    
            // MERGE query for atomic check-and-insert
            var sql = @"
                MERGE audit.Idempotency AS target
                USING (SELECT @TenantId AS TenantId, @KeyHash AS KeyHash) AS source
                ON (target.TenantId = source.TenantId AND target.KeyHash = source.KeyHash)
                WHEN NOT MATCHED THEN
                    INSERT (TenantId, KeyHash, RecordId, PayloadHash, StatusCode, ExpiresAtUtc)
                    VALUES (@TenantId, @KeyHash, @RecordId, @PayloadHash, 0, DATEADD(day, 1, SYSUTCDATETIME()))
                WHEN MATCHED AND target.PayloadHash = @PayloadHash THEN
                    UPDATE SET ExpiresAtUtc = DATEADD(day, 1, SYSUTCDATETIME())  -- Refresh TTL
                WHEN MATCHED AND target.PayloadHash <> @PayloadHash THEN
                    UPDATE SET RecordId = target.RecordId  -- No-op to detect conflict
                OUTPUT $action, inserted.RecordId, inserted.StatusCode, inserted.ResponseBody;";
    
            var result = await _connection.QuerySingleAsync(sql, new
            {
                TenantId = tenantId,
                KeyHash = keyHash,
                RecordId = Ulid.NewUlid().ToString(),
                PayloadHash = payloadHash
            });
    
            if (result.Action == "INSERT")
            {
                // First-time request
                return new IdempotencyGateResult { IsDuplicate = false, IsConflict = false };
            }
            else if (result.Action == "UPDATE" && result.PayloadHash == payloadHash)
            {
                // Exact duplicate
                return new IdempotencyGateResult
                {
                    IsDuplicate = true,
                    StatusCode = result.StatusCode,
                    CachedResponse = result.ResponseBody
                };
            }
            else
            {
                // Conflict (same key, different payload)
                return new IdempotencyGateResult { IsConflict = true };
            }
        }
    
        private string ComputeHash(string tenantId, string key)
        {
            // Tenant-scoped hash (prevents cross-tenant collisions)
            var input = $"{tenantId}:{key}";
            var hash = SHA256.HashData(Encoding.UTF8.GetBytes(input));
            return Convert.ToHexString(hash);
        }
    
        private async Task<string> ComputePayloadHashAsync(HttpRequest request)
        {
            request.EnableBuffering();  // Allow multiple reads
            using var reader = new StreamReader(request.Body, leaveOpen: true);
            var body = await reader.ReadToEndAsync();
            request.Body.Position = 0;  // Reset for next reader
    
            var hash = SHA256.HashData(Encoding.UTF8.GetBytes(body));
            return Convert.ToHexString(hash);
        }
    }
    

Code Examples: - Complete idempotency gate implementation (SQL + Redis) - MERGE query for atomic operations - Hash computation - Conflict detection

Diagrams: - Gate SQL MERGE flow - Hash computation

Deliverables: - Idempotency gate implementation - Table schema - Testing guide


CYCLE 8: Idempotent Message Consumers (~3,500 lines)

Topic 15: Idempotent Handler Patterns

What will be covered: - Upsert Pattern (Naturally Idempotent)

public class ProjectionUpdatedConsumer : IdempotentConsumer<ProjectionUpdatedEvent>
{
    protected override async Task ProcessMessageAsync(ConsumeContext<ProjectionUpdatedEvent> context)
    {
        var evt = context.Message;

        // Upsert is idempotent (same result if called multiple times)
        await _repository.UpsertAsync(new AuditRecordView
        {
            Id = evt.AuditRecordId,  // Primary key
            TenantId = evt.TenantId,
            EventType = evt.EventType,
            OccurredAtUtc = evt.OccurredAtUtc,
            UpdatedAtUtc = DateTime.UtcNow
        });
    }
}

  • Conditional Side-Effect Pattern

    public class ExportCompletedConsumer : IdempotentConsumer<ExportCompletedEvent>
    {
        protected override async Task ProcessMessageAsync(ConsumeContext<ExportCompletedEvent> context)
        {
            var evt = context.Message;
    
            // Update export job status (idempotent)
            var job = await _exportRepo.GetAsync(evt.ExportJobId);
            job.Status = ExportStatus.Completed;
            job.CompletedAtUtc = DateTime.UtcNow;
            await _exportRepo.UpdateAsync(job);
    
            // Send webhook (check if already sent)
            if (!await _webhookLog.HasSentAsync(evt.ExportJobId))
            {
                await _webhookService.SendAsync(new ExportCompletedWebhook
                {
                    ExportJobId = evt.ExportJobId,
                    DownloadUrl = evt.DownloadUrl
                });
    
                await _webhookLog.MarkSentAsync(evt.ExportJobId);
            }
        }
    }
    

  • Version-Based Idempotency (Optimistic Concurrency)

    public class PolicyUpdatedConsumer : IdempotentConsumer<PolicyUpdatedEvent>
    {
        protected override async Task ProcessMessageAsync(ConsumeContext<PolicyUpdatedEvent> context)
        {
            var evt = context.Message;
    
            // Load entity with version
            var policy = await _policyRepo.GetAsync(evt.PolicyId);
    
            // Check if already applied (version-based idempotency)
            if (policy.Version >= evt.PolicyVersion)
            {
                _logger.LogInformation("Policy {PolicyId} already at version {Version}, skipping",
                    evt.PolicyId, policy.Version);
                return;  // Already processed
            }
    
            // Apply update
            policy.ApplyUpdate(evt);
            policy.Version = evt.PolicyVersion;
    
            await _policyRepo.UpdateAsync(policy);
        }
    }
    

Code Examples: - All idempotent handler patterns (upsert, conditional, version-based) - ATP-specific consumer implementations - Side-effect tracking

Diagrams: - Idempotent handler patterns - Version-based flow

Deliverables: - Idempotent handler library - Pattern catalog - Best practices


Topic 16: Inbox + Handler Integration

What will be covered: - Complete Idempotent Consumer Flow

public class AuditAppendedConsumer : IConsumer<AuditAppendedEvent>
{
    private readonly IInboxRepository _inbox;
    private readonly IProjectionRepository _projection;
    private readonly IUnitOfWork _unitOfWork;

    public async Task Consume(ConsumeContext<AuditAppendedEvent> context)
    {
        var messageId = context.MessageId?.ToString() ?? throw new InvalidOperationException("MessageId required");
        var consumerName = GetType().Name;
        var evt = context.Message;

        using var transaction = await _unitOfWork.BeginTransactionAsync();

        try
        {
            // 1. Check inbox (duplicate detection)
            if (await _inbox.HasProcessedAsync(consumerName, messageId))
            {
                _logger.LogInformation("Duplicate message {MessageId}, skipping", messageId);
                return;  // Idempotent return
            }

            // 2. Process message (idempotent operations)
            await _projection.UpsertAuditRecordViewAsync(new AuditRecordView
            {
                AuditRecordId = evt.AuditRecordId,
                TenantId = evt.TenantId,
                OccurredAtUtc = evt.OccurredAtUtc,
                EventType = evt.EventType
            });

            // 3. Mark as processed in inbox
            await _inbox.MarkProcessedAsync(consumerName, messageId, DateTime.UtcNow);

            // 4. Commit transaction (processing + inbox = atomic)
            await transaction.CommitAsync();

            _logger.LogInformation("Successfully processed message {MessageId}", messageId);
        }
        catch (Exception ex)
        {
            await transaction.RollbackAsync();
            _logger.LogError(ex, "Failed to process message {MessageId}", messageId);
            throw;  // MassTransit will retry
        }
    }
}

  • Transaction Scope
  • Inbox record + domain changes in single transaction
  • If transaction fails, message redelivered (safe retry)
  • If transaction succeeds, inbox prevents duplicate processing

Code Examples: - Complete idempotent consumer with inbox - Transaction management - Error handling

Diagrams: - Complete consumer flow - Transaction boundaries

Deliverables: - Complete consumer template - Transaction patterns - Error handling guide


CYCLE 9: MassTransit Outbox Integration (~3,000 lines)

Topic 17: MassTransit Outbox Configuration

What will be covered: - MassTransit Built-In Outbox

services.AddMassTransit(cfg =>
{
    // Register consumers, sagas
    cfg.AddConsumer<AuditAppendedConsumer>();
    cfg.AddSagaStateMachine<ExportSaga, ExportSagaState>();

    cfg.UsingAzureServiceBus((context, busCfg) =>
    {
        busCfg.Host("atp-prod.servicebus.windows.net");

        // Enable outbox (Entity Framework)
        busCfg.UseEntityFrameworkOutbox<AuditDbContext>(outbox =>
        {
            outbox.UsePostgres();  // Or UseSqlServer()

            // Query delay (polling interval)
            outbox.QueryDelay = TimeSpan.FromMilliseconds(500);

            // Batch size
            outbox.QueryMessageLimit = 100;

            // Isolation level
            outbox.IsolationLevel = IsolationLevel.ReadCommitted;

            // Lock statement timeout
            outbox.LockStatementTimeout = TimeSpan.FromSeconds(30);
        });

        busCfg.ConfigureEndpoints(context);
    });
});

  • DbContext Configuration

    public class AuditDbContext : DbContext
    {
        public DbSet<AuditRecord> AuditRecords { get; set; }
        public DbSet<OutboxMessage> OutboxMessages { get; set; }
        public DbSet<OutboxState> OutboxStates { get; set; }
    
        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);
    
            // MassTransit outbox tables
            modelBuilder.AddInboxStateEntity();
            modelBuilder.AddOutboxStateEntity();
            modelBuilder.AddOutboxMessageEntity();
        }
    }
    

  • Publishing with Outbox

    public class AuditService
    {
        private readonly AuditDbContext _dbContext;
        private readonly IPublishEndpoint _publishEndpoint;
    
        public async Task<string> AppendAuditEventAsync(AuditEvent auditEvent)
        {
            using var transaction = await _dbContext.Database.BeginTransactionAsync();
    
            try
            {
                // 1. Save domain entity
                var auditRecord = auditEvent.ToAuditRecord();
                _dbContext.AuditRecords.Add(auditRecord);
                await _dbContext.SaveChangesAsync();
    
                // 2. Publish event (saved to outbox automatically)
                await _publishEndpoint.Publish(new AuditAppendedEvent
                {
                    AuditRecordId = auditRecord.Id,
                    TenantId = auditEvent.TenantId,
                    OccurredAtUtc = auditEvent.OccurredAtUtc
                });
    
                // 3. Commit (domain + outbox atomic)
                await transaction.CommitAsync();
    
                return auditRecord.Id;
            }
            catch
            {
                await transaction.RollbackAsync();
                throw;
            }
        }
    }
    

Code Examples: - MassTransit outbox configuration (complete) - DbContext setup - Publishing with outbox - Migration for outbox tables

Diagrams: - MassTransit outbox architecture - DbContext + outbox integration

Deliverables: - MassTransit outbox setup - DbContext configuration - Publishing patterns


Topic 18: Custom Outbox Implementation

What will be covered: - When to Use Custom Outbox - Using NHibernate (not Entity Framework) - Custom serialization requirements - Advanced partition strategies - Integration with Orleans or other frameworks

  • Custom Outbox Service
    public class CustomOutboxService : IOutboxService
    {
        private readonly IOutboxRepository _outboxRepo;
        private readonly IUnitOfWork _unitOfWork;
    
        public async Task PublishWithOutboxAsync<TEvent>(
            TEvent domainEvent,
            Func<Task> domainOperation)
            where TEvent : IDomainEvent
        {
            using var transaction = await _unitOfWork.BeginTransactionAsync();
    
            try
            {
                // 1. Execute domain operation (e.g., save aggregate)
                await domainOperation();
    
                // 2. Save event to outbox
                var outboxMessage = new OutboxMessage
                {
                    EventId = domainEvent.EventId ?? Ulid.NewUlid().ToString(),
                    AggregateType = domainEvent.AggregateType,
                    AggregateId = domainEvent.AggregateId,
                    TenantId = domainEvent.TenantId,
                    EventType = domainEvent.GetType().Name,
                    PayloadJson = JsonSerializer.Serialize(domainEvent),
                    CorrelationId = Activity.Current?.TraceId.ToString(),
                    CreatedAtUtc = DateTime.UtcNow
                };
    
                await _outboxRepo.SaveAsync(outboxMessage);
    
                // 3. Commit transaction
                await transaction.CommitAsync();
            }
            catch
            {
                await transaction.RollbackAsync();
                throw;
            }
        }
    }
    

Code Examples: - Custom outbox service implementation - NHibernate integration - Orleans integration - Comparison with MassTransit outbox

Diagrams: - Custom vs. MassTransit outbox - Integration patterns

Deliverables: - Custom outbox implementation - Framework integration guides - Selection criteria


CYCLE 10: MassTransit Inbox Integration (~3,000 lines)

Topic 19: MassTransit Inbox Configuration

What will be covered: - MassTransit Built-In Inbox

services.AddMassTransit(cfg =>
{
    cfg.AddConsumer<AuditAppendedConsumer>();

    cfg.UsingAzureServiceBus((context, busCfg) =>
    {
        busCfg.Host("atp-prod.servicebus.windows.net");

        busCfg.ReceiveEndpoint("projection-queue", e =>
        {
            // Enable inbox (deduplication)
            e.UseEntityFrameworkOutbox<AuditDbContext>(inbox =>
            {
                // Inbox configuration
                inbox.UseSqlServer();
                inbox.DuplicateDetectionWindow = TimeSpan.FromDays(7);
            });

            e.ConfigureConsumer<AuditAppendedConsumer>(context);
        });
    });
});

  • Inbox State Table (MassTransit)
    CREATE TABLE InboxState (
        Id                 BIGINT IDENTITY(1,1) PRIMARY KEY,
        MessageId          UNIQUEIDENTIFIER NOT NULL,
        ConsumerId         UNIQUEIDENTIFIER NOT NULL,
        LockId             UNIQUEIDENTIFIER NOT NULL,
    
        RowVersion         TIMESTAMP,
    
        Received           DATETIME2 NOT NULL,
        ReceiveCount       INT NOT NULL,
        ExpirationTime     DATETIME2 NULL,
    
        Consumed           DATETIME2 NULL,
        Delivered          DATETIME2 NULL,
    
        LastSequenceNumber BIGINT NULL,
    
        UNIQUE (MessageId, ConsumerId)
    );
    

Code Examples: - MassTransit inbox configuration - DbContext setup - Consumer with inbox

Diagrams: - MassTransit inbox architecture - Inbox state table

Deliverables: - MassTransit inbox setup - Configuration guide - Integration patterns


Topic 20: Custom Inbox Implementation

What will be covered: - Custom Inbox for NHibernate

public class NHibernateInboxRepository : IInboxRepository
{
    private readonly ISessionFactory _sessionFactory;

    public async Task<bool> HasProcessedAsync(string consumerName, string messageId)
    {
        using var session = _sessionFactory.OpenSession();

        var count = await session.Query<InboxEntry>()
            .Where(e => e.ConsumerName == consumerName && e.MessageId == messageId)
            .CountAsync();

        return count > 0;
    }

    public async Task MarkProcessedAsync(string consumerName, string messageId, DateTime processedAtUtc)
    {
        using var session = _sessionFactory.OpenSession();
        using var transaction = session.BeginTransaction();

        try
        {
            var entry = new InboxEntry
            {
                ConsumerName = consumerName,
                MessageId = messageId,
                ProcessedAtUtc = processedAtUtc,
                ExpiresAtUtc = processedAtUtc.AddDays(7)
            };

            await session.SaveAsync(entry);
            await transaction.CommitAsync();
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

Code Examples: - Custom inbox for NHibernate - Custom inbox for MongoDB - Integration with MassTransit

Diagrams: - Custom inbox architecture

Deliverables: - Custom inbox implementations - Integration guide - Performance comparison


CYCLE 11: Idempotency Key Generation & Storage (~3,000 lines)

Topic 21: Idempotency Key Generation

What will be covered: - Key Generation Strategies

// 1. Client-Generated Keys (Preferred)
var idempotencyKey = $"{tenantId}:{sourceAppId}:{Ulid.NewUlid()}";

// 2. Sequence-Based Keys
var idempotencyKey = $"{tenantId}:{sourceAppId}:seq-{sequenceNumber}";

// 3. Content-Based Keys (Deterministic Hash)
var canonicalJson = JsonSerializer.Serialize(request, new JsonSerializerOptions
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
    WriteIndented = false
});
var contentHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(canonicalJson)));
var idempotencyKey = $"{tenantId}:content:{contentHash}";

// 4. Time-Based Keys (ULID - sortable, time-ordered)
var ulid = Ulid.NewUlid();  // Encodes timestamp + randomness
var idempotencyKey = $"{tenantId}:{sourceAppId}:{ulid}";

  • ATP Key Format Standard

    Format: {tenantId}:{sourceAppId}:{uniquePart}
    
    Examples:
    - acme-corp:erp-system:01HZX123456789ABCDEFGHJ
    - contoso:mobile-app:seq-12345
    - fabrikam:webhook:content-a1b2c3d4...
    
    Rules:
    - Max length: 128 characters
    - Allowed chars: a-zA-Z0-9:-_
    - Case-sensitive
    - Tenant-scoped (same key different tenants = different operations)
    

  • Key Security

  • Never expose internal IDs (use ULIDs or hashes)
  • Hash keys before storage (prevent enumeration)
  • Tenant-scoped (cross-tenant key reuse safe)
  • TTL-based expiration (no infinite retention)

Code Examples: - Key generation strategies (all) - ATP key format validator - Key hashing implementation - Security best practices

Diagrams: - Key generation strategies - Key format standard

Deliverables: - Key generation library - Format validator - Security guide


Topic 22: Idempotency Key Storage

What will be covered: - Storage Options | Storage | Pros | Cons | ATP Usage | |---------|------|------|-----------| | Redis | Fast (<5ms), TTL native, distributed | Volatile (data loss on restart) | Inbox cache (layer 1) | | SQL Database | Durable, transactional, queryable | Slower (~10-50ms) | Idempotency gate (authoritative) | | MongoDB | Fast, flexible schema, TTL index | Eventual consistency | Alternative to SQL | | Cosmos DB | Global distribution, low latency | Cost, complexity | Multi-region ATP |

  • Hybrid Approach (ATP Recommended)
    public class HybridIdempotencyGate : IIdempotencyGate
    {
        private readonly IDistributedCache _cache;  // Redis (L1)
        private readonly IIdempotencyRepository _db;  // SQL (L2, authoritative)
    
        public async Task<IdempotencyGateResult> TryReserveAsync(
            string tenantId, 
            string idempotencyKey, 
            HttpContext context)
        {
            var cacheKey = $"idem:{tenantId}:{idempotencyKey}";
    
            // 1. Check Redis cache (fast path)
            var cachedResult = await _cache.GetStringAsync(cacheKey);
            if (cachedResult != null)
            {
                var result = JsonSerializer.Deserialize<IdempotencyGateResult>(cachedResult);
                return result;  // Cache hit (~2ms)
            }
    
            // 2. Check database (authoritative, slower)
            var dbResult = await _db.TryReserveAsync(tenantId, idempotencyKey, context);
    
            // 3. Cache result in Redis (24-hour TTL)
            var ttl = TimeSpan.FromHours(24);
            await _cache.SetStringAsync(cacheKey, JsonSerializer.Serialize(dbResult), new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = ttl
            });
    
            return dbResult;  // Database result (~15ms)
        }
    }
    

Code Examples: - Storage comparison benchmarks - Hybrid implementation (Redis + SQL) - TTL configuration - Cache invalidation

Diagrams: - Storage options comparison - Hybrid architecture

Deliverables: - Storage selection guide - Hybrid implementation - Performance benchmarks


CYCLE 12: Conflict Detection & Resolution (~3,000 lines)

Topic 23: Idempotency Conflict Detection

What will be covered: - Conflict Types

1. Exact Duplicate
   - Same idempotency key
   - Same request payload (hash match)
   - Response: 200 OK with cached result

2. Semantic Conflict
   - Same idempotency key
   - Different request payload (hash mismatch)
   - Response: 409 Conflict

3. Expired Key Reuse
   - Same idempotency key (TTL expired)
   - May be exact duplicate or conflict
   - Response: Treat as new request or 409 (configurable)

  • Payload Hash Comparison

    public class PayloadHasher
    {
        public string ComputeCanonicalHash(HttpRequest request)
        {
            // Read request body
            request.EnableBuffering();
            using var reader = new StreamReader(request.Body, leaveOpen: true);
            var body = reader.ReadToEnd();
            request.Body.Position = 0;
    
            // Parse JSON
            var jsonDoc = JsonDocument.Parse(body);
    
            // Remove non-material fields (correlation IDs, timestamps)
            var materialFields = ExtractMaterialFields(jsonDoc);
    
            // Serialize canonically (sorted keys, no whitespace)
            var canonical = JsonSerializer.Serialize(materialFields, new JsonSerializerOptions
            {
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
                WriteIndented = false,
                DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
                Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping
            });
    
            // Compute SHA-256 hash
            var hash = SHA256.HashData(Encoding.UTF8.GetBytes(canonical));
            return Convert.ToHexString(hash);
        }
    
        private Dictionary<string, object> ExtractMaterialFields(JsonDocument doc)
        {
            var material = new Dictionary<string, object>();
    
            foreach (var property in doc.RootElement.EnumerateObject())
            {
                // Skip non-material fields
                if (IsNonMaterialField(property.Name))
                {
                    continue;
                }
    
                material[property.Name] = property.Value;
            }
    
            return material;
        }
    
        private bool IsNonMaterialField(string fieldName)
        {
            // Fields that don't affect business meaning
            var nonMaterialFields = new[]
            {
                "correlationId", "traceId", "traceparent", "requestId",
                "timestamp", "requestedAt", "clientVersion"
            };
    
            return nonMaterialFields.Contains(fieldName, StringComparer.OrdinalIgnoreCase);
        }
    }
    

  • Conflict Response

    HTTP/1.1 409 Conflict
    Content-Type: application/problem+json
    
    {
      "type": "https://audit.connectsoft.io/problems/idempotency-conflict",
      "title": "Idempotency Conflict",
      "status": 409,
      "detail": "The same idempotency key was used with different request content",
      "idempotencyKey": "acme-corp:app-123:evt-001",
      "originalRequestHash": "a1b2c3d4...",
      "currentRequestHash": "e5f6g7h8...",
      "firstSeenAt": "2025-10-30T12:00:00Z"
    }
    

Code Examples: - Payload hash computation (complete) - Canonical JSON serialization - Conflict detection logic - Conflict response formatting

Diagrams: - Conflict detection flow - Hash computation process

Deliverables: - Payload hasher implementation - Conflict detection logic - Response standards


Topic 24: Conflict Resolution Strategies

What will be covered: - Resolution Strategies

Strategy 1: Strict (ATP Default)
  - Reject conflicts with 409
  - Client must use different idempotency key
  - Preserves audit trail integrity

Strategy 2: Last-Write-Wins
  - Accept conflict, overwrite with latest
  - Risk: Lost updates
  - Use case: Non-critical operations

Strategy 3: Content-Based
  - If content identical (hash match), treat as exact duplicate
  - If content differs, reject with 409
  - ATP uses this for content-hash fallback keys

Strategy 4: Warn-But-Allow
  - Log conflict warning
  - Process request (generate new ID)
  - Return warning in response
  - Use case: Development/testing only

  • ATP Conflict Policy
  • Default: Strict (reject with 409)
  • Content Hash Keys: Content-based (hash match = duplicate)
  • Development: Warn-but-allow (feature flag)

Code Examples: - All resolution strategies - ATP policy configuration - Feature flag for strategy selection

Diagrams: - Resolution strategy decision tree - Policy comparison

Deliverables: - Resolution strategy implementations - Policy configuration - Selection guide


CYCLE 13: Distributed Idempotency (~2,500 lines)

Topic 25: Cross-Service Idempotency

What will be covered: - Distributed Idempotency Challenges - Multiple services processing same request - Each service needs idempotency (not just first service) - Consistent key propagation across services

  • Idempotency Key Propagation

    // Service A (Ingestion)
    [HttpPost("api/v1/events")]
    public async Task<IActionResult> IngestEvent([FromHeader(Name = "Idempotency-Key")] string idempotencyKey)
    {
        // Check idempotency gate
        var gateResult = await _gate.TryReserveAsync(tenantId, idempotencyKey);
    
        if (gateResult.IsDuplicate)
        {
            return Ok(gateResult.CachedResult);
        }
    
        // Process and publish event
        await _bus.Publish(new AuditAppendedEvent
        {
            AuditRecordId = recordId,
            IdempotencyKey = idempotencyKey,  // Propagate key
            TenantId = tenantId
        });
    
        return Created();
    }
    
    // Service B (Projection)
    public class AuditAppendedConsumer : IConsumer<AuditAppendedEvent>
    {
        public async Task Consume(ConsumeContext<AuditAppendedEvent> context)
        {
            var evt = context.Message;
    
            // Use propagated idempotency key for Service B's inbox
            var messageId = context.MessageId?.ToString();
            var idempotencyKey = evt.IdempotencyKey;  // From event
    
            // Check Service B's inbox
            if (await _inbox.HasProcessedAsync("AuditAppendedConsumer", messageId))
            {
                return;  // Duplicate in Service B
            }
    
            // Process...
        }
    }
    

  • Correlation-Based Idempotency

  • Use CorrelationId for end-to-end deduplication
  • Each service tracks (ConsumerName, CorrelationId)
  • Allows replay with same CorrelationId

Code Examples: - Cross-service idempotency patterns - Key propagation - Correlation-based deduplication

Diagrams: - Distributed idempotency flow - Key propagation across services

Deliverables: - Cross-service patterns - Propagation guide - Correlation strategies


Topic 26: Saga Idempotency

What will be covered: - Saga State Idempotency

public class ExportSaga : MassTransitStateMachine<ExportSagaState>
{
    public ExportSaga()
    {
        InstanceState(x => x.CurrentState);

        Event(() => ExportRequested, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));
        Event(() => ExportCompleted, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));

        Initially(
            When(ExportRequested)
                .Then(ctx =>
                {
                    // Idempotent state transition
                    ctx.Saga.ExportJobId = ctx.Message.ExportJobId;
                    ctx.Saga.TenantId = ctx.Message.TenantId;
                    ctx.Saga.StartedAt = DateTime.UtcNow;
                })
                .TransitionTo(Processing)
                .PublishAsync(ctx => ctx.Init<StartExport>(new
                {
                    ctx.Message.ExportJobId,
                    ctx.Message.TenantId
                }))
        );

        During(Processing,
            When(ExportCompleted)
                .Then(ctx =>
                {
                    ctx.Saga.CompletedAt = DateTime.UtcNow;
                })
                .TransitionTo(Completed)
                .Finalize()
        );
    }
}

  • Saga Idempotency Guarantees
  • State transitions idempotent (same event = same state)
  • Correlation by business key (ExportJobId)
  • Version-based optimistic concurrency
  • Duplicate events ignored (already in target state)

Code Examples: - Idempotent saga implementation - State transition patterns - Correlation configuration

Diagrams: - Saga state machine with idempotency - Correlation flow

Deliverables: - Saga idempotency guide - State transition patterns - Correlation strategies


CYCLE 14: Testing Outbox/Inbox/Idempotency (~2,500 lines)

Topic 27: Testing Strategies

What will be covered: - Unit Testing Outbox

[TestClass]
public class OutboxRepositoryTests
{
    [TestMethod]
    public async Task Should_SaveEventToOutbox()
    {
        // Arrange
        var outboxRepo = CreateOutboxRepository();
        var message = new OutboxMessage
        {
            EventId = Ulid.NewUlid().ToString(),
            EventType = "test.event.v1",
            PayloadJson = "{\"test\":true}"
        };

        // Act
        await outboxRepo.SaveAsync(message);

        // Assert
        var unpublished = await outboxRepo.GetUnpublishedAsync();
        Assert.AreEqual(1, unpublished.Count);
        Assert.AreEqual(message.EventId, unpublished[0].EventId);
    }

    [TestMethod]
    public async Task Should_MarkAsPublished()
    {
        // Arrange
        var outboxRepo = CreateOutboxRepository();
        var message = new OutboxMessage { EventId = Ulid.NewUlid().ToString(), ... };
        await outboxRepo.SaveAsync(message);

        // Act
        await outboxRepo.MarkAsPublishedAsync(message.EventId, DateTime.UtcNow);

        // Assert
        var unpublished = await outboxRepo.GetUnpublishedAsync();
        Assert.AreEqual(0, unpublished.Count);
    }
}

  • Integration Testing Inbox

    [TestClass]
    public class InboxIntegrationTests
    {
        [TestMethod]
        public async Task Should_DetectDuplicate()
        {
            // Arrange
            var inbox = CreateInboxRepository();
            var messageId = Ulid.NewUlid().ToString();
    
            // Act - First delivery
            var firstCheck = await inbox.HasProcessedAsync("TestConsumer", messageId);
            Assert.IsFalse(firstCheck);
    
            await inbox.MarkProcessedAsync("TestConsumer", messageId, DateTime.UtcNow);
    
            // Act - Duplicate delivery
            var duplicateCheck = await inbox.HasProcessedAsync("TestConsumer", messageId);
            Assert.IsTrue(duplicateCheck);
        }
    }
    

  • End-to-End Idempotency Test

    [TestMethod]
    public async Task Should_HandleDuplicateRequests_Idempotently()
    {
        // Arrange
        var client = CreateTestClient();
        var idempotencyKey = $"test-tenant:test-app:{Ulid.NewUlid()}";
        var request = new { eventType = "test.event", data = "test" };
    
        // Act - First request
        var response1 = await client.PostAsJsonAsync("/api/v1/events", request, headers =>
        {
            headers.Add("Idempotency-Key", idempotencyKey);
        });
    
        var result1 = await response1.Content.ReadFromJsonAsync<CreateEventResponse>();
    
        // Act - Duplicate request (same key, same payload)
        var response2 = await client.PostAsJsonAsync("/api/v1/events", request, headers =>
        {
            headers.Add("Idempotency-Key", idempotencyKey);
        });
    
        var result2 = await response2.Content.ReadFromJsonAsync<CreateEventResponse>();
    
        // Assert
        Assert.AreEqual(201, (int)response1.StatusCode);
        Assert.AreEqual(200, (int)response2.StatusCode);  // Duplicate returns 200
        Assert.AreEqual(result1.AuditRecordId, result2.AuditRecordId);  // Same resource
    
        // Verify only one event persisted
        var events = await _repository.GetAllAsync();
        Assert.AreEqual(1, events.Count);
    }
    

Code Examples: - Complete test suites (outbox, inbox, idempotency gate) - Duplicate detection tests - Conflict detection tests - End-to-end idempotency tests

Diagrams: - Test architecture - Test data flow

Deliverables: - Complete test suite - Test helpers - Testing guide


Topic 28: Chaos Testing for Idempotency

What will be covered: - Fault Injection Tests

[TestMethod]
public async Task Should_HandleDatabaseFailure_DuringOutboxSave()
{
    // Arrange
    var dbMock = CreateFailingDatabase();  // Simulates transient failure
    var service = new AuditService(dbMock, ...);

    // Act & Assert - First attempt fails
    await Assert.ThrowsExceptionAsync<DbException>(async () =>
    {
        await service.AppendAuditEventAsync(auditEvent);
    });

    // Verify: No event persisted, no event published
    Assert.AreEqual(0, await _repository.CountAsync());
    Assert.AreEqual(0, await _outbox.GetUnpublishedCountAsync());

    // Act - Retry with working database
    dbMock.EnableSuccess();
    var recordId = await service.AppendAuditEventAsync(auditEvent);

    // Assert - Event saved, outbox contains event
    Assert.IsNotNull(recordId);
    Assert.AreEqual(1, await _repository.CountAsync());
    Assert.AreEqual(1, await _outbox.GetUnpublishedCountAsync());
}

[TestMethod]
public async Task Should_HandleMessageRedelivery()
{
    // Simulate message delivered twice
    var message = CreateTestMessage();

    // First delivery
    await _consumer.Consume(CreateContext(message));
    var count1 = await _repository.CountAsync();

    // Duplicate delivery (simulated redelivery)
    await _consumer.Consume(CreateContext(message));
    var count2 = await _repository.CountAsync();

    // Assert: Processed only once
    Assert.AreEqual(1, count1);
    Assert.AreEqual(1, count2);
}

Code Examples: - Fault injection tests - Redelivery simulation - Chaos scenarios (network failures, crashes)

Diagrams: - Chaos test scenarios

Deliverables: - Chaos test suite - Fault injection utilities - Resilience validation


CYCLE 15: Monitoring & Observability (~3,000 lines)

Topic 29: Outbox/Inbox Metrics

What will be covered: - Outbox Metrics

public class OutboxMetrics
{
    // Outbox backlog (unpublished events)
    [Counter("outbox_backlog_total", "Unpublished events in outbox")]
    public void RecordBacklog(int count, string tenantId);

    // Outbox publish rate
    [Counter("outbox_published_total", "Events published from outbox")]
    public void RecordPublished(string eventType, string tenantId);

    // Outbox publish latency
    [Histogram("outbox_publish_duration_ms", "Time from event creation to publish")]
    public void RecordPublishLatency(double durationMs, string eventType);

    // Outbox publish failures
    [Counter("outbox_publish_failed_total", "Failed outbox publishes")]
    public void RecordPublishFailure(string eventType, string error);
}

  • Inbox Metrics

    public class InboxMetrics
    {
        // Duplicate detection rate
        [Counter("inbox_duplicates_total", "Duplicate messages detected")]
        public void RecordDuplicate(string consumerName, string tenantId);
    
        // Inbox check latency
        [Histogram("inbox_check_duration_ms", "Time to check inbox for duplicates")]
        public void RecordCheckLatency(double durationMs);
    
        // Inbox size
        [Gauge("inbox_entries_total", "Total inbox entries")]
        public void RecordInboxSize(long count);
    }
    

  • Idempotency Gate Metrics

    public class IdempotencyMetrics
    {
        // Duplicate requests
        [Counter("idempotency_duplicates_total", "Duplicate HTTP requests")]
        public void RecordDuplicate(string endpoint, string tenantId);
    
        // Conflicts
        [Counter("idempotency_conflicts_total", "Idempotency key conflicts")]
        public void RecordConflict(string endpoint, string tenantId);
    
        // Cache hit rate
        [Counter("idempotency_cache_hits_total", "Idempotency cache hits")]
        public void RecordCacheHit();
    
        [Counter("idempotency_cache_misses_total", "Idempotency cache misses")]
        public void RecordCacheMiss();
    }
    

Code Examples: - Complete metrics collection (all patterns) - Prometheus exporters - Application Insights integration

Diagrams: - Metrics architecture - Dashboard layout

Deliverables: - Metrics collection library - Dashboard templates - Query library


Topic 30: Application Insights Queries

What will be covered: - Outbox Monitoring Queries

// Outbox backlog over time
customMetrics
| where name == "outbox_backlog_total"
| extend TenantId = tostring(customDimensions.TenantId)
| summarize AvgBacklog = avg(value), MaxBacklog = max(value) by bin(timestamp, 5m), TenantId
| order by timestamp desc

// Outbox publish latency (P50, P95, P99)
customMetrics
| where name == "outbox_publish_duration_ms"
| extend EventType = tostring(customDimensions.EventType)
| summarize 
    P50 = percentile(value, 50),
    P95 = percentile(value, 95),
    P99 = percentile(value, 99)
    by EventType

// Outbox publish failures
customMetrics
| where name == "outbox_publish_failed_total"
| extend EventType = tostring(customDimensions.EventType)
| extend Error = tostring(customDimensions.Error)
| summarize FailureCount = sum(value) by EventType, Error
| order by FailureCount desc

  • Inbox Monitoring Queries
    // Duplicate detection rate
    customMetrics
    | where name == "inbox_duplicates_total"
    | extend ConsumerName = tostring(customDimensions.ConsumerName)
    | summarize DuplicateCount = sum(value) by bin(timestamp, 5m), ConsumerName
    | order by timestamp desc
    
    // Inbox deduplication effectiveness
    let total = customMetrics | where name == "inbox_messages_processed_total" | summarize sum(value);
    let duplicates = customMetrics | where name == "inbox_duplicates_total" | summarize sum(value);
    print DuplicatePercentage = (duplicates * 100.0) / total
    

Code Examples: - Complete query library (Kusto) - Alert rules (Azure Monitor) - Dashboard JSON

Diagrams: - Dashboard layouts (mockups)

Deliverables: - Query library (complete) - Alert rules - Dashboard templates


CYCLE 16: Best Practices & Troubleshooting (~3,000 lines)

Topic 31: Best Practices

What will be covered: - Outbox Best Practices - ✅ Always use outbox for critical events - ✅ Batch publish from outbox (performance) - ✅ Monitor outbox backlog (alert if >1000) - ✅ Cleanup published events regularly (7-day retention) - ✅ Use partition keys (TenantId) for ordering - ✅ Set max retry attempts (5-10) - ✅ Move to DLQ after max attempts

  • Inbox Best Practices
  • ✅ Always check inbox before processing
  • ✅ Record inbox entry in same transaction as processing
  • ✅ Use Redis for performance (SQL for durability)
  • ✅ Set appropriate TTL (7-30 days)
  • ✅ Cleanup expired entries daily
  • ✅ Monitor duplicate detection rate

  • Idempotency Best Practices

  • ✅ Use client-generated keys (ULIDs)
  • ✅ Require Idempotency-Key header for mutations
  • ✅ Cache idempotency results (24-hour TTL)
  • ✅ Return cached result for exact duplicates
  • ✅ Return 409 for conflicts
  • ✅ Hash payloads for conflict detection
  • ✅ Exclude non-material fields from hash
  • ✅ Monitor conflict rate (alert if >1%)

  • Anti-Patterns to Avoid

  • ❌ Direct publish without outbox (dual-write)
  • ❌ No inbox deduplication (duplicate side effects)
  • ❌ Non-idempotent operations (x = x + 1)
  • ❌ No idempotency key validation
  • ❌ Infinite key retention (no TTL)
  • ❌ No conflict detection (silent overwrites)
  • ❌ No monitoring (can't detect issues)

Code Examples: - Best practice implementations - Anti-pattern examples - Code review checklist

Diagrams: - Best practices checklist

Deliverables: - Best practices handbook - Anti-pattern catalog - Code review checklist


Topic 32: Troubleshooting

What will be covered: - Common Problems - Problem: Outbox backlog growing - Cause: Relay worker not running, publish failures - Solution: Check relay worker health, review publish errors - Debug: Query outbox for unpublished events, check error messages

  • Problem: Duplicate messages still processed

    • Cause: Inbox not checked, transaction not committed
    • Solution: Verify inbox check before processing, ensure atomic transaction
    • Debug: Check inbox table, review consumer logs
  • Problem: 409 Conflict errors

    • Cause: Same idempotency key with different payload
    • Solution: Client bug (key reuse), investigate client logic
    • Debug: Compare payload hashes, review client code
  • Problem: Slow idempotency checks

    • Cause: Database bottleneck, no caching
    • Solution: Add Redis cache layer, index optimization
    • Debug: Query execution plans, add Redis

Code Examples: - Troubleshooting queries - Debug utilities - Common fixes

Diagrams: - Troubleshooting decision tree

Deliverables: - Troubleshooting guide - Debug tools - Common problems catalog


Summary of Deliverables

Across all 16 cycles, this documentation will provide:

  1. Fundamentals
  2. Dual-write problem and outbox solution
  3. At-least-once delivery and inbox solution
  4. Idempotency principles and patterns

  5. Outbox Implementation

  6. Table schema (SQL + MongoDB)
  7. Repository implementation
  8. Relay worker (background service)
  9. Event publishing with outbox

  10. Inbox Implementation

  11. Table schema (SQL + MongoDB + Redis)
  12. Repository implementation
  13. Deduplication logic
  14. Idempotent consumer patterns

  15. Idempotency Gates

  16. HTTP middleware (Idempotency-Key header)
  17. Idempotency table schema
  18. Conflict detection and resolution
  19. Response caching

  20. MassTransit Integration

  21. Built-in outbox/inbox configuration
  22. Entity Framework integration
  23. Custom outbox/inbox for NHibernate

  24. Key Management

  25. Key generation strategies (ULID, sequence, content-hash)
  26. ATP key format standard
  27. Key storage and hashing
  28. TTL policies

  29. Advanced Topics

  30. Distributed idempotency (cross-service)
  31. Saga idempotency
  32. Correlation-based deduplication

  33. Operations

  34. Testing strategies (unit, integration, E2E, chaos)
  35. Monitoring and metrics
  36. Application Insights queries
  37. Best practices and troubleshooting


This documentation plan covers complete implementation of transactional outbox, inbox deduplication, and idempotency patterns for ATP, from database schemas and repository implementations to MassTransit integration, relay workers, conflict detection, testing strategies, monitoring, and operational best practices, ensuring exactly-once processing semantics in a distributed, at-least-once delivery environment.