Wie werden Hadoop-Prozessdatensätze über Blockgrenzen hinweg aufgeteilt?

119

Laut dem Hadoop - The Definitive Guide

Die von FileInputFormats definierten logischen Datensätze passen normalerweise nicht genau in HDFS-Blöcke. Beispielsweise sind die logischen Datensätze eines TextInputFormat Zeilen, die häufig HDFS-Grenzen überschreiten. Dies hat keinen Einfluss auf die Funktionsweise Ihres Programms - Zeilen werden beispielsweise nicht übersehen oder unterbrochen -, aber es lohnt sich zu wissen, da dies bedeutet, dass datenlokale Karten (dh Karten, die auf demselben Host wie ihre ausgeführt werden) Eingabedaten) führt einige Remote-Lesevorgänge durch. Der dadurch verursachte geringfügige Overhead ist normalerweise nicht signifikant.

Angenommen, eine Datensatzzeile ist auf zwei Blöcke (b1 und b2) aufgeteilt. Der Mapper, der den ersten Block (b1) verarbeitet, bemerkt, dass die letzte Zeile kein EOL-Trennzeichen hat, und holt den Rest der Zeile aus dem nächsten Datenblock (b2).

Wie stellt der Mapper, der den zweiten Block (b2) verarbeitet, fest, dass der erste Datensatz unvollständig ist, und sollte ab dem zweiten Datensatz im Block (b2) verarbeiten?

Praveen Sripati
quelle

Antworten:

160

Interessante Frage, ich habe einige Zeit damit verbracht, den Code nach Details zu durchsuchen, und hier sind meine Gedanken. Die Teilungen werden vom Client von verarbeitet InputFormat.getSplits, sodass ein Blick auf FileInputFormat die folgenden Informationen liefert:

  • Für jede Eingabedatei, erhalten die Dateilänge, die Blockgröße und berechnen Sie die Split - Größe wie max(minSize, min(maxSize, blockSize))wo maxSizeentspricht mapred.max.split.sizeund minSizeist mapred.min.split.size.
  • Teilen Sie die Datei FileSplitbasierend auf der oben berechneten Teilungsgröße in verschiedene s. Wichtig ist hierbei, dass jeder FileSplitmit einem startParameter initialisiert wird, der dem Offset in der Eingabedatei entspricht . Zu diesem Zeitpunkt gibt es noch keine Handhabung der Linien. Der relevante Teil des Codes sieht folgendermaßen aus:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

Wenn Sie sich danach das ansehen, LineRecordReaderwas durch das definiert TextInputFormatist, werden dort die Zeilen behandelt:

  • Wenn Sie Ihre initialisieren LineRecordReader, wird versucht, LineReadereine Abstraktion zu instanziieren , um Zeilen lesen zu können FSDataInputStream. Es gibt 2 Fälle:
  • Wenn ein CompressionCodecCodec definiert ist, ist dieser Codec für die Behandlung von Grenzen verantwortlich. Wahrscheinlich nicht relevant für Ihre Frage.
  • Wenn es jedoch keinen Codec gibt, sind die Dinge interessant: Wenn der startvon Ihnen InputSplitanders als 0 ist, ziehen Sie 1 Zeichen zurück und überspringen dann die erste Zeile, auf die Sie stoßen, gekennzeichnet durch \ n oder \ r \ n (Windows) ! Der Backtrack ist wichtig, da Sie sicherstellen, dass Sie die gültige Zeile nicht überspringen, falls Ihre Liniengrenzen mit geteilten Grenzen übereinstimmen. Hier ist der relevante Code:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

Da die Teilungen im Client berechnet werden, müssen die Mapper nicht nacheinander ausgeführt werden. Jeder Mapper weiß bereits, ob die erste Zeile verworfen werden muss oder nicht.

