From 4f38e31710042c4e7748488c6c10774aef49dd4e Mon Sep 17 00:00:00 2001 From: SEAN Date: Tue, 10 Feb 2026 15:34:15 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20RabbitMQ=20=EC=9D=B8=ED=94=84=EB=9D=BC?= =?UTF-8?q?=20=EC=84=A4=EC=A0=95=20(Exchange/Queue)=20(#102)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- SPMS.API/appsettings.json | 7 +- SPMS.Application/DTOs/Push/PushMessageDto.cs | 42 +++++++++ SPMS.Application/DTOs/Push/PushTargetDto.cs | 13 +++ .../DTOs/Push/ScheduleMessageDto.cs | 21 +++++ .../Interfaces/IPushQueueService.cs | 9 ++ SPMS.Application/Settings/RabbitMQSettings.cs | 17 ++++ SPMS.Infrastructure/DependencyInjection.cs | 8 ++ .../Messaging/PushQueueService.cs | 66 ++++++++++++++ .../Messaging/RabbitMQConnection.cs | 79 ++++++++++++++++ .../Messaging/RabbitMQInitializer.cs | 91 +++++++++++++++++++ .../SPMS.Infrastructure.csproj | 3 + 11 files changed, 355 insertions(+), 1 deletion(-) create mode 100644 SPMS.Application/DTOs/Push/PushMessageDto.cs create mode 100644 SPMS.Application/DTOs/Push/PushTargetDto.cs create mode 100644 SPMS.Application/DTOs/Push/ScheduleMessageDto.cs create mode 100644 SPMS.Application/Interfaces/IPushQueueService.cs create mode 100644 SPMS.Application/Settings/RabbitMQSettings.cs create mode 100644 SPMS.Infrastructure/Messaging/PushQueueService.cs create mode 100644 SPMS.Infrastructure/Messaging/RabbitMQConnection.cs create mode 100644 SPMS.Infrastructure/Messaging/RabbitMQInitializer.cs diff --git a/SPMS.API/appsettings.json b/SPMS.API/appsettings.json index 577b74d..d5a32b5 100644 --- a/SPMS.API/appsettings.json +++ b/SPMS.API/appsettings.json @@ -15,7 +15,12 @@ "Port": 0, "UserName": "", "Password": "", - "VirtualHost": "/" + "VirtualHost": "/", + "Exchange": "spms.push.exchange", + "PushQueue": "spms.push.queue", + "ScheduleQueue": "spms.schedule.queue", + "PrefetchCount": 10, + "MessageTtl": 86400000 }, "CredentialEncryption": { "Key": "" diff --git a/SPMS.Application/DTOs/Push/PushMessageDto.cs b/SPMS.Application/DTOs/Push/PushMessageDto.cs new file mode 100644 index 0000000..379949a --- /dev/null +++ b/SPMS.Application/DTOs/Push/PushMessageDto.cs @@ -0,0 +1,42 @@ +using System.Text.Json.Serialization; + +namespace SPMS.Application.DTOs.Push; + +public class PushMessageDto +{ + [JsonPropertyName("message_id")] + public string MessageId { get; set; } = string.Empty; + + [JsonPropertyName("request_id")] + public string RequestId { get; set; } = string.Empty; + + [JsonPropertyName("service_id")] + public long ServiceId { get; set; } + + [JsonPropertyName("send_type")] + public string SendType { get; set; } = string.Empty; + + [JsonPropertyName("title")] + public string Title { get; set; } = string.Empty; + + [JsonPropertyName("body")] + public string Body { get; set; } = string.Empty; + + [JsonPropertyName("image_url")] + public string? ImageUrl { get; set; } + + [JsonPropertyName("link_url")] + public string? LinkUrl { get; set; } + + [JsonPropertyName("custom_data")] + public Dictionary? CustomData { get; set; } + + [JsonPropertyName("target")] + public PushTargetDto Target { get; set; } = new(); + + [JsonPropertyName("created_by")] + public long CreatedBy { get; set; } + + [JsonPropertyName("created_at")] + public string CreatedAt { get; set; } = string.Empty; +} diff --git a/SPMS.Application/DTOs/Push/PushTargetDto.cs b/SPMS.Application/DTOs/Push/PushTargetDto.cs new file mode 100644 index 0000000..e865a0f --- /dev/null +++ b/SPMS.Application/DTOs/Push/PushTargetDto.cs @@ -0,0 +1,13 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace SPMS.Application.DTOs.Push; + +public class PushTargetDto +{ + [JsonPropertyName("type")] + public string Type { get; set; } = string.Empty; + + [JsonPropertyName("value")] + public JsonElement? Value { get; set; } +} diff --git a/SPMS.Application/DTOs/Push/ScheduleMessageDto.cs b/SPMS.Application/DTOs/Push/ScheduleMessageDto.cs new file mode 100644 index 0000000..65cb610 --- /dev/null +++ b/SPMS.Application/DTOs/Push/ScheduleMessageDto.cs @@ -0,0 +1,21 @@ +using System.Text.Json.Serialization; + +namespace SPMS.Application.DTOs.Push; + +public class ScheduleMessageDto +{ + [JsonPropertyName("schedule_id")] + public string ScheduleId { get; set; } = string.Empty; + + [JsonPropertyName("message_id")] + public string MessageId { get; set; } = string.Empty; + + [JsonPropertyName("service_id")] + public long ServiceId { get; set; } + + [JsonPropertyName("scheduled_at")] + public string ScheduledAt { get; set; } = string.Empty; + + [JsonPropertyName("push_message")] + public PushMessageDto PushMessage { get; set; } = new(); +} diff --git a/SPMS.Application/Interfaces/IPushQueueService.cs b/SPMS.Application/Interfaces/IPushQueueService.cs new file mode 100644 index 0000000..b633229 --- /dev/null +++ b/SPMS.Application/Interfaces/IPushQueueService.cs @@ -0,0 +1,9 @@ +using SPMS.Application.DTOs.Push; + +namespace SPMS.Application.Interfaces; + +public interface IPushQueueService +{ + Task PublishPushMessageAsync(PushMessageDto message, CancellationToken cancellationToken = default); + Task PublishScheduleMessageAsync(ScheduleMessageDto message, CancellationToken cancellationToken = default); +} diff --git a/SPMS.Application/Settings/RabbitMQSettings.cs b/SPMS.Application/Settings/RabbitMQSettings.cs new file mode 100644 index 0000000..50da0b0 --- /dev/null +++ b/SPMS.Application/Settings/RabbitMQSettings.cs @@ -0,0 +1,17 @@ +namespace SPMS.Application.Settings; + +public class RabbitMQSettings +{ + public const string SectionName = "RabbitMQ"; + + public string HostName { get; set; } = string.Empty; + public int Port { get; set; } = 5672; + public string UserName { get; set; } = string.Empty; + public string Password { get; set; } = string.Empty; + public string VirtualHost { get; set; } = "/"; + public string Exchange { get; set; } = "spms.push.exchange"; + public string PushQueue { get; set; } = "spms.push.queue"; + public string ScheduleQueue { get; set; } = "spms.schedule.queue"; + public ushort PrefetchCount { get; set; } = 10; + public int MessageTtl { get; set; } = 86400000; +} diff --git a/SPMS.Infrastructure/DependencyInjection.cs b/SPMS.Infrastructure/DependencyInjection.cs index ba1c68c..b877f58 100644 --- a/SPMS.Infrastructure/DependencyInjection.cs +++ b/SPMS.Infrastructure/DependencyInjection.cs @@ -2,8 +2,10 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using SPMS.Application.Interfaces; +using SPMS.Application.Settings; using SPMS.Domain.Interfaces; using SPMS.Infrastructure.Auth; +using SPMS.Infrastructure.Messaging; using SPMS.Infrastructure.Persistence; using SPMS.Infrastructure.Persistence.Repositories; using SPMS.Infrastructure.Security; @@ -43,6 +45,12 @@ public static class DependencyInjection // File Storage services.AddSingleton(); + // RabbitMQ + services.Configure(configuration.GetSection(RabbitMQSettings.SectionName)); + services.AddSingleton(); + services.AddHostedService(); + services.AddScoped(); + // Token Store & Email Service services.AddMemoryCache(); services.AddSingleton(); diff --git a/SPMS.Infrastructure/Messaging/PushQueueService.cs b/SPMS.Infrastructure/Messaging/PushQueueService.cs new file mode 100644 index 0000000..2578305 --- /dev/null +++ b/SPMS.Infrastructure/Messaging/PushQueueService.cs @@ -0,0 +1,66 @@ +using System.Text.Json; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; +using SPMS.Application.DTOs.Push; +using SPMS.Application.Interfaces; +using SPMS.Application.Settings; + +namespace SPMS.Infrastructure.Messaging; + +public class PushQueueService : IPushQueueService +{ + private readonly RabbitMQConnection _connection; + private readonly RabbitMQSettings _settings; + private readonly ILogger _logger; + + public PushQueueService( + RabbitMQConnection connection, + IOptions settings, + ILogger logger) + { + _connection = connection; + _settings = settings.Value; + _logger = logger; + } + + public async Task PublishPushMessageAsync(PushMessageDto message, CancellationToken cancellationToken = default) + { + await PublishAsync(_settings.Exchange, "push", message.RequestId, message, cancellationToken); + _logger.LogInformation( + "Push 메시지 발행: request_id={RequestId}, send_type={SendType}, service_id={ServiceId}", + message.RequestId, message.SendType, message.ServiceId); + } + + public async Task PublishScheduleMessageAsync(ScheduleMessageDto message, CancellationToken cancellationToken = default) + { + await PublishAsync(_settings.Exchange, "schedule", message.PushMessage.RequestId, message, cancellationToken); + _logger.LogInformation( + "Schedule 메시지 발행: schedule_id={ScheduleId}, scheduled_at={ScheduledAt}, service_id={ServiceId}", + message.ScheduleId, message.ScheduledAt, message.ServiceId); + } + + private async Task PublishAsync(string exchange, string routingKey, string messageId, T message, + CancellationToken cancellationToken) + { + await using var channel = await _connection.CreateChannelAsync(cancellationToken); + + var body = JsonSerializer.SerializeToUtf8Bytes(message); + + var properties = new BasicProperties + { + Persistent = true, + ContentType = "application/json", + MessageId = messageId, + Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) + }; + + await channel.BasicPublishAsync( + exchange: exchange, + routingKey: routingKey, + mandatory: false, + basicProperties: properties, + body: body, + cancellationToken: cancellationToken); + } +} diff --git a/SPMS.Infrastructure/Messaging/RabbitMQConnection.cs b/SPMS.Infrastructure/Messaging/RabbitMQConnection.cs new file mode 100644 index 0000000..bb80f6e --- /dev/null +++ b/SPMS.Infrastructure/Messaging/RabbitMQConnection.cs @@ -0,0 +1,79 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; +using SPMS.Application.Settings; + +namespace SPMS.Infrastructure.Messaging; + +public class RabbitMQConnection : IAsyncDisposable +{ + private readonly RabbitMQSettings _settings; + private readonly ILogger _logger; + private readonly SemaphoreSlim _semaphore = new(1, 1); + private IConnection? _connection; + + public RabbitMQConnection( + IOptions settings, + ILogger logger) + { + _settings = settings.Value; + _logger = logger; + } + + public async Task GetConnectionAsync(CancellationToken cancellationToken = default) + { + if (_connection is { IsOpen: true }) + return _connection; + + await _semaphore.WaitAsync(cancellationToken); + try + { + if (_connection is { IsOpen: true }) + return _connection; + + var factory = new ConnectionFactory + { + HostName = _settings.HostName, + Port = _settings.Port, + UserName = _settings.UserName, + Password = _settings.Password, + VirtualHost = _settings.VirtualHost, + AutomaticRecoveryEnabled = true, + NetworkRecoveryInterval = TimeSpan.FromSeconds(10), + RequestedHeartbeat = TimeSpan.FromSeconds(60) + }; + + _connection = await factory.CreateConnectionAsync(cancellationToken); + _logger.LogInformation("RabbitMQ 연결 성공: {HostName}:{Port}", _settings.HostName, _settings.Port); + + return _connection; + } + catch (Exception ex) + { + _logger.LogError(ex, "RabbitMQ 연결 실패: {HostName}:{Port}", _settings.HostName, _settings.Port); + throw; + } + finally + { + _semaphore.Release(); + } + } + + public async Task CreateChannelAsync(CancellationToken cancellationToken = default) + { + var connection = await GetConnectionAsync(cancellationToken); + return await connection.CreateChannelAsync(cancellationToken: cancellationToken); + } + + public async ValueTask DisposeAsync() + { + if (_connection is not null) + { + await _connection.CloseAsync(); + _connection.Dispose(); + _connection = null; + } + + _semaphore.Dispose(); + } +} diff --git a/SPMS.Infrastructure/Messaging/RabbitMQInitializer.cs b/SPMS.Infrastructure/Messaging/RabbitMQInitializer.cs new file mode 100644 index 0000000..1c22bba --- /dev/null +++ b/SPMS.Infrastructure/Messaging/RabbitMQInitializer.cs @@ -0,0 +1,91 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; +using SPMS.Application.Settings; + +namespace SPMS.Infrastructure.Messaging; + +public class RabbitMQInitializer : IHostedService +{ + private readonly RabbitMQConnection _connection; + private readonly RabbitMQSettings _settings; + private readonly ILogger _logger; + + public RabbitMQInitializer( + RabbitMQConnection connection, + IOptions settings, + ILogger logger) + { + _connection = connection; + _settings = settings.Value; + _logger = logger; + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + try + { + await using var channel = await _connection.CreateChannelAsync(cancellationToken); + + // Exchange 선언: Direct, Durable + await channel.ExchangeDeclareAsync( + exchange: _settings.Exchange, + type: ExchangeType.Direct, + durable: true, + autoDelete: false, + arguments: null, + cancellationToken: cancellationToken); + + _logger.LogInformation("Exchange 선언 완료: {Exchange}", _settings.Exchange); + + var queueArgs = new Dictionary + { + { "x-message-ttl", _settings.MessageTtl } + }; + + // Push Queue 선언 + await channel.QueueDeclareAsync( + queue: _settings.PushQueue, + durable: true, + exclusive: false, + autoDelete: false, + arguments: queueArgs, + cancellationToken: cancellationToken); + + await channel.QueueBindAsync( + queue: _settings.PushQueue, + exchange: _settings.Exchange, + routingKey: "push", + cancellationToken: cancellationToken); + + _logger.LogInformation("Queue 선언 및 바인딩 완료: {Queue} → {Exchange} (routing_key: push)", + _settings.PushQueue, _settings.Exchange); + + // Schedule Queue 선언 + await channel.QueueDeclareAsync( + queue: _settings.ScheduleQueue, + durable: true, + exclusive: false, + autoDelete: false, + arguments: queueArgs, + cancellationToken: cancellationToken); + + await channel.QueueBindAsync( + queue: _settings.ScheduleQueue, + exchange: _settings.Exchange, + routingKey: "schedule", + cancellationToken: cancellationToken); + + _logger.LogInformation("Queue 선언 및 바인딩 완료: {Queue} → {Exchange} (routing_key: schedule)", + _settings.ScheduleQueue, _settings.Exchange); + } + catch (Exception ex) + { + _logger.LogError(ex, "RabbitMQ Exchange/Queue 초기화 실패"); + throw; + } + } + + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} diff --git a/SPMS.Infrastructure/SPMS.Infrastructure.csproj b/SPMS.Infrastructure/SPMS.Infrastructure.csproj index e66d498..46c0ff7 100644 --- a/SPMS.Infrastructure/SPMS.Infrastructure.csproj +++ b/SPMS.Infrastructure/SPMS.Infrastructure.csproj @@ -15,7 +15,10 @@ + + +