Skip to content

Commit

Permalink
Fix publish message counter
Browse files Browse the repository at this point in the history
  • Loading branch information
filipbekic01 committed Sep 26, 2024
1 parent ae5bd0a commit 0d62347
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 14 deletions.
2 changes: 2 additions & 0 deletions backend/ResQueue/ResQueue/Dtos/PublishDto.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@ namespace ResQueue.Dtos;

public record PublishDto(
string ExchangeId,
string QueueId,
string BrokerId,
string[] MessageIds
);
2 changes: 1 addition & 1 deletion backend/ResQueue/ResQueue/Endpoints/BrokerEndpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static void MapBrokerEndpoints(this IEndpointRouteBuilder routes)
return result.IsSuccess
? Results.Ok(result.Value)
: Results.Problem(result.Problem!);
}).AddRetryFilter();
}); // Do not add retry filter

group.MapPost("/test-connection",
async (IHttpClientFactory httpClientFactory, [FromBody] CreateBrokerDto dto) =>
Expand Down
4 changes: 2 additions & 2 deletions backend/ResQueue/ResQueue/Endpoints/MessageEndpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static void MapMessageEndpoints(this IEndpointRouteBuilder routes)
return result.IsSuccess
? Results.Ok(result.Value)
: Results.Problem(result.Problem!);
}).AddRetryFilter();
}); // Do not add retry filter

group.MapPost("publish",
async (IPublishMessagesFeature publishMessagesFeature, HttpContext httpContext,
Expand All @@ -147,7 +147,7 @@ public static void MapMessageEndpoints(this IEndpointRouteBuilder routes)
return result.IsSuccess
? Results.Ok(result.Value)
: Results.Problem(result.Problem!);
}).AddRetryFilter();
}); // Do not add retry filter

group.MapPost("review",
async (IReviewMessagesFeature publishMessagesFeature, HttpContext httpContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ public record PublishMessagesFeatureResponse();
public class PublishMessagesFeature(
IMongoCollection<Message> messagesCollection,
IMongoCollection<Exchange> exchangesCollection,
IMongoCollection<Queue> queuesCollection,
IMongoCollection<Models.Broker> brokersCollection,
UserManager<User> userManager
UserManager<User> userManager,
IMongoClient mongoClient
) : IPublishMessagesFeature
{
public async Task<OperationResult<PublishMessagesFeatureResponse>> ExecuteAsync(
Expand All @@ -34,14 +36,26 @@ public async Task<OperationResult<PublishMessagesFeatureResponse>> ExecuteAsync(
});
}

var broker = await brokersCollection
.Find(Builders<Models.Broker>.Filter.And(
Builders<Models.Broker>.Filter.Eq(b => b.Id, ObjectId.Parse(request.Dto.BrokerId)),
Builders<Models.Broker>.Filter.ElemMatch(b => b.AccessList, a => a.UserId == user.Id)
))
.SingleAsync();

var exchange = await exchangesCollection
.Find(Builders<Exchange>.Filter.Eq(b => b.Id, ObjectId.Parse(request.Dto.ExchangeId)))
.Find(Builders<Exchange>.Filter.And(
Builders<Exchange>.Filter.Eq(b => b.BrokerId, broker.Id),
Builders<Exchange>.Filter.Eq(b => b.Id, ObjectId.Parse(request.Dto.ExchangeId))
))
.SingleAsync();

var broker = await brokersCollection.Find(Builders<Models.Broker>.Filter.And(
Builders<Models.Broker>.Filter.Eq(b => b.Id, exchange.BrokerId),
Builders<Models.Broker>.Filter.ElemMatch(b => b.AccessList, a => a.UserId == user.Id)
)).SingleAsync();
var queue = await queuesCollection
.Find(Builders<Queue>.Filter.And(
Builders<Queue>.Filter.Eq(b => b.BrokerId, broker.Id),
Builders<Queue>.Filter.Eq(b => b.Id, ObjectId.Parse(request.Dto.QueueId))
))
.SingleAsync();

var messagesFilter =
Builders<Message>.Filter.In(b => b.Id, request.Dto.MessageIds.Select(ObjectId.Parse).ToList());
Expand Down Expand Up @@ -137,11 +151,50 @@ await messagesCollection

channel.BasicPublish(exchange.RawData.GetValue("name").AsString, "", false, props, body);

using var session = await mongoClient.StartSessionAsync();
session.StartTransaction();

// Define the update pipeline
var updatePipeline = new[]
{
// First stage: Decrement Messages and ensure it's not less than 0
new BsonDocument("$set", new BsonDocument
{
{
"Messages", new BsonDocument("$max", new BsonArray
{
0,
new BsonDocument("$subtract", new BsonArray { "$Messages", 1 })
})
}
}),
// Second stage: Update TotalMessages using the updated Messages and RawData.messages
new BsonDocument("$set", new BsonDocument
{
{
"TotalMessages", new BsonDocument("$max", new BsonArray
{
0,
new BsonDocument("$add", new BsonArray { "$Messages", "$RawData.messages" })
})
}
})
};

// Apply the update pipeline to the queues collection
await queuesCollection.UpdateOneAsync(
session,
x => x.Id == queue.Id,
Builders<Queue>.Update.Pipeline(updatePipeline)
);

await messagesCollection.UpdateOneAsync(
Builders<Message>.Filter
.Eq(b => b.Id, message.Id),
Builders<Message>.Update
.Set(b => b.DeletedAt, DateTime.UtcNow));
session,
Builders<Message>.Filter.Eq(b => b.Id, message.Id),
Builders<Message>.Update.Set(b => b.DeletedAt, DateTime.UtcNow)
);

await session.CommitTransactionAsync();
});

return OperationResult<PublishMessagesFeatureResponse>.Success(new PublishMessagesFeatureResponse());
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/api/messages/publishMessagesMutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import axios from 'axios'

export interface PublishRequest {
exchangeId: string
queueId: string
brokerId: string
messageIds: string[]
}

Expand Down
4 changes: 3 additions & 1 deletion frontend/src/features/message/MessageActions.vue
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ const publishMessages = () => {
accept: () => {
publishMessagesAsync({
exchangeId: selectedExchange.value.id,
messageIds: props.selectedMessageIds
messageIds: props.selectedMessageIds,
brokerId: props.broker.id,
queueId: props.rabbitMqQueue.id
})
.then(() => {
toast.add({
Expand Down

0 comments on commit 0d62347

Please sign in to comment.