Ich benutze Apache Kafka für Messaging. Ich habe den Produzenten und Konsumenten in Java implementiert. Wie können wir die Anzahl der Nachrichten in einem Thema ermitteln?
quelle
Ich benutze Apache Kafka für Messaging. Ich habe den Produzenten und Konsumenten in Java implementiert. Wie können wir die Anzahl der Nachrichten in einem Thema ermitteln?
Der einzige Weg, der aus Verbrauchersicht in den Sinn kommt, besteht darin, die Nachrichten tatsächlich zu konsumieren und sie dann zu zählen.
Der Kafka-Broker stellt JMX-Zähler für die Anzahl der seit dem Start empfangenen Nachrichten bereit, aber Sie können nicht wissen, wie viele davon bereits gelöscht wurden.
In den meisten gängigen Szenarien werden Nachrichten in Kafka am besten als unendlicher Stream angesehen, und es ist nicht relevant, einen diskreten Wert dafür zu erhalten, wie viele Nachrichten derzeit auf der Festplatte gespeichert sind. Darüber hinaus wird es komplizierter, wenn es sich um eine Gruppe von Brokern handelt, die alle eine Teilmenge der Nachrichten in einem Thema enthalten.
Es ist kein Java, kann aber nützlich sein
quelle
bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609
Und dann gibt die Differenz tatsächlich ausstehende Nachrichten im Thema zurück? Hab ich recht?Ich verwende dies tatsächlich zum Benchmarking meines POC. Das Element, das Sie ConsumerOffsetChecker verwenden möchten. Sie können es mit dem Bash-Skript wie unten ausführen.
Und unten ist das Ergebnis: Wie Sie auf dem roten Feld sehen können, ist 999 die Anzahl der Nachrichten, die sich derzeit im Thema befinden.
Update: ConsumerOffsetChecker ist seit 0.10.0 veraltet. Möglicherweise möchten Sie ConsumerGroupCommand verwenden.
quelle
Manchmal besteht das Interesse darin, die Anzahl der Nachrichten in jeder Partition zu kennen, beispielsweise beim Testen eines benutzerdefinierten Partitionierers. Die folgenden Schritte wurden getestet, um mit Kafka 0.10.2.1-2 aus Confluent 3.2 zu funktionieren. Bei einem Kafka-Thema
kt
und der folgenden Befehlszeile:Dadurch wird die Beispielausgabe gedruckt, in der die Anzahl der Nachrichten in den drei Partitionen angezeigt wird:
Die Anzahl der Zeilen kann mehr oder weniger abhängig von der Anzahl der Partitionen für das Thema sein.
quelle
Da dies
ConsumerOffsetChecker
nicht mehr unterstützt wird, können Sie mit diesem Befehl alle Nachrichten im Thema überprüfen:Wo
LAG
ist die Anzahl der Nachrichten in der Themenpartition:Sie können auch versuchen, Kafkacat zu verwenden . Dies ist ein Open Source-Projekt, mit dem Sie möglicherweise Nachrichten aus einem Thema und einer Partition lesen und in stdout drucken können. Hier ist ein Beispiel, das die letzten 10 Nachrichten aus dem
sample-kafka-topic
Thema liest und dann beendet:quelle
Verwenden Sie https://prestodb.io/docs/current/connector/kafka-tutorial.html
Eine von Facebook bereitgestellte Super-SQL-Engine, die eine Verbindung zu mehreren Datenquellen herstellt (Cassandra, Kafka, JMX, Redis ...).
PrestoDB wird als Server mit optionalen Workern ausgeführt (es gibt einen eigenständigen Modus ohne zusätzliche Worker). Anschließend verwenden Sie eine kleine ausführbare JAR (Presto CLI), um Abfragen durchzuführen.
Sobald Sie den Presto-Server gut konfiguriert haben, können Sie traditionelles SQL verwenden:
quelle
Apache Kafka-Befehl zum Abrufen nicht behandelter Nachrichten auf allen Partitionen eines Themas:
Drucke:
Spalte 6 enthält die nicht behandelten Nachrichten. Addiere sie so:
awk liest die Zeilen, überspringt die Kopfzeile und addiert die 6. Spalte und druckt am Ende die Summe.
Druckt
quelle
Um alle für das Thema gespeicherten Nachrichten abzurufen, können Sie den Konsumenten für jede Partition an den Anfang und das Ende des Streams suchen und die Ergebnisse summieren
quelle
Führen Sie Folgendes aus (vorausgesetzt, es
kafka-console-consumer.sh
befindet sich auf dem Pfad):quelle
--new-consumer
da diese Option nicht mehr verfügbar ist (oder anscheinend notwendig ist)Mit dem Java-Client von Kafka 2.11-1.0.0 können Sie Folgendes tun:
Die Ausgabe ist ungefähr so:
quelle
In den neuesten Versionen von Kafka Manager gibt es eine Spalte mit dem Titel Summierte letzte Offsets .
quelle
Ich hatte dieselbe Frage und so mache ich es von einem KafkaConsumer in Kotlin:
Sehr grober Code, da ich das gerade zum Laufen gebracht habe, aber im Grunde möchten Sie den Anfangsversatz des Themas vom Endversatz subtrahieren, und dies ist die aktuelle Nachrichtenanzahl für das Thema.
Sie können sich nicht einfach auf den Endversatz verlassen, da andere Konfigurationen (Bereinigungsrichtlinie, Aufbewahrungs-ms usw.) dazu führen können, dass alte Nachrichten aus Ihrem Thema gelöscht werden. Offsets "bewegen" sich nur vorwärts, daher ist es der anfängliche Offset, der sich näher an den Endversatz vorwärts bewegt (oder schließlich an denselben Wert, wenn das Thema gerade keine Nachricht enthält).
Grundsätzlich repräsentiert der Endversatz die Gesamtzahl der Nachrichten, die dieses Thema durchlaufen haben, und die Differenz zwischen den beiden repräsentiert die Anzahl der Nachrichten, die das Thema gerade enthält.
quelle
Auszüge aus Kafka-Dokumenten
Abschreibungen in 0.9.0.0
Die Datei kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) ist veraltet. Verwenden Sie für diese Funktionalität in Zukunft kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand).
Ich verwende Kafka Broker mit aktiviertem SSL für Server und Client. Unter Befehl benutze ich
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
Dabei ist / tmp / ssl_config wie folgt
quelle
Wenn Sie Zugriff auf die JMX-Schnittstelle des Servers haben, sind die Start- und End-Offsets unter folgender Adresse vorhanden:
(Sie müssen
TOPICNAME
& ersetzenPARTITIONNUMBER
). Denken Sie daran, dass Sie nach den Replikaten einer bestimmten Partition suchen müssen oder herausfinden müssen, welcher der Broker für eine bestimmte Partition führend ist (und dies kann sich im Laufe der Zeit ändern).Alternativ können Sie Kafka Consumer- Methoden
beginningOffsets
und verwendenendOffsets
.quelle
Ich habe nicht versucht , diese selbst, aber es scheint Sinn zu machen.
Sie können auch
kafka.tools.ConsumerOffsetChecker
( Quelle ) verwenden.quelle
Der einfachste Weg, den ich gefunden habe, besteht darin, die Kafdrop-REST-API zu verwenden
/topic/topicName
und den Header key:"Accept"
/ value: anzugeben"application/json"
, um eine JSON-Antwort zurückzugewinnen.Dies ist hier dokumentiert .
quelle
Sie können kafkatool verwenden . Bitte überprüfen Sie diesen Link -> http://www.kafkatool.com/download.html
quelle