// SPMS_API/SPMS.Infrastructure/Workers/PushWorker.cs
// 488 lines · 18 KiB · C#

using System.Text;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using SPMS.Application.DTOs.Push;
using SPMS.Application.Interfaces;
using SPMS.Application.Settings;
using SPMS.Domain.Entities;
using SPMS.Domain.Enums;
using SPMS.Domain.Interfaces;
using SPMS.Infrastructure.Messaging;
namespace SPMS.Infrastructure.Workers;
public class PushWorker : BackgroundService
{
// Upper bound that HandleRetryAsync compares the "x-retry-count" header value against.
private const int MaxRetryCount = 3;
// Provides the channel that ConsumeAsync consumes from.
private readonly RabbitMQConnection _rabbitConnection;
// Supplies PrefetchCount and PushQueue for the consumer.
private readonly RabbitMQSettings _rabbitSettings;
// Used to open one DI scope per message for repositories and the unit of work.
private readonly IServiceScopeFactory _scopeFactory;
// Asked about each message's RequestId; a positive answer makes the worker ACK and skip.
private readonly IDuplicateChecker _duplicateChecker;
// Bulk-job cancellation check and processing / sent / failed / complete bookkeeping.
private readonly IBulkJobStore _bulkJobStore;
// Device-info cache consulted before the device repository for "single" sends.
private readonly ITokenCacheService _tokenCache;
// Receives the PushSent / PushFailed notification after results are saved.
private readonly IWebhookService _webhookService;
// Batch sender used for devices whose platform is Android.
private readonly IFcmSender _fcmSender;
// Batch sender used for devices whose platform is iOS.
private readonly IApnsSender _apnsSender;
// Logger for worker lifecycle and per-message diagnostics.
private readonly ILogger<PushWorker> _logger;
/// <summary>
/// Captures the injected collaborators. No connection is opened and no message is
/// consumed here; that happens once <see cref="ExecuteAsync"/> runs.
/// </summary>
/// <param name="rabbitConnection">Connection wrapper used to create the consumer channel.</param>
/// <param name="rabbitSettings">Options wrapper; only its <c>Value</c> is retained.</param>
/// <param name="scopeFactory">Factory for the per-message DI scope.</param>
/// <param name="duplicateChecker">Duplicate-request check keyed by request id.</param>
/// <param name="bulkJobStore">Bulk-job state store.</param>
/// <param name="tokenCache">Device-info cache.</param>
/// <param name="webhookService">Webhook notifier.</param>
/// <param name="fcmSender">FCM batch sender.</param>
/// <param name="apnsSender">APNs batch sender.</param>
/// <param name="logger">Logger for this worker.</param>
public PushWorker(
    RabbitMQConnection rabbitConnection,
    IOptions<RabbitMQSettings> rabbitSettings,
    IServiceScopeFactory scopeFactory,
    IDuplicateChecker duplicateChecker,
    IBulkJobStore bulkJobStore,
    ITokenCacheService tokenCache,
    IWebhookService webhookService,
    IFcmSender fcmSender,
    IApnsSender apnsSender,
    ILogger<PushWorker> logger)
{
    // Diagnostics
    _logger = logger;

    // Messaging
    _rabbitSettings = rabbitSettings.Value;
    _rabbitConnection = rabbitConnection;

    // Push providers
    _apnsSender = apnsSender;
    _fcmSender = fcmSender;

    // Stores, caches and notifications
    _webhookService = webhookService;
    _tokenCache = tokenCache;
    _bulkJobStore = bulkJobStore;
    _duplicateChecker = duplicateChecker;

    // Per-message scoped services
    _scopeFactory = scopeFactory;
}
/// <summary>
/// Worker main loop: runs <c>ConsumeAsync</c> until <paramref name="stoppingToken"/> is
/// cancelled, logging any failure and starting over after a 5 second pause.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the worker must stop.</param>
/// <returns>A task that completes once the loop has exited.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("PushWorker 시작");

    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            await ConsumeAsync(stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            break;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "PushWorker 오류 — 5초 후 재시도");

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Stop was requested during the back-off pause. Leave the loop normally
                // instead of letting the cancellation escape, so the shutdown log below
                // is still written.
                break;
            }
        }
    }

    _logger.LogInformation("PushWorker 종료");
}
/// <summary>
/// Opens a channel, applies the configured prefetch count, registers a manual-ack consumer
/// on the push queue and then waits until <paramref name="stoppingToken"/> is cancelled.
/// </summary>
/// <param name="stoppingToken">Cancellation signal that ends the wait and is passed to every channel call.</param>
/// <returns>A task that completes when cancellation is requested.</returns>
private async Task ConsumeAsync(CancellationToken stoppingToken)
{
    await using var channel = await _rabbitConnection.CreateChannelAsync(stoppingToken);
    await channel.BasicQosAsync(0, _rabbitSettings.PrefetchCount, false, stoppingToken);

    // RunContinuationsAsynchronously keeps the code after "await stopSignal.Task" from
    // running inline on the thread that cancels the token.
    var stopSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    // Disposed on exit so repeated reconnect attempts do not accumulate callbacks on the token.
    using var stopRegistration = stoppingToken.Register(() => stopSignal.TrySetResult());

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.ReceivedAsync += async (_, ea) =>
    {
        try
        {
            await ProcessMessageAsync(channel, ea, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown, not a processing failure: the delivery is left unacknowledged
            // and is not counted as a retry.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "메시지 처리 중 예외 발생");

            try
            {
                await HandleRetryAsync(channel, ea, stoppingToken);
            }
            catch (Exception retryEx)
            {
                // The retry/nack itself can fail (closed channel, cancellation). Log it
                // here so the failure is visible instead of escaping the handler.
                _logger.LogError(retryEx, "메시지 재시도 처리 실패: deliveryTag={Tag}", ea.DeliveryTag);
            }
        }
    };

    await channel.BasicConsumeAsync(
        queue: _rabbitSettings.PushQueue,
        autoAck: false,
        consumer: consumer,
        cancellationToken: stoppingToken);

    _logger.LogInformation("PushWorker Consumer 등록 완료: queue={Queue}", _rabbitSettings.PushQueue);

    // NOTE(review): only the stopping token ends this wait. If the channel is closed by
    // the broker and is not recovered by the client, this method keeps waiting — confirm
    // that connection recovery is enabled, or observe channel shutdown here.
    await stopSignal.Task;
}
/// <summary>
/// Handles one delivery end to end: deserialize, skip cancelled bulk jobs and duplicates,
/// resolve target devices, send through FCM/APNs, persist send logs and daily statistics,
/// fire the webhook, update bulk-job progress and finally ACK.
/// </summary>
/// <param name="channel">Channel the delivery arrived on; used for the ACK.</param>
/// <param name="ea">The delivery (body and delivery tag).</param>
/// <param name="ct">Cancellation token passed to store, send and channel calls.</param>
/// <returns>A task that completes once the delivery has been acknowledged.</returns>
/// <remarks>
/// Exceptions are not handled here; the consumer callback routes them to the retry path.
/// </remarks>
private async Task ProcessMessageAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct)
{
    var pushMessage = DeserializeMessage(ea);
    if (pushMessage == null)
    {
        // Retrying cannot fix an unreadable payload, so acknowledge and drop it.
        await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
        return;
    }

    // [0] Bulk job cancellation check
    if (!string.IsNullOrEmpty(pushMessage.JobId) &&
        await _bulkJobStore.IsCancelledAsync(pushMessage.JobId, ct))
    {
        _logger.LogInformation("Bulk job 취소됨 — 스킵: jobId={JobId}, requestId={RequestId}",
            pushMessage.JobId, pushMessage.RequestId);
        await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
        return;
    }

    // [1] Duplicate check
    // NOTE(review): if IsDuplicateAsync also records the request id, a message that is
    // retried after a failure below would be skipped here — confirm against its implementation.
    if (await _duplicateChecker.IsDuplicateAsync(pushMessage.RequestId, ct))
    {
        _logger.LogInformation("중복 메시지 스킵: requestId={RequestId}", pushMessage.RequestId);
        await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
        return;
    }

    // [1.5] Move the bulk job to the processing state
    if (!string.IsNullOrEmpty(pushMessage.JobId))
    {
        await _bulkJobStore.SetProcessingAsync(pushMessage.JobId, ct);
    }

    using var scope = _scopeFactory.CreateScope();
    var serviceRepo = scope.ServiceProvider.GetRequiredService<IServiceRepository>();
    var deviceRepo = scope.ServiceProvider.GetRequiredService<IDeviceRepository>();
    var tagRepo = scope.ServiceProvider.GetRequiredService<ITagRepository>();
    var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();

    // Service lookup
    var service = await serviceRepo.GetByIdAsync(pushMessage.ServiceId);
    if (service == null)
    {
        _logger.LogWarning("서비스 없음: serviceId={ServiceId}", pushMessage.ServiceId);
        await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
        return;
    }

    // [2] Resolve target devices according to send_type
    var devices = await ResolveDevicesAsync(deviceRepo, tagRepo, pushMessage, ct);
    if (devices.Count == 0)
    {
        _logger.LogInformation("발송 대상 없음: requestId={RequestId}", pushMessage.RequestId);
        await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
        return;
    }

    // [3]-[4] Split by platform and send in batches
    var customData = BuildCustomData(pushMessage);
    var allResults = await SendToProvidersAsync(service, devices, pushMessage, customData, ct);

    // [5]-[6] Persist send logs, device deactivations and the daily statistic
    var (successCount, failCount) = await PersistResultsAsync(
        scope.ServiceProvider, deviceRepo, unitOfWork, pushMessage, allResults, ct);

    _logger.LogInformation(
        "푸시 발송 완료: requestId={RequestId}, 성공={Success}, 실패={Fail}, 총={Total}",
        pushMessage.RequestId, successCount, failCount, allResults.Count);

    // [6.5] Webhook — intentionally not awaited; the helper observes and logs failures.
    _ = NotifyWebhookAsync(pushMessage, successCount, failCount, allResults.Count);

    // [7] Bulk job progress
    if (!string.IsNullOrEmpty(pushMessage.JobId))
    {
        if (successCount > 0)
        {
            await _bulkJobStore.IncrementSentAsync(pushMessage.JobId, ct);
        }
        else
        {
            await _bulkJobStore.IncrementFailedAsync(pushMessage.JobId, ct);
        }

        await _bulkJobStore.TryCompleteAsync(pushMessage.JobId, ct);
    }

    // [8] ACK
    await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
}

