bsonrpc - JSON/BSON RPC

Library for JSON RPC 2.0 and BSON RPC

Install with pip3

And use:
import bsonrpc

JSONRpc Objects

class bsonrpc.JSONRpc(socket, services=None, **options)

Bases: bsonrpc.rpc.RpcBase

JSON RPC Connector. Implements the JSON-RPC 2.0 specification.

Connects via socket to an RPC peer node and provides access to the services offered by that peer. The optional services parameter takes an object whose methods are made accessible to the peer node.

Various methods of JSON message framing are available for the stream transport.
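
As a rough sketch of the flow described above, the helpers below open a TCP connection, wrap it in a JSONRpc connector and call one method on the peer. The helper names connect_jsonrpc and call_once are made up for this illustration; only JSONRpc, get_peer_proxy, close and join come from this reference. The bsonrpc import sits inside the function so the snippet loads even where the library is not installed.

```python
import socket


def connect_jsonrpc(host, port, services=None, **options):
    """Open a TCP connection and wrap it in a JSONRpc connector."""
    # Imported lazily so this module can be loaded without bsonrpc installed.
    from bsonrpc import JSONRpc
    sock = socket.create_connection((host, port))
    return JSONRpc(sock, services, **options)


def call_once(rpc, method_name, *args, timeout=5.0):
    """Call one peer method through a proxy, then shut the connection down."""
    try:
        peer = rpc.get_peer_proxy(timeout=timeout)
        # Attribute access on the proxy is delegated to the peer as a request.
        return getattr(peer, method_name)(*args)
    finally:
        rpc.close()
        rpc.join(timeout)
```

With a peer listening on a hypothetical localhost:6000 that exposes swapit, this could be used as call_once(connect_jsonrpc('localhost', 6000), 'swapit', 'hello').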

__init__(socket, services=None, **options)
Parameters:
  • socket (socket.socket) – Socket connected to the peer. (Anything behaving like a socket and implementing socket methods close, recv, sendall and shutdown is equally viable)
  • services (@service_class Class | None) – Object providing request handlers and notification handlers to be exposed to peer. See Providing Services for details.
  • options – Modify behavior by overriding the library defaults.

Available options:

framing_cls

Selection of framing method implementation. Either select one of the following:

  • bsonrpc.JSONFramingNetstring
  • bsonrpc.JSONFramingNone
  • bsonrpc.JSONFramingRFC7464 (Default)

Or provide your own implementation class for some other framing type. See bsonrpc.framing for details.
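
To make the framing choices more concrete, here is a small sketch of what two of the named wire formats look like, written from their public definitions (RFC 7464 JSON text sequences and netstrings). The function names are hypothetical and this is not the code of the library's own framing classes.

```python
def frame_rfc7464(message: bytes) -> bytes:
    """RFC 7464 JSON text sequence: RS (0x1E), the JSON text, then LF (0x0A)."""
    return b'\x1e' + message + b'\n'


def frame_netstring(message: bytes) -> bytes:
    """Netstring: ASCII decimal payload length, ':', the payload, then ','."""
    return str(len(message)).encode('ascii') + b':' + message + b','


msg = b'{"jsonrpc":"2.0","method":"ping","id":1}'
print(frame_rfc7464(msg))
print(frame_netstring(msg))
```

Both formats let a receiver find message boundaries in a byte stream without parsing the JSON itself, which is why the peer on the other end must be configured with the same framing.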

concurrent_notification_handling

Selects the strategy by which a handler is launched for each incoming notification. See About Threading Model for more info. Choices:

  • None (Default)
  • bsonrpc.ThreadingModel.THREADS
  • bsonrpc.ThreadingModel.GEVENT
concurrent_request_handling

Selects the strategy by which a handler is launched for each incoming request. See About Threading Model for more info. Choices:

  • None
  • bsonrpc.ThreadingModel.THREADS (Default)
  • bsonrpc.ThreadingModel.GEVENT
connection_id
Label to use in logs to identify current connection. Default: ‘’
id_generator
A generator which must yield a unique ID on each next() call. Used for generating IDs for request messages. Default: internal default generator yielding integers 1, 2, ...
no_arguments_presentation

When an RPC method is called without arguments, the JSON-RPC 2.0 specification says that the params key MAY be omitted from the message. Because this is not a strict requirement, an empty JSON Array or an empty JSON Object is an equally valid presentation of an empty argument collection. Choices:

  • bsonrpc.NoArgumentsPresentation.OMIT (Default)
  • bsonrpc.NoArgumentsPresentation.EMPTY_ARRAY
  • bsonrpc.NoArgumentsPresentation.EMPTY_OBJECT

This option affects only the structure of sent JSON messages. All valid schematic variations for incoming messages are recognized correctly regardless of this setting.
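
The three presentations can be pictured with a short sketch that builds the request message by hand. The build_request helper and its string labels are illustrative stand-ins for the NoArgumentsPresentation choices; the library builds its messages internally.

```python
import json


def build_request(method, request_id, presentation):
    """Build a no-argument JSON-RPC 2.0 request in one of three shapes.

    presentation is 'omit', 'empty_array' or 'empty_object'
    (illustrative labels, not library constants).
    """
    msg = {'jsonrpc': '2.0', 'id': request_id, 'method': method}
    if presentation == 'empty_array':
        msg['params'] = []
    elif presentation == 'empty_object':
        msg['params'] = {}
    elif presentation != 'omit':
        raise ValueError('unknown presentation: %r' % (presentation,))
    return json.dumps(msg, sort_keys=True)


for choice in ('omit', 'empty_array', 'empty_object'):
    print(choice, build_request('ping', 1, choice))
```

Changing this option is mostly useful when the peer implementation is stricter than the specification and insists on one particular shape.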

threading_model

Affects the concurrency implementation of the internal dispatcher and message stream decoder. See About Threading Model for more info. Choices:

  • bsonrpc.ThreadingModel.THREADS (Default)
  • bsonrpc.ThreadingModel.GEVENT
custom_codec_implementation
Defaults to None, in which case this library uses the standard Python json library. Otherwise an alternative JSON codec implementation can be provided as an object that must have callable attributes dumps and loads with function signatures identical to those of the standard json library.

All options as well as any possible custom/extra options are available as attributes of the constructed class object.
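
Two of the options above, id_generator and custom_codec_implementation, accept user-supplied objects. The following is a minimal sketch of what such objects could look like; prefixed_ids and CompactJsonCodec are hypothetical names, and the codec simply delegates to the standard json module with compact separators.

```python
import itertools
import json


def prefixed_ids(prefix):
    """Yield 'prefix-1', 'prefix-2', ... so each next() call gives a new ID."""
    for number in itertools.count(1):
        yield '%s-%d' % (prefix, number)


class CompactJsonCodec:
    """Codec object exposing dumps and loads like the standard json module."""

    @staticmethod
    def dumps(obj, **kwargs):
        # Drop the whitespace json.dumps would otherwise put after ',' and ':'.
        kwargs.setdefault('separators', (',', ':'))
        return json.dumps(obj, **kwargs)

    @staticmethod
    def loads(text, **kwargs):
        return json.loads(text, **kwargs)


options = {
    'connection_id': 'client-a',
    'id_generator': prefixed_ids('client-a'),
    'custom_codec_implementation': CompactJsonCodec(),
}
# With a connected socket these would be passed as:
#   rpc = JSONRpc(sock, services=None, **options)
```

String IDs such as these are permitted by JSON-RPC 2.0 and can make logs from several connections easier to tell apart.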

batch_call(batch_calls, timeout=None)
Parameters:
  • batch_calls (bsonrpc.BatchBuilder | list of 4-tuples) –

    Batch of requests/notifications to be executed on the peer node. Use BatchBuilder() and pass the object here as a parameter.

    Example:

    bb = BatchBuilder(['swapit', 'times'], ['logit'])
    bb.swapit('hello')    # request
    bb.times(3, 5)        # request
    bb.logit('world')     # notification
    results = jsonrpc.batch_call(bb, timeout=15.0)
    # results: ['olleh', 15]
    

    Note that BatchBuilder is used and behaves like the peer-proxy returned by .get_peer_proxy().

    Instead of BatchBuilder you may give a simple list argument which must be in the following format: [("r"/"n", "<method-name>", args, kwargs), ...]

  • timeout (float | None) – Timeout in seconds for waiting results. Default: None
Returns:

  • list of results to requests, in order of original requests. Each result may be:
    • a single return value or
    • a tuple of return values or
    • an Exception object
  • None if batch_calls contained only notifications.

Raises:

ResponseTimeout if batch_calls contains requests and the response batch does not arrive within timeout.
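
For the plain-list form of batch_calls, and for handling the mixed result list, a sketch along these lines may help. It assumes that args is a tuple of positional arguments and kwargs a dict, which the format description above suggests but does not spell out; split_batch_results is a hypothetical helper, not part of the library.

```python
# The BatchBuilder example above, written as a list of 4-tuples:
# ('r' for request / 'n' for notification, method name, args, kwargs).
batch = [
    ('r', 'swapit', ('hello',), {}),
    ('r', 'times', (3, 5), {}),
    ('n', 'logit', ('world',), {}),
]


def split_batch_results(results):
    """Separate return values from Exception objects, keyed by position."""
    values, errors = {}, {}
    if results is None:  # the batch contained only notifications
        return values, errors
    for index, item in enumerate(results):
        if isinstance(item, Exception):
            errors[index] = item
        else:
            values[index] = item
    return values, errors


# With a connected JSONRpc object named jsonrpc this would be used as:
#   values, errors = split_batch_results(jsonrpc.batch_call(batch, timeout=15.0))
```

Keeping the original positions makes it possible to match each error back to the request that caused it.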

close()

Close the connection and stop the internal dispatcher.

framing_cls

Default choice for JSON Framing

alias of JSONFramingRFC7464

get_peer_proxy(requests=None, notifications=None, timeout=None)

Get an RPC peer proxy object. Method calls to this object are delegated and executed on the connected RPC peer.

Parameters:
  • requests (list of str | None) – A list of method names which can be called and will be delegated to peer node as requests. Default: None -> All arbitrary attributes are handled as requests to the peer.
  • notifications (list of str | None) – A list of method names which can be called and will be delegated to peer node as notifications. Default: None -> If requests is not None all other attributes are handled as notifications.
  • timeout (float | None) – Timeout in seconds; the maximum time to wait for the response to each Request.
Returns:

A proxy object whose attribute method calls are delegated over RPC.

get_peer_proxy() (without arguments) will return a proxy where all attribute method calls are turned into Requests, except calls via .n which are turned into Notifications. Example:

proxy = rpc.get_peer_proxy()
proxy.n.log_this('hello')          # -> Notification
result = proxy.swap_this('Alise')  # -> Request

But if arguments are given then the interpretation is explicit and .n-delegator is not used:

proxy = rpc.get_peer_proxy(['swap_this'], ['log_this'])
proxy.log_this('hello')            # -> Notification
result = proxy.swap_this('esilA')  # -> Request
invoke_notification(method_name, *args, **kwargs)

Send an RPC Notification.

Parameters:
  • method_name (str) – Name of the notification method.
  • args – Arguments
  • kwargs – Keyword Arguments.
NOTE:
Use either arguments or keyword arguments. Both can’t be used simultaneously in a single call.
invoke_request(method_name, *args, **kwargs)

Invoke RPC Request.

Parameters:
  • method_name (str) – Name of the request method.
  • args – Arguments
  • kwargs – Keyword Arguments.
Returns:

Response value(s) from peer.

Raises:

BsonRpcError

A timeout for the request can be set by giving a special keyword argument timeout (float value of seconds) which can be prefixed by any number of underscores - if necessary - to differentiate it from the actual keyword arguments going to the peer node method call.

e.g. invoke_request('testing', **{'_timeout': 22, '__timeout': 10.0}) would call a request method testing(_timeout=22) on the RPC peer and wait for the response for 10 seconds.

NOTE:
Use either arguments or keyword arguments. Both can’t be used in a single call. (The timeout argument does not count toward this rule.)
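
One reading of the underscore rule, consistent with the example above, is that among all keyword names of the form timeout, _timeout, __timeout and so on, the one with the most leading underscores is taken as the timeout and the rest are forwarded to the peer. The sketch below illustrates that reading only; split_timeout is a hypothetical helper and not the library's own code.

```python
import re

# Matches 'timeout' preceded by any number of underscores.
_TIMEOUT_KEY = re.compile(r'^_*timeout$')


def split_timeout(kwargs):
    """Return (timeout, peer_kwargs) under the 'most underscores wins' reading."""
    candidates = [key for key in kwargs if _TIMEOUT_KEY.match(key)]
    if not candidates:
        return None, dict(kwargs)
    chosen = max(candidates, key=len)  # longest name has the most underscores
    peer_kwargs = {k: v for k, v in kwargs.items() if k != chosen}
    return kwargs[chosen], peer_kwargs


print(split_timeout({'_timeout': 22, '__timeout': 10.0}))
```

Under this reading a peer method that itself takes a timeout keyword can still be called, as long as the caller adds one more underscore to the local timeout.
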
is_closed
Property:bool – Closed by peer node or with close()
join(timeout=None)

Wait for the internal dispatcher to shut down.

Parameters:timeout (float | None) – Timeout in seconds, max time to wait.
protocol = 'jsonrpc'

Protocol name used in messages

protocol_version = '2.0'

Protocol version used in messages

BSONRpc Objects

class bsonrpc.BSONRpc(socket, services=None, **options)

Bases: bsonrpc.rpc.RpcBase

BSON RPC Connector. Closely follows the JSON-RPC 2.0 specification, with only a few differences:

  • Batches are not supported since BSON does not support top-level lists.
  • Keyword ‘jsonrpc’ has been replaced by ‘bsonrpc’
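
The two differences can be seen by writing the message envelopes out as plain Python data. This is a sketch only: make_request is a hypothetical helper, and the actual encoding to BSON bytes (which needs a BSON codec) is left out.

```python
def make_request(protocol, request_id, method, params=None):
    """Build a request envelope; protocol is 'jsonrpc' or 'bsonrpc'."""
    message = {protocol: '2.0', 'id': request_id, 'method': method}
    if params is not None:
        message['params'] = params
    return message


json_style = make_request('jsonrpc', 1, 'times', [3, 5])
bson_style = make_request('bsonrpc', 1, 'times', [3, 5])

# A JSON-RPC batch is a top-level list of such messages. A BSON document is
# always a top-level mapping, so this shape has no direct BSON counterpart.
json_batch = [json_style, make_request('jsonrpc', 2, 'swapit', ['hello'])]

print(json_style)
print(bson_style)
```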

Connects via socket to RPC peer node. Provides access to the services provided by the peer node and makes local services available for the peer.

To use BSONRpc you need to install the pymongo package (see requirements.txt).

__init__(socket, services=None, **options)
Parameters:
  • socket (socket.socket) – Socket connected to the peer. (Anything behaving like a socket and implementing socket methods close, recv, sendall and shutdown is equally viable)
  • services (@service_class Class | None) – Object providing request handlers and notification handlers to be exposed to peer. See Providing Services for details.
  • options – Modify behavior by overriding the library defaults.

Available options:

concurrent_notification_handling

Selects the strategy by which a handler is launched for each incoming notification. See About Threading Model for more info. Choices:

  • None (Default)
  • bsonrpc.ThreadingModel.THREADS
  • bsonrpc.ThreadingModel.GEVENT
concurrent_request_handling

Selects the strategy by which a handler is launched for each incoming request. See About Threading Model for more info. Choices:

  • None
  • bsonrpc.ThreadingModel.THREADS (Default)
  • bsonrpc.ThreadingModel.GEVENT
connection_id
Label to use in logs to identify current connection. Default: ‘’
id_generator
A generator which must yield a unique ID on each next() call. Used for generating IDs for request messages. Default: internal default generator yielding integers 1, 2, ...
no_arguments_presentation

When an RPC method is called without arguments, the JSON-RPC 2.0 specification says that the params key MAY be omitted from the message. Because this is not a strict requirement, an empty JSON Array or an empty JSON Object is an equally valid presentation of an empty argument collection. Choices:

  • bsonrpc.NoArgumentsPresentation.OMIT (Default)
  • bsonrpc.NoArgumentsPresentation.EMPTY_ARRAY
  • bsonrpc.NoArgumentsPresentation.EMPTY_OBJECT

This option affects only the structure of sent JSON messages. All valid schematic variations for incoming messages are recognized correctly regardless of this setting.

threading_model

Affects the concurrency implementation of the internal dispatcher and message stream decoder. See About Threading Model for more info. Choices:

  • bsonrpc.ThreadingModel.THREADS (Default)
  • bsonrpc.ThreadingModel.GEVENT
custom_codec_implementation

Defaults to None, in which case this library automatically uses the bson codec from either the pymongo or the bson library (https://pypi.python.org/pypi/pymongo or https://pypi.python.org/pypi/bson), depending on whichever is installed on the system.

Otherwise, if you provide a custom codec, it must have callable attributes (aka member methods) dumps and loads with function signatures identical to those of the bson:0.4.6 library.

All options as well as any possible custom/extra options are available as attributes of the constructed class object.

close()

Close the connection and stop the internal dispatcher.

get_peer_proxy(requests=None, notifications=None, timeout=None)

Get an RPC peer proxy object. Method calls to this object are delegated and executed on the connected RPC peer.

Parameters:
  • requests (list of str | None) – A list of method names which can be called and will be delegated to peer node as requests. Default: None -> All arbitrary attributes are handled as requests to the peer.
  • notifications (list of str | None) – A list of method names which can be called and will be delegated to peer node as notifications. Default: None -> If requests is not None all other attributes are handled as notifications.
  • timeout (float | None) – Timeout in seconds; the maximum time to wait for the response to each Request.
Returns:

A proxy object whose attribute method calls are delegated over RPC.

get_peer_proxy() (without arguments) will return a proxy where all attribute method calls are turned into Requests, except calls via .n which are turned into Notifications. Example:

proxy = rpc.get_peer_proxy()
proxy.n.log_this('hello')          # -> Notification
result = proxy.swap_this('Alise')  # -> Request

But if arguments are given then the interpretation is explicit and .n-delegator is not used:

proxy = rpc.get_peer_proxy(['swap_this'], ['log_this'])
proxy.log_this('hello')            # -> Notification
result = proxy.swap_this('esilA')  # -> Request
invoke_notification(method_name, *args, **kwargs)

Send an RPC Notification.

Parameters:
  • method_name (str) – Name of the notification method.
  • args – Arguments
  • kwargs – Keyword Arguments.
NOTE:
Use either arguments or keyword arguments. Both can’t be used simultaneously in a single call.
invoke_request(method_name, *args, **kwargs)

Invoke RPC Request.

Parameters:
  • method_name (str) – Name of the request method.
  • args – Arguments
  • kwargs – Keyword Arguments.
Returns:

Response value(s) from peer.

Raises:

BsonRpcError

A timeout for the request can be set by giving a special keyword argument timeout (float value of seconds) which can be prefixed by any number of underscores - if necessary - to differentiate it from the actual keyword arguments going to the peer node method call.

e.g. invoke_request('testing', **{'_timeout': 22, '__timeout': 10.0}) would call a request method testing(_timeout=22) on the RPC peer and wait for the response for 10 seconds.

NOTE:
Use either arguments or keyword arguments. Both can’t be used in a single call. (The timeout argument does not count toward this rule.)
is_closed
Property:bool – Closed by peer node or with close()
join(timeout=None)

Wait for the internal dispatcher to shut down.

Parameters:timeout (float | None) – Timeout in seconds, max time to wait.
protocol = 'bsonrpc'

Protocol name used in messages

protocol_version = '2.0'

Protocol version used in messages

Providing Services

To provide remotely callable functions, JSONRpc Objects and BSONRpc Objects expect to receive an instance of a bsonrpc.service_class-decorated class as an argument. Use the decorators introduced below to announce methods either as request handlers or notification handlers.

Decorators

bsonrpc.service_class(cls)

A class decorator enabling the instances of the class to be used as a services-provider in JSONRpc Objects and BSONRpc Objects.

Use decorators request, notification, rpc_request and rpc_notification to expose methods for the RPC peer node.

bsonrpc.request(method)

A method decorator announcing the method to be exposed as a request handler.

This decorator assumes that the method parameters are trivially exposed to the peer node in ‘as-is’ manner.

bsonrpc.notification(method)

A method decorator announcing the method to be exposed as a notification handler.

This decorator assumes that the method parameters are trivially exposed to the peer node in ‘as-is’ manner.

bsonrpc.rpc_request(method)

A method decorator announcing the method to be exposed as a request handler.

This decorator assumes that the first parameter (after self) takes a BSONRpc/JSONRpc object reference as an argument, so that the method will have an access to make RPC callbacks on the peer node (requests and notifications) during its execution. From the second parameter onward the parameters are exposed as-is to the peer node.

bsonrpc.rpc_notification(method)

A method decorator announcing the method to be exposed as a notification handler.

This decorator assumes that the first parameter (after self) takes a BSONRpc/JSONRpc object reference as an argument. From the second parameter onward the parameters are exposed as-is to the peer node.

About rpc-reference

The JSONRpc/BSONRpc reference object given at runtime to methods decorated with rpc_request and rpc_notification gives service methods access to the JSONRpc/BSONRpc methods, with the following differences:

  • .close() is renamed to .abort() to highlight the abnormal nature of discontinued request handling. Causes the return value of the request handler to be discarded.
  • .close_after_response() (Takes no arguments) is available. This will trigger the connection to be closed right after the return value turned into a response message has been sent to the peer node.

Service Provider Example

from bsonrpc import service_class, request, rpc_request, notification

@service_class
class MyServices(object):

    # __init__  etc..

    @request
    def revert(self, txt):
        return ''.join(reversed(list(txt)))

    @rpc_request
    def calculate(self, rpc, a, b, c):
        rpc.invoke_notification('report_progress', '.')
        rpc.invoke_notification('report_progress', '..')
        return a * b * c

    @rpc_request
    def final_message(self, rpc, txt):
        # Custom attribute 'my_custom_label', set via options
        self.log(rpc.my_custom_label + txt)
        rpc.close_after_response()
        return 'Good Bye!'

    @notification
    def log(self, fmt, *args):
        print('From peer: ' + fmt % args)

bsonrpc.framing

This module provides classes implementing different JSON-RPC 2.0 framing options. Currently RFC 7464, Netstring and frameless message framings are included.

In case you need to use a different framing method, you can provide your own implementor class for this library via options.

The class you provide must have the following classmethods and behavior:
  • extract_message
    @classmethod
    def extract_message(cls, raw_bytes):
        # Args:
        #    raw_bytes (bytes): 1 - N bytes from stream
        # Returns:
        #    bytes, bytes    (tuple of 2 (builtins.bytes))
        #       * The 1st value must be either:
        #           * None - if the given `raw_bytes`-argument does not
        #                    contain enough bytes to lift a complete
        #                    JSON message from it.
        #           * Unframed bytes supposedly containing exactly 1 JSON
        #             message.
        #       * The 2nd value consists of the remaining bytes of
        #         `raw_bytes` if/when a framed message has been lifted
        #         to become the unframed message in the 1st return value.
        # Raises:
        #    Library framework will coerce any raised Exceptions into
        #    bsonrpc.exceptions.FramingError -exceptions.
        return msg_bytes, rest_bytes
    
  • into_frame
    @classmethod
    def into_frame(cls, message_bytes):
        # Args:
        #    message_bytes (bytes): 1 complete JSON message serialized
        #                           into bytes.
        # Returns:
        #    bytes
        #       == framed message.
        # Raises:
        #    Library framework will coerce any raised Exceptions into
        #    bsonrpc.exceptions.FramingError -exceptions.
        return framed_bytes
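
As an illustration of the contract above, here is a minimal sketch of a custom framing class that prefixes each message with a 4-byte big-endian length. The class name LengthPrefixedFraming and the framing format itself are assumptions chosen for the example; returning the input unchanged as the second value when no complete message is available is also an assumption, since the description above only defines the second value for the case where a message was lifted.

```python
import struct


class LengthPrefixedFraming:
    """Hypothetical framing: 4-byte big-endian length, then the message."""

    _HEADER = struct.Struct('>I')

    @classmethod
    def extract_message(cls, raw_bytes):
        header_size = cls._HEADER.size
        if len(raw_bytes) < header_size:
            # Not even a full header yet (assumption: keep all bytes as rest).
            return None, raw_bytes
        (length,) = cls._HEADER.unpack_from(raw_bytes)
        end = header_size + length
        if len(raw_bytes) < end:
            # Header is complete but the payload is still arriving.
            return None, raw_bytes
        return raw_bytes[header_size:end], raw_bytes[end:]

    @classmethod
    def into_frame(cls, message_bytes):
        return cls._HEADER.pack(len(message_bytes)) + message_bytes
```

Such a class would be handed to the connector through the framing_cls option, and the peer would need to use the same framing.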
    

About Threading Model

This library contains concurrent execution threads for:

  1. Decoder receiving bytes from socket, extracting and decoding messages to python data objects and putting them to queue for dispatcher to consume.

  2. Dispatcher consuming messages from the socket-queue, dispatching incoming responses to local waiter(s), requests and notifications to service handlers via selected strategy (3 & 4) and spawning batch collectors to handle batches.

  3. To handle an incoming request the Dispatcher can either execute the responsible request handler without spawning any threads or can spawn a thread to execute that handler, depending on the selected concurrent_request_handling -strategy.

    If a handler is executed without threading the Dispatcher cannot take any new messages for processing from the queue until the handler has returned. Spawning allows simultaneous processing of multiple requests which is usually desirable. Control flow is not any less deterministic as it is fully controlled by the rpc peer node using the service interface.

  4. In an identical way, the selected concurrent_notification_handling strategy will determine notification handler spawning.
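
The effect described in point 3 can be reproduced with a toy model built only on the standard library. This is not the library's dispatcher: dispatch and waiter_was_released are hypothetical names, and the two "messages" are plain strings. The first handler waits for something that only the second message can provide, so it succeeds only when handlers are spawned.

```python
import threading


def dispatch(messages, handler, spawn):
    """Process messages in order, running each handler inline or in a thread."""
    threads = []
    for message in messages:
        if spawn:
            thread = threading.Thread(target=handler, args=(message,))
            thread.start()
            threads.append(thread)
        else:
            handler(message)  # nothing else is processed until this returns
    for thread in threads:
        thread.join()


def waiter_was_released(spawn):
    """Return True if the 'wait' handler saw the later 'release' message."""
    released = threading.Event()
    outcome = {}

    def handler(message):
        if message == 'wait':
            # Give up after half a second so the inline case cannot hang.
            outcome['wait'] = released.wait(timeout=0.5)
        elif message == 'release':
            released.set()

    dispatch(['wait', 'release'], handler, spawn)
    return outcome['wait']


print('inline :', waiter_was_released(spawn=False))
print('spawned:', waiter_was_released(spawn=True))
```

The same reasoning applies to a request handler that calls back to the peer and waits for the response: the response can only be delivered if the dispatcher is free to process it.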

For basic concurrency (points 1 & 2 above) this library can be configured to use either basic Python threads or greenlets from the gevent (*) library. This is done with the threading_model option.

The following option combinations have been unit tested:

threading_model          concurrent_request_handling   concurrent_notification_handling
ThreadingModel.THREADS   ThreadingModel.THREADS        None
ThreadingModel.THREADS   ThreadingModel.THREADS        ThreadingModel.THREADS
ThreadingModel.GEVENT    ThreadingModel.GEVENT         None
ThreadingModel.GEVENT    ThreadingModel.GEVENT         ThreadingModel.GEVENT

(*) see requirements.txt for minimal version requirements.