Vermeiden von Speicherlecks mit Scalaz 7 zipWithIndex / group enumeratees

106

Hintergrund

Wie in dieser Frage erwähnt , verwende ich Scalaz 7-Iterate, um einen großen (dh unbegrenzten) Datenstrom in einem konstanten Heap-Raum zu verarbeiten.

Mein Code sieht folgendermaßen aus:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))

Das Problem

Ich bin anscheinend auf einen Speicherverlust gestoßen, bin aber mit Scalaz / FP nicht vertraut genug, um zu wissen, ob der Fehler in Scalaz oder in meinem Code vorliegt. Intuitiv erwarte ich, dass dieser Code nur (in der Größenordnung von) das P- fache des ChunkSpeicherplatzes benötigt.

Hinweis: Ich habe eine ähnliche Frage gefunden, bei der eine OutOfMemoryErroraufgetreten ist, aber mein Code wird nicht verwendet consume.

Testen

Ich habe einige Tests durchgeführt, um das Problem einzugrenzen. Zusammenfassend scheint das Leck nur dann aufzutreten, wenn beide zipWithIndexund groupverwendet werden.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Code für die Tests:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

Fragen

  • Ist der Fehler in meinem Code?
  • Wie kann ich diese Funktion in einem konstanten Heap-Bereich ausführen?
Aaron Novstrup
quelle
6
Am Ende habe ich dies als Problem in Scalaz gemeldet .
Aaron Novstrup
1
Es wird keinen Spaß machen, aber Sie können versuchen -XX:+HeapDumpOnOutOfMemoryError, den Speicherauszug mit eclipse MAT eclipse.org/mat zu analysieren, um festzustellen , welche Codezeile an den Arrays festhält .
Huynhjl
10
@huynhjl FWIW, ich habe versucht, den Heap sowohl mit JProfiler als auch mit MAT zu analysieren, konnte aber nicht alle Verweise auf anonyme Funktionsklassen usw. durchblättern. Scala benötigt wirklich spezielle Tools für diese Art von Dingen.
Aaron Novstrup
Was ist, wenn es kein Leck gibt und nur das, was Sie tun, eine wild wachsende Menge an Speicher benötigt? Sie können den zipWithIndex problemlos ohne dieses bestimmte FP-Konstrukt replizieren, indem varSie unterwegs nur einen Zähler verwalten.
Hesekiel Victor
@ EzekielVictor Ich bin nicht sicher, ob ich den Kommentar verstehe. Sie schlagen vor, dass das Hinzufügen eines einzelnen LongIndex pro Block den Algorithmus von konstantem zu nicht konstantem Heap-Speicherplatz ändern würde? Die Nicht-Zipping-Version verwendet eindeutig konstanten Heap-Speicherplatz, da sie so viele Chunks "verarbeiten" kann, wie Sie warten möchten.
Aaron Novstrup

Antworten:

4

Dies wird für jeden, der an der älteren iterateeAPI festhält , kein Trost sein , aber ich habe kürzlich überprüft, dass ein gleichwertiger Test gegen die Scalaz-Stream-API besteht . Dies ist eine neuere Stream-Verarbeitungs-API, die ersetzt werden soll iteratee.

Der Vollständigkeit halber hier der Testcode:

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

Dies sollte mit jedem Wert für den nParameter funktionieren (vorausgesetzt, Sie sind bereit, lange genug zu warten) - Ich habe mit 2 ^ 14 32-MB-Arrays getestet (dh insgesamt einem halben TiB Speicher, der im Laufe der Zeit zugewiesen wurde).

Aaron Novstrup
quelle