WIP Observing pattern

This commit is contained in:
obscuren 2014-02-25 11:22:27 +01:00
parent 4b8c50e2cd
commit e98b53bbef
1 changed files with 33 additions and 8 deletions

View File

@ -17,6 +17,17 @@ const (
) )
type TxPoolHook chan *Transaction type TxPoolHook chan *Transaction
type TxMsgTy byte
const (
TxPre = iota
TxPost
)
type TxMsg struct {
Tx *Transaction
Type TxMsgTy
}
func FindTx(pool *list.List, finder func(*Transaction, *list.Element) bool) *Transaction { func FindTx(pool *list.List, finder func(*Transaction, *list.Element) bool) *Transaction {
for e := pool.Front(); e != nil; e = e.Next() { for e := pool.Front(); e != nil; e = e.Next() {
@ -59,6 +70,8 @@ type TxPool struct {
BlockManager *BlockManager BlockManager *BlockManager
SecondaryProcessor TxProcessor SecondaryProcessor TxProcessor
subscribers []chan TxMsg
} }
func NewTxPool() *TxPool { func NewTxPool() *TxPool {
@ -73,21 +86,17 @@ func NewTxPool() *TxPool {
// Blocking function. Don't use directly. Use QueueTransaction instead // Blocking function. Don't use directly. Use QueueTransaction instead
func (pool *TxPool) addTransaction(tx *Transaction) { func (pool *TxPool) addTransaction(tx *Transaction) {
log.Println("Adding tx to pool")
pool.mutex.Lock() pool.mutex.Lock()
pool.pool.PushBack(tx) pool.pool.PushBack(tx)
pool.mutex.Unlock() pool.mutex.Unlock()
// Broadcast the transaction to the rest of the peers // Broadcast the transaction to the rest of the peers
pool.Speaker.Broadcast(ethwire.MsgTxTy, []interface{}{tx.RlpData()}) pool.Speaker.Broadcast(ethwire.MsgTxTy, []interface{}{tx.RlpData()})
log.Println("broadcasting it")
} }
// Process transaction validates the Tx and processes funds from the // Process transaction validates the Tx and processes funds from the
// sender to the recipient. // sender to the recipient.
func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error) { func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error) {
log.Printf("[TXPL] Processing Tx %x\n", tx.Hash())
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Println(r) log.Println(r)
@ -132,6 +141,11 @@ func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error
block.UpdateAddr(tx.Sender(), sender) block.UpdateAddr(tx.Sender(), sender)
log.Printf("[TXPL] Processed Tx %x\n", tx.Hash())
// Notify the subscribers
pool.notifySubscribers(TxPost, tx)
return return
} }
@ -145,7 +159,8 @@ func (pool *TxPool) ValidateTransaction(tx *Transaction) error {
} }
// Get the sender // Get the sender
sender := block.GetAddr(tx.Sender()) accountState := pool.BlockManager.GetAddrState(tx.Sender())
sender := accountState.Account
totAmount := new(big.Int).Add(tx.Value, new(big.Int).Mul(TxFee, TxFeeRat)) totAmount := new(big.Int).Add(tx.Value, new(big.Int).Mul(TxFee, TxFeeRat))
// Make sure there's enough in the sender's account. Having insufficient // Make sure there's enough in the sender's account. Having insufficient
@ -185,9 +200,8 @@ out:
// doesn't matter since this is a goroutine // doesn't matter since this is a goroutine
pool.addTransaction(tx) pool.addTransaction(tx)
if pool.SecondaryProcessor != nil { // Notify the subscribers
pool.SecondaryProcessor.ProcessTransaction(tx) pool.notifySubscribers(TxPre, tx)
}
} }
case <-pool.quit: case <-pool.quit:
break out break out
@ -231,3 +245,14 @@ func (pool *TxPool) Stop() {
pool.Flush() pool.Flush()
} }
func (pool *TxPool) Subscribe(channel chan TxMsg) {
pool.subscribers = append(pool.subscribers, channel)
}
func (pool *TxPool) notifySubscribers(ty TxMsgTy, tx *Transaction) {
msg := TxMsg{Type: ty, Tx: tx}
for _, subscriber := range pool.subscribers {
subscriber <- msg
}
}