From 5bf0eb1a2524b8627c89c3561d46426573256f28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Ger=C5=A1ak?= Date: Thu, 21 Nov 2024 12:14:04 +0100 Subject: [PATCH] Changes to support graphql-ws spec https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md --- .../lacinia/pedestal/subscriptions.clj | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/com/walmartlabs/lacinia/pedestal/subscriptions.clj b/src/com/walmartlabs/lacinia/pedestal/subscriptions.clj index f013e37..bb4d203 100644 --- a/src/com/walmartlabs/lacinia/pedestal/subscriptions.clj +++ b/src/com/walmartlabs/lacinia/pedestal/subscriptions.clj @@ -133,13 +133,16 @@ ;; Otherwise it's a message from the client to be acted upon. (let [{:keys [id payload type]} data] (case type + "ping" + (when (>! response-data-ch {:type :pong}) + (recur connection-state)) "connection_init" (when (>! response-data-ch {:type :connection_ack}) (recur (assoc connection-state :connection-params payload))) ;; TODO: Track state, don't allow start, etc. until after connection_init - "start" + ("start" "subscribe") (if (contains? (:subs connection-state) id) (do (log/trace :event ::ignoring-duplicate :id id) @@ -150,7 +153,7 @@ sub-shutdown-ch (execute-query-interceptors id payload response-data-ch cleanup-ch merged-context)] (recur (assoc-in connection-state [:subs id] sub-shutdown-ch))))) - "stop" + ("stop" "complete") (do (log/trace :event ::stop :id id) (when-some [sub-shutdown-ch (get-in connection-state [:subs id])] @@ -247,6 +250,12 @@ :payload payload}) (close! response-data-ch)))})) +(defn ^:private protocol-response-type + [context] + (case (::subprotocol context) + ("graphql-transport-ws") :next + :data)) + (def send-operation-response-interceptor "Interceptor responsible for the :response key of the context (set when a request is either a query or mutation, but not a subscription). The :response data @@ -257,7 +266,7 @@ :leave (fn [context] (when-let [response (:response context)] (let [{:keys [id response-data-ch]} (:request context)] - (put! response-data-ch {:type :data + (put! response-data-ch {:type (protocol-response-type context) :id id :payload response}) (put! response-data-ch {:type :complete @@ -326,6 +335,7 @@ thread) ch)) + (defn ^:private execute-subscription [context parsed-query] (let [{:keys [::values-chan-fn request]} context @@ -382,7 +392,7 @@ (resolve/on-deliver! (fn [response] (log/trace :response response :id id) (put! response-data-ch - {:type :data + {:type (protocol-response-type context) :id id :payload response}) (let [new-count (swap! *execution-count dec)] @@ -535,14 +545,15 @@ : Used to create the channel of text responses sent to the client. The default is 10 (a non-lossy channel)." [compiled-schema options] - (let [{:keys [keep-alive-ms app-context send-buffer-or-n response-chan-fn values-chan-fn session-initializer] + (let [{:keys [keep-alive-ms app-context send-buffer-or-n response-chan-fn values-chan-fn session-initializer context-initializer] :or {keep-alive-ms 25000 send-buffer-or-n 10 response-chan-fn #(chan 10) values-chan-fn #(chan 1)}} options interceptors (or (:subscription-interceptors options) (default-subscription-interceptors compiled-schema app-context)) - base-context (-> {::values-chan-fn values-chan-fn} + base-context (-> {::values-chan-fn values-chan-fn + } (chain/terminate-when :response) (chain/enqueue interceptors)) on-open (fn [^Session session ^EndpointConfig config] @@ -557,7 +568,11 @@ ; client text -> server ws-text-ch (chan 1) ; client text -> client data - ws-data-ch (chan 10)] + ws-data-ch (chan 10) + base-context (as-> base-context context + (if-not (fn? context-initializer) context + (context-initializer context session)) + (assoc context ::subprotocol (.getNegotiatedSubprotocol session)))] (response-encode-loop response-data-ch send-ch) (ws-parse-loop session-id ws-text-ch ws-data-ch response-data-ch) (connection-loop session-id keep-alive-ms ws-data-ch response-data-ch base-context) @@ -575,7 +590,7 @@ (log/error :event ::error :session-id session-id :exception t))] (-> options (select-keys [:idle-timeout-ms]) - (assoc :subprotocols ["graphql-ws"] + (assoc :subprotocols ["graphql-ws" "graphql-transport-ws"] :on-open on-open :on-close on-close :on-text on-text