• Что бы вступить в ряды "Принятый кодер" Вам нужно:
    Написать 10 полезных сообщений или тем и Получить 10 симпатий.
    Для того кто не хочет терять время,может пожертвовать средства для поддержки сервеса, и вступить в ряды VIP на месяц, дополнительная информация в лс.

  • Пользаватели которые будут спамить, уходят в бан без предупреждения. Спам сообщения определяется администрацией и модератором.

  • Гость, Что бы Вы хотели увидеть на нашем Форуме? Изложить свои идеи и пожелания по улучшению форума Вы можете поделиться с нами здесь. ----> Перейдите сюда
  • Все пользователи не прошедшие проверку электронной почты будут заблокированы. Все вопросы с разблокировкой обращайтесь по адресу электронной почте : info@guardianelinks.com . Не пришло сообщение о проверке или о сбросе также сообщите нам.

Sistema de Control de Jobs en Tiempo Real con Channels y Background Services en .NET

Sascha Оффлайн

Sascha

Заместитель Администратора
Команда форума
Администратор
Регистрация
9 Май 2015
Сообщения
1,605
Баллы
155

Introducción


En el desarrollo moderno de aplicaciones, es común necesitar ejecutar procesos en segundo plano que se comuniquen con nuestra API de forma eficiente y segura. Tradicionalmente, esto se resolvía con implementaciones complejas usando locks, colas manuales o infraestructura externa como RabbitMQ. Sin embargo, .NET ofrece una solución simple pero elegante: System.Threading.Channels.

En este artículo, exploraremos cómo construir un sistema de control de jobs en tiempo real utilizando:

  • 🔧 Channels para comunicación thread-safe entre componentes
  • 🔄 Background Services para tareas recurrentes
  • 🚀 Minimal APIs para endpoints modernos y limpios
  • ⚡ TaskCompletionSource para comunicación bidireccional

Al finalizar, tendrás un proyecto funcional que puedes adaptar para casos de uso reales como procesamiento de emails, análisis de imágenes, generación de reportes, y más.

Nota: El código fuente siempre lo encontrarás en mi github ->

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



¿Qué son los Channels?
Los Channels en .NET son estructuras de datos thread-safe diseñadas para escenarios productor-consumidor. Piensa en ellos como una "tubería" donde un lado escribe datos y el otro los lee, sin preocuparte por locks o sincronización manual.

¿Por qué usarlos?

  • ✅ Thread-safe por diseño
  • ✅ Alta performance con bajo overhead
  • ✅ Backpressure integrado (control de flujo)
  • ✅ Ideal para comunicación entre hilos/tareas
  • ✅ Alternativa simple a colas externas (RabbitMQ, Redis) para escenarios internos
  • ✅ Optimizado para async/await (usa ValueTask internamente)
Arquitectura del Proyecto


Este proyecto demuestra cómo controlar un Background Job desde una API usando Channels para comunicación bidireccional:


┌──────────────┐ ┌─────────┐ ┌──────────────────┐
│ API Request │ ──────> │ Channel │ ──────> │ Background Job │
│ (Productor) │ │ (Cola) │ │ (Consumidor) │
└──────────────┘ └─────────┘ └──────────────────┘
↑ │
└────── TaskCompletionSource ─────────────────┘
(Respuesta)



Paso 1: Definir el Modelo de Comunicación


Primero, necesitamos estructuras para enviar comandos y recibir respuestas:


public enum CommandType { Start, Stop, GetStatus }

public class JobCommand
{
public CommandType Type { get; set; }
public TaskCompletionSource<JobStatus>? ResponseTask { get; set; }
}

public class JobStatus
{
public bool IsRunning { get; set; }
public int ExecutionCount { get; set; }
public DateTime? LastExecutionTime { get; set; }
public string Message { get; set; } = string.Empty;
}




💡 Clave: TaskCompletionSource nos permite crear una Task que completaremos manualmente cuando tengamos la respuesta, haciendo posible la comunicación bidireccional.

Paso 2: Crear el Background Service (Consumidor)


