feat: 대용량 발송/상태조회/취소 API 구현 (#130)

- POST /v1/in/push/send/bulk: CSV 대량 발송 (비동기)
- POST /v1/in/push/job/status: Job 상태 조회
- POST /v1/in/push/job/cancel: Job 취소
- BulkJobStore: Redis Hash 기반 Job 상태 관리
- PushWorker: Job 진행률 추적 및 취소 체크

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
seonkyu.kim 2026-02-10 22:55:39 +09:00
parent dc487609b3
commit 830cbf2edc
15 changed files with 485 additions and 1 deletions

View File

@ -63,6 +63,41 @@ public class PushController : ControllerBase
return Ok(ApiResponse<PushLogResponseDto>.Success(result));
}
[HttpPost("send/bulk")]
[SwaggerOperation(Summary = "대용량 발송", Description = "CSV 파일로 대량 푸시 발송을 요청합니다.")]
public async Task<IActionResult> SendBulkAsync(IFormFile file, [FromForm(Name = "message_code")] string messageCode)
{
var serviceId = GetServiceId();
if (file == null || file.Length == 0)
throw new Domain.Exceptions.SpmsException(ErrorCodes.BadRequest, "CSV 파일은 필수입니다.", 400);
if (string.IsNullOrWhiteSpace(messageCode))
throw new Domain.Exceptions.SpmsException(ErrorCodes.BadRequest, "message_code는 필수입니다.", 400);
await using var stream = file.OpenReadStream();
var result = await _pushService.SendBulkAsync(serviceId, stream, messageCode);
return Ok(ApiResponse<BulkSendResponseDto>.Success(result, "발송 요청이 접수되었습니다."));
}
[HttpPost("job/status")]
[SwaggerOperation(Summary = "발송 상태 조회", Description = "대용량/태그 발송 작업의 상태를 조회합니다.")]
public async Task<IActionResult> GetJobStatusAsync([FromBody] JobStatusRequestDto request)
{
var serviceId = GetServiceId();
var result = await _pushService.GetJobStatusAsync(serviceId, request);
return Ok(ApiResponse<JobStatusResponseDto>.Success(result, "조회 성공"));
}
[HttpPost("job/cancel")]
[SwaggerOperation(Summary = "발송 취소", Description = "대기 중이거나 처리 중인 작업을 취소합니다.")]
public async Task<IActionResult> CancelJobAsync([FromBody] JobCancelRequestDto request)
{
var serviceId = GetServiceId();
var result = await _pushService.CancelJobAsync(serviceId, request);
return Ok(ApiResponse<JobCancelResponseDto>.Success(result, "발송이 취소되었습니다."));
}
private long GetServiceId()
{
if (HttpContext.Items.TryGetValue("ServiceId", out var serviceIdObj) && serviceIdObj is long serviceId)

View File

@ -0,0 +1,13 @@
namespace SPMS.Application.DTOs.Push;
public class BulkJobInfo
{
public string JobId { get; set; } = string.Empty;
public string Status { get; set; } = "queued";
public long ServiceId { get; set; }
public int TotalCount { get; set; }
public int SentCount { get; set; }
public int FailedCount { get; set; }
public DateTime? StartedAt { get; set; }
public DateTime? CompletedAt { get; set; }
}

View File

@ -0,0 +1,15 @@
using System.Text.Json.Serialization;
namespace SPMS.Application.DTOs.Push;
public class BulkSendResponseDto
{
[JsonPropertyName("job_id")]
public string JobId { get; set; } = string.Empty;
[JsonPropertyName("status")]
public string Status { get; set; } = "queued";
[JsonPropertyName("total_count")]
public int TotalCount { get; set; }
}

View File

@ -0,0 +1,9 @@
using System.Text.Json.Serialization;
namespace SPMS.Application.DTOs.Push;
public class JobCancelRequestDto
{
[JsonPropertyName("job_id")]
public string JobId { get; set; } = string.Empty;
}

View File

@ -0,0 +1,15 @@
using System.Text.Json.Serialization;
namespace SPMS.Application.DTOs.Push;
public class JobCancelResponseDto
{
[JsonPropertyName("job_id")]
public string JobId { get; set; } = string.Empty;
[JsonPropertyName("status")]
public string Status { get; set; } = "cancelled";
[JsonPropertyName("cancelled_count")]
public int CancelledCount { get; set; }
}

View File

@ -0,0 +1,9 @@
using System.Text.Json.Serialization;
namespace SPMS.Application.DTOs.Push;
public class JobStatusRequestDto
{
[JsonPropertyName("job_id")]
public string JobId { get; set; } = string.Empty;
}

View File

@ -0,0 +1,30 @@
using System.Text.Json.Serialization;
namespace SPMS.Application.DTOs.Push;
public class JobStatusResponseDto
{
[JsonPropertyName("job_id")]
public string JobId { get; set; } = string.Empty;
[JsonPropertyName("status")]
public string Status { get; set; } = string.Empty;
[JsonPropertyName("total_count")]
public int TotalCount { get; set; }
[JsonPropertyName("sent_count")]
public int SentCount { get; set; }
[JsonPropertyName("failed_count")]
public int FailedCount { get; set; }
[JsonPropertyName("progress")]
public int Progress { get; set; }
[JsonPropertyName("started_at")]
public DateTime? StartedAt { get; set; }
[JsonPropertyName("completed_at")]
public DateTime? CompletedAt { get; set; }
}

View File

@ -39,4 +39,7 @@ public class PushMessageDto
[JsonPropertyName("created_at")]
public string CreatedAt { get; set; } = string.Empty;
[JsonPropertyName("job_id")]
public string? JobId { get; set; }
}

