From ec6b714f925fcfabe2982cd9901ef649e7a523e2 Mon Sep 17 00:00:00 2001 From: Bruno Wieczorek Date: Sat, 27 Aug 2022 14:25:54 +0200 Subject: [PATCH] [#288] Replace Deprecated Usage of `ConflatedBroadcastChannel` Co-authored-by: Carter Jernigan --- .../cash/z/ecc/android/sdk/SdkSynchronizer.kt | 18 +++----- .../sdk/block/CompactBlockProcessor.kt | 43 ++++++------------- .../internal/ext/android/ComputableFlow.kt | 22 +++------- .../ext/android/ComputableFlowTest.kt | 35 +++++++++++++++ 4 files changed, 61 insertions(+), 57 deletions(-) create mode 100644 sdk-lib/src/test/java/cash/z/ecc/android/sdk/internal/ext/android/ComputableFlowTest.kt diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/SdkSynchronizer.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/SdkSynchronizer.kt index fe07fcaf..4aa6ac93 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/SdkSynchronizer.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/SdkSynchronizer.kt @@ -66,11 +66,9 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.first @@ -95,7 +93,6 @@ import kotlin.coroutines.EmptyCoroutineContext * @property processor saves the downloaded compact blocks to the cache and then scans those blocks for * data related to this wallet. */ -@OptIn(kotlinx.coroutines.ObsoleteCoroutinesApi::class) @FlowPreview @Suppress("TooManyFunctions") class SdkSynchronizer internal constructor( @@ -109,9 +106,7 @@ class SdkSynchronizer internal constructor( private val _saplingBalances = MutableStateFlow(null) private val _transparentBalances = MutableStateFlow(null) - // TODO [#288]: Remove Deprecated Usage of ConflatedBroadcastChannel - // TODO [#288]: https://github.com/zcash/zcash-android-wallet-sdk/issues/288 - private val _status = ConflatedBroadcastChannel(DISCONNECTED) + private val _status = MutableStateFlow(DISCONNECTED) /** * The lifespan of this Synchronizer. This scope is initialized once the Synchronizer starts @@ -173,10 +168,7 @@ class SdkSynchronizer internal constructor( * processor is finished scanning, the synchronizer updates transaction and balance info and * then emits a [SYNCED] status. */ - // TODO [#658] Replace ComputableFlow and asFlow() obsolete Coroutine usage - // TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658 - @Suppress("DEPRECATION") - override val status = _status.asFlow() + override val status = _status.asStateFlow() /** * Indicates the download progress of the Synchronizer. When progress reaches 100, that @@ -301,8 +293,8 @@ class SdkSynchronizer internal constructor( processor.stop() twig("Synchronizer::stop: coroutineScope.cancel()") coroutineScope.cancel() - twig("Synchronizer::stop: _status.cancel()") - _status.cancel() + twig("Synchronizer::stop: _status.value = STOPPED") + _status.value = STOPPED twig("Synchronizer::stop: COMPLETE") } } @@ -412,7 +404,7 @@ class SdkSynchronizer internal constructor( // ignore enhancing status for now // TODO [#682]: clean this up and handle enhancing gracefully // TODO [#682]: https://github.com/zcash/zcash-android-wallet-sdk/issues/682 - if (synchronizerStatus != ENHANCING) _status.send(synchronizerStatus) + if (synchronizerStatus != ENHANCING) _status.value = synchronizerStatus } }.launchIn(this) processor.start() diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/block/CompactBlockProcessor.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/block/CompactBlockProcessor.kt index 25df0a16..0ab35267 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/block/CompactBlockProcessor.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/block/CompactBlockProcessor.kt @@ -46,10 +46,8 @@ import cash.z.wallet.sdk.rpc.Service import io.grpc.StatusRuntimeException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers.IO -import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.isActive @@ -77,7 +75,6 @@ import kotlin.time.Duration.Companion.days * in when considering initial range to download. In most cases, this should be the birthday height * of the current wallet--the height before which we do not need to scan for transactions. */ -@OptIn(kotlinx.coroutines.ObsoleteCoroutinesApi::class) @OpenForTesting @Suppress("TooManyFunctions", "LargeClass") class CompactBlockProcessor internal constructor( @@ -129,12 +126,9 @@ class CompactBlockProcessor internal constructor( ) ) - // TODO [#288]: Remove Deprecated Usage of ConflatedBroadcastChannel - // TODO [#288]: https://github.com/zcash/zcash-android-wallet-sdk/issues/288 - private val _state: ConflatedBroadcastChannel = ConflatedBroadcastChannel(Initialized) - private val _progress = ConflatedBroadcastChannel(0) - private val _processorInfo = - ConflatedBroadcastChannel(ProcessorInfo(null, null, null, null, null)) + private val _state: MutableStateFlow = MutableStateFlow(Initialized) + private val _progress = MutableStateFlow(0) + private val _processorInfo = MutableStateFlow(ProcessorInfo(null, null, null, null, null)) private val _networkHeight = MutableStateFlow(null) private val processingMutex = Mutex() @@ -166,28 +160,19 @@ class CompactBlockProcessor internal constructor( * The flow of state values so that a wallet can monitor the state of this class without needing * to poll. */ - // TODO [#658] Replace ComputableFlow and asFlow() obsolete Coroutine usage - // TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658 - @Suppress("DEPRECATION") - val state = _state.asFlow() + val state = _state.asStateFlow() /** * The flow of progress values so that a wallet can monitor how much downloading remains * without needing to poll. */ - // TODO [#658] Replace ComputableFlow and asFlow() obsolete Coroutine usage - // TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658 - @Suppress("DEPRECATION") - val progress = _progress.asFlow() + val progress = _progress.asStateFlow() /** * The flow of detailed processorInfo like the range of blocks that shall be downloaded and * scanned. This gives the wallet a lot of insight into the work of this processor. */ - // TODO [#658] Replace ComputableFlow and asFlow() obsolete Coroutine usage - // TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658 - @Suppress("DEPRECATION") - val processorInfo = _processorInfo.asFlow() + val processorInfo = _processorInfo.asStateFlow() /** * The flow of network height. This value is updated at the same time that [currentInfo] is @@ -260,7 +245,7 @@ class CompactBlockProcessor internal constructor( } } } - } while (isActive && !_state.isClosedForSend && _state.value !is Stopped) + } while (isActive && _state.value !is Stopped) twig("processor complete") stop() } @@ -600,7 +585,7 @@ class CompactBlockProcessor internal constructor( if (null == range || range.isEmpty()) { twig("no blocks to download") } else { - _state.send(Downloading) + _state.value = Downloading Twig.sprout("downloading") twig("downloading blocks in range $range", -1) @@ -633,7 +618,7 @@ class CompactBlockProcessor internal constructor( } twig("downloaded $count blocks!") progress = (i / batches.toFloat() * 100).roundToInt() - _progress.send(progress) + _progress.value = progress val lastDownloadedHeight = downloader.getLastDownloadedHeight() updateProgress(lastDownloadedHeight = lastDownloadedHeight) downloadedBlockHeight = end + 1 @@ -641,7 +626,7 @@ class CompactBlockProcessor internal constructor( } Twig.clip("downloading") } - _progress.send(100) + _progress.value = 100 } /** @@ -756,7 +741,7 @@ class CompactBlockProcessor internal constructor( withContext(IO) { _networkHeight.value = networkBlockHeight - _processorInfo.send(currentInfo) + _processorInfo.value = currentInfo } } @@ -859,7 +844,7 @@ class CompactBlockProcessor internal constructor( lastDownloadRange = (targetHeight + 1)..currentNetworkBlockHeight ) } - _progress.send(0) + _progress.value = 0 } else { if (null == currentNetworkBlockHeight) { updateProgress( @@ -873,7 +858,7 @@ class CompactBlockProcessor internal constructor( ) } - _progress.send(0) + _progress.value = 0 if (null != lastScannedHeight) { val range = (targetHeight + 1)..lastScannedHeight @@ -1058,7 +1043,7 @@ class CompactBlockProcessor internal constructor( * Transmits the given state for this processor. */ private suspend fun setState(newState: State) { - _state.send(newState) + _state.value = newState } /** diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/ext/android/ComputableFlow.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/ext/android/ComputableFlow.kt index b9829e45..c4550b84 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/ext/android/ComputableFlow.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/ext/android/ComputableFlow.kt @@ -3,37 +3,29 @@ package cash.z.ecc.android.sdk.internal.ext.android import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ObsoleteCoroutinesApi -import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.ConflatedBroadcastChannel -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch /* Adapted from ComputableLiveData */ -// TODO [#658] https://github.com/zcash/zcash-android-wallet-sdk/issues/658 -@Suppress("DEPRECATION") -@OptIn(ObsoleteCoroutinesApi::class) abstract class ComputableFlow(dispatcher: CoroutineDispatcher = Dispatchers.IO) { - private val computationScope: CoroutineScope = CoroutineScope(dispatcher + SupervisorJob()) - private val computationChannel: ConflatedBroadcastChannel = ConflatedBroadcastChannel() - internal val flow = computationChannel.asFlow().flowOn(dispatcher).onStart { - invalidate() - } + private val computationScope: CoroutineScope = CoroutineScope(dispatcher) + private val computationFlow: MutableSharedFlow = MutableSharedFlow(replay = 1) + internal val flow = computationFlow.asSharedFlow().onStart { invalidate() } /** * Invalidates the flow. * This will trigger a call to [.compute]. */ fun invalidate() { - computationScope.launch { computationChannel.send(compute()) } + computationScope.launch { computationFlow.emit(compute()) } } fun cancel() { computationScope.cancel() - computationChannel.cancel() + computationFlow.resetReplayCache() } protected abstract fun compute(): T diff --git a/sdk-lib/src/test/java/cash/z/ecc/android/sdk/internal/ext/android/ComputableFlowTest.kt b/sdk-lib/src/test/java/cash/z/ecc/android/sdk/internal/ext/android/ComputableFlowTest.kt new file mode 100644 index 00000000..4f3d10b5 --- /dev/null +++ b/sdk-lib/src/test/java/cash/z/ecc/android/sdk/internal/ext/android/ComputableFlowTest.kt @@ -0,0 +1,35 @@ +package cash.z.ecc.android.sdk.internal.ext.android + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.runBlocking +import org.junit.Test +import kotlin.test.assertEquals + +class ComputableFlowTest { + + private class TestComputableFlow : ComputableFlow(dispatcher = Dispatchers.Unconfined) { + var computationCounter: Int = 0 + private set + + override fun compute() = ++computationCounter + } + + @Test + fun shouldComputeOnInvalidation() { + val testComputableFlow = TestComputableFlow() + + testComputableFlow.invalidate() + testComputableFlow.invalidate() + + assertEquals(2, testComputableFlow.computationCounter) + } + + @Test + fun shouldInvalidateOnCollection() = runBlocking { + val testComputableFlow = TestComputableFlow() + testComputableFlow.flow.first() + + assertEquals(1, testComputableFlow.computationCounter) + } +}