feat: PushWorker 구현 (#110) #111
|
|
@ -12,6 +12,7 @@ using SPMS.Infrastructure.Push;
|
|||
using SPMS.Infrastructure.Persistence.Repositories;
|
||||
using SPMS.Infrastructure.Security;
|
||||
using SPMS.Infrastructure.Services;
|
||||
using SPMS.Infrastructure.Workers;
|
||||
|
||||
namespace SPMS.Infrastructure;
|
||||
|
||||
|
|
@ -67,6 +68,9 @@ public static class DependencyInjection
|
|||
});
|
||||
services.AddSingleton<IApnsSender, ApnsSender>();
|
||||
|
||||
// Workers
|
||||
services.AddHostedService<PushWorker>();
|
||||
|
||||
// Token Store & Email Service
|
||||
services.AddMemoryCache();
|
||||
services.AddSingleton<ITokenStore, InMemoryTokenStore>();
|
||||
|
|
|
|||
390
SPMS.Infrastructure/Workers/PushWorker.cs
Normal file
390
SPMS.Infrastructure/Workers/PushWorker.cs
Normal file
|
|
@ -0,0 +1,390 @@
|
|||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
using SPMS.Application.DTOs.Push;
|
||||
using SPMS.Application.Interfaces;
|
||||
using SPMS.Application.Settings;
|
||||
using SPMS.Domain.Entities;
|
||||
using SPMS.Domain.Enums;
|
||||
using SPMS.Domain.Interfaces;
|
||||
using SPMS.Infrastructure.Messaging;
|
||||
|
||||
namespace SPMS.Infrastructure.Workers;
|
||||
|
||||
public class PushWorker : BackgroundService
|
||||
{
|
||||
private const int MaxRetryCount = 3;
|
||||
|
||||
private readonly RabbitMQConnection _rabbitConnection;
|
||||
private readonly RabbitMQSettings _rabbitSettings;
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly IDuplicateChecker _duplicateChecker;
|
||||
private readonly IFcmSender _fcmSender;
|
||||
private readonly IApnsSender _apnsSender;
|
||||
private readonly ILogger<PushWorker> _logger;
|
||||
|
||||
public PushWorker(
|
||||
RabbitMQConnection rabbitConnection,
|
||||
IOptions<RabbitMQSettings> rabbitSettings,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
IDuplicateChecker duplicateChecker,
|
||||
IFcmSender fcmSender,
|
||||
IApnsSender apnsSender,
|
||||
ILogger<PushWorker> logger)
|
||||
{
|
||||
_rabbitConnection = rabbitConnection;
|
||||
_rabbitSettings = rabbitSettings.Value;
|
||||
_scopeFactory = scopeFactory;
|
||||
_duplicateChecker = duplicateChecker;
|
||||
_fcmSender = fcmSender;
|
||||
_apnsSender = apnsSender;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("PushWorker 시작");
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await ConsumeAsync(stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "PushWorker 오류 — 5초 후 재시도");
|
||||
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogInformation("PushWorker 종료");
|
||||
}
|
||||
|
||||
private async Task ConsumeAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
await using var channel = await _rabbitConnection.CreateChannelAsync(stoppingToken);
|
||||
await channel.BasicQosAsync(0, _rabbitSettings.PrefetchCount, false, stoppingToken);
|
||||
|
||||
var tcs = new TaskCompletionSource();
|
||||
stoppingToken.Register(() => tcs.TrySetResult());
|
||||
|
||||
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||
consumer.ReceivedAsync += async (_, ea) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await ProcessMessageAsync(channel, ea, stoppingToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "메시지 처리 중 예외 발생");
|
||||
await HandleRetryAsync(channel, ea, stoppingToken);
|
||||
}
|
||||
};
|
||||
|
||||
await channel.BasicConsumeAsync(
|
||||
queue: _rabbitSettings.PushQueue,
|
||||
autoAck: false,
|
||||
consumer: consumer,
|
||||
cancellationToken: stoppingToken);
|
||||
|
||||
_logger.LogInformation("PushWorker Consumer 등록 완료: queue={Queue}", _rabbitSettings.PushQueue);
|
||||
|
||||
await tcs.Task;
|
||||
}
|
||||
|
||||
private async Task ProcessMessageAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct)
|
||||
{
|
||||
var body = Encoding.UTF8.GetString(ea.Body.ToArray());
|
||||
var pushMessage = JsonSerializer.Deserialize<PushMessageDto>(body);
|
||||
|
||||
if (pushMessage == null)
|
||||
{
|
||||
_logger.LogWarning("메시지 역직렬화 실패 — ACK 후 스킵");
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
|
||||
return;
|
||||
}
|
||||
|
||||
// [1] Redis 중복 체크
|
||||
if (await _duplicateChecker.IsDuplicateAsync(pushMessage.RequestId, ct))
|
||||
{
|
||||
_logger.LogInformation("중복 메시지 스킵: requestId={RequestId}", pushMessage.RequestId);
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
|
||||
return;
|
||||
}
|
||||
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var serviceRepo = scope.ServiceProvider.GetRequiredService<IServiceRepository>();
|
||||
var deviceRepo = scope.ServiceProvider.GetRequiredService<IDeviceRepository>();
|
||||
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
|
||||
|
||||
// 서비스 조회
|
||||
var service = await serviceRepo.GetByIdAsync(pushMessage.ServiceId);
|
||||
if (service == null)
|
||||
{
|
||||
_logger.LogWarning("서비스 없음: serviceId={ServiceId}", pushMessage.ServiceId);
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
|
||||
return;
|
||||
}
|
||||
|
||||
// [2] send_type별 대상 Device 조회
|
||||
var devices = await ResolveDevicesAsync(deviceRepo, pushMessage, ct);
|
||||
if (devices.Count == 0)
|
||||
{
|
||||
_logger.LogInformation("발송 대상 없음: requestId={RequestId}", pushMessage.RequestId);
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
|
||||
return;
|
||||
}
|
||||
|
||||
// [3] 플랫폼별 분류
|
||||
var iosDevices = devices.Where(d => d.Platform == Platform.iOS).ToList();
|
||||
var androidDevices = devices.Where(d => d.Platform == Platform.Android).ToList();
|
||||
|
||||
var customData = pushMessage.CustomData?
|
||||
.ToDictionary(kv => kv.Key, kv => kv.Value.ToString() ?? string.Empty);
|
||||
|
||||
if (!string.IsNullOrEmpty(pushMessage.LinkUrl))
|
||||
(customData ??= new Dictionary<string, string>())["link_url"] = pushMessage.LinkUrl;
|
||||
|
||||
// [4] 배치 발송
|
||||
var allResults = new List<(Device Device, PushResultDto Result)>();
|
||||
|
||||
if (androidDevices.Count > 0 && !string.IsNullOrEmpty(service.FcmCredentials))
|
||||
{
|
||||
var fcmResults = await SendFcmBatchAsync(
|
||||
service.FcmCredentials, androidDevices, pushMessage, customData, ct);
|
||||
allResults.AddRange(fcmResults);
|
||||
}
|
||||
|
||||
if (iosDevices.Count > 0 &&
|
||||
!string.IsNullOrEmpty(service.ApnsPrivateKey) &&
|
||||
!string.IsNullOrEmpty(service.ApnsKeyId) &&
|
||||
!string.IsNullOrEmpty(service.ApnsTeamId) &&
|
||||
!string.IsNullOrEmpty(service.ApnsBundleId))
|
||||
{
|
||||
var apnsResults = await SendApnsBatchAsync(
|
||||
service, iosDevices, pushMessage, customData, ct);
|
||||
allResults.AddRange(apnsResults);
|
||||
}
|
||||
|
||||
// [5] 결과 처리: PushSendLog INSERT + Device 상태 변경
|
||||
var pushSendLogRepo = scope.ServiceProvider.GetRequiredService<IRepository<PushSendLog>>();
|
||||
var dailyStatRepo = scope.ServiceProvider.GetRequiredService<IRepository<DailyStat>>();
|
||||
|
||||
int successCount = 0, failCount = 0;
|
||||
|
||||
foreach (var (device, result) in allResults)
|
||||
{
|
||||
var log = new PushSendLog
|
||||
{
|
||||
ServiceId = pushMessage.ServiceId,
|
||||
MessageId = long.TryParse(pushMessage.MessageId, out var mid) ? mid : 0,
|
||||
DeviceId = device.Id,
|
||||
Status = result.IsSuccess ? PushResult.Success : PushResult.Failed,
|
||||
FailReason = result.ErrorMessage,
|
||||
SentAt = DateTime.UtcNow
|
||||
};
|
||||
await pushSendLogRepo.AddAsync(log);
|
||||
|
||||
if (result.IsSuccess)
|
||||
successCount++;
|
||||
else
|
||||
failCount++;
|
||||
|
||||
// 영구 오류: 디바이스 비활성화
|
||||
if (result.ShouldRemoveDevice)
|
||||
{
|
||||
device.IsActive = false;
|
||||
device.UpdatedAt = DateTime.UtcNow;
|
||||
deviceRepo.Update(device);
|
||||
}
|
||||
}
|
||||
|
||||
// [6] DailyStat 증분 업데이트
|
||||
var today = DateOnly.FromDateTime(DateTime.UtcNow);
|
||||
var stats = await dailyStatRepo.FindAsync(
|
||||
s => s.ServiceId == pushMessage.ServiceId && s.StatDate == today);
|
||||
var stat = stats.FirstOrDefault();
|
||||
|
||||
if (stat != null)
|
||||
{
|
||||
stat.SentCnt += allResults.Count;
|
||||
stat.SuccessCnt += successCount;
|
||||
stat.FailCnt += failCount;
|
||||
dailyStatRepo.Update(stat);
|
||||
}
|
||||
else
|
||||
{
|
||||
await dailyStatRepo.AddAsync(new DailyStat
|
||||
{
|
||||
ServiceId = pushMessage.ServiceId,
|
||||
StatDate = today,
|
||||
SentCnt = allResults.Count,
|
||||
SuccessCnt = successCount,
|
||||
FailCnt = failCount,
|
||||
CreatedAt = DateTime.UtcNow
|
||||
});
|
||||
}
|
||||
|
||||
await unitOfWork.SaveChangesAsync(ct);
|
||||
|
||||
_logger.LogInformation(
|
||||
"푸시 발송 완료: requestId={RequestId}, 성공={Success}, 실패={Fail}, 총={Total}",
|
||||
pushMessage.RequestId, successCount, failCount, allResults.Count);
|
||||
|
||||
// [8] ACK
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
|
||||
}
|
||||
|
||||
private async Task<List<Device>> ResolveDevicesAsync(
|
||||
IDeviceRepository deviceRepo, PushMessageDto message, CancellationToken ct)
|
||||
{
|
||||
var sendType = message.SendType.ToLowerInvariant();
|
||||
|
||||
switch (sendType)
|
||||
{
|
||||
case "single":
|
||||
{
|
||||
var deviceIds = ParseDeviceIds(message.Target);
|
||||
if (deviceIds.Count == 0) return [];
|
||||
|
||||
var devices = new List<Device>();
|
||||
foreach (var id in deviceIds)
|
||||
{
|
||||
var device = await deviceRepo.GetByIdAndServiceAsync(id, message.ServiceId);
|
||||
if (device is { IsActive: true, PushAgreed: true })
|
||||
devices.Add(device);
|
||||
}
|
||||
return devices;
|
||||
}
|
||||
|
||||
case "group":
|
||||
{
|
||||
var tags = ParseTags(message.Target);
|
||||
if (tags.Count == 0) return [];
|
||||
|
||||
var allDevices = await deviceRepo.FindAsync(
|
||||
d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed);
|
||||
|
||||
return allDevices
|
||||
.Where(d => !string.IsNullOrEmpty(d.Tags) &&
|
||||
tags.Any(tag => d.Tags.Contains(tag)))
|
||||
.ToList();
|
||||
}
|
||||
|
||||
case "broadcast":
|
||||
{
|
||||
var allDevices = await deviceRepo.FindAsync(
|
||||
d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed);
|
||||
return allDevices.ToList();
|
||||
}
|
||||
|
||||
default:
|
||||
_logger.LogWarning("지원하지 않는 send_type: {SendType}", sendType);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
private static List<long> ParseDeviceIds(PushTargetDto target)
|
||||
{
|
||||
if (target.Value == null) return [];
|
||||
|
||||
try
|
||||
{
|
||||
if (target.Value.Value.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
return target.Value.Value.EnumerateArray()
|
||||
.Where(e => e.ValueKind == JsonValueKind.Number)
|
||||
.Select(e => e.GetInt64())
|
||||
.ToList();
|
||||
}
|
||||
}
|
||||
catch { }
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
private static List<string> ParseTags(PushTargetDto target)
|
||||
{
|
||||
if (target.Value == null) return [];
|
||||
|
||||
try
|
||||
{
|
||||
if (target.Value.Value.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
return target.Value.Value.EnumerateArray()
|
||||
.Where(e => e.ValueKind == JsonValueKind.String)
|
||||
.Select(e => e.GetString()!)
|
||||
.ToList();
|
||||
}
|
||||
}
|
||||
catch { }
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
private async Task<List<(Device Device, PushResultDto Result)>> SendFcmBatchAsync(
|
||||
string fcmCredentials, List<Device> devices, PushMessageDto message,
|
||||
Dictionary<string, string>? customData, CancellationToken ct)
|
||||
{
|
||||
var results = new List<(Device, PushResultDto)>();
|
||||
var tokens = devices.Select(d => d.DeviceToken).ToList();
|
||||
|
||||
var batchResults = await _fcmSender.SendBatchAsync(
|
||||
fcmCredentials, tokens, message.Title, message.Body,
|
||||
message.ImageUrl, customData, ct);
|
||||
|
||||
for (int i = 0; i < batchResults.Count && i < devices.Count; i++)
|
||||
results.Add((devices[i], batchResults[i]));
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
private async Task<List<(Device Device, PushResultDto Result)>> SendApnsBatchAsync(
|
||||
Service service, List<Device> devices, PushMessageDto message,
|
||||
Dictionary<string, string>? customData, CancellationToken ct)
|
||||
{
|
||||
var results = new List<(Device, PushResultDto)>();
|
||||
var tokens = devices.Select(d => d.DeviceToken).ToList();
|
||||
|
||||
var batchResults = await _apnsSender.SendBatchAsync(
|
||||
service.ApnsPrivateKey!, service.ApnsKeyId!, service.ApnsTeamId!, service.ApnsBundleId!,
|
||||
tokens, message.Title, message.Body,
|
||||
message.ImageUrl, customData, ct);
|
||||
|
||||
for (int i = 0; i < batchResults.Count && i < devices.Count; i++)
|
||||
results.Add((devices[i], batchResults[i]));
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
private async Task HandleRetryAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct)
|
||||
{
|
||||
var retryCount = 0;
|
||||
if (ea.BasicProperties.Headers?.TryGetValue("x-retry-count", out var retryObj) == true)
|
||||
{
|
||||
retryCount = retryObj is int r ? r : 0;
|
||||
}
|
||||
|
||||
if (retryCount < MaxRetryCount)
|
||||
{
|
||||
_logger.LogWarning("메시지 재시도 ({RetryCount}/{MaxRetry})", retryCount + 1, MaxRetryCount);
|
||||
await channel.BasicNackAsync(ea.DeliveryTag, false, true, ct);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogError("메시지 최대 재시도 초과 — 폐기: deliveryTag={Tag}", ea.DeliveryTag);
|
||||
await channel.BasicNackAsync(ea.DeliveryTag, false, false, ct);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user