Asynchrone Deserialisierung einer Liste mit System.Text.Json

11

Nehmen wir an, ich fordere eine große JSON-Datei an, die eine Liste vieler Objekte enthält. Ich möchte nicht, dass sie alle auf einmal im Gedächtnis bleiben, aber ich würde sie lieber einzeln lesen und verarbeiten. Also muss ich einen asynchronen System.IO.StreamStream in einen verwandeln IAsyncEnumerable<T>. Wie verwende ich dazu die neue System.Text.JsonAPI?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
        }
    }
}
Rick de Water
quelle
1
Sie werden wahrscheinlich so etwas wie DeserializeAsync- Methode benötigen
Pavel Anikhouski
2
Entschuldigung, es scheint, dass die obige Methode den gesamten Stream in den Speicher lädt. Sie können die Daten von Chunks lesen asynchonously mit Utf8JsonReader, bitte einen Blick auf einige Github haben Proben und bei bestehenden Thread als auch
Pavel Anikhouski
GetAsyncvon selbst kehrt zurück, wenn die gesamte Antwort empfangen wird. Sie müssen SendAsyncstattdessen "HttpCompletionOption.ResponseContentRead" verwenden. Sobald Sie das haben, können Sie den JsonTextReader von JSON.NET verwenden . Die Verwendung System.Text.Jsonhierfür ist nicht so einfach, wie dieses Problem zeigt . Die Funktionalität ist nicht verfügbar und die Implementierung in einer Low-Allocation mit Strukturen ist nicht trivial
Panagiotis Kanavos
Das Problem beim Deserialisieren in Blöcken besteht darin, dass Sie wissen müssen, wann Sie einen vollständigen Block zum Deserialisieren haben. Dies wäre in allgemeinen Fällen schwer sauber zu erreichen. Es würde ein vorheriges Parsen erfordern, was in Bezug auf die Leistung ein ziemlich schlechter Kompromiss sein könnte. Es wäre ziemlich schwierig zu verallgemeinern. Wenn Sie jedoch Ihre eigenen Einschränkungen für Ihren JSON erzwingen, z. B. "Ein einzelnes Objekt belegt genau 20 Zeilen in der Datei", können Sie im Wesentlichen asynchron deserialisieren, indem Sie die Datei in asynchronen Blöcken lesen. Ich würde mir vorstellen, dass Sie einen massiven Json brauchen würden, um hier Nutzen zu ziehen.
DetectivePikachu
Sieht aus wie jemand bereits eine beantwortet ähnliche Frage hier mit vollem Code.
Panagiotis Kanavos

Antworten:

4

Ja, ein wirklich Streaming-JSON (De) Serializer wäre an so vielen Orten eine schöne Leistungsverbesserung.

Tut System.Text.Jsondies derzeit leider nicht. Ich bin mir nicht sicher, ob es in Zukunft so sein wird - ich hoffe es! Die Streaming-Deserialisierung von JSON stellt sich als ziemlich herausfordernd heraus.

Sie könnten vielleicht überprüfen, ob der extrem schnelle Utf8Json dies unterstützt.

Möglicherweise gibt es jedoch eine benutzerdefinierte Lösung für Ihre spezifische Situation, da Ihre Anforderungen die Schwierigkeit zu beschränken scheinen.

Die Idee ist, jeweils ein Element manuell aus dem Array zu lesen. Wir nutzen die Tatsache, dass jedes Element in der Liste für sich ein gültiges JSON-Objekt ist.

Sie können das [(für das erste Element) oder das ,(für jedes nächste Element) manuell überspringen . Dann ist es meiner Meinung nach am besten, mit .NET Core Utf8JsonReaderzu bestimmen, wo das aktuelle Objekt endet, und die gescannten Bytes zuzuführen JsonDeserializer.

Auf diese Weise puffern Sie jeweils nur geringfügig über ein Objekt.

Und da es sich um Performance handelt, können Sie den Input von a erhalten PipeReader, während Sie gerade dabei sind. :-)

