Analysieren großer Protokolldateien in Node.js - Zeile für Zeile einlesen

125

Ich muss einige große (5-10 GB) Protokolldateien in Javascript / Node.js analysieren (ich verwende Cube).

Die Logline sieht ungefähr so ​​aus:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".

Wir brauchen jede Zeile zu lesen, einige der Parsing (zB Streifen aus 5, 7und SUCCESS), dann diese Daten in Cube - Pumpe ( https://github.com/square/cube ) ihre JS - Client.

Was ist die kanonische Methode in Node, um eine Datei Zeile für Zeile einzulesen?

Es scheint eine ziemlich häufige Online-Frage zu sein:

Viele der Antworten scheinen auf eine Reihe von Modulen von Drittanbietern zu verweisen:

Dies scheint jedoch eine ziemlich grundlegende Aufgabe zu sein - sicherlich gibt es innerhalb der stdlib eine einfache Möglichkeit, eine Textdatei zeilenweise einzulesen?

Zweitens muss ich dann jede Zeile verarbeiten (z. B. den Zeitstempel in ein Datumsobjekt konvertieren und nützliche Felder extrahieren).

Was ist der beste Weg, um den Durchsatz zu maximieren? Gibt es eine Möglichkeit, die das Lesen in jeder Zeile oder das Senden an Cube nicht blockiert?

Drittens - ich vermute, dass String-Splits verwendet werden und das JS-Äquivalent von enthält (IndexOf! = -1?) Viel schneller ist als reguläre Ausdrücke? Hat jemand viel Erfahrung mit dem Parsen großer Mengen von Textdaten in Node.js?

Prost, Victor

victorhooi
quelle
Ich habe einen Protokollparser im Knoten erstellt, der eine Reihe von Regex-Zeichenfolgen mit integrierten Captures und Ausgaben an JSON verwendet. Sie können sogar Funktionen für jede Aufnahme aufrufen, wenn Sie eine Berechnung durchführen möchten. Es könnte tun, was Sie wollen: npmjs.org/package/logax
Jess

Antworten:

208

Ich suchte nach einer Lösung, um sehr große Dateien (gbs) Zeile für Zeile mithilfe eines Streams zu analysieren. Alle Bibliotheken und Beispiele von Drittanbietern entsprachen nicht meinen Anforderungen, da sie die Dateien nicht zeilenweise (wie 1, 2, 3, 4 ..) verarbeiteten oder die gesamte Datei in den Speicher lasen

Die folgende Lösung kann sehr große Dateien Zeile für Zeile mit Stream & Pipe analysieren. Zum Testen habe ich eine 2,1-GB-Datei mit 17.000.000 Datensätzen verwendet. Die Ram-Nutzung überschritt 60 MB nicht.

Installieren Sie zunächst das Event-Stream- Paket:

npm install event-stream

Dann:

var fs = require('fs')
    , es = require('event-stream');

var lineNr = 0;

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        lineNr += 1;

        // process line here and call s.resume() when rdy
        // function below was for logging memory usage
        logMemoryUsage(lineNr);

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(err){
        console.log('Error while reading file.', err);
    })
    .on('end', function(){
        console.log('Read entire file.')
    })
);

Geben Sie hier die Bildbeschreibung ein

Bitte lassen Sie mich wissen, wie es geht!

Gerard
quelle
6
Zu Ihrer Information, dieser Code ist nicht synchron. Es ist asynchron. Wenn Sie console.log(lineNr)nach der letzten Zeile Ihres Codes einfügen, wird die endgültige Zeilenanzahl nicht angezeigt, da die Datei asynchron gelesen wird.
jfriend00
4
Vielen Dank, dies war die einzige Lösung, die ich finden konnte und die tatsächlich angehalten und wieder aufgenommen wurde, als es beabsichtigt war. Readline nicht.
Brent
3
Tolles Beispiel, und es pausiert tatsächlich. Wenn Sie sich entscheiden, das Lesen der Datei vorzeitig zu beenden, können Sie außerdems.end();
zipzit
2
Lief wie am Schnürchen. Damit wurden 150 Millionen Dokumente in den Elasticsearch-Index indiziert. readlineModul ist ein Schmerz. Es pausiert nicht und verursachte jedes Mal nach 40-50 Millionen einen Fehler. Einen Tag verschwendet. Vielen Dank für die Antwort. Dieser funktioniert perfekt
Mandeep Singh
3
Event-Stream wurde kompromittiert: medium.com/intrinsic/… aber 4+ ist anscheinend sicher blog.npmjs.org/post/180565383195/…
John Vandivier
72

