Gibt es eine Möglichkeit, alle Daten aus einem Thema zu löschen oder das Thema vor jedem Lauf zu löschen?

87

Gibt es eine Möglichkeit, alle Daten aus einem Thema zu löschen oder das Thema vor jedem Lauf zu löschen?

Kann ich die Datei KafkaConfig.scala ändern, um die logRetentionHoursEigenschaft zu ändern ? Gibt es eine Möglichkeit, die Nachrichten zu löschen, sobald der Verbraucher sie liest?

Ich verwende Produzenten, um die Daten von irgendwoher abzurufen und an ein bestimmtes Thema zu senden, das ein Verbraucher konsumiert. Kann ich bei jedem Lauf alle Daten aus diesem Thema löschen? Ich möchte jedes Mal nur neue Daten im Thema. Gibt es eine Möglichkeit, das Thema irgendwie neu zu initialisieren?

TommyT
quelle

Antworten:

61

Glaube nicht, dass es noch unterstützt wird. Schauen Sie sich dieses JIRA-Problem "Unterstützung zum Löschen von Themen hinzufügen" an.

So löschen Sie manuell:

  1. Fahren Sie den Cluster herunter
  2. Bereinigen Sie das kafka-Protokollverzeichnis (angegeben durch das log.dirAttribut in der kafka- Konfigurationsdatei ) sowie die zookeeper-Daten
  3. Starten Sie den Cluster neu

Für jedes Thema können Sie Folgendes tun

  1. Hör auf Kafka
  2. Bereinigen Sie das partitionsspezifische Kafka-Protokoll. Kafka speichert seine Protokolldatei in einem Format von "logDir / topic-partition", sodass für ein Thema mit dem Namen "MyTopic" das Protokoll für die Partitions-ID 0 dort gespeichert wird, /tmp/kafka-logs/MyTopic-0wo /tmp/kafka-logses durch das log.dirAttribut angegeben wird
  3. Starten Sie kafka neu

Dies ist NOTein guter und empfohlener Ansatz, der jedoch funktionieren sollte. In der Kafka Broker-Konfigurationsdatei wird das log.retention.hours.per.topicAttribut zum Definieren verwendetThe number of hours to keep a log file before deleting it for some specific topic

Gibt es auch eine Möglichkeit, die Nachrichten zu löschen, sobald der Verbraucher sie liest?

Aus der Kafka-Dokumentation :

Der Kafka-Cluster speichert alle veröffentlichten Nachrichten - unabhängig davon, ob sie verbraucht wurden oder nicht - für einen konfigurierbaren Zeitraum. Wenn die Protokollaufbewahrung beispielsweise auf zwei Tage festgelegt ist, steht sie für die zwei Tage nach Veröffentlichung einer Nachricht zum Verzehr zur Verfügung. Danach wird sie verworfen, um Speicherplatz freizugeben. Die Leistung von Kafka ist in Bezug auf die Datengröße praktisch konstant, sodass das Beibehalten vieler Daten kein Problem darstellt.

Tatsächlich sind die einzigen Metadaten, die pro Verbraucher gespeichert werden, die Position des Verbrauchers im Protokoll, die als "Offset" bezeichnet wird. Dieser Versatz wird vom Verbraucher gesteuert: Normalerweise erhöht ein Verbraucher seinen Versatz beim Lesen von Nachrichten linear, aber tatsächlich wird die Position vom Verbraucher gesteuert und er kann Nachrichten in beliebiger Reihenfolge verbrauchen. Beispielsweise kann ein Verbraucher zur Wiederaufbereitung auf einen älteren Offset zurücksetzen.

Für den Start der Suche nach Offset in Kafka 0,8 lesen Einfaches Consumer Beispiel sagen sie

Kafka enthält zwei Konstanten, um zu helfen, kafka.api.OffsetRequest.EarliestTime()findet den Anfang der Daten in den Protokollen und startet das Streaming von dort, kafka.api.OffsetRequest.LatestTime()wird nur neue Nachrichten streamen.

