Analysieren Sie eine große JSON-Datei in Nodejs

98

Ich habe eine Datei, in der viele JavaScript-Objekte in JSON-Form gespeichert sind, und ich muss die Datei lesen, jedes der Objekte erstellen und etwas damit tun (in meinem Fall in eine Datenbank einfügen). Die JavaScript-Objekte können in einem Format dargestellt werden:

Format A:

[{name: 'thing1'},
....
{name: 'thing999999999'}]

oder Format B:

{name: 'thing1'}         // <== My choice.
...
{name: 'thing999999999'}

Beachten Sie, dass das ...viele JSON-Objekte anzeigt. Mir ist bewusst, dass ich die gesamte Datei in den Speicher lesen und dann folgendermaßen verwenden JSON.parse()kann:

fs.readFile(filePath, 'utf-8', function (err, fileContents) {
  if (err) throw err;
  console.log(JSON.parse(fileContents));
});

Die Datei könnte jedoch sehr groß sein. Ich würde es vorziehen, einen Stream zu verwenden, um dies zu erreichen. Das Problem, das ich bei einem Stream sehe, ist, dass der Dateiinhalt jederzeit in Datenblöcke aufgeteilt werden kann. Wie kann ich ihn also JSON.parse()für solche Objekte verwenden?

Im Idealfall wird jedes Objekt als separater Datenblock gelesen, aber ich bin mir nicht sicher, wie das geht .

var importStream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
importStream.on('data', function(chunk) {

    var pleaseBeAJSObject = JSON.parse(chunk);           
    // insert pleaseBeAJSObject in a database
});
importStream.on('end', function(item) {
   console.log("Woot, imported objects into the database!");
});*/

Hinweis: Ich möchte verhindern, dass die gesamte Datei in den Speicher eingelesen wird. Zeiteffizienz spielt für mich keine Rolle. Ja, ich könnte versuchen, mehrere Objekte gleichzeitig zu lesen und alle gleichzeitig einzufügen, aber das ist eine Leistungsoptimierung - ich brauche einen Weg, der garantiert keine Speicherüberlastung verursacht, unabhängig davon, wie viele Objekte in der Datei enthalten sind .

Ich kann verwenden , wählen FormatAoder FormatBoder vielleicht etwas anderes, nur geben Sie bitte Ihre Antwort. Vielen Dank!

dgh
quelle
Für Format B können Sie den Block nach neuen Zeilen durchsuchen und jede ganze Zeile extrahieren, wobei der Rest verkettet wird, wenn er in der Mitte abgeschnitten wird. Es kann jedoch einen eleganteren Weg geben. Ich habe nicht zu viel mit Streams gearbeitet.
Travis

Antworten:

82

Um eine Datei zeilenweise zu verarbeiten, müssen Sie lediglich das Lesen der Datei und den Code, der auf diese Eingabe einwirkt, entkoppeln. Sie können dies erreichen, indem Sie Ihre Eingabe puffern, bis Sie eine neue Zeile erreichen. Angenommen, wir haben ein JSON-Objekt pro Zeile (im Grunde Format B):

var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var buf = '';

stream.on('data', function(d) {
    buf += d.toString(); // when data is read, stash it in a string buffer
    pump(); // then process the buffer
});

function pump() {
    var pos;

    while ((pos = buf.indexOf('\n')) >= 0) { // keep going while there's a newline somewhere in the buffer
        if (pos == 0) { // if there's more than one newline in a row, the buffer will now start with a newline
            buf = buf.slice(1); // discard it
            continue; // so that the next iteration will start with data
        }
        processLine(buf.slice(0,pos)); // hand off the line
        buf = buf.slice(pos+1); // and slice the processed data off the buffer
    }
}

function processLine(line) { // here's where we do something with a line

    if (line[line.length-1] == '\r') line=line.substr(0,line.length-1); // discard CR (0x0D)

    if (line.length > 0) { // ignore empty lines
        var obj = JSON.parse(line); // parse the JSON
        console.log(obj); // do something with the data here!
    }
}

Jedes Mal, wenn der Dateistream Daten vom Dateisystem empfängt, werden diese in einem Puffer gespeichert und dann pumpaufgerufen.

