Wie erstelle ich einen blockierenden Hintergrundlader in Java 8?

22

Frage

Wie erstellt man in Java 8 einen richtigen Hintergrundlader? Die Bedingungen:

  • Daten sollten im Hintergrund geladen werden
  • Nach dem Laden sollten die Daten angezeigt werden
  • Während Daten geladen werden, sollten keine weiteren Anforderungen akzeptiert werden
  • Wenn während des Ladens der Daten Anforderungen aufgetreten sind, sollte nach einer bestimmten Zeitüberschreitung (z. B. 5 Sekunden) ein erneutes Laden geplant werden.

Der Zweck besteht beispielsweise darin, Neuladeanforderungen zu akzeptieren, nicht jedoch die mit den Anforderungen überflutete Datenbank.

MCVE

Hier ist eine MCVE. Es besteht aus einer Hintergrundaufgabe, die das Laden simuliert, indem Thread.sleep einfach 2 Sekunden lang aufgerufen wird. Die Aufgabe wird jede Sekunde geplant, was natürlich zu einer Überlappung der Hintergrundladeaufgaben führt, die vermieden werden sollte.

public class LoadInBackgroundExample {

  /**
   * A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep
   */
  public static class BackgroundTask implements Runnable {

    private int id;

    public BackgroundTask(int id) {
      this.id = id;
    }

    /**
     * Sleep for a given amount of time to simulate loading.
     */
    @Override
    public void run() {

      try {

        System.out.println("Start #" + id + ": " + Thread.currentThread());

        long sleepTime = 2000; 
        Thread.sleep( sleepTime);

      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        System.out.println("Finish #" + id + ": " + Thread.currentThread());
      }

    }
  }

  /**
   * CompletableFuture which simulates loading and showing data.
   * @param taskId Identifier of the current task
   */
  public static void loadInBackground( int taskId) {

    // create the loading task
    BackgroundTask backgroundTask = new BackgroundTask( taskId);

    // "load" the data asynchronously
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {

      @Override
      public String get() {

        CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);

        try {

          future.get();

        } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
        }

        return "task " + backgroundTask.id;
      }
    });

    // display the data after they are loaded
    CompletableFuture<Void> future = completableFuture.thenAccept(x -> {

      System.out.println( "Background task finished:" + x);

    });

  }


  public static void main(String[] args) {

    // runnable which invokes the background loader every second
    Runnable trigger = new Runnable() {

      int taskId = 0;

      public void run() { 

        loadInBackground( taskId++);

      }
    };

    // create scheduler
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);

    // cancel the scheudler and the application after 10 seconds
    scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);

    try {
      beeperHandle.get();
    } catch (Throwable th) {
    }

    System.out.println( "Cancelled");
    System.exit(0);
  }

}

Die Ausgabe ist folgende:

Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Start #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0
Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 1
Start #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 2
Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 3
Start #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 4
Finish #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 5
Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 6
Start #9: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 7
Start #10: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 8
Cancelled

Das Ziel ist es, zB # 1 und # 2 überspringen zu lassen, weil # 0 noch läuft.

Problem

Wo stellen Sie den Blockiermechanismus richtig ein? Sollte die Synchronisation verwendet werden? Oder welche AtomicBoolean? Und wenn ja, sollte es innerhalb der get()Methode oder anderswo sein?

Roland
quelle
5
Haben Sie BlockingQueue in Betracht gezogen ?
Abra
Noch nicht, aber es klingt nach einem gültigen Ansatz. Ich werde das überprüfen. Vielen Dank!
Roland
Haben Sie eine ExecutorServicemit einer Thread-Pool-Größe von 1 in Betracht gezogen ?
user207421
2
@ Roland Worum geht es bei der Prämie? Die Frage wurde nach dem letzten Kommentar von Ihnen nicht aktualisiert. Hast du erforscht BlockingQueue?
Naman
"Hintergrund blockieren" scheint einen Widerspruch zu verkörpern.
user207421

Antworten:

6

Sie haben bereits einen Threadpool, um die Aufgabe auszuführen. Es ist nicht unbedingt und kompliziert, die Aufgabe in einem anderen asynchronen Executor auszuführen ( ForkJoinPoolwenn Sie verwenden CompletableFuture)

Mach es einfach:

public static void loadInBackground(int taskId) {
    // create the loading task
    BackgroundTask backgroundTask = new BackgroundTask(taskId);
    // No need to run in async, as it already in executor
    backgroundTask.run();
}

Der ScheduledExecutorService stellt sicher, dass jeweils nur eine Aufgabe ausgeführt wird, wenn Sie sie mit schedAtFixedRate aufrufen

Erstellt eine periodische Aktion und führt sie aus, die zuerst nach der angegebenen anfänglichen Verzögerung und anschließend mit der angegebenen Periode aktiviert wird. Das heißt, die Ausführungen beginnen nach initialDelay, dann initialDelay + period, dann initialDelay + 2 * period und so weiter. Wenn bei einer Ausführung der Aufgabe eine Ausnahme auftritt, werden nachfolgende Ausführungen unterdrückt. Andernfalls wird die Aufgabe nur durch Stornierung oder Beendigung des Executors beendet. Wenn eine Ausführung dieser Aufgabe länger als die Dauer dauert, werden nachfolgende Ausführungen möglicherweise zu spät gestartet, jedoch nicht gleichzeitig ausgeführt .

