feat: ScheduleWorker 구현 (#112)
All checks were successful
SPMS_API/pipeline/head This commit looks good
All checks were successful
SPMS_API/pipeline/head This commit looks good
Reviewed-on: https://git.ipstein.myds.me/SPMS/SPMS_API/pulls/113
This commit is contained in:
commit
b5d6c70b16
|
|
@ -70,6 +70,7 @@ public static class DependencyInjection
|
|||
|
||||
// Workers
|
||||
services.AddHostedService<PushWorker>();
|
||||
services.AddHostedService<ScheduleWorker>();
|
||||
|
||||
// Token Store & Email Service
|
||||
services.AddMemoryCache();
|
||||
|
|
|
|||
142
SPMS.Infrastructure/Workers/ScheduleWorker.cs
Normal file
142
SPMS.Infrastructure/Workers/ScheduleWorker.cs
Normal file
|
|
@ -0,0 +1,142 @@
|
|||
using System.Globalization;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
using SPMS.Application.DTOs.Push;
|
||||
using SPMS.Application.Interfaces;
|
||||
using SPMS.Application.Settings;
|
||||
using SPMS.Infrastructure.Messaging;
|
||||
|
||||
namespace SPMS.Infrastructure.Workers;
|
||||
|
||||
public class ScheduleWorker : BackgroundService
|
||||
{
|
||||
private const int PollingIntervalSeconds = 30;
|
||||
|
||||
private readonly RabbitMQConnection _rabbitConnection;
|
||||
private readonly RabbitMQSettings _rabbitSettings;
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly ILogger<ScheduleWorker> _logger;
|
||||
|
||||
public ScheduleWorker(
|
||||
RabbitMQConnection rabbitConnection,
|
||||
IOptions<RabbitMQSettings> rabbitSettings,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
ILogger<ScheduleWorker> logger)
|
||||
{
|
||||
_rabbitConnection = rabbitConnection;
|
||||
_rabbitSettings = rabbitSettings.Value;
|
||||
_scopeFactory = scopeFactory;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("ScheduleWorker 시작 (폴링 간격: {Interval}초)", PollingIntervalSeconds);
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await PollScheduleQueueAsync(stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "ScheduleWorker 오류 — 5초 후 재시도");
|
||||
}
|
||||
|
||||
await Task.Delay(TimeSpan.FromSeconds(PollingIntervalSeconds), stoppingToken);
|
||||
}
|
||||
|
||||
_logger.LogInformation("ScheduleWorker 종료");
|
||||
}
|
||||
|
||||
private async Task PollScheduleQueueAsync(CancellationToken ct)
|
||||
{
|
||||
await using var channel = await _rabbitConnection.CreateChannelAsync(ct);
|
||||
await channel.BasicQosAsync(0, 100, false, ct);
|
||||
|
||||
int processedCount = 0;
|
||||
int requeuedCount = 0;
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var result = await channel.BasicGetAsync(_rabbitSettings.ScheduleQueue, false, ct);
|
||||
if (result == null)
|
||||
break;
|
||||
|
||||
try
|
||||
{
|
||||
var body = Encoding.UTF8.GetString(result.Body.ToArray());
|
||||
var scheduleMessage = JsonSerializer.Deserialize<ScheduleMessageDto>(body);
|
||||
|
||||
if (scheduleMessage == null)
|
||||
{
|
||||
_logger.LogWarning("Schedule 메시지 역직렬화 실패 — ACK 후 스킵");
|
||||
await channel.BasicAckAsync(result.DeliveryTag, false, ct);
|
||||
continue;
|
||||
}
|
||||
|
||||
var scheduledAt = ParseScheduledAt(scheduleMessage.ScheduledAt);
|
||||
if (scheduledAt == null)
|
||||
{
|
||||
_logger.LogWarning("scheduled_at 파싱 실패: {Value} — ACK 후 스킵", scheduleMessage.ScheduledAt);
|
||||
await channel.BasicAckAsync(result.DeliveryTag, false, ct);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 아직 예약 시간이 도래하지 않음 → NACK + requeue
|
||||
if (scheduledAt.Value > DateTime.UtcNow)
|
||||
{
|
||||
await channel.BasicNackAsync(result.DeliveryTag, false, true, ct);
|
||||
requeuedCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// 예약 시간 도래 → push queue로 전달
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var pushQueueService = scope.ServiceProvider.GetRequiredService<IPushQueueService>();
|
||||
await pushQueueService.PublishPushMessageAsync(scheduleMessage.PushMessage, ct);
|
||||
|
||||
await channel.BasicAckAsync(result.DeliveryTag, false, ct);
|
||||
processedCount++;
|
||||
|
||||
_logger.LogInformation(
|
||||
"예약 메시지 발송 전달: scheduleId={ScheduleId}, messageId={MessageId}",
|
||||
scheduleMessage.ScheduleId, scheduleMessage.MessageId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Schedule 메시지 처리 중 오류");
|
||||
await channel.BasicNackAsync(result.DeliveryTag, false, true, ct);
|
||||
}
|
||||
}
|
||||
|
||||
if (processedCount > 0 || requeuedCount > 0)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"ScheduleWorker 폴링 완료: 처리={Processed}, 대기={Requeued}",
|
||||
processedCount, requeuedCount);
|
||||
}
|
||||
}
|
||||
|
||||
private static DateTime? ParseScheduledAt(string value)
|
||||
{
|
||||
if (DateTime.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal, out var dt))
|
||||
return dt;
|
||||
|
||||
if (DateTimeOffset.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.None, out var dto))
|
||||
return dto.UtcDateTime;
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user