Timo
quelle
Hier geht es überhaupt nicht um Leistung. Es geht nicht um asynchrone Deserialisierung, was bereits der Fall ist. Es geht um den Streaming-Zugriff - die Verarbeitung von JSON-Elementen, wie sie aus dem Stream analysiert werden, wie es der JsonTextReader von JSON.NET tut.
Panagiotis Kanavos
Die relevante Klasse in Utf8Json ist JsonReader und wie der Autor sagt, ist es seltsam. JONTextReader von JSON.NET und Utf8JsonReader von System.Text.Json haben dieselbe Verrücktheit: Sie müssen den Typ des aktuellen Elements während der Schleife durchlaufen und überprüfen.
Panagiotis Kanavos
@ PanagiotisKanavos Ah, ja, Streaming. Das ist das Wort, nach dem ich gesucht habe! Ich aktualisiere das Wort "asynchron" auf "Streaming". Ich glaube, der Grund für das Streaming ist die Einschränkung der Speichernutzung, was ein Leistungsproblem darstellt. Vielleicht kann OP bestätigen.
Timo
Leistung bedeutet nicht Geschwindigkeit. Unabhängig davon, wie schnell der Deserializer ist, wenn Sie 1 Million Elemente verarbeiten müssen, möchten Sie diese weder im RAM speichern noch warten, bis alle deserialisiert sind, bevor Sie das erste verarbeiten können.
Panagiotis Kanavos
Semantik, mein Freund! Ich bin froh, dass wir doch versuchen, dasselbe zu erreichen.
Timo
4

TL; DR Es ist nicht trivial


Es sieht so aus, als hätte jemand bereits vollständigen Code für eine Utf8JsonStreamReaderStruktur gepostet , die Puffer aus einem Stream liest und sie einem Utf8JsonRreader zuführt, wodurch eine einfache Deserialisierung mit ermöglicht wird JsonSerializer.Deserialize<T>(ref newJsonReader, options);. Der Code ist auch nicht trivial. Die verwandte Frage ist hier und die Antwort ist hier .

Das reicht jedoch nicht aus - HttpClient.GetAsyncwird erst zurückgegeben, nachdem die gesamte Antwort empfangen wurde, und im Wesentlichen alles im Speicher gepuffert.

Um dies zu vermeiden, sollte HttpClient.GetAsync (Zeichenfolge, HttpCompletionOption) mit verwendet werden HttpCompletionOption.ResponseHeadersRead.

Die Deserialisierungsschleife sollte auch das Stornierungs-Token überprüfen und entweder beenden oder werfen, wenn dies signalisiert wird. Andernfalls wird die Schleife fortgesetzt, bis der gesamte Stream empfangen und verarbeitet wurde.

Dieser Code basiert auf dem Beispiel der zugehörigen Antwort und verwendet HttpCompletionOption.ResponseHeadersReadund überprüft das Stornierungs-Token. Es kann JSON-Zeichenfolgen analysieren, die ein geeignetes Array von Elementen enthalten, z.

[{"prop1":123},{"prop1":234}]

Der erste Aufruf von jsonStreamReader.Read()bewegt sich zum Anfang des Arrays, während der zweite zum Anfang des ersten Objekts wechselt. Die Schleife selbst wird beendet, wenn das Ende des Arrays ( ]) erkannt wird.

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,                               
                                                       HttpCompletionOption.ResponseHeadersRead,  
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if(cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}

JSON-Fragmente, AKA-Streaming JSON aka ... *

In Ereignis-Streaming- oder Protokollierungsszenarien ist es durchaus üblich, einzelne JSON-Objekte an eine Datei anzuhängen, ein Element pro Zeile, z.

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

