Warteschlange mit fester Größe, die alte Werte bei neuen Enques automatisch aus der Warteschlange entfernt

119

Ich verwende ConcurrentQueuefür eine gemeinsam genutzte Datenstruktur, deren Zweck darin besteht, die letzten N übergebenen Objekte zu speichern (Art des Verlaufs).

Angenommen, wir haben einen Browser und möchten die letzten 100 durchsuchten URLs haben. Ich möchte eine Warteschlange, die den ältesten (ersten) Eintrag beim Einfügen eines neuen Eintrags (Warteschlange) automatisch löscht (in die Warteschlange stellt), wenn die Kapazität voll ist (100 Adressen im Verlauf).

Wie kann ich das mit erreichen System.Collections?

Xaqron
quelle
Es war nicht speziell für Sie gedacht, sondern für jeden, der auf diese Frage stößt und sie möglicherweise nützlich findet. Übrigens spricht es auch über C #. Haben Sie es geschafft, alle Antworten (in 2 Minuten) zu lesen und herauszufinden, dass dort kein C # -Code vorhanden ist? Jedenfalls bin ich mir selbst nicht sicher, und daher ist es ein Kommentar ...
Sie können die Methoden einfach in eine Sperre einschließen. Da sie schnell sind, können Sie einfach das gesamte Array sperren. Dies ist wahrscheinlich ein Betrug. Wenn Sie nach Zirkelpuffer-Implementierungen mit C # -Code suchen, finden Sie möglicherweise etwas. Wie auch immer - Viel Glück.

Antworten:

110

Ich würde eine Wrapper-Klasse schreiben, die bei Enqueue den Count überprüft und dann Dequeue, wenn der Count das Limit überschreitet.

 public class FixedSizedQueue<T>
 {
     ConcurrentQueue<T> q = new ConcurrentQueue<T>();
     private object lockObject = new object();

     public int Limit { get; set; }
     public void Enqueue(T obj)
     {
        q.Enqueue(obj);
        lock (lockObject)
        {
           T overflow;
           while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
     }
 }
Richard Schneider
quelle
4
qist für das Objekt privat, so dass lockandere Threads nicht gleichzeitig darauf zugreifen können.
Richard Schneider
14
Es ist keine gute Idee, zu sperren. Der gesamte Zweck der gleichzeitigen BCL-Sammlungen besteht darin, aus Leistungsgründen eine sperrfreie Parallelität bereitzustellen. Das Sperren Ihres Codes beeinträchtigt diesen Vorteil. Tatsächlich sehe ich keinen Grund, warum Sie die deq sperren müssen.
KFL
2
@KFL, Notwendigkeit zu sperren , da Countund TryDequeuesind zwei unabhängige Operationen , die nicht von BCL Concurrent synched Pflege.
Richard Schneider
9
@RichardSchneider Wenn Sie Parallelitätsprobleme selbst lösen müssen, ist es eine gute Idee, das ConcurrentQueue<T>Objekt gegen ein Queue<T>Objekt mit geringerem Gewicht auszutauschen .
0b101010
6
Definieren Sie keine eigene Warteschlange, sondern verwenden Sie nur die geerbte. Wenn Sie das tun, was Sie tun, können Sie mit den Warteschlangenwerten nichts anderes tun. Alle anderen Funktionen außer Ihrer neuen Funktion Enqueuerufen weiterhin die ursprüngliche Warteschlange auf. Mit anderen Worten, obwohl diese Antwort als akzeptiert markiert ist, ist sie vollständig und vollständig gebrochen.
Gábor
104

Ich würde mich für eine kleine Variante entscheiden ... ConcurrentQueue erweitern, um Linq-Erweiterungen für FixedSizeQueue verwenden zu können

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}
Dave Lawrence
quelle
1
Was passiert, wenn jemand die Instanz statisch als ConcurrentQueue <T> kennt und gerade Ihr 'neues' Schlüsselwort umgangen hat?
Mhand
6
@mhand Wenn 'jemand' das tun wollte; Dann hätten sie sich entschieden, zunächst ein ConcurrentQueue <T> -Objekt zu verwenden ... Dies ist eine benutzerdefinierte Speicherklasse. Niemand möchte, dass dies an das .NET-Framework gesendet wird. Sie haben versucht, ein Problem zu schaffen.
Dave Lawrence
9
Mein Punkt ist, anstatt zu unterklassifizieren, vielleicht sollten Sie einfach die Warteschlange umbrechen ... dies erzwingt in allen Fällen das gewünschte Verhalten. Da es sich um eine benutzerdefinierte Speicherklasse handelt, machen wir sie vollständig benutzerdefiniert und legen nur die Operationen offen, die wir benötigen. Unterklassen sind hier meiner Meinung nach das falsche Werkzeug.
Mhand
3
@mhand Ja, ich verstehe, was Sie sagen. Ich könnte eine Warteschlange umschließen und den Enumerator der Warteschlange verfügbar machen, um Linq-Erweiterungen zu verwenden.
Dave Lawrence
1
Ich stimme @mhand zu, dass Sie ConcurrentQueue nicht erben sollten, da die Enqueue-Methode nicht virtuell ist. Sie sollten die Warteschlange als Proxy verwenden und bei Bedarf die gesamte Schnittstelle implementieren.
Chris Marisic
29

