Ich habe mich gefragt, ob es eine Parallel.For- Entsprechung zur .net-Version für Java gibt.
Wenn es jemanden gibt, geben Sie bitte ein Beispiel an. Vielen Dank!
java
parallel-processing
Jamie
quelle
quelle
Parallel.for
in keiner der Auflistungen finden ... @Emil thks, ich denke ich bekomme es jetztjava.util.stream.Stream
, auf die normalerweise zugegriffen wirdCollection.parallelStream()
. Siehe stream-api .Antworten:
Ich denke, das Nächste wäre:
ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS); try { for (final Object o : list) { exec.submit(new Runnable() { @Override public void run() { // do stuff with o. } }); } } finally { exec.shutdown(); }
Basierend auf den Kommentaren von TheLQ würden Sie SUM_NUM_THREADS auf setzen
Runtime.getRuntime().availableProcessors();
Bearbeiten: Beschlossen, eine grundlegende "Parallel.For" -Implementierung hinzuzufügen
public class Parallel { private static final int NUM_CORES = Runtime.getRuntime().availableProcessors(); private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For")); public static <T> void For(final Iterable<T> elements, final Operation<T> operation) { try { // invokeAll blocks for us until all submitted tasks in the call complete forPool.invokeAll(createCallables(elements, operation)); } catch (InterruptedException e) { e.printStackTrace(); } } public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) { List<Callable<Void>> callables = new LinkedList<Callable<Void>>(); for (final T elem : elements) { callables.add(new Callable<Void>() { @Override public Void call() { operation.perform(elem); return null; } }); } return callables; } public static interface Operation<T> { public void perform(T pParameter); } }
Beispiel für die Verwendung von Parallel.For
// Collection of items to process in parallel Collection<Integer> elems = new LinkedList<Integer>(); for (int i = 0; i < 40; ++i) { elems.add(i); } Parallel.For(elems, // The operation to perform with each item new Parallel.Operation<Integer>() { public void perform(Integer param) { System.out.println(param); }; });
Ich denke, diese Implementierung ist Parallel.ForEach wirklich ähnlicher
Bearbeiten Ich habe dies auf GitHub eingestellt, wenn jemand interessiert ist. Parallel für auf GitHub
quelle
Die Lösung von MLaw ist eine sehr praktische Parallele. Ich habe eine kleine Modifikation hinzugefügt, um eine Parallel.For zu erstellen.
public class Parallel { static final int iCPU = Runtime.getRuntime().availableProcessors(); public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) { ExecutorService executor = Executors.newFixedThreadPool(iCPU); List<Future<?>> futures = new LinkedList<Future<?>>(); for (final T param : parameters) { Future<?> future = executor.submit(new Runnable() { public void run() { loopBody.run(param); } }); futures.add(future); } for (Future<?> f : futures) { try { f.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { } } executor.shutdown(); } public static void For(int start, int stop, final LoopBody<Integer> loopBody) { ExecutorService executor = Executors.newFixedThreadPool(iCPU); List<Future<?>> futures = new LinkedList<Future<?>>(); for (int i=start; i<stop; i++) { final Integer k = i; Future<?> future = executor.submit(new Runnable() { public void run() { loopBody.run(k); } }); futures.add(future); } for (Future<?> f : futures) { try { f.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { } } executor.shutdown(); } } public interface LoopBody <T> { void run(T i); } public class ParallelTest { int k; public ParallelTest() { k = 0; Parallel.For(0, 10, new LoopBody <Integer>() { public void run(Integer i) { k += i; System.out.println(i); } }); System.out.println("Sum = "+ k); } public static void main(String [] argv) { ParallelTest test = new ParallelTest(); } }
quelle
Fügen Sie CountDownLatch hinzu, basierend auf dem Vorschlag von mlaw. Fügen Sie Chunksize hinzu, um submit () zu reduzieren.
Beim Testen mit einem Array mit 4 Millionen Elementen bietet dieses Array eine 5-fache Geschwindigkeit gegenüber sequentiell für () auf meiner Core i7 2630QM-CPU.
public class Loop { public interface Each { void run(int i); } private static final int CPUs = Runtime.getRuntime().availableProcessors(); public static void withIndex(int start, int stop, final Each body) { int chunksize = (stop - start + CPUs - 1) / CPUs; int loops = (stop - start + chunksize - 1) / chunksize; ExecutorService executor = Executors.newFixedThreadPool(CPUs); final CountDownLatch latch = new CountDownLatch(loops); for (int i=start; i<stop;) { final int lo = i; i += chunksize; final int hi = (i<stop) ? i : stop; executor.submit(new Runnable() { public void run() { for (int i=lo; i<hi; i++) body.run(i); latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) {} executor.shutdown(); } public static void main(String [] argv) { Loop.withIndex(0, 9, new Loop.Each() { public void run(int i) { System.out.println(i*10); } }); } }
quelle
Hier ist mein Beitrag zu diesem Thema https://github.com/pablormier/parallel-loops . Die Verwendung ist sehr einfach:
Collection<String> upperCaseWords = Parallel.ForEach(words, new Parallel.F<String, String>() { public String apply(String s) { return s.toUpperCase(); } });
Es ist auch möglich, einige Verhaltensaspekte zu ändern, z. B. die Anzahl der Threads (standardmäßig wird ein zwischengespeicherter Thread-Pool verwendet):
Collection<String> upperCaseWords = new Parallel.ForEach<String, String>(words) .withFixedThreads(4) .apply(new Parallel.F<String, String>() { public String apply(String s) { return s.toUpperCase(); } }).values();
Der gesamte Code ist in einer Java-Klasse enthalten und weist nicht mehr Abhängigkeiten auf als das JDK. Ich empfehle Ihnen außerdem, die neue Methode zur funktionalen Parallelisierung mit Java 8 zu prüfen
quelle
Das Fork Join Framework in Java 7 dient der Unterstützung der Parallelität. Aber ich weiß nicht über ein genaues Äquivalent für
Parallel.For
.quelle
Eine einfachere Option wäre
// A thread pool which runs for the life of the application. private static final ExecutorService EXEC = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS); //later EXEC.invokeAll(tasks); // you can optionally specify a timeout.
quelle
Parallel.For ist als Java-Erweiterung verfügbar. Es heißt Ateji PX, sie haben eine kostenlose Version, mit der Sie spielen können. http://www.ateji.com/px/index.html
Es ist das genaue Äquivalent von parallel.for und sieht ähnlich aus wie.
For ||
Weitere Beispiele und Erklärungen auf Wikipedia: http://en.wikipedia.org/wiki/Ateji_PX
Geschlossene Sache in Java IMO
quelle
Durch die Synchronisierung wird häufig die Beschleunigung paralleler for-Schleifen beendet. Daher benötigen parallele for-Schleifen häufig ihre privaten Daten und einen Reduktionsmechanismus, um alle privaten Thread-Daten so zu reduzieren, dass sie ein einziges Ergebnis enthalten.
Also habe ich die Parallel.For-Version
Weimin Xiao
um einen Reduktionsmechanismus erweitert.public class Parallel { public static interface IntLoopBody { void run(int i); } public static interface LoopBody<T> { void run(T i); } public static interface RedDataCreator<T> { T run(); } public static interface RedLoopBody<T> { void run(int i, T data); } public static interface Reducer<T> { void run(T returnData, T addData); } private static class ReductionData<T> { Future<?> future; T data; } static final int nCPU = Runtime.getRuntime().availableProcessors(); public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) { ExecutorService executor = Executors.newFixedThreadPool(nCPU); List<Future<?>> futures = new LinkedList<Future<?>>(); for (final T param : parameters) { futures.add(executor.submit(() -> loopBody.run(param) )); } for (Future<?> f : futures) { try { f.get(); } catch (InterruptedException | ExecutionException e) { System.out.println(e); } } executor.shutdown(); } public static void For(int start, int stop, final IntLoopBody loopBody) { final int chunkSize = (stop - start + nCPU - 1)/nCPU; final int loops = (stop - start + chunkSize - 1)/chunkSize; ExecutorService executor = Executors.newFixedThreadPool(loops); List<Future<?>> futures = new LinkedList<Future<?>>(); for (int i=start; i < stop; ) { final int iStart = i; i += chunkSize; final int iStop = (i < stop) ? i : stop; futures.add(executor.submit(() -> { for (int j = iStart; j < iStop; j++) loopBody.run(j); })); } for (Future<?> f : futures) { try { f.get(); } catch (InterruptedException | ExecutionException e) { System.out.println(e); } } executor.shutdown(); } public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator, final RedLoopBody<T> loopBody, final Reducer<T> reducer) { final int chunkSize = (stop - start + nCPU - 1)/nCPU; final int loops = (stop - start + chunkSize - 1)/chunkSize; ExecutorService executor = Executors.newFixedThreadPool(loops); List<ReductionData<T>> redData = new LinkedList<ReductionData<T>>(); for (int i = start; i < stop; ) { final int iStart = i; i += chunkSize; final int iStop = (i < stop) ? i : stop; final ReductionData<T> rd = new ReductionData<T>(); rd.data = creator.run(); rd.future = executor.submit(() -> { for (int j = iStart; j < iStop; j++) { loopBody.run(j, rd.data); } }); redData.add(rd); } for (ReductionData<T> rd : redData) { try { rd.future.get(); if (rd.data != null) { reducer.run(result, rd.data); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); } }
Hier ist ein einfaches Testbeispiel: Ein paralleler Zeichenzähler, der eine nicht synchronisierte Zuordnung verwendet.
import java.util.*; public class ParallelTest { static class Counter { int cnt; Counter() { cnt = 1; } } public static void main(String[] args) { String text = "More formally, if this map contains a mapping from a key k to a " + "value v such that key compares equal to k according to the map's ordering, then " + "this method returns v; otherwise it returns null."; Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>(); Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>(); // first sequentially for(int i=0; i < text.length(); i++) { char c = text.charAt(i); Counter cnt = charCounter1.get(c); if (cnt == null) { charCounter1.put(c, new Counter()); } else { cnt.cnt++; } } for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) { System.out.println(entry.getKey() + ": " + entry.getValue().cnt); } // now parallel without synchronization Parallel.For(0, text.length(), charCounter2, // Creator () -> new TreeMap<Character, Counter>(), // Loop Body (i, map) -> { char c = text.charAt(i); Counter cnt = map.get(c); if (cnt == null) { map.put(c, new Counter()); } else { cnt.cnt++; } }, // Reducer (result, map) -> { for(Map.Entry<Character, Counter> entry: map.entrySet()) { Counter cntR = result.get(entry.getKey()); if (cntR == null) { result.put(entry.getKey(), entry.getValue()); } else { cntR.cnt += entry.getValue().cnt; } } } ); // compare results assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size(); Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator(); for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) { Map.Entry<Character, Counter> entry2 = it2.next(); assert entry.getKey() == entry2.getKey() && entry.getValue().cnt == entry2.getValue().cnt : "wrong content"; } System.out.println("Well done!"); } }
quelle
charCounter1
anstellemap
Ihres letzten Reduzierungsschritts.entry.getValue().cnt == entry2.getValue().cnt
:)Ich habe eine aktualisierte Java Parallel-Klasse, die Parallel.For, Parallel.ForEach, Parallel.Tasks und partitionierte parallele Schleifen ausführen kann. Der Quellcode lautet wie folgt:
Beispiele für die Verwendung dieser parallelen Schleifen sind die folgenden:
public static void main(String [] argv) { //sample data final ArrayList<String> ss = new ArrayList<String>(); String [] s = {"a", "b", "c", "d", "e", "f", "g"}; for (String z : s) ss.add(z); int m = ss.size(); //parallel-for loop System.out.println("Parallel.For loop:"); Parallel.For(0, m, new LoopBody<Integer>() { public void run(Integer i) { System.out.println(i +"\t"+ ss.get(i)); } }); //parallel for-each loop System.out.println("Parallel.ForEach loop:"); Parallel.ForEach(ss, new LoopBody<String>() { public void run(String p) { System.out.println(p); } }); //partitioned parallel loop System.out.println("Partitioned Parallel loop:"); Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>() { public void run(Partition p) { for(int i=p.start; i<p.end; i++) System.out.println(i +"\t"+ ss.get(i)); } }); //parallel tasks System.out.println("Parallel Tasks:"); Parallel.Tasks(new Task [] { //task-1 new Task() {public void run() { for(int i=0; i<3; i++) System.out.println(i +"\t"+ ss.get(i)); }}, //task-2 new Task() {public void run() { for (int i=3; i<6; i++) System.out.println(i +"\t"+ ss.get(i)); }} }); }
quelle
Ich fand ForkJoinPool und IntStream in meinem Fall sehr hilfreich (Parallel For mit einer begrenzten Anzahl von Threads).
C #:
static void MathParallel(int threads) { Parallel.For(1, partitions, new ParallelOptions { MaxDegreeOfParallelism = threads }, (i) => { partitionScores[i] = Math.Sin(3*i); }); }
und Java-Äquivalent:
static void mathParallel(int threads) { ForkJoinPool pool = new ForkJoinPool(threads); pool.submit(()-> IntStream.range(0, partitions).parallel().forEach(i -> { partitionScores[i] = Math.sin(3*i); })); pool.shutdown(); while (!pool.isTerminated()){ } }
quelle
Dies ist, was ich für Java 7 und weniger verwende.
Für Java 8 können Sie forEach () verwenden
[UPDATE]
Parallelklasse:
private static final int NUM_CORES = Runtime.getRuntime().availableProcessors(); private static final int MAX_THREAD = NUM_CORES*2; public static <T2 extends T, T> void For(final Iterable<T2> elements, final Operation<T> operation) { if (elements != null) { final Iterator<T2> iterator = elements.iterator(); if (iterator.hasNext()) { final Throwable[] throwable = new Throwable[1]; final Callable<Void> callable = new Callable<Void>() { boolean first = true; @Override public final Void call() throws Exception { if ((first || operation.follow()) && iterator.hasNext()) { T result; result = iterator.next(); operation.perform(result); if (first) { synchronized (this) { first = false; } } } return null; } }; final Runnable runnable = new Runnable() { @Override public final void run() { while (iterator.hasNext()) { try { synchronized (callable) { callable.call(); } if (!operation.follow()) { break; } } catch (Throwable t) { t.printStackTrace(); synchronized (throwable) { throwable[0] = t; } throw new RuntimeException(t); } } } }; final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD); for (int threadIndex=0; threadIndex<MAX_THREAD && iterator.hasNext(); threadIndex++) { executor.execute(runnable); } executor.shutdown(); while (!executor.isTerminated()) { try { Thread.sleep(0,1); } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException(e); } } if (throwable[0] != null) throw new RuntimeException(throwable[0]); } } } public interface Operation<T> { void perform(T pParameter); boolean follow(); }
Beispiel
@Test public void test() { List<Long> longList = new ArrayList<Long>(); for (long i = 0; i < 1000000; i++) { longList.add(i); } final List<Integer> integerList = new LinkedList<>(); Parallel.For((Iterable<? extends Number>) longList, new Parallel.Operation<Number>() { @Override public void perform(Number pParameter) { System.out.println(pParameter); integerList.add(pParameter.intValue()); } @Override public boolean follow() { return true; } }); for (Number num : integerList) { System.out.println(num); } }
JavaparallelMultithreading
quelle