Low-Level Design (LLD) - Audit Trail Platform¶
Purpose & Scope¶
Purpose: Detailed low-level design documentation for ATP components, covering class designs, interfaces, data access patterns, service interactions, algorithms, and implementation specifics that guide developers during implementation.
Scope: This document covers:
- Component-Level Design: Detailed class structures, interfaces, and implementations for each ATP service
- Data Access Patterns: Repository patterns, NHibernate mappings, MongoDB collections, query patterns
- Service Interactions: Detailed API contracts, message schemas, protocol implementations
- Domain Logic: Business rule implementations, validation logic, state machines
- Infrastructure Integration: Dependency injection, configuration, middleware, interceptors
- Error Handling: Exception hierarchies, error response formats, retry policies
- Performance Optimizations: Caching strategies, batch processing, connection pooling
- Security Implementation: Authentication/authorization flows, encryption, key management
Audience: Software engineers, architects, technical leads implementing ATP services
Relationship to Other Documents:
- High-Level Design: See hld.md for system-level architecture, service boundaries, and integration patterns
- Components: See components.md for component overviews and responsibilities
- Data Architecture: See data-architecture.md for data models and storage strategies
- Sequence Flows: See sequence-flows.md for detailed interaction flows
- Domain Model: See ../domain/ubiquitous-language.md for domain concepts
Table of Contents¶
- Design Principles
- Gateway Service LLD
- Ingestion Service LLD
- Policy Service LLD
- Query Service LLD
- Projection Service LLD
- Export Service LLD
- Integrity Service LLD
- Admin Service LLD
- Shared Components LLD
- Data Access Layer
- Message Bus Integration
- Security Implementation
- Error Handling Patterns
- Performance Optimizations
Design Principles¶
ATP LLD Principles¶
- Clean Architecture: Separation of concerns (Domain, Application, Infrastructure)
- Domain-Driven Design (DDD): Rich domain models, aggregates, value objects
- SOLID Principles: Single responsibility, dependency inversion, interface segregation
- Testability: Dependency injection, mockable interfaces, unit testable
- Observability First: Instrumentation built into core logic, not bolted on
- Security by Design: Zero-trust, defense in depth, least privilege
- Performance Aware: Async/await, batching, caching, connection pooling
- Compliance Ready: Audit trails, tamper-evidence, data residency
Architecture Layers¶
┌─────────────────────────────────────────┐
│ Presentation Layer │
│ (Controllers, API Gateway, gRPC) │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Application Layer │
│ (Use Cases, Orchestration, DTOs) │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Domain Layer │
│ (Entities, Aggregates, Domain Services)│
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Infrastructure Layer │
│ (Repositories, Messaging, Storage) │
└─────────────────────────────────────────┘
Gateway Service LLD¶
Component Structure¶
ConnectSoft.ATP.Gateway/
├── API/
│ ├── Controllers/
│ │ ├── AuditController.cs # Ingestion endpoints
│ │ ├── QueryController.cs # Query endpoints (proxy)
│ │ ├── ExportController.cs # Export endpoints (proxy)
│ │ └── HealthController.cs # Health checks
│ ├── Middleware/
│ │ ├── TenantResolutionMiddleware.cs
│ │ ├── AuthenticationMiddleware.cs
│ │ ├── AuthorizationMiddleware.cs
│ │ ├── CorrelationMiddleware.cs
│ │ └── ErrorHandlingMiddleware.cs
│ └── Models/
│ ├── AuditRecordRequest.cs
│ ├── QueryRequest.cs
│ └── ErrorResponse.cs
├── Application/
│ ├── UseCases/
│ │ ├── IngestAuditRecordUseCase.cs
│ │ ├── QueryAuditRecordsUseCase.cs
│ │ └── ValidateRequestUseCase.cs
│ ├── Services/
│ │ ├── TenantResolver.cs
│ │ ├── RequestValidator.cs
│ │ └── SchemaValidator.cs
│ └── DTOs/
│ └── AuditRecordDto.cs
├── Domain/
│ └── ValueObjects/
│ ├── TenantId.cs
│ └── CorrelationId.cs
└── Infrastructure/
├── Clients/
│ ├── IngestionServiceClient.cs
│ ├── QueryServiceClient.cs
│ └── PolicyServiceClient.cs
└── Configuration/
└── GatewayOptions.cs
Core Classes¶
AuditController¶
namespace ConnectSoft.ATP.Gateway.API.Controllers;
[ApiController]
[Route("api/v1/audit")]
public class AuditController : ControllerBase
{
private readonly ILogger<AuditController> _logger;
private readonly IIngestAuditRecordUseCase _ingestUseCase;
private readonly IRequestValidator _requestValidator;
private readonly ITenantResolver _tenantResolver;
public AuditController(
ILogger<AuditController> logger,
IIngestAuditRecordUseCase ingestUseCase,
IRequestValidator requestValidator,
ITenantResolver tenantResolver)
{
_logger = logger;
_ingestUseCase = ingestUseCase;
_requestValidator = requestValidator;
_tenantResolver = tenantResolver;
}
[HttpPost("records")]
[ProducesResponseType(typeof(AuditRecordResponse), StatusCodes.Status201Created)]
[ProducesResponseType(typeof(ErrorResponse), StatusCodes.Status400BadRequest)]
[ProducesResponseType(StatusCodes.Status401Unauthorized)]
[ProducesResponseType(StatusCodes.Status429TooManyRequests)]
public async Task<ActionResult<AuditRecordResponse>> IngestRecord(
[FromBody] AuditRecordRequest request,
CancellationToken cancellationToken)
{
// Tenant resolved by middleware, available in HttpContext.Items
var tenantId = _tenantResolver.GetCurrentTenantId();
var correlationId = Activity.Current?.TraceId.ToString();
// Validate request
var validationResult = await _requestValidator.ValidateAsync(
request, tenantId, cancellationToken);
if (!validationResult.IsValid)
{
return BadRequest(new ErrorResponse
{
Error = "ValidationFailed",
Message = validationResult.ErrorMessage,
Details = validationResult.ValidationErrors
});
}
// Execute use case
var result = await _ingestUseCase.ExecuteAsync(
new IngestAuditRecordCommand
{
TenantId = tenantId,
AuditRecord = request.ToDomain(),
CorrelationId = correlationId,
RequestId = Request.Headers["X-Request-ID"].FirstOrDefault()
},
cancellationToken);
if (result.IsSuccess)
{
return CreatedAtAction(
nameof(GetRecord),
new { id = result.AuditRecordId },
new AuditRecordResponse
{
AuditRecordId = result.AuditRecordId,
Status = "Accepted",
Message = "Audit record queued for processing"
});
}
// Handle failure
return HandleFailure(result);
}
[HttpGet("records/{id}")]
public async Task<ActionResult<AuditRecordResponse>> GetRecord(
string id,
CancellationToken cancellationToken)
{
// Implementation...
}
}
TenantResolutionMiddleware¶
namespace ConnectSoft.ATP.Gateway.API.Middleware;
public class TenantResolutionMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger<TenantResolutionMiddleware> _logger;
private readonly ITenantResolver _tenantResolver;
public TenantResolutionMiddleware(
RequestDelegate next,
ILogger<TenantResolutionMiddleware> logger,
ITenantResolver tenantResolver)
{
_next = next;
_logger = logger;
_tenantResolver = tenantResolver;
}
public async Task InvokeAsync(HttpContext context)
{
// Resolve tenant from:
// 1. Subdomain (acme-corp.atp.connectsoft.dev)
// 2. Header (X-Tenant-Id)
// 3. JWT claim (tenant_id)
// 4. Path parameter (/api/v1/tenants/{tenantId}/audit)
var tenantId = await _tenantResolver.ResolveAsync(context);
if (tenantId == null)
{
context.Response.StatusCode = StatusCodes.Status401Unauthorized;
await context.Response.WriteAsJsonAsync(new ErrorResponse
{
Error = "TenantNotFound",
Message = "Unable to resolve tenant from request"
});
return;
}
// Store in context for downstream use
context.Items["TenantId"] = tenantId;
// Add to activity/baggage for observability
Activity.Current?.SetTag("tenant.id", tenantId);
Baggage.SetBaggage("tenant.id", tenantId);
await _next(context);
}
}
Ingestion Service LLD¶
Component Structure¶
ConnectSoft.ATP.Ingestion/
├── API/
│ └── Controllers/
│ └── IngestionController.cs
├── Application/
│ ├── UseCases/
│ │ └── IngestAuditRecordUseCase.cs
│ ├── Services/
│ │ ├── AuditRecordValidator.cs
│ │ ├── IdempotencyChecker.cs
│ │ ├── ClassificationService.cs
│ │ └── PolicyEvaluator.cs
│ └── DTOs/
│ └── AuditRecordDto.cs
├── Domain/
│ ├── Entities/
│ │ ├── AuditRecord.cs # Aggregate root
│ │ └── AuditRecordId.cs # Value object (ULID)
│ ├── ValueObjects/
│ │ ├── Classification.cs
│ │ ├── Actor.cs
│ │ └── Resource.cs
│ ├── Services/
│ │ └── IClassificationService.cs
│ └── Events/
│ └── AuditRecordAcceptedEvent.cs
├── Infrastructure/
│ ├── Persistence/
│ │ ├── Repositories/
│ │ │ └── IAuditRecordRepository.cs
│ │ └── Mappings/
│ │ └── AuditRecordMap.cs # NHibernate mapping
│ ├── Messaging/
│ │ ├── Outbox/
│ │ │ └── OutboxMessage.cs
│ │ └── Publishers/
│ │ └── AuditRecordEventPublisher.cs
│ └── Clients/
│ └── PolicyServiceClient.cs
└── Background/
└── OutboxRelayWorker.cs
Core Classes¶
AuditRecord (Aggregate Root)¶
namespace ConnectSoft.ATP.Ingestion.Domain.Entities;
public class AuditRecord : AggregateRoot<AuditRecordId>
{
public TenantId TenantId { get; private set; }
public DateTimeOffset CreatedAt { get; private set; }
public DateTimeOffset ObservedAt { get; private set; }
public DateTimeOffset? EffectiveAt { get; private set; }
public Action Action { get; private set; }
public Resource Resource { get; private set; }
public Actor Actor { get; private set; }
public CorrelationTraceId CorrelationTraceId { get; private set; }
public CorrelationRequestId? CorrelationRequestId { get; private set; }
public DecisionOutcome? DecisionOutcome { get; private set; }
public IdempotencyKey? IdempotencyKey { get; private set; }
public SchemaVersion SchemaVersion { get; private set; }
public Payload Payload { get; private set; }
public Classification Classification { get; private set; }
public PolicyVersion PolicyVersion { get; private set; }
public RetentionPolicy RetentionPolicy { get; private set; }
private AuditRecord() { } // For persistence
public static AuditRecord Create(
TenantId tenantId,
Action action,
Resource resource,
Actor actor,
Payload payload,
DateTimeOffset observedAt,
Classification classification,
PolicyVersion policyVersion,
RetentionPolicy retentionPolicy,
IdempotencyKey? idempotencyKey = null,
DateTimeOffset? effectiveAt = null,
CorrelationRequestId? correlationRequestId = null)
{
var record = new AuditRecord
{
Id = AuditRecordId.New(),
TenantId = tenantId,
CreatedAt = DateTimeOffset.UtcNow,
ObservedAt = observedAt,
EffectiveAt = effectiveAt ?? observedAt,
Action = action,
Resource = resource,
Actor = actor,
CorrelationTraceId = CorrelationTraceId.FromActivity(),
CorrelationRequestId = correlationRequestId,
IdempotencyKey = idempotencyKey,
SchemaVersion = SchemaVersion.Current,
Payload = payload,
Classification = classification,
PolicyVersion = policyVersion,
RetentionPolicy = retentionPolicy
};
record.AddDomainEvent(new AuditRecordAcceptedEvent(
record.Id,
record.TenantId,
record.CreatedAt));
return record;
}
// Business logic methods
public bool CanBePurged(DateTimeOffset now)
{
if (RetentionPolicy.OnLegalHold)
return false;
return now >= RetentionPolicy.EligibleForDeletionAt;
}
public void MarkAsDeleted(DateTimeOffset deletedAt, Actor deletedBy)
{
if (!CanBePurged(deletedAt))
throw new DomainException("Cannot delete record on legal hold");
// Mark for deletion (soft delete first)
AddDomainEvent(new AuditRecordDeletedEvent(Id, TenantId, deletedAt, deletedBy));
}
}
IngestAuditRecordUseCase¶
namespace ConnectSoft.ATP.Ingestion.Application.UseCases;
public class IngestAuditRecordUseCase : IIngestAuditRecordUseCase
{
private readonly ILogger<IngestAuditRecordUseCase> _logger;
private readonly IAuditRecordRepository _repository;
private readonly IAuditRecordValidator _validator;
private readonly IIdempotencyChecker _idempotencyChecker;
private readonly IClassificationService _classificationService;
private readonly IPolicyEvaluator _policyEvaluator;
private readonly IOutboxMessageRepository _outboxRepository;
private readonly IUnitOfWork _unitOfWork;
public IngestAuditRecordUseCase(
ILogger<IngestAuditRecordUseCase> logger,
IAuditRecordRepository repository,
IAuditRecordValidator validator,
IIdempotencyChecker idempotencyChecker,
IClassificationService classificationService,
IPolicyEvaluator policyEvaluator,
IOutboxMessageRepository outboxRepository,
IUnitOfWork unitOfWork)
{
_logger = logger;
_repository = repository;
_validator = validator;
_idempotencyChecker = idempotencyChecker;
_classificationService = classificationService;
_policyEvaluator = policyEvaluator;
_outboxRepository = outboxRepository;
_unitOfWork = unitOfWork;
}
public async Task<IngestAuditRecordResult> ExecuteAsync(
IngestAuditRecordCommand command,
CancellationToken cancellationToken)
{
using var activity = ActivitySource.StartActivity("Ingestion.IngestRecord");
activity?.SetTag("tenant.id", command.TenantId);
activity?.SetTag("correlation.id", command.CorrelationId);
try
{
// 1. Validate request
var validationResult = await _validator.ValidateAsync(
command.AuditRecord, cancellationToken);
if (!validationResult.IsValid)
{
activity?.SetStatus(ActivityStatusCode.Error, validationResult.ErrorMessage);
return IngestAuditRecordResult.Failure(validationResult.ErrorMessage);
}
// 2. Check idempotency
if (command.AuditRecord.IdempotencyKey != null)
{
var existingRecordId = await _idempotencyChecker.CheckAsync(
command.TenantId,
command.AuditRecord.IdempotencyKey,
cancellationToken);
if (existingRecordId != null)
{
_logger.LogInformation(
"Duplicate request detected, returning existing record {AuditRecordId}",
existingRecordId);
activity?.SetTag("idempotent", true);
return IngestAuditRecordResult.Success(existingRecordId);
}
}
// 3. Classify record
var classification = await _classificationService.ClassifyAsync(
command.AuditRecord,
cancellationToken);
activity?.SetTag("classification", classification.ToString());
// 4. Evaluate policy
var policyResult = await _policyEvaluator.EvaluateAsync(
command.TenantId,
command.AuditRecord,
classification,
cancellationToken);
activity?.SetTag("policy.version", policyResult.PolicyVersion);
activity?.SetTag("decision.outcome", policyResult.DecisionOutcome?.ToString());
// 5. Create domain entity
var auditRecord = AuditRecord.Create(
command.TenantId,
command.AuditRecord.Action,
command.AuditRecord.Resource,
command.AuditRecord.Actor,
command.AuditRecord.Payload,
command.AuditRecord.ObservedAt,
classification,
policyResult.PolicyVersion,
policyResult.RetentionPolicy,
command.AuditRecord.IdempotencyKey,
command.AuditRecord.EffectiveAt,
command.CorrelationRequestId);
// 6. Persist to database (with outbox pattern)
await _repository.AddAsync(auditRecord, cancellationToken);
// 7. Add outbox message for event publishing
var outboxMessage = OutboxMessage.Create(
auditRecord.DomainEvents.First(),
command.TenantId);
await _outboxRepository.AddAsync(outboxMessage, cancellationToken);
// 8. Commit transaction
await _unitOfWork.CommitAsync(cancellationToken);
activity?.SetTag("audit.record.id", auditRecord.Id);
activity?.SetStatus(ActivityStatusCode.Ok);
_logger.LogInformation(
"Audit record ingested successfully: {AuditRecordId} for tenant {TenantId}",
auditRecord.Id, command.TenantId);
return IngestAuditRecordResult.Success(auditRecord.Id);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);
_logger.LogError(ex,
"Failed to ingest audit record for tenant {TenantId}",
command.TenantId);
throw;
}
}
}
AuditRecordRepository (NHibernate)¶
namespace ConnectSoft.ATP.Ingestion.Infrastructure.Persistence.Repositories;
public class AuditRecordRepository : IAuditRecordRepository
{
private readonly ISessionFactory _sessionFactory;
private readonly ILogger<AuditRecordRepository> _logger;
public AuditRecordRepository(
ISessionFactory sessionFactory,
ILogger<AuditRecordRepository> logger)
{
_sessionFactory = sessionFactory;
_logger = logger;
}
public async Task<AuditRecordId?> AddAsync(
AuditRecord auditRecord,
CancellationToken cancellationToken)
{
using var session = _sessionFactory.OpenSession();
using var transaction = session.BeginTransaction();
try
{
// Check idempotency constraint
if (auditRecord.IdempotencyKey != null)
{
var existing = await session.QueryOver<AuditRecord>()
.Where(x => x.TenantId == auditRecord.TenantId)
.And(x => x.IdempotencyKey == auditRecord.IdempotencyKey)
.SingleOrDefaultAsync(cancellationToken);
if (existing != null)
{
return existing.Id;
}
}
await session.SaveAsync(auditRecord, cancellationToken);
await transaction.CommitAsync(cancellationToken);
return auditRecord.Id;
}
catch (Exception ex)
{
await transaction.RollbackAsync(cancellationToken);
_logger.LogError(ex,
"Failed to persist audit record {AuditRecordId}",
auditRecord.Id);
throw;
}
}
public async Task<AuditRecord?> GetByIdAsync(
TenantId tenantId,
AuditRecordId id,
CancellationToken cancellationToken)
{
using var session = _sessionFactory.OpenSession();
return await session.QueryOver<AuditRecord>()
.Where(x => x.Id == id)
.And(x => x.TenantId == tenantId)
.SingleOrDefaultAsync(cancellationToken);
}
public async Task<IEnumerable<AuditRecord>> GetByTimeRangeAsync(
TenantId tenantId,
DateTimeOffset from,
DateTimeOffset to,
CancellationToken cancellationToken)
{
using var session = _sessionFactory.OpenSession();
return await session.QueryOver<AuditRecord>()
.Where(x => x.TenantId == tenantId)
.And(x => x.CreatedAt >= from)
.And(x => x.CreatedAt <= to)
.ListAsync(cancellationToken);
}
}
Policy Service LLD¶
Component Structure¶
ConnectSoft.ATP.Policy/
├── Application/
│ ├── UseCases/
│ │ └── EvaluatePolicyUseCase.cs
│ ├── Services/
│ │ ├── PolicyRuleEngine.cs
│ │ ├── PolicyCache.cs
│ │ └── PolicyVersionResolver.cs
│ └── DTOs/
│ └── PolicyEvaluationResultDto.cs
├── Domain/
│ ├── Entities/
│ │ ├── PolicyRule.cs
│ │ └── PolicyVersion.cs
│ ├── ValueObjects/
│ │ ├── PolicyRuleExpression.cs
│ │ └── RetentionPeriod.cs
│ └── Services/
│ └── IPolicyEvaluationService.cs
└── Infrastructure/
├── Persistence/
│ └── IPolicyRuleRepository.cs
├── Cache/
│ └── RedisPolicyCache.cs
└── Evaluation/
│ └── RuleEvaluator.cs
Core Classes¶
PolicyRuleEngine¶
namespace ConnectSoft.ATP.Policy.Application.Services;
public class PolicyRuleEngine : IPolicyRuleEngine
{
private readonly ILogger<PolicyRuleEngine> _logger;
private readonly IPolicyRuleRepository _repository;
private readonly IPolicyCache _cache;
private readonly IRuleEvaluator _ruleEvaluator;
private readonly IPolicyVersionResolver _versionResolver;
public PolicyRuleEngine(
ILogger<PolicyRuleEngine> logger,
IPolicyRuleRepository repository,
IPolicyCache cache,
IRuleEvaluator ruleEvaluator,
IPolicyVersionResolver versionResolver)
{
_logger = logger;
_repository = repository;
_cache = cache;
_ruleEvaluator = ruleEvaluator;
_versionResolver = versionResolver;
}
public async Task<PolicyEvaluationResult> EvaluateAsync(
TenantId tenantId,
AuditRecord auditRecord,
Classification classification,
CancellationToken cancellationToken)
{
using var activity = ActivitySource.StartActivity("Policy.Evaluate");
activity?.SetTag("tenant.id", tenantId);
activity?.SetTag("classification", classification.ToString());
try
{
// 1. Resolve policy version
var policyVersion = await _versionResolver.ResolveVersionAsync(
tenantId,
cancellationToken);
activity?.SetTag("policy.version", policyVersion.ToString());
// 2. Get rules (from cache or database)
var rules = await GetRulesAsync(tenantId, policyVersion, cancellationToken);
activity?.SetTag("rule.count", rules.Count);
// 3. Evaluate rules in order
var evaluationContext = new PolicyEvaluationContext
{
TenantId = tenantId,
AuditRecord = auditRecord,
Classification = classification,
PolicyVersion = policyVersion
};
foreach (var rule in rules.OrderBy(r => r.Priority))
{
var match = await _ruleEvaluator.EvaluateAsync(
rule,
evaluationContext,
cancellationToken);
if (match)
{
activity?.SetTag("matched.rule.id", rule.Id);
return new PolicyEvaluationResult
{
PolicyVersion = policyVersion,
MatchedRule = rule,
DecisionOutcome = rule.DecisionOutcome,
RetentionPolicy = rule.RetentionPolicy,
RedactionRules = rule.RedactionRules
};
}
}
// 4. Default policy if no rules match
return CreateDefaultPolicyResult(policyVersion);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);
_logger.LogError(ex, "Policy evaluation failed for tenant {TenantId}", tenantId);
throw;
}
}
private async Task<IEnumerable<PolicyRule>> GetRulesAsync(
TenantId tenantId,
PolicyVersion version,
CancellationToken cancellationToken)
{
// Try cache first
var cacheKey = $"policy:rules:{tenantId}:{version}";
var cachedRules = await _cache.GetAsync<IEnumerable<PolicyRule>>(cacheKey, cancellationToken);
if (cachedRules != null)
{
return cachedRules;
}
// Load from database
var rules = await _repository.GetByTenantAndVersionAsync(
tenantId,
version,
cancellationToken);
// Cache for 5 minutes
await _cache.SetAsync(
cacheKey,
rules,
TimeSpan.FromMinutes(5),
cancellationToken);
return rules;
}
}
Query Service LLD¶
Component Structure¶
ConnectSoft.ATP.Query/
├── API/
│ └── Controllers/
│ └── QueryController.cs
├── Application/
│ ├── UseCases/
│ │ ├── QueryAuditRecordsUseCase.cs
│ │ └── SearchAuditRecordsUseCase.cs
│ ├── Services/
│ │ ├── QueryBuilder.cs
│ │ ├── ResultProjector.cs
│ │ └── CacheManager.cs
│ └── DTOs/
│ └── QueryResultDto.cs
├── Domain/
│ └── ValueObjects/
│ └── QueryFilter.cs
└── Infrastructure/
├── Persistence/
│ ├── IReadModelRepository.cs
│ └── CosmosDbRepository.cs
├── Search/
│ └── ISearchIndex.cs
└── Cache/
└── RedisQueryCache.cs
Core Classes¶
QueryBuilder¶
namespace ConnectSoft.ATP.Query.Application.Services;
public class QueryBuilder : IQueryBuilder
{
public IQueryable<AuditRecordProjection> BuildQuery(
TenantId tenantId,
QueryFilter filter,
QueryOptions options)
{
var query = _context.AuditRecordProjections
.Where(x => x.TenantId == tenantId);
// Apply time range filter
if (filter.TimeRange != null)
{
query = query.Where(x =>
x.CreatedAt >= filter.TimeRange.From &&
x.CreatedAt <= filter.TimeRange.To);
}
// Apply action filter
if (filter.Actions != null && filter.Actions.Any())
{
query = query.Where(x => filter.Actions.Contains(x.Action));
}
// Apply resource filter
if (filter.ResourceType != null)
{
query = query.Where(x => x.ResourceType == filter.ResourceType);
}
if (filter.ResourceId != null)
{
query = query.Where(x => x.ResourceId == filter.ResourceId);
}
// Apply actor filter
if (filter.ActorId != null)
{
query = query.Where(x => x.ActorId == filter.ActorId);
}
// Apply classification filter
if (filter.Classifications != null && filter.Classifications.Any())
{
query = query.Where(x => filter.Classifications.Contains(x.Classification));
}
// Apply sorting
query = options.SortBy switch
{
SortBy.CreatedAtAsc => query.OrderBy(x => x.CreatedAt),
SortBy.CreatedAtDesc => query.OrderByDescending(x => x.CreatedAt),
SortBy.ObservedAtAsc => query.OrderBy(x => x.ObservedAt),
SortBy.ObservedAtDesc => query.OrderByDescending(x => x.ObservedAt),
_ => query.OrderByDescending(x => x.CreatedAt)
};
return query;
}
}
Data Access Layer¶
Repository Pattern¶
namespace ConnectSoft.ATP.Shared.Infrastructure.Persistence;
public interface IRepository<TEntity, TKey> where TEntity : Entity<TKey>
{
Task<TEntity?> GetByIdAsync(TKey id, CancellationToken cancellationToken);
Task<IEnumerable<TEntity>> GetAllAsync(CancellationToken cancellationToken);
Task AddAsync(TEntity entity, CancellationToken cancellationToken);
Task UpdateAsync(TEntity entity, CancellationToken cancellationToken);
Task DeleteAsync(TKey id, CancellationToken cancellationToken);
}
public interface IUnitOfWork : IDisposable
{
Task<int> SaveChangesAsync(CancellationToken cancellationToken);
Task CommitAsync(CancellationToken cancellationToken);
Task RollbackAsync(CancellationToken cancellationToken);
}
NHibernate Mappings¶
namespace ConnectSoft.ATP.Ingestion.Infrastructure.Persistence.Mappings;
public class AuditRecordMap : ClassMapping<AuditRecord>
{
public AuditRecordMap()
{
Table("AuditRecords");
Id(x => x.Id, m =>
{
m.Column("AuditRecordId");
m.Type(NHibernateUtil.String);
m.Length(26); // ULID length
});
Property(x => x.TenantId, m =>
{
m.Column("TenantId");
m.Type(NHibernateUtil.String);
m.Length(128);
m.NotNullable(true);
m.Index("IX_Audit_Tenant_CreatedAt");
});
Property(x => x.CreatedAt, m =>
{
m.Column("CreatedAt");
m.Type(NHibernateUtil.DateTimeOffset);
m.NotNullable(true);
m.Index("IX_Audit_Tenant_CreatedAt");
});
Component(x => x.Action, m =>
{
m.Property(a => a.Value, p => p.Column("Action"));
m.Property(a => a.Category, p => p.Column("ActionCategory"));
});
Component(x => x.Resource, m =>
{
m.Property(r => r.Type, p => p.Column("ResourceType"));
m.Property(r => r.Id, p => p.Column("ResourceId"));
m.Property(r => r.Path, p => p.Column("ResourcePath"));
});
Component(x => x.Actor, m =>
{
m.Property(a => a.Id, p => p.Column("ActorId"));
m.Property(a => a.Type, p => p.Column("ActorType"));
});
Property(x => x.Payload, m =>
{
m.Column("PayloadJson");
m.Type<JsonType<Payload>>();
m.NotNullable(true);
});
Property(x => x.Classification, m =>
{
m.Column("Classification");
m.Type(NHibernateUtil.Enum<Classification>());
});
// Unique constraint for idempotency
Property(x => x.IdempotencyKey, m =>
{
m.Column("IdempotencyKey");
m.UniqueKey("UX_Audit_Tenant_Idem");
});
// WORM enforcement: Read-only after creation
// (enforced at database level with triggers)
}
}
Message Bus Integration¶
Outbox Pattern Implementation¶
namespace ConnectSoft.ATP.Shared.Infrastructure.Messaging.Outbox;
public class OutboxMessage : Entity<OutboxMessageId>
{
public TenantId TenantId { get; private set; }
public string EventType { get; private set; }
public string Payload { get; private set; } // JSON serialized
public DateTimeOffset CreatedAt { get; private set; }
public OutboxMessageStatus Status { get; private set; }
public DateTimeOffset? ProcessedAt { get; private set; }
public int RetryCount { get; private set; }
public static OutboxMessage Create(
IDomainEvent domainEvent,
TenantId tenantId)
{
return new OutboxMessage
{
Id = OutboxMessageId.New(),
TenantId = tenantId,
EventType = domainEvent.GetType().Name,
Payload = JsonSerializer.Serialize(domainEvent),
CreatedAt = DateTimeOffset.UtcNow,
Status = OutboxMessageStatus.Pending,
RetryCount = 0
};
}
public void MarkAsProcessing()
{
Status = OutboxMessageStatus.Processing;
}
public void MarkAsProcessed()
{
Status = OutboxMessageStatus.Processed;
ProcessedAt = DateTimeOffset.UtcNow;
}
public void MarkAsFailed()
{
Status = OutboxMessageStatus.Failed;
RetryCount++;
}
}
// Outbox Relay Worker (Background Service)
public class OutboxRelayWorker : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<OutboxRelayWorker> _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await ProcessOutboxMessagesAsync(stoppingToken);
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing outbox messages");
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
}
}
}
private async Task ProcessOutboxMessagesAsync(CancellationToken cancellationToken)
{
using var scope = _serviceProvider.CreateScope();
var outboxRepository = scope.ServiceProvider
.GetRequiredService<IOutboxMessageRepository>();
var messagePublisher = scope.ServiceProvider
.GetRequiredService<IMessagePublisher>();
var unitOfWork = scope.ServiceProvider
.GetRequiredService<IUnitOfWork>();
// Get pending messages (limit batch size)
var pendingMessages = await outboxRepository.GetPendingAsync(
maxCount: 100,
cancellationToken);
foreach (var message in pendingMessages)
{
try
{
message.MarkAsProcessing();
// Publish to message bus
await messagePublisher.PublishAsync(
message.EventType,
message.Payload,
message.TenantId,
cancellationToken);
message.MarkAsProcessed();
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to publish outbox message {MessageId}",
message.Id);
message.MarkAsFailed();
// Dead-letter after max retries
if (message.RetryCount >= 5)
{
await outboxRepository.MoveToDeadLetterAsync(
message,
cancellationToken);
}
}
}
await unitOfWork.CommitAsync(cancellationToken);
}
}
Security Implementation¶
Authentication Middleware¶
namespace ConnectSoft.ATP.Gateway.API.Middleware;
public class AuthenticationMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger<AuthenticationMiddleware> _logger;
private readonly IJwtTokenValidator _tokenValidator;
private readonly IApiKeyValidator _apiKeyValidator;
public async Task InvokeAsync(HttpContext context)
{
// Try JWT token first
if (context.Request.Headers.TryGetValue("Authorization", out var authHeader))
{
var token = ExtractBearerToken(authHeader);
if (token != null)
{
var principal = await _tokenValidator.ValidateAsync(token);
if (principal != null)
{
context.User = principal;
await _next(context);
return;
}
}
}
// Try API key
if (context.Request.Headers.TryGetValue("X-API-Key", out var apiKey))
{
var principal = await _apiKeyValidator.ValidateAsync(apiKey);
if (principal != null)
{
context.User = principal;
await _next(context);
return;
}
}
// Unauthorized
context.Response.StatusCode = StatusCodes.Status401Unauthorized;
await context.Response.WriteAsJsonAsync(new ErrorResponse
{
Error = "Unauthorized",
Message = "Authentication required"
});
}
}
Authorization Handler (ABAC)¶
namespace ConnectSoft.ATP.Gateway.API.Authorization;
public class AuditRecordAuthorizationHandler :
AuthorizationHandler<AuditRecordRequirement, AuditRecord>
{
protected override Task HandleRequirementAsync(
AuthorizationHandlerContext context,
AuditRecordRequirement requirement,
AuditRecord resource)
{
// Extract tenant from user claims
var userTenantId = context.User.FindFirst("tenant_id")?.Value;
// Check tenant match
if (userTenantId != resource.TenantId.Value)
{
context.Fail();
return Task.CompletedTask;
}
// Check permissions
var permission = requirement.Permission;
var userPermissions = context.User.FindAll("permission").Select(c => c.Value);
if (!userPermissions.Contains(permission))
{
context.Fail();
return Task.CompletedTask;
}
context.Succeed(requirement);
return Task.CompletedTask;
}
}
Error Handling Patterns¶
Exception Hierarchy¶
namespace ConnectSoft.ATP.Shared.Domain.Exceptions;
// Base exception
public abstract class DomainException : Exception
{
public string ErrorCode { get; }
protected DomainException(string errorCode, string message)
: base(message)
{
ErrorCode = errorCode;
}
}
// Specific exceptions
public class ValidationException : DomainException
{
public ValidationException(string message, IEnumerable<string> errors)
: base("VALIDATION_FAILED", message)
{
ValidationErrors = errors.ToList();
}
public IReadOnlyList<string> ValidationErrors { get; }
}
public class TenantNotFoundException : DomainException
{
public TenantNotFoundException(TenantId tenantId)
: base("TENANT_NOT_FOUND", $"Tenant {tenantId} not found")
{
TenantId = tenantId;
}
public TenantId TenantId { get; }
}
public class IdempotencyConflictException : DomainException
{
public IdempotencyConflictException(AuditRecordId existingRecordId)
: base("IDEMPOTENCY_CONFLICT",
$"Record with same idempotency key already exists: {existingRecordId}")
{
ExistingRecordId = existingRecordId;
}
public AuditRecordId ExistingRecordId { get; }
}
Global Error Handler¶
namespace ConnectSoft.ATP.Gateway.API.Middleware;
public class ErrorHandlingMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger<ErrorHandlingMiddleware> _logger;
public async Task InvokeAsync(HttpContext context)
{
try
{
await _next(context);
}
catch (Exception ex)
{
await HandleExceptionAsync(context, ex);
}
}
private async Task HandleExceptionAsync(HttpContext context, Exception exception)
{
var response = context.Response;
response.ContentType = "application/json";
var errorResponse = exception switch
{
ValidationException ve => new ErrorResponse
{
Error = ve.ErrorCode,
Message = ve.Message,
StatusCode = StatusCodes.Status400BadRequest,
Details = ve.ValidationErrors
},
TenantNotFoundException tnf => new ErrorResponse
{
Error = tnf.ErrorCode,
Message = tnf.Message,
StatusCode = StatusCodes.Status404NotFound
},
IdempotencyConflictException ic => new ErrorResponse
{
Error = ic.ErrorCode,
Message = ic.Message,
StatusCode = StatusCodes.Status409Conflict,
Details = new[] { ic.ExistingRecordId.ToString() }
},
UnauthorizedAccessException => new ErrorResponse
{
Error = "UNAUTHORIZED",
Message = "Access denied",
StatusCode = StatusCodes.Status401Unauthorized
},
_ => new ErrorResponse
{
Error = "INTERNAL_ERROR",
Message = "An unexpected error occurred",
StatusCode = StatusCodes.Status500InternalServerError
}
};
response.StatusCode = errorResponse.StatusCode;
await response.WriteAsJsonAsync(errorResponse);
// Log error
_logger.LogError(exception,
"Error processing request: {ErrorCode} - {Message}",
errorResponse.Error,
errorResponse.Message);
}
}
Performance Optimizations¶
Caching Strategy¶
namespace ConnectSoft.ATP.Policy.Infrastructure.Cache;
public class RedisPolicyCache : IPolicyCache
{
private readonly IDatabase _database;
private readonly ILogger<RedisPolicyCache> _logger;
private readonly IMemoryCache _localCache; // L1 cache
public async Task<T?> GetAsync<T>(
string key,
CancellationToken cancellationToken)
{
// L1: Check local memory cache first
if (_localCache.TryGetValue(key, out T? value))
{
return value;
}
// L2: Check Redis
var redisValue = await _database.StringGetAsync(key);
if (redisValue.HasValue)
{
var deserialized = JsonSerializer.Deserialize<T>(redisValue!);
// Populate L1 cache
_localCache.Set(key, deserialized, TimeSpan.FromMinutes(1));
return deserialized;
}
return default;
}
public async Task SetAsync<T>(
string key,
T value,
TimeSpan? expiration,
CancellationToken cancellationToken)
{
var serialized = JsonSerializer.Serialize(value);
// Set in Redis (L2)
await _database.StringSetAsync(
key,
serialized,
expiration ?? TimeSpan.FromMinutes(5));
// Set in local cache (L1)
_localCache.Set(key, value, TimeSpan.FromMinutes(1));
}
}
Batch Processing¶
namespace ConnectSoft.ATP.Projection.Application.Services;
public class BatchProcessor<T>
{
private readonly int _batchSize;
private readonly TimeSpan _maxWaitTime;
private readonly SemaphoreSlim _semaphore;
public async Task ProcessBatchAsync(
IEnumerable<T> items,
Func<IEnumerable<T>, CancellationToken, Task> processor,
CancellationToken cancellationToken)
{
var batches = items
.Chunk(_batchSize)
.Select((batch, index) => new { Batch = batch, Index = index });
var tasks = batches.Select(async batch =>
{
await _semaphore.WaitAsync(cancellationToken);
try
{
await processor(batch.Batch, cancellationToken);
}
finally
{
_semaphore.Release();
}
});
await Task.WhenAll(tasks);
}
}
Summary¶
This LLD document provides detailed implementation guidance for ATP components:
- Component Structures: Organized by service (Gateway, Ingestion, Policy, Query, etc.)
- Class Designs: Detailed class definitions with properties, methods, and business logic
- Data Access: Repository patterns, NHibernate mappings, query builders
- Message Bus: Outbox pattern implementation for reliable event publishing
- Security: Authentication/authorization middleware and handlers
- Error Handling: Exception hierarchy and global error handling
- Performance: Caching strategies, batch processing, connection pooling
Implementation Guidelines: - Follow Clean Architecture principles (Domain, Application, Infrastructure separation) - Use Domain-Driven Design (aggregates, value objects, domain services) - Implement repository pattern for data access - Use outbox pattern for reliable messaging - Instrument all operations for observability - Follow security-by-design principles
Next Steps: - Review component-specific LLD sections for detailed implementations - Follow class designs and interfaces when implementing services - Ensure all services follow the same patterns for consistency - Add unit and integration tests for each component
Document Version: 1.0
Last Updated: 2025-10-30
Maintained By: Architecture & Engineering Team