public class JobProcessor : BackgroundService
{
private readonly Channel<JobCommand> _channel;
private bool _isJobRunning = false;
private int _executionCount = 0;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("JobProcessor iniciado. Esperando comandos...");

// ✅ Patrón recomendado por Microsoft: WaitToReadAsync + TryRead
// Más eficiente que ReadAllAsync para alta concurrencia
while (await _channel.Reader.WaitToReadAsync(stoppingToken))
{
while (_channel.Reader.TryRead(out var command))
{
try
{
await ProcessCommandAsync(command, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error procesando comando");
// ✅ Notificar errores al productor
command.ResponseTask?.TrySetException(ex);
}
}
}
}

private async Task ProcessCommandAsync(JobCommand command, CancellationToken token)
{
switch (command.Type)
{
case CommandType.Start:
_isJobRunning = true;
_ = Task.Run(async () => await RunRecurringJobAsync(token));

// Enviar respuesta al productor
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = true,
Message = "Job iniciado"
});
break;

case CommandType.Stop:
_isJobRunning = false;
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = false,
Message = "Job detenido"
});
break;

case CommandType.GetStatus:
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = _isJobRunning,
ExecutionCount = _executionCount
});
break;
}
}
}




💡 Explicación de las mejoras:

  • WaitToReadAsync() + TryRead(): Patrón recomendado por Microsoft para mejor performance
  • TrySetException(): Propaga errores al productor de forma segura
  • Bucle anidado: Procesa múltiples comandos en batch cuando están disponibles
Paso 3: Configurar el Channel y el Servicio


En Program.cs:


// ✅ Bounded Channel con opciones optimizadas (recomendado por Microsoft)
builder.Services.AddSingleton(Channel.CreateBounded<JobCommand>(
new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // Backpressure automático
SingleWriter = false, // Múltiples endpoints pueden escribir
SingleReader = true // Solo un BackgroundService consume
}));

// Registrar el Background Service
builder.Services.AddHostedService<JobProcessor>();




💡 ¿Bounded vs Unbounded?

CaracterísticaUnboundedBounded
CapacidadIlimitadaLimitada (configurable)
MemoriaPuede crecer sin controlControlada
BackpressureNoSí (automático)
Uso recomendadoProductores lentosProductores rápidos
PerformanceWrites síncronosWrites pueden ser async

Modos de Bounded Channel (FullMode):

  • Wait (recomendado): Espera hasta que haya espacio (backpressure)
  • DropWrite: Descarta el nuevo elemento
  • DropOldest: Descarta el elemento más antiguo
  • DropNewest: Descarta el elemento más reciente

Opciones de optimización:

  • SingleWriter: true = mejor performance si solo un productor escribe
  • SingleReader: true = mejor performance si solo un consumidor lee
  • AllowSynchronousContinuations: false (default) para evitar bloqueos
Paso 4: Crear los Endpoints (Productores)


public static IEndpointRouteBuilder MapJobEndpoints(this IEndpointRouteBuilder app)
{
var jobGroup = app.MapGroup("api/job");

jobGroup.MapPost("start", async (Channel<JobCommand> channel) =>
{
// Crear TaskCompletionSource para esperar la respuesta
var tcs = new TaskCompletionSource<JobStatus>();

var command = new JobCommand
{
Type = CommandType.Start,
ResponseTask = tcs
};

// ✅ WriteAsync maneja backpressure automáticamente
await channel.Writer.WriteAsync(command);

// Esperar respuesta del consumidor
var status = await tcs.Task;
return Results.Ok(status);
});

// Endpoints similares para stop y status...

return app;
}




💡 Flujo:

  1. API recibe request → Crea TaskCompletionSource
  2. Escribe comando en el Channel con WriteAsync() (maneja backpressure)
  3. Espera que el consumidor complete la Task
  4. Retorna la respuesta al cliente
¿Por qué este patrón?

Sin Channels:


// ❌ Locks manuales, propenso a errores
private static readonly object _lock = new();
private static Queue<Command> _queue = new();

