dependencies
| (this space intentionally left almost blank) | ||||||||||||||||||
BSON-RPC Connection over manifold duplex stream. Practical applications include (but are not limited to) using aleph to establish a TCP connection (+ TLS) between the client and the server. After an RPC connection has been set up on both sides, the RPC nodes are principally equal. The TCP client may provide services which the TCP server then uses and vice versa. | (ns clj-bson-rpc.rpc) | ||||||||||||||||||
Constants and Helpers | |||||||||||||||||||
(def rpc-error
  "Error objects keyed by symbolic name. Each value is a map with the
  numeric `:code` and the human-readable `:message`."
  (into {}
        (for [[error-key code message]
              [[:parse-error      -32700 "Parse error"]
               [:invalid-request  -32600 "Invalid Request"]
               [:method-not-found -32601 "Method not found"]
               [:invalid-params   -32602 "Invalid params"]
               [:internal-error   -32603 "Internal error"]
               [:server-error     -32000 "Server error"]]]
          [error-key {:code code :message message}])))

;; Version string stored under the protocol keyword of every message.
(def rpc-version "2.0")
;; Shared counter; created once so that reloading the namespace does not reset it.
(defonce identifier-counter (atom 1))

(defn default-id-generator
  "Increment `identifier-counter` and return the new value formatted as
  the string \"id-<n>\"."
  []
  (let [n (swap! identifier-counter inc)]
    (str "id-" n)))
Message Type Identifiers | |||||||||||||||||||
(defn request?
  "True when `msg` looks like an RPC request: the value under
  `protocol-keyword` equals `rpc-version`, `:method` is a string and an
  `:id` key is present whose value is a string, an integer or nil."
  [{:keys [protocol-keyword]} msg]
  (let [id (:id msg)]
    (cond
      (not= (get msg protocol-keyword) rpc-version) false
      (not (string? (:method msg)))                 false
      (not (contains? msg :id))                     false
      :else (or (string? id) (integer? id) (nil? id)))))
(defn notification?
  "True when `msg` looks like an RPC notification: the value under
  `protocol-keyword` equals `rpc-version`, `:method` is a string and the
  message has no `:id` key at all."
  [{:keys [protocol-keyword]} msg]
  (if (and (= (get msg protocol-keyword) rpc-version)
           (string? (:method msg)))
    (not (contains? msg :id))
    false))
(defn response?
  "True when `msg` looks like an RPC response: the value under
  `protocol-keyword` equals `rpc-version`, `:id` is present and is a
  string or an integer, and the message carries either
    * a `:result` key and no `:error` key, or
    * no `:result` key and an `:error` map with an integer `:code` and a
      string `:message`.

  All key-presence checks run only after the version check has passed.
  `contains?` throws IllegalArgumentException for values such as numbers
  or strings, so checking eagerly made this predicate throw on a scalar
  message instead of returning false."
  [{:keys [protocol-keyword]} msg]
  (let [id (:id msg)]
    (and (= (get msg protocol-keyword) rpc-version)
         (contains? msg :id)
         (or (string? id) (integer? id))
         (if (contains? msg :result)
           ;; result branch: must not also carry an error
           (not (contains? msg :error))
           ;; error branch: error object must be well-formed
           (and (integer? (get-in msg [:error :code]))
                (string? (get-in msg [:error :message])))))))
(defn nil-id-error-response?
  "True when `msg` is an error response whose `:id` is present but nil:
  the value under `protocol-keyword` equals `rpc-version`, there is no
  `:result` key, and `:error` holds an integer `:code` and a string
  `:message`.

  The `contains?` checks run only after the version check has passed.
  `contains?` throws IllegalArgumentException for values such as numbers
  or strings, so checking eagerly made this predicate throw on a scalar
  message instead of returning false."
  [{:keys [protocol-keyword]} msg]
  (and (= (get msg protocol-keyword) rpc-version)
       (contains? msg :id)
       (nil? (:id msg))
       (not (contains? msg :result))
       (integer? (get-in msg [:error :code]))
       (string? (get-in msg [:error :message]))))
Inbound Requests, Notifications | |||||||||||||||||||
(defn handle-request
  "Look up the handler registered under the request's method name in
  `request-handlers`, apply it to `(:params msg)` and return a response
  map carrying the request's `:id`.

  Returns
    * a `:result` response when the handler returns normally,
    * an invalid-params `:error` response (with the stringified exception
      as `:data`) when a clojure.lang.ArityException is thrown,
    * a method-not-found `:error` response when no handler is registered.
  Any other exception propagates to the caller."
  [protocol-keyword request-handlers msg]
  (let [method-name (name (:method msg))
        id          (:id msg)]
    (if-not (contains? request-handlers method-name)
      {protocol-keyword rpc-version
       :error (:method-not-found rpc-error)
       :id id}
      (try
        (let [handler (get request-handlers method-name)
              result  (apply handler (:params msg))]
          {protocol-keyword rpc-version
           :result result
           :id id})
        (catch clojure.lang.ArityException e
          {protocol-keyword rpc-version
           :error (assoc (:invalid-params rpc-error) :data (str e))
           :id id})))))