Mạnh Quyết Nguyễn
quelle
5

Nehmen Sie Folgendes als Voraussetzung:

  • Daten sollten im Hintergrund geladen werden
  • Nach dem Laden sollten die Daten angezeigt werden
  • Während Daten geladen werden, sollten keine weiteren Anforderungen akzeptiert werden
  • Wenn während des Ladens der Daten Anforderungen aufgetreten sind, sollte nach einer bestimmten Zeitüberschreitung (z. B. 5 Sekunden) ein erneutes Laden geplant werden.

Die Lösung ca werden Build basiert auf dem Executors.newSingleThreadExecutor(), CompletableFutureund LinkedBlockingQueue:

public class SingleThreadedLoader {

  private static class BackgroundTask extends CompletableFuture<String> {

    private final String query;

    private BackgroundTask(String query) {
      this.query = query;
    }

    public String getQuery() {
      return query;
    }
  }

  private final BlockingQueue<BackgroundTask> tasks = new LinkedBlockingQueue<>();
  // while data are loaded no further requests should be accepted
  private final Executor executor = Executors.newSingleThreadExecutor();

  private final int delaySeconds;

  private AtomicReference<Instant> lastExecution = new AtomicReference<>(Instant.EPOCH);

  public SingleThreadedLoader(int delaySeconds) {
    this.delaySeconds = delaySeconds;
    setupLoading();
  }

  public BackgroundTask loadInBackground(String query) {
    log("Enqueued query " + query);
    BackgroundTask task = new BackgroundTask(query);
    tasks.add(task);
    return task;
  }

  private void setupLoading() {
    // data should be loaded in background
    executor.execute(() -> {
      while (true) {
        try {
          // if there were requests while the data were loaded
          // another loading should be scheduled after a certain timeout (e. g. 5 seconds)
          Instant prev = lastExecution.get();
          long delay = Duration.between(prev, Instant.now()).toSeconds();
          if (delay < delaySeconds) {
            log("Waiting for 5 seconds before next data loading");
            TimeUnit.SECONDS.sleep(delaySeconds - delay);
          }
          BackgroundTask task = tasks.take();
          try {
            String query = task.getQuery();
            String data = loadData(query);
            task.complete(data);
          } catch (Exception e) {
            task.completeExceptionally(e);
          }
          lastExecution.set(Instant.now());
        } catch (InterruptedException e) {
          log(e.getMessage());
          return;
        }
      }
    });
  }

  private String loadData(String query) {
    try {
      log("Loading data for " + query);
      TimeUnit.SECONDS.sleep(2);
      log("Loaded data for " + query);
      return "Result " + query;
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  private static void log(String str) {
    String time = LocalTime.now().truncatedTo(ChronoUnit.SECONDS).format(DateTimeFormatter.ISO_TIME);
    String thread = Thread.currentThread().getName();
    System.out.println(time + ' ' + thread + ": " + str);
  }

  public static void main(String[] args) throws Exception {
    SingleThreadedLoader loader = new SingleThreadedLoader(5);
    // after the loading the data should be displayed
    loader.loadInBackground("1").thenAccept(SingleThreadedLoader::log);
    loader.loadInBackground("2").thenAccept(SingleThreadedLoader::log);
    loader.loadInBackground("3").thenAccept(SingleThreadedLoader::log);

    log("Do another work in the main thread");

    TimeUnit.SECONDS.sleep(30);
  }
}

Nach der Ausführung hat das stdout die folgende Ausgabe:

10:29:26 main: Enqueued query 1
10:29:26 pool-1-thread-1: Loading data for 1
10:29:26 main: Enqueued query 2
10:29:26 main: Enqueued query 3
10:29:26 main: Do another work in the main thread
10:29:28 pool-1-thread-1: Loaded data for 1
10:29:28 pool-1-thread-1: Result 1
10:29:28 pool-1-thread-1: Waiting for 5 seconds before next data loading
10:29:33 pool-1-thread-1: Loading data for 2
10:29:36 pool-1-thread-1: Loaded data for 2
10:29:36 pool-1-thread-1: Result 2
10:29:36 pool-1-thread-1: Waiting for 5 seconds before next data loading
10:29:41 pool-1-thread-1: Loading data for 3
10:29:43 pool-1-thread-1: Loaded data for 3
10:29:43 pool-1-thread-1: Result 3
Evgeniy Khyst
quelle
4

Ich habe eine AtomicInteger hinzugefügt, die als Zähler für die Ausführung von Aufgaben mit einfachen Methoden lock () und entsperren () dient, wobei diese geringfügige Änderung an Ihrem ursprünglichen Code vorgenommen wurde:

Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled  1
background task cancelled  2
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0
Start #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled  4
Finish #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled  5
Background task finished:task 3
Start #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]
background task cancelled  7
Finish #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]
background task cancelled  8
Background task finished:task 6
Start #9: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled  10
Cancelled

Hier ist meine Lösung für Ihre Aufgabe:

public class LoadInBackgroundExample {
    //Added new exception
    public static class AlreadyIsRunningException extends RuntimeException {
        long taskId;

        public AlreadyIsRunningException(String message, long taskId) {
            super(message);
            this.taskId = taskId;
        }

        public long getTaskId() {
            return taskId;
        }

        public void setTaskId(long taskId) {
            this.taskId = taskId;
        }
    }


    /**
     * A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep
     */
    public static class BackgroundTask implements Runnable {


        //this atomicInteger acts as a global lock counter for BackgroundTask objects
        private static AtomicInteger counter = new AtomicInteger(0);


        private int id;

        public BackgroundTask(int id) {
            this.id = id;
        }

        private void unlock() {
            counter.decrementAndGet();
        }

        private void lock() {
            //we need to check this way to avoid some unlucky timing between threads
            int lockValue = counter.incrementAndGet();
            //if we got counter different than 1 that means that some other task is already running (it has already acquired the lock)
            if (lockValue != 1) {
                //rollback our check
                counter.decrementAndGet();
                //throw an exception
                throw new AlreadyIsRunningException("Some other task already is running", id);
            }
        }

        /**
         * Sleep for a given amount of time to simulate loading.
         */
        @Override
        public void run() {
            //Check if we can acquire lock

            lock();
            //we have a lock to
            try {
                System.out.println("Start #" + id + ": " + Thread.currentThread());
                long sleepTime = 2000;
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("Finish #" + id + ": " + Thread.currentThread());
                unlock();
            }
        }
    }

    /**
     * CompletableFuture which simulates loading and showing data.
     *
     * @param taskId Identifier of the current task
     */


    public static void loadInBackground(int taskId) {
        // create the loading task
        BackgroundTask backgroundTask = new BackgroundTask(taskId);
        // "load" the data asynchronously
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);

                try {
                    future.get();
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof AlreadyIsRunningException) {
                        System.out.println("background task cancelled  " + ((AlreadyIsRunningException) e.getCause()).getTaskId());
                        throw (AlreadyIsRunningException) e.getCause();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "task " + backgroundTask.id;
            }
        });
        // display the data after they are loaded
        CompletableFuture<Void> future = completableFuture.thenAccept(x -> {
            System.out.println("Background task finished:" + x);
        });
    }

    ArrayList<BackgroundTask> backgroundTasks = new ArrayList<>();


    public static void main(String[] args) {
        // runnable which invokes the background loader every second
        Runnable trigger = new Runnable() {
            int taskId = 0;

            public void run() {
                loadInBackground(taskId++);
            }
        };

        // create scheduler
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);

        // cancel the scheudler and the application after 10 seconds
        scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);

        try {
            beeperHandle.get();
        } catch (Throwable th) {
        }

        System.out.println("Cancelled");
        System.exit(0);
    }