public void AddCommand(Command cmd)
{
lock(_lock) { _queue.Enqueue(cmd); }
}



Con Channels:


// ✅ Thread-safe automático, limpio, con backpressure
await channel.Writer.WriteAsync(command);



Casos de Uso Reales con Channels

1. Cola de Emails/Notificaciones


// Microsoft recomienda Bounded para prevenir OutOfMemory
var emailChannel = Channel.CreateBounded<EmailMessage>(
new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
// API recibe requests → Encola en Channel → Background envía emails en lotes



2. Procesamiento de Imágenes


Channel<ImageProcessingJob> imageChannel;
// Upload de imágenes → Channel → Worker redimensiona/optimiza en background



3. Logs Centralizados


// DropOldest para logs: si está lleno, descarta los más antiguos
var logChannel = Channel.CreateBounded<LogEntry>(
new BoundedChannelOptions(5000)
{
FullMode = BoundedChannelFullMode.DropOldest
});



4. Rate Limiting / Throttling


var boundedChannel = Channel.CreateBounded<Request>(100);
// Limita a 100 requests concurrentes, el resto espera (backpressure)



5. Event Sourcing Interno


Channel<DomainEvent> eventChannel;
// Eventos de dominio → Channel → Múltiples handlers procesan en paralelo



6. Pipeline de Datos (ejemplo oficial de Microsoft)


// Patrón de processing pipeline con múltiples stages
Channel<RawData> inputChannel;
Channel<ProcessedData> outputChannel;
// Stage 1: Raw → Validated → Stage 2: Validated → Enriched



Ventajas vs Alternativas

EscenarioChannelQueue ExternoBlockingCollection
Performance⚡ Muy alta🐢 Red overhead✅ Alta
Configuración✅ Cero❌ Infraestructura✅ Mínima
Backpressure✅ Integrado⚠ Manual⚠ Manual
Async/Await✅ Nativo (ValueTask)⚠ Depende❌ Bloquea threads
Escalabilidad🏠 Single-app🌍 Multi-app🏠 Single-app
Memoria✅ Bounded options⚠ Depende⚠ Manual

Usa Channels cuando:

  • ✅ Comunicación dentro de la misma aplicación
  • ✅ Necesitas alta performance y bajo latency
  • ✅ Quieres simplicidad sin infraestructura externa
  • ✅ Trabajas con async/await
  • ✅ Necesitas backpressure automático

Usa Queue externo (RabbitMQ/Azure Service Bus) cuando:

  • ❌ Necesitas comunicación entre múltiples aplicaciones/servicios
  • ❌ Requieres persistencia de mensajes
  • ❌ Necesitas escalabilidad horizontal
  • ❌ Requieres garantías de entrega (at-least-once, exactly-once)
⚡ Mejores Prácticas de Microsoft Learn

1. *Consumer Pattern *


// ✅ WaitToReadAsync + TryRead (más eficiente)
while (await reader.WaitToReadAsync(cancellationToken))
{
while (reader.TryRead(out var item))
{
// Procesar item
}
}

// ❌ ReadAllAsync (menos eficiente para alta carga)
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
// Procesar item
}



2. Producer Pattern


// ✅ WriteAsync para backpressure automático
await writer.WriteAsync(item, cancellationToken);

// ⚠ TryWrite solo para unbounded o cuando no quieres esperar
if (!writer.TryWrite(item))
{
// Channel lleno, manejar alternativa
}



3. Signal Completion


// ✅ Siempre señalar cuando terminas de escribir (esto cierra el canal)
writer.Complete();

// O con error
writer.Complete(exception);



4. Manejo de Errores


try
{
await ProcessCommand(command);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing");
command.ResponseTask?.TrySetException(ex); // ✅ Propagar al productor
}



Conclusión


Los Channels representan una evolución significativa en cómo manejamos la comunicación asíncrona en .NET. Lo que tradicionalmente requería código complejo con locks, semáforos y manejo manual de concurrencia, ahora se puede lograr con una API limpia, segura y de alto rendimiento.

