Skip to content

Commit

Permalink
Merge pull request #364 from christophd/issue/359/cloud-events
Browse files Browse the repository at this point in the history
fix(#359): Improve Cloud events send/receive
  • Loading branch information
christophd authored Jan 12, 2022
2 parents 4d2ffbe + 782d685 commit 8f5ae7a
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import com.consol.citrus.annotations.CitrusFramework;
import com.consol.citrus.annotations.CitrusResource;
import com.consol.citrus.context.TestContext;
import com.consol.citrus.http.message.HttpMessage;
import com.consol.citrus.message.MessageType;
import io.cucumber.datatable.DataTable;
import io.cucumber.java.Before;
import io.cucumber.java.Scenario;
import io.cucumber.java.en.Given;
import io.cucumber.java.en.Then;
import org.citrusframework.yaks.knative.ce.CloudEventMessage;
import org.citrusframework.yaks.knative.ce.CloudEventSupport;
import org.citrusframework.yaks.kubernetes.KubernetesSteps;
import org.springframework.http.HttpStatus;
Expand Down Expand Up @@ -90,12 +90,12 @@ public void setEventData(String data) {

@Then("^(?:receive|verify) Knative event$")
public void receiveEvent(DataTable attributes) {
receiveEvent(CloudEventSupport.createEventRequest(eventData, attributes.asMap(String.class, String.class)));
receiveEvent(CloudEventSupport.createEventMessage(eventData, attributes.asMap(String.class, String.class)));
}

@Then("^(?:receive|verify) Knative event as json$")
public void receiveEventJson(String json) {
receiveEvent(CloudEventSupport.createEventRequest(eventData, CloudEventSupport.attributesFromJson(json)));
receiveEvent(CloudEventSupport.createEventMessage(eventData, CloudEventSupport.attributesFromJson(json)));
}

@Given("^create Knative event consumer service ([^\\s]+)$")
Expand All @@ -112,8 +112,8 @@ public void createService(String serviceName, String targetPort) {
* Receives cloud event as Http request.
* @param request
*/
private void receiveEvent(HttpMessage request) {
kubernetesSteps.receiveServiceRequest(request, MessageType.JSON);
private void receiveEvent(CloudEventMessage request) {
kubernetesSteps.receiveServiceRequest(request, MessageType.valueOf(request.getType()));
kubernetesSteps.sendServiceResponse(HttpStatus.ACCEPTED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.consol.citrus.http.actions.HttpClientRequestActionBuilder;
import com.consol.citrus.http.client.HttpClient;
import com.consol.citrus.http.client.HttpClientBuilder;
import com.consol.citrus.http.message.HttpMessage;
import io.cucumber.datatable.DataTable;
import io.cucumber.java.Before;
import io.cucumber.java.Scenario;
Expand All @@ -42,6 +41,7 @@
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContexts;
import org.citrusframework.yaks.knative.ce.CloudEventMessage;
import org.citrusframework.yaks.knative.ce.CloudEventSupport;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
Expand Down Expand Up @@ -118,23 +118,35 @@ public void setEventData(String data) {

@When("^(?:create|send) Knative event$")
public void createEvent(DataTable attributes) {
sendEventRequest(CloudEventSupport.createEventRequest(eventData, attributes.asMap(String.class, String.class)));
sendEvent(CloudEventSupport.createEventMessage(eventData, attributes.asMap(String.class, String.class)));
}

@When("^(?:create|send) Knative event as json$")
public void createEventJson(String json) {
sendEventRequest(CloudEventSupport.createEventRequest(eventData, CloudEventSupport.attributesFromJson(json)));
sendEvent(CloudEventSupport.createEventMessage(eventData, CloudEventSupport.attributesFromJson(json)));
}

/**
* Sends event request as Http request and verify accepted response.
* @param request
*/
private void sendEventRequest(HttpMessage request) {
private void sendEvent(CloudEventMessage request) {
if (Objects.isNull(request.getContentType())) {
request.contentType(MediaType.APPLICATION_JSON_VALUE);
}

if (request.getEventId() == null) {
request.eventId("yaks-test-event");
}

if (request.getEventType() == null) {
request.eventType("yaks-test");
}

if (request.getSource() == null) {
request.source("yaks-test-source");
}

request.setHeader("Host", KnativeSettings.getBrokerHost());

HttpClientRequestActionBuilder.HttpMessageBuilderSupport requestBuilder = http().client(httpClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ public static CloudEvent v1_0() {
return new CloudEvent(
"1.0",
Arrays.asList(
Attribute.create("Ce-Id", "id", "yaks-test-event"),
Attribute.create("Ce-Source", "source", "yaks-test-source"),
Attribute.create("Ce-Specversion", "specversion", "1.0"),
Attribute.create("Ce-Type", "type", "yaks-test"),
Attribute.create("Ce-Subject", "subject"),
Attribute.create("Ce-Dataschema", "dataschema"),
Attribute.create("Ce-Time", "time"),
Attribute.create("Content-Type", "datacontenttype")
Attribute.ID,
Attribute.SOURCE,
Attribute.SPEC_VERSION,
Attribute.TYPE,
Attribute.SUBJECT,
Attribute.DATA_SCHEMA,
Attribute.TIME,
Attribute.CONTENT_TYPE
)
);
}
Expand All @@ -66,43 +66,58 @@ public static CloudEvent v1_0() {
* Cloud event attribute with Http header name and Json field name representation. Optional default value
* can be specified.
*/
public interface Attribute {
public enum Attribute {

ID("Ce-Id", "id"),
SOURCE("Ce-Source", "source"),
SPEC_VERSION("Ce-Specversion", "specversion", "1.0"),
TYPE("Ce-Type", "type"),
SUBJECT("Ce-Subject", "subject"),
DATA_SCHEMA("Ce-Dataschema", "dataschema"),
TIME("Ce-Time", "time"),
CONTENT_TYPE("Content-Type", "datacontenttype");

private final String http;
private final String json;
private final String defaultValue;

/**
* The name of the http header.
*/
String http();
public String http() {
return this.http;
}

/**
* The name of the json field.
*/
String json();
public String json() {
return this.json;
}

/**
* Default value if any.
*/
String defaultValue();
public String defaultValue() {
return this.defaultValue;
}

/**
* Checks if this attribute provides a default value.
* @return
*/
public boolean hasDefaultValue() {
return defaultValue != null;
}

static Attribute create(String http, String json) {
return create(http, json, null);
Attribute(String http, String json) {
this(http, json, null);
}

static Attribute create(String http, String json, String defaultValue) {
return new Attribute() {
@Override
public String http() {
return http;
}

@Override
public String json() {
return json;
}

@Override
public String defaultValue() {
return defaultValue;
}
};
Attribute(String http, String json, String defaultValue) {
this.http = http;
this.json = json;
this.defaultValue = defaultValue;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.citrusframework.yaks.knative.ce;

import java.util.HashMap;
import java.util.Map;

import com.consol.citrus.http.message.HttpMessage;

/**
* @author Christoph Deppisch
*/
public class CloudEventMessage extends HttpMessage {

private final Map<CloudEvent.Attribute, Object> attributes = new HashMap<>();

public CloudEventMessage eventId(String id) {
return setAttribute(CloudEvent.Attribute.ID, id);
}

public Object getEventId() {
return getAttribute(CloudEvent.Attribute.ID);
}

public CloudEventMessage eventType(String type) {
return setAttribute(CloudEvent.Attribute.TYPE, type);
}

public Object getEventType() {
return getAttribute(CloudEvent.Attribute.TYPE);
}

public CloudEventMessage specVersion(String version) {
return setAttribute(CloudEvent.Attribute.SPEC_VERSION, version);
}

public Object getSpecVersion() {
return getAttribute(CloudEvent.Attribute.SPEC_VERSION);
}

public CloudEventMessage source(String source) {
return setAttribute(CloudEvent.Attribute.SOURCE, source);
}

public Object getSource() {
return getAttribute(CloudEvent.Attribute.SOURCE);
}

public CloudEventMessage subject(String subject) {
return setAttribute(CloudEvent.Attribute.SUBJECT, subject);
}

public Object getSubject() {
return getAttribute(CloudEvent.Attribute.SUBJECT);
}

public CloudEventMessage time(String time) {
return setAttribute(CloudEvent.Attribute.TIME, time);
}

public Object getTime() {
return getAttribute(CloudEvent.Attribute.TIME);
}

public CloudEventMessage dataSchema(String schema) {
return setAttribute(CloudEvent.Attribute.DATA_SCHEMA, schema);
}

public Object getDataSchema() {
return getAttribute(CloudEvent.Attribute.DATA_SCHEMA);
}

public Object getAttribute(CloudEvent.Attribute attribute) {
return attributes.get(attribute);
}

public CloudEventMessage setAttribute(CloudEvent.Attribute attribute, Object value) {
attributes.put(attribute, value);
header(attribute.http(), value);
return this;
}

public static CloudEventMessage fromEvent(CloudEvent event) {
CloudEventMessage message = new CloudEventMessage();

event.attributes().stream()
.filter(CloudEvent.Attribute::hasDefaultValue)
.forEach(a -> message.setAttribute(a, a.defaultValue()));

return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import com.consol.citrus.exceptions.CitrusRuntimeException;
import com.consol.citrus.http.message.HttpMessage;
import com.consol.citrus.message.MessageType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -48,8 +47,9 @@ private CloudEventSupport() {
* @param attributes
* @return
*/
public static HttpMessage createEventRequest(String eventData, Map<String, String> attributes) {
HttpMessage request = new HttpMessage();
public static CloudEventMessage createEventMessage(String eventData, Map<String, String> attributes) {
CloudEventMessage request = CloudEventMessage.fromEvent(CloudEvent.v1_0());
request.setType(MessageType.JSON);
request.method(HttpMethod.POST);

if (attributes.containsKey("data")) {
Expand All @@ -58,15 +58,21 @@ public static HttpMessage createEventRequest(String eventData, Map<String, Strin
request.setPayload(eventData);
}

for (CloudEvent.Attribute attribute : CloudEvent.v1_0().attributes()) {
if (attributes.containsKey(attribute.http())) {
request.setHeader(attribute.http(), attributes.get(attribute.http()));
} else if (attributes.containsKey(attribute.json())) {
request.setHeader(attribute.http(), attributes.get(attribute.json()));
} else if (!Objects.isNull(attribute.defaultValue())) {
request.setHeader(attribute.http(), attribute.defaultValue());
}
}
attributes.entrySet()
.stream()
.filter(entry -> !entry.getKey().equals("data"))
.forEach(entry -> {
Optional<CloudEvent.Attribute> attribute = CloudEvent.v1_0().attributes()
.stream()
.filter(a -> a.http().equals(entry.getKey()) || a.json().equals(entry.getKey()))
.findFirst();

if (attribute.isPresent()) {
request.setAttribute(attribute.get(), entry.getValue());
} else {
request.header(entry.getKey(), entry.getValue());
}
});

return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class KnativeTestSteps {

@Given("^create test event$")
public void sendTestEvents(String json) {
HttpMessage eventRequest = CloudEventSupport.createEventRequest("", CloudEventSupport.attributesFromJson(json));
HttpMessage eventRequest = CloudEventSupport.createEventMessage("", CloudEventSupport.attributesFromJson(json));

runner.run(http().client("http://localhost:${knativeServicePort}/")
.send()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ Feature: Knative event consumer
| data | {"msg": "Hello Knative!"} |
Then verify test event accepted

Scenario: Receive event selected attributes
Given expect Knative event data: {"msg": "Hello Knative!"}
When receive Knative event
| type | greeting |
| subject | hello |
Then verify test event accepted

Scenario: Receive event data
Given expect Knative event data: {"msg": "Hello Knative!"}
When receive Knative event
Expand Down

0 comments on commit 8f5ae7a

Please sign in to comment.