AKTUALISIEREN

Ich habe die Methoden lock () und entsperren () in eine einfachere Form geändert:

private static  AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        private void unlock() {
            atomicBoolean.set(false);
        }
        private void lock() {
            //if 'changed' is false that means some other task is already running
            boolean changed = atomicBoolean.compareAndSet(false,true);
            if (!changed) {
                throw new AlreadyIsRunningException("Some other task  is already running", id);
            }
        }
Nonika
quelle
2

Wenn Sie verstehen, haben Sie mehrere Aufgaben gleichzeitig im Hintergrund. Da diese Aufgaben genau denselben Job ausführen, den Sie nicht parallel ausführen möchten, benötigen Sie eine Aufgabe, um den Job zu beenden und seine Ergebnisse an andere weiterzugeben. Wenn Sie also 10 CompletableFuturegleichzeitig erhalten, möchten Sie, dass einer von ihnen 'reload' in db aufruft und die Ausführungsergebnisse auf eine Weise an andere weitergibt, die alle CompletableFuturenormal mit dem Ergebnis abschließt. Ich nehme das an von

Das Ziel ist es, zB # 1 und # 2 überspringen zu lassen, weil # 0 noch läuft.

und

Nach dem Laden sollten die Daten angezeigt werden

Wenn meine Vermutungen richtig sind, können Sie meine Lösung ausprobieren.

Ich habe eine Art Eltern-Kind-Beziehung zwischen Aufgaben. Die Elternaufgabe ist diejenige, die wirklich ihren Job macht und das Ergebnis an ihre Kinder weitergibt. Die untergeordnete Aufgabe ist eine Aufgabe, die hinzugefügt wurde, während die übergeordnete Aufgabe noch ausgeführt wurde. Die untergeordnete Aufgabe wartet, bis die Ausführung der übergeordneten Aufgabe abgeschlossen ist. Da die Ergebnisse der Elternaufgabe noch "frisch" sind, werden sie in jedes Kind kopiert und alle vervollständigen ihre Zukunft.

public class BackgroundService {


    public static class BackgroundJob implements Callable<String> {

        private static BackgroundJob ROOT_JOB = null;

        private synchronized static void addBackgroundJob(BackgroundJob backgroundJob) {
            if (ROOT_JOB != null) {
                ROOT_JOB.addChild(backgroundJob);
            } else {
                System.out.println();
                System.out.println(Thread.currentThread().getName() + " RUNNING ROOT TASK-" + backgroundJob.jobId);
                ROOT_JOB = backgroundJob;
            }
        }

        private synchronized static void unlock() {
            ROOT_JOB = null;
        }

        private final int jobId;
        private List<BackgroundJob> children = new ArrayList<>();
        private BackgroundJob parent;
        private String providedResultFromParent = null;


        public BackgroundJob(int jobId) {
            this.jobId = jobId;
        }