¿Por qué usar Channels?


A lo largo de este tutorial, hemos visto cómo Channels ofrece ventajas significativas:

  1. Simplicidad: No necesitas infraestructura externa para empezar
  2. Performance: Diseñados desde cero para async/await con ValueTask
  3. Seguridad: Thread-safe por diseño, sin preocupaciones por race conditions
  4. Control: Backpressure automático previene sobrecarga del sistema
  5. Flexibilidad: Configuración granular según tus necesidades específicas
Cuándo usar Channels


✅ Comunicación intra-proceso: Coordinación entre componentes de la misma aplicación

✅ Alta frecuencia: Miles de mensajes por segundo con mínimo overhead

✅ Backpressure crítico: Necesitas controlar la velocidad de producción/consumo

✅ Simplicidad operacional: Quieres evitar dependencias de infraestructura externa

✅ Desarrollo rápido: Prototipado y desarrollo local sin complicaciones

Cuándo NO usar Channels


❌ Comunicación inter-proceso: Si necesitas comunicar múltiples aplicaciones/servicios

❌ Persistencia requerida: Si los mensajes deben sobrevivir reinicios

❌ Distribución geográfica: Múltiples datacenters o regiones

❌ Garantías de entrega avanzadas: Exactly-once, dead letter queues, retries configurables

❌ Monitoreo centralizado: Necesitas observabilidad empresarial de mensajería

Impacto en tu arquitectura


Este patrón es especialmente valioso cuando:

  • Estás construyendo aplicaciones monolíticas modernas que necesitan procesamiento asíncrono
  • Quieres reducir costos de infraestructura eliminando dependencias de message brokers
  • Necesitas optimizar performance con procesamiento en memoria
  • Buscas simplicidad operacional sin sacrificar escalabilidad vertical
Key Takeaways


Los 5 principios esenciales:

  1. ✅ Usa Bounded Channels con FullMode.Wait para prevenir OutOfMemory
  2. ✅ Patrón WaitToReadAsync + TryRead para máximo throughput
  3. ✅ Configura SingleWriter/SingleReader cuando sea posible para mejor performance
  4. ✅ Siempre llama Complete() para señalizar fin de producción
  5. ✅ Propaga errores con TrySetException() para debugging efectivo
Evolución del patrón


Este proyecto es una base sólida que puedes extender según tus necesidades:

  • Múltiples consumidores: Escala horizontalmente agregando más BackgroundServices
  • Priorización: Implementa múltiples channels con diferentes prioridades
  • Monitoring: Integra métricas de performance y observabilidad
  • Resiliencia: Agrega retry policies y circuit breakers

Los Channels de .NET demuestran que no siempre necesitas herramientas complejas para resolver problemas complejos. A veces, la solución más elegante es la que viene incorporada en tu framework.

Próximos Pasos


¿Listo para llevar este conocimiento al siguiente nivel? Aquí tienes algunas ideas para expandir este proyecto:

1. Implementar Múltiples Consumidores


// Escalar procesamiento con múltiples workers
builder.Services.AddHostedService<JobProcessor>(); // Worker 1
builder.Services.AddHostedService<JobProcessor>(); // Worker 2
builder.Services.AddHostedService<JobProcessor>(); // Worker 3




Aprenderás: Paralelización, distribución de carga, sincronización entre workers

2. Agregar Sistema de Prioridades


public enum JobPriority { Low, Normal, High, Critical }

// Crear channels separados por prioridad
var highPriorityChannel = Channel.CreateBounded<JobCommand>(50);
var normalPriorityChannel = Channel.CreateBounded<JobCommand>(100);
var lowPriorityChannel = Channel.CreateBounded<JobCommand>(200);




Aprenderás: Gestión de prioridades, routing inteligente, SLA por prioridad

3. Integrar Observabilidad


// Métricas con System.Diagnostics.Metrics
var meter = new Meter("BackgroundJobs");
var jobsProcessed = meter.CreateCounter<long>("jobs_processed");
var queueDepth = meter.CreateObservableGauge("queue_depth",
() => channel.Reader.Count);




