Logstash-Analyse von XML-Dokumenten mit mehreren Protokolleinträgen

8

Ich prüfe derzeit, ob Logstash und Elasticsearch für unseren Anwendungsfall nützlich sind. Was ich habe, ist eine Protokolldatei mit mehreren Einträgen, die vom Formular ist

<root>
    <entry>
        <fieldx>...</fieldx>
        <fieldy>...</fieldy>
        <fieldz>...</fieldz>
        ...
        <fieldarray>
            <fielda>...</fielda>
            <fielda>...</fielda>
            ...
        </fieldarray>
    </entry>
    <entry>
    ...
    </entry>
    ...
<root>

Jedes entryElement würde ein Protokollereignis enthalten. (Wenn Sie interessiert sind, handelt es sich bei der Datei tatsächlich um einen Arbeitsprotokollexport für Tempo-Arbeitszeittabellen (ein Atlassian JIRA-Plug-in).)

Ist es möglich, eine solche Datei in mehrere Protokollereignisse umzuwandeln, ohne meinen eigenen Codec zu schreiben?

verdoppelt
quelle

Antworten:

11

Okay, ich habe eine Lösung gefunden, die für mich funktioniert. Das größte Problem bei der Lösung ist, dass das XML-Plugin ... nicht ganz instabil ist, aber entweder schlecht dokumentiert und fehlerhaft oder schlecht und falsch dokumentiert.

TLDR

Bash-Befehlszeile:

gzcat -d file.xml.gz | tr -d "\n\r" | xmllint --format - | logstash -f logstash-csv.conf

Logstash-Konfiguration:

input {
    stdin {}
}

filter {
    # add all lines that have more indentation than double-space to the previous line
    multiline {
        pattern => "^\s\s(\s\s|\<\/entry\>)"
        what => previous
    }
    # multiline filter adds the tag "multiline" only to lines spanning multiple lines
    # We _only_ want those here.
    if "multiline" in [tags] {
        # Add the encoding line here. Could in theory extract this from the
        # first line with a clever filter. Not worth the effort at the moment.
        mutate {
            replace => ["message",'<?xml version="1.0" encoding="UTF-8" ?>%{message}']
        }
        # This filter exports the hierarchy into the field "entry". This will
        # create a very deep structure that elasticsearch does not really like.
        # Which is why I used add_field to flatten it.
        xml {
            target => entry
            source => message
            add_field => {
                fieldx         => "%{[entry][fieldx]}"
                fieldy         => "%{[entry][fieldy]}"
                fieldz         => "%{[entry][fieldz]}"
                # With deeper nested fields, the xml converter actually creates
                # an array containing hashes, which is why you need the [0]
                # -- took me ages to find out.
                fielda         => "%{[entry][fieldarray][0][fielda]}"
                fieldb         => "%{[entry][fieldarray][0][fieldb]}"
                fieldc         => "%{[entry][fieldarray][0][fieldc]}"
            }
        }
        # Remove the intermediate fields before output. "message" contains the
        # original message (XML). You may or may-not want to keep that.
        mutate {
            remove_field => ["message"]
            remove_field => ["entry"]
        }
    }
}

output {
    ...
}

Detailliert

Meine Lösung funktioniert, weil entrymeine XML-Eingabe zumindest bis zum Level sehr einheitlich ist und daher durch eine Art Mustervergleich verarbeitet werden kann.

Da der Export im Grunde eine wirklich lange XML-Zeile ist und das Logstash-XML-Plugin im Wesentlichen nur mit Feldern (gelesen: Spalten in Zeilen) funktioniert, die XML-Daten enthalten, musste ich die Daten in ein nützlicheres Format ändern.

Shell: Vorbereiten der Datei

  • gzcat -d file.xml.gz |: War einfach zu viel Daten - das kann man natürlich überspringen
  • tr -d "\n\r" |: Zeilenumbrüche in XML-Elementen entfernen: Einige Elemente können Zeilenumbrüche als Zeichendaten enthalten. Der nächste Schritt erfordert, dass diese entfernt oder auf irgendeine Weise codiert werden. Obwohl davon ausgegangen wird, dass zu diesem Zeitpunkt der gesamte XML-Code in einer massiven Zeile enthalten ist, spielt es keine Rolle, ob dieser Befehl Leerzeichen zwischen Elementen entfernt

  • xmllint --format - |: Formatieren Sie das XML mit xmllint (wird mit libxml geliefert)

    Hier ist die einzelne riesige Spaghetti-Zeile von XML ( <root><entry><fieldx>...</fieldx></entry></root>) richtig formatiert:

    <root>
      <entry>
        <fieldx>...</fieldx>
        <fieldy>...</fieldy>
        <fieldz>...</fieldz>
        <fieldarray>
          <fielda>...</fielda>
          <fieldb>...</fieldb>
          ...
        </fieldarray>
      </entry>
      <entry>
        ...
      </entry>
      ...
    </root>
    

Logstash

logstash -f logstash-csv.conf

(Siehe den vollständigen Inhalt der .confDatei im Abschnitt TL; DR.)

Hier macht der multilineFilter den Trick. Es können mehrere Zeilen zu einer einzigen Protokollnachricht zusammengeführt werden. Und deshalb war die Formatierung mit xmllintnotwendig:

filter {
    # add all lines that have more indentation than double-space to the previous line
    multiline {
        pattern => "^\s\s(\s\s|\<\/entry\>)"
        what => previous
    }
}

Dies bedeutet im Grunde, dass jede Zeile mit einem Einzug, der mehr als zwei Leerzeichen enthält (oder </entry>/ xmllint standardmäßig einen Einzug mit zwei Leerzeichen ausführt), zu einer vorherigen Zeile gehört. Dies bedeutet auch, dass Zeichendaten keine Zeilenumbrüche enthalten dürfen ( trin der Shell entfernt) und dass die XML- Datei normalisiert werden muss (xmllint).

verdoppelt
quelle
Hallo, hast du es geschafft, dass das funktioniert? Ich bin neugierig, da ich einen ähnlichen Bedarf habe und die mehrzeilige Lösung zusammen mit der Aufteilung bei mir nicht funktioniert hat. Vielen Dank für Ihr Feedback
nämlich
@viz Das hat funktioniert, aber wir haben es nie in der Produktion verwendet. Multiline funktioniert nur, wenn Sie eine sehr reguläre XML-Struktur haben und diese zuerst mit Einrückung formatiert haben (siehe Antwort, Abschnitt "Vorbereitung der Datei")
dualed
1

Ich hatte einen ähnlichen Fall. So analysieren Sie diese XML:

<ROOT number="34">
  <EVENTLIST>
    <EVENT name="hey"/>
    <EVENT name="you"/>
  </EVENTLIST>
</ROOT>

Ich verwende diese Konfiguration, um Folgendes zu protokollieren:

input {
  file {
    path => "/path/events.xml"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "<ROOT"
      negate => "true"
      what => "previous"
      auto_flush_interval => 1
    }
  }
}
filter {
  xml {
    source => "message"
    target => "xml_content"
  }
  split {
    field => "xml_content[EVENTLIST]"
  }
  split {
    field => "xml_content[EVENTLIST][EVENT]"
  }
  mutate {
    add_field => { "number" => "%{xml_content[number]}" }
    add_field => { "name" => "%{xml_content[EVENTLIST][EVENT][name]}" }
    remove_field => ['xml_content', 'message', 'path']
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Ich hoffe das kann jemandem helfen. Ich habe lange gebraucht, um es zu bekommen.

Drinor
quelle