        private  void addChild(BackgroundJob backgroundJob) {
            backgroundJob.parent = this;
            this.children.add(backgroundJob);
        }

        @Override
        public String call() throws Exception {
            addBackgroundJob(this);
            if (parent == null) {
                String result = logic();

                synchronized (ROOT_JOB) {
                    for (final BackgroundJob backgroundJob : children) {
                        backgroundJob.providedResultFromParent = result;
                        synchronized (backgroundJob) {
                            backgroundJob.notify();
                        }
                    }
                    unlock();
                }

                return "\t\tROOT task" + jobId + "'s " + result;
            } else {
                synchronized (this) {
                    System.out.println(Thread.currentThread().getName() + "\t\tskipping task-" + jobId + " and waiting running task-" + parent.jobId + " to finish");
                    this.wait();
                }
                return "\t\t\t\ttask-" + jobId + "'s " + providedResultFromParent;
            }
        }

        private String logic() throws InterruptedException {
            Thread.sleep(2000);
            return (int) (Math.random() * 1000) + " ";
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        AtomicInteger atomicInteger = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        Supplier<String> job = () -> {
            int taskId = atomicInteger.incrementAndGet();
            BackgroundJob backgroundJob = new BackgroundJob(taskId);
            try {
                return backgroundJob.call();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "finished " + taskId;
        };

        for (int i = 100; i > 0; i--) {
            CompletableFuture.supplyAsync(job, pool).thenAccept(s -> System.out.println(Thread.currentThread().getName()+" "+ s + " result is readable"));
            Thread.sleep((long) (Math.random() * 500));
        }

        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        pool.shutdown();
    }

Und hier ist die Ausgabe:

pool-1-thread-1 RUNNING ROOT TASK-1
pool-1-thread-2     skipping task-2 and waiting running task-1 to finish
pool-1-thread-3     skipping task-3 and waiting running task-1 to finish
pool-1-thread-4     skipping task-4 and waiting running task-1 to finish
pool-1-thread-5     skipping task-5 and waiting running task-1 to finish
pool-1-thread-6     skipping task-6 and waiting running task-1 to finish
pool-1-thread-7     skipping task-7 and waiting running task-1 to finish
pool-1-thread-8     skipping task-8 and waiting running task-1 to finish
pool-1-thread-3                 task-3's 165  result is readable
pool-1-thread-8                 task-8's 165  result is readable
pool-1-thread-6                 task-6's 165  result is readable
pool-1-thread-5                 task-5's 165  result is readable
pool-1-thread-7                 task-7's 165  result is readable
pool-1-thread-2                 task-2's 165  result is readable
pool-1-thread-1         ROOT task1's 165  result is readable
pool-1-thread-4                 task-4's 165  result is readable

pool-1-thread-4 RUNNING ROOT TASK-9
pool-1-thread-1     skipping task-10 and waiting running task-9 to finish
pool-1-thread-2     skipping task-11 and waiting running task-9 to finish
pool-1-thread-7     skipping task-12 and waiting running task-9 to finish
pool-1-thread-5     skipping task-13 and waiting running task-9 to finish
pool-1-thread-8     skipping task-14 and waiting running task-9 to finish
pool-1-thread-6     skipping task-15 and waiting running task-9 to finish
pool-1-thread-3     skipping task-16 and waiting running task-9 to finish
pool-1-thread-9     skipping task-17 and waiting running task-9 to finish
pool-1-thread-10        skipping task-18 and waiting running task-9 to finish
pool-1-thread-1                 task-10's 370  result is readable
pool-1-thread-10                task-18's 370  result is readable
pool-1-thread-4         ROOT task9's 370  result is readable
pool-1-thread-9                 task-17's 370  result is readable
pool-1-thread-7                 task-12's 370  result is readable
pool-1-thread-6                 task-15's 370  result is readable
pool-1-thread-8                 task-14's 370  result is readable
pool-1-thread-2                 task-11's 370  result is readable
pool-1-thread-3                 task-16's 370  result is readable
pool-1-thread-5                 task-13's 370  result is readable

pool-1-thread-5 RUNNING ROOT TASK-19
pool-1-thread-3     skipping task-20 and waiting running task-19 to finish
pool-1-thread-2     skipping task-21 and waiting running task-19 to finish
pool-1-thread-8     skipping task-22 and waiting running task-19 to finish
pool-1-thread-6     skipping task-23 and waiting running task-19 to finish
pool-1-thread-7     skipping task-24 and waiting running task-19 to finish
pool-1-thread-9     skipping task-25 and waiting running task-19 to finish
pool-1-thread-4     skipping task-26 and waiting running task-19 to finish
pool-1-thread-10        skipping task-27 and waiting running task-19 to finish
pool-1-thread-1     skipping task-28 and waiting running task-19 to finish
pool-1-thread-5         ROOT task19's 574  result is readable
pool-1-thread-8                 task-22's 574  result is readable
pool-1-thread-4                 task-26's 574  result is readable
pool-1-thread-7                 task-24's 574  result is readable
pool-1-thread-6                 task-23's 574  result is readable
pool-1-thread-3                 task-20's 574  result is readable
pool-1-thread-9                 task-25's 574  result is readable
pool-1-thread-2                 task-21's 574  result is readable
pool-1-thread-1                 task-28's 574  result is readable
pool-1-thread-10                task-27's 574  result is readable

pool-1-thread-10 RUNNING ROOT TASK-29
pool-1-thread-1     skipping task-30 and waiting running task-29 to finish
pool-1-thread-2     skipping task-31 and waiting running task-29 to finish
pool-1-thread-9     skipping task-32 and waiting running task-29 to finish
pool-1-thread-3     skipping task-33 and waiting running task-29 to finish
pool-1-thread-6     skipping task-34 and waiting running task-29 to finish
pool-1-thread-7     skipping task-35 and waiting running task-29 to finish
pool-1-thread-4     skipping task-36 and waiting running task-29 to finish
pool-1-thread-8     skipping task-37 and waiting running task-29 to finish
pool-1-thread-5     skipping task-38 and waiting running task-29 to finish
pool-1-thread-11        skipping task-39 and waiting running task-29 to finish
pool-1-thread-1                 task-30's 230  result is readable
pool-1-thread-11                task-39's 230  result is readable
pool-1-thread-8                 task-37's 230  result is readable
pool-1-thread-5                 task-38's 230  result is readable
pool-1-thread-4                 task-36's 230  result is readable
pool-1-thread-7                 task-35's 230  result is readable

pool-1-thread-12 RUNNING ROOT TASK-40
pool-1-thread-6                 task-34's 230  result is readable
pool-1-thread-10        ROOT task29's 230  result is readable
pool-1-thread-3                 task-33's 230  result is readable
pool-1-thread-9                 task-32's 230  result is readable
pool-1-thread-2                 task-31's 230  result is readable
pool-1-thread-2     skipping task-41 and waiting running task-40 to finish
pool-1-thread-9     skipping task-42 and waiting running task-40 to finish
pool-1-thread-3     skipping task-43 and waiting running task-40 to finish
pool-1-thread-10        skipping task-44 and waiting running task-40 to finish
pool-1-thread-6     skipping task-45 and waiting running task-40 to finish
pool-1-thread-7     skipping task-46 and waiting running task-40 to finish
pool-1-thread-2                 task-41's 282  result is readable
pool-1-thread-10                task-44's 282  result is readable
pool-1-thread-6                 task-45's 282  result is readable
pool-1-thread-7                 task-46's 282  result is readable
pool-1-thread-3                 task-43's 282  result is readable
pool-1-thread-9                 task-42's 282  result is readable
pool-1-thread-12        ROOT task40's 282  result is readable

pool-1-thread-12 RUNNING ROOT TASK-47
pool-1-thread-9     skipping task-48 and waiting running task-47 to finish
pool-1-thread-3     skipping task-49 and waiting running task-47 to finish
pool-1-thread-7     skipping task-50 and waiting running task-47 to finish
pool-1-thread-6     skipping task-51 and waiting running task-47 to finish
pool-1-thread-10        skipping task-52 and waiting running task-47 to finish
pool-1-thread-2     skipping task-53 and waiting running task-47 to finish
pool-1-thread-12        ROOT task47's 871  result is readable
pool-1-thread-10                task-52's 871  result is readable
pool-1-thread-2                 task-53's 871  result is readable
pool-1-thread-3                 task-49's 871  result is readable
pool-1-thread-6                 task-51's 871  result is readable
pool-1-thread-7                 task-50's 871  result is readable
pool-1-thread-9                 task-48's 871  result is readable

pool-1-thread-9 RUNNING ROOT TASK-54
pool-1-thread-7     skipping task-55 and waiting running task-54 to finish
pool-1-thread-6     skipping task-56 and waiting running task-54 to finish
pool-1-thread-3     skipping task-57 and waiting running task-54 to finish
pool-1-thread-2     skipping task-58 and waiting running task-54 to finish
pool-1-thread-10        skipping task-59 and waiting running task-54 to finish
pool-1-thread-12        skipping task-60 and waiting running task-54 to finish
pool-1-thread-4     skipping task-61 and waiting running task-54 to finish
pool-1-thread-5     skipping task-62 and waiting running task-54 to finish
pool-1-thread-9         ROOT task54's 345  result is readable
pool-1-thread-2                 task-58's 345  result is readable
pool-1-thread-5                 task-62's 345  result is readable
pool-1-thread-7                 task-55's 345  result is readable
pool-1-thread-10                task-59's 345  result is readable
pool-1-thread-6                 task-56's 345  result is readable
pool-1-thread-3                 task-57's 345  result is readable
pool-1-thread-4                 task-61's 345  result is readable
pool-1-thread-12                task-60's 345  result is readable

pool-1-thread-12 RUNNING ROOT TASK-63
pool-1-thread-4     skipping task-64 and waiting running task-63 to finish
pool-1-thread-3     skipping task-65 and waiting running task-63 to finish
pool-1-thread-6     skipping task-66 and waiting running task-63 to finish
pool-1-thread-10        skipping task-67 and waiting running task-63 to finish
pool-1-thread-7     skipping task-68 and waiting running task-63 to finish
pool-1-thread-5     skipping task-69 and waiting running task-63 to finish
pool-1-thread-2     skipping task-70 and waiting running task-63 to finish
pool-1-thread-12        ROOT task63's 670  result is readable
pool-1-thread-2                 task-70's 670  result is readable
pool-1-thread-5                 task-69's 670  result is readable
pool-1-thread-7                 task-68's 670  result is readable
pool-1-thread-10                task-67's 670  result is readable
pool-1-thread-6                 task-66's 670  result is readable
pool-1-thread-3                 task-65's 670  result is readable
pool-1-thread-4                 task-64's 670  result is readable

pool-1-thread-4 RUNNING ROOT TASK-71
pool-1-thread-3     skipping task-72 and waiting running task-71 to finish
pool-1-thread-6     skipping task-73 and waiting running task-71 to finish
pool-1-thread-10        skipping task-74 and waiting running task-71 to finish
pool-1-thread-7     skipping task-75 and waiting running task-71 to finish
pool-1-thread-5     skipping task-76 and waiting running task-71 to finish
pool-1-thread-2     skipping task-77 and waiting running task-71 to finish
pool-1-thread-12        skipping task-78 and waiting running task-71 to finish
pool-1-thread-9     skipping task-79 and waiting running task-71 to finish
pool-1-thread-8     skipping task-80 and waiting running task-71 to finish
pool-1-thread-4         ROOT task71's 445  result is readable
pool-1-thread-6                 task-73's 445  result is readable
pool-1-thread-9                 task-79's 445  result is readable
pool-1-thread-3                 task-72's 445  result is readable
pool-1-thread-8                 task-80's 445  result is readable
pool-1-thread-12                task-78's 445  result is readable
pool-1-thread-5                 task-76's 445  result is readable
pool-1-thread-10                task-74's 445  result is readable
pool-1-thread-2                 task-77's 445  result is readable
pool-1-thread-7                 task-75's 445  result is readable

pool-1-thread-7 RUNNING ROOT TASK-81
pool-1-thread-2     skipping task-82 and waiting running task-81 to finish
pool-1-thread-10        skipping task-83 and waiting running task-81 to finish
pool-1-thread-5     skipping task-84 and waiting running task-81 to finish
pool-1-thread-12        skipping task-85 and waiting running task-81 to finish
pool-1-thread-8     skipping task-86 and waiting running task-81 to finish
pool-1-thread-3     skipping task-87 and waiting running task-81 to finish
pool-1-thread-9     skipping task-88 and waiting running task-81 to finish
pool-1-thread-6     skipping task-89 and waiting running task-81 to finish
pool-1-thread-7         ROOT task81's 141  result is readable
pool-1-thread-6                 task-89's 141  result is readable
pool-1-thread-9                 task-88's 141  result is readable
pool-1-thread-3                 task-87's 141  result is readable
pool-1-thread-10                task-83's 141  result is readable
pool-1-thread-5                 task-84's 141  result is readable
pool-1-thread-12                task-85's 141  result is readable
pool-1-thread-8                 task-86's 141  result is readable
pool-1-thread-2                 task-82's 141  result is readable

pool-1-thread-2 RUNNING ROOT TASK-90
pool-1-thread-8     skipping task-91 and waiting running task-90 to finish
pool-1-thread-12        skipping task-92 and waiting running task-90 to finish
pool-1-thread-5     skipping task-93 and waiting running task-90 to finish
pool-1-thread-10        skipping task-94 and waiting running task-90 to finish
pool-1-thread-3     skipping task-95 and waiting running task-90 to finish
pool-1-thread-9     skipping task-96 and waiting running task-90 to finish
pool-1-thread-6     skipping task-97 and waiting running task-90 to finish
pool-1-thread-7     skipping task-98 and waiting running task-90 to finish
pool-1-thread-4     skipping task-99 and waiting running task-90 to finish
pool-1-thread-11        skipping task-100 and waiting running task-90 to finish
pool-1-thread-2         ROOT task90's 321  result is readable
pool-1-thread-3                 task-95's 321  result is readable
pool-1-thread-7                 task-98's 321  result is readable
pool-1-thread-8                 task-91's 321  result is readable
pool-1-thread-11                task-100's 321  result is readable
pool-1-thread-4                 task-99's 321  result is readable
pool-1-thread-5                 task-93's 321  result is readable
pool-1-thread-9                 task-96's 321  result is readable
pool-1-thread-12                task-92's 321  result is readable
pool-1-thread-10                task-94's 321  result is readable
pool-1-thread-6                 task-97's 321  result is readable
Nonika
quelle
0

Wenn Sie nur einen einzigen Zugriffsthread möchten, erledigt ein einfacher synchronisierter Thread die Aufgabe ...

Ausgabe:

Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0 finished getting data...
Start #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 2 finished getting data...
Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 1 finished getting data...
Start #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]
Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #5: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 4 finished getting data...
Finish #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 3 finished getting data...
Cancelled

Code:

package queue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class LoadInBackgroundExample {

    public static class SyncronizedBackend {
        public synchronized String getData() {

            long sleepTime = 2000;
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            return new String("finished getting data...");
        }
    }

    /**
     * A simple background task which should perform the data loading operation. In
     * this minimal example it simply invokes Thread.sleep
     */
    public static class BackgroundTask implements Runnable {

        private int id;
        private SyncronizedBackend syncronizedBackend;
        private String result;

        public BackgroundTask(SyncronizedBackend syncronizedBackend, int id) {
            this.syncronizedBackend = syncronizedBackend;
            this.id = id;
        }

        /**
         * Sleep for a given amount of time to simulate loading.
         */
        @Override
        public void run() {

            System.out.println("Start #" + id + ": " + Thread.currentThread());

            result = this.syncronizedBackend.getData();

            System.out.println("Finish #" + id + ": " + Thread.currentThread());
        }

        public String getResult() {
            return result;
        }
    }

    /**
     * CompletableFuture which simulates loading and showing data.
     * @param syncronizedBackend 
     * 
     * @param taskId Identifier of the current task
     */
    public static void loadInBackground(SyncronizedBackend syncronizedBackend, int taskId) {

        // create the loading task
        BackgroundTask backgroundTask = new BackgroundTask(syncronizedBackend, taskId);

        // "load" the data asynchronously
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {

                CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);

                try {

                    future.get();

                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }

                return "task " + backgroundTask.id + " " + backgroundTask.getResult();
            }
        });

