Skip to content

Commit

Permalink
Merge pull request #535 from MohamedSabthar/master
Browse files Browse the repository at this point in the history
Migrate the event stream iteration logic to ballerina side.
  • Loading branch information
MohamedSabthar authored Aug 7, 2024
2 parents dac5209 + bb64601 commit ce981e9
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 49 deletions.
4 changes: 1 addition & 3 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

[ballerina]
dependencies-toml-version = "2"
distribution-version = "2201.10.0-20240803-215800-d7bdc125"
distribution-version = "2201.10.0-20240806-083400-aabac46a"

[[package]]
org = "ballerina"
Expand Down Expand Up @@ -107,7 +107,6 @@ dependencies = [
org = "ballerina"
name = "log"
version = "2.9.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
Expand Down Expand Up @@ -138,7 +137,6 @@ modules = [
org = "ballerina"
name = "observe"
version = "1.3.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
Expand Down
56 changes: 56 additions & 0 deletions ballerina/event_stream_writer.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2024 WSO2 LLC. (https://www.wso2.com).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/jballerina.java;
import ballerina/log;

class EventStreamWriter {
stream<byte[], error?> eventStream;

isolated function init(stream<byte[], error?> eventStream) {
self.eventStream = eventStream;
}

isolated function writeEventStream() {
do {
while true {
record {byte[] value;}? event = check self.eventStream.next();
if event is () {
self.closeEventStream();
return;
}
check externWriteEventStreamBytesToOutputStream(self, event.value);
}
} on fail error err {
log:printError("unable to write event stream to wire", err);
self.closeEventStream();
return;
}
}

isolated function closeEventStream() {
error? result = self.eventStream.close();
if result is error {
log:printError("unable to close the stream", result);
}
}
}

isolated function externWriteEventStreamBytesToOutputStream(EventStreamWriter eventStreamWriter, byte[] bytes)
returns error? = @java:Method {
'class: "io.ballerina.stdlib.mime.nativeimpl.MimeEntityBody",
name: "writeEventStreamBytesToOutputStream"
} external;
Original file line number Diff line number Diff line change
Expand Up @@ -309,5 +309,9 @@ public static void setXml(BObject entityObj, BXml xmlContent, BString contentTyp
MimeUtil.setMediaTypeToEntity(entityObj, contentType != null ? contentType.getValue() : APPLICATION_XML);
}

public static Object writeEventStreamBytesToOutputStream(BObject eventStreamWriter, BArray bytes) {
return EntityBodyHandler.writeEventStreamBytesToOutputStream(eventStreamWriter, bytes.getBytes());
}

private MimeEntityBody() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@
import static io.ballerina.stdlib.mime.util.MimeConstants.MESSAGE_DATA_SOURCE;
import static io.ballerina.stdlib.mime.util.MimeConstants.MULTIPART_AS_PRIMARY_TYPE;
import static io.ballerina.stdlib.mime.util.MimeConstants.TEXT_EVENT_STREAM;
import static io.ballerina.stdlib.mime.util.MimeUtil.isNotNullAndEmpty;
import static io.ballerina.stdlib.mime.util.MimeUtil.removeJavaExceptionPrefix;

/**
* Entity body related operations are included here.
Expand All @@ -77,6 +75,9 @@ public class EntityBodyHandler {
private static final Type MIME_ENTITY_TYPE =
TypeUtils.getType(ValueCreator.createObjectValue(MimeUtil.getMimePackage(), ENTITY));
private static final ArrayType mimeEntityArrayType = TypeCreator.createArrayType(MIME_ENTITY_TYPE);
public static final String OUTPUT_STREAM = "output_stream_object";
public static final String WRITE_EVENT_STREAM_METHOD = "writeEventStream";
public static final String EVENT_STREAM_WRITER_OBJECT = "EventStreamWriter";

/**
* Get a byte channel for a given text data.
Expand Down Expand Up @@ -193,9 +194,9 @@ public static Object constructJsonDataSource(BObject entityObj) {
public static Object constructJsonDataSource(BObject entity, InputStream inputStream) {
Object jsonData;
String contentTypeValue = EntityHeaderHandler.getHeaderValue(entity, CONTENT_TYPE);
if (isNotNullAndEmpty(contentTypeValue)) {
if (MimeUtil.isNotNullAndEmpty(contentTypeValue)) {
String charsetValue = MimeUtil.getContentTypeParamValue(contentTypeValue, CHARSET);
if (isNotNullAndEmpty(charsetValue)) {
if (MimeUtil.isNotNullAndEmpty(charsetValue)) {
jsonData = JsonUtils.parse(inputStream, charsetValue);
} else {
jsonData = JsonUtils.parse(inputStream);
Expand Down Expand Up @@ -236,9 +237,9 @@ public static BXml constructXmlDataSource(BObject entityObj) {
public static BXml constructXmlDataSource(BObject entityObj, InputStream inputStream) {
BXml xmlContent;
String contentTypeValue = EntityHeaderHandler.getHeaderValue(entityObj, CONTENT_TYPE);
if (isNotNullAndEmpty(contentTypeValue)) {
if (MimeUtil.isNotNullAndEmpty(contentTypeValue)) {
String charsetValue = MimeUtil.getContentTypeParamValue(contentTypeValue, CHARSET);
if (isNotNullAndEmpty(charsetValue)) {
if (MimeUtil.isNotNullAndEmpty(charsetValue)) {
xmlContent = XmlUtils.parse(inputStream, charsetValue);
} else {
xmlContent = XmlUtils.parse(inputStream);
Expand Down Expand Up @@ -279,9 +280,9 @@ public static BString constructStringDataSource(BObject entityObj) {
public static BString constructStringDataSource(BObject entity, InputStream inputStream) {
BString textContent;
String contentTypeValue = EntityHeaderHandler.getHeaderValue(entity, CONTENT_TYPE);
if (isNotNullAndEmpty(contentTypeValue)) {
if (MimeUtil.isNotNullAndEmpty(contentTypeValue)) {
String charsetValue = MimeUtil.getContentTypeParamValue(contentTypeValue, CHARSET);
if (isNotNullAndEmpty(charsetValue)) {
if (MimeUtil.isNotNullAndEmpty(charsetValue)) {
textContent = StringUtils.getStringFromInputStream(inputStream, charsetValue);
} else {
textContent = StringUtils.getStringFromInputStream(inputStream);
Expand Down Expand Up @@ -408,7 +409,7 @@ public void notifySuccess(Object result) {
latch.countDown();
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while writing the stream content: "
+ removeJavaExceptionPrefix(e.getMessage())));
+ MimeUtil.removeJavaExceptionPrefix(e.getMessage())));
}
writeContent(env, entity, outputStream, iteratorObj, latch);
}
Expand All @@ -432,43 +433,40 @@ public void notifyFailure(BError bError) {
public static void writeEventStreamToOutputStream(Environment env, BObject entity, OutputStream outputStream) {
BStream eventByteStream = EntityBodyHandler.getEventStream(entity);
if (eventByteStream != null) {
BObject iteratorObj = eventByteStream.getIteratorObj();
writeEvent(env, entity, outputStream, iteratorObj);
BObject eventStreamWriter = ValueCreator.createObjectValue(MimeUtil.getMimePackage(),
EVENT_STREAM_WRITER_OBJECT, eventByteStream);
eventStreamWriter.addNativeData(ENTITY, entity);
eventStreamWriter.addNativeData(OUTPUT_STREAM, outputStream);
writeEvent(env, eventStreamWriter);
}
}

private static void writeEvent(Environment env, BObject entity, OutputStream outputStream,
BObject iteratorObj) {
env.getRuntime().invokeMethodAsyncConcurrently(iteratorObj, BYTE_STREAM_NEXT_FUNC, null, null, new Callback() {
@Override
public void notifySuccess(Object result) {
if (result == null) {
entity.addNativeData(ENTITY_BYTE_STREAM, null);
EntityBodyHandler.closeMessageOutputStream(outputStream);
return;
}
if (result instanceof BError error) {
entity.addNativeData(ENTITY_BYTE_STREAM, null);
this.notifyFailure(error);
}
try {
writeContentPart((BMap) result, outputStream);
} catch (Exception e) {
EntityBodyHandler.closeMessageOutputStream(outputStream);
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while writing the stream content: "
+ removeJavaExceptionPrefix(e.getMessage())));
}
writeEvent(env, entity, outputStream, iteratorObj);
}

@Override
public void notifyFailure(BError bError) {
EntityBodyHandler.closeMessageOutputStream(outputStream);
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while streaming content: " + bError.getMessage()));
}
}, null, null);
private static void writeEvent(Environment env, BObject eventStreamWriter) {
env.getRuntime().invokeMethodAsyncConcurrently(eventStreamWriter, WRITE_EVENT_STREAM_METHOD, null, null,
new Callback() {
@Override
public void notifySuccess(Object result) {
BObject entity = (BObject) eventStreamWriter.getNativeData(ENTITY);
OutputStream outputStream = (OutputStream) eventStreamWriter.getNativeData(OUTPUT_STREAM);
if (result == null) {
entity.addNativeData(ENTITY_BYTE_STREAM, null);
EntityBodyHandler.closeMessageOutputStream(outputStream);
return;
}
if (result instanceof BError error) {
entity.addNativeData(ENTITY_BYTE_STREAM, null);
throw error;
}
}

@Override
public void notifyFailure(BError bError) {
OutputStream outputStream = (OutputStream) eventStreamWriter.getNativeData(OUTPUT_STREAM);
EntityBodyHandler.closeMessageOutputStream(outputStream);
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while streaming content: " + bError.getMessage()));
}
}, null, null);
}

private static void closeMessageOutputStream(OutputStream messageOutputStream) {
Expand All @@ -483,13 +481,26 @@ private static void closeMessageOutputStream(OutputStream messageOutputStream) {

private static void writeContentPart(BMap part, OutputStream outputStream) {
BArray arrayValue = part.getArrayValue(FIELD_VALUE);
byte[] bytes = arrayValue.getBytes();
writeContentPart(arrayValue.getBytes(), outputStream);
}

private static void writeContentPart(byte[] bytes, OutputStream outputStream) {
try (ByteArrayInputStream str = new ByteArrayInputStream(bytes)) {
MimeUtil.writeInputToOutputStream(str, outputStream);
} catch (IOException e) {
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while writing content parts to outputstream: " + e.getMessage()));
"Error occurred while writing content parts to output stream: " + e.getMessage()));
}
}

public static Object writeEventStreamBytesToOutputStream(BObject eventStreamWriter, byte[] bytes) {
OutputStream outputStream = (OutputStream) eventStreamWriter.getNativeData(OUTPUT_STREAM);
try {
writeContentPart(bytes, outputStream);
} catch (Exception e) {
return ErrorCreator.createError(StringUtils.fromString(MimeUtil.removeJavaExceptionPrefix(e.getMessage())));
}
return null;
}

/**
Expand All @@ -501,7 +512,7 @@ private static void writeContentPart(BMap part, OutputStream outputStream) {
*/
public static void decodeEntityBody(BObject entityObj, Channel byteChannel) throws IOException {
String contentType = MimeUtil.getContentTypeWithParameters(entityObj);
if (!isNotNullAndEmpty(contentType) || !contentType.startsWith(MULTIPART_AS_PRIMARY_TYPE)) {
if (!MimeUtil.isNotNullAndEmpty(contentType) || !contentType.startsWith(MULTIPART_AS_PRIMARY_TYPE)) {
return;
}
try {
Expand Down

0 comments on commit ce981e9

Please sign in to comment.