/// <summary>
/// Reads the delivery body as UTF-8 JSON. Returns <c>null</c> (after logging a warning)
/// when the payload is the JSON literal <c>null</c> or is not valid JSON for the DTO.
/// </summary>
private PushMessageDto? DeserializeMessage(BasicDeliverEventArgs ea)
{
    try
    {
        // Deserialize straight from the UTF-8 bytes; no intermediate array or string copy.
        var message = JsonSerializer.Deserialize<PushMessageDto>(ea.Body.Span);
        if (message == null)
        {
            _logger.LogWarning("메시지 역직렬화 실패 — ACK 후 스킵");
        }

        return message;
    }
    catch (JsonException ex)
    {
        // Malformed JSON gets the same ACK-and-skip treatment as a null result;
        // sending it down the retry path would only redeliver the same bad bytes.
        _logger.LogWarning(ex, "메시지 역직렬화 실패 — ACK 후 스킵");
        return null;
    }
}

/// <summary>
/// Builds the string-valued payload dictionary from the message's custom data and adds
/// the "link_url" entry when a link is present. Returns <c>null</c> when there is neither.
/// </summary>
private static Dictionary<string, string>? BuildCustomData(PushMessageDto pushMessage)
{
    var customData = pushMessage.CustomData?
        .ToDictionary(kv => kv.Key, kv => kv.Value.ToString() ?? string.Empty);

    if (!string.IsNullOrEmpty(pushMessage.LinkUrl))
    {
        (customData ??= new Dictionary<string, string>())["link_url"] = pushMessage.LinkUrl;
    }

    return customData;
}

