Распространение ThreadLocal на Новый Поток, полученный из ExecutorService


Я запускаю процесс в отдельном потоке с таймаутом, используя ExecutorService и Future (пример кода здесь ) (поток "порождение" происходит в аспекте AOP).

Теперь основной поток-это запросResteasy . Resteasy использует одну или несколько переменных ThreadLocal для хранения некоторой контекстной информации, которую мне нужно получить в какой-то момент в моем вызове метода Rest. Проблема в том, что поскольку поток Resteasy выполняется в новом потоке, то переменные ThreadLocal являются потерянный.

Как лучше всего "распространить" любую переменную ThreadLocal, используемую Resteasy, на новый поток? Похоже, что Resteasy использует более одной переменной ThreadLocal для отслеживания контекстной информации, и я хотел бы "слепо" передать всю информацию в новый поток.

Я посмотрел на подклассы ThreadPoolExecutor и использование метода beforeExecute для передачи текущего потока в пул, но я не смог найти способ передать переменные ThreadLocal к бассейну.

Есть предложения?

Спасибо

6 24

6 ответов:

Множество ThreadLocal экземпляров, связанных с потоком, хранятся в закрытых членах каждого Thread. Ваш единственный шанс перечислить их - это сделать некоторое размышление над Thread; таким образом, вы можете переопределить ограничения доступа к полям потока.

Как только вы можете получить набор ThreadLocal, Вы можете скопировать в фоновые потоки, используя beforeExecute() и afterExecute() крючки ThreadPoolExecutor, или создав Runnable оболочку для ваших задач, которая перехватывает вызов run(), чтобы установить unset необходимый ThreadLocal экземпляры. На самом деле, последний метод может работать лучше, так как он даст вам удобное место для хранения значений ThreadLocal во время постановки задачи в очередь.


обновление: Вот более конкретная иллюстрация второго подхода. Вопреки моему первоначальному описанию, все, что хранится в оболочке, - это вызывающий поток, который запрашивается при выполнении задачи.

static Runnable wrap(Runnable task)
{
  Thread caller = Thread.currentThread();
  return () -> {
    Iterable<ThreadLocal<?>> vars = copy(caller);
    try {
      task.run();
    }
    finally {
      for (ThreadLocal<?> var : vars)
        var.remove();
    }
  };
}

/**
 * For each {@code ThreadLocal} in the specified thread, copy the thread's 
 * value to the current thread.  
 * 
 * @param caller the calling thread
 * @return all of the {@code ThreadLocal} instances that are set on current thread
 */
private static Collection<ThreadLocal<?>> copy(Thread caller)
{
  /* Use a nasty bunch of reflection to do this. */
  throw new UnsupportedOperationException();
}

Как я понимаю вашу проблему, вы можете взглянуть на InheritableThreadLocal , который предназначен для передачи переменных ThreadLocal из контекста родительского потока в контекст дочернего потока

Основываясь на ответе @erickson, я написал этот код. Он работает для inheritableThreadLocals. Он строит список inheritableThreadLocals, используя тот же метод, что и в Thread contructor. Конечно, я использую для этого рефлексию. Также я переопределяю класс executor.

public class MyThreadPoolExecutor extends ThreadPoolExecutor
{
   @Override
   public void execute(Runnable command)
   {
      super.execute(new Wrapped(command, Thread.currentThread()));
   }
}

Обертка:

   private class Wrapped implements Runnable
   {
      private final Runnable task;

      private final Thread caller;

      public Wrapped(Runnable task, Thread caller)
      {
         this.task = task;
         this.caller = caller;
      }

      public void run()
      {
         Iterable<ThreadLocal<?>> vars = null;
         try
         {
            vars = copy(caller);
         }
         catch (Exception e)
         {
            throw new RuntimeException("error when coping Threads", e);
         }
         try {
            task.run();
         }
         finally {
            for (ThreadLocal<?> var : vars)
               var.remove();
         }
      }
   }

