feat: PushWorker 구현 (#110) #111

Merged
seonkyu.kim merged 1 commits from feature/#110-push-worker into develop 2026-02-10 07:18:11 +00:00
2 changed files with 394 additions and 0 deletions
Showing only changes of commit f36f8f47a9 - Show all commits

View File

@ -12,6 +12,7 @@ using SPMS.Infrastructure.Push;
using SPMS.Infrastructure.Persistence.Repositories; using SPMS.Infrastructure.Persistence.Repositories;
using SPMS.Infrastructure.Security; using SPMS.Infrastructure.Security;
using SPMS.Infrastructure.Services; using SPMS.Infrastructure.Services;
using SPMS.Infrastructure.Workers;
namespace SPMS.Infrastructure; namespace SPMS.Infrastructure;
@ -67,6 +68,9 @@ public static class DependencyInjection
}); });
services.AddSingleton<IApnsSender, ApnsSender>(); services.AddSingleton<IApnsSender, ApnsSender>();
// Workers
services.AddHostedService<PushWorker>();
// Token Store & Email Service // Token Store & Email Service
services.AddMemoryCache(); services.AddMemoryCache();
services.AddSingleton<ITokenStore, InMemoryTokenStore>(); services.AddSingleton<ITokenStore, InMemoryTokenStore>();

View File

