Wie verwende ich MDC mit Thread-Pools?

146

In unserer Software verwenden wir MDC ausgiebig, um beispielsweise Sitzungs-IDs und Benutzernamen für Webanfragen zu verfolgen. Dies funktioniert gut, wenn Sie im ursprünglichen Thread ausgeführt werden. Es gibt jedoch viele Dinge, die im Hintergrund verarbeitet werden müssen. Dafür verwenden wir die Klassen java.concurrent.ThreadPoolExecutorund java.util.Timerzusammen mit einigen selbst gerollten asynchronen Ausführungsdiensten. Alle diese Dienste verwalten ihren eigenen Thread-Pool.

Dies ist, was das Handbuch von Logback über die Verwendung von MDC in einer solchen Umgebung zu sagen hat:

Eine Kopie des zugeordneten Diagnosekontexts kann nicht immer von Arbeitsthreads vom initiierenden Thread geerbt werden. Dies ist der Fall, wenn java.util.concurrent.Executors für die Thread-Verwaltung verwendet wird. Beispielsweise erstellt die newCachedThreadPool-Methode einen ThreadPoolExecutor und verfügt wie anderer Thread-Pooling-Code über eine komplexe Thread-Erstellungslogik.

In solchen Fällen wird empfohlen, MDC.getCopyOfContextMap () für den ursprünglichen (Master-) Thread aufzurufen, bevor eine Aufgabe an den Executor gesendet wird. Wenn die Aufgabe als erste Aktion ausgeführt wird, sollte sie MDC.setContextMapValues ​​() aufrufen, um die gespeicherte Kopie der ursprünglichen MDC-Werte dem neuen von Executor verwalteten Thread zuzuordnen.

Dies wäre in Ordnung, aber es ist sehr leicht zu vergessen, diese Anrufe hinzuzufügen, und es gibt keine einfache Möglichkeit, das Problem zu erkennen, bis es zu spät ist. Das einzige Anzeichen bei Log4j ist, dass Sie fehlende MDC-Informationen in den Protokollen erhalten, und bei Logback erhalten Sie veraltete MDC-Informationen (da der Thread im Laufflächenpool seinen MDC von der ersten Aufgabe erbt, die darauf ausgeführt wurde). Beides sind schwerwiegende Probleme in einem Produktionssystem.

Ich sehe unsere Situation in keiner Weise besonders, aber ich konnte im Web nicht viel über dieses Problem finden. Anscheinend ist dies nicht etwas, gegen das viele Menschen stoßen, also muss es einen Weg geben, dies zu vermeiden. Was machen wir hier falsch?

Lóránt Pintér
quelle
1
Wenn Ihre Anwendung in einer JEE-Umgebung bereitgestellt wird, können Sie Java-Interceptors verwenden, um den MDC-Kontext vor dem Aufruf von EJB festzulegen.
Maxim Kirilov
2
Ab Logback-Version 1.1.5 werden MDC-Werte nicht mehr von untergeordneten Threads geerbt.
Ceki
2
@Ceki Die Dokumentation muss aktualisiert werden: "Ein untergeordneter Thread erbt automatisch eine Kopie des zugeordneten Diagnosekontexts seines übergeordneten Threads." logback.qos.ch/manual/mdc.html
steffen
Ich habe eine Pull-Anfrage an slf4j erstellt, die das Problem der Verwendung von MDC über Threads hinweg löst (Link github.com/qos-ch/slf4j/pull/150 ). Vielleicht, wenn Leute kommentieren und danach fragen, werden sie die Änderung in den SLF4J aufnehmen :)
Männlich

Antworten:

79

Ja, dies ist ein häufiges Problem, auf das ich ebenfalls gestoßen bin. Es gibt einige Problemumgehungen (wie das manuelle Einstellen wie beschrieben), aber im Idealfall möchten Sie eine Lösung, die

  • Legt den MDC konsistent fest.
  • Vermeidet stillschweigende Fehler, bei denen der MDC falsch ist, Sie ihn aber nicht kennen. und
  • Minimiert Änderungen an der Verwendung von Thread-Pools (z. B. Unterklassen Callablemit MyCallableüberall oder ähnliche Hässlichkeit).

Hier ist eine Lösung, die ich verwende und die diese drei Anforderungen erfüllt. Der Code sollte selbsterklärend sein.

