API

AMQP

class nameko.amqp.consume.Consumer(amqp_uri, ssl=None, queues=None, callbacks=None, heartbeat=None, prefetch_count=None, accept=None, **consumer_options)

Helper utility for consuming messages from RabbitMQ.

connection

Provide the connection parameters for kombu’s ConsumerMixin.

The Connection object is a declaration of connection parameters that is lazily evaluated. It doesn’t represent an established connection to the broker at this point.

get_consumers(consumer_cls, channel)

Kombu callback to set up consumers.

Called after any (re)connection to the broker.

on_consume_ready(connection, channel, consumers, **kwargs)

Kombu callback when consumers are ready to accept messages.

Called after any (re)connection to the broker.

stop()

Stop this consumer.

Any messages received between when this method is called and the resulting consumer cancel will be requeued.

wait_until_consumer_ready()

Wait for initial connection.

class nameko.amqp.publish.Publisher(amqp_uri, use_confirms=None, serializer=None, compression=None, delivery_mode=None, mandatory=None, priority=None, expiration=None, declare=None, retry=None, retry_policy=None, ssl=None, **publish_kwargs)

Utility helper for publishing messages to RabbitMQ.

publish(payload, **kwargs)

Publish a message.

exception nameko.amqp.publish.UndeliverableMessage

Raised when publisher confirms are enabled and a message could not be routed or persisted.

RPC

class nameko.rpc.Client(publish, register_for_reply, context_data, service_name=None, method_name=None)

Helper object for making RPC calls.

The target service and method name may be specified at construction time or by attribute or dict access, for example:

# target at construction time
client = Client(
    publish, register_for_reply, context_data,
    "target_service", "target_method"
)
client(*args, **kwargs)

# equivalent with attribute access
client = Client(publish, register_for_reply, context_data)
client = client.target_service.target_method  # now fully-specified
client(*args, **kwargs)

Calling a fully-specified Client will make the RPC call and block for the result. The call_async method initiates the call but returns an RpcReply object that can be used later to retrieve the result.

Parameters:
publish : callable

Function to publish an RPC request

register_for_reply : callable

Function to register a new call with a reply listener. Returns another function that should be used to retrieve the response.

context_data: dict

Worker context data to be sent as extra headers

service_name : str

Optional target service name, if known

method_name : str

Optional target method name, if known

class nameko.rpc.ClusterRpc(**publisher_options)

DependencyProvider for injecting an RPC client to a cluster of services into a service.

Parameters:
**publisher_options

Options to configure the Publisher that sends the message.

publisher_cls

alias of Publisher

nameko.rpc.Proxy

alias of Client

class nameko.rpc.ReplyListener(**consumer_options)

SharedExtension for consuming replies to RPC requests.

Creates a queue and consumes from it in a managed thread. RPC requests should register their correlation_id with register_for_reply() in order for the ReplyListener to capture the reply.

class ReplyConsumer(check_for_lost_replies, *args, **kwargs)

Subclass Consumer to add disconnection check

get_consumers(consumer_cls, channel)

Check for messages lost while the reply listener was disconnected from the broker.

ReplyListener.consumer_cls

alias of ReplyConsumer

ReplyListener.register_for_reply(correlation_id)

Register an RPC call with the given correlation_id for a reply.

Returns a function that can be used to retrieve the reply, blocking until it has been received.
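The register-then-block pattern described above can be illustrated without a broker. The following is a minimal sketch, not nameko's implementation: the class name ReplyRegistrySketch, the handle_message method and the timeout argument are all hypothetical, and a threading.Event stands in for whatever synchronisation primitive the real listener uses.

```python
import threading


class ReplyRegistrySketch:
    """Hypothetical stand-in for a reply listener; not nameko's implementation."""

    def __init__(self):
        self._pending = {}  # correlation_id -> (event, holder for the reply body)
        self._lock = threading.Lock()

    def register_for_reply(self, correlation_id):
        event = threading.Event()
        holder = {}
        with self._lock:
            self._pending[correlation_id] = (event, holder)

        def get_reply(timeout=None):
            # Block until handle_message() delivers the matching reply.
            if not event.wait(timeout):
                raise TimeoutError("no reply for {}".format(correlation_id))
            return holder["body"]

        return get_reply

    def handle_message(self, correlation_id, body):
        # Would be called by the consumer for every message on the reply queue.
        with self._lock:
            pending = self._pending.pop(correlation_id, None)
        if pending is None:
            return False  # unknown or already-answered call; ignore it
        event, holder = pending
        holder["body"] = body
        event.set()
        return True


registry = ReplyRegistrySketch()
get_reply = registry.register_for_reply("call-1")

# Simulate the broker delivering the reply from another thread.
threading.Timer(0.05, registry.handle_message, args=("call-1", {"result": 4})).start()
reply = get_reply(timeout=2)
```

Registering before the request is sent means a reply that arrives very quickly still finds an entry to resolve.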

class nameko.rpc.Responder(amqp_uri, exchange, message, ssl=None)

Helper object for publishing replies to RPC calls.

publisher_cls

alias of Publisher

class nameko.rpc.Rpc(expected_exceptions=(), sensitive_arguments=(), **kwargs)

A limitation of using a shared queue for all RPC entrypoints is that we can’t accept per-entrypoint consumer options. The best solution to this is to start using a queue per entrypoint, but this will require a consumer (and if using kombu, a connection) per entrypoint.

For the time being consumer options are not supported in RPC entrypoints.

Parameters:
expected_exceptions : exception class or tuple of exception classes

Specify exceptions that may be caused by the caller (e.g. by providing bad arguments). Saved on the entrypoint instance as entrypoint.expected_exceptions for later inspection by other extensions, for example a monitoring system.

sensitive_arguments : string or tuple of strings

Mark an argument or part of an argument as sensitive. Saved on the entrypoint instance as entrypoint.sensitive_arguments for later inspection by other extensions, for example a logging system.

See also: nameko.utils.get_redacted_args()

class nameko.rpc.RpcCall(correlation_id, send_request, get_response)

Encapsulates a single RPC request and response.

Parameters:
correlation_id : str

Identifier for this call

send_request : callable

Function that initiates the request

get_response : callable

Function that retrieves the response

get_response()

Retrieve the response for this RPC call. Blocks if the response has not been received.

result()

Return the result of this RPC call, blocking if the response has not been received.

Raises a RemoteError if the remote service returned an error response.

send_request(*args, **kwargs)

Send the RPC request to the remote service.

nameko.rpc.RpcProxy

alias of ServiceRpc

class nameko.rpc.ServiceRpc(target_service, **kwargs)

DependencyProvider for injecting an RPC client to a specific service into a service.

As per ClusterRpc but with a pre-specified target service.

Parameters:
target_service : str

Target service name

Events

Provides a high level interface to the core messaging module.

Events are special messages, which can be emitted by one service and handled by other listening services.

An event consists of an identifier and some data and is dispatched using an injection acquired from an instance of EventDispatcher.

Events are dispatched asynchronously. It is only guaranteed that the event has been dispatched, not that it was received or handled by a listener.

To listen to an event, a service must declare a handler using the event_handler() entrypoint, providing the target service and an event type filter.

Example:

# service A
def edit_foo(self, id):
    # ...
    self.dispatch('foo_updated', {'id': id})

# service B

@event_handler('service_a', 'foo_updated')
def bar(self, event_data):
    pass

class nameko.events.EventDispatcher(exchange=None, declare=None, **publisher_options)

Provides an event dispatcher method via dependency injection.

Events emitted will be dispatched via the service’s events exchange, which automatically gets declared by the event dispatcher as a topic exchange. The name for the exchange will be {service-name}.events.

Events, emitted via the dispatcher, will be serialized and published to the events exchange. The event’s type attribute is used as the routing key, which can be used for filtering on the listener’s side.

The dispatcher will return as soon as the event message has been published. There is no guarantee that any service will receive the event, only that the event has been successfully dispatched.

Example:

class Spammer(object):
    dispatch_spam = EventDispatcher()

    def emit_spam(self):
        evt_data = 'ham and eggs'
        self.dispatch_spam('spam.ham', evt_data)

Provides an AMQP message publisher method via dependency injection.

In AMQP, messages are published to exchanges and routed to bound queues. This dependency accepts the exchange to publish to and will ensure that it is declared before publishing.

Optionally, you may use the declare keyword argument to pass a list of other kombu.Exchange or kombu.Queue objects to declare before publishing.

Parameters:
exchange : kombu.Exchange

Destination exchange

declare : list

List of kombu.Exchange or kombu.Queue objects to declare before publishing.

**publisher_options

Options to configure the Publisher that sends the message.

If exchange is not provided, the message will be published to the default exchange.

Example:

class Foobar(object):

    publish = Publisher(exchange=...)

    def spam(self, data):
        self.publish('spam:' + data)

get_dependency(worker_ctx)

Inject a dispatch method onto the service instance

exception nameko.events.EventHandlerConfigurationError

Raised when an event handler is misconfigured.

Messaging

Provides core messaging decorators and dependency providers.

Standalone

Nameko components that can be used as standalone tools, without being hosted inside a nameko-managed service.

Intended to be used as test utilities and external controls, for example to initiate some action inside a nameko cluster.

Examples:

Use the RPC client to perform some addition on mathsservice:

>>> from nameko.standalone.rpc import ServiceRpcClient
>>>
>>> with ServiceRpcClient("mathsservice", config) as client:
...     result = client.add(2, 2)
...
>>> print(result)
4

Dispatch a custom_event as srcservice:

>>> from nameko.standalone.events import event_dispatcher
>>>
>>> with event_dispatcher("srcservice", config) as dispatch:
...     dispatch("custom_event", "msg")
...
>>>

class nameko.standalone.rpc.ClusterRpcClient(context_data=None, timeout=None, **publisher_options)

Single-threaded RPC client to a cluster of services. The target service and method are specified with attributes.

Method calls on the local object are converted into RPC calls to the target service.

Enables services not hosted by nameko to make RPC requests to a nameko cluster. It is commonly used as a context manager but may also be manually started and stopped.

Usage

As a context manager:

with ClusterRpcClient() as client:
    client.target_service.method()
    client.other_service.method()

The equivalent call, manually starting and stopping:

client = ClusterRpcClient()
client = client.start()
try:
    client.target_service.method()
    client.other_service.method()
finally:
    client.stop()

If you call start() you must eventually call stop() to close the connection to the broker.

You may also supply context_data, a dictionary of data to be serialised into the AMQP message headers.

When the name of the service is not legal in Python, you can also use a dict-like syntax:

with ClusterRpcClient() as client:
    client['service-name'].method()
    client['other-service'].method()

publisher_cls

alias of Publisher

nameko.standalone.rpc.ClusterRpcProxy

alias of ClusterRpcClient

class nameko.standalone.rpc.ReplyListener(queue, timeout=None, uri=None, ssl=None, **kwargs)

Single-threaded listener for RPC replies.

Creates a queue and consumes from it on demand. RPC requests should register their correlation_id with register_for_reply() in order for the ReplyListener to capture the reply.

class ReplyConsumer(check_for_lost_replies, *args, **kwargs)

Subclass Consumer to add disconnection check

get_consumers(consumer_cls, channel)

Check for messages lost while the reply listener was disconnected from the broker.

ReplyListener.consume_reply(correlation_id)

Consume from the reply queue until the reply for the given correlation_id is received.

ReplyListener.consumer_cls

alias of ReplyConsumer

ReplyListener.register_for_reply(correlation_id)

Register an RPC call with the given correlation_id for a reply.

Returns a function that can be used to retrieve the reply, blocking until it has been received.

class nameko.standalone.rpc.ServiceRpcClient(service_name, *args, **kwargs)

Single-threaded RPC client to a named service.

As per ClusterRpcClient but with a pre-specified target service.

nameko.standalone.rpc.ServiceRpcProxy

alias of ServiceRpcClient

nameko.standalone.events.event_dispatcher(**kwargs)

Return a function that dispatches nameko events.

nameko.standalone.events.get_event_exchange(service_name)

Get an exchange for service_name events.

HTTP

class nameko.web.server.BindAddress(address, port)
address

Alias for field number 0

port

Alias for field number 1

class nameko.web.server.WebServer

A SharedExtension that wraps a WSGI interface for processing HTTP requests.

WebServer can be subclassed to add additional WSGI functionality through overriding the get_wsgi_server and get_wsgi_app methods.

get_wsgi_app()

Get the WSGI application used to process requests.

This method can be overridden to apply WSGI middleware or replace the WSGI application altogether.

get_wsgi_server(sock, wsgi_app, protocol=<class nameko.web.server.HttpOnlyProtocol>, debug=False)

Get the WSGI server used to process requests.

class nameko.web.websocket.SocketInfo(socket, data)
data

Alias for field number 1

socket

Alias for field number 0

Testing

Utilities for testing nameko services.

class nameko.testing.services.Once(*args, **kwargs)

Entrypoint that spawns a worker exactly once, as soon as the service container has started.

nameko.testing.services.entrypoint_hook(*args, **kwds)

Yield a function providing an entrypoint into a hosted service.

The yielded function may be called as if it were the bare method defined in the service class. Intended to be used as an integration testing utility.

Parameters:
container : ServiceContainer

The container hosting the service owning the entrypoint

method_name : str

The name of the entrypoint decorated method on the service class

context_data : dict

Context data to provide for the call, e.g. a language, auth token or session.

Usage

To verify that ServiceX and ServiceY are compatible, make an integration test that checks their interaction.

nameko.testing.services.entrypoint_waiter(*args, **kwds)

Context manager that waits until an entrypoint has fired, and the generated worker has exited and been torn down.

It yields a nameko.testing.waiting.WaitResult object that can be used to get the result returned (or the exception raised) by the entrypoint after the waiter has exited.

Parameters:
container : ServiceContainer

The container hosting the service owning the entrypoint

method_name : str

The name of the entrypoint decorated method on the service class

timeout : int

Maximum seconds to wait

callback : callable

Function to conditionally control whether the entrypoint_waiter should exit for a particular invocation

The timeout argument specifies the maximum number of seconds the entrypoint_waiter should wait before exiting. It can be disabled by passing None. The default is 30 seconds.

Optionally allows a callback to be provided which is invoked whenever the entrypoint fires. If provided, the callback must return True for the entrypoint_waiter to exit. The signature for the callback function is:

def callback(worker_ctx, result, exc_info):
    pass

where the parameters are as follows:

worker_ctx (WorkerContext): WorkerContext of the entrypoint call.

result (object): The return value of the entrypoint.

exc_info (tuple): Tuple as returned by sys.exc_info if the entrypoint raised an exception, otherwise None.

Usage

class Service(object):
    name = "service"

    @event_handler('srcservice', 'eventtype')
    def handle_event(self, msg):
        return msg

container = ServiceContainer(Service)
container.start()

# basic
with entrypoint_waiter(container, 'handle_event'):
    ...  # action that dispatches event

# giving access to the result
with entrypoint_waiter(container, 'handle_event') as result:
    ...  # action that dispatches event
res = result.get()

# with custom timeout
with entrypoint_waiter(container, 'handle_event', timeout=5):
    ...  # action that dispatches event

# with callback that waits until entrypoint stops raising
def callback(worker_ctx, result, exc_info):
    if exc_info is None:
        return True

with entrypoint_waiter(container, 'handle_event', callback=callback):
    ...  # action that dispatches event

nameko.testing.services.replace_dependencies(container, *dependencies, **dependency_map)

Replace the dependency providers on container with instances of MockDependencyProvider.

Dependencies named in *dependencies will be replaced with a MockDependencyProvider, which injects a MagicMock instead of the dependency.

Alternatively, you may use keyword arguments to name a dependency and provide the replacement value that the MockDependencyProvider should inject.

Returns the MockDependencyProvider.dependency for every dependency named in *dependencies, so that calls to the replaced dependencies can be inspected. Returns a single object if only one dependency was replaced, and otherwise a generator yielding the replacements in the same order as dependencies. Note that replacements specified via **dependency_map keyword arguments are not returned.

Replacements are made on the container instance and have no effect on the service class. New container instances are therefore unaffected by replacements on previous instances.

Usage

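from nameko.containers import ServiceContainer
from nameko.rpc import ServiceRpc, rpc
from nameko.standalone.rpc import ServiceRpcClient
from nameko.testing.services import replace_dependencies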

class ConversionService(object):
    name = "conversions"

    maths_rpc = ServiceRpc("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

container = ServiceContainer(ConversionService)
mock_maths_rpc = replace_dependencies(container, "maths_rpc")
mock_maths_rpc.divide.return_value = 39.37

container.start()

with ServiceRpcClient('conversions') as client:
    client.cm_to_inches(100)

# assert that the dependency was called as expected
mock_maths_rpc.divide.assert_called_once_with(100, 2.54)

Providing a specific replacement by keyword:

class StubMaths(object):

    def divide(self, val1, val2):
        return val1 / val2

replace_dependencies(container, maths_rpc=StubMaths())

container.start()

with ServiceRpcClient('conversions') as client:
    assert client.cm_to_inches(127) == 50.0

nameko.testing.services.restrict_entrypoints(container, *entrypoints)

Restrict the entrypoints on container to those named in entrypoints.

This method must be called before the container is started.

Usage

The following service definition has two entrypoints:

class Service(object):
    name = "service"

    @timer(interval=1)
    def foo(self, arg):
        pass

    @rpc
    def bar(self, arg):
        pass

    @rpc
    def baz(self, arg):
        pass

container = ServiceContainer(Service)

To disable the timer entrypoint on foo, leaving just the RPC entrypoints:

restrict_entrypoints(container, "bar", "baz")

Note that it is not possible to identify multiple entrypoints on the same method individually.

nameko.testing.services.worker_factory(service_cls, **dependencies)

Return an instance of service_cls with its injected dependencies replaced with MagicMock objects, or as given in dependencies.

Usage

The following example service proxies calls to a “maths” service via a ServiceRpc dependency:

from nameko.rpc import ServiceRpc, rpc

class ConversionService(object):
    name = "conversions"

    maths_rpc = ServiceRpc("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

Use the worker_factory to create an instance of ConversionService with its dependencies replaced by MagicMock objects:

service = worker_factory(ConversionService)

Nameko’s entrypoints do not modify the service methods, so instance methods can be called directly with the same signature. The replaced dependencies can be used as any other MagicMock object, so a complete unit test for the conversion service may look like this:

# create worker instance
service = worker_factory(ConversionService)

# replace "maths" service
service.maths_rpc.multiply.side_effect = lambda x, y: x * y
service.maths_rpc.divide.side_effect = lambda x, y: x / y

# test inches_to_cm business logic
assert service.inches_to_cm(300) == 762
service.maths_rpc.multiply.assert_called_once_with(300, 2.54)

# test cm_to_inches business logic
assert service.cm_to_inches(762) == 300
service.maths_rpc.divide.assert_called_once_with(762, 2.54)

Providing Dependencies

The **dependencies kwargs to worker_factory can be used to provide a replacement dependency instead of a mock. For example, to unit test a service against a real database:
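The idea can be illustrated with a self-contained sketch that runs without nameko. Everything named here is hypothetical: worker_factory_sketch is a stand-in that mimics the documented behaviour (named dependencies injected as given, all others replaced by MagicMock), Dependency is a placeholder for a real DependencyProvider, and an in-memory sqlite3 connection plays the role of the real database.

```python
import sqlite3
from unittest.mock import MagicMock


class Dependency:
    """Hypothetical marker standing in for a nameko DependencyProvider."""


class NotesService:
    name = "notes"
    db = Dependency()      # would be a real database provider in a service
    mailer = Dependency()  # a dependency we are happy to leave mocked

    def add_note(self, text):
        self.db.execute("INSERT INTO notes (text) VALUES (?)", (text,))
        self.mailer.send("note added")
        return self.db.execute("SELECT COUNT(*) FROM notes").fetchone()[0]


def worker_factory_sketch(service_cls, **dependencies):
    # Stand-in for worker_factory: named dependencies are injected as given,
    # every other declared dependency becomes a MagicMock.
    declared = {
        attr for attr, value in vars(service_cls).items()
        if isinstance(value, Dependency)
    }
    unknown = set(dependencies) - declared
    if unknown:
        # nameko raises ExtensionNotFound here; LookupError is a placeholder.
        raise LookupError("no such dependency: {}".format(sorted(unknown)))
    service = service_cls()
    for attr in declared:
        setattr(service, attr, dependencies.get(attr, MagicMock()))
    return service


# A real (in-memory) database replaces the mock for the "db" dependency.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE notes (text TEXT)")

service = worker_factory_sketch(NotesService, db=connection)
count = service.add_note("hello")
service.mailer.send.assert_called_once_with("note added")
```

With the real helper the call would have the same shape, worker_factory(NotesService, db=connection), with the service's actual dependency providers in place of the Dependency placeholder.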

If a named dependency provider does not exist on service_cls, an ExtensionNotFound exception is raised.

Common testing utilities.

class nameko.testing.utils.ResourcePipeline(create, destroy, size=3)

Creates and destroys resources in background threads.

Creates up to size resources ahead of time so the caller avoids waiting for lazy creation.

nameko.testing.utils.assert_stops_raising(fn, exception_type=Exception, timeout=10, interval=0.1)

Assert that fn returns successfully within timeout seconds, trying every interval seconds.

If exception_type is provided, fail unless the exception thrown is an instance of exception_type. If not specified, any Exception instance is allowed.
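The retry behaviour described above can be sketched with the standard library alone. This is an illustration rather than nameko's implementation: assert_stops_raising_sketch is a hypothetical name, and it uses time.monotonic and time.sleep, which is an assumption about how the waiting could be done.

```python
import time


def assert_stops_raising_sketch(fn, exception_type=Exception, timeout=10, interval=0.1):
    # Retry fn until it returns without raising or the deadline passes.
    deadline = time.monotonic() + timeout
    while True:
        try:
            fn()
        except exception_type:
            # Expected failure type: retry unless time has run out.
            if time.monotonic() >= deadline:
                raise
        else:
            return
        time.sleep(interval)


attempts = []


def flaky():
    # Fails twice, then succeeds on the third call.
    attempts.append(1)
    if len(attempts) < 3:
        raise ValueError("not ready yet")


assert_stops_raising_sketch(flaky, exception_type=ValueError, timeout=2, interval=0.01)
```

An exception that is not an instance of exception_type propagates on the first attempt, which is what makes the exception_type argument useful for catching unexpected failures early.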

nameko.testing.utils.get_container(runner, service_cls)

Inspect runner.containers and return the first item that is hosting an instance of service_cls.

nameko.testing.utils.get_extension(container, extension_cls, **match_attrs)

Inspect container.extensions and return the first item that is an instance of extension_cls.

Optionally also require that the instance has an attribute with a particular value as given in the match_attrs kwargs.

nameko.testing.utils.wait_for_call(*args, **kwds)

Return a context manager that waits timeout seconds for mock_method to be called, yielding the mock if so.

Raises an eventlet.Timeout if the method was not called within timeout seconds.

Utils

nameko.utils.get_redacted_args(entrypoint, *args, **kwargs)

Utility function for use with entrypoints that are marked with sensitive_arguments – e.g. nameko.rpc.Rpc and nameko.events.EventHandler.

Parameters:
entrypoint : Entrypoint

The entrypoint that fired.

args : tuple

Positional arguments for the method call.

kwargs : dict

Keyword arguments for the method call.

The entrypoint should have a sensitive_arguments attribute, the value of which is a string or tuple of strings specifying the arguments or partial arguments that should be redacted. To partially redact an argument, the following syntax is used:

<argument-name>.<dict-key>[<list-index>]

Returns: A dictionary as returned by inspect.getcallargs(), but with sensitive arguments or partial arguments redacted.

Note

This function does not raise if one of the sensitive_arguments doesn’t match or partially match the calling args and kwargs. This allows “fuzzier” pattern matching (e.g. redact a field if it is present, and otherwise do nothing).

To avoid exposing sensitive arguments through a typo, it is recommended to test the configuration of each entrypoint with sensitive_arguments individually. For example:

class Service(object):
    @rpc(sensitive_arguments="foo.bar")
    def method(self, foo):
        pass

container = ServiceContainer(Service, {})
entrypoint = get_extension(container, Rpc, method_name="method")

# no redaction
foo = "arg"
expected = {'foo': "arg"}
assert get_redacted_args(entrypoint, foo) == expected

# 'bar' key redacted
foo = {'bar': "secret value", 'baz': "normal value"}
expected = {'foo': {'bar': "********", 'baz': "normal value"}}
assert get_redacted_args(entrypoint, foo) == expected

See also

The tests for this utility demonstrate its full usage: test.test_utils.TestGetRedactedArgs

nameko.utils.import_from_path(path)

Import and return the object at path if it exists.

Raises an ImportError if the object is not found.
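One plausible way to get this behaviour from the standard library is pydoc.locate, which resolves a dotted path and returns None when nothing is found. The sketch below assumes path is a dotted "module.attribute" string; import_from_path_sketch is a hypothetical name and nameko's own implementation may differ.

```python
from pydoc import locate


def import_from_path_sketch(path):
    # locate() returns None when the module or attribute cannot be found.
    obj = locate(path)
    if obj is None:
        raise ImportError("`{}` could not be imported".format(path))
    return obj


ordered_dict_cls = import_from_path_sketch("collections.OrderedDict")
```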

nameko.utils.sanitize_url(url)

Redact password in urls.
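As a rough illustration of what redacting a password in a URL involves, the sketch below rebuilds the URL with urllib.parse. It is not nameko's implementation: sanitize_url_sketch is a hypothetical name, and the "********" mask is an assumption borrowed from the redaction example for get_redacted_args.

```python
from urllib.parse import urlsplit, urlunsplit


def sanitize_url_sketch(url, mask="********"):
    parts = urlsplit(url)
    if parts.password is None:
        return url  # nothing to redact
    # Rebuild the netloc with the password replaced by the mask.
    # Note: urlsplit lowercases the hostname.
    netloc = "{}:{}@{}".format(parts.username or "", mask, parts.hostname)
    if parts.port is not None:
        netloc = "{}:{}".format(netloc, parts.port)
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))


redacted = sanitize_url_sketch("amqp://guest:secret@localhost:5672/vhost")
# redacted == "amqp://guest:********@localhost:5672/vhost"
```

A helper like this is typically applied before a broker URI is written to logs.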

Core

class nameko.extensions.Extension

Note that Extension.__init__ is called during bind() as well as at instantiation time, so avoid side-effects in this method. Use setup() instead.

Furthermore, bind() and iter_extensions() use introspection to find any sub-extensions that an extension may declare. Any descriptors on the extension should expect to be called during introspection, which happens between ServiceContainer.__init__ and ServiceContainer.setup.

Extension.container gives access to the ServiceContainer instance to which the Extension is bound, otherwise None.

bind(container)

Get an instance of this Extension to bind to container.

kill()

Called to stop this extension without grace.

Extensions should urgently shut down here. This means stopping as soon as possible by omitting cleanup. This may be distinct from stop() for certain dependencies.

Extensions should not raise during kill, since the container is already dying. Instead they should log what is appropriate and swallow the exception to allow the container kill to continue.
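The difference between a graceful stop() and a kill() that logs and swallows errors might look like the following. This is a sketch only: ConnectionHolderSketch is a plain class standing in for an Extension subclass, and BrokenConnection is a hypothetical resource whose close() always fails.

```python
import logging

logger = logging.getLogger(__name__)


class ConnectionHolderSketch:
    """Plain class standing in for an Extension subclass (hypothetical)."""

    def __init__(self, connection):
        self.connection = connection

    def stop(self):
        # Graceful path: errors here may legitimately propagate.
        self.connection.close()

    def kill(self):
        # Urgent path: never raise, the container is already dying.
        try:
            self.connection.close()
        except Exception:
            logger.warning("error closing connection during kill", exc_info=True)


class BrokenConnection:
    def close(self):
        raise RuntimeError("socket already gone")


holder = ConnectionHolderSketch(BrokenConnection())
holder.kill()  # logs a warning and returns normally
```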

setup()

Called on bound Extensions before the container starts.

Extensions should do any required initialisation here.

start()

Called on bound Extensions when the container has successfully started.

This is only called after all other Extensions have successfully returned from Extension.setup(). If the Extension reacts to external events, it should now start acting upon them.

stop()

Called when the service container begins to shut down.

Extensions should do any graceful shutdown here.

nameko.extensions.iter_extensions(extension)

Depth-first iterator over sub-extensions on extension.

exception nameko.exceptions.CommandError

Raise from subcommands to report an error back to the user.

exception nameko.exceptions.ConnectionNotFound

Unknown websocket connection id

exception nameko.exceptions.ContainerBeingKilled

Raised by Container.spawn_worker() if it has started a kill sequence.

Entrypoints should catch this and react as if they hadn’t been available in the first place, e.g. an rpc consumer should probably requeue the message.

We need this because eventlet may yield during the execution of Container.kill(), giving entrypoints a chance to fire before they themselves have been killed.

exception nameko.exceptions.RemoteError(exc_type=None, value=u'')

Exception to raise at the caller if an exception occurred in the remote worker.

nameko.exceptions.deserialize(data)

Deserialize data to an exception instance.

If the exc_path value matches an exception registered as deserializable, return an instance of that exception type. Otherwise, return a RemoteError instance describing the exception that occurred.

nameko.exceptions.deserialize_to_instance(exc_type)

Decorator that registers exc_type as deserializable back into an instance, rather than a RemoteError. See deserialize().
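The registry pattern behind deserialize() and deserialize_to_instance() can be sketched as follows. This is an illustration under stated assumptions, not nameko's code: RemoteErrorSketch stands in for RemoteError, the module-level registry dict is hypothetical, and the payload keys other than exc_path (exc_type, exc_args, value) are assumed.

```python
class RemoteErrorSketch(Exception):
    """Stand-in for RemoteError: carries the remote type name and message."""

    def __init__(self, exc_type=None, value=""):
        self.exc_type = exc_type
        self.value = value
        super().__init__("{} {}".format(exc_type, value))


registry = {}  # dotted path -> exception class


def get_module_path(exc_type):
    return "{}.{}".format(exc_type.__module__, exc_type.__name__)


def deserialize_to_instance(exc_type):
    # Decorator: remember exc_type so it can be rebuilt on the caller's side.
    registry[get_module_path(exc_type)] = exc_type
    return exc_type


def deserialize(data):
    # Assumed payload keys: exc_path, exc_type, exc_args, value.
    exc_cls = registry.get(data.get("exc_path"))
    if exc_cls is not None:
        return exc_cls(*data.get("exc_args", ()))
    return RemoteErrorSketch(data.get("exc_type"), data.get("value", ""))


@deserialize_to_instance
class InsufficientFunds(Exception):
    pass


known = deserialize({
    "exc_path": get_module_path(InsufficientFunds),
    "exc_type": "InsufficientFunds",
    "exc_args": ["balance too low"],
    "value": "balance too low",
})
unknown = deserialize({
    "exc_path": "other.module.Boom",
    "exc_type": "Boom",
    "value": "it broke",
})
```

Here known is an InsufficientFunds instance that the caller can catch by type, while unknown falls back to the generic remote error.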

nameko.exceptions.get_module_path(exc_type)

Return the dotted module path of exc_type, including the class name.

e.g.:

>>> get_module_path(MethodNotFound)
'nameko.exceptions.MethodNotFound'

nameko.exceptions.safe_for_serialization(value)

Transform a value in preparation for serializing as JSON.

This is a no-op for strings. Mappings and iterables have their entries made safe, and all other values are stringified, with a fallback value if that fails.
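The rules above translate fairly directly into a recursive function. The sketch below is an approximation written for Python 3, not nameko's implementation: safe_for_serialization_sketch is a hypothetical name and the fallback text "[str() failed]" is an assumption.

```python
from collections.abc import Iterable, Mapping


def safe_for_serialization_sketch(value):
    if isinstance(value, str):
        return value  # strings pass through unchanged
    if isinstance(value, Mapping):
        # Keys and values are both made safe.
        return {
            safe_for_serialization_sketch(key): safe_for_serialization_sketch(val)
            for key, val in value.items()
        }
    if isinstance(value, Iterable):
        return [safe_for_serialization_sketch(item) for item in value]
    try:
        return str(value)
    except Exception:
        return "[str() failed]"  # fallback text is an assumption


class Unprintable:
    def __str__(self):
        raise RuntimeError("cannot render")


safe = safe_for_serialization_sketch({"id": 1, "tags": ("a", 2), "obj": Unprintable()})
# safe == {"id": "1", "tags": ["a", "2"], "obj": "[str() failed]"}
```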

nameko.exceptions.serialize(exc)

Serialize exc into a data dictionary representing it.

class nameko.serialization.SerializationConfig(serializer, accept)
accept

Alias for field number 1

serializer

Alias for field number 0

class nameko.runners.ServiceRunner

Allows the user to serve a number of services concurrently. The caller can register a number of service classes with a name and then use the start method to serve them and the stop and kill methods to stop them. The wait method will block until all services have stopped.

Example:

runner = ServiceRunner()
runner.add_service(Foobar)
runner.add_service(Spam)

add_sig_term_handler(runner.kill)

runner.start()

runner.wait()

add_service(cls)

Add a service class to the runner. There can only be one service class for a given service name. Service classes must be registered before calling start().

kill()

Kill all running containers concurrently. The method will block until all containers have stopped.

start()

Start all the registered services.

A new container is created for each service using the container class provided in the __init__ method.

All containers are started concurrently and the method will block until all have completed their startup routine.

stop()

Stop all running containers concurrently. The method blocks until all containers have stopped.

wait()

Wait for all running containers to stop.

nameko.runners.run_services(*args, **kwds)

Serves a number of services for a contextual block. The caller can specify a number of service classes then serve them either stopping (default) or killing them on exiting the contextual block.

Example:

with run_services(Foobar, Spam) as runner:
    ...  # interact with services and stop them on exiting the block

# services stopped

Additional configuration available to ServiceRunner instances can be specified through keyword arguments:

with run_services(Foobar, Spam, kill_on_exit=True):
    ...  # interact with services

# services killed

Parameters:
services : service definitions

Services to be served for the contextual block

kill_on_exit : bool (default=False)

If True, run kill() on the service containers when exiting the contextual block. Otherwise stop() will be called on the service containers on exiting the block.

Returns:

The configured ServiceRunner instance