Ich verwende Confluent.Kafka .NET Client Version 1.3.0. Ich folge den Dokumenten :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
Das Problem ist, dass selbst wenn ich die Zeile consumer.StoreOffset(consumerResult);
auskommentiere, ich beim nächsten Konsum immer wieder die nächste nicht verbrauchte Nachricht erhalte , dh der Offset steigt weiter an, was nicht so zu sein scheint, wie es die Dokumentation behauptet, dh mindestens eine Zustellung .
Auch wenn ich Satz EnableAutoCommit = false
und entfernen ‚EnableAutoOffsetStore = false‘ aus der Config, und ersetzen consumer.StoreOffset(consumerResult)
mit consumer.Commit()
, ich sehe immer noch das gleiche Verhalten, das heißt , auch wenn ich den Kommentar aus Commit
, halte ich noch die nächsten unverbrauchten Nachrichten bekommen.
Ich habe das Gefühl, dass mir hier etwas Grundlegendes fehlt, kann aber nicht herausfinden, was. Jede Hilfe wird geschätzt!
EnableAutoCommit
ist. Nehmen wir anEnableAutoCommit = false
, wir haben es , und wenn ich esConsume
bekomme, erhalte ich die Nachricht mit Offset 11 zurück. Ich hatte erwartet, dass ich immer wieder dieselbe Nachricht mit Offset 11 erhalte, wenn die Verarbeitung der Nachricht weiter geworfen wird und daher kein AufrufCommit
erfolgt.Consume
) verwendet werden soll,Commit
nachdem Sie bereits einSubscribe
Thema erstellt haben. Kafka (wie in client lib) hinter den Kulissen behält alle Offsets bei, die an die App in der gesendet wurden,Consume
und sendet sie linear. Um eine Nachricht wie in einem Fehlerszenario erneut zu verarbeiten, müssen Sie sie in Ihrem Code verfolgen und versuchen, die Nachricht zu versetzen und zu verarbeiten. Außerdem sollten Sie wissen, was zu überspringen ist, wenn sie bereits in früheren Anforderungen verarbeitet wurde. Ich bin nicht mit der .net-Bibliothek vertraut, aber es sollte eigentlich keine Rolle spielen, da dies Kafka-Design ist.Antworten:
Entschuldigung, ich kann noch keinen Kommentar hinzufügen. Der Kafka-Konsument verbraucht Nachrichten in Stapeln. Vielleicht durchlaufen Sie den Stapel immer noch, der vom Hintergrundthread vorab abgerufen wurde .
Mit kafka util können Sie überprüfen, ob Ihr Verbraucher tatsächlich einen Offset festlegt oder nicht
kafka-consumer-groups.sh
quelle
Möglicherweise möchten Sie eine Wiederholungslogik für die Verarbeitung jeder Ihrer Nachrichten für eine festgelegte Anzahl von Malen haben, z. B. 5. Wenn dies während dieser 5 Wiederholungsversuche nicht erfolgreich ist, möchten Sie diese Nachricht möglicherweise einem anderen Thema hinzufügen, um alle zu behandeln Fehlgeschlagene Nachrichten, die Vorrang vor Ihrem eigentlichen Thema haben. Oder Sie möchten die fehlgeschlagene Nachricht zum selben Thema hinzufügen, damit sie später abgerufen wird, sobald alle anderen Nachrichten verbraucht sind.
Wenn die Verarbeitung einer Nachricht innerhalb dieser 5 Wiederholungsversuche erfolgreich ist, können Sie zur nächsten Nachricht in der Warteschlange springen.
quelle