188-Throttle Emissions from `WalletViewModel.walletSnapshot`
This commit is contained in:
parent
bb2ec4d71d
commit
9536a966e6
|
@ -0,0 +1,67 @@
|
|||
package co.electriccoin.zcash.ui.common
|
||||
|
||||
import androidx.test.filters.FlakyTest
|
||||
import androidx.test.filters.SmallTest
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertTrue
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.ExperimentalTime
|
||||
import kotlin.time.TimeMark
|
||||
import kotlin.time.TimeSource
|
||||
|
||||
class FlowExtTest {
|
||||
|
||||
@OptIn(ExperimentalTime::class, ExperimentalCoroutinesApi::class)
|
||||
@Test
|
||||
@SmallTest
|
||||
fun throttle_one_sec() = runTest {
|
||||
val timer = TimeSource.Monotonic.markNow()
|
||||
val flow = flow {
|
||||
while (timer.elapsedNow() <= 5.seconds) {
|
||||
emit(1)
|
||||
}
|
||||
}.throttle(1.seconds)
|
||||
|
||||
var timeMark: TimeMark? = null
|
||||
flow.collect {
|
||||
if (timeMark == null) {
|
||||
timeMark = TimeSource.Monotonic.markNow()
|
||||
} else {
|
||||
assert(timeMark!!.elapsedNow() >= 1.seconds)
|
||||
timeMark = TimeSource.Monotonic.markNow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalTime::class)
|
||||
private fun raceConditionTest(duration: Duration): Boolean = runBlocking {
|
||||
val flow = (0..1000).asFlow().throttle(duration)
|
||||
|
||||
val values = mutableListOf<Int>()
|
||||
flow.collect {
|
||||
values.add(it)
|
||||
}
|
||||
|
||||
return@runBlocking values.zipWithNext().all { it.first <= it.second }
|
||||
}
|
||||
|
||||
@FlakyTest
|
||||
@Test
|
||||
fun stressTest() = runBlocking {
|
||||
for (i in 0..10) {
|
||||
assertTrue { raceConditionTest(0.001.seconds) }
|
||||
}
|
||||
for (i in 0..10) {
|
||||
assertTrue { raceConditionTest(0.0001.seconds) }
|
||||
}
|
||||
for (i in 0..10) {
|
||||
assertTrue { raceConditionTest(0.00001.seconds) }
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
package co.electriccoin.zcash.ui.common
|
||||
|
||||
import kotlinx.coroutines.Deferred
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.ExperimentalTime
|
||||
import kotlin.time.TimeSource
|
||||
|
||||
@OptIn(ExperimentalTime::class)
|
||||
fun <T> Flow<T>.throttle(
|
||||
duration: Duration,
|
||||
timeSource: TimeSource = TimeSource.Monotonic
|
||||
): Flow<T> = flow {
|
||||
coroutineScope {
|
||||
val context = coroutineContext
|
||||
val mutex = Mutex()
|
||||
|
||||
var timeMark = timeSource.markNow()
|
||||
var delayEmit: Deferred<Unit>? = null
|
||||
var firstValue = true
|
||||
var valueToEmit: T
|
||||
collect { value ->
|
||||
if (firstValue) {
|
||||
firstValue = false
|
||||
emit(value)
|
||||
timeMark = timeSource.markNow()
|
||||
return@collect
|
||||
}
|
||||
delayEmit?.cancel()
|
||||
valueToEmit = value
|
||||
|
||||
if (timeMark.elapsedNow() >= duration) {
|
||||
mutex.withLock {
|
||||
emit(valueToEmit)
|
||||
timeMark = timeSource.markNow()
|
||||
}
|
||||
} else {
|
||||
delayEmit = async(Dispatchers.Default) {
|
||||
mutex.withLock {
|
||||
delay(duration)
|
||||
withContext(context) {
|
||||
emit(valueToEmit)
|
||||
}
|
||||
timeMark = timeSource.markNow()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ import cash.z.ecc.sdk.model.PersistableWallet
|
|||
import cash.z.ecc.sdk.model.WalletAddresses
|
||||
import co.electriccoin.zcash.spackle.Twig
|
||||
import co.electriccoin.zcash.ui.common.ANDROID_STATE_FLOW_TIMEOUT
|
||||
import co.electriccoin.zcash.ui.common.throttle
|
||||
import co.electriccoin.zcash.ui.preference.EncryptedPreferenceKeys
|
||||
import co.electriccoin.zcash.ui.preference.EncryptedPreferenceSingleton
|
||||
import co.electriccoin.zcash.ui.preference.StandardPreferenceKeys
|
||||
|
@ -28,6 +29,7 @@ import co.electriccoin.zcash.ui.preference.StandardPreferenceSingleton
|
|||
import co.electriccoin.zcash.ui.screen.home.model.WalletSnapshot
|
||||
import co.electriccoin.zcash.work.WorkIds
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
|
@ -48,6 +50,8 @@ import kotlinx.coroutines.launch
|
|||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.ExperimentalTime
|
||||
|
||||
// To make this more multiplatform compatible, we need to remove the dependency on Context
|
||||
// for loading the preferences.
|
||||
|
@ -121,7 +125,7 @@ class WalletViewModel(application: Application) : AndroidViewModel(application)
|
|||
null
|
||||
)
|
||||
|
||||
@OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
|
||||
@OptIn(ExperimentalCoroutinesApi::class, ExperimentalTime::class)
|
||||
val walletSnapshot: StateFlow<WalletSnapshot?> = synchronizer
|
||||
.flatMapLatest {
|
||||
if (null == it) {
|
||||
|
@ -130,6 +134,7 @@ class WalletViewModel(application: Application) : AndroidViewModel(application)
|
|||
it.toWalletSnapshot()
|
||||
}
|
||||
}
|
||||
.throttle(1.seconds)
|
||||
.stateIn(
|
||||
viewModelScope,
|
||||
SharingStarted.WhileSubscribed(ANDROID_STATE_FLOW_TIMEOUT),
|
||||
|
|
Loading…
Reference in New Issue