[#472] CompactBlockBatchScanningOperation to async (#505)

- CompactBlockBatchScanningOperation operation's main reimplemented to be Task based

[472] CompactBlockBatchScanningOperation to async

cleanup

[472] CompactBlockBatchScanningOperation to async (505)

- CompactBlockBatchScanningOperation wrapped to Task
This commit is contained in:
Lukas Korba 2022-08-30 16:28:00 +02:00 committed by GitHub
parent 87f50a796c
commit 7b90e598ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 140 additions and 112 deletions

View File

@ -194,10 +194,11 @@ class CompactBlockBatchDownloadOperation: ZcashOperation {
override var isAsynchronous: Bool { false }
private var batch: Int
private var done = false
private var maxRetries: Int
private var storage: CompactBlockStorage
private var service: LightWalletService
private var cancelable: CancellableCall?
private var cancelableTask: Task<Void, Error>?
private var startHeight: BlockHeight
private var targetHeight: BlockHeight
@ -229,72 +230,85 @@ class CompactBlockBatchDownloadOperation: ZcashOperation {
return
}
self.startedHandler?()
do {
let localDownloadedHeight = try self.storage.latestHeight()
if localDownloadedHeight != BlockHeight.empty() && localDownloadedHeight > startHeight {
LoggerProxy.warn("provided startHeight (\(startHeight)) differs from local latest downloaded height (\(localDownloadedHeight))")
startHeight = localDownloadedHeight + 1
}
var currentHeight = startHeight
self.progressDelegate?.progressUpdated(
.download(
BlockProgress(
startHeight: currentHeight,
targetHeight: targetHeight,
progressHeight: currentHeight
)
)
)
while !isCancelled && currentHeight <= targetHeight {
var retries = 0
var success = true
var localError: Error?
let range = nextRange(currentHeight: currentHeight, targetHeight: targetHeight)
cancelableTask = Task {
do {
let localDownloadedHeight = try await self.storage.latestHeightAsync()
repeat {
do {
let blocks = try service.blockRange(range)
try storage.insert(blocks)
success = true
} catch {
success = false
localError = error
retries += 1
}
} while !isCancelled && !success && retries < maxRetries
if retries >= maxRetries {
throw CompactBlockBatchDownloadOperationError.batchDownloadFailed(range: range, error: localError)
if localDownloadedHeight != BlockHeight.empty() && localDownloadedHeight > startHeight {
LoggerProxy.warn("provided startHeight (\(startHeight)) differs from local latest downloaded height (\(localDownloadedHeight))")
startHeight = localDownloadedHeight + 1
}
var currentHeight = startHeight
self.progressDelegate?.progressUpdated(
.download(
BlockProgress(
startHeight: startHeight,
startHeight: currentHeight,
targetHeight: targetHeight,
progressHeight: range.upperBound
progressHeight: currentHeight
)
)
)
while !isCancelled && currentHeight <= targetHeight {
var retries = 0
var success = true
var localError: Error?
let range = nextRange(currentHeight: currentHeight, targetHeight: targetHeight)
repeat {
do {
let stream: AsyncThrowingStream<ZcashCompactBlock, Error> = service.blockRange(range)
currentHeight = range.upperBound + 1
var blocks: [ZcashCompactBlock] = []
for try await compactBlock in stream {
blocks.append(compactBlock)
}
try storage.insert(blocks)
success = true
} catch {
success = false
localError = error
retries += 1
}
} while !isCancelled && !success && retries < maxRetries
if retries >= maxRetries {
throw CompactBlockBatchDownloadOperationError.batchDownloadFailed(range: range, error: localError)
}
self.progressDelegate?.progressUpdated(
.download(
BlockProgress(
startHeight: startHeight,
targetHeight: targetHeight,
progressHeight: range.upperBound
)
)
)
currentHeight = range.upperBound + 1
}
self.done = true
} catch {
self.fail(error: error)
}
} catch {
self.fail(error: error)
}
while !done && !isCancelled {
sleep(1)
}
}
override func fail(error: Error? = nil) {
self.cancelable?.cancel()
self.cancelableTask?.cancel()
super.fail(error: error)
}
override func cancel() {
self.cancelable?.cancel()
self.cancelableTask?.cancel()
super.cancel()
}

View File

@ -125,6 +125,8 @@ class CompactBlockBatchScanningOperation: ZcashOperation {
private var blockRange: CompactBlockRange
private var transactionRepository: TransactionRepository
private var network: NetworkType
private var cancelableTask: Task<Void, Error>?
private var done = false
private weak var progressDelegate: CompactBlockProgressDelegate?
@ -157,82 +159,94 @@ class CompactBlockBatchScanningOperation: ZcashOperation {
self.startedHandler?()
do {
if batchSize == 0 {
let scanStartTime = Date()
guard self.rustBackend.scanBlocks(dbCache: self.cacheDb, dbData: self.dataDb, limit: batchSize, networkType: network) else {
self.scanFailed(self.rustBackend.lastError() ?? ZcashOperationError.unknown)
return
}
let scanFinishTime = Date()
NotificationCenter.default.post(
SDKMetrics.progressReportNotification(
progress: BlockProgress(
startHeight: self.blockRange.lowerBound,
targetHeight: self.blockRange.upperBound,
progressHeight: self.blockRange.upperBound
),
start: scanStartTime,
end: scanFinishTime,
task: .scanBlocks
)
)
let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate
LoggerProxy.debug("Scanned \(blockRange.count) blocks in \(seconds) seconds")
} else {
let scanStartHeight = try transactionRepository.lastScannedHeight()
let targetScanHeight = blockRange.upperBound
var scannedNewBlocks = false
var lastScannedHeight = scanStartHeight
repeat {
guard !shouldCancel() else {
cancel()
return
}
let previousScannedHeight = lastScannedHeight
cancelableTask = Task {
do {
if batchSize == 0 {
let scanStartTime = Date()
guard self.rustBackend.scanBlocks(
dbCache: self.cacheDb,
dbData: self.dataDb,
limit: batchSize,
networkType: network
) else {
guard self.rustBackend.scanBlocks(dbCache: self.cacheDb, dbData: self.dataDb, limit: batchSize, networkType: network) else {
self.scanFailed(self.rustBackend.lastError() ?? ZcashOperationError.unknown)
return
}
let scanFinishTime = Date()
lastScannedHeight = try transactionRepository.lastScannedHeight()
scannedNewBlocks = previousScannedHeight != lastScannedHeight
if scannedNewBlocks {
let progress = BlockProgress(startHeight: scanStartHeight, targetHeight: targetScanHeight, progressHeight: lastScannedHeight)
progressDelegate?.progressUpdated(.scan(progress))
NotificationCenter.default.post(
SDKMetrics.progressReportNotification(
progress: progress,
start: scanStartTime,
end: scanFinishTime,
task: .scanBlocks
)
NotificationCenter.default.post(
SDKMetrics.progressReportNotification(
progress: BlockProgress(
startHeight: self.blockRange.lowerBound,
targetHeight: self.blockRange.upperBound,
progressHeight: self.blockRange.upperBound
),
start: scanStartTime,
end: scanFinishTime,
task: .scanBlocks
)
let heightCount = lastScannedHeight - previousScannedHeight
let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate
LoggerProxy.debug("Scanned \(heightCount) blocks in \(seconds) seconds")
}
} while !self.isCancelled && scannedNewBlocks && lastScannedHeight < targetScanHeight
)
let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate
LoggerProxy.debug("Scanned \(blockRange.count) blocks in \(seconds) seconds")
} else {
let scanStartHeight = try transactionRepository.lastScannedHeight()
let targetScanHeight = blockRange.upperBound
var scannedNewBlocks = false
var lastScannedHeight = scanStartHeight
repeat {
guard !shouldCancel() else {
cancel()
return
}
let previousScannedHeight = lastScannedHeight
let scanStartTime = Date()
guard self.rustBackend.scanBlocks(
dbCache: self.cacheDb,
dbData: self.dataDb,
limit: batchSize,
networkType: network
) else {
self.scanFailed(self.rustBackend.lastError() ?? ZcashOperationError.unknown)
return
}
let scanFinishTime = Date()
lastScannedHeight = try transactionRepository.lastScannedHeight()
scannedNewBlocks = previousScannedHeight != lastScannedHeight
if scannedNewBlocks {
let progress = BlockProgress(startHeight: scanStartHeight, targetHeight: targetScanHeight, progressHeight: lastScannedHeight)
progressDelegate?.progressUpdated(.scan(progress))
NotificationCenter.default.post(
SDKMetrics.progressReportNotification(
progress: progress,
start: scanStartTime,
end: scanFinishTime,
task: .scanBlocks
)
)
let heightCount = lastScannedHeight - previousScannedHeight
let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate
LoggerProxy.debug("Scanned \(heightCount) blocks in \(seconds) seconds")
}
} while !self.isCancelled && scannedNewBlocks && lastScannedHeight < targetScanHeight
self.done = true
}
} catch {
scanFailed(error)
}
} catch {
scanFailed(error)
}
while !done && !isCancelled {
sleep(1)
}
}
func scanFailed(_ error: Error) {
self.error = error
self.cancelableTask?.cancel()
LoggerProxy.debug("block scanning failed with error: \(String(describing: self.error))")
self.fail()
super.fail(error: error)
}
override func cancel() {
self.cancelableTask?.cancel()
super.cancel()
}
}