[#470] CompactBlockStreamDownloadOperation to async (#506)

- using new sync APIs for storage and service
- whole logic wrapped in the Task

Closes #470
This commit is contained in:
Lukas Korba 2022-08-29 21:53:49 +02:00 committed by GitHub
parent 16d1948b5b
commit be24044b51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 42 deletions

View File

@ -71,7 +71,6 @@ class CompactBlockStreamDownloadOperation: ZcashOperation {
private var storage: CompactBlockStorage private var storage: CompactBlockStorage
private var service: LightWalletService private var service: LightWalletService
private var done = false private var done = false
private var cancelable: CancellableCall?
private var cancelableTask: Task<Void, Error>? private var cancelableTask: Task<Void, Error>?
private var startHeight: BlockHeight? private var startHeight: BlockHeight?
private var targetHeight: BlockHeight? private var targetHeight: BlockHeight?
@ -120,59 +119,53 @@ class CompactBlockStreamDownloadOperation: ZcashOperation {
} }
self.startedHandler?() self.startedHandler?()
do { cancelableTask = Task {
if self.targetHeight == nil { do {
self.targetHeight = try service.latestBlockHeight() if self.targetHeight == nil {
} self.targetHeight = try await service.latestBlockHeightAsync()
guard let latestHeight = self.targetHeight else { }
throw LightWalletServiceError.generalError(message: "missing target height on block stream operation") guard let latestHeight = self.targetHeight else {
} throw LightWalletServiceError.generalError(message: "missing target height on block stream operation")
let latestDownloaded = try storage.latestHeight() }
let startHeight = max(self.startHeight ?? BlockHeight.empty(), latestDownloaded) let latestDownloaded = try await storage.latestHeightAsync()
let startHeight = max(self.startHeight ?? BlockHeight.empty(), latestDownloaded)
let stream = service.blockStream(
startHeight: startHeight, let stream = service.blockStream(
endHeight: latestHeight startHeight: startHeight,
) endHeight: latestHeight
)
cancelableTask = Task {
do { for try await zcashCompactBlock in stream {
for try await zcashCompactBlock in stream { try self.cache(zcashCompactBlock, flushCache: false)
try self.cache(zcashCompactBlock, flushCache: false) let progress = BlockProgress(
let progress = BlockProgress( startHeight: startHeight,
startHeight: startHeight, targetHeight: latestHeight,
targetHeight: latestHeight, progressHeight: zcashCompactBlock.height
progressHeight: zcashCompactBlock.height )
) self.progressDelegate?.progressUpdated(.download(progress))
self.progressDelegate?.progressUpdated(.download(progress)) }
} try self.flush()
try self.flush() self.done = true
} catch {
if let err = error as? LightWalletServiceError, case .userCancelled = err {
self.done = true self.done = true
} catch { } else {
if let err = error as? LightWalletServiceError, case .userCancelled = err { self.fail(error: error)
self.done = true
} else {
self.fail(error: error)
}
} }
} }
}
while !done && !isCancelled {
sleep(1) while !done && !isCancelled {
} sleep(1)
} catch {
self.fail(error: error)
} }
} }
override func fail(error: Error? = nil) { override func fail(error: Error? = nil) {
self.cancelable?.cancel()
self.cancelableTask?.cancel() self.cancelableTask?.cancel()
super.fail(error: error) super.fail(error: error)
} }
override func cancel() { override func cancel() {
self.cancelable?.cancel()
self.cancelableTask?.cancel() self.cancelableTask?.cancel()
super.cancel() super.cancel()
} }