/// <summary>
/// Sends to Android devices via FCM and to iOS devices via APNs. A platform is skipped
/// when it has no devices or when the service lacks the credentials for it.
/// </summary>
private async Task<List<(Device Device, PushResultDto Result)>> SendToProvidersAsync(
    Service service, List<Device> devices, PushMessageDto pushMessage,
    Dictionary<string, string>? customData, CancellationToken ct)
{
    var iosDevices = devices.Where(d => d.Platform == Platform.iOS).ToList();
    var androidDevices = devices.Where(d => d.Platform == Platform.Android).ToList();

    var allResults = new List<(Device Device, PushResultDto Result)>();

    if (androidDevices.Count > 0 && !string.IsNullOrEmpty(service.FcmCredentials))
    {
        var fcmResults = await SendFcmBatchAsync(
            service.FcmCredentials, androidDevices, pushMessage, customData, ct);
        allResults.AddRange(fcmResults);
    }

    if (iosDevices.Count > 0 &&
        !string.IsNullOrEmpty(service.ApnsPrivateKey) &&
        !string.IsNullOrEmpty(service.ApnsKeyId) &&
        !string.IsNullOrEmpty(service.ApnsTeamId) &&
        !string.IsNullOrEmpty(service.ApnsBundleId))
    {
        var apnsResults = await SendApnsBatchAsync(
            service, iosDevices, pushMessage, customData, ct);
        allResults.AddRange(apnsResults);
    }

    return allResults;
}

