Merge branch 'master' of github.com:ethcore/parity-bitcoin into synchronizer

debris 2016-11-16 09:31:33 +01:00
commit 1c1d342f73
19 changed files with 486 additions and 128 deletions

View File

@ -80,7 +80,7 @@ pub struct Storage {
best_block: RwLock<Option<BestBlock>>,
}
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum MetaError {
UnsupportedVersion,
}
@ -92,8 +92,36 @@ pub enum Error {
DB(String),
/// Io error
Io(std::io::Error),
/// Invalid meta info
/// Invalid meta info (while opening the database)
Meta(MetaError),
/// Database blockchain consistency error
Consistency(ConsistencyError),
}
impl Error {
fn unknown_hash(h: &H256) -> Self {
Error::Consistency(ConsistencyError::Unknown(h.clone()))
}
fn unknown_number(n: u32) -> Self {
Error::Consistency(ConsistencyError::UnknownNumber(n))
}
fn double_spend(h: &H256) -> Self {
Error::Consistency(ConsistencyError::DoubleSpend(h.clone()))
}
fn not_main(h: &H256) -> Self {
Error::Consistency(ConsistencyError::NotMain(h.clone()))
}
fn reorganize(h: &H256) -> Self {
Error::Consistency(ConsistencyError::Reorganize(h.clone()))
}
}
#[derive(Debug, PartialEq)]
pub enum ConsistencyError {
/// Unknown hash
Unknown(H256),
/// Unknown number
@ -104,8 +132,12 @@ pub enum Error {
ForkTooLong,
/// Main chain block transaction attempts to double-spend
DoubleSpend(H256),
/// Transaction tries to spend an output of an unknown transaction
UnknownSpending(H256),
/// Chain has no best block
NoBestBlock,
/// Failed reorganization caused by block
Reorganize(H256),
}
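For context, a minimal hedged sketch (the `handle_insert` helper is hypothetical, using only the types introduced above) of how a caller can separate recoverable consistency errors from fatal database failures:

fn handle_insert(result: Result<(), Error>) {
    match result {
        Ok(()) => (),
        // recoverable: reject only the offending block
        Err(Error::Consistency(ConsistencyError::DoubleSpend(hash))) => {
            println!("rejected block double-spending {:?}", hash);
        },
        // everything else (Io, DB, Meta) is treated as irrecoverable here
        Err(e) => panic!("database failure: {:?}", e),
    }
}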
impl From<String> for Error {
@ -305,7 +337,7 @@ impl Storage {
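// note: `context.meta` caches metadata for transactions already touched while
// processing the current block; the `if !match` below falls back to the
// persistent store only when the previous output is absent from that cache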
if !match context.meta.get_mut(&input.previous_output.hash) {
Some(ref mut meta) => {
if meta.is_spent(input.previous_output.index as usize) {
return Err(Error::DoubleSpend(input.previous_output.hash.clone()));
return Err(Error::double_spend(&input.previous_output.hash));
}
meta.note_used(input.previous_output.index as usize);
@ -314,14 +346,13 @@ impl Storage {
None => false,
} {
let mut meta =
self.transaction_meta(&input.previous_output.hash)
.unwrap_or_else(|| panic!(
"No transaction metadata for {}! Corrupted DB? Reindex?",
&input.previous_output.hash
));
try!(
self.transaction_meta(&input.previous_output.hash)
.ok_or(Error::Consistency(ConsistencyError::UnknownSpending(input.previous_output.hash.clone())))
);
if meta.is_spent(input.previous_output.index as usize) {
return Err(Error::DoubleSpend(input.previous_output.hash.clone()));
return Err(Error::double_spend(&input.previous_output.hash));
}
meta.note_used(input.previous_output.index as usize);
@ -344,7 +375,7 @@ impl Storage {
trace!(target: "reorg", "Decanonizing block {}", hash);
// ensure that the block is on the main chain
try!(self.block_number(hash).ok_or(Error::NotMain(hash.clone())));
try!(self.block_number(hash).ok_or(Error::not_main(hash)));
// only canonical blocks have numbers, so remove this number entry for the hash
context.db_transaction.delete(Some(COL_BLOCK_NUMBERS), &**hash);
@ -371,6 +402,8 @@ impl Storage {
let mut meta =
self.transaction_meta(&input.previous_output.hash)
.unwrap_or_else(|| panic!(
// decanonization should always have meta
// because a block could not have been made canonical without writing meta
"No transaction metadata for {}! Corrupted DB? Reindex?",
&input.previous_output.hash
));
@ -389,7 +422,7 @@ impl Storage {
/// Returns the height where the fork occurred and the chain up to that point (not including the last canonical hash)
fn fork_route(&self, max_route: usize, hash: &H256) -> Result<(u32, Vec<H256>), Error> {
let header = try!(self.block_header_by_hash(hash).ok_or(Error::Unknown(hash.clone())));
let header = try!(self.block_header_by_hash(hash).ok_or(Error::unknown_hash(hash)));
// only main chain blocks have block numbers
// so if this one has a number, it is not a fork and we return an empty route
@ -405,10 +438,10 @@ impl Storage {
return Ok((number, result));
}
result.push(next_hash.clone());
next_hash = try!(self.block_header_by_hash(&next_hash).ok_or(Error::Unknown(hash.clone())))
next_hash = try!(self.block_header_by_hash(&next_hash).ok_or(Error::unknown_hash(hash)))
.previous_header_hash;
}
Err(Error::ForkTooLong)
Err(Error::Consistency(ConsistencyError::ForkTooLong))
}
fn best_number(&self) -> Option<u32> {
@ -460,11 +493,11 @@ impl Storage {
return Ok(None);
}
let mut now_best = try!(self.best_number().ok_or(Error::NoBestBlock));
let mut now_best = try!(self.best_number().ok_or(Error::Consistency(ConsistencyError::NoBestBlock)));
// decanonizing main chain to the split point
loop {
let next_decanonize = try!(self.block_hash(now_best).ok_or(Error::UnknownNumber(now_best)));
let next_decanonize = try!(self.block_hash(now_best).ok_or(Error::unknown_number(now_best)));
try!(self.decanonize_block(context, &next_decanonize));
now_best -= 1;
@ -591,16 +624,49 @@ impl Store for Storage {
// the block does not continue the main chain
// but can cause reorganization here
// this can canonize the block parent if the parent chain plus this block becomes longer than the main chain
else if let Some((reorg_number, _)) = self.maybe_reorganize(&mut context, &block.header().previous_header_hash).unwrap_or(None) {
// if so, we have new best main chain block
new_best_number = reorg_number + 1;
new_best_hash = block_hash;
else {
match self.maybe_reorganize(&mut context, &block.header().previous_header_hash) {
Ok(Some((reorg_number, _))) => {
// if so, we have new best main chain block
new_best_number = reorg_number + 1;
new_best_hash = block_hash;
// and we canonize it also by provisioning transactions
try!(self.update_transactions_meta(&mut context, new_best_number, block.transactions()));
context.db_transaction.write_u32(Some(COL_META), KEY_BEST_BLOCK_NUMBER, new_best_number);
context.db_transaction.put(Some(COL_BLOCK_HASHES), &u32_key(new_best_number), std::ops::Deref::deref(&new_best_hash));
context.db_transaction.write_u32(Some(COL_BLOCK_NUMBERS), std::ops::Deref::deref(&new_best_hash), new_best_number);
// and we canonize it also by provisioning transactions
try!(self.update_transactions_meta(&mut context, new_best_number, block.transactions()));
context.db_transaction.write_u32(Some(COL_META), KEY_BEST_BLOCK_NUMBER, new_best_number);
context.db_transaction.put(Some(COL_BLOCK_HASHES), &u32_key(new_best_number), std::ops::Deref::deref(&new_best_hash));
context.db_transaction.write_u32(Some(COL_BLOCK_NUMBERS), std::ops::Deref::deref(&new_best_hash), new_best_number);
},
Err(Error::Consistency(consistency_error)) => {
match consistency_error {
ConsistencyError::DoubleSpend(hash) => {
warn!(target: "reorg", "Failed to reorganize to {} due to double-spend at {}", &block_hash, &hash);
// return without any commit
return Err(Error::reorganize(&hash));
},
ConsistencyError::UnknownSpending(hash) => {
warn!(target: "reorg", "Failed to reorganize to {} due to spending unknown transaction {}", &block_hash, &hash);
// return without any commit
return Err(Error::reorganize(&hash));
},
ConsistencyError::Unknown(hash) => {
// this is either an orphan block insert or a disconnected chain head update, which we allow (for now)
// so it is a no-op
warn!(target: "reorg", "Disconnected chain head {} updated with {}", &hash, &block_hash);
},
_ => {
// we don't allow other errors on side chain/orphans
return Err(Error::Consistency(consistency_error))
}
}
},
Err(e) => {
return Err(e)
},
Ok(None) => {
// reorganization didn't happen, but the block is ok; no-op here
}
}
}
// we always update best hash even if it is not changed
@ -655,7 +721,7 @@ impl Store for Storage {
#[cfg(test)]
mod tests {
use super::{Storage, Store, UpdateContext};
use super::{Storage, Store, UpdateContext, Error, ConsistencyError};
use devtools::RandomTempPath;
use chain::{Block, RepresentH256};
use super::super::{BlockRef, BlockLocation};
@ -1263,6 +1329,46 @@ mod tests {
assert_eq!(None, location);
}
#[test]
fn double_spend() {
let path = RandomTempPath::create_dir();
let store = Storage::new(path.as_path()).unwrap();
let genesis = test_data::genesis();
store.insert_block(&genesis).unwrap();
let genesis_coinbase = genesis.transactions()[0].hash();
let block = test_data::block_builder()
.header().parent(genesis.hash()).build()
.transaction().coinbase().build()
.transaction()
.input().hash(genesis_coinbase.clone()).build()
.build()
.build();
store.insert_block(&block).expect("inserting first block in the double spend test should not fail");
let dup_block = test_data::block_builder()
.header().parent(block.hash()).build()
.transaction().coinbase().build()
.transaction()
.input().hash(genesis_coinbase.clone()).build()
.build()
.build();
let insert_result = store.insert_block(&dup_block);
if insert_result.is_ok() { panic!("Insert should fail because of the double-spend"); }
let _ = insert_result.map_err(|e| {
let should_be = ConsistencyError::DoubleSpend(genesis_coinbase);
if let Error::Consistency(consistency_error) = e {
assert_eq!(should_be, consistency_error, "Store should return a double-spend consistency error");
}
else { panic!("Insert should fail because of the double-spend"); }
});
}
#[test]
fn fork_route() {
let path = RandomTempPath::create_dir();

View File

@ -0,0 +1,38 @@
use super::Magic;
#[derive(Debug, Clone)]
/// Parameters that influence chain consensus.
pub struct ConsensusParams {
/// Block height at which BIP65 becomes active.
/// See https://github.com/bitcoin/bips/blob/master/bip-0065.mediawiki
pub bip65_height: u32,
}
impl ConsensusParams {
pub fn with_magic(magic: Magic) -> Self {
match magic {
Magic::Mainnet => ConsensusParams {
bip65_height: 388381, // 000000000000000004c2b624ed5d7756c508d90fd0da2c7c679febfa6c4735f0
},
Magic::Testnet => ConsensusParams {
bip65_height: 581885, // 00000000007f6655f22f98e72ed80d8b06dc761d5da09df0fa1dc4be4f861eb6
},
Magic::Regtest => ConsensusParams {
bip65_height: 1351,
},
}
}
}
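A hedged usage sketch (`best_block_number` is an assumed local, not part of this diff) showing how these parameters gate BIP65 enforcement by height, as the sync client does later in this commit:

let params = ConsensusParams::with_magic(Magic::Mainnet);
let best_block_number: u32 = 400_000; // assumed current best height
let is_bip65_active = best_block_number >= params.bip65_height;
assert!(is_bip65_active);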
#[cfg(test)]
mod tests {
use super::super::Magic;
use super::ConsensusParams;
#[test]
fn test_consensus_params_bip65_height() {
assert_eq!(ConsensusParams::with_magic(Magic::Mainnet).bip65_height, 388381);
assert_eq!(ConsensusParams::with_magic(Magic::Testnet).bip65_height, 581885);
assert_eq!(ConsensusParams::with_magic(Magic::Regtest).bip65_height, 1351);
}
}

View File

@ -4,6 +4,7 @@
use ser::{Stream, Serializable};
use chain::Block;
use Error;
use super::ConsensusParams;
const MAGIC_MAINNET: u32 = 0xD9B4BEF9;
const MAGIC_TESTNET: u32 = 0x0709110B;
@ -62,6 +63,10 @@ impl Magic {
Magic::Regtest => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(),
}
}
pub fn consensus_params(&self) -> ConsensusParams {
ConsensusParams::with_magic(*self)
}
}
impl Serializable for Magic {

View File

@ -3,6 +3,7 @@ mod block_header_and_ids;
mod block_transactions;
mod block_transactions_request;
mod command;
mod consensus;
mod inventory;
mod ip;
mod magic;
@ -15,6 +16,7 @@ pub use self::block_header_and_ids::BlockHeaderAndIDs;
pub use self::block_transactions::BlockTransactions;
pub use self::block_transactions_request::BlockTransactionsRequest;
pub use self::command::Command;
pub use self::consensus::ConsensusParams;
pub use self::inventory::{InventoryVector, InventoryType};
pub use self::ip::IpAddress;
pub use self::magic::Magic;

View File

@ -29,9 +29,6 @@ args:
- printtoconsole:
long: printtoconsole
help: Send trace/debug info to console instead of debug.log file
- diskdb:
long: diskdb
help: Use disk storage instead of in-memory one
subcommands:
- import:
about: Import blocks from bitcoin core database

View File

@ -30,7 +30,7 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
};
let sync_handle = el.handle();
let sync_connection_factory = create_sync_connection_factory(&sync_handle, db);
let sync_connection_factory = create_sync_connection_factory(&sync_handle, cfg.magic.consensus_params(), db);
let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string()));
try!(p2p.run().map_err(|_| "Failed to start p2p module"));

View File

@ -8,12 +8,10 @@ pub struct Config {
pub connect: Option<net::SocketAddr>,
pub seednode: Option<String>,
pub print_to_console: bool,
pub use_disk_database: bool,
}
pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
let print_to_console = matches.is_present("printtoconsole");
let use_disk_database = matches.is_present("diskdb");
let magic = match (matches.is_present("testnet"), matches.is_present("regtest")) {
(true, false) => Magic::Testnet,
(false, true) => Magic::Regtest,
@ -47,7 +45,6 @@ pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
port: port,
connect: connect,
seednode: seednode,
use_disk_database: use_disk_database,
};
Ok(config)

View File

@ -5,13 +5,9 @@ use chain::RepresentH256;
use {db, APP_INFO};
use config::Config;
pub fn open_db(cfg: &Config) -> Arc<db::Store> {
if cfg.use_disk_database {
let db_path = app_dir(AppDataType::UserData, &APP_INFO, "db").expect("Failed to get app dir");
Arc::new(db::Storage::new(db_path).expect("Failed to open database"))
} else {
Arc::new(db::TestStorage::default())
}
pub fn open_db(_cfg: &Config) -> Arc<db::Store> {
let db_path = app_dir(AppDataType::UserData, &APP_INFO, "db").expect("Failed to get app dir");
Arc::new(db::Storage::new(db_path).expect("Failed to open database"))
}
pub fn node_table_path() -> PathBuf {

View File

@ -71,5 +71,10 @@ impl VerificationFlags {
self.verify_p2sh = value;
self
}
pub fn verify_clocktimeverify(mut self, value: bool) -> Self {
self.verify_clocktimeverify = value;
self
}
}
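Combined with the existing builders, flags are assembled like this (taken verbatim from the test added further down):

let flags = VerificationFlags::default()
    .verify_p2sh(true)
    .verify_clocktimeverify(true);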

View File

@ -447,35 +447,37 @@ pub fn eval_script(
},
Opcode::OP_NOP => break,
Opcode::OP_CHECKLOCKTIMEVERIFY => {
if !flags.verify_clocktimeverify && flags.verify_discourage_upgradable_nops {
if flags.verify_discourage_upgradable_nops {
return Err(Error::DiscourageUpgradableNops);
}
// Note that elsewhere numeric opcodes are limited to
// operands in the range -2**31+1 to 2**31-1, however it is
// legal for opcodes to produce results exceeding that
// range. This limitation is implemented by CScriptNum's
// default 4-byte limit.
//
// If we kept to that limit we'd have a year 2038 problem,
// even though the nLockTime field in transactions
// themselves is uint32 which only becomes meaningless
// after the year 2106.
//
// Thus as a special case we tell CScriptNum to accept up
// to 5-byte bignums, which are good until 2**39-1, well
// beyond the 2**32-1 limit of the nLockTime field itself.
let lock_time = try!(Num::from_slice(try!(stack.last()), flags.verify_minimaldata, 5));
if flags.verify_clocktimeverify {
// Note that elsewhere numeric opcodes are limited to
// operands in the range -2**31+1 to 2**31-1, however it is
// legal for opcodes to produce results exceeding that
// range. This limitation is implemented by CScriptNum's
// default 4-byte limit.
//
// If we kept to that limit we'd have a year 2038 problem,
// even though the nLockTime field in transactions
// themselves is uint32 which only becomes meaningless
// after the year 2106.
//
// Thus as a special case we tell CScriptNum to accept up
// to 5-byte bignums, which are good until 2**39-1, well
// beyond the 2**32-1 limit of the nLockTime field itself.
let lock_time = try!(Num::from_slice(try!(stack.last()), flags.verify_minimaldata, 5));
// In the rare event that the argument may be < 0 due to
// some arithmetic being done first, you can always use
// 0 MAX CHECKLOCKTIMEVERIFY.
if lock_time.is_negative() {
return Err(Error::NegativeLocktime);
}
// In the rare event that the argument may be < 0 due to
// some arithmetic being done first, you can always use
// 0 MAX CHECKLOCKTIMEVERIFY.
if lock_time.is_negative() {
return Err(Error::NegativeLocktime);
}
if !checker.check_lock_time(lock_time) {
return Err(Error::UnsatisfiedLocktime);
if !checker.check_lock_time(lock_time) {
return Err(Error::UnsatisfiedLocktime);
}
}
},
Opcode::OP_CHECKSEQUENCEVERIFY => {
@ -1870,4 +1872,26 @@ mod tests {
.verify_p2sh(true);
assert_eq!(verify_script(&input, &output, &flags, &checker), Ok(()));
}
// https://blockchain.info/rawtx/eb3b82c0884e3efa6d8b0be55b4915eb20be124c9766245bcc7f34fdac32bccb
#[test]
fn test_transaction_bip65() {
let tx: Transaction = "01000000024de8b0c4c2582db95fa6b3567a989b664484c7ad6672c85a3da413773e63fdb8000000006b48304502205b282fbc9b064f3bc823a23edcc0048cbb174754e7aa742e3c9f483ebe02911c022100e4b0b3a117d36cab5a67404dddbf43db7bea3c1530e0fe128ebc15621bd69a3b0121035aa98d5f77cd9a2d88710e6fc66212aff820026f0dad8f32d1f7ce87457dde50ffffffff4de8b0c4c2582db95fa6b3567a989b664484c7ad6672c85a3da413773e63fdb8010000006f004730440220276d6dad3defa37b5f81add3992d510d2f44a317fd85e04f93a1e2daea64660202200f862a0da684249322ceb8ed842fb8c859c0cb94c81e1c5308b4868157a428ee01ab51210232abdc893e7f0631364d7fd01cb33d24da45329a00357b3a7886211ab414d55a51aeffffffff02e0fd1c00000000001976a914380cb3c594de4e7e9b8e18db182987bebb5a4f7088acc0c62d000000000017142a9bc5447d664c1d0141392a842d23dba45c4f13b17500000000".into();
let signer: TransactionInputSigner = tx.into();
let checker = TransactionSignatureChecker {
signer: signer,
input_index: 1,
};
let input: Script = "004730440220276d6dad3defa37b5f81add3992d510d2f44a317fd85e04f93a1e2daea64660202200f862a0da684249322ceb8ed842fb8c859c0cb94c81e1c5308b4868157a428ee01ab51210232abdc893e7f0631364d7fd01cb33d24da45329a00357b3a7886211ab414d55a51ae".into();
let output: Script = "142a9bc5447d664c1d0141392a842d23dba45c4f13b175".into();
let flags = VerificationFlags::default()
.verify_p2sh(true);
assert_eq!(verify_script(&input, &output, &flags, &checker), Ok(()));
let flags = VerificationFlags::default()
.verify_p2sh(true)
.verify_clocktimeverify(true);
assert_eq!(verify_script(&input, &output, &flags, &checker), Err(Error::NumberOverflow));
}
}

View File

@ -313,7 +313,7 @@ impl Script {
Opcodes { position: 0, script: self }
}
pub fn sigop_count(&self) -> Result<usize, Error> {
pub fn sigop_count(&self, accurate: bool) -> Result<usize, Error> {
let mut last_opcode = Opcode::OP_0;
let mut result = 0;
for opcode in self.opcodes() {
@ -322,30 +322,34 @@ impl Script {
match opcode {
Opcode::OP_CHECKSIG | Opcode::OP_CHECKSIGVERIFY => { result += 1; },
Opcode::OP_CHECKMULTISIG | Opcode::OP_CHECKMULTISIGVERIFY => {
match last_opcode {
Opcode::OP_1 |
Opcode::OP_2 |
Opcode::OP_3 |
Opcode::OP_4 |
Opcode::OP_5 |
Opcode::OP_6 |
Opcode::OP_7 |
Opcode::OP_8 |
Opcode::OP_9 |
Opcode::OP_10 |
Opcode::OP_11 |
Opcode::OP_12 |
Opcode::OP_13 |
Opcode::OP_14 |
Opcode::OP_15 |
Opcode::OP_16 => {
result += (last_opcode as u8 - (Opcode::OP_1 as u8 - 1)) as usize;
},
_ => {
result += MAX_PUBKEYS_PER_MULTISIG;
if accurate {
match last_opcode {
Opcode::OP_1 |
Opcode::OP_2 |
Opcode::OP_3 |
Opcode::OP_4 |
Opcode::OP_5 |
Opcode::OP_6 |
Opcode::OP_7 |
Opcode::OP_8 |
Opcode::OP_9 |
Opcode::OP_10 |
Opcode::OP_11 |
Opcode::OP_12 |
Opcode::OP_13 |
Opcode::OP_14 |
Opcode::OP_15 |
Opcode::OP_16 => {
result += (last_opcode as u8 - (Opcode::OP_1 as u8 - 1)) as usize;
},
_ => {
result += MAX_PUBKEYS_PER_MULTISIG;
}
}
}
else {
result += MAX_PUBKEYS_PER_MULTISIG;
}
},
_ => { }
};
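In short, with `accurate == true` an OP_CHECKMULTISIG preceded by OP_N is charged N sigops, while the legacy mode always charges MAX_PUBKEYS_PER_MULTISIG (20); a sketch of both paths on the 2-of-2 script used in the updated tests below:

// script "52 21<key1> 21<key2> 52 ae" (OP_2 <key1> <key2> OP_2 OP_CHECKMULTISIG):
// sigop_count(true)  -> 2  (OP_2 precedes OP_CHECKMULTISIG)
// sigop_count(false) -> 20 (MAX_PUBKEYS_PER_MULTISIG)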
@ -529,9 +533,10 @@ OP_ADD
#[test]
fn test_sigops_count() {
assert_eq!(1usize, Script::from("76a914aab76ba4877d696590d94ea3e02948b55294815188ac").sigop_count().unwrap());
assert_eq!(2usize, Script::from("522102004525da5546e7603eefad5ef971e82f7dad2272b34e6b3036ab1fe3d299c22f21037d7f2227e6c646707d1c61ecceb821794124363a2cf2c1d2a6f28cf01e5d6abe52ae").sigop_count().unwrap());
assert_eq!(0usize, Script::from("a9146262b64aec1f4a4c1d21b32e9c2811dd2171fd7587").sigop_count().unwrap());
assert_eq!(1usize, Script::from("4104ae1a62fe09c5f51b13905f07f06b99a2f7159b2225f374cd378d71302fa28414e7aab37397f554a7df5f142c21c1b7303b8a0626f1baded5c72a704f7e6cd84cac").sigop_count().unwrap());
assert_eq!(1usize, Script::from("76a914aab76ba4877d696590d94ea3e02948b55294815188ac").sigop_count(false).unwrap());
assert_eq!(2usize, Script::from("522102004525da5546e7603eefad5ef971e82f7dad2272b34e6b3036ab1fe3d299c22f21037d7f2227e6c646707d1c61ecceb821794124363a2cf2c1d2a6f28cf01e5d6abe52ae").sigop_count(true).unwrap());
assert_eq!(20usize, Script::from("522102004525da5546e7603eefad5ef971e82f7dad2272b34e6b3036ab1fe3d299c22f21037d7f2227e6c646707d1c61ecceb821794124363a2cf2c1d2a6f28cf01e5d6abe52ae").sigop_count(false).unwrap());
assert_eq!(0usize, Script::from("a9146262b64aec1f4a4c1d21b32e9c2811dd2171fd7587").sigop_count(false).unwrap());
assert_eq!(1usize, Script::from("4104ae1a62fe09c5f51b13905f07f06b99a2f7159b2225f374cd378d71302fa28414e7aab37397f554a7df5f142c21c1b7303b8a0626f1baded5c72a704f7e6cd84cac").sigop_count(false).unwrap());
}
}

View File

@ -33,6 +33,7 @@ mod synchronization_server;
use std::sync::Arc;
use parking_lot::RwLock;
use tokio_core::reactor::Handle;
use message::common::ConsensusParams;
/// Sync errors.
#[derive(Debug)]
@ -51,7 +52,7 @@ pub fn create_sync_blocks_writer(db: Arc<db::Store>) -> blocks_writer::BlocksWri
}
/// Create inbound synchronization connections factory for given `db`.
pub fn create_sync_connection_factory(handle: &Handle, db: Arc<db::Store>) -> p2p::LocalSyncNodeRef {
pub fn create_sync_connection_factory(handle: &Handle, consensus_params: ConsensusParams, db: Arc<db::Store>) -> p2p::LocalSyncNodeRef {
use synchronization_chain::Chain as SyncChain;
use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor;
use local_node::LocalNode as SyncNode;
@ -62,7 +63,7 @@ pub fn create_sync_connection_factory(handle: &Handle, db: Arc<db::Store>) -> p2
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
let sync_executor = SyncExecutor::new(sync_chain.clone());
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
let sync_client = SynchronizationClient::new(SynchronizationConfig::default(), handle, sync_executor.clone(), sync_chain);
let sync_client = SynchronizationClient::new(SynchronizationConfig::with_consensus_params(consensus_params), handle, sync_executor.clone(), sync_chain);
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
SyncConnectionFactory::with_local_node(sync_node)
}

View File

@ -4,11 +4,12 @@ use parking_lot::Mutex;
use db;
use chain::RepresentH256;
use p2p::OutboundSyncConnectionRef;
use message::common::InventoryType;
use message::common::{InventoryType, InventoryVector};
use message::types;
use synchronization_client::{Client, SynchronizationClient};
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
use synchronization_server::{Server, SynchronizationServer};
use primitives::hash::H256;
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, SynchronizationServer, SynchronizationClient<LocalSynchronizationTaskExecutor>>>;
@ -83,13 +84,8 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
// (2) with 500 entries
// what is (1)?
// process blocks first
let blocks_inventory: Vec<_> = message.inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageBlock)
.map(|item| item.hash.clone())
.collect();
// if there are unknown blocks => start synchronizing with peer
let blocks_inventory = self.blocks_inventory(&message.inventory);
if !blocks_inventory.is_empty() {
self.client.lock().on_new_blocks_inventory(peer_index, blocks_inventory);
}
@ -199,8 +195,18 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index);
}
pub fn on_peer_notfound(&self, peer_index: usize, _message: types::NotFound) {
pub fn on_peer_notfound(&self, peer_index: usize, message: types::NotFound) {
trace!(target: "sync", "Got `notfound` message from peer#{}", peer_index);
let blocks_inventory = self.blocks_inventory(&message.inventory);
self.client.lock().on_peer_blocks_notfound(peer_index, blocks_inventory);
}
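/// Filters an inventory down to block hashes (`MessageBlock` entries only).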
fn blocks_inventory(&self, inventory: &Vec<InventoryVector>) -> Vec<H256> {
inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageBlock)
.map(|item| item.hash.clone())
.collect()
}
}
@ -215,7 +221,7 @@ mod tests {
use synchronization_chain::Chain;
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types;
use message::common::{InventoryVector, InventoryType};
use message::common::{Magic, ConsensusParams, InventoryVector, InventoryType};
use db;
use super::LocalNode;
use test_data;
@ -259,7 +265,7 @@ mod tests {
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
let executor = DummyTaskExecutor::new();
let server = Arc::new(DummyServer::new());
let config = Config { threads_num: 1, skip_verification: true };
let config = Config { consensus_params: ConsensusParams::with_magic(Magic::Mainnet), threads_num: 1, skip_verification: true };
let client = SynchronizationClient::new(config, &handle, executor.clone(), chain);
let local_node = LocalNode::new(server.clone(), client, executor.clone());
(event_loop, handle, executor, server, local_node)

View File

@ -12,13 +12,14 @@ use futures_cpupool::CpuPool;
use linked_hash_map::LinkedHashMap;
use db;
use chain::{Block, BlockHeader, RepresentH256};
use message::common::ConsensusParams;
use primitives::hash::H256;
use synchronization_peers::Peers;
#[cfg(test)] use synchronization_peers::{Information as PeersInformation};
use synchronization_chain::{Chain, ChainRef, BlockState, HeadersIntersection};
#[cfg(test)]
use synchronization_chain::{Information as ChainInformation};
use verification::{ChainVerifier, Error as VerificationError, Verify};
use verification::{ChainVerifier, Verify};
use synchronization_executor::{Task, TaskExecutor};
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory,
manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig};
@ -186,13 +187,14 @@ enum VerificationTask {
pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_peer_block(&mut self, peer_index: usize, block: Block);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option<Arc<PeersBlocksWaiter>>);
fn on_block_verification_success(&mut self, block: Block);
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256);
fn on_block_verification_error(&mut self, err: &str, hash: &H256);
}
/// Synchronization peer blocks waiter
@ -206,14 +208,18 @@ pub struct PeersBlocksWaiter {
/// Synchronization client configuration options.
pub struct Config {
/// Consensus-related parameters.
pub consensus_params: ConsensusParams,
/// Number of threads to allocate in synchronization CpuPool.
pub threads_num: usize,
/// Do not verify incoming blocks before inserting to db.
/// Do not verify incoming blocks before inserting to db.
pub skip_verification: bool,
}
/// Synchronization client.
pub struct SynchronizationClient<T: TaskExecutor> {
/// Synchronization configuration.
config: Config,
/// Synchronization state.
state: State,
/// Cpu pool.
@ -240,9 +246,10 @@ pub struct SynchronizationClient<T: TaskExecutor> {
verifying_blocks_waiters: HashMap<usize, (HashSet<H256>, Option<Arc<PeersBlocksWaiter>>)>,
}
impl Default for Config {
fn default() -> Self {
impl Config {
pub fn with_consensus_params(consensus_params: ConsensusParams) -> Self {
Config {
consensus_params: consensus_params,
threads_num: 4,
skip_verification: false,
}
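A hedged construction sketch, mirroring the call site updated in sync's lib.rs above:

let config = Config::with_consensus_params(Magic::Mainnet.consensus_params());
assert_eq!(config.threads_num, 4);
assert!(!config.skip_verification);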
@ -358,6 +365,25 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
self.execute_synchronization_tasks(None);
}
/// When peer reports that it does not have some previously requested blocks
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>) {
if let Some(requested_blocks) = self.peers.get_blocks_tasks(peer_index) {
// check if peer has responded with notfound to requested blocks
let notfound_blocks: HashSet<H256> = blocks_hashes.into_iter().collect();
if requested_blocks.intersection(&notfound_blocks).nth(0).is_none() {
// if the notfound entries refer to blocks we did not request => just ignore the message
return;
}
// for now, let's exclude peer from synchronization - we are relying on full nodes for synchronization
let removed_tasks = self.peers.reset_blocks_tasks(peer_index);
self.peers.unuseful_peer(peer_index);
// if the peer had some block tasks assigned, re-request these blocks
self.execute_synchronization_tasks(Some(removed_tasks));
}
}
/// Process new block.
fn on_peer_block(&mut self, peer_index: usize, block: Block) {
let block_hash = block.hash();
@ -404,7 +430,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
fn on_block_verification_success(&mut self, block: Block) {
let hash = block.hash();
// insert block to the storage
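// the braced block below evaluates to a Result; the chain write lock is
// scoped inside it, so it is already released by the time waiters are
// awoken or synchronization tasks are executed in the Ok arm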
{
match {
let mut chain = self.chain.write();
// remove block from verification queue
@ -413,19 +439,30 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
if chain.forget_with_state_leave_header(&hash, BlockState::Verifying) != HashPosition::Missing {
// block was in verification queue => insert to storage
chain.insert_best_block(hash.clone(), block)
.expect("Error inserting to db.");
} else {
Ok(())
}
} {
Ok(_) => {
// awake threads, waiting for this block insertion
self.awake_waiting_threads(&hash);
// continue with synchronization
self.execute_synchronization_tasks(None);
},
Err(db::Error::Consistency(e)) => {
// process as verification error
self.on_block_verification_error(&format!("{:?}", db::Error::Consistency(e)), &hash);
},
Err(e) => {
// process as irrecoverable failure
panic!("Block {:?} insertion failed with error {:?}", hash, e);
}
}
// awake threads, waiting for this block insertion
self.awake_waiting_threads(&hash);
// continue with synchronization
self.execute_synchronization_tasks(None);
}
/// Process failed block verification
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) {
fn on_block_verification_error(&mut self, err: &str, hash: &H256) {
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
{
@ -447,6 +484,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
impl<T> SynchronizationClient<T> where T: TaskExecutor {
/// Create new synchronization client
pub fn new(config: Config, handle: &Handle, executor: Arc<Mutex<T>>, chain: ChainRef) -> Arc<Mutex<Self>> {
let skip_verification = config.skip_verification;
let sync = Arc::new(Mutex::new(
SynchronizationClient {
state: State::Saturated,
@ -461,19 +499,21 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
verification_worker_thread: None,
verifying_blocks_by_peer: HashMap::new(),
verifying_blocks_waiters: HashMap::new(),
config: config,
}
));
if !config.skip_verification {
if !skip_verification {
let (verification_work_sender, verification_work_receiver) = channel();
let csync = sync.clone();
let mut lsync = sync.lock();
let storage = chain.read().storage();
let verifier = ChainVerifier::new(storage);
lsync.verification_work_sender = Some(verification_work_sender);
lsync.verification_worker_thread = Some(thread::Builder::new()
.name("Sync verification thread".to_string())
.spawn(move || {
SynchronizationClient::verification_worker_proc(csync, storage, verification_work_receiver)
SynchronizationClient::verification_worker_proc(csync, verifier, verification_work_receiver)
})
.expect("Error creating verification thread"));
}
@ -525,6 +565,11 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
}
/// Get configuration parameters.
pub fn config<'a>(&'a self) -> &'a Config {
&self.config
}
/// Process new blocks headers
fn process_new_blocks_headers(&mut self, peer_index: usize, mut hashes: Vec<H256>, mut headers: Vec<BlockHeader>) {
assert_eq!(hashes.len(), headers.len());
@ -884,17 +929,38 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
/// Thread procedure for handling verification tasks
fn verification_worker_proc(sync: Arc<Mutex<Self>>, storage: Arc<db::Store>, work_receiver: Receiver<VerificationTask>) {
let verifier = ChainVerifier::new(storage);
fn verification_worker_proc(sync: Arc<Mutex<Self>>, mut verifier: ChainVerifier, work_receiver: Receiver<VerificationTask>) {
let mut parameters_change_steps = Some(0);
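// countdown until the verifier flags need re-evaluation:
// Some(0) => re-check BIP65 activation on the next verified block,
// Some(n) => skip the check for the next n blocks,
// None    => BIP65 already active, flags are final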
while let Ok(task) = work_receiver.recv() {
match task {
VerificationTask::VerifyBlock(block) => {
// change verifier parameters, if needed
if let Some(steps_left) = parameters_change_steps {
if steps_left == 0 {
let sync = sync.lock();
let config = sync.config();
let best_storage_block = sync.chain.read().best_storage_block();
let is_bip65_active = best_storage_block.number >= config.consensus_params.bip65_height;
verifier = verifier.verify_clocktimeverify(is_bip65_active);
if is_bip65_active {
parameters_change_steps = None;
} else {
parameters_change_steps = Some(config.consensus_params.bip65_height - best_storage_block.number);
}
} else {
parameters_change_steps = Some(steps_left - 1);
}
}
// verify block
match verifier.verify(&block) {
Ok(_chain) => {
sync.lock().on_block_verification_success(block)
},
Err(err) => {
sync.lock().on_block_verification_error(&err, &block.hash())
Err(e) => {
sync.lock().on_block_verification_error(&format!("{:?}", e), &block.hash())
}
}
},
@ -927,6 +993,7 @@ pub mod tests {
use parking_lot::{Mutex, RwLock};
use tokio_core::reactor::{Core, Handle};
use chain::{Block, RepresentH256};
use message::common::{Magic, ConsensusParams};
use super::{Client, Config, SynchronizationClient};
use synchronization_executor::Task;
use synchronization_chain::{Chain, ChainRef};
@ -950,7 +1017,7 @@ pub mod tests {
};
let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone())));
let executor = DummyTaskExecutor::new();
let config = Config { threads_num: 1, skip_verification: true };
let config = Config { consensus_params: ConsensusParams::with_magic(Magic::Mainnet), threads_num: 1, skip_verification: true };
let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone());
(event_loop, handle, executor, chain, client)
@ -1392,4 +1459,61 @@ pub mod tests {
assert_eq!(tasks, vec![Task::RequestBlocks(2, vec![block1.hash()])]);
}
}
#[test]
fn sync_after_db_insert_nonfatal_fail() {
// TODO: implement me
}
#[test]
fn peer_removed_from_sync_after_responding_with_requested_block_notfound() {
let (_, _, executor, _, sync) = create_sync(None);
let mut sync = sync.lock();
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
sync.on_new_blocks_headers(1, vec![b1.block_header.clone(), b2.block_header.clone()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::RequestBlocks(1, vec![b1.hash(), b2.hash()])]);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.unuseful, 0);
assert_eq!(sync.information().peers.active, 1);
sync.on_peer_blocks_notfound(1, vec![b1.hash()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1)]);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.unuseful, 1);
assert_eq!(sync.information().peers.active, 0);
}
#[test]
fn peer_not_removed_from_sync_after_responding_with_requested_block_notfound() {
let (_, _, executor, _, sync) = create_sync(None);
let mut sync = sync.lock();
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
sync.on_new_blocks_headers(1, vec![b1.block_header.clone(), b2.block_header.clone()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::RequestBlocks(1, vec![b1.hash(), b2.hash()])]);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.unuseful, 0);
assert_eq!(sync.information().peers.active, 1);
sync.on_peer_blocks_notfound(1, vec![test_data::block_h170().hash()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![]);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.unuseful, 0);
assert_eq!(sync.information().peers.active, 1);
}
}

View File

@ -120,6 +120,11 @@ impl Peers {
.collect()
}
/// Get block tasks currently assigned to the peer
pub fn get_blocks_tasks(&self, peer_index: usize) -> Option<HashSet<H256>> {
self.blocks_requests.get(&peer_index).cloned()
}
/// Mark peer as useful.
pub fn useful_peer(&mut self, peer_index: usize) {
// if peer is unknown => insert to idle queue
@ -133,6 +138,20 @@ impl Peers {
}
}
/// Mark peer as unuseful.
pub fn unuseful_peer(&mut self, peer_index: usize) {
// the peer must not have any active block requests (asserted below)
// move it out of the idle set into the unuseful set, clearing its
// failure counter and pending inventory requests
assert!(!self.blocks_requests.contains_key(&peer_index));
assert!(!self.blocks_requests_order.contains_key(&peer_index));
self.idle.remove(&peer_index);
self.unuseful.insert(peer_index);
self.failures.remove(&peer_index);
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
}
/// Peer has been disconnected
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> Option<Vec<H256>> {
// forget this peer without any chances to reuse

View File

@ -352,6 +352,7 @@ impl<F> TransactionInputBuilder<F> where F: Invoke<chain::TransactionInput> {
pub fn coinbase(mut self) -> Self {
self.output = Some(chain::OutPoint { hash: H256::from(0), index: 0xffffffff });
self.signature = vec![0u8; 2].into();
self
}

View File

@ -10,16 +10,23 @@ use utils;
const BLOCK_MAX_FUTURE: i64 = 2 * 60 * 60; // 2 hours
const COINBASE_MATURITY: u32 = 100; // blocks
const MAX_BLOCK_SIGOPS: usize = 20000;
const MAX_BLOCK_SIZE: usize = 1000000;
pub struct ChainVerifier {
store: Arc<db::Store>,
verify_clocktimeverify: bool,
skip_pow: bool,
skip_sig: bool,
}
impl ChainVerifier {
pub fn new(store: Arc<db::Store>) -> Self {
ChainVerifier { store: store, skip_pow: false, skip_sig: false }
ChainVerifier {
store: store,
verify_clocktimeverify: false,
skip_pow: false,
skip_sig: false
}
}
#[cfg(test)]
@ -34,6 +41,11 @@ impl ChainVerifier {
self
}
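/// Sets whether OP_CHECKLOCKTIMEVERIFY (BIP65) is enforced during script verification.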
pub fn verify_clocktimeverify(mut self, verify: bool) -> Self {
self.verify_clocktimeverify = verify;
self
}
fn ordered_verify(&self, block: &chain::Block, at_height: u32) -> Result<(), Error> {
let coinbase_spends = block.transactions()[0].total_spends();
@ -128,7 +140,9 @@ impl ChainVerifier {
let input: Script = input.script_sig().to_vec().into();
let output: Script = paired_output.script_pubkey.to_vec().into();
let flags = VerificationFlags::default().verify_p2sh(true);
let flags = VerificationFlags::default()
.verify_p2sh(true)
.verify_clocktimeverify(self.verify_clocktimeverify);
// for tests only, skips as late as possible
if self.skip_sig { continue; }
@ -163,6 +177,12 @@ impl Verify for ChainVerifier {
return Err(Error::Timestamp);
}
// todo: serialized_size function is at least suboptimal
let size = ::serialization::Serializable::serialized_size(block);
if size > MAX_BLOCK_SIZE {
return Err(Error::Size(size))
}
// verify merkle root
if block.merkle_root() != block.header().merkle_root_hash {
return Err(Error::MerkleRoot);
@ -173,6 +193,14 @@ impl Verify for ChainVerifier {
return Err(Error::Coinbase)
}
// check that the coinbase signature script length is within consensus bounds
let coinbase = &block.transactions()[0];
// is_coinbase() = true above guarantees that there is at least one input
let coinbase_script_len = coinbase.inputs[0].script_sig().len();
if coinbase_script_len < 2 || coinbase_script_len > 100 {
return Err(Error::CoinbaseSignatureLength(coinbase_script_len));
}
// verify transactions (except coinbase)
let mut block_sigops = try!(
utils::transaction_sigops(&block.transactions()[0])

View File

@ -50,6 +50,10 @@ pub enum Error {
/// Maximum number of sigops exceeded - the total is not reported
/// since counting stops once `MAX_BLOCK_SIGOPS` is reached
MaximumSigops,
/// Coinbase signature script length is not in the range 2-100 bytes
CoinbaseSignatureLength(usize),
/// Block size is invalid
Size(usize),
}
#[derive(Debug, PartialEq)]

View File

@ -60,14 +60,14 @@ pub fn transaction_sigops(transaction: &chain::Transaction) -> Result<usize, scr
for output in transaction.outputs.iter() {
let output_script: Script = output.script_pubkey.to_vec().into();
// todo: should malformed outputs always be allowed?
result += output_script.sigop_count().unwrap_or(0);
result += output_script.sigop_count(false).unwrap_or(0);
}
if transaction.is_coinbase() { return Ok(result); }
for input in transaction.inputs.iter() {
let input_script: Script = input.script_sig().to_vec().into();
result += try!(input_script.sigop_count());
result += try!(input_script.sigop_count(false));
}
Ok(result)