Dies ist kein gültiges JSON- Dokument, aber die einzelnen Fragmente sind gültig. Dies hat mehrere Vorteile für Big Data- / hochkonkurrierende Szenarien. Das Hinzufügen eines neuen Ereignisses erfordert nur das Anhängen einer neuen Zeile an die Datei, nicht das Parsen und Neuerstellen der gesamten Datei. Die Verarbeitung , insbesondere die Parallelverarbeitung , ist aus zwei Gründen einfacher:

  • Einzelne Elemente können einzeln abgerufen werden, indem einfach eine Zeile aus einem Stream gelesen wird.
  • Die Eingabedatei kann einfach partitioniert und über Liniengrenzen hinweg aufgeteilt werden, wobei jedes Teil einem separaten Arbeitsprozess zugeführt wird, z. B. in einem Hadoop-Cluster, oder einfach verschiedenen Threads in einer Anwendung: Berechnen Sie die Teilungspunkte, z. B. indem Sie die Länge durch die Anzahl der Arbeiter teilen Suchen Sie dann nach der ersten Zeile. Füttere alles bis zu diesem Punkt einem separaten Arbeiter.

Verwenden eines StreamReader

Die allokative Möglichkeit, dies zu tun, besteht darin, einen TextReader zu verwenden, jeweils eine Zeile zu lesen und ihn mit JsonSerializer zu analysieren. Deserialize :

using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken 
while((line=await reader.ReadLineAsync()) != null)
{
    var item=JsonSerializer.Deserialize<T>(line);
    yield return item;

    if(cancellationToken.IsCancellationRequested)
    {
        return;
    }
}

Das ist viel einfacher als der Code, der ein richtiges Array deserialisiert. Es gibt zwei Probleme:

  • ReadLineAsync akzeptiert kein Stornierungszeichen
  • Jede Iteration weist eine neue Zeichenfolge zu, eines der Dinge, die wir mithilfe von System.Text.Json vermeiden wollten

Dies kann jedoch ausreichen, um zu versuchen, die ReadOnlySpan<Byte>von JsonSerializer benötigten Puffer zu erzeugen. Die Deserialisierung ist nicht trivial.

Pipelines und SequenceReader

Um Zuordnungen zu vermeiden, müssen wir eine ReadOnlySpan<byte>aus dem Stream erhalten. Dazu müssen System.IO.Pipeline-Pipes und die SequenceReader- Struktur verwendet werden. Steve Gordons Eine Einführung in SequenceReader erklärt, wie diese Klasse zum Lesen von Daten aus einem Stream mithilfe von Trennzeichen verwendet werden kann.

Leider SequenceReaderhandelt es sich um eine Ref-Struktur, was bedeutet, dass sie nicht in asynchronen oder lokalen Methoden verwendet werden kann. Deshalb schafft Steve Gordon in seinem Artikel eine

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

Methode zum Lesen von Elementen aus einer ReadOnlySequence und zum Zurückgeben der Endposition, damit der PipeReader von dieser fortfahren kann. Leider möchten wir eine IEnumerable oder IAsyncEnumerable zurückgeben, und Iterator-Methoden mögen inoder outParameter auch nicht.

Wir könnten die deserialisierten Elemente in einer Liste oder Warteschlange sammeln und als einzelnes Ergebnis zurückgeben, aber das würde weiterhin Listen, Puffer oder Knoten zuweisen und müssen warten, bis alle Elemente in einem Puffer deserialisiert sind, bevor wir zurückkehren:

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

Wir brauchen etwas , das sich wie eine Aufzählung verhält, ohne dass eine Iteratormethode erforderlich ist, mit Async arbeitet und nicht alles so puffert.

Hinzufügen von Kanälen zum Erstellen einer IAsyncEnumerable

ChannelReader.ReadAllAsync gibt eine IAsyncEnumerable zurück. Wir können einen ChannelReader von Methoden zurückgeben, die nicht als Iteratoren funktionieren konnten, und trotzdem einen Stream von Elementen ohne Caching erzeugen.

Wenn wir den Code von Steve Gordon an die Verwendung von Kanälen anpassen, erhalten wir die ReadItems (ChannelWriter ...) und ReadLastItemMethoden. Der erste liest jeweils ein Element bis zu einer neuen Zeile mit ReadOnlySpan<byte> itemBytes. Dies kann von verwendet werden JsonSerializer.Deserialize. Wenn ReadItemsdas Trennzeichen nicht gefunden werden kann, gibt es seine Position zurück, sodass der PipelineReader den nächsten Block aus dem Stream ziehen kann.

