cleanup locking for mempool

This commit is contained in:
adityapk 2022-05-16 13:33:43 -05:00
parent 3bec6d290c
commit 98cb0ea9f2
1 changed files with 15 additions and 11 deletions

View File

@ -33,28 +33,34 @@ var (
// AddNewClient adds a new client to the list of clients to notify for mempool txns
func AddNewClient(client chan<- *walletrpc.RawTransaction) {
lock.Lock()
defer lock.Unlock()
//Log.Infoln("Adding new client, sending ", len(txns), " transactions")
// Log.Infoln("Adding new client, sending ", len(txns), " transactions")
// Copy map locally
lock.Lock()
localmap := make(map[string]*walletrpc.RawTransaction)
for k, rtx := range txns {
localmap[k] = rtx
}
lock.Unlock()
// Also send all pending mempool txns
for _, rtx := range txns {
for _, rtx := range localmap {
if client != nil {
client <- rtx
}
}
lock.Lock()
if client != nil {
clients = append(clients, client)
}
Metrics.MempoolClientsGauge.Set(float64(len(clients)))
lock.Unlock()
}
// RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients
func refreshMempoolTxns() error {
//Log.Infoln("Refreshing mempool")
// First check if another refresh is running, if it is, just return
if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) {
Log.Warnln("Another refresh in progress, returning")
@ -62,9 +68,7 @@ func refreshMempoolTxns() error {
}
// Set refreshing to 0 when we exit
defer func() {
atomic.StoreInt32(&refreshing, 0)
}()
defer atomic.StoreInt32(&refreshing, 0)
// Check if the blockchain has changed, and if it has, then clear everything
@ -153,7 +157,7 @@ func refreshMempoolTxns() error {
// StartMempoolMonitor starts monitoring the mempool
func StartMempoolMonitor(cache *BlockCache, done <-chan bool) {
go func() {
ticker := time.NewTicker(30 * time.Second)
ticker := time.NewTicker(5 * time.Second)
blockcache = cache
lastHash = blockcache.GetLatestHash()