Testing Services

Philosophy

Nameko’s conventions are designed to make testing as easy as possible. Services are likely to be small and single-purpose, and dependency injection makes it simple to replace and isolate pieces of functionality.

The examples below use pytest, which is what Nameko’s own test suite uses, but the helpers are test framework agnostic.

Unit Testing

Unit testing in Nameko usually means testing a single service in isolation – i.e. without any or most of its dependencies.

The worker_factory() utility will create a worker from a given service class, with its dependencies replaced by mock.MagicMock objects. Dependency functionality can then be imitated by adding side_effects and return_values:

""" Service unit testing best practice.
"""

from nameko.rpc import RpcProxy, rpc
from nameko.testing.services import worker_factory


class ConversionService(object):
    """ Service under test
    """
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cms_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)


def test_conversion_service():
    # create worker with mock dependencies
    service = worker_factory(ConversionService)

    # add side effects to the mock proxy to the "maths" service
    service.maths_rpc.multiply.side_effect = lambda x, y: x * y
    service.maths_rpc.divide.side_effect = lambda x, y: x / y

    # test inches_to_cm business logic
    assert service.inches_to_cm(300) == 762
    service.maths_rpc.multiply.assert_called_once_with(300, 2.54)

    # test cms_to_inches business logic
    assert service.cms_to_inches(762) == 300
    service.maths_rpc.divide.assert_called_once_with(762, 2.54)

In some circumstances it’s helpful to provide an alternative dependency, rather than use a mock. This may be a fully functioning replacement (e.g. a test database session) or a lightweight shim that provides partial functionality.

""" Service unit testing best practice, with an alternative dependency.
"""

import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from nameko.rpc import rpc
from nameko.testing.services import worker_factory

# using community extension from http://pypi.python.org/pypi/nameko-sqlalchemy
from nameko_sqlalchemy import Session


Base = declarative_base()


class Result(Base):
    __tablename__ = 'model'
    id = Column(Integer, primary_key=True)
    value = Column(String(64))


class Service:
    """ Service under test
    """
    name = "service"

    db = Session(Base)

    @rpc
    def save(self, value):
        result = Result(value=value)
        self.db.add(result)
        self.db.commit()


@pytest.fixture
def session():
    """ Create a test database and session
    """
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    session_cls = sessionmaker(bind=engine)
    return session_cls()


def test_service(session):

    # create instance, providing the test database session
    service = worker_factory(Service, db=session)

    # verify ``save`` logic by querying the test database
    service.save("helloworld")
    assert session.query(Result.value).all() == [("helloworld",)]

Integration Testing

Integration testing in Nameko means testing the interface between a number of services. The recommended way is to run all the services being tested in the normal way, and trigger behaviour by “firing” an entrypoint using a helper:

""" Service integration testing best practice.
"""

from nameko.rpc import rpc, RpcProxy
from nameko.testing.utils import get_container
from nameko.testing.services import entrypoint_hook


class ServiceX:
    """ Service under test
    """
    name = "service_x"

    y = RpcProxy("service_y")

    @rpc
    def remote_method(self, value):
        res = "{}-x".format(value)
        return self.y.append_identifier(res)


class ServiceY:
    """ Service under test
    """
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        return "{}-y".format(value)


def test_service_x_y_integration(runner_factory, rabbit_config):

    # run services in the normal manner
    runner = runner_factory(rabbit_config, ServiceX, ServiceY)
    runner.start()

    # artificially fire the "remote_method" entrypoint on ServiceX
    # and verify response
    container = get_container(runner, ServiceX)
    with entrypoint_hook(container, "remote_method") as entrypoint:
        assert entrypoint("value") == "value-x-y"

Note that the interface between ServiceX and ServiceY here is just as if under normal operation.

Interfaces that are out of scope for a particular test can be deactivated with one of the following test helpers:

restrict_entrypoints

nameko.testing.services.restrict_entrypoints(container, *entrypoints)

Restrict the entrypoints on container to those named in entrypoints.

This method must be called before the container is started.

Usage

The following service definition has two entrypoints:

class Service(object):
    name = "service"

    @timer(interval=1)
    def foo(self, arg):
        pass

    @rpc
    def bar(self, arg)
        pass

    @rpc
    def baz(self, arg):
        pass