View File

@ -0,0 +1,15 @@
using SPMS.Application.DTOs.Push;
namespace SPMS.Application.Interfaces;
public interface IBulkJobStore
{
Task<string> CreateJobAsync(long serviceId, int totalCount, CancellationToken ct = default);
Task<BulkJobInfo?> GetJobAsync(string jobId, CancellationToken ct = default);
Task SetProcessingAsync(string jobId, CancellationToken ct = default);
Task IncrementSentAsync(string jobId, CancellationToken ct = default);
Task IncrementFailedAsync(string jobId, CancellationToken ct = default);
Task TryCompleteAsync(string jobId, CancellationToken ct = default);
Task<bool> IsCancelledAsync(string jobId, CancellationToken ct = default);
Task<int> CancelAsync(string jobId, CancellationToken ct = default);
}

View File

@ -9,4 +9,7 @@ public interface IPushService
Task<PushScheduleResponseDto> ScheduleAsync(long serviceId, PushScheduleRequestDto request);
Task CancelScheduleAsync(PushScheduleCancelRequestDto request);
Task<PushLogResponseDto> GetLogAsync(long serviceId, PushLogRequestDto request);
Task<BulkSendResponseDto> SendBulkAsync(long serviceId, Stream csvStream, string messageCode);
Task<JobStatusResponseDto> GetJobStatusAsync(long serviceId, JobStatusRequestDto request);
Task<JobCancelResponseDto> CancelJobAsync(long serviceId, JobCancelRequestDto request);
}

View File