Für alle, die es nützlich finden, gibt es hier einen Arbeitscode, der auf der obigen Antwort von Richard Schneider basiert:

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }
}
Tod Thomson
quelle
1
Das Abstimmen aus den genannten Gründen (Sperren bei Verwendung einer ConcurrentQueue ist schlecht) sowie das Nichtimplementieren einer der erforderlichen Schnittstellen, damit dies eine echte Sammlung ist.
Josh
11

Hier ist ein leichter kreisförmiger Puffer mit einigen Methoden, die für eine sichere und unsichere Verwendung gekennzeichnet sind.

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new Exception("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size);
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T);
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion


    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}

Ich benutze gerne die Foo()/SafeFoo()/UnsafeFoo()Konvention:

  • FooMethodenaufruf UnsafeFooals Standard.
  • UnsafeFoo Methoden ändern den Status frei ohne Sperre. Sie sollten nur andere unsichere Methoden aufrufen.
  • SafeFooMethoden rufen UnsafeFooMethoden innerhalb einer Sperre auf.

Es ist ein wenig ausführlich, aber es macht offensichtliche Fehler, wie das Aufrufen unsicherer Methoden außerhalb einer Sperre in einer Methode, die threadsicher sein soll, offensichtlicher.

Julia
quelle
5

Hier ist meine Einstellung zur Warteschlange mit fester Größe

Es wird eine reguläre Warteschlange verwendet, um den Synchronisierungsaufwand zu vermeiden, wenn die CountEigenschaft für verwendet wird ConcurrentQueue. Es wird auch implementiert, IReadOnlyCollectiondamit LINQ-Methoden verwendet werden können. Der Rest ist den anderen Antworten hier sehr ähnlich.

[Serializable]
[DebuggerDisplay("Count = {" + nameof(Count) + "}, Limit = {" + nameof(Limit) + "}")]
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    public int Count { get { lock (_lock) { return _queue.Count; } } }
    public int Limit { get; }

    public FixedSizedQueue(int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException(nameof(limit));

        Limit = limit;
    }

    public FixedSizedQueue(IEnumerable<T> collection)
    {
        if (collection is null || !collection.Any())
           throw new ArgumentException("Can not initialize the Queue with a null or empty collection", nameof(collection));

        _queue = new Queue<T>(collection);
        Limit = _queue.Count;
    }

    public void Enqueue(T obj)
    {
        lock (_lock)
        {
            _queue.Enqueue(obj);

            while (_queue.Count > Limit)
                _queue.Dequeue();
        }
    }

    public void Clear()
    {
        lock (_lock)
            _queue.Clear();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock)
            return new List<T>(_queue).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
