Wie testest du eine Sellerie-Aufgabe?

Antworten:

61

Es ist möglich, Aufgaben synchron mit jeder unittest lib da draußen zu testen. Normalerweise mache ich 2 verschiedene Testsitzungen, wenn ich mit Sellerie-Aufgaben arbeite. Der erste (wie ich unten vorschlage) ist vollständig synchron und sollte derjenige sein, der sicherstellt, dass der Algorithmus das tut, was er tun soll. Die zweite Sitzung verwendet das gesamte System (einschließlich des Brokers) und stellt sicher, dass ich keine Serialisierungsprobleme oder andere Verteilungs- oder Kommunikationsprobleme habe.

So:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

Und dein Test:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

Hoffentlich hilft das!

FlaPer87
quelle
1
Dies funktioniert nur bei Aufgaben, die eine HttpDispatchTask verwenden - docs.celeryproject.org/en/latest/userguide/remote-tasks.html, bei denen ich celery.conf.CELERY_ALWAYS_EAGER = True setzen muss, aber auch wenn celery.conf.CELERY_IMPORTS = gesetzt wird ('celery.task.http') Der Test schlägt mit NotRegistered fehl: celery.task.http.HttpDispatchTask
davidmytton
Seltsam, sind Sie sicher, dass Sie keine Importprobleme haben? Dieser Test funktioniert (beachten Sie, dass ich die Antwort fälsche, damit sie das zurückgibt, was Sellerie erwartet). Außerdem werden in CELERY_IMPORTS definierte Module während der Worker-Initialisierung importiert. Um dies zu vermeiden, sollten Sie aufrufen celery.loader.import_default_modules().
FlaPer87
Ich würde vorschlagen , dass Sie auch einen Blick nehmen hier . Es verspottet die http-Anfrage. Keine Ahnung, ob es hilft, ich denke, Sie möchten einen Dienst testen, der läuft, nicht wahr?
FlaPer87
52

Ich benutze das:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Dokumente: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

Mit CELERY_ALWAYS_EAGER können Sie Ihre Aufgabe synchron ausführen und benötigen keinen Sellerieserver.

guettli
quelle
1
Ich denke das ist veraltet - ich verstehe ImportError: No module named celeryconfig.
Daenyth
7
Ich glaube oben, dass das Modul celeryconfig.pyin einem Paket vorhanden ist. Siehe docs.celeryproject.org/en/latest/getting-started/… .
Kamil Sindi
1
Ich weiß, dass es alt ist, aber können Sie ein vollständiges Beispiel dafür geben, wie Sie Aufgaben addaus der OP-Frage innerhalb einer TestCaseKlasse starten können ?
Kruupös
@ MaxChrétien sorry, ich kann kein vollständiges Beispiel liefern, da ich keinen Sellerie mehr benutze. Sie können meine Frage bearbeiten, wenn Sie genügend Reputationspunkte haben. Wenn Sie nicht genug haben, lassen Sie mich bitte wissen, was ich kopieren und in diese Antwort einfügen soll.
Guettli
1
@ miken32 danke. Da die jüngste Antwort das Problem, bei dem ich helfen wollte, irgendwie angeht, habe ich nur einen Kommentar hinterlassen, von dem die offiziellen Dokumente für 4.0 die Verwendung CELERY_TASK_ALWAYS_EAGERfür Unit-Tests abhalten .
Krassowski
33

Kommt darauf an, was genau Sie testen möchten.

  • Testen Sie den Aufgabencode direkt. Rufen Sie nicht "task.delay (...)" auf, sondern rufen Sie "task (...)" aus Ihren Unit-Tests auf.
  • Verwenden Sie CELERY_ALWAYS_EAGER . Dadurch werden Ihre Aufgaben sofort an der Stelle aufgerufen, an der Sie "task.delay (...)" sagen, sodass Sie den gesamten Pfad testen können (jedoch kein asynchrones Verhalten).