/// <summary>
/// Writes one send log per result, deactivates devices the provider flagged for removal,
/// adds the counts to today's (UTC) daily statistic and saves everything in one unit of work.
/// </summary>
/// <returns>The number of successful and failed results.</returns>
private async Task<(int SuccessCount, int FailCount)> PersistResultsAsync(
    IServiceProvider services, IDeviceRepository deviceRepo, IUnitOfWork unitOfWork,
    PushMessageDto pushMessage, List<(Device Device, PushResultDto Result)> allResults,
    CancellationToken ct)
{
    var pushSendLogRepo = services.GetRequiredService<IRepository<PushSendLog>>();
    var dailyStatRepo = services.GetRequiredService<IRepository<DailyStat>>();

    // The message id is the same for every result; parse it once instead of per device.
    var messageId = long.TryParse(pushMessage.MessageId, out var parsedId) ? parsedId : 0;

    var deactivatedDevices = new List<Device>();
    int successCount = 0, failCount = 0;

    foreach (var (device, result) in allResults)
    {
        await pushSendLogRepo.AddAsync(new PushSendLog
        {
            ServiceId = pushMessage.ServiceId,
            MessageId = messageId,
            DeviceId = device.Id,
            Status = result.IsSuccess ? PushResult.Success : PushResult.Failed,
            FailReason = result.ErrorMessage,
            SentAt = DateTime.UtcNow
        });

        if (result.IsSuccess)
        {
            successCount++;
        }
        else
        {
            failCount++;
        }

        // Permanent provider error: deactivate the device.
        if (result.ShouldRemoveDevice)
        {
            // NOTE(review): for "single" sends the device may have been rebuilt from the
            // cache with only a subset of its properties — confirm that Update() does not
            // overwrite the remaining columns with default values.
            device.IsActive = false;
            device.UpdatedAt = DateTime.UtcNow;
            deviceRepo.Update(device);
            deactivatedDevices.Add(device);
        }
    }

    // Daily statistic: add to today's row or create it.
    var today = DateOnly.FromDateTime(DateTime.UtcNow);
    var stats = await dailyStatRepo.FindAsync(
        s => s.ServiceId == pushMessage.ServiceId && s.StatDate == today);
    var stat = stats.FirstOrDefault();
    if (stat != null)
    {
        stat.SentCnt += allResults.Count;
        stat.SuccessCnt += successCount;
        stat.FailCnt += failCount;
        dailyStatRepo.Update(stat);
    }
    else
    {
        await dailyStatRepo.AddAsync(new DailyStat
        {
            ServiceId = pushMessage.ServiceId,
            StatDate = today,
            SentCnt = allResults.Count,
            SuccessCnt = successCount,
            FailCnt = failCount,
            CreatedAt = DateTime.UtcNow
        });
    }

    await unitOfWork.SaveChangesAsync(ct);

    // Single sends read the device cache before the database. Without this refresh a
    // device deactivated above would still be cached as active and targeted again.
    foreach (var device in deactivatedDevices)
    {
        try
        {
            await _tokenCache.SetDeviceInfoAsync(pushMessage.ServiceId, device.Id,
                new CachedDeviceInfo(device.DeviceToken, (int)device.Platform,
                    device.IsActive, device.PushAgreed));
        }
        catch (Exception ex)
        {
            // Best effort: the database is already correct, so a cache failure must not
            // push an already-sent message onto the retry path.
            _logger.LogWarning(ex, "디바이스 캐시 갱신 실패: deviceId={DeviceId}", device.Id);
        }
    }

    return (successCount, failCount);
}

