From 814d2082cbd49d250380d3f8055ad137d5031e28 Mon Sep 17 00:00:00 2001 From: SEAN Date: Tue, 10 Feb 2026 16:25:25 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20ScheduleWorker=20=EA=B5=AC=ED=98=84=20(?= =?UTF-8?q?#112)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- SPMS.Infrastructure/DependencyInjection.cs | 1 + SPMS.Infrastructure/Workers/ScheduleWorker.cs | 142 ++++++++++++++++++ 2 files changed, 143 insertions(+) create mode 100644 SPMS.Infrastructure/Workers/ScheduleWorker.cs diff --git a/SPMS.Infrastructure/DependencyInjection.cs b/SPMS.Infrastructure/DependencyInjection.cs index 9d68f22..495f11c 100644 --- a/SPMS.Infrastructure/DependencyInjection.cs +++ b/SPMS.Infrastructure/DependencyInjection.cs @@ -70,6 +70,7 @@ public static class DependencyInjection // Workers services.AddHostedService(); + services.AddHostedService(); // Token Store & Email Service services.AddMemoryCache(); diff --git a/SPMS.Infrastructure/Workers/ScheduleWorker.cs b/SPMS.Infrastructure/Workers/ScheduleWorker.cs new file mode 100644 index 0000000..03fa280 --- /dev/null +++ b/SPMS.Infrastructure/Workers/ScheduleWorker.cs @@ -0,0 +1,142 @@ +using System.Globalization; +using System.Text; +using System.Text.Json; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using SPMS.Application.DTOs.Push; +using SPMS.Application.Interfaces; +using SPMS.Application.Settings; +using SPMS.Infrastructure.Messaging; + +namespace SPMS.Infrastructure.Workers; + +public class ScheduleWorker : BackgroundService +{ + private const int PollingIntervalSeconds = 30; + + private readonly RabbitMQConnection _rabbitConnection; + private readonly RabbitMQSettings _rabbitSettings; + private readonly IServiceScopeFactory _scopeFactory; + private readonly ILogger _logger; + + public ScheduleWorker( + RabbitMQConnection rabbitConnection, + IOptions rabbitSettings, + IServiceScopeFactory scopeFactory, + ILogger logger) + { + _rabbitConnection = rabbitConnection; + _rabbitSettings = rabbitSettings.Value; + _scopeFactory = scopeFactory; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("ScheduleWorker 시작 (폴링 간격: {Interval}초)", PollingIntervalSeconds); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await PollScheduleQueueAsync(stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "ScheduleWorker 오류 — 5초 후 재시도"); + } + + await Task.Delay(TimeSpan.FromSeconds(PollingIntervalSeconds), stoppingToken); + } + + _logger.LogInformation("ScheduleWorker 종료"); + } + + private async Task PollScheduleQueueAsync(CancellationToken ct) + { + await using var channel = await _rabbitConnection.CreateChannelAsync(ct); + await channel.BasicQosAsync(0, 100, false, ct); + + int processedCount = 0; + int requeuedCount = 0; + + while (!ct.IsCancellationRequested) + { + var result = await channel.BasicGetAsync(_rabbitSettings.ScheduleQueue, false, ct); + if (result == null) + break; + + try + { + var body = Encoding.UTF8.GetString(result.Body.ToArray()); + var scheduleMessage = JsonSerializer.Deserialize(body); + + if (scheduleMessage == null) + { + _logger.LogWarning("Schedule 메시지 역직렬화 실패 — ACK 후 스킵"); + await channel.BasicAckAsync(result.DeliveryTag, false, ct); + continue; + } + + var scheduledAt = ParseScheduledAt(scheduleMessage.ScheduledAt); + if (scheduledAt == null) + { + _logger.LogWarning("scheduled_at 파싱 실패: {Value} — ACK 후 스킵", scheduleMessage.ScheduledAt); + await channel.BasicAckAsync(result.DeliveryTag, false, ct); + continue; + } + + // 아직 예약 시간이 도래하지 않음 → NACK + requeue + if (scheduledAt.Value > DateTime.UtcNow) + { + await channel.BasicNackAsync(result.DeliveryTag, false, true, ct); + requeuedCount++; + continue; + } + + // 예약 시간 도래 → push queue로 전달 + using var scope = _scopeFactory.CreateScope(); + var pushQueueService = scope.ServiceProvider.GetRequiredService(); + await pushQueueService.PublishPushMessageAsync(scheduleMessage.PushMessage, ct); + + await channel.BasicAckAsync(result.DeliveryTag, false, ct); + processedCount++; + + _logger.LogInformation( + "예약 메시지 발송 전달: scheduleId={ScheduleId}, messageId={MessageId}", + scheduleMessage.ScheduleId, scheduleMessage.MessageId); + } + catch (Exception ex) + { + _logger.LogError(ex, "Schedule 메시지 처리 중 오류"); + await channel.BasicNackAsync(result.DeliveryTag, false, true, ct); + } + } + + if (processedCount > 0 || requeuedCount > 0) + { + _logger.LogInformation( + "ScheduleWorker 폴링 완료: 처리={Processed}, 대기={Requeued}", + processedCount, requeuedCount); + } + } + + private static DateTime? ParseScheduledAt(string value) + { + if (DateTime.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal, out var dt)) + return dt; + + if (DateTimeOffset.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.None, out var dto)) + return dto.UtcDateTime; + + return null; + } +}