Ereignissystem in Python

195

Welches Ereignissystem für Python verwenden Sie? Ich kenne Pydispatcher bereits , habe mich aber gefragt, was sonst noch zu finden ist oder häufig verwendet wird.

Ich interessiere mich nicht für Event-Manager, die Teil großer Frameworks sind, sondern verwende lieber eine kleine Bare-Bones-Lösung, die ich problemlos erweitern kann.

Josip
quelle

Antworten:

178

PyPI-Pakete

Ab Juni 2020 sind dies die ereignisbezogenen Pakete, die auf PyPI verfügbar sind und nach dem letzten Veröffentlichungsdatum sortiert sind.

Es gibt mehr

Das sind viele Bibliotheken zur Auswahl, die eine sehr unterschiedliche Terminologie verwenden (Ereignisse, Signale, Handler, Methodenversand, Hooks, ...).

Ich versuche, einen Überblick über die oben genannten Pakete sowie die in den Antworten hier genannten Techniken zu behalten.

Zunächst einige Begriffe ...

Beobachtermuster

Der grundlegendste Stil des Ereignissystems ist die 'Bag of Handler-Methode', die eine einfache Implementierung des Observer-Musters darstellt .

Grundsätzlich werden die Handler-Methoden (Callables) in einem Array gespeichert und jeweils aufgerufen, wenn das Ereignis 'ausgelöst' wird.

Veröffentlichen-Abonnieren

Der Nachteil von Observer-Ereignissystemen besteht darin, dass Sie die Handler nur für das tatsächliche Ereignisobjekt (oder die Handlerliste) registrieren können. Zum Zeitpunkt der Registrierung muss die Veranstaltung also bereits vorhanden sein.

Aus diesem Grund gibt es den zweiten Stil von Ereignissystemen: das Publish-Subscribe-Muster . Hier registrieren sich die Handler nicht für ein Ereignisobjekt (oder eine Handlerliste), sondern für einen zentralen Dispatcher. Auch die Benachrichtiger sprechen nur mit dem Dispatcher. Was zu hören oder zu veröffentlichen ist, wird durch 'Signal' bestimmt, das nichts weiter als ein Name (Zeichenfolge) ist.

Vermittlermuster

Könnte auch von Interesse sein: das Mediator-Muster .

Haken

Ein 'Hook'-System wird üblicherweise im Zusammenhang mit Anwendungs-Plugins verwendet. Die Anwendung enthält feste Integrationspunkte (Hooks), und jedes Plugin kann eine Verbindung zu diesem Hook herstellen und bestimmte Aktionen ausführen.

Andere Ereignisse'

Hinweis: threading.Event ist kein 'Ereignissystem' im obigen Sinne. Es ist ein Thread-Synchronisationssystem, bei dem ein Thread wartet, bis ein anderer Thread das Ereignisobjekt "signalisiert".

Netzwerk-Messaging-Bibliotheken verwenden häufig auch den Begriff "Ereignisse". manchmal sind diese im Konzept ähnlich; manchmal nicht. Sie können natürlich Thread-, Prozess- und Computergrenzen überschreiten. Siehe zB pyzmq , pymq , Twisted , Tornado , gevent , eventlet .

Schwache Referenzen

Wenn Sie in Python einen Verweis auf eine Methode oder ein Objekt halten, wird sichergestellt, dass dieser nicht vom Garbage Collector gelöscht wird. Dies kann wünschenswert sein, kann aber auch zu Speicherverlusten führen: Die verknüpften Handler werden niemals bereinigt.

Einige Ereignissysteme verwenden schwache Referenzen anstelle regulärer Referenzen, um dies zu lösen.

Einige Worte zu den verschiedenen Bibliotheken

Ereignissysteme im Beobachterstil:

  • zope.event zeigt, wie das funktioniert (siehe Lennarts Antwort ). Hinweis: Dieses Beispiel unterstützt nicht einmal Handlerargumente.
  • Die 'Callable List'- Implementierung von LongPoke zeigt, dass ein solches Ereignissystem durch Unterklassen sehr minimalistisch implementiert werden kann list.
  • Felks Variation EventHook sorgt auch für die Signaturen von Anrufern und Anrufern.
  • spassigs EventHook (Michael Foords Event Pattern) ist eine einfache Implementierung.
  • Die Ereignisklasse "Wertvolle Lektionen" von Josip ist im Grunde dieselbe, verwendet jedoch a setanstelle von a list, um die Tasche zu speichern, und implementiert Geräte, __call__die beide sinnvolle Ergänzungen sind.
  • PyNotify hat ein ähnliches Konzept und bietet zusätzliche Konzepte für Variablen und Bedingungen ('variabel geändertes Ereignis'). Die Homepage funktioniert nicht.
  • axel ist im Grunde ein Bag-of-Handler mit mehr Funktionen in Bezug auf Threading, Fehlerbehandlung, ...
  • Für Python-Dispatch müssen die geraden Quellklassen abgeleitet werden pydispatch.Dispatcher.
  • buslane ist klassenbasiert, unterstützt Einzel- oder Mehrfachhandler und ermöglicht umfangreiche Typhinweise.
  • Pithikos ' Observer / Event ist ein leichtes Design.

