[#646] SDK sync process resumes correctly

- Previously we had one range for each sync which was used for each
  phase of sync process. Newly there is separate range for each phase of
  the sync process.
- I added `InternalSyncProgress` utility. This utility tracks progress
  of some phases. And it is able to compute right ranges which should be
  used for syncing.
- Some unused download code from `CompactBlockProcessor` is removed.

Fix tests

Address review comments

- Rebase to master
- Update how range for `processBatchFinished()` is computed.
- Refactor `InternalSyncProgress`
- Add tests for `InternalSyncProgress`

Address review comments

Change how latest downloaded block is tracked

- Cache DB is no longer used to track which block was downloaded as
  latest. `InternalSyncProgress` is used to do that.
- Thanks to that #660 is fixed. And cache DB can be completely removed
  after sync process.
- Added sleep(1) to some darkside tests when latest block height is set.
  Otherwise lightwalletd in darkside mode doesn't have time to setup and
  tests are flaky.

Fix TransactionEnhancementTests.testBasicEnhancement test
This commit is contained in:
Michal Fousek 2022-11-23 19:05:49 +01:00
parent 3b7202c922
commit e01c83690f
28 changed files with 578 additions and 389 deletions

View File

@ -65,7 +65,7 @@ class SyncBlocksViewController: UIViewController {
.store(in: &notificationCancellables) .store(in: &notificationCancellables)
} }
self.lastMetric self.lastMetric
.throttle(for: 5, scheduler: DispatchQueue.main, latest: true) .throttle(for: 5, scheduler: DispatchQueue.main, latest: true)
.receive(on: DispatchQueue.main) .receive(on: DispatchQueue.main)
.sink { report in .sink { report in
@ -218,7 +218,6 @@ class SyncBlocksViewController: UIViewController {
} }
} }
struct ProcessorMetrics { struct ProcessorMetrics {
var minHeight: BlockHeight var minHeight: BlockHeight
var maxHeight: BlockHeight var maxHeight: BlockHeight
@ -244,8 +243,16 @@ struct ProcessorMetrics {
.init( .init(
minHeight: min(prev.minHeight, current.startHeight), minHeight: min(prev.minHeight, current.startHeight),
maxHeight: max(prev.maxHeight, current.progressHeight), maxHeight: max(prev.maxHeight, current.progressHeight),
maxDuration: compareDuration(prev.maxDuration, (current.duration, current.progressHeight - current.batchSize ... current.progressHeight), max), maxDuration: compareDuration(
minDuration: compareDuration(prev.minDuration, (current.duration, current.progressHeight - current.batchSize ... current.progressHeight), min), prev.maxDuration,
(current.duration, current.progressHeight - current.batchSize ... current.progressHeight),
max
),
minDuration: compareDuration(
prev.minDuration,
(current.duration, current.progressHeight - current.batchSize ... current.progressHeight),
min
),
cummulativeDuration: prev.cummulativeDuration + current.duration, cummulativeDuration: prev.cummulativeDuration + current.duration,
measuredCount: prev.measuredCount + 1 measuredCount: prev.measuredCount + 1
) )
@ -260,7 +267,6 @@ struct ProcessorMetrics {
} }
} }
extension ProcessorMetrics: CustomDebugStringConvertible { extension ProcessorMetrics: CustomDebugStringConvertible {
var debugDescription: String { var debugDescription: String {
""" """
@ -281,7 +287,6 @@ extension ProcessorMetrics: CustomDebugStringConvertible {
} }
} }
extension CompactBlockRange { extension CompactBlockRange {
var description: String { var description: String {
"\(self.lowerBound) ... \(self.upperBound)" "\(self.lowerBound) ... \(self.upperBound)"

View File

@ -98,21 +98,6 @@ extension CompactBlockStorage: CompactBlockRepository {
return try await task.value return try await task.value
} }
func latestBlock() throws -> ZcashCompactBlock {
let dataColumn = self.dataColumn()
let heightColumn = self.heightColumn()
let query = compactBlocksTable()
.select(dataColumn, heightColumn)
.order(heightColumn.desc)
.limit(1)
guard let blockData = try dbProvider.connection().prepare(query).first(where: { _ in return true }) else {
throw StorageError.latestBlockNotFound
}
return ZcashCompactBlock(height: Int(blockData[heightColumn]), data: Data(blob: blockData[dataColumn]))
}
func write(blocks: [ZcashCompactBlock]) async throws { func write(blocks: [ZcashCompactBlock]) async throws {
let task = Task(priority: .userInitiated) { let task = Task(priority: .userInitiated) {
try insert(blocks) try insert(blocks)

View File

@ -43,6 +43,7 @@ extension CompactBlockProcessor {
buffer.append(zcashCompactBlock) buffer.append(zcashCompactBlock)
if buffer.count >= blockBufferSize { if buffer.count >= blockBufferSize {
try await storage.write(blocks: buffer) try await storage.write(blocks: buffer)
await blocksBufferWritten(buffer)
buffer.removeAll(keepingCapacity: true) buffer.removeAll(keepingCapacity: true)
} }
@ -54,6 +55,7 @@ extension CompactBlockProcessor {
notifyProgress(.download(progress)) notifyProgress(.download(progress))
} }
try await storage.write(blocks: buffer) try await storage.write(blocks: buffer)
await blocksBufferWritten(buffer)
buffer.removeAll(keepingCapacity: true) buffer.removeAll(keepingCapacity: true)
} catch { } catch {
guard let err = error as? LightWalletServiceError, case .userCancelled = err else { guard let err = error as? LightWalletServiceError, case .userCancelled = err else {
@ -61,6 +63,11 @@ extension CompactBlockProcessor {
} }
} }
} }
private func blocksBufferWritten(_ buffer: [ZcashCompactBlock]) async {
guard let lastBlock = buffer.last else { return }
await internalSyncProgress.set(lastBlock.height, .latestDownloadedBlockHeight)
}
} }
extension CompactBlockProcessor { extension CompactBlockProcessor {
@ -77,84 +84,3 @@ extension CompactBlockProcessor {
} }
} }
} }
extension CompactBlockProcessor {
enum CompactBlockBatchDownloadError: Error {
case startHeightMissing
case batchDownloadFailed(range: CompactBlockRange, error: Error?)
}
func compactBlockBatchDownload(
range: CompactBlockRange,
batchSize: Int = 100,
maxRetries: Int = 5
) async throws {
try Task.checkCancellation()
var startHeight = range.lowerBound
let targetHeight = range.upperBound
do {
let localDownloadedHeight = try await self.storage.latestHeightAsync()
if localDownloadedHeight != BlockHeight.empty() && localDownloadedHeight > startHeight {
LoggerProxy.warn("provided startHeight (\(startHeight)) differs from local latest downloaded height (\(localDownloadedHeight))")
startHeight = localDownloadedHeight + 1
}
var currentHeight = startHeight
notifyProgress(
.download(
BlockProgress(
startHeight: currentHeight,
targetHeight: targetHeight,
progressHeight: currentHeight
)
)
)
while !Task.isCancelled && currentHeight <= targetHeight {
var retries = 0
var success = true
var localError: Error?
let range = CompactBlockRange(uncheckedBounds: (lower: currentHeight, upper: min(currentHeight + batchSize, targetHeight)))
repeat {
do {
let stream: AsyncThrowingStream<ZcashCompactBlock, Error> = service.blockRange(range)
var blocks: [ZcashCompactBlock] = []
for try await compactBlock in stream {
blocks.append(compactBlock)
}
try storage.insert(blocks)
success = true
} catch {
success = false
localError = error
retries += 1
}
} while !Task.isCancelled && !success && retries < maxRetries
if retries >= maxRetries {
throw CompactBlockBatchDownloadError.batchDownloadFailed(range: range, error: localError)
}
notifyProgress(
.download(
BlockProgress(
startHeight: startHeight,
targetHeight: targetHeight,
progressHeight: range.upperBound
)
)
)
currentHeight = range.upperBound + 1
}
} catch {
throw error
}
}
}

View File

