SPMS_API/SPMS.Infrastructure/Messaging/RabbitMQInitializer.cs

92 lines
3.1 KiB
C#

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using SPMS.Application.Settings;
namespace SPMS.Infrastructure.Messaging;
/// <summary>
/// Hosted service that declares the RabbitMQ topology on host start-up:
/// one durable direct exchange plus the push and schedule queues, each bound
/// to that exchange with its own routing key.
/// </summary>
public class RabbitMQInitializer : IHostedService
{
    // Routing keys used when binding the queues to the exchange.
    private const string PushRoutingKey = "push";
    private const string ScheduleRoutingKey = "schedule";

    // Queue argument key under which the configured MessageTtl value is passed.
    private const string MessageTtlArgument = "x-message-ttl";

    private readonly RabbitMQConnection _connection;
    private readonly RabbitMQSettings _settings;
    private readonly ILogger<RabbitMQInitializer> _logger;

    /// <summary>
    /// Creates the initializer.
    /// </summary>
    /// <param name="connection">Connection wrapper used to open the channel for the declarations.</param>
    /// <param name="settings">Options supplying the exchange name, queue names and message TTL.</param>
    /// <param name="logger">Logger for progress and failure messages.</param>
    public RabbitMQInitializer(
        RabbitMQConnection connection,
        IOptions<RabbitMQSettings> settings,
        ILogger<RabbitMQInitializer> logger)
    {
        _connection = connection;
        _settings = settings.Value;
        _logger = logger;
    }

    /// <summary>
    /// Declares the exchange, then declares and binds the push and schedule queues.
    /// Any failure is logged and rethrown to the caller.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to every broker call.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        try
        {
            // The channel is only needed for the declarations and is disposed on exit.
            await using var channel = await _connection.CreateChannelAsync(cancellationToken);

            // Declare the exchange: direct, durable.
            await channel.ExchangeDeclareAsync(
                exchange: _settings.Exchange,
                type: ExchangeType.Direct,
                durable: true,
                autoDelete: false,
                arguments: null,
                cancellationToken: cancellationToken);
            _logger.LogInformation("Exchange 선언 완료: {Exchange}", _settings.Exchange);

            // Both queues are declared with the same arguments.
            var queueArgs = new Dictionary<string, object?>
            {
                [MessageTtlArgument] = _settings.MessageTtl
            };

            // Push queue.
            await DeclareAndBindQueueAsync(_settings.PushQueue, PushRoutingKey);
            _logger.LogInformation("Queue 선언 및 바인딩 완료: {Queue} → {Exchange} (routing_key: push)",
                _settings.PushQueue, _settings.Exchange);

            // Schedule queue.
            await DeclareAndBindQueueAsync(_settings.ScheduleQueue, ScheduleRoutingKey);
            _logger.LogInformation("Queue 선언 및 바인딩 완료: {Queue} → {Exchange} (routing_key: schedule)",
                _settings.ScheduleQueue, _settings.Exchange);

            // Declares a durable, non-exclusive, non-auto-delete queue and binds it
            // to the configured exchange under the given routing key.
            async Task DeclareAndBindQueueAsync(string queue, string routingKey)
            {
                await channel.QueueDeclareAsync(
                    queue: queue,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: queueArgs,
                    cancellationToken: cancellationToken);

                await channel.QueueBindAsync(
                    queue: queue,
                    exchange: _settings.Exchange,
                    routingKey: routingKey,
                    cancellationToken: cancellationToken);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "RabbitMQ Exchange/Queue 초기화 실패");
            throw;
        }
    }

    /// <summary>
    /// No shutdown work is performed; returns a completed task.
    /// </summary>
    /// <param name="cancellationToken">Unused.</param>
    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}