        // display the data after they are loaded
        CompletableFuture<Void> future = completableFuture.thenAccept(x -> {

            System.out.println("Background task finished:" + x);

        });

    }

    public static void main(String[] args) {

        SyncronizedBackend syncronizedBackend = new SyncronizedBackend();

        // runnable which invokes the background loader every second
        Runnable trigger = new Runnable() {

            int taskId = 0;

            public void run() {

                loadInBackground(syncronizedBackend, taskId++);

            }
        };

        // create scheduler
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);

        // cancel the scheudler and the application after 10 seconds
        scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);

        try {
            beeperHandle.get();
        } catch (Throwable th) {
        }

        System.out.println("Cancelled");
        System.exit(0);
    }

}
Marc Ströbel
quelle
0

Ich habe eine Lösung mit einem Thread-Dual-Switch ausprobiert, siehe Klasse BackgroundTaskDualSwitch, sie simuliert das Laden mit dem CompletableFuture. Die Idee ist, eine zweite Aufgabe warten zu lassen, bis die aktuell ausgeführte Aufgabe abgeschlossen ist (siehe Änderung in) BackgroundTask. Dadurch wird sichergestellt, dass maximal ein Task-Thread ausgeführt wird und maximal ein Task-Thread wartet. Weitere Anforderungen werden übersprungen, bis die laufende Aufgabe abgeschlossen ist und die nächste Anforderung bearbeitet werden kann.