/// <summary>
/// Sends the result webhook: <c>PushFailed</c> when there were failures and no success,
/// otherwise <c>PushSent</c>. Failures are logged and never propagate, so the caller can
/// start this without awaiting it.
/// </summary>
private async Task NotifyWebhookAsync(
    PushMessageDto pushMessage, int successCount, int failCount, int totalCount)
{
    var webhookEvent = failCount > 0 && successCount == 0
        ? WebhookEvent.PushFailed
        : WebhookEvent.PushSent;

    try
    {
        await _webhookService.SendAsync(pushMessage.ServiceId, webhookEvent, new
        {
            request_id = pushMessage.RequestId,
            message_id = pushMessage.MessageId,
            send_type = pushMessage.SendType,
            success_count = successCount,
            fail_count = failCount,
            total_count = totalCount
        });
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "웹훅 발송 실패: requestId={RequestId}", pushMessage.RequestId);
    }
}
/// <summary>
/// Resolves the devices a message should be sent to, based on its send type
/// ("single", "group" or "broadcast", compared case-insensitively).
/// </summary>
/// <param name="deviceRepo">Device repository of the current scope.</param>
/// <param name="tagRepo">Tag repository of the current scope.</param>
/// <param name="message">The push message; its SendType, Target and ServiceId are read.</param>
/// <param name="ct">Cancellation token (currently not consumed by the lookups below).</param>
/// <returns>
/// Devices that are active and have agreed to push. Empty when the target cannot be
/// parsed, nothing matches, or the send type is missing or unsupported.
/// </returns>
private async Task<List<Device>> ResolveDevicesAsync(
    IDeviceRepository deviceRepo, ITagRepository tagRepo, PushMessageDto message, CancellationToken ct)
{
    // A payload without send_type falls through to the "unsupported" branch instead of
    // throwing a NullReferenceException.
    var sendType = message.SendType?.ToLowerInvariant();

    switch (sendType)
    {
        case "single":
        {
            var deviceIds = ParseDeviceIds(message.Target);
            if (deviceIds.Count == 0)
            {
                return [];
            }

            var devices = new List<Device>();

            // Distinct: an id repeated in the target must not receive the push twice.
            foreach (var id in deviceIds.Distinct())
            {
                // Cache first
                var cached = await _tokenCache.GetDeviceInfoAsync(message.ServiceId, id);
                if (cached != null)
                {
                    if (cached.IsActive && cached.PushAgreed)
                    {
                        devices.Add(new Device
                        {
                            Id = id,
                            ServiceId = message.ServiceId,
                            DeviceToken = cached.Token,
                            Platform = (Platform)cached.Platform,
                            IsActive = true,
                            PushAgreed = true
                        });
                    }

                    continue;
                }

                // Cache miss: load from the database and populate the cache
                var device = await deviceRepo.GetByIdAndServiceAsync(id, message.ServiceId);
                if (device != null)
                {
                    await _tokenCache.SetDeviceInfoAsync(message.ServiceId, id,
                        new CachedDeviceInfo(device.DeviceToken, (int)device.Platform,
                            device.IsActive, device.PushAgreed));

                    if (device is { IsActive: true, PushAgreed: true })
                    {
                        devices.Add(device);
                    }
                }
            }

            return devices;
        }

        case "group":
        {
            // Parse the tag_code (string) list
            var tagCodes = ParseTags(message.Target);
            if (tagCodes.Count == 0)
            {
                return [];
            }

            // tag_code -> tag id
            var tags = await tagRepo.GetByTagCodesAndServiceAsync(tagCodes, message.ServiceId);
            var tagIdStrings = tags.Select(t => t.Id.ToString()).ToHashSet();
            if (tagIdStrings.Count == 0)
            {
                return [];
            }

            var allDevices = await deviceRepo.FindAsync(
                d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed);

            // Match whole ids only. A plain substring test would make tag id "1" match a
            // device tagged "10" or "21".
            // NOTE(review): assumes ids in Device.Tags are delimited by non-alphanumeric
            // characters (e.g. "1,2" or "[1,2]") — confirm the stored format.
            return allDevices
                .Where(d => !string.IsNullOrEmpty(d.Tags) &&
                            tagIdStrings.Any(tagId => ContainsWholeToken(d.Tags, tagId)))
                .ToList();
        }

        case "broadcast":
        {
            var allDevices = await deviceRepo.FindAsync(
                d => d.ServiceId == message.ServiceId && d.IsActive && d.PushAgreed);
            return allDevices.ToList();
        }

        default:
            _logger.LogWarning("지원하지 않는 send_type: {SendType}", sendType);
            return [];
    }
}