@ -16,17 +16,20 @@ public class PushService : IPushService
private readonly IPushQueueService _pushQueueService;
private readonly IScheduleCancelStore _scheduleCancelStore;
private readonly IPushSendLogRepository _pushSendLogRepository;
private readonly IBulkJobStore _bulkJobStore;
public PushService(
IMessageRepository messageRepository,
IPushQueueService pushQueueService,
IScheduleCancelStore scheduleCancelStore,
IPushSendLogRepository pushSendLogRepository)
IPushSendLogRepository pushSendLogRepository,
IBulkJobStore bulkJobStore)
{
_messageRepository = messageRepository;
_pushQueueService = pushQueueService;
_scheduleCancelStore = scheduleCancelStore;
_pushSendLogRepository = pushSendLogRepository;
_bulkJobStore = bulkJobStore;
}
public async Task<PushSendResponseDto> SendAsync(long serviceId, PushSendRequestDto request)
@ -263,6 +266,157 @@ public class PushService : IPushService
};
}
public async Task<BulkSendResponseDto> SendBulkAsync(long serviceId, Stream csvStream, string messageCode)
{
var message = await _messageRepository.GetByMessageCodeAndServiceAsync(messageCode, serviceId);
if (message == null)
throw new SpmsException(ErrorCodes.MessageNotFound, "존재하지 않는 메시지 코드입니다.", 404);
var rows = await ParseCsvAsync(csvStream);
if (rows.Count == 0)
throw new SpmsException(ErrorCodes.BadRequest, "CSV 파일에 유효한 데이터가 없습니다.", 400);
var jobId = await _bulkJobStore.CreateJobAsync(serviceId, rows.Count);
foreach (var row in rows)
{
var variables = row.Variables;
var title = ApplyVariables(message.Title, variables);
var body = ApplyVariables(message.Body, variables);
var requestId = Guid.NewGuid().ToString("N");
var pushMessage = new PushMessageDto
{
MessageId = message.Id.ToString(),
RequestId = requestId,
ServiceId = serviceId,
SendType = "single",
Title = title,
Body = body,
ImageUrl = message.ImageUrl,
LinkUrl = message.LinkUrl,
CustomData = ParseCustomData(message.CustomData),
Target = new PushTargetDto
{
Type = "device_ids",
Value = JsonSerializer.SerializeToElement(new[] { row.DeviceId })
},
CreatedBy = message.CreatedBy,
CreatedAt = DateTime.UtcNow.ToString("o"),
JobId = jobId
};
await _pushQueueService.PublishPushMessageAsync(pushMessage);
}
return new BulkSendResponseDto
{
JobId = jobId,
Status = "queued",
TotalCount = rows.Count
};
}
public async Task<JobStatusResponseDto> GetJobStatusAsync(long serviceId, JobStatusRequestDto request)
{
if (string.IsNullOrWhiteSpace(request.JobId))
throw new SpmsException(ErrorCodes.BadRequest, "job_id는 필수입니다.", 400);
var job = await _bulkJobStore.GetJobAsync(request.JobId);
if (job == null)
throw new SpmsException(ErrorCodes.JobNotFound, "존재하지 않는 작업 ID입니다.", 404);
if (job.ServiceId != serviceId)
throw new SpmsException(ErrorCodes.JobNotFound, "존재하지 않는 작업 ID입니다.", 404);
var progress = job.TotalCount > 0
? (int)Math.Round((double)(job.SentCount + job.FailedCount) / job.TotalCount * 100)
: 0;
return new JobStatusResponseDto
{
JobId = job.JobId,
Status = job.Status,
TotalCount = job.TotalCount,
SentCount = job.SentCount,
FailedCount = job.FailedCount,
Progress = progress,
StartedAt = job.StartedAt,
CompletedAt = job.CompletedAt
};
}
public async Task<JobCancelResponseDto> CancelJobAsync(long serviceId, JobCancelRequestDto request)
{
if (string.IsNullOrWhiteSpace(request.JobId))
throw new SpmsException(ErrorCodes.BadRequest, "job_id는 필수입니다.", 400);
var job = await _bulkJobStore.GetJobAsync(request.JobId);
if (job == null)
throw new SpmsException(ErrorCodes.JobNotFound, "존재하지 않는 작업 ID입니다.", 404);
if (job.ServiceId != serviceId)
throw new SpmsException(ErrorCodes.JobNotFound, "존재하지 않는 작업 ID입니다.", 404);
if (job.Status == "completed" || job.Status == "cancelled" || job.Status == "failed")
throw new SpmsException(ErrorCodes.JobAlreadyCompleted, "이미 완료되었거나 취소된 작업입니다.", 400);
var cancelledCount = await _bulkJobStore.CancelAsync(request.JobId);
return new JobCancelResponseDto
{
JobId = request.JobId,
Status = "cancelled",
CancelledCount = cancelledCount
};
}
private static async Task<List<CsvRow>> ParseCsvAsync(Stream stream)
{
var rows = new List<CsvRow>();
using var reader = new StreamReader(stream);
var headerLine = await reader.ReadLineAsync();
if (string.IsNullOrWhiteSpace(headerLine))
return rows;
var headers = headerLine.Split(',').Select(h => h.Trim()).ToArray();
var deviceIdIndex = Array.FindIndex(headers, h => h.Equals("device_id", StringComparison.OrdinalIgnoreCase));
if (deviceIdIndex < 0)
throw new SpmsException(ErrorCodes.BadRequest, "CSV 헤더에 device_id 컬럼이 필요합니다.", 400);
while (await reader.ReadLineAsync() is { } line)
{
if (string.IsNullOrWhiteSpace(line))
continue;
var values = line.Split(',').Select(v => v.Trim()).ToArray();
if (values.Length <= deviceIdIndex)
continue;
if (!long.TryParse(values[deviceIdIndex], out var deviceId))
continue;
var variables = new Dictionary<string, string>();
for (var i = 0; i < headers.Length && i < values.Length; i++)
{
if (i == deviceIdIndex) continue;
variables[headers[i]] = values[i];
}
rows.Add(new CsvRow { DeviceId = deviceId, Variables = variables });
}
return rows;
}
private class CsvRow
{
public long DeviceId { get; init; }
public Dictionary<string, string> Variables { get; init; } = new();
}
private static string ApplyVariables(string template, Dictionary<string, string>? variables)
{
if (variables == null || variables.Count == 0)

View File

@ -42,6 +42,8 @@ public static class ErrorCodes
// === Push (6) ===
public const string PushSendFailed = "161";
public const string PushStateChangeNotAllowed = "162";
public const string JobNotFound = "163";
public const string JobAlreadyCompleted = "164";
// === File (8) ===
public const string FileNotFound = "181";

View File

@ -0,0 +1,152 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SPMS.Application.DTOs.Push;
using SPMS.Application.Interfaces;
using SPMS.Application.Settings;
using StackExchange.Redis;
namespace SPMS.Infrastructure.Caching;
public class BulkJobStore : IBulkJobStore
{
private static readonly TimeSpan JobTtl = TimeSpan.FromDays(7);
private readonly RedisConnection _redis;
private readonly RedisSettings _settings;
private readonly ILogger<BulkJobStore> _logger;
public BulkJobStore(
RedisConnection redis,
IOptions<RedisSettings> settings,
ILogger<BulkJobStore> logger)
{
_redis = redis;
_settings = settings.Value;
_logger = logger;
}
private string Key(string jobId) => $"{_settings.InstanceName}bulk_job:{jobId}";
public async Task<string> CreateJobAsync(long serviceId, int totalCount, CancellationToken ct = default)
{
var jobId = $"bulk_{DateTime.UtcNow:yyyyMMdd}_{Guid.NewGuid().ToString("N")[..8]}";
var key = Key(jobId);
var db = await _redis.GetDatabaseAsync();
var entries = new HashEntry[]
{
new("status", "queued"),
new("service_id", serviceId),
new("total_count", totalCount),
new("sent_count", 0),
new("failed_count", 0),
new("started_at", ""),
new("completed_at", "")
};
await db.HashSetAsync(key, entries);
await db.KeyExpireAsync(key, JobTtl);
_logger.LogInformation("Bulk job 생성: jobId={JobId}, totalCount={Total}", jobId, totalCount);
return jobId;
}
public async Task<BulkJobInfo?> GetJobAsync(string jobId, CancellationToken ct = default)
{
var db = await _redis.GetDatabaseAsync();
var entries = await db.HashGetAllAsync(Key(jobId));
if (entries.Length == 0)
return null;
var dict = entries.ToDictionary(e => (string)e.Name!, e => (string)e.Value!);
return new BulkJobInfo
{
JobId = jobId,
Status = dict.GetValueOrDefault("status", "unknown"),
ServiceId = long.TryParse(dict.GetValueOrDefault("service_id"), out var sid) ? sid : 0,
TotalCount = int.TryParse(dict.GetValueOrDefault("total_count"), out var tc) ? tc : 0,
SentCount = int.TryParse(dict.GetValueOrDefault("sent_count"), out var sc) ? sc : 0,
FailedCount = int.TryParse(dict.GetValueOrDefault("failed_count"), out var fc) ? fc : 0,
StartedAt = DateTime.TryParse(dict.GetValueOrDefault("started_at"), out var sa) ? sa : null,
CompletedAt = DateTime.TryParse(dict.GetValueOrDefault("completed_at"), out var ca) ? ca : null
};
}
public async Task SetProcessingAsync(string jobId, CancellationToken ct = default)
{
var db = await _redis.GetDatabaseAsync();
var key = Key(jobId);
var currentStatus = (string?)(await db.HashGetAsync(key, "status"));
if (currentStatus == "queued")
{
await db.HashSetAsync(key, [
new HashEntry("status", "processing"),
new HashEntry("started_at", DateTime.UtcNow.ToString("o"))
]);
}
}
public async Task IncrementSentAsync(string jobId, CancellationToken ct = default)
{
var db = await _redis.GetDatabaseAsync();
await db.HashIncrementAsync(Key(jobId), "sent_count");
}
public async Task IncrementFailedAsync(string jobId, CancellationToken ct = default)
{
var db = await _redis.GetDatabaseAsync();
await db.HashIncrementAsync(Key(jobId), "failed_count");
}
public async Task TryCompleteAsync(string jobId, CancellationToken ct = default)
{
var db = await _redis.GetDatabaseAsync();
var key = Key(jobId);
var values = await db.HashGetAsync(key, ["status", "total_count", "sent_count", "failed_count"]);
var status = (string?)values[0];
var total = (int?)values[1] ?? 0;
var sent = (int?)values[2] ?? 0;
var failed = (int?)values[3] ?? 0;
if (status == "processing" && sent + failed >= total)
{
await db.HashSetAsync(key, [
new HashEntry("status", "completed"),
new HashEntry("completed_at", DateTime.UtcNow.ToString("o"))
]);
_logger.LogInformation("Bulk job 완료: jobId={JobId}, sent={Sent}, failed={Failed}",
jobId, sent, failed);
}
}
public async Task<bool> IsCancelledAsync(string jobId, CancellationToken ct = default)
{
var db = await _redis.GetDatabaseAsync();
var status = (string?)(await db.HashGetAsync(Key(jobId), "status"));
return status == "cancelled";
}
public async Task<int> CancelAsync(string jobId, CancellationToken ct = default)
{
var db = await _redis.GetDatabaseAsync();
var key = Key(jobId);
var values = await db.HashGetAsync(key, ["total_count", "sent_count", "failed_count"]);
var total = (int?)values[0] ?? 0;
var sent = (int?)values[1] ?? 0;
var failed = (int?)values[2] ?? 0;
var cancelledCount = Math.Max(0, total - sent - failed);
await db.HashSetAsync(key, [
new HashEntry("status", "cancelled"),
new HashEntry("completed_at", DateTime.UtcNow.ToString("o"))
]);
_logger.LogInformation("Bulk job 취소: jobId={JobId}, cancelled={Cancelled}", jobId, cancelledCount);
return cancelledCount;
}
}

View File

@ -54,6 +54,7 @@ public static class DependencyInjection
services.AddSingleton<RedisConnection>();
services.AddSingleton<IDuplicateChecker, DuplicateChecker>();
services.AddSingleton<IScheduleCancelStore, ScheduleCancelStore>();
services.AddSingleton<IBulkJobStore, BulkJobStore>();
// RabbitMQ
services.Configure<RabbitMQSettings>(configuration.GetSection(RabbitMQSettings.SectionName));

View File

@ -24,6 +24,7 @@ public class PushWorker : BackgroundService
private readonly RabbitMQSettings _rabbitSettings;
private readonly IServiceScopeFactory _scopeFactory;
private readonly IDuplicateChecker _duplicateChecker;
private readonly IBulkJobStore _bulkJobStore;
private readonly IFcmSender _fcmSender;
private readonly IApnsSender _apnsSender;
private readonly ILogger<PushWorker> _logger;
@ -33,6 +34,7 @@ public class PushWorker : BackgroundService
IOptions<RabbitMQSettings> rabbitSettings,
IServiceScopeFactory scopeFactory,
IDuplicateChecker duplicateChecker,
IBulkJobStore bulkJobStore,
IFcmSender fcmSender,
IApnsSender apnsSender,
ILogger<PushWorker> logger)
@ -41,6 +43,7 @@ public class PushWorker : BackgroundService
_rabbitSettings = rabbitSettings.Value;
_scopeFactory = scopeFactory;
_duplicateChecker = duplicateChecker;
_bulkJobStore = bulkJobStore;
_fcmSender = fcmSender;
_apnsSender = apnsSender;
_logger = logger;
@ -115,6 +118,16 @@ public class PushWorker : BackgroundService
return;
}
// [0] Bulk job 취소 체크
if (!string.IsNullOrEmpty(pushMessage.JobId) &&
await _bulkJobStore.IsCancelledAsync(pushMessage.JobId, ct))
{
_logger.LogInformation("Bulk job 취소됨 — 스킵: jobId={JobId}, requestId={RequestId}",
pushMessage.JobId, pushMessage.RequestId);
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
return;
}
// [1] Redis 중복 체크
if (await _duplicateChecker.IsDuplicateAsync(pushMessage.RequestId, ct))
{
@ -123,6 +136,10 @@ public class PushWorker : BackgroundService
return;
}
// [1.5] Bulk job processing 상태 전환
if (!string.IsNullOrEmpty(pushMessage.JobId))
await _bulkJobStore.SetProcessingAsync(pushMessage.JobId, ct);
using var scope = _scopeFactory.CreateScope();
var serviceRepo = scope.ServiceProvider.GetRequiredService<IServiceRepository>();
var deviceRepo = scope.ServiceProvider.GetRequiredService<IDeviceRepository>();
@ -242,6 +259,17 @@ public class PushWorker : BackgroundService
"푸시 발송 완료: requestId={RequestId}, 성공={Success}, 실패={Fail}, 총={Total}",
pushMessage.RequestId, successCount, failCount, allResults.Count);
// [7] Bulk job 진행률 업데이트
if (!string.IsNullOrEmpty(pushMessage.JobId))
{
if (successCount > 0)
await _bulkJobStore.IncrementSentAsync(pushMessage.JobId, ct);
else
await _bulkJobStore.IncrementFailedAsync(pushMessage.JobId, ct);
await _bulkJobStore.TryCompleteAsync(pushMessage.JobId, ct);
}
// [8] ACK
await channel.BasicAckAsync(ea.DeliveryTag, false, ct);
}