Dort finden Sie auch den Beispielcode für die Verwaltung des Offsets bei Ihrem Kunden.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
Hild
quelle
Ich glaube, der richtige Link zum JIRA-Problem lautet issue.apache.org/jira/browse/KAFKA-330
Asmaier
4
Das Thema wird hier weiterhin angezeigt, da es in zookeeper aufgeführt ist. Sie müssen rekursiv alles unter brokers/topics/<topic_to_delete>sowie die Protokolle löschen , um es loszuwerden.
SubmittedDenied
3
Entsprechend dem Problemlink können Sie ein Thema nach Version 0.8.1 löschen. Sie können die Detailhilfe von anzeigen kafka-run-class.sh kafka.admin.DeleteTopicCommand.
Jay
5
Update: Ab Kafka 0.8.2 wird der Befehl geändert in:kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181
Jay Taylor
Ich denke, diese Funktion zum Löschen von Themen wurde jetzt hinzugefügt. Wahrscheinlich wird es die nächste stabile Version haben.
ha9u63ar
68

Wie ich hier erwähnte Purge Kafka Queue :

In Kafka 0.8.2 für das Schnellstartbeispiel getestet: Fügen Sie zunächst eine Zeile zur Datei server.properties im Konfigurationsordner hinzu:

delete.topic.enable=true

Dann können Sie diesen Befehl ausführen:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Patrick
quelle
2
Übrigens müssen Sie den Kafka-Server nach dem Hinzufügen der Option nicht neu starten, falls sich jemand wundert.
Problemoffizier
14

Getestet mit Kafka 0.10

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

Hinweis: Wenn Sie Themenordner in Kafka-Protokollen löschen, jedoch nicht aus dem Zookeeper-Datenordner, werden weiterhin Themen angezeigt.

Swadeshi
quelle
8

Im Folgenden finden Sie Skripts zum Leeren und Löschen eines Kafka-Themas, wobei localhost als zookeeper-Server angenommen wird und Kafka_Home auf das Installationsverzeichnis festgelegt ist:

Das folgende Skript leert ein Thema, indem die Aufbewahrungszeit auf 1 Sekunde festgelegt und die Konfiguration entfernt wird:

#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms

Um Themen vollständig zu löschen , müssen Sie alle anwendbaren Kafka-Broker stoppen und ihre Verzeichnisse aus dem Kafka-Protokollverzeichnis entfernen (Standard: / tmp / kafka-logs) und dann dieses Skript ausführen, um das Thema aus dem Zookeeper zu entfernen. Um zu überprüfen, ob es aus dem Zookeeper gelöscht wurde, sollte die Ausgabe von ls / brokers / topic nicht mehr das folgende Thema enthalten:

#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF
vdlen
quelle
1
Dies funktioniert nur, wenn die Aufbewahrungsprüfung innerhalb dieser 5 Sekunden nach dem Schlafengehen erfolgt. Bitte stellen Sie sicher, dass Sie schlafen, bis der Scheck definitiv wie hier angegeben bestanden hat:grep "log.retention.check.interval" $Kafka_Home/config/server.properties
Colin
2
Ich wollte die Antwort bearbeiten, da der erste Befehl einen kleinen Fehler enthält. Änderungen an einem Zeichen sind jedoch nicht zulässig. Eigentlich ist es nicht --add configeher so--add-config
SRC
7

Wir haben ziemlich genau versucht, was die anderen Antworten mit mäßigem Erfolg beschreiben. Was für uns wirklich funktioniert hat (Apache Kafka 0.8.1), ist der Klassenbefehl

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost: 2181

Dan M.
quelle
2
Versuchte dies in 0.8.1. Der Befehl gibt "Löschen erfolgreich!" Zurück. Die Partitionen in den Protokollordnern werden jedoch nicht gelöscht.
Dilm
8
Versucht auf 0.8.2.1 (Homebrew) und es gibt diesen Fehler. Error: Could not find or load main class kafka.admin.DeleteTopicCommand
Thanish
2
Ab dem neuen kafka (0.8.2) ist es sh kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_for_delete] --zookeeper localhost: 2181. Stellen Sie sicher, dass delete.topic.enable true ist.
Hoàng Long
7

Als schmutzige Problemumgehung können Sie die Einstellungen für die Laufzeitaufbewahrung pro Thema anpassen, z. B. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1( Aufbewahrungsbytes = 0 funktionieren möglicherweise auch).

Nach kurzer Zeit sollte Kafka den Platz freigeben. Ich bin mir nicht sicher, ob dies Auswirkungen auf die Neuerstellung des Themas hat.

ps. Bringen Sie die Retentionseinstellungen besser zurück, sobald die Reinigung abgeschlossen ist.

Sie können auch retention.mshistorische Daten beibehalten

Ivan Balashov
quelle
3

Für Brauanwender

