ZcashLightClientKit/Sources/ZcashLightClientKit/Modules/Service/GRPC/LightWalletGRPCService.swift

356 lines
12 KiB
Swift
Raw Normal View History

//
// ServiceHelper.swift
// gRPC-PoC
//
// Created by Francisco Gindre on 29/08/2019.
// Copyright © 2019 Electric Coin Company. All rights reserved.
//
import Foundation
import GRPC
import NIO
import NIOHPACK
import NIOTransportServices
typealias Channel = GRPC.GRPCChannel
2021-09-17 06:49:58 -07:00
extension LightdInfo: LightWalletdInfo {}
extension SendResponse: LightWalletServiceResponse {}
2021-09-17 06:49:58 -07:00
/**
Swift GRPC implementation of Lightwalletd service
*/
enum GRPCResult: Equatable {
2021-09-17 06:49:58 -07:00
case success
case error(_ error: LightWalletServiceError)
}
protocol CancellableCall {
func cancel()
}
extension ServerStreamingCall: CancellableCall {
func cancel() {
self.cancel(promise: self.eventLoop.makePromise(of: Void.self))
}
}
protocol LatestBlockHeightProvider {
func latestBlockHeight(streamer: CompactTxStreamerAsyncClient) async throws -> BlockHeight
}
class LiveLatestBlockHeightProvider: LatestBlockHeightProvider {
func latestBlockHeight(streamer: CompactTxStreamerAsyncClient) async throws -> BlockHeight {
do {
let blockID = try await streamer.getLatestBlock(ChainSpec())
guard let blockHeight = Int(exactly: blockID.height) else {
throw LightWalletServiceError.generalError(message: "error creating blockheight from BlockID \(blockID)")
}
return blockHeight
} catch {
throw error.mapToServiceError()
}
}
}
class LightWalletGRPCService {
let channel: Channel
2021-09-17 06:49:58 -07:00
let connectionManager: ConnectionStatusManager
let compactTxStreamer: CompactTxStreamerAsyncClient
let singleCallTimeout: TimeLimit
let streamingCallTimeout: TimeLimit
var latestBlockHeightProvider: LatestBlockHeightProvider = LiveLatestBlockHeightProvider()
2021-09-17 06:49:58 -07:00
var queue: DispatchQueue
convenience init(endpoint: LightWalletEndpoint, connectionStateChange: @escaping (_ from: ConnectionState, _ to: ConnectionState) -> Void) {
2021-09-17 06:49:58 -07:00
self.init(
host: endpoint.host,
port: endpoint.port,
secure: endpoint.secure,
singleCallTimeout: endpoint.singleCallTimeoutInMillis,
streamingCallTimeout: endpoint.streamingCallTimeoutInMillis,
connectionStateChange: connectionStateChange
2021-09-17 06:49:58 -07:00
)
}
2021-09-17 06:49:58 -07:00
/// Inits a connection to a Lightwalletd service to the given
/// - Parameters:
/// - host: the hostname of the lightwalletd server
/// - port: port of the server. Default is 9067
/// - secure: whether this server is TLS or plaintext. default True (TLS)
/// - singleCallTimeout: Timeout for unary calls in milliseconds.
/// - streamingCallTimeout: Timeout for streaming calls in milliseconds.
init(
host: String,
port: Int = 9067,
secure: Bool = true,
singleCallTimeout: Int64,
streamingCallTimeout: Int64,
connectionStateChange: @escaping (_ from: ConnectionState, _ to: ConnectionState) -> Void
) {
self.connectionManager = ConnectionStatusManager(connectionStateChange: connectionStateChange)
self.queue = DispatchQueue.init(label: "LightWalletGRPCService")
self.streamingCallTimeout = TimeLimit.timeout(.milliseconds(streamingCallTimeout))
self.singleCallTimeout = TimeLimit.timeout(.milliseconds(singleCallTimeout))
2021-09-17 06:49:58 -07:00
let connectionBuilder = secure ?
ClientConnection.usingPlatformAppropriateTLS(for: NIOTSEventLoopGroup(loopCount: 1, defaultQoS: .default)) :
ClientConnection.insecure(group: NIOTSEventLoopGroup(loopCount: 1, defaultQoS: .default))
let channel = connectionBuilder
.withConnectivityStateDelegate(connectionManager, executingOn: queue)
.connect(host: host, port: port)
2021-09-17 06:49:58 -07:00
self.channel = channel
compactTxStreamer = CompactTxStreamerAsyncClient(
2021-09-17 06:49:58 -07:00
channel: self.channel,
defaultCallOptions: Self.callOptions(
timeLimit: self.singleCallTimeout
2021-09-17 06:49:58 -07:00
)
)
2019-10-31 15:43:09 -07:00
}
2021-09-17 06:49:58 -07:00
deinit {
_ = channel.close()
_ = compactTxStreamer.channel.close()
}
func stop() {
_ = channel.close()
}
func latestBlock() async throws -> BlockID {
do {
return try await compactTxStreamer.getLatestBlock(ChainSpec())
} catch {
throw error.mapToServiceError()
}
}
func getTx(hash: String) async throws -> RawTransaction {
var filter = TxFilter()
filter.hash = Data(hash.utf8)
2021-09-17 06:49:58 -07:00
do {
return try await compactTxStreamer.getTransaction(filter)
} catch {
throw error.mapToServiceError()
}
}
static func callOptions(timeLimit: TimeLimit) -> CallOptions {
2021-09-17 06:49:58 -07:00
CallOptions(
customMetadata: HPACKHeaders(),
timeLimit: timeLimit,
messageEncoding: .disabled,
requestIDProvider: .autogenerated,
requestIDHeader: nil,
cacheable: false
)
}
}
// MARK: - LightWalletService
extension LightWalletGRPCService: LightWalletService {
func getInfo() async throws -> LightWalletdInfo {
do {
return try await compactTxStreamer.getLightdInfo(Empty())
} catch {
throw error.mapToServiceError()
}
}
func latestBlockHeight() async throws -> BlockHeight {
try await latestBlockHeightProvider.latestBlockHeight(streamer: compactTxStreamer)
}
func blockRange(_ range: CompactBlockRange) -> AsyncThrowingStream<ZcashCompactBlock, Error> {
let stream = compactTxStreamer.getBlockRange(range.blockRange())
return AsyncThrowingStream { continuation in
Task {
do {
for try await block in stream {
continuation.yield(ZcashCompactBlock(compactBlock: block))
}
continuation.finish()
} catch {
[476] CompactBlockProcessor to async/await - getting rid of the Operation Queue - the cleanup is needed - the update of tests is needed - tested and it successfully finishes the sync process [476] CompactBlockProcessor to async/await - old processNewBlocks() removed [476] CompactBlockProcessor to async/await - unused operations removed [476] CompactBlockProcessor to async/await - unit tests update [476] CompactBlockProcessor to async/await - unit tests refactored [476] CompactBlockProcessor to async/await - cleanup of deprecated method [476] CompactBlockProcessor to async/await - fail(error) was called even for canceled tasks but that must be excluded [476] CompactBlockProcessor to async/await - removal of all ZcashOperations from the code (unit test will follow) [476] CompactBlockProcessor to async/await - network tests in building and success order again [476] CompactBlockProcessor to async/await - offline tests in building and success order [476] CompactBlockProcessor to async/await (519) - cleanup of suspending the task [476] CompactBlockProcessor to async/await (519) - most comments resolved [476] CompactBlockProcessor to async/await (519) - thread safe state for both sync and async context [476] CompactBlockProcessor to async/await (519) - fixed build for a sample project [476] CompactBlockProcessor to async/await (519) - func testStartNotifiesSuscriptors() reverted [476] CompactBlockProcessor to async/await (519) - TODO added to track why we used NSLock instead of an Actor - Task priority enhanced [476] CompactBlockProcessor to async/await (519) - cleanup in Tasks and priorities
2022-09-01 05:58:41 -07:00
continuation.finish(throwing: error.mapToServiceError())
}
}
}
}
2020-12-09 15:57:23 -08:00
func submit(spendTransaction: Data) async throws -> LightWalletServiceResponse {
do {
let transaction = RawTransaction.with { $0.data = spendTransaction }
return try await compactTxStreamer.sendTransaction(transaction)
} catch {
throw LightWalletServiceError.sentFailed(error: error)
2021-04-01 07:27:26 -07:00
}
}
func fetchTransaction(txId: Data) async throws -> ZcashTransaction.Fetched {
var txFilter = TxFilter()
txFilter.hash = txId
do {
let rawTx = try await compactTxStreamer.getTransaction(txFilter)
return ZcashTransaction.Fetched(rawID: txId, minedHeight: BlockHeight(rawTx.height), raw: rawTx.data)
} catch {
throw error.mapToServiceError()
}
}
func fetchUTXOs(for tAddress: String, height: BlockHeight) -> AsyncThrowingStream<UnspentTransactionOutputEntity, Error> {
return fetchUTXOs(for: [tAddress], height: height)
2021-04-01 07:27:26 -07:00
}
func fetchUTXOs(
for tAddresses: [String],
height: BlockHeight
) -> AsyncThrowingStream<UnspentTransactionOutputEntity, Error> {
guard !tAddresses.isEmpty else {
return AsyncThrowingStream { continuation in continuation.finish() }
}
let args = GetAddressUtxosArg.with { utxoArgs in
utxoArgs.addresses = tAddresses
utxoArgs.startHeight = UInt64(height)
}
let stream = compactTxStreamer.getAddressUtxosStream(args)
return AsyncThrowingStream { continuation in
Task {
do {
for try await reply in stream {
continuation.yield(
UTXO(
id: nil,
address: reply.address,
prevoutTxId: reply.txid,
prevoutIndex: Int(reply.index),
script: reply.script,
valueZat: Int(reply.valueZat),
height: Int(reply.height),
spentInTx: nil
)
)
}
continuation.finish()
} catch {
[476] CompactBlockProcessor to async/await - getting rid of the Operation Queue - the cleanup is needed - the update of tests is needed - tested and it successfully finishes the sync process [476] CompactBlockProcessor to async/await - old processNewBlocks() removed [476] CompactBlockProcessor to async/await - unused operations removed [476] CompactBlockProcessor to async/await - unit tests update [476] CompactBlockProcessor to async/await - unit tests refactored [476] CompactBlockProcessor to async/await - cleanup of deprecated method [476] CompactBlockProcessor to async/await - fail(error) was called even for canceled tasks but that must be excluded [476] CompactBlockProcessor to async/await - removal of all ZcashOperations from the code (unit test will follow) [476] CompactBlockProcessor to async/await - network tests in building and success order again [476] CompactBlockProcessor to async/await - offline tests in building and success order [476] CompactBlockProcessor to async/await (519) - cleanup of suspending the task [476] CompactBlockProcessor to async/await (519) - most comments resolved [476] CompactBlockProcessor to async/await (519) - thread safe state for both sync and async context [476] CompactBlockProcessor to async/await (519) - fixed build for a sample project [476] CompactBlockProcessor to async/await (519) - func testStartNotifiesSuscriptors() reverted [476] CompactBlockProcessor to async/await (519) - TODO added to track why we used NSLock instead of an Actor - Task priority enhanced [476] CompactBlockProcessor to async/await (519) - cleanup in Tasks and priorities
2022-09-01 05:58:41 -07:00
continuation.finish(throwing: error.mapToServiceError())
}
}
}
}
func blockStream(
startHeight: BlockHeight,
endHeight: BlockHeight
) -> AsyncThrowingStream<ZcashCompactBlock, Error> {
let stream = compactTxStreamer.getBlockRange(
BlockRange(
startHeight: startHeight,
endHeight: endHeight
),
callOptions: Self.callOptions(timeLimit: self.streamingCallTimeout)
)
return AsyncThrowingStream { continuation in
Task {
do {
for try await compactBlock in stream {
continuation.yield(ZcashCompactBlock(compactBlock: compactBlock))
}
continuation.finish()
} catch {
[476] CompactBlockProcessor to async/await - getting rid of the Operation Queue - the cleanup is needed - the update of tests is needed - tested and it successfully finishes the sync process [476] CompactBlockProcessor to async/await - old processNewBlocks() removed [476] CompactBlockProcessor to async/await - unused operations removed [476] CompactBlockProcessor to async/await - unit tests update [476] CompactBlockProcessor to async/await - unit tests refactored [476] CompactBlockProcessor to async/await - cleanup of deprecated method [476] CompactBlockProcessor to async/await - fail(error) was called even for canceled tasks but that must be excluded [476] CompactBlockProcessor to async/await - removal of all ZcashOperations from the code (unit test will follow) [476] CompactBlockProcessor to async/await - network tests in building and success order again [476] CompactBlockProcessor to async/await - offline tests in building and success order [476] CompactBlockProcessor to async/await (519) - cleanup of suspending the task [476] CompactBlockProcessor to async/await (519) - most comments resolved [476] CompactBlockProcessor to async/await (519) - thread safe state for both sync and async context [476] CompactBlockProcessor to async/await (519) - fixed build for a sample project [476] CompactBlockProcessor to async/await (519) - func testStartNotifiesSuscriptors() reverted [476] CompactBlockProcessor to async/await (519) - TODO added to track why we used NSLock instead of an Actor - Task priority enhanced [476] CompactBlockProcessor to async/await (519) - cleanup in Tasks and priorities
2022-09-01 05:58:41 -07:00
continuation.finish(throwing: error.mapToServiceError())
}
}
}
}
func closeConnection() {
_ = channel.close()
}
}
2021-09-17 06:49:58 -07:00
// MARK: - Extensions
extension ConnectivityState {
func toConnectionState() -> ConnectionState {
switch self {
case .connecting:
return .connecting
case .idle:
return .idle
case .ready:
return .online
case .shutdown:
return .shutdown
case .transientFailure:
return .reconnecting
}
}
2021-09-17 06:49:58 -07:00
}
extension TimeAmount {
static let singleCallTimeout = TimeAmount.seconds(30)
static let streamingCallTimeout = TimeAmount.minutes(10)
}
extension CallOptions {
static var lwdCall: CallOptions {
CallOptions(
customMetadata: HPACKHeaders(),
timeLimit: .timeout(.singleCallTimeout),
messageEncoding: .disabled,
requestIDProvider: .autogenerated,
requestIDHeader: nil,
cacheable: false
)
}
}
extension Error {
func mapToServiceError() -> LightWalletServiceError {
2021-09-17 06:49:58 -07:00
guard let grpcError = self as? GRPCStatusTransformable else {
return LightWalletServiceError.genericError(error: self)
}
return LightWalletServiceError.mapCode(grpcError.makeGRPCStatus())
}
}
extension LightWalletServiceError {
static func mapCode(_ status: GRPCStatus) -> LightWalletServiceError {
switch status.code {
case .ok:
2021-09-17 06:49:58 -07:00
return LightWalletServiceError.unknown
case .cancelled:
return LightWalletServiceError.userCancelled
case .unknown:
2021-04-08 10:18:16 -07:00
return LightWalletServiceError.generalError(message: status.message ?? "GRPC unknown error contains no message")
case .deadlineExceeded:
return LightWalletServiceError.timeOut
default:
return LightWalletServiceError.genericError(error: status)
}
}
}
class ConnectionStatusManager: ConnectivityStateDelegate {
let connectionStateChange: (_ from: ConnectionState, _ to: ConnectionState) -> Void
init(connectionStateChange: @escaping (_ from: ConnectionState, _ to: ConnectionState) -> Void) {
self.connectionStateChange = connectionStateChange
}
func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState) {
connectionStateChange(oldState.toConnectionState(), newState.toConnectionState())
}
}