clj-bson-rpc

0.2.1


bson rpc protocol

dependencies

com.taoensso/timbre
4.1.4
congomongo
0.4.6
manifold
0.1.1
org.clojure/clojure
1.7.0
org.clojure/core.async
0.2.374
org.clojure/data.json
0.2.6



(this space intentionally left almost blank)
 

BSON-RPC Connection over manifold duplex stream.

Practical applications include (but limited to) using aleph to establish TCP connection (+ TLS) between the client and the server.

After an RPC connection has been set on both sides, the RPC nodes are principally equal. The TCP client may provide services which the TCP server then uses and vice versa.

(ns clj-bson-rpc.rpc)

Constants and Helpers

(def rpc-error
  {:parse-error      {:code -32700 :message "Parse error"}
   :invalid-request  {:code -32600 :message "Invalid Request"}
   :method-not-found {:code -32601 :message "Method not found"}
   :invalid-params   {:code -32602 :message "Invalid params"}
   :internal-error   {:code -32603 :message "Internal error"}
   :server-error     {:code -32000 :message "Server error"}})
(def rpc-version "2.0")
(defonce identifier-counter (atom 1))
(defn default-id-generator [] (str "id-" (swap! identifier-counter inc)))

Message Type Identifiers

(defn request?
  [{:keys [protocol-keyword]} msg]
  (and (= (get msg protocol-keyword) rpc-version)
       (string? (:method msg))
       (contains? msg :id) (or (string? (:id msg)) (integer? (:id msg)) (nil? (:id msg)))))
(defn notification?
  [{:keys [protocol-keyword]} msg]
  (and (= (get msg protocol-keyword) rpc-version)
       (string? (:method msg))
       (not (contains? msg :id))))
(defn response?
  [{:keys [protocol-keyword]} msg]
  (let [result-and-no-error (and (contains? msg :result) (not (contains? msg :error)))
        error-and-no-result (and (not (contains? msg :result))
                                 (integer? (get-in msg [:error :code]))
                                 (string? (get-in msg [:error :message])))]
    (and (= (get msg protocol-keyword) rpc-version)
         (contains? msg :id) (or (string? (:id msg)) (integer? (:id msg)))
         (or result-and-no-error error-and-no-result))))
(defn nil-id-error-response?
  [{:keys [protocol-keyword]} msg]
  (let [error-and-no-result (and (not (contains? msg :result))
                                 (integer? (get-in msg [:error :code]))
                                 (string? (get-in msg [:error :message])))]
    (and (= (get msg protocol-keyword) rpc-version)
         (contains? msg :id) (nil? (:id msg))
         error-and-no-result)))

Inbound Requests, Notifications

(defn handle-request
  [protocol-keyword request-handlers msg]
  (let [method (name (:method msg))
        params (:params msg)
        id (:id msg)
        ok-response (fn [result] {protocol-keyword rpc-version
                                  :result result
                                  :id id})
        invalid-params (fn [e-msg] {protocol-keyword rpc-version
                                    :error (assoc (:invalid-params rpc-error) :data e-msg)
                                    :id id})
        method-not-found (fn [] {protocol-keyword rpc-version
                                 :error (:method-not-found rpc-error)
                                 :id id})]
    (if (contains? request-handlers method)
      (try
        (ok-response (apply (get request-handlers method) params))
        (catch clojure.lang.ArityException e (invalid-params (str e))))
      (method-not-found))))
(defn handle-notification
  [notification-error-handler notification-handlers msg]
  (let [method (name (:method msg))
        params (:params msg)]
    (if (contains? notification-handlers method)
      (apply (get notification-handlers method) params)
      (notification-error-handler
        (ex-info (str "No handler found for notification:" method)
                 {:type :notification-handler-not-found :method method})))))
 

Utilities for binary/byte-buffer manipulations.

(ns clj-bson-rpc.bytes
  (:import
    [java.nio ByteBuffer ByteOrder]))

Read int32-le from byte-array

(defn peek-int32-le
  [bbuf]
  (-> (ByteBuffer/wrap bbuf 0 4)
      (.order ByteOrder/LITTLE_ENDIAN)
      (.getInt)
      (long)))

Concatenate byte buffers.

(defn bbuf-concat
  [a b]
  (if (empty? a)
    b
    (byte-array (concat a b))))
(defn bbuf-split-at
  [n bbuf]
  (mapv byte-array (split-at n bbuf)))
(defn bbuf-split-before
  [bt bbuf]
  (mapv byte-array (split-with (fn [b] (not= b bt)) bbuf)))
(defn bbuf-split-after
  [bt bbuf]
  (let [[fst snd] (split-with (fn [b] (not= b bt)) bbuf)]
    [(byte-array (concat fst [(first snd)])) (byte-array (rest snd))]))
 

BSON encoding and decoding. Utilizes mongodb codecs for clojure <-> bson conversions.

