diff --git a/code/exercise_013_Listen_and_Notify/README.md b/code/exercise_013_Listen_and_Notify/README.md index 5cfd0d1..230c878 100644 --- a/code/exercise_013_Listen_and_Notify/README.md +++ b/code/exercise_013_Listen_and_Notify/README.md @@ -4,7 +4,61 @@ In this exercise, we will play a bit with Postgres’ `LISTEN / NOTIFY` feature, The `LISTEN / NOTIFY` feature of Postgres allows you to setup a connection to Postgres, and listen for evens that pass by on a channel, as well as notifying such channels. With the reactive sql clients, we can connect to these channels as a `Multi` in Quarkus. -* Get the code for `ListenNotifyResource` by executing this command from the command line: `cmtc pull-template src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java `. +* Create the class `ListenNotifyResource` from the following template: + +```java +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.vertx.core.json.JsonObject; +import io.vertx.mutiny.pgclient.PgConnection; +import io.vertx.mutiny.pgclient.PgPool; +import io.vertx.pgclient.PgNotification; +import org.jboss.resteasy.reactive.RestStreamElementType; + +import jakarta.inject.Inject; +import jakarta.ws.rs.*; +import jakarta.ws.rs.core.MediaType; + +@Path("/channel") +public class ListenNotifyResource { + + @Inject + PgPool client; + + @Path("{channel}") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + @RestStreamElementType(MediaType.APPLICATION_JSON) + public Multi listen(@PathParam("channel") String channel) { + return client + .getConnection() + .map(PgConnection::cast) + .toMulti() + .flatMap(connection -> + connection.query("LISTEN " + channel) + .execute() + .toMulti() + .flatMap(__ -> streamNotifications(connection)) + ) + .map(PgNotification::toJson); + } + + @Path("{channel}") + @POST + @Produces(MediaType.TEXT_PLAIN) + @Consumes(MediaType.WILDCARD) + public Uni notify(@PathParam("channel") String channel, String stuff) { + return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$") + .execute() + .map(rs -> "Posted to " + channel + " channel"); + } + + // Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream + private Multi streamNotifications(PgConnection connection) { + // To be implemented + } +} +``` * Connect to the channel `milkshakes` using the following cURL command. diff --git a/code/exercise_013_Listen_and_Notify/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java b/code/exercise_013_Listen_and_Notify/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java index 394b09f..d44a5d0 100644 --- a/code/exercise_013_Listen_and_Notify/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java +++ b/code/exercise_013_Listen_and_Notify/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java @@ -5,7 +5,6 @@ import io.vertx.core.json.JsonObject; import io.vertx.mutiny.pgclient.PgConnection; import io.vertx.mutiny.pgclient.PgPool; -import io.vertx.mutiny.sqlclient.SqlConnection; import io.vertx.pgclient.PgNotification; import org.jboss.resteasy.reactive.RestStreamElementType; @@ -26,16 +25,14 @@ public class ListenNotifyResource { public Multi listen(@PathParam("channel") String channel) { return client .getConnection() + .map(PgConnection::cast) .toMulti() - .flatMap(connection -> { - Multi notifications = Multi.createFrom(). - emitter(c -> toPgConnection(connection).notificationHandler(c::emit)); - - return connection.query("LISTEN " + channel) - .execute() - .toMulti() - .flatMap(__ -> notifications); - }) + .flatMap(connection -> + connection.query("LISTEN " + channel) + .execute() + .toMulti() + .flatMap(__ -> streamNotifications(connection)) + ) .map(PgNotification::toJson); } @@ -43,14 +40,15 @@ public Multi listen(@PathParam("channel") String channel) { @POST @Produces(MediaType.TEXT_PLAIN) @Consumes(MediaType.WILDCARD) - public Uni notif(@PathParam("channel") String channel, String stuff) { + public Uni notify(@PathParam("channel") String channel, String stuff) { return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$") .execute() .map(rs -> "Posted to " + channel + " channel"); } - // We have to do some type juggling here. Solved in the mutiny client v2. - PgConnection toPgConnection(SqlConnection sqlConnection) { - return new PgConnection((io.vertx.pgclient.PgConnection) sqlConnection.getDelegate()); + // Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream + private Multi streamNotifications(PgConnection connection) { + return Multi.createFrom() + .emitter(multiEmitter -> connection.notificationHandler(multiEmitter::emit)); } } diff --git a/code/exercise_014_Internal_Channels/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java b/code/exercise_014_Internal_Channels/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java index 394b09f..d44a5d0 100644 --- a/code/exercise_014_Internal_Channels/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java +++ b/code/exercise_014_Internal_Channels/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java @@ -5,7 +5,6 @@ import io.vertx.core.json.JsonObject; import io.vertx.mutiny.pgclient.PgConnection; import io.vertx.mutiny.pgclient.PgPool; -import io.vertx.mutiny.sqlclient.SqlConnection; import io.vertx.pgclient.PgNotification; import org.jboss.resteasy.reactive.RestStreamElementType; @@ -26,16 +25,14 @@ public class ListenNotifyResource { public Multi listen(@PathParam("channel") String channel) { return client .getConnection() + .map(PgConnection::cast) .toMulti() - .flatMap(connection -> { - Multi notifications = Multi.createFrom(). - emitter(c -> toPgConnection(connection).notificationHandler(c::emit)); - - return connection.query("LISTEN " + channel) - .execute() - .toMulti() - .flatMap(__ -> notifications); - }) + .flatMap(connection -> + connection.query("LISTEN " + channel) + .execute() + .toMulti() + .flatMap(__ -> streamNotifications(connection)) + ) .map(PgNotification::toJson); } @@ -43,14 +40,15 @@ public Multi listen(@PathParam("channel") String channel) { @POST @Produces(MediaType.TEXT_PLAIN) @Consumes(MediaType.WILDCARD) - public Uni notif(@PathParam("channel") String channel, String stuff) { + public Uni notify(@PathParam("channel") String channel, String stuff) { return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$") .execute() .map(rs -> "Posted to " + channel + " channel"); } - // We have to do some type juggling here. Solved in the mutiny client v2. - PgConnection toPgConnection(SqlConnection sqlConnection) { - return new PgConnection((io.vertx.pgclient.PgConnection) sqlConnection.getDelegate()); + // Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream + private Multi streamNotifications(PgConnection connection) { + return Multi.createFrom() + .emitter(multiEmitter -> connection.notificationHandler(multiEmitter::emit)); } } diff --git a/code/exercise_015_Connecting_to_Kafka/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java b/code/exercise_015_Connecting_to_Kafka/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java index 394b09f..d44a5d0 100644 --- a/code/exercise_015_Connecting_to_Kafka/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java +++ b/code/exercise_015_Connecting_to_Kafka/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java @@ -5,7 +5,6 @@ import io.vertx.core.json.JsonObject; import io.vertx.mutiny.pgclient.PgConnection; import io.vertx.mutiny.pgclient.PgPool; -import io.vertx.mutiny.sqlclient.SqlConnection; import io.vertx.pgclient.PgNotification; import org.jboss.resteasy.reactive.RestStreamElementType; @@ -26,16 +25,14 @@ public class ListenNotifyResource { public Multi listen(@PathParam("channel") String channel) { return client .getConnection() + .map(PgConnection::cast) .toMulti() - .flatMap(connection -> { - Multi notifications = Multi.createFrom(). - emitter(c -> toPgConnection(connection).notificationHandler(c::emit)); - - return connection.query("LISTEN " + channel) - .execute() - .toMulti() - .flatMap(__ -> notifications); - }) + .flatMap(connection -> + connection.query("LISTEN " + channel) + .execute() + .toMulti() + .flatMap(__ -> streamNotifications(connection)) + ) .map(PgNotification::toJson); } @@ -43,14 +40,15 @@ public Multi listen(@PathParam("channel") String channel) { @POST @Produces(MediaType.TEXT_PLAIN) @Consumes(MediaType.WILDCARD) - public Uni notif(@PathParam("channel") String channel, String stuff) { + public Uni notify(@PathParam("channel") String channel, String stuff) { return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$") .execute() .map(rs -> "Posted to " + channel + " channel"); } - // We have to do some type juggling here. Solved in the mutiny client v2. - PgConnection toPgConnection(SqlConnection sqlConnection) { - return new PgConnection((io.vertx.pgclient.PgConnection) sqlConnection.getDelegate()); + // Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream + private Multi streamNotifications(PgConnection connection) { + return Multi.createFrom() + .emitter(multiEmitter -> connection.notificationHandler(multiEmitter::emit)); } } diff --git a/code/exercise_016_Dead_Letter_Queue_and_Stream_filtering/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java b/code/exercise_016_Dead_Letter_Queue_and_Stream_filtering/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java index 394b09f..d44a5d0 100644 --- a/code/exercise_016_Dead_Letter_Queue_and_Stream_filtering/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java +++ b/code/exercise_016_Dead_Letter_Queue_and_Stream_filtering/src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java @@ -5,7 +5,6 @@ import io.vertx.core.json.JsonObject; import io.vertx.mutiny.pgclient.PgConnection; import io.vertx.mutiny.pgclient.PgPool; -import io.vertx.mutiny.sqlclient.SqlConnection; import io.vertx.pgclient.PgNotification; import org.jboss.resteasy.reactive.RestStreamElementType; @@ -26,16 +25,14 @@ public class ListenNotifyResource { public Multi listen(@PathParam("channel") String channel) { return client .getConnection() + .map(PgConnection::cast) .toMulti() - .flatMap(connection -> { - Multi notifications = Multi.createFrom(). - emitter(c -> toPgConnection(connection).notificationHandler(c::emit)); - - return connection.query("LISTEN " + channel) - .execute() - .toMulti() - .flatMap(__ -> notifications); - }) + .flatMap(connection -> + connection.query("LISTEN " + channel) + .execute() + .toMulti() + .flatMap(__ -> streamNotifications(connection)) + ) .map(PgNotification::toJson); } @@ -43,14 +40,15 @@ public Multi listen(@PathParam("channel") String channel) { @POST @Produces(MediaType.TEXT_PLAIN) @Consumes(MediaType.WILDCARD) - public Uni notif(@PathParam("channel") String channel, String stuff) { + public Uni notify(@PathParam("channel") String channel, String stuff) { return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$") .execute() .map(rs -> "Posted to " + channel + " channel"); } - // We have to do some type juggling here. Solved in the mutiny client v2. - PgConnection toPgConnection(SqlConnection sqlConnection) { - return new PgConnection((io.vertx.pgclient.PgConnection) sqlConnection.getDelegate()); + // Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream + private Multi streamNotifications(PgConnection connection) { + return Multi.createFrom() + .emitter(multiEmitter -> connection.notificationHandler(multiEmitter::emit)); } }