/// <summary>
/// Returns true when <paramref name="token"/> occurs in <paramref name="source"/> with no
/// letter or digit directly before or after it.
/// </summary>
private static bool ContainsWholeToken(string source, string token)
{
    var searchFrom = 0;
    while (searchFrom <= source.Length - token.Length)
    {
        var index = source.IndexOf(token, searchFrom, StringComparison.Ordinal);
        if (index < 0)
        {
            return false;
        }

        var end = index + token.Length;
        var boundaryBefore = index == 0 || !char.IsLetterOrDigit(source[index - 1]);
        var boundaryAfter = end == source.Length || !char.IsLetterOrDigit(source[end]);
        if (boundaryBefore && boundaryAfter)
        {
            return true;
        }

        searchFrom = index + 1;
    }

    return false;
}
/// <summary>
/// Extracts device ids from a target whose value is a JSON array of numbers.
/// </summary>
/// <param name="target">Push target; its <c>Value</c> is expected to hold a JSON array.</param>
/// <returns>
/// The array entries that are numbers representable as <see cref="long"/>. Empty when the
/// target or its value is missing or is not an array. Entries that are not numbers, or
/// are numbers that do not fit a 64-bit integer (for example <c>1.5</c>), are skipped.
/// </returns>
private static List<long> ParseDeviceIds(PushTargetDto target)
{
    if (target?.Value is not { ValueKind: JsonValueKind.Array } idArray)
    {
        return [];
    }

    var deviceIds = new List<long>();
    foreach (var item in idArray.EnumerateArray())
    {
        // TryGetInt64 reports a non-integer number through its return value, so one bad
        // entry no longer throws and discards the whole list.
        if (item.ValueKind == JsonValueKind.Number && item.TryGetInt64(out var deviceId))
        {
            deviceIds.Add(deviceId);
        }
    }

    return deviceIds;
}
/// <summary>
/// Extracts tag codes from a group target. Two shapes are accepted: an object with a
/// <c>tags</c> array property (<c>{ "tags": [...], "match": "or" }</c>) or, for
/// compatibility, a bare array.
/// </summary>
/// <param name="target">Push target; its <c>Value</c> is expected to hold one of the shapes above.</param>
/// <returns>
/// The string entries of the tag array. Empty when the target or its value is missing or
/// has neither shape. Non-string entries are skipped.
/// </returns>
private static List<string> ParseTags(PushTargetDto target)
{
    if (target?.Value is not { } element)
    {
        return [];
    }

    JsonElement tagArray;
    if (element.ValueKind == JsonValueKind.Object &&
        element.TryGetProperty("tags", out var tagsElement) &&
        tagsElement.ValueKind == JsonValueKind.Array)
    {
        // { tags: [...], match: "or" } form
        tagArray = tagsElement;
    }
    else if (element.ValueKind == JsonValueKind.Array)
    {
        // Bare array form (compatibility)
        tagArray = element;
    }
    else
    {
        return [];
    }

    // Every access above and below is guarded by a ValueKind check, so the former
    // catch-all around this code had nothing to catch and has been removed.
    return tagArray.EnumerateArray()
        .Where(e => e.ValueKind == JsonValueKind.String)
        .Select(e => e.GetString()!)
        .ToList();
}
/// <summary>
/// Sends one FCM batch for the given devices and pairs each device with the result at
/// the same position in the sender's response.
/// </summary>
/// <param name="fcmCredentials">FCM credentials of the service.</param>
/// <param name="devices">Target devices; their tokens are sent in this order.</param>
/// <param name="message">Source of title, body and image URL.</param>
/// <param name="customData">Optional custom payload.</param>
/// <param name="ct">Cancellation token passed to the sender.</param>
/// <returns>Device/result pairs, limited to the shorter of the two lists.</returns>
private async Task<List<(Device Device, PushResultDto Result)>> SendFcmBatchAsync(
    string fcmCredentials, List<Device> devices, PushMessageDto message,
    Dictionary<string, string>? customData, CancellationToken ct)
{
    var deviceTokens = devices.Select(device => device.DeviceToken).ToList();

    var responses = await _fcmSender.SendBatchAsync(
        fcmCredentials, deviceTokens, message.Title, message.Body,
        message.ImageUrl, customData, ct);

    // Zip pairs by position and stops at the shorter sequence.
    return devices
        .Zip(responses, (device, response) => (Device: device, Result: response))
        .ToList();
}
/// <summary>
/// Sends one APNs batch for the given devices and pairs each device with the result at
/// the same position in the sender's response.
/// </summary>
/// <param name="service">Service whose APNs private key, key id, team id and bundle id are used.</param>
/// <param name="devices">Target devices; their tokens are sent in this order.</param>
/// <param name="message">Source of title, body and image URL.</param>
/// <param name="customData">Optional custom payload.</param>
/// <param name="ct">Cancellation token passed to the sender.</param>
/// <returns>Device/result pairs, limited to the shorter of the two lists.</returns>
private async Task<List<(Device Device, PushResultDto Result)>> SendApnsBatchAsync(
    Service service, List<Device> devices, PushMessageDto message,
    Dictionary<string, string>? customData, CancellationToken ct)
{
    var deviceTokens = devices.Select(device => device.DeviceToken).ToList();

    var responses = await _apnsSender.SendBatchAsync(
        service.ApnsPrivateKey!, service.ApnsKeyId!, service.ApnsTeamId!, service.ApnsBundleId!,
        deviceTokens, message.Title, message.Body,
        message.ImageUrl, customData, ct);

    // Zip pairs by position and stops at the shorter sequence.
    return devices
        .Zip(responses, (device, response) => (Device: device, Result: response))
        .ToList();
}
// Message header that carries the number of retries already performed.
private const string RetryCountHeader = "x-retry-count";