@ -65,6 +65,7 @@ extension CompactBlockProcessor {
// fetch transactions // fetch transactions
do { do {
guard let transactions = try transactionRepository.findTransactions(in: blockRange, limit: Int.max), !transactions.isEmpty else { guard let transactions = try transactionRepository.findTransactions(in: blockRange, limit: Int.max), !transactions.isEmpty else {
await internalSyncProgress.set(range.upperBound, .latestEnhancedHeight)
LoggerProxy.debug("no transactions detected on range: \(blockRange.printRange)") LoggerProxy.debug("no transactions detected on range: \(blockRange.printRange)")
return return
} }
@ -88,6 +89,8 @@ extension CompactBlockProcessor {
) )
) )
) )
await internalSyncProgress.set(confirmedTx.minedHeight, .latestEnhancedHeight)
} catch { } catch {
retries += 1 retries += 1
LoggerProxy.error("could not enhance txId \(transaction.transactionId.toHexStringTxId()) - Error: \(error)") LoggerProxy.error("could not enhance txId \(transaction.transactionId.toHexStringTxId()) - Error: \(error)")
@ -106,6 +109,8 @@ extension CompactBlockProcessor {
notifyTransactions(foundTxs, in: blockRange) notifyTransactions(foundTxs, in: blockRange)
} }
await internalSyncProgress.set(range.upperBound, .latestEnhancedHeight)
if Task.isCancelled { if Task.isCancelled {
LoggerProxy.debug("Warning: compactBlockEnhancement on range \(range) cancelled") LoggerProxy.debug("Warning: compactBlockEnhancement on range \(range) cancelled")
} }

View File