Ali Zahid
quelle
3

Nur zum Spaß, hier ist eine weitere Implementierung, von der ich glaube, dass sie die meisten Bedenken der Kommentatoren berücksichtigt. Insbesondere wird die Thread-Sicherheit ohne Sperren erreicht und die Implementierung wird von der Wrapping-Klasse ausgeblendet.

public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
  private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  private int _count;

  public int Limit { get; private set; }

  public FixedSizeQueue(int limit)
  {
    this.Limit = limit;
  }

  public void Enqueue(T obj)
  {
    _queue.Enqueue(obj);
    Interlocked.Increment(ref _count);

    // Calculate the number of items to be removed by this thread in a thread safe manner
    int currentCount;
    int finalCount;
    do
    {
      currentCount = _count;
      finalCount = Math.Min(currentCount, this.Limit);
    } while (currentCount != 
      Interlocked.CompareExchange(ref _count, finalCount, currentCount));

    T overflow;
    while (currentCount > finalCount && _queue.TryDequeue(out overflow))
      currentCount--;
  }

  public int Count
  {
    get { return _count; }
  }

  public IEnumerator<T> GetEnumerator()
  {
    return _queue.GetEnumerator();
  }

  System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  {
    return _queue.GetEnumerator();
  }
}
erdomke
quelle
1
Dies ist fehlerhaft, wenn es gleichzeitig verwendet wird. Was passiert, wenn ein Thread nach dem Aufruf, _queue.Enqueue(obj)aber vorher Interlocked.Increment(ref _count), vorbelegt wird und der andere Thread aufruft .Count? Es würde eine falsche Zählung bekommen. Ich habe nicht nach den anderen Problemen gesucht.
KFL
3

Meine Version ist nur eine Unterklasse der normalen Queue. Nichts Besonderes, aber alle Teilnehmer zu sehen, und es passt immer noch zum Titel des Themas, den ich genauso gut hier einfügen könnte. Es gibt auch die in die Warteschlange gestellten für alle Fälle zurück.

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }
    public SizedQueue(int fixedCapacity)
    {
        this.FixedCapacity = fixedCapacity;
    }

    /// <summary>
    /// If the total number of item exceed the capacity, the oldest ones automatically dequeues.
    /// </summary>
    /// <returns>The dequeued value, if any.</returns>
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
        {
            return base.Dequeue();
        }
        return default;
    }
}
5argon
quelle
2

Fügen wir noch eine Antwort hinzu. Warum das über andere?

1) Einfachheit. Der Versuch, die Größe zu garantieren, ist gut und schön, führt jedoch zu unnötiger Komplexität, die ihre eigenen Probleme aufweisen kann.

2) Implementiert IReadOnlyCollection, dh Sie können Linq darauf verwenden und es an eine Vielzahl von Dingen übergeben, die IEnumerable erwarten.

3) Keine Verriegelung. Viele der oben genannten Lösungen verwenden Sperren, was bei einer Sammlung ohne Sperren falsch ist.

4) Implementiert die gleichen Methoden, Eigenschaften und Schnittstellen wie ConcurrentQueue, einschließlich IProducerConsumerCollection. Dies ist wichtig, wenn Sie die Auflistung mit BlockingCollection verwenden möchten.

Diese Implementierung könnte möglicherweise zu mehr Einträgen führen als erwartet, wenn TryDequeue fehlschlägt. Die Häufigkeit dieses Auftretens scheint jedoch keinen speziellen Code wert zu sein, der die Leistung zwangsläufig beeinträchtigt und eigene unerwartete Probleme verursacht.

