Was ist ein guter Algorithmus zur Ratenbegrenzung?

155

Ich könnte einen Pseudocode oder besser Python verwenden. Ich versuche, eine Warteschlange zur Ratenbegrenzung für einen Python-IRC-Bot zu implementieren, die teilweise funktioniert. Wenn jedoch jemand weniger Nachrichten als das Limit auslöst (z. B. beträgt das Ratenlimit 5 Nachrichten pro 8 Sekunden und die Person nur 4). und der nächste Auslöser ist über die 8 Sekunden (z. B. 16 Sekunden später), der Bot sendet die Nachricht, aber die Warteschlange wird voll und der Bot wartet 8 Sekunden, obwohl er nicht benötigt wird, da die 8-Sekunden-Periode abgelaufen ist.

Miniman
quelle

Antworten:

230

Hier der einfachste Algorithmus , wenn Sie Nachrichten einfach löschen möchten, wenn sie zu schnell eintreffen (anstatt sie in die Warteschlange zu stellen, was sinnvoll ist, da die Warteschlange möglicherweise beliebig groß wird):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

Diese Lösung enthält keine Datenstrukturen, Timer usw. und funktioniert einwandfrei :) Um dies zu sehen, wächst die Zulage mit einer Geschwindigkeit von höchstens 5/8 Einheiten pro Sekunde, dh höchstens fünf Einheiten pro acht Sekunden. Jede weitergeleitete Nachricht zieht eine Einheit ab, sodass Sie nicht mehr als fünf Nachrichten pro acht Sekunden senden können.

Beachten Sie, dass ratedies eine Ganzzahl sein sollte, dh ohne einen Dezimalteil ungleich Null , da sonst der Algorithmus nicht richtig funktioniert (die tatsächliche Rate wird es nicht sein rate/per). ZB rate=0.5; per=1.0;funktioniert nicht, weil allowancenie auf 1.0 wachsen wird. Funktioniert aber rate=1.0; per=2.0;gut.

Antti Huima
quelle
4
Es ist auch erwähnenswert, dass die Dimension und der Maßstab von 'time_passed' mit 'per' übereinstimmen müssen, z. B. Sekunden.
Skaffman
2
Hallo Skaffman, danke für die Komplimente --- Ich habe es aus meinem Ärmel geworfen, aber mit einer Wahrscheinlichkeit von 99,9% hat jemand früher eine ähnliche Lösung gefunden :)
Antti Huima
51
Das ist ein Standardalgorithmus - es ist ein Token-Bucket ohne Warteschlange. Der Eimer ist allowance. Die Eimergröße ist rate. Die allowance += …Zeile ist eine Optimierung des Hinzufügens eines Tokens pro Rate ÷ pro Sekunde.
Derobert
5
@zwirbeltier Was Sie oben schreiben, ist nicht wahr. 'Zulage' wird immer durch 'Rate' begrenzt (siehe Zeile "// Gas"), so dass zu einem bestimmten Zeitpunkt nur ein Burst von Nachrichten mit genau 'Rate' zulässig ist, dh 5.
Antti Huima
7
Dies ist gut, kann aber die Rate überschreiten. Nehmen wir an, Sie leiten zum Zeitpunkt 0 5 Nachrichten weiter, und zum Zeitpunkt N * (8/5) für N = 1, 2, ... können Sie eine weitere Nachricht senden, was zu mehr als 5 Nachrichten in einem Zeitraum von 8 Sekunden führt
mindvirus
47

Verwenden Sie diesen Dekorator @RateLimited (ratepersec) vor Ihrer Funktion, die in die Warteschlange gestellt wird.

Grundsätzlich wird überprüft, ob seit dem letzten Mal 1 / Rate-Sekunden vergangen sind, und wenn nicht, wird der Rest der Zeit gewartet, andernfalls wird nicht gewartet. Dies begrenzt Sie effektiv auf Rate / Sek. Der Dekorateur kann auf jede Funktion angewendet werden, für die Sie eine Geschwindigkeitsbegrenzung wünschen.

