feat: ScheduleWorker 구현 (#112) #113

Merged
seonkyu.kim merged 1 commits from feature/#112-schedule-worker into develop 2026-02-10 07:27:03 +00:00
2 changed files with 143 additions and 0 deletions

View File

@ -70,6 +70,7 @@ public static class DependencyInjection
// Workers // Workers
services.AddHostedService<PushWorker>(); services.AddHostedService<PushWorker>();
services.AddHostedService<ScheduleWorker>();
// Token Store & Email Service // Token Store & Email Service
services.AddMemoryCache(); services.AddMemoryCache();

View File

@ -0,0 +1,142 @@
using System.Globalization;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using SPMS.Application.DTOs.Push;
using SPMS.Application.Interfaces;
using SPMS.Application.Settings;
using SPMS.Infrastructure.Messaging;
namespace SPMS.Infrastructure.Workers;
public class ScheduleWorker : BackgroundService
{
private const int PollingIntervalSeconds = 30;
private readonly RabbitMQConnection _rabbitConnection;
private readonly RabbitMQSettings _rabbitSettings;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<ScheduleWorker> _logger;
public ScheduleWorker(
RabbitMQConnection rabbitConnection,
IOptions<RabbitMQSettings> rabbitSettings,
IServiceScopeFactory scopeFactory,
ILogger<ScheduleWorker> logger)
{
_rabbitConnection = rabbitConnection;
_rabbitSettings = rabbitSettings.Value;
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("ScheduleWorker 시작 (폴링 간격: {Interval}초)", PollingIntervalSeconds);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await PollScheduleQueueAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "ScheduleWorker 오류 — 5초 후 재시도");
}
await Task.Delay(TimeSpan.FromSeconds(PollingIntervalSeconds), stoppingToken);
}
_logger.LogInformation("ScheduleWorker 종료");
}
private async Task PollScheduleQueueAsync(CancellationToken ct)
{
await using var channel = await _rabbitConnection.CreateChannelAsync(ct);
await channel.BasicQosAsync(0, 100, false, ct);
int processedCount = 0;
int requeuedCount = 0;
while (!ct.IsCancellationRequested)
{
var result = await channel.BasicGetAsync(_rabbitSettings.ScheduleQueue, false, ct);
if (result == null)
break;
try
{
var body = Encoding.UTF8.GetString(result.Body.ToArray());
var scheduleMessage = JsonSerializer.Deserialize<ScheduleMessageDto>(body);
if (scheduleMessage == null)
{
_logger.LogWarning("Schedule 메시지 역직렬화 실패 — ACK 후 스킵");
await channel.BasicAckAsync(result.DeliveryTag, false, ct);
continue;
}
var scheduledAt = ParseScheduledAt(scheduleMessage.ScheduledAt);
if (scheduledAt == null)
{
_logger.LogWarning("scheduled_at 파싱 실패: {Value} — ACK 후 스킵", scheduleMessage.ScheduledAt);
await channel.BasicAckAsync(result.DeliveryTag, false, ct);
continue;
}
// 아직 예약 시간이 도래하지 않음 → NACK + requeue
if (scheduledAt.Value > DateTime.UtcNow)
{
await channel.BasicNackAsync(result.DeliveryTag, false, true, ct);
requeuedCount++;
continue;
}
// 예약 시간 도래 → push queue로 전달
using var scope = _scopeFactory.CreateScope();
var pushQueueService = scope.ServiceProvider.GetRequiredService<IPushQueueService>();
await pushQueueService.PublishPushMessageAsync(scheduleMessage.PushMessage, ct);
await channel.BasicAckAsync(result.DeliveryTag, false, ct);
processedCount++;
_logger.LogInformation(
"예약 메시지 발송 전달: scheduleId={ScheduleId}, messageId={MessageId}",
scheduleMessage.ScheduleId, scheduleMessage.MessageId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Schedule 메시지 처리 중 오류");
await channel.BasicNackAsync(result.DeliveryTag, false, true, ct);
}
}
if (processedCount > 0 || requeuedCount > 0)
{
_logger.LogInformation(
"ScheduleWorker 폴링 완료: 처리={Processed}, 대기={Requeued}",
processedCount, requeuedCount);
}
}
private static DateTime? ParseScheduledAt(string value)
{
if (DateTime.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal, out var dt))
return dt;
if (DateTimeOffset.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.None, out var dto))
return dto.UtcDateTime;
return null;
}
}