(defn handle-notification
  "Apply the handler registered under the notification's method name to
  `(:params msg)`. When no handler is registered, call
  `notification-error-handler` with an ex-info whose data has
  `:type :notification-handler-not-found` and the `:method` name."
  [notification-error-handler notification-handlers msg]
  (let [method-name (name (:method msg))]
    (if-not (contains? notification-handlers method-name)
      (let [problem (ex-info (str "No handler found for notification:" method-name)
                             {:type :notification-handler-not-found
                              :method method-name})]
        (notification-error-handler problem))
      (let [handler (get notification-handlers method-name)]
        (apply handler (:params msg))))))
Utilities for binary/byte-buffer manipulations. | (ns clj-bson-rpc.bytes (:import [java.nio ByteBuffer ByteOrder])) | ||||||||||||||||||
(defn peek-int32-le
  "Read the first four bytes of the byte-array `bbuf` as a little-endian
  32-bit signed integer and return it as a long."
  [bbuf]
  (let [view (doto (ByteBuffer/wrap bbuf 0 4)
               (.order ByteOrder/LITTLE_ENDIAN))]
    (long (.getInt view))))
(defn bbuf-concat
  "Concatenate byte buffers. When `a` is empty, `b` itself is returned
  unchanged; otherwise a new byte-array holding `a` followed by `b`."
  [a b]
  (if (seq a)
    (byte-array (concat a b))
    b))
(defn bbuf-split-at
  "Split `bbuf` at index `n`. Returns a vector of two byte-arrays: the
  first `n` bytes and the rest."
  [n bbuf]
  (let [[head tail] (split-at n bbuf)]
    [(byte-array head) (byte-array tail)]))
