Writing Extensions

Structure

Extensions should subclass nameko.extensions.Extension. This base class provides the basic structure for an extension, in particular the following methods which can be overridden to add functionality:

Extension.setup()

Called on bound Extensions before the container starts.

Extensions should do any required initialisation here.

Extension.start()

Called on bound Extensions when the container has successfully started.

This is only called after all other Extensions have successfully returned from Extension.setup(). If the Extension reacts to external events, it should now start acting upon them.

Extension.stop()

Called when the service container begins to shut down.

Extensions should do any graceful shutdown here.

Writing Dependency Providers

Almost every Nameko application will need to define its own dependencies – maybe to interface with a database for which there is no community extension or to communicate with a specific web service.

Dependency providers should subclass nameko.extensions.DependencyProvider and implement a get_dependency() method that returns the object to be injected into service workers.

The recommended pattern is to inject the minimum required interface for the dependency. This reduces the test surface and makes it easier to exercise service code under test.

Dependency providers may also hook into the worker lifecycle. The following three methods are called on all dependency providers for every worker:

DependencyProvider.worker_setup(worker_ctx)

Called before a service worker executes a task.

Dependencies should do any pre-processing here, raising exceptions in the event of failure.

Example: ...

Parameters:
worker_ctx : WorkerContext

See nameko.containers.ServiceContainer.spawn_worker

DependencyProvider.worker_result(worker_ctx, result=None, exc_info=None)

Called with the result of a service worker execution.

Dependencies that need to process the result should do it here. This method is called for all Dependency instances on completion of any worker.

Example: a database session dependency may flush the transaction

Parameters:
worker_ctx : WorkerContext

See nameko.containers.ServiceContainer.spawn_worker

DependencyProvider.worker_teardown(worker_ctx)

Called after a service worker has executed a task.

Dependencies should do any post-processing here, raising exceptions in the event of failure.

Example: a database session dependency may commit the session

Parameters:
worker_ctx : WorkerContext

See nameko.containers.ServiceContainer.spawn_worker

Concurrency and Thread-Safety

The object returned by get_dependency() should be thread-safe, because it may be accessed by multiple concurrently running workers.

The worker lifecycle are called in the same thread that executes the service method. This means, for example, that you can define thread-local variables and access them from each method.

Example

A simple DependencyProvider that sends messages to an SQS queue.

from nameko.extensions import DependencyProvider

import boto3

class SqsSend(DependencyProvider):

    def __init__(self, url, region="eu-west-1", **kwargs):
        self.url = url
        self.region = region
        super(SqsSend, self).__init__(**kwargs)

    def setup(self):
        self.client = boto3.client('sqs', region_name=self.region)

    def get_dependency(self, worker_ctx):

        def send_message(payload):
            # assumes boto client is thread-safe for this action, because it
            # happens inside the worker threads
            self.client.send_message(
                QueueUrl=self.url,
                MessageBody=payload
            )
        return send_message

Writing Entrypoints

You can implement new Entrypoint extensions if you want to support new transports or mechanisms for initiating service code.

The minimum requirements for an Entrypoint are:

  1. Inherit from nameko.extensions.Entrypoint
  2. Implement the start() method to start the entrypoint when the container does. If a background thread is required, it’s recommended to use a thread managed by the service container (see Spawning Background Threads)
  3. Call spawn_worker() on the bound container when appropriate.

Example

A simple Entrypoint that receives messages from an SQS queue.

from nameko.extensions import Entrypoint
from functools import partial

import boto3


class SqsReceive(Entrypoint):

    def __init__(self, url, region="eu-west-1", **kwargs):
        self.url = url
        self.region = region
        super(SqsReceive, self).__init__(**kwargs)

    def setup(self):
        self.client = boto3.client('sqs', region_name=self.region)

    def start(self):
        self.container.spawn_managed_thread(
            self.run, identifier="SqsReceiver.run"
        )

    def run(self):
        while True:
            response = self.client.receive_message(
                QueueUrl=self.url,
                WaitTimeSeconds=5,
            )
            messages = response.get('Messages', ())
            for message in messages:
                self.handle_message(message)

    def handle_message(self, message):
        handle_result = partial(self.handle_result, message)

        args = (message['Body'],)
        kwargs = {}

        self.container.spawn_worker(
            self, args, kwargs, handle_result=handle_result
        )

    def handle_result(self, message, worker_ctx, result, exc_info):
        # assumes boto client is thread-safe for this action, because it
        # happens inside the worker threads
        self.client.delete_message(
            QueueUrl=self.url,
            ReceiptHandle=message['ReceiptHandle']
        )
        return result, exc_info


receive = SqsReceive.decorator

Used in a service:

from .sqs_receive import receive


class SqsService(object):
    name = "sqs-service"

    @receive('https://sqs.eu-west-1.amazonaws.com/123456789012/nameko-sqs')
    def handle_sqs_message(self, body):
        """ This method is called by the `receive` entrypoint whenever
        a message sent to the given SQS queue.
        """
        print(body)
        return body

Expected Exceptions

The Entrypoint base class constructor will accept a list of classes that should be considered to be “expected” if they are raised by the decorated service method. This can used to differentiate user errors from more fundamental execution errors. For example:

class Service:
    name = "service"

    auth = Auth()

    @rpc(expected_exceptions=Unauthorized)
    def update(self, data):
        if not self.auth.has_role("admin"):
            raise Unauthorized()

        # perform update
        raise TypeError("Whoops, genuine error.")

The list of expected exceptions are saved to the Entrypoint instance so they can later be inspected, for example by other extensions that process exceptions, as in nameko-sentry.

Sensitive Arguments

In the same way as expected exceptions, the Entrypoint constructor allows you to mark certain arguments or parts of arguments as sensitive. For example:

class Service:
    name = "service"

    auth = Auth()

    @rpc(sensitive_arguments="password", expected_exceptions=Unauthenticated)
    def login(self, username, password):
        # raises Unauthenticated if username/password do not match
        return self.auth.authenticate(username, password)

This can to be used in combination with the utility function nameko.utils.get_redacted_args(), which will return an entrypoint’s call args (similar to inspect.getcallargs()) but with sensitive elements redacted.

This is useful in Extensions that log or save information about entrypoint invocations, such as nameko-tracer.

For entrypoints that accept sensitive information nested within an otherwise safe argument, you can specify partial redaction. For example:

# by dictionary key
@entrypoint(sensitive_arguments="foo.a")
def method(self, foo):
    pass

>>> get_redacted_args(method, foo={'a': 1, 'b': 2})
... {'foo': {'a': '******', 'b': 2}}

# list index
@entrypoint(sensitive_arguments="foo.a[1]")
def method(self, foo):
    pass

>>> get_redacted_args(method, foo=[{'a': [1, 2, 3]}])
... {'foo': {'a': [1, '******', 3]}}

Slices and relative list indexes are not supported.

Spawning Background Threads

Extensions needing to execute work in a thread may choose to delegate the management of that thread to the service container using spawn_managed_thread().

    def start(self):
        self.container.spawn_managed_thread(
            self.run, identifier="SqsReceiver.run"
        )

Delegating thread management to the container is advised because:

  • Managed threads will always be terminated when the container is stopped or killed.
  • Unhandled exceptions in managed threads are caught by the container and will cause it to terminate with an appropriate message, which can prevent hung processes.