Slacy
quelle
24

Gerätetest

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test Vorrichtungen

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Nachtrag: send_task respektvoll machen

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
Kamil Sindi
quelle
22

Für diejenigen auf Sellerie 4 ist es:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Da die Einstellungsnamen geändert wurden und aktualisiert werden müssen, wenn Sie ein Upgrade durchführen möchten, siehe

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

okrutny
quelle
11
Laut den offiziellen Dokumenten ist die Verwendung von "task_always_eager" (früher "CELERY_ALWAYS_EAGER") nicht für Unit-Tests geeignet. Stattdessen schlagen sie einige andere großartige Möglichkeiten vor, um Ihre Sellerie-App zu testen.
Krassowski
4
Ich möchte nur hinzufügen, dass der Grund, warum Sie keine eifrigen Aufgaben in Ihren Komponententests wünschen, darin besteht, dass Sie dann nicht z. B. die Serialisierung von Parametern testen, die auftreten wird, wenn Sie den Code in der Produktion verwenden.
Verdammter
15

Ab Sellerie 3.0 können Sie CELERY_ALWAYS_EAGERin Django Folgendes festlegen :

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())
Aaron Lelevier
quelle
7

Seit Celery v4.0 werden py.test-Vorrichtungen bereitgestellt , um einen Sellerie-Arbeiter nur für den Test zu starten, und werden nach Abschluss heruntergefahren:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: [email protected] (running)>
    assert myfunc.delay().wait(3)

Unter anderen auf http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test beschriebenen Geräten können Sie die Standardoptionen für Sellerie ändern, indem Sie das celery_configGerät folgendermaßen neu definieren :

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

Standardmäßig verwendet der Testarbeiter einen In-Memory-Broker und ein Ergebnis-Backend. Sie müssen kein lokales Redis oder RabbitMQ verwenden, wenn Sie bestimmte Funktionen nicht testen.

Alanjds
quelle
Lieber Downvoter, möchten Sie uns mitteilen, warum dies eine schlechte Antwort ist? Aufrichtig danke.
Alanjds
2
Hat bei mir nicht funktioniert, die Testsuite hängt einfach. Könnten Sie etwas mehr Kontext bereitstellen? (Ich habe aber noch nicht abgestimmt;)).
Dualität_
In meinem Fall musste ich das celey_config-Gerät explizit festlegen, um den Speicherbroker und das Cache + Speicher-Backend zu verwenden
sanzoghenzo
5

Referenz mit Pytest.

def test_add(celery_worker):
    mytask.delay()

Wenn Sie flask verwenden, stellen Sie die App-Konfiguration ein

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

und in conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application
Yoge
quelle
2

In meinem Fall (und ich nehme viele andere an) wollte ich nur die innere Logik einer Aufgabe mit pytest testen.

TL; DR; am Ende verspottete alles ( OPTION 2 )


Anwendungsbeispiel :

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

Da der shared_taskDekorateur jedoch viel Sellerie-interne Logik ausführt, handelt es sich nicht wirklich um Unit-Tests.

Für mich gab es also zwei Möglichkeiten:

OPTION 1: Separate interne Logik

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

Dies sieht sehr seltsam aus und erfordert nicht nur eine geringere Lesbarkeit, sondern auch das manuelle Extrahieren und Übergeben von Attributen, die Teil der Anforderung sind, z. B. für den task_idFall, dass Sie sie benötigen, wodurch die Logik weniger rein wird.

OPTION 2:
Verspottet Sellerie-Einbauten

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

Dadurch kann ich das Anforderungsobjekt verspotten (erneut, falls Sie Dinge aus der Anforderung benötigen, wie z. B. die ID oder den Wiederholungszähler.

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

Diese Lösung ist viel manueller, gibt mir aber die Kontrolle, die ich für einen Unit- Test benötige , ohne mich zu wiederholen und ohne den Umfang des Selleries zu verlieren.

Daniel Dubovski
quelle