SPMS_API/SPMS.Infrastructure/Messaging/RabbitMQConnection.cs

80 lines
2.4 KiB
C#

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using SPMS.Application.Settings;
namespace SPMS.Infrastructure.Messaging;
/// <summary>
/// Owns a single lazily created RabbitMQ <see cref="IConnection"/> and hands out
/// channels created on it. The connection is built from <see cref="RabbitMQSettings"/>
/// on first use and re-created when the cached one is no longer open.
/// </summary>
public class RabbitMQConnection : IAsyncDisposable
{
    /// <summary>Interval passed to the client as <c>NetworkRecoveryInterval</c>.</summary>
    private static readonly TimeSpan NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

    /// <summary>Heartbeat value passed to the client as <c>RequestedHeartbeat</c>.</summary>
    private static readonly TimeSpan RequestedHeartbeat = TimeSpan.FromSeconds(60);

    private readonly RabbitMQSettings _settings;
    private readonly ILogger<RabbitMQConnection> _logger;

    // Serializes connection creation so concurrent callers do not each open a connection.
    private readonly SemaphoreSlim _semaphore = new(1, 1);

    private IConnection? _connection;

    /// <summary>
    /// Initializes the wrapper. No connection is opened here; it is created on the
    /// first call to <see cref="GetConnectionAsync"/>.
    /// </summary>
    /// <param name="settings">Options wrapper whose <c>Value</c> supplies host, port, credentials and virtual host.</param>
    /// <param name="logger">Logger used for connection success/failure messages.</param>
    public RabbitMQConnection(
        IOptions<RabbitMQSettings> settings,
        ILogger<RabbitMQConnection> logger)
    {
        _settings = settings.Value;
        _logger = logger;
    }

    /// <summary>
    /// Returns the cached connection when it is open; otherwise creates a new one.
    /// </summary>
    /// <param name="cancellationToken">Cancels waiting for the creation lock and the connection attempt.</param>
    /// <returns>An open connection at the time it was checked or created.</returns>
    /// <exception cref="OperationCanceledException">The token was canceled while waiting or connecting.</exception>
    /// <exception cref="ObjectDisposedException">This instance has already been disposed.</exception>
    /// <remarks>Any exception raised while connecting is logged and rethrown unchanged.</remarks>
    public async Task<IConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
    {
        // Fast path: skip the lock entirely while the cached connection is healthy.
        if (_connection is { IsOpen: true })
        {
            return _connection;
        }

        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            // Re-check under the lock: another caller may have connected while we waited.
            if (_connection is { IsOpen: true })
            {
                return _connection;
            }

            var factory = new ConnectionFactory
            {
                HostName = _settings.HostName,
                Port = _settings.Port,
                UserName = _settings.UserName,
                Password = _settings.Password,
                VirtualHost = _settings.VirtualHost,
                AutomaticRecoveryEnabled = true,
                NetworkRecoveryInterval = NetworkRecoveryInterval,
                RequestedHeartbeat = RequestedHeartbeat
            };

            // NOTE(review): if a previous connection exists but is not open, it is
            // overwritten here without being closed or disposed. With
            // AutomaticRecoveryEnabled it may still be recovering in the background;
            // confirm whether it should be disposed before being replaced.
            _connection = await factory.CreateConnectionAsync(cancellationToken);
            _logger.LogInformation("RabbitMQ 연결 성공: {HostName}:{Port}", _settings.HostName, _settings.Port);
            return _connection;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "RabbitMQ 연결 실패: {HostName}:{Port}", _settings.HostName, _settings.Port);
            throw;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Creates a new channel on the shared connection, connecting first if needed.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to both the connection lookup and the channel creation.</param>
    /// <returns>A newly created channel; the caller is responsible for its lifetime.</returns>
    public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
    {
        var connection = await GetConnectionAsync(cancellationToken);
        return await connection.CreateChannelAsync(cancellationToken: cancellationToken);
    }

    /// <summary>
    /// Closes and disposes the cached connection (if any) and releases the internal lock.
    /// Failures while closing are logged rather than thrown so that disposal always
    /// completes and the remaining resources are still released.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Detach the field first so the instance never keeps a reference to a
        // connection that is being (or failed to be) torn down.
        var connection = _connection;
        _connection = null;

        if (connection is not null)
        {
            try
            {
                try
                {
                    await connection.CloseAsync();
                }
                finally
                {
                    // Dispose even when the graceful close failed.
                    connection.Dispose();
                }
            }
            catch (Exception ex)
            {
                // Dispose paths should not throw; record the failure and keep cleaning up.
                _logger.LogWarning(ex, "RabbitMQ 연결 종료 중 오류 발생: {HostName}:{Port}", _settings.HostName, _settings.Port);
            }
        }

        _semaphore.Dispose();
    }
}