container = ServiceContainer(Service, config)

To disable the timer entrypoint on foo, leaving just the RPC entrypoints:

restrict_entrypoints(container, "bar", "baz")

Note that it is not possible to identify multiple entrypoints on the same method individually.

replace_dependencies

nameko.testing.services.replace_dependencies(container, *dependencies, **dependency_map)

Replace the dependency providers on container with instances of MockDependencyProvider.

Dependencies named in *dependencies will be replaced with a MockDependencyProvider, which injects a MagicMock instead of the dependency.

Alternatively, you may use keyword arguments to name a dependency and provide the replacement value that the MockDependencyProvider should inject.

Return the MockDependencyProvider.dependency for every dependency specified in the (*dependencies) args so that calls to the replaced dependencies can be inspected. Return a single object if only one dependency was replaced, and a generator yielding the replacements in the same order as dependencies otherwise. Note that any replaced dependencies specified via kwargs **dependency_map will not be returned.

Replacements are made on the container instance and have no effect on the service class. New container instances are therefore unaffected by replacements on previous instances.

Usage

from nameko.rpc import RpcProxy, rpc
from nameko.standalone.rpc import ServiceRpcProxy

class ConversionService(object):
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

container = ServiceContainer(ConversionService, config)
mock_maths_rpc = replace_dependencies(container, "maths_rpc")
mock_maths_rpc.divide.return_value = 39.37

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    proxy.cm_to_inches(100)

# assert that the dependency was called as expected
mock_maths_rpc.divide.assert_called_once_with(100, 2.54)

Providing a specific replacement by keyword:

class StubMaths(object):

    def divide(self, val1, val2):
        return val1 / val2

replace_dependencies(container, maths_rpc=StubMaths())

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    assert proxy.cm_to_inches(127) == 50.0

Complete Example

The following integration testing example makes use of both scope-restricting helpers:

"""
This file defines several toy services that interact to form a shop of the
famous ACME Corporation. The AcmeShopService relies on the StockService,
InvoiceService and PaymentService to fulfil its orders. They are not best
practice examples! They're minimal services provided for the test at the
bottom of the file.

``test_shop_integration`` is a full integration test of the ACME shop
"checkout flow". It demonstrates how to test the multiple ACME services in
combination with each other, including limiting service interactions by
replacing certain entrypoints and dependencies.
"""

from collections import defaultdict

import pytest

from nameko.extensions import DependencyProvider
from nameko.events import EventDispatcher, event_handler
from nameko.exceptions import RemoteError
from nameko.rpc import rpc, RpcProxy
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies, restrict_entrypoints
from nameko.testing.utils import get_container
from nameko.timer import timer


class NotLoggedInError(Exception):
    pass


class ItemOutOfStockError(Exception):
    pass


class ItemDoesNotExistError(Exception):
    pass


class ShoppingBasket(DependencyProvider):
    """ A shopping basket tied to the current ``user_id``.
    """
    def __init__(self):
        self.baskets = defaultdict(list)

    def get_dependency(self, worker_ctx):

        class Basket(object):
            def __init__(self, basket):
                self._basket = basket
                self.worker_ctx = worker_ctx

            def add(self, item):
                self._basket.append(item)

            def __iter__(self):
                for item in self._basket:
                    yield item

        try:
            user_id = worker_ctx.data['user_id']
        except KeyError:
            raise NotLoggedInError()
        return Basket(self.baskets[user_id])


class AcmeShopService:
    name = "acmeshopservice"

    user_basket = ShoppingBasket()
    stock_rpc = RpcProxy('stockservice')
    invoice_rpc = RpcProxy('invoiceservice')
    payment_rpc = RpcProxy('paymentservice')

    fire_event = EventDispatcher()

    @rpc
    def add_to_basket(self, item_code):
        """ Add item identified by ``item_code`` to the shopping basket.

        This is a toy example! Ignore the obvious race condition.
        """
        stock_level = self.stock_rpc.check_stock(item_code)
        if stock_level > 0:
            self.user_basket.add(item_code)
            self.fire_event("item_added_to_basket", item_code)
            return item_code

        raise ItemOutOfStockError(item_code)

    @rpc
    def checkout(self):
        """ Take payment for all items in the shopping basket.
        """
        total_price = sum(self.stock_rpc.check_price(item)
                          for item in self.user_basket)

        # prepare invoice
        invoice = self.invoice_rpc.prepare_invoice(total_price)

        # take payment
        self.payment_rpc.take_payment(invoice)

        # fire checkout event if prepare_invoice and take_payment succeeded
        checkout_event_data = {
            'invoice': invoice,
            'items': list(self.user_basket)
        }
        self.fire_event("checkout_complete", checkout_event_data)
        return total_price