(defn bbuf-split-before
  "Split `bbuf` just before the first byte equal to `bt`. Returns a
  vector of two byte-arrays: the bytes preceding `bt`, and the remainder
  starting with `bt` (empty when `bt` does not occur)."
  [bt bbuf]
  (let [differs? #(not= % bt)]
    [(byte-array (take-while differs? bbuf))
     (byte-array (drop-while differs? bbuf))]))
(defn bbuf-split-after
  "Split `bbuf` just after the first byte equal to `bt`. Returns a
  vector of two byte-arrays: everything up to and including `bt`, and
  the bytes that follow it.

  When `bt` does not occur in `bbuf`, the first array holds all of
  `bbuf` and the second is empty. Without this guard a nil was appended
  in place of the missing delimiter, which made `byte-array` throw."
  [bt bbuf]
  (let [[head tail] (split-with (fn [b] (not= b bt)) bbuf)]
    (if (seq tail)
      [(byte-array (concat head [(first tail)]))
       (byte-array (rest tail))]
      ;; delimiter absent: nothing to split off
      [(byte-array head) (byte-array 0)])))
BSON encoding and decoding. Utilizes mongodb codecs for clojure <-> bson conversions. | (ns clj-bson-rpc.bson (:import [org.bson BasicBSONDecoder BasicBSONEncoder]) (:require [clj-bson-rpc.bytes :refer [bbuf-concat bbuf-split-at peek-int32-le]] [manifold.stream :as stream] [somnium.congomongo.coerce :refer [coerce]])) | ||||||||||||||||||
(defn encode
  "Encode message `m` to a BSON byte-array: coerce it from :clojure to
  :mongo form, then run it through a fresh BasicBSONEncoder."
  [m]
  (let [encoder (BasicBSONEncoder.)
        document (coerce m [:clojure :mongo])]
    (.encode encoder document)))
(defn decode
  "Decode BSON byte-array `b` to a message: read it with a fresh
  BasicBSONDecoder, then coerce the result from :mongo to :clojure form."
  [b]
  (let [decoder (BasicBSONDecoder.)
        document (.readObject decoder b)]
    (coerce document [:mongo :clojure])))
(defn- stream-decoder
  "Lift BSON message(s) from source stream `s`.

  Incoming byte chunks are appended to an internal buffer. Frames are cut
  from the buffer using the int32 little-endian length read from the
  first four bytes (the length covers the whole frame). Each frame is
  decoded with `decode` and put onto the returned sink stream. Problems
  are put onto the sink as ex-info values instead of being thrown:

    :exceeds-max-length  declared length is greater than `max-len`
    :invalid-framing     declared length is less than 5
    :invalid-bson        `decode` threw for the frame
    :trailing-garbage    bytes were left over when the source drained

  The sink is closed once the source has drained."
  [s max-len]
  (let [source (stream/->source s)
        sink (stream/stream)
        pending (atom (byte-array []))
        too-long (fn [data declared]
                   (ex-info (str "Message length exceeds the maximum allowed. "
                                 "Max: " max-len ", Message len: " declared)
                            {:type :exceeds-max-length :bytes data}))
        bad-frame (fn [data]
                    (ex-info "Message framing error."
                             {:type :invalid-framing :bytes data}))
        split-frames (fn [data]
                       (loop [found [] leftover data]
                         (let [available (count leftover)
                               ;; the length prefix is readable only with >= 4 bytes
                               declared (when (>= available 4)
                                          (peek-int32-le leftover))]
                           (cond
                             (nil? declared)
                             [found leftover]

                             (> declared max-len)
                             [(conj found (too-long leftover declared)) leftover]

                             (< declared 5)
                             [(conj found (bad-frame leftover)) leftover]

                             ;; frame not complete yet - wait for more bytes
                             (< available declared)
                             [found leftover]

                             :else
                             (let [[frame more] (bbuf-split-at declared leftover)]
                               (recur (conj found frame) more))))))
        unpack (fn [frame]
                 (if-not (instance? clojure.lang.ExceptionInfo frame)
                   (try
                     (decode frame)
                     (catch Exception e
                       (ex-info (str e) {:type :invalid-bson :bytes frame})))
                   ;; framing errors pass through untouched
                   frame))
        on-bytes (fn [new-bytes]
                   (let [[found leftover] (split-frames (bbuf-concat @pending new-bytes))
                         tail-error (if (and (stream/drained? source) (seq leftover))
                                      [(ex-info "Non-decodable trailing bytes when stream was closed."
                                                {:type :trailing-garbage :bytes leftover})]
                                      [])]
                     (reset! pending leftover)
                     (stream/put-all! sink (mapv unpack (into found tail-error)))))]
    (stream/connect-via source on-bytes sink {:downstream? false})
    ;; one last pass with no new bytes reports any trailing garbage
    (stream/on-drained source (fn []
                                (on-bytes [])
                                (stream/close! sink)))
    sink))
(defn bson-codec
  "BSON codec for a manifold duplex stream.

  Returns a spliced stream: messages put into it are passed through
  `encode` and forwarded to `s`; messages taken from it come from
  `stream-decoder` reading `s`. The `:max-len` option (default
  Integer/MAX_VALUE) is handed to `stream-decoder`."
  [s & {:keys [max-len] :or {max-len (Integer/MAX_VALUE)}}]
  (let [out (stream/stream)]
    (-> (stream/map encode out)
        (stream/connect s))
    (->> (stream-decoder s max-len)
         (stream/splice out))))
JSON-RPC 2.0 and BSON-RPC Connections over manifold duplex stream. Practical applications include using aleph to establish TCP connection (+ TLS) between the client and the server. After an RPC connection has been set on both sides, the RPC nodes are principally equal. The TCP client may provide services which the TCP server then uses and vice versa. | (ns clj-bson-rpc.core (:require [clj-bson-rpc.bson :refer [bson-codec]] [clj-bson-rpc.json :refer [json-codec]] [clj-bson-rpc.rpc :as rpc :refer [rpc-error rpc-version]] [clojure.core.async :refer [<!! go]] [manifold.stream :as stream] [taoensso.timbre :as log])) | ||||||||||||||||||
Default Handlers | |||||||||||||||||||
(defn- default-timeout-handler
  "Default idle-timeout reaction: call the context's `:close!` function,
  then log an error tagged with the connection id."
  [rpc-ctx]
  (let [shutdown! (:close! rpc-ctx)]
    (shutdown!))
  (log/error (:connection-id rpc-ctx) "- Connection closed due to idle timeout."))
(defn- default-close-handler
  "Default connection-closed reaction: call the context's `:close!`
  function, then write a trace log tagged with the connection id."
  [rpc-ctx]
  (let [shutdown! (:close! rpc-ctx)]
    (shutdown!))
  (log/trace (:connection-id rpc-ctx) "- Connection closed by peer."))
(defn- default-nil-id-error-handler
  "Default reaction to an error response with nil id: log the `:error`
  value of `msg` together with the connection id."
  [rpc-ctx msg]
  (let [{:keys [connection-id]} rpc-ctx
        peer-error (:error msg)]
    (log/error connection-id "- Error response from peer:" peer-error)))
(defn- default-notification-error-handler
  "Default reaction to a failed notification handler: log `e` together
  with the connection id."
  [rpc-ctx e]
  (let [{:keys [connection-id]} rpc-ctx]
    (log/error connection-id "- Notification handler failed:" e)))
(defn- default-invalid-id-response-handler
  "Default reaction to a response whose id matches no waiting request:
  log the id and the whole message together with the connection id."
  [rpc-ctx msg]
  (let [{:keys [connection-id]} rpc-ctx
        id (:id msg)]
    (log/error connection-id
               "- Response handling failed, id:" id
               "is not valid. Msg:" msg)))
Inbound Requests, Notifications and Responses | |||||||||||||||||||
(defn- handle-request
  "Run `rpc/handle-request` for `msg`, send the resulting response to
  `socket` and then perform any requested post-action.

  * An ex-info whose data has `:type :rpc-control` yields a `:result`
    response built from its `:response`; its `:action` (`:close`,
    `:close-server` or `:close-all`) is executed after the response has
    been sent.
  * Any other exception yields a server-error response with the
    stringified exception as `:data`.
  * When the put onto `socket` fails, `close!` is called and an error is
    logged."
  [{:keys [close! close-server! connection-id protocol-keyword request-handlers socket]} msg]
  (log/trace connection-id "- Received request:" msg)
  (let [id (:id msg)
        as-result (fn [result]
                    {protocol-keyword rpc-version
                     :result result
                     :id id})
        as-server-error (fn [e]
                          {protocol-keyword rpc-version
                           :error (assoc (:server-error rpc-error) :data (str e))
                           :id id})
        ;; pair of [response-to-send action-to-run-afterwards]
        [response post-action]
        (try
          [(rpc/handle-request protocol-keyword request-handlers msg) nil]
          (catch clojure.lang.ExceptionInfo e
            (let [info (ex-data e)]
              (if (= (:type info) :rpc-control)
                [(as-result (:response info)) (:action info)]
                [(as-server-error e) nil])))
          (catch Exception e
            [(as-server-error e) nil]))]
    (if @(stream/put! socket response)
      (log/trace connection-id "- Sent response:" response)
      (do
        (close!)
        (log/error connection-id "- Failed to send a response to a request."
                   "Services and socket closed. (failed response: " response ")")))
    (case post-action
      :close (close!)
      :close-server (close-server!)
      :close-all (do (close!) (close-server!))
      nil)))
(defn- handle-notification
  "Run `rpc/handle-notification` for `msg` and then perform any requested
  post-action.

  * An ex-info whose data has `:type :rpc-control` selects the
    post-action from its `:action` (`:close`, `:close-server` or
    `:close-all`).
  * Any other exception is passed to `notification-error-handler`
    together with the rpc context."
  [{:keys [close! close-server! connection-id notification-error-handler notification-handlers] :as rpc-ctx} msg]
  (log/trace connection-id "- Received notification:" msg)
  (let [post-action
        (try
          (rpc/handle-notification (partial notification-error-handler rpc-ctx)
                                   notification-handlers
                                   msg)
          ;; the handler's return value is irrelevant - no action requested
          nil
          (catch clojure.lang.ExceptionInfo e
            (let [info (ex-data e)]
              (if (= (:type info) :rpc-control)
                (:action info)
                (do (notification-error-handler rpc-ctx e)
                    nil))))
          (catch Exception e
            (notification-error-handler rpc-ctx e)
            nil))]
    (case post-action
      :close (close!)
      :close-server (close-server!)
      :close-all (do (close!) (close-server!))
      nil)))
(defn- handle-response
  "Deliver response `msg` to the stream registered for its `:id` in the
  `response-channels` atom, removing that registration. When no stream is
  registered for the id, `invalid-id-response-handler` is called with the
  rpc context and the message. A failed put is logged as an error."
  [{:keys [connection-id invalid-id-response-handler response-channels] :as rpc-ctx} msg]
  (log/trace connection-id "- Received response:" msg)
  (let [id (:id msg)
        ;; atomically remove the entry for `id`, keeping what was stored there
        waiter (loop []
                 (let [snapshot @response-channels]
                   (if (compare-and-set! response-channels snapshot (dissoc snapshot id))
                     (get snapshot id)
                     (recur))))]
    (if-not waiter
      (invalid-id-response-handler rpc-ctx msg)
      (when-not @(stream/put! waiter msg)
        (log/error connection-id "- Sending response failed." msg)))))
Inbound Garbage | |||||||||||||||||||
(defn- handle-parse-error
  "React, inside a `go` block, to a parse error `msg` (an ex-info)
  according to the `:type` in its data:

    :exceeds-max-length, :invalid-framing
        report to peer, call `close!`, log an error
    :invalid-json
        as above when the context's `:json-framing` is `:none`,
        otherwise report to peer and log an error
    :invalid-bson
        report to peer and log an error
    :trailing-garbage
        log a warning only
    anything else
        log an error only

  Reporting to the peer is best effort; failures are only logged."
  [{:keys [close! connection-id protocol-keyword socket] :as rpc-ctx} msg]
  (go
    (let [report-to-peer! (fn []
                            (try
                              ;; NOTE(review): `msg` (an ex-info) is sent as :data; whether the
                              ;; codec can serialize it is not visible here - confirm.
                              (when-not @(stream/put! socket
                                                      {protocol-keyword rpc-version
                                                       :error (assoc (:parse-error rpc-error) :data msg)
                                                       :id nil})
                                (log/error connection-id "- Failed to inform peer about parse error:" msg))
                              (catch Exception e
                                (log/error connection-id "- Failed to inform peer about parse error:" e))))
          fatal! (fn []
                   (report-to-peer!)
                   (close!)
                   (log/error connection-id "- Irrecoverable parsing error."
                              "Services and socket closed." msg))
          recoverable! (fn []
                         (report-to-peer!)
                         (log/error connection-id "-" msg))
          error-type (:type (ex-data msg))]
      (case error-type
        (:exceeds-max-length :invalid-framing) (fatal!)
        :invalid-json (if (= (:json-framing rpc-ctx) :none)
                        (fatal!)
                        (recoverable!))
        :invalid-bson (recoverable!)
        :trailing-garbage (log/warn connection-id "-" msg)
        (log/error connection-id "- Unexpected parse error:" msg)))))
(defn- handle-schema-error
  "React, inside a `go` block, to a message that matched none of the
  known message shapes. When `msg` is a map containing a `:method` key,
  an invalid-request error response (echoing `msg` as `:data` and reusing
  its `:id`) is put onto `socket`. The invalid message is always logged
  as an error.

  `msg` is first checked to be a map. `contains?` throws
  IllegalArgumentException for values such as numbers or strings, which
  aborted the block before anything was logged."
  [{:keys [connection-id protocol-keyword socket]} msg]
  (go
    (let [id (:id msg)]
      (when (and (instance? java.util.Map msg)
                 (contains? msg :method))
        ;; effort made - do not care if socket errors
        (stream/put! socket {protocol-keyword rpc-version
                             :error (assoc (:invalid-request rpc-error) :data msg)
                             :id id}))
      (log/error connection-id "- Invalid request:" msg))))
Service Dispatcher | |||||||||||||||||||
(defn- run-rpc-services
  "Dispatch incoming requests, notifications and responses to wrapped
  handlers.

  Starts a `go` block that keeps taking messages from `socket` while the
  `run-services` atom is true. When `idle-timeout` is set, the take uses
  `stream/try-take!` and yields `:timeout` on expiry. Each message is
  routed as follows:

    nil                      -> connection-closed-handler
    :timeout                 -> idle-timeout-handler
    ex-info                  -> handle-parse-error
    request                  -> handle-request (in its own `go` block
                                when `async-request-handling` is truthy)
    notification             -> handle-notification (in its own `go`
                                block when `async-notification-handling`
                                is truthy)
    response                 -> handle-response
    error response, nil id   -> nil-id-error-handler
    anything else            -> handle-schema-error

  When the loop ends, `:closed` is put onto every stream still
  registered in `response-channels`."
  [{:keys [async-notification-handling async-request-handling
           connection-closed-handler connection-id
           idle-timeout idle-timeout-handler
           nil-id-error-handler response-channels run-services socket]
    :as rpc-ctx}]
  (let [take! (if idle-timeout
                #(stream/try-take! % nil idle-timeout :timeout)
                #(stream/take! %))
        dispatch! (fn [msg]
                    (cond
                      (nil? msg)
                      (connection-closed-handler rpc-ctx)

                      (= msg :timeout)
                      (idle-timeout-handler rpc-ctx)

                      (instance? clojure.lang.ExceptionInfo msg)
                      (handle-parse-error rpc-ctx msg)

                      (rpc/request? rpc-ctx msg)
                      (if async-request-handling
                        (go (handle-request rpc-ctx msg))
                        (handle-request rpc-ctx msg))

                      (rpc/notification? rpc-ctx msg)
                      (if async-notification-handling
                        (go (handle-notification rpc-ctx msg))
                        (handle-notification rpc-ctx msg))

                      (rpc/response? rpc-ctx msg)
                      (handle-response rpc-ctx msg)

                      (rpc/nil-id-error-response? rpc-ctx msg)
                      (nil-id-error-handler rpc-ctx msg)

                      :else
                      (handle-schema-error rpc-ctx msg)))]
    (go
      (log/trace connection-id "- Start RPC message dispatcher.")
      (loop []
        (when @run-services
          (let [msg @(take! socket)]
            (log/trace connection-id "- Dispatch message.")
            (dispatch! msg))
          (recur)))
      ;; Signal to waiters that connection is closed
      (doseq [waiter (vals @response-channels)]
        (stream/put! waiter :closed))
      (log/trace connection-id "- Exit RPC message dispatcher."))))
Public Interface | |||||||||||||||||||
(def default-options
  "Default option map. Caller-supplied options are merged on top of it
  when a connection is set up."
  {;; identification
   :connection-id nil
   :id-generator rpc/default-id-generator
   :server nil

   ;; dispatching
   :async-notification-handling false
   :async-request-handling true
   :idle-timeout nil

   ;; codec
   :json-framing :none
   :json-key-fn keyword
   :max-len (Integer/MAX_VALUE)

   ;; handlers
   :connection-closed-handler default-close-handler
   :idle-timeout-handler default-timeout-handler
   :invalid-id-response-handler default-invalid-id-response-handler
   :nil-id-error-handler default-nil-id-error-handler
   :notification-error-handler default-notification-error-handler})
(defn close-connection!
  "Call this function within your request or notification handler in
  order to disconnect the current tcp connection.

  Works by throwing an ex-info with data
  `{:type :rpc-control :action :close :response response}`;
  `response` defaults to nil."
  ([]
   (close-connection! nil))
  ([response]
   (let [control {:type :rpc-control
                  :action :close
                  :response response}]
     (throw (ex-info "" control)))))
(defn close-server!
  "Call this function within your request or notification handler in
  order to close the server socket. Does not disconnect your current
  connection.

  Works by throwing an ex-info with data
  `{:type :rpc-control :action :close-server :response response}`;
  `response` defaults to nil."
  ([]
   (close-server! nil))
  ([response]
   (let [control {:type :rpc-control
                  :action :close-server
                  :response response}]
     (throw (ex-info "" control)))))
(defn close-connection-and-server!
  "Call this function within your request or notification handler in
  order to close both the current connection and the server socket.

  Works by throwing an ex-info with data
  `{:type :rpc-control :action :close-all :response response}`;
  `response` defaults to nil."
  ([]
   (close-connection-and-server! nil))
  ([response]
   (let [control {:type :rpc-control
                  :action :close-all
                  :response response}]
     (throw (ex-info "" control)))))
(defn- connect-rpc!
  "Connect BSON/JSON RPC services.

  Wraps the duplex stream `s` in the codec selected by `codec` (`:bson`
  or `:json`), builds the rpc context, starts the message dispatcher and
  returns the context.

  `request-handlers` and `notification-handlers` are either maps, or
  functions that receive the rpc context and return such a map. Map keys
  are converted to strings with `name`.

  The context is `default-options` overlaid with `options`, plus
    :close!             closes the codec stream and stops the dispatcher
    :close-server!      calls `.close` on `(:server options)` when set
    :connection-id      from options, or generated from
                        `rpc/identifier-counter` when nil
    :protocol-keyword   from options, or `:bsonrpc` / `:jsonrpc`
    :response-channels  atom of id -> stream awaiting a response
    :run-services       atom, true while the dispatcher should run
    :socket             the codec stream

  Throws ex-info with `:type :invalid-codec` for an unknown `codec`.
  `ex-info` has no one-argument arity, so the data map is required."
  [s codec request-handlers notification-handlers options]
  (let [xson-stream (case codec
                      :bson (bson-codec s :max-len (get options :max-len (Integer/MAX_VALUE)))
                      :json (json-codec s
                                        :json-framing (get options :json-framing :none)
                                        :json-key-fn (get options :json-key-fn keyword)
                                        :max-len (get options :max-len (Integer/MAX_VALUE)))
                      (throw (ex-info "Invalid codec."
                                      {:type :invalid-codec :codec codec})))
        default-protocol-keyword (keyword (str (name codec) "rpc"))
        run-services (atom true)
        response-channels (atom {})
        rpc-ctx (into (into default-options options)
                      {:close! #(do (stream/close! xson-stream)
                                    (reset! run-services false))
                       :close-server! #(if (:server options)
                                         (.close (:server options)))
                       :connection-id (if (nil? (:connection-id options))
                                        (str (swap! rpc/identifier-counter inc))
                                        (:connection-id options))
                       :protocol-keyword (get options :protocol-keyword default-protocol-keyword)
                       :response-channels response-channels
                       :run-services run-services
                       :socket xson-stream})]
    (let [keys->strings (fn [m] (into {} (mapv (fn [[k v]] [(name k) v]) m)))
          ;; handler arguments may be factories that need the context
          notification-handlers (keys->strings (if (fn? notification-handlers)
                                                 (notification-handlers rpc-ctx)
                                                 notification-handlers))
          request-handlers (keys->strings (if (fn? request-handlers)
                                            (request-handlers rpc-ctx)
                                            request-handlers))
          rpc-ctx (assoc rpc-ctx
                         :notification-handlers notification-handlers
                         :request-handlers request-handlers)]
      (run-rpc-services rpc-ctx)
      rpc-ctx)))
