diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go index 49e2dc6f8..2b9956638 100644 --- a/cmd/geth/admin.go +++ b/cmd/geth/admin.go @@ -70,6 +70,7 @@ func (js *jsre) adminBindings() { miner.Set("stop", js.stopMining) miner.Set("hashrate", js.hashrate) miner.Set("setExtra", js.setExtra) + miner.Set("setGasPrice", js.setGasPrice) admin.Set("debug", struct{}{}) t, _ = admin.Get("debug") @@ -236,6 +237,17 @@ func (js *jsre) setExtra(call otto.FunctionCall) otto.Value { return otto.UndefinedValue() } +func (js *jsre) setGasPrice(call otto.FunctionCall) otto.Value { + gasPrice, err := call.Argument(0).ToString() + if err != nil { + fmt.Println(err) + return otto.UndefinedValue() + } + + js.ethereum.Miner().SetGasPrice(common.String2Big(gasPrice)) + return otto.UndefinedValue() +} + func (js *jsre) hashrate(otto.FunctionCall) otto.Value { return js.re.ToVal(js.ethereum.Miner().HashRate()) } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index fd6925e6d..fd7aae4c2 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -51,7 +51,7 @@ import _ "net/http/pprof" const ( ClientIdentifier = "Geth" - Version = "0.9.17" + Version = "0.9.19" ) var ( @@ -244,6 +244,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso utils.MaxPeersFlag, utils.MaxPendingPeersFlag, utils.EtherbaseFlag, + utils.GasPriceFlag, utils.MinerThreadsFlag, utils.MiningEnabledFlag, utils.NATFlag, @@ -258,7 +259,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso utils.ProtocolVersionFlag, utils.NetworkIdFlag, utils.RPCCORSDomainFlag, - utils.LogLevelFlag, + utils.VerbosityFlag, utils.BacktraceAtFlag, utils.LogToStdErrFlag, utils.LogVModuleFlag, diff --git a/cmd/mist/main.go b/cmd/mist/main.go index 9d92cc175..4b55b3026 100644 --- a/cmd/mist/main.go +++ b/cmd/mist/main.go @@ -37,7 +37,7 @@ import ( const ( ClientIdentifier = "Mist" - Version = "0.9.0" + Version = "0.9.19" ) var ( @@ -73,7 +73,7 @@ func init() { utils.DataDirFlag, utils.ListenPortFlag, utils.LogFileFlag, - utils.LogLevelFlag, + utils.VerbosityFlag, utils.MaxPeersFlag, utils.MaxPendingPeersFlag, utils.MinerThreadsFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b18d9851f..dd3b6c8a2 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -4,6 +4,7 @@ import ( "crypto/ecdsa" "fmt" "log" + "math/big" "net/http" "os" "path" @@ -116,6 +117,11 @@ var ( Usage: "Public address for block mining rewards. By default the address of your primary account is used", Value: "primary", } + GasPriceFlag = cli.StringFlag{ + Name: "gasprice", + Usage: "Sets the minimal gasprice when mining transactions", + Value: new(big.Int).Mul(big.NewInt(10), common.Szabo).String(), + } UnlockedAccountFlag = cli.StringFlag{ Name: "unlock", @@ -133,8 +139,8 @@ var ( Name: "logfile", Usage: "Send log output to a file", } - LogLevelFlag = cli.IntFlag{ - Name: "loglevel", + VerbosityFlag = cli.IntFlag{ + Name: "verbosity", Usage: "Logging verbosity: 0-6 (0=silent, 1=error, 2=warn, 3=info, 4=core, 5=debug, 6=debug detail)", Value: int(logger.InfoLevel), } @@ -270,7 +276,7 @@ func GetNodeKey(ctx *cli.Context) (key *ecdsa.PrivateKey) { func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config { // Set verbosity on glog - glog.SetV(ctx.GlobalInt(LogLevelFlag.Name)) + glog.SetV(ctx.GlobalInt(VerbosityFlag.Name)) // Set the log type //glog.SetToStderr(ctx.GlobalBool(LogToStdErrFlag.Name)) glog.SetToStderr(true) @@ -290,7 +296,7 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config { SkipBcVersionCheck: false, NetworkId: ctx.GlobalInt(NetworkIdFlag.Name), LogFile: ctx.GlobalString(LogFileFlag.Name), - LogLevel: ctx.GlobalInt(LogLevelFlag.Name), + Verbosity: ctx.GlobalInt(VerbosityFlag.Name), LogJSON: ctx.GlobalString(LogJSONFlag.Name), Etherbase: ctx.GlobalString(EtherbaseFlag.Name), MinerThreads: ctx.GlobalInt(MinerThreadsFlag.Name), @@ -305,6 +311,7 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config { Shh: ctx.GlobalBool(WhisperEnabledFlag.Name), Dial: true, BootNodes: ctx.GlobalString(BootnodesFlag.Name), + GasPrice: common.String2Big(ctx.GlobalString(GasPriceFlag.Name)), } } diff --git a/common/size.go b/common/size.go index 0d9dbf558..4ea7f7b11 100644 --- a/common/size.go +++ b/common/size.go @@ -44,12 +44,6 @@ func CurrencyToString(num *big.Int) string { ) switch { - case num.Cmp(Douglas) >= 0: - fin = new(big.Int).Div(num, Douglas) - denom = "Douglas" - case num.Cmp(Einstein) >= 0: - fin = new(big.Int).Div(num, Einstein) - denom = "Einstein" case num.Cmp(Ether) >= 0: fin = new(big.Int).Div(num, Ether) denom = "Ether" diff --git a/common/size_test.go b/common/size_test.go index 1cbeff0a8..cfe7efe31 100644 --- a/common/size_test.go +++ b/common/size_test.go @@ -25,8 +25,6 @@ func (s *SizeSuite) TestStorageSizeString(c *checker.C) { } func (s *CommonSuite) TestCommon(c *checker.C) { - douglas := CurrencyToString(BigPow(10, 43)) - einstein := CurrencyToString(BigPow(10, 22)) ether := CurrencyToString(BigPow(10, 19)) finney := CurrencyToString(BigPow(10, 16)) szabo := CurrencyToString(BigPow(10, 13)) @@ -35,8 +33,6 @@ func (s *CommonSuite) TestCommon(c *checker.C) { ada := CurrencyToString(BigPow(10, 4)) wei := CurrencyToString(big.NewInt(10)) - c.Assert(douglas, checker.Equals, "10 Douglas") - c.Assert(einstein, checker.Equals, "10 Einstein") c.Assert(ether, checker.Equals, "10 Ether") c.Assert(finney, checker.Equals, "10 Finney") c.Assert(szabo, checker.Equals, "10 Szabo") @@ -45,13 +41,3 @@ func (s *CommonSuite) TestCommon(c *checker.C) { c.Assert(ada, checker.Equals, "10 Ada") c.Assert(wei, checker.Equals, "10 Wei") } - -func (s *CommonSuite) TestLarge(c *checker.C) { - douglaslarge := CurrencyToString(BigPow(100000000, 43)) - adalarge := CurrencyToString(BigPow(100000000, 4)) - weilarge := CurrencyToString(big.NewInt(100000000)) - - c.Assert(douglaslarge, checker.Equals, "10000E298 Douglas") - c.Assert(adalarge, checker.Equals, "10000E7 Einstein") - c.Assert(weilarge, checker.Equals, "100 Babbage") -} diff --git a/core/events.go b/core/events.go index 3da668af5..1ea35c2f4 100644 --- a/core/events.go +++ b/core/events.go @@ -1,8 +1,10 @@ package core import ( - "github.com/ethereum/go-ethereum/core/types" + "math/big" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" ) // TxPreEvent is posted when a transaction enters the transaction pool. @@ -44,6 +46,8 @@ type ChainUncleEvent struct { type ChainHeadEvent struct{ Block *types.Block } +type GasPriceChanged struct{ Price *big.Int } + // Mining operation events type StartMining struct{} type TopMining struct{} diff --git a/core/manager.go b/core/manager.go index 9b5407a9e..433ada7ee 100644 --- a/core/manager.go +++ b/core/manager.go @@ -1,12 +1,14 @@ package core import ( + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" ) type Backend interface { + AccountManager() *accounts.Manager BlockProcessor() *BlockProcessor ChainManager() *ChainManager TxPool() *TxPool diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 6898a4bda..e68f7406a 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -21,7 +21,7 @@ var ( ErrInvalidSender = errors.New("Invalid sender") ErrNonce = errors.New("Nonce too low") ErrBalance = errors.New("Insufficient balance") - ErrNonExistentAccount = errors.New("Account does not exist") + ErrNonExistentAccount = errors.New("Account does not exist or account balance too low") ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value") ErrIntrinsicGas = errors.New("Intrinsic gas too low") ErrGasLimit = errors.New("Exceeds block gas limit") diff --git a/eth/backend.go b/eth/backend.go index 0f23cde2f..cdbe35b26 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/big" "os" "path" "path/filepath" @@ -53,12 +54,12 @@ type Config struct { BlockChainVersion int SkipBcVersionCheck bool // e.g. blockchain export - DataDir string - LogFile string - LogLevel int - LogJSON string - VmDebug bool - NatSpec bool + DataDir string + LogFile string + Verbosity int + LogJSON string + VmDebug bool + NatSpec bool MaxPeers int MaxPendingPeers int @@ -76,6 +77,7 @@ type Config struct { Dial bool Etherbase string + GasPrice *big.Int MinerThreads int AccountManager *accounts.Manager @@ -200,7 +202,7 @@ type Ethereum struct { func New(config *Config) (*Ethereum, error) { // Bootstrap database - logger.New(config.DataDir, config.LogFile, config.LogLevel) + logger.New(config.DataDir, config.LogFile, config.Verbosity) if len(config.LogJSON) > 0 { logger.NewJSONsystem(config.DataDir, config.LogJSON) } @@ -266,6 +268,8 @@ func New(config *Config) (*Ethereum, error) { eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) eth.miner = miner.New(eth, eth.pow, config.MinerThreads) + eth.miner.SetGasPrice(config.GasPrice) + eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) if config.Shh { eth.whisper = whisper.New() @@ -447,6 +451,8 @@ func (s *Ethereum) Start() error { return nil } +// sync databases every minute. If flushing fails we exit immediatly. The system +// may not continue under any circumstances. func (s *Ethereum) syncDatabases() { ticker := time.NewTicker(1 * time.Minute) done: @@ -455,13 +461,13 @@ done: case <-ticker.C: // don't change the order of database flushes if err := s.extraDb.Flush(); err != nil { - glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err) + glog.Fatalf("fatal error: flush extraDb: %v\n", err) } if err := s.stateDb.Flush(); err != nil { - glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err) + glog.Fatalf("fatal error: flush stateDb: %v\n", err) } if err := s.blockDb.Flush(); err != nil { - glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err) + glog.Fatalf("fatal error: flush blockDb: %v\n", err) } case <-s.shutdownChan: break done diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 18f8d2ba8..14ca2cd3d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -34,6 +34,9 @@ var ( errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") errAlreadyInPool = errors.New("hash already in pool") errBlockNumberOverflow = errors.New("received block which overflows") + errCancelHashFetch = errors.New("hash fetching cancelled (requested)") + errCancelBlockFetch = errors.New("block downloading cancelled (requested)") + errNoSyncActive = errors.New("no sync active") ) type hashCheckFn func(common.Hash) bool @@ -74,6 +77,7 @@ type Downloader struct { newPeerCh chan *peer hashCh chan hashPack blockCh chan blockPack + cancelCh chan struct{} } func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { @@ -129,6 +133,9 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } defer atomic.StoreInt32(&d.synchronising, 0) + // Create cancel channel for aborting midflight + d.cancelCh = make(chan struct{}) + // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { return errPendingQueue @@ -161,7 +168,6 @@ func (d *Downloader) Has(hash common.Hash) bool { } func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) { - d.activePeer = p.id defer func() { // reset on error @@ -191,6 +197,42 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) return nil } +// Cancel cancels all of the operations and resets the queue. It returns true +// if the cancel operation was completed. +func (d *Downloader) Cancel() bool { + hs, bs := d.queue.Size() + // If we're not syncing just return. + if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 { + return false + } + + close(d.cancelCh) + + // clean up +hashDone: + for { + select { + case <-d.hashCh: + default: + break hashDone + } + } + +blockDone: + for { + select { + case <-d.blockCh: + default: + break blockDone + } + } + + // reset the queue + d.queue.Reset() + + return true +} + // XXX Make synchronous func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error { glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) @@ -217,6 +259,8 @@ func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial b out: for { select { + case <-d.cancelCh: + return errCancelHashFetch case hashPack := <-d.hashCh: // Make sure the active peer is giving us the hashes if hashPack.peerId != activePeer.id { @@ -305,6 +349,8 @@ func (d *Downloader) startFetchingBlocks(p *peer) error { out: for { select { + case <-d.cancelCh: + return errCancelBlockFetch case blockPack := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. @@ -394,11 +440,23 @@ out: // Deliver a chunk to the downloader. This is usually done through the BlocksMsg by // the protocol handler. -func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) { +func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) error { + // Make sure the downloader is active + if atomic.LoadInt32(&d.synchronising) == 0 { + return errNoSyncActive + } + d.blockCh <- blockPack{id, blocks} + + return nil } func (d *Downloader) AddHashes(id string, hashes []common.Hash) error { + // Make sure the downloader is active + if atomic.LoadInt32(&d.synchronising) == 0 { + return errNoSyncActive + } + // make sure that the hashes that are being added are actually from the peer // that's the current active peer. hashes that have been received from other // peers are dropped and ignored. diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 8ccc4d1a5..d0f8d4c8f 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -182,6 +182,49 @@ func TestTaking(t *testing.T) { } } +func TestInactiveDownloader(t *testing.T) { + targetBlocks := 1000 + hashes := createHashes(0, targetBlocks) + blocks := createBlocksFromHashSet(createHashSet(hashes)) + tester := newTester(t, hashes, nil) + + err := tester.downloader.AddHashes("bad peer 001", hashes) + if err != errNoSyncActive { + t.Error("expected no sync error, got", err) + } + + err = tester.downloader.DeliverChunk("bad peer 001", blocks) + if err != errNoSyncActive { + t.Error("expected no sync error, got", err) + } +} + +func TestCancel(t *testing.T) { + minDesiredPeerCount = 4 + blockTtl = 1 * time.Second + + targetBlocks := 1000 + hashes := createHashes(0, targetBlocks) + blocks := createBlocksFromHashes(hashes) + tester := newTester(t, hashes, blocks) + + tester.newPeer("peer1", big.NewInt(10000), hashes[0]) + + err := tester.sync("peer1", hashes[0]) + if err != nil { + t.Error("download error", err) + } + + if !tester.downloader.Cancel() { + t.Error("cancel operation unsuccessfull") + } + + hashSize, blockSize := tester.downloader.queue.Size() + if hashSize > 0 || blockSize > 0 { + t.Error("block (", blockSize, ") or hash (", hashSize, ") not 0") + } +} + func TestThrottling(t *testing.T) { minDesiredPeerCount = 4 blockTtl = 1 * time.Second diff --git a/eth/sync.go b/eth/sync.go index c49f5209d..d955eaa50 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -63,6 +63,9 @@ func (pm *ProtocolManager) processBlocks() error { max := int(math.Min(float64(len(blocks)), float64(blockProcAmount))) _, err := pm.chainman.InsertChain(blocks[:max]) if err != nil { + // cancel download process + pm.downloader.Cancel() + return err } blocks = blocks[max:] diff --git a/ethdb/database.go b/ethdb/database.go index 57a3f9ee6..15af02fdf 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -8,8 +8,11 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" ) +const openFileLimit = 128 + type LDBDatabase struct { fn string @@ -23,7 +26,7 @@ type LDBDatabase struct { func NewLDBDatabase(file string) (*LDBDatabase, error) { // Open the db - db, err := leveldb.OpenFile(file, nil) + db, err := leveldb.OpenFile(file, &opt.Options{OpenFilesCacheCapacity: openFileLimit}) if err != nil { return nil, err } diff --git a/miner/miner.go b/miner/miner.go index bff0026dc..efe6d3051 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -7,6 +7,8 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/pow" ) @@ -37,7 +39,18 @@ func (self *Miner) Mining() bool { return self.mining } +func (m *Miner) SetGasPrice(price *big.Int) { + // FIXME block tests set a nil gas price. Quick dirty fix + if price == nil { + return + } + + m.worker.gasPrice = price +} + func (self *Miner) Start(coinbase common.Address) { + glog.V(logger.Info).Infoln("Starting mining operation") + self.mining = true self.worker.coinbase = coinbase self.worker.start() diff --git a/miner/worker.go b/miner/worker.go index 87d17dfd6..e3dbae717 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -7,6 +7,7 @@ import ( "sync" "sync/atomic" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -21,12 +22,18 @@ import ( var jsonlogger = logger.NewJsonLogger() type environment struct { - totalUsedGas *big.Int - state *state.StateDB - coinbase *state.StateObject - block *types.Block - family *set.Set - uncles *set.Set + totalUsedGas *big.Int + state *state.StateDB + coinbase *state.StateObject + block *types.Block + family *set.Set + uncles *set.Set + remove *set.Set + tcount int + ignoredTransactors *set.Set + lowGasTransactors *set.Set + ownedAccounts *set.Set + lowGasTxs types.Transactions } func env(block *types.Block, eth core.Backend) *environment { @@ -72,6 +79,7 @@ type worker struct { proc *core.BlockProcessor coinbase common.Address + gasPrice *big.Int extra []byte currentMu sync.Mutex @@ -93,6 +101,7 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker { eth: eth, mux: eth.EventMux(), recv: make(chan *types.Block), + gasPrice: new(big.Int), chain: eth.ChainManager(), proc: eth.BlockProcessor(), possibleUncles: make(map[common.Hash]*types.Block), @@ -123,15 +132,22 @@ func (self *worker) pendingBlock() *types.Block { } func (self *worker) start() { + self.mu.Lock() + defer self.mu.Unlock() + + atomic.StoreInt32(&self.mining, 1) + // spin up agents for _, agent := range self.agents { agent.Start() } - atomic.StoreInt32(&self.mining, 1) } func (self *worker) stop() { + self.mu.Lock() + defer self.mu.Unlock() + if atomic.LoadInt32(&self.mining) == 1 { // stop all agents for _, agent := range self.agents { @@ -144,6 +160,9 @@ func (self *worker) stop() { } func (self *worker) register(agent Agent) { + self.mu.Lock() + defer self.mu.Unlock() + self.agents = append(self.agents, agent) agent.SetReturnCh(self.recv) } @@ -163,8 +182,11 @@ out: self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() case core.TxPreEvent: + // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { - self.commitNewWork() + self.mu.Lock() + self.commitTransactions(types.Transactions{ev.Tx}) + self.mu.Unlock() } } case <-self.quit: @@ -230,13 +252,33 @@ func (self *worker) makeCurrent() { } block.Header().Extra = self.extra - self.current = env(block, self.eth) + current := env(block, self.eth) for _, ancestor := range self.chain.GetAncestors(block, 7) { - self.current.family.Add(ancestor.Hash()) + current.family.Add(ancestor.Hash()) } + accounts, _ := self.eth.AccountManager().Accounts() + // Keep track of transactions which return errors so they can be removed + current.remove = set.New() + current.tcount = 0 + current.ignoredTransactors = set.New() + current.lowGasTransactors = set.New() + current.ownedAccounts = accountAddressesSet(accounts) - parent := self.chain.GetBlock(self.current.block.ParentHash()) - self.current.coinbase.SetGasPool(core.CalcGasLimit(parent)) + parent := self.chain.GetBlock(current.block.ParentHash()) + current.coinbase.SetGasPool(core.CalcGasLimit(parent)) + + self.current = current +} + +func (w *worker) setGasPrice(p *big.Int) { + w.mu.Lock() + defer w.mu.Unlock() + + // calculate the minimal gas price the miner accepts when sorting out transactions. + const pct = int64(90) + w.gasPrice = gasprice(p, pct) + + w.mux.Post(core.GasPriceChanged{w.gasPrice}) } func (self *worker) commitNewWork() { @@ -248,54 +290,14 @@ func (self *worker) commitNewWork() { defer self.currentMu.Unlock() self.makeCurrent() + current := self.current transactions := self.eth.TxPool().GetTransactions() sort.Sort(types.TxByNonce{transactions}) - // Keep track of transactions which return errors so they can be removed - var ( - remove = set.New() - tcount = 0 - ignoredTransactors = set.New() - ) - - for _, tx := range transactions { - // We can skip err. It has already been validated in the tx pool - from, _ := tx.From() - // Move on to the next transaction when the transactor is in ignored transactions set - // This may occur when a transaction hits the gas limit. When a gas limit is hit and - // the transaction is processed (that could potentially be included in the block) it - // will throw a nonce error because the previous transaction hasn't been processed. - // Therefor we need to ignore any transaction after the ignored one. - if ignoredTransactors.Has(from) { - continue - } - - self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0) - - err := self.commitTransaction(tx) - switch { - case core.IsNonceErr(err) || core.IsInvalidTxErr(err): - // Remove invalid transactions - from, _ := tx.From() - - self.chain.TxState().RemoveNonce(from, tx.Nonce()) - remove.Add(tx.Hash()) - - if glog.V(logger.Detail) { - glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) - } - case state.IsGasLimitErr(err): - from, _ := tx.From() - // ignore the transactor so no nonce errors will be thrown for this account - // next time the worker is run, they'll be picked up again. - ignoredTransactors.Add(from) - - glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) - default: - tcount++ - } - } + // commit transactions for this run + self.commitTransactions(transactions) + self.eth.TxPool().RemoveTransactions(current.lowGasTxs) var ( uncles []*types.Header @@ -321,7 +323,7 @@ func (self *worker) commitNewWork() { // We only care about logging if we're actually mining if atomic.LoadInt32(&self.mining) == 1 { - glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles)) + glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", current.block.Number(), current.tcount, len(uncles)) } for _, hash := range badUncles { @@ -361,6 +363,71 @@ func (self *worker) commitUncle(uncle *types.Header) error { return nil } +func (self *worker) commitTransactions(transactions types.Transactions) { + current := self.current + + for _, tx := range transactions { + // We can skip err. It has already been validated in the tx pool + from, _ := tx.From() + + // check if it falls within margin + if tx.GasPrice().Cmp(self.gasPrice) < 0 { + // ignore the transaction and transactor. We ignore the transactor + // because nonce will fail after ignoring this transaction so there's + // no point + current.lowGasTransactors.Add(from) + + glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4]) + } + + // Continue with the next transaction if the transaction sender is included in + // the low gas tx set. This will also remove the tx and all sequential transaction + // from this transactor + if current.lowGasTransactors.Has(from) { + // add tx to the low gas set. This will be removed at the end of the run + // owned accounts are ignored + if !current.ownedAccounts.Has(from) { + current.lowGasTxs = append(current.lowGasTxs, tx) + } + continue + } + + // Move on to the next transaction when the transactor is in ignored transactions set + // This may occur when a transaction hits the gas limit. When a gas limit is hit and + // the transaction is processed (that could potentially be included in the block) it + // will throw a nonce error because the previous transaction hasn't been processed. + // Therefor we need to ignore any transaction after the ignored one. + if current.ignoredTransactors.Has(from) { + continue + } + + self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0) + + err := self.commitTransaction(tx) + switch { + case core.IsNonceErr(err) || core.IsInvalidTxErr(err): + // Remove invalid transactions + from, _ := tx.From() + + self.chain.TxState().RemoveNonce(from, tx.Nonce()) + current.remove.Add(tx.Hash()) + + if glog.V(logger.Detail) { + glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) + } + case state.IsGasLimitErr(err): + from, _ := tx.From() + // ignore the transactor so no nonce errors will be thrown for this account + // next time the worker is run, they'll be picked up again. + current.ignoredTransactors.Add(from) + + glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) + default: + current.tcount++ + } + } +} + func (self *worker) commitTransaction(tx *types.Transaction) error { snap := self.current.state.Copy() receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true) @@ -383,3 +450,20 @@ func (self *worker) HashRate() int64 { return tot } + +// gasprice calculates a reduced gas price based on the pct +// XXX Use big.Rat? +func gasprice(price *big.Int, pct int64) *big.Int { + p := new(big.Int).Set(price) + p.Div(p, big.NewInt(100)) + p.Mul(p, big.NewInt(pct)) + return p +} + +func accountAddressesSet(accounts []accounts.Account) *set.Set { + accountSet := set.New() + for _, account := range accounts { + accountSet.Add(common.BytesToAddress(account.Address)) + } + return accountSet +} diff --git a/rpc/api.go b/rpc/api.go index 7fab589f2..309c161ad 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -450,10 +450,18 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = newHexData(res) case "shh_version": + // Short circuit if whisper is not running + if api.xeth().Whisper() == nil { + return NewNotAvailableError(req.Method, "whisper offline") + } // Retrieves the currently running whisper protocol version *reply = api.xeth().WhisperVersion() case "shh_post": + // Short circuit if whisper is not running + if api.xeth().Whisper() == nil { + return NewNotAvailableError(req.Method, "whisper offline") + } // Injects a new message into the whisper network args := new(WhisperMessageArgs) if err := json.Unmarshal(req.Params, &args); err != nil { @@ -466,10 +474,18 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = true case "shh_newIdentity": + // Short circuit if whisper is not running + if api.xeth().Whisper() == nil { + return NewNotAvailableError(req.Method, "whisper offline") + } // Creates a new whisper identity to use for sending/receiving messages *reply = api.xeth().Whisper().NewIdentity() case "shh_hasIdentity": + // Short circuit if whisper is not running + if api.xeth().Whisper() == nil { + return NewNotAvailableError(req.Method, "whisper offline") + } // Checks if an identity if owned or not args := new(WhisperIdentityArgs) if err := json.Unmarshal(req.Params, &args); err != nil { @@ -478,6 +494,10 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = api.xeth().Whisper().HasIdentity(args.Identity) case "shh_newFilter": + // Short circuit if whisper is not running + if api.xeth().Whisper() == nil { + return NewNotAvailableError(req.Method, "whisper offline") + } // Create a new filter to watch and match messages with args := new(WhisperFilterArgs) if err := json.Unmarshal(req.Params, &args); err != nil { @@ -487,6 +507,10 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = newHexNum(big.NewInt(int64(id)).Bytes()) case "shh_uninstallFilter": + // Short circuit if whisper is not running + if api.xeth().Whisper() == nil { + return NewNotAvailableError(req.Method, "whisper offline") + } // Remove an existing filter watching messages args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { @@ -495,6 +519,10 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = api.xeth().UninstallWhisperFilter(args.Id) case "shh_getFilterChanges": + // Short circuit if whisper is not running + if api.xeth().Whisper() == nil { + return NewNotAvailableError(req.Method, "whisper offline") + } // Retrieve all the new messages arrived since the last request args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { @@ -503,12 +531,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = api.xeth().WhisperMessagesChanged(args.Id) case "shh_getMessages": + // Short circuit if whisper is not running + if api.xeth().Whisper() == nil { + return NewNotAvailableError(req.Method, "whisper offline") + } // Retrieve all the cached messages matching a specific, existing filter args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } *reply = api.xeth().WhisperMessages(args.Id) + case "eth_hashrate": *reply = newHexNum(api.xeth().HashRate()) diff --git a/rpc/http.go b/rpc/http.go index 4760601d8..c5bb10c80 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -116,7 +116,7 @@ func RpcResponse(api *EthereumApi, request *RpcRequest) *interface{} { switch reserr.(type) { case nil: response = &RpcSuccessResponse{Jsonrpc: jsonrpcver, Id: request.Id, Result: reply} - case *NotImplementedError: + case *NotImplementedError, *NotAvailableError: jsonerr := &RpcErrorObject{-32601, reserr.Error()} response = &RpcErrorResponse{Jsonrpc: jsonrpcver, Id: request.Id, Error: jsonerr} case *DecodeParamError, *InsufficientParamsError, *ValidationError, *InvalidTypeError: diff --git a/rpc/jeth.go b/rpc/jeth.go index ad52b72d7..2097ac30d 100644 --- a/rpc/jeth.go +++ b/rpc/jeth.go @@ -2,6 +2,7 @@ package rpc import ( "encoding/json" + "fmt" "github.com/ethereum/go-ethereum/jsre" "github.com/robertkrimen/otto" ) @@ -50,6 +51,7 @@ func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) { var respif interface{} err = self.ethApi.GetRequestReply(&req, &respif) if err != nil { + fmt.Println("Error response:", err) return self.err(call, -32603, err.Error(), req.Id) } call.Otto.Set("ret_jsonrpc", jsonrpcver) diff --git a/rpc/types.go b/rpc/types.go index 1784759a4..e6eb4f856 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -209,6 +209,22 @@ func NewNotImplementedError(method string) *NotImplementedError { } } +type NotAvailableError struct { + Method string + Reason string +} + +func (e *NotAvailableError) Error() string { + return fmt.Sprintf("%s method not available: %s", e.Method, e.Reason) +} + +func NewNotAvailableError(method string, reason string) *NotAvailableError { + return &NotAvailableError{ + Method: method, + Reason: reason, + } +} + type DecodeParamError struct { err string } diff --git a/tests/block_test.go b/tests/block_test.go index 79e3335b1..e72f2b548 100644 --- a/tests/block_test.go +++ b/tests/block_test.go @@ -103,7 +103,7 @@ func testEthConfig() *eth.Config { return ð.Config{ DataDir: common.DefaultDataDir(), - LogLevel: 5, + Verbosity: 5, Etherbase: "primary", AccountManager: accounts.NewManager(ks), NewDB: func(path string) (common.Database, error) { return ethdb.NewMemDatabase() }, diff --git a/xeth/xeth.go b/xeth/xeth.go index dc2d4f06f..06cd9dc1b 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -79,7 +79,6 @@ func New(eth *eth.Ethereum, frontend Frontend) *XEth { xeth := &XEth{ backend: eth, frontend: frontend, - whisper: NewWhisper(eth.Whisper()), quit: make(chan struct{}), filterManager: filter.NewFilterManager(eth.EventMux()), logQueue: make(map[int]*logQueue), @@ -88,6 +87,9 @@ func New(eth *eth.Ethereum, frontend Frontend) *XEth { messages: make(map[int]*whisperFilter), agent: miner.NewRemoteAgent(), } + if eth.Whisper() != nil { + xeth.whisper = NewWhisper(eth.Whisper()) + } eth.Miner().Register(xeth.agent) if frontend == nil { xeth.frontend = dummyFrontend{}