Wenn Sie also zwei Zeilen mit jeweils 100 MB in derselben Datei haben und zur Vereinfachung sagen wir, die Teilungsgröße beträgt 64 MB. Wenn dann die Eingabesplits berechnet werden, haben wir das folgende Szenario:

  • Teilen Sie 1 mit dem Pfad und den Hosts zu diesem Block. Initialisiert zu Beginn 200-200 = 0 MB, Länge 64 MB.
  • Split 2 zu Beginn initialisiert 200-200 + 64 = 64 MB, Länge 64 MB.
  • Split 3 zu Beginn initialisiert 200-200 + 128 = 128 MB, Länge 64 MB.
  • Split 4 initialisiert zu Beginn 200-200 + 192 = 192 MB, Länge 8 MB.
  • Mapper A verarbeitet Split 1, Start ist 0, überspringen Sie also nicht die erste Zeile und lesen Sie eine vollständige Zeile, die über das 64-MB-Limit hinausgeht und daher remote gelesen werden muss.
  • Mapper B verarbeitet Split 2, Start ist! = 0, überspringen Sie also die erste Zeile nach 64 MB bis 1 Byte, was dem Ende von Zeile 1 bei 100 MB entspricht, das sich noch in Split 2 befindet. Wir haben also 28 MB der Zeile in Split 2 Remote lesen Sie die verbleibenden 72 MB.
  • Mapper C verarbeitet Split 3, Start ist! = 0, überspringen Sie also die erste Zeile nach 128 MB bis 1 Byte, was dem Ende von Zeile 2 bei 200 MB entspricht, also dem Ende der Datei. Tun Sie also nichts.
  • Mapper D ist dasselbe wie Mapper C, außer dass nach 192 MB-1 Byte nach einer neuen Zeile gesucht wird.
Charles Menguy
quelle
Auch @PraveenSripati ist es erwähnenswert, dass die Randfälle, in denen sich eine Grenze in einer \ r \ n-Rückgabe bei \ r befindet, in der LineReader.readLineFunktion behandelt werden. Ich denke nicht, dass dies für Ihre Frage relevant ist, kann aber bei Bedarf weitere Details hinzufügen.
Charles Menguy
Nehmen wir an, es gibt zwei Zeilen mit genau 64 MB in der Eingabe, sodass die InputSplits genau an den Zeilengrenzen stattfinden. Also, wird der Mapper immer die Zeile im zweiten Block ignorieren, weil Start! = 0.
Praveen Sripati
6
@PraveenSripati In diesem Fall wird auf dem zweiten Mapper start! = 0 angezeigt. Gehen Sie also 1 Zeichen zurück, wodurch Sie kurz vor dem \ n der ersten Zeile zurückkehren und dann zum folgenden \ n springen. Es wird also die erste Zeile übersprungen, aber die zweite Zeile wie erwartet verarbeitet.
Charles Menguy
@CharlesMenguy ist es möglich, dass die erste Zeile der Datei irgendwie übersprungen wird? Konkret habe ich die erste Zeile mit Schlüssel = 1 und Wert a, dann gibt es zwei weitere Zeilen mit demselben Schlüssel irgendwo in der Datei, Schlüssel = 1, Wert = b und Schlüssel = 1, Wert = c. Die Sache ist, mein Reduzierer bekommt {1, [b, c]} und {1, [a]} anstelle von {1, [a, b, c]}. Dies passiert nicht, wenn ich am Anfang meiner Datei eine neue Zeile hinzufüge. Was könnte der Grund sein, Sir?
Kobe-Wan Kenobi
@CharlesMenguy Was ist, wenn die Datei in HDFS eine Binärdatei ist (im Gegensatz zu einer Textdatei, in der \r\n, \ndas Abschneiden von Datensätzen dargestellt wird)?
CᴴᴀZ
17

Der Map Reduce- Algorithmus funktioniert nicht bei physischen Blöcken der Datei. Es funktioniert bei logischen Eingabesplits. Die Aufteilung der Eingaben hängt davon ab, wo der Datensatz geschrieben wurde. Ein Datensatz kann zwei Mapper umfassen.

So wie HDFS eingerichtet wurde, zerlegt es sehr große Dateien in große Blöcke (z. B. 128 MB) und speichert drei Kopien dieser Blöcke auf verschiedenen Knoten im Cluster.

HDFS kennt den Inhalt dieser Dateien nicht. Ein Datensatz wurde möglicherweise in Block-a gestartet, aber das Ende dieses Datensatzes kann in Block-b vorhanden sein .

Um dieses Problem zu lösen, verwendet Hadoop eine logische Darstellung der in Dateiblöcken gespeicherten Daten, die als Eingabesplits bezeichnet werden. Wenn ein MapReduce-Jobclient die Eingabeaufteilungen berechnet , ermittelt er, wo der erste ganze Datensatz in einem Block beginnt und wo der letzte Datensatz im Block endet .

Der entscheidende Punkt:

In Fällen, in denen der letzte Datensatz in einem Block unvollständig ist, enthält die Eingabeaufteilung Standortinformationen für den nächsten Block und den Byte-Offset der Daten, die zum Vervollständigen des Datensatzes erforderlich sind.

Schauen Sie sich das folgende Diagramm an.

Geben Sie hier die Bildbeschreibung ein