(ns clj-bson-rpc.bson
  (:import
    [org.bson BasicBSONDecoder BasicBSONEncoder])
  (:require
    [clj-bson-rpc.bytes :refer [bbuf-concat bbuf-split-at peek-int32-le]]
    [manifold.stream :as stream]
    [somnium.congomongo.coerce :refer [coerce]]))

Encode message to BSON byte-array

(defn encode
  [m]
  (.encode (BasicBSONEncoder.) (coerce m [:clojure :mongo])))

Decode BSON byte-array to message

(defn decode
  [b]
  (coerce (.readObject (BasicBSONDecoder.) b) [:mongo :clojure]))

Lift BSON message(s) from source stream s based on BSON frame recognition, decode to clojure (Map messages) and push to the result stream. Various errors will produce ex-info items to the result stream.

(defn- stream-decoder
  [s max-len]
  (let [source (stream/->source s)
        sink (stream/stream)
        find-frames (fn [bbuff]
                      (loop [frames [] remainder bbuff]
                        (let [blen (count remainder)
                              rlen (when (>= blen 4) (peek-int32-le remainder))
                              err-long #(ex-info (str "Message length exceeds the maximum allowed. "
                                                      "Max: " max-len ", Message len: " rlen)
                                                 {:type :exceeds-max-length :bytes remainder})
                              err-frame #(ex-info "Message framing error."
                                                  {:type :invalid-framing :bytes remainder})]
                          (cond
                            (nil? rlen)      [frames remainder]
                            (> rlen max-len) [(conj frames (err-long)) remainder]
                            (< rlen 5)       [(conj frames (err-frame)) remainder]
                            (>= blen rlen)   (let [[frame remainder] (bbuf-split-at rlen remainder)]
                                               (recur (conj frames frame) remainder))
                            :else            [frames remainder]))))
        decode-frame (fn [frame]
                       (if (instance? clojure.lang.ExceptionInfo frame)
                         frame
                         (try
                           (decode frame)
                           (catch Exception e
                             (ex-info (str e) {:type :invalid-bson :bytes frame})))))
        buffer (atom (byte-array []))
        decoder (fn [new-bytes]
                  (let [[frames remainder] (find-frames (bbuf-concat @buffer new-bytes))
                        trail (if (and (stream/drained? source) (seq remainder))
                                [(ex-info "Non-decodable trailing bytes when stream was closed."
                                          {:type :trailing-garbage :bytes remainder})]
                                [])]
                    (reset! buffer remainder)
                    (->> (into frames trail)
                         (mapv decode-frame)
                         (stream/put-all! sink))))]
    (stream/connect-via source decoder sink {:downstream? false})
    (stream/on-drained source #(do (decoder []) (stream/close! sink)))
    sink))

BSON codec for a manifold duplex stream.

  • s - Duplex stream of raw bytes (possibly from aleph)
  • max-len - Optional argument for limiting incoming message maximum size.

    Returns a duplex stream which:

    • As a sink takes clojure maps which are encoded and sent to the byte stream s
    • As a source produces decoded (clojurized) BSON messages from s

    Any non-decodable byte sequences from the raw stream are converted to clojure.lang.ExceptionInfo objects which are takeable from the returned duplex stream. Thus take! from this source will either:

    • Block waiting a new message.
    • Return a bson message.
    • Return a clojure.lang.ExceptionInfo object.
      • Message framing error (bson length is negative or < 5)-> :type :invalid-framing
      • Message decoding error from bson parser-> :type :invalid-bson ! Stream s must be closed.
      • Non-decodable trailing bytes when stream closed-> :type :trailing-garbage
    • Return nil if underlying stream s is closed and source is drained.
(defn bson-codec
  [s & {:keys [max-len] :or {max-len (Integer/MAX_VALUE)}}]
  (let [out (stream/stream)]
    (stream/connect (stream/map encode out) s)
    (stream/splice out (stream-decoder s max-len))))
 

JSON-RPC 2.0 and BSON-RPC Connections over manifold duplex stream.

Practical applications include using aleph to establish TCP connection (+ TLS) between the client and the server.

After an RPC connection has been set on both sides, the RPC nodes are principally equal. The TCP client may provide services which the TCP server then uses and vice versa.

(ns clj-bson-rpc.core
  (:require
    [clj-bson-rpc.bson :refer [bson-codec]]
    [clj-bson-rpc.json :refer [json-codec]]
    [clj-bson-rpc.rpc :as rpc :refer [rpc-error rpc-version]]
    [clojure.core.async :refer [<!! go]]
    [manifold.stream :as stream]
    [taoensso.timbre :as log]))

Default Handlers

(defn- default-timeout-handler
  [{:keys [close! connection-id]}]
  (close!)
  (log/error connection-id "- Connection closed due to idle timeout."))
(defn- default-close-handler
  [{:keys [close! connection-id]}]
  (close!)
  (log/trace connection-id "- Connection closed by peer."))
(defn- default-nil-id-error-handler
  [rpc-ctx msg]
  (log/error (:connection-id rpc-ctx) "- Error response from peer:" (:error msg)))
(defn- default-notification-error-handler
  [rpc-ctx e]
  (log/error (:connection-id rpc-ctx) "- Notification handler failed:" e))