(defn connect-bson-rpc!
  "Connect rpc services and create a context for sending BSON-RPC (2.0)
  requests and notifications to the RPC peer node over the duplex
  stream `s`.

  Omitted handler maps default to `{}` and omitted options default to
  `default-options`. Returns the rpc context built by `connect-rpc!`."
  ([s]
   (connect-bson-rpc! s {} {} default-options))
  ([s options]
   (connect-bson-rpc! s {} {} options))
  ([s request-handlers notification-handlers]
   (connect-bson-rpc! s request-handlers notification-handlers default-options))
  ([s request-handlers notification-handlers options]
   (connect-rpc! s :bson request-handlers notification-handlers options)))
(defn connect-json-rpc!
  "Connect rpc services and create a context for sending JSON-RPC 2.0
  requests and notifications to the RPC peer node over the duplex
  stream `s`.

  Omitted handler maps default to `{}` and omitted options default to
  `default-options`. Returns the rpc context built by `connect-rpc!`."
  ([s]
   (connect-json-rpc! s {} {} default-options))
  ([s options]
   (connect-json-rpc! s {} {} options))
  ([s request-handlers notification-handlers]
   (connect-json-rpc! s request-handlers notification-handlers default-options))
  ([s request-handlers notification-handlers options]
   (connect-rpc! s :json request-handlers notification-handlers options)))