Wenn Sie in Ihrem Fall maximal 5 Nachrichten pro 8 Sekunden wünschen, verwenden Sie @RateLimited (0.625) vor Ihrer sendToQueue-Funktion.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)
Carlos A. Ibarra
quelle
Ich mag die Idee, einen Dekorateur für diesen Zweck zu verwenden. Warum ist lastTimeCalled eine Liste? Ich bezweifle auch, dass dies funktioniert, wenn mehrere Threads dieselbe RateLimited-Funktion aufrufen ...
Stephan202
8
Es ist eine Liste, weil einfache Typen wie float konstant sind, wenn sie von einem Abschluss erfasst werden. Wenn Sie eine Liste erstellen, ist die Liste konstant, der Inhalt jedoch nicht. Ja, es ist nicht threadsicher, aber das kann leicht mit Schlössern behoben werden.
Carlos A. Ibarra
time.clock()hat nicht genug Auflösung auf meinem System, also habe ich den Code angepasst und auf use geänderttime.time()
mtrbean
3
Zur Ratenbegrenzung möchten Sie definitiv nicht verwenden time.clock(), was die verstrichene CPU-Zeit misst. Die CPU-Zeit kann viel schneller oder langsamer als die "tatsächliche" Zeit laufen. Sie möchten time.time()stattdessen verwenden, was die Wandzeit ("tatsächliche" Zeit) misst.
John Wiseman
1
Übrigens für echte Produktionssysteme: Die Implementierung einer Ratenbegrenzung mit einem sleep () -Aufruf ist möglicherweise keine gute Idee, da dadurch der Thread blockiert und somit verhindert wird, dass ein anderer Client ihn verwendet.
Maresh
28

Ein Token Bucket ist ziemlich einfach zu implementieren.

Beginnen Sie mit einem Eimer mit 5 Token.

Alle 5/8 Sekunden: Wenn der Eimer weniger als 5 Token enthält, fügen Sie einen hinzu.

Jedes Mal, wenn Sie eine Nachricht senden möchten: Wenn der Bucket ≥1 Token hat, nehmen Sie ein Token heraus und senden Sie die Nachricht. Andernfalls warten Sie / lassen Sie die Nachricht fallen / was auch immer.

(Natürlich würden Sie im tatsächlichen Code einen Ganzzahlzähler anstelle von echten Token verwenden und Sie können den Schritt alle 5/8 durch Optimieren von Zeitstempeln optimieren.)


Wenn Sie die Frage noch einmal lesen und alle 8 Sekunden das Ratenlimit vollständig zurückgesetzt haben, finden Sie hier eine Änderung:

Beginnen Sie mit einem Zeitstempel vor last_sendlanger Zeit (z. B. in der Epoche). Beginnen Sie auch mit demselben 5-Token-Bucket.

Schlagen Sie die Regel alle 5/8 Sekunden an.

Jedes Mal, wenn Sie eine Nachricht senden: Überprüfen Sie zunächst, ob vor last_send≥ 8 Sekunden. Wenn ja, füllen Sie den Eimer (stellen Sie ihn auf 5 Token ein). Zweitens, wenn sich Token im Bucket befinden, senden Sie die Nachricht (andernfalls drop / wait / etc.). Drittens last_sendauf jetzt einstellen .

Das sollte für dieses Szenario funktionieren.


Ich habe tatsächlich einen IRC-Bot mit einer Strategie wie dieser geschrieben (der erste Ansatz). Es ist in Perl, nicht in Python, aber hier ist ein Code zur Veranschaulichung:

Der erste Teil behandelt das Hinzufügen von Token zum Eimer. Sie können die Optimierung des Hinzufügens von Token basierend auf der Zeit (2. bis letzte Zeile) sehen und dann klemmt die letzte Zeile den Bucket-Inhalt auf das Maximum (MESSAGE_BURST).

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn ist eine Datenstruktur, die herumgereicht wird. Dies ist Teil einer Methode, die routinemäßig ausgeführt wird (sie berechnet, wann sie das nächste Mal etwas zu tun hat, und schläft entweder so lange oder bis sie Netzwerkverkehr erhält). Der nächste Teil der Methode behandelt das Senden. Dies ist ziemlich kompliziert, da Nachrichten Prioritäten zugeordnet sind.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

Das ist die erste Warteschlange, die auf jeden Fall ausgeführt wird. Auch wenn dadurch unsere Verbindung wegen Überschwemmungen zerstört wird. Wird für äußerst wichtige Dinge verwendet, z. B. zum Antworten auf den PING des Servers. Als nächstes die restlichen Warteschlangen:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Schließlich wird der Bucket-Status wieder in der $ conn-Datenstruktur gespeichert (tatsächlich etwas später in der Methode; er berechnet zunächst, wie schnell mehr Arbeit vorhanden sein wird).

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

Wie Sie sehen können, ist der tatsächliche Code für die Bucket-Behandlung sehr klein - ungefähr vier Zeilen. Der Rest des Codes ist die Prioritätswarteschlangenbehandlung. Der Bot hat Prioritätswarteschlangen, so dass beispielsweise jemand, der mit ihm chattet, ihn nicht daran hindern kann, seine wichtigen Kick / Ban-Aufgaben zu erfüllen.