class Warehouse(DependencyProvider):
    """ A database of items in the warehouse.

    This is a toy example! A dictionary is not a database.
    """
    def __init__(self):
        self.database = {
            'anvil': {
                'price': 100,
                'stock': 3
            },
            'dehydrated_boulders': {
                'price': 999,
                'stock': 12
            },
            'invisible_paint': {
                'price': 10,
                'stock': 30
            },
            'toothpicks': {
                'price': 1,
                'stock': 0
            }
        }

    def get_dependency(self, worker_ctx):
        return self.database


class StockService:
    name = "stockservice"

    warehouse = Warehouse()

    @rpc
    def check_price(self, item_code):
        """ Check the price of an item.
        """
        try:
            return self.warehouse[item_code]['price']
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    def check_stock(self, item_code):
        """ Check the stock level of an item.
        """
        try:
            return self.warehouse[item_code]['stock']
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    @timer(100)
    def monitor_stock(self):
        """ Periodic stock monitoring method. Can also be triggered manually
        over RPC.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

    @event_handler('acmeshopservice', "checkout_complete")
    def dispatch_items(self, event_data):
        """ Dispatch items from stock on successful checkouts.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()


class AddressBook(DependencyProvider):
    """ A database of user details, keyed on user_id.
    """
    def __init__(self):
        self.address_book = {
            'wile_e_coyote': {
                'username': 'wile_e_coyote',
                'fullname': 'Wile E Coyote',
                'address': '12 Long Road, High Cliffs, Utah',
            },
        }

    def get_dependency(self, worker_ctx):
        def get_user_details():
            try:
                user_id = worker_ctx.data['user_id']
            except KeyError:
                raise NotLoggedInError()
            return self.address_book.get(user_id)
        return get_user_details


class InvoiceService:
    name = "invoiceservice"

    get_user_details = AddressBook()

    @rpc
    def prepare_invoice(self, amount):
        """ Prepare an invoice for ``amount`` for the current user.
        """
        address = self.get_user_details().get('address')
        fullname = self.get_user_details().get('fullname')
        username = self.get_user_details().get('username')

        msg = "Dear {}. Please pay ${} to ACME Corp.".format(fullname, amount)
        invoice = {
            'message': msg,
            'amount': amount,
            'customer': username,
            'address': address
        }
        return invoice


