Testing Services


Nameko’s conventions are designed to make testing as easy as possible. Services are likely to be small and single-purpose, and dependency injection makes it simple to replace and isolate pieces of functionality.

The examples below use pytest, which is what Nameko’s own test suite uses, but the helpers are test framework agnostic.

Unit Testing

Unit testing in Nameko usually means testing a single service in isolation – i.e. without most or all of its dependencies.

The worker_factory() utility will create a worker from a given service class, with its dependencies replaced by mock.MagicMock objects. Dependency behaviour can then be imitated by setting side_effect and return_value attributes on the mocks:

""" Service unit testing best practice.
"""

from nameko.rpc import RpcProxy, rpc
from nameko.testing.services import worker_factory

class ConversionService(object):
    """ Service under test
    """
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cms_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

def test_conversion_service():
    # create worker with mock dependencies
    service = worker_factory(ConversionService)

    # add side effects to the mock proxy to the "maths" service
    service.maths_rpc.multiply.side_effect = lambda x, y: x * y
    service.maths_rpc.divide.side_effect = lambda x, y: x / y

    # test inches_to_cm business logic
    assert service.inches_to_cm(300) == 762
    service.maths_rpc.multiply.assert_called_once_with(300, 2.54)

    # test cms_to_inches business logic
    assert service.cms_to_inches(762) == 300
    service.maths_rpc.divide.assert_called_once_with(762, 2.54)
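
The difference between return_value and side_effect can be seen without Nameko at all, since the injected dependencies are ordinary MagicMock objects. The following is a minimal sketch using only the standard library; the `sqrt` method is a hypothetical name introduced purely for illustration and is not part of any "maths" service shown here.

```python
from unittest import mock

# Stand-in for an injected dependency such as ``service.maths_rpc``.
maths_rpc = mock.MagicMock()

# return_value: every call returns the same canned result.
maths_rpc.divide.return_value = 39.37

# side_effect (callable): the result is computed from the call arguments.
maths_rpc.multiply.side_effect = lambda x, y: x * y

# side_effect (exception): the call raises, imitating a failing dependency.
# ``sqrt`` is a hypothetical method used only for this illustration.
maths_rpc.sqrt.side_effect = ValueError("negative input")

canned = maths_rpc.divide(100, 2.54)
computed = maths_rpc.multiply(3, 4)

try:
    maths_rpc.sqrt(-1)
    raised = False
except ValueError:
    raised = True

# Calls are recorded either way and can be asserted on afterwards.
maths_rpc.divide.assert_called_once_with(100, 2.54)
maths_rpc.multiply.assert_called_once_with(3, 4)
```

A canned return_value is usually enough when the test only cares that the call was made; a callable side_effect is the better fit when the service's result depends on what the dependency returns.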

In some circumstances it’s helpful to provide an alternative dependency, rather than use a mock. This may be a fully functioning replacement (e.g. a test database session) or a lightweight shim that provides partial functionality.

""" Service unit testing best practice, with an alternative dependency.
"""

import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from nameko.rpc import rpc
from nameko.testing.services import worker_factory

# using community extension from http://pypi.python.org/pypi/nameko-sqlalchemy
from nameko_sqlalchemy import Session

Base = declarative_base()

class Result(Base):
    __tablename__ = 'model'
    id = Column(Integer, primary_key=True)
    value = Column(String(64))

class Service:
    """ Service under test
    """
    name = "service"

    db = Session(Base)

    @rpc
    def save(self, value):
        result = Result(value=value)
        self.db.add(result)
        self.db.commit()

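@pytest.fixture
def session():
    """ Create a test database and session
    """
    engine = create_engine('sqlite:///:memory:')
    # create the tables so the test can query them
    Base.metadata.create_all(engine)
    session_cls = sessionmaker(bind=engine)
    return session_cls()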

def test_service(session):

    # create instance, providing the test database session
    service = worker_factory(Service, db=session)

    # verify ``save`` logic by querying the test database
    service.save("helloworld")
    assert session.query(Result.value).all() == [("helloworld",)]
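
The other option mentioned above, a lightweight shim, might look something like the following. This is a sketch in plain Python under stated assumptions: `FakeSession` and `SaveLogic` are hypothetical names, the shim implements only `add` and `commit`, and `SaveLogic` merely stands in for a service class so that the example runs without Nameko or SQLAlchemy.

```python
class FakeSession:
    """Hypothetical shim: records objects, supports only add() and commit()."""

    def __init__(self):
        self.pending = []
        self.committed = []

    def add(self, obj):
        self.pending.append(obj)

    def commit(self):
        # Move everything that was added into the "committed" list.
        self.committed.extend(self.pending)
        self.pending = []


class SaveLogic:
    """Plain-Python stand-in for a service whose ``db`` dependency is injected."""

    def __init__(self, db):
        self.db = db

    def save(self, value):
        self.db.add(value)
        self.db.commit()


shim = FakeSession()
SaveLogic(db=shim).save("helloworld")
```

In a real Nameko test the shim would presumably be injected by keyword in the same way as the test session, e.g. `worker_factory(Service, db=FakeSession())`, and the assertions would inspect `committed` rather than query a database.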

Integration Testing

Integration testing in Nameko means testing the interface between a number of services. The recommended way is to run all the services being tested in the normal way, and trigger behaviour by “firing” an entrypoint using a helper:

""" Service integration testing best practice.
"""

from nameko.rpc import rpc, RpcProxy
from nameko.testing.utils import get_container
from nameko.testing.services import entrypoint_hook

class ServiceX:
    """ Service under test
    """
    name = "service_x"

    y = RpcProxy("service_y")

    @rpc
    def remote_method(self, value):
        res = "{}-x".format(value)
        return self.y.append_identifier(res)

class ServiceY:
    """ Service under test
    """
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        return "{}-y".format(value)

def test_service_x_y_integration(runner_factory, rabbit_config):

    # run services in the normal manner
    runner = runner_factory(rabbit_config, ServiceX, ServiceY)
    runner.start()

    # artificially fire the "remote_method" entrypoint on ServiceX
    # and verify response
    container = get_container(runner, ServiceX)
    with entrypoint_hook(container, "remote_method") as entrypoint:
        assert entrypoint("value") == "value-x-y"

Note that the interface between ServiceX and ServiceY here is just as if under normal operation.

Interfaces that are out of scope for a particular test can be deactivated with one of the following test helpers:


nameko.testing.services.restrict_entrypoints(container, *entrypoints)

Restrict the entrypoints on container to those named in entrypoints.

This method must be called before the container is started.


The following service definition has three entrypoints:

class Service(object):
    name = "service"

    @timer(interval=1)
    def foo(self, arg):
        pass

    @rpc
    def bar(self, arg):
        pass

    @rpc
    def baz(self, arg):
        pass

container = ServiceContainer(Service, config)

To disable the timer entrypoint on foo, leaving just the RPC entrypoints:

restrict_entrypoints(container, "bar", "baz")

Note that it is not possible to identify multiple entrypoints on the same method individually.


nameko.testing.services.replace_dependencies(container, *dependencies, **dependency_map)

Replace the dependency providers on container with instances of MockDependencyProvider.

Dependencies named in *dependencies will be replaced with a MockDependencyProvider, which injects a MagicMock instead of the dependency.

Alternatively, you may use keyword arguments to name a dependency and provide the replacement value that the MockDependencyProvider should inject.

Return the MockDependencyProvider.dependency for every dependency named in *dependencies, so that calls to the replaced dependencies can be inspected. Return a single object if only one dependency was replaced; otherwise return a generator yielding the replacements in the same order as dependencies. Replaced dependencies specified via keyword arguments (**dependency_map) are not returned.

Replacements are made on the container instance and have no effect on the service class. New container instances are therefore unaffected by replacements on previous instances.


from nameko.containers import ServiceContainer
from nameko.rpc import RpcProxy, rpc
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies

class ConversionService(object):
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

container = ServiceContainer(ConversionService, config)
mock_maths_rpc = replace_dependencies(container, "maths_rpc")
mock_maths_rpc.divide.return_value = 39.37

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    proxy.cm_to_inches(100)

# assert that the dependency was called as expected
mock_maths_rpc.divide.assert_called_once_with(100, 2.54)

Providing a specific replacement by keyword:

class StubMaths(object):

    def divide(self, val1, val2):
        return val1 / val2

replace_dependencies(container, maths_rpc=StubMaths())

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    assert proxy.cm_to_inches(127) == 50.0
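
The return contract described for replace_dependencies (one object for a single positional name, a generator for several, nothing for keyword replacements) can be illustrated with a toy stand-in. The sketch below is not Nameko code: `replace_named` and `registry` are hypothetical names that only mimic the documented behaviour, so that the unpacking pattern can be tried without a container.

```python
from unittest import mock


def replace_named(registry, *names, **name_map):
    """Toy stand-in, NOT Nameko code: mimics the documented return contract."""
    # Keyword replacements are installed as given and are not returned.
    registry.update(name_map)

    # Positional names are each replaced by a fresh MagicMock.
    mocks = [mock.MagicMock(name=name) for name in names]
    registry.update(zip(names, mocks))

    if len(mocks) == 1:
        return mocks[0]            # one name: the replacement itself
    return (m for m in mocks)      # several names: a generator, in call order


registry = {}

# One positional name: the mock comes back directly.
maths_rpc = replace_named(registry, "maths_rpc")

# Several positional names: unpack the generator; ``db`` is not returned.
fire_event, payment_rpc = replace_named(
    registry, "fire_event", "payment_rpc", db="stub-session")
```

Because the multi-name form yields a generator, it is easiest to unpack it immediately into one variable per dependency, in the same order the names were passed.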

Complete Example

The following integration testing example makes use of both scope-restricting helpers:

"""
This file defines several toy services that interact to form a shop of the
famous ACME Corporation. The AcmeShopService relies on the StockService,
InvoiceService and PaymentService to fulfil its orders. They are not best
practice examples! They're minimal services provided for the test at the
bottom of the file.

``test_shop_integration`` is a full integration test of the ACME shop
"checkout flow". It demonstrates how to test the multiple ACME services in
combination with each other, including limiting service interactions by
replacing certain entrypoints and dependencies.
"""

from collections import defaultdict

import pytest

from nameko.extensions import DependencyProvider
from nameko.events import EventDispatcher, event_handler
from nameko.exceptions import RemoteError
from nameko.rpc import rpc, RpcProxy
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies, restrict_entrypoints
from nameko.testing.utils import get_container
from nameko.timer import timer

class NotLoggedInError(Exception):
    pass

class ItemOutOfStockError(Exception):
    pass

class ItemDoesNotExistError(Exception):
    pass

class ShoppingBasket(DependencyProvider):
    """ A shopping basket tied to the current ``user_id``.
    """
    def __init__(self):
        self.baskets = defaultdict(list)

    def get_dependency(self, worker_ctx):

        class Basket(object):
            def __init__(self, basket):
                self._basket = basket
                self.worker_ctx = worker_ctx

            def add(self, item):
                self._basket.append(item)

            def __iter__(self):
                for item in self._basket:
                    yield item

        try:
            user_id = worker_ctx.data['user_id']
        except KeyError:
            raise NotLoggedInError()
        return Basket(self.baskets[user_id])

class AcmeShopService:
    name = "acmeshopservice"

    user_basket = ShoppingBasket()
    stock_rpc = RpcProxy('stockservice')
    invoice_rpc = RpcProxy('invoiceservice')
    payment_rpc = RpcProxy('paymentservice')

    fire_event = EventDispatcher()

    @rpc
    def add_to_basket(self, item_code):
        """ Add item identified by ``item_code`` to the shopping basket.

        This is a toy example! Ignore the obvious race condition.
        """
        stock_level = self.stock_rpc.check_stock(item_code)
        if stock_level > 0:
            self.user_basket.add(item_code)
            self.fire_event("item_added_to_basket", item_code)
            return item_code

        raise ItemOutOfStockError(item_code)

    @rpc
    def checkout(self):
        """ Take payment for all items in the shopping basket.
        """
        total_price = sum(self.stock_rpc.check_price(item)
                          for item in self.user_basket)

        # prepare invoice
        invoice = self.invoice_rpc.prepare_invoice(total_price)

        # take payment
        self.payment_rpc.take_payment(invoice)

        # fire checkout event if prepare_invoice and take_payment succeeded
        checkout_event_data = {
            'invoice': invoice,
            'items': list(self.user_basket)
        }
        self.fire_event("checkout_complete", checkout_event_data)
        return total_price

class Warehouse(DependencyProvider):
    """ A database of items in the warehouse.

    This is a toy example! A dictionary is not a database.
    """
    def __init__(self):
        self.database = {
            'anvil': {
                'price': 100,
                'stock': 3
            },
            'dehydrated_boulders': {
                'price': 999,
                'stock': 12
            },
            'invisible_paint': {
                'price': 10,
                'stock': 30
            },
            'toothpicks': {
                'price': 1,
                'stock': 0
            }
        }

    def get_dependency(self, worker_ctx):
        return self.database

class StockService:
    name = "stockservice"

    warehouse = Warehouse()

    @rpc
    def check_price(self, item_code):
        """ Check the price of an item.
        """
        try:
            return self.warehouse[item_code]['price']
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    def check_stock(self, item_code):
        """ Check the stock level of an item.
        """
        try:
            return self.warehouse[item_code]['stock']
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    @timer(interval=100)
    def monitor_stock(self):
        """ Periodic stock monitoring method. Can also be triggered manually
        over RPC.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

    @event_handler('acmeshopservice', "checkout_complete")
    def dispatch_items(self, event_data):
        """ Dispatch items from stock on successful checkouts.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

class AddressBook(DependencyProvider):
    """ A database of user details, keyed on user_id.
    """
    def __init__(self):
        self.address_book = {
            'wile_e_coyote': {
                'username': 'wile_e_coyote',
                'fullname': 'Wile E Coyote',
                'address': '12 Long Road, High Cliffs, Utah',
            },
        }

    def get_dependency(self, worker_ctx):
        def get_user_details():
            try:
                user_id = worker_ctx.data['user_id']
            except KeyError:
                raise NotLoggedInError()
            return self.address_book.get(user_id)
        return get_user_details

class InvoiceService:
    name = "invoiceservice"

    get_user_details = AddressBook()

    @rpc
    def prepare_invoice(self, amount):
        """ Prepare an invoice for ``amount`` for the current user.
        """
        address = self.get_user_details().get('address')
        fullname = self.get_user_details().get('fullname')
        username = self.get_user_details().get('username')

        msg = "Dear {}. Please pay ${} to ACME Corp.".format(fullname, amount)
        invoice = {
            'message': msg,
            'amount': amount,
            'customer': username,
            'address': address
        }
        return invoice

class PaymentService:
    name = "paymentservice"

    @rpc
    def take_payment(self, invoice):
        """ Take payment from a customer according to ``invoice``.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

# =============================================================================
# Begin test
# =============================================================================

@pytest.fixture
def rpc_proxy_factory(rabbit_config):
    """ Factory fixture for standalone RPC proxies.

    Proxies are started automatically so they can be used without a ``with``
    statement. All created proxies are stopped at the end of the test, when
    this fixture closes.
    """
    all_proxies = []

    def make_proxy(service_name, **kwargs):
        proxy = ServiceRpcProxy(service_name, rabbit_config, **kwargs)
        all_proxies.append(proxy)
        return proxy.start()

    yield make_proxy

    for proxy in all_proxies:
        proxy.stop()

def test_shop_checkout_integration(
    rabbit_config, runner_factory, rpc_proxy_factory
):
    """ Simulate a checkout flow as an integration test.

    Requires instances of AcmeShopService, StockService and InvoiceService
    to be running. Explicitly replaces the rpc proxy to PaymentService so
    that service doesn't need to be hosted.

    Also replaces the event dispatcher dependency on AcmeShopService and
    disables the timer entrypoint on StockService. Limiting the interactions
    of services in this way reduces the scope of the integration test and
    eliminates undesirable side-effects (e.g. processing events unnecessarily).
    """
    context_data = {'user_id': 'wile_e_coyote'}
    shop = rpc_proxy_factory('acmeshopservice', context_data=context_data)

    runner = runner_factory(
        rabbit_config, AcmeShopService, StockService, InvoiceService)

    # replace the ``fire_event`` and ``payment_rpc`` dependencies on
    # AcmeShopService with ``MockDependencyProvider`` instances
    shop_container = get_container(runner, AcmeShopService)
    fire_event, payment_rpc = replace_dependencies(
        shop_container, "fire_event", "payment_rpc")

    # restrict entrypoints on StockService
    stock_container = get_container(runner, StockService)
    restrict_entrypoints(stock_container, "check_price", "check_stock")

    runner.start()

    # add some items to the basket
    assert shop.add_to_basket("anvil") == "anvil"
    assert shop.add_to_basket("invisible_paint") == "invisible_paint"

    # try to buy something that's out of stock
    with pytest.raises(RemoteError) as exc_info:
        shop.add_to_basket("toothpicks")
    assert exc_info.value.exc_type == "ItemOutOfStockError"

    # provide a mock response from the payment service
    payment_rpc.take_payment.return_value = "Payment complete."

    # checkout
    res = shop.checkout()

    total_amount = 100 + 10
    assert res == total_amount

    # verify integration with mocked out payment service
    payment_rpc.take_payment.assert_called_once_with({
        'customer': "wile_e_coyote",
        'address': "12 Long Road, High Cliffs, Utah",
        'amount': total_amount,
        'message': "Dear Wile E Coyote. Please pay $110 to ACME Corp."
    })

    # verify events fired as expected
    assert fire_event.call_count == 3

if __name__ == "__main__":
    import sys
    pytest.main(sys.argv)

Other Helpers


The entrypoint hook allows a service entrypoint to be called manually. This is useful during integration testing if it is difficult or expensive to fake the external event that would cause an entrypoint to fire.

You can provide context_data for the call to mimic a specific call context, for example language, user agent or auth token.

import pytest

from nameko.contextdata import Language
from nameko.rpc import rpc
from nameko.testing.services import entrypoint_hook

class HelloService:
    """ Service under test
    """
    name = "hello_service"

    language = Language()

    @rpc
    def hello(self, name):
        greeting = "Hello"
        if self.language == "fr":
            greeting = "Bonjour"
        elif self.language == "de":
            greeting = "Gutentag"

        return "{}, {}!".format(greeting, name)

@pytest.mark.parametrize("language, greeting", [
    ("en", "Hello"),
    ("fr", "Bonjour"),
    ("de", "Gutentag"),
])
def test_hello_languages(language, greeting, container_factory, rabbit_config):

    container = container_factory(HelloService, rabbit_config)
    container.start()

    context_data = {'language': language}
    with entrypoint_hook(container, 'hello', context_data) as hook:
        assert hook("Matt") == "{}, Matt!".format(greeting)


The entrypoint waiter is a context manager that does not exit until a named entrypoint has fired and completed. This is useful when testing integration points between services that are asynchronous, for example receiving an event:

from nameko.events import event_handler
from nameko.standalone.events import event_dispatcher
from nameko.testing.services import entrypoint_waiter

class ServiceB:
    """ Event listening service.
    """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received", payload)

def test_event_interface(container_factory, rabbit_config):

    container = container_factory(ServiceB, rabbit_config)
    container.start()

    dispatch = event_dispatcher(rabbit_config)

    # prints "service b received payload" before "exited"
    with entrypoint_waiter(container, 'handle_event'):
        dispatch("service_a", "event_type", "payload")
    print("exited")

Note that the context manager waits not only for the entrypoint method to complete, but also for any dependency teardown. Dependency-based loggers, for example, would also have completed by the time the block exits.
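
The blocking behaviour of a waiter-style context manager can be sketched with the standard library alone. This is a conceptual illustration only, not how Nameko implements entrypoint_waiter: `wait_for` is a hypothetical helper, and a thread stands in for the asynchronously fired entrypoint.

```python
import threading
from contextlib import contextmanager


@contextmanager
def wait_for(done, timeout=5):
    """Hypothetical helper: do not leave the block until ``done`` is set."""
    yield
    # Runs when the ``with`` body finishes; blocks until the handler signals.
    if not done.wait(timeout):
        raise TimeoutError("handler did not complete in time")


log = []
done = threading.Event()


def handle_event(payload):
    # Stand-in for an asynchronous entrypoint running in another thread.
    log.append("received " + payload)
    done.set()


with wait_for(done):
    threading.Thread(target=handle_event, args=("payload",)).start()
log.append("exited")
```

The ordering guarantee is the point: whatever follows the `with` block can safely assert on the handler's effects, because the block cannot exit before the handler has signalled completion.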

Using pytest

Nameko’s test suite uses pytest, and makes some useful configuration and fixtures available for your own tests if you choose to use pytest.

They are contained in nameko.testing.pytest. This module is automatically registered as a pytest plugin by setuptools if you have pytest installed.