derobert
quelle
Vermisse ich etwas ... es sieht so aus, als würde dies Sie alle 8 Sekunden auf 1 Nachricht beschränken, nachdem Sie die ersten 5
chills42
@ chills42: Ja, ich habe die Frage falsch gelesen ... siehe die zweite Hälfte der Antwort.
Derobert
@chills: Wenn last_send <8 Sekunden ist, fügen Sie dem Bucket keine Token hinzu. Wenn Ihr Bucket Token enthält, können Sie die Nachricht senden. Andernfalls können Sie nicht (Sie haben bereits 5 Nachrichten in den letzten 8 Sekunden
gesendet
3
Ich würde es begrüßen, wenn die Leute, die dies ablehnen, erklären würden, warum ... Ich würde gerne alle Probleme beheben, die Sie sehen, aber das ist ohne Feedback schwer zu tun!
Derobert
10

Um die Verarbeitung zu blockieren, bis die Nachricht gesendet werden kann, wodurch weitere Nachrichten in die Warteschlange gestellt werden, kann die schöne Lösung von antti auch folgendermaßen geändert werden:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

Es wird nur gewartet, bis genügend Berechtigung zum Senden der Nachricht vorhanden ist. Um nicht mit dem Zweifachen des Satzes zu beginnen, kann die Zulage auch mit 0 initialisiert werden.

san
quelle
5
Wenn Sie schlafen (1-allowance) * (per/rate), müssen Sie den gleichen Betrag hinzufügen last_check.
Alp
2

Behalten Sie die Zeit bei, zu der die letzten fünf Zeilen gesendet wurden. Halten Sie die Nachrichten in der Warteschlange so lange, bis die fünftletzte Nachricht (falls vorhanden) mindestens 8 Sekunden in der Vergangenheit liegt (wobei last_five ein Array von Zeiten ist):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()
Pesto
quelle
Nicht, seit du es überarbeitet hast, bin ich es nicht.
Pesto
Sie speichern fünf Zeitstempel und verschieben sie wiederholt durch den Speicher (oder führen verknüpfte Listenoperationen aus). Ich speichere einen ganzzahligen Zähler und einen Zeitstempel. Und nur rechnen und zuweisen.
Derobert
2
Abgesehen davon, dass meine besser funktioniert, wenn versucht wird, 5 Zeilen zu senden, aber nur 3 weitere in diesem Zeitraum zulässig sind. Mit freundlichen Grüßen können Sie die ersten drei senden und eine Wartezeit von 8 Sekunden erzwingen, bevor Sie 4 und 5 senden. Meine ermöglicht das Senden von 4 und 5 8 Sekunden nach der viert- und fünftletzten Zeile.
Pesto
1
Zu diesem Thema könnte die Leistung jedoch verbessert werden, indem eine kreisförmig verknüpfte Liste mit der Länge 5 verwendet wird, die auf den fünftletzten Sendevorgang verweist, diesen bei einem neuen Sendevorgang überschreibt und den Zeiger nach vorne bewegt.
Pesto
Für einen IRC-Bot mit einer Geschwindigkeitsbegrenzergeschwindigkeit ist dies kein Problem. Ich bevorzuge die Listenlösung, da sie besser lesbar ist. Die Antwort, die gegeben wurde, ist aufgrund der Überarbeitung verwirrend, aber es ist auch nichts falsch daran.
Jheriko
2

Eine Lösung besteht darin, jedem Warteschlangenelement einen Zeitstempel zuzuweisen und das Element nach Ablauf von 8 Sekunden zu verwerfen. Sie können diese Prüfung jedes Mal durchführen, wenn die Warteschlange hinzugefügt wird.

Dies funktioniert nur, wenn Sie die Warteschlangengröße auf 5 begrenzen und alle Ergänzungen verwerfen, während die Warteschlange voll ist.

Jheriko
quelle
1

Wenn noch jemand interessiert ist, verwende ich diese einfache aufrufbare Klasse in Verbindung mit einem zeitgesteuerten LRU-Schlüsselwertspeicher, um die Anforderungsrate pro IP zu begrenzen. Verwendet eine Deque, kann aber umgeschrieben werden, um stattdessen mit einer Liste verwendet zu werden.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")
Sanyi
quelle
1

Nur eine Python-Implementierung eines Codes aus einer akzeptierten Antwort.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler
Hodza
quelle
0

Wie wäre es damit:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

quelle
0

Ich brauchte eine Variation in Scala. Hier ist es:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A  B) extends (A  B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

So kann es verwendet werden:

val f = Limiter((5d, 8d), { 
  _: Unit  
    println(System.currentTimeMillis) 
})
while(true){f(())}
Landon Kuhn
quelle