- Регистрация
- 9 Май 2015
- Сообщения
- 1,605
- Баллы
- 155
Introducción
En el desarrollo moderno de aplicaciones, es común necesitar ejecutar procesos en segundo plano que se comuniquen con nuestra API de forma eficiente y segura. Tradicionalmente, esto se resolvía con implementaciones complejas usando locks, colas manuales o infraestructura externa como RabbitMQ. Sin embargo, .NET ofrece una solución simple pero elegante: System.Threading.Channels.
En este artículo, exploraremos cómo construir un sistema de control de jobs en tiempo real utilizando:
Channels para comunicación thread-safe entre componentes
Background Services para tareas recurrentes
Minimal APIs para endpoints modernos y limpios
TaskCompletionSource para comunicación bidireccional
Al finalizar, tendrás un proyecto funcional que puedes adaptar para casos de uso reales como procesamiento de emails, análisis de imágenes, generación de reportes, y más.
Los Channels en .NET son estructuras de datos thread-safe diseñadas para escenarios productor-consumidor. Piensa en ellos como una "tubería" donde un lado escribe datos y el otro los lee, sin preocuparte por locks o sincronización manual.Nota: El código fuente siempre lo encontrarás en mi github ->
¿Qué son los Channels?
¿Por qué usarlos?
Thread-safe por diseño
Alta performance con bajo overhead
Backpressure integrado (control de flujo)
Ideal para comunicación entre hilos/tareas
Alternativa simple a colas externas (RabbitMQ, Redis) para escenarios internos
Optimizado para async/await (usa ValueTask internamente)
Este proyecto demuestra cómo controlar un Background Job desde una API usando Channels para comunicación bidireccional:
┌──────────────┐ ┌─────────┐ ┌──────────────────┐
│ API Request │ ──────> │ Channel │ ──────> │ Background Job │
│ (Productor) │ │ (Cola) │ │ (Consumidor) │
└──────────────┘ └─────────┘ └──────────────────┘
↑ │
└────── TaskCompletionSource ─────────────────┘
(Respuesta)
Paso 1: Definir el Modelo de Comunicación
Primero, necesitamos estructuras para enviar comandos y recibir respuestas:
public enum CommandType { Start, Stop, GetStatus }
public class JobCommand
{
public CommandType Type { get; set; }
public TaskCompletionSource<JobStatus>? ResponseTask { get; set; }
}
public class JobStatus
{
public bool IsRunning { get; set; }
public int ExecutionCount { get; set; }
public DateTime? LastExecutionTime { get; set; }
public string Message { get; set; } = string.Empty;
}
Paso 2: Crear el Background Service (Consumidor)
public class JobProcessor : BackgroundService
{
private readonly Channel<JobCommand> _channel;
private bool _isJobRunning = false;
private int _executionCount = 0;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("JobProcessor iniciado. Esperando comandos...");
//
// Más eficiente que ReadAllAsync para alta concurrencia
while (await _channel.Reader.WaitToReadAsync(stoppingToken))
{
while (_channel.Reader.TryRead(out var command))
{
try
{
await ProcessCommandAsync(command, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error procesando comando");
//
command.ResponseTask?.TrySetException(ex);
}
}
}
}
private async Task ProcessCommandAsync(JobCommand command, CancellationToken token)
{
switch (command.Type)
{
case CommandType.Start:
_isJobRunning = true;
_ = Task.Run(async () => await RunRecurringJobAsync(token));
// Enviar respuesta al productor
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = true,
Message = "Job iniciado"
});
break;
case CommandType.Stop:
_isJobRunning = false;
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = false,
Message = "Job detenido"
});
break;
case CommandType.GetStatus:
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = _isJobRunning,
ExecutionCount = _executionCount
});
break;
}
}
}
- WaitToReadAsync() + TryRead(): Patrón recomendado por Microsoft para mejor performance
- TrySetException(): Propaga errores al productor de forma segura
- Bucle anidado: Procesa múltiples comandos en batch cuando están disponibles
En Program.cs:
//
builder.Services.AddSingleton(Channel.CreateBounded<JobCommand>(
new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // Backpressure automático
SingleWriter = false, // Múltiples endpoints pueden escribir
SingleReader = true // Solo un BackgroundService consume
}));
// Registrar el Background Service
builder.Services.AddHostedService<JobProcessor>();
| Característica | Unbounded | Bounded |
|---|---|---|
| Capacidad | Ilimitada | Limitada (configurable) |
| Memoria | Puede crecer sin control | Controlada |
| Backpressure | No | Sí (automático) |
| Uso recomendado | Productores lentos | Productores rápidos |
| Performance | Writes síncronos | Writes pueden ser async |
Modos de Bounded Channel (FullMode):
- Wait (recomendado): Espera hasta que haya espacio (backpressure)
- DropWrite: Descarta el nuevo elemento
- DropOldest: Descarta el elemento más antiguo
- DropNewest: Descarta el elemento más reciente
Opciones de optimización:
- SingleWriter: true = mejor performance si solo un productor escribe
- SingleReader: true = mejor performance si solo un consumidor lee
- AllowSynchronousContinuations: false (default) para evitar bloqueos
public static IEndpointRouteBuilder MapJobEndpoints(this IEndpointRouteBuilder app)
{
var jobGroup = app.MapGroup("api/job");
jobGroup.MapPost("start", async (Channel<JobCommand> channel) =>
{
// Crear TaskCompletionSource para esperar la respuesta
var tcs = new TaskCompletionSource<JobStatus>();
var command = new JobCommand
{
Type = CommandType.Start,
ResponseTask = tcs
};
//
await channel.Writer.WriteAsync(command);
// Esperar respuesta del consumidor
var status = await tcs.Task;
return Results.Ok(status);
});
// Endpoints similares para stop y status...
return app;
}
- API recibe request → Crea TaskCompletionSource
- Escribe comando en el Channel con WriteAsync() (maneja backpressure)
- Espera que el consumidor complete la Task
- Retorna la respuesta al cliente
Sin Channels:
//
private static readonly object _lock = new();
private static Queue<Command> _queue = new();
public void AddCommand(Command cmd)
{
lock(_lock) { _queue.Enqueue(cmd); }
}
Con Channels:
//
await channel.Writer.WriteAsync(command);
Casos de Uso Reales con Channels
1. Cola de Emails/Notificaciones
// Microsoft recomienda Bounded para prevenir OutOfMemory
var emailChannel = Channel.CreateBounded<EmailMessage>(
new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
// API recibe requests → Encola en Channel → Background envía emails en lotes
2. Procesamiento de Imágenes
Channel<ImageProcessingJob> imageChannel;
// Upload de imágenes → Channel → Worker redimensiona/optimiza en background
3. Logs Centralizados
// DropOldest para logs: si está lleno, descarta los más antiguos
var logChannel = Channel.CreateBounded<LogEntry>(
new BoundedChannelOptions(5000)
{
FullMode = BoundedChannelFullMode.DropOldest
});
4. Rate Limiting / Throttling
var boundedChannel = Channel.CreateBounded<Request>(100);
// Limita a 100 requests concurrentes, el resto espera (backpressure)
5. Event Sourcing Interno
Channel<DomainEvent> eventChannel;
// Eventos de dominio → Channel → Múltiples handlers procesan en paralelo
6. Pipeline de Datos (ejemplo oficial de Microsoft)
// Patrón de processing pipeline con múltiples stages
Channel<RawData> inputChannel;
Channel<ProcessedData> outputChannel;
// Stage 1: Raw → Validated → Stage 2: Validated → Enriched
Ventajas vs Alternativas
| Escenario | Channel | Queue Externo | BlockingCollection |
|---|---|---|---|
| Performance | |||
| Configuración | |||
| Backpressure | |||
| Async/Await | |||
| Escalabilidad | |||
| Memoria |
Usa Channels cuando:
Comunicación dentro de la misma aplicación
Necesitas alta performance y bajo latency
Quieres simplicidad sin infraestructura externa
Trabajas con async/await
Necesitas backpressure automático
Usa Queue externo (RabbitMQ/Azure Service Bus) cuando:
Necesitas comunicación entre múltiples aplicaciones/servicios
Requieres persistencia de mensajes
Necesitas escalabilidad horizontal
Requieres garantías de entrega (at-least-once, exactly-once)
1. *Consumer Pattern *
//
while (await reader.WaitToReadAsync(cancellationToken))
{
while (reader.TryRead(out var item))
{
// Procesar item
}
}
//
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
// Procesar item
}
2. Producer Pattern
//
await writer.WriteAsync(item, cancellationToken);
//
if (!writer.TryWrite(item))
{
// Channel lleno, manejar alternativa
}
3. Signal Completion
//
writer.Complete();
// O con error
writer.Complete(exception);
4. Manejo de Errores
try
{
await ProcessCommand(command);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing");
command.ResponseTask?.TrySetException(ex); //
}
Conclusión
Los Channels representan una evolución significativa en cómo manejamos la comunicación asíncrona en .NET. Lo que tradicionalmente requería código complejo con locks, semáforos y manejo manual de concurrencia, ahora se puede lograr con una API limpia, segura y de alto rendimiento.
¿Por qué usar Channels?
A lo largo de este tutorial, hemos visto cómo Channels ofrece ventajas significativas:
- Simplicidad: No necesitas infraestructura externa para empezar
- Performance: Diseñados desde cero para async/await con ValueTask
- Seguridad: Thread-safe por diseño, sin preocupaciones por race conditions
- Control: Backpressure automático previene sobrecarga del sistema
- Flexibilidad: Configuración granular según tus necesidades específicas
Cuándo NO usar Channels
Impacto en tu arquitectura
Este patrón es especialmente valioso cuando:
- Estás construyendo aplicaciones monolíticas modernas que necesitan procesamiento asíncrono
- Quieres reducir costos de infraestructura eliminando dependencias de message brokers
- Necesitas optimizar performance con procesamiento en memoria
- Buscas simplicidad operacional sin sacrificar escalabilidad vertical
Los 5 principios esenciales:
Usa Bounded Channels con FullMode.Wait para prevenir OutOfMemory
Patrón WaitToReadAsync + TryRead para máximo throughput
Configura SingleWriter/SingleReader cuando sea posible para mejor performance
Siempre llama Complete() para señalizar fin de producción
Propaga errores con TrySetException() para debugging efectivo
Este proyecto es una base sólida que puedes extender según tus necesidades:
- Múltiples consumidores: Escala horizontalmente agregando más BackgroundServices
- Priorización: Implementa múltiples channels con diferentes prioridades
- Monitoring: Integra métricas de performance y observabilidad
- Resiliencia: Agrega retry policies y circuit breakers
Los Channels de .NET demuestran que no siempre necesitas herramientas complejas para resolver problemas complejos. A veces, la solución más elegante es la que viene incorporada en tu framework.
Próximos Pasos
¿Listo para llevar este conocimiento al siguiente nivel? Aquí tienes algunas ideas para expandir este proyecto:
1. Implementar Múltiples Consumidores
// Escalar procesamiento con múltiples workers
builder.Services.AddHostedService<JobProcessor>(); // Worker 1
builder.Services.AddHostedService<JobProcessor>(); // Worker 2
builder.Services.AddHostedService<JobProcessor>(); // Worker 3
Aprenderás: Paralelización, distribución de carga, sincronización entre workers
2. Agregar Sistema de Prioridades
public enum JobPriority { Low, Normal, High, Critical }
// Crear channels separados por prioridad
var highPriorityChannel = Channel.CreateBounded<JobCommand>(50);
var normalPriorityChannel = Channel.CreateBounded<JobCommand>(100);
var lowPriorityChannel = Channel.CreateBounded<JobCommand>(200);
Aprenderás: Gestión de prioridades, routing inteligente, SLA por prioridad
3. Integrar Observabilidad
// Métricas con System.Diagnostics.Metrics
var meter = new Meter("BackgroundJobs");
var jobsProcessed = meter.CreateCounter<long>("jobs_processed");
var queueDepth = meter.CreateObservableGauge("queue_depth",
() => channel.Reader.Count);
Aprenderás: OpenTelemetry, métricas personalizadas, dashboards con Grafana/Prometheus
4. Implementar Persistencia
// Guardar estado en caso de restart
public class PersistentJobProcessor : BackgroundService
{
private readonly IJobStateRepository _repository;
protected override async Task ExecuteAsync(CancellationToken token)
{
// Recuperar jobs pendientes al iniciar
await _repository.RestorePendingJobsAsync(token);
// ... continuar procesamiento normal
}
}
Aprenderás: State management, recovery strategies, durabilidad
5. Agregar Pipeline de Procesamiento
// Pipeline multi-etapa
var rawChannel = Channel.CreateBounded<RawData>(100);
var validatedChannel = Channel.CreateBounded<ValidatedData>(100);
var enrichedChannel = Channel.CreateBounded<EnrichedData>(100);
// Stage 1: Validación
builder.Services.AddHostedService<ValidationProcessor>();
// Stage 2: Enriquecimiento
builder.Services.AddHostedService<EnrichmentProcessor>();
// Stage 3: Persistencia
builder.Services.AddHostedService<PersistenceProcessor>();
Aprenderás: Pipeline pattern, ETL processes, data transformation
6. Implementar Rate Limiting Avanzado
// Rate limiter con ventanas deslizantes
public class RateLimitedJobProcessor : BackgroundService
{
private readonly RateLimiter _rateLimiter;
protected override async Task ExecuteAsync(CancellationToken token)
{
while (await _channel.Reader.WaitToReadAsync(token))
{
using var lease = await _rateLimiter.AcquireAsync(1, token);
if (lease.IsAcquired)
{
// Procesar job
}
}
}
}
Aprenderás: Rate limiting patterns, token bucket, leaky bucket
7. Crear Dashboard de Monitoreo
// SignalR para updates en tiempo real
builder.Services.AddSignalR();
// Notificar estado a clientes conectados
await _hubContext.Clients.All.SendAsync("JobStatusUpdate", new {
QueueDepth = channel.Reader.Count,
ProcessingRate = jobsPerSecond,
ActiveJobs = activeJobCount
});
Aprenderás: Real-time updates, SignalR, live dashboards
8. Añadir Resiliencia
// Polly para retry policies
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
await retryPolicy.ExecuteAsync(async () =>
{
await ProcessJobAsync(command);
});
Aprenderás: Retry patterns, circuit breakers, fallback strategies
9. Implementar Health Checks
// Health check para el channel
builder.Services.AddHealthChecks()
.AddCheck<ChannelHealthCheck>("channel_health")
.AddCheck<JobProcessorHealthCheck>("job_processor_health");
public class ChannelHealthCheck : IHealthCheck
{
public Task<HealthCheckResult> CheckHealthAsync()
{
var queueDepth = _channel.Reader.Count;
return queueDepth < 1000
? HealthCheckResult.Healthy()
: HealthCheckResult.Degraded("Queue depth high");
}
}
Aprenderás: Health monitoring, readiness/liveness probes, Kubernetes integration
10. Migrar a Arquitectura Distribuida
// Cuando crezcas más allá de un solo servidor
// Considera migrar a:
// - Azure Service Bus para messaging distribuido
// - Azure Queue Storage para simplicidad y bajo costo
// - RabbitMQ para control total
// - Redis Streams para alta performance
// Mantén la misma interfaz, cambia la implementación
public interface IJobQueue
{
Task EnqueueAsync(JobCommand command);
Task<JobCommand> DequeueAsync(CancellationToken token);
}
// Implementación con Channels (actual)
public class InMemoryJobQueue : IJobQueue { }
// Implementación con Azure Service Bus (futuro)
public class ServiceBusJobQueue : IJobQueue { }
Aprenderás: Estrategias de migración, abstracciones, arquitectura evolutiva
Recursos para Continuar Aprendiendo
Documentación Oficial:
Artículos Avanzados:
Источник: