
Messaging Implementation - Audit Trail Platform (ATP)

Event-driven messaging with MassTransit & Azure Service Bus — ATP's messaging layer implements publish/subscribe patterns, saga orchestration, transactional outbox, idempotent consumers, and dead-letter handling using MassTransit with Azure Service Bus for reliable, scalable, multi-tenant message-driven workflows.


📋 Documentation Generation Plan

This document will be generated in 16 cycles. Current progress:

Cycle    | Topics                                             | Estimated Lines | Status
---------|----------------------------------------------------|-----------------|---------------
Cycle 1  | Messaging Architecture & Event-Driven Design (1-2) | ~3,000          | ⏳ Not Started
Cycle 2  | MassTransit Configuration & Setup (3-4)            | ~3,500          | ⏳ Not Started
Cycle 3  | Azure Service Bus Integration (5-6)                | ~3,000          | ⏳ Not Started
Cycle 4  | Message Contracts & Publishing (7-8)               | ~3,500          | ⏳ Not Started
Cycle 5  | Consumers & Message Handlers (9-10)                | ~3,000          | ⏳ Not Started
Cycle 6  | Saga State Machines (11-12)                        | ~4,000          | ⏳ Not Started
Cycle 7  | Outbox Pattern Implementation (13-14)              | ~3,000          | ⏳ Not Started
Cycle 8  | Inbox Pattern & Idempotency (15-16)                | ~3,000          | ⏳ Not Started
Cycle 9  | Dead-Letter Queue (DLQ) Handling (17-18)           | ~2,500          | ⏳ Not Started
Cycle 10 | Retry Policies & Circuit Breakers (19-20)          | ~2,500          | ⏳ Not Started
Cycle 11 | Multi-Tenant Message Isolation (21-22)             | ~3,000          | ⏳ Not Started
Cycle 12 | Message Routing & Topology (23-24)                 | ~2,500          | ⏳ Not Started
Cycle 13 | Observability & Monitoring (25-26)                 | ~3,000          | ⏳ Not Started
Cycle 14 | Testing Strategies (27-28)                         | ~2,500          | ⏳ Not Started
Cycle 15 | Performance & Scalability (29-30)                  | ~2,500          | ⏳ Not Started
Cycle 16 | Operations & Best Practices (31-32)                | ~3,000          | ⏳ Not Started

Total Estimated Lines: ~48,000


Purpose & Scope

This document defines ATP's messaging implementation using MassTransit as the messaging abstraction layer over Azure Service Bus, covering event-driven architecture, message contracts, consumer patterns, saga orchestration, transactional outbox/inbox, retry handling, dead-letter queues, and observability.

Key Technologies & Patterns

  • MassTransit 8.x: Modern .NET messaging framework with abstractions over transports
  • Azure Service Bus: Managed message broker (topics, queues, subscriptions)
  • Publish/Subscribe: Event-driven choreography pattern
  • Saga State Machines: Long-running process orchestration
  • Transactional Outbox: Atomic database + message publishing
  • Inbox Pattern: Idempotent message consumption with deduplication
  • DLQ (Dead-Letter Queue): Poison message handling
  • Correlation & Causation: Distributed tracing across message flows
  • ConnectSoft.Extensions.MessagingModel: Proven abstractions from microservice template

Detailed Cycle Plan

CYCLE 1: Messaging Architecture & Event-Driven Design (~3,000 lines)

Topic 1: ATP Messaging Architecture Overview

What will be covered:

  • Event-Driven Architecture (EDA) in ATP
  • Why event-driven? (decoupling, scalability, resilience)
  • Choreography vs. orchestration (ATP uses both)
  • ATP event-driven flows (ingestion → projection → integrity → export)

  • Messaging Patterns in ATP
  • Publish/Subscribe: Domain events (audit.accepted, projection.updated, policy.changed)
  • Command/Request-Response: Export requests, policy evaluations
  • Saga Orchestration: Long-running workflows (export packages, integrity verification)
  • Competing Consumers: Parallel message processing with load distribution

  • Message Types

  • Domain Events: Something that happened (immutable, past tense)
  • Integration Events: Cross-service notifications
  • Commands: Intent to perform action (imperative, present tense)
  • Requests: Query for information (request/response pattern)

  • ATP Message Taxonomy

    Events (Publish/Subscribe):
    - audit.appended               (Ingestion → Integrity, Projection)
    - audit.accepted               (Ingestion → Projection, Search)
    - projection.updated           (Projection → Query, Export)
    - integrity.verified           (Integrity → Export, Admin)
    - export.requested             (API → Export service)
    - export.completed             (Export → API, Webhooks)
    - policy.changed               (Admin → ALL services for cache bust)
    
    Commands (Direct/Queue):
    - RebuildProjection            (Admin → Projection worker)
    - ReplayFromDLQ                (Admin → DLQ processor)
    - SealSegment                  (Integrity scheduler → Integrity worker)
    

  • Guaranteed Delivery Semantics

  • At-Least-Once Delivery: Azure Service Bus guarantees
  • Exactly-Once Processing: Achieved via idempotency (inbox pattern)
  • Ordering: Within partition key only (tenantId or tenantId:sourceId)
  • Back-Pressure: Consumer concurrency limits, queue depth monitoring
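The event/command distinction above lends itself to the "interface markers" that Topic 1 plans to show. The following is a minimal sketch only; ATP's actual markers may come from ConnectSoft.Extensions.MessagingModel, and the example record types and their properties are invented for illustration:

```csharp
// Marker interfaces that encode message semantics at compile time.
// Illustrative sketch; not a confirmed ATP contract.

/// <summary>Something that happened: immutable, named in past tense.</summary>
public interface IEvent { }

/// <summary>Intent to perform an action: imperative, present tense.</summary>
public interface ICommand { }

/// <summary>A query expecting a reply (request/response pattern).</summary>
public interface IRequest { }

// Taxonomy examples (property names are hypothetical):
public record AuditAppendedEvent(string TenantId, string AuditRecordId) : IEvent;      // audit.appended
public record RebuildProjectionCommand(string TenantId, string ProjectionName) : ICommand;
```

Marker interfaces carry no members; their value is letting MassTransit topology, validation pipelines, and reviewers distinguish publish/subscribe events from direct-queue commands by type alone.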

Code Examples: - Event vs. Command interface markers - Message taxonomy diagram - Event-driven flow sequence diagrams

Diagrams: - ATP messaging architecture (services, queues, topics) - Event choreography (audit flow) - Saga orchestration (export workflow) - Message delivery guarantees

Deliverables: - Messaging architecture overview - EDA pattern catalog - Message type taxonomy - Delivery guarantees specification


Topic 2: MassTransit & Azure Service Bus Strategy

What will be covered:

  • Why MassTransit?
  • Transport abstraction (can swap Azure Service Bus for RabbitMQ, Amazon SQS)
  • Built-in retry policies, circuit breakers, rate limiting
  • Saga state machine support (Automatonymous)
  • Outbox pattern support
  • Request/response pattern
  • Message scheduling and delayed delivery
  • Comprehensive observability (OpenTelemetry, logs, metrics)

  • Why Azure Service Bus?
  • Managed Service: No infrastructure management
  • Enterprise Features: Topics/subscriptions, sessions, duplicate detection
  • Security: Managed Identity, RBAC, private endpoints
  • Reliability: Built-in DLQ, at-least-once delivery
  • Scalability: Partitioned entities, premium tier with high throughput
  • Compliance: SOC 2, ISO 27001, HIPAA-ready

  • Alternative Transports (for reference)

  • RabbitMQ (self-hosted, on-prem scenarios)
  • Amazon SQS/SNS (AWS deployments)
  • In-Memory (development, testing)

  • ATP Transport Decision

  • Primary: Azure Service Bus (Premium tier for production)
  • Dev/Test: In-Memory or Azure Service Bus (Standard tier)
  • Rationale: Native Azure integration, managed service, enterprise features

Code Examples: - MassTransit abstraction interfaces (IBus, IPublishEndpoint) - Azure Service Bus topic/queue/subscription model - Transport configuration comparison

Diagrams: - MassTransit abstraction layers - Azure Service Bus topology - Alternative transport comparison

Deliverables: - Technology selection rationale - Transport strategy document - Topology design principles


CYCLE 2: MassTransit Configuration & Setup (~3,500 lines)

Topic 3: MassTransit Registration & DI Setup

What will be covered: - AddMassTransit Extension Method

services.AddMassTransit(cfg =>
{
    // Set endpoint name formatter (kebab-case)
    cfg.SetKebabCaseEndpointNameFormatter();

    // Register consumers
    cfg.AddConsumer<AuditAcceptedEventConsumer>();
    cfg.AddConsumer<ProjectionUpdatedEventConsumer>();
    cfg.AddConsumer<ExportRequestedCommandConsumer>();

    // Register saga state machines
    cfg.AddSagaStateMachine<ExportWorkflowSaga, ExportWorkflowState>()
       .NHibernateRepository(repo =>
       {
           // Persist saga state to NHibernate
           repo.UseConnectionString(connectionString);
           repo.Dialect<MsSql2012Dialect>();
       });

    // Configure transport (Azure Service Bus)
    cfg.UsingAzureServiceBus((context, config) =>
    {
        config.Host(connectionString);

        // Configure receive endpoints
        config.ConfigureEndpoints(context);
    });
});

  • Endpoint Name Formatter
  • Kebab-case convention: audit-accepted-event (queue name)
  • Consistency across services
  • Readability in Azure Portal

  • Consumer Registration

  • Automatic consumer discovery from assembly
  • Manual consumer registration
  • Consumer definitions for custom configuration

  • Saga State Machine Registration

  • Saga persistence options (NHibernate, MongoDB, EF Core, Redis)
  • ATP uses NHibernate for saga state (same as domain)
  • Saga repository configuration

  • Transport Configuration

  • UsingAzureServiceBus method
  • Host connection string (from Key Vault)
  • Managed Identity authentication (production)
  • ConfigureEndpoints auto-wiring

Code Examples: - Complete MassTransit registration (ATP) - Consumer registration patterns - Saga state machine registration - Endpoint configuration

Diagrams: - MassTransit DI registration flow - Consumer/saga wiring - Endpoint auto-configuration

Deliverables: - MassTransit setup guide - Registration patterns - Configuration templates


Topic 4: Configuration Options & Environment-Specific Settings

What will be covered: - MassTransit Options Configuration

{
  "MassTransit": {
    "HostOptions": {
      "StartTimeout": "00:01:00",
      "StopTimeout": "00:01:00"
    },
    "AzureServiceBusTransport": {
      "AzureServiceBusHost": {
        "ConnectionString": "Endpoint=sb://...",
        "FullyQualifiedNamespace": "atp-prod.servicebus.windows.net",
        "UseManagedIdentity": true,
        "ClientId": "",
        "UseWebSockets": false,
        "OperationTimeoutSeconds": 60
      },
      "AzureServiceBusReceiveEndpoint": {
        "PrefetchCount": 32,
        "ConcurrentMessageLimit": 64
      },
      "RetryLimit": 5,
      "RetryMinBackoffSeconds": 1,
      "RetryMaxBackoffSeconds": 30
    }
  }
}

  • Environment-Specific Configuration
  • Development: In-Memory transport or Azure Service Bus (Standard)
  • Docker: Service Bus emulator or shared dev namespace
  • Test: In-Memory with simulated delays
  • Production: Azure Service Bus Premium with Managed Identity

  • Connection String Management

  • Stored in Azure Key Vault (production)
  • Retrieved via Managed Identity
  • No connection strings in code or config files
  • Rotation procedures
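Retrieval via Managed Identity can be sketched with the Azure Key Vault SDK. The vault URI and secret name below are assumptions, not confirmed ATP resource names:

```csharp
using Azure.Identity;
using Azure.Security.KeyVault.Secrets;

// Fetch the Service Bus connection string from Key Vault.
// DefaultAzureCredential uses Managed Identity in Azure and
// developer credentials (az login, Visual Studio) locally.
var secretClient = new SecretClient(
    new Uri("https://kv-atp-prod.vault.azure.net/"),   // assumed vault name
    new DefaultAzureCredential());

KeyVaultSecret secret = await secretClient.GetSecretAsync("ServiceBus--ConnectionString"); // assumed secret name
string connectionString = secret.Value;
```

In practice the same result is usually achieved by wiring Key Vault into the configuration system at startup, so the rest of the app reads the value through IConfiguration rather than calling SecretClient directly.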

  • Managed Identity Authentication

    config.Host(fullyQualifiedNamespace, host =>
    {
        host.TokenCredential = new DefaultAzureCredential();

        // Or, for a User-Assigned Managed Identity:
        // host.TokenCredential = new ManagedIdentityCredential(clientId);
    });
    

  • Performance Tuning Options

  • PrefetchCount: Messages fetched ahead (32-256)
  • ConcurrentMessageLimit: Parallel processing (64-512)
  • MaxAutoLockRenewalDuration: Long-running message processing
  • MaxConcurrentCalls: Per-consumer concurrency

Code Examples: - Complete appsettings.json (MassTransit section) - Environment-specific configurations - Managed Identity configuration - Performance tuning settings - Key Vault connection string retrieval

Diagrams: - Configuration hierarchy (env-specific) - Managed Identity authentication flow - Performance tuning impact

Deliverables: - Configuration reference - Environment setup guide - Performance tuning playbook


CYCLE 3: Azure Service Bus Integration (~3,000 lines)

Topic 5: Azure Service Bus Topology Design

What will be covered:

  • ATP Azure Service Bus Namespace
  • Premium Tier (production): 1-8 messaging units, dedicated capacity
  • Standard Tier (dev/test): Shared capacity, cost-effective
  • Namespace: sb-atp-prod-eastus2.servicebus.windows.net
  • Private endpoint for VNet isolation
  • Managed Identity RBAC for access control

  • Topics & Subscriptions

    Topics (Events - Publish/Subscribe):
    - atp.audit.v1                (audit.appended, audit.accepted)
    - atp.projection.v1           (projection.updated)
    - atp.integrity.v1            (integrity.verified)
    - atp.export.v1               (export.requested, export.completed)
    - atp.policy.v1               (policy.changed)
    
    Subscriptions per Topic:
    - atp.audit.v1:
        - ingestion-service       (for processing appended events)
        - projection-service      (for building read models)
        - integrity-service       (for hash chain computation)
    
    - atp.projection.v1:
        - query-service           (for read model updates)
        - export-service          (for package generation)
    
    - atp.policy.v1:
        - ALL-services            (broadcast policy changes)
    

  • Queues (Commands/Requests)

    Queues (Direct Messaging):
    - atp.rebuild-projection      (admin commands)
    - atp.replay-dlq              (DLQ replay requests)
    - atp.seal-segment            (scheduled integrity sealing)
    - atp.export-package          (export command queue)
    

  • Dead-Letter Queues (DLQ)

  • Automatic DLQ per subscription: {topic}/Subscriptions/{subscription}/$DeadLetterQueue
  • Automatic DLQ per queue: {queue}/$DeadLetterQueue
  • DLQ monitoring and replay procedures

  • Message Sessions

  • Ordered message processing per session
  • Session ID = tenantId (for strict ordering per tenant)
  • Use case: Policy updates must be processed in order

  • Duplicate Detection

  • Enable duplicate detection window (1-10 minutes)
  • MessageId-based deduplication
  • Use ULID as MessageId for natural ordering + uniqueness

  • Partitioning

  • Enable partitioning for higher throughput
  • Partition key = tenantId or tenantId:sourceId
  • Load distribution across multiple brokers
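The sessions, duplicate detection, and partitioning settings above are fixed at entity creation time. ATP's plan provisions them with Pulumi; as an equivalent illustration, a sketch using the Azure SDK administration client (queue name taken from the command-queue list above, namespace assumed):

```csharp
using Azure.Identity;
using Azure.Messaging.ServiceBus.Administration;

var admin = new ServiceBusAdministrationClient(
    "atp-prod.servicebus.windows.net",          // assumed namespace
    new DefaultAzureCredential());

var options = new CreateQueueOptions("atp.export-package")
{
    // Sessions: strict ordering per session (SessionId = tenantId)
    RequiresSession = true,

    // Broker-side deduplication on MessageId within the window
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),

    // Spread load across brokers; cannot be changed after creation
    EnablePartitioning = true
};

await admin.CreateQueueAsync(options);
```

Note that duplicate detection and partitioning cannot be toggled on an existing entity, which is one reason entity creation belongs in infrastructure-as-code rather than application startup.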

Code Examples: - Topic/subscription creation (Pulumi) - Queue configuration with sessions - Duplicate detection setup - Partitioning configuration - DLQ monitoring query

Diagrams: - ATP Azure Service Bus topology - Topic/subscription model - Partitioning strategy - DLQ architecture

Deliverables: - Topology design specification - Provisioning scripts (Pulumi) - Naming conventions - Partitioning strategy


Topic 6: Azure Service Bus Security & Access Control

What will be covered:

  • Managed Identity Authentication
  • System-assigned Managed Identity per service
  • User-assigned Managed Identity for shared access
  • No Shared Access Signatures (SAS) in application code

  • RBAC Roles
  • Azure Service Bus Data Owner: Full access (admin only)
  • Azure Service Bus Data Sender: Publish messages (producers)
  • Azure Service Bus Data Receiver: Consume messages (consumers)
  • Least privilege: Each service gets only required roles

  • ATP RBAC Assignments

    Service                  | Role                          | Scope
    -------------------------|-------------------------------|----------------------------
    Ingestion                | Data Sender                   | Topic: atp.audit.v1
    Projection               | Data Receiver                 | Subscription: projection-service
    Projection               | Data Sender                   | Topic: atp.projection.v1
    Integrity                | Data Receiver                 | Subscription: integrity-service
    Integrity                | Data Sender                   | Topic: atp.integrity.v1
    Export                   | Data Receiver                 | Queue: atp.export-package
    Export                   | Data Sender                   | Topic: atp.export.v1
    Admin                    | Data Sender                   | All queues (commands)
    

  • Private Endpoints

  • Service Bus accessible only via VNet
  • Private endpoint in ATP VNet
  • DNS resolution via Private DNS Zone
  • No public internet access

  • Network Security

  • Firewall rules (allow only trusted IPs for management)
  • VNet service endpoints (fallback if private endpoint not available)
  • TLS 1.2+ enforcement

  • Audit Logging

  • Azure Monitor Diagnostic Settings
  • Log all administrative operations
  • Log authentication failures
  • Log message delivery failures

Code Examples: - Managed Identity configuration in MassTransit - RBAC role assignment (Azure CLI/Pulumi) - Private endpoint setup (Pulumi) - Diagnostic settings configuration - Audit log query (Kusto/KQL)

Diagrams: - Managed Identity authentication flow - RBAC role assignments - Private endpoint network topology - Audit logging architecture

Deliverables: - Security configuration guide - RBAC assignment matrix - Network security setup - Audit logging queries


CYCLE 4: Message Contracts & Publishing (~3,500 lines)

Topic 7: Message Contract Design

What will be covered:

  • Message Contract Principles
  • Immutable: Once published, never change a schema version
  • Versioned: SchemaVersion field for evolution
  • Self-Contained: All info needed to process (no hidden dependencies)
  • Documented: XML comments, AsyncAPI specs

  • ATP Message Envelope

    public interface IMessageEnvelope
    {
        string EventId { get; }           // ULID (unique, time-ordered)
        string EventType { get; }         // e.g., "audit.accepted"
        string SchemaVersion { get; }     // e.g., "1.2.0"
        DateTime OccurredAt { get; }      // UTC timestamp
        string TenantId { get; }          // Tenant context
        string Edition { get; }           // Edition context
        CorrelationContext Correlation { get; } // Tracing
        string IdempotencyKey { get; }    // Deduplication
    }
    
    public class CorrelationContext
    {
        public string TraceId { get; set; }     // W3C trace ID (hex32)
        public string SpanId { get; set; }      // W3C span ID (hex16)
        public string CorrelationId { get; set; } // Request ID
        public string CausationId { get; set; }   // Parent event ID
    }
    

  • ATP Domain Event Example

    public class AuditRecordAcceptedEvent : IEvent
    {
        // Envelope properties
        public string EventId { get; set; }
        public string EventType => "audit.accepted";
        public string SchemaVersion => "1.0.0";
        public DateTime OccurredAt { get; set; }
        public string TenantId { get; set; }
        public string Edition { get; set; }
        public CorrelationContext Correlation { get; set; }
        public string IdempotencyKey { get; set; }
    
        // Payload (event-specific data)
        public string AuditRecordId { get; set; }
        public string Action { get; set; }
        public string ResourceType { get; set; }
        public string ResourceId { get; set; }
        public string ActorId { get; set; }
        public DataClassification Classification { get; set; }
        public int PolicyVersion { get; set; }
        public int PayloadBytes { get; set; }
    }
    

  • Message Contract Validation

  • FluentValidation rules
  • Required fields enforced
  • String length limits
  • Pattern validation (ULID format, etc.)
  • MassTransit validation pipeline

  • Schema Evolution Strategy

  • Additive changes: Add optional fields (bump minor version)
  • Breaking changes: New event type or major version bump
  • Dual publishing: Publish both old and new versions during migration
  • Consumer compatibility: Support N-1 schema versions
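The additive-change rule can be illustrated with a hypothetical minor-version bump (the GeoRegion field and the V1_1 suffix are invented for illustration; in a real codebase the class would simply gain the field in place):

```csharp
// v1.0.0 contract: original fields only.
public class AuditRecordAcceptedEventV1
{
    public string SchemaVersion => "1.0.0";
    public string AuditRecordId { get; set; }
    public string Action { get; set; }
}

// v1.1.0 contract: additive change only. The new field is optional
// (nullable with no required semantics), so consumers built against
// 1.0.0 deserialize 1.1.0 payloads unchanged, and 1.1.0 consumers
// tolerate its absence in older payloads.
public class AuditRecordAcceptedEventV1_1
{
    public string SchemaVersion => "1.1.0";
    public string AuditRecordId { get; set; }
    public string Action { get; set; }

    public string? GeoRegion { get; set; }   // new optional field
}
```

A renamed or retyped field, by contrast, is a breaking change and per the rules above requires a new event type or a major version bump with dual publishing during migration.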

Code Examples: - IMessageEnvelope interface - AuditRecordAcceptedEvent (complete) - ProjectionUpdatedEvent - ExportRequestedCommand - IntegrityVerifiedEvent - PolicyChangedEvent - FluentValidation rules for message - Schema versioning example

Diagrams: - Message envelope structure - Event inheritance hierarchy - Schema evolution timeline

Deliverables: - Message contract specifications - All ATP event/command definitions - Validation rules - Evolution strategy guide


Topic 8: Message Publishing Patterns

What will be covered: - Publishing via IPublishEndpoint

public class IngestionService
{
    private readonly IPublishEndpoint _publishEndpoint;

    public IngestionService(IPublishEndpoint publishEndpoint)
        => _publishEndpoint = publishEndpoint;

    public async Task PublishAuditAcceptedAsync(
        AuditRecord auditRecord, 
        CancellationToken ct)
    {
        var @event = new AuditRecordAcceptedEvent
        {
            EventId = Ulid.NewUlid().ToString(),
            OccurredAt = DateTime.UtcNow,
            TenantId = auditRecord.TenantId,
            AuditRecordId = auditRecord.AuditRecordId,
            Action = auditRecord.Action,
            // ... other properties
        };

        await _publishEndpoint.Publish(@event, ctx =>
        {
            // Set message headers
            ctx.MessageId = Ulid.Parse(@event.EventId).ToGuid(); // EventId is a ULID, not a Guid string
            ctx.CorrelationId = Guid.Parse(auditRecord.CorrelationId);
            ctx.Headers.Set("x-tenant-id", auditRecord.TenantId);
            ctx.Headers.Set("x-schema-version", @event.SchemaVersion);

            // Set partition key (Azure Service Bus)
            ctx.SetPartitionKey(auditRecord.TenantId);

            // Set time-to-live (optional)
            ctx.TimeToLive = TimeSpan.FromDays(7);
        }, ct);
    }
}

  • Publishing via IBus (for scheduled/delayed messages)

    await bus.SchedulePublish(
        DateTime.UtcNow.AddMinutes(5),
        new SealSegmentCommand { SegmentId = segmentId },
        ct);
    

  • Publishing from Outbox (transactional)

    // 1. Write audit record + outbox entry (same transaction)
    unitOfWork.ExecuteTransactional(() =>
    {
        auditRecordRepository.Insert(auditRecord);
        outboxRepository.Insert(new OutboxMessage
        {
            EventType = "audit.accepted",
            EventPayload = JsonSerializer.Serialize(@event),
            // ...
        });
    });
    
    // 2. Background worker publishes from outbox
    // (Covered in Outbox Pattern cycle)
    

  • Correlation Context Propagation

  • Automatically propagate trace/correlation IDs
  • MassTransit built-in support for correlation
  • Custom headers for ATP-specific context

  • Message Metadata

  • MessageId: Unique identifier (ULID)
  • CorrelationId: Request correlation
  • ConversationId: Multi-message conversation tracking
  • InitiatorId: Saga that initiated message
  • TimeToLive: Message expiration
  • ContentType: application/json

  • Publish Topology Configuration

  • MassTransit determines topic from message type
  • Override with Publish(configurator => ...)
  • Fanout to multiple subscriptions automatically
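Overriding the derived topic name can be sketched with MassTransit's message topology. Mapping AuditRecordAcceptedEvent onto the atp.audit.v1 topic from Cycle 3 is an assumption consistent with the naming convention above, not a confirmed ATP binding:

```csharp
cfg.UsingAzureServiceBus((context, config) =>
{
    config.Host(connectionString);

    // By default MassTransit derives the topic name from the message type;
    // SetEntityName overrides it to match ATP's versioned topic convention.
    config.Message<AuditRecordAcceptedEvent>(m => m.SetEntityName("atp.audit.v1"));

    config.ConfigureEndpoints(context);
});
```

With this in place, Publish on the event fans out to every subscription on atp.audit.v1 without producers knowing who the consumers are.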

Code Examples: - Publishing with IPublishEndpoint (complete) - Scheduled message publishing - Publishing with custom headers - Correlation context propagation - Publish topology configuration

Diagrams: - Publishing flow (service → MassTransit → Service Bus) - Correlation context propagation - Publish topology

Deliverables: - Publishing pattern guide - Correlation strategy - Metadata specification


CYCLE 5: Consumers & Message Handlers (~3,000 lines)

Topic 9: Consumer Implementation

What will be covered: - IConsumer Interface

public class AuditAcceptedEventConsumer : IConsumer<AuditRecordAcceptedEvent>
{
    private readonly IAuditEventProjector _projector;
    private readonly ILogger<AuditAcceptedEventConsumer> _logger;

    public AuditAcceptedEventConsumer(
        IAuditEventProjector projector,
        ILogger<AuditAcceptedEventConsumer> logger)
    {
        _projector = projector;
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<AuditRecordAcceptedEvent> context)
    {
        var @event = context.Message;

        _logger.LogInformation(
            "Processing AuditRecordAcceptedEvent: {AuditRecordId} for tenant {TenantId}",
            @event.AuditRecordId,
            @event.TenantId);

        try
        {
            // Project event to read model (idempotent)
            await _projector.ProjectAsync(@event, context.CancellationToken);

            // Message automatically acknowledged on success
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, 
                "Failed to process AuditRecordAcceptedEvent: {AuditRecordId}",
                @event.AuditRecordId);

            // Exception causes retry or DLQ (based on retry policy)
            throw;
        }
    }
}

  • Consumer Registration

    cfg.AddConsumer<AuditAcceptedEventConsumer>(consumerCfg =>
    {
        // Consumer-specific configuration
        consumerCfg.UseConcurrentMessageLimit(64);
        consumerCfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
    });
    

  • Consumer Definitions (for custom endpoint configuration)

    public class AuditAcceptedEventConsumerDefinition 
        : ConsumerDefinition<AuditAcceptedEventConsumer>
    {
        protected override void ConfigureConsumer(
            IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<AuditAcceptedEventConsumer> consumerConfigurator)
        {
            endpointConfigurator.PrefetchCount = 32;
            endpointConfigurator.ConcurrentMessageLimit = 64;
    
            // Partition by tenant for ordered processing
            endpointConfigurator.UsePartitioner<AuditRecordAcceptedEvent>(
                64, // partition count
                p => p.Message.TenantId.GetHashCode());
        }
    }
    

  • Idempotent Consumer Pattern

    public async Task Consume(ConsumeContext<AuditRecordAcceptedEvent> context)
    {
        var @event = context.Message;
    
        // Check if already processed (inbox pattern)
        var alreadyProcessed = await _inboxRepository.ExistsAsync(
            @event.EventId, 
            context.CancellationToken);
    
        if (alreadyProcessed)
        {
            _logger.LogInformation("Event {EventId} already processed, skipping",
                @event.EventId);
            return; // Acknowledge without reprocessing
        }
    
        // Process the event
        await _projector.ProjectAsync(@event, context.CancellationToken);
    
        // Record in inbox (deduplication)
        await _inboxRepository.RecordAsync(
            @event.EventId,
            @event.EventType,
            DateTime.UtcNow,
            context.CancellationToken);
    }
    

  • Consumer Concurrency

  • ConcurrentMessageLimit: Max parallel messages per consumer instance
  • PrefetchCount: Messages fetched ahead from broker
  • Partitioning: Distribute messages across consumer instances

Code Examples:

  • Complete consumer implementations:
    - AuditAcceptedEventConsumer (Projection service)
    - ProjectionUpdatedEventConsumer (Query service)
    - ExportRequestedConsumer (Export service)
    - PolicyChangedEventConsumer (All services)
  • Consumer definition with custom config
  • Idempotent consumer with inbox check
  • Partitioned consumer

Diagrams: - Consumer processing flow - Idempotent consumer pattern - Partitioned consumer architecture

Deliverables: - Consumer implementation guide - All ATP consumer implementations - Idempotency patterns - Concurrency tuning guide


Topic 10: Message Handler Patterns

What will be covered:

  • Handler Composition: separate handlers for different responsibilities
  • Projector: Update read models
  • Notifier: Send notifications
  • Validator: Pre-processing validation

  • Error Handling in Consumers
  • Try/catch with logging
  • Throw exceptions to trigger retry
  • Return successfully for poison messages (after logging)
  • Custom fault handling

  • Consume Filters (Middleware)

    public class TenantContextFilter<T> : IFilter<ConsumeContext<T>>
        where T : class
    {
        public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
        {
            // Extract tenant context from message
            if (context.Message is IMessageEnvelope envelope)
            {
                // Set tenant context for downstream processing
                TenantContext.Current = new TenantContext
                {
                    TenantId = envelope.TenantId,
                    Edition = envelope.Edition
                };
            }
    
            try
            {
                await next.Send(context);
            }
            finally
            {
                // Cleanup runs even when the consumer throws, so tenant
                // context never leaks into the next message on this thread
                TenantContext.Current = null;
            }
        }
    
        public void Probe(ProbeContext context) { }
    }
    
    // Usage:
    cfg.UseConsumeFilter(typeof(TenantContextFilter<>), context);
    

  • Scoped Consume Filter (logging, metrics)

    public class LoggingFilter<T> : IFilter<ConsumeContext<T>>
        where T : class
    {
        private readonly ILogger<LoggingFilter<T>> _logger;
    
        public LoggingFilter(ILogger<LoggingFilter<T>> logger)
            => _logger = logger;
    
        public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
        {
            var stopwatch = Stopwatch.StartNew();
    
            _logger.LogInformation("Consuming message {MessageType}: {MessageId}",
                typeof(T).Name, context.MessageId);
    
            try
            {
                await next.Send(context);
    
                stopwatch.Stop();
                _logger.LogInformation("Consumed message {MessageType} in {Duration}ms",
                    typeof(T).Name, stopwatch.ElapsedMilliseconds);
            }
            catch (Exception ex)
            {
                stopwatch.Stop();
                _logger.LogError(ex, "Failed to consume message {MessageType} after {Duration}ms",
                    typeof(T).Name, stopwatch.ElapsedMilliseconds);
                throw;
            }
        }
    
        public void Probe(ProbeContext context) { } // required by IFilter<T>
    }
    

  • Tenant Isolation in Consumers

  • Validate tenantId from message matches expected tenant
  • Reject cross-tenant messages
  • Apply tenant-scoped repository filters

Code Examples: - Handler composition pattern - Error handling strategies - Tenant context filter (complete) - Logging filter (complete) - Metrics filter - Tenant isolation validation

Diagrams: - Handler composition - Consume filter pipeline - Tenant context propagation

Deliverables: - Handler pattern catalog - Filter implementations - Tenant isolation guide


CYCLE 6: Saga State Machines (~4,000 lines)

Topic 11: Saga Pattern Fundamentals

What will be covered:

  • What is a Saga?
  • Long-running process (hours, days, weeks)
  • Coordinates multiple services/steps
  • Maintains state between steps
  • Compensation logic for failures (distributed transactions)

  • Saga vs. Choreography
  • Choreography: Services react to events (loosely coupled)
  • Saga (Orchestration): Central coordinator directs workflow
  • ATP uses both: Choreography for simple flows, Saga for complex workflows

  • MassTransit Saga State Machines

  • Built on the Automatonymous state machine library (merged into MassTransit core as of v8)
  • Define states and events
  • Define transitions between states
  • Persisted state (survive service restarts)

  • ATP Saga Use Cases

    1. Export Workflow Saga
       - Export requested → Package created → Segments retrieved → 
         Package signed → Manifest generated → Export completed → Notification sent
    
    2. Integrity Sealing Saga
       - Segment accumulated → Seal triggered → Hash computed → 
         Merkle tree built → Signature generated → Block sealed → Verification published
    
    3. Tenant Onboarding Saga (future)
       - Tenant created → Resources provisioned → Policies initialized → 
         Admin notified → Onboarding completed
    

  • Saga State

  • CorrelationId: Unique saga instance identifier (Guid or string)
  • CurrentState: State machine current state (string)
  • Saga-specific data: Accumulates data across steps
  • Version: Optimistic concurrency control

  • Saga Events

  • Initiating events: Start the saga
  • Transition events: Move between states
  • Fault events: Handle failures
  • Completed events: Mark saga as complete
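The saga state described above could be shaped as follows. This is a sketch: CorrelationId and CurrentState are required by MassTransit's SagaStateMachineInstance, while the remaining fields are inferred from the ExportWorkflowSaga in Topic 12:

```csharp
using MassTransit;

// Persisted saga instance; survives service restarts via the
// NHibernate saga repository.
public class ExportWorkflowState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }    // ExportJobId (saga identity)
    public string CurrentState { get; set; }   // bound via InstanceState(x => x.CurrentState)

    // Data accumulated across steps (fields used by the saga in Topic 12)
    public string TenantId { get; set; }
    public string PackageUrl { get; set; }
    public string SignatureUrl { get; set; }
    public int RecordCount { get; set; }
    public string ErrorMessage { get; set; }
    public DateTime StartedAt { get; set; }
    public DateTime? CompletedAt { get; set; }

    // Optimistic concurrency control (versioned column in the saga table)
    public int Version { get; set; }
}
```

Keeping the instance flat (scalar columns only) makes it straightforward to map with NHibernate and to query saga progress directly in SQL.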

Code Examples: - Saga pattern comparison (choreography vs. orchestration) - Saga state interface - Saga event definition - Simple saga state machine outline

Diagrams: - Saga orchestration flow - Saga state machine diagram - Choreography vs. saga comparison

Deliverables: - Saga pattern overview - ATP saga use case catalog - State machine fundamentals


Topic 12: Saga State Machine Implementation

What will be covered: - Export Workflow Saga (Complete Example)

public class ExportWorkflowSaga : MassTransitStateMachine<ExportWorkflowState>
{
    public ExportWorkflowSaga()
    {
        // Define instance state
        InstanceState(x => x.CurrentState);

        // Define events
        Event(() => ExportRequested, x =>
        {
            x.CorrelateById(ctx => ctx.Message.ExportJobId);
            x.InsertOnInitial = true;
            x.SetSagaFactory(ctx => new ExportWorkflowState
            {
                CorrelationId = ctx.Message.ExportJobId,
                TenantId = ctx.Message.TenantId,
                // ...
            });
        });

        Event(() => PackageCreated, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));
        Event(() => PackageSigned, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));
        Event(() => ExportFailed, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));

        // Define behavior
        Initially(
            When(ExportRequested)
                .Then(ctx =>
                {
                    ctx.Saga.TenantId = ctx.Message.TenantId;
                    ctx.Saga.StartedAt = DateTime.UtcNow;
                    // ... initialize saga state
                })
                .Publish(ctx => new CreateExportPackageCommand
                {
                    ExportJobId = ctx.Saga.CorrelationId,
                    TenantId = ctx.Saga.TenantId,
                    // ...
                })
                .TransitionTo(CreatingPackage));

        During(CreatingPackage,
            When(PackageCreated)
                .Then(ctx =>
                {
                    ctx.Saga.PackageUrl = ctx.Message.PackageUrl;
                    ctx.Saga.RecordCount = ctx.Message.RecordCount;
                })
                .Publish(ctx => new SignExportPackageCommand
                {
                    ExportJobId = ctx.Saga.CorrelationId,
                    PackageUrl = ctx.Saga.PackageUrl,
                })
                .TransitionTo(SigningPackage));

        During(SigningPackage,
            When(PackageSigned)
                .Then(ctx =>
                {
                    ctx.Saga.SignatureUrl = ctx.Message.SignatureUrl;
                    ctx.Saga.CompletedAt = DateTime.UtcNow;
                })
                .Publish(ctx => new ExportCompletedEvent
                {
                    ExportJobId = ctx.Saga.CorrelationId,
                    TenantId = ctx.Saga.TenantId,
                    PackageUrl = ctx.Saga.PackageUrl,
                    SignatureUrl = ctx.Saga.SignatureUrl,
                })
                .Finalize());

        // Fault handling
        DuringAny(
            When(ExportFailed)
                .Then(ctx =>
                {
                    ctx.Saga.ErrorMessage = ctx.Message.ErrorMessage;
                    ctx.Saga.FailedAt = DateTime.UtcNow;
                })
                // Publish a distinct notification type so the saga's own output does
                // not re-correlate into this handler (ExportWorkflowFaultedEvent is an
                // assumed contract, separate from the consumed ExportFailedEvent)
                .Publish(ctx => new ExportWorkflowFaultedEvent { /* ... */ })
                .TransitionTo(Failed));

        SetCompletedWhenFinalized();
    }

    public State CreatingPackage { get; private set; }
    public State SigningPackage { get; private set; }
    public State Failed { get; private set; }

    public Event<ExportRequestedEvent> ExportRequested { get; private set; }
    public Event<PackageCreatedEvent> PackageCreated { get; private set; }
    public Event<PackageSignedEvent> PackageSigned { get; private set; }
    public Event<ExportFailedEvent> ExportFailed { get; private set; }
}

public class ExportWorkflowState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public int Version { get; set; }

    public string TenantId { get; set; }
    public string ExportFilter { get; set; }
    public string PackageUrl { get; set; }
    public string SignatureUrl { get; set; }
    public int RecordCount { get; set; }
    public DateTime StartedAt { get; set; }
    public DateTime? CompletedAt { get; set; }
    public DateTime? FailedAt { get; set; }
    public string ErrorMessage { get; set; }
}

  • Saga State Persistence
  • NHibernate repository (ATP default)
  • Saga state entity mapping (FluentNHibernate)
  • Optimistic concurrency (Version column)
  • Indexed on CorrelationId

  • Saga Compensation

  • DuringAny + fault events
  • Compensating transactions
  • Rollback/undo logic
  • Error notifications

  • Saga Timeout Handling

  • Schedule for timeouts
  • RequestTimeout pattern
  • Timeout events trigger compensation
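
A timeout sketch using MassTransit's saga Schedule support (the SigningTimeout schedule, its token property, and the 10-minute delay are assumptions): schedule a timeout message on entering a state, and treat its arrival as a fault.

```csharp
// Declared in the state machine constructor: a scheduled message whose
// token is stored on the saga instance so it can be cancelled later.
Schedule(() => SigningTimeout, saga => saga.SigningTimeoutTokenId, s =>
{
    s.Delay = TimeSpan.FromMinutes(10);
    s.Received = r => r.CorrelateById(ctx => ctx.Message.ExportJobId);
});

// If the timeout message arrives before PackageSigned, fail the saga.
During(SigningPackage,
    When(SigningTimeout.Received)
        .Then(ctx => ctx.Saga.ErrorMessage = "Signing timed out")
        .TransitionTo(Failed));
```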

Code Examples: - Complete ExportWorkflowSaga (above) - IntegritySealingSaga - Saga state entity mapping - Compensation logic example - Timeout handling example

Diagrams: - Export workflow saga state machine - Saga compensation flow - Saga persistence architecture

Deliverables: - Saga implementation guide - ATP saga implementations - State persistence setup - Compensation patterns


CYCLE 7: Outbox Pattern Implementation (~3,000 lines)

Topic 13: Transactional Outbox Pattern

What will be covered: - Why Outbox Pattern? - Atomic database write + message publish - Prevents dual-write problem (DB succeeds, message fails) - Guarantees at-least-once message delivery - Decouples message publishing from business transaction

  • Outbox Table Schema

    CREATE TABLE ConnectSoft.Audit.Outbox (
        OutboxId CHAR(26) PRIMARY KEY,         -- ULID
        TenantId NVARCHAR(128) NOT NULL,
        EventType NVARCHAR(255) NOT NULL,      -- Message type name
        EventPayload NVARCHAR(MAX) NOT NULL,   -- JSON serialized message
        CreatedAt DATETIME2(3) NOT NULL,
        ProcessedAt DATETIME2(3) NULL,
        Status TINYINT NOT NULL,                -- 0=Pending, 1=Processed, 2=Failed
        RetryCount INT NOT NULL DEFAULT 0,
        NextRetryAt DATETIME2(3) NULL,
        ErrorMessage NVARCHAR(MAX) NULL,
        CorrelationId CHAR(26) NULL,            -- For tracing
        INDEX IX_Outbox_Status_CreatedAt (Status, CreatedAt),
        INDEX IX_Outbox_Status_NextRetryAt (Status, NextRetryAt) WHERE Status = 2
    );
    

  • Atomic Write + Outbox Insert

    public async Task<AuditRecord> AppendAuditRecordAsync(
        AppendCommand command, 
        CancellationToken ct)
    {
        AuditRecord auditRecord = null;
        AuditRecordAcceptedEvent @event = null;
    
        await _unitOfWork.ExecuteTransactionalAsync(async () =>
        {
            // 1. Business logic: Create audit record
            auditRecord = AuditRecord.Create(command);
            await _auditRecordRepository.InsertAsync(auditRecord, ct);
    
            // 2. Create domain event
            @event = new AuditRecordAcceptedEvent
            {
                EventId = Ulid.NewUlid().ToString(),
                AuditRecordId = auditRecord.AuditRecordId,
                TenantId = auditRecord.TenantId,
                // ... other properties
            };
    
            // 3. Insert outbox entry (same transaction)
            var outboxMessage = new OutboxMessage
            {
                OutboxId = @event.EventId,
                TenantId = @event.TenantId,
                EventType = @event.EventType,
                EventPayload = JsonSerializer.Serialize(@event),
                CreatedAt = DateTime.UtcNow,
                Status = OutboxStatus.Pending,
                CorrelationId = auditRecord.CorrelationId
            };
            await _outboxRepository.InsertAsync(outboxMessage, ct);
    
        }, ct);
    
        return auditRecord;
        // Note: Message not yet published, but guaranteed to be published
        // by outbox processor background worker
    }
    

  • Outbox Processor Background Worker

    public class OutboxProcessor : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<OutboxProcessor> _logger;

        public OutboxProcessor(IServiceProvider serviceProvider, ILogger<OutboxProcessor> logger)
        {
            _serviceProvider = serviceProvider;
            _logger = logger;
        }
    
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ProcessPendingMessagesAsync(stoppingToken);
                    await ProcessRetryMessagesAsync(stoppingToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error in outbox processor");
                }

                // Poll every 5 seconds (delaying outside the try also backs off
                // after an error, avoiding a tight failure loop)
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }
    
        private async Task ProcessPendingMessagesAsync(CancellationToken ct)
        {
            using var scope = _serviceProvider.CreateScope();
            var outboxRepository = scope.ServiceProvider.GetRequiredService<IOutboxRepository>();
            var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();
    
            // Fetch pending messages (batch of 100)
            var pendingMessages = await outboxRepository.GetPendingAsync(100, ct);
    
            foreach (var outboxMessage in pendingMessages)
            {
                try
                {
                    // Deserialize and publish
                    var @event = DeserializeEvent(outboxMessage.EventType, outboxMessage.EventPayload);
                    await publishEndpoint.Publish(@event, ctx =>
                    {
                        // OutboxId/CorrelationId are 26-char ULID strings, not Guids;
                        // convert via the Ulid library (Guid.Parse would throw here)
                        ctx.MessageId = Ulid.Parse(outboxMessage.OutboxId).ToGuid();
                        if (outboxMessage.CorrelationId != null)
                            ctx.CorrelationId = Ulid.Parse(outboxMessage.CorrelationId).ToGuid();
                    }, ct);
    
                    // Mark as processed
                    outboxMessage.Status = OutboxStatus.Processed;
                    outboxMessage.ProcessedAt = DateTime.UtcNow;
                    await outboxRepository.UpdateAsync(outboxMessage, ct);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to publish outbox message {OutboxId}", 
                        outboxMessage.OutboxId);
    
                    // Mark as failed, schedule retry
                    outboxMessage.Status = OutboxStatus.Failed;
                    outboxMessage.RetryCount++;
                    outboxMessage.NextRetryAt = DateTime.UtcNow.AddSeconds(
                        Math.Pow(2, outboxMessage.RetryCount)); // Exponential backoff
                    outboxMessage.ErrorMessage = ex.Message;
                    await outboxRepository.UpdateAsync(outboxMessage, ct);
                }
            }
        }
    }
    

  • Outbox Cleanup

  • Periodically delete processed messages (after N days)
  • Archive to cold storage for compliance (optional)
  • Monitor outbox growth

Code Examples: - Outbox table DDL (complete) - OutboxMessage entity and mapping - Atomic write + outbox insert (complete) - OutboxProcessor background worker (complete) - Outbox cleanup job - Outbox monitoring queries

Diagrams: - Outbox pattern sequence diagram - Background processor flow - Retry with exponential backoff

Deliverables: - Outbox pattern implementation (complete) - Background processor setup - Cleanup procedures - Monitoring queries


Topic 14: Outbox Optimization & Monitoring

What will be covered: - Batch Publishing - Fetch multiple pending messages - Publish in batch (if transport supports) - Reduce round-trips

  • Partitioned Outbox Processing
  • Multiple worker instances
  • Partition by TenantId hash
  • Avoid processing same message twice (use row locking)
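
The "partition by TenantId hash" idea above can be sketched with a simple ownership check (the helper name and the 31-multiplier hash are assumptions; any stable hash works, but string.GetHashCode must be avoided because it is randomized per process):

```csharp
// Each worker instance only processes outbox rows whose TenantId hashes
// to its own partition index, so workers never contend for the same rows.
public static bool OwnsTenant(string tenantId, int workerIndex, int workerCount)
{
    var hash = 0;
    foreach (var c in tenantId)
        hash = unchecked(hash * 31 + c); // stable, deterministic string hash
    return Math.Abs(hash % workerCount) == workerIndex;
}
```

Row locking (e.g., UPDLOCK/READPAST hints when fetching pending rows) remains a useful second line of defense against double processing.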

  • Outbox Monitoring

  • Metrics:
    • Pending message count
    • Failed message count
    • Average age of pending messages
    • Processing throughput (messages/sec)
  • Alerts:

    • Pending count > threshold (e.g., 1000)
    • Average age > threshold (e.g., 1 minute)
    • Failed count increasing
  • Outbox Troubleshooting

  • Query stuck messages
  • Replay failed messages manually
  • Investigate error patterns

Code Examples: - Batch publishing optimization - Partitioned processor (multiple workers) - Outbox monitoring query (Kusto/KQL) - Alert rule definitions - Troubleshooting queries

Diagrams: - Partitioned outbox processing - Monitoring dashboard layout

Deliverables: - Optimization techniques - Monitoring setup - Troubleshooting guide


CYCLE 8: Inbox Pattern & Idempotency (~3,000 lines)

Topic 15: Inbox Pattern for Idempotent Consumers

What will be covered: - Why Inbox Pattern? - At-least-once delivery means duplicates possible - Consumers must be idempotent (process once, ignore duplicates) - Inbox table tracks processed messages - Prevents duplicate processing

  • Inbox Table Schema

    CREATE TABLE ConnectSoft.Audit.Inbox (
        InboxId CHAR(26) NOT NULL,             -- EventId from message
        ConsumerName NVARCHAR(255) NOT NULL,   -- Consumer identifier
        EventType NVARCHAR(255) NOT NULL,
        TenantId NVARCHAR(128) NOT NULL,
        ProcessedAt DATETIME2(3) NOT NULL,
        CorrelationId CHAR(26) NULL,
        PRIMARY KEY (InboxId, ConsumerName)     -- Composite key: the same event may be
                                                -- processed once per distinct consumer
    );
    
    -- Cleanup old entries (after N days)
    CREATE INDEX IX_Inbox_ProcessedAt ON ConnectSoft.Audit.Inbox (ProcessedAt);
    

  • Idempotent Consumer with Inbox Check

    public class AuditAcceptedEventConsumer : IConsumer<AuditRecordAcceptedEvent>
    {
        private readonly IAuditEventProjector _projector;
        private readonly IInboxRepository _inboxRepository;
        private readonly IUnitOfWork _unitOfWork;
        private readonly ILogger<AuditAcceptedEventConsumer> _logger;
    
        public async Task Consume(ConsumeContext<AuditRecordAcceptedEvent> context)
        {
            var @event = context.Message;
            var consumerName = nameof(AuditAcceptedEventConsumer);
    
            // Check inbox (already processed?)
            var alreadyProcessed = await _inboxRepository.ExistsAsync(
                @event.EventId, 
                consumerName,
                context.CancellationToken);
    
            if (alreadyProcessed)
            {
                _logger.LogInformation(
                    "Event {EventId} already processed by {Consumer}, skipping",
                    @event.EventId, consumerName);
                return; // Acknowledge without reprocessing
            }
    
            // Process + record inbox (atomically)
            await _unitOfWork.ExecuteTransactionalAsync(async () =>
            {
                // 1. Process the event
                await _projector.ProjectAsync(@event, context.CancellationToken);
    
                // 2. Record in inbox (same transaction)
                var inboxEntry = new InboxEntry
                {
                    InboxId = @event.EventId,
                    ConsumerName = consumerName,
                    EventType = @event.EventType,
                    TenantId = @event.TenantId,
                    ProcessedAt = DateTime.UtcNow,
                    CorrelationId = @event.Correlation.CorrelationId
                };
                await _inboxRepository.InsertAsync(inboxEntry, context.CancellationToken);
    
            }, context.CancellationToken);
        }
    }
    

  • Inbox Cleanup

  • Periodically delete old entries (after N days, e.g., 30 days)
  • Scheduled job or background worker
  • Keep inbox size manageable

  • Alternative: Natural Idempotency

  • Some operations are naturally idempotent (upsert by key)
  • Projection update: Upsert by (TenantId, AuditRecordId)
  • No inbox needed if operation is truly idempotent
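
A minimal sketch of a naturally idempotent projection write (the repository API and the deterministic AuditRecordReadModel.From factory are assumptions): replaying the same event produces the same row, so no inbox entry is required.

```csharp
// Upsert by natural key (TenantId, AuditRecordId): a duplicate event
// overwrites the row with identical values, leaving the read model unchanged.
public async Task ProjectAsync(AuditRecordAcceptedEvent e, CancellationToken ct)
{
    var row = AuditRecordReadModel.From(e); // deterministic: same input, same output

    var existing = await _readModelRepository.FindAsync(e.TenantId, e.AuditRecordId, ct);
    if (existing == null)
        await _readModelRepository.InsertAsync(row, ct);
    else
        await _readModelRepository.UpdateAsync(row, ct); // last-write-wins, same result
}
```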

Code Examples: - Inbox table DDL - InboxEntry entity and mapping - Idempotent consumer with inbox check (complete) - Inbox cleanup job - Natural idempotency example (upsert)

Diagrams: - Inbox pattern sequence diagram - Idempotency decision tree

Deliverables: - Inbox pattern implementation - Idempotency strategies - Cleanup procedures


Topic 16: Idempotency Keys & Deduplication

What will be covered: - Idempotency Key in Messages - Client-provided or generated - Format: {tenantId}:{sourceId}:{sequence} or ULID - Included in message envelope - Used for deduplication across retries

  • Deduplication Strategies
  • Message-Level: MessageId-based (Azure Service Bus duplicate detection)
  • Consumer-Level: Inbox pattern (per-consumer deduplication)
  • Business-Level: Natural key uniqueness (e.g., AuditRecordId)

  • Azure Service Bus Duplicate Detection

  • Enable on topic/queue at creation time (window configurable from 20 seconds to 7 days; default 1 minute)
  • MessageId used as deduplication key
  • Duplicate messages auto-dropped by broker
  • Use ULID as MessageId for time-ordered uniqueness
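
Duplicate detection is enabled when the entity is created. A sketch using Azure.Messaging.ServiceBus.Administration (the SDK types are real; the topic name and 10-minute window are assumptions):

```csharp
// Broker-level deduplication: messages with a repeated MessageId inside
// the history window are silently dropped by Azure Service Bus.
var admin = new ServiceBusAdministrationClient(connectionString);

await admin.CreateTopicAsync(new CreateTopicOptions("atp-audit-events")
{
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10)
});
```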

  • Idempotency in Projections

  • Upsert by (TenantId, AuditRecordId)
  • Last-write-wins if duplicate (same result)
  • Deterministic projections (same input → same output)

  • Idempotency Testing

  • Replay same message twice
  • Verify no duplicate side effects
  • Verify inbox prevents reprocessing
  • Load testing with intentional duplicates
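
A duplicate-replay test can be sketched with MassTransit's in-memory test harness (v8 `AddMassTransitTestHarness` API; the event construction is an assumption based on the contracts above):

```csharp
// Publish the same event twice and verify the consumer's inbox check
// prevents a second projection from being applied.
await using var provider = new ServiceCollection()
    .AddMassTransitTestHarness(x => x.AddConsumer<AuditAcceptedEventConsumer>())
    .BuildServiceProvider(true);

var harness = provider.GetRequiredService<ITestHarness>();
await harness.Start();

var @event = new AuditRecordAcceptedEvent { EventId = Ulid.NewUlid().ToString() /* ... */ };
await harness.Bus.Publish(@event);
await harness.Bus.Publish(@event); // intentional duplicate

// Both deliveries reach the consumer; assert the side effect occurred once
// (e.g., a single read-model row) to prove the inbox check held.
```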

Code Examples: - Idempotency key generation - Azure Service Bus duplicate detection configuration - Upsert-based idempotent operation - Idempotency test scenario

Diagrams: - Deduplication strategy layers - Idempotency key flow

Deliverables: - Idempotency strategy guide - Deduplication configuration - Testing scenarios


CYCLE 9: Dead-Letter Queue (DLQ) Handling (~2,500 lines)

Topic 17: DLQ Architecture & Monitoring

What will be covered: - What is a Dead-Letter Queue? - Messages that fail processing after max retries - Automatically moved to DLQ by Azure Service Bus - Preserves original message and metadata - Requires manual intervention (triage, fix, replay)

  • ATP DLQ Strategy
  • Automatic DLQ per subscription/queue
  • MaxDeliveryCount: 5-10 attempts before DLQ
  • Lock Duration: 60-120 seconds per attempt
  • DLQ Monitoring: Alert when messages land in DLQ

  • DLQ Message Metadata

  • DeadLetterReason: Why message failed (e.g., "MaxDeliveryCountExceeded")
  • DeadLetterErrorDescription: Error details
  • EnqueuedTimeUtc: When originally sent
  • DeadLetterTimeUtc: When moved to DLQ
  • DeliveryCount: How many times attempted
  • Original message body and headers
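
The metadata above can be inspected without removing messages from the DLQ. A sketch with the Azure SDK (the types and properties are real; the queue name is an assumption):

```csharp
// Peek (non-destructive read) the first 10 dead-lettered messages and
// print their failure metadata for triage.
await using var receiver = client.CreateReceiver("atp-ingestion",
    new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

var deadLettered = await receiver.PeekMessagesAsync(maxMessages: 10);
foreach (var message in deadLettered)
{
    Console.WriteLine(
        $"{message.MessageId} attempts={message.DeliveryCount} " +
        $"reason={message.DeadLetterReason}: {message.DeadLetterErrorDescription}");
}
```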

  • DLQ Monitoring Dashboard

  • Metrics:
    • DLQ message count per queue/subscription
    • DLQ growth rate
    • Average age of DLQ messages
    • DLQ message distribution by error type
  • Alerts:

    • Any message in DLQ (immediate)
    • DLQ count > threshold (e.g., 10)
    • DLQ messages older than N hours
  • DLQ Triage Workflow

  • Alert fires (message in DLQ)
  • SRE reviews DLQ message details
  • Classify error: Transient vs. permanent
  • Fix root cause (if code bug, deploy fix)
  • Replay message (manual or automated)
  • Verify success (message processed, removed from DLQ)

Code Examples: - DLQ monitoring query (Azure Service Bus SDK) - DLQ alert rule (Azure Monitor) - DLQ message inspection code - Error classification logic

Diagrams: - DLQ architecture (automatic movement) - DLQ monitoring dashboard - Triage workflow

Deliverables: - DLQ strategy document - Monitoring setup - Triage procedures


Topic 18: DLQ Replay & Recovery

What will be covered: - Manual DLQ Replay - Azure Portal: Move message back to queue/topic - Service Bus Explorer tool - PowerShell/CLI scripts

  • Automated DLQ Replay
  • DLQ replay consumer
  • Read from DLQ
  • Re-publish to original queue/topic
  • Configurable replay strategies (all, filtered, one-by-one)

  • DLQ Replay Consumer Implementation

    public class DlqReplayConsumer : IConsumer<ReplayDlqCommand>
    {
        private readonly ServiceBusClient _serviceBusClient;
        private readonly IPublishEndpoint _publishEndpoint;
        private readonly ILogger<DlqReplayConsumer> _logger;
    
        public async Task Consume(ConsumeContext<ReplayDlqCommand> context)
        {
            var command = context.Message;
    
            // Read messages from DLQ
            await using var receiver = _serviceBusClient.CreateReceiver(
                command.QueueName,
                new ServiceBusReceiverOptions
                {
                    SubQueue = SubQueue.DeadLetter
                });
    
            var messages = await receiver.ReceiveMessagesAsync(
                maxMessages: command.BatchSize ?? 10,
                maxWaitTime: TimeSpan.FromSeconds(5));
    
            foreach (var message in messages)
            {
                try
                {
                    // Deserialize original message
                    var originalMessage = DeserializeMessage(message);
    
                    // Re-publish to original destination
                    await _publishEndpoint.Publish(originalMessage, context.CancellationToken);
    
                    // Complete (remove from DLQ)
                    await receiver.CompleteMessageAsync(message);
    
                    _logger.LogInformation("Replayed message {MessageId} from DLQ", 
                        message.MessageId);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to replay message {MessageId}", 
                        message.MessageId);
    
                    // Abandon (leave in DLQ for manual review)
                    await receiver.AbandonMessageAsync(message);
                }
            }
        }
    }
    

  • Selective Replay

  • Filter DLQ messages by tenant, error type, time range
  • Replay only specific messages
  • Dry-run mode (validate without publishing)

  • DLQ Purge

  • Permanently delete poison messages
  • Only after root cause analysis
  • Requires approval (audit trail)

  • Prevent DLQ Recurrence

  • Identify common failure patterns
  • Fix bugs in consumer code
  • Add validation to prevent bad messages
  • Monitor for similar issues

Code Examples: - DLQ replay consumer (complete) - Manual replay script (PowerShell/CLI) - Selective replay with filters - DLQ purge script (with approval) - Failure pattern analysis query

Diagrams: - DLQ replay sequence - Automated replay architecture - Selective replay flow

Deliverables: - DLQ replay implementation - Manual replay scripts - Purge procedures - Prevention strategies


CYCLE 10: Retry Policies & Circuit Breakers (~2,500 lines)

Topic 19: Retry Policies

What will be covered: - MassTransit Retry Middleware

cfg.AddConsumer<AuditAcceptedEventConsumer>(consumerCfg =>
{
    consumerCfg.UseMessageRetry(retry =>
    {
        // Incremental retry: linearly increasing delays (1s, 3s, 5s, 7s, 9s)
        retry.Incremental(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));

        // OR Interval retry: Fixed 5-second intervals
        // retry.Interval(3, TimeSpan.FromSeconds(5));

        // OR Exponential retry: 1s, 2s, 4s, 8s, 16s, 32s
        // retry.Exponential(6, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(32), TimeSpan.FromSeconds(1));

        // Ignore specific exceptions (don't retry)
        retry.Ignore<ValidationException>();
        retry.Ignore<ArgumentException>();

        // Handle specific exceptions differently
        retry.Handle<SqlException>(ex => ex.Number == -2); // Timeout
    });
});

  • Retry Strategies
  • Immediate Retry: Retry instantly (for very transient errors)
  • Interval Retry: Fixed delay between retries
  • Incremental Retry: Increasing delay (1s, 3s, 5s)
  • Exponential Backoff: Exponentially increasing delay (1s, 2s, 4s, 8s)

  • ATP Retry Policy by Consumer

    Consumer                      | Retry Strategy      | Max Attempts | Backoff
    ------------------------------|---------------------|--------------|------------------
    AuditAcceptedEventConsumer    | Exponential         | 5            | 1s → 32s
    ProjectionUpdatedConsumer     | Incremental         | 3            | 2s → 10s
    ExportRequestedConsumer       | Interval            | 5            | 10s (fixed)
    PolicyChangedEventConsumer    | Immediate           | 2            | None
    IntegrityVerifiedConsumer     | Exponential         | 5            | 1s → 32s
    

  • Exception Handling

  • Transient Exceptions: Retry (e.g., SqlException timeout)
  • Permanent Exceptions: Don't retry (e.g., ValidationException)
  • Poison Messages: Log and move to DLQ after max retries

  • Jitter for Thundering Herd

  • Add randomness to backoff delay
  • Prevent all retries happening at same time
  • Reduce load spikes on downstream services
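
A jitter sketch (the 20% jitter factor and 32-second cap are assumptions): add randomness to an exponential backoff delay so concurrent retries do not fire in lockstep.

```csharp
// Exponential backoff with additive jitter: base delay doubles per attempt
// up to a cap, then up to 20% random extra delay spreads out retries.
static TimeSpan BackoffWithJitter(int attempt, Random rng)
{
    var baseMs = Math.Min(1000 * Math.Pow(2, attempt), 32_000);
    var jitterMs = rng.NextDouble() * baseMs * 0.2; // up to +20% randomness
    return TimeSpan.FromMilliseconds(baseMs + jitterMs);
}
```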

Code Examples: - Retry policy configurations (all strategies) - Exception filtering (ignore, handle) - Jitter implementation - Retry metrics tracking

Diagrams: - Retry strategy comparison - Exponential backoff timeline - Jitter effect visualization

Deliverables: - Retry policy guide - ATP retry configurations - Exception handling strategy


Topic 20: Circuit Breakers & Rate Limiting

What will be covered: - Circuit Breaker Pattern - Protect downstream services from overload - States: Closed (normal), Open (failing), Half-Open (testing recovery) - Threshold: N failures in M seconds → Open circuit - Recovery: After timeout, allow test requests (Half-Open)

  • MassTransit Circuit Breaker (Consumer-Level)

    cfg.AddConsumer<ExternalApiConsumer>(consumerCfg =>
    {
        consumerCfg.UseCircuitBreaker(cb =>
        {
            cb.TrackingPeriod = TimeSpan.FromMinutes(1);
            cb.TripThreshold = 15;   // Trip when 15% of tracked messages fault
            cb.ActiveThreshold = 10; // Require at least 10 messages before tripping
            cb.ResetInterval = TimeSpan.FromMinutes(5); // Attempt half-open after 5 min
        });
    });
    

  • Rate Limiting (Consumer Protection)

    cfg.AddConsumer<HighVolumeConsumer>(consumerCfg =>
    {
        consumerCfg.UseRateLimit(1000, TimeSpan.FromMinutes(1)); // 1000 msgs/min max
    });
    

  • Concurrency Limiting

    cfg.AddConsumer<DatabaseWriteConsumer>(consumerCfg =>
    {
        consumerCfg.UseConcurrentMessageLimit(32); // Max 32 parallel
    });
    

  • ATP Protection Strategy

  • Circuit breakers on external dependencies (e.g., third-party APIs)
  • Rate limiting on high-volume consumers
  • Concurrency limiting on database-intensive consumers
  • Back-pressure signals (reject messages when overwhelmed)

Code Examples: - Circuit breaker configuration (complete) - Rate limiting configuration - Concurrency limiting configuration - Circuit breaker state monitoring - Back-pressure implementation

Diagrams: - Circuit breaker state machine - Rate limiting flow - Back-pressure architecture

Deliverables: - Circuit breaker implementation - Rate limiting guide - Protection strategy document


CYCLE 11: Multi-Tenant Message Isolation (~3,000 lines)

Topic 21: Tenant Context in Messages

What will be covered: - TenantId in Every Message - Required field in message envelope - Extracted at Gateway (from JWT) - Propagated across all message flows - Validated at each consumer

  • Tenant Context Middleware

    public class TenantContextMiddleware<T> : IFilter<ConsumeContext<T>>
        where T : class
    {
        public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
        {
            // Extract tenant context from message
            var tenantId = context.Message is IMessageEnvelope envelope
                ? envelope.TenantId
                : context.Headers.Get<string>("x-tenant-id");
    
            if (string.IsNullOrEmpty(tenantId))
            {
                throw new InvalidOperationException("TenantId missing from message");
            }
    
            // Set in AsyncLocal for downstream access
            TenantContext.Current = new TenantContext
            {
                TenantId = tenantId,
                Edition = context.Headers.Get<string>("x-edition")
            };
    
            try
            {
                await next.Send(context);
            }
            finally
            {
                TenantContext.Current = null;
            }
        }

        // Required by IFilter<T> for pipeline introspection
        public void Probe(ProbeContext context) => context.CreateFilterScope("tenantContext");
    }
    

  • Partition Key = TenantId

  • Azure Service Bus partitioning by tenant
  • Load distribution across brokers
  • Ordered processing within tenant
  • Hot partition detection and mitigation
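
Setting the partition key from TenantId can be sketched via MassTransit's send topology (the formatter API follows the MassTransit Azure Service Bus docs; applying it per message type is an assumption):

```csharp
// Messages for the same tenant carry the same partition key, so they land
// on the same broker partition and preserve per-tenant ordering.
cfg.UsingAzureServiceBus((context, sb) =>
{
    sb.Send<AuditRecordAcceptedEvent>(s =>
        s.UsePartitionKeyFormatter(ctx => ctx.Message.TenantId));
});
```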

  • Subscription Filters (Tenant-Specific)

  • SQL filter: TenantId = 'acme'
  • Use case: Dedicated subscription for high-volume tenant
  • Use case: Region-specific subscriptions
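
A tenant-specific SQL filter can be sketched with the admin client (the SDK types are real; the topic, subscription, and rule names are assumptions). Note that SQL filters match on message application properties, so TenantId must also be stamped as a property when publishing:

```csharp
// Route only acme's messages into a dedicated subscription.
var admin = new ServiceBusAdministrationClient(connectionString);

await admin.CreateRuleAsync("atp-audit-events", "acme-dedicated",
    new CreateRuleOptions("TenantFilter", new SqlRuleFilter("TenantId = 'acme'")));
```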

Code Examples: - Tenant context middleware (complete) - Partition key configuration - Subscription filter (tenant-specific) - Tenant validation in consumer

Diagrams: - Tenant context propagation across messages - Partition key distribution - Subscription filter architecture

Deliverables: - Tenant isolation implementation - Partition strategy - Filter configuration guide


Topic 22: Cross-Tenant Message Prevention

What will be covered: - Validation Rules - Consumer must validate message tenantId - Reject messages for unexpected tenants - Log suspicious cross-tenant messages

  • Repository Tenant Scoping
  • All repository operations scoped to message's tenantId
  • Prevent accidental cross-tenant writes
  • NHibernate filter applied automatically
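
Applying the filter per session can be sketched with NHibernate's EnableFilter API (the filter name and parameter are assumptions; the filter itself is declared in the mappings):

```csharp
// Every query on this session is now constrained to the current tenant,
// so a consumer cannot accidentally read or write another tenant's rows.
using var session = _sessionFactory.OpenSession();
session.EnableFilter("TenantFilter")
       .SetParameter("tenantId", TenantContext.Current.TenantId);
```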

  • Audit Logging

  • Log every message consumption with tenantId
  • Detect anomalies (message for wrong tenant)
  • Alert on potential security issues

  • Testing Cross-Tenant Isolation

  • Publish message with tenantId A
  • Verify consumer for tenant B rejects it
  • Integration tests for all consumers

Code Examples: - Tenant validation in consumer - Repository scoping (NHibernate filter) - Cross-tenant audit log - Isolation test scenario

Diagrams: - Cross-tenant prevention flow - Repository tenant scoping

Deliverables: - Isolation enforcement guide - Validation patterns - Testing procedures


CYCLE 12: Message Routing & Topology (~2,500 lines)

Topic 23: Message Routing Patterns

What will be covered: - Topic-Based Routing (Pub/Sub) - Publish to topic - Multiple subscriptions (fanout) - Subscription filters for selective delivery

  • Direct Routing (Queue)
  • Send to specific queue
  • Single consumer (or competing consumers)
  • Use case: Commands, requests

  • Content-Based Routing

  • Route based on message properties
  • Subscription SQL filters
  • Use case: Route by classification, edition, region

  • MassTransit Send Topology

    // Publish (fanout to all topic subscriptions)
    await publishEndpoint.Publish(new AuditRecordAcceptedEvent { ... });
    
    // Send (direct to the queue mapped for this message type via EndpointConvention)
    await sendEndpointProvider.Send(new RebuildProjectionCommand { ... });
    
    // Send to an explicitly addressed endpoint
    var sendEndpoint = await bus.GetSendEndpoint(new Uri("queue:atp-export-package"));
    await sendEndpoint.Send(new CreateExportPackageCommand { ... });
    

  • Request/Response Pattern

    // Request
    var client = bus.CreateRequestClient<EvaluatePolicyRequest>();
    var response = await client.GetResponse<PolicyDecisionResponse>(new EvaluatePolicyRequest
    {
        TenantId = tenantId,
        EventPayload = payload
    });
    
    // Handler (responds)
    public class PolicyEvaluationConsumer : IConsumer<EvaluatePolicyRequest>
    {
        public async Task Consume(ConsumeContext<EvaluatePolicyRequest> context)
        {
            var decision = await _policyEngine.EvaluateAsync(context.Message);
    
            await context.RespondAsync(new PolicyDecisionResponse
            {
                Classification = decision.Classification,
                RetentionPolicyId = decision.RetentionPolicyId,
            });
        }
    }
    

Code Examples: - Publish vs. Send usage - Content-based routing (subscription filters) - Request/response pattern (complete) - Direct endpoint addressing

Diagrams: - Routing pattern comparison - Topic fanout vs. queue direct - Request/response flow

Deliverables: - Routing pattern guide - Topology design decisions - Request/response implementation


Topic 24: Message Serialization & Formatting

What will be covered: - MassTransit Serialization - Default: System.Text.Json (JSON) - Alternative: Newtonsoft.Json - Binary: MessagePack (for performance)

  • JSON Serialization Configuration

    cfg.UsingAzureServiceBus((context, config) =>
    {
        config.ConfigureJsonSerializerOptions(options =>
        {
            options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
            options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
            options.Converters.Add(new JsonStringEnumConverter());
            return options; // MassTransit expects the configured instance back
        });
    
        // ... other configuration
    });
    

  • Message Envelope Standards

  • Content-Type header: application/json; charset=utf-8
  • Message body: JSON with camelCase properties
  • Headers: Custom ATP headers (x-tenant-id, etc.)

  • Encryption (if needed)

  • Encrypt sensitive payload fields
  • Use envelope encryption (DEK wrapped by KEK)
  • Decrypt in consumer

  • Compression (for large messages)

  • GZIP compression
  • Reduce message size
  • Automatic decompression in consumer

Code Examples: - JSON serialization configuration - Custom serializer (if needed) - Encryption/decryption in publish/consume - Compression configuration

Diagrams: - Serialization pipeline - Encryption envelope

Deliverables: - Serialization configuration - Encryption patterns - Compression strategies


CYCLE 13: Observability & Monitoring (~3,000 lines)

Topic 25: Message Tracing & Correlation

What will be covered: - W3C Trace Context Propagation - Traceparent header: 00-{traceId}-{spanId}-{flags} - Tracestate header: Vendor-specific context - MassTransit automatic trace propagation

  • Correlation IDs
  • CorrelationId: Request identifier (entire workflow)
  • ConversationId: Message conversation (request + responses)
  • InitiatorId: Saga that started workflow
  • CausationId: Event that caused this event (parent event ID)
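The correlation IDs above can be stamped explicitly at publish time. A hedged sketch — `publishEndpoint`, `acceptedEvent`, `workflowCorrelationId`, and `tenantId` are assumed locals, and the `x-tenant-id` header name follows this document's ATP header convention:

```csharp
// Sketch: publish with an explicit CorrelationId and a custom tenant header.
// The Publish overload with a PublishContext callback is standard MassTransit.
await publishEndpoint.Publish(acceptedEvent, context =>
{
    context.CorrelationId = workflowCorrelationId;  // identifies the whole workflow
    context.Headers.Set("x-tenant-id", tenantId);   // ATP custom header
});
```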

  • OpenTelemetry Integration

  • MassTransit built-in OTel support
  • Automatic span creation for publish/consume
  • Span attributes: message type, tenant, queue name
  • Distributed trace across services
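A minimal wiring sketch for the OpenTelemetry integration above — MassTransit v8 emits activities from an `ActivitySource` named "MassTransit", so subscribing that source captures publish/consume spans; the Azure Monitor exporter is an assumption matching the Application Insights usage mentioned below:

```csharp
// Sketch: OpenTelemetry tracing for MassTransit in an ASP.NET Core host.
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddSource("MassTransit")             // MassTransit's ActivitySource
        .AddAzureMonitorTraceExporter());     // e.g. export to Application Insights
```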

  • ATP Trace Visualization

  • Trace: API request → Ingestion → Outbox → Service Bus → Projection → Read Model
  • Identify bottlenecks (slow consumers)
  • Identify failures (error spans)
  • End-to-end latency measurement

Code Examples: - OTel configuration for MassTransit - Custom span attributes - Correlation ID propagation - Trace query (Application Insights)

Diagrams: - Distributed trace visualization - Correlation ID flow - Span hierarchy

Deliverables: - Tracing configuration - Correlation strategy - Trace analysis guide


Topic 26: Messaging Metrics & Dashboards

What will be covered: - MassTransit Metrics

  • Publish Metrics
  • Messages published per second
  • Publish latency (time to send to broker)
  • Publish failures

  • Consume Metrics
  • Messages consumed per second
  • Consume latency (time to process message)
  • Consumer exceptions
  • Retry count
  • DLQ count

  • Saga Metrics
  • Active saga instances
  • Saga completion rate
  • Saga timeout rate

  • Azure Service Bus Metrics
  • Queue/topic message count (active, DLQ, scheduled)
  • Incoming/outgoing messages per second
  • Throttled requests
  • Server errors
  • Message size distribution

  • ATP Messaging Dashboard

  • Overview Panel: Total throughput, error rate, DLQ count
  • Per-Topic Panel: Message count, publish rate, consumer lag
  • Per-Consumer Panel: Consume rate, latency, error rate
  • Saga Panel: Active instances, completion rate, timeout rate
  • Outbox Panel: Pending count, age, failed count

  • Alerting Rules

  • DLQ count > 0 (immediate alert)
  • Consumer lag > 30 seconds
  • Publish failures > 1% error rate
  • Outbox age > 1 minute (stuck messages)
  • Queue depth > 10,000 messages

Code Examples: - MassTransit metrics configuration - Custom metric collection - Dashboard query (Kusto/KQL) - Alert rule definitions (Azure Monitor)

Diagrams: - Metrics collection architecture - Dashboard layout mockup - Alert flow

Deliverables: - Metrics catalog - Dashboard templates - Alert rules


CYCLE 14: Testing Strategies (~2,500 lines)

Topic 27: Unit Testing Messaging Components

What will be covered: - In-Memory Test Harness

[Test]
public async Task Consumer_Should_Process_Event()
{
    // Arrange
    await using var provider = new ServiceCollection()
        .AddMassTransitTestHarness(cfg =>
        {
            cfg.AddConsumer<AuditAcceptedEventConsumer>();
        })
        .BuildServiceProvider(true);

    var harness = provider.GetRequiredService<ITestHarness>();
    await harness.Start();

    // Act
    await harness.Bus.Publish(new AuditRecordAcceptedEvent
    {
        EventId = Ulid.NewUlid().ToString(),
        TenantId = "test-tenant",
        AuditRecordId = "01JE...",
    });

    // Assert
    Assert.IsTrue(await harness.Consumed.Any<AuditRecordAcceptedEvent>());

    var consumerHarness = harness.GetConsumerHarness<AuditAcceptedEventConsumer>();
    Assert.IsTrue(await consumerHarness.Consumed.Any<AuditRecordAcceptedEvent>());
}

  • Testing Consumers
  • Mock dependencies (repositories, services)
  • Verify message consumed
  • Verify side effects (database writes, event publishes)
  • Verify error handling

  • Testing Publishers

  • Verify message published
  • Verify message content
  • Verify headers and metadata
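Publisher verification uses the same test harness shown in the unit test above; a hedged sketch (`harness` is the `ITestHarness` from that setup):

```csharp
// Verify a message of the expected type was published
Assert.IsTrue(await harness.Published.Any<AuditRecordAcceptedEvent>());

// Inspect the published message's content
var published = harness.Published.Select<AuditRecordAcceptedEvent>().First();
Assert.AreEqual("test-tenant", published.Context.Message.TenantId);
```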

  • Testing Sagas

  • Simulate event sequences
  • Verify state transitions
  • Verify compensation logic
  • Test timeouts
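Saga tests can also run against the in-memory harness. A hedged sketch — `ExportStateMachine`, `ExportState`, and `ExportRequestedEvent` are hypothetical stand-ins for the export workflow saga planned in Cycle 6:

```csharp
// Sketch: drive the saga with a triggering event, then assert it consumed the
// event and created an instance correlated to it.
var sagaHarness = harness.GetSagaStateMachineHarness<ExportStateMachine, ExportState>();

var correlationId = NewId.NextGuid();
await harness.Bus.Publish(new ExportRequestedEvent { CorrelationId = correlationId });

Assert.IsTrue(await sagaHarness.Consumed.Any<ExportRequestedEvent>());
Assert.IsTrue(await sagaHarness.Created.Any(x => x.CorrelationId == correlationId));
```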

Code Examples: - Test harness setup (complete) - Consumer unit test - Publisher unit test - Saga unit test - Mock dependencies setup

Diagrams: - Test harness architecture - Testing pyramid (unit, integration, E2E)

Deliverables: - Unit test suite - Test harness patterns - Mock strategies


Topic 28: Integration & Contract Testing

What will be covered: - Integration Testing with Real Transport - Docker container running the Azure Service Bus emulator (note: Azurite emulates Azure Storage, not Service Bus) - Or connect to a dedicated test namespace - End-to-end message flow

  • Contract Testing
  • Verify publisher produces expected schema
  • Verify consumer accepts expected schema
  • Pact or similar contract testing framework
  • Schema evolution compatibility tests

  • Performance Testing

  • Load testing with high message volumes
  • Measure throughput (messages/sec)
  • Measure latency (end-to-end)
  • Identify bottlenecks

  • Chaos Testing

  • Simulate broker failures
  • Simulate slow consumers
  • Simulate message loss (DLQ scenarios)
  • Verify resilience

Code Examples: - Integration test with real Service Bus - Contract test (publisher vs. consumer schema) - Load test script (JMeter, NBomber) - Chaos test scenarios

Diagrams: - Integration test architecture - Contract testing flow - Load test setup

Deliverables: - Integration test suite - Contract test suite - Performance benchmarks - Chaos test scenarios


CYCLE 15: Performance & Scalability (~2,500 lines)

Topic 29: Message Throughput Optimization

What will be covered: - Batching - Publish multiple messages in batch - Consume multiple messages in batch - Reduce round-trips to broker
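Batch publishing can use MassTransit's `PublishBatch` extension, which sends a collection of messages in one call. A hedged sketch — `publishEndpoint` and `BuildPendingEvents` are assumed:

```csharp
// Sketch: publish a batch of events, reducing per-message round-trips.
IEnumerable<AuditRecordAcceptedEvent> events = BuildPendingEvents(); // assumed helper

await publishEndpoint.PublishBatch(events);
```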

  • Prefetching
  • PrefetchCount: Fetch N messages ahead
  • Reduces latency (messages ready to process)
  • Tune based on message processing time

  • Concurrent Processing

  • ConcurrentMessageLimit per consumer
  • Parallel message processing
  • CPU and memory considerations
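Prefetch and concurrency are tuned per receive endpoint; a hedged sketch with illustrative values (not recommendations), reusing this document's `audit-accepted-event` queue and consumer names:

```csharp
// Sketch: endpoint-level throughput tuning inside bus configuration.
cfg.ReceiveEndpoint("audit-accepted-event", e =>
{
    e.PrefetchCount = 64;            // messages fetched ahead of processing
    e.ConcurrentMessageLimit = 16;   // parallel message handlers per instance
    e.ConfigureConsumer<AuditAcceptedEventConsumer>(context);
});
```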

  • Partitioning for Scale

  • Partition topics/queues by tenantId
  • Distribute load across multiple brokers
  • Scale out consumers (multiple instances per partition)

  • Premium Tier Optimization

  • Dedicated messaging units
  • Higher throughput limits
  • Lower latency
  • Geo-replication (optional)

Code Examples: - Batch publishing - Batch consuming - Prefetch configuration - Concurrent processing tuning - Partitioning optimization

Diagrams: - Batching performance improvement - Concurrent processing architecture - Partitioning scale-out

Deliverables: - Performance optimization guide - Tuning parameters - Scalability patterns


Topic 30: Scalability Patterns

What will be covered: - Horizontal Scaling (Consumer Scale-Out) - Deploy multiple consumer instances - Automatic load distribution (competing consumers) - Kubernetes HorizontalPodAutoscaler (HPA) based on queue depth

  • Vertical Scaling (Consumer Resources)
  • Increase CPU/memory per pod
  • Handle larger message volumes per instance

  • Auto-Scaling Triggers

  • Queue depth > threshold → Scale out
  • CPU > 70% → Scale up
  • Queue depth near zero → Scale in (after cooldown)

  • KEDA (Kubernetes Event-Driven Autoscaling)

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: projection-consumer-scaler
    spec:
      scaleTargetRef:
        name: projection-consumer
      minReplicaCount: 2
      maxReplicaCount: 20
      triggers:
      - type: azure-servicebus
        metadata:
          queueName: audit-accepted-event
          namespace: atp-prod
          messageCount: "50"  # target backlog per replica (HPA averages on this)
    

  • Backlog Management

  • Monitor queue backlog depth
  • Alert if backlog grows beyond threshold
  • Scale consumers proactively
  • Throttle producers if needed (back-pressure)
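Backlog depth can be read directly with `Azure.Messaging.ServiceBus.Administration`; a hedged sketch — `fullyQualifiedNamespace`, `credential`, and the alert threshold are assumptions:

```csharp
// Sketch: read the active backlog and DLQ count for a queue.
var admin = new ServiceBusAdministrationClient(fullyQualifiedNamespace, credential);

var runtime = await admin.GetQueueRuntimePropertiesAsync("audit-accepted-event");
long backlog = runtime.Value.ActiveMessageCount;
long deadLettered = runtime.Value.DeadLetterMessageCount;

if (backlog > 10_000)
{
    // raise alert / trigger proactive scale-out
}
```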

Code Examples: - Kubernetes HPA configuration - KEDA ScaledObject (complete) - Auto-scaling trigger logic - Backlog monitoring query

Diagrams: - Horizontal scaling with HPA - KEDA auto-scaling architecture - Backlog management flow

Deliverables: - Scalability design - Auto-scaling configuration - Backlog management procedures


CYCLE 16: Operations & Best Practices (~3,000 lines)

Topic 31: Operational Runbooks

What will be covered: - Routine Operations

  • Daily Health Checks
  • Verify consumers running
  • Check queue depths
  • Review DLQ (should be empty)
  • Check outbox age

  • Weekly Maintenance
  • Review performance metrics
  • Optimize slow consumers
  • Clean up old inbox/outbox entries
  • Update retry policies if needed

  • Monthly Reviews
  • Capacity planning (queue growth)
  • Cost optimization (right-size messaging units)
  • Schema evolution planning

  • Incident Response
  • Consumer Down
    1. Alert fires (no consumption for N minutes)
    2. Check consumer pod status (Kubernetes)
    3. Review logs for errors
    4. Restart consumer if needed
    5. Verify backlog starts decreasing
  • DLQ Messages
    1. Alert fires (message in DLQ)
    2. Review DLQ message details
    3. Classify error (transient vs. permanent)
    4. Fix root cause (code bug, configuration)
    5. Replay message
  • Broker Outage

    1. Alert fires (Service Bus unavailable)
    2. Verify Azure Service Bus status
    3. Activate outbox pattern (messages queued locally)
    4. Wait for broker recovery
    5. Outbox processor publishes queued messages
  • Message Replay Procedures

  • Replay from DLQ (after fixing issue)
  • Replay from event store (rebuild projections)
  • Replay with filters (tenant, time range)
  • Dry-run mode (validate before live replay)
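A DLQ replay can be scripted with `Azure.Messaging.ServiceBus` directly, since this is operational tooling rather than consumer code. A hedged sketch — `client` is an assumed `ServiceBusClient`, and replay should only run after the root cause is fixed:

```csharp
// Sketch: drain messages from the dead-letter sub-queue back onto the main queue.
var receiver = client.CreateReceiver("audit-accepted-event",
    new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
var sender = client.CreateSender("audit-accepted-event");

var messages = await receiver.ReceiveMessagesAsync(maxMessages: 10);
foreach (var message in messages)
{
    await sender.SendMessageAsync(new ServiceBusMessage(message)); // clones body + properties
    await receiver.CompleteMessageAsync(message);                  // remove from DLQ
}
```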

  • Emergency Procedures

  • Pause consumer (stop processing)
  • Purge queue (emergency, requires approval)
  • Throttle producer (back-pressure)
  • Circuit breaker manual open (protect downstream)

Code Examples: - Health check queries - Consumer restart script (Kubernetes) - DLQ triage script - Message replay script - Emergency pause procedure

Diagrams: - Operational workflow - Incident response flowchart - Replay procedure sequence

Deliverables: - Operations manual - Runbook templates - Incident response procedures - Emergency protocols


Topic 32: Best Practices & Anti-Patterns

What will be covered: - Messaging Best Practices

  • ✅ Design idempotent consumers (use inbox pattern or natural idempotency)
  • ✅ Use transactional outbox (atomic DB + message publishing)
  • ✅ Partition by tenantId (load distribution, ordered processing)
  • ✅ Include correlation IDs (distributed tracing)
  • ✅ Version messages (schema evolution)
  • ✅ Monitor DLQ (alert immediately on any message)
  • ✅ Retry with exponential backoff (avoid thundering herd)
  • ✅ Use circuit breakers (protect downstream services)
  • ✅ Validate messages (reject invalid messages early)
  • ✅ Test message flows (unit, integration, contract tests)
  • ✅ Document message contracts (AsyncAPI specs)
  • ✅ Secure with Managed Identity (no connection strings in code)

  • Anti-Patterns to Avoid
  • Non-idempotent consumers (duplicate processing causes errors)
  • Direct Service Bus SDK (bypasses MassTransit abstractions)
  • Synchronous request/response over messaging (defeats async purpose)
  • Large message payloads (use claim check pattern instead)
  • No retry policy (transient failures cause DLQ)
  • Ignoring DLQ (poison messages accumulate)
  • No correlation IDs (impossible to debug distributed flows)
  • Hardcoded queue names (use MassTransit conventions)
  • Missing tenant context (cross-tenant data leaks)
  • Blocking operations in consumers (starves thread pool)
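The contrast between idempotent and non-idempotent consumers can be sketched as follows — a hedged example in which `IInboxStore` is a hypothetical abstraction standing in for ATP's inbox table:

```csharp
// Sketch: inbox-based idempotent consumer. Duplicate deliveries are expected
// with at-least-once semantics, so already-processed messages are skipped.
public class AuditAcceptedEventConsumer : IConsumer<AuditRecordAcceptedEvent>
{
    private readonly IInboxStore _inbox;   // hypothetical dedup store

    public AuditAcceptedEventConsumer(IInboxStore inbox) => _inbox = inbox;

    public async Task Consume(ConsumeContext<AuditRecordAcceptedEvent> context)
    {
        // Skip messages already processed (redelivery is not an error)
        if (await _inbox.AlreadyProcessedAsync(context.MessageId!.Value))
            return;

        // ... handle the event (database writes, follow-up publishes) ...

        await _inbox.MarkProcessedAsync(context.MessageId.Value);
    }
}
```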

  • Code Review Checklist

  • Consumer is idempotent (inbox or natural idempotency)
  • Outbox pattern used for critical events
  • Retry policy configured appropriately
  • Circuit breaker on external dependencies
  • TenantId validated in every consumer
  • Correlation IDs propagated
  • Async/await throughout (no blocking)
  • Message schema versioned
  • DLQ monitoring configured
  • Unit tests for consumers
  • Integration tests for message flows
  • Observability (logs, metrics, traces)

  • Performance Checklist

  • PrefetchCount tuned (32-256)
  • ConcurrentMessageLimit tuned (64-512)
  • Batch operations where applicable
  • No SELECT N+1 in consumer (use eager loading)
  • Circuit breaker protects slow dependencies
  • Partitioning enabled for high throughput
  • Premium tier for production (dedicated capacity)
  • Auto-scaling configured (KEDA)

Code Examples: - Idempotent consumer (good example) - Non-idempotent consumer (bad example) - Outbox pattern (good) - Direct publish without outbox (bad) - Async consumer (good) - Blocking consumer (bad)

Diagrams: - Best practices reference architecture - Anti-patterns to avoid (before/after)

Deliverables: - Best practices handbook - Anti-pattern catalog - Code review checklist - Performance checklist


Summary of Deliverables

Across all 16 cycles, this documentation will provide:

  1. Architecture & Patterns
     • Event-driven architecture (EDA) overview
     • Messaging patterns (pub/sub, command, saga)
     • Message taxonomy and contracts
     • Delivery guarantees and semantics

  2. MassTransit Implementation
     • Configuration and DI setup
     • Consumer registration patterns
     • Publisher patterns
     • Saga state machine implementations

  3. Azure Service Bus Integration
     • Topology design (topics, queues, subscriptions, DLQ)
     • Security (Managed Identity, RBAC, private endpoints)
     • Performance tuning
     • Monitoring and diagnostics

  4. Message Contracts
     • ATP event/command definitions
     • Message envelope specifications
     • Schema versioning strategy
     • Validation rules

  5. Consumers & Handlers
     • Consumer implementations for all ATP events
     • Idempotent consumer patterns (inbox)
     • Message filters and middleware
     • Error handling strategies

  6. Saga Orchestration
     • Export workflow saga
     • Integrity sealing saga
     • Saga state persistence (NHibernate)
     • Compensation patterns

  7. Reliability Patterns
     • Transactional outbox implementation
     • Inbox pattern for deduplication
     • DLQ handling and replay procedures
     • Retry policies and circuit breakers

  8. Multi-Tenant Isolation
     • Tenant context propagation in messages
     • Partition key strategies
     • Cross-tenant prevention
     • Subscription filters

  9. Observability
     • Distributed tracing (OpenTelemetry)
     • Messaging metrics and dashboards
     • Alerting rules
     • Troubleshooting guides

  10. Testing
      • Unit tests with test harness
      • Integration tests with real transport
      • Contract tests for schema compatibility
      • Performance and chaos testing

  11. Operations
      • Operational runbooks
      • Incident response procedures
      • Best practices and anti-patterns
      • Code review checklists

Next Steps

  1. Review & Approval: Validate cycle plan with architecture and platform teams
  2. Cycle 1 Generation: Begin content generation for messaging architecture overview
  3. Message Contracts: Define all ATP event and command classes
  4. Consumer Templates: Create consumer implementation templates
  5. Saga Implementations: Develop saga state machines for ATP workflows
  6. Test Suite: Build messaging test suite with test harness


This documentation plan covers the complete messaging implementation for ATP, from event-driven architecture and MassTransit configuration to saga orchestration, reliability patterns, and operational excellence, fully leveraging MassTransit with Azure Service Bus and ConnectSoft.Extensions.MessagingModel libraries.