In vielen realen Situationen, in denen Sie MapReduce anwenden, bestehen die endgültigen Algorithmen aus mehreren MapReduce-Schritten.
dh Map1, Reduce1, Map2, Reduce2 und so weiter.
Sie haben also die Ausgabe der letzten Reduzierung, die als Eingabe für die nächste Karte benötigt wird.
Die Zwischendaten möchten Sie (im Allgemeinen) nicht behalten, sobald die Pipeline erfolgreich abgeschlossen wurde. Auch weil diese Zwischendaten im Allgemeinen eine Datenstruktur haben (wie eine 'Karte' oder eine 'Menge'), möchten Sie nicht zu viel Aufwand beim Schreiben und Lesen dieser Schlüssel-Wert-Paare betreiben.
Was ist die empfohlene Vorgehensweise in Hadoop?
Gibt es ein (einfaches) Beispiel, das zeigt, wie mit diesen Zwischendaten richtig umgegangen wird, einschließlich der anschließenden Bereinigung?
Antworten:
Ich denke, dieses Tutorial im Entwicklernetzwerk von Yahoo wird Ihnen dabei helfen: Verketten von Jobs
Sie verwenden die
JobClient.runJob()
. Der Ausgabepfad der Daten aus dem ersten Job wird zum Eingabepfad zu Ihrem zweiten Job. Diese müssen als Argumente mit entsprechendem Code an Ihre Jobs übergeben werden, um sie zu analysieren und die Parameter für den Job einzurichten.Ich denke, dass die obige Methode möglicherweise so ist, wie es die jetzt ältere Mapred-API getan hat, aber sie sollte trotzdem funktionieren. Es wird eine ähnliche Methode in der neuen Mapreduce-API geben, aber ich bin mir nicht sicher, was es ist.
Wenn Sie nach Abschluss eines Auftrags Zwischendaten entfernen, können Sie dies in Ihrem Code tun. Ich habe es schon einmal so gemacht:
Dabei ist der Pfad der Speicherort der Daten auf HDFS. Sie müssen sicherstellen, dass Sie diese Daten nur löschen, wenn kein anderer Job dies erfordert.
quelle
Es gibt viele Möglichkeiten, wie Sie dies tun können.
(1) Kaskadierende Jobs
Erstellen Sie das JobConf-Objekt "job1" für den ersten Job und legen Sie alle Parameter mit "input" als Eingabeverzeichnis und "temp" als Ausgabeverzeichnis fest. Führen Sie diesen Job aus:
Erstellen Sie unmittelbar darunter das JobConf-Objekt "job2" für den zweiten Job und legen Sie alle Parameter mit "temp" als Eingabeverzeichnis und "output" als Ausgabeverzeichnis fest. Führen Sie diesen Job aus:
(2) Erstellen Sie zwei JobConf-Objekte und legen Sie alle darin enthaltenen Parameter wie in (1) fest, außer dass Sie JobClient.run nicht verwenden.
Erstellen Sie dann zwei Jobobjekte mit jobconfs als Parametern:
Mit dem jobControl-Objekt geben Sie die Jobabhängigkeiten an und führen dann die Jobs aus:
(3) Wenn Sie eine Struktur wie Map + | benötigen Reduzieren | Map * können Sie die Klassen ChainMapper und ChainReducer verwenden, die mit Hadoop Version 0.19 und höher geliefert werden.
quelle
Es gibt tatsächlich eine Reihe von Möglichkeiten, dies zu tun. Ich werde mich auf zwei konzentrieren.
Eine davon ist über Riffle ( http://github.com/cwensel/riffle ) eine Anmerkungsbibliothek, mit der abhängige Dinge identifiziert und in abhängiger (topologischer) Reihenfolge "ausgeführt" werden können.
Oder Sie können eine Kaskade (und MapReduceFlow) in Cascading ( http://www.cascading.org/ ) verwenden. Eine zukünftige Version wird Riffle-Annotationen unterstützen, funktioniert aber jetzt hervorragend mit unformatierten MR JobConf-Jobs.
Eine Variante besteht darin, MR-Jobs überhaupt nicht manuell zu verwalten, sondern Ihre Anwendung mithilfe der Cascading-API zu entwickeln. Anschließend werden die JobConf und die Jobverkettung intern über die Klassen Cascading Planer und Flow abgewickelt.
Auf diese Weise konzentrieren Sie sich auf Ihr Problem und nicht auf die Mechanismen zur Verwaltung von Hadoop-Jobs usw. Sie können sogar verschiedene Sprachen (wie Clojure oder Jruby) überlagern, um Ihre Entwicklung und Anwendungen noch weiter zu vereinfachen. http://www.cascading.org/modules.html
quelle
Ich habe die Jobverkettung nacheinander mit JobConf-Objekten durchgeführt. Ich habe ein WordCount-Beispiel für die Verkettung der Jobs verwendet. Ein Job ermittelt, wie oft ein Wort in der angegebenen Ausgabe wiederholt wird. Der zweite Job verwendet die erste Jobausgabe als Eingabe und ermittelt die Gesamtzahl der Wörter in der angegebenen Eingabe. Unten finden Sie den Code, der in die Treiberklasse eingefügt werden muss.
Der Befehl zum Ausführen dieser Jobs lautet:
bin / hadoop jar TotalWords.
Wir müssen den endgültigen Jobnamen für den Befehl angeben. Im obigen Fall handelt es sich um TotalWords.
quelle
Sie können die MR-Kette wie im Code angegeben ausführen.
BITTE BEACHTEN SIE : Es wurde nur der Treibercode angegeben
DIE SEQUENZ IST
( JOB1 ) KARTE-> REDUZIEREN-> ( JOB2 ) KARTE
Dies wurde durchgeführt, um die Schlüssel zu sortieren. Es gibt jedoch noch weitere Möglichkeiten, z. B. die Verwendung einer Baumkarte.
Ich möchte Ihre Aufmerksamkeit jedoch auf die Art und Weise richten, wie die Jobs verkettet wurden! !
Danke dir
quelle
Sie können oozie für die Barch-Verarbeitung Ihrer MapReduce-Jobs verwenden. http://issues.apache.org/jira/browse/HADOOP-5303
quelle
Es gibt Beispiele im Apache Mahout-Projekt, die mehrere MapReduce-Jobs miteinander verketten. Eines der Beispiele finden Sie unter:
RecommenderJob.java
http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob
quelle
Wir können die
waitForCompletion(true)
Methode des Jobs verwenden, um die Abhängigkeit zwischen dem Job zu definieren.In meinem Szenario hatte ich 3 Jobs, die voneinander abhängig waren. In der Treiberklasse habe ich den folgenden Code verwendet und er funktioniert wie erwartet.
quelle
Die neue Klasse org.apache.hadoop.mapreduce.lib.chain.ChainMapper hilft diesem Szenario
quelle
Obwohl es komplexe serverbasierte Hadoop-Workflow-Engines gibt, z. B. oozie, habe ich eine einfache Java-Bibliothek, die die Ausführung mehrerer Hadoop-Jobs als Workflow ermöglicht. Die Jobkonfiguration und der Workflow, die die Abhängigkeit zwischen Jobs definieren, werden in einer JSON-Datei konfiguriert. Alles ist extern konfigurierbar und erfordert keine Änderung der vorhandenen Map Reduce-Implementierung, um Teil eines Workflows zu sein.
Details finden Sie hier. Quellcode und JAR sind in Github verfügbar.
http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/
Pranab
quelle
Ich denke, oozie hilft den nachfolgenden Jobs, die Eingaben direkt vom vorherigen Job zu erhalten. Dies vermeidet die mit Jobcontrol ausgeführte E / A-Operation.
quelle
Wenn Sie Ihre Jobs programmgesteuert verketten möchten, müssen Sie JobControl verwenden. Die Verwendung ist recht einfach:
Danach fügen Sie ControlledJob-Instanzen hinzu. ControlledJob definiert einen Job mit seinen Abhängigkeiten und fügt so automatisch Ein- und Ausgänge ein, um sie an eine "Kette" von Jobs anzupassen.
startet die Kette. Sie werden das in einen speziellen Thread einfügen wollen. Auf diese Weise können Sie den Status Ihrer Kette überprüfen, während sie ausgeführt wird:
quelle
Wie Sie in Ihrer Anforderung erwähnt haben, dass o / p von MRJob1 das i / p von MRJob2 usw. sein soll, können Sie in Betracht ziehen, den oozie-Workflow für diesen Anwendungsfall zu verwenden. Sie können auch Ihre Zwischendaten in HDFS schreiben, da diese vom nächsten MRJob verwendet werden. Und nachdem der Auftrag abgeschlossen ist, können Sie Ihre Zwischendaten bereinigen.
quelle