using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using SPMS.Application.DTOs.Push; using SPMS.Application.Interfaces; using SPMS.Application.Settings; using StackExchange.Redis; namespace SPMS.Infrastructure.Caching; public class BulkJobStore : IBulkJobStore { private static readonly TimeSpan JobTtl = TimeSpan.FromDays(7); private readonly RedisConnection _redis; private readonly RedisSettings _settings; private readonly ILogger _logger; public BulkJobStore( RedisConnection redis, IOptions settings, ILogger logger) { _redis = redis; _settings = settings.Value; _logger = logger; } private string Key(string jobId) => $"{_settings.InstanceName}bulk_job:{jobId}"; public async Task CreateJobAsync(long serviceId, int totalCount, CancellationToken ct = default) { var jobId = $"bulk_{DateTime.UtcNow:yyyyMMdd}_{Guid.NewGuid().ToString("N")[..8]}"; var key = Key(jobId); var db = await _redis.GetDatabaseAsync(); var entries = new HashEntry[] { new("status", "queued"), new("service_id", serviceId), new("total_count", totalCount), new("sent_count", 0), new("failed_count", 0), new("started_at", ""), new("completed_at", "") }; await db.HashSetAsync(key, entries); await db.KeyExpireAsync(key, JobTtl); _logger.LogInformation("Bulk job 생성: jobId={JobId}, totalCount={Total}", jobId, totalCount); return jobId; } public async Task GetJobAsync(string jobId, CancellationToken ct = default) { var db = await _redis.GetDatabaseAsync(); var entries = await db.HashGetAllAsync(Key(jobId)); if (entries.Length == 0) return null; var dict = entries.ToDictionary(e => (string)e.Name!, e => (string)e.Value!); return new BulkJobInfo { JobId = jobId, Status = dict.GetValueOrDefault("status", "unknown"), ServiceId = long.TryParse(dict.GetValueOrDefault("service_id"), out var sid) ? sid : 0, TotalCount = int.TryParse(dict.GetValueOrDefault("total_count"), out var tc) ? tc : 0, SentCount = int.TryParse(dict.GetValueOrDefault("sent_count"), out var sc) ? sc : 0, FailedCount = int.TryParse(dict.GetValueOrDefault("failed_count"), out var fc) ? fc : 0, StartedAt = DateTime.TryParse(dict.GetValueOrDefault("started_at"), out var sa) ? sa : null, CompletedAt = DateTime.TryParse(dict.GetValueOrDefault("completed_at"), out var ca) ? ca : null }; } public async Task SetProcessingAsync(string jobId, CancellationToken ct = default) { var db = await _redis.GetDatabaseAsync(); var key = Key(jobId); var currentStatus = (string?)(await db.HashGetAsync(key, "status")); if (currentStatus == "queued") { await db.HashSetAsync(key, [ new HashEntry("status", "processing"), new HashEntry("started_at", DateTime.UtcNow.ToString("o")) ]); } } public async Task IncrementSentAsync(string jobId, CancellationToken ct = default) { var db = await _redis.GetDatabaseAsync(); await db.HashIncrementAsync(Key(jobId), "sent_count"); } public async Task IncrementFailedAsync(string jobId, CancellationToken ct = default) { var db = await _redis.GetDatabaseAsync(); await db.HashIncrementAsync(Key(jobId), "failed_count"); } public async Task TryCompleteAsync(string jobId, CancellationToken ct = default) { var db = await _redis.GetDatabaseAsync(); var key = Key(jobId); var values = await db.HashGetAsync(key, ["status", "total_count", "sent_count", "failed_count"]); var status = (string?)values[0]; var total = (int?)values[1] ?? 0; var sent = (int?)values[2] ?? 0; var failed = (int?)values[3] ?? 0; if (status == "processing" && sent + failed >= total) { await db.HashSetAsync(key, [ new HashEntry("status", "completed"), new HashEntry("completed_at", DateTime.UtcNow.ToString("o")) ]); _logger.LogInformation("Bulk job 완료: jobId={JobId}, sent={Sent}, failed={Failed}", jobId, sent, failed); } } public async Task IsCancelledAsync(string jobId, CancellationToken ct = default) { var db = await _redis.GetDatabaseAsync(); var status = (string?)(await db.HashGetAsync(Key(jobId), "status")); return status == "cancelled"; } public async Task CancelAsync(string jobId, CancellationToken ct = default) { var db = await _redis.GetDatabaseAsync(); var key = Key(jobId); var values = await db.HashGetAsync(key, ["total_count", "sent_count", "failed_count"]); var total = (int?)values[0] ?? 0; var sent = (int?)values[1] ?? 0; var failed = (int?)values[2] ?? 0; var cancelledCount = Math.Max(0, total - sent - failed); await db.HashSetAsync(key, [ new HashEntry("status", "cancelled"), new HashEntry("completed_at", DateTime.UtcNow.ToString("o")) ]); _logger.LogInformation("Bulk job 취소: jobId={JobId}, cancelled={Cancelled}", jobId, cancelledCount); return cancelledCount; } }