diff --git a/common/mempool.go b/common/mempool.go index e765e39..7c0c9b5 100644 --- a/common/mempool.go +++ b/common/mempool.go @@ -33,28 +33,34 @@ var ( // AddNewClient adds a new client to the list of clients to notify for mempool txns func AddNewClient(client chan<- *walletrpc.RawTransaction) { - lock.Lock() - defer lock.Unlock() - //Log.Infoln("Adding new client, sending ", len(txns), " transactions") + // Log.Infoln("Adding new client, sending ", len(txns), " transactions") + + // Copy map locally + lock.Lock() + localmap := make(map[string]*walletrpc.RawTransaction) + + for k, rtx := range txns { + localmap[k] = rtx + } + lock.Unlock() // Also send all pending mempool txns - for _, rtx := range txns { + for _, rtx := range localmap { if client != nil { client <- rtx } } + lock.Lock() if client != nil { clients = append(clients, client) } - Metrics.MempoolClientsGauge.Set(float64(len(clients))) + lock.Unlock() } // RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients func refreshMempoolTxns() error { - //Log.Infoln("Refreshing mempool") - // First check if another refresh is running, if it is, just return if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) { Log.Warnln("Another refresh in progress, returning") @@ -62,9 +68,7 @@ func refreshMempoolTxns() error { } // Set refreshing to 0 when we exit - defer func() { - atomic.StoreInt32(&refreshing, 0) - }() + defer atomic.StoreInt32(&refreshing, 0) // Check if the blockchain has changed, and if it has, then clear everything @@ -153,7 +157,7 @@ func refreshMempoolTxns() error { // StartMempoolMonitor starts monitoring the mempool func StartMempoolMonitor(cache *BlockCache, done <-chan bool) { go func() { - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(5 * time.Second) blockcache = cache lastHash = blockcache.GetLatestHash()