Метод копирования:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception
   {
      List<ThreadLocal<?>> threadLocals = new ArrayList<>();
      Field field = Thread.class.getDeclaredField("inheritableThreadLocals");
      field.setAccessible(true);
      Object map = field.get(caller);
      Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
      table.setAccessible(true);

      Method method = ThreadLocal.class
              .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap"));
      method.setAccessible(true);
      Object o = method.invoke(null, map);

      Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals");
      field2.setAccessible(true);
      field2.set(Thread.currentThread(), o);

      Object tbl = table.get(o);
      int length = Array.getLength(tbl);
      for (int i = 0; i < length; i++)
      {
         Object entry = Array.get(tbl, i);
         Object value = null;
         if (entry != null)
         {
            Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
                    "get");
            referentField.setAccessible(true);
            value = referentField.invoke(entry);
            threadLocals.add((ThreadLocal<?>) value);
         }
      }
      return threadLocals;
   }

Вот пример передачи текущего LocaleContext в Родительском потоке в дочерний поток, охватываемый CompletableFuture[по умолчанию используется ForkJoinPool].

Просто определите все, что вы хотите сделать в дочернем потоке внутри управляемого блока. Поэтому, когда CompletableFuture выполняет запускаемый блок, его дочерний поток, который контролирует, и вуаля, у вас есть родительский ThreadLocal материал, установленный в дочернем ThreadLocal.

Проблема здесь не в том, что весь ThreadLocal является переписано заново. Копируется только локальный текст. Поскольку ThreadLocal имеет частный доступ только к тому потоку, к которому он принадлежит, слишком использовать отражение и пытаться получить и установить в дочернем устройстве слишком много дурацких вещей, которые могут привести к утечкам памяти или снижению производительности.

Так что если вы знаете параметры, которые вас интересуют из ThreadLocal, то это решение работает намного чище.

 public void parentClassMethod(Request request) {
        LocaleContext currentLocale = LocaleContextHolder.getLocaleContext();
        executeInChildThread(() -> {
                LocaleContextHolder.setLocaleContext(currentLocale);
                //Do whatever else you wanna do
            }));

        //Continue stuff you want to do with parent thread
}


private void executeInChildThread(Runnable runnable) {
    try {
        CompletableFuture.runAsync(runnable)
            .get();
    } catch (Exception e) {
        LOGGER.error("something is wrong");
    }
}

Мне не нравится рефлексивный подход. Альтернативным решением было бы реализовать executor wrapper и передать объект непосредственно в виде контекста ThreadLocal всем дочерним потокам, распространяющим Родительский контекст.

public class PropagatedObject {

    private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>();

   //put, set, merge methods, etc

}

==>

public class ObjectAwareExecutor extends AbstractExecutorService {

    private final ExecutorService delegate;
    private final PropagatedObject objectAbsorber;

    public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){
        this.delegate = delegate;
        this.objectAbsorber = objectAbsorber;
    }
    @Override
    public void execute(final Runnable command) {

        final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get();
        delegate.execute(() -> {
            try{
                objectAbsorber.set(parentContext);
                command.run();
            }finally {
                parentContext.putAll(objectAbsorber.get());
                objectAbsorber.clean();
            }
        });
        objectAbsorber.merge(parentContext);
    }

Если вы посмотрите на код ThreadLocal, то увидите:

    public T get() {
        Thread t = Thread.currentThread();
        ...
    }

Текущий поток не может быть перезаписан.

Возможные решения:

  1. Посмотрите на java 7 fork/join механизм (но я думаю, что это плохой способ)

  2. Посмотрите наодобренный механизм перезаписи класса ThreadLocal в вашем JVM.

  3. Попробуйте переписать RESTEasy (вы можете использовать инструменты рефакторинга в вашей IDE, чтобы заменить все Использование ThreadLocal, это выглядит просто)