Verwirrt, wenn boost :: asio :: io_service die Methode blockiert / entsperrt

87

Als absoluter Anfänger von Boost.Asio bin ich verwirrt mit io_service::run(). Ich würde es begrüßen, wenn mir jemand erklären könnte, wann diese Methode blockiert / entsperrt. In den Dokumentationen heißt es:

Die run()Funktion wird blockiert, bis alle Arbeiten abgeschlossen sind und keine Handler mehr gesendet werden müssen oder bis die io_servicegestoppt wurden.

Mehrere Threads können die run()Funktion aufrufen , um einen Pool von Threads einzurichten, aus denen die io_serviceHandler ausgeführt werden können. Alle Threads, die im Pool warten, sind gleichwertig und io_servicekönnen einen beliebigen Thread auswählen, um einen Handler aufzurufen.

Ein normaler Ausstieg aus der run()Funktion impliziert, dass das io_serviceObjekt gestoppt ist (die stopped()Funktion gibt true zurück). Nachfolgende Aufrufe run(), run_one(), poll()oder poll_one()wird sofort zurück , es sei denn es einen vorherigen Aufruf an ist reset().

Was bedeutet die folgende Aussage?

[...] keine Handler mehr zu versenden [...]


Bei dem Versuch , das Verhalten zu verstehen io_service::run(), kam ich auf diesem Beispiel (Beispiel 3a). Darin beobachte ich, dass io_service->run()Arbeitsaufträge blockiert und gewartet werden.

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

In dem folgenden Code, an dem ich gearbeitet habe, stellt der Client jedoch eine Verbindung über TCP / IP her und die Ausführungsmethode wird blockiert, bis Daten asynchron empfangen werden.

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

Jede Erklärung dafür run()beschreibt sein Verhalten in den beiden folgenden Beispielen.

MistyD
quelle

Antworten:

233

Stiftung

Beginnen wir mit einem vereinfachten Beispiel und untersuchen die relevanten Boost.Asio-Teile:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

Was ist ein Handler ?

Ein Handler ist nichts anderes als ein Rückruf. Im Beispielcode gibt es 3 Handler:

  • Der printHandler (1).
  • Der handle_async_receiveHandler (3).
  • Der printHandler (4).

Obwohl dieselbe print()Funktion zweimal verwendet wird, wird bei jeder Verwendung davon ausgegangen, dass ein eigener, eindeutig identifizierbarer Handler erstellt wird. Handler können in vielen Formen und Größen angeboten werden, angefangen von Grundfunktionen wie den oben genannten bis hin zu komplexeren Konstrukten wie aus boost::bind()und Lambdas erzeugten Funktoren . Unabhängig von der Komplexität bleibt der Handler nichts anderes als ein Rückruf.

Was ist Arbeit ?

Arbeit ist eine Verarbeitung, die Boost.Asio im Auftrag des Anwendungscodes ausführen muss. Manchmal kann Boost.Asio einen Teil der Arbeit beginnen, sobald es darüber informiert wurde, und manchmal kann es warten, bis die Arbeit zu einem späteren Zeitpunkt erledigt ist. Nach Abschluss der Arbeiten informiert Boost.Asio die Anwendung, indem es den mitgelieferten Handler aufruft .

Boost.Asio garantiert , dass Handler nur in einem Thread ausgeführt werden , die zur Zeit rufen run(), run_one(), poll(), oder poll_one(). Dies sind die Threads, die funktionieren und Handler aufrufen . Daher wird im obigen Beispiel print()nicht aufgerufen, wenn es in io_service(1) gebucht wird . Stattdessen wird es dem hinzugefügt io_serviceund zu einem späteren Zeitpunkt aufgerufen. In diesem Fall innerhalb von io_service.run()(5).

Was sind asynchrone Operationen?

Eine asynchrone Operation erstellt Arbeit und Boost.Asio ruft einen Handler auf, um die Anwendung zu informieren, wenn die Arbeit abgeschlossen ist. Asynchrone Operationen werden durch Aufrufen einer Funktion erstellt, die einen Namen mit dem Präfix hat async_. Diese Funktionen werden auch als Initiierungsfunktionen bezeichnet .