(defn async-request!
  "RPC request run in a `go` block; returns the block's channel.

  Generates an id with the context's `:id-generator`, registers a fresh
  stream for it in `:response-channels`, puts the request onto `:socket`
  and waits for the reply. The registration is removed before the result
  is delivered. The channel yields

    * whatever was taken from the reply stream when the request was sent,
    * `:timeout` when the context has a `:response-timeout` and it
      expired,
    * `:closed` when sending failed and the socket is closed,
    * `:send-failure` when sending failed for any other reason."
  [rpc-ctx method & params]
  (go
    (let [id ((:id-generator rpc-ctx))
          {:keys [protocol-keyword response-channels response-timeout socket]} rpc-ctx
          request {protocol-keyword rpc-version
                   :id id
                   :method (name method)
                   :params params}
          reply-stream (stream/stream)]
      (swap! response-channels assoc id (stream/->sink reply-stream))
      (let [sent? @(stream/put! socket request)
            outcome (cond
                      (not sent?)
                      (if (stream/closed? socket) :closed :send-failure)

                      (nil? response-timeout)
                      @(stream/take! (stream/->source reply-stream))

                      :else
                      @(stream/try-take! (stream/->source reply-stream)
                                         nil response-timeout :timeout))]
        (swap! response-channels dissoc id)
        outcome))))