(defn- default-invalid-id-response-handler
  [rpc-ctx msg]
  (log/error (:connection-id rpc-ctx) "- Response handling failed, id:" (:id msg) "is not valid. Msg:" msg))

Inbound Requests, Notifications and Responses

(defn- handle-request
  [{:keys [close! close-server! connection-id protocol-keyword request-handlers socket]} msg]
  (log/trace connection-id "- Received request:" msg)
  (let [post-action (atom nil)
        ok-response (fn [result] {protocol-keyword rpc-version
                                  :result result
                                  :id (:id msg)})
        server-error (fn [e-msg] {protocol-keyword rpc-version
                                  :error (assoc (:server-error rpc-error) :data e-msg)
                                  :id (:id msg)})
        response (try
                   (rpc/handle-request protocol-keyword request-handlers msg)
                   (catch clojure.lang.ExceptionInfo e
                     (if (= (:type (ex-data e)) :rpc-control)
                       (do (reset! post-action (:action (ex-data e)))
                           (ok-response (:response (ex-data e))))
                       (server-error (str e))))
                   (catch Exception e (server-error (str e))))]
    (if @(stream/put! socket response)
        (log/trace connection-id "- Sent response:" response)
        (do (close!)
            (log/error connection-id "- Failed to send a response to a request."
                       "Services and socket closed. (failed response: " response ")")))
    (case @post-action
      :close (close!)
      :close-server (close-server!)
      :close-all (do (close!) (close-server!))
      nil)))
(defn- handle-notification
  [{:keys [close! close-server! connection-id notification-error-handler notification-handlers] :as rpc-ctx} msg]
  (log/trace connection-id "- Received notification:" msg)
  (let [post-action (atom nil)]
    (try
      (rpc/handle-notification (partial notification-error-handler rpc-ctx) notification-handlers msg)
      (catch clojure.lang.ExceptionInfo e
        (if (= (:type (ex-data e)) :rpc-control)
          (reset! post-action (:action (ex-data e)))
          (notification-error-handler rpc-ctx e)))
      (catch Exception e
        (notification-error-handler rpc-ctx e)))
    (case @post-action
      :close (close!)
      :close-server (close-server!)
      :close-all (do (close!) (close-server!))
      nil)))
(defn- handle-response
  [{:keys [connection-id invalid-id-response-handler response-channels] :as rpc-ctx} msg]
  (log/trace connection-id "- Received response:" msg)
  (let [id (:id msg)
        channel (atom nil)]
    (swap! response-channels (fn [m] (reset! channel (get m id)) (dissoc m id)))
    (if @channel
      (if (not @(stream/put! @channel msg))
        (log/error connection-id "- Sending response failed." msg))
      (invalid-id-response-handler rpc-ctx msg))))

Inbound Garbage

(defn- handle-parse-error
  [{:keys [close! connection-id protocol-keyword socket] :as rpc-ctx} msg]
  (go
    (let [try-send-error! (fn [] (try
                                   (if (not @(stream/put! socket {protocol-keyword rpc-version
                                                                  :error (assoc (:parse-error rpc-error) :data msg)
                                                                  :id nil}))
                                     (log/error connection-id "- Failed to inform peer about parse error:" msg))
                                   (catch Exception e
                                     (log/error connection-id "- Failed to inform peer about parse error:" e))))
          irrecoverable-error (fn [] (try-send-error!)
                                     (close!)
                                     (log/error connection-id "- Irrecoverable parsing error."
                                                              "Services and socket closed." msg))
          transient-error (fn [] (try-send-error!)
                                 (log/error connection-id "-" msg))]
      (case (:type (ex-data msg))
        :exceeds-max-length (irrecoverable-error)
        :invalid-framing (irrecoverable-error)
        :invalid-json (if (= (:json-framing rpc-ctx) :none) (irrecoverable-error) (transient-error))
        :invalid-bson (transient-error)
        :trailing-garbage (log/warn connection-id "-" msg)
        (log/error connection-id "- Unexpected parse error:" msg)))))
(defn- handle-schema-error
  [{:keys [connection-id protocol-keyword socket]} msg]
  (go
    (let [id (:id msg)]
      (if (contains? msg :method)
        (stream/put! socket {protocol-keyword rpc-version
                             :error (assoc (:invalid-request rpc-error) :data msg)
                             :id id}))  ;; effort made - do not care if socket errors
      (log/error connection-id "- Invalid request:" msg))))

Service Dispatcher

Dispatch Incoming Requests, Notifications and Responses to wrapped handlers.

