C ++ 0x hat keine Semaphoren? Wie synchronisiere ich Threads?

135

Stimmt es, dass C ++ 0x ohne Semaphore kommt? Es gibt bereits einige Fragen zum Stapelüberlauf bezüglich der Verwendung von Semaphoren. Ich benutze sie (Posix-Semaphoren) die ganze Zeit, um einen Thread auf ein Ereignis in einem anderen Thread warten zu lassen:

void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

Wenn ich das mit einem Mutex machen würde:

void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomethingth1();

  event1.unlock();

  ...
}

Problem: Es ist hässlich und es kann nicht garantiert werden, dass Thread1 zuerst den Mutex sperrt (Da derselbe Thread einen Mutex sperren und entsperren sollte, können Sie Ereignis1 auch nicht sperren, bevor Thread0 und Thread1 gestartet wurden).

Also, da Boost auch keine Semaphoren hat, was ist der einfachste Weg, um das oben genannte zu erreichen?

Tauran
quelle
Vielleicht die Bedingung mutex und std :: Versprechen und std :: Zukunft verwenden?
Yves

Antworten:

180

Sie können einfach eine aus einem Mutex und einer Bedingungsvariablen erstellen:

#include <mutex>
#include <condition_variable>

class semaphore
{
private:
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Initialized as locked.

public:
    void notify() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void wait() {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        while(!count_) // Handle spurious wake-ups.
            condition_.wait(lock);
        --count_;
    }

    bool try_wait() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if(count_) {
            --count_;
            return true;
        }
        return false;
    }
};
Maxim Egorushkin
quelle
96
Jemand sollte dem Standardkomitee einen Vorschlag unterbreiten
Ion Todirel
7
Ein Kommentar hier, der mich anfangs verwirrte, ist die Warteschleife. Man könnte sich fragen, wie ein Thread darüber hinwegkommen kann, wenn die Sperre durch Warten gehalten wird. Die etwas schlecht undurchsichtig dokumentierte Antwort lautet, dass condition_variable.wait die Sperre pulsiert und es einem anderen Thread ermöglicht, auf atomare Weise an der Benachrichtigung vorbei zu kommen, zumindest verstehe ich das so
Ion Todirel
31
Es wurde absichtlich von Boost ausgeschlossen, da ein Semaphor zu viel Seil ist, als dass sich Programmierer daran aufhängen könnten. Bedingungsvariablen sind angeblich überschaubarer. Ich verstehe ihren Standpunkt, fühle mich aber ein bisschen bevormundet. Ich gehe davon aus, dass die gleiche Logik für C ++ 11 gilt - von Programmierern wird erwartet, dass sie ihre Programme so schreiben, dass "natürlich" Kondvare oder andere zugelassene Synchronisationstechniken verwendet werden. Die Bereitstellung eines Semaphors würde dem widersprechen, unabhängig davon, ob es auf Condvar oder nativ implementiert ist.
Steve Jessop
5
Hinweis - Die Gründe für die while(!count_)Schleife finden Sie unter en.wikipedia.org/wiki/Spurious_wakeup .
Dan Nissenbaum
3
@ Maxim Es tut mir leid, ich glaube nicht, dass du Recht hast. sem_wait und sem_post syscall auch nur bei Konflikten ( siehe sourceware.org/git/?p=glibc.git;a=blob;f=nptl/sem_wait.c ), sodass der Code hier die libc-Implementierung mit potenziellen Fehlern dupliziert. Wenn Sie die Portabilität auf einem beliebigen System beabsichtigen, ist dies möglicherweise eine Lösung. Wenn Sie jedoch nur Posix-Kompatibilität benötigen, verwenden Sie das Posix-Semaphor.
Xryl669
107

Basierend auf der Antwort von Maxim Yegorushkin habe ich versucht, das Beispiel im C ++ 11-Stil zu erstellen .