(defn async-request-with-timeout!
  "Same as `async-request!`, with `timeout` stored in the context under
  `:response-timeout` for this call."
  [rpc-ctx timeout method & params]
  (let [timed-ctx (assoc rpc-ctx :response-timeout timeout)]
    (apply async-request! timed-ctx method params)))
(defn request!
  "RPC request to the peer node. Blocks until `async-request!` delivers
  its outcome and returns the `:result` of the response.

  Throws ex-info with data `:type`
    :rpc-response-timeout   outcome was `:timeout`
    :rpc-connection-closed  outcome was `:closed`
    :rpc-buffer-overflow    outcome was `:send-failure`
    :rpc-peer               response carried an `:error`; its code is
                            under `:code` and its data under `:details`
    :rpc-unknown            anything else"
  [rpc-ctx method & params]
  (let [response (<!! (apply async-request! rpc-ctx method params))
        peer-failure (fn [reply]
                       (let [{:keys [code message data]} (:error reply)]
                         (ex-info message {:type :rpc-peer
                                           :code code
                                           :details data})))]
    (case response
      :timeout (throw (ex-info "Timeout" {:type :rpc-response-timeout}))
      :closed (throw (ex-info "Connection closed" {:type :rpc-connection-closed}))
      :send-failure (throw (ex-info "Buffer overflow" {:type :rpc-buffer-overflow}))
      (cond
        (contains? response :result) (:result response)
        (contains? response :error) (throw (peer-failure response))
        :else (throw (ex-info "Unknown" {:type :rpc-unknown}))))))