Sie können das integrierte readlinePaket verwenden, siehe Dokumentation hier . Ich benutze Stream , um einen neuen Ausgabestream zu erstellen.

var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: instream,
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    console.log(line);
    //Do your stuff ...
    //Then write to outstream
    rl.write(cubestuff);
});

Die Verarbeitung großer Dateien dauert einige Zeit. Sagen Sie, ob es funktioniert.

user568109
quelle
2
Wie geschrieben, schlägt die vorletzte Zeile fehl, da kein Cubestuff definiert ist.
Greg
2
Ist readlinees möglich, den Lesestream anzuhalten / fortzusetzen, um asynchrone Aktionen im Bereich "Sachen erledigen" auszuführen?
Jchook
1
@jchook readlinegab mir viele Probleme, als ich versuchte, Pause / Lebenslauf. Es pausiert den Stream nicht richtig und verursacht eine Menge Probleme, wenn der Downstream-Prozess langsamer ist
Mandeep Singh
31

Die Antwort von @gerard hat mir sehr gut gefallen , was es verdient, hier die richtige Antwort zu sein. Ich habe einige Verbesserungen vorgenommen:

  • Code ist in einer Klasse (modular)
  • Das Parsen ist enthalten
  • Die Möglichkeit zur Wiederaufnahme wird nach außen gegeben, falls ein asynchroner Job mit dem Lesen der CSV wie dem Einfügen in die Datenbank oder einer HTTP-Anforderung verkettet ist
  • Lesen von Chunks / Batche-Größen, die der Benutzer deklarieren kann. Ich habe mich auch um die Codierung im Stream gekümmert, falls Sie Dateien in unterschiedlicher Codierung haben.

Hier ist der Code:

'use strict'

const fs = require('fs'),
    util = require('util'),
    stream = require('stream'),
    es = require('event-stream'),
    parse = require("csv-parse"),
    iconv = require('iconv-lite');

class CSVReader {
  constructor(filename, batchSize, columns) {
    this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
    this.batchSize = batchSize || 1000
    this.lineNumber = 0
    this.data = []
    this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
  }

  read(callback) {
    this.reader
      .pipe(es.split())
      .pipe(es.mapSync(line => {
        ++this.lineNumber

        parse(line, this.parseOptions, (err, d) => {
          this.data.push(d[0])
        })

        if (this.lineNumber % this.batchSize === 0) {
          callback(this.data)
        }
      })
      .on('error', function(){
          console.log('Error while reading file.')
      })
      .on('end', function(){
          console.log('Read entirefile.')
      }))
  }

  continue () {
    this.data = []
    this.reader.resume()
  }
}

module.exports = CSVReader

Im Grunde genommen werden Sie es hier folgendermaßen verwenden:

let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())

Ich habe dies mit einer 35-GB-CSV-Datei getestet und es hat bei mir funktioniert. Deshalb habe ich mich entschieden, es auf der Antwort von @gerard aufzubauen. Rückmeldungen sind willkommen.

ambodi
quelle
Wie lange hat es gedauert?
Z. Khullah
Anscheinend fehlt dieser pause()Anruf, nicht wahr?
Vanuan
Dies ruft auch keine Rückruffunktion am Ende auf. Wenn also batchSize 100 ist, die Dateigröße 150 beträgt, werden nur 100 Elemente verarbeitet. Liege ich falsch?
Vanuan
16

Ich habe https://www.npmjs.com/package/line-by-line verwendet, um mehr als 1 000 000 Zeilen aus einer Textdatei zu lesen. In diesem Fall betrug die belegte RAM-Kapazität etwa 50 bis 60 Megabyte.

    const LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

    lr.on('error', function (err) {
         // 'err' contains error object
    });

    lr.on('line', function (line) {
        // pause emitting of lines...
        lr.pause();

        // ...do your asynchronous line processing..
        setTimeout(function () {
            // ...and continue emitting lines.
            lr.resume();
        }, 100);
    });

    lr.on('end', function () {
         // All lines are read, file is closed now.
    });