Wenn Sie unbedingt eine Größe garantieren möchten, ist die Implementierung einer Prune () - oder ähnlichen Methode die beste Idee. Sie können eine ReaderWriterLockSlim-Lesesperre in den anderen Methoden (einschließlich TryDequeue) verwenden und nur beim Bereinigen eine Schreibsperre verwenden.

class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
    readonly ConcurrentQueue<T> m_concurrentQueue;
    readonly int m_maxSize;

    public int Count => m_concurrentQueue.Count;
    public bool IsEmpty => m_concurrentQueue.IsEmpty;

    public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }

    public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
        if (initialCollection == null) {
            throw new ArgumentNullException(nameof(initialCollection));
        }

        m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
        m_maxSize = maxSize;
    }

    public void Enqueue (T item) {
        m_concurrentQueue.Enqueue(item);

        if (m_concurrentQueue.Count > m_maxSize) {
            T result;
            m_concurrentQueue.TryDequeue(out result);
        }
    }

    public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
    public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);

    public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
    public T[] ToArray () => m_concurrentQueue.ToArray();

    public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();

    // Explicit ICollection implementations.
    void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
    object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
    bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;

    // Explicit IProducerConsumerCollection<T> implementations.
    bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
    bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);

    public override int GetHashCode () => m_concurrentQueue.GetHashCode();
    public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
    public override string ToString () => m_concurrentQueue.ToString();
}
Josh
quelle
1

Für Ihr Codierungsvergnügen übermittle ich Ihnen das ' ConcurrentDeck'

public class ConcurrentDeck<T>
{
   private readonly int _size;
   private readonly T[] _buffer;
   private int _position = 0;

   public ConcurrentDeck(int size)
   {
       _size = size;
       _buffer = new T[size];
   }

   public void Push(T item)
   {
       lock (this)
       {
           _buffer[_position] = item;
           _position++;
           if (_position == _size) _position = 0;
       }
   }

   public T[] ReadDeck()
   {
       lock (this)
       {
           return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray();
       }
   }
}

Anwendungsbeispiel:

void Main()
{
    var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25);
    var handle = new ManualResetEventSlim();
    var task1 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task1",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task2 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task2",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task3 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task3",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
    handle.Set();
    var outputtime = DateTime.Now;
    deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true);
}
Chris Hayes
quelle
1
Ich mag diese Implementierung, aber beachte, dass wenn keine hinzugefügt wurde, sie Standard (T) zurückgibt
Daniel Leach
Wenn Sie lock auf diese Weise verwenden, sollten Sie ReaderWriterLockSlim verwenden, um Ihre Leser zu priorisieren.
Josh
1

Nun, es hängt von der Verwendung ab, bei der ich festgestellt habe, dass einige der oben genannten Lösungen die Größe überschreiten können, wenn sie in einer Umgebung mit mehreren Threads verwendet werden. Mein Anwendungsfall war jedenfalls, die letzten 5 Ereignisse anzuzeigen, und es gibt mehrere Threads, die Ereignisse in die Warteschlange schreiben, und einen anderen Thread, der daraus liest und in einem Winform-Steuerelement anzeigt. Das war also meine Lösung.

BEARBEITEN: Da wir in unserer Implementierung bereits Sperren verwenden, benötigen wir ConcurrentQueue nicht wirklich. Dies kann die Leistung verbessern.

class FixedSizedConcurrentQueue<T> 
{
    readonly Queue<T> queue = new Queue<T>();
    readonly object syncObject = new object();

    public int MaxSize { get; private set; }

    public FixedSizedConcurrentQueue(int maxSize)
    {
        MaxSize = maxSize;
    }

    public void Enqueue(T obj)
    {
        lock (syncObject)
        {
            queue.Enqueue(obj);
            while (queue.Count > MaxSize)
            {
                queue.Dequeue();
            }
        }
    }

    public T[] ToArray()
    {
        T[] result = null;
        lock (syncObject)
        {
            result = queue.ToArray();
        }

        return result;
    }

