using System.Text; using System.Text.Json; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; using RabbitMQ.Client.Events; using SPMS.Application.DTOs.Push; using SPMS.Application.Interfaces; using SPMS.Application.Settings; using SPMS.Domain.Entities; using SPMS.Domain.Enums; using SPMS.Domain.Interfaces; using SPMS.Infrastructure.Messaging; namespace SPMS.Infrastructure.Workers; public class PushWorker : BackgroundService { private const int MaxRetryCount = 3; private readonly RabbitMQConnection _rabbitConnection; private readonly RabbitMQSettings _rabbitSettings; private readonly IServiceScopeFactory _scopeFactory; private readonly IDuplicateChecker _duplicateChecker; private readonly IBulkJobStore _bulkJobStore; private readonly ITokenCacheService _tokenCache; private readonly IFcmSender _fcmSender; private readonly IApnsSender _apnsSender; private readonly ILogger _logger; public PushWorker( RabbitMQConnection rabbitConnection, IOptions rabbitSettings, IServiceScopeFactory scopeFactory, IDuplicateChecker duplicateChecker, IBulkJobStore bulkJobStore, ITokenCacheService tokenCache, IFcmSender fcmSender, IApnsSender apnsSender, ILogger logger) { _rabbitConnection = rabbitConnection; _rabbitSettings = rabbitSettings.Value; _scopeFactory = scopeFactory; _duplicateChecker = duplicateChecker; _bulkJobStore = bulkJobStore; _tokenCache = tokenCache; _fcmSender = fcmSender; _apnsSender = apnsSender; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("PushWorker 시작"); while (!stoppingToken.IsCancellationRequested) { try { await ConsumeAsync(stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { break; } catch (Exception ex) { _logger.LogError(ex, "PushWorker 오류 — 5초 후 재시도"); await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); } } _logger.LogInformation("PushWorker 종료"); } private async Task ConsumeAsync(CancellationToken stoppingToken) { await using var channel = await _rabbitConnection.CreateChannelAsync(stoppingToken); await channel.BasicQosAsync(0, _rabbitSettings.PrefetchCount, false, stoppingToken); var tcs = new TaskCompletionSource(); stoppingToken.Register(() => tcs.TrySetResult()); var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += async (_, ea) => { try { await ProcessMessageAsync(channel, ea, stoppingToken); } catch (Exception ex) { _logger.LogError(ex, "메시지 처리 중 예외 발생"); await HandleRetryAsync(channel, ea, stoppingToken); } }; await channel.BasicConsumeAsync( queue: _rabbitSettings.PushQueue, autoAck: false, consumer: consumer, cancellationToken: stoppingToken); _logger.LogInformation("PushWorker Consumer 등록 완료: queue={Queue}", _rabbitSettings.PushQueue); await tcs.Task; } private async Task ProcessMessageAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct) { var body = Encoding.UTF8.GetString(ea.Body.ToArray()); var pushMessage = JsonSerializer.Deserialize(body); if (pushMessage == null) { _logger.LogWarning("메시지 역직렬화 실패 — ACK 후 스킵"); await channel.BasicAckAsync(ea.DeliveryTag, false, ct); return; } // [0] Bulk job 취소 체크 if (!string.IsNullOrEmpty(pushMessage.JobId) && await _bulkJobStore.IsCancelledAsync(pushMessage.JobId, ct)) { _logger.LogInformation("Bulk job 취소됨 — 스킵: jobId={JobId}, requestId={RequestId}", pushMessage.JobId, pushMessage.RequestId); await channel.BasicAckAsync(ea.DeliveryTag, false, ct); return; } // [1] Redis 중복 체크 if (await _duplicateChecker.IsDuplicateAsync(pushMessage.RequestId, ct)) { _logger.LogInformation("중복 메시지 스킵: requestId={RequestId}", pushMessage.RequestId); await channel.BasicAckAsync(ea.DeliveryTag, false, ct); return; } // [1.5] Bulk job processing 상태 전환 if (!string.IsNullOrEmpty(pushMessage.JobId)) await _bulkJobStore.SetProcessingAsync(pushMessage.JobId, ct); using var scope = _scopeFactory.CreateScope(); var serviceRepo = scope.ServiceProvider.GetRequiredService(); var deviceRepo = scope.ServiceProvider.GetRequiredService(); var unitOfWork = scope.ServiceProvider.GetRequiredService(); // 서비스 조회 var service = await serviceRepo.GetByIdAsync(pushMessage.ServiceId); if (service == null) { _logger.LogWarning("서비스 없음: serviceId={ServiceId}", pushMessage.ServiceId); await channel.BasicAckAsync(ea.DeliveryTag, false, ct); return; } // [2] send_type별 대상 Device 조회 var devices = await ResolveDevicesAsync(deviceRepo, pushMessage, ct); if (devices.Count == 0) { _logger.LogInformation("발송 대상 없음: requestId={RequestId}", pushMessage.RequestId); await channel.BasicAckAsync(ea.DeliveryTag, false, ct); return; } // [3] 플랫폼별 분류 var iosDevices = devices.Where(d => d.Platform == Platform.iOS).ToList(); var androidDevices = devices.Where(d => d.Platform == Platform.Android).ToList(); var customData = pushMessage.CustomData? .ToDictionary(kv => kv.Key, kv => kv.Value.ToString() ?? string.Empty); if (!string.IsNullOrEmpty(pushMessage.LinkUrl)) (customData ??= new Dictionary())["link_url"] = pushMessage.LinkUrl; // [4] 배치 발송 var allResults = new List<(Device Device, PushResultDto Result)>(); if (androidDevices.Count > 0 && !string.IsNullOrEmpty(service.FcmCredentials)) { var fcmResults = await SendFcmBatchAsync( service.FcmCredentials, androidDevices, pushMessage, customData, ct); allResults.AddRange(fcmResults); } if (iosDevices.Count > 0 && !string.IsNullOrEmpty(service.ApnsPrivateKey) && !string.IsNullOrEmpty(service.ApnsKeyId) && !string.IsNullOrEmpty(service.ApnsTeamId) && !string.IsNullOrEmpty(service.ApnsBundleId)) { var apnsResults = await SendApnsBatchAsync( service, iosDevices, pushMessage, customData, ct); allResults.AddRange(apnsResults); } // [5] 결과 처리: PushSendLog INSERT + Device 상태 변경 var pushSendLogRepo = scope.ServiceProvider.GetRequiredService>(); var dailyStatRepo = scope.ServiceProvider.GetRequiredService>(); int successCount = 0, failCount = 0; foreach (var (device, result) in allResults) { var log = new PushSendLog { ServiceId = pushMessage.ServiceId, MessageId = long.TryParse(pushMessage.MessageId, out var mid) ? mid : 0, DeviceId = device.Id, Status = result.IsSuccess ? PushResult.Success : PushResult.Failed, FailReason = result.ErrorMessage, SentAt = DateTime.UtcNow }; await pushSendLogRepo.AddAsync(log); if (result.IsSuccess) successCount++; else failCount++; // 영구 오류: 디바이스 비활성화 if (result.ShouldRemoveDevice) { device.IsActive = false; device.UpdatedAt = DateTime.UtcNow; deviceRepo.Update(device); } } // [6] DailyStat 증분 업데이트 var today = DateOnly.FromDateTime(DateTime.UtcNow); var stats = await dailyStatRepo.FindAsync( s => s.ServiceId == pushMessage.ServiceId && s.StatDate == today); var stat = stats.FirstOrDefault(); if (stat != null) { stat.SentCnt += allResults.Count; stat.SuccessCnt += successCount; stat.FailCnt += failCount; dailyStatRepo.Update(stat); } else { await dailyStatRepo.AddAsync(new DailyStat { ServiceId = pushMessage.ServiceId, StatDate = today, SentCnt = allResults.Count, SuccessCnt = successCount, FailCnt = failCount, CreatedAt = DateTime.UtcNow }); } await unitOfWork.SaveChangesAsync(ct); _logger.LogInformation( "푸시 발송 완료: requestId={RequestId}, 성공={Success}, 실패={Fail}, 총={Total}", pushMessage.RequestId, successCount, failCount, allResults.Count); // [7] Bulk job 진행률 업데이트 if (!string.IsNullOrEmpty(pushMessage.JobId)) { if (successCount > 0) await _bulkJobStore.IncrementSentAsync(pushMessage.JobId, ct); else await _bulkJobStore.IncrementFailedAsync(pushMessage.JobId, ct); await _bulkJobStore.TryCompleteAsync(pushMessage.JobId, ct); } // [8] ACK await channel.BasicAckAsync(ea.DeliveryTag, false, ct); } private async Task> ResolveDevicesAsync( IDeviceRepository deviceRepo, PushMessageDto message, CancellationToken ct) { var sendType = message.SendType.ToLowerInvariant(); switch (sendType) { case "single": { var deviceIds = ParseDeviceIds(message.Target); if (deviceIds.Count == 0) return []; var devices = new List(); foreach (var id in deviceIds) { // Redis 캐시 우선 조회 var cached = await _tokenCache.GetDeviceInfoAsync(message.ServiceId, id); if (cached != null) { if (cached.IsActive && cached.PushAgreed) { devices.Add(new Device { Id = id, ServiceId = message.ServiceId, DeviceToken = cached.Token, Platform = (Platform)cached.Platform, IsActive = true, PushAgreed = true }); } continue; } // 캐시 미스 → DB 조회 후 캐시 저장 var device = await deviceRepo.GetByIdAndServiceAsync(id, message.ServiceId); if (device != null) { await _tokenCache.SetDeviceInfoAsync(message.ServiceId, id, new CachedDeviceInfo(device.DeviceToken, (int)device.Platform, device.IsActive, device.PushAgreed)); if (device is { IsActive: true, PushAgreed: true }) devices.Add(device); } } return devices; } case "group": { var tags = ParseTags(message.Target); if (tags.Count == 0) return []; var allDevices = await deviceRepo.FindAsync( d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed); return allDevices .Where(d => !string.IsNullOrEmpty(d.Tags) && tags.Any(tag => d.Tags.Contains(tag))) .ToList(); } case "broadcast": { var allDevices = await deviceRepo.FindAsync( d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed); return allDevices.ToList(); } default: _logger.LogWarning("지원하지 않는 send_type: {SendType}", sendType); return []; } } private static List ParseDeviceIds(PushTargetDto target) { if (target.Value == null) return []; try { if (target.Value.Value.ValueKind == JsonValueKind.Array) { return target.Value.Value.EnumerateArray() .Where(e => e.ValueKind == JsonValueKind.Number) .Select(e => e.GetInt64()) .ToList(); } } catch { } return []; } private static List ParseTags(PushTargetDto target) { if (target.Value == null) return []; try { if (target.Value.Value.ValueKind == JsonValueKind.Array) { return target.Value.Value.EnumerateArray() .Where(e => e.ValueKind == JsonValueKind.String) .Select(e => e.GetString()!) .ToList(); } } catch { } return []; } private async Task> SendFcmBatchAsync( string fcmCredentials, List devices, PushMessageDto message, Dictionary? customData, CancellationToken ct) { var results = new List<(Device, PushResultDto)>(); var tokens = devices.Select(d => d.DeviceToken).ToList(); var batchResults = await _fcmSender.SendBatchAsync( fcmCredentials, tokens, message.Title, message.Body, message.ImageUrl, customData, ct); for (int i = 0; i < batchResults.Count && i < devices.Count; i++) results.Add((devices[i], batchResults[i])); return results; } private async Task> SendApnsBatchAsync( Service service, List devices, PushMessageDto message, Dictionary? customData, CancellationToken ct) { var results = new List<(Device, PushResultDto)>(); var tokens = devices.Select(d => d.DeviceToken).ToList(); var batchResults = await _apnsSender.SendBatchAsync( service.ApnsPrivateKey!, service.ApnsKeyId!, service.ApnsTeamId!, service.ApnsBundleId!, tokens, message.Title, message.Body, message.ImageUrl, customData, ct); for (int i = 0; i < batchResults.Count && i < devices.Count; i++) results.Add((devices[i], batchResults[i])); return results; } private async Task HandleRetryAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct) { var retryCount = 0; if (ea.BasicProperties.Headers?.TryGetValue("x-retry-count", out var retryObj) == true) { retryCount = retryObj is int r ? r : 0; } if (retryCount < MaxRetryCount) { _logger.LogWarning("메시지 재시도 ({RetryCount}/{MaxRetry})", retryCount + 1, MaxRetryCount); await channel.BasicNackAsync(ea.DeliveryTag, false, true, ct); } else { _logger.LogError("메시지 최대 재시도 초과 — 폐기: deliveryTag={Tag}", ea.DeliveryTag); await channel.BasicNackAsync(ea.DeliveryTag, false, false, ct); } } }