Hintergrund
Wie in dieser Frage erwähnt , verwende ich Scalaz 7-Iterate, um einen großen (dh unbegrenzten) Datenstrom in einem konstanten Heap-Raum zu verarbeiten.
Mein Code sieht folgendermaßen aus:
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]
def processChunk(c: Chunk, idx: Long): Result
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
rs ++ vs map {
case (c, i) => processChunk(c, i)
}
} &= (data.zipWithIndex mapE Iteratee.group(P))
Das Problem
Ich bin anscheinend auf einen Speicherverlust gestoßen, bin aber mit Scalaz / FP nicht vertraut genug, um zu wissen, ob der Fehler in Scalaz oder in meinem Code vorliegt. Intuitiv erwarte ich, dass dieser Code nur (in der Größenordnung von) das P- fache des Chunk
Speicherplatzes benötigt.
Hinweis: Ich habe eine ähnliche Frage gefunden, bei der eine OutOfMemoryError
aufgetreten ist, aber mein Code wird nicht verwendet consume
.
Testen
Ich habe einige Tests durchgeführt, um das Problem einzugrenzen. Zusammenfassend scheint das Leck nur dann aufzutreten, wenn beide zipWithIndex
und group
verwendet werden.
// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296
// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296
// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space
// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296
// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184
// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184
Code für die Tests:
import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._
// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) =
Iteratee.enumIterator[Array[Int], IO](
Iterator.continually(Array.fill(sz)(0)).take(n))
// define an iteratee that consumes a stream of arrays
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) {
(c, a) => c + a.length
}
// define an iteratee that consumes a grouped stream of arrays
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) {
(c, as) => c + as.map(_.length).sum
}
// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
(c, vs) => c + vs.map(_._1.length).sum
}
// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
(c, v) => c + v._1.length
}
Fragen
- Ist der Fehler in meinem Code?
- Wie kann ich diese Funktion in einem konstanten Heap-Bereich ausführen?
-XX:+HeapDumpOnOutOfMemoryError
, den Speicherauszug mit eclipse MAT eclipse.org/mat zu analysieren, um festzustellen , welche Codezeile an den Arrays festhält .var
Sie unterwegs nur einen Zähler verwalten.Long
Index pro Block den Algorithmus von konstantem zu nicht konstantem Heap-Speicherplatz ändern würde? Die Nicht-Zipping-Version verwendet eindeutig konstanten Heap-Speicherplatz, da sie so viele Chunks "verarbeiten" kann, wie Sie warten möchten.Antworten:
Dies wird für jeden, der an der älteren
iteratee
API festhält , kein Trost sein , aber ich habe kürzlich überprüft, dass ein gleichwertiger Test gegen die Scalaz-Stream-API besteht . Dies ist eine neuere Stream-Verarbeitungs-API, die ersetzt werden solliteratee
.Der Vollständigkeit halber hier der Testcode:
Dies sollte mit jedem Wert für den
n
Parameter funktionieren (vorausgesetzt, Sie sind bereit, lange genug zu warten) - Ich habe mit 2 ^ 14 32-MB-Arrays getestet (dh insgesamt einem halben TiB Speicher, der im Laufe der Zeit zugewiesen wurde).quelle