Testing Services¶
Philosophy¶
Nameko’s conventions are designed to make testing as easy as possible. Services are likely to be small and single-purpose, and dependency injection makes it simple to replace and isolate pieces of functionality.
The examples below use pytest, which is what Nameko’s own test suite uses, but the helpers are test framework agnostic.
Unit Testing¶
Unit testing in Nameko usually means testing a single service in isolation, i.e. with most or all of its dependencies replaced.
The worker_factory() utility will create a worker from a given service class, with its dependencies replaced by mock.MagicMock objects. Dependency functionality can then be imitated by adding side_effects and return_values:

""" Service unit testing best practice.
"""

from nameko.rpc import RpcProxy, rpc
from nameko.testing.services import worker_factory


class ConversionService(object):
    """ Service under test
    """
    name = "conversions"
    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cms_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)


def test_conversion_service():
    # create worker with mock dependencies
    service = worker_factory(ConversionService)

    # add side effects to the mock proxy to the "maths" service
    service.maths_rpc.multiply.side_effect = lambda x, y: x * y
    service.maths_rpc.divide.side_effect = lambda x, y: x / y

    # test inches_to_cm business logic
    assert service.inches_to_cm(300) == 762
    service.maths_rpc.multiply.assert_called_once_with(300, 2.54)

    # test cms_to_inches business logic
    assert service.cms_to_inches(762) == 300
    service.maths_rpc.divide.assert_called_once_with(762, 2.54)
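The test above only uses side_effect with a callable. As a minimal sketch of the other options, the snippet below uses the standard library's unittest.mock directly, with no Nameko involved; the standalone maths_rpc mock is an assumption standing in for the attribute that worker_factory() would create on the service.

```python
from unittest import mock

# Stand-in for a mocked dependency such as ``service.maths_rpc``
# (assumption: created by hand here rather than by worker_factory).
maths_rpc = mock.MagicMock()

# return_value: every call returns the same canned result
maths_rpc.multiply.return_value = 762
assert maths_rpc.multiply(300, 2.54) == 762
maths_rpc.multiply.assert_called_once_with(300, 2.54)

# side_effect with a callable: the result is computed from the arguments
maths_rpc.divide.side_effect = lambda x, y: x / y
assert maths_rpc.divide(10, 4) == 2.5

# side_effect with an exception: the call raises, which imitates
# a failing dependency
maths_rpc.divide.side_effect = ZeroDivisionError("division by zero")
try:
    maths_rpc.divide(1, 0)
except ZeroDivisionError as exc:
    error_message = str(exc)
```

A canned return_value is usually enough when the test only cares that the call happened; a callable side_effect is more useful when the service's result depends on what the dependency returns.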
In some circumstances it’s helpful to provide an alternative dependency, rather than use a mock. This may be a fully functioning replacement (e.g. a test database session) or a lightweight shim that provides partial functionality.

""" Service unit testing best practice, with an alternative dependency.
"""

import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from nameko.rpc import rpc
from nameko.testing.services import worker_factory

# using community extension from http://pypi.python.org/pypi/nameko-sqlalchemy
from nameko_sqlalchemy import Session


Base = declarative_base()


class Result(Base):
    __tablename__ = 'model'
    id = Column(Integer, primary_key=True)
    value = Column(String(64))


class Service:
    """ Service under test
    """
    name = "service"

    db = Session(Base)

    @rpc
    def save(self, value):
        result = Result(value=value)
        self.db.add(result)
        self.db.commit()


@pytest.fixture
def session():
    """ Create a test database and session
    """
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    session_cls = sessionmaker(bind=engine)
    return session_cls()


def test_service(session):
    # create instance, providing the test database session
    service = worker_factory(Service, db=session)

    # verify ``save`` logic by querying the test database
    service.save("helloworld")
    assert session.query(Result.value).all() == [("helloworld",)]
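A lightweight shim can be as small as a class that implements only the methods the test exercises. The sketch below is an illustration under stated assumptions: MathsShim and ConversionLogic are hypothetical names, and ConversionLogic is a plain class standing in for a service so that the snippet runs without Nameko. With Nameko, the shim would be passed to worker_factory() by keyword, in the same way as db=session above.

```python
class MathsShim:
    """Hypothetical shim: implements only what the test needs."""

    def __init__(self):
        self.calls = []

    def multiply(self, x, y):
        # record the call so the test can inspect it afterwards
        self.calls.append(("multiply", x, y))
        return x * y


class ConversionLogic:
    """Plain stand-in for a service with an injected ``maths_rpc``."""

    def __init__(self, maths_rpc):
        self.maths_rpc = maths_rpc

    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)


shim = MathsShim()
service = ConversionLogic(maths_rpc=shim)
result = service.inches_to_cm(300)
```

Unlike a MagicMock, the shim fails loudly with an AttributeError if the service calls a method it does not implement, which can catch unintended interactions.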
Integration Testing¶
Integration testing in Nameko means testing the interface between a number of services. The recommended way is to run all the services being tested in the normal way, and trigger behaviour by “firing” an entrypoint using a helper:

""" Service integration testing best practice.
"""

from nameko.rpc import rpc, RpcProxy
from nameko.testing.utils import get_container
from nameko.testing.services import entrypoint_hook


class ServiceX:
    """ Service under test
    """
    name = "service_x"

    y = RpcProxy("service_y")

    @rpc
    def remote_method(self, value):
        res = "{}-x".format(value)
        return self.y.append_identifier(res)


class ServiceY:
    """ Service under test
    """
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        return "{}-y".format(value)


def test_service_x_y_integration(runner_factory, rabbit_config):

    # run services in the normal manner
    runner = runner_factory(rabbit_config, ServiceX, ServiceY)
    runner.start()

    # artificially fire the "remote_method" entrypoint on ServiceX
    # and verify response
    container = get_container(runner, ServiceX)
    with entrypoint_hook(container, "remote_method") as entrypoint:
        assert entrypoint("value") == "value-x-y"
Note that the interface between ServiceX and ServiceY here is just as if under normal operation.
Interfaces that are out of scope for a particular test can be deactivated with one of the following test helpers:
restrict_entrypoints¶
nameko.testing.services.restrict_entrypoints(container, *entrypoints)

Restrict the entrypoints on container to those named in entrypoints.

This method must be called before the container is started.
Usage
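The following service definition has three entrypoints:

from nameko.containers import ServiceContainer
from nameko.rpc import rpc
from nameko.timer import timer


class Service(object):
    name = "service"

    @timer(interval=1)
    def foo(self, arg):
        pass

    @rpc
    def bar(self, arg):
        pass

    @rpc
    def baz(self, arg):
        pass


# ``config`` is assumed to be an existing Nameko config dictionary
container = ServiceContainer(Service, config)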
To disable the timer entrypoint on foo, leaving just the RPC entrypoints:

restrict_entrypoints(container, "bar", "baz")
Note that it is not possible to identify multiple entrypoints on the same method individually.
replace_dependencies¶
nameko.testing.services.replace_dependencies(container, *dependencies, **dependency_map)

Replace the dependency providers on container with instances of MockDependencyProvider.

Dependencies named in *dependencies will be replaced with a MockDependencyProvider, which injects a MagicMock instead of the dependency.

Alternatively, you may use keyword arguments to name a dependency and provide the replacement value that the MockDependencyProvider should inject.

Return the MockDependencyProvider.dependency for every dependency specified in the *dependencies args so that calls to the replaced dependencies can be inspected. Return a single object if only one dependency was replaced, and a generator yielding the replacements in the same order as dependencies otherwise. Note that any replaced dependencies specified via kwargs **dependency_map will not be returned.

Replacements are made on the container instance and have no effect on the service class. New container instances are therefore unaffected by replacements on previous instances.
Usage
from nameko.containers import ServiceContainer
from nameko.rpc import RpcProxy, rpc
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies


class ConversionService(object):
    name = "conversions"
    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)


# ``config`` is assumed to be an existing Nameko config dictionary
container = ServiceContainer(ConversionService, config)
mock_maths_rpc = replace_dependencies(container, "maths_rpc")
mock_maths_rpc.divide.return_value = 39.37

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    proxy.cm_to_inches(100)

# assert that the dependency was called as expected
mock_maths_rpc.divide.assert_called_once_with(100, 2.54)
Providing a specific replacement by keyword:
class StubMaths(object):

    def divide(self, val1, val2):
        return val1 / val2


replace_dependencies(container, maths_rpc=StubMaths())

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    assert proxy.cm_to_inches(127) == 50.0
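The return convention of replace_dependencies (a single object for one positional name, a generator for several, nothing for keyword replacements) determines how the result can be unpacked. The function below is a hypothetical stand-in that mirrors only that documented convention using unittest.mock; fake_replace is not part of Nameko and does not touch a container.

```python
from unittest import mock


def fake_replace(*dependencies, **dependency_map):
    """Hypothetical stand-in mirroring the documented return convention."""
    # positional names get a fresh MagicMock each
    replacements = {name: mock.MagicMock(name=name) for name in dependencies}
    # keyword replacements are used as given, but never returned
    replacements.update(dependency_map)

    mocks = [replacements[name] for name in dependencies]
    if len(mocks) == 1:
        return mocks[0]
    return (m for m in mocks)


# one positional name: a single object comes back
maths_rpc = fake_replace("maths_rpc")

# several positional names: unpack the generator in the same order
fire_event, payment_rpc = fake_replace("fire_event", "payment_rpc")

# keyword-only replacement: nothing is yielded
leftover = list(fake_replace(maths_rpc=object()))
```

The practical consequence is that a keyword replacement has to be kept in a variable of your own if the test needs to inspect it afterwards.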
Complete Example¶
The following integration testing example makes use of both scope-restricting helpers:

"""
This file defines several toy services that interact to form a shop of the
famous ACME Corporation. The AcmeShopService relies on the StockService,
InvoiceService and PaymentService to fulfil its orders. They are not best
practice examples! They're minimal services provided for the test at the
bottom of the file.

``test_shop_checkout_integration`` is a full integration test of the ACME shop
"checkout flow". It demonstrates how to test the multiple ACME services in
combination with each other, including limiting service interactions by
replacing certain entrypoints and dependencies.
"""

from collections import defaultdict

import pytest

from nameko.extensions import DependencyProvider
from nameko.events import EventDispatcher, event_handler
from nameko.exceptions import RemoteError
from nameko.rpc import rpc, RpcProxy
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies, restrict_entrypoints
from nameko.testing.utils import get_container
from nameko.timer import timer


class NotLoggedInError(Exception):
    pass


class ItemOutOfStockError(Exception):
    pass


class ItemDoesNotExistError(Exception):
    pass

class ShoppingBasket(DependencyProvider):
    """ A shopping basket tied to the current ``user_id``.
    """
    def __init__(self):
        self.baskets = defaultdict(list)

    def get_dependency(self, worker_ctx):

        class Basket(object):
            def __init__(self, basket):
                self._basket = basket
                self.worker_ctx = worker_ctx

            def add(self, item):
                self._basket.append(item)

            def __iter__(self):
                for item in self._basket:
                    yield item

        try:
            user_id = worker_ctx.data['user_id']
        except KeyError:
            raise NotLoggedInError()
        return Basket(self.baskets[user_id])

class AcmeShopService:
    name = "acmeshopservice"

    user_basket = ShoppingBasket()
    stock_rpc = RpcProxy('stockservice')
    invoice_rpc = RpcProxy('invoiceservice')
    payment_rpc = RpcProxy('paymentservice')

    fire_event = EventDispatcher()

    @rpc
    def add_to_basket(self, item_code):
        """ Add item identified by ``item_code`` to the shopping basket.

        This is a toy example! Ignore the obvious race condition.
        """
        stock_level = self.stock_rpc.check_stock(item_code)
        if stock_level > 0:
            self.user_basket.add(item_code)
            self.fire_event("item_added_to_basket", item_code)
            return item_code

        raise ItemOutOfStockError(item_code)

    @rpc
    def checkout(self):
        """ Take payment for all items in the shopping basket.
        """
        total_price = sum(self.stock_rpc.check_price(item)
                          for item in self.user_basket)

        # prepare invoice
        invoice = self.invoice_rpc.prepare_invoice(total_price)

        # take payment
        self.payment_rpc.take_payment(invoice)

        # fire checkout event if prepare_invoice and take_payment succeeded
        checkout_event_data = {
            'invoice': invoice,
            'items': list(self.user_basket)
        }
        self.fire_event("checkout_complete", checkout_event_data)
        return total_price

class Warehouse(DependencyProvider):
    """ A database of items in the warehouse.

    This is a toy example! A dictionary is not a database.
    """
    def __init__(self):
        self.database = {
            'anvil': {
                'price': 100,
                'stock': 3
            },
            'dehydrated_boulders': {
                'price': 999,
                'stock': 12
            },
            'invisible_paint': {
                'price': 10,
                'stock': 30
            },
            'toothpicks': {
                'price': 1,
                'stock': 0
            }
        }

    def get_dependency(self, worker_ctx):
        return self.database

class StockService:
    name = "stockservice"

    warehouse = Warehouse()

    @rpc
    def check_price(self, item_code):
        """ Check the price of an item.
        """
        try:
            return self.warehouse[item_code]['price']
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    def check_stock(self, item_code):
        """ Check the stock level of an item.
        """
        try:
            return self.warehouse[item_code]['stock']
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    @timer(100)
    def monitor_stock(self):
        """ Periodic stock monitoring method. Can also be triggered manually
        over RPC.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

    @event_handler('acmeshopservice', "checkout_complete")
    def dispatch_items(self, event_data):
        """ Dispatch items from stock on successful checkouts.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

class AddressBook(DependencyProvider):
    """ A database of user details, keyed on user_id.
    """
    def __init__(self):
        self.address_book = {
            'wile_e_coyote': {
                'username': 'wile_e_coyote',
                'fullname': 'Wile E Coyote',
                'address': '12 Long Road, High Cliffs, Utah',
            },
        }

    def get_dependency(self, worker_ctx):
        def get_user_details():
            try:
                user_id = worker_ctx.data['user_id']
            except KeyError:
                raise NotLoggedInError()
            return self.address_book.get(user_id)
        return get_user_details

class InvoiceService:
    name = "invoiceservice"

    get_user_details = AddressBook()

    @rpc
    def prepare_invoice(self, amount):
        """ Prepare an invoice for ``amount`` for the current user.
        """
        address = self.get_user_details().get('address')
        fullname = self.get_user_details().get('fullname')
        username = self.get_user_details().get('username')

        msg = "Dear {}. Please pay ${} to ACME Corp.".format(fullname, amount)
        invoice = {
            'message': msg,
            'amount': amount,
            'customer': username,
            'address': address
        }
        return invoice


class PaymentService:
    name = "paymentservice"

    @rpc
    def take_payment(self, invoice):
        """ Take payment from a customer according to ``invoice``.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

# =============================================================================
# Begin test
# =============================================================================


# ``pytest.fixture`` supports ``yield``; ``pytest.yield_fixture`` is deprecated
@pytest.fixture
def rpc_proxy_factory(rabbit_config):
    """ Factory fixture for standalone RPC proxies.

    Proxies are started automatically so they can be used without a ``with``
    statement. All created proxies are stopped at the end of the test, when
    this fixture closes.
    """
    all_proxies = []

    def make_proxy(service_name, **kwargs):
        proxy = ServiceRpcProxy(service_name, rabbit_config, **kwargs)
        all_proxies.append(proxy)
        return proxy.start()

    yield make_proxy

    for proxy in all_proxies:
        proxy.stop()

def test_shop_checkout_integration(
    rabbit_config, runner_factory, rpc_proxy_factory
):
    """ Simulate a checkout flow as an integration test.

    Requires instances of AcmeShopService, StockService and InvoiceService
    to be running. Explicitly replaces the rpc proxy to PaymentService so
    that service doesn't need to be hosted.

    Also replaces the event dispatcher dependency on AcmeShopService and
    disables the timer entrypoint on StockService. Limiting the interactions
    of services in this way reduces the scope of the integration test and
    eliminates undesirable side-effects (e.g. processing events unnecessarily).
    """
    context_data = {'user_id': 'wile_e_coyote'}
    shop = rpc_proxy_factory('acmeshopservice', context_data=context_data)

    runner = runner_factory(
        rabbit_config, AcmeShopService, StockService, InvoiceService)

    # replace ``fire_event`` and ``payment_rpc`` dependencies on
    # AcmeShopService with ``MockDependencyProvider``\s
    shop_container = get_container(runner, AcmeShopService)
    fire_event, payment_rpc = replace_dependencies(
        shop_container, "fire_event", "payment_rpc")

    # restrict entrypoints on StockService
    stock_container = get_container(runner, StockService)
    restrict_entrypoints(stock_container, "check_price", "check_stock")

    runner.start()

    # add some items to the basket
    assert shop.add_to_basket("anvil") == "anvil"
    assert shop.add_to_basket("invisible_paint") == "invisible_paint"

    # try to buy something that's out of stock
    with pytest.raises(RemoteError) as exc_info:
        shop.add_to_basket("toothpicks")
    assert exc_info.value.exc_type == "ItemOutOfStockError"

    # provide a mock response from the payment service
    payment_rpc.take_payment.return_value = "Payment complete."

    # checkout
    res = shop.checkout()

    total_amount = 100 + 10
    assert res == total_amount

    # verify integration with mocked out payment service
    payment_rpc.take_payment.assert_called_once_with({
        'customer': "wile_e_coyote",
        'address': "12 Long Road, High Cliffs, Utah",
        'amount': total_amount,
        'message': "Dear Wile E Coyote. Please pay $110 to ACME Corp."
    })

    # verify events fired as expected
    assert fire_event.call_count == 3


if __name__ == "__main__":
    import sys
    pytest.main(sys.argv)
Other Helpers¶
entrypoint_hook¶
The entrypoint hook allows a service entrypoint to be called manually. This is useful during integration testing if it is difficult or expensive to fake the external event that would cause an entrypoint to fire.
You can provide context_data for the call to mimic a specific call context, for example a language, user agent or auth token.
import pytest

from nameko.contextdata import Language
from nameko.rpc import rpc
from nameko.testing.services import entrypoint_hook


class HelloService:
    """ Service under test
    """
    name = "hello_service"

    language = Language()

    @rpc
    def hello(self, name):
        greeting = "Hello"
        if self.language == "fr":
            greeting = "Bonjour"
        elif self.language == "de":
            greeting = "Gutentag"

        return "{}, {}!".format(greeting, name)


@pytest.mark.parametrize("language, greeting", [
    ("en", "Hello"),
    ("fr", "Bonjour"),
    ("de", "Gutentag"),
])
def test_hello_languages(language, greeting, container_factory, rabbit_config):

    container = container_factory(HelloService, rabbit_config)
    container.start()

    context_data = {'language': language}
    with entrypoint_hook(container, 'hello', context_data) as hook:
        assert hook("Matt") == "{}, Matt!".format(greeting)
entrypoint_waiter¶
The entrypoint waiter is a context manager that does not exit until a named entrypoint has fired and completed. This is useful when testing integration points between services that are asynchronous, for example receiving an event:
from nameko.events import event_handler
from nameko.standalone.events import event_dispatcher
from nameko.testing.services import entrypoint_waiter


class ServiceB:
    """ Event listening service.
    """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received", payload)


def test_event_interface(container_factory, rabbit_config):

    container = container_factory(ServiceB, rabbit_config)
    container.start()

    dispatch = event_dispatcher(rabbit_config)

    # prints "service b received payload" before "exited"
    with entrypoint_waiter(container, 'handle_event'):
        dispatch("service_a", "event_type", "payload")
    print("exited")
Note that the context manager waits not only for the entrypoint method to complete, but also for any dependency teardown. Dependency-based loggers, for example, would also have completed by the time it exits.
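The blocking behaviour can be pictured with a standard-library analogy. The sketch below is not how entrypoint_waiter is implemented; wait_until_handled is a hypothetical helper that uses a threading.Event to hold the with block open until a handler, running on another thread, has finished.

```python
import threading
from contextlib import contextmanager


@contextmanager
def wait_until_handled(handled, timeout=5):
    """Hypothetical helper: block on exit until ``handled`` is set."""
    yield
    if not handled.wait(timeout):
        raise TimeoutError("handler did not complete in time")


log = []
handled = threading.Event()


def handle_event(payload):
    log.append("received " + payload)
    handled.set()  # signal completion to the waiting context manager


with wait_until_handled(handled):
    # deliver the event asynchronously, roughly as a broker would
    threading.Timer(0.05, handle_event, args=("payload",)).start()

log.append("exited")
```

Without the waiting context manager, "exited" could be logged before the handler ran, which is the kind of race entrypoint_waiter is meant to remove from tests.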
Using pytest¶
Nameko’s test suite uses pytest, and makes some useful configuration and fixtures available for your own tests if you choose to use pytest.
They are contained in nameko.testing.pytest. This module is automatically registered as a pytest plugin by setuptools if you have pytest installed.