Каков рекомендуемый способ отсрочить строительство Котлина?


Я пытаюсь опросить paginated API и предоставить новые элементы пользователю по мере их появления.

fun connect(): Sequence<T> = buildSequence {
    while (true) {
        // result is a List<T>
        val result = dataSource.getFirstPage()
        yieldAll(/* the new data in `result` */)

        // Block the thread for a little bit
    }
}

Вот пример использования:

for (item in connect()) {
    // do something as each item is made available
}

Моей первой мыслью было использовать функцию delay, но я получаю следующее сообщение:

Ограниченные приостановленные функции могут вызывать только функции-члены или расширения, приостанавливающие функции в их ограниченной области видимости сопрограммы

Это подпись для buildSequence:

public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>

Я думаю, что это сообщение означает, что я могу использовать только функции suspend в SequenceBuilder: yield и yieldAll, а также использование произвольных вызовов функций suspend не допускаются.

Прямо сейчас я использую это, чтобы блокировать построение последовательности на одну секунду после каждого опроса API:

val resumeTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1)
while (resumeTime > System.nanoTime()) {
    // do nothing
}
Это работает, но на самом деле не кажется хорошим решением. Кто-нибудь сталкивался с этой проблемой раньше?
1 2

1 ответ:

Почему это не работает? Некоторые исследования Когда мы смотрим на buildSequence, мы видим, что он принимает builderAction: suspend SequenceBuilder<T>.() -> Unit в качестве аргумента. Как клиент этого метода, вы сможете передать suspend лямбду, которая имеет SequenceBuilder в качестве приемника (читайте о лямбде с приемником здесь).
Сам SequenceBuilder аннотируется RestrictSuspension:

@RestrictsSuspension
@SinceKotlin("1.1")
public abstract class SequenceBuilder<in T> ...

Аннотация определяется и комментируется следующим образом:

/**
 * Classes and interfaces marked with this annotation are restricted
 * when used as receivers for extension `suspend` functions. 
 * These `suspend` extensions can only invoke other member or extension     
 * `suspend` functions on this particular receiver only 
 * and are restricted from calling arbitrary suspension functions.
 */
@SinceKotlin("1.1") @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension

Как говорится в документации RestrictSuspension, в случае buildSequence Вы можете передать лямбда с SequenceBuilder в качестве приемника, но с ограниченными возможностями, так как вы сможете вызывать только "другие функции члена или расширения suspend на этом конкретном приемнике". Это означает, что блок, переданный в buildSequence, может вызвать любой метод, определенный в SequenceBuilder (например yield, yieldAll). Поскольку, с другой стороны, блок "ограничен от вызова произвольных функций приостановки", использование delay не работает. Результирующая ошибка компилятора проверяет его:

Ограничение приостановлено функции могут вызывать только функции-члены или расширения, приостанавливающие функции в их ограниченной области видимости сопрограммы.

В конечном счете, вы должны знать, что buildSequence создает сопрограмму, которая является примером синхронной сопрограммы. В вашем примере код последовательности будет выполняться в том же потоке, который использует последовательность, вызывая connect().

Как задержать последовательность?

Как мы узнали, buildSequenceсоздает синхронную последовательность. Это прекрасно использовать обычную блокировку потока здесь:
fun connect(): Sequence<T> = buildSequence {
    while (true) {
        val result = dataSource.getFirstPage()
        yieldAll(result)
        Thread.sleep(1000)
    }
}

Но вы действительно хотите, чтобы весь поток был заблокирован? В качестве альтернативы можно реализовать асинхронные последовательности, как описано здесь. В результате использование delay и других приостанавливающих функций будет допустимым.