-
Notifications
You must be signed in to change notification settings - Fork 2
/
DependencyInjection.cs
144 lines (124 loc) · 6.21 KB
/
DependencyInjection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#region Usings
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Records.Shared.Infra.Idempotence.Abstractions;
using Records.Shared.Infra.Idempotence.Redis;
using Records.Shared.Infra.MessageBroker.Abstractions;
using System;
using System.Reflection;
#endregion
namespace Records.Shared.Infra.MessageBroker.MassTransit.DI;
/// <summary>
/// Extension methods that register the MassTransit-based message broker with the DI container.
/// </summary>
public static class DependencyInjection
{
    #region Public methods

    /// <summary>
    /// Registers the message broker services for an API host:
    /// <see cref="IEventBus"/> is resolved as <see cref="EventBusForApi"/>.
    /// </summary>
    /// <param name="services">The service collection.</param>
    /// <param name="configuration">Represents a set of key/value application configuration properties.</param>
    /// <param name="consumersAssembly">The assembly to scan for consumers.</param>
    /// <returns>The same service collection.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">The message broker settings section cannot be read from <paramref name="configuration"/>.</exception>
    public static IServiceCollection AddMessageBrokerWithMassTransitForApi(
        this IServiceCollection services,
        IConfiguration configuration,
        Assembly consumersAssembly)
    {
        return AddMessageBrokerWithMassTransit(services, configuration, consumersAssembly, forApi: true);
    }

    /// <summary>
    /// Registers the message broker services for a Worker host:
    /// <see cref="IEventBus"/> is resolved as <see cref="EventBusForWorker"/>.
    /// </summary>
    /// <param name="services">The service collection.</param>
    /// <param name="configuration">Represents a set of key/value application configuration properties.</param>
    /// <param name="consumersAssembly">The assembly to scan for consumers.</param>
    /// <returns>The same service collection.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">The message broker settings section cannot be read from <paramref name="configuration"/>.</exception>
    public static IServiceCollection AddMessageBrokerWithMassTransitForWorker(
        this IServiceCollection services,
        IConfiguration configuration,
        Assembly consumersAssembly)
    {
        return AddMessageBrokerWithMassTransit(services, configuration, consumersAssembly, forApi: false);
    }

    #endregion

    #region Private methods

    /// <summary>
    /// Shared implementation behind the public registration methods: registers the message
    /// idempotency services, the <see cref="IEventBus"/> implementation, and MassTransit
    /// (RabbitMQ transport) with the consumers found in <paramref name="consumersAssembly"/>.
    /// </summary>
    /// <param name="services">The service collection.</param>
    /// <param name="configuration">Represents a set of key/value application configuration properties.</param>
    /// <param name="consumersAssembly">The assembly to scan for consumers.</param>
    /// <param name="forApi">True, registers the EventBus for APIs. Otherwise, registers for Workers.</param>
    /// <returns>The same service collection.</returns>
    /// <exception cref="ArgumentNullException">Any reference argument is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">The message broker settings section cannot be read from <paramref name="configuration"/>.</exception>
    private static IServiceCollection AddMessageBrokerWithMassTransit(
        this IServiceCollection services,
        IConfiguration configuration,
        Assembly consumersAssembly,
        bool forApi)
    {
        // Fail fast on every argument so a bad call is reported at the call site
        // instead of as an obscure failure while the bus is being configured.
        ArgumentNullException.ThrowIfNull(services);
        ArgumentNullException.ThrowIfNull(configuration);
        ArgumentNullException.ThrowIfNull(consumersAssembly);

        // MessageBrokerSettings: read eagerly because the values are needed below while
        // configuring the RabbitMQ host. Done before any registration so a missing section
        // leaves the service collection untouched.
        MessageBrokerSettings messageBrokerSettings =
            configuration.GetSection(MessageBrokerSettings.SettingsKey).Get<MessageBrokerSettings>()
            ?? throw new InvalidOperationException(
                $"MessageBroker settings not found in appsettings (section '{MessageBrokerSettings.SettingsKey}').");

        // Idempotency services for messages.
        services.AddSingleton<IIdempotentMessageService, IdempotentMessageService>();
        services.AddSingleton<IObsoleteMessageService, ObsoleteMessageService>();

        // Exposes the settings object directly (without the IOptions<> wrapper).
        // NOTE(review): this resolves from IOptions<MessageBrokerSettings>, which is not
        // configured in this method - presumably it is bound elsewhere; confirm, otherwise
        // consumers of this singleton receive an unbound settings instance.
        services.AddSingleton(sp => sp.GetRequiredService<IOptions<MessageBrokerSettings>>().Value);

        // IEventBus.
        // See: EventBusForApi vs EventBusForWorker.
        if (forApi)
        {
            // For APIs (uses IPublishEndpoint and ISendEndpointProvider).
            services.AddTransient<IEventBus, EventBusForApi>();
        }
        else
        {
            // For Workers (uses IBus).
            services.AddTransient<IEventBus, EventBusForWorker>();
        }

        // MassTransit + consumers.
        services.AddMassTransit(busCfg =>
        {
            busCfg.SetKebabCaseEndpointNameFormatter();

            // The consumers assembly is passed in by the caller instead of being discovered
            // through Assembly.GetEntryAssembly(), because the consumers do not necessarily
            // live in the entry assembly.
            busCfg.AddConsumers(consumersAssembly);

            busCfg.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(messageBrokerSettings.Host, h =>
                {
                    h.Username(messageBrokerSettings.Username);
                    h.Password(messageBrokerSettings.Password);
                });

                // Default retry policy for all consumers; the retry count and the interval
                // come from MessageBrokerSettings.
                // If you want a custom policy for a specific consumer, create a ConsumerDefinition;
                // that ConsumerDefinition will override this global policy.
                if (messageBrokerSettings.DefaultRetryPolicyEnable)
                {
                    cfg.UseMessageRetry(r => r.Interval(
                        messageBrokerSettings.DefaultRetryPolicyMaxRetries,
                        messageBrokerSettings.DefaultRetryPolicyInterval));
                }

                // The idempotency consume filter (IdempotentConsumerFilter<>) is obsolete and is
                // intentionally not registered here: the ConsumerMetadataSupport class is used
                // instead. See the obsolete comment on IdempotentConsumerFilter for more details.

                // Adds the global exception filter.
                cfg.UseConsumeFilter(typeof(GlobalExceptionFilter<>), context);

                // Configure the endpoints for all defined consumer, saga, and activity types.
                // NOTE: The namespace is included because the same event is consumed by several
                // projects using the same consumer name, so the namespace is prefixed to the
                // queue names to keep them distinct.
                cfg.ConfigureEndpoints(context, new KebabCaseEndpointNameFormatter(true));
            });
        });

        return services;
    }

    #endregion
}