Skip to content

Commit

Permalink
Merge pull request #528 from MohamedSabthar/event-stream
Browse files Browse the repository at this point in the history
Add native methods to handle text/event-stream content type
  • Loading branch information
MohamedSabthar authored Jul 16, 2024
2 parents abf505a + 57d9281 commit d4e9f7a
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 20 deletions.
8 changes: 4 additions & 4 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerina"
name = "mime"
version = "2.9.1"
version = "2.10.0"
authors = ["Ballerina"]
keywords = ["mime", "multipart", "entity"]
repository = "https://github.com/ballerina-platform/module-ballerina-mime"
Expand All @@ -15,11 +15,11 @@ graalvmCompatible = true
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "mime-native"
version = "2.9.1"
path = "../native/build/libs/mime-native-2.9.1-SNAPSHOT.jar"
version = "2.10.0"
path = "../native/build/libs/mime-native-2.10.0-SNAPSHOT.jar"

[[platform.java17.dependency]]
path = "../test-utils/build/libs/mime-test-utils-2.9.1-SNAPSHOT.jar"
path = "../test-utils/build/libs/mime-test-utils-2.10.0-SNAPSHOT.jar"
scope = "testOnly"

[[platform.java17.dependency]]
Expand Down
4 changes: 2 additions & 2 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ modules = [
[[package]]
org = "ballerina"
name = "mime"
version = "2.9.1"
version = "2.10.0"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
Expand All @@ -127,7 +127,7 @@ modules = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.2.2"
version = "1.2.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
Expand Down
3 changes: 3 additions & 0 deletions ballerina/media_types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,6 @@ public const string TEXT_HTML = "text/html";

# Represents the `text/xml` media type.
public const string TEXT_XML = "text/xml";

# Represents the `text/event-stream` media type.
public const string TEXT_EVENT_STREAM= "text/event-stream";
9 changes: 6 additions & 3 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased]