public static class BackgroundTask extends Thread {

  private int id;
  private Thread pendingTask;

  public BackgroundTask(int id) {
    this.id = id;
  }

  public BackgroundTask(int id, Thread pendingTask) {
    this(id);
    this.pendingTask = pendingTask;
  }

  /**
   * Sleep for a given amount of time to simulate loading.
   */
  @Override
  public void run() {

    try {

      if (pendingTask != null && pendingTask.isAlive()) {
        pendingTask.join();
      }

      System.out.println("Start #" + id + ": " + Thread.currentThread());
      ...
    }
  }

public static class BackgroundTaskDualSwitch {

  private static BackgroundTask task1;
  private static BackgroundTask task2;

  public static synchronized boolean runTask(int taskId) {
    if (! isBusy(task1)) {
      if (isBusy(task2)) {
        task1 = new BackgroundTask(taskId, task2);
      } else {
        task1 = new BackgroundTask(taskId);
      }
      runAsync(task1);
      return true;
    } else if (! isBusy(task2)) {
      if (isBusy(task1)) {
        task2 = new BackgroundTask(taskId, task1);
      } else {
        task2 = new BackgroundTask(taskId);
      }
      runAsync(task2);
      return true;
    } else {
      return false; // SKIPPED
    }
  }

  private static void runAsync(BackgroundTask task) {
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {

      @Override
      public String get() {

        try {
          task.start();
          task.join();
        }
        catch (InterruptedException e) {
          e.printStackTrace();
        }
        return "task " + task.id;
      }
    });

