bsonrpc - JSON/BSON RPC¶
Library for JSON RPC 2.0 and BSON RPC
Install with pip3
- And use:
import bsonrpc
JSONRpc Objects¶
-
class
bsonrpc.
JSONRpc
(socket, services=None, **options)[source]¶ Bases:
bsonrpc.rpc.RpcBase
JSON RPC Connector. Implements the JSON-RPC 2.0 specification.
Connects via socket to RPC peer node. Provides access to the services provided by the peer node. Optional
services
parameter will take an object of which methods are accessible to the peer node.Various methods of JSON message framing are available for the stream transport.
-
__init__
(socket, services=None, **options)[source]¶ Parameters: - socket (socket.socket) – Socket connected to the peer. (Anything behaving like
a socket and implementing socket methods
close
,recv
,sendall
andshutdown
is equally viable) - services (
@service_class
Class |None
) – Object providing request handlers and notification handlers to be exposed to peer. See Providing Services for details. - options – Modify behavior by overriding the library defaults.
Available options:
- framing_cls
Selection of framing method implementation. Either select one of the following:
bsonrpc.JSONFramingNetstring
bsonrpc.JSONFramingNone
bsonrpc.JSONFramingRFC7464
(Default)
Or provide your own implementation class for some other framing type. See bsonrpc.framing for details.
- concurrent_notification_handling
Affects by which strategy each notification handler will be launched to handle each notification. See About Threading Model for more info. Choices:
None
(Default)bsonrpc.ThreadingModel.THREADS
bsonrpc.ThreadingModel.GEVENT
- concurrent_request_handling
Affects by which strategy each request handler will be launched to handle each request. See About Threading Model for more info. Choices:
None
,bsonrpc.ThreadingModel.THREADS
(Default)bsonrpc.ThreadingModel.GEVENT
- connection_id
- Label to use in logs to identify current connection. Default: ‘’
- id_generator
- A generator which must yield a unique ID on each next()-call. Used for generating ID’s for request messages. Default: internal default generator yielding integers 1, 2, ...
- no_arguments_presentation
When RPC method is to be sent without arguments the JSON RPC 2.0 specification specifies that the
params
-key in the message MAY be omitted. However because this is not a strict requirement, an empty JSON Array or empty JSON Object are equally valid presentations of an empty argument collection. Choices:bsonrpc.NoArgumentsPresentation.OMIT
(Default)bsonrpc.NoArgumentsPresentation.EMPTY_ARRAY
bsonrpc.NoArgumentsPresentation.EMPTY_OBJECT
This option affects only the structure of sent JSON messages. All valid schematic variations for incoming messages are recognized correctly regardless of this setting.
- threading_model
Affects the concurrency implementation of the internal dispatcher and message stream decoder. See About Threading Model for more info Choices:
bsonrpc.ThreadingModel.THREADS
(Default)bsonrpc.ThreadingModel.GEVENT
- custom_codec_implementation
- Is by default
None
in which case this library uses python default json library. Otherwise an alternative json codec implementation can be provided as an object that must have callable attributesdumps
andloads
with identical function signatures to those standard json library.
All options as well as any possible custom/extra options are available as attributes of the constructed class object.
- socket (socket.socket) – Socket connected to the peer. (Anything behaving like
a socket and implementing socket methods
-
batch_call
(batch_calls, timeout=None)[source]¶ Parameters: - batch_calls (bsonrpc.BatchBuilder | list of 4-tuples) –
Batch of requests/notifications to be executed on the peer node. Use
BatchBuilder()
and pass the object here as a parameter.Example:
bb = BatchBuilder(['swapit', 'times'], ['logit']) bb.swapit('hello') # request bb.times(3, 5) # request bb.logit('world') # notification results = jsonrpc.batch_call(bb, timeout=15.0) # results: ['olleh', 15]
Note that
BatchBuilder
is used and behaves like the peer-proxy returned by.get_peer_proxy()
.Instead of BatchBuilder you may give a simple list argument which must be in the following format: [(“r”/”n”, “<method-name>”, args, kwargs), ...]
- timeout (float | None) – Timeout in seconds for waiting results. Default: None
Returns: - list of results to requests, in order of original requests.
Each result may be:
- a single return value or
- a tuple of return values or
- an Exception object
None
ifbatch_calls
contained only notifications.
Raises: ResponseTimeout in case batch_calls contains requests, for which response batch did not arrive within timeout.
- batch_calls (bsonrpc.BatchBuilder | list of 4-tuples) –
-
close
()¶ Close the connection and stop the internal dispatcher.
-
framing_cls
¶ Default choice for JSON Framing
alias of
JSONFramingRFC7464
-
get_peer_proxy
(requests=None, notifications=None, timeout=None)¶ Get a RPC peer proxy object. Method calls to this object are delegated and executed on the connected RPC peer.
Parameters: - requests (list of str | None) – A list of method names which can be called and will be delegated to peer node as requests. Default: None -> All arbitrary attributes are handled as requests to the peer.
- notifications (list of str | None) – A list of method names which can be called and
will be delegated to peer node as notifications.
Default: None -> If
requests
is notNone
all other attributes are handled as notifications. - timeout (float | None) – Timeout in seconds, maximum time to wait responses to each Request.
Returns: A proxy object. Attribute method calls delegated over RPC.
get_peer_proxy()
(without arguments) will return a proxy where all attribute method calls are turned into Requests, except calls via.n
which are turned into Notifications. Example:proxy = rpc.get_peer_proxy() proxy.n.log_this('hello') # -> Notification result = proxy.swap_this('Alise') # -> Request
But if arguments are given then the interpretation is explicit and
.n
-delegator is not used:proxy = rpc.get_peer_proxy(['swap_this'], ['log_this']) proxy.log_this('hello') # -> Notification result = proxy.swap_this('esilA') # -> Request
-
invoke_notification
(method_name, *args, **kwargs)¶ Send an RPC Notification.
Parameters: - method_name (str) – Name of the notification method.
- args – Arguments
- kwargs – Keyword Arguments.
- NOTE:
- Use either arguments or keyword arguments. Both can’t be used simultaneously in a single call.
-
invoke_request
(method_name, *args, **kwargs)¶ Invoke RPC Request.
Parameters: - method_name (str) – Name of the request method.
- args – Arguments
- kwargs – Keyword Arguments.
Returns: Response value(s) from peer.
Raises: BsonRpcError
A timeout for the request can be set by giving a special keyword argument
timeout
(float value of seconds) which can be prefixed by any number of underscores - if necessary - to differentiate it from the actual keyword arguments going to the peer node method call.e.g.
invoke_request('testing', [], {'_timeout': 22, '__timeout: 10.0})
would call a request methodtesting(_timeout=22)
on the RPC peer and wait for the response for 10 seconds.- NOTE:
- Use either arguments or keyword arguments. Both can’t be used in a single call. (Naturally the timeout argument does not count to the rule.)
-
is_closed
¶ Property: bool – Closed by peer node or with close()
-
join
(timeout=None)¶ Wait for the internal dispatcher to shut down.
Parameters: timeout (float | None) – Timeout in seconds, max time to wait.
-
protocol
= 'jsonrpc'¶ Protocol name used in messages
-
protocol_version
= '2.0'¶ Protocol version used in messages
-
BSONRpc Objects¶
-
class
bsonrpc.
BSONRpc
(socket, services=None, **options)[source]¶ Bases:
bsonrpc.rpc.RpcBase
BSON RPC Connector. Follows closely JSON-RPC 2.0 specification with only few differences:
- Batches are not supported since BSON does not support top-level lists.
- Keyword ‘jsonrpc’ has been replaced by ‘bsonrpc’
Connects via socket to RPC peer node. Provides access to the services provided by the peer node and makes local services available for the peer.
To use BSONRpc you need to install
pymongo
-package (see requirements.txt)-
__init__
(socket, services=None, **options)[source]¶ Parameters: - socket (socket.socket) – Socket connected to the peer. (Anything behaving like
a socket and implementing socket methods
close
,recv
,sendall
andshutdown
is equally viable) - services (
@service_class
Class |None
) – Object providing request handlers and notification handlers to be exposed to peer. See Providing Services for details. - options – Modify behavior by overriding the library defaults.
Available options:
- concurrent_notification_handling
Affects by which strategy each notification handler will be launched to handle each notification. See About Threading Model for more info. Choices:
None
(Default)bsonrpc.ThreadingModel.THREADS
bsonrpc.ThreadingModel.GEVENT
- concurrent_request_handling
Affects by which strategy each request handler will be launched to handle each request. See About Threading Model for more info. Choices:
None
,bsonrpc.ThreadingModel.THREADS
(Default)bsonrpc.ThreadingModel.GEVENT
- connection_id
- Label to use in logs to identify current connection. Default: ‘’
- id_generator
- A generator which must yield a unique ID on each next()-call. Used for generating ID’s for request messages. Default: internal default generator yielding integers 1, 2, ...
- no_arguments_presentation
When RPC method is to be sent without arguments the JSON RPC 2.0 specification specifies that the
params
-key in the message MAY be omitted. However because this is not a strict requirement, an empty JSON Array or empty JSON Object are equally valid presentations of an empty argument collection. Choices:bsonrpc.NoArgumentsPresentation.OMIT
(Default)bsonrpc.NoArgumentsPresentation.EMPTY_ARRAY
bsonrpc.NoArgumentsPresentation.EMPTY_OBJECT
This option affects only the structure of sent JSON messages. All valid schematic variations for incoming messages are recognized correctly regardless of this setting.
- threading_model
Affects the concurrency implementation of the internal dispatcher and message stream decoder. See About Threading Model for more info Choices:
bsonrpc.ThreadingModel.THREADS
(Default)bsonrpc.ThreadingModel.GEVENT
- custom_codec_implementation
Is by default
None
in which case this library is able to automatically use bson codec from either pymongo or bson (https://pypi.python.org/pypi/pymongo or https://pypi.python.org/pypi/bson) libraries depending on which ever is installed on the system.Otherwise if you provide a custom codec it must have callable attibutes (aka member methods)
dumps
andloads
with function signatures identical to those of the bson:0.4.6 library.
All options as well as any possible custom/extra options are available as attributes of the constructed class object.
- socket (socket.socket) – Socket connected to the peer. (Anything behaving like
a socket and implementing socket methods
-
close
()¶ Close the connection and stop the internal dispatcher.
-
get_peer_proxy
(requests=None, notifications=None, timeout=None)¶ Get a RPC peer proxy object. Method calls to this object are delegated and executed on the connected RPC peer.
Parameters: - requests (list of str | None) – A list of method names which can be called and will be delegated to peer node as requests. Default: None -> All arbitrary attributes are handled as requests to the peer.
- notifications (list of str | None) – A list of method names which can be called and
will be delegated to peer node as notifications.
Default: None -> If
requests
is notNone
all other attributes are handled as notifications. - timeout (float | None) – Timeout in seconds, maximum time to wait responses to each Request.
Returns: A proxy object. Attribute method calls delegated over RPC.
get_peer_proxy()
(without arguments) will return a proxy where all attribute method calls are turned into Requests, except calls via.n
which are turned into Notifications. Example:proxy = rpc.get_peer_proxy() proxy.n.log_this('hello') # -> Notification result = proxy.swap_this('Alise') # -> Request
But if arguments are given then the interpretation is explicit and
.n
-delegator is not used:proxy = rpc.get_peer_proxy(['swap_this'], ['log_this']) proxy.log_this('hello') # -> Notification result = proxy.swap_this('esilA') # -> Request
-
invoke_notification
(method_name, *args, **kwargs)¶ Send an RPC Notification.
Parameters: - method_name (str) – Name of the notification method.
- args – Arguments
- kwargs – Keyword Arguments.
- NOTE:
- Use either arguments or keyword arguments. Both can’t be used simultaneously in a single call.
-
invoke_request
(method_name, *args, **kwargs)¶ Invoke RPC Request.
Parameters: - method_name (str) – Name of the request method.
- args – Arguments
- kwargs – Keyword Arguments.
Returns: Response value(s) from peer.
Raises: BsonRpcError
A timeout for the request can be set by giving a special keyword argument
timeout
(float value of seconds) which can be prefixed by any number of underscores - if necessary - to differentiate it from the actual keyword arguments going to the peer node method call.e.g.
invoke_request('testing', [], {'_timeout': 22, '__timeout: 10.0})
would call a request methodtesting(_timeout=22)
on the RPC peer and wait for the response for 10 seconds.- NOTE:
- Use either arguments or keyword arguments. Both can’t be used in a single call. (Naturally the timeout argument does not count to the rule.)
-
is_closed
¶ Property: bool – Closed by peer node or with close()
-
join
(timeout=None)¶ Wait for the internal dispatcher to shut down.
Parameters: timeout (float | None) – Timeout in seconds, max time to wait.
-
protocol
= 'bsonrpc'¶ Protocol name used in messages
-
protocol_version
= '2.0'¶ Protocol version used in messages
Providing Services¶
In order to provide remote callable functions the JSONRpc Objects and
BSONRpc Objects do expect to get a bsonrpc.service_class
-decorated
Class instance as an argument. Use the decorators introduced below
to announce methods either as request handlers or notification handlers.
Decorators¶
-
bsonrpc.
service_class
(cls)[source]¶ A class decorator enabling the instances of the class to be used as a
services
-provider in JSONRpc Objects and BSONRpc Objects.Use decorators
request
,notification
,rpc_request
andrpc_notification
to expose methods for the RPC peer node.
-
bsonrpc.
request
(method)[source]¶ A method decorator announcing the method to be exposed as a request handler.
This decorator assumes that the method parameters are trivially exposed to the peer node in ‘as-is’ manner.
-
bsonrpc.
notification
(method)[source]¶ A method decorator announcing the method to be exposed as a notification handler.
This decorator assumes that the method parameters are trivially exposed to the peer node in ‘as-is’ manner.
-
bsonrpc.
rpc_request
(method)[source]¶ A method decorator announcing the method to be exposed as a request handler.
This decorator assumes that the first parameter (after
self
) takes a BSONRpc/JSONRpc object reference as an argument, so that the method will have an access to make RPC callbacks on the peer node (requests and notifications) during its execution. From the second parameter onward the parameters are exposed as-is to the peer node.
-
bsonrpc.
rpc_notification
(method)[source]¶ A method decorator announcing the method to be exposed as a notification handler.
This decorator assumes that the first parameter (after
self
) takes a BSONRpc/JSONRpc object reference as an argument. From the second parameter onward the parameters are exposed as-is to the peer node.
About rpc-reference¶
The JSONRpc/BSONRpc reference object given at runtime to rpc_request
and rpc_notification
-decorated methods enables accessing of
JSONRpc/BSONRpc-methods from service-methods, with following differences:
.close()
is renamed to.abort()
to highlight the abnormal nature of discontinued request handling. Causes the return value of the request handler to be discarded..close_after_response()
(Takes no arguments) is available. This will trigger the connection to be closed right after the return value turned into a response message has been sent to the peer node.
Service Provider Example¶
from bsonrpc import service_class, request, rpc_request, notification
@service_class
class MyServices(object):
# __init__ etc..
@request
def revert(self, txt):
return ''.join(reversed(list(txt)))
@rpc_request
def calculate(self, rpc, a, b, c):
rpc.invoke_notification('report_progress', '.')
rpc.invoke_notification('report_progress', '..')
return a * b * c
@rpc_request
def final_message(self, rpc, txt):
# Custom attribute 'my_custom_label', set via options
self.log(rpc.my_custom_label + txt)
rpc.close_after_response()
return 'Good Bye!'
@notification
def log(self, fmt, *args):
print('From peer: ' + fmt % args)
bsonrpc.framing¶
This module provides classes implementing different JSON-RPC 2.0 framing options. Currently RFC-7464, Netstring and Frameless -message framings are included.
In case you need to use a different framing method, you can provide your own implementor class for this library via options.
- The class you provide must have the following classmethods and behavior:
extract_message
@classmethod def extract_message(cls, raw_bytes) # Args: # raw_bytes (bytes): 1 - N bytes from stream # Returns: # bytes, bytes (tuple of 2 (builtins.bytes)) # * The 1st value must be either: # * None - if the given `raw_bytes`-argument does not # contain enough bytes to lift a complete # JSON message from it. # * Unframed bytes supposedly containing exactly 1 JSON # message. # * The 2nd value consists of the remaining bytes of # `raw_bytes` if/when a framed message has been lifted # to become the unframed message in the 1st return value. # Raises: # Library framework will coerce any raised Exceptions into # bsonrpc.exceptions.FramingError -exceptions. return msg_bytes, rest_bytes
into_frame
@classmethod def into_frame(cls, message_bytes): # Args: # message_bytes (bytes): 1 complete JSON message serialized # into bytes. # Returns: # bytes # == framed message. # Raises: # Library framework will coerce any raised Exceptions into # bsonrpc.exceptions.FramingError -exceptions. return framed_bytes
About Threading Model¶
This library contains concurrent execution threads for:
Decoder receiving bytes from socket, extracting and decoding messages to python data objects and putting them to queue for dispatcher to consume.
Dispatcher consuming messages from the socket-queue, dispatching incoming responses to local waiter(s), requests and notifications to service handlers via selected strategy (3 & 4) and spawning batch collectors to handle bathces.
To handle an incoming request the Dispatcher can either execute the responsible request handler without spawning any threads or can spawn a thread to execute that handler, depending on the selected
concurrent_request_handling
-strategy.If a handler is executed without threading the Dispatcher cannot take any new messages for processing from the queue until the handler has returned. Spawning allows simultaneous processing of multiple requests which is usually desirable. Control flow is not any less deterministic as it is fully controlled by the rpc peer node using the service interface.
In identical way the selected
concurrent_notification_handling
-strategy will determine notification handler spawning.
For basic concurrency (points 1 & 2 above) this library can be configured to use
either basic python threads or gevent (*) lib greenlets. This is done with
the threading_model
-option.
The following option combinations have been unittested:
threading_model | concurrent_request_handling | concurrent_notification_handling |
---|---|---|
ThreadingModel.THREADS | ThreadingModel.THREADS | None |
ThreadingModel.THREADS | ThreadingModel.THREADS | ThreadingModel.THREADS |
ThreadingModel.GEVENT | ThreadingModel.GEVENT | None |
ThreadingModel.GEVENT | ThreadingModel.GEVENT | ThreadingModel.GEVENT |
(*) see requirements.txt for minimal version requirements.