Eugene Ilyushin
quelle
'Zeile für Zeile' ist speichereffizienter als die ausgewählte Antwort. Für 1 Million Zeilen in einer CSV hatte die ausgewählte Antwort meinen Knotenprozess in den niedrigen 800 Megabyte. Mit "Zeile für Zeile" war es konsequent in den niedrigen 700er Jahren. Dieses Modul hält auch den Code sauber und leicht zu lesen. Insgesamt muss ich ungefähr 18 Millionen lesen, damit jeder MB zählt!
Neo
Es ist eine Schande, dass anstelle des Standard-Chunks eine eigene Ereigniszeile verwendet wird, was bedeutet, dass Sie Pipe nicht verwenden können.
Rene Wooller
Nach stundenlangem Testen und Suchen ist dies die einzige Lösung, die die lr.cancel()Methode tatsächlich beendet . Liest die ersten 1000 Zeilen einer 5Gig-Datei in 1 ms. Genial!!!!
Perez Lamed van Niekerk
6

Abgesehen davon, dass Sie die große Datei Zeile für Zeile lesen, können Sie sie auch Stück für Stück lesen. Weitere Informationen finden Sie in diesem Artikel

var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
    offset += bytesRead;
    var str = chunkBuffer.slice(0, bytesRead).toString();
    var arr = str.split('\n');

    if(bytesRead = chunkSize) {
        // the last item of the arr may be not a full line, leave it to the next chunk
        offset -= arr.pop().length;
    }
    lines.push(arr);
}
console.log(lines);
Kris Roofe
quelle
Könnte es sein, dass das Folgende ein Vergleich anstelle einer Aufgabe sein sollte : if(bytesRead = chunkSize)?
Stefan Rein
4

Die Node.js-Dokumentation bietet ein sehr elegantes Beispiel für die Verwendung des Readline-Moduls.

Beispiel: Lesen Sie den Dateistream Zeile für Zeile

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('sample.txt'),
    crlfDelay: Infinity
});

rl.on('line', (line) => {
    console.log(`Line from file: ${line}`);
});

Hinweis: Wir verwenden die Option crlfDelay, um alle Instanzen von CR LF ('\ r \ n') als einzelnen Zeilenumbruch zu erkennen.

Jaime Gómez
quelle
3

Ich hatte noch das gleiche Problem. Nachdem ich einige Module verglichen hatte, die diese Funktion zu haben scheinen, entschied ich mich, es selbst zu tun. Es ist einfacher als ich dachte.

Kern: https://gist.github.com/deemstone/8279565

var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... });  //lines{array} start{int} lines[0] No.

Es deckt die in einem Abschluss geöffnete Datei ab, die fetchBlock()einen Block aus der Datei abruft und das Ende in ein Array aufteilt (behandelt das Segment vom letzten Abruf).

Ich habe die Blockgröße für jede Leseoperation auf 1024 eingestellt. Dies kann Fehler haben, aber die Codelogik ist offensichtlich. Probieren Sie es selbst aus.

Deemstone
quelle
2

Node-Byline verwendet Streams, daher würde ich diesen für Ihre riesigen Dateien bevorzugen.

Für Ihre Datumskonvertierungen würde ich moment.js verwenden .

Um Ihren Durchsatz zu maximieren, können Sie einen Software-Cluster verwenden. Es gibt einige nette Module, die das knoteneigene Cluster-Modul recht gut umschließen. Ich mag Cluster-Master von Isaacs. Sie könnten beispielsweise einen Cluster von x Workern erstellen, die alle eine Datei berechnen.

Verwenden Sie für das Benchmarking von Splits und Regexen die Referenzbenchmark.js . Ich habe es bis jetzt nicht getestet. Benchmark.js ist als Knotenmodul verfügbar

hereandnow78
quelle
2

Basierend auf der Beantwortung dieser Fragen habe ich eine Klasse implementiert, mit der Sie eine Datei Zeile für Zeile synchron lesen können fs.readSync(). Sie können diese "Pause" und "Wiederaufnahme" machen, indem Sie ein QVersprechen verwenden ( jQueryanscheinend ist ein DOM erforderlich, daher kann es nicht ausgeführt werden nodejs):

