diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index b2f9022..f1ea0e4 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -5,7 +5,7 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.10.0-20240803-215800-d7bdc125" +distribution-version = "2201.10.0-20240806-083400-aabac46a" [[package]] org = "ballerina" @@ -107,7 +107,6 @@ dependencies = [ org = "ballerina" name = "log" version = "2.9.0" -scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, @@ -138,7 +137,6 @@ modules = [ org = "ballerina" name = "observe" version = "1.3.0" -scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] diff --git a/ballerina/event_stream_writer.bal b/ballerina/event_stream_writer.bal new file mode 100644 index 0000000..b25ac06 --- /dev/null +++ b/ballerina/event_stream_writer.bal @@ -0,0 +1,56 @@ +// Copyright (c) 2024 WSO2 LLC. (https://www.wso2.com). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/jballerina.java; +import ballerina/log; + +class EventStreamWriter { + stream eventStream; + + isolated function init(stream eventStream) { + self.eventStream = eventStream; + } + + isolated function writeEventStream() { + do { + while true { + record {byte[] value;}? event = check self.eventStream.next(); + if event is () { + self.closeEventStream(); + return; + } + check externWriteEventStreamBytesToOutputStream(self, event.value); + } + } on fail error err { + log:printError("unable to write event stream to wire", err); + self.closeEventStream(); + return; + } + } + + isolated function closeEventStream() { + error? result = self.eventStream.close(); + if result is error { + log:printError("unable to close the stream", result); + } + } +} + +isolated function externWriteEventStreamBytesToOutputStream(EventStreamWriter eventStreamWriter, byte[] bytes) +returns error? = @java:Method { + 'class: "io.ballerina.stdlib.mime.nativeimpl.MimeEntityBody", + name: "writeEventStreamBytesToOutputStream" +} external; diff --git a/native/src/main/java/io/ballerina/stdlib/mime/nativeimpl/MimeEntityBody.java b/native/src/main/java/io/ballerina/stdlib/mime/nativeimpl/MimeEntityBody.java index 8a62f54..a25a7a5 100644 --- a/native/src/main/java/io/ballerina/stdlib/mime/nativeimpl/MimeEntityBody.java +++ b/native/src/main/java/io/ballerina/stdlib/mime/nativeimpl/MimeEntityBody.java @@ -309,5 +309,9 @@ public static void setXml(BObject entityObj, BXml xmlContent, BString contentTyp MimeUtil.setMediaTypeToEntity(entityObj, contentType != null ? contentType.getValue() : APPLICATION_XML); } + public static Object writeEventStreamBytesToOutputStream(BObject eventStreamWriter, BArray bytes) { + return EntityBodyHandler.writeEventStreamBytesToOutputStream(eventStreamWriter, bytes.getBytes()); + } + private MimeEntityBody() {} } diff --git a/native/src/main/java/io/ballerina/stdlib/mime/util/EntityBodyHandler.java b/native/src/main/java/io/ballerina/stdlib/mime/util/EntityBodyHandler.java index 1369a29..a101286 100644 --- a/native/src/main/java/io/ballerina/stdlib/mime/util/EntityBodyHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/mime/util/EntityBodyHandler.java @@ -63,8 +63,6 @@ import static io.ballerina.stdlib.mime.util.MimeConstants.MESSAGE_DATA_SOURCE; import static io.ballerina.stdlib.mime.util.MimeConstants.MULTIPART_AS_PRIMARY_TYPE; import static io.ballerina.stdlib.mime.util.MimeConstants.TEXT_EVENT_STREAM; -import static io.ballerina.stdlib.mime.util.MimeUtil.isNotNullAndEmpty; -import static io.ballerina.stdlib.mime.util.MimeUtil.removeJavaExceptionPrefix; /** * Entity body related operations are included here. @@ -77,6 +75,9 @@ public class EntityBodyHandler { private static final Type MIME_ENTITY_TYPE = TypeUtils.getType(ValueCreator.createObjectValue(MimeUtil.getMimePackage(), ENTITY)); private static final ArrayType mimeEntityArrayType = TypeCreator.createArrayType(MIME_ENTITY_TYPE); + public static final String OUTPUT_STREAM = "output_stream_object"; + public static final String WRITE_EVENT_STREAM_METHOD = "writeEventStream"; + public static final String EVENT_STREAM_WRITER_OBJECT = "EventStreamWriter"; /** * Get a byte channel for a given text data. @@ -193,9 +194,9 @@ public static Object constructJsonDataSource(BObject entityObj) { public static Object constructJsonDataSource(BObject entity, InputStream inputStream) { Object jsonData; String contentTypeValue = EntityHeaderHandler.getHeaderValue(entity, CONTENT_TYPE); - if (isNotNullAndEmpty(contentTypeValue)) { + if (MimeUtil.isNotNullAndEmpty(contentTypeValue)) { String charsetValue = MimeUtil.getContentTypeParamValue(contentTypeValue, CHARSET); - if (isNotNullAndEmpty(charsetValue)) { + if (MimeUtil.isNotNullAndEmpty(charsetValue)) { jsonData = JsonUtils.parse(inputStream, charsetValue); } else { jsonData = JsonUtils.parse(inputStream); @@ -236,9 +237,9 @@ public static BXml constructXmlDataSource(BObject entityObj) { public static BXml constructXmlDataSource(BObject entityObj, InputStream inputStream) { BXml xmlContent; String contentTypeValue = EntityHeaderHandler.getHeaderValue(entityObj, CONTENT_TYPE); - if (isNotNullAndEmpty(contentTypeValue)) { + if (MimeUtil.isNotNullAndEmpty(contentTypeValue)) { String charsetValue = MimeUtil.getContentTypeParamValue(contentTypeValue, CHARSET); - if (isNotNullAndEmpty(charsetValue)) { + if (MimeUtil.isNotNullAndEmpty(charsetValue)) { xmlContent = XmlUtils.parse(inputStream, charsetValue); } else { xmlContent = XmlUtils.parse(inputStream); @@ -279,9 +280,9 @@ public static BString constructStringDataSource(BObject entityObj) { public static BString constructStringDataSource(BObject entity, InputStream inputStream) { BString textContent; String contentTypeValue = EntityHeaderHandler.getHeaderValue(entity, CONTENT_TYPE); - if (isNotNullAndEmpty(contentTypeValue)) { + if (MimeUtil.isNotNullAndEmpty(contentTypeValue)) { String charsetValue = MimeUtil.getContentTypeParamValue(contentTypeValue, CHARSET); - if (isNotNullAndEmpty(charsetValue)) { + if (MimeUtil.isNotNullAndEmpty(charsetValue)) { textContent = StringUtils.getStringFromInputStream(inputStream, charsetValue); } else { textContent = StringUtils.getStringFromInputStream(inputStream); @@ -408,7 +409,7 @@ public void notifySuccess(Object result) { latch.countDown(); throw ErrorCreator.createError(StringUtils.fromString( "Error occurred while writing the stream content: " - + removeJavaExceptionPrefix(e.getMessage()))); + + MimeUtil.removeJavaExceptionPrefix(e.getMessage()))); } writeContent(env, entity, outputStream, iteratorObj, latch); } @@ -432,43 +433,40 @@ public void notifyFailure(BError bError) { public static void writeEventStreamToOutputStream(Environment env, BObject entity, OutputStream outputStream) { BStream eventByteStream = EntityBodyHandler.getEventStream(entity); if (eventByteStream != null) { - BObject iteratorObj = eventByteStream.getIteratorObj(); - writeEvent(env, entity, outputStream, iteratorObj); + BObject eventStreamWriter = ValueCreator.createObjectValue(MimeUtil.getMimePackage(), + EVENT_STREAM_WRITER_OBJECT, eventByteStream); + eventStreamWriter.addNativeData(ENTITY, entity); + eventStreamWriter.addNativeData(OUTPUT_STREAM, outputStream); + writeEvent(env, eventStreamWriter); } } - private static void writeEvent(Environment env, BObject entity, OutputStream outputStream, - BObject iteratorObj) { - env.getRuntime().invokeMethodAsyncConcurrently(iteratorObj, BYTE_STREAM_NEXT_FUNC, null, null, new Callback() { - @Override - public void notifySuccess(Object result) { - if (result == null) { - entity.addNativeData(ENTITY_BYTE_STREAM, null); - EntityBodyHandler.closeMessageOutputStream(outputStream); - return; - } - if (result instanceof BError error) { - entity.addNativeData(ENTITY_BYTE_STREAM, null); - this.notifyFailure(error); - } - try { - writeContentPart((BMap) result, outputStream); - } catch (Exception e) { - EntityBodyHandler.closeMessageOutputStream(outputStream); - throw ErrorCreator.createError(StringUtils.fromString( - "Error occurred while writing the stream content: " - + removeJavaExceptionPrefix(e.getMessage()))); - } - writeEvent(env, entity, outputStream, iteratorObj); - } - - @Override - public void notifyFailure(BError bError) { - EntityBodyHandler.closeMessageOutputStream(outputStream); - throw ErrorCreator.createError(StringUtils.fromString( - "Error occurred while streaming content: " + bError.getMessage())); - } - }, null, null); + private static void writeEvent(Environment env, BObject eventStreamWriter) { + env.getRuntime().invokeMethodAsyncConcurrently(eventStreamWriter, WRITE_EVENT_STREAM_METHOD, null, null, + new Callback() { + @Override + public void notifySuccess(Object result) { + BObject entity = (BObject) eventStreamWriter.getNativeData(ENTITY); + OutputStream outputStream = (OutputStream) eventStreamWriter.getNativeData(OUTPUT_STREAM); + if (result == null) { + entity.addNativeData(ENTITY_BYTE_STREAM, null); + EntityBodyHandler.closeMessageOutputStream(outputStream); + return; + } + if (result instanceof BError error) { + entity.addNativeData(ENTITY_BYTE_STREAM, null); + throw error; + } + } + + @Override + public void notifyFailure(BError bError) { + OutputStream outputStream = (OutputStream) eventStreamWriter.getNativeData(OUTPUT_STREAM); + EntityBodyHandler.closeMessageOutputStream(outputStream); + throw ErrorCreator.createError(StringUtils.fromString( + "Error occurred while streaming content: " + bError.getMessage())); + } + }, null, null); } private static void closeMessageOutputStream(OutputStream messageOutputStream) { @@ -483,13 +481,26 @@ private static void closeMessageOutputStream(OutputStream messageOutputStream) { private static void writeContentPart(BMap part, OutputStream outputStream) { BArray arrayValue = part.getArrayValue(FIELD_VALUE); - byte[] bytes = arrayValue.getBytes(); + writeContentPart(arrayValue.getBytes(), outputStream); + } + + private static void writeContentPart(byte[] bytes, OutputStream outputStream) { try (ByteArrayInputStream str = new ByteArrayInputStream(bytes)) { MimeUtil.writeInputToOutputStream(str, outputStream); } catch (IOException e) { throw ErrorCreator.createError(StringUtils.fromString( - "Error occurred while writing content parts to outputstream: " + e.getMessage())); + "Error occurred while writing content parts to output stream: " + e.getMessage())); + } + } + + public static Object writeEventStreamBytesToOutputStream(BObject eventStreamWriter, byte[] bytes) { + OutputStream outputStream = (OutputStream) eventStreamWriter.getNativeData(OUTPUT_STREAM); + try { + writeContentPart(bytes, outputStream); + } catch (Exception e) { + return ErrorCreator.createError(StringUtils.fromString(MimeUtil.removeJavaExceptionPrefix(e.getMessage()))); } + return null; } /** @@ -501,7 +512,7 @@ private static void writeContentPart(BMap part, OutputStream outputStream) { */ public static void decodeEntityBody(BObject entityObj, Channel byteChannel) throws IOException { String contentType = MimeUtil.getContentTypeWithParameters(entityObj); - if (!isNotNullAndEmpty(contentType) || !contentType.startsWith(MULTIPART_AS_PRIMARY_TYPE)) { + if (!MimeUtil.isNotNullAndEmpty(contentType) || !contentType.startsWith(MULTIPART_AS_PRIMARY_TYPE)) { return; } try {