Wenn Sie brewwie ich verwenden und viel Zeit damit verschwenden, nach dem berüchtigten kafka-logsOrdner zu suchen , fürchten Sie sich nicht mehr. (und bitte lassen Sie mich wissen, ob das für Sie und mehrere verschiedene Versionen von Homebrew, Kafka usw. funktioniert :))

Sie werden es wahrscheinlich finden unter:

Ort:

/usr/local/var/lib/kafka-logs


Wie man diesen Weg tatsächlich findet

(Dies ist auch hilfreich für praktisch jede App, die Sie über Brew installieren.)

1) brew services list

kafka hat matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist gestartet

2) Öffnen und lesen plistSie das oben gefundene

3) Finden Sie die Linie, die den server.propertiesOrt definiert, öffnen Sie sie, in meinem Fall:

  • /usr/local/etc/kafka/server.properties

4) Suchen Sie nach der log.dirsZeile:

log.dirs = / usr / local / var / lib / kafka-logs

5) Gehen Sie zu diesem Speicherort und löschen Sie die Protokolle für die gewünschten Themen

6) Starten Sie Kafka mit neu brew services restart kafka

Matheus Felipe
quelle
2

Alle Daten zu Themen und deren Partitionen werden in gespeichert tmp/kafka-logs/. Darüber hinaus werden sie in einem Format gespeichert. topic-partionNumberWenn Sie also ein Thema löschen möchten newTopic, können Sie:

  • hör auf kafka
  • Löschen Sie die Dateien rm -rf /tmp/kafka-logs/newTopic-*
Salvador Dali
quelle
1
  1. Stoppen Sie ZooKeeper und Kafka
  2. Ändern Sie in server.properties den Wert log.retention.hours. Sie können kommentieren log.retention.hoursund hinzufügen log.retention.ms=1000. Es würde die Aufzeichnung über Kafka Topic nur für eine Sekunde halten.
  3. Starten Sie Tierpfleger und Kafka.
  4. Überprüfen Sie die Verbraucherkonsole. Als ich die Konsole zum ersten Mal öffnete, war die Aufzeichnung dort. Aber als ich die Konsole wieder öffnete, wurde der Datensatz entfernt.
  5. Später können Sie den Wert log.retention.hoursauf die gewünschte Zahl einstellen .
Graf
quelle
1

Ab der Version kafka 2.3.0 gibt es eine alternative Möglichkeit zum sanften Löschen von Kafka (alte Ansätze sind veraltet).

Aktualisieren Sie Retention.ms auf 1 Sek. (1000 ms) und setzen Sie es nach einer Minute erneut auf die Standardeinstellung, dh 7 Tage (168 Stunden, 604.800.000 in ms).

Weiches Löschen: - (rentention.ms = 1000) (mit kafka-configs.sh)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.

Standardeinstellung: - 7 Tage (168 Stunden, Retention.ms = 604800000)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=604800000
Brajkishore Dubey
quelle
0

Wenn Sie ein Thema manuell aus einem Kafka-Cluster löschen, können Sie dies unter https://github.com/darrenfu/bigdata/issues/6 überprüfen. Ein wichtiger Schritt, der in den meisten Lösungen häufig übersehen wird, ist das Löschen des /config/topics/<topic_name>in ZK.

Abdurrahman Adebiyi
quelle
0

Ich benutze dieses Skript:

#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done
Дмитрий Шепелев
quelle
0

Ich verwende das folgende Dienstprogramm, um nach meinem Integrationstestlauf zu bereinigen.

Es verwendet die neueste AdminZkClientAPI. Die ältere API ist veraltet.

import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  def cleanupTopic(config: AppConfig) = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    val zkUtils = new AdminZkClient(zkClient)

    val pp = new Properties()
    pp.setProperty("delete.retention.ms", "10")
    pp.setProperty("file.delete.delay.ms", "1000")
    zkUtils.changeTopicConfig(testTopic , pp)
    //    zkUtils.deleteTopic(testTopic)

    println("Waiting for topic to be purged. Then reset to retain records for the run")
    Thread.sleep(60000L)

    val resetProps = new Properties()
    resetProps.setProperty("delete.retention.ms", "3000000")
    resetProps.setProperty("file.delete.delay.ms", "4000000")
    zkUtils.changeTopicConfig(testTopic , resetProps)

  }


}

Es gibt eine Option zum Löschen von Themen. Es markiert jedoch das Thema zum Löschen. Zookeeper löscht das Thema später. Da dies unvorhersehbar lang sein kann, bevorzuge ich den Retention.ms-Ansatz

ForeverLearner
quelle