@ -369,8 +369,6 @@ public actor CompactBlockProcessor {
var rustBackend: ZcashRustBackendWelding.Type var rustBackend: ZcashRustBackendWelding.Type
private var retryAttempts: Int = 0 private var retryAttempts: Int = 0
private var backoffTimer: Timer? private var backoffTimer: Timer?
private var lowerBoundHeight: BlockHeight?
private var latestBlockHeight: BlockHeight
private var lastChainValidationFailure: BlockHeight? private var lastChainValidationFailure: BlockHeight?
private var consecutiveChainValidationErrors: Int = 0 private var consecutiveChainValidationErrors: Int = 0
var processingError: Error? var processingError: Error?
@ -385,6 +383,8 @@ public actor CompactBlockProcessor {
private var cancelableTask: Task<Void, Error>? private var cancelableTask: Task<Void, Error>?
let internalSyncProgress = InternalSyncProgress(storage: UserDefaults.standard)
/// Initializes a CompactBlockProcessor instance /// Initializes a CompactBlockProcessor instance
/// - Parameters: /// - Parameters:
/// - service: concrete implementation of `LightWalletService` protocol /// - service: concrete implementation of `LightWalletService` protocol
@ -447,7 +447,6 @@ public actor CompactBlockProcessor {
self.storage = storage self.storage = storage
self.config = config self.config = config
self.transactionRepository = repository self.transactionRepository = repository
self.latestBlockHeight = config.walletBirthday
self.accountRepository = accountRepository self.accountRepository = accountRepository
} }
@ -489,13 +488,6 @@ public actor CompactBlockProcessor {
} }
} }
static func nextBatchBlockRange(latestHeight: BlockHeight, latestDownloadedHeight: BlockHeight, walletBirthday: BlockHeight) -> CompactBlockRange {
let lowerBound = latestDownloadedHeight <= walletBirthday ? walletBirthday : latestDownloadedHeight + 1
let upperBound = latestHeight
return lowerBound ... upperBound
}
/// Starts the CompactBlockProcessor instance and starts downloading and processing blocks /// Starts the CompactBlockProcessor instance and starts downloading and processing blocks
/// ///
/// triggers the blockProcessorStartedDownloading notification /// triggers the blockProcessorStartedDownloading notification
@ -557,7 +549,7 @@ public actor CompactBlockProcessor {
public func rewindTo(_ height: BlockHeight?) async throws -> BlockHeight { public func rewindTo(_ height: BlockHeight?) async throws -> BlockHeight {
guard shouldStart else { throw CompactBlockProcessorError.rewindAttemptWhileProcessing } guard shouldStart else { throw CompactBlockProcessorError.rewindAttemptWhileProcessing }
let lastDownloaded = try downloader.lastDownloadedBlockHeight() let lastDownloaded = await internalSyncProgress.latestDownloadedBlockHeight
let height = Int32(height ?? lastDownloaded) let height = Int32(height ?? lastDownloaded)
let nearestHeight = rustBackend.getNearestRewindHeight( let nearestHeight = rustBackend.getNearestRewindHeight(
dbData: config.dataDb, dbData: config.dataDb,
@ -582,10 +574,12 @@ public actor CompactBlockProcessor {
} }
// clear cache // clear cache
try downloader.rewind(to: BlockHeight(rewindHeight)) let rewindBlockHeight = BlockHeight(rewindHeight)
try downloader.rewind(to: rewindBlockHeight)
await internalSyncProgress.rewind(to: rewindBlockHeight)
self.lastChainValidationFailure = nil self.lastChainValidationFailure = nil
self.lowerBoundHeight = try? downloader.lastDownloadedBlockHeight() return rewindBlockHeight
return BlockHeight(rewindHeight)
} }
func validateServer() async { func validateServer() async {
@ -605,37 +599,61 @@ public actor CompactBlockProcessor {
} }
/// Processes new blocks on the given range based on the configuration set for this instance /// Processes new blocks on the given range based on the configuration set for this instance
func processNewBlocks(range: CompactBlockRange, latestBlockHeight: BlockHeight) async { func processNewBlocks(ranges: SyncRanges) async {
self.foundBlocks = true self.foundBlocks = true
self.backoffTimer?.invalidate() self.backoffTimer?.invalidate()
self.backoffTimer = nil self.backoffTimer = nil
cancelableTask = Task(priority: .userInitiated) { cancelableTask = Task(priority: .userInitiated) {
do { do {
let lastDownloadedBlockHeight = try downloader.lastDownloadedBlockHeight() LoggerProxy.debug("""
Syncing with ranges:
downloadRange: \(ranges.downloadRange?.lowerBound ?? -1)...\(ranges.downloadRange?.upperBound ?? -1)
scanRange: \(ranges.scanRange?.lowerBound ?? -1)...\(ranges.scanRange?.upperBound ?? -1)
enhanceRange: \(ranges.enhanceRange?.lowerBound ?? -1)...\(ranges.enhanceRange?.upperBound ?? -1)
fetchUTXORange: \(ranges.fetchUTXORange?.lowerBound ?? -1)...\(ranges.fetchUTXORange?.upperBound ?? -1)
""")
var anyActionExecuted = false
try storage.createTable()
if let range = ranges.downloadRange {
anyActionExecuted = true
LoggerProxy.debug("Downloading with range: \(range.lowerBound)...\(range.upperBound)")
// It may happen that sync process is interrupted in scanning phase. And then when sync process is resumed we already have
// blocks downloaded.
//
// Therefore we want to skip downloading in case that we already have everything downloaded.
if lastDownloadedBlockHeight < latestBlockHeight {
try await compactBlockStreamDownload( try await compactBlockStreamDownload(
blockBufferSize: config.downloadBufferSize, blockBufferSize: config.downloadBufferSize,
startHeight: range.lowerBound, startHeight: range.lowerBound,
targetHeight: range.upperBound targetHeight: range.upperBound
) )
try await compactBlockValidation()
} }
try storage.createTable() if let range = ranges.scanRange {
anyActionExecuted = true
LoggerProxy.debug("Scanning with range: \(range.lowerBound)...\(range.upperBound)")
try await compactBlockBatchScanning(range: range)
}
if let range = ranges.enhanceRange {
anyActionExecuted = true
LoggerProxy.debug("Enhancing with range: \(range.lowerBound)...\(range.upperBound)")
try await compactBlockEnhancement(range: range)
}
if let range = ranges.fetchUTXORange {
anyActionExecuted = true
LoggerProxy.debug("Fetching UTXO with range: \(range.lowerBound)...\(range.upperBound)")
try await fetchUnspentTxOutputs(range: range)
}
try await compactBlockValidation()
try await compactBlockBatchScanning(range: range)
try await compactBlockEnhancement(range: range)
try await fetchUnspentTxOutputs(range: range)
try await handleSaplingParametersIfNeeded() try await handleSaplingParametersIfNeeded()
try await removeCacheDB()
if !Task.isCancelled { if !Task.isCancelled {
await processBatchFinished(range: range) await processBatchFinished(height: anyActionExecuted ? ranges.latestBlockHeight : nil)
} }
} catch { } catch {
LoggerProxy.error("Sync failed with error: \(error)") LoggerProxy.error("Sync failed with error: \(error)")
@ -720,27 +738,6 @@ public actor CompactBlockProcessor {
await self.setTimer() await self.setTimer()
} }
func retryProcessing(range: CompactBlockRange) async {
cancelableTask?.cancel()
// update retries
self.retryAttempts += 1
self.processingError = nil
guard self.retryAttempts < config.retries else {
self.notifyError(CompactBlockProcessorError.maxAttemptsReached(attempts: self.retryAttempts))
self.stop()
return
}
do {
try downloader.rewind(to: max(range.lowerBound, self.config.walletBirthday))
// process next batch
await nextBatch()
} catch {
await self.fail(error)
}
}
func mapError(_ error: Error) -> CompactBlockProcessorError { func mapError(_ error: Error) -> CompactBlockProcessorError {
if let processorError = error as? CompactBlockProcessorError { if let processorError = error as? CompactBlockProcessorError {
return processorError return processorError
@ -780,22 +777,18 @@ public actor CompactBlockProcessor {
downloader: self.downloader, downloader: self.downloader,
transactionRepository: transactionRepository, transactionRepository: transactionRepository,
config: self.config, config: self.config,
rustBackend: self.rustBackend rustBackend: self.rustBackend,
internalSyncProgress: internalSyncProgress
) )
switch nextState { switch nextState {
case .finishProcessing(let height): case .finishProcessing(let height):
self.latestBlockHeight = height
await self.processingFinished(height: height) await self.processingFinished(height: height)
case .processNewBlocks(let range, let latestBlockHeight): case .processNewBlocks(let ranges):
self.latestBlockHeight = range.upperBound await self.processNewBlocks(ranges: ranges)
self.lowerBoundHeight = range.lowerBound
await self.processNewBlocks(range: range, latestBlockHeight: latestBlockHeight)
case let .wait(latestHeight, latestDownloadHeight): case let .wait(latestHeight, latestDownloadHeight):
// Lightwalletd might be syncing // Lightwalletd might be syncing
self.lowerBoundHeight = latestDownloadHeight
self.latestBlockHeight = latestHeight
LoggerProxy.info( LoggerProxy.info(
"Lightwalletd might be syncing: latest downloaded block height is: \(latestDownloadHeight)" + "Lightwalletd might be syncing: latest downloaded block height is: \(latestDownloadHeight) " +
"while latest blockheight is reported at: \(latestHeight)" "while latest blockheight is reported at: \(latestHeight)"
) )
await self.processingFinished(height: latestDownloadHeight) await self.processingFinished(height: latestDownloadHeight)
@ -827,6 +820,7 @@ public actor CompactBlockProcessor {
do { do {
try downloader.rewind(to: rewindHeight) try downloader.rewind(to: rewindHeight)
await internalSyncProgress.rewind(to: rewindHeight)
// notify reorg // notify reorg
NotificationSender.default.post( NotificationSender.default.post(
@ -844,21 +838,15 @@ public actor CompactBlockProcessor {
} }
} }
internal func processBatchFinished(range: CompactBlockRange) async { internal func processBatchFinished(height: BlockHeight?) async {
guard processingError == nil else {
await retryProcessing(range: range)
return
}
retryAttempts = 0 retryAttempts = 0
consecutiveChainValidationErrors = 0 consecutiveChainValidationErrors = 0
guard !range.isEmpty else { if let height {
await processingFinished(height: range.upperBound) await processingFinished(height: height)
return } else {
await nextBatch()
} }
await nextBatch()
} }
private func processingFinished(height: BlockHeight) async { private func processingFinished(height: BlockHeight) async {
@ -880,26 +868,9 @@ public actor CompactBlockProcessor {
} }
private func removeCacheDB() async throws { private func removeCacheDB() async throws {
let latestBlock: ZcashCompactBlock
do {
latestBlock = try storage.latestBlock()
} catch let error {
// If we don't have anything downloaded we don't need to remove DB and we also don't want to throw error and error out whole sync process.
if let err = error as? StorageError, case .latestBlockNotFound = err {
return
} else {
throw error
}
}
storage.closeDBConnection() storage.closeDBConnection()
try FileManager.default.removeItem(at: config.cacheDb) try FileManager.default.removeItem(at: config.cacheDb)
try storage.createTable() try storage.createTable()
// Latest downloaded block needs to be preserved because after the sync process is interrupted it must be correctly resumed. And for that
// we need correct information which was downloaded as latest.
try await storage.write(blocks: [latestBlock])
LoggerProxy.info("Cache removed") LoggerProxy.info("Cache removed")
} }
@ -917,9 +888,8 @@ public actor CompactBlockProcessor {
""" """
Timer triggered: Starting compact Block processor!. Timer triggered: Starting compact Block processor!.
Processor State: \(await self.state) Processor State: \(await self.state)
latestHeight: \(await self.latestBlockHeight) latestHeight: \(try await self.transactionRepository.lastScannedHeight())
attempts: \(await self.retryAttempts) attempts: \(await self.retryAttempts)
lowerbound: \(String(describing: await self.lowerBoundHeight))
""" """
) )
await self.start() await self.start()
@ -1195,51 +1165,33 @@ extension CompactBlockProcessor {
downloader: CompactBlockDownloading, downloader: CompactBlockDownloading,
transactionRepository: TransactionRepository, transactionRepository: TransactionRepository,
config: Configuration, config: Configuration,
rustBackend: ZcashRustBackendWelding.Type rustBackend: ZcashRustBackendWelding.Type,
) async throws -> NextState { internalSyncProgress: InternalSyncProgress
) async throws -> CompactBlockProcessor.NextState {
// It should be ok to not create new Task here because this method is already async. But for some reason something not good happens
// when Task is not created here. For example tests start failing. Reason is unknown at this time.
let task = Task(priority: .userInitiated) { let task = Task(priority: .userInitiated) {
do { let info = try await service.getInfo()
let info = try await service.getInfo()
try CompactBlockProcessor.validateServerInfo( try CompactBlockProcessor.validateServerInfo(
info, info,
saplingActivation: config.saplingActivation, saplingActivation: config.saplingActivation,
localNetwork: config.network, localNetwork: config.network,
rustBackend: rustBackend rustBackend: rustBackend
) )
let lastDownloadedBlockHeight = try downloader.lastDownloadedBlockHeight() await internalSyncProgress.migrateIfNeeded(latestDownloadedBlockHeightFromCacheDB: try downloader.lastDownloadedBlockHeight())
let latestBlockheight = try service.latestBlockHeight()
// Syncing process can be interrupted in any phase. And here it must be detected in which phase is syncing process. let latestBlockHeight = try service.latestBlockHeight()
let latestDownloadedBlockHeight: BlockHeight let latestScannedHeight = try transactionRepository.lastScannedHeight()
// This means that there are some blocks that are not downloaded yet.
if lastDownloadedBlockHeight < latestBlockheight {
latestDownloadedBlockHeight = max(config.walletBirthday, lastDownloadedBlockHeight)
} else {
// Here all the blocks are downloaded and last scan height should be then used to compute processing range.
latestDownloadedBlockHeight = max(config.walletBirthday, try transactionRepository.lastScannedHeight())
}
return try await internalSyncProgress.computeNextState(
if latestDownloadedBlockHeight < latestBlockheight { latestBlockHeight: latestBlockHeight,
return NextState.processNewBlocks( latestScannedHeight: latestScannedHeight,
range: CompactBlockProcessor.nextBatchBlockRange( walletBirthday: config.walletBirthday
latestHeight: latestBlockheight, )
latestDownloadedHeight: latestDownloadedBlockHeight,
walletBirthday: config.walletBirthday
),
latestBlockHeight: latestBlockheight
)
} else if latestBlockheight == latestDownloadedBlockHeight {
return .finishProcessing(height: latestBlockheight)
}
return .wait(latestHeight: latestBlockheight, latestDownloadHeight: latestBlockheight)
} catch {
throw error
}
} }
return try await task.value return try await task.value
} }
} }

View File

@ -31,7 +31,11 @@ extension CompactBlockProcessor {
.flatMap({ $0 }) .flatMap({ $0 })
var utxos: [UnspentTransactionOutputEntity] = [] var utxos: [UnspentTransactionOutputEntity] = []
let stream: AsyncThrowingStream<UnspentTransactionOutputEntity, Error> = downloader.fetchUnspentTransactionOutputs(tAddresses: tAddresses.map { $0.stringEncoded }, startHeight: config.walletBirthday) let stream: AsyncThrowingStream<UnspentTransactionOutputEntity, Error> = downloader.fetchUnspentTransactionOutputs(
tAddresses: tAddresses.map { $0.stringEncoded },
startHeight: config.walletBirthday
)
for try await transaction in stream { for try await transaction in stream {
utxos.append(transaction) utxos.append(transaction)
} }
@ -50,6 +54,8 @@ extension CompactBlockProcessor {
height: utxo.height, height: utxo.height,
networkType: config.network.networkType networkType: config.network.networkType
) ? refreshed.append(utxo) : skipped.append(utxo) ) ? refreshed.append(utxo) : skipped.append(utxo)
await internalSyncProgress.set(utxo.height, .latestUTXOFetchedHeight)
} catch { } catch {
LoggerProxy.error("failed to put utxo - error: \(error)") LoggerProxy.error("failed to put utxo - error: \(error)")
skipped.append(utxo) skipped.append(utxo)
@ -64,6 +70,8 @@ extension CompactBlockProcessor {
userInfo: [CompactBlockProcessorNotificationKey.refreshedUTXOs: result] userInfo: [CompactBlockProcessorNotificationKey.refreshedUTXOs: result]
) )
await internalSyncProgress.set(range.upperBound, .latestUTXOFetchedHeight)
if Task.isCancelled { if Task.isCancelled {
LoggerProxy.debug("Warning: fetchUnspentTxOutputs on range \(range) cancelled") LoggerProxy.debug("Warning: fetchUnspentTxOutputs on range \(range) cancelled")
} }

View File

@ -10,7 +10,7 @@ import Foundation
extension CompactBlockProcessor { extension CompactBlockProcessor {
enum NextState: Equatable { enum NextState: Equatable {
case finishProcessing(height: BlockHeight) case finishProcessing(height: BlockHeight)
case processNewBlocks(range: CompactBlockRange, latestBlockHeight: BlockHeight) case processNewBlocks(ranges: SyncRanges)
case wait(latestHeight: BlockHeight, latestDownloadHeight: BlockHeight) case wait(latestHeight: BlockHeight, latestDownloadHeight: BlockHeight)
} }
@ -26,7 +26,8 @@ extension CompactBlockProcessor {
downloader: downloader, downloader: downloader,
transactionRepository: transactionRepository, transactionRepository: transactionRepository,
config: config, config: config,
rustBackend: rustBackend rustBackend: rustBackend,
internalSyncProgress: internalSyncProgress
) )
} catch { } catch {
throw error throw error

View File

@ -0,0 +1,152 @@
//
// InternalSyncProgress.swift
//
//
// Created by Michal Fousek on 23.11.2022.
//
import Foundation
struct SyncRanges: Equatable {
let latestBlockHeight: BlockHeight
let downloadRange: CompactBlockRange?
let scanRange: CompactBlockRange?
let enhanceRange: CompactBlockRange?
let fetchUTXORange: CompactBlockRange?
}
protocol InternalSyncProgressStorage {
func bool(forKey defaultName: String) -> Bool
func integer(forKey defaultName: String) -> Int
func set(_ value: Int, forKey defaultName: String)
func set(_ value: Bool, forKey defaultName: String)
@discardableResult func synchronize() -> Bool
}
extension UserDefaults: InternalSyncProgressStorage { }
actor InternalSyncProgress {
enum Key: String, CaseIterable {
case latestDownloadedBlockHeight
case latestEnhancedHeight
case latestUTXOFetchedHeight
}
private let storage: InternalSyncProgressStorage
var latestDownloadedBlockHeight: BlockHeight { get { get(.latestDownloadedBlockHeight) } }
var latestEnhancedHeight: BlockHeight { get { get(.latestEnhancedHeight) } }
var latestUTXOFetchedHeight: BlockHeight { get { get(.latestUTXOFetchedHeight) } }
init(storage: InternalSyncProgressStorage) {
self.storage = storage
}
func get(_ key: Key) -> BlockHeight {
storage.integer(forKey: key.rawValue)
}
func set(_ value: BlockHeight, _ key: Key) {
storage.set(value, forKey: key.rawValue)
storage.synchronize()
}
func rewind(to: BlockHeight) {
Key.allCases.forEach { key in
let finalRewindHeight = min(self.get(key), to)
self.set(finalRewindHeight, key)
}
}
/// `InternalSyncProgress` is from now on used to track which block were already downloaded. Previous versions of the SDK were using cache DB to
/// track this. Because of this we have to migrace height of latest downloaded block from cache DB to here.
///
/// - Parameter latestDownloadedBlockHeight: Height of latest downloaded block from cache DB.
func migrateIfNeeded(latestDownloadedBlockHeightFromCacheDB latestDownloadedBlockHeight: BlockHeight) {
let key = "InternalSyncProgressMigrated"
if !storage.bool(forKey: key) {
set(latestDownloadedBlockHeight, .latestDownloadedBlockHeight)
}
storage.set(true, forKey: key)
storage.synchronize()
}
/// Computes the next state for the sync process. Thanks to this it's possible to interrupt the sync process at any phase and then it can be safely
/// resumed.
///
/// The sync process has 4 phases (download, scan, enhance, fetch UTXO). `InternalSyncProgress` tracks independently which blocks were already
/// processed in each phase. To compute the next state these 4 numbers are compared with `latestBlockHeight`.
///
/// - If any of these numbers are larger than `latestBlockHeight` then `wait` is used as the next state. We have locally higher block heights than
/// are currently available at LightWalletd.
/// - If any of these numbers are lower than `latestBlockHeight` then `processNewBlocks` is used as the next state. The sync process should run.
/// - Otherwise `finishProcessing` is used as the next state. It means that local data are synced with what is available at LightWalletd.
///
/// - Parameters:
/// - latestBlockHeight: Latest height fetched from LightWalletd API.
/// - latestScannedHeight: Latest height of latest block scanned.
/// - walletBirthday: Wallet birthday.
/// - Returns: Computed state.
func computeNextState(
latestBlockHeight: BlockHeight,
latestScannedHeight: BlockHeight,
walletBirthday: BlockHeight
) throws -> CompactBlockProcessor.NextState {
LoggerProxy.debug("""
Init numbers:
latestBlockHeight: \(latestBlockHeight)
latestDownloadedHeight: \(latestDownloadedBlockHeight)
latestScannedHeight: \(latestScannedHeight)
latestEnhancedHeight: \(latestEnhancedHeight)
latestUTXOFetchedHeight: \(latestUTXOFetchedHeight)
""")
if latestDownloadedBlockHeight > latestBlockHeight ||
latestScannedHeight > latestBlockHeight ||
latestEnhancedHeight > latestBlockHeight ||
latestUTXOFetchedHeight > latestBlockHeight {
return .wait(latestHeight: latestBlockHeight, latestDownloadHeight: latestDownloadedBlockHeight)
} else if latestDownloadedBlockHeight < latestBlockHeight ||
latestScannedHeight < latestBlockHeight ||
latestEnhancedHeight < latestEnhancedHeight ||
latestUTXOFetchedHeight < latestBlockHeight {
let ranges = computeSyncRanges(
birthday: walletBirthday,
latestBlockHeight: latestBlockHeight,
latestScannedHeight: latestScannedHeight
)
return .processNewBlocks(ranges: ranges)
} else {
return .finishProcessing(height: latestBlockHeight)
}
}
func computeSyncRanges(
birthday: BlockHeight,
latestBlockHeight: BlockHeight,
latestScannedHeight: BlockHeight
) -> SyncRanges {
return SyncRanges(
latestBlockHeight: latestBlockHeight,
downloadRange: computeRange(
latestHeight: latestDownloadedBlockHeight,
birthday: birthday,
latestBlockHeight: latestBlockHeight
),
scanRange: computeRange(
latestHeight: latestScannedHeight,
birthday: birthday,
latestBlockHeight: latestBlockHeight
),
enhanceRange: computeRange(latestHeight: latestEnhancedHeight, birthday: birthday, latestBlockHeight: latestBlockHeight),
fetchUTXORange: computeRange(latestHeight: latestUTXOFetchedHeight, birthday: birthday, latestBlockHeight: latestBlockHeight)
)
}
private func computeRange(latestHeight: BlockHeight, birthday: BlockHeight, latestBlockHeight: BlockHeight) -> CompactBlockRange? {
guard latestHeight < latestBlockHeight else { return nil }
let lowerBound = latestHeight <= birthday ? birthday : latestHeight + 1
return lowerBound...latestBlockHeight
}
}

View File

@ -1,5 +1,5 @@
// //
// File.swift // NotificationSender.swift
// //
// //
// Created by Michal Fousek on 21.11.2022. // Created by Michal Fousek on 21.11.2022.

View File

@ -1,5 +1,5 @@
// //
// File.swift // PagedTransactionDAO.swift
// ZcashLightClientKit // ZcashLightClientKit
// //
// Created by Francisco Gindre on 12/9/19. // Created by Francisco Gindre on 12/9/19.

View File

@ -1,5 +1,5 @@
// //
// File.swift // TransactionEntity.swift
// ZcashLightClientKit // ZcashLightClientKit
// //
// Created by Francisco Gindre on 11/14/19. // Created by Francisco Gindre on 11/14/19.

View File

@ -1,5 +1,5 @@
// //
// File.swift // HexEncode.swift
// ZcashLightClientKit // ZcashLightClientKit
// //
// Created by Francisco Gindre on 12/13/19. // Created by Francisco Gindre on 12/13/19.

View File

@ -30,11 +30,6 @@ protocol CompactBlockRepository {
*/ */
func latestHeightAsync() async throws -> BlockHeight func latestHeightAsync() async throws -> BlockHeight
/**
Gets the block with the highest height that is currently stored.
*/
func latestBlock() throws -> ZcashCompactBlock
/** /**
Write the given blocks to this store, which may be anything from an in-memory cache to a DB. Write the given blocks to this store, which may be anything from an in-memory cache to a DB.
Non-Blocking Non-Blocking

View File

@ -34,15 +34,13 @@ class AdvancedReOrgTests: XCTestCase {
override func setUpWithError() throws { override func setUpWithError() throws {
try super.setUpWithError() try super.setUpWithError()
wait { [self] in self.coordinator = try TestCoordinator(
self.coordinator = try await TestCoordinator( seed: seedPhrase,
seed: seedPhrase, walletBirthday: birthday + 50, //don't use an exact birthday, users never do.
walletBirthday: birthday + 50, //don't use an exact birthday, users never do. channelProvider: ChannelProvider(),
channelProvider: ChannelProvider(), network: network
network: network )
) try coordinator.reset(saplingActivation: 663150, branchID: self.branchID, chainName: self.chainName)
try coordinator.reset(saplingActivation: 663150, branchID: self.branchID, chainName: self.chainName)
}
} }
override func tearDownWithError() throws { override func tearDownWithError() throws {
@ -482,11 +480,14 @@ class AdvancedReOrgTests: XCTestCase {
var preReorgTotalBalance = Zatoshi.zero var preReorgTotalBalance = Zatoshi.zero
var preReorgVerifiedBalance = Zatoshi.zero var preReorgVerifiedBalance = Zatoshi.zero
try coordinator.sync(completion: { synchronizer in try coordinator.sync(
preReorgTotalBalance = synchronizer.initializer.getBalance() completion: { synchronizer in
preReorgVerifiedBalance = synchronizer.initializer.getVerifiedBalance() preReorgTotalBalance = synchronizer.initializer.getBalance()
firstSyncExpectation.fulfill() preReorgVerifiedBalance = synchronizer.initializer.getVerifiedBalance()
}, error: self.handleError) firstSyncExpectation.fulfill()
},
error: self.handleError
)
wait(for: [firstSyncExpectation], timeout: 10) wait(for: [firstSyncExpectation], timeout: 10)
@ -502,11 +503,14 @@ class AdvancedReOrgTests: XCTestCase {
var postReorgTotalBalance = Zatoshi.zero var postReorgTotalBalance = Zatoshi.zero
var postReorgVerifiedBalance = Zatoshi.zero var postReorgVerifiedBalance = Zatoshi.zero
try coordinator.sync(completion: { synchronizer in try coordinator.sync(
postReorgTotalBalance = synchronizer.initializer.getBalance() completion: { synchronizer in
postReorgVerifiedBalance = synchronizer.initializer.getVerifiedBalance() postReorgTotalBalance = synchronizer.initializer.getBalance()
afterReorgSync.fulfill() postReorgVerifiedBalance = synchronizer.initializer.getVerifiedBalance()
}, error: self.handleError) afterReorgSync.fulfill()
},
error: self.handleError
)
wait(for: [reorgExpectation, afterReorgSync], timeout: 30) wait(for: [reorgExpectation, afterReorgSync], timeout: 30)
@ -595,6 +599,8 @@ class AdvancedReOrgTests: XCTestCase {
try coordinator.applyStaged(blockheight: incomingTxHeight + 1) try coordinator.applyStaged(blockheight: incomingTxHeight + 1)
sleep(1)
/* /*
1. sync up to an incoming transaction (incomingTxHeight + 1) 1. sync up to an incoming transaction (incomingTxHeight + 1)
*/ */
@ -654,6 +660,8 @@ class AdvancedReOrgTests: XCTestCase {
*/ */
try coordinator.applyStaged(blockheight: incomingTxHeight + 2) try coordinator.applyStaged(blockheight: incomingTxHeight + 2)
sleep(1)
let lastSyncExpectation = XCTestExpectation(description: "last sync expectation") let lastSyncExpectation = XCTestExpectation(description: "last sync expectation")
/* /*
@ -678,6 +686,8 @@ class AdvancedReOrgTests: XCTestCase {
let txReorgHeight = BlockHeight(663195) let txReorgHeight = BlockHeight(663195)
let finalHeight = BlockHeight(663200) let finalHeight = BlockHeight(663200)
try coordinator.applyStaged(blockheight: txReorgHeight) try coordinator.applyStaged(blockheight: txReorgHeight)
sleep(1)
let firstSyncExpectation = XCTestExpectation(description: "first sync test expectation") let firstSyncExpectation = XCTestExpectation(description: "first sync test expectation")
var initialBalance = Zatoshi(-1) var initialBalance = Zatoshi(-1)
var initialVerifiedBalance = Zatoshi(-1) var initialVerifiedBalance = Zatoshi(-1)
@ -692,6 +702,7 @@ class AdvancedReOrgTests: XCTestCase {
try coordinator.resetBlocks(dataset: .predefined(dataset: .txIndexChangeAfter)) try coordinator.resetBlocks(dataset: .predefined(dataset: .txIndexChangeAfter))
try coordinator.applyStaged(blockheight: finalHeight) try coordinator.applyStaged(blockheight: finalHeight)
sleep(1)
let lastSyncExpectation = XCTestExpectation(description: "last sync expectation") let lastSyncExpectation = XCTestExpectation(description: "last sync expectation")
@ -1068,6 +1079,7 @@ class AdvancedReOrgTests: XCTestCase {
let initialVerifiedBalance: Zatoshi = coordinator.synchronizer.initializer.getVerifiedBalance() let initialVerifiedBalance: Zatoshi = coordinator.synchronizer.initializer.getVerifiedBalance()
try coordinator.applyStaged(blockheight: reorgHeight) try coordinator.applyStaged(blockheight: reorgHeight)
sleep(1)
let secondSyncExpectation = XCTestExpectation(description: "second sync expectation") let secondSyncExpectation = XCTestExpectation(description: "second sync expectation")

View File

@ -536,6 +536,8 @@ class BalanceTests: XCTestCase {
try coordinator.applyStaged(blockheight: defaultLatestHeight) try coordinator.applyStaged(blockheight: defaultLatestHeight)
sleep(1)
try await withCheckedThrowingContinuation { continuation in try await withCheckedThrowingContinuation { continuation in
do { do {
try coordinator.sync(completion: { synchronizer in try coordinator.sync(completion: { synchronizer in
@ -841,6 +843,7 @@ class BalanceTests: XCTestCase {
func testVerifyIncomingTransaction() throws { func testVerifyIncomingTransaction() throws {
try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName) try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName)
try coordinator.applyStaged(blockheight: defaultLatestHeight) try coordinator.applyStaged(blockheight: defaultLatestHeight)
sleep(1)
try coordinator.sync(completion: { _ in try coordinator.sync(completion: { _ in
self.syncedExpectation.fulfill() self.syncedExpectation.fulfill()
}, error: self.handleError) }, error: self.handleError)
@ -877,6 +880,7 @@ class BalanceTests: XCTestCase {
try FakeChainBuilder.buildSingleNoteChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName) try FakeChainBuilder.buildSingleNoteChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName)
try coordinator.applyStaged(blockheight: defaultLatestHeight) try coordinator.applyStaged(blockheight: defaultLatestHeight)
sleep(1)
let sendExpectation = XCTestExpectation(description: "send expectation") let sendExpectation = XCTestExpectation(description: "send expectation")
let createToAddressExpectation = XCTestExpectation(description: "create to address") let createToAddressExpectation = XCTestExpectation(description: "create to address")

View File

@ -57,6 +57,8 @@ class DarksideSanityCheckTests: XCTestCase {
try coordinator.applyStaged(blockheight: expectedLastBlock.height) try coordinator.applyStaged(blockheight: expectedLastBlock.height)
sleep(1)
let syncExpectation = XCTestExpectation(description: "sync to \(expectedLastBlock.height)") let syncExpectation = XCTestExpectation(description: "sync to \(expectedLastBlock.height)")
try coordinator.sync( try coordinator.sync(

View File

@ -130,6 +130,7 @@ class ReOrgTests: XCTestCase {
try coordinator.reset(saplingActivation: birthday, branchID: branchID, chainName: chainName) try coordinator.reset(saplingActivation: birthday, branchID: branchID, chainName: chainName)
try coordinator.resetBlocks(dataset: .predefined(dataset: .beforeReOrg)) try coordinator.resetBlocks(dataset: .predefined(dataset: .beforeReOrg))
try coordinator.applyStaged(blockheight: firstLatestHeight) try coordinator.applyStaged(blockheight: firstLatestHeight)
sleep(1)
} catch { } catch {
XCTFail("Error: \(error)") XCTFail("Error: \(error)")
return return

View File

@ -36,6 +36,8 @@ class TransactionEnhancementTests: XCTestCase {
override func setUpWithError() throws { override func setUpWithError() throws {
try super.setUpWithError() try super.setUpWithError()
XCTestCase.wait { await InternalSyncProgress(storage: UserDefaults.standard).rewind(to: 0) }
logger = SampleLogger(logLevel: .debug) logger = SampleLogger(logLevel: .debug)
downloadStartedExpect = XCTestExpectation(description: "\(self.description) downloadStartedExpect") downloadStartedExpect = XCTestExpectation(description: "\(self.description) downloadStartedExpect")

View File

@ -136,75 +136,4 @@ class BlockStreamingTest: XCTestCase {
let elapsed = now.timeIntervalSince(date) let elapsed = now.timeIntervalSince(date)
print("took \(elapsed) seconds") print("took \(elapsed) seconds")
} }
func testBatch() async throws {
let service = LightWalletGRPCService(
host: LightWalletEndpointBuilder.eccTestnet.host,
port: 9067,
secure: true,
singleCallTimeout: 300000,
streamingCallTimeout: 10000
)
let storage = try TestDbBuilder.diskCompactBlockStorage(at: __dataDbURL() )
let targetHeight = try service.latestBlockHeight()
let startHeight = targetHeight - 10_000
let processorConfig = CompactBlockProcessor.Configuration.standard(
for: ZcashNetworkBuilder.network(for: .testnet),
walletBirthday: ZcashNetworkBuilder.network(for: .testnet).constants.saplingActivationHeight
)
let compactBlockProcessor = CompactBlockProcessor(
service: service,
storage: storage,
backend: ZcashRustBackend.self,
config: processorConfig
)
let range = CompactBlockRange(uncheckedBounds: (startHeight, targetHeight))
do {
try await compactBlockProcessor.compactBlockBatchDownload(range: range)
XCTAssertFalse(Task.isCancelled)
} catch {
XCTFail("failed with error: \(error)")
}
}
func testBatchCancellation() async throws {
let service = LightWalletGRPCService(
host: LightWalletEndpointBuilder.eccTestnet.host,
port: 9067,
secure: true,
singleCallTimeout: 300000,
streamingCallTimeout: 10000
)
let storage = try TestDbBuilder.diskCompactBlockStorage(at: __dataDbURL() )
let targetHeight = try service.latestBlockHeight()
let startHeight = targetHeight - 100_000
let processorConfig = CompactBlockProcessor.Configuration.standard(
for: ZcashNetworkBuilder.network(for: .testnet),
walletBirthday: ZcashNetworkBuilder.network(for: .testnet).constants.saplingActivationHeight
)
let compactBlockProcessor = CompactBlockProcessor(
service: service,
storage: storage,
backend: ZcashRustBackend.self,
config: processorConfig
)
let range = CompactBlockRange(uncheckedBounds: (startHeight, targetHeight))
let cancelableTask = Task {
do {
try await compactBlockProcessor.compactBlockBatchDownload(range: range)
XCTAssertTrue(Task.isCancelled)
} catch {
XCTFail("failed with error: \(error)")
}
}
try await Task.sleep(nanoseconds: 3_000_000_000)
cancelableTask.cancel()
}
} }

View File

@ -147,35 +147,59 @@ class CompactBlockProcessorTests: XCTestCase {
(abs(currentHeight - targetHeight) / batchSize) (abs(currentHeight - targetHeight) / batchSize)
} }
func testNextBatchBlockRange() { func testNextBatchBlockRange() async {
// test first range // test first range
var latestDownloadedHeight = processorConfig.walletBirthday // this can be either this or Wallet Birthday. var latestDownloadedHeight = processorConfig.walletBirthday // this can be either this or Wallet Birthday.
var latestBlockchainHeight = BlockHeight(network.constants.saplingActivationHeight + 1000) var latestBlockchainHeight = BlockHeight(network.constants.saplingActivationHeight + 1000)
var expectedBatchRange = CompactBlockRange(uncheckedBounds: (lower: latestDownloadedHeight, upper:latestBlockchainHeight)) var expectedSyncRanges = SyncRanges(
latestBlockHeight: latestBlockchainHeight,
downloadRange: latestDownloadedHeight...latestBlockchainHeight,
scanRange: processorConfig.walletBirthday...latestBlockchainHeight,
enhanceRange: processorConfig.walletBirthday...latestBlockchainHeight,
fetchUTXORange: processorConfig.walletBirthday...latestBlockchainHeight
)
var internalSyncProgress = InternalSyncProgress(storage: InternalSyncProgressMemoryStorage())
await internalSyncProgress.migrateIfNeeded(latestDownloadedBlockHeightFromCacheDB: latestDownloadedHeight)
var syncRanges = await internalSyncProgress.computeSyncRanges(
birthday: processorConfig.walletBirthday,
latestBlockHeight: latestBlockchainHeight,
latestScannedHeight: 0
)
XCTAssertEqual( XCTAssertEqual(
expectedBatchRange, expectedSyncRanges,
CompactBlockProcessor.nextBatchBlockRange( syncRanges,
latestHeight: latestBlockchainHeight, "Failure when testing first range"
latestDownloadedHeight: latestDownloadedHeight,
walletBirthday: processorConfig.walletBirthday
)
) )
// Test mid-range // Test mid-range
latestDownloadedHeight = BlockHeight(network.constants.saplingActivationHeight + ZcashSDK.DefaultDownloadBatch) latestDownloadedHeight = BlockHeight(network.constants.saplingActivationHeight + ZcashSDK.DefaultDownloadBatch)
latestBlockchainHeight = BlockHeight(network.constants.saplingActivationHeight + 1000) latestBlockchainHeight = BlockHeight(network.constants.saplingActivationHeight + 1000)
expectedBatchRange = CompactBlockRange(uncheckedBounds: (lower: latestDownloadedHeight + 1, upper: latestBlockchainHeight)) expectedSyncRanges = SyncRanges(
latestBlockHeight: latestBlockchainHeight,
downloadRange: latestDownloadedHeight+1...latestBlockchainHeight,
scanRange: processorConfig.walletBirthday...latestBlockchainHeight,
enhanceRange: processorConfig.walletBirthday...latestBlockchainHeight,
fetchUTXORange: processorConfig.walletBirthday...latestBlockchainHeight
)
internalSyncProgress = InternalSyncProgress(storage: InternalSyncProgressMemoryStorage())
await internalSyncProgress.migrateIfNeeded(latestDownloadedBlockHeightFromCacheDB: latestDownloadedHeight)
syncRanges = await internalSyncProgress.computeSyncRanges(
birthday: processorConfig.walletBirthday,
latestBlockHeight: latestBlockchainHeight,
latestScannedHeight: 0
)
XCTAssertEqual( XCTAssertEqual(
expectedBatchRange, expectedSyncRanges,
CompactBlockProcessor.nextBatchBlockRange( syncRanges,
latestHeight: latestBlockchainHeight, "Failure when testing mid range"
latestDownloadedHeight: latestDownloadedHeight,
walletBirthday: processorConfig.walletBirthday
)
) )
// Test last batch range // Test last batch range
@ -183,15 +207,27 @@ class CompactBlockProcessorTests: XCTestCase {
latestDownloadedHeight = BlockHeight(network.constants.saplingActivationHeight + 950) latestDownloadedHeight = BlockHeight(network.constants.saplingActivationHeight + 950)
latestBlockchainHeight = BlockHeight(network.constants.saplingActivationHeight + 1000) latestBlockchainHeight = BlockHeight(network.constants.saplingActivationHeight + 1000)
expectedBatchRange = CompactBlockRange(uncheckedBounds: (lower: latestDownloadedHeight + 1, upper: latestBlockchainHeight)) expectedSyncRanges = SyncRanges(
latestBlockHeight: latestBlockchainHeight,
downloadRange: latestDownloadedHeight+1...latestBlockchainHeight,
scanRange: processorConfig.walletBirthday...latestBlockchainHeight,
enhanceRange: processorConfig.walletBirthday...latestBlockchainHeight,
fetchUTXORange: processorConfig.walletBirthday...latestBlockchainHeight
)
internalSyncProgress = InternalSyncProgress(storage: InternalSyncProgressMemoryStorage())
await internalSyncProgress.migrateIfNeeded(latestDownloadedBlockHeightFromCacheDB: latestDownloadedHeight)
syncRanges = await internalSyncProgress.computeSyncRanges(
birthday: processorConfig.walletBirthday,
latestBlockHeight: latestBlockchainHeight,
latestScannedHeight: 0
)
XCTAssertEqual( XCTAssertEqual(
expectedBatchRange, expectedSyncRanges,
CompactBlockProcessor.nextBatchBlockRange( syncRanges,
latestHeight: latestBlockchainHeight, "Failure when testing last range"
latestDownloadedHeight: latestDownloadedHeight,
walletBirthday: processorConfig.walletBirthday
)
) )
} }

View File

@ -243,13 +243,13 @@ class BlockBatchValidationTests: XCTestCase {
latestBlockHeight: expectedLatestHeight, latestBlockHeight: expectedLatestHeight,
service: LightWalletGRPCService(endpoint: LightWalletEndpointBuilder.default) service: LightWalletGRPCService(endpoint: LightWalletEndpointBuilder.default)
) )
let expectedStoreLatestHeight = BlockHeight(1220000) let expectedStoredLatestHeight = BlockHeight(1220000)
let expectedResult = CompactBlockProcessor.NextState.wait( let expectedResult = CompactBlockProcessor.NextState.wait(
latestHeight: expectedLatestHeight, latestHeight: expectedLatestHeight,
latestDownloadHeight: expectedLatestHeight latestDownloadHeight: expectedStoredLatestHeight
) )
let repository = ZcashConsoleFakeStorage(latestBlockHeight: expectedStoreLatestHeight) let repository = ZcashConsoleFakeStorage(latestBlockHeight: expectedStoredLatestHeight)
let downloader = CompactBlockDownloader(service: service, storage: repository) let downloader = CompactBlockDownloader(service: service, storage: repository)
let config = CompactBlockProcessor.Configuration( let config = CompactBlockProcessor.Configuration(
@ -270,7 +270,7 @@ class BlockBatchValidationTests: XCTestCase {
unminedCount: 0, unminedCount: 0,
receivedCount: 0, receivedCount: 0,
sentCount: 0, sentCount: 0,
scannedHeight: expectedStoreLatestHeight, scannedHeight: expectedStoredLatestHeight,
network: network network: network
) )
@ -295,7 +295,8 @@ class BlockBatchValidationTests: XCTestCase {
downloader: downloader, downloader: downloader,
transactionRepository: transactionRepository, transactionRepository: transactionRepository,
config: config, config: config,
rustBackend: mockRust rustBackend: mockRust,
internalSyncProgress: InternalSyncProgress(storage: InternalSyncProgressMemoryStorage())
) )
XCTAssertFalse(Task.isCancelled) XCTAssertFalse(Task.isCancelled)
} catch { } catch {
@ -309,9 +310,9 @@ class BlockBatchValidationTests: XCTestCase {
XCTAssertTrue( XCTAssertTrue(
{ {
switch nextBatch { switch (nextBatch, expectedResult) {
case .wait(latestHeight: expectedLatestHeight, latestDownloadHeight: expectedLatestHeight): case (let .wait(latestHeight, latestDownloadHeight), let .wait(expectedLatestHeight, exectedLatestDownloadHeight)):
return true return latestHeight == expectedLatestHeight && latestDownloadHeight == exectedLatestDownloadHeight
default: default:
return false return false
} }
@ -329,14 +330,15 @@ class BlockBatchValidationTests: XCTestCase {
) )
let expectedStoreLatestHeight = BlockHeight(1220000) let expectedStoreLatestHeight = BlockHeight(1220000)
let walletBirthday = BlockHeight(1210000) let walletBirthday = BlockHeight(1210000)
let expectedResult = CompactBlockProcessor.NextState.processNewBlocks(
range: CompactBlockProcessor.nextBatchBlockRange( let ranges = SyncRanges(
latestHeight: expectedLatestHeight, latestBlockHeight: expectedLatestHeight,
latestDownloadedHeight: expectedStoreLatestHeight, downloadRange: expectedStoreLatestHeight+1...expectedLatestHeight,
walletBirthday: walletBirthday scanRange: expectedStoreLatestHeight+1...expectedLatestHeight,
), enhanceRange: walletBirthday...expectedLatestHeight,
latestBlockHeight: expectedLatestHeight fetchUTXORange: walletBirthday...expectedLatestHeight
) )
let expectedResult = CompactBlockProcessor.NextState.processNewBlocks(ranges: ranges)
let repository = ZcashConsoleFakeStorage(latestBlockHeight: expectedStoreLatestHeight) let repository = ZcashConsoleFakeStorage(latestBlockHeight: expectedStoreLatestHeight)
let downloader = CompactBlockDownloader(service: service, storage: repository) let downloader = CompactBlockDownloader(service: service, storage: repository)
@ -382,7 +384,8 @@ class BlockBatchValidationTests: XCTestCase {
downloader: downloader, downloader: downloader,
transactionRepository: transactionRepository, transactionRepository: transactionRepository,
config: config, config: config,
rustBackend: mockRust rustBackend: mockRust,
internalSyncProgress: InternalSyncProgress(storage: InternalSyncProgressMemoryStorage())
) )
XCTAssertFalse(Task.isCancelled) XCTAssertFalse(Task.isCancelled)
} catch { } catch {
@ -396,9 +399,9 @@ class BlockBatchValidationTests: XCTestCase {
XCTAssertTrue( XCTAssertTrue(
{ {
switch nextBatch { switch (nextBatch, expectedResult) {
case .processNewBlocks(range: CompactBlockRange(uncheckedBounds: (expectedStoreLatestHeight + 1, expectedLatestHeight)), latestBlockHeight: expectedLatestHeight): case (.processNewBlocks(let ranges), .processNewBlocks(let expectedRanges)):
return true return ranges == expectedRanges
default: default:
return false return false
} }
@ -433,6 +436,10 @@ class BlockBatchValidationTests: XCTestCase {
network: network network: network
) )
let internalSyncProgress = InternalSyncProgress(storage: InternalSyncProgressMemoryStorage())
await internalSyncProgress.set(expectedStoreLatestHeight, .latestEnhancedHeight)
await internalSyncProgress.set(expectedStoreLatestHeight, .latestUTXOFetchedHeight)
let transactionRepository = MockTransactionRepository( let transactionRepository = MockTransactionRepository(
unminedCount: 0, unminedCount: 0,
receivedCount: 0, receivedCount: 0,
@ -461,7 +468,8 @@ class BlockBatchValidationTests: XCTestCase {
downloader: downloader, downloader: downloader,
transactionRepository: transactionRepository, transactionRepository: transactionRepository,
config: config, config: config,
rustBackend: mockRust rustBackend: mockRust,
internalSyncProgress: internalSyncProgress
) )
XCTAssertFalse(Task.isCancelled) XCTAssertFalse(Task.isCancelled)
@ -477,9 +485,9 @@ class BlockBatchValidationTests: XCTestCase {
XCTAssertTrue( XCTAssertTrue(
{ {
switch nextBatch { switch (nextBatch, expectedResult) {
case .finishProcessing(height: expectedLatestHeight): case (.finishProcessing(let height), .finishProcessing(let expectedHeight)):
return true return height == expectedHeight
default: default:
return false return false
} }

View File

@ -0,0 +1,127 @@
//
// InternalSyncProgressTests.swift
//
//
// Created by Michal Fousek on 30.11.2022.
//
@testable import TestUtils
import XCTest
@testable import ZcashLightClientKit
class InternalSyncProgressTests: XCTestCase {
var storage: InternalSyncProgressStorage!
var internalSyncProgress: InternalSyncProgress!
override func setUp() {
super.setUp()
storage = InternalSyncProgressMemoryStorage()
internalSyncProgress = InternalSyncProgress(storage: storage)
}
func test__trackedValuesAreHigherThanLatestHeight__nextStateIsWait() async throws {
let latestHeight = 623000
await internalSyncProgress.migrateIfNeeded(latestDownloadedBlockHeightFromCacheDB: 630000)
await internalSyncProgress.set(630000, .latestUTXOFetchedHeight)
await internalSyncProgress.set(630000, .latestEnhancedHeight)
let nextState = try await internalSyncProgress.computeNextState(
latestBlockHeight: latestHeight,
latestScannedHeight: 630000,
walletBirthday: 600000
)
switch nextState {
case let .wait(latestHeight, latestDownloadHeight):
XCTAssertEqual(latestHeight, 623000)
XCTAssertEqual(latestDownloadHeight, 630000)
default:
XCTFail("State should be wait. Unexpected state: \(nextState)")
}
}
func test__trackedValuesAreLowerThanLatestHeight__nextStateIsProcessNewBlocks() async throws {
let latestHeight = 640000
await internalSyncProgress.migrateIfNeeded(latestDownloadedBlockHeightFromCacheDB: 630000)
await internalSyncProgress.set(630000, .latestUTXOFetchedHeight)
await internalSyncProgress.set(630000, .latestEnhancedHeight)
let nextState = try await internalSyncProgress.computeNextState(
latestBlockHeight: latestHeight,
latestScannedHeight: 630000,
walletBirthday: 600000
)
switch nextState {
case let .processNewBlocks(ranges):
XCTAssertEqual(ranges.downloadRange, 630001...640000)
XCTAssertEqual(ranges.scanRange, 630001...640000)
XCTAssertEqual(ranges.enhanceRange, 630001...640000)
XCTAssertEqual(ranges.fetchUTXORange, 630001...640000)
default:
XCTFail("State should be processNewBlocks. Unexpected state: \(nextState)")
}
}
func test__trackedValuesAreSameAsLatestHeight__nextStateIsFinishProcessing() async throws {
let latestHeight = 630000
await internalSyncProgress.migrateIfNeeded(latestDownloadedBlockHeightFromCacheDB: 630000)
await internalSyncProgress.set(630000, .latestUTXOFetchedHeight)
await internalSyncProgress.set(630000, .latestEnhancedHeight)
let nextState = try await internalSyncProgress.computeNextState(
latestBlockHeight: latestHeight,
latestScannedHeight: 630000,
walletBirthday: 600000
)
switch nextState {
case let .finishProcessing(height):
XCTAssertEqual(height, latestHeight)
default:
XCTFail("State should be finishProcessing. Unexpected state: \(nextState)")
}
}
func test__rewindToHeightThatIsHigherThanTrackedHeight__rewindsToTrackedHeight() async throws {
await internalSyncProgress.set(630000, .latestUTXOFetchedHeight)
await internalSyncProgress.set(630000, .latestEnhancedHeight)
await internalSyncProgress.rewind(to: 640000)
XCTAssertEqual(storage.integer(forKey: "latestEnhancedHeight"), 630000)
XCTAssertEqual(storage.integer(forKey: "latestUTXOFetchedHeight"), 630000)
}
func test__rewindToHeightThatIsLowerThanTrackedHeight__rewindsToRewindHeight() async throws {
await internalSyncProgress.set(630000, .latestUTXOFetchedHeight)
await internalSyncProgress.set(630000, .latestEnhancedHeight)
await internalSyncProgress.rewind(to: 620000)
XCTAssertEqual(storage.integer(forKey: "latestEnhancedHeight"), 620000)
XCTAssertEqual(storage.integer(forKey: "latestUTXOFetchedHeight"), 620000)
}
func test__get__returnsStoredValue() async throws {
storage.set(621000, forKey: "latestEnhancedHeight")
let latestEnhancedHeight = await internalSyncProgress.latestEnhancedHeight
XCTAssertEqual(latestEnhancedHeight, 621000)
storage.set(619000, forKey: "latestUTXOFetchedHeight")
let latestUTXOFetchedHeight = await internalSyncProgress.latestUTXOFetchedHeight
XCTAssertEqual(latestUTXOFetchedHeight, 619000)
}
func test__set__storeValue() async throws {
await internalSyncProgress.set(521000, .latestEnhancedHeight)
XCTAssertEqual(storage.integer(forKey: "latestEnhancedHeight"), 521000)
await internalSyncProgress.set(519000, .latestUTXOFetchedHeight)
XCTAssertEqual(storage.integer(forKey: "latestUTXOFetchedHeight"), 519000)
}
}

View File

@ -33,7 +33,7 @@ class WalletTests: XCTestCase {
} }
} }
func testWalletInitialization() async throws { func testWalletInitialization() throws {
let derivationTool = DerivationTool(networkType: network.networkType) let derivationTool = DerivationTool(networkType: network.networkType)
let ufvk = try derivationTool.deriveUnifiedSpendingKey(seed: seedData.bytes, accountIndex: 0) let ufvk = try derivationTool.deriveUnifiedSpendingKey(seed: seedData.bytes, accountIndex: 0)
.map( { try derivationTool.deriveUnifiedFullViewingKey(from: $0) }) .map( { try derivationTool.deriveUnifiedFullViewingKey(from: $0) })

View File

@ -0,0 +1,32 @@
//
// InternalSyncProgressMemoryStorage.swift
//
//
// Created by Michal Fousek on 24.11.2022.
//
import Foundation
@testable import ZcashLightClientKit
class InternalSyncProgressMemoryStorage: InternalSyncProgressStorage {
private var boolStorage: [String: Bool] = [:]
private var storage: [String: Int] = [:]
func bool(forKey defaultName: String) -> Bool {
return boolStorage[defaultName, default: false]
}
func integer(forKey defaultName: String) -> Int {
return storage[defaultName, default: 0]
}
func set(_ value: Int, forKey defaultName: String) {
storage[defaultName] = value
}
func set(_ value: Bool, forKey defaultName: String) {
boolStorage[defaultName] = value
}
func synchronize() -> Bool { true }
}

View File

@ -6,6 +6,7 @@
// //
import Foundation import Foundation
import XCTest
@testable import ZcashLightClientKit @testable import ZcashLightClientKit
/** /**
@ -76,6 +77,8 @@ class TestCoordinator {
channelProvider: ChannelProvider, channelProvider: ChannelProvider,
network: ZcashNetwork network: ZcashNetwork
) throws { ) throws {
XCTestCase.wait { await InternalSyncProgress(storage: UserDefaults.standard).rewind(to: 0) }
self.spendingKey = spendingKey self.spendingKey = spendingKey
self.birthday = walletBirthday self.birthday = walletBirthday
self.channelProvider = channelProvider self.channelProvider = channelProvider

View File

@ -1,5 +1,5 @@
// //
// File.swift // TestVector.swift
// //
// //
// Created by Francisco Gindre on 9/26/22. // Created by Francisco Gindre on 9/26/22.

View File

@ -11,7 +11,7 @@ import Foundation
import XCTest import XCTest
extension XCTestCase { extension XCTestCase {
func wait(asyncBlock: @escaping (() async throws -> Void)) { static func wait(asyncBlock: @escaping (() async throws -> Void)) {
let semaphore = DispatchSemaphore(value: 0) let semaphore = DispatchSemaphore(value: 0)
Task.init { Task.init {
try await asyncBlock() try await asyncBlock()
@ -19,4 +19,8 @@ extension XCTestCase {
} }
semaphore.wait() semaphore.wait()
} }
func wait(asyncBlock: @escaping (() async throws -> Void)) {
XCTestCase.wait(asyncBlock: asyncBlock)
}
} }