Skip to content

Commit

Permalink
Add a part to be implemented in ListenNotifyResource (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
njlbenn authored Jun 15, 2024
1 parent 9c02efa commit 00c0211
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 57 deletions.
56 changes: 55 additions & 1 deletion code/exercise_013_Listen_and_Notify/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,61 @@ In this exercise, we will play a bit with Postgres’ `LISTEN / NOTIFY` feature,

The `LISTEN / NOTIFY` feature of Postgres allows you to setup a connection to Postgres, and listen for evens that pass by on a channel, as well as notifying such channels. With the reactive sql clients, we can connect to these channels as a `Multi` in Quarkus.

* Get the code for `ListenNotifyResource` by executing this command from the command line: `cmtc pull-template src/main/java/com/lunatech/training/quarkus/ListenNotifyResource.java <root folder of exercise repo>`.
* Create the class `ListenNotifyResource` from the following template:

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgConnection;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.pgclient.PgNotification;
import org.jboss.resteasy.reactive.RestStreamElementType;

import jakarta.inject.Inject;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.MediaType;

@Path("/channel")
public class ListenNotifyResource {

@Inject
PgPool client;

@Path("{channel}")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<JsonObject> listen(@PathParam("channel") String channel) {
return client
.getConnection()
.map(PgConnection::cast)
.toMulti()
.flatMap(connection ->
connection.query("LISTEN " + channel)
.execute()
.toMulti()
.flatMap(__ -> streamNotifications(connection))
)
.map(PgNotification::toJson);
}

@Path("{channel}")
@POST
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.WILDCARD)
public Uni<String> notify(@PathParam("channel") String channel, String stuff) {
return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$")
.execute()
.map(rs -> "Posted to " + channel + " channel");
}

// Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream
private Multi<PgNotification> streamNotifications(PgConnection connection) {
// To be implemented
}
}
```

* Connect to the channel `milkshakes` using the following cURL command.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgConnection;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.SqlConnection;
import io.vertx.pgclient.PgNotification;
import org.jboss.resteasy.reactive.RestStreamElementType;

Expand All @@ -26,31 +25,30 @@ public class ListenNotifyResource {
public Multi<JsonObject> listen(@PathParam("channel") String channel) {
return client
.getConnection()
.map(PgConnection::cast)
.toMulti()
.flatMap(connection -> {
Multi<PgNotification> notifications = Multi.createFrom().
emitter(c -> toPgConnection(connection).notificationHandler(c::emit));

return connection.query("LISTEN " + channel)
.execute()
.toMulti()
.flatMap(__ -> notifications);
})
.flatMap(connection ->
connection.query("LISTEN " + channel)
.execute()
.toMulti()
.flatMap(__ -> streamNotifications(connection))
)
.map(PgNotification::toJson);
}

@Path("{channel}")
@POST
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.WILDCARD)
public Uni<String> notif(@PathParam("channel") String channel, String stuff) {
public Uni<String> notify(@PathParam("channel") String channel, String stuff) {
return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$")
.execute()
.map(rs -> "Posted to " + channel + " channel");
}

// We have to do some type juggling here. Solved in the mutiny client v2.
PgConnection toPgConnection(SqlConnection sqlConnection) {
return new PgConnection((io.vertx.pgclient.PgConnection) sqlConnection.getDelegate());
// Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream
private Multi<PgNotification> streamNotifications(PgConnection connection) {
return Multi.createFrom()
.emitter(multiEmitter -> connection.notificationHandler(multiEmitter::emit));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgConnection;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.SqlConnection;
import io.vertx.pgclient.PgNotification;
import org.jboss.resteasy.reactive.RestStreamElementType;

Expand All @@ -26,31 +25,30 @@ public class ListenNotifyResource {
public Multi<JsonObject> listen(@PathParam("channel") String channel) {
return client
.getConnection()
.map(PgConnection::cast)
.toMulti()
.flatMap(connection -> {
Multi<PgNotification> notifications = Multi.createFrom().
emitter(c -> toPgConnection(connection).notificationHandler(c::emit));

return connection.query("LISTEN " + channel)
.execute()
.toMulti()
.flatMap(__ -> notifications);
})
.flatMap(connection ->
connection.query("LISTEN " + channel)
.execute()
.toMulti()
.flatMap(__ -> streamNotifications(connection))
)
.map(PgNotification::toJson);
}

@Path("{channel}")
@POST
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.WILDCARD)
public Uni<String> notif(@PathParam("channel") String channel, String stuff) {
public Uni<String> notify(@PathParam("channel") String channel, String stuff) {
return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$")
.execute()
.map(rs -> "Posted to " + channel + " channel");
}

// We have to do some type juggling here. Solved in the mutiny client v2.
PgConnection toPgConnection(SqlConnection sqlConnection) {
return new PgConnection((io.vertx.pgclient.PgConnection) sqlConnection.getDelegate());
// Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream
private Multi<PgNotification> streamNotifications(PgConnection connection) {
return Multi.createFrom()
.emitter(multiEmitter -> connection.notificationHandler(multiEmitter::emit));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgConnection;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.SqlConnection;
import io.vertx.pgclient.PgNotification;
import org.jboss.resteasy.reactive.RestStreamElementType;

Expand All @@ -26,31 +25,30 @@ public class ListenNotifyResource {
public Multi<JsonObject> listen(@PathParam("channel") String channel) {
return client
.getConnection()
.map(PgConnection::cast)
.toMulti()
.flatMap(connection -> {
Multi<PgNotification> notifications = Multi.createFrom().
emitter(c -> toPgConnection(connection).notificationHandler(c::emit));

return connection.query("LISTEN " + channel)
.execute()
.toMulti()
.flatMap(__ -> notifications);
})
.flatMap(connection ->
connection.query("LISTEN " + channel)
.execute()
.toMulti()
.flatMap(__ -> streamNotifications(connection))
)
.map(PgNotification::toJson);
}

@Path("{channel}")
@POST
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.WILDCARD)
public Uni<String> notif(@PathParam("channel") String channel, String stuff) {
public Uni<String> notify(@PathParam("channel") String channel, String stuff) {
return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$")
.execute()
.map(rs -> "Posted to " + channel + " channel");
}

// We have to do some type juggling here. Solved in the mutiny client v2.
PgConnection toPgConnection(SqlConnection sqlConnection) {
return new PgConnection((io.vertx.pgclient.PgConnection) sqlConnection.getDelegate());
// Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream
private Multi<PgNotification> streamNotifications(PgConnection connection) {
return Multi.createFrom()
.emitter(multiEmitter -> connection.notificationHandler(multiEmitter::emit));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgConnection;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.SqlConnection;
import io.vertx.pgclient.PgNotification;
import org.jboss.resteasy.reactive.RestStreamElementType;

Expand All @@ -26,31 +25,30 @@ public class ListenNotifyResource {
public Multi<JsonObject> listen(@PathParam("channel") String channel) {
return client
.getConnection()
.map(PgConnection::cast)
.toMulti()
.flatMap(connection -> {
Multi<PgNotification> notifications = Multi.createFrom().
emitter(c -> toPgConnection(connection).notificationHandler(c::emit));

return connection.query("LISTEN " + channel)
.execute()
.toMulti()
.flatMap(__ -> notifications);
})
.flatMap(connection ->
connection.query("LISTEN " + channel)
.execute()
.toMulti()
.flatMap(__ -> streamNotifications(connection))
)
.map(PgNotification::toJson);
}

@Path("{channel}")
@POST
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.WILDCARD)
public Uni<String> notif(@PathParam("channel") String channel, String stuff) {
public Uni<String> notify(@PathParam("channel") String channel, String stuff) {
return client.preparedQuery("NOTIFY " + channel + ", $$" + stuff + "$$")
.execute()
.map(rs -> "Posted to " + channel + " channel");
}

// We have to do some type juggling here. Solved in the mutiny client v2.
PgConnection toPgConnection(SqlConnection sqlConnection) {
return new PgConnection((io.vertx.pgclient.PgConnection) sqlConnection.getDelegate());
// Use PgConnection::notificationHandler to register a handler that emits PgNotification values on a Multi stream
private Multi<PgNotification> streamNotifications(PgConnection connection) {
return Multi.createFrom()
.emitter(multiEmitter -> connection.notificationHandler(multiEmitter::emit));
}
}

0 comments on commit 00c0211

Please sign in to comment.