(Als Randnotiz kann dieser Executor erstellt und Guavas zugeführt werden MoreExecutors.listeningDecorator(), wenn Sie Guavas verwenden ListanableFuture.)

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
jlevy
quelle
Falls der vorherige Kontext nicht leer ist, ist es nicht immer Müll? Warum trägst du es herum?
Djjeck
2
Richtig; es sollte nicht eingestellt werden. Es scheint nur eine gute Hygiene zu sein, z. B. wenn die wrap () -Methode von jemand anderem auf der Straße freigelegt und angewendet wurde.
Jlevy
Können Sie einen Verweis darauf geben, wie dieser MdcThreadPoolExecutor von Log4J2 angehängt oder referenziert wurde? Gibt es irgendwo, wo wir speziell auf diese Klasse verweisen müssen, oder wird dies "automatisch" durchgeführt? Ich benutze keine Guave. Ich könnte, aber ich würde gerne wissen, ob es einen anderen Weg gibt, bevor ich es benutze.
JCB
Wenn ich Ihre Frage richtig verstehe, lautet die Antwort "Ja". Es handelt sich um "magische" threadlokale Variablen in SLF4J - siehe Implementierungen von MDC.setContextMap () usw. Außerdem wird SLF4J verwendet, nicht Log4J, was vorzuziehen ist wie es mit Log4j, Logback und anderen Protokollierungs-Setups funktioniert.
Jlevy
1
Nur der Vollständigkeit halber : Wenn Sie Spring's ThreadPoolTaskExecutoranstelle von einfachem Java verwenden ThreadPoolExecutor, können Sie das MdcTaskDecoratorunter moelholm.com/2017/07/24/…
Pino
27

Wir sind auf ein ähnliches Problem gestoßen. Möglicherweise möchten Sie ThreadPoolExecutor erweitern und before / afterExecute-Methoden überschreiben, um die erforderlichen MDC-Aufrufe auszuführen, bevor Sie neue Threads starten / stoppen.

Kennzeichen
quelle
10
Die Methoden beforeExecute(Thread, Runnable)und afterExecute(Runnable, Throwable)können in anderen Fällen hilfreich sein, aber ich bin nicht sicher, wie dies zum Festlegen von MDCs funktionieren wird. Sie werden beide unter dem gespawnten Thread ausgeführt. Dies bedeutet, dass Sie zuvor in der Lage sein müssen, die aktualisierte Karte aus dem Hauptthread abzurufen beforeExecute.
Kenston Choi
Besser, MDCs im Filter festzulegen, dh wenn die Anforderung von der Geschäftslogik verarbeitet wird, wird der Kontext nicht aktualisiert. Ich denke nicht, dass wir MDC überall in der Anwendung aktualisieren sollten
dereck
15

IMHO ist die beste Lösung:

  • verwenden ThreadPoolTaskExecutor
  • implementieren Sie Ihre eigenen TaskDecorator
  • benutze es: executor.setTaskDecorator(new LoggingTaskDecorator());

Der Dekorateur kann so aussehen:

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}
Tomáš Myšík
quelle
Entschuldigung, ich bin mir nicht sicher, was du meinst. UPDATE: Ich denke ich sehe jetzt, wird meine Antwort verbessern.
Tomáš Myšík
6

So mache ich es mit festen Thread-Pools und Executoren:

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

Im Gewindeteil:

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});
Amaury D.
quelle
2

Ähnlich wie bei den zuvor veröffentlichten Lösungen können die newTaskForMethoden für Runnableund Callableüberschrieben werden, um das Argument (siehe akzeptierte Lösung) beim Erstellen der zu umbrechen RunnableFuture.

Hinweis: Folglich muss die Methode von executorService' submitanstelle der executeMethode aufgerufen werden.

Für die würden ScheduledThreadPoolExecutordie decorateTaskMethoden stattdessen überschrieben.

Mein Schlüssel_
quelle
2

Wenn Sie auf dieses Problem in einer Spring Framework-bezogenen Umgebung stoßen, in der Sie Aufgaben mithilfe von @AsyncAnmerkungen ausführen, können Sie die Aufgaben mithilfe des TaskDecorator- Ansatzes dekorieren . Ein Beispiel dafür finden Sie hier: https://moelholm.com/blog/2017/07/24/spring-43-using-a-taskdecorator-to-copy-mdc-data-to-async-threads

Ich war mit diesem Problem konfrontiert und der obige Artikel hat mir geholfen, es anzugehen. Deshalb teile ich es hier.

Soner
quelle
0

Eine andere Variante, die den hier vorhandenen Antworten ähnelt, besteht darin ExecutorService, einen Delegaten zu implementieren und an ihn weiterzuleiten. Bei Verwendung von Generika kann der tatsächliche Delegat dennoch angezeigt werden, falls Statistiken abgerufen werden sollen (solange keine anderen Änderungsmethoden verwendet werden).

Referenzcode:

public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;

    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }

    public D getDelegate() {
        return delegate;
    }

    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */

    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }
}
Kenston Choi
quelle
-3

Ich konnte dies mit folgendem Ansatz lösen

Im Hauptthread (Application.java, der Einstiegspunkt meiner Anwendung)

static public Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

In der Ausführungsmethode der Klasse, die von Executer aufgerufen wird

MDC.setContextMap(Application.mdcContextMap);
Smishra
quelle