Wenn sich keine neue Zeile im Puffer befindet, pumpkehren Sie einfach zurück, ohne etwas zu tun. Wenn der Stream das nächste Mal Daten erhält, werden dem Puffer weitere Daten (und möglicherweise eine neue Zeile) hinzugefügt, und dann haben wir ein vollständiges Objekt.

Wenn es eine neue Zeile gibt, pumpwird der Puffer von Anfang bis zur neuen Zeile abgeschnitten und an übergeben process. Es wird dann erneut geprüft, ob sich eine weitere neue Zeile im Puffer befindet (die whileSchleife). Auf diese Weise können wir alle Zeilen verarbeiten, die im aktuellen Block gelesen wurden.

Schließlich processwird einmal pro Eingabezeile aufgerufen. Wenn vorhanden, wird das Wagenrücklaufzeichen entfernt (um Probleme mit den Zeilenenden zu vermeiden - LF vs CRLF) und dann JSON.parseeine Zeile aufgerufen . An diesem Punkt können Sie mit Ihrem Objekt alles tun, was Sie brauchen.

Beachten Sie, dass dies JSON.parsestreng ist, was als Eingabe akzeptiert wird. Sie müssen Ihre Bezeichner und Zeichenfolgenwerte in doppelte Anführungszeichen setzen . Mit anderen Worten, {name:'thing1'}wird einen Fehler auslösen; Sie müssen verwenden {"name":"thing1"}.

Da sich immer nur ein Datenblock gleichzeitig im Speicher befindet, ist dies äußerst speichereffizient. Es wird auch extrem schnell sein. Ein schneller Test ergab, dass ich 10.000 Zeilen in weniger als 15 ms verarbeitet habe.

josh3736
quelle
12
Diese Antwort ist jetzt überflüssig. Verwenden Sie JSONStream, und Sie haben sofort Unterstützung.
Arcseldon
2
Der Funktionsname 'process' ist schlecht. 'process' sollte eine Systemvariable sein. Dieser Fehler hat mich stundenlang verwirrt.
Zhigong Li
17
@arcseldon Ich glaube nicht, dass die Tatsache, dass es eine Bibliothek gibt, die dies tut, diese Antwort überflüssig macht. Es ist sicherlich immer noch nützlich zu wissen, wie dies ohne das Modul möglich ist.
Kevin B
3
Ich bin nicht sicher, ob dies für eine minimierte JSON-Datei funktionieren würde. Was wäre, wenn die gesamte Datei in einer einzigen Zeile zusammengefasst wäre und die Verwendung solcher Trennzeichen nicht möglich wäre? Wie lösen wir dieses Problem dann?
SLearner
7
Bibliotheken von Drittanbietern bestehen nicht aus Magie, die Sie kennen. Sie sind genau wie diese Antwort, ausgearbeitete Versionen von handgerollten Lösungen, aber nur verpackt und als Programm gekennzeichnet. Es ist viel wichtiger und relevanter zu verstehen, wie Dinge funktionieren, als Daten blind in eine Bibliothek zu werfen und Ergebnisse zu erwarten.
Ich
34

Gerade als ich dachte, dass es Spaß machen würde, einen Streaming-JSON-Parser zu schreiben, dachte ich auch, dass ich vielleicht eine schnelle Suche durchführen sollte, um zu sehen, ob bereits einer verfügbar ist.

Es stellt sich heraus, dass es gibt.

Da ich es gerade gefunden habe, habe ich es offensichtlich nicht verwendet, daher kann ich seine Qualität nicht kommentieren, aber ich bin gespannt, ob es funktioniert.

Es funktioniert unter Berücksichtigung des folgenden Javascript und _.isString:

stream.pipe(JSONStream.parse('*'))
  .on('data', (d) => {
    console.log(typeof d);
    console.log("isString: " + _.isString(d))
  });

Dadurch werden Objekte beim Eingang protokolliert, wenn der Stream ein Array von Objekten ist. Daher wird jeweils nur ein Objekt gepuffert.

Benutzer1106925
quelle
29

Ab Oktober 2014 können Sie (mit JSONStream) Folgendes tun: https://www.npmjs.org/package/JSONStream

var fs = require('fs'),
    JSONStream = require('JSONStream'),

var getStream() = function () {
    var jsonData = 'myData.json',
        stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
        parser = JSONStream.parse('*');
    return stream.pipe(parser);
}

