feat: RabbitMQ 인프라 설정 (Exchange/Queue) (#102)
This commit is contained in:
parent
1cae5c3754
commit
4f38e31710
|
|
@ -15,7 +15,12 @@
|
||||||
"Port": 0,
|
"Port": 0,
|
||||||
"UserName": "",
|
"UserName": "",
|
||||||
"Password": "",
|
"Password": "",
|
||||||
"VirtualHost": "/"
|
"VirtualHost": "/",
|
||||||
|
"Exchange": "spms.push.exchange",
|
||||||
|
"PushQueue": "spms.push.queue",
|
||||||
|
"ScheduleQueue": "spms.schedule.queue",
|
||||||
|
"PrefetchCount": 10,
|
||||||
|
"MessageTtl": 86400000
|
||||||
},
|
},
|
||||||
"CredentialEncryption": {
|
"CredentialEncryption": {
|
||||||
"Key": ""
|
"Key": ""
|
||||||
|
|
|
||||||
42
SPMS.Application/DTOs/Push/PushMessageDto.cs
Normal file
42
SPMS.Application/DTOs/Push/PushMessageDto.cs
Normal file
|
|
@ -0,0 +1,42 @@
|
||||||
|
using System.Text.Json.Serialization;
|
||||||
|
|
||||||
|
namespace SPMS.Application.DTOs.Push;
|
||||||
|
|
||||||
|
public class PushMessageDto
|
||||||
|
{
|
||||||
|
[JsonPropertyName("message_id")]
|
||||||
|
public string MessageId { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
[JsonPropertyName("request_id")]
|
||||||
|
public string RequestId { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
[JsonPropertyName("service_id")]
|
||||||
|
public long ServiceId { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("send_type")]
|
||||||
|
public string SendType { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
[JsonPropertyName("title")]
|
||||||
|
public string Title { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
[JsonPropertyName("body")]
|
||||||
|
public string Body { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
[JsonPropertyName("image_url")]
|
||||||
|
public string? ImageUrl { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("link_url")]
|
||||||
|
public string? LinkUrl { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("custom_data")]
|
||||||
|
public Dictionary<string, object>? CustomData { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("target")]
|
||||||
|
public PushTargetDto Target { get; set; } = new();
|
||||||
|
|
||||||
|
[JsonPropertyName("created_by")]
|
||||||
|
public long CreatedBy { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("created_at")]
|
||||||
|
public string CreatedAt { get; set; } = string.Empty;
|
||||||
|
}
|
||||||
13
SPMS.Application/DTOs/Push/PushTargetDto.cs
Normal file
13
SPMS.Application/DTOs/Push/PushTargetDto.cs
Normal file
|
|
@ -0,0 +1,13 @@
|
||||||
|
using System.Text.Json;
|
||||||
|
using System.Text.Json.Serialization;
|
||||||
|
|
||||||
|
namespace SPMS.Application.DTOs.Push;
|
||||||
|
|
||||||
|
public class PushTargetDto
|
||||||
|
{
|
||||||
|
[JsonPropertyName("type")]
|
||||||
|
public string Type { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
[JsonPropertyName("value")]
|
||||||
|
public JsonElement? Value { get; set; }
|
||||||
|
}
|
||||||
21
SPMS.Application/DTOs/Push/ScheduleMessageDto.cs
Normal file
21
SPMS.Application/DTOs/Push/ScheduleMessageDto.cs
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
using System.Text.Json.Serialization;
|
||||||
|
|
||||||
|
namespace SPMS.Application.DTOs.Push;
|
||||||
|
|
||||||
|
public class ScheduleMessageDto
|
||||||
|
{
|
||||||
|
[JsonPropertyName("schedule_id")]
|
||||||
|
public string ScheduleId { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
[JsonPropertyName("message_id")]
|
||||||
|
public string MessageId { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
[JsonPropertyName("service_id")]
|
||||||
|
public long ServiceId { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("scheduled_at")]
|
||||||
|
public string ScheduledAt { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
[JsonPropertyName("push_message")]
|
||||||
|
public PushMessageDto PushMessage { get; set; } = new();
|
||||||
|
}
|
||||||
9
SPMS.Application/Interfaces/IPushQueueService.cs
Normal file
9
SPMS.Application/Interfaces/IPushQueueService.cs
Normal file
|
|
@ -0,0 +1,9 @@
|
||||||
|
using SPMS.Application.DTOs.Push;
|
||||||
|
|
||||||
|
namespace SPMS.Application.Interfaces;
|
||||||
|
|
||||||
|
public interface IPushQueueService
|
||||||
|
{
|
||||||
|
Task PublishPushMessageAsync(PushMessageDto message, CancellationToken cancellationToken = default);
|
||||||
|
Task PublishScheduleMessageAsync(ScheduleMessageDto message, CancellationToken cancellationToken = default);
|
||||||
|
}
|
||||||
17
SPMS.Application/Settings/RabbitMQSettings.cs
Normal file
17
SPMS.Application/Settings/RabbitMQSettings.cs
Normal file
|
|
@ -0,0 +1,17 @@
|
||||||
|
namespace SPMS.Application.Settings;
|
||||||
|
|
||||||
|
public class RabbitMQSettings
|
||||||
|
{
|
||||||
|
public const string SectionName = "RabbitMQ";
|
||||||
|
|
||||||
|
public string HostName { get; set; } = string.Empty;
|
||||||
|
public int Port { get; set; } = 5672;
|
||||||
|
public string UserName { get; set; } = string.Empty;
|
||||||
|
public string Password { get; set; } = string.Empty;
|
||||||
|
public string VirtualHost { get; set; } = "/";
|
||||||
|
public string Exchange { get; set; } = "spms.push.exchange";
|
||||||
|
public string PushQueue { get; set; } = "spms.push.queue";
|
||||||
|
public string ScheduleQueue { get; set; } = "spms.schedule.queue";
|
||||||
|
public ushort PrefetchCount { get; set; } = 10;
|
||||||
|
public int MessageTtl { get; set; } = 86400000;
|
||||||
|
}
|
||||||
|
|
@ -2,8 +2,10 @@ using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using SPMS.Application.Interfaces;
|
using SPMS.Application.Interfaces;
|
||||||
|
using SPMS.Application.Settings;
|
||||||
using SPMS.Domain.Interfaces;
|
using SPMS.Domain.Interfaces;
|
||||||
using SPMS.Infrastructure.Auth;
|
using SPMS.Infrastructure.Auth;
|
||||||
|
using SPMS.Infrastructure.Messaging;
|
||||||
using SPMS.Infrastructure.Persistence;
|
using SPMS.Infrastructure.Persistence;
|
||||||
using SPMS.Infrastructure.Persistence.Repositories;
|
using SPMS.Infrastructure.Persistence.Repositories;
|
||||||
using SPMS.Infrastructure.Security;
|
using SPMS.Infrastructure.Security;
|
||||||
|
|
@ -43,6 +45,12 @@ public static class DependencyInjection
|
||||||
// File Storage
|
// File Storage
|
||||||
services.AddSingleton<IFileStorageService, LocalFileStorageService>();
|
services.AddSingleton<IFileStorageService, LocalFileStorageService>();
|
||||||
|
|
||||||
|
// RabbitMQ
|
||||||
|
services.Configure<RabbitMQSettings>(configuration.GetSection(RabbitMQSettings.SectionName));
|
||||||
|
services.AddSingleton<RabbitMQConnection>();
|
||||||
|
services.AddHostedService<RabbitMQInitializer>();
|
||||||
|
services.AddScoped<IPushQueueService, PushQueueService>();
|
||||||
|
|
||||||
// Token Store & Email Service
|
// Token Store & Email Service
|
||||||
services.AddMemoryCache();
|
services.AddMemoryCache();
|
||||||
services.AddSingleton<ITokenStore, InMemoryTokenStore>();
|
services.AddSingleton<ITokenStore, InMemoryTokenStore>();
|
||||||
|
|
|
||||||
66
SPMS.Infrastructure/Messaging/PushQueueService.cs
Normal file
66
SPMS.Infrastructure/Messaging/PushQueueService.cs
Normal file
|
|
@ -0,0 +1,66 @@
|
||||||
|
using System.Text.Json;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using RabbitMQ.Client;
|
||||||
|
using SPMS.Application.DTOs.Push;
|
||||||
|
using SPMS.Application.Interfaces;
|
||||||
|
using SPMS.Application.Settings;
|
||||||
|
|
||||||
|
namespace SPMS.Infrastructure.Messaging;
|
||||||
|
|
||||||
|
public class PushQueueService : IPushQueueService
|
||||||
|
{
|
||||||
|
private readonly RabbitMQConnection _connection;
|
||||||
|
private readonly RabbitMQSettings _settings;
|
||||||
|
private readonly ILogger<PushQueueService> _logger;
|
||||||
|
|
||||||
|
public PushQueueService(
|
||||||
|
RabbitMQConnection connection,
|
||||||
|
IOptions<RabbitMQSettings> settings,
|
||||||
|
ILogger<PushQueueService> logger)
|
||||||
|
{
|
||||||
|
_connection = connection;
|
||||||
|
_settings = settings.Value;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task PublishPushMessageAsync(PushMessageDto message, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
await PublishAsync(_settings.Exchange, "push", message.RequestId, message, cancellationToken);
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Push 메시지 발행: request_id={RequestId}, send_type={SendType}, service_id={ServiceId}",
|
||||||
|
message.RequestId, message.SendType, message.ServiceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task PublishScheduleMessageAsync(ScheduleMessageDto message, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
await PublishAsync(_settings.Exchange, "schedule", message.PushMessage.RequestId, message, cancellationToken);
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Schedule 메시지 발행: schedule_id={ScheduleId}, scheduled_at={ScheduledAt}, service_id={ServiceId}",
|
||||||
|
message.ScheduleId, message.ScheduledAt, message.ServiceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task PublishAsync<T>(string exchange, string routingKey, string messageId, T message,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await using var channel = await _connection.CreateChannelAsync(cancellationToken);
|
||||||
|
|
||||||
|
var body = JsonSerializer.SerializeToUtf8Bytes(message);
|
||||||
|
|
||||||
|
var properties = new BasicProperties
|
||||||
|
{
|
||||||
|
Persistent = true,
|
||||||
|
ContentType = "application/json",
|
||||||
|
MessageId = messageId,
|
||||||
|
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
|
||||||
|
};
|
||||||
|
|
||||||
|
await channel.BasicPublishAsync(
|
||||||
|
exchange: exchange,
|
||||||
|
routingKey: routingKey,
|
||||||
|
mandatory: false,
|
||||||
|
basicProperties: properties,
|
||||||
|
body: body,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
79
SPMS.Infrastructure/Messaging/RabbitMQConnection.cs
Normal file
79
SPMS.Infrastructure/Messaging/RabbitMQConnection.cs
Normal file
|
|
@ -0,0 +1,79 @@
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using RabbitMQ.Client;
|
||||||
|
using SPMS.Application.Settings;
|
||||||
|
|
||||||
|
namespace SPMS.Infrastructure.Messaging;
|
||||||
|
|
||||||
|
public class RabbitMQConnection : IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly RabbitMQSettings _settings;
|
||||||
|
private readonly ILogger<RabbitMQConnection> _logger;
|
||||||
|
private readonly SemaphoreSlim _semaphore = new(1, 1);
|
||||||
|
private IConnection? _connection;
|
||||||
|
|
||||||
|
public RabbitMQConnection(
|
||||||
|
IOptions<RabbitMQSettings> settings,
|
||||||
|
ILogger<RabbitMQConnection> logger)
|
||||||
|
{
|
||||||
|
_settings = settings.Value;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<IConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (_connection is { IsOpen: true })
|
||||||
|
return _connection;
|
||||||
|
|
||||||
|
await _semaphore.WaitAsync(cancellationToken);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (_connection is { IsOpen: true })
|
||||||
|
return _connection;
|
||||||
|
|
||||||
|
var factory = new ConnectionFactory
|
||||||
|
{
|
||||||
|
HostName = _settings.HostName,
|
||||||
|
Port = _settings.Port,
|
||||||
|
UserName = _settings.UserName,
|
||||||
|
Password = _settings.Password,
|
||||||
|
VirtualHost = _settings.VirtualHost,
|
||||||
|
AutomaticRecoveryEnabled = true,
|
||||||
|
NetworkRecoveryInterval = TimeSpan.FromSeconds(10),
|
||||||
|
RequestedHeartbeat = TimeSpan.FromSeconds(60)
|
||||||
|
};
|
||||||
|
|
||||||
|
_connection = await factory.CreateConnectionAsync(cancellationToken);
|
||||||
|
_logger.LogInformation("RabbitMQ 연결 성공: {HostName}:{Port}", _settings.HostName, _settings.Port);
|
||||||
|
|
||||||
|
return _connection;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "RabbitMQ 연결 실패: {HostName}:{Port}", _settings.HostName, _settings.Port);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_semaphore.Release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var connection = await GetConnectionAsync(cancellationToken);
|
||||||
|
return await connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
if (_connection is not null)
|
||||||
|
{
|
||||||
|
await _connection.CloseAsync();
|
||||||
|
_connection.Dispose();
|
||||||
|
_connection = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
_semaphore.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
91
SPMS.Infrastructure/Messaging/RabbitMQInitializer.cs
Normal file
91
SPMS.Infrastructure/Messaging/RabbitMQInitializer.cs
Normal file
|
|
@ -0,0 +1,91 @@
|
||||||
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using RabbitMQ.Client;
|
||||||
|
using SPMS.Application.Settings;
|
||||||
|
|
||||||
|
namespace SPMS.Infrastructure.Messaging;
|
||||||
|
|
||||||
|
public class RabbitMQInitializer : IHostedService
|
||||||
|
{
|
||||||
|
private readonly RabbitMQConnection _connection;
|
||||||
|
private readonly RabbitMQSettings _settings;
|
||||||
|
private readonly ILogger<RabbitMQInitializer> _logger;
|
||||||
|
|
||||||
|
public RabbitMQInitializer(
|
||||||
|
RabbitMQConnection connection,
|
||||||
|
IOptions<RabbitMQSettings> settings,
|
||||||
|
ILogger<RabbitMQInitializer> logger)
|
||||||
|
{
|
||||||
|
_connection = connection;
|
||||||
|
_settings = settings.Value;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task StartAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await using var channel = await _connection.CreateChannelAsync(cancellationToken);
|
||||||
|
|
||||||
|
// Exchange 선언: Direct, Durable
|
||||||
|
await channel.ExchangeDeclareAsync(
|
||||||
|
exchange: _settings.Exchange,
|
||||||
|
type: ExchangeType.Direct,
|
||||||
|
durable: true,
|
||||||
|
autoDelete: false,
|
||||||
|
arguments: null,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation("Exchange 선언 완료: {Exchange}", _settings.Exchange);
|
||||||
|
|
||||||
|
var queueArgs = new Dictionary<string, object?>
|
||||||
|
{
|
||||||
|
{ "x-message-ttl", _settings.MessageTtl }
|
||||||
|
};
|
||||||
|
|
||||||
|
// Push Queue 선언
|
||||||
|
await channel.QueueDeclareAsync(
|
||||||
|
queue: _settings.PushQueue,
|
||||||
|
durable: true,
|
||||||
|
exclusive: false,
|
||||||
|
autoDelete: false,
|
||||||
|
arguments: queueArgs,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
await channel.QueueBindAsync(
|
||||||
|
queue: _settings.PushQueue,
|
||||||
|
exchange: _settings.Exchange,
|
||||||
|
routingKey: "push",
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation("Queue 선언 및 바인딩 완료: {Queue} → {Exchange} (routing_key: push)",
|
||||||
|
_settings.PushQueue, _settings.Exchange);
|
||||||
|
|
||||||
|
// Schedule Queue 선언
|
||||||
|
await channel.QueueDeclareAsync(
|
||||||
|
queue: _settings.ScheduleQueue,
|
||||||
|
durable: true,
|
||||||
|
exclusive: false,
|
||||||
|
autoDelete: false,
|
||||||
|
arguments: queueArgs,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
await channel.QueueBindAsync(
|
||||||
|
queue: _settings.ScheduleQueue,
|
||||||
|
exchange: _settings.Exchange,
|
||||||
|
routingKey: "schedule",
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation("Queue 선언 및 바인딩 완료: {Queue} → {Exchange} (routing_key: schedule)",
|
||||||
|
_settings.ScheduleQueue, _settings.Exchange);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "RabbitMQ Exchange/Queue 초기화 실패");
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
@ -15,7 +15,10 @@
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.0" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.0" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="10.0.2" />
|
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="10.0.2" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="10.0.2" />
|
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="10.0.2" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.2" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.2" />
|
||||||
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="9.0.0" />
|
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="9.0.0" />
|
||||||
|
<PackageReference Include="RabbitMQ.Client" Version="7.2.0" />
|
||||||
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.15.0" />
|
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.15.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user