67 lines
2.4 KiB
C#
67 lines
2.4 KiB
C#
using System.Text.Json;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using RabbitMQ.Client;
|
|
using SPMS.Application.DTOs.Push;
|
|
using SPMS.Application.Interfaces;
|
|
using SPMS.Application.Settings;
|
|
|
|
namespace SPMS.Infrastructure.Messaging;
|
|
|
|
public class PushQueueService : IPushQueueService
|
|
{
|
|
private readonly RabbitMQConnection _connection;
|
|
private readonly RabbitMQSettings _settings;
|
|
private readonly ILogger<PushQueueService> _logger;
|
|
|
|
public PushQueueService(
|
|
RabbitMQConnection connection,
|
|
IOptions<RabbitMQSettings> settings,
|
|
ILogger<PushQueueService> logger)
|
|
{
|
|
_connection = connection;
|
|
_settings = settings.Value;
|
|
_logger = logger;
|
|
}
|
|
|
|
public async Task PublishPushMessageAsync(PushMessageDto message, CancellationToken cancellationToken = default)
|
|
{
|
|
await PublishAsync(_settings.Exchange, "push", message.RequestId, message, cancellationToken);
|
|
_logger.LogInformation(
|
|
"Push 메시지 발행: request_id={RequestId}, send_type={SendType}, service_id={ServiceId}",
|
|
message.RequestId, message.SendType, message.ServiceId);
|
|
}
|
|
|
|
public async Task PublishScheduleMessageAsync(ScheduleMessageDto message, CancellationToken cancellationToken = default)
|
|
{
|
|
await PublishAsync(_settings.Exchange, "schedule", message.PushMessage.RequestId, message, cancellationToken);
|
|
_logger.LogInformation(
|
|
"Schedule 메시지 발행: schedule_id={ScheduleId}, scheduled_at={ScheduledAt}, service_id={ServiceId}",
|
|
message.ScheduleId, message.ScheduledAt, message.ServiceId);
|
|
}
|
|
|
|
private async Task PublishAsync<T>(string exchange, string routingKey, string messageId, T message,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
await using var channel = await _connection.CreateChannelAsync(cancellationToken);
|
|
|
|
var body = JsonSerializer.SerializeToUtf8Bytes(message);
|
|
|
|
var properties = new BasicProperties
|
|
{
|
|
Persistent = true,
|
|
ContentType = "application/json",
|
|
MessageId = messageId,
|
|
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
|
|
};
|
|
|
|
await channel.BasicPublishAsync(
|
|
exchange: exchange,
|
|
routingKey: routingKey,
|
|
mandatory: false,
|
|
basicProperties: properties,
|
|
body: body,
|
|
cancellationToken: cancellationToken);
|
|
}
|
|
}
|