(defn- run-rpc-services
  [{:keys [async-notification-handling async-request-handling
           connection-closed-handler connection-id idle-timeout idle-timeout-handler
           nil-id-error-handler response-channels run-services socket]
    :as rpc-ctx}]
  (let [take! (if idle-timeout
                (fn [s] (stream/try-take! s nil idle-timeout :timeout))
                (fn [s] (stream/take! s)))]
    (go
      (log/trace connection-id "- Start RPC message dispatcher.")
      (while @run-services
        (let [msg @(take! socket)]
          (log/trace connection-id "- Dispatch message.")
          (cond
            (nil? msg)                                 (connection-closed-handler rpc-ctx)
            (= msg :timeout)                           (idle-timeout-handler rpc-ctx)
            (instance? clojure.lang.ExceptionInfo msg) (handle-parse-error rpc-ctx msg)
            (rpc/request? rpc-ctx msg)                 (if async-request-handling
                                                         (go (handle-request rpc-ctx msg))
                                                         (handle-request rpc-ctx msg))
            (rpc/notification? rpc-ctx msg)            (if async-notification-handling
                                                         (go (handle-notification rpc-ctx msg))
                                                         (handle-notification rpc-ctx msg))
            (rpc/response? rpc-ctx msg)                (handle-response rpc-ctx msg)
            (rpc/nil-id-error-response? rpc-ctx msg)   (nil-id-error-handler rpc-ctx msg)
            :else                                      (handle-schema-error rpc-ctx msg))))
      (doseq [chn (vals @response-channels)] ; Signal to waiters that connection is closed
        (stream/put! chn :closed))
      (log/trace connection-id "- Exit RPC message dispatcher."))))

Public Interface

connect-rpc! default options. See connect-bson-rpc! and connect-json-rpc! for semantic explanations. These values are used for those options which are not provided in the options Map argument.

Defaults:

  • :async-notification-handling - Default value is false, which is based on an assumption that typically notifications are order-sensitive messages e.g. progress reports which must be handled in the exact order in which they are received.
  • :async-request-handling - Default value is true. Async handling allows handlers to execute even the most time-consuming tasks without blocking other incoming messages. Yet the client side can deside whether to call requests in tight sequential order or to call them in parallel.
  • :connection-closed-handler - Default handler stops the service for the current stream and closes the stream. (No effect on the server socket/object.)
  • :connection-id - Arbitrary unique ID to identify connection, shown in logging so that multiple connections with separate clients can be traced. nil -> generated integer id.
  • :id-generator - Default generator generates 'id-1', 'id-2', etc. ids for outbound requests.
  • :idle-timeout - Default value nil disables idle timeouts.
  • :idle-timeout-handler - Default handler stops the service for the current stream and closes the stream. (No effect on the the server socket/object.)
  • :invalid-id-response-handler - Default handler logs the error.
  • :json-framing - Default :none means JSON messages are streamed consequentially without framing.
  • :json-key-fn- Default clojure.core/keyword, JSON Object keys are keywordized.
  • :max-len - The max capacity defined in bson specification: 2 147 483 647 bytes (Max Int32)
  • :nil-id-error-handler - Default handler logs the error message sent by rpc peer node.
  • :notification-error-handler - Default handler logs the error.
  • :server - Default value nil
(def default-options
  {:async-notification-handling false
   :async-request-handling true
   :connection-closed-handler default-close-handler
   :connection-id nil
   :id-generator rpc/default-id-generator
   :idle-timeout nil
   :idle-timeout-handler default-timeout-handler
   :invalid-id-response-handler default-invalid-id-response-handler
   :json-framing :none
   :json-key-fn keyword
   :max-len (Integer/MAX_VALUE)
   :nil-id-error-handler default-nil-id-error-handler
   :notification-error-handler default-notification-error-handler
   :server nil})

Call this function within your request or notification handler in order to disconnect current tcp connection. close-connection! does not return.

  • response - Response sent to a request (if within a request handler) just before disconnection. Defaults to nil if not provided.
(defn close-connection!
  ([] (close-connection! nil))
  ([response]
   (throw (ex-info "" {:type :rpc-control :action :close :response response}))))

Call this function within your request or notification handler in order to close server socket. Does not disconnect your current connection. close-server! does not return.

  • response - Response sent to a request (if within a request handler). Defaults to nil if not provided.
(defn close-server!
  ([] (close-server! nil))
  ([response]
   (throw (ex-info "" {:type :rpc-control :action :close-server :response response}))))

Call this function within your request or notification handler in order to close both current connection and the server socket. close-connection-and-server! does not return.

  • response - Response sent to a request (if within a request handler) just before disconnection. Defaults to nil if not provided.
(defn close-connection-and-server!
  ([] (close-connection-and-server! nil))
  ([response]
   (throw (ex-info "" {:type :rpc-control :action :close-all :response response}))))

Connect BSON/JSON RPC Services. See: connect-bson-rcp! and connect-json-rpc!

