From 16d1948b5b19ea826ef6404c409459bacfdc671f Mon Sep 17 00:00:00 2001 From: Lukas Korba Date: Mon, 29 Aug 2022 21:31:01 +0200 Subject: [PATCH] [#465] CompactBlockDownloading to Async/Await (#507) Closes #465 - CompactBlockDownloading closures removed - CompactBlockDownloading new async API provided --- .../Block/Downloader/BlockDownloader.swift | 165 ++++++++---------- .../CompactBlockDownloadOperation.swift | 31 +++- .../FetchUnspentTxOutputsOperation.swift | 4 +- .../Synchronizer/SDKSynchronizer.swift | 9 +- .../DarksideTests/BlockDownloaderTests.swift | 47 ++--- 5 files changed, 128 insertions(+), 128 deletions(-) diff --git a/Sources/ZcashLightClientKit/Block/Downloader/BlockDownloader.swift b/Sources/ZcashLightClientKit/Block/Downloader/BlockDownloader.swift index 9511d63a..d6bc7b3a 100644 --- a/Sources/ZcashLightClientKit/Block/Downloader/BlockDownloader.swift +++ b/Sources/ZcashLightClientKit/Block/Downloader/BlockDownloader.swift @@ -27,49 +27,44 @@ public protocol CompactBlockDownloading { Downloads and stores the given block range. Non-Blocking */ - func downloadBlockRange( - _ heightRange: CompactBlockRange, - completion: @escaping (Error?) -> Void - ) - - /** - Remove newer blocks and go back to the given height - - Parameters: - - height: the given height to rewind to - - completion: block to be executed after completing rewind - */ - func rewind(to height: BlockHeight, completion: @escaping (Error?) -> Void) - - /** - returns the height of the latest compact block stored locally - BlockHeight.empty() if no blocks are stored yet - non-blocking - */ - func lastDownloadedBlockHeight(result: @escaping (Result) -> Void) - - /** - Returns the last height on the blockchain - Non-blocking - */ - func latestBlockHeight(result: @escaping (Result) -> Void) - + func downloadBlockRangeAsync(_ heightRange: CompactBlockRange) async throws + /** Restore the download progress up to the given height. */ func rewind(to height: BlockHeight) throws + /** + Remove newer blocks and go back to the given height + - Parameter height: the given height to rewind to + */ + func rewindAsync(to height: BlockHeight) async throws + /** Returns the height of the latest compact block stored locally. BlockHeight.empty() if no blocks are stored yet Blocking */ func lastDownloadedBlockHeight() throws -> BlockHeight - + + /** + returns the height of the latest compact block stored locally + BlockHeight.empty() if no blocks are stored yet + non-blocking + */ + func lastDownloadedBlockHeightAsync() async throws -> BlockHeight + /** Returns the latest block height Blocking */ func latestBlockHeight() throws -> BlockHeight + + /** + Returns the last height on the blockchain + Non-blocking + */ + func latestBlockHeightAsync() async throws -> BlockHeight /** Gets the transaction for the Id given @@ -82,17 +77,30 @@ public protocol CompactBlockDownloading { /** Gets the transaction for the Id given - Parameter txId: Data representing the transaction Id - - Parameter result: a handler for the result of the operation */ - func fetchTransaction(txId: Data, result: @escaping (Result) -> Void) - + func fetchTransactionAsync(txId: Data) async throws -> TransactionEntity + func fetchUnspentTransactionOutputs(tAddress: String, startHeight: BlockHeight) throws -> [UnspentTransactionOutputEntity] + // TODO: will be removed with the issue 474 + // https://github.com/zcash/ZcashLightClientKit/issues/474 + // Use the new API fetchUnspentTransactionOutputs(...) -> AsyncThrowingStream func fetchUnspentTransactionOutputs(tAddress: String, startHeight: BlockHeight, result: @escaping (Result<[UnspentTransactionOutputEntity], Error>) -> Void) + + func fetchUnspentTransactionOutputs(tAddress: String, startHeight: BlockHeight) -> AsyncThrowingStream func fetchUnspentTransactionOutputs(tAddresses: [String], startHeight: BlockHeight) throws -> [UnspentTransactionOutputEntity] - func fetchUnspentTransactionOutputs(tAddresses: [String], startHeight: BlockHeight, result: @escaping (Result<[UnspentTransactionOutputEntity], Error>) -> Void) + // TODO: will be removed with the issue 474 + // https://github.com/zcash/ZcashLightClientKit/issues/474 + // Use the new API fetchUnspentTransactionOutputs(...) -> AsyncThrowingStream + func fetchUnspentTransactionOutputs( + tAddresses: [String], + startHeight: BlockHeight, + result: @escaping (Result<[UnspentTransactionOutputEntity], Error>) -> Void + ) + + func fetchUnspentTransactionOutputs(tAddresses: [String], startHeight: BlockHeight) -> AsyncThrowingStream func closeConnection() } @@ -138,6 +146,10 @@ extension CompactBlockDownloader: CompactBlockDownloading { } } } + + func fetchUnspentTransactionOutputs(tAddresses: [String], startHeight: BlockHeight ) -> AsyncThrowingStream { + lightwalletService.fetchUTXOs(for: tAddresses, height: startHeight) + } func fetchUnspentTransactionOutputs(tAddress: String, startHeight: BlockHeight) throws -> [UnspentTransactionOutputEntity] { try lightwalletService.fetchUTXOs(for: tAddress, height: startHeight) @@ -154,72 +166,54 @@ extension CompactBlockDownloader: CompactBlockDownloading { } } - func latestBlockHeight(result: @escaping (Result) -> Void) { - lightwalletService.latestBlockHeight { fetchResult in - switch fetchResult { - case .failure(let error): - result(.failure(error)) - case .success(let height): - result(.success(height)) - } - } + func fetchUnspentTransactionOutputs(tAddress: String, startHeight: BlockHeight) -> AsyncThrowingStream { + lightwalletService.fetchUTXOs(for: tAddress, height: startHeight) + } + + func latestBlockHeightAsync() async throws -> BlockHeight { + try await lightwalletService.latestBlockHeightAsync() } func latestBlockHeight() throws -> BlockHeight { try lightwalletService.latestBlockHeight() } - /** - Downloads and stores the given block range. - Non-Blocking - */ - func downloadBlockRange( - _ heightRange: CompactBlockRange, - completion: @escaping (Error?) -> Void - ) { - let stream: AsyncThrowingStream = lightwalletService.blockRange(heightRange) - Task { - do { - var compactBlocks: [ZcashCompactBlock] = [] - for try await compactBlock in stream { - compactBlocks.append(compactBlock) - } - try await self.storage.writeAsync(blocks: compactBlocks) - completion(nil) - } catch { - completion(error) - } - } - } - func downloadBlockRange(_ range: CompactBlockRange) throws { let blocks = try lightwalletService.blockRange(range) try storage.write(blocks: blocks) } - - func rewind(to height: BlockHeight, completion: @escaping (Error?) -> Void) { - Task { - do { - try await storage.rewindAsync(to: height) - completion(nil) - } catch { - completion(error) + + func downloadBlockRangeAsync( _ heightRange: CompactBlockRange) async throws { + let stream: AsyncThrowingStream = lightwalletService.blockRange(heightRange) + do { + var compactBlocks: [ZcashCompactBlock] = [] + for try await compactBlock in stream { + compactBlocks.append(compactBlock) } + try await self.storage.writeAsync(blocks: compactBlocks) + } catch { + throw error } } - func lastDownloadedBlockHeight(result: @escaping (Result) -> Void) { - Task { - do { - let latestHeight = try await storage.latestHeightAsync() - result(.success(latestHeight)) - } catch { - result(.failure(CompactBlockDownloadError.generalError(error: error))) - } - + func rewindAsync(to height: BlockHeight) async throws { + do { + try await storage.rewindAsync(to: height) + } catch { + throw error } } + func lastDownloadedBlockHeightAsync() async throws -> BlockHeight { + do { + let latestHeight = try await storage.latestHeightAsync() + return latestHeight + } catch { + throw CompactBlockDownloadError.generalError(error: error) + } + } + + func rewind(to height: BlockHeight) throws { try self.storage.rewind(to: height) } @@ -232,14 +226,7 @@ extension CompactBlockDownloader: CompactBlockDownloading { try lightwalletService.fetchTransaction(txId: txId) } - func fetchTransaction(txId: Data, result: @escaping (Result) -> Void) { - lightwalletService.fetchTransaction(txId: txId) { txResult in - switch txResult { - case .failure(let error): - result(.failure(error)) - case .success(let transaction): - result(.success(transaction)) - } - } + func fetchTransactionAsync(txId: Data) async throws -> TransactionEntity { + try await lightwalletService.fetchTransactionAsync(txId: txId) } } diff --git a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownloadOperation.swift b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownloadOperation.swift index 5a2ce023..8055b71b 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownloadOperation.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownloadOperation.swift @@ -14,7 +14,9 @@ class CompactBlockDownloadOperation: ZcashOperation { private var downloader: CompactBlockDownloading private var range: CompactBlockRange - + private var cancelableTask: Task? + private var done = false + required init(downloader: CompactBlockDownloading, range: CompactBlockRange) { self.range = range self.downloader = downloader @@ -28,12 +30,29 @@ class CompactBlockDownloadOperation: ZcashOperation { return } self.startedHandler?() - do { - try downloader.downloadBlockRange(range) - } catch { - self.error = error - self.fail() + + cancelableTask = Task { + do { + try await downloader.downloadBlockRangeAsync(range) + self.done = true + } catch { + self.fail(error: error) + } } + + while !done && !isCancelled { + sleep(1) + } + } + + override func fail(error: Error? = nil) { + self.cancelableTask?.cancel() + super.fail(error: error) + } + + override func cancel() { + self.cancelableTask?.cancel() + super.cancel() } } diff --git a/Sources/ZcashLightClientKit/Block/Processor/FetchUnspentTxOutputsOperation.swift b/Sources/ZcashLightClientKit/Block/Processor/FetchUnspentTxOutputsOperation.swift index fd7a8210..ad644a4e 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/FetchUnspentTxOutputsOperation.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/FetchUnspentTxOutputsOperation.swift @@ -66,7 +66,9 @@ class FetchUnspentTxOutputsOperation: ZcashOperation { throw FetchUTXOError.clearingFailed(error) } - let utxos = try downloader.fetchUnspentTransactionOutputs(tAddresses: tAddresses, startHeight: startHeight) + // TODO: will be replaced by new async API, issue 474 + // https://github.com/zcash/ZcashLightClientKit/issues/474 + let utxos: [UnspentTransactionOutputEntity] = try downloader.fetchUnspentTransactionOutputs(tAddresses: tAddresses, startHeight: startHeight) let result = storeUTXOs(utxos, in: dataDb) diff --git a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift index dd682704..66c3e1f8 100644 --- a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift @@ -631,7 +631,14 @@ public class SDKSynchronizer: Synchronizer { } public func latestHeight(result: @escaping (Result) -> Void) { - blockProcessor.downloader.latestBlockHeight(result: result) + Task { + do { + let latestBlockHeight = try await blockProcessor.downloader.latestBlockHeightAsync() + result(.success(latestBlockHeight)) + } catch { + result(.failure(error)) + } + } } public func latestHeight() throws -> BlockHeight { diff --git a/Tests/DarksideTests/BlockDownloaderTests.swift b/Tests/DarksideTests/BlockDownloaderTests.swift index d625b503..b89eee9f 100644 --- a/Tests/DarksideTests/BlockDownloaderTests.swift +++ b/Tests/DarksideTests/BlockDownloaderTests.swift @@ -40,36 +40,23 @@ class BlockDownloaderTests: XCTestCase { try? FileManager.default.removeItem(at: cacheDB) } - func testSmallDownloadAsync() { - let expect = XCTestExpectation(description: self.description) - expect.expectedFulfillmentCount = 3 + func testSmallDownloadAsync() async { let lowerRange: BlockHeight = self.network.constants.saplingActivationHeight let upperRange: BlockHeight = self.network.constants.saplingActivationHeight + 99 let range = CompactBlockRange(uncheckedBounds: (lowerRange, upperRange)) - downloader.downloadBlockRange(range) { error in - expect.fulfill() - XCTAssertNil(error) + do { + try await downloader.downloadBlockRangeAsync(range) - Task { - do { - // check what was 'stored' - let latestHeight = try await self.storage.latestHeightAsync() - expect.fulfill() - - XCTAssertEqual(latestHeight, upperRange) - - self.downloader.lastDownloadedBlockHeight { resultHeight in - expect.fulfill() - XCTAssertTrue(self.validate(result: resultHeight, against: upperRange)) - } - } catch { - XCTFail("testSmallDownloadAsync() shouldn't fail") - } - } + // check what was 'stored' + let latestHeight = try await self.storage.latestHeightAsync() + XCTAssertEqual(latestHeight, upperRange) + + let resultHeight = try await self.downloader.lastDownloadedBlockHeightAsync() + XCTAssertEqual(resultHeight, upperRange) + } catch { + XCTFail("testSmallDownloadAsync() shouldn't fail") } - - wait(for: [expect], timeout: 2) } func testSmallDownload() { @@ -99,7 +86,7 @@ class BlockDownloaderTests: XCTestCase { XCTAssertEqual(currentLatest, upperRange ) } - func testFailure() { + func testFailure() async { let awfulDownloader = CompactBlockDownloader( service: AwfulLightWalletService( latestBlockHeight: self.network.constants.saplingActivationHeight + 1000, @@ -108,18 +95,16 @@ class BlockDownloaderTests: XCTestCase { storage: ZcashConsoleFakeStorage() ) - let expect = XCTestExpectation(description: self.description) - expect.expectedFulfillmentCount = 1 let lowerRange: BlockHeight = self.network.constants.saplingActivationHeight let upperRange: BlockHeight = self.network.constants.saplingActivationHeight + 99 let range = CompactBlockRange(uncheckedBounds: (lowerRange, upperRange)) - - awfulDownloader.downloadBlockRange(range) { error in - expect.fulfill() + + do { + try await awfulDownloader.downloadBlockRangeAsync(range) + } catch { XCTAssertNotNil(error) } - wait(for: [expect], timeout: 2) } }