[#288] Replace Deprecated Usage of `ConflatedBroadcastChannel`

Co-authored-by: Carter Jernigan <git@carterjernigan.com>
This commit is contained in:
Bruno Wieczorek 2022-08-27 14:25:54 +02:00 committed by GitHub
parent 12c23dd054
commit ec6b714f92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 61 additions and 57 deletions

View File

@ -66,11 +66,9 @@ import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
@ -95,7 +93,6 @@ import kotlin.coroutines.EmptyCoroutineContext
* @property processor saves the downloaded compact blocks to the cache and then scans those blocks for
* data related to this wallet.
*/
@OptIn(kotlinx.coroutines.ObsoleteCoroutinesApi::class)
@FlowPreview
@Suppress("TooManyFunctions")
class SdkSynchronizer internal constructor(
@ -109,9 +106,7 @@ class SdkSynchronizer internal constructor(
private val _saplingBalances = MutableStateFlow<WalletBalance?>(null)
private val _transparentBalances = MutableStateFlow<WalletBalance?>(null)
// TODO [#288]: Remove Deprecated Usage of ConflatedBroadcastChannel
// TODO [#288]: https://github.com/zcash/zcash-android-wallet-sdk/issues/288
private val _status = ConflatedBroadcastChannel<Synchronizer.Status>(DISCONNECTED)
private val _status = MutableStateFlow<Synchronizer.Status>(DISCONNECTED)
/**
* The lifespan of this Synchronizer. This scope is initialized once the Synchronizer starts
@ -173,10 +168,7 @@ class SdkSynchronizer internal constructor(
* processor is finished scanning, the synchronizer updates transaction and balance info and
* then emits a [SYNCED] status.
*/
// TODO [#658] Replace ComputableFlow and asFlow() obsolete Coroutine usage
// TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658
@Suppress("DEPRECATION")
override val status = _status.asFlow()
override val status = _status.asStateFlow()
/**
* Indicates the download progress of the Synchronizer. When progress reaches 100, that
@ -301,8 +293,8 @@ class SdkSynchronizer internal constructor(
processor.stop()
twig("Synchronizer::stop: coroutineScope.cancel()")
coroutineScope.cancel()
twig("Synchronizer::stop: _status.cancel()")
_status.cancel()
twig("Synchronizer::stop: _status.value = STOPPED")
_status.value = STOPPED
twig("Synchronizer::stop: COMPLETE")
}
}
@ -412,7 +404,7 @@ class SdkSynchronizer internal constructor(
// ignore enhancing status for now
// TODO [#682]: clean this up and handle enhancing gracefully
// TODO [#682]: https://github.com/zcash/zcash-android-wallet-sdk/issues/682
if (synchronizerStatus != ENHANCING) _status.send(synchronizerStatus)
if (synchronizerStatus != ENHANCING) _status.value = synchronizerStatus
}
}.launchIn(this)
processor.start()

View File

@ -46,10 +46,8 @@ import cash.z.wallet.sdk.rpc.Service
import io.grpc.StatusRuntimeException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.isActive
@ -77,7 +75,6 @@ import kotlin.time.Duration.Companion.days
* in when considering initial range to download. In most cases, this should be the birthday height
* of the current wallet--the height before which we do not need to scan for transactions.
*/
@OptIn(kotlinx.coroutines.ObsoleteCoroutinesApi::class)
@OpenForTesting
@Suppress("TooManyFunctions", "LargeClass")
class CompactBlockProcessor internal constructor(
@ -129,12 +126,9 @@ class CompactBlockProcessor internal constructor(
)
)
// TODO [#288]: Remove Deprecated Usage of ConflatedBroadcastChannel
// TODO [#288]: https://github.com/zcash/zcash-android-wallet-sdk/issues/288
private val _state: ConflatedBroadcastChannel<State> = ConflatedBroadcastChannel(Initialized)
private val _progress = ConflatedBroadcastChannel(0)
private val _processorInfo =
ConflatedBroadcastChannel(ProcessorInfo(null, null, null, null, null))
private val _state: MutableStateFlow<State> = MutableStateFlow(Initialized)
private val _progress = MutableStateFlow(0)
private val _processorInfo = MutableStateFlow(ProcessorInfo(null, null, null, null, null))
private val _networkHeight = MutableStateFlow<BlockHeight?>(null)
private val processingMutex = Mutex()
@ -166,28 +160,19 @@ class CompactBlockProcessor internal constructor(
* The flow of state values so that a wallet can monitor the state of this class without needing
* to poll.
*/
// TODO [#658] Replace ComputableFlow and asFlow() obsolete Coroutine usage
// TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658
@Suppress("DEPRECATION")
val state = _state.asFlow()
val state = _state.asStateFlow()
/**
* The flow of progress values so that a wallet can monitor how much downloading remains
* without needing to poll.
*/
// TODO [#658] Replace ComputableFlow and asFlow() obsolete Coroutine usage
// TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658
@Suppress("DEPRECATION")
val progress = _progress.asFlow()
val progress = _progress.asStateFlow()
/**
* The flow of detailed processorInfo like the range of blocks that shall be downloaded and
* scanned. This gives the wallet a lot of insight into the work of this processor.
*/
// TODO [#658] Replace ComputableFlow and asFlow() obsolete Coroutine usage
// TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658
@Suppress("DEPRECATION")
val processorInfo = _processorInfo.asFlow()
val processorInfo = _processorInfo.asStateFlow()
/**
* The flow of network height. This value is updated at the same time that [currentInfo] is
@ -260,7 +245,7 @@ class CompactBlockProcessor internal constructor(
}
}
}
} while (isActive && !_state.isClosedForSend && _state.value !is Stopped)
} while (isActive && _state.value !is Stopped)
twig("processor complete")
stop()
}
@ -600,7 +585,7 @@ class CompactBlockProcessor internal constructor(
if (null == range || range.isEmpty()) {
twig("no blocks to download")
} else {
_state.send(Downloading)
_state.value = Downloading
Twig.sprout("downloading")
twig("downloading blocks in range $range", -1)
@ -633,7 +618,7 @@ class CompactBlockProcessor internal constructor(
}
twig("downloaded $count blocks!")
progress = (i / batches.toFloat() * 100).roundToInt()
_progress.send(progress)
_progress.value = progress
val lastDownloadedHeight = downloader.getLastDownloadedHeight()
updateProgress(lastDownloadedHeight = lastDownloadedHeight)
downloadedBlockHeight = end + 1
@ -641,7 +626,7 @@ class CompactBlockProcessor internal constructor(
}
Twig.clip("downloading")
}
_progress.send(100)
_progress.value = 100
}
/**
@ -756,7 +741,7 @@ class CompactBlockProcessor internal constructor(
withContext(IO) {
_networkHeight.value = networkBlockHeight
_processorInfo.send(currentInfo)
_processorInfo.value = currentInfo
}
}
@ -859,7 +844,7 @@ class CompactBlockProcessor internal constructor(
lastDownloadRange = (targetHeight + 1)..currentNetworkBlockHeight
)
}
_progress.send(0)
_progress.value = 0
} else {
if (null == currentNetworkBlockHeight) {
updateProgress(
@ -873,7 +858,7 @@ class CompactBlockProcessor internal constructor(
)
}
_progress.send(0)
_progress.value = 0
if (null != lastScannedHeight) {
val range = (targetHeight + 1)..lastScannedHeight
@ -1058,7 +1043,7 @@ class CompactBlockProcessor internal constructor(
* Transmits the given state for this processor.
*/
private suspend fun setState(newState: State) {
_state.send(newState)
_state.value = newState
}
/**

View File

@ -3,37 +3,29 @@ package cash.z.ecc.android.sdk.internal.ext.android
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
/* Adapted from ComputableLiveData */
// TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658
@Suppress("DEPRECATION")
@OptIn(ObsoleteCoroutinesApi::class)
abstract class ComputableFlow<T>(dispatcher: CoroutineDispatcher = Dispatchers.IO) {
private val computationScope: CoroutineScope = CoroutineScope(dispatcher + SupervisorJob())
private val computationChannel: ConflatedBroadcastChannel<T> = ConflatedBroadcastChannel()
internal val flow = computationChannel.asFlow().flowOn(dispatcher).onStart {
invalidate()
}
private val computationScope: CoroutineScope = CoroutineScope(dispatcher)
private val computationFlow: MutableSharedFlow<T> = MutableSharedFlow(replay = 1)
internal val flow = computationFlow.asSharedFlow().onStart { invalidate() }
/**
* Invalidates the flow.
* This will trigger a call to [.compute].
*/
fun invalidate() {
computationScope.launch { computationChannel.send(compute()) }
computationScope.launch { computationFlow.emit(compute()) }
}
fun cancel() {
computationScope.cancel()
computationChannel.cancel()
computationFlow.resetReplayCache()
}
protected abstract fun compute(): T

View File

@ -0,0 +1,35 @@
package cash.z.ecc.android.sdk.internal.ext.android
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import org.junit.Test
import kotlin.test.assertEquals
class ComputableFlowTest {
private class TestComputableFlow : ComputableFlow<Int>(dispatcher = Dispatchers.Unconfined) {
var computationCounter: Int = 0
private set
override fun compute() = ++computationCounter
}
@Test
fun shouldComputeOnInvalidation() {
val testComputableFlow = TestComputableFlow()
testComputableFlow.invalidate()
testComputableFlow.invalidate()
assertEquals(2, testComputableFlow.computationCounter)
}
@Test
fun shouldInvalidateOnCollection() = runBlocking {
val testComputableFlow = TestComputableFlow()
testComputableFlow.flow.first()
assertEquals(1, testComputableFlow.computationCounter)
}
}