Gibt es eine Möglichkeit, das Thema in Kafka zu löschen?
Ich habe eine zu große Nachricht in ein Kafka-Nachrichtenthema auf meinem lokalen Computer verschoben. Jetzt wird eine Fehlermeldung angezeigt:
kafka.common.InvalidMessageSizeException: invalid message size
Das Erhöhen der fetch.size
ist hier nicht ideal, weil ich eigentlich keine so großen Nachrichten annehmen möchte.
apache-kafka
purge
Peter Klipfel
quelle
quelle
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
--delete-config retention.ms
e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
Dies erlaubt Ihnen auch die aktuelle Aufbewahrungsfrist zu überprüfen, zB kafkas-configs --zookeeper <zkhost>: 2181 --describe --entity-Typ Themen --entity-name <topic name>Um die Warteschlange zu löschen, können Sie das Thema löschen:
dann erstelle es neu:
quelle
delete.topic.enable=true
in die Datei einzufügenconfig/server.properties
, wie in der Warnung des genannten Befehls angegebenNote: This will have no impact if delete.topic.enable is not set to true.
Hier sind die Schritte, die ich befolge, um ein Thema mit dem Namen zu löschen
MyTopic
:rm -rf /tmp/kafka-logs/MyTopic-0
. Wiederholen Sie diesen Vorgang für andere Partitionen und alle ReplikatezkCli.sh
dannrmr /brokers/MyTopic
Wenn Sie Schritt 3 verpassen, meldet Apache Kafka das Thema weiterhin als vorhanden (z. B. wenn Sie es ausführen
kafka-list-topic.sh
).Getestet mit Apache Kafka 0.8.0.
quelle
./zookeeper-shell.sh localhost:2181
und./kafka-topics.sh --list --zookeeper localhost:2181
zookeeper-client
anstelle vonzkCli.sh
(auf Cloudera CDH5 ausprobiert)Obwohl die akzeptierte Antwort korrekt ist, ist diese Methode veraltet. Die Themenkonfiguration sollte jetzt über erfolgen
kafka-configs
.Mit dieser Methode festgelegte Konfigurationen können mit dem Befehl angezeigt werden
quelle
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
In Kafka 0.8.2 für das Schnellstartbeispiel getestet: Fügen Sie zunächst eine Zeile zur Datei server.properties im Konfigurationsordner hinzu:
Dann können Sie diesen Befehl ausführen:
quelle
Von kafka 1.1
Ein Thema löschen
Warten Sie 1 Minute, um sicherzugehen, dass kafka das Thema löscht, entfernen Sie die Konfiguration und wechseln Sie dann zum Standardwert
quelle
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
kafka hat keine direkte Methode zum Löschen / Bereinigen von Themen (Warteschlangen), kann dies jedoch tun, indem Sie dieses Thema löschen und neu erstellen.
Stellen Sie zunächst sicher, dass die Datei Sever.Properties vorhanden ist, und fügen Sie sie hinzu, falls nicht
delete.topic.enable=true
dann Thema löschen
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
dann erstelle es erneut.
quelle
Wenn Sie einen gesättigten Cluster haben (zu viele Partitionen oder verschlüsselte Themendaten oder SSL verwenden oder der Controller sich auf einem fehlerhaften Knoten befindet oder die Verbindung nicht funktioniert), dauert es manchmal lange, bis das Thema gelöscht ist .
Ich folge diesen Schritten, insbesondere wenn Sie Avro verwenden.
1: Mit Kafka-Tools ausführen:
2: Auf dem Schema-Registrierungsknoten ausführen:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Setzen Sie die Themenaufbewahrung auf die ursprüngliche Einstellung zurück, sobald das Thema leer ist.
Hoffe das hilft jemandem, da es nicht einfach zu bewerben ist.
quelle
kafka-avro-console-consumer
ist nicht erforderlichUPDATE: Diese Antwort ist relevant für Kafka 0.6. Für Kafka 0.8 und höher siehe Antwort von @Patrick.
Ja, stoppen Sie kafka und löschen Sie alle Dateien manuell aus dem entsprechenden Unterverzeichnis (es ist einfach, sie im kafka-Datenverzeichnis zu finden). Nach dem Neustart von kafka ist das Thema leer.
quelle
Am einfachsten ist es, das Datum der einzelnen Protokolldateien so festzulegen, dass es älter als die Aufbewahrungsdauer ist. Dann sollte der Broker sie bereinigen und innerhalb weniger Sekunden für Sie entfernen. Dies bietet mehrere Vorteile:
Nach meiner Erfahrung mit Kafka 0.7.x kann das Entfernen der Protokolldateien und das Neustarten des Brokers bei bestimmten Verbrauchern zu ungültigen Offset-Ausnahmen führen. Dies würde passieren, weil der Broker die Offsets bei Null neu startet (wenn keine vorhandenen Protokolldateien vorhanden sind) und ein Verbraucher, der zuvor das Thema verwendet hat, erneut eine Verbindung herstellt, um einen bestimmten [einmal gültigen] Offset anzufordern. Wenn dieser Versatz außerhalb der Grenzen der neuen Themenprotokolle liegt, ist dies kein Schaden, und der Verbraucher wird entweder am Anfang oder am Ende wieder aufgenommen. Wenn der Versatz jedoch innerhalb der Grenzen der neuen Themenprotokolle liegt, versucht der Broker, den Nachrichtensatz abzurufen, schlägt jedoch fehl, da der Versatz nicht an einer tatsächlichen Nachricht ausgerichtet ist.
Dies könnte gemildert werden, indem auch die Verbraucher-Offsets in zookeeper für dieses Thema gelöscht werden. Wenn Sie jedoch kein jungfräuliches Thema benötigen und nur den vorhandenen Inhalt entfernen möchten, ist das einfache Berühren einiger Themenprotokolle viel einfacher und zuverlässiger als das Stoppen von Brokern, das Löschen von Themenprotokollen und das Löschen bestimmter Zookeeper-Knoten .
quelle
Der Rat von Thomas ist großartig, aber leider
zkCli
in alten Versionen von Zookeeper (zum Beispiel 3.3.6) nicht zu unterstützenrmr
. Vergleichen Sie beispielsweise die Befehlszeilenimplementierung in Modern Zookeeper mit Version 3.3 .Wenn Sie mit einer alten Version von Zookeeper konfrontiert sind, besteht eine Lösung darin, eine Clientbibliothek wie zc.zk für Python zu verwenden. Für Leute, die nicht mit Python vertraut sind, müssen Sie es mit pip oder easy_install installieren . Starten Sie dann eine Python-Shell (
python
) und Sie können Folgendes tun:oder auch
wenn Sie alle Themen aus Kafka entfernen möchten.
quelle
So bereinigen Sie alle Nachrichten eines bestimmten Themas mithilfe Ihrer Anwendungsgruppe (Gruppenname sollte mit dem Namen der Anwendungs-Kafka-Gruppe identisch sein).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
quelle
Nach der Antwort von @steven appleyard habe ich die folgenden Befehle in Kafka 2.2.0 ausgeführt und sie haben für mich funktioniert.
quelle
Viele gute Antworten hier, aber unter ihnen habe ich keine über Docker gefunden. Ich habe einige Zeit damit verbracht herauszufinden, dass die Verwendung des Broker-Containers in diesem Fall falsch ist (offensichtlich !!!)
und ich hätte
zookeeper:2181
anstelle von--zookeeper localhost:2181
gemäß meiner Erstellungsdatei verwenden sollender richtige Befehl wäre
Hoffe, es wird jemandem Zeit sparen.
Beachten Sie außerdem, dass die Nachrichten nicht sofort gelöscht werden und dass das Segment des Protokolls geschlossen wird.
quelle
localhost:2181
... ZB Sie verstehen die Docker-Netzwerkfunktionen falsch. Darüber hinaus sind nicht alle Zookeeper-Container vorhanden.kafka-topics
Verwenden Sie sie daher am besten nicht auf diese Weise. Neueste Kafka-Installationen ermöglichen--bootstrap-servers
das Ändern eines Themas anstelle von--zookeeper
you can use
--zookeeper zookeeper: 2181` aus dem Kafka Container ist mein Punkt. Oder greifen Sie sogar die Zookeeper-Zeile aus der DateiAufgrund der Größe konnte kein Kommentar hinzugefügt werden: Ich bin mir nicht sicher, ob dies zutrifft, abgesehen von der Aktualisierung von Retention.ms und Retention.bytes. Ich habe jedoch festgestellt, dass die Richtlinie zur Bereinigung von Themen "Löschen" (Standard) sein sollte. Wenn "Kompakt", wird dies der Fall sein Halten Sie Nachrichten länger fest, dh wenn es "kompakt" ist, müssen Sie auch delete.retention.ms angeben .
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Auch mussten früheste / späteste Offsets gleich überwacht werden, um zu bestätigen, dass dies erfolgreich passiert ist, kann auch das du -h / tmp / kafka-logs / test-topic-3-100- * überprüfen
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762
Das andere Problem ist, müssen Sie aktuelle Konfiguration erhalten zuerst , so dass Sie zurückkehren denken Sie daran, nach dem Löschen ist erfolgreich:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
quelle
Ein anderer, eher manueller Ansatz zum Löschen eines Themas ist:
in den Maklern:
sudo service kafka stop
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
im Tierpfleger:
sudo /usr/lib/zookeeper/bin/zkCli.sh
rmr /brokers/topic/<some_topic_name>
wieder in den Maklern:
sudo service kafka start
quelle
Dies sollte
retention.ms
konfiguriert geben . Dann können Sie den obigen Änderungsbefehl verwenden, um auf 1 Sekunde zu wechseln (und später auf die Standardeinstellungen zurückzukehren).quelle
Verwenden Sie von Java aus das Neue
AdminZkClient
anstelle des VeraltetenAdminUtils
:quelle
AdminClient
oderKafkaAdminClient