(defn- connect-rpc!
  [s codec request-handlers notification-handlers options]
   (let [xson-stream (case codec
                       :bson (bson-codec s :max-len (get options :max-len (Integer/MAX_VALUE)))
                       :json (json-codec s :json-framing (get options :json-framing :none)
                                           :json-key-fn (get options :json-key-fn keyword)
                                           :max-len (get options :max-len (Integer/MAX_VALUE)))
                       (throw (ex-info "Invalid codec.")))
         default-protocol-keyword (keyword (str (name codec) "rpc"))
         run-services (atom true)
         response-channels (atom {})
         rpc-ctx (into (into default-options options)
                       {:close! #(do (stream/close! xson-stream)
                                     (reset! run-services false))
                        :close-server! #(if (:server options) (.close (:server options)))
                        :connection-id (if (nil? (:connection-id options))
                                         (str (swap! rpc/identifier-counter inc))
                                         (:connection-id options))
                        :protocol-keyword (get options :protocol-keyword default-protocol-keyword)
                        :response-channels response-channels
                        :run-services run-services
                        :socket xson-stream})]
     (let [keys->strings (fn [m] (into {} (mapv (fn [[k v]] [(name k) v]) m)))
           notification-handlers (keys->strings
                                   (if (fn? notification-handlers)
                                     (notification-handlers rpc-ctx)
                                     notification-handlers))
           request-handlers (keys->strings
                              (if (fn? request-handlers)
                                (request-handlers rpc-ctx)
                                request-handlers))
           rpc-ctx (assoc rpc-ctx
                          :notification-handlers notification-handlers
                          :request-handlers request-handlers)]
       (run-rpc-services rpc-ctx)
       rpc-ctx)))

Connect rpc services and create a context for sending BSON-RPC (2.0) requests and notifications to the RPC Peer Node over TCP connection.

  • s - Manifold duplex stream connected to the rpc peer node. (e.g. from aleph.tcp/start-server to the handler or from aleph.tcp/client)
  • request-handlers

    • A Map of request handlers: {::String/Keyword ::Function}. These functions are exposed to be callable by the rpc peer node. Function return values are sent back to peer node and any thrown errors are sent as error responses to the peer node.
    • Alternatively this parameter accepts a function which takes rpc-ctx and returns an above-mentioned Map of request handlers. Necessary if any request handler needs to send notifications to peer during the processing of request. example function:

      ``` (defn generate-request-handlers [rpc-ctx] {:quick-task quick-task :long-process (partial long-process rpc-ctx)}) ``` where the long-process will thus have the rpc-ctx and will be able to call (notify! rpc-ctx :report-progress details) or even (request! rpc-ctx ...)

  • notification-handlers
    • A Map of notification handlers: {::String/Keyword ::Function}. These functions will receive the named notifications sent by the peer node. Any errors thrown by these handlers will be delegated to a callback defined by (:notification-error-handler options)
    • Alternatively can be a function which takes rpc-ctx and returns a Map of handlers.
  • options - A Map of optional arguments. default-options are used as a baseline of which any or all values can be overridden with ones provided by options.

    Valid keys for options:

  • :async-notification-handling - Boolean for async handling of notifications.

    • false -> Handler functions are guaranteed to be called in the message receiving order. Next incoming message can't be processed until the handler function returns.
    • true -> Handlers executed in go-blocks -> random order.
  • :async-request-handling - Boolean for async handling of requests. Async handling allows multiple requests to processed in parallel (if client so wishes). Note that client can enforce synchronous processing simply by waiting the answer to previous request before calling new request.
    • Dispatching of responses is synchronous regardless of this setting. If :async-notification-handling was set to false then all notifications possibly sent by a (peer node) response handler will be processed by the time the response is returned.
  • :connection-closed-handler - Is called when peer closes the connection. One argument: rpc-ctx. Return value ignored.
  • :connection-id - ID to use in server logging to identify current connection.
  • :id-generator - Is called when a new ID for outgoing rpc request is needed. No arguments. Must return a string or integer which should be unique over the duration of the connection.
  • :idle-timeout - Timeout in milliseconds. idle-timeout-handler will be triggered if timeout is enabled and nothing has been received from peer node within idle-timeout. Disable by setting to nil.
  • :idle-timeout-handler - One argument: rpc-ctx. Return value ignored.
  • :invalid-id-response-handler - Two arguments: rpc-ctx and message. Return value ignored. Used if peer sends a response in which ID does not match with any sent requests waiting for a response.
  • :max-len - Incoming message max length.
  • :nil-id-error-handler - Is called when an error response with id: null is received from the peer node. (Normal error responses are marshalled to throw errors within request! calls.) Two arguments: rpc-ctx and message (::String). Return value ignored.
  • :notification-error-handler - Two arguments: rpc-ctx and thrown Exception object. Return value ignored.
  • :protocol-keyword - Affects the name of the keyword used in BSON message documents. Defaults to :bsonrpc if option is omitted. Having value "2.0" Rationale: BSON-RPC is derived from and closely matches JSON-RPC 2.0. (Support for Version 1.0 of the protocol not planned.)
  • :server - A java.io.Closeable object. Give if your handlers need the ability to close the server object.

    Returns rpc-ctx to be used with request! and notify!.

(defn connect-bson-rpc!
  ([s] (connect-rpc! s :bson {} {} default-options))
  ([s options] (connect-rpc! s :bson {} {} options))
  ([s request-handlers notification-handlers]
   (connect-rpc! s :bson request-handlers notification-handlers default-options))
  ([s request-handlers notification-handlers options]
   (connect-rpc! s :bson request-handlers notification-handlers options)))

