Interessante Frage, ich habe einige Zeit damit verbracht, den Code nach Details zu durchsuchen, und hier sind meine Gedanken. Die Teilungen werden vom Client von verarbeitet InputFormat.getSplits
, sodass ein Blick auf FileInputFormat die folgenden Informationen liefert:
- Für jede Eingabedatei, erhalten die Dateilänge, die Blockgröße und berechnen Sie die Split - Größe wie
max(minSize, min(maxSize, blockSize))
wo maxSize
entspricht mapred.max.split.size
und minSize
ist mapred.min.split.size
.
Teilen Sie die Datei FileSplit
basierend auf der oben berechneten Teilungsgröße in verschiedene s. Wichtig ist hierbei, dass jeder FileSplit
mit einem start
Parameter initialisiert wird, der dem Offset in der Eingabedatei entspricht . Zu diesem Zeitpunkt gibt es noch keine Handhabung der Linien. Der relevante Teil des Codes sieht folgendermaßen aus:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
Wenn Sie sich danach das ansehen, LineRecordReader
was durch das definiert TextInputFormat
ist, werden dort die Zeilen behandelt:
- Wenn Sie Ihre initialisieren
LineRecordReader
, wird versucht, LineReader
eine Abstraktion zu instanziieren , um Zeilen lesen zu können FSDataInputStream
. Es gibt 2 Fälle:
- Wenn ein
CompressionCodec
Codec definiert ist, ist dieser Codec für die Behandlung von Grenzen verantwortlich. Wahrscheinlich nicht relevant für Ihre Frage.
Wenn es jedoch keinen Codec gibt, sind die Dinge interessant: Wenn der start
von Ihnen InputSplit
anders als 0 ist, ziehen Sie 1 Zeichen zurück und überspringen dann die erste Zeile, auf die Sie stoßen, gekennzeichnet durch \ n oder \ r \ n (Windows) ! Der Backtrack ist wichtig, da Sie sicherstellen, dass Sie die gültige Zeile nicht überspringen, falls Ihre Liniengrenzen mit geteilten Grenzen übereinstimmen. Hier ist der relevante Code:
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
Da die Teilungen im Client berechnet werden, müssen die Mapper nicht nacheinander ausgeführt werden. Jeder Mapper weiß bereits, ob die erste Zeile verworfen werden muss oder nicht.
Wenn Sie also zwei Zeilen mit jeweils 100 MB in derselben Datei haben und zur Vereinfachung sagen wir, die Teilungsgröße beträgt 64 MB. Wenn dann die Eingabesplits berechnet werden, haben wir das folgende Szenario:
- Teilen Sie 1 mit dem Pfad und den Hosts zu diesem Block. Initialisiert zu Beginn 200-200 = 0 MB, Länge 64 MB.
- Split 2 zu Beginn initialisiert 200-200 + 64 = 64 MB, Länge 64 MB.
- Split 3 zu Beginn initialisiert 200-200 + 128 = 128 MB, Länge 64 MB.
- Split 4 initialisiert zu Beginn 200-200 + 192 = 192 MB, Länge 8 MB.
- Mapper A verarbeitet Split 1, Start ist 0, überspringen Sie also nicht die erste Zeile und lesen Sie eine vollständige Zeile, die über das 64-MB-Limit hinausgeht und daher remote gelesen werden muss.
- Mapper B verarbeitet Split 2, Start ist! = 0, überspringen Sie also die erste Zeile nach 64 MB bis 1 Byte, was dem Ende von Zeile 1 bei 100 MB entspricht, das sich noch in Split 2 befindet. Wir haben also 28 MB der Zeile in Split 2 Remote lesen Sie die verbleibenden 72 MB.
- Mapper C verarbeitet Split 3, Start ist! = 0, überspringen Sie also die erste Zeile nach 128 MB bis 1 Byte, was dem Ende von Zeile 2 bei 200 MB entspricht, also dem Ende der Datei. Tun Sie also nichts.
- Mapper D ist dasselbe wie Mapper C, außer dass nach 192 MB-1 Byte nach einer neuen Zeile gesucht wird.
LineReader.readLine
Funktion behandelt werden. Ich denke nicht, dass dies für Ihre Frage relevant ist, kann aber bei Bedarf weitere Details hinzufügen.\r\n, \n
das Abschneiden von Datensätzen dargestellt wird)?Der Map Reduce- Algorithmus funktioniert nicht bei physischen Blöcken der Datei. Es funktioniert bei logischen Eingabesplits. Die Aufteilung der Eingaben hängt davon ab, wo der Datensatz geschrieben wurde. Ein Datensatz kann zwei Mapper umfassen.
So wie HDFS eingerichtet wurde, zerlegt es sehr große Dateien in große Blöcke (z. B. 128 MB) und speichert drei Kopien dieser Blöcke auf verschiedenen Knoten im Cluster.
HDFS kennt den Inhalt dieser Dateien nicht. Ein Datensatz wurde möglicherweise in Block-a gestartet, aber das Ende dieses Datensatzes kann in Block-b vorhanden sein .
Um dieses Problem zu lösen, verwendet Hadoop eine logische Darstellung der in Dateiblöcken gespeicherten Daten, die als Eingabesplits bezeichnet werden. Wenn ein MapReduce-Jobclient die Eingabeaufteilungen berechnet , ermittelt er, wo der erste ganze Datensatz in einem Block beginnt und wo der letzte Datensatz im Block endet .
Der entscheidende Punkt:
In Fällen, in denen der letzte Datensatz in einem Block unvollständig ist, enthält die Eingabeaufteilung Standortinformationen für den nächsten Block und den Byte-Offset der Daten, die zum Vervollständigen des Datensatzes erforderlich sind.
Schauen Sie sich das folgende Diagramm an.
Schauen Sie sich diesen Artikel und die zugehörige SE-Frage an: Informationen zum Aufteilen von Hadoop / HDFS-Dateien
Weitere Details können Sie der Dokumentation entnehmen
Das Map-Reduce-Framework basiert auf dem InputFormat des Jobs, um:
InputSplit[] getSplits(JobConf job,int numSplits
) ist die API, die sich um diese Dinge kümmert.FileInputFormat , das die
InputFormat
implementiertegetSplits
() Methode erweitert. Schauen Sie sich die Interna dieser Methode unter grepcode anquelle
Ich sehe es wie folgt: InputFormat ist dafür verantwortlich, Daten unter Berücksichtigung der Art der Daten in logische Teilungen aufzuteilen.
Nichts hindert es daran, obwohl dies zu einer erheblichen Latenz des Jobs führen kann - die gesamte Logik und das Lesen um die gewünschten Grenzen der geteilten Größe erfolgt im Jobtracker.
Das einfachste Datensatz-fähige Eingabeformat ist TextInputFormat. Es funktioniert wie folgt (soweit ich es aus dem Code verstanden habe) - Eingabeformat erstellt Teilungen nach Größe, unabhängig von den Zeilen, aber LineRecordReader immer:
a) Überspringen Sie die erste Zeile in der Teilung (oder einen Teil davon), wenn dies nicht der Fall ist die erste Teilung
b) Lesen Sie am Ende eine Zeile nach der Grenze der Teilung (wenn Daten verfügbar sind, ist dies nicht die letzte Teilung).
quelle
Skip first line in the split (or part of it), if it is not the first split
- Wenn der erste Datensatz im nicht ersten Block vollständig ist, ist nicht sicher, wie diese Logik funktioniert.Soweit ich weiß, wird beim
FileSplit
Initialisieren von für den ersten Block der Standardkonstruktor aufgerufen. Daher sind die Werte für Start und Länge anfangs Null. Wenn am Ende der Verarbeitung des ersten Blocks die letzte Zeile unvollständig ist, ist der Wert der Länge größer als die Länge der Teilung und es wird auch die erste Zeile des nächsten Blocks gelesen. Aus diesem Grund ist der Startwert für den ersten Block größer als Null und unter dieser BedingungLineRecordReader
wird die erste Zeile des zweiten Blocks übersprungen. (Siehe Quelle )Wenn die letzte Zeile des ersten Blocks vollständig ist, ist der Wert der Länge gleich der Länge des ersten Blocks und der Wert des Starts für den zweiten Block ist Null. In diesem Fall
LineRecordReader
überspringt der nicht die erste Zeile und liest den zweiten Block vom Anfang.Macht Sinn?
quelle
Aus dem Hadoop-Quellcode von LineRecordReader.java, dem Konstruktor: Ich finde einige Kommentare:
Ich glaube, Hadoop liest eine zusätzliche Zeile für jeden Split (am Ende des aktuellen Split lesen Sie die nächste Zeile im nächsten Split), und wenn nicht der erste Split, wird die erste Zeile weggeworfen. so dass kein Zeilendatensatz verloren geht und unvollständig ist
quelle
Die Mapper müssen nicht kommunizieren. Die Dateiblöcke befinden sich in HDFS und der aktuelle Mapper (RecordReader) kann den Block lesen, der den verbleibenden Teil der Zeile enthält. Dies geschieht hinter den Kulissen.
quelle