(defn request-with-timeout!
  "RPC request to the peer node. Waits for the response for up to the
  timeout length of time: `timeout` is stored in the context under
  `:response-timeout` and the call is delegated to `request!`."
  [rpc-ctx timeout method & params]
  (apply request!
         (assoc rpc-ctx :response-timeout timeout)
         method
         params))
(defn notify!
  "Send an RPC notification to the peer. Returns the dereferenced result
  of putting the notification onto the context's `:socket` (the boolean
  success value)."
  [rpc-ctx method & params]
  (let [{:keys [protocol-keyword socket]} rpc-ctx
        notification {protocol-keyword rpc-version
                      :method (name method)
                      :params params}]
    @(stream/put! socket notification)))
(defn close!
  "Utility for closing the connection from outside of a request handler:
  closes the stream stored under `:socket` in `rpc-ctx`."
  [rpc-ctx]
  (-> rpc-ctx
      :socket
      stream/close!))
JSON encoding and decoding. | (ns clj-bson-rpc.json (:require [clj-bson-rpc.bytes :refer [bbuf-concat bbuf-split-after bbuf-split-before]] [clojure.core.async :refer [go]] [clojure.data.json :as json] [manifold.deferred :refer [success-deferred]] [manifold.stream :as stream])) | ||||||||||||||||||
(defn- new-writer-reader-pipe
  "Utility for providing stream reader access for a stream parser.
  Returns `[output-stream reader]`: bytes written to the
  PipedOutputStream become readable, decoded as UTF-8, from the
  BufferedReader. Throws IOExceptions."
  []
  (let [op (java.io.PipedOutputStream.)
        ip (java.io.PipedInputStream. op)
        utf8 (java.nio.charset.Charset/forName "UTF8")
        chars-in (java.io.InputStreamReader. ip utf8)
        reader (java.io.BufferedReader. chars-in)]
    [op reader]))
(defn- frameless-decoder
  "Decode unframed JSON messages from manifold stream `s`.

  Bytes from `s` are written into a pipe. A `go` block repeatedly calls
  `json/read` on the reader end (with `key-fn` applied to keys) and puts
  each value onto the returned sink. Read failures are put onto the sink
  as ex-info values: `:type :trailing-garbage` for java.io.EOFException,
  `:type :invalid-json` for any other exception. The loop ends at the
  first nil/false read result, after which the sink is closed. The pipe
  is closed when `s` drains or when a write to it fails."
  [s key-fn]
  (let [source (stream/->source s)
        sink (stream/stream)
        [p-sink reader] (new-writer-reader-pipe)
        read-next (fn []
                    (try
                      (json/read reader :eof-error? false :key-fn key-fn)
                      (catch java.io.EOFException e
                        (ex-info (str e) {:type :trailing-garbage}))
                      (catch Exception e
                        (ex-info (str e) {:type :invalid-json}))))
        start-reader! (fn []
                        (go
                          (try
                            (loop []
                              (when-let [msg (read-next)]
                                (stream/put! sink msg)
                                (recur)))
                            (catch Exception e))
                          (stream/close! sink)))
        push-bytes (fn [new-bytes]
                     (try
                       (.write p-sink new-bytes 0 (count new-bytes))
                       (success-deferred true)
                       (catch Exception e
                         (.close p-sink)
                         (success-deferred false))))]
    (start-reader!)
    (stream/connect-via source push-bytes sink {:downstream? false})
    (stream/on-drained source (fn [] (.close p-sink)))
    sink))