#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};
Tsuneo Yoshioka
quelle
34
Sie können warten () auch einen Dreiliner machen:cv.wait(lck, [this]() { return count > 0; });
Domi
2
Das Hinzufügen einer weiteren Klasse im Sinne von lock_guard ist ebenfalls hilfreich. Auf RAII-Weise ruft der Konstruktor, der das Semaphor als Referenz verwendet, den Aufruf wait () des Semaphors auf, und der Destruktor ruft seinen Aufruf notify () auf. Dies verhindert, dass Ausnahmen das Semaphor nicht freigeben können.
Jim Hunziker
Gibt es keine Deadlock-Sperre? Wenn beispielsweise N Threads mit dem Namen wait () und count == 0 angegeben werden, wird cv.notify_one () verwendet. wird nie aufgerufen, da der mtx nicht veröffentlicht hat?
Marcello
1
@Marcello Die wartenden Threads halten das Schloss nicht. Der springende Punkt bei Bedingungsvariablen ist die Bereitstellung einer atomaren Operation zum Entsperren und Warten.
David Schwartz
3
Sie sollten die Sperre aufheben, bevor Sie notify_one () aufrufen, um zu vermeiden, dass das Aufwecken sofort blockiert wird ... siehe hier: en.cppreference.com/w/cpp/thread/condition_variable/notify_all
kylefinn
38

Ich habe mich entschlossen, das robusteste / generischste C ++ 11-Semaphor zu schreiben, das ich im Stil des Standards so oft wie möglich schreiben konnte (beachten using semaphore = ...Sie, dass Sie normalerweise nur den Namen verwenden würden, der semaphoredem normalen Verwenden von stringnot ähnelt basic_string):

template <typename Mutex, typename CondVar>
class basic_semaphore {
public:
    using native_handle_type = typename CondVar::native_handle_type;

    explicit basic_semaphore(size_t count = 0);
    basic_semaphore(const basic_semaphore&) = delete;
    basic_semaphore(basic_semaphore&&) = delete;
    basic_semaphore& operator=(const basic_semaphore&) = delete;
    basic_semaphore& operator=(basic_semaphore&&) = delete;

    void notify();
    void wait();
    bool try_wait();
    template<class Rep, class Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& d);
    template<class Clock, class Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& t);

    native_handle_type native_handle();

private:
    Mutex   mMutex;
    CondVar mCv;
    size_t  mCount;
};

using semaphore = basic_semaphore<std::mutex, std::condition_variable>;

template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
    : mCount{count}
{}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify() {
    std::lock_guard<Mutex> lock{mMutex};
    ++mCount;
    mCv.notify_one();
}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait() {
    std::unique_lock<Mutex> lock{mMutex};
    mCv.wait(lock, [&]{ return mCount > 0; });
    --mCount;
}

template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait() {
    std::lock_guard<Mutex> lock{mMutex};

    if (mCount > 0) {
        --mCount;
        return true;
    }

    return false;
}

