Verwandeln Sie Java Future in eine CompletableFuture

91

Java 8 führt CompletableFutureeine neue Implementierung von Future ein, die zusammensetzbar ist (einschließlich einer Reihe von thenXxx-Methoden). Ich möchte dies ausschließlich verwenden, aber viele der Bibliotheken, die ich verwenden möchte, geben nur nicht zusammensetzbare FutureInstanzen zurück.

Gibt es eine Möglichkeit, zurückgegebene FutureInstanzen in a zusammenzufassen, CompleteableFuturedamit ich sie zusammenstellen kann?

Dan Midwood
quelle

Antworten:

56

Es gibt einen Weg, aber Sie werden es nicht mögen. Die folgende Methode wandelt a Future<T>in a um CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

Offensichtlich besteht das Problem bei diesem Ansatz darin, dass für jede Zukunft ein Thread blockiert wird, um auf das Ergebnis der Zukunft zu warten - ein Widerspruch zur Idee der Zukunft. In einigen Fällen ist es möglicherweise möglich, es besser zu machen. Im Allgemeinen gibt es jedoch keine Lösung, ohne aktiv auf das Ergebnis der Zukunft zu warten .

nosid
quelle
1
Ha, genau das habe ich geschrieben, bevor ich dachte, dass es einen besseren Weg geben muss. Aber ich denke nicht
Dan Midwood
12
Hmmm ... frisst diese Lösung nicht einen der Fäden des "gemeinsamen Pools", nur um zu warten? Diese "Common Pool" -Threads sollten niemals blockieren ... hmmmm ...
Peti
1
@ Peti: Du hast recht. Der Punkt ist jedoch, wenn Sie höchstwahrscheinlich etwas falsch machen, unabhängig davon, ob Sie den allgemeinen Pool oder einen unbegrenzten Thread-Pool verwenden .
Nosid
4
Es ist möglicherweise nicht perfekt, aber die Verwendung CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())würde zumindest die allgemeinen Pool-Threads nicht blockieren.
MikeFHay
6
Bitte, tu das einfach nie
Laymain
55

Wenn die Bibliothek, die Sie verwenden möchten, zusätzlich zum Future-Stil auch eine Rückrufmethode bietet, können Sie einen Handler bereitstellen, der die CompletableFuture ohne zusätzliche Thread-Blockierung abschließt. Wie so:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

Ohne den Rückruf sehe ich die einzige andere Möglichkeit, dies zu lösen, darin, eine Future.isDone()Abfrageschleife zu verwenden, die alle Ihre Überprüfungen auf einen einzelnen Thread legt und dann vollständig aufruft, wenn eine Zukunft verfügbar ist.

Kafkaesque
quelle
Ich verwende die asynchrone Apache Http-Bibliothek, die FutureCallback akzeptiert. Es hat mir das Leben leichter gemacht :)
Abhishek Gayakwad
10

Ich habe ein kleines Zukunftsprojekt veröffentlicht , das versucht, die Antwort besser als den einfachen Weg zu machen .

Die Hauptidee besteht darin, den einzigen Thread (und natürlich nicht nur eine Spin-Schleife) zu verwenden, um alle darin enthaltenen Futures-Zustände zu überprüfen. Dadurch wird vermieden, dass ein Thread für jede Future -> CompletableFuture-Transformation aus einem Pool blockiert wird.

Anwendungsbeispiel:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);
Dmitry Spikhalskiy
quelle
Das sieht interessant aus. Verwendet es einen Timer-Thread? Wie kommt es, dass dies nicht die akzeptierte Antwort ist?
Kira
@Kira Ja, es wird im Grunde ein Timer-Thread verwendet, um auf alle eingereichten Futures zu warten.
Dmitry Spikhalskiy
10

Wenn Sie Futuredas Ergebnis eines Aufrufs einer ExecutorServiceMethode sind (z. B. submit()), ist es am einfachsten CompletableFuture.runAsync(Runnable, Executor), stattdessen die Methode zu verwenden.

Von

Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);

zu

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);

Das CompletableFuturewird dann "nativ" erstellt.

BEARBEITEN: Verfolgen von Kommentaren von @SamMefford, korrigiert von @MartinAndersson. Wenn Sie a übergeben möchten Callable, müssen Sie anrufen supplyAsync()und das Callable<T>in ein konvertieren Supplier<T>, z. B. mit:

CompletableFuture.supplyAsync(() -> {
    try { return myCallable.call(); }
    catch (Exception ex) { throw new RuntimeException(ex); } // Or return default value
}, myExecutor);

Da T Callable.call() throws Exception;eine Ausnahme ausgelöst wird und T Supplier.get();dies nicht der Fall ist, müssen Sie die Ausnahme abfangen, damit Prototypen kompatibel sind.

Matthieu
quelle
1
Wenn Sie Callable <T> anstelle von Runnable verwenden, versuchen Sie stattdessen, supplyAsync:CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);
Sam Mefford
@ SamMefford danke, ich habe bearbeitet, um diese Informationen aufzunehmen.
Matthieu
supplyAsyncerhält eine Supplier. Der Code wird nicht kompiliert, wenn Sie versuchen, a zu übergeben Callable.
Martin Andersson
@ MartinAndersson das stimmt, danke. Ich habe weiter bearbeitet, um a Callable<T>in a umzuwandeln Supplier<T>.
Matthieu
7

Vorschlag:

http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/

Aber im Grunde:

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

Und das CompletablePromise:

public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

Beispiel:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}
Gabriel Francisco
quelle
Dies ist recht einfach zu überlegen und elegant und passt zu den meisten Anwendungsfällen. Ich würde das CompletablePromiseContext nicht statisch machen und Parameter für das Prüfintervall (das hier auf 1 ms eingestellt ist) nehmen und dann den CompletablePromise<V>Konstruktor überladen , um Ihrem eigenen CompletablePromiseContextein möglicherweise anderes (längeres) Prüfintervall für Langzeitläufe zur Verfügung zu stellen , in Future<V>denen Sie nicht arbeiten Sie müssen nicht unbedingt in der Lage sein, den Rückruf sofort nach Abschluss auszuführen (oder zu komponieren), und Sie können auch eine Instanz haben CompletablePromiseContext, um eine Reihe von Future(falls Sie viele haben)
Dexter Legaspi
5

Lassen Sie mich eine andere (hoffentlich bessere) Option vorschlagen: https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata /gleichzeitig

Kurz gesagt lautet die Idee wie folgt:

  1. CompletableTask<V>Schnittstelle einführen - die Vereinigung des CompletionStage<V>+RunnableFuture<V>
  2. Warp ExecutorService, um CompletableTaskvon submit(...)Methoden zurückzukehren (anstelle von Future<V>)
  3. Fertig, wir haben lauffähige UND zusammensetzbare Futures.

Die Implementierung verwendet eine alternative CompletionStage-Implementierung (achten Sie darauf, CompletionStage anstelle von CompletableFuture):

Verwendung:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c); 
Valery Silaev
quelle
2
Kleines Update: Code wird in ein separates Projekt verschoben, github.com/vsilaev/tascalate-concurrent , und jetzt ist es möglich, Executor-s aus java.util.concurrent zu verwenden.
Valery Silaev