In Springwolf UI, add support for sending RabbitMQ Message towards an exchange using a RoutingKey #366

Open
pdalfarr opened this issue Sep 25, 2023 · 10 comments
Labels
amqp enhancement New feature or request

Comments

@pdalfarr
Contributor

Describe the feature request
In Springwolf UI, add support for sending RabbitMQ Message towards an exchange using a Routing Key.

Motivation
Springwolf lists all publishers and subscribers and presents a UI that not only displays them but also lets you test them by sending a message.
For RabbitMQ, this message-sending feature works for queues but not for exchanges.
This feature request aims to implement the missing part.

Technical details

Here are some technical details I gathered so far:

The Springwolf UI can send messages to queues (when plugin.amqp.publishing.enabled: true),
which works when the queue name is specified in AsyncOperation.channelName.

But sending a message to a topic exchange does not work as expected:

Let's take an example:

In one of your RabbitMQ examples, here:

https://github.com/springwolf/springwolf-core/blob/master/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/stavshamir/springwolf/example/amqp/producers/ExampleProducer.java#L17-L28

we have:

    @AsyncPublisher(
            operation =
                    @AsyncOperation(
                            channelName = "example-producer-channel-publisher",
                            description = "Custom, optional description defined in the AsyncPublisher annotation"))
    @AmqpAsyncOperationBinding()
    public void sendMessage(ExamplePayloadDto msg) {
        // send
        AnotherPayloadDto dto = new AnotherPayloadDto("fooValue", msg);
        rabbitTemplate.convertAndSend("example-topic-exchange", "example-topic-routing-key", dto);
    }

Now, let's suppose we have, in another micro-service, a method with the following annotation:

    @Override
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            name = "microservice-2-private-queue-name" // this is a private queue of this micro-service: I do not want this string value to be known by the first micro-service (micro-service 1), i.e., your sample code
                    ),
                    exchange = @Exchange(
                            name = "example-producer-channel-publisher", // the exchange from your sample code
                            type = ExchangeTypes.TOPIC
                    ),
                    key = "example-topic-routing-key" // the key from your sample code
            )
    )

So microservice 2 declares a topic exchange binding that receives the messages sent by the RabbitMQ sample code:

rabbitTemplate.convertAndSend("example-topic-exchange", "example-topic-routing-key", dto);

This works fine, BUT it does not work when using the Springwolf UI.

It seems that messages sent from the UI go to a QUEUE named "example-producer-channel-publisher", with a routing key of the same value, i.e. "example-producer-channel-publisher".

Ideas:
Maybe we could use amqpchannelbinding to express that we want the UI to send the message to a given exchange (a topic exchange in my example), along with a routing key?

In the example I gave here, your code (micro-service 1) sends a message to a topic exchange: rabbitTemplate.convertAndSend("example-topic-exchange", "example-topic-routing-key", dto);
Of course, I do NOT want the "microservice-2-private-queue-name" queue name (from micro-service 2) to be known by the AsyncPublisher annotation of micro-service 1.

And I want micro-service 2 to be able to change the name of its own internal/private queue at will.
Micro-service 1 should only know about the exchange and routing key of micro-service 2.
In other words, micro-service 1 must send messages to this (let's say public) exchange, and this is what the code actually does.
So I guess this should be reflected in the annotation, as well as in the behavior of the Springwolf UI.

I tested this in my project: messages sent from the UI do get delivered to micro-service 2 if I set the channelName in micro-service 1 to "microservice-2-private-queue-name".
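
To make the difference between the two publish paths concrete, here is a minimal sketch of a toy in-memory broker. It is not RabbitMQ or Springwolf code: the ToyBroker class and its methods are hypothetical, only exact routing-key matches are modelled, and the binding is assumed to be declared on "example-topic-exchange" (the exchange the producer actually publishes to).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory broker (hypothetical): just enough routing to compare the publish calls. */
class ToyBroker {
    /** One binding: messages published to `exchange` with `routingKey` land in `queue`. */
    private static final class Binding {
        final String exchange;
        final String routingKey;
        final String queue;

        Binding(String exchange, String routingKey, String queue) {
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.queue = queue;
        }
    }

    private final List<Binding> bindings = new ArrayList<>();
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String exchange, String routingKey, String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
        bindings.add(new Binding(exchange, routingKey, queue));
    }

    /** Publishes a message and returns the names of the queues that received it. */
    List<String> publish(String exchange, String routingKey, String message) {
        List<String> delivered = new ArrayList<>();
        if (exchange.isEmpty()) {
            // Default exchange: the routing key is interpreted as a queue name.
            if (queues.containsKey(routingKey)) {
                queues.get(routingKey).add(message);
                delivered.add(routingKey);
            }
            return delivered;
        }
        for (Binding b : bindings) {
            // Exact key match only; wildcard patterns are left out of this sketch.
            if (b.exchange.equals(exchange) && b.routingKey.equals(routingKey)) {
                queues.get(b.queue).add(message);
                delivered.add(b.queue);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.bind("example-topic-exchange", "example-topic-routing-key",
                "microservice-2-private-queue-name");

        // What the producer code does: exchange + routing key.
        System.out.println(broker.publish(
                "example-topic-exchange", "example-topic-routing-key", "payload"));
        // What the UI appears to do: channelName used as a queue name.
        System.out.println(broker.publish("", "example-producer-channel-publisher", "payload"));
        // The workaround: channelName set to the private queue name.
        System.out.println(broker.publish("", "microservice-2-private-queue-name", "payload"));
    }
}
```

In this model only the first and third calls reach the private queue, which matches the behavior described above: the second call fails because no queue carries the channel name.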

Describe alternatives you've considered
I haven't considered any alternatives yet.

@pdalfarr pdalfarr added the enhancement New feature or request label Sep 25, 2023
@timonback
Member

Thanks for this elaborate explanation.
Now, I understand that an exchange with a routingKey is a feature to hide the underlying queue name - or adjust it dynamically.

Basically you want to use springwolf-ui on microservice 1 to publish a message. It should take the info from rabbitTemplate.convertAndSend("example-topic-exchange", "example-topic-routing-key", dto);.
The exchange knows the private queue of microservice 2 and does the mapping internally.

Springwolf UI does not support this at this point.
I see three things that would need to change:

  1. Microservice 1 needs to be aware of the routingKey, i.e. through the AmqpAsyncChannelBinding annotation
  2. The Springwolf UI would need to extract the routingKey from the doc and render - specific for amqp - an additional publishing field - or at least pass it on.
  3. The SpringwolfAmqpProducer needs to send the message to the specified routingKey that is supplied from the frontend.

Feel free to contribute this feature, we are happy to help.

@timonback timonback added the amqp label Oct 6, 2023
@pdalfarr
Contributor Author

pdalfarr commented Nov 22, 2023

Just adding a link and a picture to illustrate the different kinds of exchanges supported by RabbitMQ:

image

src: https://hevodata.com/learn/rabbitmq-exchange-type/

So the 'complete chain' is:

Producer > Channel > Exchange > Binding > Routing Key > Queue > Consumer
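
For the topic exchange used in this issue, the "Binding > Routing Key" step compares the message's routing key with the binding pattern word by word, where words are separated by dots, `*` stands for exactly one word and `#` for zero or more words. The following is a minimal sketch of that matching rule only; TopicMatcher is a hypothetical helper written for illustration, not part of RabbitMQ, Spring AMQP or Springwolf.

```java
/** Hypothetical helper illustrating topic-exchange pattern matching. */
class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // The -1 limit keeps trailing empty words, so "a." is two words, not one.
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' consumes exactly one word; a literal must be equal.
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("example-topic-routing-key", "example-topic-routing-key"));
        System.out.println(matches("other_service.*.v1", "other_service.get-observations.v1"));
        System.out.println(matches("other_service.#", "other_service.get-observations.v1"));
    }
}
```

A routing key without wildcards, such as "example-topic-routing-key", only matches itself, so in the example from this issue the topic exchange behaves like a direct one.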

@pdalfarr
Contributor Author

pdalfarr commented May 31, 2024

I am not really sure of myself here, but I am sharing some thoughts to try to move things forward.
The text below is a kind of brainstorming, if you will.

I had a look at the code and I think we could first change the code a bit without adding the new 'routingKey' attribute you mentioned.

1. What if we also add bindings here:

    ChannelObject channelItem = channelBuilder
            .messages(Map.of(message.getMessageId(), MessageReference.toComponentMessage(message)))
            .build();

So we would have something like:

    ChannelObject channelItem = channelBuilder
            .messages(Map.of(message.getMessageId(), MessageReference.toComponentMessage(message)))
            .bindings(toChannelBindings(operation.getBindings()))
            .build();

2. The ChannelBinding would then be available to these two methods:

(In my case, when I use AsyncPublisher + AsyncOperation and AmqpAsyncOperationBinding, the code below does not find any "amqp" bindings.)

    private String getExchangeName(ChannelObject channelItem) {
        String exchange = "";
        if (channelItem.getBindings() != null && channelItem.getBindings().containsKey("amqp")) {
            AMQPChannelBinding channelBinding =
                    (AMQPChannelBinding) channelItem.getBindings().get("amqp");
            if (channelBinding.getExchange() != null
                    && channelBinding.getExchange().getName() != null) {
                exchange = channelBinding.getExchange().getName();
            }
        }
        return exchange;
    }

    private String getRoutingKey(Operation operation) {
        String routingKey = "";
        if (operation != null
                && operation.getBindings() != null
                && operation.getBindings().containsKey("amqp")) {
            AMQPOperationBinding operationBinding =
                    (AMQPOperationBinding) operation.getBindings().get("amqp");
            if (!CollectionUtils.isEmpty(operationBinding.getCc())) {
                routingKey = operationBinding.getCc().get(0);
            }
        }
        return routingKey;
    }

By doing this, I think we should be able to handle the type of the AMQPChannelBinding.

If it's AMQPChannelType.ROUTING_KEY, then we should publish like so:

    rabbitTemplate.convertAndSend("exchange-name", "routing-key-as-per-AMQPOperationBinding.cc[0]-OR-empty-string-if-not-defined", dto); 

and if it's AMQPChannelType.QUEUE, then we should publish like so:

    rabbitTemplate.convertAndSend("", "queue-name-as-routing-key", dto); // we publish towards the 'default exchange'

For the latter case, maybe we should be able to specify, or obtain from somewhere, an exchange other than the default exchange?
Or maybe AMQPChannelType.QUEUE is meant to declare a queue but not a binding to it, so there is no point in taking it into account when calling rabbitTemplate.convertAndSend. I am not really sure about this.
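
The two cases above could be sketched as a small resolver that picks the exchange and routing key before publishing. This is only an illustration of the proposal, not existing Springwolf code: PublishTargetResolver, Target and the ChannelType enum are hypothetical stand-ins (the enum mirrors the AMQPChannelType values named above), and the values are passed in as plain strings rather than read from the real binding classes.

```java
import java.util.List;

/** Hypothetical resolver for the (exchange, routingKey) pair to publish with. */
class PublishTargetResolver {

    /** Stand-in for the AMQPChannelType values discussed above. */
    enum ChannelType { ROUTING_KEY, QUEUE }

    /** The pair that would be handed to rabbitTemplate.convertAndSend(exchange, routingKey, payload). */
    static final class Target {
        final String exchange;
        final String routingKey;

        Target(String exchange, String routingKey) {
            this.exchange = exchange;
            this.routingKey = routingKey;
        }

        @Override
        public String toString() {
            return "exchange='" + exchange + "', routingKey='" + routingKey + "'";
        }
    }

    static Target resolve(ChannelType type, String exchangeName, String queueName, List<String> cc) {
        if (type == ChannelType.QUEUE) {
            // Default exchange: the queue name doubles as the routing key.
            return new Target("", queueName);
        }
        // ROUTING_KEY: use cc[0] if present, otherwise an empty routing key.
        String routingKey = (cc == null || cc.isEmpty()) ? "" : cc.get(0);
        String exchange = (exchangeName == null) ? "" : exchangeName;
        return new Target(exchange, routingKey);
    }

    public static void main(String[] args) {
        System.out.println(resolve(ChannelType.ROUTING_KEY, "example-topic-exchange", null,
                List.of("example-topic-routing-key")));
        System.out.println(resolve(ChannelType.QUEUE, null, "contr.test", List.of()));
    }
}
```

Whether a QUEUE channel should ever publish to a non-default exchange stays an open question in this sketch, as it does in the text above.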

3. About existing annotations

FYI, the method I am testing for this explanation has the following annotations:

    @AsyncPublisher(
            operation = @AsyncOperation(
                    channelName = "", // I want to publish on exchange "". Is this the proper way to specify it? It generates some weird things in asyncapi.yaml, like '_send_other_service.get-observations.v1'.
                    description = "other_service.get-observations.v1" + " description",
                    payloadType = GetObservationsRequest.class
            )
    )
    @AmqpAsyncOperationBinding(
            // cc[0] = routingKey, as defined by the other service
            // is this OK? or should I write cc = "other_service.get-observations.v1", without { } ?
            cc = {"other_service.get-observations.v1"}
    )
    @Override
    public void sendObservationsRequest(

So, here, I am expecting SpringWolf UI to do this:

rabbitTemplate.convertAndSend( AsyncPublisher.operation.channelName , AmqpAsyncOperationBinding.cc[0] )

which is, with actual values :

rabbitTemplate.convertAndSend( "" , "other_service.get-observations.v1")

but what I do observe is this:

rabbitTemplate.convertAndSend( "" , "")

so, routingKey is not 'taken from' AmqpAsyncOperationBinding.cc[0]

4. Possible or not ?

What is your opinion on the idea presented here?
Is it feasible and correct regarding the existing code base?
I am a bit worried about adding bindings in ChannelObject as I have no idea about the implication of such a change.
I do not have the big picture of SpringWolf in mind, so maybe it would break things here and there.

(
5. What's next

Once done, we could think about a feature in the SpringWolf UI to 'overwrite routingKey'.
But I guess we could first focus on the current proposal here.
)

@pdalfarr
Contributor Author

pdalfarr commented Jun 3, 2024

FYI, here is a screenshot of RabbitMQ management UI.
This screenshot illustrates how one can send a message "towards a queue".
More precisely, the message will be sent

  • to the "Default Exchange" (i.e., the Direct Exchange with name "")
  • with routingKey being the name of the queue (here "contr.test")

(The "Default exchange" then will 'route' the message to the queue)

image

Notes

  • Headers can be specified: SpringWolf already supports this thanks to @AsyncOperation.Headers, which is great.
  • Properties can be defined: as far as I know, SpringWolf does not support properties. Or maybe I missed something.
    A properties section would be nice to have because it would allow specifying additional info such as 'correlation-id', 'reply-to', and other essential properties.
  • FYI, the UI for sending a message to an exchange (instead of a queue) is a bit different:
    it takes the exchange name into account (of course) and also has an additional 'routingKey' form input.

@pdalfarr
Contributor Author

pdalfarr commented Jun 5, 2024

@timonback @sam0r040
I just created a PR to share some new unit tests with you.
I think there is something fishy with the asyncapi.yaml generation when using @RabbitListener as input.
The PR is available here:
#790
If you run the ApiIntegrationTest tests, you will see what I mean ;-)

@timonback
Member

Thanks for the PR @pdalfarr

I guess we need to look into the asyncapi docs to see how amqp is mapped to channels, particularly routing keys.
In case you find time to check, this seems to be one of the relevant lines within springwolf:

@pdalfarr
Contributor Author

pdalfarr commented Jun 5, 2024

You are welcome.
I'll try to investigate.
Also, FYI, I sent out a message in a bottle here https://stackoverflow.com/questions/78581918/async-api-bare-minimum-asyncapi-yaml-for-simple-rabbitlistener

@pdalfarr
Contributor Author

@timonback FYI, someone from Modelina answered my SO question here: https://stackoverflow.com/questions/78581918/async-api-bare-minimum-asyncapi-yaml-for-simple-rabbitlistener?noredirect=1#comment138573527_78581918
Initial contact with him was established here: asyncapi/modelina#1376

I think there is valuable information there for your springwolf-amqp plugin ;-)

@timonback
Member

Thanks for the contact.

I get the feeling that we need to deeply look into the amqp plugin implementation and how exchanges, routing keys and queues are mapped to asyncapi and springwolf.

timonback added a commit to timonback/springwolf-core that referenced this issue Aug 27, 2024
@timonback
Member

Relates to asyncapi/bindings#259

timonback added a commit to timonback/springwolf-core that referenced this issue Sep 3, 2024
timonback added a commit to timonback/springwolf-core that referenced this issue Sep 7, 2024
timonback added a commit that referenced this issue Sep 13, 2024
* tests: refactoring + add yaml endpoint testing

* tests: additional RabbitListener tests

* chore(amqp): fix queue configuration

* chore(amqp): simplify local testing

* chore: update asyncapi.yaml gradle script

* feat(amqp): scan all queues

In addition to the routingKeys

* test(amqp): update asyncapi artifacts

Part of GH-366

* test(ui): use valid mock data

* chore(ui): fix formatting

* test(amqp): update asyncapi artifacts

* feat(ui): show channel bindings

* test: add WaitStrategy for ApiSystemTest

* chore(amqp): use non-exclusive queue in example

* chore(amqp): use non-exclusive queue in example

* test(amqp): persist patched asyncapi.yaml

* chore(amqp): add spring-messaging dependency in example

* test(amqp): wait for ready amqp server

* test(amqp): update e2e

* test(amqp): cleanup

---------

Co-authored-by: Pascal Dal Farra <[email protected]>
timonback pushed a commit to timonback/springwolf-core that referenced this issue Sep 28, 2024
ruskaof pushed a commit to ruskaof/springwolf-core that referenced this issue Nov 20, 2024