Asynchrone Operationen können in drei eindeutige Schritte zerlegt werden:

  • Das Initiieren oder Informieren der zugehörigen io_serviceArbeit muss durchgeführt werden. Die async_receiveOperation (3) informiert den, io_servicedass Daten asynchron aus dem Socket gelesen werden müssen, und async_receivekehrt dann sofort zurück.
  • Die eigentliche Arbeit erledigen. In diesem Fall werden beim socketEmpfang von Daten Bytes gelesen und kopiert buffer. Die eigentliche Arbeit wird entweder erledigt in:
    • Die auslösende Funktion (3), wenn Boost.Asio feststellen kann, dass sie nicht blockiert.
    • Wenn die Anwendung explizit die io_service(5) ausführt.
  • Unter Berufung auf die handle_async_receive ReadHandler . Auch hier werden Handler nur in Threads aufgerufen, in denen das ausgeführt wird io_service. Unabhängig davon, wann die Arbeit erledigt ist (3 oder 5), ist somit garantiert, dass handle_async_receive()nur innerhalb von io_service.run()(5) aufgerufen wird .

Die zeitliche und räumliche Trennung zwischen diesen drei Schritten wird als Kontrollflussinversion bezeichnet. Dies ist eine der Komplexitäten, die die asynchrone Programmierung erschwert. Es gibt jedoch Techniken, die helfen können, dies zu mildern, beispielsweise durch die Verwendung von Coroutinen .

Was macht io_service.run()?

Wenn ein Thread aufruft io_service.run(), werden Arbeit und Handler aus diesem Thread heraus aufgerufen. Im obigen Beispiel wird io_service.run()(5) blockiert, bis entweder:

  • Es wurde von beiden printHandlern aufgerufen und zurückgegeben , der Empfangsvorgang wird mit Erfolg oder Misserfolg abgeschlossen, und sein handle_async_receiveHandler wurde aufgerufen und zurückgegeben.
  • Das io_servicewird explizit über gestoppt io_service::stop().
  • Eine Ausnahme wird innerhalb eines Handlers ausgelöst.

Ein möglicher pseudo-isch Fluss könnte wie folgt beschrieben werden:

Erstelle io_service
Socket erstellen
Druckhandler zu io_service hinzufügen (1)
Warten Sie, bis die Buchse angeschlossen ist (2).
Hinzufügen einer asynchronen Lesearbeitsanforderung zum io_service (3)
Druckhandler zu io_service hinzufügen (4)
Führen Sie den io_service aus (5)
  Gibt es Arbeit oder Handler?
    Ja, es gibt 1 Arbeit und 2 Handler
      Hat Socket Daten? Nein, tu nichts
      Druckhandler ausführen (1)
  Gibt es Arbeit oder Handler?
    Ja, es gibt 1 Arbeit und 1 Handler
      Hat Socket Daten? Nein, tu nichts
      Druckhandler ausführen (4)
  Gibt es Arbeit oder Handler?
    Ja, es gibt 1 Arbeit
      Hat Socket Daten? Nein, warte weiter
  - Socket empfängt Daten -
      Socket hat Daten, lesen Sie sie in den Puffer
      Fügen Sie den Handler handle_async_receive zu io_service hinzu
  Gibt es Arbeit oder Handler?
    Ja, es gibt 1 Handler
      Führen Sie den Handler handle_async_receive aus (3).
  Gibt es Arbeit oder Handler?
    nein, setze io_service auf gestoppt und kehre zurück

Beachten Sie, dass nach dem Lesen ein weiterer Handler zum hinzugefügt wurde io_service. Dieses subtile Detail ist ein wichtiges Merkmal der asynchronen Programmierung. Dadurch können Handler miteinander verkettet werden. Wenn beispielsweise handle_async_receivenicht alle erwarteten Daten abgerufen werden, kann die Implementierung eine weitere asynchrone Leseoperation veröffentlichen, was zu io_servicemehr Arbeit führt und somit nicht zurückkehrt io_service.run().

Beachten Sie, dass io_servicedie Anwendung reset()die Funktion ausführen muss, io_servicebevor sie erneut ausgeführt wird , wenn die Arbeit abgelaufen ist .


Beispielfrage und Beispiel 3a Code

Lassen Sie uns nun die beiden Codeteile untersuchen, auf die in der Frage verwiesen wird.

Fragencode

socket->async_receivefügt dem hinzu io_service. Somit io_service->run()wird blockiert , bis der Lesevorgang abgeschlossen ist mit Erfolg oder Fehlern, und ClientReceiveEventhat entweder fertigen Laufen oder eine Ausnahme auslöst.

Beispiel 3a Code

In der Hoffnung, das Verständnis zu erleichtern, finden Sie hier ein kleineres kommentiertes Beispiel 3a:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

Auf hoher Ebene erstellt das Programm zwei Threads, die die io_serviceEreignisschleife (2) verarbeiten. Dies führt zu einem einfachen Thread-Pool, der Fibonacci-Zahlen berechnet (3).

