From 222c6cf06419515062598c0ea3c4575a0bab18df Mon Sep 17 00:00:00 2001 From: Lukas Korba Date: Mon, 8 May 2023 13:35:10 +0200 Subject: [PATCH] [#1029] AsyncThrowingStream update so it doesn't use Task - refactored [#1029] AsyncThrowingStream update so it doesn't use Task (#1030) - resolved file reverted --- .../Service/GRPC/LightWalletGRPCService.swift | 81 ++++++++----------- 1 file changed, 35 insertions(+), 46 deletions(-) diff --git a/Sources/ZcashLightClientKit/Modules/Service/GRPC/LightWalletGRPCService.swift b/Sources/ZcashLightClientKit/Modules/Service/GRPC/LightWalletGRPCService.swift index e85dea0d..a6b47eca 100644 --- a/Sources/ZcashLightClientKit/Modules/Service/GRPC/LightWalletGRPCService.swift +++ b/Sources/ZcashLightClientKit/Modules/Service/GRPC/LightWalletGRPCService.swift @@ -164,18 +164,15 @@ extension LightWalletGRPCService: LightWalletService { func blockRange(_ range: CompactBlockRange) -> AsyncThrowingStream { let stream = compactTxStreamer.getBlockRange(range.blockRange()) - - return AsyncThrowingStream { continuation in - Task { - do { - for try await block in stream { - continuation.yield(ZcashCompactBlock(compactBlock: block)) - } - continuation.finish() - } catch { - let serviceError = error.mapToServiceError() - continuation.finish(throwing: ZcashError.serviceBlockRangeFailed(serviceError)) - } + var iterator = stream.makeAsyncIterator() + + return AsyncThrowingStream() { + do { + guard let block = try await iterator.next() else { return nil } + return ZcashCompactBlock(compactBlock: block) + } catch { + let serviceError = error.mapToServiceError() + throw ZcashError.serviceBlockRangeFailed(serviceError) } } } @@ -220,29 +217,24 @@ extension LightWalletGRPCService: LightWalletService { utxoArgs.startHeight = UInt64(height) } let stream = compactTxStreamer.getAddressUtxosStream(args) - - return AsyncThrowingStream { continuation in - Task { - do { - for try await reply in stream { - continuation.yield( - UTXO( - id: nil, - address: reply.address, - prevoutTxId: reply.txid, - prevoutIndex: Int(reply.index), - script: reply.script, - valueZat: Int(reply.valueZat), - height: Int(reply.height), - spentInTx: nil - ) - ) - } - continuation.finish() - } catch { - let serviceError = error.mapToServiceError() - continuation.finish(throwing: ZcashError.serviceFetchUTXOsFailed(serviceError)) - } + var iterator = stream.makeAsyncIterator() + + return AsyncThrowingStream() { + do { + guard let reply = try await iterator.next() else { return nil } + return UTXO( + id: nil, + address: reply.address, + prevoutTxId: reply.txid, + prevoutIndex: Int(reply.index), + script: reply.script, + valueZat: Int(reply.valueZat), + height: Int(reply.height), + spentInTx: nil + ) + } catch { + let serviceError = error.mapToServiceError() + throw ZcashError.serviceFetchUTXOsFailed(serviceError) } } } @@ -258,18 +250,15 @@ extension LightWalletGRPCService: LightWalletService { ), callOptions: Self.callOptions(timeLimit: self.streamingCallTimeout) ) + var iterator = stream.makeAsyncIterator() - return AsyncThrowingStream { continuation in - Task { - do { - for try await compactBlock in stream { - continuation.yield(ZcashCompactBlock(compactBlock: compactBlock)) - } - continuation.finish() - } catch { - let serviceError = error.mapToServiceError() - continuation.finish(throwing: ZcashError.serviceBlockStreamFailed(serviceError)) - } + return AsyncThrowingStream() { + do { + guard let compactBlock = try await iterator.next() else { return nil } + return ZcashCompactBlock(compactBlock: compactBlock) + } catch { + let serviceError = error.mapToServiceError() + throw ZcashError.serviceBlockStreamFailed(serviceError) } } }