/// <summary>
/// Retries a failed delivery up to <c>MaxRetryCount</c> times, then rejects it without requeue.
/// </summary>
/// <param name="channel">Channel the delivery arrived on.</param>
/// <param name="ea">The failed delivery.</param>
/// <param name="ct">Cancellation token passed to the channel calls.</param>
/// <returns>A task that completes once the delivery has been republished and acked, or rejected.</returns>
/// <remarks>
/// A retry is a fresh publish to the push queue (default exchange, routing key = queue
/// name) with the retry header incremented, followed by an ACK of the original delivery.
/// A nack with requeue would redeliver the message with its headers unchanged, so the
/// counter would never advance and a message that always fails would loop forever.
/// Publish and ACK are not atomic: if the ACK fails after the publish, the message can be
/// delivered twice.
/// </remarks>
private async Task HandleRetryAsync(IChannel channel, BasicDeliverEventArgs ea, CancellationToken ct)
{
    var retryCount = 0;
    if (ea.BasicProperties.Headers?.TryGetValue(RetryCountHeader, out var retryObj) == true)
    {
        // The boxed type of an integer header depends on how it was encoded on the wire,
        // so accept the common integer widths instead of int only.
        retryCount = retryObj switch
        {
            int i => Math.Max(0, i),
            long l => (int)Math.Clamp(l, 0, int.MaxValue),
            short s => Math.Max(0, (int)s),
            byte b => b,
            _ => 0,
        };
    }

    if (retryCount < MaxRetryCount)
    {
        _logger.LogWarning("메시지 재시도 ({RetryCount}/{MaxRetry})", retryCount + 1, MaxRetryCount);

        // Copy the headers so the incremented counter is written to a dictionary owned by
        // the outgoing message rather than to the received one.
        var headers = new Dictionary<string, object?>();
        if (ea.BasicProperties.Headers is { } receivedHeaders)
        {
            foreach (var (key, value) in receivedHeaders)
            {
                headers[key] = value;
            }
        }

        headers[RetryCountHeader] = retryCount + 1;

        var properties = new BasicProperties(ea.BasicProperties) { Headers = headers };

        await channel.BasicPublishAsync(
            string.Empty, _rabbitSettings.PushQueue, false, properties, ea.Body, ct);
        await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
    }
    else
    {
        _logger.LogError("메시지 최대 재시도 초과 — 폐기: deliveryTag={Tag}", ea.DeliveryTag);
        await channel.BasicNackAsync(ea.DeliveryTag, false, false, ct);
    }
}
}