var fs = require('fs');
var Q = require('q');

var lr = new LineReader(filenameToLoad);
lr.open();

var promise;
workOnLine = function () {
    var line = lr.readNextLine();
    promise = complexLineTransformation(line).then(
        function() {console.log('ok');workOnLine();},
        function() {console.log('error');}
    );
}
workOnLine();

complexLineTransformation = function (line) {
    var deferred = Q.defer();
    // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
    return deferred.promise;
}

function LineReader (filename) {      
  this.moreLinesAvailable = true;
  this.fd = undefined;
  this.bufferSize = 1024*1024;
  this.buffer = new Buffer(this.bufferSize);
  this.leftOver = '';

  this.read = undefined;
  this.idxStart = undefined;
  this.idx = undefined;

  this.lineNumber = 0;

  this._bundleOfLines = [];

  this.open = function() {
    this.fd = fs.openSync(filename, 'r');
  };

  this.readNextLine = function () {
    if (this._bundleOfLines.length === 0) {
      this._readNextBundleOfLines();
    }
    this.lineNumber++;
    var lineToReturn = this._bundleOfLines[0];
    this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
    return lineToReturn;
  };

  this.getLineNumber = function() {
    return this.lineNumber;
  };

  this._readNextBundleOfLines = function() {
    var line = "";
    while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
      this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
      this.idxStart = 0
      while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
        line = this.leftOver.substring(this.idxStart, this.idx);
        this._bundleOfLines.push(line);        
        this.idxStart = this.idx + 1;
      }
      this.leftOver = this.leftOver.substring(this.idxStart);
      if (line !== "") {
        break;
      }
    }
  }; 
}
Benvorth
quelle
0
import * as csv from 'fast-csv';
import * as fs from 'fs';
interface Row {
  [s: string]: string;
}
type RowCallBack = (data: Row, index: number) => object;
export class CSVReader {
  protected file: string;
  protected csvOptions = {
    delimiter: ',',
    headers: true,
    ignoreEmpty: true,
    trim: true
  };
  constructor(file: string, csvOptions = {}) {
    if (!fs.existsSync(file)) {
      throw new Error(`File ${file} not found.`);
    }
    this.file = file;
    this.csvOptions = Object.assign({}, this.csvOptions, csvOptions);
  }
  public read(callback: RowCallBack): Promise < Array < object >> {
    return new Promise < Array < object >> (resolve => {
      const readStream = fs.createReadStream(this.file);
      const results: Array < any > = [];
      let index = 0;
      const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) => {
        index++;
        results.push(await callback(data, index));
      }).on('error', (err: Error) => {
        console.error(err.message);
        throw err;
      }).on('end', () => {
        resolve(results);
      });
      readStream.pipe(csvStream);
    });
  }
}
import { CSVReader } from '../src/helpers/CSVReader';
(async () => {
  const reader = new CSVReader('./database/migrations/csv/users.csv');
  const users = await reader.read(async data => {
    return {
      username: data.username,
      name: data.name,
      email: data.email,
      cellPhone: data.cell_phone,
      homePhone: data.home_phone,
      roleId: data.role_id,
      description: data.description,
      state: data.state,
    };
  });
  console.log(users);
})();
Raza
quelle
-1

Ich habe ein Knotenmodul erstellt, um große Dateien asynchron Text oder JSON zu lesen. Getestet an großen Dateien.

var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');

module.exports = FileReader;

function FileReader(){

}

FileReader.prototype.read = function(pathToFile, callback){
    var returnTxt = '';
    var s = fs.createReadStream(pathToFile)
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        //console.log('reading line: '+line);
        returnTxt += line;        

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(){
        console.log('Error while reading file.');
    })
    .on('end', function(){
        console.log('Read entire file.');
        callback(returnTxt);
    })
);
};

FileReader.prototype.readJSON = function(pathToFile, callback){
    try{
        this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
    }
    catch(err){
        throw new Error('json file is not valid! '+err.stack);
    }
};

Speichern Sie die Datei einfach als file-reader.js und verwenden Sie sie wie folgt:

var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/});
Eyal Zoref
quelle
7
Ich sehe aus, als hättest du von Gerards Antwort kopiert. Sie sollten Gerard den Teil gutschreiben, den Sie kopiert haben.
Paul Lynch