Was bestimmt den Kafka-Verbraucherversatz?

169

Ich bin relativ neu in Kafka. Ich habe ein bisschen damit experimentiert, aber ein paar Dinge sind mir in Bezug auf den Verbraucher-Offset unklar. Nach dem, was ich bisher verstanden habe, wird beim Start eines Verbrauchers der Offset, von dem er zu lesen beginnt, durch die Konfigurationseinstellung bestimmt auto.offset.reset(korrigieren Sie mich, wenn ich falsch liege).

Angenommen, das Thema enthält 10 Nachrichten (Offsets 0 bis 9), und ein Verbraucher hat zufällig 5 davon verbraucht, bevor es ausfiel (oder bevor ich den Verbraucher getötet habe). Sagen Sie dann, ich starte diesen Verbraucherprozess neu. Meine Fragen sind:

  1. Wenn das auf gesetzt auto.offset.resetist smallest, wird es immer ab Offset 0 verbrauchen?

  2. Wenn das auf eingestellt auto.offset.resetist largest, wird es ab Offset 5 verbraucht?

  3. Ist das Verhalten in Bezug auf diese Art von Szenario immer deterministisch?

Bitte zögern Sie nicht zu kommentieren, wenn etwas in meiner Frage unklar ist. Danke im Voraus.

Asif Iqbal
quelle

Antworten:

260

Es ist etwas komplexer als Sie beschrieben haben.
Die auto.offset.resetKonfiguration wird NUR aktiviert, wenn für Ihre Verbrauchergruppe kein gültiger Offset festgelegt wurde (2 unterstützte Offset-Speicher sind jetzt Kafka und Zookeeper). Dies hängt auch davon ab, welche Art von Verbraucher Sie verwenden.

Wenn Sie einen Java-Consumer auf hoher Ebene verwenden, stellen Sie sich folgende Szenarien vor:

  1. Sie haben einen Verbraucher in einer Verbrauchergruppe group1, der 5 Nachrichten verbraucht hat und gestorben ist. Wenn Sie diesen Consumer das nächste Mal starten, wird er diese auto.offset.resetKonfiguration nicht einmal verwenden und an der Stelle fortfahren, an der er gestorben ist, da nur der gespeicherte Offset aus dem Offset-Speicher abgerufen wird (Kafka oder ZK, wie bereits erwähnt).

  2. Sie haben Nachrichten in einem Thema (wie Sie beschrieben haben) und starten einen Verbraucher in einer neuen Verbrauchergruppe group2. Es wird nirgendwo ein Offset gespeichert und dieses Mal auto.offset.resetentscheidet die Konfiguration, ob am Anfang des Themas ( earliest) oder am Ende des Themas ( latest) begonnen werden soll.

Eine weitere Sache, die sich auf den Versatzwert earliestund die latestKonfiguration auswirkt, ist die Protokollaufbewahrungsrichtlinie. Stellen Sie sich vor, Sie haben ein Thema mit einer Aufbewahrungsdauer von 1 Stunde. Sie produzieren 5 Nachrichten und eine Stunde später veröffentlichen Sie 5 weitere Nachrichten. Der latestVersatz bleibt weiterhin derselbe wie im vorherigen Beispiel, dies kann jedoch earliestnicht der Fall sein, 0da Kafka diese Nachrichten bereits entfernt und somit der früheste verfügbare Versatz ist 5.

Alles, was oben erwähnt wurde, hat nichts mit zu tun SimpleConsumerund jedes Mal, wenn Sie es ausführen, wird entschieden, wo Sie mit der Verwendung der auto.offset.resetKonfiguration beginnen sollen.

Wenn Sie ältere Kafka - Version als 0.9 verwenden, müssen Sie ersetzen earliest, latestmit smallest, largest.

serejja
quelle
3
Vielen Dank für die Antwort. Wenn also ein Verbraucher auf hoher Ebene etwas begangen hat (entweder in ZK oder Kafka), hat das auto.offset.resetdanach keine Bedeutung mehr? Die einzige Bedeutung dieser Einstellung ist, wenn nichts festgelegt ist (und im Idealfall beim ersten Start des Verbrauchers)?
Asif Iqbal
2
Genau wie Sie beschrieben
Serejja
1
@serejja Hallo - wie wäre es, wenn ich immer 1 Verbraucher pro Gruppe habe und das Szenario Nr. 1 Ihrer Antwort für mich auftritt? Wäre es das gleiche?
ha9u63ar
1
@ ha9u63ar hat deine Frage nicht ganz verstanden. Wenn Sie Ihren Consumer in derselben Gruppe neu starten, wird auto.offset.resetder festgeschriebene Offset nicht verwendet und fortgesetzt. Wenn Sie immer eine andere Verbrauchergruppe verwenden (z. B. beim Starten des Verbrauchers generieren), wird der Verbraucher dies immer respektierenauto.offset.reset
serejja
@serejja ja und das funktioniert bei mir nicht. könnten Sie einen Blick auf nehmen bitte diese - das ist das Problem
ha9u63ar
82

Nur ein Update: Ab Kafka 0.9 verwendet Kafka eine neue Java-Version des Consumer und die Parameternamen auto.offset.reset haben sich geändert. Aus dem Handbuch:

Was tun, wenn in Kafka kein anfänglicher Offset vorhanden ist oder wenn der aktuelle Offset auf dem Server nicht mehr vorhanden ist (z. B. weil diese Daten gelöscht wurden):

frühestens : Setzt den Versatz automatisch auf den frühesten Versatz zurück

Neueste : Setzt den Versatz automatisch auf den neuesten Versatz zurück

none : Ausnahme für den Verbraucher auslösen, wenn für die Verbrauchergruppe kein vorheriger Offset gefunden wurde

alles andere: Ausnahme zum Verbraucher werfen.

Ich habe einige Zeit damit verbracht, dies zu finden, nachdem ich die akzeptierte Antwort überprüft hatte. Daher dachte ich, dass es für die Community nützlich sein könnte, sie zu veröffentlichen.

Israel Zink
quelle
9

Darüber hinaus gibt es offsets.retention.minutes. Wenn die Zeit seit dem letzten Festschreiben> ist offsets.retention.minutes, wird auto.offset.resetauch aktiviert

Sasa Ninkovic
quelle
1
Scheint dies bei der Aufbewahrung von Protokollen nicht redundant zu sein? sollte die ofset-Aufbewahrung auf der Protokollaufbewahrung basieren?
Mike01010
@ mike01010 das stimmt. Es sollte auf der Aufbewahrung von Protokollen basieren, das ist eine der vorgeschlagenen Lösungen im Ticket. Prolong default value of offsets.retention.minutes to be at least twice larger than log.retention.hours. Issues.apache.org/jira/browse/KAFKA-3806
Saheb
Diese Antwort hat mir Angst gemacht für eine Weile, bis ich überprüfen Sie die Dokumentation von offsets.retention.minutes:. <B> Nach einer Verbrauchergruppe alle seine Verbraucher verliert (dh leer wird) seine Offsets für diese Aufbewahrungszeit wird gehalten , bevor sie weggeworfen zu werden </ b> Für Standalone Bei Verbrauchern (bei manueller Zuweisung) laufen die Offsets nach dem Zeitpunkt des letzten Commits zuzüglich dieser Aufbewahrungsfrist ab. (Dies ist für Kafka 2.3)
Jumping_Monkey