    // display the data after they are loaded
    CompletableFuture<Void> future = completableFuture.thenAccept(x -> {

      System.out.println( "Background task finished:" + x); 

    });
  }

  private static boolean isBusy(BackgroundTask task) {
    return task != null && task.isAlive();
  }
}

/**
 * Simulates loading and showing data.
 * @param taskId Identifier of the current task
 */
public static void loadInBackground(int taskId) {

  // create the loading task
  if (! BackgroundTaskDualSwitch.runTask(taskId)) {
    System.out.println( "Background task ignored:task " + taskId); // SKIPPED
  }
}
...

Ausgabe ist:

Start #0: Thread[Thread-0,5,main]
Background task ignored:task 2
Finish #0: Thread[Thread-0,5,main]
Start #1: Thread[Thread-1,5,main]
Background task finished:task 0
Background task ignored:task 4
Finish #1: Thread[Thread-1,5,main]
Start #3: Thread[Thread-2,5,main]
Background task finished:task 1
Background task ignored:task 6
Finish #3: Thread[Thread-2,5,main]
Start #5: Thread[Thread-3,5,main]
Background task finished:task 3
Background task ignored:task 8
Finish #5: Thread[Thread-3,5,main]
Start #7: Thread[Thread-4,5,main]
Background task finished:task 5
Background task ignored:task 10
Finish #7: Thread[Thread-4,5,main]
Start #9: Thread[Thread-5,5,main]
Background task finished:task 7
Cancelled
geri
quelle
0