Publish-Subscribe-Bibliotheken:

  • Blinker hat einige raffinierte Funktionen wie automatische Trennung und Filterung basierend auf dem Absender.
  • PyPubSub ist ein stabiles Paket und verspricht "erweiterte Funktionen, die das Debuggen und Verwalten von Themen und Nachrichten erleichtern".
  • pymitter ist ein Python-Port von Node.js EventEmitter2 und bietet Namespaces, Platzhalter und TTL.
  • PyDispatcher scheint die Flexibilität in Bezug auf viele-zu-viele-Veröffentlichungen usw. zu betonen. Unterstützt schwache Referenzen.
  • louie ist ein überarbeiteter PyDispatcher und sollte "in einer Vielzahl von Kontexten" arbeiten.
  • pypydispatcher basiert auf (Sie haben es erraten ...) PyDispatcher und funktioniert auch in PyPy.
  • django.dispatch ist ein umgeschriebener PyDispatcher "mit einer eingeschränkteren Oberfläche, aber höherer Leistung".
  • pyeventdispatcher basiert auf dem Event-Dispatcher des Symfony-Frameworks von PHP.
  • Der Dispatcher wurde aus django.dispatch extrahiert, wird aber ziemlich alt.
  • Cristian Garcias EventManger ist eine sehr kurze Implementierung.

Andere:

  • pluggy enthält ein Hook-System, das von pytestPlugins verwendet wird .
  • RxPy3 implementiert das Observable-Muster und ermöglicht das Zusammenführen von Ereignissen, erneuten Versuchen usw.
  • Die Signale und Slots von Qt sind bei PyQt oder PySide2 erhältlich . Sie funktionieren als Rückruf, wenn sie im selben Thread verwendet werden, oder als Ereignisse (mithilfe einer Ereignisschleife) zwischen zwei verschiedenen Threads. Signale und Slots haben die Einschränkung, dass sie nur in Objekten von Klassen funktionieren, von denen abgeleitet ist QObject.
florisla
quelle
2
Es gibt auch Louie, das auf PyDispatcher basiert: pypi.python.org/pypi/Louie/1.1
the979kid
@ the979kid louie scheint schlecht gepflegt zu sein, die pypi-Seite verlinkt auf 404s auf GitHub: 11craft.github.io/louie ; github.com/gldnspud/louie . Sollte github.com/11craft/louie sein .
Florida
1
Zuhörer mit schwachen Ereignissen sind ein häufiges Bedürfnis. Andernfalls wird die Verwendung in der realen Welt schwierig. Ein Hinweis, welche Lösungen dies unterstützen, kann nützlich sein.
kxr
Pypubsub 4 ist viele-zu-viele und verfügt über leistungsstarke Debugging-Tools für Nachrichten sowie verschiedene Möglichkeiten, die Nutzdaten von Nachrichten einzuschränken, damit Sie früher wissen, wann Sie ungültige Daten oder fehlende Daten gesendet haben. PyPubSub 4 unterstützt Python 3 (und PyPubSub 3.x unterstützt Python 2).
Oliver
Ich habe kürzlich eine Bibliothek namens pymq github.com/thrau/pymq veröffentlicht, die gut zu dieser Liste passt.
14.
98

Ich habe es so gemacht:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

Wie bei allem anderen, was ich gesehen habe, gibt es dafür kein automatisch generiertes Pydoc und keine Signaturen, was wirklich scheiße ist.

