Gibt es einen ExecutorService, der den aktuellen Thread verwendet?

93

Was ich suche, ist eine kompatible Möglichkeit, die Verwendung eines Thread-Pools zu konfigurieren oder nicht. Im Idealfall sollte der Rest des Codes überhaupt nicht beeinflusst werden. Ich könnte einen Thread-Pool mit 1 Thread verwenden, aber das ist nicht ganz das, was ich will. Irgendwelche Ideen?

ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);

// es.execute / es.submit / new ExecutorCompletionService(es) etc
Michael Rutherfurd
quelle

Antworten:

69

Hier ist eine wirklich einfache Executor( ExecutorServicewohlgemerkt) Implementierung, die nur den aktuellen Thread verwendet. Dies aus "Java Concurrency in Practice" stehlen (wesentliche Lektüre).

public class CurrentThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService ist eine aufwändigere Schnittstelle, könnte aber mit dem gleichen Ansatz behandelt werden.

überdenken
quelle
4
+1: Wie Sie sagen, könnte ein ExecutorService auf die gleiche Weise behandelt werden, möglicherweise durch Unterklasse AbstractExecutorService.
Paul Cager
@ Paul Yep, AbstractExecutorServicesieht aus wie der Weg zu gehen.
Überdenken Sie den
14
In Java8 können Sie dies auf nur reduzierenRunnable::run
Jon Freedman
@Juude wird immer auf dem Thread ausgeführt, der den Executor aufruft.
Gustav Karlsson
Ist es nicht der Sinn eines Executors mit demselben Thread, mehr Aufgaben in execute () planen zu können? Diese Antwort reicht nicht aus. Ich kann keine Antwort finden, die dies befriedigt.
Haelix
81

Sie können Guaven verwenden MoreExecutors.newDirectExecutorService()oder MoreExecutors.directExecutor()wenn Sie keine benötigen ExecutorService.

Wenn das Einschließen von Guave zu schwer ist, können Sie etwas fast genauso Gutes implementieren:

public final class SameThreadExecutorService extends ThreadPoolExecutor {
  private final CountDownLatch signal = new CountDownLatch(1);

  private SameThreadExecutorService() {
    super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  @Override public void shutdown() {
    super.shutdown();
    signal.countDown();
  }

  public static ExecutorService getInstance() {
    return SingletonHolder.instance;
  }

  private static class SingletonHolder {
    static ExecutorService instance = createInstance();    
  }

  private static ExecutorService createInstance() {
    final SameThreadExecutorService instance
        = new SameThreadExecutorService();

    // The executor has one worker thread. Give it a Runnable that waits
    // until the executor service is shut down.
    // All other submitted tasks will use the RejectedExecutionHandler
    // which runs tasks using the  caller's thread.
    instance.submit(new Runnable() {
        @Override public void run() {
          boolean interrupted = false;
          try {
            while (true) {
              try {
                instance.signal.await();
                break;
              } catch (InterruptedException e) {
                interrupted = true;
              }
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }
        }});
    return Executors.unconfigurableScheduledExecutorService(instance);
  }
}
NamshubWriter
quelle
1
Für Android wird Executors.unconfigurableExecutorService (Instanz) zurückgegeben.
Maragues
Wenn wir nur den aktuellen Thread verwenden , warum dann Synchronisationsprimitive? warum der Riegel?
Haelix
@haelix Der Latch wird benötigt, da jeder Thread den Executor herunterfahren kann, obwohl die Arbeit im selben Thread ausgeführt wird, in dem die Arbeit hinzugefügt wurde.
NamshubWriter
64

Java 8-Stil:

Executor e = Runnable::run;

lpandzic
quelle
7
Absolut dreckig. Ich liebe es.
Schurke
Was ist daran schmutzig? Es ist elegant :)
lpandzic
Es ist die beste Art von schmutzigem @Ipandzic, es ist ungewöhnlich und prägnant.
Schurke
12

Ich schrieb eine ExecutorServicebasierend auf der AbstractExecutorService.

/**
 * Executes all submitted tasks directly in the same thread as the caller.
 */
public class SameThreadExecutorService extends AbstractExecutorService {