Wenn wir den letzten Block erreichen und es kein anderes Trennzeichen gibt, liest ReadLastItem die verbleibenden Bytes und deserialisiert sie.

Der Code ist fast identisch mit dem von Steve Gordon. Anstatt an die Konsole zu schreiben, schreiben wir an den ChannelWriter.

private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence, 
                          bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item=JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);            
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(sequence.Length); // advance reader to the end
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}

private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item=JsonSerializer.Deserialize<T>(byteBuffer);
        return item;        
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

        try
        {
            sequence.CopyTo(byteBuffer);
            var item=JsonSerializer.Deserialize<T>(byteBuffer);
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }

    }    
}

Die DeserializeToChannel<T>Methode erstellt einen Pipeline-Reader über dem Stream, erstellt einen Kanal und startet eine Worker-Aufgabe, die Chunks analysiert und an den Kanal weiterleitet:

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);    
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while (!token.IsCancellationRequested)
        {
            var result = await pipeReader.ReadAsync(token); // read from the pipe

            var buffer = result.Buffer;

            var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

            if (result.IsCompleted) 
                break; // exit if we've read everything from the pipe

            pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
        }

        pipeReader.Complete(); 
    },token)
    .ContinueWith(t=>{
        pipeReader.Complete();
        writer.TryComplete(t.Exception);
    });

    return channel.Reader;
}

ChannelReader.ReceiveAllAsync()kann verwendet werden, um alle Artikel über Folgendes zu konsumieren IAsyncEnumerable<T>:

var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it 
}    
Panagiotis Kanavos
quelle
0

Es fühlt sich an, als müssten Sie Ihren eigenen Stream-Reader implementieren. Sie müssen die Bytes einzeln lesen und anhalten, sobald die Objektdefinition abgeschlossen ist. Es ist in der Tat ziemlich niedrig. Als solches werden Sie NICHT die gesamte Datei in den RAM laden, sondern den Teil übernehmen, mit dem Sie sich befassen. Scheint es eine Antwort zu sein?

Sereja Bogolubov
quelle
-2

Vielleicht könnten Sie Newtonsoft.JsonSerializer verwenden? https://www.newtonsoft.com/json/help/html/Performance.htm

Siehe insbesondere Abschnitt:

Optimieren Sie die Speichernutzung

Bearbeiten

Sie können versuchen, Werte aus JsonTextReader zu deserialisieren, z

using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
    while (await reader.ReadAsync(cancellationToken))
    {
        yield return reader.Value;
    }
}
Miłosz Wieczorek
quelle
Das beantwortet die Frage nicht. Hier geht es überhaupt nicht um Leistung, sondern um Streaming-Zugriff, ohne alles in den Speicher zu laden
Panagiotis Kanavos
Haben Sie den entsprechenden Link geöffnet oder nur gesagt, was Sie denken? In dem Link, den ich in dem Abschnitt gesendet habe, den ich erwähnt habe, gibt es einen Codeausschnitt, wie JSON aus dem Stream deserialisiert wird.
Miłosz Wieczorek
Lesen Sie die Frage bitte noch einmal durch - das OP fragt, wie die Elemente verarbeitet werden sollen, ohne alles im Speicher zu deserialisieren. Nicht nur aus einem Stream lesen, sondern nur verarbeiten, was aus dem Stream kommt. I don't want them to be in memory all at once, but I would rather read and process them one by one.Die relevante Klasse in JSON.NET ist JsonTextReader.
Panagiotis Kanavos
In jedem Fall wird eine Nur-Link-Antwort nicht als gute Antwort angesehen, und nichts in diesem Link beantwortet die Frage des OP. Ein Link zu JsonTextReader wäre besser
Panagiotis Kanavos