Aprenderás: OpenTelemetry, métricas personalizadas, dashboards con Grafana/Prometheus

4. Implementar Persistencia


// Guardar estado en caso de restart
public class PersistentJobProcessor : BackgroundService
{
private readonly IJobStateRepository _repository;

protected override async Task ExecuteAsync(CancellationToken token)
{
// Recuperar jobs pendientes al iniciar
await _repository.RestorePendingJobsAsync(token);
// ... continuar procesamiento normal
}
}




Aprenderás: State management, recovery strategies, durabilidad

5. Agregar Pipeline de Procesamiento


// Pipeline multi-etapa
var rawChannel = Channel.CreateBounded<RawData>(100);
var validatedChannel = Channel.CreateBounded<ValidatedData>(100);
var enrichedChannel = Channel.CreateBounded<EnrichedData>(100);

// Stage 1: Validación
builder.Services.AddHostedService<ValidationProcessor>();
// Stage 2: Enriquecimiento
builder.Services.AddHostedService<EnrichmentProcessor>();
// Stage 3: Persistencia
builder.Services.AddHostedService<PersistenceProcessor>();




Aprenderás: Pipeline pattern, ETL processes, data transformation

6. Implementar Rate Limiting Avanzado


// Rate limiter con ventanas deslizantes
public class RateLimitedJobProcessor : BackgroundService
{
private readonly RateLimiter _rateLimiter;

protected override async Task ExecuteAsync(CancellationToken token)
{
while (await _channel.Reader.WaitToReadAsync(token))
{
using var lease = await _rateLimiter.AcquireAsync(1, token);
if (lease.IsAcquired)
{
// Procesar job
}
}
}
}




Aprenderás: Rate limiting patterns, token bucket, leaky bucket

7. Crear Dashboard de Monitoreo


// SignalR para updates en tiempo real
builder.Services.AddSignalR();

// Notificar estado a clientes conectados
await _hubContext.Clients.All.SendAsync("JobStatusUpdate", new {
QueueDepth = channel.Reader.Count,
ProcessingRate = jobsPerSecond,
ActiveJobs = activeJobCount
});




Aprenderás: Real-time updates, SignalR, live dashboards

8. Añadir Resiliencia


// Polly para retry policies
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

await retryPolicy.ExecuteAsync(async () =>
{
await ProcessJobAsync(command);
});




Aprenderás: Retry patterns, circuit breakers, fallback strategies

9. Implementar Health Checks


// Health check para el channel
builder.Services.AddHealthChecks()
.AddCheck<ChannelHealthCheck>("channel_health")
.AddCheck<JobProcessorHealthCheck>("job_processor_health");

public class ChannelHealthCheck : IHealthCheck
{
public Task<HealthCheckResult> CheckHealthAsync()
{
var queueDepth = _channel.Reader.Count;
return queueDepth < 1000
? HealthCheckResult.Healthy()
: HealthCheckResult.Degraded("Queue depth high");
}
}




Aprenderás: Health monitoring, readiness/liveness probes, Kubernetes integration

10. Migrar a Arquitectura Distribuida


// Cuando crezcas más allá de un solo servidor
// Considera migrar a:
// - Azure Service Bus para messaging distribuido
// - Azure Queue Storage para simplicidad y bajo costo
// - RabbitMQ para control total
// - Redis Streams para alta performance

// Mantén la misma interfaz, cambia la implementación
public interface IJobQueue
{
Task EnqueueAsync(JobCommand command);
Task<JobCommand> DequeueAsync(CancellationToken token);
}

// Implementación con Channels (actual)
public class InMemoryJobQueue : IJobQueue { }

// Implementación con Azure Service Bus (futuro)
public class ServiceBusJobQueue : IJobQueue { }




Aprenderás: Estrategias de migración, abstracciones, arquitectura evolutiva

Recursos para Continuar Aprendiendo


Documentación Oficial:

Artículos Avanzados:




Источник:

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

 
Вверх Снизу