(defn- rfc-7464-decoder
  "Decode JSON messages, framed between a 0x1e byte and a 0x0a byte,
  from manifold stream `s`.

  Incoming byte chunks are appended to an internal buffer. While the
  buffer contains both a 0x1e and a 0x0a byte, frames are cut from it;
  each frame's payload (without the two delimiter bytes) is parsed with
  `json/read-str` using `key-fn` and put onto the returned sink.
  Problems are put onto the sink as ex-info values instead of being
  thrown:

    :invalid-framing     bytes found in front of a 0x1e byte
    :exceeds-max-length  payload is longer than `max-len`
    :invalid-json        the payload could not be parsed
    :trailing-garbage    bytes were left over when the source drained

  The sink is closed once the source has drained."
  [s key-fn max-len]
  (let [source (stream/->source s)
        sink (stream/stream)
        pending (atom (byte-array []))
        out-of-frame (fn [garbage]
                       (ex-info "Framing error, skipping out-of-frame bytes."
                                {:type :invalid-framing :bytes garbage}))
        too-long (fn [mbytes mlen]
                   (ex-info (str "Message length exceeds the maximum allowed. "
                                 "Max: " max-len ", Message len: " mlen)
                            {:type :exceeds-max-length :bytes mbytes}))
        split-frames (fn [data]
                       (loop [found [] leftover data]
                         (cond
                           ;; need both delimiters somewhere in the buffer
                           (not (and (some #{0x1e} leftover)
                                     (some #{0x0a} leftover)))
                           [found leftover]

                           ;; bytes ahead of the frame start are reported and skipped
                           (not= (first leftover) 0x1e)
                           (let [[garbage more] (bbuf-split-before 0x1e leftover)]
                             (recur (conj found (out-of-frame garbage)) more))

                           :else
                           (let [[frame-bytes more] (bbuf-split-after 0x0a leftover)
                                 payload-len (- (count frame-bytes) 2)
                                 item (if (> payload-len max-len)
                                        (too-long frame-bytes payload-len)
                                        ;; strip leading 0x1e and trailing 0x0a
                                        (byte-array (butlast (rest frame-bytes))))]
                             (recur (conj found item) more)))))
        parse (fn [frame]
                (if-not (instance? clojure.lang.ExceptionInfo frame)
                  (try
                    (json/read-str (String. frame "UTF-8") :key-fn key-fn)
                    (catch Exception e
                      (ex-info (str e) {:type :invalid-json :bytes frame})))
                  ;; framing errors pass through untouched
                  frame))
        on-bytes (fn [new-bytes]
                   (let [[found leftover] (split-frames (bbuf-concat @pending new-bytes))
                         tail-error (if (and (stream/drained? source) (seq leftover))
                                      [(ex-info "Non-decodable trailing bytes when stream was closed."
                                                {:type :trailing-garbage :bytes leftover})]
                                      [])]
                     (reset! pending leftover)
                     (stream/put-all! sink (mapv parse (into found tail-error)))))]
    (stream/connect-via source on-bytes sink {:downstream? false})
    ;; one last pass with no new bytes reports any trailing garbage
    (stream/on-drained source (fn []
                                (on-bytes [])
                                (stream/close! sink)))
    sink))
(defn json-codec
  "JSON codec for a manifold duplex stream.

  Returns a spliced stream: messages put into it are written as UTF-8
  JSON bytes and forwarded to `s`; messages taken from it are decoded
  from the bytes of `s`.

  Options
    :json-framing  `:none` (default) for unframed JSON, or `:rfc-7464`
                   to wrap each message between a 0x1e and a 0x0a byte
    :json-key-fn   key function used when decoding (default `keyword`)
    :max-len       maximum message length for the `:rfc-7464` decoder
                   (default Integer/MAX_VALUE)

  Throws ex-info with `:type :unsupported-framing` for any other
  `:json-framing` value. `ex-info` has no one-argument arity, so the
  data map is required."
  [s & {:keys [json-framing json-key-fn max-len]
        :or {json-framing :none
             json-key-fn keyword
             max-len (Integer/MAX_VALUE)}}]
  (let [out (stream/stream)
        unsupported (fn []
                      (ex-info "Unsupported framing."
                               {:type :unsupported-framing
                                :json-framing json-framing}))
        encode (fn [m] (.getBytes (json/write-str m) "UTF-8"))
        encoder (case json-framing
                  :none encode
                  :rfc-7464 (fn [m] (byte-array (concat [0x1e] (encode m) [0x0a])))
                  (throw (unsupported)))
        stream-decoder (case json-framing
                         :none (fn [s] (frameless-decoder s json-key-fn))
                         :rfc-7464 (fn [s] (rfc-7464-decoder s json-key-fn max-len))
                         (throw (unsupported)))]
    (stream/connect (stream/map encoder out) s)
    (stream/splice out (stream-decoder s))))