Connect rpc services and create a context for sending JSON-RPC 2.0 requests and notifications to the RPC Peer Node over TCP connection.

  • s - Manifold duplex stream connected to the rpc peer node. (e.g. from aleph.tcp/start-server to the handler or from aleph.tcp/client)
  • request-handlers

    • A Map of request handlers: {::String/Keyword ::Function}. These functions are exposed to be callable by the rpc peer node. Function return values are sent back to peer node and any thrown errors are sent as error responses to the peer node.
    • Alternatively this parameter accepts a function which takes rpc-ctx and returns an above-mentioned Map of request handlers. Necessary if any request handler needs to send notifications to peer during the processing of request. example function:

      ``` (defn generate-request-handlers [rpc-ctx] {:quick-task quick-task :long-process (partial long-process rpc-ctx)}) ``` where the long-process will thus have the rpc-ctx and will be able to call (notify! rpc-ctx :report-progress details) or even (request! rpc-ctx ...)

  • notification-handlers
    • A Map of notification handlers: {::String/Keyword ::Function}. These functions will receive the named notifications sent by the peer node. Any errors thrown by these handlers will be delegated to a callback defined by (:notification-error-handler options)
    • Alternatively can be a function which takes rpc-ctx and returns a Map of handlers.
  • options - A Map of optional arguments. default-options are used as a baseline of which any or all values can be overridden with ones provided by options.

    Valid keys for options:

  • :async-notification-handling - Boolean for async handling of notifications.

    • false -> Handler functions are guaranteed to be called in the message receiving order. Next incoming message can't be processed until the handler function returns.
    • true -> Handlers executed in go-blocks -> random order.
  • :async-request-handling - Boolean for async handling of requests. Async handling allows multiple requests to processed in parallel (if client so wishes). Note that client can enforce synchronous processing simply by waiting the answer to previous request before calling new request.
    • Dispatching of responses is synchronous regardless of this setting. If :async-notification-handling was set to false then all notifications possibly sent by a (peer node) response handler will be processed by the time the response is returned.
  • :connection-closed-handler - Is called when peer closes the connection. One argument: rpc-ctx. Return value ignored.
  • :connection-id - ID to use in server logging to identify current connection.
  • :id-generator - Is called when a new ID for outgoing rpc request is needed. No arguments. Must return a string or integer which should be unique over the duration of the connection.
  • :idle-timeout - Timeout in milliseconds. idle-timeout-handler will be triggered if timeout is enabled and nothing has been received from peer node within idle-timeout. Disable by setting to nil.
  • :idle-timeout-handler - One argument: rpc-ctx. Return value ignored.
  • :invalid-id-response-handler - Two arguments: rpc-ctx and message. Return value ignored. Used if peer sends a response in which ID does not match with any sent requests waiting for a response.
  • :json-framing - One of the following keywords:
    • :none - http://www.simple-is-better.org/json-rpc/transport_sockets.html
    • :rfc-7464 - https://tools.ietf.org/html/rfc7464
  • :json-key-fn - JSON Object keys decode converter. Provide custom converter, otherwise by default clojure.core/keyword is used. Use clojure.core/identity to keep keys as strings. Encoding keywords are always converted to strings.
  • :max-len - Incoming message max length. This option is ignored if :json-framing is :none
  • :nil-id-error-handler - Is called when an error response with id: null is received from the peer node. (Normal error responses are marshalled to throw errors within request! calls.) Two arguments: rpc-ctx and message (::String). Return value ignored.
  • :notification-error-handler - Two arguments: rpc-ctx and thrown Exception object. Return value ignored.
  • :protocol-keyword - Affects the name of the keyword used in JSON message documents. Defaults to :jsonrpc if option is omitted. Having value "2.0" as is required by the JSON-RPC 2.0 Specification. (Support for Version 1.0 of the protocol not planned.)
  • :server - A java.io.Closeable object. Give if your handlers need the ability to close the server object.

    Returns rpc-ctx to be used with request! and notify!.

(defn connect-json-rpc!
  ([s] (connect-rpc! s :json {} {} default-options))
  ([s options] (connect-rpc! s :json {} {} options))
  ([s request-handlers notification-handlers]
   (connect-rpc! s :json request-handlers notification-handlers default-options))
  ([s request-handlers notification-handlers options]
   (connect-rpc! s :json request-handlers notification-handlers options)))

RPC Request in a clojure.core.async go-block.

  • rpc-ctx - The Context from connect-rpc!.
  • method - Remote method name - a keyword or string.
  • params - Parameters for the remote method.

    Returns a channel which will receive:

  • The Result message from the RPC Peer Node or

  • :closed or
  • :send-failure