Der einzige wesentliche Unterschied zwischen dem Fragencode und diesem Code besteht darin, dass dieser Code io_service::run()(2) aufruft, bevor die eigentliche Arbeit und die Handler zu io_service(3) hinzugefügt werden . Um zu verhindern, dass der io_service::run()sofort zurückkehrt, wird ein io_service::workObjekt erstellt (1). Dieses Objekt verhindert, dass die io_serviceArbeit ausgeht. Daher io_service::run()wird nicht als Ergebnis keiner Arbeit zurückkehren.

Der Gesamtfluss ist wie folgt:

  1. Erstellen Sie das io_service::workhinzugefügte Objekt und fügen Sie es hinzu io_service.
  2. Thread-Pool erstellt, der aufruft io_service::run(). Diese Arbeitsthreads werden io_serviceaufgrund des io_service::workObjekts nicht zurückgegeben.
  3. Fügen Sie 3 Handler hinzu, die Fibonacci-Zahlen berechnen io_service, und kehren Sie sofort zurück. Die Worker-Threads, nicht der Haupt-Thread, können diese Handler sofort ausführen.
  4. Löschen Sie das io_service::workObjekt.
  5. Warten Sie, bis die Arbeitsthreads beendet sind. Dies tritt erst auf, wenn alle 3 Handler die Ausführung beendet haben, da io_serviceweder Handler noch Arbeit vorhanden sind.

Der Code kann auf die gleiche Weise wie der Originalcode anders geschrieben werden, wobei Handler zum Code hinzugefügt io_servicewerden und dann die io_serviceEreignisschleife verarbeitet wird. Dies macht die Verwendung überflüssig io_service::workund führt zu folgendem Code:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

Synchron vs. asynchron

Obwohl der fragliche Code eine asynchrone Operation verwendet, funktioniert er effektiv synchron, da er darauf wartet, dass die asynchrone Operation abgeschlossen wird:

socket.async_receive(buffer, handler)
io_service.run();

ist äquivalent zu:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

Als allgemeine Faustregel sollten Sie vermeiden, synchrone und asynchrone Operationen zu mischen. Oft kann es ein komplexes System in ein kompliziertes System verwandeln. Diese Antwort zeigt die Vorteile der asynchronen Programmierung auf, von denen einige auch in der Boost.Asio- Dokumentation behandelt werden .

Tanner Sansbury
quelle
13
Super Beitrag. Ich möchte nur eines hinzufügen, weil ich der Meinung bin, dass es nicht genug Aufmerksamkeit erhält: Nachdem run () zurückgekehrt ist, müssen Sie reset () auf Ihrem io_service aufrufen, bevor Sie es erneut ausführen können (). Andernfalls wird möglicherweise sofort zurückgegeben, ob asynchrone Operationen warten oder nicht.
DeVadder
Woher kommt der Puffer? Was ist es?
Ruipacheco
Ich bin immer noch verwirrt. Wenn das Mischen synchron und asynchron nicht empfohlen wird, was ist dann der reine asynchrone Modus? Können Sie ein Beispiel geben, das den Code ohne io_service.run () zeigt?
Splash
@Splash Man kann io_service.poll()die Ereignisschleife verarbeiten, ohne ausstehende Operationen zu blockieren. Die Hauptempfehlung, um das Mischen von synchronen und asynchronen Vorgängen zu vermeiden, besteht darin, unnötige Komplexität zu vermeiden und eine schlechte Reaktionsfähigkeit zu vermeiden, wenn die Ausführung von Handlern lange dauert. Es gibt einige Fälle, in denen es sicher ist, beispielsweise wenn man weiß, dass der Synchronbetrieb nicht blockiert.
Tanner Sansbury
Was meinst du mit "aktuell" in "Boost.Asio garantiert, dass Handler nur in einem Thread ausgeführt werden, der gerade aufruftrun() ..." ? Wenn es N Threads gibt (die aufgerufen haben run()), welcher ist dann "aktueller" Thread? Es kann viele geben? Oder meinst du damit, dass der Thread, der die Ausführung async_*()(sagen wir async_read) beendet hat, garantiert auch seine Handler aufruft?
Nawaz
18

Stellen runSie sich das als einen Mitarbeiter vor, der einen Stapel Papier verarbeiten muss. es nimmt ein Blatt, tut, was das Blatt sagt, wirft das Blatt weg und nimmt das nächste; Wenn ihm die Laken ausgehen, verlässt er das Büro. Auf jedem Blatt kann jede Art von Anweisung stehen, sogar ein neues Blatt zum Stapel hinzufügen. Zurück zu Asio: Sie zu einem geben kann io_serviceArbeit in zweierlei Hinsicht, im Wesentlichen: durch die Verwendung postauf sie wie in der Probe Sie verknüpft oder durch andere Objekte verwenden , die intern anrufen postauf die io_service, wie die socketund ihre async_*Methoden.

Loghorn
quelle