Ich sende String-Nachrichten mit der Java Producer API an Kafka V. 0.8. Wenn die Nachrichtengröße ca. 15 MB beträgt, erhalte ich eine MessageSizeTooLargeException
. Ich habe versucht, message.max.bytes
auf 40 MB einzustellen , aber ich bekomme immer noch die Ausnahme. Kleine Nachrichten funktionierten ohne Probleme.
(Die Ausnahme erscheint im Hersteller, ich habe keinen Verbraucher in dieser Anwendung.)
Was kann ich tun, um diese Ausnahme zu beseitigen?
Mein Beispiel Produzent Konfiguration
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Fehlerprotokoll:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
java
apache-kafka
Sonson123
quelle
quelle
Antworten:
Sie müssen drei (oder vier) Eigenschaften anpassen:
fetch.message.max.bytes
- Hiermit wird die größte Größe einer Nachricht bestimmt, die vom Verbraucher abgerufen werden kann.replica.fetch.max.bytes
- Dadurch können die Replikate in den Brokern Nachrichten innerhalb des Clusters senden und sicherstellen, dass die Nachrichten korrekt repliziert werden. Wenn dies zu klein ist, wird die Nachricht niemals repliziert, und daher wird der Verbraucher die Nachricht niemals sehen, da die Nachricht niemals festgeschrieben (vollständig repliziert) wird.message.max.bytes
- Dies ist die größte Größe der Nachricht, die der Broker von einem Produzenten empfangen kann.max.message.bytes
- Dies ist die größte Größe der Nachricht, die der Broker an das Thema anhängen darf. Diese Größe wird vor der Komprimierung validiert. (Standardmäßig Brokermessage.max.bytes
.)Ich habe den schwierigen Weg zu Nummer 2 herausgefunden - Sie erhalten KEINE Ausnahmen, Nachrichten oder Warnungen von Kafka. Denken Sie also daran, wenn Sie große Nachrichten senden.
quelle
message.max.bytes
im Quellcode eingestellt. Aber ich muss diese Werte in der Konfiguration des Kafka-Servers einstellenconfig/server.properties
. Jetzt funktionieren auch größere Nachrichten :).fetch.message.max.bytes
JEDER Partition Speicher zu. Dies bedeutet, dass bei Verwendung einer großen Anzahl für diefetch.message.max.bytes
Kombination mit einer großen Anzahl von Partitionen viel Speicherplatz verbraucht wird. Da der Replikationsprozess zwischen den Brokern auch ein spezialisierter Verbraucher ist, wird dadurch auch Speicher auf den Brokern verbraucht.max.message.bytes
Konfiguration pro Thema gibt, die niedriger sein kann als die des Brokersmessage.max.bytes
./.*fetch.*bytes/
scheinen die Parameter auf der Verbraucherseite und diejenigen, die die Replikation zwischen Brokern betreffen , keine harten Grenzen zu sein: "Dies ist kein absolutes Maximum, wenn [...] dieser Wert größer ist als der Rekordstapel noch zurückgegeben werden, um sicherzustellen, dass Fortschritte erzielt werden können. "Für Kafka 0.10 und den neuen Verbraucher sind geringfügige Änderungen erforderlich, verglichen mit der Antwort von smile_man :
message.max.bytes
und erhöhenreplica.fetch.max.bytes
.message.max.bytes
muss gleich oder kleiner (*) sein alsreplica.fetch.max.bytes
.max.request.size
, um die größere Nachricht zu senden.max.partition.fetch.bytes
, um größere Nachrichten zu erhalten.(*) Lesen Sie die Kommentare, um mehr über
message.max.bytes
<= zu erfahrenreplica.fetch.max.bytes
quelle
message.max.bytes
kleiner sein muss alsreplica.fetch.max.bytes
?replica.fetch.max.bytes
sie unbedingt größer sein solltenmessage.max.bytes
. Ein Confluent-Mitarbeiter hat heute früher bestätigt, was ich vermutet habe: dass die beiden Mengen tatsächlich gleich sein können.message.max.bytes<replica.fetch.max.bytes
odermessage.max.bytes=replica.fetch.max.bytes
@Kostas?Sie müssen die folgenden Eigenschaften überschreiben:
Broker-Konfigurationen ($ KAFKA_HOME / config / server.properties)
Consumer-Konfigurationen ($ KAFKA_HOME / config / consumer.properties)
Dieser Schritt hat bei mir nicht funktioniert. Ich füge es der Consumer-App hinzu und es hat gut funktioniert
Starten Sie den Server neu.
Weitere Informationen finden Sie in dieser Dokumentation: http://kafka.apache.org/08/configuration.html
quelle
Die Idee ist, dass die gleiche Größe der Nachricht vom Kafka-Produzenten an den Kafka-Broker gesendet und dann von Kafka Consumer empfangen wird, d. H.
Kafka Produzent -> Kafka Broker -> Kafka Consumer
Angenommen, wenn 15 MB Nachricht gesendet werden sollen, müssen der Produzent , der Broker und der Verbraucher , alle drei, synchron sein.
Kafka Producer sendet 15 MB -> Kafka Broker erlaubt / speichert 15 MB -> Kafka Consumer erhält 15 MB
Die Einstellung sollte daher sein:
a) auf Broker:
b) zum Verbraucher:
quelle
Eine wichtige Sache, an die Sie sich erinnern sollten, ist, dass dieses
message.max.bytes
Attribut mit dem Eigentum des Verbrauchers synchron sein mussfetch.message.max.bytes
. Die Abrufgröße muss mindestens so groß sein wie die maximale Nachrichtengröße. Andernfalls kann es vorkommen, dass Produzenten Nachrichten senden können, die größer sind, als der Verbraucher verbrauchen / abrufen kann. Es könnte sich lohnen, einen Blick darauf zu werfen.Welche Version von Kafka verwenden Sie? Geben Sie auch einige weitere Details an, die Sie erhalten. Gibt es etwas wie ...
payload size of xxxx larger than 1000000
im Protokoll auftauchen?quelle
Die Antwort von @laughing_man ist ziemlich genau. Trotzdem wollte ich eine Empfehlung geben, die ich von Kafka-Experte Stephane Maarek aus Quora gelernt habe .
Kafka ist nicht für große Nachrichten gedacht.
Ihre API sollte Cloud-Speicher (Ex AWS S3) verwenden und einfach eine Referenz von S3 an Kafka oder einen Nachrichtenbroker senden. Sie müssen einen Ort finden, an dem Ihre Daten gespeichert werden können. Vielleicht handelt es sich um ein Netzwerklaufwerk, vielleicht um was auch immer, aber es sollte kein Nachrichtenbroker sein.
Nun, wenn Sie nicht mit der oben genannten Lösung gehen möchten
Die Meldung max Größe ist 1 MB (die Einstellung in Ihrem Broker genannt wird
message.max.bytes
) Apache Kafka . Wenn Sie es wirklich dringend benötigen, können Sie diese Größe erhöhen und sicherstellen, dass die Netzwerkpuffer für Ihre Produzenten und Verbraucher erhöht werden.Wenn Sie sich wirklich für die Aufteilung Ihrer Nachricht interessieren, stellen Sie sicher, dass jede Aufteilung der Nachricht genau denselben Schlüssel hat, damit sie auf dieselbe Partition übertragen wird, und Ihr Nachrichteninhalt sollte eine „Teil-ID“ melden, damit Ihr Verbraucher die Nachricht vollständig rekonstruieren kann .
Sie können die Komprimierung auch untersuchen, wenn Ihre Nachricht textbasiert ist (gzip, snappy, lz4-Komprimierung), wodurch die Datengröße möglicherweise verringert wird, jedoch nicht auf magische Weise.
Auch hier müssen Sie ein externes System verwenden, um diese Daten zu speichern und einfach einen externen Verweis auf Kafka zu senden. Das ist eine sehr verbreitete Architektur, mit der Sie sich einverstanden erklären sollten.
Denken Sie daran, dass Kafka nur dann am besten funktioniert, wenn die Nachrichten eine große Menge, aber keine große Größe haben.
Quelle: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
quelle
Für Benutzer von landoop kafka: Sie können die Konfigurationswerte in den Umgebungsvariablen wie folgt übergeben:
Und wenn Sie rdkafka verwenden, übergeben Sie die message.max.bytes in der Produzenten-Konfiguration wie folgt:
Ebenso für den Verbraucher,
quelle