// // ServiceHelper.swift // gRPC-PoC // // Created by Francisco Gindre on 29/08/2019. // Copyright © 2019 Electric Coin Company. All rights reserved. // import Foundation import GRPC import NIO import NIOHPACK public typealias Channel = GRPC.GRPCChannel extension TimeAmount { static let singleCallTimeout = TimeAmount.seconds(30) static let streamingCallTimeout = TimeAmount.minutes(10) } extension CallOptions { static var lwdCall: CallOptions { CallOptions(customMetadata: HPACKHeaders(), timeLimit: .timeout(.singleCallTimeout), messageEncoding: .disabled, requestIDProvider: .autogenerated, requestIDHeader: nil, cacheable: false) } } public protocol LightWalletdInfo { var version: String { get } var vendor: String { get } /// true var taddrSupport: Bool { get } /// either "main" or "test" var chainName: String { get } /// depends on mainnet or testnet var saplingActivationHeight: UInt64 { get } /// protocol identifier, see consensus/upgrades.cpp var consensusBranchID: String { get } /// latest block on the best chain var blockHeight: UInt64 { get } var gitCommit: String { get } var branch: String { get } var buildDate: String { get } var buildUser: String { get } /// less than tip height if zcashd is syncing var estimatedHeight: UInt64 { get } /// example: "v4.1.1-877212414" var zcashdBuild: String { get } /// example: "/MagicBean:4.1.1/" var zcashdSubversion: String { get } } extension LightdInfo: LightWalletdInfo {} /** Swift GRPC implementation of Lightwalletd service */ public enum GRPCResult: Equatable { case ok case error(_ error: LightWalletServiceError) } public protocol CancellableCall { func cancel() } extension ServerStreamingCall: CancellableCall { public func cancel() { self.cancel(promise: self.eventLoop.makePromise(of: Void.self)) } } struct BlockProgress: BlockStreamProgressReporting { var startHeight: BlockHeight var targetHeight: BlockHeight var progressHeight: BlockHeight } public class LightWalletGRPCService { var queue: DispatchQueue let channel: Channel let connectionDelegate: ConnectionStatusManager let compactTxStreamer: CompactTxStreamerClient let singleCallTimeout: TimeLimit let streamingCallTimeout: TimeLimit public convenience init(endpoint: LightWalletEndpoint) { self.init(host: endpoint.host, port: endpoint.port, secure: endpoint.secure) } public init(host: String, port: Int = 9067, secure: Bool = true, singleCallTimeout: Int64 = 10000, streamingCallTimeout: Int64 = 10000) { self.connectionDelegate = ConnectionStatusManager() self.queue = DispatchQueue.init(label: "LightWalletGRPCService") self.streamingCallTimeout = TimeLimit.timeout(.milliseconds(streamingCallTimeout)) self.singleCallTimeout = TimeLimit.timeout(.milliseconds(singleCallTimeout)) let configuration = ClientConnection.Configuration( target: .hostAndPort(host, port), eventLoopGroup: MultiThreadedEventLoopGroup(numberOfThreads: 1), connectivityStateDelegate: connectionDelegate, connectivityStateDelegateQueue: queue, tls: secure ? .init() : nil ) let channel = ClientConnection(configuration: configuration) self.channel = channel compactTxStreamer = CompactTxStreamerClient(channel: self.channel, defaultCallOptions: Self.callOptions(timeLimit: TimeLimit.timeout(.seconds(Int64(singleCallTimeout))))) } func stop() { _ = channel.close() } func blockRange(startHeight: BlockHeight, endHeight: BlockHeight? = nil, result: @escaping (CompactBlock) -> Void) throws -> ServerStreamingCall { compactTxStreamer.getBlockRange(BlockRange(startHeight: startHeight, endHeight: endHeight), handler: result) } func latestBlock() throws -> BlockID { try compactTxStreamer.getLatestBlock(ChainSpec()).response.wait() } func getTx(hash: String) throws -> RawTransaction { var filter = TxFilter() filter.hash = Data(hash.utf8) return try compactTxStreamer.getTransaction(filter).response.wait() } static func callOptions(timeLimit: TimeLimit) -> CallOptions { CallOptions(customMetadata: HPACKHeaders(), timeLimit: timeLimit, messageEncoding: .disabled, requestIDProvider: .autogenerated, requestIDHeader: nil, cacheable: false) } } extension LightWalletGRPCService: LightWalletService { @discardableResult public func blockStream(startHeight: BlockHeight, endHeight: BlockHeight, result: @escaping (Result) -> Void, handler: @escaping (ZcashCompactBlock) -> Void, progress: @escaping (BlockStreamProgressReporting) -> Void) -> CancellableCall { let future = compactTxStreamer.getBlockRange(BlockRange(startHeight: startHeight, endHeight: endHeight), callOptions: Self.callOptions(timeLimit: self.streamingCallTimeout), handler: { compactBlock in handler(ZcashCompactBlock(compactBlock: compactBlock)) progress(BlockProgress(startHeight: startHeight, targetHeight: endHeight, progressHeight: BlockHeight(compactBlock.height))) } ) future.status.whenComplete { r in switch r { case .success(let status): switch status.code { case .ok: result(.success(GRPCResult.ok)) default: result(.failure(LightWalletServiceError.mapCode(status))) } case .failure(let error): result(.failure(LightWalletServiceError.genericError(error: error))) } } return future } public func getInfo() throws -> LightWalletdInfo { try compactTxStreamer.getLightdInfo(Empty()).response.wait() } public func getInfo(result: @escaping (Result) -> Void) { compactTxStreamer.getLightdInfo(Empty()).response.whenComplete { r in switch r { case .success(let info): result(.success(info)) case .failure(let error): result(.failure(error.mapToServiceError())) } } } public func closeConnection() { _ = channel.close() } public func fetchTransaction(txId: Data) throws -> TransactionEntity { var txFilter = TxFilter() txFilter.hash = txId do { let rawTx = try compactTxStreamer.getTransaction(txFilter).response.wait() return TransactionBuilder.createTransactionEntity(txId: txId, rawTransaction: rawTx) } catch { throw error.mapToServiceError() } } public func fetchTransaction(txId: Data, result: @escaping (Result) -> Void) { var txFilter = TxFilter() txFilter.hash = txId compactTxStreamer.getTransaction(txFilter).response.whenComplete({ response in switch response { case .failure(let error): result(.failure(error.mapToServiceError())) case .success(let rawTx): result(.success(TransactionBuilder.createTransactionEntity(txId: txId, rawTransaction: rawTx))) } }) } public func submit(spendTransaction: Data, result: @escaping (Result) -> Void) { do { let tx = try RawTransaction(serializedData: spendTransaction) let response = self.compactTxStreamer.sendTransaction(tx).response response.whenComplete { (responseResult) in switch responseResult { case .failure(let e): result(.failure(LightWalletServiceError.sentFailed(error: e))) case .success(let s): result(.success(s)) } } } catch { result(.failure(error.mapToServiceError())) } } public func submit(spendTransaction: Data) throws -> LightWalletServiceResponse { let rawTx = RawTransaction.with { (raw) in raw.data = spendTransaction } do { return try compactTxStreamer.sendTransaction(rawTx).response.wait() } catch { throw error.mapToServiceError() } } public func blockRange(_ range: CompactBlockRange) throws -> [ZcashCompactBlock] { var blocks = [CompactBlock]() let response = compactTxStreamer.getBlockRange(range.blockRange(), handler: { blocks.append($0) }) let status = try response.status.wait() switch status.code { case .ok: do { return try blocks.asZcashCompactBlocks() } catch { LoggerProxy.error("invalid block in range: \(range) - Error: \(error)") throw LightWalletServiceError.genericError(error: error) } default: throw LightWalletServiceError.mapCode(status) } } public func latestBlockHeight(result: @escaping (Result) -> Void) { let response = compactTxStreamer.getLatestBlock(ChainSpec()).response response.whenSuccessBlocking(onto: queue) { blockID in guard let blockHeight = Int(exactly: blockID.height) else { result(.failure(LightWalletServiceError.generalError(message: "error creating blockheight from BlockID \(blockID)"))) return } result(.success(blockHeight)) } response.whenFailureBlocking(onto: queue) { error in result(.failure(error.mapToServiceError())) } } public func blockRange(_ range: CompactBlockRange, result: @escaping (Result<[ZcashCompactBlock], LightWalletServiceError>) -> Void) { queue.async { [weak self] in guard let self = self else { return } var blocks = [CompactBlock]() let response = self.compactTxStreamer.getBlockRange(range.blockRange(), handler: { blocks.append($0) }) do { let status = try response.status.wait() switch status.code { case .ok: do { result(.success(try blocks.asZcashCompactBlocks())) } catch { LoggerProxy.error("Error parsing compact blocks \(error)") result(.failure(LightWalletServiceError.invalidBlock)) } default: result(.failure(.mapCode(status))) } } catch { result(.failure(error.mapToServiceError())) } } } public func latestBlockHeight() throws -> BlockHeight { guard let height = try? latestBlock().compactBlockHeight() else { throw LightWalletServiceError.invalidBlock } return height } public func fetchUTXOs(for tAddress: String, height: BlockHeight = ZcashSDK.SAPLING_ACTIVATION_HEIGHT) throws -> [UnspentTransactionOutputEntity] { let arg = GetAddressUtxosArg.with { (utxoArgs) in utxoArgs.addresses = [tAddress] utxoArgs.startHeight = UInt64(height) } do { return try self.compactTxStreamer.getAddressUtxos(arg).response.wait().addressUtxos.map { reply in UTXO(id: nil, address: tAddress, prevoutTxId: reply.txid, prevoutIndex: Int(reply.index), script: reply.script, valueZat: Int(reply.valueZat), height: Int(reply.height), spentInTx: nil ) } } catch { throw error.mapToServiceError() } } public func fetchUTXOs(for tAddress: String, height: BlockHeight = ZcashSDK.SAPLING_ACTIVATION_HEIGHT, result: @escaping (Result<[UnspentTransactionOutputEntity], LightWalletServiceError>) -> Void) { queue.async { [weak self] in guard let self = self else { return } let arg = GetAddressUtxosArg.with { (utxoArgs) in utxoArgs.addresses = [tAddress] utxoArgs.startHeight = UInt64(height) } var utxos = [UnspentTransactionOutputEntity]() let response = self.compactTxStreamer.getAddressUtxosStream(arg) { (reply) in utxos.append( UTXO(id: nil, address: tAddress, prevoutTxId: reply.txid, prevoutIndex: Int(reply.index), script: reply.script, valueZat: Int(reply.valueZat), height: Int(reply.height), spentInTx: nil ) ) } do { let status = try response.status.wait() switch status.code { case .ok: result(.success(utxos)) default: result(.failure(.mapCode(status))) } } catch { result(.failure(error.mapToServiceError())) } } } public func fetchUTXOs(for tAddresses: [String], height: BlockHeight = ZcashSDK.SAPLING_ACTIVATION_HEIGHT) throws -> [UnspentTransactionOutputEntity] { guard tAddresses.count > 0 else { return [] // FIXME: throw a real error } var utxos = [UnspentTransactionOutputEntity]() let arg = GetAddressUtxosArg.with { (utxoArgs) in utxoArgs.addresses = tAddresses utxoArgs.startHeight = UInt64(height) } utxos.append(contentsOf: try self.compactTxStreamer.getAddressUtxos(arg).response.wait().addressUtxos.map({ reply in UTXO(id: nil, address: reply.address, prevoutTxId: reply.txid, prevoutIndex: Int(reply.index), script: reply.script, valueZat: Int(reply.valueZat), height: Int(reply.height), spentInTx: nil) }) ) return utxos } public func fetchUTXOs(for tAddresses: [String], height: BlockHeight, result: @escaping (Result<[UnspentTransactionOutputEntity], LightWalletServiceError>) -> Void) { guard tAddresses.count > 0 else { return result(.success([])) // FIXME: throw a real error } var utxos = [UnspentTransactionOutputEntity]() self.queue.async { [weak self] in guard let self = self else { return } let args = GetAddressUtxosArg.with { (utxoArgs) in utxoArgs.addresses = tAddresses utxoArgs.startHeight = UInt64(height) } do { let response = try self.compactTxStreamer.getAddressUtxosStream(args) { reply in utxos.append( UTXO(id: nil, address: reply.address, prevoutTxId: reply.txid, prevoutIndex: Int(reply.index), script: reply.script, valueZat: Int(reply.valueZat), height: Int(reply.height), spentInTx: nil) ) }.status.wait() switch response.code { case .ok: result(.success(utxos)) default: result(.failure(.mapCode(response))) } } catch { result(.failure(error.mapToServiceError())) } } } } extension Error { func mapToServiceError() -> LightWalletServiceError { guard let grpcError = self as? GRPCStatusTransformable else { return LightWalletServiceError.genericError(error: self) } return LightWalletServiceError.mapCode(grpcError.makeGRPCStatus()) } } extension LightWalletServiceError { static func mapCode(_ status: GRPCStatus) -> LightWalletServiceError { switch status.code { case .ok: return LightWalletServiceError.unknown case .cancelled: return LightWalletServiceError.userCancelled case .unknown: return LightWalletServiceError.generalError(message: status.message ?? "GRPC unknown error contains no message") case .deadlineExceeded: return LightWalletServiceError.timeOut default: return LightWalletServiceError.genericError(error: status) } } } // class ConnectionStatusManager: ConnectivityStateDelegate { func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState) { LoggerProxy.event("Connection Changed from \(oldState) to \(newState)") } }