diff --git a/SPMS.API/Controllers/PushController.cs b/SPMS.API/Controllers/PushController.cs index dd72d66..77ce4ea 100644 --- a/SPMS.API/Controllers/PushController.cs +++ b/SPMS.API/Controllers/PushController.cs @@ -63,6 +63,41 @@ public class PushController : ControllerBase return Ok(ApiResponse.Success(result)); } + [HttpPost("send/bulk")] + [SwaggerOperation(Summary = "대용량 발송", Description = "CSV 파일로 대량 푸시 발송을 요청합니다.")] + public async Task SendBulkAsync(IFormFile file, [FromForm(Name = "message_code")] string messageCode) + { + var serviceId = GetServiceId(); + + if (file == null || file.Length == 0) + throw new Domain.Exceptions.SpmsException(ErrorCodes.BadRequest, "CSV 파일은 필수입니다.", 400); + + if (string.IsNullOrWhiteSpace(messageCode)) + throw new Domain.Exceptions.SpmsException(ErrorCodes.BadRequest, "message_code는 필수입니다.", 400); + + await using var stream = file.OpenReadStream(); + var result = await _pushService.SendBulkAsync(serviceId, stream, messageCode); + return Ok(ApiResponse.Success(result, "발송 요청이 접수되었습니다.")); + } + + [HttpPost("job/status")] + [SwaggerOperation(Summary = "발송 상태 조회", Description = "대용량/태그 발송 작업의 상태를 조회합니다.")] + public async Task GetJobStatusAsync([FromBody] JobStatusRequestDto request) + { + var serviceId = GetServiceId(); + var result = await _pushService.GetJobStatusAsync(serviceId, request); + return Ok(ApiResponse.Success(result, "조회 성공")); + } + + [HttpPost("job/cancel")] + [SwaggerOperation(Summary = "발송 취소", Description = "대기 중이거나 처리 중인 작업을 취소합니다.")] + public async Task CancelJobAsync([FromBody] JobCancelRequestDto request) + { + var serviceId = GetServiceId(); + var result = await _pushService.CancelJobAsync(serviceId, request); + return Ok(ApiResponse.Success(result, "발송이 취소되었습니다.")); + } + private long GetServiceId() { if (HttpContext.Items.TryGetValue("ServiceId", out var serviceIdObj) && serviceIdObj is long serviceId) diff --git a/SPMS.Application/DTOs/Push/BulkJobInfo.cs b/SPMS.Application/DTOs/Push/BulkJobInfo.cs new file mode 100644 index 0000000..f32f498 --- /dev/null +++ b/SPMS.Application/DTOs/Push/BulkJobInfo.cs @@ -0,0 +1,13 @@ +namespace SPMS.Application.DTOs.Push; + +public class BulkJobInfo +{ + public string JobId { get; set; } = string.Empty; + public string Status { get; set; } = "queued"; + public long ServiceId { get; set; } + public int TotalCount { get; set; } + public int SentCount { get; set; } + public int FailedCount { get; set; } + public DateTime? StartedAt { get; set; } + public DateTime? CompletedAt { get; set; } +} diff --git a/SPMS.Application/DTOs/Push/BulkSendResponseDto.cs b/SPMS.Application/DTOs/Push/BulkSendResponseDto.cs new file mode 100644 index 0000000..9e4ada5 --- /dev/null +++ b/SPMS.Application/DTOs/Push/BulkSendResponseDto.cs @@ -0,0 +1,15 @@ +using System.Text.Json.Serialization; + +namespace SPMS.Application.DTOs.Push; + +public class BulkSendResponseDto +{ + [JsonPropertyName("job_id")] + public string JobId { get; set; } = string.Empty; + + [JsonPropertyName("status")] + public string Status { get; set; } = "queued"; + + [JsonPropertyName("total_count")] + public int TotalCount { get; set; } +} diff --git a/SPMS.Application/DTOs/Push/JobCancelRequestDto.cs b/SPMS.Application/DTOs/Push/JobCancelRequestDto.cs new file mode 100644 index 0000000..d8038ef --- /dev/null +++ b/SPMS.Application/DTOs/Push/JobCancelRequestDto.cs @@ -0,0 +1,9 @@ +using System.Text.Json.Serialization; + +namespace SPMS.Application.DTOs.Push; + +public class JobCancelRequestDto +{ + [JsonPropertyName("job_id")] + public string JobId { get; set; } = string.Empty; +} diff --git a/SPMS.Application/DTOs/Push/JobCancelResponseDto.cs b/SPMS.Application/DTOs/Push/JobCancelResponseDto.cs new file mode 100644 index 0000000..66df29d --- /dev/null +++ b/SPMS.Application/DTOs/Push/JobCancelResponseDto.cs @@ -0,0 +1,15 @@ +using System.Text.Json.Serialization; + +namespace SPMS.Application.DTOs.Push; + +public class JobCancelResponseDto +{ + [JsonPropertyName("job_id")] + public string JobId { get; set; } = string.Empty; + + [JsonPropertyName("status")] + public string Status { get; set; } = "cancelled"; + + [JsonPropertyName("cancelled_count")] + public int CancelledCount { get; set; } +} diff --git a/SPMS.Application/DTOs/Push/JobStatusRequestDto.cs b/SPMS.Application/DTOs/Push/JobStatusRequestDto.cs new file mode 100644 index 0000000..934a0f9 --- /dev/null +++ b/SPMS.Application/DTOs/Push/JobStatusRequestDto.cs @@ -0,0 +1,9 @@ +using System.Text.Json.Serialization; + +namespace SPMS.Application.DTOs.Push; + +public class JobStatusRequestDto +{ + [JsonPropertyName("job_id")] + public string JobId { get; set; } = string.Empty; +} diff --git a/SPMS.Application/DTOs/Push/JobStatusResponseDto.cs b/SPMS.Application/DTOs/Push/JobStatusResponseDto.cs new file mode 100644 index 0000000..8371c35 --- /dev/null +++ b/SPMS.Application/DTOs/Push/JobStatusResponseDto.cs @@ -0,0 +1,30 @@ +using System.Text.Json.Serialization; + +namespace SPMS.Application.DTOs.Push; + +public class JobStatusResponseDto +{ + [JsonPropertyName("job_id")] + public string JobId { get; set; } = string.Empty; + + [JsonPropertyName("status")] + public string Status { get; set; } = string.Empty; + + [JsonPropertyName("total_count")] + public int TotalCount { get; set; } + + [JsonPropertyName("sent_count")] + public int SentCount { get; set; } + + [JsonPropertyName("failed_count")] + public int FailedCount { get; set; } + + [JsonPropertyName("progress")] + public int Progress { get; set; } + + [JsonPropertyName("started_at")] + public DateTime? StartedAt { get; set; } + + [JsonPropertyName("completed_at")] + public DateTime? CompletedAt { get; set; } +} diff --git a/SPMS.Application/DTOs/Push/PushMessageDto.cs b/SPMS.Application/DTOs/Push/PushMessageDto.cs index 379949a..00a41ad 100644 --- a/SPMS.Application/DTOs/Push/PushMessageDto.cs +++ b/SPMS.Application/DTOs/Push/PushMessageDto.cs @@ -39,4 +39,7 @@ public class PushMessageDto [JsonPropertyName("created_at")] public string CreatedAt { get; set; } = string.Empty; + + [JsonPropertyName("job_id")] + public string? JobId { get; set; } } diff --git a/SPMS.Application/Interfaces/IBulkJobStore.cs b/SPMS.Application/Interfaces/IBulkJobStore.cs new file mode 100644 index 0000000..cee5146 --- /dev/null +++ b/SPMS.Application/Interfaces/IBulkJobStore.cs @@ -0,0 +1,15 @@ +using SPMS.Application.DTOs.Push; + +namespace SPMS.Application.Interfaces; + +public interface IBulkJobStore +{ + Task CreateJobAsync(long serviceId, int totalCount, CancellationToken ct = default); + Task GetJobAsync(string jobId, CancellationToken ct = default); + Task SetProcessingAsync(string jobId, CancellationToken ct = default); + Task IncrementSentAsync(string jobId, CancellationToken ct = default); + Task IncrementFailedAsync(string jobId, CancellationToken ct = default); + Task TryCompleteAsync(string jobId, CancellationToken ct = default); + Task IsCancelledAsync(string jobId, CancellationToken ct = default); + Task CancelAsync(string jobId, CancellationToken ct = default); +} diff --git a/SPMS.Application/Interfaces/IPushService.cs b/SPMS.Application/Interfaces/IPushService.cs index 3360979..226435d 100644 --- a/SPMS.Application/Interfaces/IPushService.cs +++ b/SPMS.Application/Interfaces/IPushService.cs @@ -9,4 +9,7 @@ public interface IPushService Task ScheduleAsync(long serviceId, PushScheduleRequestDto request); Task CancelScheduleAsync(PushScheduleCancelRequestDto request); Task GetLogAsync(long serviceId, PushLogRequestDto request); + Task SendBulkAsync(long serviceId, Stream csvStream, string messageCode); + Task GetJobStatusAsync(long serviceId, JobStatusRequestDto request); + Task CancelJobAsync(long serviceId, JobCancelRequestDto request); } diff --git a/SPMS.Application/Services/PushService.cs b/SPMS.Application/Services/PushService.cs index 993dd9a..315221a 100644 --- a/SPMS.Application/Services/PushService.cs +++ b/SPMS.Application/Services/PushService.cs @@ -16,17 +16,20 @@ public class PushService : IPushService private readonly IPushQueueService _pushQueueService; private readonly IScheduleCancelStore _scheduleCancelStore; private readonly IPushSendLogRepository _pushSendLogRepository; + private readonly IBulkJobStore _bulkJobStore; public PushService( IMessageRepository messageRepository, IPushQueueService pushQueueService, IScheduleCancelStore scheduleCancelStore, - IPushSendLogRepository pushSendLogRepository) + IPushSendLogRepository pushSendLogRepository, + IBulkJobStore bulkJobStore) { _messageRepository = messageRepository; _pushQueueService = pushQueueService; _scheduleCancelStore = scheduleCancelStore; _pushSendLogRepository = pushSendLogRepository; + _bulkJobStore = bulkJobStore; } public async Task SendAsync(long serviceId, PushSendRequestDto request) @@ -263,6 +266,157 @@ public class PushService : IPushService }; } + public async Task SendBulkAsync(long serviceId, Stream csvStream, string messageCode) + { + var message = await _messageRepository.GetByMessageCodeAndServiceAsync(messageCode, serviceId); + if (message == null) + throw new SpmsException(ErrorCodes.MessageNotFound, "존재하지 않는 메시지 코드입니다.", 404); + + var rows = await ParseCsvAsync(csvStream); + if (rows.Count == 0) + throw new SpmsException(ErrorCodes.BadRequest, "CSV 파일에 유효한 데이터가 없습니다.", 400); + + var jobId = await _bulkJobStore.CreateJobAsync(serviceId, rows.Count); + + foreach (var row in rows) + { + var variables = row.Variables; + var title = ApplyVariables(message.Title, variables); + var body = ApplyVariables(message.Body, variables); + + var requestId = Guid.NewGuid().ToString("N"); + + var pushMessage = new PushMessageDto + { + MessageId = message.Id.ToString(), + RequestId = requestId, + ServiceId = serviceId, + SendType = "single", + Title = title, + Body = body, + ImageUrl = message.ImageUrl, + LinkUrl = message.LinkUrl, + CustomData = ParseCustomData(message.CustomData), + Target = new PushTargetDto + { + Type = "device_ids", + Value = JsonSerializer.SerializeToElement(new[] { row.DeviceId }) + }, + CreatedBy = message.CreatedBy, + CreatedAt = DateTime.UtcNow.ToString("o"), + JobId = jobId + }; + + await _pushQueueService.PublishPushMessageAsync(pushMessage); + } + + return new BulkSendResponseDto + { + JobId = jobId, + Status = "queued", + TotalCount = rows.Count + }; + } + + public async Task GetJobStatusAsync(long serviceId, JobStatusRequestDto request) + { + if (string.IsNullOrWhiteSpace(request.JobId)) + throw new SpmsException(ErrorCodes.BadRequest, "job_id는 필수입니다.", 400); + + var job = await _bulkJobStore.GetJobAsync(request.JobId); + if (job == null) + throw new SpmsException(ErrorCodes.JobNotFound, "존재하지 않는 작업 ID입니다.", 404); + + if (job.ServiceId != serviceId) + throw new SpmsException(ErrorCodes.JobNotFound, "존재하지 않는 작업 ID입니다.", 404); + + var progress = job.TotalCount > 0 + ? (int)Math.Round((double)(job.SentCount + job.FailedCount) / job.TotalCount * 100) + : 0; + + return new JobStatusResponseDto + { + JobId = job.JobId, + Status = job.Status, + TotalCount = job.TotalCount, + SentCount = job.SentCount, + FailedCount = job.FailedCount, + Progress = progress, + StartedAt = job.StartedAt, + CompletedAt = job.CompletedAt + }; + } + + public async Task CancelJobAsync(long serviceId, JobCancelRequestDto request) + { + if (string.IsNullOrWhiteSpace(request.JobId)) + throw new SpmsException(ErrorCodes.BadRequest, "job_id는 필수입니다.", 400); + + var job = await _bulkJobStore.GetJobAsync(request.JobId); + if (job == null) + throw new SpmsException(ErrorCodes.JobNotFound, "존재하지 않는 작업 ID입니다.", 404); + + if (job.ServiceId != serviceId) + throw new SpmsException(ErrorCodes.JobNotFound, "존재하지 않는 작업 ID입니다.", 404); + + if (job.Status == "completed" || job.Status == "cancelled" || job.Status == "failed") + throw new SpmsException(ErrorCodes.JobAlreadyCompleted, "이미 완료되었거나 취소된 작업입니다.", 400); + + var cancelledCount = await _bulkJobStore.CancelAsync(request.JobId); + + return new JobCancelResponseDto + { + JobId = request.JobId, + Status = "cancelled", + CancelledCount = cancelledCount + }; + } + + private static async Task> ParseCsvAsync(Stream stream) + { + var rows = new List(); + + using var reader = new StreamReader(stream); + var headerLine = await reader.ReadLineAsync(); + if (string.IsNullOrWhiteSpace(headerLine)) + return rows; + + var headers = headerLine.Split(',').Select(h => h.Trim()).ToArray(); + var deviceIdIndex = Array.FindIndex(headers, h => h.Equals("device_id", StringComparison.OrdinalIgnoreCase)); + if (deviceIdIndex < 0) + throw new SpmsException(ErrorCodes.BadRequest, "CSV 헤더에 device_id 컬럼이 필요합니다.", 400); + + while (await reader.ReadLineAsync() is { } line) + { + if (string.IsNullOrWhiteSpace(line)) + continue; + + var values = line.Split(',').Select(v => v.Trim()).ToArray(); + if (values.Length <= deviceIdIndex) + continue; + + if (!long.TryParse(values[deviceIdIndex], out var deviceId)) + continue; + + var variables = new Dictionary(); + for (var i = 0; i < headers.Length && i < values.Length; i++) + { + if (i == deviceIdIndex) continue; + variables[headers[i]] = values[i]; + } + + rows.Add(new CsvRow { DeviceId = deviceId, Variables = variables }); + } + + return rows; + } + + private class CsvRow + { + public long DeviceId { get; init; } + public Dictionary Variables { get; init; } = new(); + } + private static string ApplyVariables(string template, Dictionary? variables) { if (variables == null || variables.Count == 0) diff --git a/SPMS.Domain/Common/ErrorCodes.cs b/SPMS.Domain/Common/ErrorCodes.cs index 552aa20..ef87aa1 100644 --- a/SPMS.Domain/Common/ErrorCodes.cs +++ b/SPMS.Domain/Common/ErrorCodes.cs @@ -42,6 +42,8 @@ public static class ErrorCodes // === Push (6) === public const string PushSendFailed = "161"; public const string PushStateChangeNotAllowed = "162"; + public const string JobNotFound = "163"; + public const string JobAlreadyCompleted = "164"; // === File (8) === public const string FileNotFound = "181"; diff --git a/SPMS.Infrastructure/Caching/BulkJobStore.cs b/SPMS.Infrastructure/Caching/BulkJobStore.cs new file mode 100644 index 0000000..e03c3ef --- /dev/null +++ b/SPMS.Infrastructure/Caching/BulkJobStore.cs @@ -0,0 +1,152 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using SPMS.Application.DTOs.Push; +using SPMS.Application.Interfaces; +using SPMS.Application.Settings; +using StackExchange.Redis; + +namespace SPMS.Infrastructure.Caching; + +public class BulkJobStore : IBulkJobStore +{ + private static readonly TimeSpan JobTtl = TimeSpan.FromDays(7); + + private readonly RedisConnection _redis; + private readonly RedisSettings _settings; + private readonly ILogger _logger; + + public BulkJobStore( + RedisConnection redis, + IOptions settings, + ILogger logger) + { + _redis = redis; + _settings = settings.Value; + _logger = logger; + } + + private string Key(string jobId) => $"{_settings.InstanceName}bulk_job:{jobId}"; + + public async Task CreateJobAsync(long serviceId, int totalCount, CancellationToken ct = default) + { + var jobId = $"bulk_{DateTime.UtcNow:yyyyMMdd}_{Guid.NewGuid().ToString("N")[..8]}"; + var key = Key(jobId); + + var db = await _redis.GetDatabaseAsync(); + var entries = new HashEntry[] + { + new("status", "queued"), + new("service_id", serviceId), + new("total_count", totalCount), + new("sent_count", 0), + new("failed_count", 0), + new("started_at", ""), + new("completed_at", "") + }; + + await db.HashSetAsync(key, entries); + await db.KeyExpireAsync(key, JobTtl); + + _logger.LogInformation("Bulk job 생성: jobId={JobId}, totalCount={Total}", jobId, totalCount); + return jobId; + } + + public async Task GetJobAsync(string jobId, CancellationToken ct = default) + { + var db = await _redis.GetDatabaseAsync(); + var entries = await db.HashGetAllAsync(Key(jobId)); + if (entries.Length == 0) + return null; + + var dict = entries.ToDictionary(e => (string)e.Name!, e => (string)e.Value!); + + return new BulkJobInfo + { + JobId = jobId, + Status = dict.GetValueOrDefault("status", "unknown"), + ServiceId = long.TryParse(dict.GetValueOrDefault("service_id"), out var sid) ? sid : 0, + TotalCount = int.TryParse(dict.GetValueOrDefault("total_count"), out var tc) ? tc : 0, + SentCount = int.TryParse(dict.GetValueOrDefault("sent_count"), out var sc) ? sc : 0, + FailedCount = int.TryParse(dict.GetValueOrDefault("failed_count"), out var fc) ? fc : 0, + StartedAt = DateTime.TryParse(dict.GetValueOrDefault("started_at"), out var sa) ? sa : null, + CompletedAt = DateTime.TryParse(dict.GetValueOrDefault("completed_at"), out var ca) ? ca : null + }; + } + + public async Task SetProcessingAsync(string jobId, CancellationToken ct = default) + { + var db = await _redis.GetDatabaseAsync(); + var key = Key(jobId); + + var currentStatus = (string?)(await db.HashGetAsync(key, "status")); + if (currentStatus == "queued") + { + await db.HashSetAsync(key, [ + new HashEntry("status", "processing"), + new HashEntry("started_at", DateTime.UtcNow.ToString("o")) + ]); + } + } + + public async Task IncrementSentAsync(string jobId, CancellationToken ct = default) + { + var db = await _redis.GetDatabaseAsync(); + await db.HashIncrementAsync(Key(jobId), "sent_count"); + } + + public async Task IncrementFailedAsync(string jobId, CancellationToken ct = default) + { + var db = await _redis.GetDatabaseAsync(); + await db.HashIncrementAsync(Key(jobId), "failed_count"); + } + + public async Task TryCompleteAsync(string jobId, CancellationToken ct = default) + { + var db = await _redis.GetDatabaseAsync(); + var key = Key(jobId); + + var values = await db.HashGetAsync(key, ["status", "total_count", "sent_count", "failed_count"]); + var status = (string?)values[0]; + var total = (int?)values[1] ?? 0; + var sent = (int?)values[2] ?? 0; + var failed = (int?)values[3] ?? 0; + + if (status == "processing" && sent + failed >= total) + { + await db.HashSetAsync(key, [ + new HashEntry("status", "completed"), + new HashEntry("completed_at", DateTime.UtcNow.ToString("o")) + ]); + _logger.LogInformation("Bulk job 완료: jobId={JobId}, sent={Sent}, failed={Failed}", + jobId, sent, failed); + } + } + + public async Task IsCancelledAsync(string jobId, CancellationToken ct = default) + { + var db = await _redis.GetDatabaseAsync(); + var status = (string?)(await db.HashGetAsync(Key(jobId), "status")); + return status == "cancelled"; + } + + public async Task CancelAsync(string jobId, CancellationToken ct = default) + { + var db = await _redis.GetDatabaseAsync(); + var key = Key(jobId); + + var values = await db.HashGetAsync(key, ["total_count", "sent_count", "failed_count"]); + var total = (int?)values[0] ?? 0; + var sent = (int?)values[1] ?? 0; + var failed = (int?)values[2] ?? 0; + + var cancelledCount = Math.Max(0, total - sent - failed); + + await db.HashSetAsync(key, [ + new HashEntry("status", "cancelled"), + new HashEntry("completed_at", DateTime.UtcNow.ToString("o")) + ]); + + _logger.LogInformation("Bulk job 취소: jobId={JobId}, cancelled={Cancelled}", jobId, cancelledCount); + return cancelledCount; + } +} diff --git a/SPMS.Infrastructure/DependencyInjection.cs b/SPMS.Infrastructure/DependencyInjection.cs index 87d794f..1a867c3 100644 --- a/SPMS.Infrastructure/DependencyInjection.cs +++ b/SPMS.Infrastructure/DependencyInjection.cs @@ -54,6 +54,7 @@ public static class DependencyInjection services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); // RabbitMQ services.Configure(configuration.GetSection(RabbitMQSettings.SectionName)); diff --git a/SPMS.Infrastructure/Workers/PushWorker.cs b/SPMS.Infrastructure/Workers/PushWorker.cs index db47dc5..92c7a4a 100644 --- a/SPMS.Infrastructure/Workers/PushWorker.cs +++ b/SPMS.Infrastructure/Workers/PushWorker.cs @@ -24,6 +24,7 @@ public class PushWorker : BackgroundService private readonly RabbitMQSettings _rabbitSettings; private readonly IServiceScopeFactory _scopeFactory; private readonly IDuplicateChecker _duplicateChecker; + private readonly IBulkJobStore _bulkJobStore; private readonly IFcmSender _fcmSender; private readonly IApnsSender _apnsSender; private readonly ILogger _logger; @@ -33,6 +34,7 @@ public class PushWorker : BackgroundService IOptions rabbitSettings, IServiceScopeFactory scopeFactory, IDuplicateChecker duplicateChecker, + IBulkJobStore bulkJobStore, IFcmSender fcmSender, IApnsSender apnsSender, ILogger logger) @@ -41,6 +43,7 @@ public class PushWorker : BackgroundService _rabbitSettings = rabbitSettings.Value; _scopeFactory = scopeFactory; _duplicateChecker = duplicateChecker; + _bulkJobStore = bulkJobStore; _fcmSender = fcmSender; _apnsSender = apnsSender; _logger = logger; @@ -115,6 +118,16 @@ public class PushWorker : BackgroundService return; } + // [0] Bulk job 취소 체크 + if (!string.IsNullOrEmpty(pushMessage.JobId) && + await _bulkJobStore.IsCancelledAsync(pushMessage.JobId, ct)) + { + _logger.LogInformation("Bulk job 취소됨 — 스킵: jobId={JobId}, requestId={RequestId}", + pushMessage.JobId, pushMessage.RequestId); + await channel.BasicAckAsync(ea.DeliveryTag, false, ct); + return; + } + // [1] Redis 중복 체크 if (await _duplicateChecker.IsDuplicateAsync(pushMessage.RequestId, ct)) { @@ -123,6 +136,10 @@ public class PushWorker : BackgroundService return; } + // [1.5] Bulk job processing 상태 전환 + if (!string.IsNullOrEmpty(pushMessage.JobId)) + await _bulkJobStore.SetProcessingAsync(pushMessage.JobId, ct); + using var scope = _scopeFactory.CreateScope(); var serviceRepo = scope.ServiceProvider.GetRequiredService(); var deviceRepo = scope.ServiceProvider.GetRequiredService(); @@ -242,6 +259,17 @@ public class PushWorker : BackgroundService "푸시 발송 완료: requestId={RequestId}, 성공={Success}, 실패={Fail}, 총={Total}", pushMessage.RequestId, successCount, failCount, allResults.Count); + // [7] Bulk job 진행률 업데이트 + if (!string.IsNullOrEmpty(pushMessage.JobId)) + { + if (successCount > 0) + await _bulkJobStore.IncrementSentAsync(pushMessage.JobId, ct); + else + await _bulkJobStore.IncrementFailedAsync(pushMessage.JobId, ct); + + await _bulkJobStore.TryCompleteAsync(pushMessage.JobId, ct); + } + // [8] ACK await channel.BasicAckAsync(ea.DeliveryTag, false, ct); }