Erstellen einer blockierenden Warteschlange <T> in .NET?

163

Ich habe ein Szenario, in dem mehrere Threads zu einer Warteschlange hinzugefügt werden und mehrere Threads aus derselben Warteschlange lesen. Wenn die Warteschlange eine bestimmte Größe erreicht, werden alle Threads , die die Warteschlange füllen, beim Hinzufügen blockiert, bis ein Element aus der Warteschlange entfernt wird.

Die folgende Lösung verwende ich gerade und meine Frage lautet: Wie kann dies verbessert werden? Gibt es ein Objekt, das dieses Verhalten bereits in der BCL aktiviert, das ich verwenden sollte?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
Eric Schoonover
quelle
5
.Net wie hat eingebaute Klassen, um bei diesem Szenario zu helfen. Die meisten der hier aufgeführten Antworten sind veraltet. Die neuesten Antworten finden Sie unten. Sehen Sie sich threadsichere Blockierungssammlungen an. Die Antworten mögen veraltet sein, aber es ist immer noch eine gute Frage!
Tom A
Ich denke, es ist immer noch eine gute Idee, etwas über Monitor.Wait / Pulse / PulseAll zu lernen, selbst wenn wir neue gleichzeitige Klassen in .NET haben.
Thewpfguy
1
Stimmen Sie mit @thewpfguy überein. Sie sollten die grundlegenden Verriegelungsmechanismen hinter den Kulissen verstehen. Erwähnenswert ist auch, dass Systems.Collections.Concurrent erst im April 2010 und dann nur in Visual Studio 2010 und höher vorhanden war. Definitiv keine Option für die VS2008 Hold-Outs ...
Vic
Wenn Sie dies jetzt lesen, werfen Sie einen Blick auf System.Threading.Channels, um eine begrenzte, optional blockierende Implementierung für Multi. Writer / Multi-Reader für .NET Core und .NET Standard zu erhalten.
Mark Rendle

Antworten:

200

Das sieht sehr unsicher aus (sehr wenig Synchronisation); wie wäre es mit so etwas wie:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(bearbeiten)

In Wirklichkeit möchten Sie die Warteschlange schließen, damit die Leser sauber beendet werden - vielleicht so etwas wie ein Bool-Flag -, wenn sie gesetzt ist, wird eine leere Warteschlange nur zurückgegeben (anstatt zu blockieren):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
Marc Gravell
quelle
1
Wie wäre es, wenn Sie die Wartezeit in eine WaitAny ändern und eine Kündigungsfrist für den Bau übergeben ...
Sam Saffron
1
@ Marc - Eine Optimierung, wenn Sie erwarten würden, dass die Warteschlange immer die Kapazität erreicht, besteht darin, den Wert maxSize an den Konstruktor der Warteschlange <T> zu übergeben. Sie können Ihrer Klasse einen weiteren Konstruktor hinzufügen, um dies zu berücksichtigen.
RichardOD
3
Warum SizeQueue, warum nicht FixedSizeQueue?
mindless.panda
4
@Lasse - Es gibt die Sperre (n) während frei Wait, damit andere Threads sie erwerben können. Es wacht die Sperre (n) zurück, wenn es aufwacht.
Marc Gravell
1
Schön, wie gesagt, es gab etwas, das ich nicht bekommen habe :) Das bringt mich sicher dazu, einen Teil meines Thread-Codes noch einmal zu besuchen ...
Lasse V. Karlsen
14

"Wie kann das verbessert werden?"

Nun, Sie müssen sich jede Methode in Ihrer Klasse ansehen und überlegen, was passieren würde, wenn ein anderer Thread gleichzeitig diese Methode oder eine andere Methode aufruft. Beispielsweise setzen Sie eine Sperre für die Remove-Methode, nicht jedoch für die Add-Methode. Was passiert, wenn ein Thread gleichzeitig mit einem anderen Thread hinzugefügt wird? Schlechte Dinge.

Beachten Sie auch, dass eine Methode ein zweites Objekt zurückgeben kann, das Zugriff auf die internen Daten des ersten Objekts bietet, z. B. GetEnumerator. Stellen Sie sich vor, ein Thread durchläuft diesen Enumerator, ein anderer Thread ändert gleichzeitig die Liste. Nicht gut.

Eine gute Faustregel ist, dies zu vereinfachen, indem die Anzahl der Methoden in der Klasse auf das absolute Minimum reduziert wird.

Erben Sie insbesondere keine andere Containerklasse, da Sie alle Methoden dieser Klasse verfügbar machen und dem Aufrufer die Möglichkeit bieten, die internen Daten zu beschädigen oder teilweise vollständige Änderungen an den Daten anzuzeigen (genauso schlecht, weil die Daten erscheint in diesem Moment beschädigt). Verstecken Sie alle Details und seien Sie völlig rücksichtslos darüber, wie Sie den Zugriff darauf erlauben.

Ich würde Ihnen dringend empfehlen, Standardlösungen zu verwenden - ein Buch über Threading zu erhalten oder eine Bibliothek von Drittanbietern zu verwenden. Andernfalls werden Sie angesichts des Versuchs Ihren Code für eine lange Zeit debuggen.