    public void Clear()
    {
        lock (syncObject)
        {
            queue.Clear();
        }
    }
}

BEARBEITEN: Wir brauchen das syncObjectobige Beispiel nicht wirklich und können lieber ein queueObjekt verwenden, da wir queuein keiner Funktion neu initialisieren und es readonlysowieso markiert ist.

Mubashar
quelle
1

Nur weil es noch niemand gesagt hat .. Sie können a verwenden LinkedList<T>und die Thread-Sicherheit hinzufügen:

public class Buffer<T> : LinkedList<T>
{
    private int capacity;

    public Buffer(int capacity)
    {
        this.capacity = capacity;   
    }

    public void Enqueue(T item)
    {
        // todo: add synchronization mechanism
        if (Count == capacity) RemoveLast();
        AddFirst(item);
    }

    public T Dequeue()
    {
        // todo: add synchronization mechanism
        var last = Last.Value;
        RemoveLast();
        return last;
    }
}

Zu beachten ist, dass die Standardreihenfolge für die Aufzählung in diesem Beispiel LIFO ist. Das kann aber bei Bedarf überschrieben werden.

Brandon
quelle
0

Die akzeptierte Antwort wird vermeidbare Nebenwirkungen haben.

Feinkörnige Verriegelung und verriegelungsfreie Mechanismen

Die folgenden Links sind Referenzen, die ich verwendet habe, als ich mein Beispiel unten geschrieben habe.

Die Dokumentation von Microsoft ist zwar etwas irreführend, da sie eine Sperre verwenden, sie sperren jedoch die Segmentklassen. Die Segmentklassen selbst verwenden Interlocked.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Lib.Core
{
    // Sources: 
    // https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/
    // https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked?view=netcore-3.1
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs

    /// <summary>
    /// Concurrent safe circular buffer that will used a fixed capacity specified and resuse slots as it goes.
    /// </summary>
    /// <typeparam name="TObject">The object that you want to go into the slots.</typeparam>
    public class ConcurrentCircularBuffer<TObject>
    {
        private readonly ConcurrentQueue<TObject> _queue;

        public int Capacity { get; private set; }

        public ConcurrentCircularBuffer(int capacity)
        {
            if(capacity <= 0)
            {
                throw new ArgumentException($"The capacity specified '{capacity}' is not valid.", nameof(capacity));
            }

            // Setup the queue to the initial capacity using List's underlying implementation.
            _queue = new ConcurrentQueue<TObject>(new List<TObject>(capacity));

            Capacity = capacity;
        }

        public void Enqueue(TObject @object)
        {
            // Enforce the capacity first so the head can be used instead of the entire segment (slow).
            while (_queue.Count + 1 > Capacity)
            {
                if (!_queue.TryDequeue(out _))
                {
                    // Handle error condition however you want to ie throw, return validation object, etc.
                    var ex = new Exception("Concurrent Dequeue operation failed.");
                    ex.Data.Add("EnqueueObject", @object);
                    throw ex;
                }
            }

            // Place the item into the queue
            _queue.Enqueue(@object);
        }

        public TObject Dequeue()
        {
            if(_queue.TryDequeue(out var result))
            {
                return result;
            }

            return default;
        }
    }
}
jjhayter
quelle
0

Hier ist eine weitere Implementierung, die die zugrunde liegende ConcurrentQueue so oft wie möglich verwendet und gleichzeitig dieselben Schnittstellen bereitstellt, die über ConcurrentQueue verfügbar gemacht wurden.

/// <summary>
/// This is a FIFO concurrent queue that will remove the oldest added items when a given limit is reached.
/// </summary>
/// <typeparam name="TValue"></typeparam>
public class FixedSizedConcurrentQueue<TValue> : IProducerConsumerCollection<TValue>, IReadOnlyCollection<TValue>
{
    private readonly ConcurrentQueue<TValue> _queue;

    private readonly object _syncObject = new object();

    public int LimitSize { get; }

