SPMS_API/SPMS.Infrastructure/Caching/BulkJobStore.cs
seonkyu.kim 830cbf2edc feat: 대용량 발송/상태조회/취소 API 구현 (#130)
- POST /v1/in/push/send/bulk: CSV 대량 발송 (비동기)
- POST /v1/in/push/job/status: Job 상태 조회
- POST /v1/in/push/job/cancel: Job 취소
- BulkJobStore: Redis Hash 기반 Job 상태 관리
- PushWorker: Job 진행률 추적 및 취소 체크

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 22:55:39 +09:00

153 lines
5.5 KiB
C#

using System.Globalization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SPMS.Application.DTOs.Push;
using SPMS.Application.Interfaces;
using SPMS.Application.Settings;
using StackExchange.Redis;
namespace SPMS.Infrastructure.Caching;
/// <summary>
/// Redis-hash-backed store that tracks the status and progress counters of bulk push jobs.
/// </summary>
/// <remarks>
/// Each job is kept in a single hash at <c>{InstanceName}bulk_job:{jobId}</c> holding the fields
/// <c>status</c>, <c>service_id</c>, <c>total_count</c>, <c>sent_count</c>, <c>failed_count</c>,
/// <c>started_at</c> and <c>completed_at</c>. Status moves from <c>queued</c> to <c>processing</c>
/// and then to either <c>completed</c> or <c>cancelled</c>. Status transitions are written through
/// conditional transactions so that a concurrent writer cannot overwrite a newer status.
/// The <see cref="CancellationToken"/> parameters are accepted for interface compatibility but are
/// not observed by this implementation.
/// </remarks>
public class BulkJobStore : IBulkJobStore
{
    private const string StatusQueued = "queued";
    private const string StatusProcessing = "processing";
    private const string StatusCompleted = "completed";
    private const string StatusCancelled = "cancelled";

    private const string FieldStatus = "status";
    private const string FieldServiceId = "service_id";
    private const string FieldTotalCount = "total_count";
    private const string FieldSentCount = "sent_count";
    private const string FieldFailedCount = "failed_count";
    private const string FieldStartedAt = "started_at";
    private const string FieldCompletedAt = "completed_at";

    /// <summary>Lifetime applied to a job hash when it is created.</summary>
    private static readonly TimeSpan JobTtl = TimeSpan.FromDays(7);

    private readonly RedisConnection _redis;
    private readonly RedisSettings _settings;
    private readonly ILogger<BulkJobStore> _logger;

    /// <summary>Creates the store from the shared Redis connection, Redis settings and a logger.</summary>
    /// <param name="redis">Connection wrapper used to obtain the Redis database.</param>
    /// <param name="settings">Redis settings; <c>InstanceName</c> is used as the key prefix.</param>
    /// <param name="logger">Logger for job lifecycle events.</param>
    public BulkJobStore(
        RedisConnection redis,
        IOptions<RedisSettings> settings,
        ILogger<BulkJobStore> logger)
    {
        _redis = redis;
        _settings = settings.Value;
        _logger = logger;
    }

    /// <summary>Builds the Redis key of the hash that stores the given job.</summary>
    private string Key(string jobId) => $"{_settings.InstanceName}bulk_job:{jobId}";

    /// <summary>
    /// Registers a new job in the <c>queued</c> state with zeroed counters and returns its id.
    /// </summary>
    /// <param name="serviceId">Identifier of the service that owns the job.</param>
    /// <param name="totalCount">Number of messages the job is expected to process.</param>
    /// <param name="ct">Not observed by this implementation.</param>
    /// <returns>The generated job id, formatted as <c>bulk_yyyyMMdd_xxxxxxxx</c>.</returns>
    public async Task<string> CreateJobAsync(long serviceId, int totalCount, CancellationToken ct = default)
    {
        var jobId = $"bulk_{DateTime.UtcNow:yyyyMMdd}_{Guid.NewGuid().ToString("N")[..8]}";
        var key = Key(jobId);
        var db = await _redis.GetDatabaseAsync();

        var entries = new HashEntry[]
        {
            new(FieldStatus, StatusQueued),
            new(FieldServiceId, serviceId),
            new(FieldTotalCount, totalCount),
            new(FieldSentCount, 0),
            new(FieldFailedCount, 0),
            new(FieldStartedAt, ""),
            new(FieldCompletedAt, "")
        };

        // Write the hash and its TTL in one transaction so a failure between the two
        // commands cannot leave a job hash behind that never expires.
        var tran = db.CreateTransaction();
        var setTask = tran.HashSetAsync(key, entries);
        var expireTask = tran.KeyExpireAsync(key, JobTtl);
        await tran.ExecuteAsync();
        await setTask;
        await expireTask;

        _logger.LogInformation("Bulk job 생성: jobId={JobId}, totalCount={Total}", jobId, totalCount);
        return jobId;
    }

    /// <summary>Reads the current state of a job.</summary>
    /// <param name="jobId">Id returned by <see cref="CreateJobAsync"/>.</param>
    /// <param name="ct">Not observed by this implementation.</param>
    /// <returns>The job snapshot, or <c>null</c> when no hash exists for the id.</returns>
    public async Task<BulkJobInfo?> GetJobAsync(string jobId, CancellationToken ct = default)
    {
        var db = await _redis.GetDatabaseAsync();
        var entries = await db.HashGetAllAsync(Key(jobId));
        if (entries.Length == 0)
        {
            return null;
        }

        var dict = entries.ToDictionary(e => (string)e.Name!, e => (string)e.Value!);

        // Values are machine-written, so parse them culture-independently. Timestamps are stored
        // in round-trip ("o") format; RoundtripKind keeps them as UTC instead of converting
        // them to the server's local time.
        var invariant = CultureInfo.InvariantCulture;
        return new BulkJobInfo
        {
            JobId = jobId,
            Status = dict.GetValueOrDefault(FieldStatus, "unknown"),
            ServiceId = long.TryParse(dict.GetValueOrDefault(FieldServiceId), NumberStyles.Integer, invariant, out var sid) ? sid : 0,
            TotalCount = int.TryParse(dict.GetValueOrDefault(FieldTotalCount), NumberStyles.Integer, invariant, out var tc) ? tc : 0,
            SentCount = int.TryParse(dict.GetValueOrDefault(FieldSentCount), NumberStyles.Integer, invariant, out var sc) ? sc : 0,
            FailedCount = int.TryParse(dict.GetValueOrDefault(FieldFailedCount), NumberStyles.Integer, invariant, out var fc) ? fc : 0,
            StartedAt = DateTime.TryParse(dict.GetValueOrDefault(FieldStartedAt), invariant, DateTimeStyles.RoundtripKind, out var sa) ? sa : null,
            CompletedAt = DateTime.TryParse(dict.GetValueOrDefault(FieldCompletedAt), invariant, DateTimeStyles.RoundtripKind, out var ca) ? ca : null
        };
    }

    /// <summary>
    /// Moves a job from <c>queued</c> to <c>processing</c> and records its start time.
    /// Does nothing when the job is in any other state.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="ct">Not observed by this implementation.</param>
    public async Task SetProcessingAsync(string jobId, CancellationToken ct = default)
    {
        var db = await _redis.GetDatabaseAsync();
        var key = Key(jobId);

        // Cheap pre-check: skip the transaction entirely unless the job is still queued.
        var currentStatus = (string?)(await db.HashGetAsync(key, FieldStatus));
        if (currentStatus != StatusQueued)
        {
            return;
        }

        // Re-verify the status inside the transaction so a cancel that lands between the
        // read above and this write is not overwritten with "processing".
        var tran = db.CreateTransaction();
        tran.AddCondition(Condition.HashEqual(key, FieldStatus, StatusQueued));
        var write = tran.HashSetAsync(key, [
            new HashEntry(FieldStatus, StatusProcessing),
            new HashEntry(FieldStartedAt, DateTime.UtcNow.ToString("o"))
        ]);

        if (await tran.ExecuteAsync())
        {
            await write;
        }
    }

    /// <summary>Increments the job's <c>sent_count</c> by one.</summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="ct">Not observed by this implementation.</param>
    public async Task IncrementSentAsync(string jobId, CancellationToken ct = default)
    {
        var db = await _redis.GetDatabaseAsync();
        await db.HashIncrementAsync(Key(jobId), FieldSentCount);
    }

    /// <summary>Increments the job's <c>failed_count</c> by one.</summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="ct">Not observed by this implementation.</param>
    public async Task IncrementFailedAsync(string jobId, CancellationToken ct = default)
    {
        var db = await _redis.GetDatabaseAsync();
        await db.HashIncrementAsync(Key(jobId), FieldFailedCount);
    }

    /// <summary>
    /// Marks a <c>processing</c> job as <c>completed</c> once <c>sent_count + failed_count</c>
    /// has reached <c>total_count</c>. Does nothing otherwise.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="ct">Not observed by this implementation.</param>
    public async Task TryCompleteAsync(string jobId, CancellationToken ct = default)
    {
        var db = await _redis.GetDatabaseAsync();
        var key = Key(jobId);

        var values = await db.HashGetAsync(key, [FieldStatus, FieldTotalCount, FieldSentCount, FieldFailedCount]);
        var status = (string?)values[0];
        var total = (int?)values[1] ?? 0;
        var sent = (int?)values[2] ?? 0;
        var failed = (int?)values[3] ?? 0;

        if (status != StatusProcessing || sent + failed < total)
        {
            return;
        }

        // Only the caller that still observes "processing" at commit time completes the job.
        // This prevents overwriting a concurrent cancel and avoids duplicate completion.
        var tran = db.CreateTransaction();
        tran.AddCondition(Condition.HashEqual(key, FieldStatus, StatusProcessing));
        var write = tran.HashSetAsync(key, [
            new HashEntry(FieldStatus, StatusCompleted),
            new HashEntry(FieldCompletedAt, DateTime.UtcNow.ToString("o"))
        ]);

        if (await tran.ExecuteAsync())
        {
            await write;
            _logger.LogInformation("Bulk job 완료: jobId={JobId}, sent={Sent}, failed={Failed}",
                jobId, sent, failed);
        }
    }

    /// <summary>Reports whether the job's status is <c>cancelled</c>.</summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="ct">Not observed by this implementation.</param>
    /// <returns><c>true</c> when the stored status equals <c>cancelled</c>; otherwise <c>false</c>.</returns>
    public async Task<bool> IsCancelledAsync(string jobId, CancellationToken ct = default)
    {
        var db = await _redis.GetDatabaseAsync();
        var status = (string?)(await db.HashGetAsync(Key(jobId), FieldStatus));
        return status == StatusCancelled;
    }

    /// <summary>
    /// Cancels a job that has not finished yet and records the cancellation time.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="ct">Not observed by this implementation.</param>
    /// <returns>
    /// The number of messages that were neither sent nor failed at the time of the call
    /// (<c>total_count - sent_count - failed_count</c>, never negative). Returns <c>0</c> when the
    /// job does not exist or finished before the cancellation could be written.
    /// </returns>
    public async Task<int> CancelAsync(string jobId, CancellationToken ct = default)
    {
        var db = await _redis.GetDatabaseAsync();
        var key = Key(jobId);

        var values = await db.HashGetAsync(key, [FieldStatus, FieldTotalCount, FieldSentCount, FieldFailedCount]);
        var status = (string?)values[0];
        if (status is null)
        {
            // Unknown or expired job: writing here would create a fresh hash without a TTL.
            _logger.LogWarning("Bulk job 취소 무시: jobId={JobId}, status={Status}", jobId, "(none)");
            return 0;
        }

        var total = (int?)values[1] ?? 0;
        var sent = (int?)values[2] ?? 0;
        var failed = (int?)values[3] ?? 0;
        var cancelledCount = Math.Max(0, total - sent - failed);

        if (status is StatusCompleted or StatusCancelled)
        {
            // Already in a terminal state: keep the recorded status and completed_at untouched.
            return cancelledCount;
        }

        // Guard the write so a job that completes or expires concurrently is not
        // overwritten or re-created.
        var tran = db.CreateTransaction();
        tran.AddCondition(Condition.KeyExists(key));
        tran.AddCondition(Condition.HashNotEqual(key, FieldStatus, StatusCompleted));
        var write = tran.HashSetAsync(key, [
            new HashEntry(FieldStatus, StatusCancelled),
            new HashEntry(FieldCompletedAt, DateTime.UtcNow.ToString("o"))
        ]);

        if (!await tran.ExecuteAsync())
        {
            _logger.LogWarning("Bulk job 취소 무시: jobId={JobId}, status={Status}", jobId, status);
            return 0;
        }

        await write;
        _logger.LogInformation("Bulk job 취소: jobId={JobId}, cancelled={Cancelled}", jobId, cancelledCount);
        return cancelledCount;
    }
}