Frage
Wie erstellt man in Java 8 einen richtigen Hintergrundlader? Die Bedingungen:
- Daten sollten im Hintergrund geladen werden
- Nach dem Laden sollten die Daten angezeigt werden
- Während Daten geladen werden, sollten keine weiteren Anforderungen akzeptiert werden
- Wenn während des Ladens der Daten Anforderungen aufgetreten sind, sollte nach einer bestimmten Zeitüberschreitung (z. B. 5 Sekunden) ein erneutes Laden geplant werden.
Der Zweck besteht beispielsweise darin, Neuladeanforderungen zu akzeptieren, nicht jedoch die mit den Anforderungen überflutete Datenbank.
MCVE
Hier ist eine MCVE. Es besteht aus einer Hintergrundaufgabe, die das Laden simuliert, indem Thread.sleep einfach 2 Sekunden lang aufgerufen wird. Die Aufgabe wird jede Sekunde geplant, was natürlich zu einer Überlappung der Hintergrundladeaufgaben führt, die vermieden werden sollte.
public class LoadInBackgroundExample {
/**
* A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep
*/
public static class BackgroundTask implements Runnable {
private int id;
public BackgroundTask(int id) {
this.id = id;
}
/**
* Sleep for a given amount of time to simulate loading.
*/
@Override
public void run() {
try {
System.out.println("Start #" + id + ": " + Thread.currentThread());
long sleepTime = 2000;
Thread.sleep( sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Finish #" + id + ": " + Thread.currentThread());
}
}
}
/**
* CompletableFuture which simulates loading and showing data.
* @param taskId Identifier of the current task
*/
public static void loadInBackground( int taskId) {
// create the loading task
BackgroundTask backgroundTask = new BackgroundTask( taskId);
// "load" the data asynchronously
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return "task " + backgroundTask.id;
}
});
// display the data after they are loaded
CompletableFuture<Void> future = completableFuture.thenAccept(x -> {
System.out.println( "Background task finished:" + x);
});
}
public static void main(String[] args) {
// runnable which invokes the background loader every second
Runnable trigger = new Runnable() {
int taskId = 0;
public void run() {
loadInBackground( taskId++);
}
};
// create scheduler
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);
// cancel the scheudler and the application after 10 seconds
scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);
try {
beeperHandle.get();
} catch (Throwable th) {
}
System.out.println( "Cancelled");
System.exit(0);
}
}
Die Ausgabe ist folgende:
Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Start #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0
Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 1
Start #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 2
Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 3
Start #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 4
Finish #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 5
Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 6
Start #9: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 7
Start #10: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 8
Cancelled
Das Ziel ist es, zB # 1 und # 2 überspringen zu lassen, weil # 0 noch läuft.
Problem
Wo stellen Sie den Blockiermechanismus richtig ein? Sollte die Synchronisation verwendet werden? Oder welche AtomicBoolean
? Und wenn ja, sollte es innerhalb der get()
Methode oder anderswo sein?
quelle
ExecutorService
mit einer Thread-Pool-Größe von 1 in Betracht gezogen ?BlockingQueue
?Antworten:
Sie haben bereits einen Threadpool, um die Aufgabe auszuführen. Es ist nicht unbedingt und kompliziert, die Aufgabe in einem anderen asynchronen Executor auszuführen (
ForkJoinPool
wenn Sie verwendenCompletableFuture
)Mach es einfach:
Der ScheduledExecutorService stellt sicher, dass jeweils nur eine Aufgabe ausgeführt wird, wenn Sie sie mit schedAtFixedRate aufrufen
quelle
Nehmen Sie Folgendes als Voraussetzung:
Die Lösung ca werden Build basiert auf dem
Executors.newSingleThreadExecutor()
,CompletableFuture
undLinkedBlockingQueue
:Nach der Ausführung hat das stdout die folgende Ausgabe:
quelle
Ich habe eine AtomicInteger hinzugefügt, die als Zähler für die Ausführung von Aufgaben mit einfachen Methoden lock () und entsperren () dient, wobei diese geringfügige Änderung an Ihrem ursprünglichen Code vorgenommen wurde:
Hier ist meine Lösung für Ihre Aufgabe:
AKTUALISIEREN
Ich habe die Methoden lock () und entsperren () in eine einfachere Form geändert:
quelle
Wenn Sie verstehen, haben Sie mehrere Aufgaben gleichzeitig im Hintergrund. Da diese Aufgaben genau denselben Job ausführen, den Sie nicht parallel ausführen möchten, benötigen Sie eine Aufgabe, um den Job zu beenden und seine Ergebnisse an andere weiterzugeben. Wenn Sie also 10
CompletableFuture
gleichzeitig erhalten, möchten Sie, dass einer von ihnen 'reload' in db aufruft und die Ausführungsergebnisse auf eine Weise an andere weitergibt, die alleCompletableFuture
normal mit dem Ergebnis abschließt. Ich nehme das an vonund
Wenn meine Vermutungen richtig sind, können Sie meine Lösung ausprobieren.
Ich habe eine Art Eltern-Kind-Beziehung zwischen Aufgaben. Die Elternaufgabe ist diejenige, die wirklich ihren Job macht und das Ergebnis an ihre Kinder weitergibt. Die untergeordnete Aufgabe ist eine Aufgabe, die hinzugefügt wurde, während die übergeordnete Aufgabe noch ausgeführt wurde. Die untergeordnete Aufgabe wartet, bis die Ausführung der übergeordneten Aufgabe abgeschlossen ist. Da die Ergebnisse der Elternaufgabe noch "frisch" sind, werden sie in jedes Kind kopiert und alle vervollständigen ihre Zukunft.
Und hier ist die Ausgabe:
quelle
Wenn Sie nur einen einzigen Zugriffsthread möchten, erledigt ein einfacher synchronisierter Thread die Aufgabe ...
Ausgabe:
Code:
quelle
Ich habe eine Lösung mit einem Thread-Dual-Switch ausprobiert, siehe Klasse
BackgroundTaskDualSwitch
, sie simuliert das Laden mit demCompletableFuture
. Die Idee ist, eine zweite Aufgabe warten zu lassen, bis die aktuell ausgeführte Aufgabe abgeschlossen ist (siehe Änderung in)BackgroundTask
. Dadurch wird sichergestellt, dass maximal ein Task-Thread ausgeführt wird und maximal ein Task-Thread wartet. Weitere Anforderungen werden übersprungen, bis die laufende Aufgabe abgeschlossen ist und die nächste Anforderung bearbeitet werden kann.Ausgabe ist:
quelle
Der erste Thread, der mit der teuren Arbeit beginnt, benachrichtigt das Ergebnis mit einem Rückruf. Andere Threads, die versuchen, es auszuführen, werden in ExpensiveWork.notificables registriert. Sobald die teure Arbeit beendet ist, werden sie vom Thread benachrichtigt, der die Arbeit ausgeführt hat.
In der Zwischenzeit überprüfen die Threads alle 5 Sekunden das Ergebnis.
Und das ist die Ausgabe:
quelle