Schauen Sie sich diesen Artikel und die zugehörige SE-Frage an: Informationen zum Aufteilen von Hadoop / HDFS-Dateien

Weitere Details können Sie der Dokumentation entnehmen

Das Map-Reduce-Framework basiert auf dem InputFormat des Jobs, um:

  1. Überprüfen Sie die Eingabespezifikation des Jobs.
  2. Teilen Sie die Eingabedatei (en) in logische InputSplits auf, von denen jede einem einzelnen Mapper zugewiesen wird.
  3. Jedes InputSplit wird dann einem einzelnen Mapper zur Verarbeitung zugewiesen. Split könnte Tupel sein . InputSplit[] getSplits(JobConf job,int numSplits) ist die API, die sich um diese Dinge kümmert.

FileInputFormat , das die InputFormatimplementierte getSplits() Methode erweitert. Schauen Sie sich die Interna dieser Methode unter grepcode an

Ravindra Babu
quelle
7

Ich sehe es wie folgt: InputFormat ist dafür verantwortlich, Daten unter Berücksichtigung der Art der Daten in logische Teilungen aufzuteilen.
Nichts hindert es daran, obwohl dies zu einer erheblichen Latenz des Jobs führen kann - die gesamte Logik und das Lesen um die gewünschten Grenzen der geteilten Größe erfolgt im Jobtracker.
Das einfachste Datensatz-fähige Eingabeformat ist TextInputFormat. Es funktioniert wie folgt (soweit ich es aus dem Code verstanden habe) - Eingabeformat erstellt Teilungen nach Größe, unabhängig von den Zeilen, aber LineRecordReader immer:
a) Überspringen Sie die erste Zeile in der Teilung (oder einen Teil davon), wenn dies nicht der Fall ist die erste Teilung
b) Lesen Sie am Ende eine Zeile nach der Grenze der Teilung (wenn Daten verfügbar sind, ist dies nicht die letzte Teilung).

David Gruzman
quelle
Skip first line in the split (or part of it), if it is not the first split- Wenn der erste Datensatz im nicht ersten Block vollständig ist, ist nicht sicher, wie diese Logik funktioniert.
Praveen Sripati
Soweit ich den Code sehe - jeder Split liest was er hat + nächste Zeile. Wenn sich der Zeilenumbruch nicht an der Blockgrenze befindet, ist dies in Ordnung. Wie genau behandelt Fall, wenn der Zeilenumbruch genau auf dem Block gebunden ist - muss verstanden werden - ich werde Code ein bisschen mehr lesen
David Gruzman
3

Soweit ich weiß, wird beim FileSplitInitialisieren von für den ersten Block der Standardkonstruktor aufgerufen. Daher sind die Werte für Start und Länge anfangs Null. Wenn am Ende der Verarbeitung des ersten Blocks die letzte Zeile unvollständig ist, ist der Wert der Länge größer als die Länge der Teilung und es wird auch die erste Zeile des nächsten Blocks gelesen. Aus diesem Grund ist der Startwert für den ersten Block größer als Null und unter dieser Bedingung LineRecordReaderwird die erste Zeile des zweiten Blocks übersprungen. (Siehe Quelle )

Wenn die letzte Zeile des ersten Blocks vollständig ist, ist der Wert der Länge gleich der Länge des ersten Blocks und der Wert des Starts für den zweiten Block ist Null. In diesem Fall LineRecordReaderüberspringt der nicht die erste Zeile und liest den zweiten Block vom Anfang.

Macht Sinn?

aa8y
quelle
2
In diesem Szenario müssen die Mapper miteinander kommunizieren und die Blöcke nacheinander verarbeiten, wenn die letzte Zeile in einem bestimmten Block nicht vollständig ist. Ich bin mir nicht sicher, ob es so funktioniert.
Praveen Sripati
1

Aus dem Hadoop-Quellcode von LineRecordReader.java, dem Konstruktor: Ich finde einige Kommentare:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

Ich glaube, Hadoop liest eine zusätzliche Zeile für jeden Split (am Ende des aktuellen Split lesen Sie die nächste Zeile im nächsten Split), und wenn nicht der erste Split, wird die erste Zeile weggeworfen. so dass kein Zeilendatensatz verloren geht und unvollständig ist

Shenghai.Geng
quelle
0

Die Mapper müssen nicht kommunizieren. Die Dateiblöcke befinden sich in HDFS und der aktuelle Mapper (RecordReader) kann den Block lesen, der den verbleibenden Teil der Zeile enthält. Dies geschieht hinter den Kulissen.

user3507308
quelle