(defn async-request!
  [rpc-ctx method & params]
  (go
    (let [id ((:id-generator rpc-ctx))
          request {(:protocol-keyword rpc-ctx) rpc-version
                   :id id
                   :method (name method)
                   :params params}
          response-timeout (:response-timeout rpc-ctx)
          response-channels (:response-channels rpc-ctx)
          channel (stream/stream)]
      (swap! response-channels (fn [m] (assoc m id (stream/->sink channel))))
      (let [success @(stream/put! (:socket rpc-ctx) request)
            result (if success
                     (if (nil? response-timeout)
                       @(stream/take! (stream/->source channel))
                       @(stream/try-take! (stream/->source channel) nil response-timeout :timeout))
                     (if (stream/closed? (:socket rpc-ctx)) :closed :send-failure))]
        (swap! response-channels (fn [m] (dissoc m id)))
        result))))

RPC Request in a clojure.core.async go-block.

  • timeout - Milliseconds to wait for remote method return value.
  • Otherwise identical to async-request!

    Returns a channel which will receive results identical to async-request! or possibly the value :timeout if waiting of result timed out.

(defn async-request-with-timeout!
  [rpc-ctx timeout method & params]
  (apply async-request! (assoc rpc-ctx :response-timeout timeout) method params))

RPC Request to the peer node. Waits for a response indefinitely.

  • rpc-ctx - Context returned by connect-rpc!.
  • method - Remote method name - a keyword or string.
  • params - Parameters for the remote method.

    Returns: The return value from the remote method or Throws:

  • clojure.lang.ExceptionInfo with ex-data mappings:

    • {:type :rpc-peer :code :details } on peer node errors.
    • {:type :rpc-connection-closed} If either this node or peer node closed the connection.
    • {:type :rpc-buffer-overflow} Send buffer full.
(defn request!
  [rpc-ctx method & params]
  (let [response (<!! (apply async-request! rpc-ctx method params))
        error->ex-info (fn [e]
                         (let [code (get-in e [:error :code])
                               message (get-in e [:error :message])
                               data (get-in e [:error :data])]
                           (ex-info message {:type :rpc-peer :code code :details data})))]
    (cond
      (= response :timeout) (throw (ex-info "Timeout" {:type :rpc-response-timeout}))
      (= response :closed) (throw (ex-info "Connection closed" {:type :rpc-connection-closed}))
      (= response :send-failure) (throw (ex-info "Buffer overflow" {:type :rpc-buffer-overflow}))
      (contains? response :result) (:result response)
      (contains? response :error) (throw (error->ex-info response))
      :else (throw (ex-info "Unknown" {:type :rpc-unknown})))))

RPC Request to the peer node. Waits for the response for up to the timeout length of time.

  • timeout - Milliseconds to wait for remote method return value.
  • Otherwise identical to request!

    Returns: The return value from the remote method or Throws: Identically to request! or {:type :rpc-response-timeout} when timeouted.

(defn request-with-timeout!
  [rpc-ctx timeout method & params]
  (let [rpc-ctx (assoc rpc-ctx :response-timeout timeout)]
    (apply request! rpc-ctx method params)))

Send RPC Notification to peer. Return boolean success value.

  • rpc-ctx - Context returned by connect-rpc!
  • method - Remote notification handler name - a keyword or string.
  • params - Parameters for the notification handler.
(defn notify!
  [rpc-ctx method & params]
  (let [notification {(:protocol-keyword rpc-ctx) rpc-version
                      :method (name method)
                      :params params}]
    @(stream/put! (:socket rpc-ctx) notification)))

Utility for closing the connection from outside of request handler. Use close-connection! within response handler.

(defn close!
  [rpc-ctx]
  (stream/close! (:socket rpc-ctx)))
 

JSON encoding and decoding.

(ns clj-bson-rpc.json
  (:require
    [clj-bson-rpc.bytes :refer [bbuf-concat bbuf-split-after bbuf-split-before]]
    [clojure.core.async :refer [go]]
    [clojure.data.json :as json]
    [manifold.deferred :refer [success-deferred]]
    [manifold.stream :as stream]))

Utility for providing stream reader access for stream parser Throws IOException's

(defn- new-writer-reader-pipe
  []
  (let [op (java.io.PipedOutputStream.)
        ip (java.io.PipedInputStream. op)
        reader (->> (java.nio.charset.Charset/forName "UTF8")
                    (java.io.InputStreamReader. ip)
                    (java.io.BufferedReader.))]
    [op reader]))

Decode JSON messages from manifold stream s without framing. For details visit: http://www.simple-is-better.org/json-rpc/transport_sockets.html #pipelined-requests-responses-json-splitter