Wäre es für Remove nicht sinnvoller, ein Element zurückzugeben (z. B. das zuerst hinzugefügte, da es sich um eine Warteschlange handelt), als dass der Anrufer ein bestimmtes Element auswählt? Und wenn die Warteschlange leer ist, sollte Entfernen möglicherweise auch blockieren.

Update: Marc's Antwort setzt tatsächlich all diese Vorschläge um! :) Aber ich lasse das hier, da es hilfreich sein kann zu verstehen, warum seine Version eine solche Verbesserung darstellt.

Daniel Earwicker
quelle
12

Sie können BlockingCollection und ConcurrentQueue im System.Collections.Concurrent-Namespace verwenden

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}
Andreas
quelle
3
BlockingCollection ist standardmäßig Queue. Ich denke also nicht, dass dies notwendig ist.
Curtis White
Erhält BlockingCollection die Reihenfolge wie eine Warteschlange?
Joel
Ja, wenn es mit einer ConcurrentQueue initialisiert wird
Andreas
6

Ich habe das gerade mit den Reactive Extensions aufgerissen und mich an diese Frage erinnert:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

Nicht unbedingt ganz sicher, aber sehr einfach.

Mark Rendle
quelle
Was ist Betreff <t>? Ich habe keinen Resolver für seinen Namespace.
theJerm
Es ist Teil der Reactive Extensions.
Mark Rendle
Keine Antwort. Dies beantwortet die Frage überhaupt nicht.
Makhdumi
5

Dies ist, was ich für eine Thread-sichere begrenzte Blockierungswarteschlange op kam.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}
Kevin
quelle
Könnten Sie einige Codebeispiele bereitstellen, wie ich einige Thread-Funktionen mithilfe dieser Bibliothek in die Warteschlange stellen würde, einschließlich der Art und Weise, wie ich diese Klasse instanziieren würde?
theJerm
Diese Frage / Antwort ist etwas veraltet. Sie sollten sich den System.Collections.Concurrent-Namespace ansehen, um die Unterstützung für Warteschlangen zu blockieren.
Kevin
2

Ich habe die TPL noch nicht vollständig erforscht, aber sie haben möglicherweise etwas, das Ihren Anforderungen entspricht, oder zumindest ein Reflektorfutter, von dem Sie sich inspirieren lassen können.

Hoffentlich hilft das.

TheMissingLINQ
quelle
Ich bin mir bewusst, dass dies alt ist, aber mein Kommentar richtet sich an SO-Neulinge, da OP dies bereits heute weiß. Dies ist keine Antwort, dies hätte ein Kommentar sein sollen.
John Demetriou
0

Nun, Sie könnten sich die System.Threading.SemaphoreKlasse ansehen . Davon abgesehen - nein, du musst das selbst machen. AFAIK gibt es keine solche eingebaute Sammlung.

Vilx-
quelle
Ich habe mir das angesehen, um die Anzahl der Threads zu drosseln, die auf eine Ressource zugreifen, aber es erlaubt Ihnen nicht, den gesamten Zugriff auf eine Ressource basierend auf einer bestimmten Bedingung (wie Collection.Count) zu blockieren. AFAIK sowieso
Eric Schoonover
Nun, du machst diesen Teil selbst, genau wie jetzt. Anstelle von MaxSize und _FullEvent haben Sie einfach das Semaphor, das Sie mit der richtigen Anzahl im Konstruktor initialisieren. Rufen Sie dann bei jedem Hinzufügen / Entfernen WaitForOne () oder Release () auf.
Vilx
Es ist nicht viel anders als das, was Sie jetzt haben. Einfach einfacher IMHO.
Vilx
Können Sie mir ein Beispiel geben, das diese Arbeitsweise zeigt? Ich habe nicht gesehen, wie die Größe einer Semaphor dynamisch angepasst werden kann, was für dieses Szenario erforderlich ist. Da müssen Sie in der Lage sein, alle Ressourcen nur zu blockieren, wenn die Warteschlange voll ist.
Eric Schoonover
Ahh, Größe ändern! Warum hast du das nicht sofort gesagt? OK, dann ist ein Semaphor nichts für dich. Viel Glück bei diesem Ansatz!
Vilx
-1

Wenn Sie einen maximalen Durchsatz wünschen, bei dem mehrere Leser lesen und nur ein Schreiber schreiben kann, bietet BCL ReaderWriterLockSlim, mit dem Sie Ihren Code verkleinern können ...

DavidN
quelle
Ich möchte, dass niemand schreiben kann, wenn die Warteschlange voll ist.
Eric Schoonover
Sie kombinieren es also mit einem Schloss. Hier sind einige sehr gute Beispiele albahari.com/threading/part2.aspx#_ProducerConsumerQWaitHandle albahari.com/threading/part4.aspx
DavidN
3
Mit Warteschlange / Warteschlange ist jeder ein Schriftsteller ... eine exklusive Sperre wird vielleicht pragmatischer sein
Marc Gravell
Ich bin mir bewusst, dass dies alt ist, aber mein Kommentar richtet sich an SO-Neulinge, da OP dies bereits heute weiß. Dies ist keine Antwort, dies hätte ein Kommentar sein sollen.
John Demetriou