    public FixedSizedConcurrentQueue(int limit)
    {
        _queue = new ConcurrentQueue<TValue>();
        LimitSize = limit;
    }

    public FixedSizedConcurrentQueue(int limit, System.Collections.Generic.IEnumerable<TValue> collection)
    {
        _queue = new ConcurrentQueue<TValue>(collection);
        LimitSize = limit;

    }

    public int Count => _queue.Count;

    bool ICollection.IsSynchronized => ((ICollection) _queue).IsSynchronized;

    object ICollection.SyncRoot => ((ICollection)_queue).SyncRoot; 

    public bool IsEmpty => _queue.IsEmpty;

    // Not supported until .NET Standard 2.1
    //public void Clear() => _queue.Clear();

    public void CopyTo(TValue[] array, int index) => _queue.CopyTo(array, index);

    void ICollection.CopyTo(Array array, int index) => ((ICollection)_queue).CopyTo(array, index);

    public void Enqueue(TValue obj)
    {
        _queue.Enqueue(obj);
        lock( _syncObject )
        {
            while( _queue.Count > LimitSize ) {
                _queue.TryDequeue(out _);
            }
        }
    }

    public IEnumerator<TValue> GetEnumerator() => _queue.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable<TValue>)this).GetEnumerator();

    public TValue[] ToArray() => _queue.ToArray();

    public bool TryAdd(TValue item)
    {
        Enqueue(item);
        return true;
    }

    bool IProducerConsumerCollection<TValue>.TryTake(out TValue item) => TryDequeue(out item);

    public bool TryDequeue(out TValue result) => _queue.TryDequeue(out result);

    public bool TryPeek(out TValue result) => _queue.TryPeek(out result);

}
Tod Cunningham
quelle
-1

Dies ist meine Version der Warteschlange:

public class FixedSizedQueue<T> {
  private object LOCK = new object();
  ConcurrentQueue<T> queue;

  public int MaxSize { get; set; }

  public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) {
     this.MaxSize = maxSize;
     if (items == null) {
        queue = new ConcurrentQueue<T>();
     }
     else {
        queue = new ConcurrentQueue<T>(items);
        EnsureLimitConstraint();
     }
  }

  public void Enqueue(T obj) {
     queue.Enqueue(obj);
     EnsureLimitConstraint();
  }

  private void EnsureLimitConstraint() {
     if (queue.Count > MaxSize) {
        lock (LOCK) {
           T overflow;
           while (queue.Count > MaxSize) {
              queue.TryDequeue(out overflow);
           }
        }
     }
  }


  /// <summary>
  /// returns the current snapshot of the queue
  /// </summary>
  /// <returns></returns>
  public T[] GetSnapshot() {
     return queue.ToArray();
  }
}

Ich finde es nützlich, einen Konstruktor zu haben, der auf einem IEnumerable basiert, und ich finde es nützlich, einen GetSnapshot zu haben, um eine Multithread-Sicherheitsliste (in diesem Fall ein Array) der Elemente zum Zeitpunkt des Aufrufs zu haben, die nicht ansteigt Fehler, wenn sich die zugrunde liegende Sammlung ändert.

Die doppelte Zählprüfung soll unter bestimmten Umständen die Sperre verhindern.

Nicht wichtig
quelle
1
Abstimmung zum Sperren der Warteschlange. Wenn Sie unbedingt sperren möchten, ist ein ReaderWriterLockSlim am besten geeignet (vorausgesetzt, Sie erwarten, dass Sie häufiger eine Lesesperre als eine Schreibsperre verwenden). GetSnapshot wird ebenfalls nicht benötigt. Wenn Sie IReadOnlyCollection <T> implementieren (was Sie für die IEnumerable-Semantik sollten), hat ToList () dieselbe Funktion.
Josh
Die ConcurrentQueue behandelt die Sperren in ihrer Implementierung, siehe die Links in meiner Antwort.
jjhayter