class PaymentService:
    name = "paymentservice"

    @rpc
    def take_payment(self, invoice):
        """ Take payment from a customer according to ``invoice``.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

# =============================================================================
# Begin test
# =============================================================================


@pytest.yield_fixture
def rpc_proxy_factory(rabbit_config):
    """ Factory fixture for standalone RPC proxies.

    Proxies are started automatically so they can be used without a ``with``
    statement. All created proxies are stopped at the end of the test, when
    this fixture closes.
    """
    all_proxies = []

    def make_proxy(service_name, **kwargs):
        proxy = ServiceRpcProxy(service_name, rabbit_config, **kwargs)
        all_proxies.append(proxy)
        return proxy.start()

    yield make_proxy

    for proxy in all_proxies:
        proxy.stop()


def test_shop_checkout_integration(
    rabbit_config, runner_factory, rpc_proxy_factory
):
    """ Simulate a checkout flow as an integration test.

    Requires instances of AcmeShopService, StockService and InvoiceService
    to be running. Explicitly replaces the rpc proxy to PaymentService so
    that service doesn't need to be hosted.

    Also replaces the event dispatcher dependency on AcmeShopService and
    disables the timer entrypoint on StockService. Limiting the interactions
    of services in this way reduces the scope of the integration test and
    eliminates undesirable side-effects (e.g. processing events unnecessarily).
    """
    context_data = {'user_id': 'wile_e_coyote'}
    shop = rpc_proxy_factory('acmeshopservice', context_data=context_data)

    runner = runner_factory(
        rabbit_config, AcmeShopService, StockService, InvoiceService)

    # replace ``event_dispatcher`` and ``payment_rpc``  dependencies on
    # AcmeShopService with ``MockDependencyProvider``\s
    shop_container = get_container(runner, AcmeShopService)
    fire_event, payment_rpc = replace_dependencies(
        shop_container, "fire_event", "payment_rpc")

    # restrict entrypoints on StockService
    stock_container = get_container(runner, StockService)
    restrict_entrypoints(stock_container, "check_price", "check_stock")

    runner.start()

    # add some items to the basket
    assert shop.add_to_basket("anvil") == "anvil"
    assert shop.add_to_basket("invisible_paint") == "invisible_paint"

    # try to buy something that's out of stock
    with pytest.raises(RemoteError) as exc_info:
        shop.add_to_basket("toothpicks")
    assert exc_info.value.exc_type == "ItemOutOfStockError"

    # provide a mock response from the payment service
    payment_rpc.take_payment.return_value = "Payment complete."

    # checkout
    res = shop.checkout()

    total_amount = 100 + 10
    assert res == total_amount

    # verify integration with mocked out payment service
    payment_rpc.take_payment.assert_called_once_with({
        'customer': "wile_e_coyote",
        'address': "12 Long Road, High Cliffs, Utah",
        'amount': total_amount,
        'message': "Dear Wile E Coyote. Please pay $110 to ACME Corp."
    })

    # verify events fired as expected
    assert fire_event.call_count == 3


if __name__ == "__main__":
    import sys
    pytest.main(sys.argv)

Other Helpers

entrypoint_hook

The entrypoint hook allows a service entrypoint to be called manually. This is useful during integration testing if it is difficult or expensive to fake to external event that would cause an entrypoint to fire.

You can provide context_data for the call to mimic specific call context, for example language, user agent or auth token.

import pytest

from nameko.contextdata import Language
from nameko.rpc import rpc
from nameko.testing.services import entrypoint_hook


class HelloService:
    """ Service under test
    """
    name = "hello_service"

    language = Language()

    @rpc
    def hello(self, name):
        greeting = "Hello"
        if self.language == "fr":
            greeting = "Bonjour"
        elif self.language == "de":
            greeting = "Gutentag"

        return "{}, {}!".format(greeting, name)


@pytest.mark.parametrize("language, greeting", [
    ("en", "Hello"),
    ("fr", "Bonjour"),
    ("de", "Gutentag"),
])
def test_hello_languages(language, greeting, container_factory, rabbit_config):

    container = container_factory(HelloService, rabbit_config)
    container.start()

    context_data = {'language': language}
    with entrypoint_hook(container, 'hello', context_data) as hook:
        assert hook("Matt") == "{}, Matt!".format(greeting)

entrypoint_waiter

The entrypoint waiter is a context manager that does not exit until a named entrypoint has fired and completed. This is useful when testing integration points between services that are asynchronous, for example receiving an event:

from nameko.events import event_handler
from nameko.standalone.events import event_dispatcher
from nameko.testing.services import entrypoint_waiter


class ServiceB:
    """ Event listening service.
    """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received", payload)


def test_event_interface(container_factory, rabbit_config):

    container = container_factory(ServiceB, rabbit_config)
    container.start()

    dispatch = event_dispatcher(rabbit_config)

    # prints "service b received payload" before "exited"
    with entrypoint_waiter(container, 'handle_event'):
        dispatch("service_a", "event_type", "payload")
    print("exited")

Note that the context manager waits not only for the entrypoint method to complete, but also for any dependency teardown. Dependency-based loggers such as (TODO: link to bundled logger) for example would have also completed.

Using pytest

Nameko’s test suite uses pytest, and makes some useful configuration and fixtures available for your own tests if you choose to use pytest.

They are contained in nameko.testing.pytest. This module is automatically registered as a pytest plugin by setuptools if you have pytest installed.