getStream().pipe(MyTransformToDoWhateverProcessingAsNeeded).on('error', function (err) {
    // handle any errors
});

So demonstrieren Sie anhand eines Arbeitsbeispiels:

npm install JSONStream event-stream

data.json:

{
  "greeting": "hello world"
}

hallo.js:

var fs = require('fs'),
    JSONStream = require('JSONStream'),
    es = require('event-stream');

var getStream = function () {
    var jsonData = 'data.json',
        stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
        parser = JSONStream.parse('*');
    return stream.pipe(parser);
};

getStream()
    .pipe(es.mapSync(function (data) {
        console.log(data);
    }));
$ node hello.js
// hello world
arcseldon
quelle
2
Dies ist meistens wahr und nützlich, aber ich denke, Sie müssen es tun parse('*')oder Sie werden keine Daten erhalten.
John Zwinck
@JohnZwinck Vielen Dank, Sie haben die Antwort aktualisiert und ein funktionierendes Beispiel hinzugefügt, um sie vollständig zu demonstrieren.
Arcseldon
Im ersten Codeblock sollte der erste Satz von Klammern var getStream() = function () {entfernt werden.
Givemesnacks
1
Dies schlug mit einem Speicherfehler mit einer 500-MB-JSON-Datei fehl.
Keith John Hutchison
18

Mir ist klar, dass Sie nach Möglichkeit vermeiden möchten, die gesamte JSON-Datei in den Speicher einzulesen. Wenn Sie jedoch über den verfügbaren Speicher verfügen, ist dies in Bezug auf die Leistung möglicherweise keine schlechte Idee. Wenn Sie node.js 'require () für eine json-Datei verwenden, werden die Daten sehr schnell in den Speicher geladen.

Ich habe zwei Tests durchgeführt, um festzustellen, wie die Leistung beim Ausdrucken eines Attributs aus jeder Funktion aus einer 81-MB-Geojson-Datei aussieht.

Im ersten Test habe ich die gesamte Geojson-Datei mit in den Speicher eingelesen var data = require('./geo.json'). Das dauerte 3330 Millisekunden, und das Ausdrucken eines Attributs aus jedem Feature dauerte 804 Millisekunden, was einer Gesamtsumme von 4134 Millisekunden entspricht. Es schien jedoch, dass node.js 411 MB Speicher verwendete.

Im zweiten Test habe ich die Antwort von @ arcseldon mit JSONStream + event-stream verwendet. Ich habe die JSONPath-Abfrage so geändert, dass nur das ausgewählt wird, was ich benötige. Diesmal war der Speicher nie höher als 82 MB, aber das Ganze dauerte jetzt 70 Sekunden!

Evan Siroky
quelle
18

Ich hatte ähnliche Anforderungen, ich muss eine große JSON-Datei in Knoten JS lesen und Daten in Chunks verarbeiten und eine API aufrufen und in Mongodb speichern. inputFile.json ist wie folgt:

{
 "customers":[
       { /*customer data*/},
       { /*customer data*/},
       { /*customer data*/}....
      ]
}

Jetzt habe ich JsonStream und EventStream verwendet, um dies synchron zu erreichen.

var JSONStream = require("JSONStream");
var es = require("event-stream");

fileStream = fs.createReadStream(filePath, { encoding: "utf8" });
fileStream.pipe(JSONStream.parse("customers.*")).pipe(
  es.through(function(data) {
    console.log("printing one customer object read from file ::");
    console.log(data);
    this.pause();
    processOneCustomer(data, this);
    return data;
  }),
  function end() {
    console.log("stream reading ended");
    this.emit("end");
  }
);

function processOneCustomer(data, es) {
  DataModel.save(function(err, dataModel) {
    es.resume();
  });
}
karthick N.
quelle
Vielen Dank, dass Sie Ihre Antwort hinzugefügt haben. Mein Fall musste auch synchron behandelt werden. Nach dem Testen war es mir jedoch nicht möglich, "end ()" als Rückruf aufzurufen, nachdem die Pipe fertig ist. Ich glaube, das einzige, was getan werden könnte, ist das Hinzufügen eines Ereignisses, was passieren sollte, nachdem der Stream mit "fileStream.on" ("close", ...) "beendet / geschlossen wurde.
nonNumericalFloat
6

Ich habe ein Modul namens BFJ geschrieben, das dies kann . Insbesondere kann die Methode bfj.matchverwendet werden, um einen großen Stream in diskrete Teile von JSON aufzuteilen:

const bfj = require('bfj');
const fs = require('fs');

const stream = fs.createReadStream(filePath);

bfj.match(stream, (key, value, depth) => depth === 0, { ndjson: true })
  .on('data', object => {
    // do whatever you need to do with object
  })
  .on('dataError', error => {
    // a syntax error was found in the JSON
  })
  .on('error', error => {
    // some kind of operational error occurred
  })
  .on('end', error => {
    // finished processing the stream
  });

Hier bfj.matchgibt einen lesbaren, Objekt-Modus Strom, den die geparsten Datenelemente erhalten und wird 3 Argumente übergeben:

  1. Ein lesbarer Stream, der den Eingabe-JSON enthält.

  2. Ein Prädikat, das angibt, welche Elemente aus dem analysierten JSON in den Ergebnisstrom verschoben werden.

  3. Ein Optionsobjekt, das angibt, dass es sich bei der Eingabe um ein durch Zeilenumbrüche getrenntes JSON handelt (dies dient zum Verarbeiten von Format B aus der Frage, ist für Format A nicht erforderlich).

Beim bfj.matchAufrufen wird JSON zuerst aus der Tiefe des Eingabestreams analysiert, wobei das Prädikat mit jedem Wert aufgerufen wird, um zu bestimmen, ob dieses Element in den Ergebnisstrom verschoben werden soll oder nicht. Dem Prädikat werden drei Argumente übergeben:

  1. Der Eigenschaftsschlüssel oder Array-Index (dies gilt undefinedfür Elemente der obersten Ebene).

  2. Der Wert selbst.

  3. Die Tiefe des Elements in der JSON-Struktur (Null für Elemente der obersten Ebene).

Natürlich kann je nach Bedarf auch ein komplexeres Prädikat verwendet werden. Sie können anstelle einer Prädikatfunktion auch eine Zeichenfolge oder einen regulären Ausdruck übergeben, wenn Sie einfache Übereinstimmungen mit Eigenschaftsschlüsseln durchführen möchten.

Phil Booth
quelle
4

Ich habe dieses Problem mit dem Split-npm-Modul gelöst . Pipe deinen Stream in Split, und es wird " einen Stream aufbrechen und wieder zusammensetzen, so dass jede Zeile ein Stück ist ".

Beispielcode:

var fs = require('fs')
  , split = require('split')
  ;

var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var lineStream = stream.pipe(split());
linestream.on('data', function(chunk) {
    var json = JSON.parse(chunk);           
    // ...
});
Brian Leathem
quelle
4

Wenn Sie die Kontrolle über die Eingabedatei haben und es sich um ein Array von Objekten handelt, können Sie dies einfacher lösen. Ordnen Sie die Ausgabe der Datei mit jedem Datensatz in einer Zeile folgendermaßen an:

[
   {"key": value},
   {"key": value},
   ...

Dies ist immer noch gültiger JSON.

Verwenden Sie dann das Readline-Modul node.js, um sie zeilenweise zu verarbeiten.

var fs = require("fs");

var lineReader = require('readline').createInterface({
    input: fs.createReadStream("input.txt")
});

lineReader.on('line', function (line) {
    line = line.trim();

    if (line.charAt(line.length-1) === ',') {
        line = line.substr(0, line.length-1);
    }

    if (line.charAt(0) === '{') {
        processRecord(JSON.parse(line));
    }
});

function processRecord(record) {
    // Process the records one at a time here! 
}
Steve Hanov
quelle
-1

Ich denke, Sie müssen eine Datenbank verwenden. MongoDB ist in diesem Fall eine gute Wahl, da es JSON-kompatibel ist.

UPDATE : Mit dem Mongoimport- Tool können Sie JSON-Daten in MongoDB importieren.

mongoimport --collection collection --file collection.json
Vadim Baryshev
quelle
1
Dies beantwortet die Frage nicht. Beachten Sie, dass in der zweiten Zeile der Frage angegeben wird, dass er dies tun möchte, um Daten in eine Datenbank zu übertragen .
Josh3736
mongoimport importiert nur Dateigrößen bis zu 16 MB.
Haziq Ahmed