Ich habe eine Klasse, die Objekte von a nimmt BlockingQueue
und sie verarbeitet, indem sie take()
in einer Endlosschleife aufruft . Irgendwann weiß ich, dass der Warteschlange keine Objekte mehr hinzugefügt werden. Wie unterbreche ich die take()
Methode, damit sie nicht mehr blockiert?
Hier ist die Klasse, die die Objekte verarbeitet:
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Und hier ist die Methode, mit der diese Klasse Objekte verarbeitet:
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
In diesem Fall wird der Thread möglicherweise unterbrochen, solange sich noch Elemente in der Warteschlange befinden, die auf die Verarbeitung warten. Möglicherweise möchten Sie
poll
statt verwendentake
, wodurch das Zeitlimit für den Verarbeitungsthread überschritten und beendet wird, wenn er eine Weile ohne neue Eingabe gewartet hat.quelle
Thread.sleep()
als Alternative zu einem richtigen Haken zu verwenden. In anderen Implementierungen stellen andere Threads möglicherweise Dinge in die Warteschlange, und die while-Schleife endet möglicherweise nie.take()
Implementierung könnte beispielsweise so aussehen: Estry { return take(); } catch (InterruptedException e) { E o = poll(); if (o == null) throw e; Thread.currentThread().interrupt(); return o; }
gibt jedoch keinen Grund, warum sie auf dieser Ebene implementiert werden muss, und eine etwas höhere Implementierung führt zu effizienterem Code (z. B. durch Vermeiden des Elements pro ElementInterruptedException
und / oder mitBlockingQueue.drainTo()
).Sehr spät, aber ich hoffe, das hilft auch anderen, da ich mich dem ähnlichen Problem gestellt und den
poll
von erickson oben vorgeschlagenen Ansatz mit einigen geringfügigen Änderungen verwendet habe.Dies löste beide Probleme
BlockingQueue
damit es weiß, dass es nicht mehr auf Elemente warten mussquelle
Finished
Variablevolatile
, um die Sichtbarkeit zwischen den Threads zu gewährleisten. Siehe stackoverflow.com/a/106787Unterbrechen Sie den Thread:
quelle
Oder nicht unterbrechen, es ist böse.
queue.notify()
, wenn es endet, rufen Sie aufqueue.done()
quelle