@ -0,0 +1,390 @@
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using SPMS.Application.DTOs.Push;
using SPMS.Application.Interfaces;
using SPMS.Application.Settings;
using SPMS.Domain.Entities;
using SPMS.Domain.Enums;
using SPMS.Domain.Interfaces;
using SPMS.Infrastructure.Messaging;
namespace SPMS.Infrastructure.Workers;
public class PushWorker : BackgroundService
{
private const int MaxRetryCount = 3;
private readonly RabbitMQConnection _rabbitConnection;
private readonly RabbitMQSettings _rabbitSettings;
private readonly IServiceScopeFactory _scopeFactory;
private readonly IDuplicateChecker _duplicateChecker;
private readonly IFcmSender _fcmSender;
private readonly IApnsSender _apnsSender;
private readonly ILogger<PushWorker> _logger;
public PushWorker(
RabbitMQConnection rabbitConnection,
IOptions<RabbitMQSettings> rabbitSettings,
IServiceScopeFactory scopeFactory,
IDuplicateChecker duplicateChecker,
IFcmSender fcmSender,
IApnsSender apnsSender,
ILogger<PushWorker> logger)
{
_rabbitConnection = rabbitConnection;
_rabbitSettings = rabbitSettings.Value;
_scopeFactory = scopeFactory;
_duplicateChecker = duplicateChecker;
_fcmSender = fcmSender;
_apnsSender = apnsSender;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("PushWorker 시작");
while (!stoppingToken.IsCancellationRequested)
{
try
{
await ConsumeAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "PushWorker 오류 — 5초 후 재시도");
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
_logger.LogInformation("PushWorker 종료");
}
private async Task ConsumeAsync(CancellationToken stoppingToken)
{
await using var channel = await _rabbitConnection.CreateChannelAsync(stoppingToken);
await channel.BasicQosAsync(0, _rabbitSettings.PrefetchCount, false, stoppingToken);
var tcs = new TaskCompletionSource();
stoppingToken.Register(() => tcs.TrySetResult());
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, ea) =>
{
try
{
await ProcessMessageAsync(channel, ea, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "메시지 처리 중 예외 발생");
await HandleRetryAsync(channel, ea, stoppingToken);
}
};
await channel.BasicConsumeAsync(
queue: _rabbitSettings.PushQueue,
autoAck: false,
consumer: consumer,
cancellationToken: stoppingToken);
_logger.LogInformation("PushWorker Consumer 등록 완료: queue={Queue}", _rabbitSettings.PushQueue);
await tcs.Task;
}
private async Task ProcessMessageAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct)
{
var body = Encoding.UTF8.GetString(ea.Body.ToArray());
var pushMessage = JsonSerializer.Deserialize<PushMessageDto>(body);
if (pushMessage == null)
{
_logger.LogWarning("메시지 역직렬화 실패 — ACK 후 스킵");
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
return;
}
// [1] Redis 중복 체크
if (await _duplicateChecker.IsDuplicateAsync(pushMessage.RequestId, ct))
{
_logger.LogInformation("중복 메시지 스킵: requestId={RequestId}", pushMessage.RequestId);
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
return;
}
using var scope = _scopeFactory.CreateScope();
var serviceRepo = scope.ServiceProvider.GetRequiredService<IServiceRepository>();
var deviceRepo = scope.ServiceProvider.GetRequiredService<IDeviceRepository>();
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
// 서비스 조회
var service = await serviceRepo.GetByIdAsync(pushMessage.ServiceId);
if (service == null)
{
_logger.LogWarning("서비스 없음: serviceId={ServiceId}", pushMessage.ServiceId);
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
return;
}
// [2] send_type별 대상 Device 조회
var devices = await ResolveDevicesAsync(deviceRepo, pushMessage, ct);
if (devices.Count == 0)
{
_logger.LogInformation("발송 대상 없음: requestId={RequestId}", pushMessage.RequestId);
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
return;
}
// [3] 플랫폼별 분류
var iosDevices = devices.Where(d => d.Platform == Platform.iOS).ToList();
var androidDevices = devices.Where(d => d.Platform == Platform.Android).ToList();
var customData = pushMessage.CustomData?
.ToDictionary(kv => kv.Key, kv => kv.Value.ToString() ?? string.Empty);
if (!string.IsNullOrEmpty(pushMessage.LinkUrl))
(customData ??= new Dictionary<string, string>())["link_url"] = pushMessage.LinkUrl;
// [4] 배치 발송
var allResults = new List<(Device Device, PushResultDto Result)>();
if (androidDevices.Count > 0 && !string.IsNullOrEmpty(service.FcmCredentials))
{
var fcmResults = await SendFcmBatchAsync(
service.FcmCredentials, androidDevices, pushMessage, customData, ct);
allResults.AddRange(fcmResults);
}
if (iosDevices.Count > 0 &&
!string.IsNullOrEmpty(service.ApnsPrivateKey) &&
!string.IsNullOrEmpty(service.ApnsKeyId) &&
!string.IsNullOrEmpty(service.ApnsTeamId) &&
!string.IsNullOrEmpty(service.ApnsBundleId))
{
var apnsResults = await SendApnsBatchAsync(
service, iosDevices, pushMessage, customData, ct);
allResults.AddRange(apnsResults);
}
// [5] 결과 처리: PushSendLog INSERT + Device 상태 변경
var pushSendLogRepo = scope.ServiceProvider.GetRequiredService<IRepository<PushSendLog>>();
var dailyStatRepo = scope.ServiceProvider.GetRequiredService<IRepository<DailyStat>>();
int successCount = 0, failCount = 0;
foreach (var (device, result) in allResults)
{
var log = new PushSendLog
{
ServiceId = pushMessage.ServiceId,
MessageId = long.TryParse(pushMessage.MessageId, out var mid) ? mid : 0,
DeviceId = device.Id,
Status = result.IsSuccess ? PushResult.Success : PushResult.Failed,
FailReason = result.ErrorMessage,
SentAt = DateTime.UtcNow
};
await pushSendLogRepo.AddAsync(log);
if (result.IsSuccess)
successCount++;
else
failCount++;
// 영구 오류: 디바이스 비활성화
if (result.ShouldRemoveDevice)
{
device.IsActive = false;
device.UpdatedAt = DateTime.UtcNow;
deviceRepo.Update(device);
}
}
// [6] DailyStat 증분 업데이트
var today = DateOnly.FromDateTime(DateTime.UtcNow);
var stats = await dailyStatRepo.FindAsync(
s => s.ServiceId == pushMessage.ServiceId && s.StatDate == today);
var stat = stats.FirstOrDefault();
if (stat != null)
{
stat.SentCnt += allResults.Count;
stat.SuccessCnt += successCount;
stat.FailCnt += failCount;
dailyStatRepo.Update(stat);
}
else
{
await dailyStatRepo.AddAsync(new DailyStat
{
ServiceId = pushMessage.ServiceId,
StatDate = today,
SentCnt = allResults.Count,
SuccessCnt = successCount,
FailCnt = failCount,
CreatedAt = DateTime.UtcNow
});
}
await unitOfWork.SaveChangesAsync(ct);
_logger.LogInformation(
"푸시 발송 완료: requestId={RequestId}, 성공={Success}, 실패={Fail}, 총={Total}",
pushMessage.RequestId, successCount, failCount, allResults.Count);
// [8] ACK
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
}
private async Task<List<Device>> ResolveDevicesAsync(
IDeviceRepository deviceRepo, PushMessageDto message, CancellationToken ct)
{
var sendType = message.SendType.ToLowerInvariant();
switch (sendType)
{
case "single":
{
var deviceIds = ParseDeviceIds(message.Target);
if (deviceIds.Count == 0) return [];
var devices = new List<Device>();
foreach (var id in deviceIds)
{
var device = await deviceRepo.GetByIdAndServiceAsync(id, message.ServiceId);
if (device is { IsActive: true, PushAgreed: true })
devices.Add(device);
}
return devices;
}
case "group":
{
var tags = ParseTags(message.Target);
if (tags.Count == 0) return [];
var allDevices = await deviceRepo.FindAsync(
d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed);
return allDevices
.Where(d => !string.IsNullOrEmpty(d.Tags) &&
tags.Any(tag => d.Tags.Contains(tag)))
.ToList();
}
case "broadcast":
{
var allDevices = await deviceRepo.FindAsync(
d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed);
return allDevices.ToList();
}
default:
_logger.LogWarning("지원하지 않는 send_type: {SendType}", sendType);
return [];
}
}
private static List<long> ParseDeviceIds(PushTargetDto target)
{
if (target.Value == null) return [];
try
{
if (target.Value.Value.ValueKind == JsonValueKind.Array)
{
return target.Value.Value.EnumerateArray()
.Where(e => e.ValueKind == JsonValueKind.Number)
.Select(e => e.GetInt64())
.ToList();
}
}
catch { }
return [];
}
private static List<string> ParseTags(PushTargetDto target)
{
if (target.Value == null) return [];
try
{
if (target.Value.Value.ValueKind == JsonValueKind.Array)
{
return target.Value.Value.EnumerateArray()
.Where(e => e.ValueKind == JsonValueKind.String)
.Select(e => e.GetString()!)
.ToList();
}
}
catch { }
return [];
}
private async Task<List<(Device Device, PushResultDto Result)>> SendFcmBatchAsync(
string fcmCredentials, List<Device> devices, PushMessageDto message,
Dictionary<string, string>? customData, CancellationToken ct)
{
var results = new List<(Device, PushResultDto)>();
var tokens = devices.Select(d => d.DeviceToken).ToList();
var batchResults = await _fcmSender.SendBatchAsync(
fcmCredentials, tokens, message.Title, message.Body,
message.ImageUrl, customData, ct);
for (int i = 0; i < batchResults.Count && i < devices.Count; i++)
results.Add((devices[i], batchResults[i]));
return results;
}
private async Task<List<(Device Device, PushResultDto Result)>> SendApnsBatchAsync(
Service service, List<Device> devices, PushMessageDto message,
Dictionary<string, string>? customData, CancellationToken ct)
{
var results = new List<(Device, PushResultDto)>();
var tokens = devices.Select(d => d.DeviceToken).ToList();
var batchResults = await _apnsSender.SendBatchAsync(
service.ApnsPrivateKey!, service.ApnsKeyId!, service.ApnsTeamId!, service.ApnsBundleId!,
tokens, message.Title, message.Body,
message.ImageUrl, customData, ct);
for (int i = 0; i < batchResults.Count && i < devices.Count; i++)
results.Add((devices[i], batchResults[i]));
return results;
}
private async Task HandleRetryAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct)
{
var retryCount = 0;
if (ea.BasicProperties.Headers?.TryGetValue("x-retry-count", out var retryObj) == true)
{
retryCount = retryObj is int r ? r : 0;
}
if (retryCount < MaxRetryCount)
{
_logger.LogWarning("메시지 재시도 ({RetryCount}/{MaxRetry})", retryCount + 1, MaxRetryCount);
await channel.BasicNackAsync(ea.DeliveryTag, false, true, ct);
}
else
{
_logger.LogError("메시지 최대 재시도 초과 — 폐기: deliveryTag={Tag}", ea.DeliveryTag);
await channel.BasicNackAsync(ea.DeliveryTag, false, false, ct);
}
}
}