Der erste Thread, der mit der teuren Arbeit beginnt, benachrichtigt das Ergebnis mit einem Rückruf. Andere Threads, die versuchen, es auszuführen, werden in ExpensiveWork.notificables registriert. Sobald die teure Arbeit beendet ist, werden sie vom Thread benachrichtigt, der die Arbeit ausgeführt hat.

In der Zwischenzeit überprüfen die Threads alle 5 Sekunden das Ergebnis.

public class ExpensiveWorkTest {

    private final static int THREADS = 20;
    private final static long THREAD_TIMEOUT = 5000L;

    @Test
    public void example() throws InterruptedException {
        ExpensiveWork<String> expensiveWork = new ExpensiveWorkImpl();
        ExecutorService service = Executors.newFixedThreadPool(THREADS);
        for(int i=0; i<THREADS;i++) {
            service.execute(() ->{
                Notificable<String> notificable = new NotificableImpl();
                expensiveWork.execute(notificable);
                while(notificable.getExpensiveResult() == null) {
                    try {
                        Thread.sleep(THREAD_TIMEOUT);
                    } catch (InterruptedException e) {}
                }
                System.out.println(Thread.currentThread().getName()+" has the message: "+notificable.getExpensiveResult());
            });
        }
        service.awaitTermination(60, TimeUnit.SECONDS);
    }

    public static abstract class ExpensiveWork<T> {

        private final AtomicBoolean runnning = new AtomicBoolean(false);
        private List<Notificable<T>> notificables = Collections.synchronizedList(new ArrayList<>());

        public void execute(Notificable<T> notificable) {
            String id = Thread.currentThread().getName();
            System.out.println("Loading data for "+id);
            notificables.add(notificable);
            if(!runnning.getAndSet(true)) {
                System.out.println("Running the expensive work "+id);
                T expensiveResult = expensiveWork();
                notificables.stream().forEach(n -> n.callback(expensiveResult));
            } else {
                System.out.println(id+" will receive the response later");
            }
        }

        protected abstract T expensiveWork();

    }

    public static class ExpensiveWorkImpl extends ExpensiveWork<String>{

        @Override
        public String expensiveWork() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {}
            return "<Expensive result>";
        }

    }

    public static interface Notificable<T> {
        void callback(T expensiveResult);
        T getExpensiveResult();
    }

    public static class NotificableImpl implements Notificable<String> {

        private volatile String expensiveResult;

        @Override
        public void callback(String expensiveResult) {
            this.expensiveResult = expensiveResult;
        }

        @Override
        public String getExpensiveResult() {
            return expensiveResult;
        }

    }

}

Und das ist die Ausgabe:

Loading data for pool-1-thread-6
Loading data for pool-1-thread-7
Loading data for pool-1-thread-9
pool-1-thread-9 will receive the response later
Loading data for pool-1-thread-8
pool-1-thread-8 will receive the response later
Loading data for pool-1-thread-1
Loading data for pool-1-thread-2
Loading data for pool-1-thread-12
pool-1-thread-12 will receive the response later
Loading data for pool-1-thread-3
pool-1-thread-3 will receive the response later
Loading data for pool-1-thread-4
Loading data for pool-1-thread-5
pool-1-thread-5 will receive the response later
pool-1-thread-4 will receive the response later
Loading data for pool-1-thread-14
pool-1-thread-14 will receive the response later
Loading data for pool-1-thread-13
Loading data for pool-1-thread-15
pool-1-thread-15 will receive the response later
pool-1-thread-2 will receive the response later
pool-1-thread-1 will receive the response later
Loading data for pool-1-thread-11
pool-1-thread-11 will receive the response later
Loading data for pool-1-thread-10
pool-1-thread-10 will receive the response later
pool-1-thread-7 will receive the response later
Running the expensive work pool-1-thread-6
Loading data for pool-1-thread-18
pool-1-thread-18 will receive the response later
Loading data for pool-1-thread-17
Loading data for pool-1-thread-16
pool-1-thread-16 will receive the response later
pool-1-thread-13 will receive the response later
Loading data for pool-1-thread-19
pool-1-thread-19 will receive the response later
pool-1-thread-17 will receive the response later
Loading data for pool-1-thread-20
pool-1-thread-20 will receive the response later
pool-1-thread-6 has the message: <Expensive result>
pool-1-thread-8 has the message: <Expensive result>
pool-1-thread-12 has the message: <Expensive result>
pool-1-thread-9 has the message: <Expensive result>
pool-1-thread-11 has the message: <Expensive result>
pool-1-thread-1 has the message: <Expensive result>
pool-1-thread-2 has the message: <Expensive result>
pool-1-thread-3 has the message: <Expensive result>
pool-1-thread-15 has the message: <Expensive result>
pool-1-thread-4 has the message: <Expensive result>
pool-1-thread-14 has the message: <Expensive result>
pool-1-thread-10 has the message: <Expensive result>
pool-1-thread-5 has the message: <Expensive result>
pool-1-thread-13 has the message: <Expensive result>
pool-1-thread-16 has the message: <Expensive result>
pool-1-thread-19 has the message: <Expensive result>
pool-1-thread-20 has the message: <Expensive result>
pool-1-thread-7 has the message: <Expensive result>
pool-1-thread-18 has the message: <Expensive result>
pool-1-thread-17 has the message: <Expensive result>
Ravenskater
quelle