Реализация сопрограмм в Java
этот вопрос связан с моим вопросом о существующие реализации сопрограммы в Java. Если, как я подозреваю, окажется, что в настоящее время нет полной реализации сопрограмм, доступных в Java, что потребуется для их реализации?
Как я уже сказал в этом вопросе, я знаю о следующем:
- вы можете реализовать "сопрограммы" в виде потоков/пулов потоков за кулисами.
- вы можете делать хитрые вещи с JVM байт-код за кулисами, чтобы сделать сопрограммы возможными.
- так называемая "машина да Винчи" реализация JVM имеет примитивы, которые делают сопрограммы выполнимыми без байт-код манипуляции.
- также возможны различные подходы на основе JNI к сопрограммам.
Я буду решать недостатки каждого из них по очереди.
потоковые сопрограммы
Это "решение" - это патология. Весь смысл сопрограмм заключается в том, чтобы избежать накладные расходы на потоковую обработку, блокировку, планирование ядра и т. д. Сопрограммы должны быть легкими и быстрыми и выполняться только в пространстве пользователя. Реализация их в условиях полноприводных резьб с жесткими ограничениями избавляет от всех преимуществ.
JVM байт-код манипуляции
Это решение более практично, хотя и немного трудно снять. Это примерно то же самое, что и переход на язык ассемблера для библиотек сопрограмм в C (сколько из них работает) с тем преимуществом, что у вас есть только одна архитектура, о которой нужно беспокоиться и получить право.
Он также связывает вас только с запуском кода на полностью совместимых стеках JVM (что означает, например, отсутствие Android), Если вы не можете найти способ сделать то же самое в несовместимом стеке. Однако если вы найдете способ сделать это, вы удвоили сложность системы и потребности в тестировании.
Машина Да Винчи
Da Машина Vinci отлично подходит для экспериментов, но поскольку она не является стандартной JVM, ее функции не будут доступны везде. Действительно, я подозреваю, что большинство производственных сред специально запретят использование машины Da Vinci. Таким образом, я мог бы использовать это, чтобы сделать классные эксперименты, но не для любого кода, который я ожидаю выпустить в реальный мир.
Это также имеет дополнительную проблему, аналогичную решению для обработки байт-кода JVM выше: не будет работать на альтернативных стеках (например Андроида.)
реализация JNI
Это решение делает точку делать это в Java вообще спорным. Каждая комбинация процессора и операционной системы требует независимого тестирования, и каждый из них является точкой потенциально разочаровывающего тонкого сбоя. В качестве альтернативы, конечно, я мог бы полностью привязать себя к одной платформе, но это тоже делает смысл делать вещи на Java полностью спорным.
Так...
есть ли способ реализовать сопрограммы в Java без использования одного из этих четырех методов? Или я буду вынужден использовать один из тех четырех, которые пахнут меньше всего (манипуляция JVM) вместо этого?
редактировать, чтобы добавить:
просто чтобы убедиться, что путаница содержится, это по теме вопрос мой другой, но не то же самое. Это один ищет существующей реализация в попытке избежать ненужного изобретения колеса. Это вопрос, касающийся того, как можно было бы реализовать сопрограммы на Java, если бы другой оказался без ответа. Цель состоит в том, чтобы держать разные вопросы в разных потоках.
6 ответов:
Я хотел бы взглянуть на это:http://www.chiark.greenend.org.uk / ~sgtatham/coroutines.html, это довольно интересно и должно обеспечить хорошее место для начала. Но, конечно, мы используем Java, поэтому мы можем сделать лучше (или, может быть, хуже, потому что нет макросов :))
из моего понимания с сопрограммами у вас обычно есть производитель и потребитель сопрограмма (или, по крайней мере, это наиболее распространенная схема). Но семантически вы не хочу, чтобы производитель звонил потребителю или наоборот, потому что это вводит асимметрию. Но учитывая то, как работают языки на основе стека, нам нужно будет, чтобы кто-то сделал вызов.
Итак, вот очень простая иерархия типов:
public interface CoroutineProducer<T> { public T Produce(); public boolean isDone(); } public interface CoroutineConsumer<T> { public void Consume(T t); } public class CoroutineManager { public static Execute<T>(CoroutineProducer<T> prod, CoroutineConsumer<T> con) { while(!prod.IsDone()) // really simple { T d = prod.Produce(); con.Consume(d); } } }
теперь, конечно, трудная часть реализация интерфейсы, в частности, трудно разбить вычисление на отдельные шаги. Для этого вам, вероятно, понадобится целый другой набор постоянные структуры управления. Основная идея заключается в том, что мы хотим имитировать нелокальную передачу управления (в конце концов, это похоже на то, что мы имитируем
goto
). Мы в основном хотим отойти от использования стека иpc
(программа-счетчик), сохраняя состояние наших текущих операций в куче а не в стеке. Поэтому нам понадобится куча вспомогательных классов.например:
допустим, что в идеальном мире вы хотел написать потребитель, который выглядел так (psuedocode):
boolean is_done; int other_state; while(!is_done) { //read input //parse input //yield input to coroutine //update is_done and other_state; }
нам нужно абстрагировать локальную переменную, как
is_done
иother_state
и нам нужно абстрагировать сам цикл while, потому что нашyield
как операция не будет использовать стек. Итак, давайте создадим абстракцию цикла while и связанные с ней классы:enum WhileState {BREAK, CONTINUE, YIELD} abstract class WhileLoop<T> { private boolean is_done; public boolean isDone() { return is_done;} private T rval; public T getReturnValue() {return rval;} protected void setReturnValue(T val) { rval = val; } public T loop() { while(true) { WhileState state = execute(); if(state == WhileState.YIELD) return getReturnValue(); else if(state == WhileState.BREAK) { is_done = true; return null; } } } protected abstract WhileState execute(); }
основной трюк здесь, чтобы двигаться local переменные класс переменные и блоки области поворота в классы, которые дают нам возможность "повторно ввести" наш "цикл" после получения нашего возвращаемого значения.
теперь для реализации нашего производителя
public class SampleProducer : CoroutineProducer<Object> { private WhileLoop<Object> loop;//our control structures become state!! public SampleProducer() { loop = new WhileLoop() { private int other_state;//our local variables become state of the control structure protected WhileState execute() { //this implements a single iteration of the loop if(is_done) return WhileState.BREAK; //read input //parse input Object calcluated_value = ...; //update is_done, figure out if we want to continue setReturnValue(calculated_value); return WhileState.YIELD; } }; } public Object Produce() { Object val = loop.loop(); return val; } public boolean isDone() { //we are done when the loop has exited return loop.isDone(); } }
подобные трюки могут быть сделаны для других основных структур потока управления. В идеале вы должны создать библиотеку этих вспомогательных классов, а затем использовать их для реализации этих простых интерфейсов, которые в конечном итоге дадут вам семантику совместных процедур. Я уверен, что все, что я написал здесь, может быть обобщено и расширено значительно.
Я бы предложил посмотреть на сопрограммы Котлина на JVM. Однако он попадает в другую категорию. Там нет манипуляции байт-код участвует, и он работает на Android, тоже. Однако вам придется написать свои сопрограммы в Котлине. Положительным моментом является то, что Kotlin предназначен для взаимодействия с Java в виду, так что вы все еще можете продолжать использовать все ваши библиотеки Java и свободно комбинировать код Kotlin и Java в одном проекте, даже поставив их бок о бок в одном и том же каталоги и пакеты.
этой руководство по kotlinx.Сорокины предоставляет множество примеров, в то время как сопрограммы конструкция документ объясняет все мотивации, и детали реализации.
Я просто наткнулся на этот вопрос и просто хочу упомянуть, что я думаю, что можно было бы реализовать сопрограммы или генераторы аналогичным образом, как это делает C#. Тем не менее, я на самом деле не использую Java, но CIL имеет довольно похожие ограничения, как и JVM.
The оператор yield в C# является чисто языковой функцией и не является частью байт-кода CIL. Компилятор C# просто создает скрытый отдельный класс для каждой функции генератора. Если вы используете оператор yield в функция она должна возвращать IEnumerator или IEnumerable. Компилятор "упаковывает" ваш код в класс, подобный statemachine.
компилятор C# может использовать некоторые "goto" в сгенерированном коде, чтобы упростить преобразование в statemachine. Я не знаю возможностей байт-кода Java, и если есть что-то вроде простого безусловного прыжка, но на "уровне сборки" это обычно возможно.
Как уже упоминалось, эта функция должна быть реализована в компиляторе. Поскольку у меня мало знаний о Java, и это компилятор, я не могу сказать, можно ли изменить / расширить компилятор, возможно, с помощью "препроцессора" или чего-то еще.
лично я люблю сопрограммы. Как разработчик Unity games я использую их довольно часто. Поскольку я много играю в Minecraft с ComputerCraft мне было любопытно, почему сопрограммы в Lua (LuaJ) реализованы с помощью потоков.
Котлин использует следующий подход для совместных процедур
(из https://kotlinlang.org/docs/reference/coroutines.html):сопрограммы полностью реализованы с помощью метода компиляции (не требуется поддержка со стороны виртуальной машины или ОС), а подвеска работает через преобразование кода. В принципе, каждая функция приостановки (оптимизация может применяться, но мы не будем вдаваться в это здесь) преобразуется в конечный автомат, где состояния соответствуют к приостановке звонков. Непосредственно перед приостановкой следующее состояние сохраняется в поле сгенерированного компилятором класса вместе с соответствующими локальными переменными и т. д. При возобновлении этой сопрограммы локальные переменные восстанавливаются, и конечный автомат переходит из состояния сразу после приостановки.
приостановленная сопрограмма может храниться и передаваться как объект, который сохраняет свое приостановленное состояние и локальные объекты. Тип таких объектов-продолжение и общее преобразование кода описанное здесь соответствует классическому стилю продолжения-прохождения. Следовательно, функции приостановки принимают дополнительный параметр продолжения типа под капотом.
Проверьте проектную документацию по адресу https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md
У меня есть класс Coroutine, который я использую в Java. Он основан на потоках, и использование потоков имеет то преимущество, что позволяет параллельную работу, что на многоядерных машинах может быть преимуществом. Поэтому вы можете рассмотреть подход, основанный на потоке.
есть еще один выбор здесь для Java6+
Python-образным сопрограмма реализации:
import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; class CorRunRAII { private final List<WeakReference<? extends CorRun>> resources = new ArrayList<>(); public CorRunRAII add(CorRun resource) { if (resource == null) { return this; } resources.add(new WeakReference<>(resource)); return this; } public CorRunRAII addAll(List<? extends CorRun> arrayList) { if (arrayList == null) { return this; } for (CorRun corRun : arrayList) { add(corRun); } return this; } @Override protected void finalize() throws Throwable { super.finalize(); for (WeakReference<? extends CorRun> corRunWeakReference : resources) { CorRun corRun = corRunWeakReference.get(); if (corRun != null) { corRun.stop(); } } } } class CorRunYieldReturn<ReceiveType, YieldReturnType> { public final AtomicReference<ReceiveType> receiveValue; public final LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue; CorRunYieldReturn(AtomicReference<ReceiveType> receiveValue, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) { this.receiveValue = receiveValue; this.yieldReturnValue = yieldReturnValue; } } interface CorRun<ReceiveType, YieldReturnType> extends Runnable, Callable<YieldReturnType> { boolean start(); void stop(); void stop(final Throwable throwable); boolean isStarted(); boolean isEnded(); Throwable getError(); ReceiveType getReceiveValue(); void setResultForOuter(YieldReturnType resultForOuter); YieldReturnType getResultForOuter(); YieldReturnType receive(ReceiveType value); ReceiveType yield(); ReceiveType yield(YieldReturnType value); <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another); <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value); } abstract class CorRunSync<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> { private ReceiveType receiveValue; public final List<WeakReference<CorRun>> potentialChildrenCoroutineList = new ArrayList<>(); // Outside private AtomicBoolean isStarted = new AtomicBoolean(false); private AtomicBoolean isEnded = new AtomicBoolean(false); private Throwable error; private YieldReturnType resultForOuter; @Override public boolean start() { boolean isStarted = this.isStarted.getAndSet(true); if ((! isStarted) && (! isEnded())) { receive(null); } return isStarted; } @Override public void stop() { stop(null); } @Override public void stop(Throwable throwable) { isEnded.set(true); if (throwable != null) { error = throwable; } for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) { CorRun child = weakReference.get(); if (child != null) { child.stop(); } } } @Override public boolean isStarted() { return isStarted.get(); } @Override public boolean isEnded() { return isEnded.get(); } @Override public Throwable getError() { return error; } @Override public ReceiveType getReceiveValue() { return receiveValue; } @Override public void setResultForOuter(YieldReturnType resultForOuter) { this.resultForOuter = resultForOuter; } @Override public YieldReturnType getResultForOuter() { return resultForOuter; } @Override public synchronized YieldReturnType receive(ReceiveType value) { receiveValue = value; run(); return getResultForOuter(); } @Override public ReceiveType yield() { return yield(null); } @Override public ReceiveType yield(YieldReturnType value) { resultForOuter = value; return receiveValue; } @Override public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another) { return yieldFrom(another, null); } @Override public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another, TargetReceiveType value) { if (another == null || another.isEnded()) { throw new RuntimeException("Call null or isEnded coroutine"); } potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another)); synchronized (another) { boolean isStarted = another.start(); boolean isJustStarting = ! isStarted; if (isJustStarting && another instanceof CorRunSync) { return another.getResultForOuter(); } return another.receive(value); } } @Override public void run() { try { this.call(); } catch (Exception e) { e.printStackTrace(); stop(e); return; } } } abstract class CorRunThread<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> { private final ExecutorService childExecutorService = newExecutorService(); private ExecutorService executingOnExecutorService; private static final CorRunYieldReturn DUMMY_COR_RUN_YIELD_RETURN = new CorRunYieldReturn(new AtomicReference<>(null), new LinkedBlockingDeque<AtomicReference>()); private final CorRun<ReceiveType, YieldReturnType> self; public final List<WeakReference<CorRun>> potentialChildrenCoroutineList; private CorRunYieldReturn<ReceiveType, YieldReturnType> lastCorRunYieldReturn; private final LinkedBlockingDeque<CorRunYieldReturn<ReceiveType, YieldReturnType>> receiveQueue; // Outside private AtomicBoolean isStarted = new AtomicBoolean(false); private AtomicBoolean isEnded = new AtomicBoolean(false); private Future<YieldReturnType> future; private Throwable error; private final AtomicReference<YieldReturnType> resultForOuter = new AtomicReference<>(); CorRunThread() { executingOnExecutorService = childExecutorService; receiveQueue = new LinkedBlockingDeque<>(); potentialChildrenCoroutineList = new ArrayList<>(); self = this; } @Override public void run() { try { self.call(); } catch (Exception e) { stop(e); return; } stop(); } @Override public abstract YieldReturnType call(); @Override public boolean start() { return start(childExecutorService); } protected boolean start(ExecutorService executorService) { boolean isStarted = this.isStarted.getAndSet(true); if (!isStarted) { executingOnExecutorService = executorService; future = (Future<YieldReturnType>) executingOnExecutorService.submit((Runnable) self); } return isStarted; } @Override public void stop() { stop(null); } @Override public void stop(final Throwable throwable) { if (throwable != null) { error = throwable; } isEnded.set(true); returnYieldValue(null); // Do this for making sure the coroutine has checked isEnd() after getting a dummy value receiveQueue.offer(DUMMY_COR_RUN_YIELD_RETURN); for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) { CorRun child = weakReference.get(); if (child != null) { if (child instanceof CorRunThread) { ((CorRunThread)child).tryStop(childExecutorService); } } } childExecutorService.shutdownNow(); } protected void tryStop(ExecutorService executorService) { if (this.executingOnExecutorService == executorService) { stop(); } } @Override public boolean isEnded() { return isEnded.get() || ( future != null && (future.isCancelled() || future.isDone()) ); } @Override public boolean isStarted() { return isStarted.get(); } public Future<YieldReturnType> getFuture() { return future; } @Override public Throwable getError() { return error; } @Override public void setResultForOuter(YieldReturnType resultForOuter) { this.resultForOuter.set(resultForOuter); } @Override public YieldReturnType getResultForOuter() { return this.resultForOuter.get(); } @Override public YieldReturnType receive(ReceiveType value) { LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue = new LinkedBlockingDeque<>(); offerReceiveValue(value, yieldReturnValue); try { AtomicReference<YieldReturnType> takeValue = yieldReturnValue.take(); return takeValue == null ? null : takeValue.get(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } @Override public ReceiveType yield() { return yield(null); } @Override public ReceiveType yield(final YieldReturnType value) { returnYieldValue(value); return getReceiveValue(); } @Override public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another) { return yieldFrom(another, null); } @Override public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value) { if (another == null || another.isEnded()) { throw new RuntimeException("Call null or isEnded coroutine"); } boolean isStarted = false; potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another)); synchronized (another) { if (another instanceof CorRunThread) { isStarted = ((CorRunThread)another).start(childExecutorService); } else { isStarted = another.start(); } boolean isJustStarting = ! isStarted; if (isJustStarting && another instanceof CorRunSync) { return another.getResultForOuter(); } TargetYieldReturnType send = another.receive(value); return send; } } @Override public ReceiveType getReceiveValue() { setLastCorRunYieldReturn(takeLastCorRunYieldReturn()); return lastCorRunYieldReturn.receiveValue.get(); } protected void returnYieldValue(final YieldReturnType value) { CorRunYieldReturn<ReceiveType, YieldReturnType> corRunYieldReturn = lastCorRunYieldReturn; if (corRunYieldReturn != null) { corRunYieldReturn.yieldReturnValue.offer(new AtomicReference<>(value)); } } protected void offerReceiveValue(final ReceiveType value, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) { receiveQueue.offer(new CorRunYieldReturn(new AtomicReference<>(value), yieldReturnValue)); } protected CorRunYieldReturn<ReceiveType, YieldReturnType> takeLastCorRunYieldReturn() { try { return receiveQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } protected void setLastCorRunYieldReturn(CorRunYieldReturn<ReceiveType,YieldReturnType> lastCorRunYieldReturn) { this.lastCorRunYieldReturn = lastCorRunYieldReturn; } protected ExecutorService newExecutorService() { return Executors.newCachedThreadPool(getThreadFactory()); } protected ThreadFactory getThreadFactory() { return new ThreadFactory() { @Override public Thread newThread(final Runnable runnable) { Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread thread, Throwable throwable) { throwable.printStackTrace(); if (runnable instanceof CorRun) { CorRun self = (CorRun) runnable; self.stop(throwable); thread.interrupt(); } } }); return thread; } }; } }
теперь вы можете использовать pythonic coroutines таким образом (например, числа Фибоначчи)
Нить Версия:
class Fib extends CorRunThread<Integer, Integer> { @Override public Integer call() { Integer times = getReceiveValue(); do { int a = 1, b = 1; for (int i = 0; times != null && i < times; i++) { int temp = a + b; a = b; b = temp; } // A pythonic "yield", i.e., it returns `a` to the caller and waits `times` value from the next caller times = yield(a); } while (! isEnded()); setResultForOuter(Integer.MAX_VALUE); return getResultForOuter(); } } class MainRun extends CorRunThread<String, String> { @Override public String call() { // The fib coroutine would be recycled by its parent // (no requirement to call its start() and stop() manually) // Otherwise, if you want to share its instance and start/stop it manually, // please start it before being called by yieldFrom() and stop it in the end. Fib fib = new Fib(); String result = ""; Integer current; int times = 10; for (int i = 0; i < times; i++) { // A pythonic "yield from", i.e., it calls fib with `i` parameter and waits for returned value as `current` current = yieldFrom(fib, i); if (fib.getError() != null) { throw new RuntimeException(fib.getError()); } if (current == null) { continue; } if (i > 0) { result += ","; } result += current; } setResultForOuter(result); return result; } }
синхронизация(без резьбы) версия:
class Fib extends CorRunSync<Integer, Integer> { @Override public Integer call() { Integer times = getReceiveValue(); int a = 1, b = 1; for (int i = 0; times != null && i < times; i++) { int temp = a + b; a = b; b = temp; } yield(a); return getResultForOuter(); } } class MainRun extends CorRunSync<String, String> { @Override public String call() { CorRun<Integer, Integer> fib = null; try { fib = new Fib(); } catch (Exception e) { e.printStackTrace(); } String result = ""; Integer current; int times = 10; for (int i = 0; i < times; i++) { current = yieldFrom(fib, i); if (fib.getError() != null) { throw new RuntimeException(fib.getError()); } if (current == null) { continue; } if (i > 0) { result += ","; } result += current; } stop(); setResultForOuter(result); if (Utils.isEmpty(result)) { throw new RuntimeException("Error"); } return result; } }
исполнение(обе версии будут работать):
// Run the entry coroutine MainRun mainRun = new MainRun(); mainRun.start(); // Wait for mainRun ending for 5 seconds long startTimestamp = System.currentTimeMillis(); while(!mainRun.isEnded()) { if (System.currentTimeMillis() - startTimestamp > TimeUnit.SECONDS.toMillis(5)) { throw new RuntimeException("Wait too much time"); } } // The result should be "1,1,2,3,5,8,13,21,34,55" System.out.println(mainRun.getResultForOuter());