    //volatile because can be viewed by other threads
    private volatile boolean terminated;

    @Override
    public void shutdown() {
        terminated = true;
    }

    @Override
    public boolean isShutdown() {
        return terminated;
    }

    @Override
    public boolean isTerminated() {
        return terminated;
    }

    @Override
    public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
        shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
        return terminated;
    }

    @Override
    public List<Runnable> shutdownNow() {
        return Collections.emptyList();
    }

    @Override
    public void execute(Runnable theCommand) {
        theCommand.run();
    }
}
Eric Obermühlner
quelle
abgeschlossenes Feld ist nicht mit synchronisiert geschützt.
Daneel S. Yaitskov
1
Das terminatedFeld @ DaneelS.Yaitskov profitiert nicht vom synchronisierten Zugriff basierend auf dem Code, der tatsächlich hier ist. Operationen an 32-Bit-Feldern sind in Java atomar.
Christopher Schultz
Ich nehme an, dass die oben beschriebene Methode isTerminated () nicht ganz richtig ist, da isTerminated () nur dann true zurückgeben soll, wenn derzeit keine Aufgaben ausgeführt werden. Guava verfolgt die Anzahl der Aufgaben in einer anderen Variablen, weshalb sie vermutlich beide Variablen mit einer Sperre schützen.
Jeremy K
6

Ich musste denselben "CurrentThreadExecutorService" zu Testzwecken verwenden, und obwohl alle vorgeschlagenen Lösungen nett waren (insbesondere die, die den Guaven-Weg erwähnte ), kam ich auf etwas Ähnliches, wie Peter Lawrey hier vorschlug .

Wie von Axelle Ziegler hier erwähnt , funktioniert Peters Lösung leider nicht wirklich, da ThreadPoolExecutorder maximumPoolSizeKonstruktorparameter überprüft wurde (dh maximumPoolSizenicht sein kann <=0).

Um dies zu umgehen, habe ich Folgendes getan:

private static ExecutorService currentThreadExecutorService() {
    CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
        @Override
        public void execute(Runnable command) {
            callerRunsPolicy.rejectedExecution(command, this);
        }
    };
}
fabriziocucci
quelle
5

Sie können den RejectedExecutionHandler verwenden, um die Aufgabe im aktuellen Thread auszuführen.

public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        r.run();
    }
});

Sie brauchen immer nur eines davon.

Peter Lawrey
quelle
Klug! Wie sicher ist das (ehrliche Frage)? Gibt es Möglichkeiten, eine Aufgabe abzulehnen, wenn Sie sie im aktuellen Thread nicht ausführen möchten? Werden Aufgaben abgelehnt, wenn der ExecutorService heruntergefahren oder beendet wird?
Überdenken Sie den
Da die maximale Größe 0 ist, wird jede Aufgabe abgelehnt. Das abgelehnte Verhalten soll jedoch im aktuellen Thread ausgeführt werden. Es würde nur ein Problem geben, wenn die Aufgabe NICHT abgelehnt wird.
Peter Lawrey
8
Beachten Sie, dass diese Richtlinie bereits implementiert ist und Sie keine eigene definieren müssen java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy.
Jtahlborn
7
Es ist nicht mehr möglich, einen ThreadPoolExecutor mit einer maximalen Poolgröße von 0 zu erstellen. Ich denke, es wäre möglich, das Verhalten mit einer blockierenden Warteschlange der Größe 0 zu reproduzieren, aber keine Standardimplementierung scheint dies zuzulassen.
Axelle Ziegler
das wird aufgrund von {code} nicht kompiliert, wenn (corePoolSize <0 || MaximumPoolSize <= 0 || MaximumPoolSize <CorePoolSize || keepAliveTime <0) {Code} in java.util.ThreadPoolExecutor (mindestens openJdk 7)
Bogdan