L̲̳o̲̳̳n̲̳̳g̲̳̳p̲̳o̲̳̳k̲̳̳e̲̳̳
quelle
3
Ich finde diesen Stil ziemlich faszinierend. Es ist süß nackt. Ich mag die Tatsache, dass man damit Ereignisse und ihre Abonnenten als autonome Operationen manipulieren kann. Ich werde sehen, wie es in einem echten Projekt abschneidet.
Rudy Lattae
2
Sehr schöner minimalistischer Stil! Super!
akaRem
2
Ich kann das nicht genug bewerten, das ist wirklich einfach und unkompliziert.
2
großer Gefallen, könnte jemand das erklären, als wäre ich 10? Wird diese Klasse von der Hauptklasse geerbt? Ich sehe kein Init, so dass super () nicht verwendet werden würde. Es klickt aus irgendeinem Grund nicht für mich.
Omgimdrunk
1
@omgimdrunk Ein einfacher Ereignishandler löst eine oder mehrere aufrufbare Funktionen aus, wenn ein Ereignis ausgelöst wird. Eine Klasse, die dies für Sie "verwaltet", würde mindestens die folgenden Methoden erfordern - add & fire. Innerhalb dieser Klasse müssten Sie eine Liste der auszuführenden Handler führen. Fügen wir das in die Instanzvariable ein, _bag_of_handlersdie eine Liste ist. Die Add-Methode der Klasse wäre einfach self._bag_of_handlers.append(some_callable). Die Feuermethode der Klasse würde eine Schleife durch die Tasche der Handler durchlaufen und die bereitgestellten Argumente und Warnungen an die Handler weitergeben und jeweils nacheinander ausführen.
Gabe Spradlin
68

Wir verwenden einen EventHook, wie von Michael Foord in seinem Event-Muster vorgeschlagen :

Fügen Sie einfach EventHooks zu Ihren Klassen hinzu mit:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

Wir haben die Funktionalität hinzugefügt, um alle Listener von einem Objekt zur Michaels-Klasse zu entfernen.

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler
spassig
quelle
Ein Nachteil dieser Verwendung ist, dass Sie zuerst ein Ereignis hinzufügen müssen, bevor Sie sich als Abonnent registrieren. Wenn nur die Herausgeber ihre Ereignisse hinzufügen (kein Muss, nur eine gute Vorgehensweise), müssen Sie die Herausgeber vor den Abonnenten initialisieren, was bei großen Projekten ein
Jonathan
6
Die letzte Methode ist fehlerhaft, da Self .__-Handler während der Iterationen geändert werden. Fix: `self .__ handlers = [h für h in self .__ handlers wenn h.im_self! = Obj]`
Simon Bergot
1
@Simon ist richtig, führt aber einen Fehler ein, da wir ungebundene Funktionen in Self .__-Handlern haben können. Fix:self.__handlers = [h for h in self._handlers if getattr(h, 'im_self', False) != obj]
Eric Marcos
20

Ich benutze zope.event . Es sind die nacktesten Knochen, die Sie sich vorstellen können. :-) Tatsächlich ist hier der vollständige Quellcode:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Beachten Sie, dass Sie beispielsweise keine Nachrichten zwischen Prozessen senden können. Es ist kein Nachrichtensystem, nur ein Ereignissystem, nicht mehr und nicht weniger.

Lennart Regebro
quelle
17
pypi.python.org/pypi/zope.event ... um dem armen Google etwas Bandbreite zu ersparen ;-)
Boldewyn
Ich möchte immer noch Nachrichten senden können. Ich würde das Ereignissystem in einer auf Tkinter basierenden Anwendung verwenden. Ich verwende das Ereignissystem nicht, da es keine Nachrichten unterstützt.
Josip
Mit zope.event können Sie senden, was Sie wollen. Mein Punkt ist jedoch, dass es sich nicht um ein ordnungsgemäßes Nachrichtensystem handelt, da Sie keine Ereignisse / Nachrichten an andere Prozesse oder andere Computer senden können. Sie sollten wahrscheinlich ein aber spezifischer mit Ihren Anforderungen sein.
Lennart Regebro
15

Ich habe dieses kleine Skript in Valued Lessons gefunden . Es scheint genau das richtige Verhältnis von Einfachheit und Leistung zu haben, nach dem ich suche. Peter Thatcher ist der Autor des folgenden Codes (es wird keine Lizenzierung erwähnt).

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()
Josip
quelle
1
Die Verwendung von set () anstelle einer Liste ist hilfreich, um zu vermeiden, dass Handler zweimal registriert werden. Eine Konsequenz ist, dass die Handler nicht in der Reihenfolge aufgerufen werden, in der sie registriert wurden. Nicht unbedingt eine schlechte Sache ...
Florida
1
@florisla könnte auf Wunsch gegen OrderedSet ausgetauscht werden.
Robino
9

Hier ist ein minimales Design, das gut funktionieren sollte. Was Sie tun müssen, ist einfach Observerin einer Klasse zu erben und anschließend zu verwenden observe(event_name, callback_fn), um auf ein bestimmtes Ereignis zu warten. Immer wenn dieses bestimmte Ereignis irgendwo im Code ausgelöst wird (dh Event('USB connected')), wird der entsprechende Rückruf ausgelöst .

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Beispiel:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')
Pithikos
quelle
Ich mag dein Design, es ist minimalistisch und leicht zu verstehen. und es wäre leicht, wenn einige Module nicht importiert werden müssten.
Atreyagaurav
8

