- POST /v1/in/push/send/bulk: CSV 대량 발송 (비동기) - POST /v1/in/push/job/status: Job 상태 조회 - POST /v1/in/push/job/cancel: Job 취소 - BulkJobStore: Redis Hash 기반 Job 상태 관리 - PushWorker: Job 진행률 추적 및 취소 체크 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
153 lines
5.5 KiB
C#
153 lines
5.5 KiB
C#
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using SPMS.Application.DTOs.Push;
|
|
using SPMS.Application.Interfaces;
|
|
using SPMS.Application.Settings;
|
|
using StackExchange.Redis;
|
|
|
|
namespace SPMS.Infrastructure.Caching;
|
|
|
|
public class BulkJobStore : IBulkJobStore
|
|
{
|
|
private static readonly TimeSpan JobTtl = TimeSpan.FromDays(7);
|
|
|
|
private readonly RedisConnection _redis;
|
|
private readonly RedisSettings _settings;
|
|
private readonly ILogger<BulkJobStore> _logger;
|
|
|
|
public BulkJobStore(
|
|
RedisConnection redis,
|
|
IOptions<RedisSettings> settings,
|
|
ILogger<BulkJobStore> logger)
|
|
{
|
|
_redis = redis;
|
|
_settings = settings.Value;
|
|
_logger = logger;
|
|
}
|
|
|
|
private string Key(string jobId) => $"{_settings.InstanceName}bulk_job:{jobId}";
|
|
|
|
public async Task<string> CreateJobAsync(long serviceId, int totalCount, CancellationToken ct = default)
|
|
{
|
|
var jobId = $"bulk_{DateTime.UtcNow:yyyyMMdd}_{Guid.NewGuid().ToString("N")[..8]}";
|
|
var key = Key(jobId);
|
|
|
|
var db = await _redis.GetDatabaseAsync();
|
|
var entries = new HashEntry[]
|
|
{
|
|
new("status", "queued"),
|
|
new("service_id", serviceId),
|
|
new("total_count", totalCount),
|
|
new("sent_count", 0),
|
|
new("failed_count", 0),
|
|
new("started_at", ""),
|
|
new("completed_at", "")
|
|
};
|
|
|
|
await db.HashSetAsync(key, entries);
|
|
await db.KeyExpireAsync(key, JobTtl);
|
|
|
|
_logger.LogInformation("Bulk job 생성: jobId={JobId}, totalCount={Total}", jobId, totalCount);
|
|
return jobId;
|
|
}
|
|
|
|
public async Task<BulkJobInfo?> GetJobAsync(string jobId, CancellationToken ct = default)
|
|
{
|
|
var db = await _redis.GetDatabaseAsync();
|
|
var entries = await db.HashGetAllAsync(Key(jobId));
|
|
if (entries.Length == 0)
|
|
return null;
|
|
|
|
var dict = entries.ToDictionary(e => (string)e.Name!, e => (string)e.Value!);
|
|
|
|
return new BulkJobInfo
|
|
{
|
|
JobId = jobId,
|
|
Status = dict.GetValueOrDefault("status", "unknown"),
|
|
ServiceId = long.TryParse(dict.GetValueOrDefault("service_id"), out var sid) ? sid : 0,
|
|
TotalCount = int.TryParse(dict.GetValueOrDefault("total_count"), out var tc) ? tc : 0,
|
|
SentCount = int.TryParse(dict.GetValueOrDefault("sent_count"), out var sc) ? sc : 0,
|
|
FailedCount = int.TryParse(dict.GetValueOrDefault("failed_count"), out var fc) ? fc : 0,
|
|
StartedAt = DateTime.TryParse(dict.GetValueOrDefault("started_at"), out var sa) ? sa : null,
|
|
CompletedAt = DateTime.TryParse(dict.GetValueOrDefault("completed_at"), out var ca) ? ca : null
|
|
};
|
|
}
|
|
|
|
public async Task SetProcessingAsync(string jobId, CancellationToken ct = default)
|
|
{
|
|
var db = await _redis.GetDatabaseAsync();
|
|
var key = Key(jobId);
|
|
|
|
var currentStatus = (string?)(await db.HashGetAsync(key, "status"));
|
|
if (currentStatus == "queued")
|
|
{
|
|
await db.HashSetAsync(key, [
|
|
new HashEntry("status", "processing"),
|
|
new HashEntry("started_at", DateTime.UtcNow.ToString("o"))
|
|
]);
|
|
}
|
|
}
|
|
|
|
public async Task IncrementSentAsync(string jobId, CancellationToken ct = default)
|
|
{
|
|
var db = await _redis.GetDatabaseAsync();
|
|
await db.HashIncrementAsync(Key(jobId), "sent_count");
|
|
}
|
|
|
|
public async Task IncrementFailedAsync(string jobId, CancellationToken ct = default)
|
|
{
|
|
var db = await _redis.GetDatabaseAsync();
|
|
await db.HashIncrementAsync(Key(jobId), "failed_count");
|
|
}
|
|
|
|
public async Task TryCompleteAsync(string jobId, CancellationToken ct = default)
|
|
{
|
|
var db = await _redis.GetDatabaseAsync();
|
|
var key = Key(jobId);
|
|
|
|
var values = await db.HashGetAsync(key, ["status", "total_count", "sent_count", "failed_count"]);
|
|
var status = (string?)values[0];
|
|
var total = (int?)values[1] ?? 0;
|
|
var sent = (int?)values[2] ?? 0;
|
|
var failed = (int?)values[3] ?? 0;
|
|
|
|
if (status == "processing" && sent + failed >= total)
|
|
{
|
|
await db.HashSetAsync(key, [
|
|
new HashEntry("status", "completed"),
|
|
new HashEntry("completed_at", DateTime.UtcNow.ToString("o"))
|
|
]);
|
|
_logger.LogInformation("Bulk job 완료: jobId={JobId}, sent={Sent}, failed={Failed}",
|
|
jobId, sent, failed);
|
|
}
|
|
}
|
|
|
|
public async Task<bool> IsCancelledAsync(string jobId, CancellationToken ct = default)
|
|
{
|
|
var db = await _redis.GetDatabaseAsync();
|
|
var status = (string?)(await db.HashGetAsync(Key(jobId), "status"));
|
|
return status == "cancelled";
|
|
}
|
|
|
|
public async Task<int> CancelAsync(string jobId, CancellationToken ct = default)
|
|
{
|
|
var db = await _redis.GetDatabaseAsync();
|
|
var key = Key(jobId);
|
|
|
|
var values = await db.HashGetAsync(key, ["total_count", "sent_count", "failed_count"]);
|
|
var total = (int?)values[0] ?? 0;
|
|
var sent = (int?)values[1] ?? 0;
|
|
var failed = (int?)values[2] ?? 0;
|
|
|
|
var cancelledCount = Math.Max(0, total - sent - failed);
|
|
|
|
await db.HashSetAsync(key, [
|
|
new HashEntry("status", "cancelled"),
|
|
new HashEntry("completed_at", DateTime.UtcNow.ToString("o"))
|
|
]);
|
|
|
|
_logger.LogInformation("Bulk job 취소: jobId={JobId}, cancelled={Cancelled}", jobId, cancelledCount);
|
|
return cancelledCount;
|
|
}
|
|
}
|