Verbrauchen Sie dieselbe Nachricht erneut, wenn die Verarbeitung der Nachricht fehlschlägt

10

Ich verwende Confluent.Kafka .NET Client Version 1.3.0. Ich folge den Dokumenten :

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

Das Problem ist, dass selbst wenn ich die Zeile consumer.StoreOffset(consumerResult);auskommentiere, ich beim nächsten Konsum immer wieder die nächste nicht verbrauchte Nachricht erhalte , dh der Offset steigt weiter an, was nicht so zu sein scheint, wie es die Dokumentation behauptet, dh mindestens eine Zustellung .

Auch wenn ich Satz EnableAutoCommit = falseund entfernen ‚EnableAutoOffsetStore = false‘ aus der Config, und ersetzen consumer.StoreOffset(consumerResult)mit consumer.Commit(), ich sehe immer noch das gleiche Verhalten, das heißt , auch wenn ich den Kommentar aus Commit, halte ich noch die nächsten unverbrauchten Nachrichten bekommen.

Ich habe das Gefühl, dass mir hier etwas Grundlegendes fehlt, kann aber nicht herausfinden, was. Jede Hilfe wird geschätzt!

havij
quelle
Die Nachrichten wurden vom kafka-Standpunkt aus bereits an die Anwendung zurückgegeben. Wenn Sie sie festschreiben, werden sie als zuletzt festgeschriebene Offsets gespeichert, aber verbrauchen weiterhin die nächsten Nachrichten, unabhängig davon, ob Sie sie verbraucht haben oder nicht. Was erwarten Sie hier? Könnten Sie bitte näher erläutern, was Sie vor / nach dem Festschreiben und Konsumieren erwarten?
Sagar Veeram
Nachrichten werden erst wieder abgerufen, wenn Sie den Suchversatz verwenden. Dies wirkt sich auf den Verbrauch aus und Nachrichten werden vom Suchoffset zurückgegeben.
Sagar Veeram
@ user2683814 In meinem Beitrag habe ich zwei Szenarien erwähnt, je nachdem, was eingestellt EnableAutoCommitist. Nehmen wir an EnableAutoCommit = false, wir haben es , und wenn ich es Consumebekomme, erhalte ich die Nachricht mit Offset 11 zurück. Ich hatte erwartet, dass ich immer wieder dieselbe Nachricht mit Offset 11 erhalte, wenn die Verarbeitung der Nachricht weiter geworfen wird und daher kein Aufruf Commiterfolgt.
Havij
Nein, das ist nicht der Fall. Sie können nicht steuern, was poll ( Consume) verwendet werden soll, Commitnachdem Sie bereits ein SubscribeThema erstellt haben. Kafka (wie in client lib) hinter den Kulissen behält alle Offsets bei, die an die App in der gesendet wurden, Consumeund sendet sie linear. Um eine Nachricht wie in einem Fehlerszenario erneut zu verarbeiten, müssen Sie sie in Ihrem Code verfolgen und versuchen, die Nachricht zu versetzen und zu verarbeiten. Außerdem sollten Sie wissen, was zu überspringen ist, wenn sie bereits in früheren Anforderungen verarbeitet wurde. Ich bin nicht mit der .net-Bibliothek vertraut, aber es sollte eigentlich keine Rolle spielen, da dies Kafka-Design ist.
Sagar Veeram
Ich denke, Sie müssen eine Kombination aus Abonnieren und Zuweisen verwenden und benötigen möglicherweise unterschiedliche Verbraucher, um Ihren Anwendungsfall zu unterstützen. Bei Fehlern verwenden Sie "Zuweisen / Suchen" zum Versetzen von Themenpartitionen mit einem Verbraucher, um Nachrichten erneut zu verarbeiten, und für die normale Verarbeitung einen anderen Verbraucher mit Abonnement- / Verbrauchs- / Festschreibungsfluss.
Sagar Veeram

Antworten:

0

Möglicherweise möchten Sie eine Wiederholungslogik für die Verarbeitung jeder Ihrer Nachrichten für eine festgelegte Anzahl von Malen haben, z. B. 5. Wenn dies während dieser 5 Wiederholungsversuche nicht erfolgreich ist, möchten Sie diese Nachricht möglicherweise einem anderen Thema hinzufügen, um alle zu behandeln Fehlgeschlagene Nachrichten, die Vorrang vor Ihrem eigentlichen Thema haben. Oder Sie möchten die fehlgeschlagene Nachricht zum selben Thema hinzufügen, damit sie später abgerufen wird, sobald alle anderen Nachrichten verbraucht sind.

Wenn die Verarbeitung einer Nachricht innerhalb dieser 5 Wiederholungsversuche erfolgreich ist, können Sie zur nächsten Nachricht in der Warteschlange springen.

Raju Dasupally
quelle