template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() {
    return mCv.native_handle();
}
David
quelle
Dies funktioniert mit einer geringfügigen Änderung. Die Methodenaufrufe wait_forund wait_untilmit dem Prädikat geben einen booleschen Wert zurück (kein `std :: cv_status).
JDknight
Es tut mir leid, dass ich so spät im Spiel nicht ausgewählt habe. std::size_tist ohne Vorzeichen, so dass das Dekrementieren unter Null UB ist und es immer sein wird >= 0. IMHO countsollte ein sein int.
Richard Hodges
3
@RichardHodges gibt es keine Möglichkeit, unter Null zu dekrementieren, also gibt es kein Problem, und was würde eine negative Zählung auf einem Semaphor bedeuten? Das macht IMO nicht mal Sinn.
David
1
@ David Was wäre, wenn ein Thread darauf warten müsste, dass andere Dinge initialisieren? Wenn beispielsweise 1 Reader-Thread auf 4 Threads wartet, würde ich den Semaphor-Konstruktor mit -3 aufrufen, damit der Reader-Thread wartet, bis alle anderen Threads einen Beitrag verfasst haben. Ich denke, es gibt andere Möglichkeiten, das zu tun, aber ist es nicht vernünftig? Ich denke, es ist tatsächlich die Frage, die das OP stellt, aber mit mehr "thread1".
Jmmut
2
@RichardHodges, um sehr pedantisch zu sein, ist das Dekrementieren eines vorzeichenlosen Integer-Typs unter 0 nicht UB.
JCai
15

in Übereinstimmung mit Posix-Semaphoren würde ich hinzufügen

class semaphore
{
    ...
    bool trywait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        if(count_)
        {
            --count_;
            return true;
        }
        else
        {
            return false;
        }
    }
};

Und ich bevorzuge es sehr, einen Synchronisationsmechanismus auf einer bequemen Abstraktionsebene zu verwenden, anstatt immer das Zusammenfügen einer zusammengefügten Version mit grundlegenderen Operatoren zu kopieren.

Michael Zillich
quelle
9

Sie können auch cpp11-on-multicore ausprobieren - es verfügt über eine tragbare und optimale Semaphor-Implementierung.

Das Repository enthält auch andere Threading-Extras, die das C ++ 11-Threading ergänzen.

onqtam
quelle
8

Sie können mit Mutex- und Bedingungsvariablen arbeiten. Sie erhalten exklusiven Zugriff mit dem Mutex. Prüfen Sie, ob Sie fortfahren möchten oder auf das andere Ende warten müssen. Wenn Sie warten müssen, warten Sie in einem Zustand. Wenn der andere Thread feststellt, dass Sie fortfahren können, signalisiert er den Zustand.

In der Boost :: Thread-Bibliothek gibt es ein kurzes Beispiel , das Sie höchstwahrscheinlich einfach kopieren können (die C ++ 0x- und Boost-Thread-Bibliotheken sind sehr ähnlich).

David Rodríguez - Dribeas
quelle
Bedingungssignale nur an wartende Threads oder nicht? Wenn Thread0 nicht da ist und wartet, wenn Thread1 signalisiert, wird es später blockiert? Plus: Ich brauche kein zusätzliches Schloss, das mit der Bedingung geliefert wird - es ist Overhead.
Tauran
Ja, Bedingung signalisiert nur wartende Threads. Das übliche Muster besteht darin, eine Variable mit dem Status und einer Bedingung für den Fall zu haben, dass Sie warten müssen. Denken Sie an einen Produzenten / Konsumenten, es wird eine Zählung der Elemente im Puffer geben, der Produzent sperrt, fügt das Element hinzu, erhöht die Zählung und die Signale. Der Verbraucher sperrt, überprüft den Zähler und verbraucht, wenn er nicht Null ist, während Null in der Bedingung wartet.
David Rodríguez - Dribeas
2
Sie können simulieren , eine auf diese Weise Semaphore: initialisieren eine Variable mit dem Wert, den Sie würde die Semaphore geben, dann wait()zu übersetzt „lock, Prüfanzahl wenn nicht Null Abnahme- und weiter, und wenn Null - Warte unter der Bedingung“ , während postwäre „Schloss, Inkrementzähler, Signal, wenn es 0 "war
David Rodríguez - Dribeas
Ja, klingt gut. Ich frage mich, ob Posix-Semaphoren auf die gleiche Weise implementiert werden.
Tauran
@tauran: Ich weiß nicht genau (und es könnte davon abhängen, welches Posix-Betriebssystem), aber ich halte es für unwahrscheinlich. Semaphoren sind traditionell ein "untergeordnetes" Synchronisationsprimitiv als Mutexe und Bedingungsvariablen und können im Prinzip effizienter gemacht werden, als wenn sie auf einer Kondvar implementiert würden. In einem bestimmten Betriebssystem ist es daher wahrscheinlicher, dass alle Synchronisierungsprimitive auf Benutzerebene auf einigen gängigen Tools basieren, die mit dem Scheduler interagieren.
Steve Jessop
3

Kann auch ein nützlicher RAII-Semaphor-Wrapper in Threads sein:

class ScopedSemaphore
{
public:
    explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
    ScopedSemaphore(const ScopedSemaphore&) = delete;
    ~ScopedSemaphore() { m_Semaphore.Notify(); }

   ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;

private:
    Semaphore& m_Semaphore;
};

Anwendungsbeispiel in der Multithread-App:

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;

for (...)
{
    ...
    auto t = new std::thread([..., &semaphore]
    {
        ScopedSemaphore scopedSemaphore(semaphore);
        ...
    }
    );
    threads.push_back(t);
}

for (auto& t : threads)
    t.join();
slasla
quelle
3

C ++ 20 wird endlich Semaphoren haben - std::counting_semaphore<max_count>.

Diese haben (mindestens) die folgenden Methoden:

  • acquire() (Blockierung)
  • try_acquire() (nicht blockierend, wird sofort zurückgegeben)
  • try_acquire_for() (nicht blockierend, dauert eine Dauer)
  • try_acquire_until() (Nicht blockierend, benötigt eine Zeit, um den Versuch zu beenden)
  • release()

Dies ist noch nicht in cppreference aufgeführt, aber Sie können diese CppCon 2019-Präsentationsfolien lesen oder das Video ansehen . Es gibt auch den offiziellen Vorschlag P0514R4 , aber ich bin mir nicht sicher, ob dies die aktuellste Version ist.

einpoklum
quelle
2

Ich fand, dass shared_ptr und schwaches_ptr, ein langes mit einer Liste, die Arbeit erledigten, die ich brauchte. Mein Problem war, dass ich mehrere Clients hatte, die mit den internen Daten eines Hosts interagieren wollten. In der Regel aktualisiert der Host die Daten selbst. Wenn ein Client dies anfordert, muss der Host die Aktualisierung jedoch beenden, bis keine Clients mehr auf die Hostdaten zugreifen. Gleichzeitig kann ein Client einen exklusiven Zugriff anfordern, sodass weder andere Clients noch der Host diese Hostdaten ändern können.

Wie ich das gemacht habe, habe ich eine Struktur erstellt:

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

Jeder Kunde hätte ein Mitglied von folgenden:

UpdateLock::ptr m_myLock;

Dann hätte der Host ein schwaches_Ptr-Mitglied für Exklusivität und eine Liste von schwachen_Ptrs für nicht exklusive Sperren:

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

Es gibt eine Funktion zum Aktivieren der Sperre und eine andere Funktion zum Überprüfen, ob der Host gesperrt ist:

UpdateLock::ptr LockUpdate( bool exclusive );       
bool IsUpdateLocked( bool exclusive ) const;

Ich teste in LockUpdate, IsUpdateLocked und regelmäßig in der Update-Routine des Hosts auf Sperren. Das Testen auf eine Sperre ist so einfach wie das Überprüfen, ob der schwache_PTR abgelaufen ist, und das Entfernen von abgelaufenen Dateien aus der m_Locks-Liste (ich mache dies nur während des Host-Updates). Ich kann überprüfen, ob die Liste leer ist. Gleichzeitig wird die automatische Entsperrung automatisch aktiviert, wenn ein Client das shared_ptr zurücksetzt, an dem er festhält. Dies geschieht auch, wenn ein Client automatisch zerstört wird.

Der Gesamteffekt ist, dass Clients selten Exklusivität benötigen (normalerweise nur für Hinzufügungen und Löschungen reserviert). Meistens ist eine Anforderung an LockUpdate (false), dh nicht exklusiv, erfolgreich, solange (! M_exclusiveLock). Und ein LockUpdate (true), eine Anfrage nach Exklusivität, ist nur erfolgreich, wenn sowohl (! ​​M_exclusiveLock) als auch (m_locks.empty ()).

Es könnte eine Warteschlange hinzugefügt werden, um zwischen exklusiven und nicht exklusiven Sperren abzumildern. Bisher gab es jedoch keine Kollisionen. Daher möchte ich warten, bis die Lösung hinzugefügt wird (meistens, damit ich eine reale Testbedingung habe).

Bisher funktioniert dies gut für meine Bedürfnisse; Ich kann mir vorstellen, dass dies erweitert werden muss, und einige Probleme, die bei einer erweiterten Verwendung auftreten können, waren jedoch schnell zu implementieren und erforderten nur sehr wenig benutzerdefinierten Code.

Kit10
quelle
-4

Falls sich jemand für die Atomversion interessiert, finden Sie hier die Implementierung. Die Leistung wird besser erwartet als die Version mit Mutex- und Bedingungsvariablen.

class semaphore_atomic
{
public:
    void notify() {
        count_.fetch_add(1, std::memory_order_release);
    }

    void wait() {
        while (true) {
            int count = count_.load(std::memory_order_relaxed);
            if (count > 0) {
                if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    break;
                }
            }
        }
    }

    bool try_wait() {
        int count = count_.load(std::memory_order_relaxed);
        if (count > 0) {
            if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }
private:
    std::atomic_int count_{0};
};
Jeffery
quelle
4
Ich würde erwarten, dass die Leistung viel schlechter wird. Dieser Code macht buchstäblich jeden möglichen Fehler. Angenommen, der waitCode muss mehrmals wiederholt werden. Wenn es endgültig entsperrt wird, wird die Mutter aller falsch vorhergesagten Zweige benötigt, da die Schleifenvorhersage der CPU mit Sicherheit vorhersagt, dass es wieder eine Schleife geben wird. Ich könnte noch viele weitere Probleme mit diesem Code auflisten.
David Schwartz
1
Hier ist ein weiterer offensichtlicher Leistungskiller: Die waitSchleife verbraucht beim Drehen CPU-Mikroausführungsressourcen. Angenommen, es befindet sich im selben physischen Kern wie der Thread, der es soll notify- es wird diesen Thread fürchterlich verlangsamen.
David Schwartz
1
Und hier noch eine: Auf x86-CPUs (den derzeit beliebtesten CPUs) ist eine compare_exchange_weak-Operation immer eine Schreiboperation, auch wenn sie fehlschlägt (sie schreibt denselben Wert zurück, den sie gelesen hat, wenn der Vergleich fehlschlägt). Angenommen, zwei Kerne befinden sich beide in einer waitSchleife für dasselbe Semaphor. Beide schreiben mit voller Geschwindigkeit in dieselbe Cache-Zeile, wodurch andere Kerne durch Sättigung der Inter-Core-Busse zum Crawlen gebracht werden können.
David Schwartz
@ DavidSchwartz Freut mich über Ihre Kommentare. Ich bin mir nicht sicher, ob ich den Teil '... CPU's Loop Prediction ...' verstehe. Einverstanden mit dem 2 .. Anscheinend kann Ihr dritter Fall auftreten, aber im Vergleich zu Mutex, der dazu führt, dass der Benutzermodus zwischen Kernelmodus und Systemaufruf wechselt, ist die Synchronisierung zwischen den Kernen nicht schlechter.
Jeffery
1
Es gibt kein sperrfreies Semaphor. Die ganze Idee, sperrenfrei zu sein, besteht nicht darin, Code ohne Verwendung von Mutexen zu schreiben, sondern Code zu schreiben, in dem ein Thread niemals blockiert. In diesem Fall besteht das Wesentliche des Semaphors darin, Threads zu blockieren, die die Funktion wait () aufrufen!
Carlo Wood