(defn- frameless-decoder
  [s key-fn]
  (let [source (stream/->source s)
        sink (stream/stream)
        [p-sink reader] (new-writer-reader-pipe)
        decoder (fn []
                  (go
                    (let [continue (atom true)]
                      (try
                        (while @continue
                          (if-let [msg (try
                                         (json/read reader :eof-error? false :key-fn key-fn)
                                         (catch java.io.EOFException e
                                           (ex-info (str e) {:type :trailing-garbage}))
                                         (catch Exception e
                                           (ex-info (str e) {:type :invalid-json})))]
                            (stream/put! sink msg)
                            (reset! continue false)))
                        (catch Exception e)))
                    (stream/close! sink)))
        push-bytes (fn [new-bytes]
                     (try
                       (.write p-sink new-bytes 0 (count new-bytes))
                       (success-deferred true)
                       (catch Exception e
                         (.close p-sink)
                         (success-deferred false))))]
    (decoder)
    (stream/connect-via source push-bytes sink {:downstream? false})
    (stream/on-drained source #(.close p-sink))
    sink))

Decode JSON messages from manifold stream s using rfc-7464 framing. Framing: 0x1E + message + 0x0A (=newline) For details visit: https://tools.ietf.org/html/rfc7464

(defn- rfc-7464-decoder
  [s key-fn max-len]
  (let [source (stream/->source s)
        sink (stream/stream)
        err-frame (fn [garbage]
                    (ex-info "Framing error, skipping out-of-frame bytes."
                             {:type :invalid-framing :bytes garbage}))
        err-long (fn [mbytes mlen]
                   (ex-info (str "Message length exceeds the maximum allowed. "
                                 "Max: " max-len ", Message len: " mlen)
                            {:type :exceeds-max-length :bytes mbytes}))
        find-frames (fn [bbuff]
                      (loop [frames [] remainder bbuff]
                        (if (and (some #{0x1e} remainder)
                                 (some #{0x0a} remainder))
                          (if (not= (first remainder) 0x1e)
                            (let [[garbage remainder] (bbuf-split-before 0x1e remainder)]
                              (recur (conj frames (err-frame garbage)) remainder))
                            (let [[frame-bytes remainder] (bbuf-split-after 0x0a remainder)
                                  msg-len (- (count frame-bytes) 2)]
                              (if (> msg-len max-len)
                                (recur (conj frames (err-long frame-bytes msg-len)) remainder)
                                (recur (conj frames (byte-array (butlast (rest frame-bytes)))) remainder))))
                          [frames remainder])))
        decode-frame (fn [frame]
                       (if (instance? clojure.lang.ExceptionInfo frame)
                         frame
                         (try
                           (json/read-str (String. frame "UTF-8") :key-fn key-fn)
                           (catch Exception e
                             (ex-info (str e) {:type :invalid-json :bytes frame})))))
        buffer (atom (byte-array []))
        decoder (fn [new-bytes]
                  (let [[frames remainder] (find-frames (bbuf-concat @buffer new-bytes))
                        trail (if (and (stream/drained? source) (seq remainder))
                                [(ex-info "Non-decodable trailing bytes when stream was closed."
                                          {:type :trailing-garbage :bytes remainder})]
                                [])]
                    (reset! buffer remainder)
                    (->> (into frames trail)
                         (mapv decode-frame)
                         (stream/put-all! sink))))]
    (stream/connect-via source decoder sink {:downstream? false})
    (stream/on-drained source #(do (decoder []) (stream/close! sink)))
    sink))

JSON codec for a manifold duplex stream.

  • s - Duplex stream of raw bytes (possibly from aleph)
  • :json-framing - One of following keywords:
    • :none
    • :rfc-7464 Defaults to: :none NOTE: Framing :none cannot reliably recover from decoding errors -> user should close the stream and not attempt to take! further items. (This does not apply to :rfc-7464 framing which recovers automatically.)
  • :json-key-fn - Map keys converter function. Default: clojure.core/keyword
  • :max-len - Optional argument for limiting incoming message maximum size. Works only with :rfc-7464

    Returns a duplex stream which:

    • As a sink takes clojure maps which are encoded and sent to the byte stream s
    • As a source produces decoded (clojurized) JSON messages from s

    Any non-decodable byte sequences from the raw stream are converted to clojure.lang.ExceptionInfo objects which are takeable from the returned duplex stream. Thus take! from this source will either:

    • Block waiting a new message.
    • Return a JSON message.
    • Return a clojure.lang.ExceptionInfo object.
      • Message framing error -> (:type (ex-data error)) is :invalid-framing
      • Message decoding error -> type is :invalid-json
      • Non-decodable trailing bytes when stream closed-> :type :trailing-garbage
    • Return nil if underlying stream s is closed and source is drained.
(defn json-codec
  [s & {:keys [json-framing json-key-fn max-len]
        :or {json-framing :none
             json-key-fn keyword
             max-len (Integer/MAX_VALUE)}}]
  (let [out (stream/stream)
        encode (fn [m]
                 (.getBytes
                   (json/write-str m)
                   "UTF-8"))
        encoder (case json-framing
                  :none encode
                  :rfc-7464 (fn [m] (byte-array (concat [0x1e] (encode m) [0x0a])))
                  (throw (ex-info "Unsupported framing.")))
        stream-decoder (case json-framing
                         :none     (fn [s] (frameless-decoder s json-key-fn))
                         :rfc-7464 (fn [s] (rfc-7464-decoder s json-key-fn max-len))
                         (throw (ex-info "Unsupported framing.")))]
    (stream/connect (stream/map encoder out) s)
    (stream/splice out (stream-decoder s))))