-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DO NOT MERGE YET] GH-3103: Introduce CloudEvents transformers #3246
base: main
Are you sure you want to change the base?
Conversation
/** | ||
* Cloud event headers prefix as a {@value HEADER_PREFIX}. | ||
*/ | ||
public static final String HEADER_PREFIX = "ce_"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ce-
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for feedback!
However we talk here about a general approach for any possible protocol supported here in Spring Integration. I see you use proposed prefix for HTTP, but ours for Kafka. BTW Any other headers mapping in Spring Integration uses an underscore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for feedback!
However we talk here about a general approach for any possible protocol supported here in Spring Integration. I see you use proposed prefix for HTTP, but ours for Kafka. BTW Any other headers mapping in Spring Integration uses an underscore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A generalized approach is not going to work if you also want to interoperate with other spec implementors, like the goland sdk over http will reject your messages as invalid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It won't because we will re-map our headers according agreed HTTP protocol binder.
.map(Accessor::extensionsOf) | ||
.map(ExtensionFormat::marshal) | ||
.map(HeaderMapper::map) | ||
.map(Json::binaryMarshal) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the payload may not be json
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But that’s exactly what you do for Kafka and HTTP. What am I missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But that’s exactly what you do for Kafka and HTTP. What am I missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only assume json if it is structured mode, and we pre-compute data into a byte array before attempting to create a structured message.
for binary, we have a set of supported marshalers based on media type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I think we can give it a chance to be configurable.
That's an improvement we can do.
Thanks.
GenericMessage<String> message = new GenericMessage<>("test"); | ||
Message<?> result = transformer.transform(message); | ||
assertThat(result.getHeaders()) | ||
.containsEntry("ce_type", String.class.getName()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note these all need to be ce-*
not ce_
if this is for http.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, one more time. What we do here is a general framework approach when you would like to build a CloudEvent
and carry it from component to component as our Message<?>
content.
This is not about protocol binding per se.
Of course when we come with this content to the HTTP level we will remap these ce_
to ce-
.
I have designed this after this library general approach for target protocol header mapping, e.g. https://github.com/spring-projects/spring-integration/blob/master/spring-integration-http/src/main/java/org/springframework/integration/http/HttpHeaders.java#L26, https://github.com/spring-projects/spring-integration/blob/master/spring-integration-redis/src/main/java/org/springframework/integration/redis/support/RedisHeaders.java#L34 and so on.
Plus I have looked to CE Kafka protocol binder and this is that prefix: https://github.com/cloudevents/sdk-java/blob/master/kafka/src/main/java/io/cloudevents/v1/kafka/HeaderMapper.java#L25
Rather than implement this yourself, you should come help fix https://github.com/cloudevents/sdk-java |
I'm not sure what you suggest us to fix over there since here we try to come up with some framework-specific tool which should help end-users to deal with CE well-know Spring Integration way. Thank you for all your feedback! Now we have something else to think about 😄 |
ok it sounds like I misunderstood this, you are not implementing for HTTP, but for some custom Message<> type and you are free to use any method to add prefix (or no prefix if you don't need them). In that case you might consider writing a small spec with this PR to allow others to also implement the same choices you are making for Message<>. We had to do this for PubSub to be spec compliant: https://github.com/google/knative-gcp/blob/master/docs/spec/pubsub-protocol-binding.md And this now lets pubsub be linked from the CloudEvents spec: https://github.com/cloudevents/spec/blob/master/proprietary-specs.md It would be pretty neat to have Spring in that list too! |
*/ | ||
public static Map<String, String> map(Map<String, String> attributes, Map<String, String> extensions) { | ||
Assert.notNull(attributes, "'attributes' must noy be null"); | ||
Assert.notNull(extensions, "'extensions' must noy be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check spelling 'noy'
@@ -0,0 +1,4 @@ | |||
/** | |||
* Provides classes supporting for Cloud Events. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'supporting for' to 'support' or just 'supporting'
Fixes spring-projects#3103 * Add an `io.cloudevents:cloudevents-api` optional dependency * Introduce a `HeaderMapper` and `Marshallers` in the `support.cloudevents` to marshal `CloudEvent` instances * Introduce a `ToCloudEventTransformer` to build a `CloudEvent` instance from a `Message` and optional marshaling logic if necessary. Such a transformer could be used as a general purpose CE protocol binder before sending a result message into the target protocol channel adapter
…ent headers in the message * Introduce a `ContentTypeDelegatingDataMarshaller` based on the `org.springframework.core.codec.Encoder` abstraction to delegate * Use `ContentTypeDelegatingDataMarshaller` from the `ToCloudEventTransformer` * Modify `ToCloudEventTransformerTests` to use constants from the `CloudEventHeaders` and verify that `text/plain` marshalling works well for cloud events
Fixes #3103
io.cloudevents:cloudevents-api
optional dependencyHeaderMapper
andMarshallers
in thesupport.cloudevents
to marshal
CloudEvent
instancesToCloudEventTransformer
to build aCloudEvent
instancefrom a
Message
and optional marshaling logic if necessary.Such a transformer could be used as a general purpose CE protocol binder
before sending a result message into the target protocol channel adapter