feat: RabbitMQ 인프라 설정 (Exchange/Queue) (#102) #103

Merged
seonkyu.kim merged 1 commits from feature/#102-rabbitmq-infrastructure into develop 2026-02-10 06:39:34 +00:00
11 changed files with 355 additions and 1 deletions

View File

@ -15,7 +15,12 @@
"Port": 0, "Port": 0,
"UserName": "", "UserName": "",
"Password": "", "Password": "",
"VirtualHost": "/" "VirtualHost": "/",
"Exchange": "spms.push.exchange",
"PushQueue": "spms.push.queue",
"ScheduleQueue": "spms.schedule.queue",
"PrefetchCount": 10,
"MessageTtl": 86400000
}, },
"CredentialEncryption": { "CredentialEncryption": {
"Key": "" "Key": ""

View File

@ -0,0 +1,42 @@
using System.Text.Json.Serialization;
namespace SPMS.Application.DTOs.Push;
public class PushMessageDto
{
[JsonPropertyName("message_id")]
public string MessageId { get; set; } = string.Empty;
[JsonPropertyName("request_id")]
public string RequestId { get; set; } = string.Empty;
[JsonPropertyName("service_id")]
public long ServiceId { get; set; }
[JsonPropertyName("send_type")]
public string SendType { get; set; } = string.Empty;
[JsonPropertyName("title")]
public string Title { get; set; } = string.Empty;
[JsonPropertyName("body")]
public string Body { get; set; } = string.Empty;
[JsonPropertyName("image_url")]
public string? ImageUrl { get; set; }
[JsonPropertyName("link_url")]
public string? LinkUrl { get; set; }
[JsonPropertyName("custom_data")]
public Dictionary<string, object>? CustomData { get; set; }
[JsonPropertyName("target")]
public PushTargetDto Target { get; set; } = new();
[JsonPropertyName("created_by")]
public long CreatedBy { get; set; }
[JsonPropertyName("created_at")]
public string CreatedAt { get; set; } = string.Empty;
}

View File

@ -0,0 +1,13 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace SPMS.Application.DTOs.Push;
public class PushTargetDto
{
[JsonPropertyName("type")]
public string Type { get; set; } = string.Empty;
[JsonPropertyName("value")]
public JsonElement? Value { get; set; }
}

View File

@ -0,0 +1,21 @@
using System.Text.Json.Serialization;
namespace SPMS.Application.DTOs.Push;
public class ScheduleMessageDto
{
[JsonPropertyName("schedule_id")]
public string ScheduleId { get; set; } = string.Empty;
[JsonPropertyName("message_id")]
public string MessageId { get; set; } = string.Empty;
[JsonPropertyName("service_id")]
public long ServiceId { get; set; }
[JsonPropertyName("scheduled_at")]
public string ScheduledAt { get; set; } = string.Empty;
[JsonPropertyName("push_message")]
public PushMessageDto PushMessage { get; set; } = new();
}

View File

@ -0,0 +1,9 @@
using SPMS.Application.DTOs.Push;
namespace SPMS.Application.Interfaces;
public interface IPushQueueService
{
Task PublishPushMessageAsync(PushMessageDto message, CancellationToken cancellationToken = default);
Task PublishScheduleMessageAsync(ScheduleMessageDto message, CancellationToken cancellationToken = default);
}

View File

@ -0,0 +1,17 @@
namespace SPMS.Application.Settings;
public class RabbitMQSettings
{
public const string SectionName = "RabbitMQ";
public string HostName { get; set; } = string.Empty;
public int Port { get; set; } = 5672;
public string UserName { get; set; } = string.Empty;
public string Password { get; set; } = string.Empty;
public string VirtualHost { get; set; } = "/";
public string Exchange { get; set; } = "spms.push.exchange";
public string PushQueue { get; set; } = "spms.push.queue";
public string ScheduleQueue { get; set; } = "spms.schedule.queue";
public ushort PrefetchCount { get; set; } = 10;
public int MessageTtl { get; set; } = 86400000;
}

View File

@ -2,8 +2,10 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using SPMS.Application.Interfaces; using SPMS.Application.Interfaces;
using SPMS.Application.Settings;
using SPMS.Domain.Interfaces; using SPMS.Domain.Interfaces;
using SPMS.Infrastructure.Auth; using SPMS.Infrastructure.Auth;
using SPMS.Infrastructure.Messaging;
using SPMS.Infrastructure.Persistence; using SPMS.Infrastructure.Persistence;
using SPMS.Infrastructure.Persistence.Repositories; using SPMS.Infrastructure.Persistence.Repositories;
using SPMS.Infrastructure.Security; using SPMS.Infrastructure.Security;
@ -43,6 +45,12 @@ public static class DependencyInjection
// File Storage // File Storage
services.AddSingleton<IFileStorageService, LocalFileStorageService>(); services.AddSingleton<IFileStorageService, LocalFileStorageService>();
// RabbitMQ
services.Configure<RabbitMQSettings>(configuration.GetSection(RabbitMQSettings.SectionName));
services.AddSingleton<RabbitMQConnection>();
services.AddHostedService<RabbitMQInitializer>();
services.AddScoped<IPushQueueService, PushQueueService>();
// Token Store & Email Service // Token Store & Email Service
services.AddMemoryCache(); services.AddMemoryCache();
services.AddSingleton<ITokenStore, InMemoryTokenStore>(); services.AddSingleton<ITokenStore, InMemoryTokenStore>();

View File

@ -0,0 +1,66 @@
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using SPMS.Application.DTOs.Push;
using SPMS.Application.Interfaces;
using SPMS.Application.Settings;
namespace SPMS.Infrastructure.Messaging;
public class PushQueueService : IPushQueueService
{
private readonly RabbitMQConnection _connection;
private readonly RabbitMQSettings _settings;
private readonly ILogger<PushQueueService> _logger;
public PushQueueService(
RabbitMQConnection connection,
IOptions<RabbitMQSettings> settings,
ILogger<PushQueueService> logger)
{
_connection = connection;
_settings = settings.Value;
_logger = logger;
}
public async Task PublishPushMessageAsync(PushMessageDto message, CancellationToken cancellationToken = default)
{
await PublishAsync(_settings.Exchange, "push", message.RequestId, message, cancellationToken);
_logger.LogInformation(
"Push 메시지 발행: request_id={RequestId}, send_type={SendType}, service_id={ServiceId}",
message.RequestId, message.SendType, message.ServiceId);
}
public async Task PublishScheduleMessageAsync(ScheduleMessageDto message, CancellationToken cancellationToken = default)
{
await PublishAsync(_settings.Exchange, "schedule", message.PushMessage.RequestId, message, cancellationToken);
_logger.LogInformation(
"Schedule 메시지 발행: schedule_id={ScheduleId}, scheduled_at={ScheduledAt}, service_id={ServiceId}",
message.ScheduleId, message.ScheduledAt, message.ServiceId);
}
private async Task PublishAsync<T>(string exchange, string routingKey, string messageId, T message,
CancellationToken cancellationToken)
{
await using var channel = await _connection.CreateChannelAsync(cancellationToken);
var body = JsonSerializer.SerializeToUtf8Bytes(message);
var properties = new BasicProperties
{
Persistent = true,
ContentType = "application/json",
MessageId = messageId,
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
};
await channel.BasicPublishAsync(
exchange: exchange,
routingKey: routingKey,
mandatory: false,
basicProperties: properties,
body: body,
cancellationToken: cancellationToken);
}
}

View File

@ -0,0 +1,79 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using SPMS.Application.Settings;
namespace SPMS.Infrastructure.Messaging;
public class RabbitMQConnection : IAsyncDisposable
{
private readonly RabbitMQSettings _settings;
private readonly ILogger<RabbitMQConnection> _logger;
private readonly SemaphoreSlim _semaphore = new(1, 1);
private IConnection? _connection;
public RabbitMQConnection(
IOptions<RabbitMQSettings> settings,
ILogger<RabbitMQConnection> logger)
{
_settings = settings.Value;
_logger = logger;
}
public async Task<IConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
{
if (_connection is { IsOpen: true })
return _connection;
await _semaphore.WaitAsync(cancellationToken);
try
{
if (_connection is { IsOpen: true })
return _connection;
var factory = new ConnectionFactory
{
HostName = _settings.HostName,
Port = _settings.Port,
UserName = _settings.UserName,
Password = _settings.Password,
VirtualHost = _settings.VirtualHost,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10),
RequestedHeartbeat = TimeSpan.FromSeconds(60)
};
_connection = await factory.CreateConnectionAsync(cancellationToken);
_logger.LogInformation("RabbitMQ 연결 성공: {HostName}:{Port}", _settings.HostName, _settings.Port);
return _connection;
}
catch (Exception ex)
{
_logger.LogError(ex, "RabbitMQ 연결 실패: {HostName}:{Port}", _settings.HostName, _settings.Port);
throw;
}
finally
{
_semaphore.Release();
}
}
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
{
var connection = await GetConnectionAsync(cancellationToken);
return await connection.CreateChannelAsync(cancellationToken: cancellationToken);
}
public async ValueTask DisposeAsync()
{
if (_connection is not null)
{
await _connection.CloseAsync();
_connection.Dispose();
_connection = null;
}
_semaphore.Dispose();
}
}

View File

@ -0,0 +1,91 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using SPMS.Application.Settings;
namespace SPMS.Infrastructure.Messaging;
public class RabbitMQInitializer : IHostedService
{
private readonly RabbitMQConnection _connection;
private readonly RabbitMQSettings _settings;
private readonly ILogger<RabbitMQInitializer> _logger;
public RabbitMQInitializer(
RabbitMQConnection connection,
IOptions<RabbitMQSettings> settings,
ILogger<RabbitMQInitializer> logger)
{
_connection = connection;
_settings = settings.Value;
_logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
try
{
await using var channel = await _connection.CreateChannelAsync(cancellationToken);
// Exchange 선언: Direct, Durable
await channel.ExchangeDeclareAsync(
exchange: _settings.Exchange,
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
_logger.LogInformation("Exchange 선언 완료: {Exchange}", _settings.Exchange);
var queueArgs = new Dictionary<string, object?>
{
{ "x-message-ttl", _settings.MessageTtl }
};
// Push Queue 선언
await channel.QueueDeclareAsync(
queue: _settings.PushQueue,
durable: true,
exclusive: false,
autoDelete: false,
arguments: queueArgs,
cancellationToken: cancellationToken);
await channel.QueueBindAsync(
queue: _settings.PushQueue,
exchange: _settings.Exchange,
routingKey: "push",
cancellationToken: cancellationToken);
_logger.LogInformation("Queue 선언 및 바인딩 완료: {Queue} → {Exchange} (routing_key: push)",
_settings.PushQueue, _settings.Exchange);
// Schedule Queue 선언
await channel.QueueDeclareAsync(
queue: _settings.ScheduleQueue,
durable: true,
exclusive: false,
autoDelete: false,
arguments: queueArgs,
cancellationToken: cancellationToken);
await channel.QueueBindAsync(
queue: _settings.ScheduleQueue,
exchange: _settings.Exchange,
routingKey: "schedule",
cancellationToken: cancellationToken);
_logger.LogInformation("Queue 선언 및 바인딩 완료: {Queue} → {Exchange} (routing_key: schedule)",
_settings.ScheduleQueue, _settings.Exchange);
}
catch (Exception ex)
{
_logger.LogError(ex, "RabbitMQ Exchange/Queue 초기화 실패");
throw;
}
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

View File

@ -15,7 +15,10 @@
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="10.0.2" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="10.0.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="10.0.2" /> <PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="10.0.2" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.2" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.2" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="9.0.0" /> <PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="9.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="7.2.0" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.15.0" /> <PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.15.0" />
</ItemGroup> </ItemGroup>