# Messaging Implementation - Audit Trail Platform (ATP)

Event-driven messaging with MassTransit & Azure Service Bus — ATP's messaging layer implements publish/subscribe, saga orchestration, the transactional outbox, idempotent consumers, and dead-letter handling using MassTransit over Azure Service Bus, providing reliable, scalable, multi-tenant message-driven workflows.

## 📋 Documentation Generation Plan

This document will be generated in 16 cycles. Current progress:
| Cycle | Topics | Estimated Lines | Status |
|---|---|---|---|
| Cycle 1 | Messaging Architecture & Event-Driven Design (1-2) | ~3,000 | ⏳ Not Started |
| Cycle 2 | MassTransit Configuration & Setup (3-4) | ~3,500 | ⏳ Not Started |
| Cycle 3 | Azure Service Bus Integration (5-6) | ~3,000 | ⏳ Not Started |
| Cycle 4 | Message Contracts & Publishing (7-8) | ~3,500 | ⏳ Not Started |
| Cycle 5 | Consumers & Message Handlers (9-10) | ~3,000 | ⏳ Not Started |
| Cycle 6 | Saga State Machines (11-12) | ~4,000 | ⏳ Not Started |
| Cycle 7 | Outbox Pattern Implementation (13-14) | ~3,000 | ⏳ Not Started |
| Cycle 8 | Inbox Pattern & Idempotency (15-16) | ~3,000 | ⏳ Not Started |
| Cycle 9 | Dead-Letter Queue (DLQ) Handling (17-18) | ~2,500 | ⏳ Not Started |
| Cycle 10 | Retry Policies & Circuit Breakers (19-20) | ~2,500 | ⏳ Not Started |
| Cycle 11 | Multi-Tenant Message Isolation (21-22) | ~3,000 | ⏳ Not Started |
| Cycle 12 | Message Routing & Topology (23-24) | ~2,500 | ⏳ Not Started |
| Cycle 13 | Observability & Monitoring (25-26) | ~3,000 | ⏳ Not Started |
| Cycle 14 | Testing Strategies (27-28) | ~2,500 | ⏳ Not Started |
| Cycle 15 | Performance & Scalability (29-30) | ~2,500 | ⏳ Not Started |
| Cycle 16 | Operations & Best Practices (31-32) | ~3,000 | ⏳ Not Started |
Total Estimated Lines: ~48,000
## Purpose & Scope

This document defines ATP's messaging implementation using MassTransit as the messaging abstraction layer over Azure Service Bus, covering event-driven architecture, message contracts, consumer patterns, saga orchestration, the transactional outbox/inbox, retry handling, dead-letter queues, and observability.
**Key Technologies & Patterns**
- MassTransit 8.x: Modern .NET messaging framework with abstractions over transports
- Azure Service Bus: Managed message broker (topics, queues, subscriptions)
- Publish/Subscribe: Event-driven choreography pattern
- Saga State Machines: Long-running process orchestration
- Transactional Outbox: Atomic database + message publishing
- Inbox Pattern: Idempotent message consumption with deduplication
- DLQ (Dead-Letter Queue): Poison message handling
- Correlation & Causation: Distributed tracing across message flows
- ConnectSoft.Extensions.MessagingModel: Proven abstractions from microservice template
## Detailed Cycle Plan

### CYCLE 1: Messaging Architecture & Event-Driven Design (~3,000 lines)

#### Topic 1: ATP Messaging Architecture Overview
What will be covered:

- Event-Driven Architecture (EDA) in ATP
- Why event-driven? (decoupling, scalability, resilience)
- Choreography vs. orchestration (ATP uses both)
- ATP event-driven flows (ingestion → projection → integrity → export)
- Messaging Patterns in ATP
  - Publish/Subscribe: Domain events (audit.accepted, projection.updated, policy.changed)
  - Command/Request-Response: Export requests, policy evaluations
  - Saga Orchestration: Long-running workflows (export packages, integrity verification)
  - Competing Consumers: Parallel message processing with load distribution
- Message Types
  - Domain Events: Something that happened (immutable, past tense)
  - Integration Events: Cross-service notifications
  - Commands: Intent to perform an action (imperative, present tense)
  - Requests: Query for information (request/response pattern)
- ATP Message Taxonomy
  Events (Publish/Subscribe):
  - audit.appended (Ingestion → Integrity, Projection)
  - audit.accepted (Ingestion → Projection, Search)
  - projection.updated (Projection → Query, Export)
  - integrity.verified (Integrity → Export, Admin)
  - export.requested (API → Export service)
  - export.completed (Export → API, Webhooks)
  - policy.changed (Admin → ALL services for cache bust)

  Commands (Direct/Queue):
  - RebuildProjection (Admin → Projection worker)
  - ReplayFromDLQ (Admin → DLQ processor)
  - SealSegment (Integrity scheduler → Integrity worker)
- Guaranteed Delivery Semantics
  - At-Least-Once Delivery: Azure Service Bus guarantees
  - Exactly-Once Processing: Achieved via idempotency (inbox pattern)
  - Ordering: Within a partition key only (tenantId or tenantId:sourceId)
  - Back-Pressure: Consumer concurrency limits, queue depth monitoring
Code Examples:

- Event vs. Command interface markers
- Message taxonomy diagram
- Event-driven flow sequence diagrams

Diagrams:

- ATP messaging architecture (services, queues, topics)
- Event choreography (audit flow)
- Saga orchestration (export workflow)
- Message delivery guarantees

Deliverables:

- Messaging architecture overview
- EDA pattern catalog
- Message type taxonomy
- Delivery guarantees specification
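The event/command split in the taxonomy above is commonly expressed with marker interfaces. A minimal sketch — the `ICommand` and `IRequest` names and the two record types are illustrative assumptions; only `IEvent` appears elsewhere in this document:

```csharp
// Marker interfaces distinguishing the message types in the taxonomy above.
public interface IEvent { }     // Domain/integration events: something happened (past tense)
public interface ICommand { }   // Commands: intent to perform an action (imperative)
public interface IRequest { }   // Requests: query expecting a response

// Past-tense event name, immutable payload (hypothetical shape)
public sealed record AuditRecordAppendedEvent(string AuditRecordId, string TenantId) : IEvent;

// Imperative command name (hypothetical shape)
public sealed record RebuildProjectionCommand(string TenantId, string ProjectionName) : ICommand;
```

Marker interfaces carry no behavior; they let publish topology, validation, and filters treat events and commands differently by type.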
#### Topic 2: MassTransit & Azure Service Bus Strategy
What will be covered:

- Why MassTransit?
  - Transport abstraction (can swap Azure Service Bus for RabbitMQ, Amazon SQS)
  - Built-in retry policies, circuit breakers, rate limiting
  - Saga state machine support (Automatonymous)
  - Outbox pattern support
  - Request/response pattern
  - Message scheduling and delayed delivery
  - Comprehensive observability (OpenTelemetry, logs, metrics)
- Why Azure Service Bus?
  - Managed Service: No infrastructure management
  - Enterprise Features: Topics/subscriptions, sessions, duplicate detection
  - Security: Managed Identity, RBAC, private endpoints
  - Reliability: Built-in DLQ, at-least-once delivery
  - Scalability: Partitioned entities, premium tier with high throughput
  - Compliance: SOC 2, ISO 27001, HIPAA-ready
- Alternative Transports (for reference)
  - RabbitMQ (self-hosted, on-prem scenarios)
  - Amazon SQS/SNS (AWS deployments)
  - In-Memory (development, testing)
- ATP Transport Decision
  - Primary: Azure Service Bus (Premium tier for production)
  - Dev/Test: In-Memory or Azure Service Bus (Standard tier)
  - Rationale: Native Azure integration, managed service, enterprise features
Code Examples:

- MassTransit abstraction interfaces (IBus, IPublishEndpoint)
- Azure Service Bus topic/queue/subscription model
- Transport configuration comparison

Diagrams:

- MassTransit abstraction layers
- Azure Service Bus topology
- Alternative transport comparison

Deliverables:

- Technology selection rationale
- Transport strategy document
- Topology design principles
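The transport abstraction described above can be sketched as a per-environment switch — a hedged illustration in which the `environment` check and consumer name are assumptions; `UsingInMemory` and `UsingAzureServiceBus` are real MassTransit 8 configuration methods:

```csharp
// Sketch: same bus registration, different transports per environment.
services.AddMassTransit(x =>
{
    x.AddConsumer<AuditAcceptedEventConsumer>();

    if (environment.IsDevelopment())
    {
        // In-memory transport for local dev/test: no broker required
        x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
    }
    else
    {
        // Azure Service Bus for production
        x.UsingAzureServiceBus((context, cfg) =>
        {
            cfg.Host(connectionString);
            cfg.ConfigureEndpoints(context);
        });
    }
});
```

Because consumers depend only on MassTransit abstractions (`IConsumer<T>`, `IPublishEndpoint`), swapping the transport requires no changes to handler code.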
### CYCLE 2: MassTransit Configuration & Setup (~3,500 lines)

#### Topic 3: MassTransit Registration & DI Setup

What will be covered:

- AddMassTransit Extension Method
```csharp
services.AddMassTransit(cfg =>
{
    // Set endpoint name formatter (kebab-case)
    cfg.SetKebabCaseEndpointNameFormatter();

    // Register consumers
    cfg.AddConsumer<AuditAcceptedEventConsumer>();
    cfg.AddConsumer<ProjectionUpdatedEventConsumer>();
    cfg.AddConsumer<ExportRequestedCommandConsumer>();

    // Register saga state machines
    cfg.AddSagaStateMachine<ExportWorkflowSaga, ExportWorkflowState>()
        .NHibernateRepository(repo =>
        {
            // Persist saga state to NHibernate
            repo.UseConnectionString(connectionString);
            repo.Dialect<MsSql2012Dialect>();
        });

    // Configure transport (Azure Service Bus)
    cfg.UsingAzureServiceBus((context, config) =>
    {
        config.Host(connectionString);

        // Configure receive endpoints
        config.ConfigureEndpoints(context);
    });
});
```
- Endpoint Name Formatter
  - Kebab-case convention: `audit-accepted-event` (queue name)
  - Consistency across services
  - Readability in the Azure Portal
- Consumer Registration
  - Automatic consumer discovery from the assembly
  - Manual consumer registration
  - Consumer definitions for custom configuration
- Saga State Machine Registration
  - Saga persistence options (NHibernate, MongoDB, EF Core, Redis)
  - ATP uses NHibernate for saga state (same as the domain)
  - Saga repository configuration
- Transport Configuration
  - UsingAzureServiceBus method
  - Host connection string (from Key Vault)
  - Managed Identity authentication (production)
  - ConfigureEndpoints auto-wiring
Code Examples:

- Complete MassTransit registration (ATP)
- Consumer registration patterns
- Saga state machine registration
- Endpoint configuration

Diagrams:

- MassTransit DI registration flow
- Consumer/saga wiring
- Endpoint auto-configuration

Deliverables:

- MassTransit setup guide
- Registration patterns
- Configuration templates
#### Topic 4: Configuration Options & Environment-Specific Settings

What will be covered:

- MassTransit Options Configuration
```json
{
  "MassTransit": {
    "HostOptions": {
      "StartTimeout": "00:01:00",
      "StopTimeout": "00:01:00"
    },
    "AzureServiceBusTransport": {
      "AzureServiceBusHost": {
        "ConnectionString": "Endpoint=sb://...",
        "FullyQualifiedNamespace": "atp-prod.servicebus.windows.net",
        "UseManagedIdentity": true,
        "ClientId": "",
        "UseWebSockets": false,
        "OperationTimeoutSeconds": 60
      },
      "AzureServiceBusReceiveEndpoint": {
        "PrefetchCount": 32,
        "ConcurrentMessageLimit": 64
      },
      "RetryLimit": 5,
      "RetryMinBackoffSeconds": 1,
      "RetryMaxBackoffSeconds": 30
    }
  }
}
```
- Environment-Specific Configuration
  - Development: In-Memory transport or Azure Service Bus (Standard)
  - Docker: Service Bus emulator or shared dev namespace
  - Test: In-Memory with simulated delays
  - Production: Azure Service Bus Premium with Managed Identity
- Connection String Management
  - Stored in Azure Key Vault (production)
  - Retrieved via Managed Identity
  - No connection strings in code or config files
  - Rotation procedures
- Managed Identity Authentication
- Performance Tuning Options
  - PrefetchCount: Messages fetched ahead (32-256)
  - ConcurrentMessageLimit: Parallel processing (64-512)
  - MaxAutoLockRenewalDuration: Long-running message processing
  - MaxConcurrentCalls: Per-consumer concurrency
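Managed Identity authentication can be sketched as follows. This is an illustration, not ATP's actual wiring: when the host is given a namespace URI rather than a connection string, MassTransit 8 authenticates through Azure.Identity's `DefaultAzureCredential` chain, which resolves the service's Managed Identity at runtime; the namespace value below is a placeholder:

```csharp
// Sketch: Managed Identity auth for Azure Service Bus in MassTransit 8.
// In production the namespace would come from configuration, not a literal.
services.AddMassTransit(x =>
{
    x.UsingAzureServiceBus((context, cfg) =>
    {
        // No connection string: the Uri form triggers DefaultAzureCredential,
        // so no SAS keys ever appear in code or config.
        cfg.Host(new Uri("sb://atp-prod.servicebus.windows.net/"));

        cfg.ConfigureEndpoints(context);
    });
});
```

Locally, `DefaultAzureCredential` falls back to developer credentials (Azure CLI, Visual Studio), so the same registration works in development without code changes.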
Code Examples:

- Complete appsettings.json (MassTransit section)
- Environment-specific configurations
- Managed Identity configuration
- Performance tuning settings
- Key Vault connection string retrieval

Diagrams:

- Configuration hierarchy (env-specific)
- Managed Identity authentication flow
- Performance tuning impact

Deliverables:

- Configuration reference
- Environment setup guide
- Performance tuning playbook
### CYCLE 3: Azure Service Bus Integration (~3,000 lines)

#### Topic 5: Azure Service Bus Topology Design
What will be covered:
- ATP Azure Service Bus Namespace
  - Premium Tier (production): 1-8 messaging units, dedicated capacity
  - Standard Tier (dev/test): Shared capacity, cost-effective
  - Namespace: sb-atp-prod-eastus2.servicebus.windows.net
  - Private endpoint for VNet isolation
  - Managed Identity RBAC for access control
- Topics & Subscriptions

  Topics (Events - Publish/Subscribe):
  - atp.audit.v1 (audit.appended, audit.accepted)
  - atp.projection.v1 (projection.updated)
  - atp.integrity.v1 (integrity.verified)
  - atp.export.v1 (export.requested, export.completed)
  - atp.policy.v1 (policy.changed)

  Subscriptions per Topic:
  - atp.audit.v1:
    - ingestion-service (for processing appended events)
    - projection-service (for building read models)
    - integrity-service (for hash chain computation)
  - atp.projection.v1:
    - query-service (for read model updates)
    - export-service (for package generation)
  - atp.policy.v1:
    - ALL-services (broadcast policy changes)
- Queues (Commands/Requests)
- Dead-Letter Queues (DLQ)
  - Automatic DLQ per subscription: `{subscription}/$DeadLetterQueue`
  - Automatic DLQ per queue: `{queue}/$DeadLetterQueue`
  - DLQ monitoring and replay procedures
- Message Sessions
  - Ordered message processing per session
  - Session ID = `tenantId` (for strict ordering per tenant)
  - Use case: Policy updates must be processed in order
- Duplicate Detection
  - Enable duplicate detection window (1-10 minutes)
  - MessageId-based deduplication
  - Use ULID as MessageId for natural ordering + uniqueness
- Partitioning
  - Enable partitioning for higher throughput
  - Partition key = `tenantId` or `tenantId:sourceId`
  - Load distribution across multiple brokers
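The duplicate-detection and partitioning settings above can be sketched with the `Azure.Messaging.ServiceBus.Administration` client — an illustration only, since ATP provisions topology via Pulumi; the namespace and topic names mirror the examples above:

```csharp
using Azure.Identity;
using Azure.Messaging.ServiceBus.Administration;

// Sketch: create a topic with duplicate detection and partitioning enabled.
var admin = new ServiceBusAdministrationClient(
    "sb-atp-prod-eastus2.servicebus.windows.net",
    new DefaultAzureCredential());

await admin.CreateTopicAsync(new CreateTopicOptions("atp.audit.v1")
{
    // MessageId-based dedup: ULID MessageIds make replays detectable
    // within the configured history window (1-10 minutes).
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),

    // Spread load across brokers; messages route by partition key (tenantId).
    EnablePartitioning = true
});
```

Note that duplicate detection and partitioning must be chosen at entity creation time; they cannot be toggled on an existing topic.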
Code Examples:

- Topic/subscription creation (Pulumi)
- Queue configuration with sessions
- Duplicate detection setup
- Partitioning configuration
- DLQ monitoring query

Diagrams:

- ATP Azure Service Bus topology
- Topic/subscription model
- Partitioning strategy
- DLQ architecture

Deliverables:

- Topology design specification
- Provisioning scripts (Pulumi)
- Naming conventions
- Partitioning strategy
#### Topic 6: Azure Service Bus Security & Access Control

What will be covered:

- Managed Identity Authentication
  - System-assigned Managed Identity per service
  - User-assigned Managed Identity for shared access
  - No Shared Access Signatures (SAS) in application code
- RBAC Roles
  - Azure Service Bus Data Owner: Full access (admin only)
  - Azure Service Bus Data Sender: Publish messages (producers)
  - Azure Service Bus Data Receiver: Consume messages (consumers)
  - Least privilege: Each service gets only the required roles
- ATP RBAC Assignments

  | Service | Role | Scope |
  |---|---|---|
  | Ingestion | Data Sender | Topic: atp.audit.v1 |
  | Projection | Data Receiver | Subscription: projection-service |
  | Projection | Data Sender | Topic: atp.projection.v1 |
  | Integrity | Data Receiver | Subscription: integrity-service |
  | Integrity | Data Sender | Topic: atp.integrity.v1 |
  | Export | Data Receiver | Queue: atp.export-package |
  | Export | Data Sender | Topic: atp.export.v1 |
  | Admin | Data Sender | All queues (commands) |

- Private Endpoints
  - Service Bus accessible only via VNet
  - Private endpoint in the ATP VNet
  - DNS resolution via Private DNS Zone
  - No public internet access
- Network Security
  - Firewall rules (allow only trusted IPs for management)
  - VNet service endpoints (fallback if a private endpoint is not available)
  - TLS 1.2+ enforcement
- Audit Logging
  - Azure Monitor Diagnostic Settings
  - Log all administrative operations
  - Log authentication failures
  - Log message delivery failures
Code Examples:

- Managed Identity configuration in MassTransit
- RBAC role assignment (Azure CLI/Pulumi)
- Private endpoint setup (Pulumi)
- Diagnostic settings configuration
- Audit log query (Kusto/KQL)

Diagrams:

- Managed Identity authentication flow
- RBAC role assignments
- Private endpoint network topology
- Audit logging architecture

Deliverables:

- Security configuration guide
- RBAC assignment matrix
- Network security setup
- Audit logging queries
### CYCLE 4: Message Contracts & Publishing (~3,500 lines)

#### Topic 7: Message Contract Design

What will be covered:

- Message Contract Principles
  - Immutable: Once published, never change a schema version
  - Versioned: SchemaVersion field for evolution
  - Self-Contained: All info needed to process (no hidden dependencies)
  - Documented: XML comments, AsyncAPI specs
- ATP Message Envelope

```csharp
public interface IMessageEnvelope
{
    string EventId { get; }                  // ULID (unique, time-ordered)
    string EventType { get; }                // e.g., "audit.accepted"
    string SchemaVersion { get; }            // e.g., "1.2.0"
    DateTime OccurredAt { get; }             // UTC timestamp
    string TenantId { get; }                 // Tenant context
    string Edition { get; }                  // Edition context
    CorrelationContext Correlation { get; }  // Tracing
    string IdempotencyKey { get; }           // Deduplication
}

public class CorrelationContext
{
    public string TraceId { get; set; }       // W3C trace ID (hex32)
    public string SpanId { get; set; }        // W3C span ID (hex16)
    public string CorrelationId { get; set; } // Request ID
    public string CausationId { get; set; }   // Parent event ID
}
```

- ATP Domain Event Example

```csharp
public class AuditRecordAcceptedEvent : IEvent
{
    // Envelope properties
    public string EventId { get; set; }
    public string EventType => "audit.accepted";
    public string SchemaVersion => "1.0.0";
    public DateTime OccurredAt { get; set; }
    public string TenantId { get; set; }
    public string Edition { get; set; }
    public CorrelationContext Correlation { get; set; }
    public string IdempotencyKey { get; set; }

    // Payload (event-specific data)
    public string AuditRecordId { get; set; }
    public string Action { get; set; }
    public string ResourceType { get; set; }
    public string ResourceId { get; set; }
    public string ActorId { get; set; }
    public DataClassification Classification { get; set; }
    public int PolicyVersion { get; set; }
    public int PayloadBytes { get; set; }
}
```

- Message Contract Validation
  - FluentValidation rules
  - Required fields enforced
  - String length limits
  - Pattern validation (ULID format, etc.)
  - MassTransit validation pipeline
- Schema Evolution Strategy
  - Additive changes: Add optional fields (bump minor version)
  - Breaking changes: New event type or major version bump
  - Dual publishing: Publish both old and new versions during migration
  - Consumer compatibility: Support N-1 schema versions
Code Examples:

- IMessageEnvelope interface
- AuditRecordAcceptedEvent (complete)
- ProjectionUpdatedEvent
- ExportRequestedCommand
- IntegrityVerifiedEvent
- PolicyChangedEvent
- FluentValidation rules for a message
- Schema versioning example

Diagrams:

- Message envelope structure
- Event inheritance hierarchy
- Schema evolution timeline

Deliverables:

- Message contract specifications
- All ATP event/command definitions
- Validation rules
- Evolution strategy guide
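An additive (minor-version) change can be sketched as follows — a hedged illustration in which the `BatchId` field is hypothetical and not part of ATP's actual contract:

```csharp
// Sketch of an additive v1.0 -> v1.1 change: a new optional field with a
// safe default, so consumers on either schema version interoperate.
public class AuditRecordAcceptedEventV1_1 : IEvent
{
    public string SchemaVersion => "1.1.0"; // minor bump: additive change only

    // Existing v1.0 fields are unchanged
    public string EventId { get; set; }
    public string AuditRecordId { get; set; }
    public string TenantId { get; set; }

    // New optional field (hypothetical): v1.0 consumers ignore unknown fields
    // during deserialization, and it stays null when absent from a v1.0 payload.
    public string BatchId { get; set; }
}
```

Renaming or retyping an existing field would instead be a breaking change, requiring a new event type or major version bump with dual publishing during migration.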
#### Topic 8: Message Publishing Patterns

What will be covered:

- Publishing via IPublishEndpoint
```csharp
public class IngestionService
{
    private readonly IPublishEndpoint _publishEndpoint;

    public async Task PublishAuditAcceptedAsync(
        AuditRecord auditRecord,
        CancellationToken ct)
    {
        var @event = new AuditRecordAcceptedEvent
        {
            EventId = Ulid.NewUlid().ToString(),
            OccurredAt = DateTime.UtcNow,
            TenantId = auditRecord.TenantId,
            AuditRecordId = auditRecord.AuditRecordId,
            Action = auditRecord.Action,
            // ... other properties
        };

        await _publishEndpoint.Publish(@event, ctx =>
        {
            // Set message headers. MessageId/CorrelationId are Guids, but ATP
            // identifiers are 26-character ULIDs, so convert via Ulid.Parse
            // rather than Guid.Parse (which would throw on a ULID string).
            ctx.MessageId = Ulid.Parse(@event.EventId).ToGuid();
            ctx.CorrelationId = Ulid.Parse(auditRecord.CorrelationId).ToGuid();
            ctx.Headers.Set("x-tenant-id", auditRecord.TenantId);
            ctx.Headers.Set("x-schema-version", @event.SchemaVersion);

            // Set partition key (Azure Service Bus)
            ctx.SetPartitionKey(auditRecord.TenantId);

            // Set time-to-live (optional)
            ctx.TimeToLive = TimeSpan.FromDays(7);
        }, ct);
    }
}
```
- Publishing via IBus (for scheduled/delayed messages)
- Publishing from Outbox (transactional)

```csharp
// 1. Write audit record + outbox entry (same transaction)
unitOfWork.ExecuteTransactional(() =>
{
    auditRecordRepository.Insert(auditRecord);
    outboxRepository.Insert(new OutboxMessage
    {
        EventType = "audit.accepted",
        EventPayload = JsonSerializer.Serialize(@event),
        // ...
    });
});

// 2. Background worker publishes from outbox
// (Covered in Outbox Pattern cycle)
```

- Correlation Context Propagation
  - Automatically propagate trace/correlation IDs
  - MassTransit built-in support for correlation
  - Custom headers for ATP-specific context
- Message Metadata
  - MessageId: Unique identifier (ULID)
  - CorrelationId: Request correlation
  - ConversationId: Multi-message conversation tracking
  - InitiatorId: Saga that initiated the message
  - TimeToLive: Message expiration
  - ContentType: application/json
- Publish Topology Configuration
  - MassTransit determines the topic from the message type
  - Override with `Publish<T>(configurator => ...)`
  - Fanout to multiple subscriptions automatically
Code Examples:

- Publishing with IPublishEndpoint (complete)
- Scheduled message publishing
- Publishing with custom headers
- Correlation context propagation
- Publish topology configuration

Diagrams:

- Publishing flow (service → MassTransit → Service Bus)
- Correlation context propagation
- Publish topology

Deliverables:

- Publishing pattern guide
- Correlation strategy
- Metadata specification
### CYCLE 5: Consumers & Message Handlers (~3,000 lines)

#### Topic 9: Consumer Implementation

What will be covered:

- `IConsumer<T>` Implementation
```csharp
public class AuditAcceptedEventConsumer : IConsumer<AuditRecordAcceptedEvent>
{
    private readonly IAuditEventProjector _projector;
    private readonly ILogger<AuditAcceptedEventConsumer> _logger;

    public async Task Consume(ConsumeContext<AuditRecordAcceptedEvent> context)
    {
        var @event = context.Message;

        _logger.LogInformation(
            "Processing AuditRecordAcceptedEvent: {AuditRecordId} for tenant {TenantId}",
            @event.AuditRecordId,
            @event.TenantId);

        try
        {
            // Project event to read model (idempotent)
            await _projector.ProjectAsync(@event, context.CancellationToken);

            // Message automatically acknowledged on success
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to process AuditRecordAcceptedEvent: {AuditRecordId}",
                @event.AuditRecordId);

            // Exception causes retry or DLQ (based on retry policy)
            throw;
        }
    }
}
```
- Consumer Registration
- Consumer Definitions (for custom endpoint configuration)

```csharp
public class AuditAcceptedEventConsumerDefinition :
    ConsumerDefinition<AuditAcceptedEventConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<AuditAcceptedEventConsumer> consumerConfigurator)
    {
        endpointConfigurator.PrefetchCount = 32;
        endpointConfigurator.ConcurrentMessageLimit = 64;

        // Partition by tenant for ordered processing
        endpointConfigurator.UsePartitioner<AuditRecordAcceptedEvent>(
            64, // partition count
            p => p.Message.TenantId.GetHashCode());
    }
}
```

- Idempotent Consumer Pattern

```csharp
public async Task Consume(ConsumeContext<AuditRecordAcceptedEvent> context)
{
    var @event = context.Message;

    // Check if already processed (inbox pattern)
    var alreadyProcessed = await _inboxRepository.ExistsAsync(
        @event.EventId, context.CancellationToken);

    if (alreadyProcessed)
    {
        _logger.LogInformation("Event {EventId} already processed, skipping",
            @event.EventId);
        return; // Acknowledge without reprocessing
    }

    // Process the event
    await _projector.ProjectAsync(@event, context.CancellationToken);

    // Record in inbox (deduplication)
    await _inboxRepository.RecordAsync(
        @event.EventId,
        @event.EventType,
        DateTime.UtcNow,
        context.CancellationToken);
}
```

- Consumer Concurrency
  - ConcurrentMessageLimit: Max parallel messages per consumer instance
  - PrefetchCount: Messages fetched ahead from the broker
  - Partitioning: Distribute messages across consumer instances
Code Examples:

- Complete consumer implementations:
  - AuditAcceptedEventConsumer (Projection service)
  - ProjectionUpdatedEventConsumer (Query service)
  - ExportRequestedConsumer (Export service)
  - PolicyChangedEventConsumer (All services)
- Consumer definition with custom config
- Idempotent consumer with inbox check
- Partitioned consumer

Diagrams:

- Consumer processing flow
- Idempotent consumer pattern
- Partitioned consumer architecture

Deliverables:

- Consumer implementation guide
- All ATP consumer implementations
- Idempotency patterns
- Concurrency tuning guide
#### Topic 10: Message Handler Patterns

What will be covered:

- Handler Composition
  - Separate handlers for different responsibilities
  - Projector: Update read models
  - Notifier: Send notifications
  - Validator: Pre-processing validation
- Error Handling in Consumers
  - Try/catch with logging
  - Throw exceptions to trigger retry
  - Return successfully for poison messages (after logging)
  - Custom fault handling
- Consume Filters (Middleware)

```csharp
public class TenantContextFilter<T> : IFilter<ConsumeContext<T>>
    where T : class
{
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Extract tenant context from the message
        if (context.Message is IMessageEnvelope envelope)
        {
            // Set tenant context for downstream processing
            TenantContext.Current = new TenantContext
            {
                TenantId = envelope.TenantId,
                Edition = envelope.Edition
            };
        }

        try
        {
            await next.Send(context);
        }
        finally
        {
            // Cleanup in finally so the ambient context is cleared
            // even when the downstream consumer throws
            TenantContext.Current = null;
        }
    }

    public void Probe(ProbeContext context) { }
}

// Usage:
// cfg.UseConsumeFilter(typeof(TenantContextFilter<>), context);
```

- Scoped Consume Filter (logging, metrics)

```csharp
public class LoggingFilter<T> : IFilter<ConsumeContext<T>>
    where T : class
{
    private readonly ILogger _logger;

    public LoggingFilter(ILogger<LoggingFilter<T>> logger)
    {
        _logger = logger;
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        var stopwatch = Stopwatch.StartNew();

        _logger.LogInformation("Consuming message {MessageType}: {MessageId}",
            typeof(T).Name, context.MessageId);

        try
        {
            await next.Send(context);
            stopwatch.Stop();
            _logger.LogInformation("Consumed message {MessageType} in {Duration}ms",
                typeof(T).Name, stopwatch.ElapsedMilliseconds);
        }
        catch (Exception ex)
        {
            stopwatch.Stop();
            _logger.LogError(ex,
                "Failed to consume message {MessageType} after {Duration}ms",
                typeof(T).Name, stopwatch.ElapsedMilliseconds);
            throw;
        }
    }

    public void Probe(ProbeContext context) { }
}
```

- Tenant Isolation in Consumers
  - Validate that the tenantId from the message matches the expected tenant
  - Reject cross-tenant messages
  - Apply tenant-scoped repository filters
Code Examples:

- Handler composition pattern
- Error handling strategies
- Tenant context filter (complete)
- Logging filter (complete)
- Metrics filter
- Tenant isolation validation

Diagrams:

- Handler composition
- Consume filter pipeline
- Tenant context propagation

Deliverables:

- Handler pattern catalog
- Filter implementations
- Tenant isolation guide
### CYCLE 6: Saga State Machines (~4,000 lines)

#### Topic 11: Saga Pattern Fundamentals

What will be covered:

- What is a Saga?
  - Long-running process (hours, days, weeks)
  - Coordinates multiple services/steps
  - Maintains state between steps
  - Compensation logic for failures (distributed transactions)
- Saga vs. Choreography
  - Choreography: Services react to events (loosely coupled)
  - Saga (Orchestration): Central coordinator directs the workflow
  - ATP uses both: Choreography for simple flows, sagas for complex workflows
- MassTransit Saga State Machines
  - Built on the Automatonymous library
  - Define states and events
  - Define transitions between states
  - Persisted state (survives service restarts)
- ATP Saga Use Cases
  1. Export Workflow Saga
     - Export requested → Package created → Segments retrieved → Package signed → Manifest generated → Export completed → Notification sent
  2. Integrity Sealing Saga
     - Segment accumulated → Seal triggered → Hash computed → Merkle tree built → Signature generated → Block sealed → Verification published
  3. Tenant Onboarding Saga (future)
     - Tenant created → Resources provisioned → Policies initialized → Admin notified → Onboarding completed
- Saga State
  - CorrelationId: Unique saga instance identifier (Guid or string)
  - CurrentState: State machine current state (string)
  - Saga-specific data: Accumulates data across steps
  - Version: Optimistic concurrency control
- Saga Events
  - Initiating events: Start the saga
  - Transition events: Move between states
  - Fault events: Handle failures
  - Completed events: Mark the saga as complete
Code Examples:

- Saga pattern comparison (choreography vs. orchestration)
- Saga state interface
- Saga event definition
- Simple saga state machine outline

Diagrams:

- Saga orchestration flow
- Saga state machine diagram
- Choreography vs. saga comparison

Deliverables:

- Saga pattern overview
- ATP saga use case catalog
- State machine fundamentals
#### Topic 12: Saga State Machine Implementation

What will be covered:

- Export Workflow Saga (Complete Example)
```csharp
public class ExportWorkflowSaga : MassTransitStateMachine<ExportWorkflowState>
{
    public ExportWorkflowSaga()
    {
        // Define instance state
        InstanceState(x => x.CurrentState);

        // Define events
        Event(() => ExportRequested, x =>
        {
            x.CorrelateById(ctx => ctx.Message.ExportJobId);
            x.InsertOnInitial = true;
            x.SetSagaFactory(ctx => new ExportWorkflowState
            {
                CorrelationId = ctx.Message.ExportJobId,
                TenantId = ctx.Message.TenantId,
                // ...
            });
        });
        Event(() => PackageCreated, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));
        Event(() => PackageSigned, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));
        Event(() => ExportFailed, x => x.CorrelateById(ctx => ctx.Message.ExportJobId));

        // Define behavior
        Initially(
            When(ExportRequested)
                .Then(ctx =>
                {
                    ctx.Saga.TenantId = ctx.Message.TenantId;
                    ctx.Saga.StartedAt = DateTime.UtcNow;
                    // ... initialize saga state
                })
                .Publish(ctx => new CreateExportPackageCommand
                {
                    ExportJobId = ctx.Saga.CorrelationId,
                    TenantId = ctx.Saga.TenantId,
                    // ...
                })
                .TransitionTo(CreatingPackage));

        During(CreatingPackage,
            When(PackageCreated)
                .Then(ctx =>
                {
                    ctx.Saga.PackageUrl = ctx.Message.PackageUrl;
                    ctx.Saga.RecordCount = ctx.Message.RecordCount;
                })
                .Publish(ctx => new SignExportPackageCommand
                {
                    ExportJobId = ctx.Saga.CorrelationId,
                    PackageUrl = ctx.Saga.PackageUrl,
                })
                .TransitionTo(SigningPackage));

        During(SigningPackage,
            When(PackageSigned)
                .Then(ctx =>
                {
                    ctx.Saga.SignatureUrl = ctx.Message.SignatureUrl;
                    ctx.Saga.CompletedAt = DateTime.UtcNow;
                })
                .Publish(ctx => new ExportCompletedEvent
                {
                    ExportJobId = ctx.Saga.CorrelationId,
                    TenantId = ctx.Saga.TenantId,
                    PackageUrl = ctx.Saga.PackageUrl,
                    SignatureUrl = ctx.Saga.SignatureUrl,
                })
                .Finalize());

        // Fault handling
        DuringAny(
            When(ExportFailed)
                .Then(ctx =>
                {
                    ctx.Saga.ErrorMessage = ctx.Message.ErrorMessage;
                    ctx.Saga.FailedAt = DateTime.UtcNow;
                })
                .Publish(ctx => new ExportFailedEvent { /* ... */ })
                .TransitionTo(Failed));

        SetCompletedWhenFinalized();
    }

    public State CreatingPackage { get; private set; }
    public State SigningPackage { get; private set; }
    public State Failed { get; private set; }

    public Event<ExportRequestedEvent> ExportRequested { get; private set; }
    public Event<PackageCreatedEvent> PackageCreated { get; private set; }
    public Event<PackageSignedEvent> PackageSigned { get; private set; }
    public Event<ExportFailedEvent> ExportFailed { get; private set; }
}

public class ExportWorkflowState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public int Version { get; set; }
    public string TenantId { get; set; }
    public string ExportFilter { get; set; }
    public string PackageUrl { get; set; }
    public string SignatureUrl { get; set; }
    public int RecordCount { get; set; }
    public DateTime StartedAt { get; set; }
    public DateTime? CompletedAt { get; set; }
    public DateTime? FailedAt { get; set; }
    public string ErrorMessage { get; set; }
}
```
- Saga State Persistence
  - NHibernate repository (ATP default)
  - Saga state entity mapping (FluentNHibernate)
  - Optimistic concurrency (Version column)
  - Indexed on CorrelationId
- Saga Compensation
  - DuringAny + fault events
  - Compensating transactions
  - Rollback/undo logic
  - Error notifications
- Saga Timeout Handling
  - `Schedule<T>` for timeouts
  - RequestTimeout pattern
  - Timeout events trigger compensation
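Schedule-based saga timeouts can be sketched as follows. This is a hedged illustration: the `ExportTimeoutExpired` message, the saga class name, and a `Guid? TimeoutTokenId` property (which would need to be added to `ExportWorkflowState`) are all assumptions, not ATP's actual implementation:

```csharp
// Hypothetical timeout message
public class ExportTimeoutExpired
{
    public Guid ExportJobId { get; set; }
}

public class ExportWorkflowSagaWithTimeout :
    MassTransitStateMachine<ExportWorkflowState>
{
    public State CreatingPackage { get; private set; }
    public State Failed { get; private set; }
    public Event<ExportRequestedEvent> ExportRequested { get; private set; }
    public Schedule<ExportWorkflowState, ExportTimeoutExpired> ExportTimeout { get; private set; }

    public ExportWorkflowSagaWithTimeout()
    {
        InstanceState(x => x.CurrentState);

        // Declare the schedule; the token id is a Guid? property on the
        // saga state (assumed: ExportWorkflowState.TimeoutTokenId)
        Schedule(() => ExportTimeout,
            x => x.TimeoutTokenId,
            s =>
            {
                s.Delay = TimeSpan.FromMinutes(30); // assumed timeout window
                s.Received = r => r.CorrelateById(ctx => ctx.Message.ExportJobId);
            });

        Initially(
            When(ExportRequested)
                // Start the timeout clock when the workflow begins
                .Schedule(ExportTimeout,
                    ctx => new ExportTimeoutExpired { ExportJobId = ctx.Saga.CorrelationId })
                .TransitionTo(CreatingPackage));

        During(CreatingPackage,
            // If the timeout fires before the package is created,
            // fail the saga (compensation would hook in here)
            When(ExportTimeout.Received)
                .Then(ctx => ctx.Saga.ErrorMessage = "Export timed out")
                .TransitionTo(Failed));
    }
}
```

On the happy path, a completing transition would call `Unschedule(ExportTimeout)` so the timeout message is cancelled rather than delivered to a finished saga.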
Code Examples:

- Complete ExportWorkflowSaga (above)
- IntegritySealingSaga
- Saga state entity mapping
- Compensation logic example
- Timeout handling example

Diagrams:

- Export workflow saga state machine
- Saga compensation flow
- Saga persistence architecture

Deliverables:

- Saga implementation guide
- ATP saga implementations
- State persistence setup
- Compensation patterns
### CYCLE 7: Outbox Pattern Implementation (~3,000 lines)

#### Topic 13: Transactional Outbox Pattern

What will be covered:

- Why the Outbox Pattern?
  - Atomic database write + message publish
  - Prevents the dual-write problem (DB succeeds, message fails)
  - Guarantees at-least-once message delivery
  - Decouples message publishing from the business transaction
-
Outbox Table Schema
CREATE TABLE ConnectSoft.Audit.Outbox ( OutboxId CHAR(26) PRIMARY KEY, -- ULID TenantId NVARCHAR(128) NOT NULL, EventType NVARCHAR(255) NOT NULL, -- Message type name EventPayload NVARCHAR(MAX) NOT NULL, -- JSON serialized message CreatedAt DATETIME2(3) NOT NULL, ProcessedAt DATETIME2(3) NULL, Status TINYINT NOT NULL, -- 0=Pending, 1=Processed, 2=Failed RetryCount INT NOT NULL DEFAULT 0, NextRetryAt DATETIME2(3) NULL, ErrorMessage NVARCHAR(MAX) NULL, CorrelationId CHAR(26) NULL, -- For tracing INDEX IX_Outbox_Status_CreatedAt (Status, CreatedAt), INDEX IX_Outbox_Status_NextRetryAt (Status, NextRetryAt) WHERE Status = 2 ); -
Atomic Write + Outbox Insert
public async Task<AuditRecord> AppendAuditRecordAsync( AppendCommand command, CancellationToken ct) { AuditRecord auditRecord = null; AuditRecordAcceptedEvent @event = null; await _unitOfWork.ExecuteTransactionalAsync(async () => { // 1. Business logic: Create audit record auditRecord = AuditRecord.Create(command); await _auditRecordRepository.InsertAsync(auditRecord, ct); // 2. Create domain event @event = new AuditRecordAcceptedEvent { EventId = Ulid.NewUlid().ToString(), AuditRecordId = auditRecord.AuditRecordId, TenantId = auditRecord.TenantId, // ... other properties }; // 3. Insert outbox entry (same transaction) var outboxMessage = new OutboxMessage { OutboxId = @event.EventId, TenantId = @event.TenantId, EventType = @event.EventType, EventPayload = JsonSerializer.Serialize(@event), CreatedAt = DateTime.UtcNow, Status = OutboxStatus.Pending, CorrelationId = auditRecord.CorrelationId }; await _outboxRepository.InsertAsync(outboxMessage, ct); }, ct); return auditRecord; // Note: Message not yet published, but guaranteed to be published // by outbox processor background worker } -
- Outbox Processor Background Worker

```csharp
public class OutboxProcessor : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OutboxProcessor> _logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessPendingMessagesAsync(stoppingToken);
                await ProcessRetryMessagesAsync(stoppingToken);

                // Poll every 5 seconds
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in outbox processor");
            }
        }
    }

    private async Task ProcessPendingMessagesAsync(CancellationToken ct)
    {
        using var scope = _serviceProvider.CreateScope();
        var outboxRepository = scope.ServiceProvider.GetRequiredService<IOutboxRepository>();
        var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();

        // Fetch pending messages (batch of 100)
        var pendingMessages = await outboxRepository.GetPendingAsync(100, ct);

        foreach (var outboxMessage in pendingMessages)
        {
            try
            {
                // Deserialize and publish
                var @event = DeserializeEvent(outboxMessage.EventType, outboxMessage.EventPayload);
                await publishEndpoint.Publish(@event, ctx =>
                {
                    // OutboxId/CorrelationId are ULIDs (CHAR(26)), not GUID
                    // strings, so convert instead of calling Guid.Parse
                    ctx.MessageId = Ulid.Parse(outboxMessage.OutboxId).ToGuid();
                    if (outboxMessage.CorrelationId is not null)
                        ctx.CorrelationId = Ulid.Parse(outboxMessage.CorrelationId).ToGuid();
                }, ct);

                // Mark as processed
                outboxMessage.Status = OutboxStatus.Processed;
                outboxMessage.ProcessedAt = DateTime.UtcNow;
                await outboxRepository.UpdateAsync(outboxMessage, ct);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to publish outbox message {OutboxId}",
                    outboxMessage.OutboxId);

                // Mark as failed, schedule a retry with exponential backoff
                outboxMessage.Status = OutboxStatus.Failed;
                outboxMessage.RetryCount++;
                outboxMessage.NextRetryAt = DateTime.UtcNow.AddSeconds(
                    Math.Pow(2, outboxMessage.RetryCount));
                outboxMessage.ErrorMessage = ex.Message;
                await outboxRepository.UpdateAsync(outboxMessage, ct);
            }
        }
    }
}
```
- Outbox Cleanup
- Periodically delete processed messages (after N days)
- Archive to cold storage for compliance (optional)
- Monitor outbox growth
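The cleanup bullets above can be sketched as a scheduled T-SQL job against the Outbox table. This is a minimal sketch under assumptions: the 7-day retention and 5,000-row batch size are illustrative, not ATP-mandated values.

```sql
-- Hypothetical cleanup job: delete processed messages older than 7 days,
-- in batches, to avoid long-running locks on the Outbox table.
WHILE 1 = 1
BEGIN
    DELETE TOP (5000) FROM ConnectSoft.Audit.Outbox
    WHERE Status = 1  -- Processed
      AND ProcessedAt < DATEADD(DAY, -7, SYSUTCDATETIME());

    IF @@ROWCOUNT = 0 BREAK;  -- Nothing left to purge
END
```

If archival to cold storage is required for compliance, the same batched loop can copy rows to an archive table before deleting them.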
Code Examples: - Outbox table DDL (complete) - OutboxMessage entity and mapping - Atomic write + outbox insert (complete) - OutboxProcessor background worker (complete) - Outbox cleanup job - Outbox monitoring queries
Diagrams: - Outbox pattern sequence diagram - Background processor flow - Retry with exponential backoff
Deliverables: - Outbox pattern implementation (complete) - Background processor setup - Cleanup procedures - Monitoring queries
Topic 14: Outbox Optimization & Monitoring¶
What will be covered:
- Batch Publishing
  - Fetch multiple pending messages
  - Publish in a batch (if the transport supports it)
  - Reduce round-trips
- Partitioned Outbox Processing
  - Multiple worker instances
  - Partition by TenantId hash
  - Avoid processing the same message twice (use row locking)
- Outbox Monitoring
  - Metrics:
    - Pending message count
    - Failed message count
    - Average age of pending messages
    - Processing throughput (messages/sec)
  - Alerts:
    - Pending count > threshold (e.g., 1,000)
    - Average age > threshold (e.g., 1 minute)
    - Failed count increasing
- Outbox Troubleshooting
  - Query stuck messages
  - Replay failed messages manually
  - Investigate error patterns
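The monitoring and troubleshooting items above can be backed by queries against the Outbox schema from Topic 13. A sketch, assuming that schema; thresholds and the 200-character error grouping are illustrative:

```sql
-- Pending count and age of the oldest pending message
SELECT COUNT(*) AS PendingCount,
       DATEDIFF(SECOND, MIN(CreatedAt), SYSUTCDATETIME()) AS OldestPendingAgeSeconds
FROM ConnectSoft.Audit.Outbox
WHERE Status = 0;  -- Pending

-- Failed messages grouped by error prefix, to surface recurring failure patterns
SELECT TOP (20)
       LEFT(ErrorMessage, 200) AS ErrorPrefix,
       COUNT(*)                AS Occurrences,
       MAX(RetryCount)         AS MaxRetries
FROM ConnectSoft.Audit.Outbox
WHERE Status = 2  -- Failed
GROUP BY LEFT(ErrorMessage, 200)
ORDER BY Occurrences DESC;
```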
Code Examples: - Batch publishing optimization - Partitioned processor (multiple workers) - Outbox monitoring query (Kusto/KQL) - Alert rule definitions - Troubleshooting queries
Diagrams: - Partitioned outbox processing - Monitoring dashboard layout
Deliverables: - Optimization techniques - Monitoring setup - Troubleshooting guide
CYCLE 8: Inbox Pattern & Idempotency (~3,000 lines)¶
Topic 15: Inbox Pattern for Idempotent Consumers¶
What will be covered:
- Why Inbox Pattern?
  - At-least-once delivery means duplicates are possible
  - Consumers must be idempotent (process once, ignore duplicates)
  - An inbox table tracks processed messages
  - Prevents duplicate processing
- Inbox Table Schema

```sql
CREATE TABLE ConnectSoft.Audit.Inbox (
    InboxId       CHAR(26)      NOT NULL,  -- EventId from the message
    ConsumerName  NVARCHAR(255) NOT NULL,  -- Consumer identifier
    EventType     NVARCHAR(255) NOT NULL,
    TenantId      NVARCHAR(128) NOT NULL,
    ProcessedAt   DATETIME2(3)  NOT NULL,
    CorrelationId CHAR(26)      NULL,
    -- Composite key: the same event may be recorded once per consumer.
    -- (A primary key on InboxId alone would prevent a second consumer
    -- from recording the same event.)
    PRIMARY KEY (InboxId, ConsumerName)
);

-- Supports cleanup of old entries (after N days)
CREATE INDEX IX_Inbox_ProcessedAt ON ConnectSoft.Audit.Inbox (ProcessedAt);
```
- Idempotent Consumer with Inbox Check

```csharp
public class AuditAcceptedEventConsumer : IConsumer<AuditRecordAcceptedEvent>
{
    private readonly IAuditEventProjector _projector;
    private readonly IInboxRepository _inboxRepository;
    private readonly IUnitOfWork _unitOfWork;
    // Used below but missing from the original field list
    private readonly ILogger<AuditAcceptedEventConsumer> _logger;

    public async Task Consume(ConsumeContext<AuditRecordAcceptedEvent> context)
    {
        var @event = context.Message;
        var consumerName = nameof(AuditAcceptedEventConsumer);

        // Check the inbox (already processed?)
        var alreadyProcessed = await _inboxRepository.ExistsAsync(
            @event.EventId, consumerName, context.CancellationToken);

        if (alreadyProcessed)
        {
            _logger.LogInformation(
                "Event {EventId} already processed by {Consumer}, skipping",
                @event.EventId, consumerName);
            return; // Acknowledge without reprocessing
        }

        // Process + record in the inbox (atomically)
        await _unitOfWork.ExecuteTransactionalAsync(async () =>
        {
            // 1. Process the event
            await _projector.ProjectAsync(@event, context.CancellationToken);

            // 2. Record in the inbox (same transaction)
            var inboxEntry = new InboxEntry
            {
                InboxId = @event.EventId,
                ConsumerName = consumerName,
                EventType = @event.EventType,
                TenantId = @event.TenantId,
                ProcessedAt = DateTime.UtcNow,
                CorrelationId = @event.Correlation.CorrelationId
            };
            await _inboxRepository.InsertAsync(inboxEntry, context.CancellationToken);
        }, context.CancellationToken);
    }
}
```
- Inbox Cleanup
  - Periodically delete old entries (after N days, e.g., 30 days)
  - Scheduled job or background worker
  - Keep the inbox size manageable
- Alternative: Natural Idempotency
  - Some operations are naturally idempotent (upsert by key)
  - Projection update: upsert by (TenantId, AuditRecordId)
  - No inbox needed if the operation is truly idempotent
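The upsert approach above can be sketched in T-SQL. This is illustrative only: the `AuditRecordProjection` table name and its columns are assumptions, not part of the documented schema.

```sql
-- Hypothetical natural-idempotency upsert for a projection read model:
-- replaying the same event produces the same row, so no inbox is required.
MERGE ConnectSoft.Audit.AuditRecordProjection AS target
USING (SELECT @TenantId AS TenantId, @AuditRecordId AS AuditRecordId) AS source
    ON target.TenantId = source.TenantId
   AND target.AuditRecordId = source.AuditRecordId
WHEN MATCHED THEN
    UPDATE SET Classification = @Classification,
               UpdatedAt = SYSUTCDATETIME()
WHEN NOT MATCHED THEN
    INSERT (TenantId, AuditRecordId, Classification, UpdatedAt)
    VALUES (@TenantId, @AuditRecordId, @Classification, SYSUTCDATETIME());
```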
Code Examples: - Inbox table DDL - InboxEntry entity and mapping - Idempotent consumer with inbox check (complete) - Inbox cleanup job - Natural idempotency example (upsert)
Diagrams: - Inbox pattern sequence diagram - Idempotency decision tree
Deliverables: - Inbox pattern implementation - Idempotency strategies - Cleanup procedures
Topic 16: Idempotency Keys & Deduplication¶
What will be covered:
- Idempotency Key in Messages
  - Client-provided or generated
  - Format: {tenantId}:{sourceId}:{sequence} or ULID
  - Included in the message envelope
  - Used for deduplication across retries
- Deduplication Strategies
  - Message-Level: MessageId-based (Azure Service Bus duplicate detection)
  - Consumer-Level: Inbox pattern (per-consumer deduplication)
  - Business-Level: Natural key uniqueness (e.g., AuditRecordId)
- Azure Service Bus Duplicate Detection
  - Enable on the topic/queue (configurable history window; default 10 minutes)
  - MessageId is used as the deduplication key
  - Duplicate messages are dropped automatically by the broker
  - Use a ULID-derived MessageId for time-ordered uniqueness
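Broker-level duplicate detection can be enabled through MassTransit's endpoint configuration. A sketch, assuming MassTransit v8's Azure Service Bus configurator properties (`RequiresDuplicateDetection`, `DuplicateDetectionHistoryTimeWindow`) and an illustrative queue name:

```csharp
cfg.UsingAzureServiceBus((context, config) =>
{
    config.ReceiveEndpoint("atp-audit-accepted", e =>
    {
        // Broker-level deduplication: messages with the same MessageId
        // arriving within the history window are dropped by Service Bus.
        e.RequiresDuplicateDetection = true;
        e.DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10);

        e.ConfigureConsumer<AuditAcceptedEventConsumer>(context);
    });
});
```

Note that these settings apply at entity creation time; enabling duplicate detection on an existing queue or topic requires recreating it.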
- Idempotency in Projections
  - Upsert by (TenantId, AuditRecordId)
  - Last-write-wins on duplicates (same result)
  - Deterministic projections (same input → same output)
- Idempotency Testing
  - Replay the same message twice
  - Verify no duplicate side effects
  - Verify the inbox prevents reprocessing
  - Load testing with intentional duplicates
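The replay test above can be sketched with the MassTransit test harness (the same harness used in Topic 27). `InMemoryInboxRepository` and `CountingProjector` are hypothetical test doubles, not part of ATP's codebase:

```csharp
[Test]
public async Task Consumer_Should_Not_Reprocess_Duplicate_Event()
{
    await using var provider = new ServiceCollection()
        .AddSingleton<IInboxRepository, InMemoryInboxRepository>()  // shared across deliveries
        .AddSingleton<IAuditEventProjector, CountingProjector>()    // counts invocations
        .AddMassTransitTestHarness(cfg => cfg.AddConsumer<AuditAcceptedEventConsumer>())
        .BuildServiceProvider(true);

    var harness = provider.GetRequiredService<ITestHarness>();
    await harness.Start();

    var @event = new AuditRecordAcceptedEvent
    {
        EventId = Ulid.NewUlid().ToString(),
        TenantId = "test-tenant"
    };

    // Publish the same event twice (same EventId)
    await harness.Bus.Publish(@event);
    await harness.Bus.Publish(@event);

    Assert.IsTrue(await harness.Consumed.Any<AuditRecordAcceptedEvent>());

    // Both deliveries were consumed, but the projection ran only once
    var projector = (CountingProjector)provider.GetRequiredService<IAuditEventProjector>();
    Assert.AreEqual(1, projector.InvocationCount);
}
```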
Code Examples: - Idempotency key generation - Azure Service Bus duplicate detection configuration - Upsert-based idempotent operation - Idempotency test scenario
Diagrams: - Deduplication strategy layers - Idempotency key flow
Deliverables: - Idempotency strategy guide - Deduplication configuration - Testing scenarios
CYCLE 9: Dead-Letter Queue (DLQ) Handling (~2,500 lines)¶
Topic 17: DLQ Architecture & Monitoring¶
What will be covered:
- What is a Dead-Letter Queue?
  - Messages that fail processing after the maximum number of retries
  - Automatically moved to the DLQ by Azure Service Bus
  - Preserves the original message and metadata
  - Requires manual intervention (triage, fix, replay)
- ATP DLQ Strategy
  - Automatic DLQ per subscription/queue
  - MaxDeliveryCount: 5-10 attempts before dead-lettering
  - Lock Duration: 60-120 seconds per attempt
  - DLQ Monitoring: alert when messages land in the DLQ
- DLQ Message Metadata
  - DeadLetterReason: why the message failed (e.g., "MaxDeliveryCountExceeded")
  - DeadLetterErrorDescription: error details
  - EnqueuedTimeUtc: when originally sent
  - DeadLetterTimeUtc: when moved to the DLQ
  - DeliveryCount: how many delivery attempts were made
  - Original message body and headers
- DLQ Monitoring Dashboard
  - Metrics:
    - DLQ message count per queue/subscription
    - DLQ growth rate
    - Average age of DLQ messages
    - DLQ message distribution by error type
  - Alerts:
    - Any message in the DLQ (immediate)
    - DLQ count > threshold (e.g., 10)
    - DLQ messages older than N hours
- DLQ Triage Workflow
  1. Alert fires (message in DLQ)
  2. SRE reviews DLQ message details
  3. Classify the error: transient vs. permanent
  4. Fix the root cause (if a code bug, deploy a fix)
  5. Replay the message (manually or automated)
  6. Verify success (message processed, removed from DLQ)
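Step 2 of the triage workflow (reviewing DLQ message details) can be sketched with the `Azure.Messaging.ServiceBus` SDK. The namespace and queue name are illustrative; peeking is non-destructive, so triage does not affect replay:

```csharp
// Sketch: peek dead-lettered messages without removing them.
await using var client = new ServiceBusClient(
    "atp-prod.servicebus.windows.net", new DefaultAzureCredential());

var receiver = client.CreateReceiver("atp-audit-accepted",
    new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

// Peek the first 20 DLQ messages for triage
var messages = await receiver.PeekMessagesAsync(maxMessages: 20);
foreach (var msg in messages)
{
    Console.WriteLine(
        $"{msg.MessageId} attempts={msg.DeliveryCount} " +
        $"reason={msg.DeadLetterReason}: {msg.DeadLetterErrorDescription}");
}
```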
Code Examples: - DLQ monitoring query (Azure Service Bus SDK) - DLQ alert rule (Azure Monitor) - DLQ message inspection code - Error classification logic
Diagrams: - DLQ architecture (automatic movement) - DLQ monitoring dashboard - Triage workflow
Deliverables: - DLQ strategy document - Monitoring setup - Triage procedures
Topic 18: DLQ Replay & Recovery¶
What will be covered:
- Manual DLQ Replay
  - Azure Portal: move the message back to the queue/topic
  - Service Bus Explorer tool
  - PowerShell/CLI scripts
- Automated DLQ Replay
  - DLQ replay consumer
  - Reads from the DLQ
  - Re-publishes to the original queue/topic
  - Configurable replay strategies (all, filtered, one-by-one)
- DLQ Replay Consumer Implementation

```csharp
public class DlqReplayConsumer : IConsumer<ReplayDlqCommand>
{
    // The receiver requires a ServiceBusClient (the original listed only an
    // unused admin client), and the logger must be injected as well.
    private readonly ServiceBusClient _serviceBusClient;
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly ILogger<DlqReplayConsumer> _logger;

    public async Task Consume(ConsumeContext<ReplayDlqCommand> context)
    {
        var command = context.Message;

        // Read messages from the DLQ
        var receiver = _serviceBusClient.CreateReceiver(
            command.QueueName,
            new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

        var messages = await receiver.ReceiveMessagesAsync(
            maxMessages: command.BatchSize ?? 10,
            maxWaitTime: TimeSpan.FromSeconds(5));

        foreach (var message in messages)
        {
            try
            {
                // Deserialize the original message
                var originalMessage = DeserializeMessage(message);

                // Re-publish to the original destination
                await _publishEndpoint.Publish(originalMessage, context.CancellationToken);

                // Complete (remove from the DLQ)
                await receiver.CompleteMessageAsync(message);

                _logger.LogInformation("Replayed message {MessageId} from DLQ",
                    message.MessageId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to replay message {MessageId}",
                    message.MessageId);

                // Abandon (leave in the DLQ for manual review)
                await receiver.AbandonMessageAsync(message);
            }
        }
    }
}
```
- Selective Replay
  - Filter DLQ messages by tenant, error type, or time range
  - Replay only specific messages
  - Dry-run mode (validate without publishing)
- DLQ Purge
  - Permanently delete poison messages
  - Only after root-cause analysis
  - Requires approval (audit trail)
- Prevent DLQ Recurrence
  - Identify common failure patterns
  - Fix bugs in consumer code
  - Add validation to prevent bad messages
  - Monitor for similar issues
Code Examples: - DLQ replay consumer (complete) - Manual replay script (PowerShell/CLI) - Selective replay with filters - DLQ purge script (with approval) - Failure pattern analysis query
Diagrams: - DLQ replay sequence - Automated replay architecture - Selective replay flow
Deliverables: - DLQ replay implementation - Manual replay scripts - Purge procedures - Prevention strategies
CYCLE 10: Retry Policies & Circuit Breakers (~2,500 lines)¶
Topic 19: Retry Policies¶
What will be covered:
- MassTransit Retry Middleware
```csharp
cfg.AddConsumer<AuditAcceptedEventConsumer>(consumerCfg =>
{
    consumerCfg.UseMessageRetry(retry =>
    {
        // Incremental retry: 1s, 3s, 5s, 7s, 9s (initial interval + fixed increment)
        retry.Incremental(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));

        // OR Interval retry: fixed 5-second intervals
        // retry.Interval(3, TimeSpan.FromSeconds(5));

        // OR Exponential retry: delays grow exponentially between 1s and 32s
        // retry.Exponential(6, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(32), TimeSpan.FromSeconds(1));

        // Ignore specific exceptions (don't retry)
        retry.Ignore<ValidationException>();
        retry.Ignore<ArgumentException>();

        // Retry only specific exceptions
        retry.Handle<SqlException>(ex => ex.Number == -2); // Timeout
    });
});
```
- Retry Strategies
  - Immediate Retry: retry instantly (for very transient errors)
  - Interval Retry: fixed delay between retries
  - Incremental Retry: linearly increasing delay (1s, 3s, 5s)
  - Exponential Backoff: exponentially increasing delay (1s, 2s, 4s, 8s)
- ATP Retry Policy by Consumer

| Consumer | Retry Strategy | Max Attempts | Backoff |
|---|---|---|---|
| AuditAcceptedEventConsumer | Exponential | 5 | 1s → 32s |
| ProjectionUpdatedConsumer | Incremental | 3 | 2s → 10s |
| ExportRequestedConsumer | Interval | 5 | 10s (fixed) |
| PolicyChangedEventConsumer | Immediate | 2 | None |
| IntegrityVerifiedConsumer | Exponential | 5 | 1s → 32s |
- Exception Handling
  - Transient exceptions: retry (e.g., SqlException timeout)
  - Permanent exceptions: don't retry (e.g., ValidationException)
  - Poison messages: log and move to the DLQ after max retries
- Jitter for Thundering Herd
  - Add randomness to the backoff delay
  - Prevents all retries from firing at the same time
  - Reduces load spikes on downstream services
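MassTransit has no dedicated jitter option, but randomized delays can be precomputed and passed to `retry.Intervals(...)`. A sketch with illustrative base delays; note the limitation stated in the comments:

```csharp
consumerCfg.UseMessageRetry(retry =>
{
    // Exponential base delays (1s, 2s, 4s, 8s, 16s) with up to 500 ms of
    // random jitter added to each step. Because the intervals are computed
    // once at configuration time, all messages in this process share the
    // same jittered schedule -- per-message jitter would require a custom
    // retry policy.
    var intervals = Enumerable.Range(0, 5)
        .Select(i => TimeSpan.FromMilliseconds(
            Math.Pow(2, i) * 1000 + Random.Shared.Next(0, 500)))
        .ToArray();

    retry.Intervals(intervals);
});
```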
Code Examples: - Retry policy configurations (all strategies) - Exception filtering (ignore, handle) - Jitter implementation - Retry metrics tracking
Diagrams: - Retry strategy comparison - Exponential backoff timeline - Jitter effect visualization
Deliverables: - Retry policy guide - ATP retry configurations - Exception handling strategy
Topic 20: Circuit Breakers & Rate Limiting¶
What will be covered:
- Circuit Breaker Pattern
  - Protects downstream services from overload
  - States: Closed (normal), Open (failing), Half-Open (testing recovery)
  - Threshold: N failures in M seconds → open the circuit
  - Recovery: after a timeout, allow test requests (Half-Open)
- MassTransit Circuit Breaker (Consumer-Level)

```csharp
cfg.AddConsumer<ExternalApiConsumer>(consumerCfg =>
{
    consumerCfg.UseCircuitBreaker(cb =>
    {
        cb.TrackingPeriod = TimeSpan.FromMinutes(1);
        cb.TripThreshold = 15;   // Trip when 15% of messages fail
        cb.ActiveThreshold = 10; // Minimum messages before tracking applies
        cb.ResetInterval = TimeSpan.FromMinutes(5); // Half-open after 5 minutes
    });
});
```
- Rate Limiting (Consumer Protection)
- Concurrency Limiting
- ATP Protection Strategy
  - Circuit breakers on external dependencies (e.g., third-party APIs)
  - Rate limiting on high-volume consumers
  - Concurrency limiting on database-intensive consumers
  - Back-pressure signals (reject messages when overwhelmed)
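The rate-limiting and concurrency-limiting bullets above map onto MassTransit's consumer middleware. A sketch, assuming MassTransit v8's `UseRateLimit` and `UseConcurrentMessageLimit` extensions; the consumer name and limits are illustrative:

```csharp
cfg.AddConsumer<ProjectionUpdatedConsumer>(consumerCfg =>
{
    // Allow at most 100 messages per second through this consumer
    consumerCfg.UseRateLimit(100, TimeSpan.FromSeconds(1));

    // Process at most 32 messages concurrently (protects the database)
    consumerCfg.UseConcurrentMessageLimit(32);
});
```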
Code Examples: - Circuit breaker configuration (complete) - Rate limiting configuration - Concurrency limiting configuration - Circuit breaker state monitoring - Back-pressure implementation
Diagrams: - Circuit breaker state machine - Rate limiting flow - Back-pressure architecture
Deliverables: - Circuit breaker implementation - Rate limiting guide - Protection strategy document
CYCLE 11: Multi-Tenant Message Isolation (~3,000 lines)¶
Topic 21: Tenant Context in Messages¶
What will be covered:
- TenantId in Every Message
  - Required field in the message envelope
  - Extracted at the Gateway (from the JWT)
  - Propagated across all message flows
  - Validated at each consumer
- Tenant Context Middleware

```csharp
public class TenantContextMiddleware<T> : IFilter<ConsumeContext<T>> where T : class
{
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Extract tenant context from the message
        var tenantId = context.Message is IMessageEnvelope envelope
            ? envelope.TenantId
            : context.Headers.Get<string>("x-tenant-id");

        if (string.IsNullOrEmpty(tenantId))
        {
            throw new InvalidOperationException("TenantId missing from message");
        }

        // Set in AsyncLocal for downstream access
        TenantContext.Current = new TenantContext
        {
            TenantId = tenantId,
            Edition = context.Headers.Get<string>("x-edition")
        };

        try
        {
            await next.Send(context);
        }
        finally
        {
            TenantContext.Current = null;
        }
    }

    // Required by IFilter<T>: describes the filter for bus diagnostics
    public void Probe(ProbeContext context) => context.CreateFilterScope("tenantContext");
}
```
- Partition Key = TenantId
  - Azure Service Bus partitioning by tenant
  - Load distribution across brokers
  - Ordered processing within a tenant
  - Hot-partition detection and mitigation
- Subscription Filters (Tenant-Specific)
  - SQL filter: `TenantId = 'acme'`
  - Use case: dedicated subscription for a high-volume tenant
  - Use case: region-specific subscriptions
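A tenant-specific subscription can be declared through MassTransit's subscription endpoint configuration. This is a sketch under assumptions: the `Filter` property on the subscription endpoint configurator and the topic/subscription names are taken as illustrative, with `SqlRuleFilter` coming from `Azure.Messaging.ServiceBus.Administration`:

```csharp
cfg.UsingAzureServiceBus((context, config) =>
{
    // Dedicated subscription that receives only messages for the "acme" tenant
    config.SubscriptionEndpoint("acme-dedicated", "atp-audit-accepted-event", e =>
    {
        // SQL rule filter evaluated by the broker against message properties
        e.Filter = new SqlRuleFilter("TenantId = 'acme'");

        e.ConfigureConsumer<AuditAcceptedEventConsumer>(context);
    });
});
```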
Code Examples: - Tenant context middleware (complete) - Partition key configuration - Subscription filter (tenant-specific) - Tenant validation in consumer
Diagrams: - Tenant context propagation across messages - Partition key distribution - Subscription filter architecture
Deliverables: - Tenant isolation implementation - Partition strategy - Filter configuration guide
Topic 22: Cross-Tenant Message Prevention¶
What will be covered:
- Validation Rules
  - Each consumer must validate the message's tenantId
  - Reject messages for unexpected tenants
  - Log suspicious cross-tenant messages
- Repository Tenant Scoping
  - All repository operations scoped to the message's tenantId
  - Prevents accidental cross-tenant writes
  - NHibernate filter applied automatically
- Audit Logging
  - Log every message consumption with its tenantId
  - Detect anomalies (message for the wrong tenant)
  - Alert on potential security issues
- Testing Cross-Tenant Isolation
  - Publish a message with tenantId A
  - Verify the consumer scoped to tenant B rejects it
  - Integration tests for all consumers
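The validation rules above can be sketched as a guard at the top of a consumer. `ITenantScope` and `CrossTenantMessageException` are hypothetical names for whatever abstraction resolves the tenants a consumer instance is allowed to serve:

```csharp
public async Task Consume(ConsumeContext<AuditRecordAcceptedEvent> context)
{
    var messageTenantId = context.Message.TenantId;

    if (!_tenantScope.IsAllowed(messageTenantId))
    {
        // Log and reject: never process a message for an unexpected tenant
        _logger.LogWarning(
            "Rejected cross-tenant message {MessageId}: tenant {TenantId} not in scope",
            context.MessageId, messageTenantId);
        throw new CrossTenantMessageException(messageTenantId);
    }

    // ... normal processing, with repositories scoped to messageTenantId
}
```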
Code Examples: - Tenant validation in consumer - Repository scoping (NHibernate filter) - Cross-tenant audit log - Isolation test scenario
Diagrams: - Cross-tenant prevention flow - Repository tenant scoping
Deliverables: - Isolation enforcement guide - Validation patterns - Testing procedures
CYCLE 12: Message Routing & Topology (~2,500 lines)¶
Topic 23: Message Routing Patterns¶
What will be covered:
- Topic-Based Routing (Pub/Sub)
  - Publish to a topic
  - Multiple subscriptions (fan-out)
  - Subscription filters for selective delivery
- Direct Routing (Queue)
  - Send to a specific queue
  - Single consumer (or competing consumers)
  - Use case: commands, requests
- Content-Based Routing
  - Route based on message properties
  - Subscription SQL filters
  - Use case: route by classification, edition, or region
- MassTransit Send Topology

```csharp
// Publish (fan-out to all subscriptions)
await publishEndpoint.Publish(new AuditRecordAcceptedEvent { /* ... */ });

// Send (direct to a specific queue)
await sendEndpoint.Send(new RebuildProjectionCommand { /* ... */ });

// Send to a specific endpoint address
var exportEndpoint = await bus.GetSendEndpoint(new Uri("queue:atp-export-package"));
await exportEndpoint.Send(new CreateExportPackageCommand { /* ... */ });
```
- Request/Response Pattern

```csharp
// Request
var client = bus.CreateRequestClient<EvaluatePolicyRequest>();
var response = await client.GetResponse<PolicyDecisionResponse>(new EvaluatePolicyRequest
{
    TenantId = tenantId,
    EventPayload = payload
});

// Handler (responds)
public class PolicyEvaluationConsumer : IConsumer<EvaluatePolicyRequest>
{
    public async Task Consume(ConsumeContext<EvaluatePolicyRequest> context)
    {
        var decision = await _policyEngine.EvaluateAsync(context.Message);

        await context.RespondAsync(new PolicyDecisionResponse
        {
            Classification = decision.Classification,
            RetentionPolicyId = decision.RetentionPolicyId,
        });
    }
}
```
Code Examples: - Publish vs. Send usage - Content-based routing (subscription filters) - Request/response pattern (complete) - Direct endpoint addressing
Diagrams: - Routing pattern comparison - Topic fanout vs. queue direct - Request/response flow
Deliverables: - Routing pattern guide - Topology design decisions - Request/response implementation
Topic 24: Message Serialization & Formatting¶
What will be covered:
- MassTransit Serialization
  - Default: System.Text.Json (JSON)
  - Alternative: Newtonsoft.Json
  - Binary: MessagePack (for performance)
- JSON Serialization Configuration

```csharp
cfg.UsingAzureServiceBus((context, config) =>
{
    config.ConfigureJsonSerializerOptions(options =>
    {
        options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
        options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
        options.Converters.Add(new JsonStringEnumConverter());
        return options; // MassTransit expects the configured options back
    });

    // ... other configuration
});
```
Message Envelope Standards
- Content-Type header:
application/json; charset=utf-8 - Message body: JSON with camelCase properties
-
Headers: Custom ATP headers (x-tenant-id, etc.)
- Encryption (if needed)
  - Encrypt sensitive payload fields
  - Use envelope encryption (DEK wrapped by a KEK)
  - Decrypt in the consumer
- Compression (for large messages)
  - GZIP compression
  - Reduces message size
  - Automatic decompression in the consumer
Code Examples: - JSON serialization configuration - Custom serializer (if needed) - Encryption/decryption in publish/consume - Compression configuration
Diagrams: - Serialization pipeline - Encryption envelope
Deliverables: - Serialization configuration - Encryption patterns - Compression strategies
CYCLE 13: Observability & Monitoring (~3,000 lines)¶
Topic 25: Message Tracing & Correlation¶
What will be covered:
- W3C Trace Context Propagation
  - Traceparent header: 00-{traceId}-{spanId}-{flags}
  - Tracestate header: vendor-specific context
  - MassTransit automatic trace propagation
- Correlation IDs
  - CorrelationId: request identifier (entire workflow)
  - ConversationId: message conversation (request + responses)
  - InitiatorId: saga that started the workflow
  - CausationId: event that caused this event (parent event ID)
- OpenTelemetry Integration
  - MassTransit built-in OTel support
  - Automatic span creation for publish/consume
  - Span attributes: message type, tenant, queue name
  - Distributed traces across services
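Wiring MassTransit's built-in instrumentation into OpenTelemetry can be sketched as follows. MassTransit v8 emits spans under the `"MassTransit"` ActivitySource name; the service name and exporter choice are illustrative:

```csharp
services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("atp-messaging"))
        .AddSource("MassTransit")          // publish/consume spans
        .AddAspNetCoreInstrumentation()    // correlate with inbound HTTP requests
        .AddOtlpExporter());
```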
- ATP Trace Visualization
  - Trace: API request → Ingestion → Outbox → Service Bus → Projection → Read Model
  - Identify bottlenecks (slow consumers)
  - Identify failures (error spans)
  - End-to-end latency measurement
Code Examples: - OTel configuration for MassTransit - Custom span attributes - Correlation ID propagation - Trace query (Application Insights)
Diagrams: - Distributed trace visualization - Correlation ID flow - Span hierarchy
Deliverables: - Tracing configuration - Correlation strategy - Trace analysis guide
Topic 26: Messaging Metrics & Dashboards¶
What will be covered:
- MassTransit Metrics
  - Publish Metrics:
    - Messages published per second
    - Publish latency (time to send to the broker)
    - Publish failures
  - Consume Metrics:
    - Messages consumed per second
    - Consume latency (time to process a message)
    - Consumer exceptions
    - Retry count
    - DLQ count
  - Saga Metrics:
    - Active saga instances
    - Saga completion rate
    - Saga timeout rate
- Azure Service Bus Metrics
  - Queue/topic message count (active, DLQ, scheduled)
  - Incoming/outgoing messages per second
  - Throttled requests
  - Server errors
  - Message size distribution
- ATP Messaging Dashboard
  - Overview panel: total throughput, error rate, DLQ count
  - Per-topic panel: message count, publish rate, consumer lag
  - Per-consumer panel: consume rate, latency, error rate
  - Saga panel: active instances, completion rate, timeout rate
  - Outbox panel: pending count, age, failed count
- Alerting Rules
  - DLQ count > 0 (immediate alert)
  - Consumer lag > 30 seconds
  - Publish failures > 1% error rate
  - Outbox age > 1 minute (stuck messages)
  - Queue depth > 10,000 messages
Code Examples: - MassTransit metrics configuration - Custom metric collection - Dashboard query (Kusto/KQL) - Alert rule definitions (Azure Monitor)
Diagrams: - Metrics collection architecture - Dashboard layout mockup - Alert flow
Deliverables: - Metrics catalog - Dashboard templates - Alert rules
CYCLE 14: Testing Strategies (~2,500 lines)¶
Topic 27: Unit Testing Messaging Components¶
What will be covered:
- In-Memory Test Harness
```csharp
[Test]
public async Task Consumer_Should_Process_Event()
{
    // Arrange
    await using var provider = new ServiceCollection()
        .AddMassTransitTestHarness(cfg =>
        {
            cfg.AddConsumer<AuditAcceptedEventConsumer>();
        })
        .BuildServiceProvider(true);

    var harness = provider.GetRequiredService<ITestHarness>();
    await harness.Start();

    // Act
    await harness.Bus.Publish(new AuditRecordAcceptedEvent
    {
        EventId = Ulid.NewUlid().ToString(),
        TenantId = "test-tenant",
        AuditRecordId = "01JE...",
    });

    // Assert
    Assert.IsTrue(await harness.Consumed.Any<AuditRecordAcceptedEvent>());

    var consumerHarness = harness.GetConsumerHarness<AuditAcceptedEventConsumer>();
    Assert.IsTrue(await consumerHarness.Consumed.Any<AuditRecordAcceptedEvent>());
}
```
- Testing Consumers
  - Mock dependencies (repositories, services)
  - Verify the message was consumed
  - Verify side effects (database writes, published events)
  - Verify error handling
- Testing Publishers
  - Verify the message was published
  - Verify the message content
  - Verify headers and metadata
- Testing Sagas
  - Simulate event sequences
  - Verify state transitions
  - Verify compensation logic
  - Test timeouts
Code Examples: - Test harness setup (complete) - Consumer unit test - Publisher unit test - Saga unit test - Mock dependencies setup
Diagrams: - Test harness architecture - Testing pyramid (unit, integration, E2E)
Deliverables: - Unit test suite - Test harness patterns - Mock strategies
Topic 28: Integration & Contract Testing¶
What will be covered:
- Integration Testing with a Real Transport
  - Azure Service Bus emulator in a Docker container (note: Azurite emulates Azure Storage, not Service Bus)
  - Or connect to a dedicated test namespace
  - End-to-end message flow
- Contract Testing
  - Verify the publisher produces the expected schema
  - Verify the consumer accepts the expected schema
  - Pact or a similar contract-testing framework
  - Schema-evolution compatibility tests
- Performance Testing
  - Load testing with high message volumes
  - Measure throughput (messages/sec)
  - Measure latency (end-to-end)
  - Identify bottlenecks
- Chaos Testing
  - Simulate broker failures
  - Simulate slow consumers
  - Simulate message loss (DLQ scenarios)
  - Verify resilience
Code Examples: - Integration test with real Service Bus - Contract test (publisher vs. consumer schema) - Load test script (JMeter, NBomber) - Chaos test scenarios
Diagrams: - Integration test architecture - Contract testing flow - Load test setup
Deliverables: - Integration test suite - Contract test suite - Performance benchmarks - Chaos test scenarios
CYCLE 15: Performance & Scalability (~2,500 lines)¶
Topic 29: Message Throughput Optimization¶
What will be covered:
- Batching
  - Publish multiple messages in a batch
  - Consume multiple messages in a batch
  - Reduce round-trips to the broker
- Prefetching
  - PrefetchCount: fetch N messages ahead
  - Reduces latency (messages are ready to process)
  - Tune based on message processing time
- Concurrent Processing
  - ConcurrentMessageLimit per consumer
  - Parallel message processing
  - CPU and memory considerations
- Partitioning for Scale
  - Partition topics/queues by tenantId
  - Distribute load across multiple brokers
  - Scale out consumers (multiple instances per partition)
- Premium Tier Optimization
  - Dedicated messaging units
  - Higher throughput limits
  - Lower latency
  - Geo-replication (optional)
Code Examples: - Batch publishing - Batch consuming - Prefetch configuration - Concurrent processing tuning - Partitioning optimization
Diagrams: - Batching performance improvement - Concurrent processing architecture - Partitioning scale-out
Deliverables: - Performance optimization guide - Tuning parameters - Scalability patterns
Topic 30: Scalability Patterns¶
What will be covered:
- Horizontal Scaling (Consumer Scale-Out)
  - Deploy multiple consumer instances
  - Automatic load distribution (competing consumers)
  - Kubernetes HorizontalPodAutoscaler (HPA) based on queue depth
- Vertical Scaling (Consumer Resources)
  - Increase CPU/memory per pod
  - Handle larger message volumes per instance
- Auto-Scaling Triggers
  - Queue depth > threshold → scale out
  - CPU > 70% → scale up
  - Queue depth near zero → scale in (after a cooldown)
- KEDA (Kubernetes Event-Driven Autoscaling)

```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: projection-consumer-scaler
spec:
  scaleTargetRef:
    name: projection-consumer
  minReplicaCount: 2
  maxReplicaCount: 20
  triggers:
    - type: azure-servicebus
      metadata:
        queueName: audit-accepted-event
        namespace: atp-prod
        messageCount: "50"  # Scale up if > 50 messages pending
```
- Backlog Management
  - Monitor queue backlog depth
  - Alert if the backlog grows beyond a threshold
  - Scale consumers proactively
  - Throttle producers if needed (back-pressure)
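Backlog depth can be read with the `Azure.Messaging.ServiceBus` administration client; a sketch with illustrative namespace and queue names:

```csharp
// Read live queue depth for backlog monitoring
var adminClient = new ServiceBusAdministrationClient(
    "atp-prod.servicebus.windows.net", new DefaultAzureCredential());

var runtime = await adminClient.GetQueueRuntimePropertiesAsync("audit-accepted-event");

Console.WriteLine($"Active:      {runtime.Value.ActiveMessageCount}");
Console.WriteLine($"Dead-letter: {runtime.Value.DeadLetterMessageCount}");
Console.WriteLine($"Scheduled:   {runtime.Value.ScheduledMessageCount}");
```

The active-count value is what a proactive scaler or back-pressure signal would compare against the alerting threshold.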
Code Examples: - Kubernetes HPA configuration - KEDA ScaledObject (complete) - Auto-scaling trigger logic - Backlog monitoring query
Diagrams: - Horizontal scaling with HPA - KEDA auto-scaling architecture - Backlog management flow
Deliverables: - Scalability design - Auto-scaling configuration - Backlog management procedures
CYCLE 16: Operations & Best Practices (~3,000 lines)¶
Topic 31: Operational Runbooks¶
What will be covered:
- Routine Operations
  - Daily Health Checks
    - Verify consumers are running
    - Check queue depths
    - Review the DLQ (should be empty)
    - Check outbox age
  - Weekly Maintenance
    - Review performance metrics
    - Optimize slow consumers
    - Clean up old inbox/outbox entries
    - Update retry policies if needed
  - Monthly Reviews
    - Capacity planning (queue growth)
    - Cost optimization (right-size messaging units)
    - Schema-evolution planning
- Incident Response
  - Consumer Down
    1. Alert fires (no consumption for N minutes)
    2. Check consumer pod status (Kubernetes)
    3. Review logs for errors
    4. Restart the consumer if needed
    5. Verify the backlog starts decreasing
  - DLQ Messages
    1. Alert fires (message in DLQ)
    2. Review DLQ message details
    3. Classify the error (transient vs. permanent)
    4. Fix the root cause (code bug, configuration)
    5. Replay the message
  - Broker Outage
    1. Alert fires (Service Bus unavailable)
    2. Verify Azure Service Bus status
    3. The outbox pattern buffers messages locally in the meantime
    4. Wait for broker recovery
    5. The outbox processor publishes the queued messages
- Message Replay Procedures
  - Replay from the DLQ (after fixing the issue)
  - Replay from the event store (rebuild projections)
  - Replay with filters (tenant, time range)
  - Dry-run mode (validate before a live replay)
- Emergency Procedures
  - Pause a consumer (stop processing)
  - Purge a queue (emergency only; requires approval)
  - Throttle a producer (back-pressure)
  - Manually open a circuit breaker (protect downstream)
Code Examples: - Health check queries - Consumer restart script (Kubernetes) - DLQ triage script - Message replay script - Emergency pause procedure
Diagrams: - Operational workflow - Incident response flowchart - Replay procedure sequence
Deliverables: - Operations manual - Runbook templates - Incident response procedures - Emergency protocols
Topic 32: Best Practices & Anti-Patterns¶
What will be covered:
- Messaging Best Practices
  - ✅ Design idempotent consumers (use the inbox pattern or natural idempotency)
  - ✅ Use the transactional outbox (atomic DB write + message publishing)
  - ✅ Partition by tenantId (load distribution, ordered processing)
  - ✅ Include correlation IDs (distributed tracing)
  - ✅ Version messages (schema evolution)
  - ✅ Monitor the DLQ (alert immediately on any message)
  - ✅ Retry with exponential backoff (avoid the thundering herd)
  - ✅ Use circuit breakers (protect downstream services)
  - ✅ Validate messages (reject invalid messages early)
  - ✅ Test message flows (unit, integration, contract tests)
  - ✅ Document message contracts (AsyncAPI specs)
  - ✅ Secure with Managed Identity (no connection strings in code)
- Anti-Patterns to Avoid
- ❌ Non-idempotent consumers (duplicate processing causes errors)
- ❌ Direct Service Bus SDK (bypasses MassTransit abstractions)
- ❌ Synchronous request/response over messaging (defeats async purpose)
- ❌ Large message payloads (use claim check pattern instead)
- ❌ No retry policy (transient failures cause DLQ)
- ❌ Ignoring DLQ (poison messages accumulate)
- ❌ No correlation IDs (impossible to debug distributed flows)
- ❌ Hardcoded queue names (use MassTransit conventions)
- ❌ Missing tenant context (cross-tenant data leaks)
- ❌ Blocking operations in consumers (starves thread pool)
- Code Review Checklist
- Consumer is idempotent (inbox or natural idempotency)
- Outbox pattern used for critical events
- Retry policy configured appropriately
- Circuit breaker on external dependencies
- TenantId validated in every consumer
- Correlation IDs propagated
- Async/await throughout (no blocking)
- Message schema versioned
- DLQ monitoring configured
- Unit tests for consumers
- Integration tests for message flows
- Observability (logs, metrics, traces)
- Performance Checklist
- PrefetchCount tuned (32-256)
- ConcurrentMessageLimit tuned (64-512)
- Batch operations where applicable
- No N+1 query patterns in consumers (use eager loading)
- Circuit breaker protects slow dependencies
- Partitioning enabled for high throughput
- Premium tier for production (dedicated capacity)
- Auto-scaling configured (KEDA)
Code Examples:
- Idempotent consumer (good example)
- Non-idempotent consumer (bad example)
- Outbox pattern (good)
- Direct publish without outbox (bad)
- Async consumer (good)
- Blocking consumer (bad)
Diagrams:
- Best practices reference architecture
- Anti-patterns to avoid (before/after)
Deliverables:
- Best practices handbook
- Anti-pattern catalog
- Code review checklist
- Performance checklist
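The idempotent-consumer practice could look like the following minimal MassTransit sketch. The event record, consumer, and `IInboxStore` abstraction are hypothetical illustrations (ATP's real inbox persistence is planned on NHibernate, and its contracts live in the ConnectSoft.Extensions.MessagingModel libraries); only the `IConsumer<T>`/`ConsumeContext<T>` shapes are MassTransit's actual API:

```csharp
using MassTransit;

// Hypothetical domain event for illustration.
public record AuditRecordSealed(Guid RecordId, Guid TenantId);

// Hypothetical deduplication store; a real one would persist
// processed message IDs in the same database transaction as the work.
public interface IInboxStore
{
    Task<bool> AlreadyProcessedAsync(Guid messageId);
    Task MarkProcessedAsync(Guid messageId);
}

// ✅ Idempotent: redelivery of the same message becomes a no-op.
public class SealRecordConsumer : IConsumer<AuditRecordSealed>
{
    private readonly IInboxStore _inbox;

    public SealRecordConsumer(IInboxStore inbox) => _inbox = inbox;

    public async Task Consume(ConsumeContext<AuditRecordSealed> context)
    {
        // MessageId is stable across redeliveries, so it serves as the dedup key.
        if (await _inbox.AlreadyProcessedAsync(context.MessageId!.Value))
            return; // duplicate delivery: acknowledge and do nothing

        // ... apply the seal exactly once, validating TenantId first ...

        await _inbox.MarkProcessedAsync(context.MessageId.Value);
    }
}
```

The corresponding anti-pattern is the same consumer without the inbox check: a broker redelivery (lock expiry, transient failure after the handler ran) would then apply the seal twice.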
Summary of Deliverables¶
Across all 16 cycles, this documentation will provide:
- Architecture & Patterns
- Event-driven architecture (EDA) overview
- Messaging patterns (pub/sub, command, saga)
- Message taxonomy and contracts
- Delivery guarantees and semantics
- MassTransit Implementation
- Configuration and DI setup
- Consumer registration patterns
- Publisher patterns
- Saga state machine implementations
- Azure Service Bus Integration
- Topology design (topics, queues, subscriptions, DLQ)
- Security (Managed Identity, RBAC, private endpoints)
- Performance tuning
- Monitoring and diagnostics
- Message Contracts
- ATP event/command definitions
- Message envelope specifications
- Schema versioning strategy
- Validation rules
- Consumers & Handlers
- Consumer implementations for all ATP events
- Idempotent consumer patterns (inbox)
- Message filters and middleware
- Error handling strategies
- Saga Orchestration
- Export workflow saga
- Integrity sealing saga
- Saga state persistence (NHibernate)
- Compensation patterns
- Reliability Patterns
- Transactional outbox implementation
- Inbox pattern for deduplication
- DLQ handling and replay procedures
- Retry policies and circuit breakers
- Multi-Tenant Isolation
- Tenant context propagation in messages
- Partition key strategies
- Cross-tenant prevention
- Subscription filters
- Observability
- Distributed tracing (OpenTelemetry)
- Messaging metrics and dashboards
- Alerting rules
- Troubleshooting guides
- Testing
- Unit tests with test harness
- Integration tests with real transport
- Contract tests for schema compatibility
- Performance and chaos testing
- Operations
- Operational runbooks
- Incident response procedures
- Best practices and anti-patterns
- Code review checklists
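The retry and circuit-breaker deliverables listed above could be configured in MassTransit roughly as follows. This is a minimal sketch against MassTransit v8's middleware API; the namespace hostname, consumer type, and all thresholds are illustrative placeholders to be tuned per endpoint:

```csharp
using MassTransit;

// Assumes an IServiceCollection `services`, e.g. builder.Services in Program.cs.
services.AddMassTransit(x =>
{
    x.AddConsumer<SealRecordConsumer>(); // placeholder consumer type

    x.UsingAzureServiceBus((context, cfg) =>
    {
        // Managed Identity host configuration (placeholder namespace).
        cfg.Host("atp-sb.servicebus.windows.net");

        // Exponential backoff avoids the thundering-herd anti-pattern:
        // redeliveries spread out instead of hammering a recovering dependency.
        cfg.UseMessageRetry(r => r.Exponential(
            retryLimit: 5,
            minInterval: TimeSpan.FromSeconds(1),
            maxInterval: TimeSpan.FromMinutes(2),
            intervalDelta: TimeSpan.FromSeconds(5)));

        // Circuit breaker protects downstream services during sustained failure.
        cfg.UseCircuitBreaker(cb =>
        {
            cb.TrackingPeriod = TimeSpan.FromMinutes(1);
            cb.TripThreshold = 15;   // % of failed messages that opens the circuit
            cb.ActiveThreshold = 10; // minimum messages before the breaker can trip
            cb.ResetInterval = TimeSpan.FromMinutes(5);
        });

        cfg.ConfigureEndpoints(context);
    });
});
```

Messages that exhaust the retry policy are moved to the error/dead-letter queue, which is where the DLQ monitoring and replay procedures in this plan take over.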
Next Steps¶
- Review & Approval: Validate cycle plan with architecture and platform teams
- Cycle 1 Generation: Begin content generation for messaging architecture overview
- Message Contracts: Define all ATP event and command classes
- Consumer Templates: Create consumer implementation templates
- Saga Implementations: Develop saga state machines for ATP workflows
- Test Suite: Build messaging test suite with test harness
Related Documentation¶
- Architecture Overview: Event-driven communication plan
- Events Catalog: All ATP domain events
- Message Schemas: AsyncAPI specifications
- Outbox/Inbox Pattern: Detailed outbox/inbox implementation
- Idempotency: Idempotency patterns
- Persistence: NHibernate repository and Unit of Work patterns
- Observability: Distributed tracing and monitoring
- Pulumi: Azure Service Bus provisioning
- Testing Strategy: Overall testing approach
This documentation plan covers ATP's complete messaging implementation, from event-driven architecture and MassTransit configuration through saga orchestration, reliability patterns, and operational excellence, built on MassTransit over Azure Service Bus together with the ConnectSoft.Extensions.MessagingModel libraries.