Merge pull request #1009 from Chlup/442_downloading_in_parallel_prototype

[#442] Implement parallel downloading and scanning
This commit is contained in:
Michal Fousek 2023-05-05 17:06:09 +02:00 committed by GitHub
commit 0324d9ace5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 449 additions and 228 deletions

View File

@ -1,3 +1,5 @@
# Unreleased
### [#1013] Enable more granular control over logging behavior
Now the SDK allows for more fine-tuning of its logging behavior. The `LoggingPolicy` enum
@ -9,6 +11,13 @@ Lastly, `noLogging` disables logging entirely.
To utilize this new configuration option, pass a `loggingPolicy` into the `Initializer`. If unspecified, the SDK
will utilize an internal `Logger` implementation with an `OSLogger.LogLevel` of `.debug`
### [#442] Implement parallel downloading and scanning
The SDK now parallelizes the download and scanning of blocks. If the network connection of the client device is fast enough then the scanning
process doesn't have to wait for blocks to be downloaded. This makes the whole sync process faster.
`Synchronizer.stop()` method is not async anymore.
# 0.21.0-beta
New checkpoints

View File

@ -629,6 +629,7 @@
CODE_SIGN_IDENTITY = "iPhone Developer";
COPY_PHASE_STRIP = NO;
DEBUG_INFORMATION_FORMAT = dwarf;
DEVELOPMENT_TEAM = RLPRR8CPQG;
ENABLE_STRICT_OBJC_MSGSEND = YES;
ENABLE_TESTABILITY = YES;
EXCLUDED_ARCHS = "$(inherited)";
@ -692,6 +693,7 @@
CODE_SIGN_IDENTITY = "iPhone Developer";
COPY_PHASE_STRIP = NO;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = RLPRR8CPQG;
ENABLE_NS_ASSERTIONS = NO;
ENABLE_STRICT_OBJC_MSGSEND = YES;
EXCLUDED_ARCHS = "$(inherited)";
@ -718,7 +720,6 @@
buildSettings = {
ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon;
CODE_SIGN_STYLE = Automatic;
DEVELOPMENT_TEAM = 6J82A38BF9;
ENABLE_BITCODE = NO;
"EXCLUDED_ARCHS[sdk=iphonesimulator*]" = "$(inherited)";
INFOPLIST_FILE = ZcashLightClientSample/Info.plist;
@ -741,7 +742,6 @@
buildSettings = {
ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon;
CODE_SIGN_STYLE = Automatic;
DEVELOPMENT_TEAM = 6J82A38BF9;
ENABLE_BITCODE = NO;
"EXCLUDED_ARCHS[sdk=iphonesimulator*]" = "$(inherited)";
INFOPLIST_FILE = ZcashLightClientSample/Info.plist;

View File

@ -322,7 +322,7 @@ extension SDKSynchronizer {
return "Unprepared 😅"
case .error(let error):
return "Error: \(error.localizedDescription)"
return "Error: \(error)"
}
}
}

View File

@ -50,10 +50,8 @@ class SyncBlocksListViewController: UIViewController {
override func viewWillDisappear(_ animated: Bool) {
super.viewWillDisappear(animated)
cancellables = []
Task(priority: .userInitiated) {
for synchronizer in synchronizers {
await synchronizer.stop()
}
for synchronizer in synchronizers {
synchronizer.stop()
}
}
@ -85,7 +83,7 @@ class SyncBlocksListViewController: UIViewController {
loggerProxy.error("Can't start synchronizer: \(error)")
}
case .syncing, .enhancing, .fetching:
await synchronizer.stop()
synchronizer.stop()
}
}
}
@ -178,7 +176,7 @@ extension SyncStatus {
case let .syncing(progress):
return "Syncing 🤖 \(floor(progress.progress * 1000) / 10)%"
case let .error(error):
return "error 💔 \(error.localizedDescription)"
return "error 💔 \(error)"
case .stopped:
return "Stopped 🚫"
case .synced:

View File

@ -69,7 +69,7 @@ class SyncBlocksViewController: UIViewController {
override func viewWillDisappear(_ animated: Bool) {
super.viewWillDisappear(animated)
cancellables.forEach { $0.cancel() }
closureSynchronizer.stop() { }
closureSynchronizer.stop()
}
private func synchronizerStateUpdated(_ state: SynchronizerState) {
@ -86,9 +86,9 @@ class SyncBlocksViewController: UIViewController {
progressLabel.text = "\(floor(progress.progress * 1000) / 10)%"
let syncedDate = dateFormatter.string(from: Date(timeIntervalSince1970: state.latestScannedTime))
let progressText = """
synced date \(syncedDate)
synced block \(state.latestScannedHeight)
target block \(state.latestBlockHeight)
synced date \(syncedDate)
synced block \(state.latestScannedHeight)
latest block height \(state.latestBlockHeight)
"""
progressDataLabel.text = progressText
@ -163,7 +163,7 @@ class SyncBlocksViewController: UIViewController {
func doStartStop() async {
let syncStatus = synchronizer.latestState.syncStatus
switch syncStatus {
case .stopped, .unprepared:
case .stopped, .unprepared, .error:
do {
if syncStatus == .unprepared {
// swiftlint:disable:next force_try
@ -182,7 +182,7 @@ class SyncBlocksViewController: UIViewController {
updateUI()
}
default:
await synchronizer.stop()
synchronizer.stop()
synchronizer.metrics.disableMetrics()
updateUI()
}

View File

@ -271,11 +271,7 @@ actor CompactBlockProcessor {
/// Don't update this variable directly. Use `updateState()` method.
var state: State = .stopped
var config: Configuration {
willSet {
self.stop()
}
}
private(set) var config: Configuration
var maxAttemptsReached: Bool {
self.retryAttempts >= self.config.retries
@ -404,7 +400,7 @@ actor CompactBlockProcessor {
let internalSyncProgress = InternalSyncProgress(alias: config.alias, storage: UserDefaults.standard, logger: logger)
self.internalSyncProgress = internalSyncProgress
let blockDownloaderService = BlockDownloaderServiceImpl(service: service, storage: storage)
let blockDownloader = BlockDownloaderImpl(
self.blockDownloader = BlockDownloaderImpl(
service: service,
downloaderService: blockDownloaderService,
storage: storage,
@ -414,7 +410,6 @@ actor CompactBlockProcessor {
)
self.blockDownloaderService = blockDownloaderService
self.blockDownloader = blockDownloader
self.blockValidator = BlockValidatorImpl(
rustBackend: rustBackend,
@ -478,6 +473,11 @@ actor CompactBlockProcessor {
cancelableTask?.cancel()
}
func update(config: Configuration) async {
self.config = config
await stop()
}
func updateState(_ newState: State) async -> Void {
let oldState = state
state = newState
@ -576,11 +576,12 @@ actor CompactBlockProcessor {
Note: retry count is reset
*/
func stop() {
func stop() async {
self.backoffTimer?.invalidate()
self.backoffTimer = nil
cancelableTask?.cancel()
await blockDownloader.stopDownload()
self.retryAttempts = 0
}
@ -597,7 +598,7 @@ actor CompactBlockProcessor {
case .syncing, .enhancing, .fetching, .handlingSaplingFiles:
logger.debug("Stopping sync because of rewind")
afterSyncHooksManager.insert(hook: .rewind(context))
stop()
await stop()
case .stopped, .error, .synced:
logger.debug("Sync doesn't run. Executing rewind.")
@ -650,7 +651,7 @@ actor CompactBlockProcessor {
case .syncing, .enhancing, .fetching, .handlingSaplingFiles:
logger.debug("Stopping sync because of wipe")
afterSyncHooksManager.insert(hook: .wipe(context))
stop()
await stop()
case .stopped, .error, .synced:
logger.debug("Sync doesn't run. Executing wipe.")
@ -746,6 +747,7 @@ actor CompactBlockProcessor {
if let range = ranges.downloadAndScanRange {
logger.debug("Starting sync with range: \(range.lowerBound)...\(range.upperBound)")
try await blockDownloader.setSyncRange(range)
try await downloadAndScanBlocks(at: range, totalProgressRange: totalProgressRange)
}
@ -812,11 +814,6 @@ actor CompactBlockProcessor {
}
private func downloadAndScanBlocks(at range: CompactBlockRange, totalProgressRange: CompactBlockRange) async throws {
let downloadStream = try await blockDownloader.compactBlocksDownloadStream(
startHeight: range.lowerBound,
targetHeight: range.upperBound
)
// Divide `range` by `batchSize` and compute how many time do we need to run to download and scan all the blocks.
// +1 must be done here becase `range` is closed range. So even if upperBound and lowerBound are same there is one block to sync.
let blocksCountToSync = (range.upperBound - range.lowerBound) + 1
@ -825,33 +822,42 @@ actor CompactBlockProcessor {
loopsCount += 1
}
var lastScannedHeight: BlockHeight = .zero
for i in 0..<loopsCount {
let processingRange = computeSingleLoopDownloadRange(fullRange: range, loopCounter: i, batchSize: batchSize)
logger.debug("Sync loop #\(i + 1) range: \(processingRange.lowerBound)...\(processingRange.upperBound)")
// This is important. We must be sure that no new download is executed when this Task is canceled. Without this line `stop()` doesn't
// work.
try Task.checkCancellation()
do {
try await blockDownloader.downloadAndStoreBlocks(
using: downloadStream,
at: processingRange,
maxBlockBufferSize: config.downloadBufferSize,
totalProgressRange: totalProgressRange
)
await blockDownloader.setDownloadLimit(processingRange.upperBound + (2 * batchSize))
await blockDownloader.startDownload(maxBlockBufferSize: config.downloadBufferSize, syncRange: range)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: processingRange)
} catch {
await ifTaskIsNotCanceledClearCompactBlockCache()
await ifTaskIsNotCanceledClearCompactBlockCache(lastScannedHeight: lastScannedHeight)
throw error
}
do {
try await blockValidator.validate()
} catch {
await ifTaskIsNotCanceledClearCompactBlockCache()
await ifTaskIsNotCanceledClearCompactBlockCache(lastScannedHeight: lastScannedHeight)
logger.error("Block validation failed with error: \(error)")
throw error
}
// Without this `stop()` would work. But this line improves support for Task cancelation.
try Task.checkCancellation()
do {
try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { [weak self] lastScannedHeight in
lastScannedHeight = try await blockScanner.scanBlocks(
at: processingRange,
totalProgressRange: totalProgressRange
) { [weak self] lastScannedHeight in
let progress = BlockProgress(
startHeight: totalProgressRange.lowerBound,
targetHeight: totalProgressRange.upperBound,
@ -861,11 +867,11 @@ actor CompactBlockProcessor {
}
} catch {
logger.error("Scanning failed with error: \(error)")
await ifTaskIsNotCanceledClearCompactBlockCache()
await ifTaskIsNotCanceledClearCompactBlockCache(lastScannedHeight: lastScannedHeight)
throw error
}
try await clearCompactBlockCache()
try await clearCompactBlockCache(upTo: lastScannedHeight)
let progress = BlockProgress(
startHeight: totalProgressRange.lowerBound,
@ -929,6 +935,7 @@ actor CompactBlockProcessor {
func severeFailure(_ error: Error) async {
cancelableTask?.cancel()
await blockDownloader.stopDownload()
logger.error("show stopper failure: \(error)")
self.backoffTimer?.invalidate()
self.retryAttempts = config.retries
@ -941,6 +948,7 @@ actor CompactBlockProcessor {
// TODO: [#713] specify: failure. https://github.com/zcash/ZcashLightClientKit/issues/713
logger.error("\(error)")
cancelableTask?.cancel()
await blockDownloader.stopDownload()
self.retryAttempts += 1
self.processingError = error
switch self.state {
@ -998,6 +1006,7 @@ actor CompactBlockProcessor {
internal func validationFailed(at height: BlockHeight) async {
// cancel all Tasks
cancelableTask?.cancel()
await blockDownloader.stopDownload()
// register latest failure
self.lastChainValidationFailure = height
@ -1048,16 +1057,34 @@ actor CompactBlockProcessor {
await setTimer()
}
private func ifTaskIsNotCanceledClearCompactBlockCache() async {
private func ifTaskIsNotCanceledClearCompactBlockCache(lastScannedHeight: BlockHeight) async {
guard !Task.isCancelled else { return }
do {
// Blocks download work in parallel with scanning. So imagine this scenario:
//
// Scanning is done until height 10300. Blocks are downloaded until height 10400.
// And now validation fails and this method is called. And `.latestDownloadedBlockHeight` in `internalSyncProgress` is set to 10400. And
// all the downloaded blocks are removed here.
//
// If this line doesn't happen then when sync starts next time it thinks that all the blocks are downloaded until 10400. But all were
// removed. So blocks between 10300 and 10400 wouldn't ever be scanned.
//
// Scanning is done until 10300 so the SDK can be sure that blocks with height below 10300 are not required. So it makes sense to set
// `.latestDownloadedBlockHeight` to `lastScannedHeight`. And sync will work fine in next run.
await internalSyncProgress.set(lastScannedHeight, .latestDownloadedBlockHeight)
try await clearCompactBlockCache()
} catch {
logger.error("`clearCompactBlockCache` failed after error: \(error.localizedDescription)")
logger.error("`clearCompactBlockCache` failed after error: \(error)")
}
}
private func clearCompactBlockCache(upTo height: BlockHeight) async throws {
try await storage.clear(upTo: height)
logger.info("Cache removed upTo \(height)")
}
private func clearCompactBlockCache() async throws {
await blockDownloader.stopDownload()
try await storage.clear()
logger.info("Cache removed")
}

View File

@ -8,7 +8,7 @@
import Foundation
class BlockDownloaderStream {
private class BlockDownloaderStream {
let stream: AsyncThrowingStream<ZcashCompactBlock, Error>
var iterator: AsyncThrowingStream<ZcashCompactBlock, Error>.Iterator
@ -24,40 +24,139 @@ class BlockDownloaderStream {
/// Described object which can download blocks.
protocol BlockDownloader {
/// Create stream that can be used to download blocks for specific range.
func compactBlocksDownloadStream(startHeight: BlockHeight, targetHeight: BlockHeight) async throws -> BlockDownloaderStream
/// Set max height to which blocks will be downloaded. If this is higher than upper bound of the whole range then upper bound of the whole range
/// is used as limit.
func setDownloadLimit(_ limit: BlockHeight) async
/// Read (download) blocks from stream and store those in storage.
/// Set the range for the whole sync process. This method creates stream that is used to download blocks.
/// This method must be called before `startDownload()` is called. And it can't be called while download is in progress otherwise bad things
/// happen.
func setSyncRange(_ range: CompactBlockRange) async throws
/// Start downloading blocks.
///
/// This methods creates new detached Task which is used to do the actual downloading.
///
/// It's possible to call this methods anytime. If any download is already in progress nothing happens.
/// If the download limit is changed while download is in progress then blocks within new limit are downloaded automatically.
/// If the downlading finishes and then limit is changed you must call this method to start downloading.
///
/// - Parameters:
/// - stream: Stream used to read blocks.
/// - range: Range used to compute how many blocks to download.
/// - maxBlockBufferSize: Max amount of blocks that can be held in memory.
/// - totalProgressRange: Range that contains height for the whole sync process. This is used to compute progress.
func downloadAndStoreBlocks(
using stream: BlockDownloaderStream,
at range: CompactBlockRange,
maxBlockBufferSize: Int,
totalProgressRange: CompactBlockRange
) async throws
/// - maxBlockBufferSize: Number of blocks that is held in memory before blocks are written to disk.
/// - syncRange: Whole range in which blocks should be downloaded. This should be sync range.
func startDownload(maxBlockBufferSize: Int, syncRange: CompactBlockRange) async
/// Stop download. This method cancels Task used for downloading. And then it is waiting until internal `isDownloading` flag is set to `false`
func stopDownload() async
/// Waits until blocks from `range` are downloaded. This method does just the waiting. If `startDownload(maxBlockBufferSize:syncRange:)` isn't
/// called before then nothing is downloaded.
/// - Parameter range: Wait until blocks from `range` are downloaded.
func waitUntilRequestedBlocksAreDownloaded(in range: CompactBlockRange) async throws
}
struct BlockDownloaderImpl {
actor BlockDownloaderImpl {
let service: LightWalletService
let downloaderService: BlockDownloaderService
let storage: CompactBlockRepository
let internalSyncProgress: InternalSyncProgress
let metrics: SDKMetrics
let logger: Logger
}
extension BlockDownloaderImpl: BlockDownloader {
func compactBlocksDownloadStream(startHeight: BlockHeight, targetHeight: BlockHeight) async throws -> BlockDownloaderStream {
private var downloadStream: BlockDownloaderStream?
private var downloadToHeight: BlockHeight = 0
private var isDownloading = false
private var task: Task<Void, Error>?
private var lastError: Error?
init(
service: LightWalletService,
downloaderService: BlockDownloaderService,
storage: CompactBlockRepository,
internalSyncProgress: InternalSyncProgress,
metrics: SDKMetrics,
logger: Logger
) {
self.service = service
self.downloaderService = downloaderService
self.storage = storage
self.internalSyncProgress = internalSyncProgress
self.metrics = metrics
self.logger = logger
}
private func doDownload(maxBlockBufferSize: Int, syncRange: CompactBlockRange) async {
lastError = nil
do {
guard let downloadStream = self.downloadStream else {
logger.error("Dont have downloadStream. Trying to download blocks before sync range is not set.")
throw ZcashError.blockDownloadSyncRangeNotSet
}
let latestDownloadedBlockHeight = await internalSyncProgress.load(.latestDownloadedBlockHeight)
let downloadFrom = max(syncRange.lowerBound, latestDownloadedBlockHeight + 1)
let downloadTo = min(downloadToHeight, syncRange.upperBound)
if downloadFrom > downloadTo {
logger.debug("""
Download from \(downloadFrom) is higher or same as dowload to \(downloadTo). All blocks are probably downloaded. Exiting.
""")
isDownloading = false
task = nil
return
}
let range = downloadFrom...downloadTo
logger.debug("""
Starting downloading blocks.
syncRange: \(syncRange.lowerBound)...\(syncRange.upperBound)
downloadToHeight: \(downloadToHeight)
latestDownloadedBlockHeight: \(latestDownloadedBlockHeight)
range: \(range.lowerBound)...\(range.upperBound)
""")
try await downloadAndStoreBlocks(
using: downloadStream,
at: range,
maxBlockBufferSize: maxBlockBufferSize,
totalProgressRange: syncRange
)
task = nil
if downloadToHeight > range.upperBound {
logger.debug("""
Finished downloading with range: \(range.lowerBound)...\(range.upperBound). Going to start new download.
range upper bound: \(range.upperBound)
new downloadToHeight: \(downloadToHeight)
""")
await startDownload(maxBlockBufferSize: maxBlockBufferSize, syncRange: syncRange)
logger.debug("finishing after start download")
} else {
logger.debug("Finished downloading with range: \(range.lowerBound)...\(range.upperBound)")
isDownloading = false
}
} catch {
lastError = error
if Task.isCancelled {
logger.debug("Blocks downloading canceled.")
} else {
logger.error("Blocks downloading failed: \(error)")
}
isDownloading = false
task = nil
}
}
private func compactBlocksDownloadStream(startHeight: BlockHeight, targetHeight: BlockHeight) async throws -> BlockDownloaderStream {
try Task.checkCancellation()
let stream = service.blockStream(startHeight: startHeight, endHeight: targetHeight)
return BlockDownloaderStream(stream: stream)
}
func downloadAndStoreBlocks(
private func downloadAndStoreBlocks(
using stream: BlockDownloaderStream,
at range: CompactBlockRange,
maxBlockBufferSize: Int,
@ -70,7 +169,7 @@ extension BlockDownloaderImpl: BlockDownloader {
var counter = 0
var lastDownloadedBlockHeight = -1
let pushMetrics: (BlockHeight, Date, Date) -> Void = { lastDownloadedBlockHeight, startTime, finishTime in
let pushMetrics: (BlockHeight, Date, Date) -> Void = { [metrics] lastDownloadedBlockHeight, startTime, finishTime in
metrics.pushProgressReport(
progress: BlockProgress(
startHeight: totalProgressRange.lowerBound,
@ -118,3 +217,50 @@ extension BlockDownloaderImpl: BlockDownloader {
await internalSyncProgress.set(lastBlock.height, .latestDownloadedBlockHeight)
}
}
extension BlockDownloaderImpl: BlockDownloader {
func setDownloadLimit(_ limit: BlockHeight) async {
downloadToHeight = limit
}
func setSyncRange(_ range: CompactBlockRange) async throws {
downloadStream = try await compactBlocksDownloadStream(startHeight: range.lowerBound, targetHeight: range.upperBound)
}
func startDownload(maxBlockBufferSize: Int, syncRange: CompactBlockRange) async {
guard task == nil else {
logger.debug("Download already in progress.")
return
}
isDownloading = true
task = Task.detached() { [weak self] in
// Solve when self is nil, task should be niled.
await self?.doDownload(maxBlockBufferSize: maxBlockBufferSize, syncRange: syncRange)
}
}
func stopDownload() async {
task?.cancel()
task = nil
while isDownloading {
do {
try await Task.sleep(milliseconds: 10)
} catch {
break
}
}
}
func waitUntilRequestedBlocksAreDownloaded(in range: CompactBlockRange) async throws {
logger.debug("Waiting until requested blocks are downloaded at \(range)")
var latestDownloadedBlock = await internalSyncProgress.load(.latestDownloadedBlockHeight)
while latestDownloadedBlock < range.upperBound {
if let error = lastError {
throw error
}
try await Task.sleep(milliseconds: 10)
latestDownloadedBlock = await internalSyncProgress.load(.latestDownloadedBlockHeight)
}
logger.debug("Waiting done. Blocks are downloaded at \(range)")
}
}

View File

@ -52,7 +52,7 @@ extension FSCompactBlockRepository: CompactBlockRepository {
do {
try fileManager.createDirectory(at: blocksDirectory, withIntermediateDirectories: true)
} catch {
throw ZcashError.blockRepositoryCreateBlocksCacheDirectory(blocksDirectory)
throw ZcashError.blockRepositoryCreateBlocksCacheDirectory(blocksDirectory, error)
}
}
@ -81,7 +81,7 @@ extension FSCompactBlockRepository: CompactBlockRepository {
do {
try self.fileManager.removeItem(at: blockURL)
} catch {
throw ZcashError.blockRepositoryRemoveExistingBlock(error)
throw ZcashError.blockRepositoryRemoveExistingBlock(blockURL, error)
}
}
@ -90,7 +90,7 @@ extension FSCompactBlockRepository: CompactBlockRepository {
try self.fileWriter.writeToURL(block.data, blockURL)
} catch {
logger.error("Failed to write block: \(block.height) to path: \(blockURL.path) with error: \(error)")
throw ZcashError.blockRepositoryWriteBlock(block)
throw ZcashError.blockRepositoryWriteBlock(block, error)
}
savedBlocks.append(block)
@ -104,7 +104,7 @@ extension FSCompactBlockRepository: CompactBlockRepository {
// if there are any remaining blocks on the cache store them
try await self.metadataStore.saveBlocksMeta(savedBlocks)
} catch {
logger.error("failed to Block save to cache error: \(error.localizedDescription)")
logger.error("failed to Block save to cache error: \(error)")
throw error
}
}
@ -127,7 +127,19 @@ extension FSCompactBlockRepository: CompactBlockRepository {
do {
try self.fileManager.removeItem(at: item)
} catch {
throw ZcashError.blockRepositoryRemoveBlockAfterRewind(item)
throw ZcashError.blockRepositoryRemoveBlockAfterRewind(item, error)
}
}
}
func clear(upTo height: BlockHeight) async throws {
let files = try filesWithHeight(upTo: height)
logger.debug("Clearing up to height \(height). Clearing \(files.count) blocks from cache.")
for url in files {
do {
try self.fileManager.removeItem(at: url)
} catch {
throw ZcashError.blockRepositoryRemoveBlockClearingCache(url, error)
}
}
}
@ -137,7 +149,7 @@ extension FSCompactBlockRepository: CompactBlockRepository {
do {
try self.fileManager.removeItem(at: self.fsBlockDbRoot)
} catch {
throw ZcashError.blockRepositoryRemoveBlocksCacheDirectory(fsBlockDbRoot)
throw ZcashError.blockRepositoryRemoveBlocksCacheDirectory(fsBlockDbRoot, error)
}
}
try await create()
@ -213,6 +225,15 @@ extension FSCompactBlockRepository {
atPath: urlForBlock(block).path
)
}
func filesWithHeight(upTo height: BlockHeight) throws -> [URL] {
let sortedCachedContents = try contentProvider.listContents(of: blocksDirectory)
return try sortedCachedContents.filter { url in
guard let filename = try url.resourceValues(forKeys: [.nameKey]).name else { throw ZcashError.blockRepositoryGetFilename(url) }
return try filename.filterLowerThanOrEqual(height, with: blockDescriptor)
}
}
}
// MARK: Associated and Helper types
@ -304,18 +325,15 @@ enum DirectoryListingProviders {
class SortedDirectoryContentProvider: SortedDirectoryListing {
let fileManager: FileManager
let sorting: (URL, URL) throws -> Bool
let recursive: Bool
/// inits the `SortedDirectoryContentProvider`
/// - Parameter fileManager: an instance of `FileManager`
/// - Parameter sorting: A predicate that returns `true` if its
/// first argument should be ordered before its second argument;
/// otherwise, `false`.
/// - Parameter recursive: make this list subdirectories. Default
init(fileManager: FileManager, sorting: @escaping (URL, URL) throws -> Bool, recursive: Bool = false) {
init(fileManager: FileManager, sorting: @escaping (URL, URL) throws -> Bool) {
self.fileManager = fileManager
self.sorting = sorting
self.recursive = recursive
}
/// lists the contents of the given directory on `url` using the
@ -328,11 +346,11 @@ class SortedDirectoryContentProvider: SortedDirectoryListing {
return try fileManager.contentsOfDirectory(
at: url,
includingPropertiesForKeys: [.nameKey, .isDirectoryKey],
options: recursive ? [] : .skipsSubdirectoryDescendants
options: [.skipsHiddenFiles]
)
.sorted(by: sorting)
} catch {
throw ZcashError.blockRepositoryReadDirectoryContent(url)
throw ZcashError.blockRepositoryReadDirectoryContent(url, error)
}
}
}
@ -370,7 +388,7 @@ extension FileManager: SortedDirectoryListing {
options: .skipsSubdirectoryDescendants
)
} catch {
throw ZcashError.blockRepositoryReadDirectoryContent(url)
throw ZcashError.blockRepositoryReadDirectoryContent(url, error)
}
}
}
@ -390,4 +408,12 @@ extension String {
return blockHeight > height
}
func filterLowerThanOrEqual(_ height: BlockHeight, with descriptor: ZcashCompactBlockDescriptor) throws -> Bool {
guard let blockHeight = descriptor.height(self) else {
throw ZcashError.blockRepositoryParseHeightFromFilename(self)
}
return blockHeight <= height
}
}

View File

@ -13,7 +13,12 @@ struct BlockScannerConfig {
}
protocol BlockScanner {
func scanBlocks(at range: CompactBlockRange, totalProgressRange: CompactBlockRange, didScan: @escaping (BlockHeight) async -> Void) async throws
@discardableResult
func scanBlocks(
at range: CompactBlockRange,
totalProgressRange: CompactBlockRange,
didScan: @escaping (BlockHeight) async -> Void
) async throws -> BlockHeight
}
struct BlockScannerImpl {
@ -26,7 +31,13 @@ struct BlockScannerImpl {
}
extension BlockScannerImpl: BlockScanner {
func scanBlocks(at range: CompactBlockRange, totalProgressRange: CompactBlockRange, didScan: @escaping (BlockHeight) async -> Void) async throws {
@discardableResult
func scanBlocks(
at range: CompactBlockRange,
totalProgressRange: CompactBlockRange,
didScan: @escaping (BlockHeight) async -> Void
) async throws -> BlockHeight {
logger.debug("Going to scan blocks in range: \(range)")
try Task.checkCancellation()
let scanStartHeight = try await transactionRepository.lastScannedHeight()
@ -84,6 +95,8 @@ extension BlockScannerImpl: BlockScanner {
await Task.yield()
} while !Task.isCancelled && scannedNewBlocks && lastScannedHeight < targetScanHeight
return lastScannedHeight
}
private func scanBatchSize(startScanHeight height: BlockHeight, network: NetworkType) -> UInt32 {

View File

@ -30,7 +30,7 @@ public protocol ClosureSynchronizer {
)
func start(retry: Bool, completion: @escaping (Error?) -> Void)
func stop(completion: @escaping () -> Void)
func stop()
func getSaplingAddress(accountIndex: Int, completion: @escaping (Result<SaplingAddress, Error>) -> Void)
func getUnifiedAddress(accountIndex: Int, completion: @escaping (Result<UnifiedAddress, Error>) -> Void)

View File

@ -29,7 +29,7 @@ public protocol CombineSynchronizer {
) -> SinglePublisher<Initializer.InitializationResult, Error>
func start(retry: Bool) -> CompletablePublisher<Error>
func stop() -> CompletablePublisher<Never>
func stop()
func getSaplingAddress(accountIndex: Int) -> SinglePublisher<SaplingAddress, Error>
func getUnifiedAddress(accountIndex: Int) -> SinglePublisher<UnifiedAddress, Error>

View File

@ -318,7 +318,7 @@ public enum ZcashError: Equatable, Error {
case accountDAOUpdatedZeroRows
/// Failed to write block to disk.
/// ZBLRP00001
case blockRepositoryWriteBlock(_ block: ZcashCompactBlock)
case blockRepositoryWriteBlock(_ block: ZcashCompactBlock, _ error: Error)
/// Failed to get filename for the block from file URL.
/// ZBLRP0002
case blockRepositoryGetFilename(_ url: URL)
@ -327,22 +327,28 @@ public enum ZcashError: Equatable, Error {
case blockRepositoryParseHeightFromFilename(_ filename: String)
/// Failed to remove existing block from disk.
/// ZBLRP0004
case blockRepositoryRemoveExistingBlock(_ error: Error)
case blockRepositoryRemoveExistingBlock(_ url: URL, _ error: Error)
/// Failed to get filename and information if url points to directory from file URL.
/// ZBLRP0005
case blockRepositoryGetFilenameAndIsDirectory(_ url: URL)
/// Failed to create blocks cache directory.
/// ZBLRP0006
case blockRepositoryCreateBlocksCacheDirectory(_ url: URL)
case blockRepositoryCreateBlocksCacheDirectory(_ url: URL, _ error: Error)
/// Failed to read content of directory.
/// ZBLRP0007
case blockRepositoryReadDirectoryContent(_ url: URL)
case blockRepositoryReadDirectoryContent(_ url: URL, _ error: Error)
/// Failed to remove block from disk after rewind operation.
/// ZBLRP0008
case blockRepositoryRemoveBlockAfterRewind(_ url: URL)
case blockRepositoryRemoveBlockAfterRewind(_ url: URL, _ error: Error)
/// Failed to remove blocks cache directory while clearing storage.
/// ZBLRP0009
case blockRepositoryRemoveBlocksCacheDirectory(_ url: URL)
case blockRepositoryRemoveBlocksCacheDirectory(_ url: URL, _ error: Error)
/// Failed to remove block from cache when clearing cache up to some height.
/// ZBLRP0010
case blockRepositoryRemoveBlockClearingCache(_ url: URL, _ error: Error)
/// Trying to download blocks before sync range is set in `BlockDownloaderImpl`. This means that download stream is not created and download cant' start.
/// ZBDWN0001
case blockDownloadSyncRangeNotSet
/// Stream downloading the given block range failed.
/// ZBDSEO0001
case blockDownloaderServiceDownloadBlockRange(_ error: Error)
@ -697,6 +703,8 @@ public enum ZcashError: Equatable, Error {
case .blockRepositoryReadDirectoryContent: return "Failed to read content of directory."
case .blockRepositoryRemoveBlockAfterRewind: return "Failed to remove block from disk after rewind operation."
case .blockRepositoryRemoveBlocksCacheDirectory: return "Failed to remove blocks cache directory while clearing storage."
case .blockRepositoryRemoveBlockClearingCache: return "Failed to remove block from cache when clearing cache up to some height."
case .blockDownloadSyncRangeNotSet: return "Trying to download blocks before sync range is set in `BlockDownloaderImpl`. This means that download stream is not created and download cant' start."
case .blockDownloaderServiceDownloadBlockRange: return "Stream downloading the given block range failed."
case .zcashTransactionOverviewInit: return "Initialization of `ZcashTransaction.Overview` failed."
case .zcashTransactionReceivedInit: return "Initialization of `ZcashTransaction.Received` failed."
@ -875,6 +883,8 @@ public enum ZcashError: Equatable, Error {
case .blockRepositoryReadDirectoryContent: return .blockRepositoryReadDirectoryContent
case .blockRepositoryRemoveBlockAfterRewind: return .blockRepositoryRemoveBlockAfterRewind
case .blockRepositoryRemoveBlocksCacheDirectory: return .blockRepositoryRemoveBlocksCacheDirectory
case .blockRepositoryRemoveBlockClearingCache: return .blockRepositoryRemoveBlockClearingCache
case .blockDownloadSyncRangeNotSet: return .blockDownloadSyncRangeNotSet
case .blockDownloaderServiceDownloadBlockRange: return .blockDownloaderServiceDownloadBlockRange
case .zcashTransactionOverviewInit: return .zcashTransactionOverviewInit
case .zcashTransactionReceivedInit: return .zcashTransactionReceivedInit

View File

@ -193,6 +193,10 @@ public enum ZcashErrorCode: String {
case blockRepositoryRemoveBlockAfterRewind = "ZBLRP0008"
/// Failed to remove blocks cache directory while clearing storage.
case blockRepositoryRemoveBlocksCacheDirectory = "ZBLRP0009"
/// Failed to remove block from cache when clearing cache up to some height.
case blockRepositoryRemoveBlockClearingCache = "ZBLRP0010"
/// Trying to download blocks before sync range is set in `BlockDownloaderImpl`. This means that download stream is not created and download cant' start.
case blockDownloadSyncRangeNotSet = "ZBDWN0001"
/// Stream downloading the given block range failed.
case blockDownloaderServiceDownloadBlockRange = "ZBDSEO0001"
/// Initialization of `ZcashTransaction.Overview` failed.

View File

@ -360,7 +360,7 @@ enum ZcashErrorDefinition {
/// Failed to write block to disk.
// sourcery: code="ZBLRP00001"
case blockRepositoryWriteBlock(_ block: ZcashCompactBlock)
case blockRepositoryWriteBlock(_ block: ZcashCompactBlock, _ error: Error)
/// Failed to get filename for the block from file URL.
// sourcery: code="ZBLRP0002"
case blockRepositoryGetFilename(_ url: URL)
@ -369,22 +369,31 @@ enum ZcashErrorDefinition {
case blockRepositoryParseHeightFromFilename(_ filename: String)
/// Failed to remove existing block from disk.
// sourcery: code="ZBLRP0004"
case blockRepositoryRemoveExistingBlock(_ error: Error)
case blockRepositoryRemoveExistingBlock(_ url: URL, _ error: Error)
/// Failed to get filename and information if url points to directory from file URL.
// sourcery: code="ZBLRP0005"
case blockRepositoryGetFilenameAndIsDirectory(_ url: URL)
/// Failed to create blocks cache directory.
// sourcery: code="ZBLRP0006"
case blockRepositoryCreateBlocksCacheDirectory(_ url: URL)
case blockRepositoryCreateBlocksCacheDirectory(_ url: URL, _ error: Error)
/// Failed to read content of directory.
// sourcery: code="ZBLRP0007"
case blockRepositoryReadDirectoryContent(_ url: URL)
case blockRepositoryReadDirectoryContent(_ url: URL, _ error: Error)
/// Failed to remove block from disk after rewind operation.
// sourcery: code="ZBLRP0008"
case blockRepositoryRemoveBlockAfterRewind(_ url: URL)
case blockRepositoryRemoveBlockAfterRewind(_ url: URL, _ error: Error)
/// Failed to remove blocks cache directory while clearing storage.
// sourcery: code="ZBLRP0009"
case blockRepositoryRemoveBlocksCacheDirectory(_ url: URL)
case blockRepositoryRemoveBlocksCacheDirectory(_ url: URL, _ error: Error)
/// Failed to remove block from cache when clearing cache up to some height.
// sourcery: code="ZBLRP0010"
case blockRepositoryRemoveBlockClearingCache(_ url: URL, _ error: Error)
// MARK: - Block Download
/// Trying to download blocks before sync range is set in `BlockDownloaderImpl`. This means that download stream is not created and download cant' start.
// sourcery: code="ZBDWN0001"
case blockDownloadSyncRangeNotSet
// MARK: - BlockDownloaderService

View File

@ -35,6 +35,9 @@ protocol CompactBlockRepository {
*/
func rewind(to height: BlockHeight) async throws
/// Clear only blocks with height lower or equal than `height` from the repository.
func clear(upTo height: BlockHeight) async throws
/// Clears the repository
func clear() async throws
}

View File

@ -131,7 +131,9 @@ public protocol Synchronizer: AnyObject {
func start(retry: Bool) async throws
/// Stop this synchronizer. Implementations should ensure that calling this method cancels all jobs that were created by this instance.
func stop() async
/// It make some time before the SDK stops any activity. It doesn't have to be stopped when this function finishes.
/// Observe `stateStream` or `latestState` to recognize that the SDK stopped any activity.
func stop()
/// Gets the sapling shielded address for the given account.
/// - Parameter accountIndex: the optional accountId whose address is of interest. By default, the first account is used.

View File

@ -48,10 +48,8 @@ extension ClosureSDKSynchronizer: ClosureSynchronizer {
}
}
public func stop(completion: @escaping () -> Void) {
AsyncToClosureGateway.executeAction(completion) {
await self.synchronizer.stop()
}
public func stop() {
synchronizer.stop()
}
public func getSaplingAddress(accountIndex: Int, completion: @escaping (Result<SaplingAddress, Error>) -> Void) {

View File

@ -47,10 +47,8 @@ extension CombineSDKSynchronizer: CombineSynchronizer {
}
}
public func stop() -> CompletablePublisher<Never> {
AsyncToCombineGateway.executeAction() {
await self.synchronizer.stop()
}
public func stop() {
synchronizer.stop()
}
public func getSaplingAddress(accountIndex: Int) -> SinglePublisher<SaplingAddress, Error> {

View File

@ -185,14 +185,19 @@ public class SDKSynchronizer: Synchronizer {
}
/// Stops the synchronizer
public func stop() async {
let status = await self.status
guard status != .stopped, status != .disconnected else {
logger.info("attempted to stop when status was: \(status)")
return
}
public func stop() {
// Calling `await blockProcessor.stop()` make take some time. If the downloading of blocks is in progress then this method inside waits until
// downloading is really done. Which could block execution of the code on the client side. So it's better strategy to spin up new task and
// exit fast on client side.
Task(priority: .high) {
let status = await self.status
guard status != .stopped, status != .disconnected else {
logger.info("attempted to stop when status was: \(status)")
return
}
await blockProcessor.stop()
await blockProcessor.stop()
}
}
// MARK: Connectivity State

View File

@ -203,7 +203,7 @@ class PersistentTransactionManager: OutboundTransactionManager {
// MARK: other functions
private func updateOnFailure(transaction: PendingTransactionEntity, error: Error) async throws {
var pending = transaction
pending.errorMessage = error.localizedDescription
pending.errorMessage = "\(error)"
pending.encodeAttempts = transaction.encodeAttempts + 1
try self.repository.update(pending)
}

View File

@ -12,13 +12,9 @@ Represents what's expected from a logging entity
*/
public protocol Logger {
func debug(_ message: String, file: StaticString, function: StaticString, line: Int)
func info(_ message: String, file: StaticString, function: StaticString, line: Int)
func event(_ message: String, file: StaticString, function: StaticString, line: Int)
func warn(_ message: String, file: StaticString, function: StaticString, line: Int)
func error(_ message: String, file: StaticString, function: StaticString, line: Int)
}
@ -26,19 +22,15 @@ extension Logger {
func debug(_ message: String, file: StaticString = #file, function: StaticString = #function, line: Int = #line) {
debug(message, file: file, function: function, line: line)
}
func info(_ message: String, file: StaticString = #file, function: StaticString = #function, line: Int = #line) {
info(message, file: file, function: function, line: line)
}
func event(_ message: String, file: StaticString = #file, function: StaticString = #function, line: Int = #line) {
event(message, file: file, function: function, line: line)
}
func warn(_ message: String, file: StaticString = #file, function: StaticString = #function, line: Int = #line) {
warn(message, file: file, function: function, line: line)
}
func error(_ message: String, file: StaticString = #file, function: StaticString = #function, line: Int = #line) {
error(message, file: file, function: function, line: line)
}

View File

@ -0,0 +1,14 @@
//
// Task+sleep.swift
//
//
// Created by Michal Fousek on 28.04.2023.
//
import Foundation
extension Task where Success == Never, Failure == Never {
static func sleep(milliseconds duration: UInt64) async throws {
try await Task.sleep(nanoseconds: duration * 1000_000)
}
}

View File

@ -1,5 +1,8 @@
#!/bin/zsh
scriptDir=${0:a:h}
cd "${scriptDir}"
source servers_config.zsh
for syncAlias in $syncAliases; do

View File

@ -1,5 +1,8 @@
#!/bin/zsh
scriptDir=${0:a:h}
cd "${scriptDir}"
source servers_config.zsh
index=0

View File

@ -66,10 +66,10 @@ class ReOrgTests: XCTestCase {
self.coordinator = nil
cancellables = []
try await coordinator.stop()
try? FileManager.default.removeItem(at: coordinator.databases.fsCacheDbRoot)
try? FileManager.default.removeItem(at: coordinator.databases.dataDB)
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
try await coordinator.stop()
}
func handleReOrgNotification(event: CompactBlockProcessor.Event) {

View File

@ -221,7 +221,7 @@ class ShieldFundsTests: XCTestCase {
shieldFundsExpectation.fulfill()
} catch {
shieldFundsExpectation.fulfill()
XCTFail("Failed With error: \(error.localizedDescription)")
XCTFail("Failed With error: \(error)")
}
await fulfillment(of: [shieldFundsExpectation], timeout: 30)

View File

@ -99,8 +99,7 @@ class Z2TReceiveTests: XCTestCase {
} catch {
sendExpectation.fulfill()
guard case ZcashError.synchronizerSendMemoToTransparentAddress = error else {
// swiftlint:disable:next line_length
XCTFail("expected SynchronizerError.genericError(\"Memos can't be sent to transparent addresses.\") but received \(error.localizedDescription)")
XCTFail("expected SynchronizerError.genericError(\"Memos can't be sent to transparent addresses.\") but received \(error)")
return
}
}

View File

@ -34,9 +34,9 @@ class BlockScanTests: XCTestCase {
let testFileManager = FileManager()
override func setUpWithError() throws {
// Put setup code here. This method is called before the invocation of each test method in the class.
try super.setUpWithError()
override func setUp() async throws {
try await super.setUp()
logger = OSLogger(logLevel: .debug)
dataDbURL = try! __dataDbURL()
spendParamsURL = try! __spendParamsURL()
outputParamsURL = try! __outputParamsURL()
@ -70,8 +70,6 @@ class BlockScanTests: XCTestCase {
}
func testSingleDownloadAndScan() async throws {
logger = OSLogger(logLevel: .debug)
_ = try await rustBackend.initDataDb(seed: nil)
let endpoint = LightWalletEndpoint(address: "lightwalletd.testnet.electriccoin.co", port: 9067)
@ -123,6 +121,8 @@ class BlockScanTests: XCTestCase {
latestScannedheight = repository.lastScannedBlockHeight()
XCTAssertEqual(latestScannedheight, range.upperBound)
await compactBlockProcessor.stop()
}
func observeBenchmark(_ metrics: SDKMetrics) {
@ -136,8 +136,6 @@ class BlockScanTests: XCTestCase {
func testScanValidateDownload() async throws {
let seed = "testreferencealicetestreferencealice"
logger = OSLogger(logLevel: .debug)
let metrics = SDKMetrics()
metrics.enableMetrics()
@ -217,17 +215,11 @@ class BlockScanTests: XCTestCase {
)
do {
let downloadStream = try await compactBlockProcessor.blockDownloader.compactBlocksDownloadStream(
startHeight: range.lowerBound,
targetHeight: range.upperBound
)
let blockDownloader = await compactBlockProcessor.blockDownloader
await blockDownloader.setDownloadLimit(range.upperBound)
await blockDownloader.startDownload(maxBlockBufferSize: 10, syncRange: range)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: range)
try await compactBlockProcessor.blockDownloader.downloadAndStoreBlocks(
using: downloadStream,
at: range,
maxBlockBufferSize: 10,
totalProgressRange: range
)
XCTAssertFalse(Task.isCancelled)
try await compactBlockProcessor.blockValidator.validate()
@ -247,7 +239,8 @@ class BlockScanTests: XCTestCase {
XCTFail("Error should have been a timeLimit reached Error - \(error)")
}
}
await compactBlockProcessor.stop()
metrics.disableMetrics()
}
}

View File

@ -14,12 +14,13 @@ class BlockStreamingTest: XCTestCase {
var rustBackend: ZcashRustBackendWelding!
var testTempDirectory: URL!
override func setUpWithError() throws {
try super.setUpWithError()
override func setUp() async throws {
try await super.setUp()
logger = OSLogger(logLevel: .debug)
testTempDirectory = Environment.uniqueTestTempDirectory
try self.testFileManager.createDirectory(at: testTempDirectory, withIntermediateDirectories: false)
rustBackend = ZcashRustBackend.makeForTests(fsBlockDbRoot: testTempDirectory, networkType: .testnet)
logger = OSLogger(logLevel: .debug)
}
override func tearDownWithError() throws {
@ -102,24 +103,17 @@ class BlockStreamingTest: XCTestCase {
let cancelableTask = Task {
do {
let downloadStream = try await compactBlockProcessor.blockDownloader.compactBlocksDownloadStream(
startHeight: startHeight,
targetHeight: latestBlockHeight
)
try await compactBlockProcessor.blockDownloader.downloadAndStoreBlocks(
using: downloadStream,
at: startHeight...latestBlockHeight,
maxBlockBufferSize: 10,
totalProgressRange: startHeight...latestBlockHeight
)
let blockDownloader = await compactBlockProcessor.blockDownloader
await blockDownloader.setDownloadLimit(latestBlockHeight)
await blockDownloader.startDownload(maxBlockBufferSize: 10, syncRange: startHeight...latestBlockHeight)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight)
} catch {
XCTAssertTrue(Task.isCancelled)
}
}
try await Task.sleep(nanoseconds: 3_000_000_000)
cancelableTask.cancel()
await compactBlockProcessor.stop()
}
func testStreamTimeout() async throws {
@ -168,17 +162,10 @@ class BlockStreamingTest: XCTestCase {
let date = Date()
do {
let downloadStream = try await compactBlockProcessor.blockDownloader.compactBlocksDownloadStream(
startHeight: startHeight,
targetHeight: latestBlockHeight
)
try await compactBlockProcessor.blockDownloader.downloadAndStoreBlocks(
using: downloadStream,
at: startHeight...latestBlockHeight,
maxBlockBufferSize: 10,
totalProgressRange: startHeight...latestBlockHeight
)
let blockDownloader = await compactBlockProcessor.blockDownloader
await blockDownloader.setDownloadLimit(latestBlockHeight)
await blockDownloader.startDownload(maxBlockBufferSize: 10, syncRange: startHeight...latestBlockHeight)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight)
} catch {
if let lwdError = error as? ZcashError {
switch lwdError {
@ -196,5 +183,7 @@ class BlockStreamingTest: XCTestCase {
let elapsed = now.timeIntervalSince(date)
print("took \(elapsed) seconds")
await compactBlockProcessor.stop()
}
}

View File

@ -32,6 +32,8 @@ class CompactBlockProcessorTests: XCTestCase {
logger = OSLogger(logLevel: .debug)
testTempDirectory = Environment.uniqueTestTempDirectory
try? FileManager.default.removeItem(at: testTempDirectory)
try self.testFileManager.createDirectory(at: testTempDirectory, withIntermediateDirectories: false)
let pathProvider = DefaultResourceProvider(network: network)
@ -46,11 +48,7 @@ class CompactBlockProcessorTests: XCTestCase {
network: ZcashNetworkBuilder.network(for: .testnet)
)
await InternalSyncProgress(
alias: .default,
storage: UserDefaults.standard,
logger: logger
).rewind(to: 0)
await InternalSyncProgress(alias: .default, storage: UserDefaults.standard, logger: logger).rewind(to: 0)
let liveService = LightWalletServiceFactory(endpoint: LightWalletEndpointBuilder.eccTestnet).make()
let service = MockLightWalletService(
@ -132,7 +130,7 @@ class CompactBlockProcessorTests: XCTestCase {
override func tearDown() async throws {
try await super.tearDown()
await self.processor.stop()
await processor.stop()
try FileManager.default.removeItem(at: processorConfig.fsBlockCacheRoot)
try? FileManager.default.removeItem(at: processorConfig.dataDb)
cancellables = []

View File

@ -30,9 +30,11 @@ class CompactBlockReorgTests: XCTestCase {
override func setUp() async throws {
try await super.setUp()
logger = OSLogger(logLevel: .debug)
testTempDirectory = Environment.uniqueTestTempDirectory
logger = OSLogger(logLevel: .debug)
try? FileManager.default.removeItem(at: testTempDirectory)
try self.testFileManager.createDirectory(at: testTempDirectory, withIntermediateDirectories: false)
let pathProvider = DefaultResourceProvider(network: network)
@ -47,11 +49,7 @@ class CompactBlockReorgTests: XCTestCase {
network: ZcashNetworkBuilder.network(for: .testnet)
)
await InternalSyncProgress(
alias: .default,
storage: UserDefaults.standard,
logger: logger
).rewind(to: 0)
await InternalSyncProgress(alias: .default, storage: UserDefaults.standard, logger: logger).rewind(to: 0)
let liveService = LightWalletServiceFactory(endpoint: LightWalletEndpointBuilder.eccTestnet).make()
let service = MockLightWalletService(
@ -141,7 +139,7 @@ class CompactBlockReorgTests: XCTestCase {
override func tearDown() async throws {
try await super.tearDown()
await self.processor.stop()
await processor.stop()
try! FileManager.default.removeItem(at: processorConfig.fsBlockCacheRoot)
try? FileManager.default.removeItem(at: processorConfig.dataDb)
cancellables = []

View File

@ -16,9 +16,12 @@ class DownloadTests: XCTestCase {
var network = ZcashNetworkBuilder.network(for: .testnet)
var testTempDirectory: URL!
override func setUpWithError() throws {
try super.setUpWithError()
override func setUp() async throws {
try await super.setUp()
testTempDirectory = Environment.uniqueTestTempDirectory
try? FileManager.default.removeItem(at: testTempDirectory)
await InternalSyncProgress(alias: .default, storage: UserDefaults.standard, logger: logger).rewind(to: 0)
try self.testFileManager.createDirectory(at: testTempDirectory, withIntermediateDirectories: false)
}
@ -73,5 +76,7 @@ class DownloadTests: XCTestCase {
let latestHeight = await storage.latestHeight()
XCTAssertEqual(latestHeight, range.upperBound)
await compactBlockProcessor.stop()
}
}

View File

@ -182,14 +182,8 @@ class ClosureSynchronizerOfflineTests: XCTestCase {
stopCalled = true
}
let expectation = XCTestExpectation()
synchronizer.stop() {
XCTAssertTrue(stopCalled)
expectation.fulfill()
}
wait(for: [expectation], timeout: 0.5)
synchronizer.stop()
XCTAssertTrue(stopCalled)
}
func testGetSaplingAddressSucceed() {

View File

@ -215,27 +215,9 @@ class CombineSynchronizerOfflineTests: XCTestCase {
synchronizerMock.stopClosure = {
stopCalled = true
}
let expectation = XCTestExpectation()
synchronizer.stop()
.sink(
receiveCompletion: { result in
switch result {
case .finished:
XCTAssertTrue(stopCalled)
expectation.fulfill()
case let .failure(error):
XCTFail("Unpected failure with error: \(error)")
}
},
receiveValue: { _ in
XCTFail("No value is expected")
}
)
.store(in: &cancellables)
wait(for: [expectation], timeout: 0.5)
XCTAssertTrue(stopCalled)
}
func testGetSaplingAddressSucceed() {

View File

@ -369,7 +369,11 @@ final class FsBlockStorageTests: XCTestCase {
try await realCache.write(blocks: sandblastedBlocks)
XCTFail("This call should have failed")
} catch {
XCTAssertEqual(error as? ZcashError, ZcashError.blockRepositoryWriteBlock(sandblastedBlocks[0]))
if let error = error as? ZcashError, case let .blockRepositoryWriteBlock(url, _) = error {
XCTAssertEqual(url, sandblastedBlocks[0])
} else {
XCTFail("Unexpected error thrown: \(error)")
}
}
}

View File

@ -92,7 +92,7 @@ class Zip302MemoTests: XCTestCase {
XCTAssertThrowsError(try Memo(string: tooLongString)) { err in
guard let error = err as? ZcashError else {
XCTFail("Expected `ZCashError.memoTextInputTooLong` error but found \(err.localizedDescription)")
XCTFail("Expected `ZCashError.memoTextInputTooLong` error but found \(err)")
return
}
@ -100,7 +100,7 @@ class Zip302MemoTests: XCTestCase {
case .memoTextInputTooLong(let length):
XCTAssertEqual(length, 513)
default:
XCTFail("Expected `ZCashError.memoTextInputTooLong` error but found \(err.localizedDescription)")
XCTFail("Expected `ZCashError.memoTextInputTooLong` error but found \(err)")
}
}
}
@ -139,7 +139,7 @@ class Zip302MemoTests: XCTestCase {
case .memoTextInputTooLong(let count):
XCTAssertEqual(count, 515)
default:
XCTFail("Expected `ZCashError.memoTextInputTooLong` error but found \(err.localizedDescription)")
XCTFail("Expected `ZCashError.memoTextInputTooLong` error but found \(err)")
}
}
}
@ -171,7 +171,7 @@ class Zip302MemoTests: XCTestCase {
case .memoTextInputEndsWithNullBytes:
return
default:
XCTFail("Expected `ZCashError.memoTextInputEndsWithNullBytes` error but found \(thrownError.localizedDescription)")
XCTFail("Expected `ZCashError.memoTextInputEndsWithNullBytes` error but found \(thrownError)")
}
}
}
@ -189,7 +189,7 @@ class Zip302MemoTests: XCTestCase {
case .memoTextInputEndsWithNullBytes:
return
default:
XCTFail("Expected `ZCashError.memoTextInputEndsWithNullBytes` error but found \(thrownError.localizedDescription)")
XCTFail("Expected `ZCashError.memoTextInputEndsWithNullBytes` error but found \(thrownError)")
}
}
}

View File

@ -11,6 +11,8 @@ import Foundation
class ZcashConsoleFakeStorage: CompactBlockRepository {
func create() throws {}
func clear(upTo height: ZcashLightClientKit.BlockHeight) async throws { }
func clear() async throws {}
func write(blocks: [ZcashCompactBlock]) async throws {

View File

@ -102,11 +102,11 @@ class SynchronizerMock: Synchronizer {
var stopCalled: Bool {
return stopCallsCount > 0
}
var stopClosure: (() async -> Void)?
var stopClosure: (() -> Void)?
func stop() async {
func stop() {
stopCallsCount += 1
await stopClosure?()
stopClosure?()
}
// MARK: - getSaplingAddress

View File

@ -1,5 +1,8 @@
#!/bin/zsh
scriptDir=${0:a:h}
cd "${scriptDir}"
sourcery_version=2.0.2
if which sourcery >/dev/null; then

View File

@ -114,9 +114,9 @@ class TestCoordinator {
}
func stop() async throws {
await synchronizer.stop()
self.completionHandler = nil
self.errorHandler = nil
await synchronizer.blockProcessor.stop()
completionHandler = nil
errorHandler = nil
}
func setDarksideWalletState(_ state: DarksideData) throws {
@ -176,12 +176,6 @@ class TestCoordinator {
}
}
extension CompactBlockProcessor {
public func setConfig(_ config: Configuration) {
self.config = config
}
}
extension TestCoordinator {
func resetBlocks(dataset: DarksideData) throws {
switch dataset {
@ -235,7 +229,7 @@ extension TestCoordinator {
network: config.network
)
await self.synchronizer.blockProcessor.setConfig(newConfig)
await self.synchronizer.blockProcessor.update(config: newConfig)
}
try service.reset(saplingActivation: saplingActivation, branchID: branchID, chainName: chainName)