Skip to content

Commit

Permalink
Fix output format of jsonrpc receive notification
Browse files Browse the repository at this point in the history
Fixes #1313
  • Loading branch information
AsamK committed Aug 26, 2023
1 parent bc3bdbb commit 87e79bc
Showing 1 changed file with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

Expand Down Expand Up @@ -54,8 +55,8 @@ public void handleConnection(final MultiAccountManager c) {
this.commandHandler = new SignalJsonRpcCommandHandler(c, this::getCommand);

if (!noReceiveOnStart) {
this.subscribeReceive(c.getManagers());
c.addOnManagerAddedHandler(this::subscribeReceive);
this.subscribeReceive(c.getManagers(), true);
c.addOnManagerAddedHandler(m -> subscribeReceive(m, true));
c.addOnManagerRemovedHandler(this::unsubscribeReceive);
}

Expand All @@ -66,7 +67,7 @@ public void handleConnection(final Manager m) {
this.commandHandler = new SignalJsonRpcCommandHandler(m, this::getCommand);

if (!noReceiveOnStart) {
subscribeReceive(m);
subscribeReceive(m, true);
}

final var currentThread = Thread.currentThread();
Expand All @@ -77,17 +78,23 @@ public void handleConnection(final Manager m) {

private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0);

private int subscribeReceive(final Manager manager) {
return subscribeReceive(List.of(manager));
private int subscribeReceive(final Manager manager, boolean internalSubscription) {
return subscribeReceive(List.of(manager), internalSubscription);
}

private int subscribeReceive(final List<Manager> managers) {
private int subscribeReceive(final List<Manager> managers, boolean internalSubscription) {
final var subscriptionId = nextSubscriptionId.getAndIncrement();
final var handlers = managers.stream().map(m -> {
final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
final var params = new ObjectNode(objectMapper.getNodeFactory());
params.set("subscription", IntNode.valueOf(subscriptionId));
params.set("result", objectMapper.valueToTree(s));
ContainerNode<?> params;
if (internalSubscription) {
params = objectMapper.valueToTree(s);
} else {
final var paramsNode = new ObjectNode(objectMapper.getNodeFactory());
paramsNode.set("subscription", IntNode.valueOf(subscriptionId));
paramsNode.set("result", objectMapper.valueToTree(s));
params = paramsNode;
}
final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null);
try {
jsonRpcSender.sendRequest(jsonRpcRequest);
Expand Down Expand Up @@ -162,15 +169,15 @@ public String getName() {
public void handleCommand(
final Void request, final Manager m, final JsonWriter jsonWriter
) throws CommandException {
final var subscriptionId = subscribeReceive(m);
final var subscriptionId = subscribeReceive(m, false);
jsonWriter.write(subscriptionId);
}

@Override
public void handleCommand(
final Void request, final MultiAccountManager c, final JsonWriter jsonWriter
) throws CommandException {
final var subscriptionId = subscribeReceive(c.getManagers());
final var subscriptionId = subscribeReceive(c.getManagers(), false);
jsonWriter.write(subscriptionId);
}
}
Expand Down

0 comments on commit 87e79bc

Please sign in to comment.