### Added
- [Add MIME entity methods to handle server sent events](https://github.com/ballerina-platform/ballerina-library/issues/6739)

## Changed
- [Make some of the Java classes proper utility classes](https://github.com/ballerina-platform/ballerina-standard-library/issues/4940)

## [2.9.0] - 2023-09-15

## Changed
### Changed
- [Change `com.sun.activation:jakarta.activation` to `jakarta.activation:jakarta.activation-api`](https://github.com/ballerina-platform/ballerina-standard-library/issues/4789)

## [2.6.0] - 2022-02-20

## Fixed
### Fixed
- [Binary payload retrieved from the `http:Request` has different content-length than the original payload](https://github.com/ballerina-platform/ballerina-standard-library/issues/3662)

## [2.5.0] - 2022-11-29
Expand All @@ -26,7 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [2.0.0] - 2021-10-10

## Fixed
### Fixed
- [Unable to get more than 8kb size of byte[] chunks in http byte[] stream](https://github.com/ballerina-platform/ballerina-standard-library/issues/2002)

## [1.1.0-alpha6] - 2021-04-02
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
org.gradle.caching=true
group=io.ballerina.stdlib
version=2.9.1-SNAPSHOT
version=2.10.0-SNAPSHOT
ballerinaLangVersion=2201.8.0

mimepullVersion=1.9.11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static io.ballerina.stdlib.mime.util.MimeConstants.FIRST_BODY_PART_INDEX;
import static io.ballerina.stdlib.mime.util.MimeConstants.MESSAGE_DATA_SOURCE;
import static io.ballerina.stdlib.mime.util.MimeConstants.MULTIPART_AS_PRIMARY_TYPE;
import static io.ballerina.stdlib.mime.util.MimeConstants.TEXT_EVENT_STREAM;
import static io.ballerina.stdlib.mime.util.MimeUtil.isNotNullAndEmpty;

/**
Expand Down Expand Up @@ -388,22 +389,15 @@ public static void writeByteStreamToOutputStream(Environment env, BObject entity

private static void writeContent(Environment env, BObject entity, OutputStream outputStream,
BObject iteratorObj, CountDownLatch latch) {
env.getRuntime().invokeMethodAsync(iteratorObj, BYTE_STREAM_NEXT_FUNC, null, null, new Callback() {
env.getRuntime().invokeMethodAsyncConcurrently(iteratorObj, BYTE_STREAM_NEXT_FUNC, null, null, new Callback() {
@Override
public void notifySuccess(Object result) {
if (result == null) {
entity.addNativeData(ENTITY_BYTE_STREAM, null);
latch.countDown();
return;
}
BArray arrayValue = ((BMap) result).getArrayValue(FIELD_VALUE);
byte[] bytes = arrayValue.getBytes();
try (ByteArrayInputStream str = new ByteArrayInputStream(bytes)) {
MimeUtil.writeInputToOutputStream(str, outputStream);
} catch (IOException e) {
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while writing content parts to outputstream: " + e.getMessage()));
}
writeContentPart((BMap) result, outputStream);
writeContent(env, entity, outputStream, iteratorObj, latch);
}

Expand All @@ -413,7 +407,66 @@ public void notifyFailure(BError bError) {
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while streaming content: " + bError.getMessage()));
}
});
}, null, null);
}

/**
* Write event-stream directly to the output-stream without converting it to a data source.
*
* @param env the environment of the resource invoked
* @param entity Represent a ballerina entity
* @param outputStream Represent the output-stream that the message should be written to
*/
public static void writeEventStreamToOutputStream(Environment env, BObject entity, OutputStream outputStream) {
BStream eventByteStream = EntityBodyHandler.getEventStream(entity);
if (eventByteStream != null) {
BObject iteratorObj = eventByteStream.getIteratorObj();
writeEvent(env, entity, outputStream, iteratorObj);
}
}

private static void writeEvent(Environment env, BObject entity, OutputStream outputStream,
BObject iteratorObj) {
env.getRuntime().invokeMethodAsyncConcurrently(iteratorObj, BYTE_STREAM_NEXT_FUNC, null, null, new Callback() {
@Override
public void notifySuccess(Object result) {
if (result == null) {
entity.addNativeData(ENTITY_BYTE_STREAM, null);
EntityBodyHandler.closeMessageOutputStream(outputStream);
return;
}
writeContentPart((BMap) result, outputStream);
writeEvent(env, entity, outputStream, iteratorObj);
}

@Override
public void notifyFailure(BError bError) {
EntityBodyHandler.closeMessageOutputStream(outputStream);
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while streaming content: " + bError.getMessage()));
}
}, null, null);
}

private static void closeMessageOutputStream(OutputStream messageOutputStream) {
try {
if (messageOutputStream != null) {
messageOutputStream.close();
}
} catch (IOException e) {
log.error("Couldn't close message output stream", e);
}
}

private static void writeContentPart(BMap part, OutputStream outputStream) {
BArray arrayValue = part.getArrayValue(FIELD_VALUE);
byte[] bytes = arrayValue.getBytes();
try (ByteArrayInputStream str = new ByteArrayInputStream(bytes)) {
MimeUtil.writeInputToOutputStream(str, outputStream);
} catch (IOException e) {
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while writing content parts to outputstream: " + e.getMessage()));
}
}

/**
Expand Down Expand Up @@ -456,6 +509,17 @@ public static BStream getByteStream(BObject entityObj) {
(ENTITY_BYTE_STREAM) : null;
}

/**
* Obtains the byte stream if the content type is text/event-stream.
*
* @param entityObj Represent a ballerina entity
* @return A Ballerina byte stream
*/
public static BStream getEventStream(BObject entityObj) {
String contentType = MimeUtil.getContentTypeWithParameters(entityObj);
return contentType.startsWith(TEXT_EVENT_STREAM) ? getByteStream(entityObj) : null;
}

public static void closeByteChannel(Channel byteChannel) {
try {
byteChannel.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ private MimeConstants() {}
*/
public static final String MULTIPART_MIXED = "multipart/mixed";

/**
* Content-type text/event-stream.
*/
public static final String TEXT_EVENT_STREAM = "text/event-stream";

public static final String JSON_SUFFIX = "+json";

public static final String JSON_TYPE_IDENTIFIER = "/json";
Expand Down

0 comments on commit d4e9f7a

Please sign in to comment.