using System.Globalization; using System.Text; using System.Text.Json; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; using RabbitMQ.Client.Events; using SPMS.Application.DTOs.Push; using SPMS.Application.Interfaces; using SPMS.Application.Settings; using SPMS.Infrastructure.Messaging; namespace SPMS.Infrastructure.Workers; public class ScheduleWorker : BackgroundService { private const int PollingIntervalSeconds = 30; private readonly RabbitMQConnection _rabbitConnection; private readonly RabbitMQSettings _rabbitSettings; private readonly IServiceScopeFactory _scopeFactory; private readonly ILogger _logger; public ScheduleWorker( RabbitMQConnection rabbitConnection, IOptions rabbitSettings, IServiceScopeFactory scopeFactory, ILogger logger) { _rabbitConnection = rabbitConnection; _rabbitSettings = rabbitSettings.Value; _scopeFactory = scopeFactory; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("ScheduleWorker 시작 (폴링 간격: {Interval}초)", PollingIntervalSeconds); while (!stoppingToken.IsCancellationRequested) { try { await PollScheduleQueueAsync(stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { break; } catch (Exception ex) { _logger.LogError(ex, "ScheduleWorker 오류 — 5초 후 재시도"); } await Task.Delay(TimeSpan.FromSeconds(PollingIntervalSeconds), stoppingToken); } _logger.LogInformation("ScheduleWorker 종료"); } private async Task PollScheduleQueueAsync(CancellationToken ct) { await using var channel = await _rabbitConnection.CreateChannelAsync(ct); await channel.BasicQosAsync(0, 100, false, ct); int processedCount = 0; int requeuedCount = 0; while (!ct.IsCancellationRequested) { var result = await channel.BasicGetAsync(_rabbitSettings.ScheduleQueue, false, ct); if (result == null) break; try { var body = Encoding.UTF8.GetString(result.Body.ToArray()); var scheduleMessage = JsonSerializer.Deserialize(body); if (scheduleMessage == null) { _logger.LogWarning("Schedule 메시지 역직렬화 실패 — ACK 후 스킵"); await channel.BasicAckAsync(result.DeliveryTag, false, ct); continue; } var scheduledAt = ParseScheduledAt(scheduleMessage.ScheduledAt); if (scheduledAt == null) { _logger.LogWarning("scheduled_at 파싱 실패: {Value} — ACK 후 스킵", scheduleMessage.ScheduledAt); await channel.BasicAckAsync(result.DeliveryTag, false, ct); continue; } // 아직 예약 시간이 도래하지 않음 → NACK + requeue if (scheduledAt.Value > DateTime.UtcNow) { await channel.BasicNackAsync(result.DeliveryTag, false, true, ct); requeuedCount++; continue; } // 예약 시간 도래 → push queue로 전달 using var scope = _scopeFactory.CreateScope(); var pushQueueService = scope.ServiceProvider.GetRequiredService(); await pushQueueService.PublishPushMessageAsync(scheduleMessage.PushMessage, ct); await channel.BasicAckAsync(result.DeliveryTag, false, ct); processedCount++; _logger.LogInformation( "예약 메시지 발송 전달: scheduleId={ScheduleId}, messageId={MessageId}", scheduleMessage.ScheduleId, scheduleMessage.MessageId); } catch (Exception ex) { _logger.LogError(ex, "Schedule 메시지 처리 중 오류"); await channel.BasicNackAsync(result.DeliveryTag, false, true, ct); } } if (processedCount > 0 || requeuedCount > 0) { _logger.LogInformation( "ScheduleWorker 폴링 완료: 처리={Processed}, 대기={Requeued}", processedCount, requeuedCount); } } private static DateTime? ParseScheduledAt(string value) { if (DateTime.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal, out var dt)) return dt; if (DateTimeOffset.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.None, out var dto)) return dto.UtcDateTime; return null; } }