Ich habe eine EventManagerKlasse erstellt (Code am Ende). Die Syntax lautet wie folgt:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

Hier ist ein Beispiel:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

Ausgabe:

Erster Gruß
Grüße Oscar
Hallo Oscar

Entfernen Sie jetzt Grüße
Hallo Oscar

EventManger-Code:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)
Cristian Garcia
quelle
8

Sie können einen Blick auf Pymitter ( Pypi ) werfen . Es handelt sich um einen kleinen Single-File-Ansatz (~ 250 loc), der "Namespaces, Platzhalter und TTL bereitstellt".

Hier ist ein einfaches Beispiel:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
Dalailirium
quelle
6

Ich habe eine Variation von Longpokes minimalistischem Ansatz gemacht, die auch die Signaturen sowohl für Anrufer als auch für Anrufer sicherstellt:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()
Felk
quelle
3

Wenn ich in pyQt codiere, verwende ich das QT-Sockets / Signal-Paradigma, dasselbe gilt für Django

Wenn ich asynchrone E / A-Vorgänge durchführe, verwenden Sie das native Auswahlmodul

Wenn ich einen SAX-Python-Parser verwende, verwende ich die von SAX bereitgestellte Ereignis-API. Es sieht also so aus, als wäre ich Opfer der zugrunde liegenden API :-)

Vielleicht sollten Sie sich fragen, was Sie von Event Framework / Modul erwarten. Meine persönliche Präferenz ist die Verwendung des Socket / Signal-Paradigmas von QT. Weitere Infos dazu finden Sie hier

SashaN
quelle
2

Hier ist ein weiteres Modul zur Prüfung. Es scheint eine gute Wahl für anspruchsvollere Anwendungen zu sein.

Py-notify ist ein Python-Paket, das Tools zum Implementieren des Observer-Programmiermusters bietet. Diese Tools umfassen Signale, Bedingungen und Variablen.

Signale sind Listen von Handlern, die aufgerufen werden, wenn ein Signal ausgegeben wird. Bedingungen sind im Grunde boolesche Variablen, die mit einem Signal gekoppelt sind, das ausgegeben wird, wenn sich der Zustandszustand ändert. Sie können unter Verwendung von logischen Standardoperatoren (nicht und usw.) zu zusammengesetzten Bedingungen kombiniert werden. Im Gegensatz zu Bedingungen können Variablen jedes Python-Objekt enthalten, nicht nur Boolesche Werte, sondern können auch nicht kombiniert werden.

Josip
quelle
1
Die Homepage ist für diese außer Betrieb und wird möglicherweise nicht mehr unterstützt?
David Parks
1

Wenn Sie kompliziertere Dinge wie das Zusammenführen von Ereignissen oder das Wiederholen von Ereignissen ausführen möchten, können Sie das Observable-Muster und eine ausgereifte Bibliothek verwenden, die dies implementiert. https://github.com/ReactiveX/RxPY . Observables sind in Javascript und Java sehr verbreitet und für einige asynchrone Aufgaben sehr praktisch.

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

AUSGABE :

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
David Dehghan
quelle
1

Wenn Sie einen Eventbus benötigen, der über Prozess- oder Netzwerkgrenzen hinweg funktioniert, können Sie PyMQ ausprobieren . Derzeit werden Pub / Sub, Nachrichtenwarteschlangen und synchroner RPC unterstützt. Die Standardversion funktioniert auf einem Redis-Backend, sodass Sie einen laufenden Redis-Server benötigen. Es gibt auch ein In-Memory-Backend zum Testen. Sie können auch Ihr eigenes Backend schreiben.

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

So initialisieren Sie das System:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

Haftungsausschluss: Ich bin der Autor dieser Bibliothek

thrau
quelle
0

Sie können das buslaneModul ausprobieren .

Diese Bibliothek erleichtert die Implementierung eines nachrichtenbasierten Systems. Es unterstützt den Ansatz von Befehlen (einzelner Handler) und Ereignissen (0 oder mehrere Handler). Buslane verwendet Anmerkungen vom Typ Python, um den Handler ordnungsgemäß zu registrieren.

Einfaches Beispiel:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='[email protected]',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='[email protected]',
    password='secret',
))

Verwenden Sie zum Installieren der Busbahn einfach pip:

$ pip install buslane
Konrad Hałas
quelle
0

Vor einiger Zeit habe ich eine Bibliothek geschrieben, die für Sie nützlich sein könnte. Es ermöglicht Ihnen lokale und globale Listener, verschiedene Arten der Registrierung, Ausführungspriorität usw.

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

Schauen Sie sich pyeventdispatcher an

Daniel Ancuta
quelle