SPMS_API/SPMS.Infrastructure/Messaging/PushQueueService.cs

67 lines
2.4 KiB
C#

using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using SPMS.Application.DTOs.Push;
using SPMS.Application.Interfaces;
using SPMS.Application.Settings;
namespace SPMS.Infrastructure.Messaging;
/// <summary>
/// Publishes push and schedule messages to the configured RabbitMQ exchange.
/// Each message is serialized to UTF-8 JSON and sent with persistent delivery
/// on a channel that is opened for that single publish and disposed afterwards.
/// </summary>
public class PushQueueService : IPushQueueService
{
    // Routing keys used when publishing to the configured exchange.
    private const string PushRoutingKey = "push";
    private const string ScheduleRoutingKey = "schedule";
    private const string JsonContentType = "application/json";

    private readonly RabbitMQConnection _connection;
    private readonly RabbitMQSettings _settings;
    private readonly ILogger<PushQueueService> _logger;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="connection">Connection wrapper used to open a channel for each publish.</param>
    /// <param name="settings">RabbitMQ settings; the <c>Exchange</c> value is the publish target.</param>
    /// <param name="logger">Logger that receives one informational entry per published message.</param>
    public PushQueueService(
        RabbitMQConnection connection,
        IOptions<RabbitMQSettings> settings,
        ILogger<PushQueueService> logger)
    {
        _connection = connection;
        _settings = settings.Value;
        _logger = logger;
    }

    /// <summary>
    /// Publishes a push message with the "push" routing key, using the message's
    /// <c>RequestId</c> as the AMQP message id, then logs the publication.
    /// </summary>
    /// <param name="message">The push message to publish.</param>
    /// <param name="cancellationToken">Token forwarded to channel creation and the publish call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public async Task PublishPushMessageAsync(PushMessageDto message, CancellationToken cancellationToken = default)
    {
        // Fail with a named argument instead of a NullReferenceException on the first dereference.
        ArgumentNullException.ThrowIfNull(message);

        await PublishAsync(_settings.Exchange, PushRoutingKey, message.RequestId, message, cancellationToken);

        // Logged only after the publish call completed without throwing.
        _logger.LogInformation(
            "Push 메시지 발행: request_id={RequestId}, send_type={SendType}, service_id={ServiceId}",
            message.RequestId, message.SendType, message.ServiceId);
    }

    /// <summary>
    /// Publishes a schedule message with the "schedule" routing key, using the
    /// <c>RequestId</c> of the embedded push message as the AMQP message id, then logs the publication.
    /// </summary>
    /// <param name="message">The schedule message to publish; its <c>PushMessage</c> must be set.</param>
    /// <param name="cancellationToken">Token forwarded to channel creation and the publish call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="message"/> has no <c>PushMessage</c>.</exception>
    public async Task PublishScheduleMessageAsync(ScheduleMessageDto message, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(message);

        // The message id is taken from the embedded push message, so it has to be present.
        var pushMessage = message.PushMessage;
        if (pushMessage is null)
        {
            throw new ArgumentException("Schedule message must contain a push message.", nameof(message));
        }

        await PublishAsync(_settings.Exchange, ScheduleRoutingKey, pushMessage.RequestId, message, cancellationToken);

        _logger.LogInformation(
            "Schedule 메시지 발행: schedule_id={ScheduleId}, scheduled_at={ScheduledAt}, service_id={ServiceId}",
            message.ScheduleId, message.ScheduledAt, message.ServiceId);
    }

    /// <summary>
    /// Serializes <paramref name="message"/> to UTF-8 JSON and publishes it on a fresh channel.
    /// </summary>
    /// <typeparam name="T">Type of the payload passed to the JSON serializer.</typeparam>
    /// <param name="exchange">Exchange to publish to.</param>
    /// <param name="routingKey">Routing key for the message.</param>
    /// <param name="messageId">Value stored in the AMQP <c>MessageId</c> property.</param>
    /// <param name="message">Payload to serialize.</param>
    /// <param name="cancellationToken">Token forwarded to channel creation and the publish call.</param>
    private async Task PublishAsync<T>(string exchange, string routingKey, string messageId, T message,
        CancellationToken cancellationToken)
    {
        // Serialize before touching the broker so a serialization failure never opens a channel.
        var body = JsonSerializer.SerializeToUtf8Bytes(message);

        var properties = new BasicProperties
        {
            Persistent = true,
            ContentType = JsonContentType,
            MessageId = messageId,
            // AMQP timestamps are expressed in whole seconds since the Unix epoch.
            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
        };

        // The channel lives only for this publish; it is disposed asynchronously on exit.
        await using var channel = await _connection.CreateChannelAsync(cancellationToken);

        await channel.BasicPublishAsync(
            exchange: exchange,
            routingKey: routingKey,
            mandatory: false,
            basicProperties: properties,
            body: body,
            cancellationToken: cancellationToken);
    }
}