diff --git a/SPMS.Infrastructure/DependencyInjection.cs b/SPMS.Infrastructure/DependencyInjection.cs index 0eee960..9d68f22 100644 --- a/SPMS.Infrastructure/DependencyInjection.cs +++ b/SPMS.Infrastructure/DependencyInjection.cs @@ -12,6 +12,7 @@ using SPMS.Infrastructure.Push; using SPMS.Infrastructure.Persistence.Repositories; using SPMS.Infrastructure.Security; using SPMS.Infrastructure.Services; +using SPMS.Infrastructure.Workers; namespace SPMS.Infrastructure; @@ -67,6 +68,9 @@ public static class DependencyInjection }); services.AddSingleton(); + // Workers + services.AddHostedService(); + // Token Store & Email Service services.AddMemoryCache(); services.AddSingleton(); diff --git a/SPMS.Infrastructure/Workers/PushWorker.cs b/SPMS.Infrastructure/Workers/PushWorker.cs new file mode 100644 index 0000000..db47dc5 --- /dev/null +++ b/SPMS.Infrastructure/Workers/PushWorker.cs @@ -0,0 +1,390 @@ +using System.Text; +using System.Text.Json; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using SPMS.Application.DTOs.Push; +using SPMS.Application.Interfaces; +using SPMS.Application.Settings; +using SPMS.Domain.Entities; +using SPMS.Domain.Enums; +using SPMS.Domain.Interfaces; +using SPMS.Infrastructure.Messaging; + +namespace SPMS.Infrastructure.Workers; + +public class PushWorker : BackgroundService +{ + private const int MaxRetryCount = 3; + + private readonly RabbitMQConnection _rabbitConnection; + private readonly RabbitMQSettings _rabbitSettings; + private readonly IServiceScopeFactory _scopeFactory; + private readonly IDuplicateChecker _duplicateChecker; + private readonly IFcmSender _fcmSender; + private readonly IApnsSender _apnsSender; + private readonly ILogger _logger; + + public PushWorker( + RabbitMQConnection rabbitConnection, + IOptions rabbitSettings, + IServiceScopeFactory scopeFactory, + IDuplicateChecker duplicateChecker, + IFcmSender fcmSender, + IApnsSender apnsSender, + ILogger logger) + { + _rabbitConnection = rabbitConnection; + _rabbitSettings = rabbitSettings.Value; + _scopeFactory = scopeFactory; + _duplicateChecker = duplicateChecker; + _fcmSender = fcmSender; + _apnsSender = apnsSender; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("PushWorker 시작"); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await ConsumeAsync(stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "PushWorker 오류 — 5초 후 재시도"); + await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); + } + } + + _logger.LogInformation("PushWorker 종료"); + } + + private async Task ConsumeAsync(CancellationToken stoppingToken) + { + await using var channel = await _rabbitConnection.CreateChannelAsync(stoppingToken); + await channel.BasicQosAsync(0, _rabbitSettings.PrefetchCount, false, stoppingToken); + + var tcs = new TaskCompletionSource(); + stoppingToken.Register(() => tcs.TrySetResult()); + + var consumer = new AsyncEventingBasicConsumer(channel); + consumer.ReceivedAsync += async (_, ea) => + { + try + { + await ProcessMessageAsync(channel, ea, stoppingToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "메시지 처리 중 예외 발생"); + await HandleRetryAsync(channel, ea, stoppingToken); + } + }; + + await channel.BasicConsumeAsync( + queue: _rabbitSettings.PushQueue, + autoAck: false, + consumer: consumer, + cancellationToken: stoppingToken); + + _logger.LogInformation("PushWorker Consumer 등록 완료: queue={Queue}", _rabbitSettings.PushQueue); + + await tcs.Task; + } + + private async Task ProcessMessageAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct) + { + var body = Encoding.UTF8.GetString(ea.Body.ToArray()); + var pushMessage = JsonSerializer.Deserialize(body); + + if (pushMessage == null) + { + _logger.LogWarning("메시지 역직렬화 실패 — ACK 후 스킵"); + await channel.BasicAckAsync(ea.DeliveryTag, false, ct); + return; + } + + // [1] Redis 중복 체크 + if (await _duplicateChecker.IsDuplicateAsync(pushMessage.RequestId, ct)) + { + _logger.LogInformation("중복 메시지 스킵: requestId={RequestId}", pushMessage.RequestId); + await channel.BasicAckAsync(ea.DeliveryTag, false, ct); + return; + } + + using var scope = _scopeFactory.CreateScope(); + var serviceRepo = scope.ServiceProvider.GetRequiredService(); + var deviceRepo = scope.ServiceProvider.GetRequiredService(); + var unitOfWork = scope.ServiceProvider.GetRequiredService(); + + // 서비스 조회 + var service = await serviceRepo.GetByIdAsync(pushMessage.ServiceId); + if (service == null) + { + _logger.LogWarning("서비스 없음: serviceId={ServiceId}", pushMessage.ServiceId); + await channel.BasicAckAsync(ea.DeliveryTag, false, ct); + return; + } + + // [2] send_type별 대상 Device 조회 + var devices = await ResolveDevicesAsync(deviceRepo, pushMessage, ct); + if (devices.Count == 0) + { + _logger.LogInformation("발송 대상 없음: requestId={RequestId}", pushMessage.RequestId); + await channel.BasicAckAsync(ea.DeliveryTag, false, ct); + return; + } + + // [3] 플랫폼별 분류 + var iosDevices = devices.Where(d => d.Platform == Platform.iOS).ToList(); + var androidDevices = devices.Where(d => d.Platform == Platform.Android).ToList(); + + var customData = pushMessage.CustomData? + .ToDictionary(kv => kv.Key, kv => kv.Value.ToString() ?? string.Empty); + + if (!string.IsNullOrEmpty(pushMessage.LinkUrl)) + (customData ??= new Dictionary())["link_url"] = pushMessage.LinkUrl; + + // [4] 배치 발송 + var allResults = new List<(Device Device, PushResultDto Result)>(); + + if (androidDevices.Count > 0 && !string.IsNullOrEmpty(service.FcmCredentials)) + { + var fcmResults = await SendFcmBatchAsync( + service.FcmCredentials, androidDevices, pushMessage, customData, ct); + allResults.AddRange(fcmResults); + } + + if (iosDevices.Count > 0 && + !string.IsNullOrEmpty(service.ApnsPrivateKey) && + !string.IsNullOrEmpty(service.ApnsKeyId) && + !string.IsNullOrEmpty(service.ApnsTeamId) && + !string.IsNullOrEmpty(service.ApnsBundleId)) + { + var apnsResults = await SendApnsBatchAsync( + service, iosDevices, pushMessage, customData, ct); + allResults.AddRange(apnsResults); + } + + // [5] 결과 처리: PushSendLog INSERT + Device 상태 변경 + var pushSendLogRepo = scope.ServiceProvider.GetRequiredService>(); + var dailyStatRepo = scope.ServiceProvider.GetRequiredService>(); + + int successCount = 0, failCount = 0; + + foreach (var (device, result) in allResults) + { + var log = new PushSendLog + { + ServiceId = pushMessage.ServiceId, + MessageId = long.TryParse(pushMessage.MessageId, out var mid) ? mid : 0, + DeviceId = device.Id, + Status = result.IsSuccess ? PushResult.Success : PushResult.Failed, + FailReason = result.ErrorMessage, + SentAt = DateTime.UtcNow + }; + await pushSendLogRepo.AddAsync(log); + + if (result.IsSuccess) + successCount++; + else + failCount++; + + // 영구 오류: 디바이스 비활성화 + if (result.ShouldRemoveDevice) + { + device.IsActive = false; + device.UpdatedAt = DateTime.UtcNow; + deviceRepo.Update(device); + } + } + + // [6] DailyStat 증분 업데이트 + var today = DateOnly.FromDateTime(DateTime.UtcNow); + var stats = await dailyStatRepo.FindAsync( + s => s.ServiceId == pushMessage.ServiceId && s.StatDate == today); + var stat = stats.FirstOrDefault(); + + if (stat != null) + { + stat.SentCnt += allResults.Count; + stat.SuccessCnt += successCount; + stat.FailCnt += failCount; + dailyStatRepo.Update(stat); + } + else + { + await dailyStatRepo.AddAsync(new DailyStat + { + ServiceId = pushMessage.ServiceId, + StatDate = today, + SentCnt = allResults.Count, + SuccessCnt = successCount, + FailCnt = failCount, + CreatedAt = DateTime.UtcNow + }); + } + + await unitOfWork.SaveChangesAsync(ct); + + _logger.LogInformation( + "푸시 발송 완료: requestId={RequestId}, 성공={Success}, 실패={Fail}, 총={Total}", + pushMessage.RequestId, successCount, failCount, allResults.Count); + + // [8] ACK + await channel.BasicAckAsync(ea.DeliveryTag, false, ct); + } + + private async Task> ResolveDevicesAsync( + IDeviceRepository deviceRepo, PushMessageDto message, CancellationToken ct) + { + var sendType = message.SendType.ToLowerInvariant(); + + switch (sendType) + { + case "single": + { + var deviceIds = ParseDeviceIds(message.Target); + if (deviceIds.Count == 0) return []; + + var devices = new List(); + foreach (var id in deviceIds) + { + var device = await deviceRepo.GetByIdAndServiceAsync(id, message.ServiceId); + if (device is { IsActive: true, PushAgreed: true }) + devices.Add(device); + } + return devices; + } + + case "group": + { + var tags = ParseTags(message.Target); + if (tags.Count == 0) return []; + + var allDevices = await deviceRepo.FindAsync( + d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed); + + return allDevices + .Where(d => !string.IsNullOrEmpty(d.Tags) && + tags.Any(tag => d.Tags.Contains(tag))) + .ToList(); + } + + case "broadcast": + { + var allDevices = await deviceRepo.FindAsync( + d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed); + return allDevices.ToList(); + } + + default: + _logger.LogWarning("지원하지 않는 send_type: {SendType}", sendType); + return []; + } + } + + private static List ParseDeviceIds(PushTargetDto target) + { + if (target.Value == null) return []; + + try + { + if (target.Value.Value.ValueKind == JsonValueKind.Array) + { + return target.Value.Value.EnumerateArray() + .Where(e => e.ValueKind == JsonValueKind.Number) + .Select(e => e.GetInt64()) + .ToList(); + } + } + catch { } + + return []; + } + + private static List ParseTags(PushTargetDto target) + { + if (target.Value == null) return []; + + try + { + if (target.Value.Value.ValueKind == JsonValueKind.Array) + { + return target.Value.Value.EnumerateArray() + .Where(e => e.ValueKind == JsonValueKind.String) + .Select(e => e.GetString()!) + .ToList(); + } + } + catch { } + + return []; + } + + private async Task> SendFcmBatchAsync( + string fcmCredentials, List devices, PushMessageDto message, + Dictionary? customData, CancellationToken ct) + { + var results = new List<(Device, PushResultDto)>(); + var tokens = devices.Select(d => d.DeviceToken).ToList(); + + var batchResults = await _fcmSender.SendBatchAsync( + fcmCredentials, tokens, message.Title, message.Body, + message.ImageUrl, customData, ct); + + for (int i = 0; i < batchResults.Count && i < devices.Count; i++) + results.Add((devices[i], batchResults[i])); + + return results; + } + + private async Task> SendApnsBatchAsync( + Service service, List devices, PushMessageDto message, + Dictionary? customData, CancellationToken ct) + { + var results = new List<(Device, PushResultDto)>(); + var tokens = devices.Select(d => d.DeviceToken).ToList(); + + var batchResults = await _apnsSender.SendBatchAsync( + service.ApnsPrivateKey!, service.ApnsKeyId!, service.ApnsTeamId!, service.ApnsBundleId!, + tokens, message.Title, message.Body, + message.ImageUrl, customData, ct); + + for (int i = 0; i < batchResults.Count && i < devices.Count; i++) + results.Add((devices[i], batchResults[i])); + + return results; + } + + private async Task HandleRetryAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct) + { + var retryCount = 0; + if (ea.BasicProperties.Headers?.TryGetValue("x-retry-count", out var retryObj) == true) + { + retryCount = retryObj is int r ? r : 0; + } + + if (retryCount < MaxRetryCount) + { + _logger.LogWarning("메시지 재시도 ({RetryCount}/{MaxRetry})", retryCount + 1, MaxRetryCount); + await channel.BasicNackAsync(ea.DeliveryTag, false, true, ct); + } + else + { + _logger.LogError("메시지 최대 재시도 초과 — 폐기: deliveryTag={Tag}", ea.DeliveryTag); + await channel.BasicNackAsync(ea.DeliveryTag, false, false, ct); + } + } +}