* [#449] Use CompactBlock Streamer download instead of batch downloade This commit implements a small buffer for the stream download operation so it does not store a block at a time and does it in batches instead. Closes #449 * Fix tests * PR Suggestions
This commit is contained in:
parent
a5d0e44774
commit
a2283f0171
|
@ -55,12 +55,30 @@ class CompactBlockStreamDownloadOperation: ZcashOperation {
|
||||||
private var cancelable: CancellableCall?
|
private var cancelable: CancellableCall?
|
||||||
private var startHeight: BlockHeight?
|
private var startHeight: BlockHeight?
|
||||||
private var targetHeight: BlockHeight?
|
private var targetHeight: BlockHeight?
|
||||||
|
private var blockBufferSize: Int
|
||||||
|
private var buffer: [ZcashCompactBlock] = []
|
||||||
|
|
||||||
private weak var progressDelegate: CompactBlockProgressDelegate?
|
private weak var progressDelegate: CompactBlockProgressDelegate?
|
||||||
|
|
||||||
|
/// Creates an Compact Block Stream Download Operation Operation
|
||||||
|
/// - Parameters:
|
||||||
|
/// - service: instance that conforms to `LightWalletService`
|
||||||
|
/// - storage: instance that conforms to `CompactBlockStorage`
|
||||||
|
/// - blockBufferSize: the number of blocks that the stream downloader will store in memory
|
||||||
|
/// before writing them to disk. Making this number smaller makes the downloader easier on RAM
|
||||||
|
/// memory while being less efficient on disk writes. Making it bigger takes up more RAM memory
|
||||||
|
/// but is less straining on Disk Writes. Too little or too big buffer will make this less efficient.
|
||||||
|
/// - startHeight: the height this downloader will start downloading from. If `nil`,
|
||||||
|
/// it will start from the latest height found on the local cacheDb
|
||||||
|
/// - targetHeight: the upper bound for this stream download. If `nil`, the
|
||||||
|
/// streamer will call `service.latestBlockHeight()`
|
||||||
|
/// - progressDelegate: Optional delegate to report ongoing progress conforming to
|
||||||
|
/// `CompactBlockProgressDelegate`
|
||||||
|
///
|
||||||
required init(
|
required init(
|
||||||
service: LightWalletService,
|
service: LightWalletService,
|
||||||
storage: CompactBlockStorage,
|
storage: CompactBlockStorage,
|
||||||
|
blockBufferSize: Int,
|
||||||
startHeight: BlockHeight? = nil,
|
startHeight: BlockHeight? = nil,
|
||||||
targetHeight: BlockHeight? = nil,
|
targetHeight: BlockHeight? = nil,
|
||||||
progressDelegate: CompactBlockProgressDelegate? = nil
|
progressDelegate: CompactBlockProgressDelegate? = nil
|
||||||
|
@ -70,6 +88,7 @@ class CompactBlockStreamDownloadOperation: ZcashOperation {
|
||||||
self.startHeight = startHeight
|
self.startHeight = startHeight
|
||||||
self.targetHeight = targetHeight
|
self.targetHeight = targetHeight
|
||||||
self.progressDelegate = progressDelegate
|
self.progressDelegate = progressDelegate
|
||||||
|
self.blockBufferSize = blockBufferSize
|
||||||
super.init()
|
super.init()
|
||||||
self.name = "Download Stream Operation"
|
self.name = "Download Stream Operation"
|
||||||
}
|
}
|
||||||
|
@ -96,7 +115,12 @@ class CompactBlockStreamDownloadOperation: ZcashOperation {
|
||||||
case .success(let result):
|
case .success(let result):
|
||||||
switch result {
|
switch result {
|
||||||
case .success:
|
case .success:
|
||||||
self?.done = true
|
do {
|
||||||
|
try self?.flush()
|
||||||
|
self?.done = true
|
||||||
|
} catch {
|
||||||
|
self?.fail(error: error)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
case .error(let e):
|
case .error(let e):
|
||||||
self?.fail(error: e)
|
self?.fail(error: e)
|
||||||
|
@ -111,7 +135,7 @@ class CompactBlockStreamDownloadOperation: ZcashOperation {
|
||||||
} handler: {[weak self] block in
|
} handler: {[weak self] block in
|
||||||
guard let self = self else { return }
|
guard let self = self else { return }
|
||||||
do {
|
do {
|
||||||
try self.storage.insert(block)
|
try self.cache(block, flushCache: false)
|
||||||
} catch {
|
} catch {
|
||||||
self.fail(error: error)
|
self.fail(error: error)
|
||||||
}
|
}
|
||||||
|
@ -136,6 +160,19 @@ class CompactBlockStreamDownloadOperation: ZcashOperation {
|
||||||
self.cancelable?.cancel()
|
self.cancelable?.cancel()
|
||||||
super.cancel()
|
super.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cache(_ block: ZcashCompactBlock, flushCache: Bool) throws {
|
||||||
|
self.buffer.append(block)
|
||||||
|
|
||||||
|
if flushCache || buffer.count >= blockBufferSize {
|
||||||
|
try flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func flush() throws {
|
||||||
|
try self.storage.write(blocks: self.buffer)
|
||||||
|
self.buffer.removeAll(keepingCapacity: true)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class CompactBlockBatchDownloadOperation: ZcashOperation {
|
class CompactBlockBatchDownloadOperation: ZcashOperation {
|
||||||
|
|
|
@ -227,7 +227,7 @@ public class CompactBlockProcessor {
|
||||||
public var maxBackoffInterval = ZcashSDK.defaultMaxBackOffInterval
|
public var maxBackoffInterval = ZcashSDK.defaultMaxBackOffInterval
|
||||||
public var rewindDistance = ZcashSDK.defaultRewindDistance
|
public var rewindDistance = ZcashSDK.defaultRewindDistance
|
||||||
public var walletBirthday: BlockHeight
|
public var walletBirthday: BlockHeight
|
||||||
|
public private(set) var downloadBufferSize: Int = 10
|
||||||
private(set) var network: ZcashNetwork
|
private(set) var network: ZcashNetwork
|
||||||
private(set) var saplingActivation: BlockHeight
|
private(set) var saplingActivation: BlockHeight
|
||||||
|
|
||||||
|
@ -624,14 +624,12 @@ public class CompactBlockProcessor {
|
||||||
self.backoffTimer = nil
|
self.backoffTimer = nil
|
||||||
|
|
||||||
let cfg = self.config
|
let cfg = self.config
|
||||||
|
let downloadBlockOperation = CompactBlockStreamDownloadOperation(
|
||||||
let downloadBlockOperation = CompactBlockBatchDownloadOperation(
|
|
||||||
service: self.service,
|
service: self.service,
|
||||||
storage: self.storage,
|
storage: self.storage,
|
||||||
|
blockBufferSize: self.config.downloadBufferSize,
|
||||||
startHeight: range.lowerBound,
|
startHeight: range.lowerBound,
|
||||||
targetHeight: range.upperBound,
|
targetHeight: range.upperBound,
|
||||||
batchSize: self.config.downloadBatchSize,
|
|
||||||
maxRetries: self.config.retries,
|
|
||||||
progressDelegate: self
|
progressDelegate: self
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -452,7 +452,7 @@ class AdvancedReOrgTests: XCTestCase {
|
||||||
afterReorgSync.fulfill()
|
afterReorgSync.fulfill()
|
||||||
}, error: self.handleError)
|
}, error: self.handleError)
|
||||||
|
|
||||||
wait(for: [reorgExpectation, afterReorgSync], timeout: 15)
|
wait(for: [reorgExpectation, afterReorgSync], timeout: 30)
|
||||||
|
|
||||||
XCTAssertEqual(postReorgVerifiedBalance, preReorgVerifiedBalance)
|
XCTAssertEqual(postReorgVerifiedBalance, preReorgVerifiedBalance)
|
||||||
XCTAssertEqual(postReorgTotalBalance, preReorgTotalBalance)
|
XCTAssertEqual(postReorgTotalBalance, preReorgTotalBalance)
|
||||||
|
|
|
@ -185,6 +185,7 @@ class BlockScanOperationTests: XCTestCase {
|
||||||
let downloadOperation = CompactBlockStreamDownloadOperation(
|
let downloadOperation = CompactBlockStreamDownloadOperation(
|
||||||
service: service,
|
service: service,
|
||||||
storage: storage,
|
storage: storage,
|
||||||
|
blockBufferSize: 10,
|
||||||
startHeight: walletBirthDay.height,
|
startHeight: walletBirthDay.height,
|
||||||
targetHeight: walletBirthDay.height + 10000,
|
targetHeight: walletBirthDay.height + 10000,
|
||||||
progressDelegate: self
|
progressDelegate: self
|
||||||
|
|
|
@ -77,6 +77,7 @@ class BlockStreamingTest: XCTestCase {
|
||||||
let operation = CompactBlockStreamDownloadOperation(
|
let operation = CompactBlockStreamDownloadOperation(
|
||||||
service: service,
|
service: service,
|
||||||
storage: storage,
|
storage: storage,
|
||||||
|
blockBufferSize: 10,
|
||||||
startHeight: startHeight,
|
startHeight: startHeight,
|
||||||
progressDelegate: self
|
progressDelegate: self
|
||||||
)
|
)
|
||||||
|
@ -114,6 +115,7 @@ class BlockStreamingTest: XCTestCase {
|
||||||
let operation = CompactBlockStreamDownloadOperation(
|
let operation = CompactBlockStreamDownloadOperation(
|
||||||
service: service,
|
service: service,
|
||||||
storage: storage,
|
storage: storage,
|
||||||
|
blockBufferSize: 10,
|
||||||
startHeight: startHeight,
|
startHeight: startHeight,
|
||||||
progressDelegate: self
|
progressDelegate: self
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue