901 lines
24 KiB
C++
901 lines
24 KiB
C++
/*
|
|
* Copyright 2008, 2011 Free Software Foundation, Inc.
|
|
*
|
|
* This software is distributed under the terms of the GNU Affero Public License.
|
|
* See the COPYING file in the main directory for details.
|
|
*
|
|
* This use of this software may be subject to additional restrictions.
|
|
* See the LEGAL file in the main directory for details.
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
|
#ifndef INTERTHREAD_H
|
|
#define INTERTHREAD_H
|
|
|
|
#include "Defines.h"
|
|
#include "Timeval.h"
|
|
#include "Threads.h"
|
|
#include "LinkedLists.h"
|
|
#include <map>
|
|
#include <vector>
|
|
#include <queue>
|
|
#include <list>
|
|
|
|
|
|
|
|
|
|
|
|
/**@defgroup Templates for interthread mechanisms. */
|
|
//@{
|
|
|
|
|
|
/** Pointer FIFO for interthread operations. */
|
|
// (pat) The elements in the queue are type T*, and
|
|
// the Fifo class implements the underlying queue.
|
|
// The default is class PointerFIFO, which does not place any restrictions on the type of T,
|
|
// and is implemented by allocating auxilliary structures for the queue,
|
|
// or SingleLinkedList, which implements the queue using an internal pointer in type T,
|
|
// which must implement the functional interface of class SingleLinkListNode,
|
|
// namely: functions T*next() and void setNext(T*).
|
|
#if UNUSED
|
|
//template <class T, class Fifo=PointerFIFO> class InterthreadQueue {
|
|
//
|
|
// protected:
|
|
//
|
|
// Fifo mQ;
|
|
// mutable Mutex mLock;
|
|
// mutable Signal mWriteSignal;
|
|
//
|
|
// public:
|
|
//
|
|
// /** Delete contents. */
|
|
// void clear()
|
|
// {
|
|
// ScopedLock lock(mLock);
|
|
// while (mQ.size()>0) delete (T*)mQ.get();
|
|
// }
|
|
//
|
|
// /** Empty the queue, but don't delete. */
|
|
// void flushNoDelete()
|
|
// {
|
|
// ScopedLock lock(mLock);
|
|
// while (mQ.size()>0) mQ.get();
|
|
// }
|
|
//
|
|
//
|
|
// ~InterthreadQueue()
|
|
// { clear(); }
|
|
//
|
|
//
|
|
// size_t size() const
|
|
// {
|
|
// ScopedLock lock(mLock);
|
|
// return mQ.size();
|
|
// }
|
|
//
|
|
// size_t totalSize() const // pat added
|
|
// {
|
|
// ScopedLock lock(mLock);
|
|
// return mQ.totalSize();
|
|
// }
|
|
//
|
|
// /**
|
|
// Blocking read.
|
|
// @return Pointer to object (will not be NULL).
|
|
// */
|
|
// T* read()
|
|
// {
|
|
// ScopedLock lock(mLock);
|
|
// T* retVal = (T*)mQ.get();
|
|
// while (retVal==NULL) {
|
|
// mWriteSignal.wait(mLock);
|
|
// retVal = (T*)mQ.get();
|
|
// }
|
|
// return retVal;
|
|
// }
|
|
//
|
|
// /** Non-blocking peek at the first element; returns NULL if empty. */
|
|
// T* front()
|
|
// {
|
|
// ScopedLock lock(mLock);
|
|
// return (T*) mQ.front();
|
|
// }
|
|
//
|
|
// /**
|
|
// Blocking read with a timeout.
|
|
// @param timeout The read timeout in ms.
|
|
// @return Pointer to object or NULL on timeout.
|
|
// */
|
|
// T* read(unsigned timeout)
|
|
// {
|
|
// if (timeout==0) return readNoBlock();
|
|
// Timeval waitTime(timeout);
|
|
// ScopedLock lock(mLock);
|
|
// while ((mQ.size()==0) && (!waitTime.passed()))
|
|
// mWriteSignal.wait(mLock,waitTime.remaining());
|
|
// T* retVal = (T*)mQ.get();
|
|
// return retVal;
|
|
// }
|
|
//
|
|
// /**
|
|
// Non-blocking read.
|
|
// @return Pointer to object or NULL if FIFO is empty.
|
|
// */
|
|
// T* readNoBlock()
|
|
// {
|
|
// ScopedLock lock(mLock);
|
|
// return (T*)mQ.get();
|
|
// }
|
|
//
|
|
// /** Non-blocking write. */
|
|
// void write(T* val)
|
|
// {
|
|
// ScopedLock lock(mLock);
|
|
// mQ.put(val);
|
|
// mWriteSignal.signal();
|
|
// }
|
|
//
|
|
// /** Non-block write to the front of the queue. */
|
|
// void write_front(T* val) // pat added
|
|
// {
|
|
// ScopedLock lock(mLock);
|
|
// mQ.push_front(val);
|
|
// mWriteSignal.signal();
|
|
// }
|
|
//};
|
|
#endif
|
|
|
|
// A list designed for pointers so we can use get methods that return NULL on error.
|
|
template<class T>
|
|
class PtrList : public std::list<T*> {
|
|
//typedef typename std::list<T*> type_t;
|
|
public:
|
|
typedef typename std::list<T*>::iterator iter_t;
|
|
// Like pop_front but return the value, or NULL if none.
|
|
T* pop_frontr() {
|
|
if (this->empty()) { return NULL; }
|
|
T* result = this->front();
|
|
this->pop_front();
|
|
return result;
|
|
}
|
|
// Like pop_back but return the value, or NULL if none.
|
|
T* pop_backr() {
|
|
if (this->empty()) { return NULL; }
|
|
T* result = this->back();
|
|
this->pop_back();
|
|
return result;
|
|
}
|
|
|
|
// These functions are solely for use by InterthreadQueue, necessitated by backward compatibility with PointerFIFO.
|
|
void put(T*v) { this->push_back(v); }
|
|
T* get() { return this->pop_frontr(); }
|
|
};
|
|
|
|
|
|
|
|
// (pat 8-2013) The scoped iterators are a great idea and worked fine, but it is not KISS, so I stopped using it.
|
|
// If you want to iterate through an InterthreadQueue or InterthreadMap, just get the lock using the appropropriate qGetLock method.
|
|
#if USE_SCOPED_ITERATORS
|
|
// This ScopedIterator locks the container for as long as the iterator exists.
|
|
// You have to create the ScopedIterator using the thread-safe container object, example:
|
|
// ThreadSafeMap<a,b> mymap;
|
|
// ScopedIterator it(mymap);
|
|
// for (it = mymap.begin(); it != mymap.end(); it++) {}
|
|
// Then begin() and end() are passed through to the normal underlying iterator.
|
|
// An alternative implementation would have been to modify begin and end to return scoped iterators,
|
|
// but that is more complicated and would probably require multiple locks/unlocks on the Mutex.
|
|
template <class BaseType,class DerivedType,class ValueType>
|
|
class ScopedIteratorTemplate : public BaseType::iterator {
|
|
Mutex &mLockRef;
|
|
public:
|
|
ScopedIteratorTemplate(DerivedType &wOwner) : mLockRef(wOwner.qGetLock()) { mLockRef.lock(); }
|
|
~ScopedIteratorTemplate() { mLockRef.unlock(); }
|
|
void operator=(typename BaseType::iterator it) { this->BaseType::iterator::operator=(it); }
|
|
void operator++() { this->BaseType::iterator::operator++(); } // ++prefix
|
|
void operator++(int) { this->BaseType::iterator::operator++(0); } // postfix++
|
|
ValueType& operator*() { return this->BaseType::iterator::operator*(); }
|
|
ValueType* operator->() { return this->BaseType::iterator::operator->(); }
|
|
};
|
|
#endif
|
|
|
|
|
|
|
|
// (pat) There was a transition period from the old to the new InterthreadQueue when the new
|
|
// one was named InterthreadQueue2, and that still exists in some versions of the SGSN/GPRS code.
|
|
#define InterthreadQueue2 InterthreadQueue
|
|
|
|
// (pat) The original InterthreadQueue had a complicated threading problem that this version fixed.
|
|
// I started out using this new version only in GPRS and SGSN, for fear of breaking something in GSM,
|
|
// but in release 4 I removed the old version above.
|
|
// 5-2013: Changed the ultimate base class to a PtrList and added ScopedIterator.
|
|
// 8-2013: Removed the ScopedIterator even though it is an elegant solution, because it is easy to
|
|
// just use the internal lock directly when one needs to iterate through one of these.
|
|
//template <class T, class Fifo=PointerFIFO> class InterthreadQueue {
|
|
template <class T, class Fifo=PtrList<T> > class InterthreadQueue {
|
|
//protected:
|
|
|
|
Fifo mQ;
|
|
// (pat) DO NOT USE mLock and mWriteSignal; instead use mLockPointer and mWriteSignalPointer.
|
|
// That allows us to connect two InterthreadQueue together such that a single thread can wait on either.
|
|
mutable Mutex mLock, *mLockPointer;
|
|
mutable Signal mWriteSignal, *mWriteSignalPointer;
|
|
|
|
protected:
|
|
|
|
public:
|
|
InterthreadQueue() : mLockPointer(&mLock), mWriteSignalPointer(&mWriteSignal) {}
|
|
|
|
// This connects the two InterthreadQueue permanently so they use the same lock and Signal.
|
|
// Subsequently you can use iqWaitForEither.
|
|
void iqConnect(InterthreadQueue &other) {
|
|
mLockPointer = other.mLockPointer;
|
|
mWriteSignalPointer = other.mWriteSignalPointer;
|
|
}
|
|
|
|
Mutex &qGetLock() { return mLock; }
|
|
#if USE_SCOPED_ITERATORS
|
|
// The Iterator locks the InterthreadQueue until the Iterator falls out of scope.
|
|
// Semantics are different from normal C++ iterators - the begin,end,erase methods are in
|
|
// the Iterator, not the base type.
|
|
// Use like this:
|
|
// InterthreadQueue<T>::ScopedIterator sit(someInterthreadQueue);
|
|
// for (T*val = sit.front(); val = *sit; sit++) ...
|
|
// if (something) val.erase();
|
|
typedef typename Fifo::iterator iterator;
|
|
class ScopedIterator {
|
|
typedef InterthreadQueue<T,Fifo> BaseType_t;
|
|
typedef typename Fifo::iterator iterator_t;
|
|
BaseType_t &mParent;
|
|
iterator_t mit;
|
|
|
|
public:
|
|
ScopedIterator(BaseType_t&wParent) : mParent(wParent) { mParent.mLockPointer->lock(); }
|
|
~ScopedIterator() { mParent.mLockPointer->unlock(); }
|
|
|
|
// Regular old iterators in case you want to use em.
|
|
iterator_t begin() { return mParent.mQ.begin(); }
|
|
iterator_t end() { return mParent.mQ.end(); }
|
|
|
|
// Accessors and operators. Accessors move the iterator, eg, using front() rewinds iter to begin().
|
|
T* current() { return mit != end() ? *mit : NULL; }
|
|
T* front() { mit = begin(); return current(); }
|
|
T* next() { if (mit != end()) { ++mit; } return current(); }
|
|
// Erase current element and advance the iterator forward.
|
|
void erase() { if (mit != end()) mit = mParent.mQ.erase(mit); }
|
|
T* operator++() { return next(); } // prefix ++
|
|
T* operator++(int) { T*result = current(); next(); return result; } // postfix ++
|
|
T* operator*() { return current(); }
|
|
|
|
// And here is random access in case you want it.
|
|
// Note that this is inefficient, so dont use it unless you know the queue is small.
|
|
// This is inside ScopedIterator so that the entire InterthreadQueue is locked while you do whatever it is you are doing.
|
|
// Eg: { InterthreadQueue<T>::ScopedIterator sit(myinterthreadqueue); for (unsigned i=0; i<10; i++) { T*foo = sit[i]; ... } }
|
|
T* operator[](unsigned ind) {
|
|
unsigned i = 0;
|
|
for (iterator_t itr = begin(); itr != end(); itr++) { if (i++ == ind) return *itr; }
|
|
return NULL; // Out of bounds.
|
|
}
|
|
};
|
|
#endif
|
|
|
|
/** Delete contents. */
|
|
void clear()
|
|
{
|
|
ScopedLock lock(*mLockPointer);
|
|
while (mQ.size()>0) delete (T*)mQ.get();
|
|
}
|
|
|
|
/** Empty the queue, but don't delete. */
|
|
void flushNoDelete()
|
|
{
|
|
ScopedLock lock(*mLockPointer);
|
|
while (mQ.size()>0) mQ.get();
|
|
}
|
|
|
|
|
|
~InterthreadQueue()
|
|
{ clear(); }
|
|
|
|
|
|
size_t size() const
|
|
{
|
|
ScopedLock lock(*mLockPointer);
|
|
return mQ.size();
|
|
}
|
|
|
|
size_t totalSize() const // pat added
|
|
{
|
|
ScopedLock lock(*mLockPointer);
|
|
return mQ.totalSize();
|
|
}
|
|
|
|
// Wait for something on either of the two queues connected by iqConnect. Kind of hokey, but it works. Timeout is in msecs.
|
|
void iqWaitForEither(InterthreadQueue &other, unsigned timeout) {
|
|
ScopedLock lock(*mLockPointer);
|
|
if (timeout) {
|
|
Timeval waitTime(timeout);
|
|
while (mQ.size() == 0 && other.mQ.size() == 0) {
|
|
mWriteSignalPointer->wait(*mLockPointer,waitTime.remaining());
|
|
}
|
|
} else { // Wait forever.
|
|
while (mQ.size() == 0 && other.mQ.size() == 0) {
|
|
mWriteSignalPointer->wait(*mLockPointer);
|
|
}
|
|
}
|
|
}
|
|
|
|
// (pat 8-2013) Removed. Bad idea to use this name - conflicts with wait() in InterthreadQueueWithWait
|
|
//void wait() { // (pat 7-25-2013) Added. Wait for something to appear in the queue.
|
|
// ScopedLock lock(*mLockPointer);
|
|
// while (mQ.size() == 0) {
|
|
// mWriteSignalPointer->wait(*mLockPointer);
|
|
// }
|
|
//}
|
|
|
|
/**
|
|
Blocking read from back of queue.
|
|
@return Pointer to object (will not be NULL).
|
|
*/
|
|
T* read()
|
|
{
|
|
ScopedLock lock(*mLockPointer);
|
|
T* retVal = (T*)mQ.get();
|
|
while (retVal==NULL) {
|
|
mWriteSignalPointer->wait(*mLockPointer);
|
|
retVal = (T*)mQ.get();
|
|
}
|
|
return retVal;
|
|
}
|
|
|
|
/** Non-blocking peek at the first element; returns NULL if empty. */
|
|
T* front()
|
|
{
|
|
ScopedLock lock(*mLockPointer);
|
|
return (T*) mQ.front();
|
|
}
|
|
|
|
/**
|
|
Blocking read with a timeout.
|
|
@param timeout The read timeout in ms.
|
|
@return Pointer to object or NULL on timeout.
|
|
*/
|
|
T* read(unsigned timeout)
|
|
{
|
|
if (timeout==0) return readNoBlock();
|
|
Timeval waitTime(timeout);
|
|
ScopedLock lock(*mLockPointer);
|
|
while (mQ.size()==0) {
|
|
long remaining = waitTime.remaining();
|
|
// (pat) How high to we expect the precision here to be? I dont think they are used a precision timers,
|
|
// so dont try to wait if the remainder is just a few msecs.
|
|
if (remaining < 2) { return NULL; }
|
|
mWriteSignalPointer->wait(*mLockPointer,remaining);
|
|
}
|
|
T* retVal = (T*)mQ.get();
|
|
return retVal;
|
|
}
|
|
|
|
/**
|
|
Non-blocking read. aka pop_front.
|
|
@return Pointer to object or NULL if FIFO is empty.
|
|
*/
|
|
T* readNoBlock()
|
|
{
|
|
ScopedLock lock(*mLockPointer);
|
|
return (T*)mQ.get();
|
|
}
|
|
|
|
/** Non-blocking write. aka push_back */
|
|
void write(T* val)
|
|
{
|
|
// (pat) The Mutex mLock must be released before signaling the mWriteSignal condition.
|
|
// This is an implicit requirement of pthread_cond_wait() called from signal().
|
|
// If you do not do that, the InterthreadQueue read() function cannot start
|
|
// because the mutex is still locked by the thread calling the write(),
|
|
// so the read() thread yields its immediate execution opportunity.
|
|
// This recurs (and the InterthreadQueue fills up with data)
|
|
// until the read thread's accumulated temporary priority causes it to
|
|
// get a second pre-emptive activation over the writing thread,
|
|
// resulting in bursts of activity by the read thread.
|
|
{ ScopedLock lock(*mLockPointer);
|
|
mQ.put(val);
|
|
}
|
|
mWriteSignalPointer->signal();
|
|
}
|
|
|
|
/** Non-block write to the front of the queue. aka push_front */
|
|
void write_front(T* val) // pat added
|
|
{
|
|
// (pat) See comments above.
|
|
{ ScopedLock lock(*mLockPointer);
|
|
mQ.push_front(val);
|
|
}
|
|
mWriteSignalPointer->signal();
|
|
}
|
|
};
|
|
|
|
|
|
|
|
/** Pointer FIFO for interthread operations. */
|
|
// Pat thinks this should be combined with InterthreadQueue by simply moving the wait method there.
|
|
template <class T> class InterthreadQueueWithWait {
|
|
|
|
protected:
|
|
|
|
PointerFIFO mQ;
|
|
mutable Mutex mLock;
|
|
mutable Signal mWriteSignal;
|
|
mutable Signal mReadSignal;
|
|
|
|
virtual void freeElement(T* element) const { delete element; };
|
|
|
|
public:
|
|
|
|
/** Delete contents. */
|
|
void clear()
|
|
{
|
|
ScopedLock lock(mLock);
|
|
while (mQ.size()>0) freeElement((T*)mQ.get());
|
|
mReadSignal.signal();
|
|
}
|
|
|
|
|
|
|
|
virtual ~InterthreadQueueWithWait()
|
|
{ clear(); }
|
|
|
|
|
|
size_t size() const
|
|
{
|
|
ScopedLock lock(mLock);
|
|
return mQ.size();
|
|
}
|
|
|
|
/**
|
|
Blocking read.
|
|
@return Pointer to object (will not be NULL).
|
|
*/
|
|
T* read()
|
|
{
|
|
ScopedLock lock(mLock);
|
|
T* retVal = (T*)mQ.get();
|
|
while (retVal==NULL) {
|
|
mWriteSignal.wait(mLock);
|
|
retVal = (T*)mQ.get();
|
|
}
|
|
mReadSignal.signal();
|
|
return retVal;
|
|
}
|
|
|
|
/**
|
|
Blocking read with a timeout.
|
|
@param timeout The read timeout in ms.
|
|
@return Pointer to object or NULL on timeout.
|
|
*/
|
|
T* read(unsigned timeout)
|
|
{
|
|
if (timeout==0) return readNoBlock();
|
|
Timeval waitTime(timeout);
|
|
ScopedLock lock(mLock);
|
|
// (pat 8-2013) This commented out code has a deadlock problem.
|
|
//while ((mQ.size()==0) && (!waitTime.passed()))
|
|
// mWriteSignal.wait(mLock,waitTime.remaining());
|
|
while (mQ.size()==0) {
|
|
long remaining = waitTime.remaining();
|
|
// (pat) How high to we expect the precision here to be? I dont think they are used a precision timers,
|
|
// so dont try to wait if the remainder is just a few msecs.
|
|
if (remaining < 2) { return NULL; }
|
|
mWriteSignal.wait(mLock,remaining);
|
|
}
|
|
T* retVal = (T*)mQ.get();
|
|
if (retVal!=NULL) mReadSignal.signal();
|
|
return retVal;
|
|
}
|
|
|
|
/**
|
|
Non-blocking read.
|
|
@return Pointer to object or NULL if FIFO is empty.
|
|
*/
|
|
T* readNoBlock()
|
|
{
|
|
ScopedLock lock(mLock);
|
|
T* retVal = (T*)mQ.get();
|
|
if (retVal!=NULL) mReadSignal.signal();
|
|
return retVal;
|
|
}
|
|
|
|
/** Non-blocking write. */
|
|
void write(T* val)
|
|
{
|
|
// (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
|
|
ScopedLock lock(mLock);
|
|
mQ.put(val);
|
|
mWriteSignal.signal();
|
|
}
|
|
|
|
/** Wait until the queue falls below a low water mark. */
|
|
// (pat) This function suffers from the same problem as documented
|
|
// at InterthreadQueue.write(), but I am not fixing it because I cannot test it.
|
|
// The caller of this function will eventually get to run, just not immediately
|
|
// after the mReadSignal condition is fulfilled.
|
|
void wait(size_t sz=0)
|
|
{
|
|
ScopedLock lock(mLock);
|
|
while (mQ.size()>sz) mReadSignal.wait(mLock);
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
/** Thread-safe map of pointers to class D, keyed by class K. */
|
|
template <class K, class D > class InterthreadMap
|
|
{
|
|
public:
|
|
typedef std::map<K,D*> Map; // (pat) It would be more generally useful if this were D instead of D*
|
|
|
|
protected:
|
|
|
|
Map mMap;
|
|
mutable Mutex mLock;
|
|
//mutable Mutex mModificationLock; // (pat) Lock prevents deletion from the map while an iterator is active.
|
|
Signal mWriteSignal;
|
|
|
|
public:
|
|
|
|
void clear()
|
|
{
|
|
// Delete everything in the map.
|
|
//ScopedLock lock1(mModificationLock); // Lock this first!
|
|
ScopedLock lock(mLock);
|
|
typename Map::iterator iter = mMap.begin();
|
|
while (iter != mMap.end()) {
|
|
delete iter->second;
|
|
++iter;
|
|
}
|
|
mMap.clear();
|
|
}
|
|
|
|
~InterthreadMap() { clear(); }
|
|
|
|
/**
|
|
Non-blocking write. WARNING: This deletes any pre-existing element!
|
|
@param key The index to write to.
|
|
@param wData Pointer to data, not to be deleted until removed from the map.
|
|
*/
|
|
void write(const K &key, D * wData)
|
|
{
|
|
//ScopedLock lock1(mModificationLock); // Lock this first!
|
|
ScopedLock lock(mLock);
|
|
typename Map::iterator iter = mMap.find(key);
|
|
if (iter!=mMap.end()) {
|
|
delete iter->second;
|
|
iter->second = wData;
|
|
} else {
|
|
mMap[key] = wData;
|
|
}
|
|
mWriteSignal.broadcast();
|
|
}
|
|
|
|
/**
|
|
Identical to readNoBlock but with element removal.
|
|
@param key Key to read from.
|
|
@return Pointer at key or NULL if key not found, to be deleted by caller.
|
|
*/
|
|
D* getNoBlock(const K& key, bool bRemove = true)
|
|
{
|
|
ScopedLock lock(mLock);
|
|
typename Map::iterator iter = mMap.find(key);
|
|
if (iter==mMap.end()) return NULL;
|
|
D* retVal = iter->second;
|
|
if (bRemove) { mMap.erase(iter); }
|
|
return retVal;
|
|
}
|
|
|
|
/**
|
|
Blocking read with a timeout and element removal.
|
|
@param key The key to read from.
|
|
@param timeout The blocking timeout in ms.
|
|
@return Pointer at key or NULL on timeout, to be deleted by caller.
|
|
*/
|
|
D* get(const K &key, unsigned timeout, bool bRemove = true)
|
|
{
|
|
if (timeout==0) return getNoBlock(key,bRemove);
|
|
ScopedLock lock(mLock);
|
|
Timeval waitTime(timeout);
|
|
// (pat 8-2013) This commented out code suffered from a deadlock problem.
|
|
//typename Map::iterator iter = mMap.find(key);
|
|
//while ((iter==mMap.end()) && (!waitTime.passed())) {
|
|
// mWriteSignal.wait(mLock,waitTime.remaining());
|
|
// iter = mMap.find(key);
|
|
//}
|
|
//if (iter==mMap.end()) return NULL;
|
|
//D* retVal = iter->second;
|
|
//mMap.erase(iter);
|
|
//return retVal;
|
|
while (1) {
|
|
typename Map::iterator iter = mMap.find(key);
|
|
if (iter!=mMap.end()) {
|
|
D* retVal = iter->second;
|
|
if (bRemove) { mMap.erase(iter); }
|
|
return retVal;
|
|
}
|
|
long remaining = waitTime.remaining();
|
|
if (remaining < 2) { return NULL; }
|
|
mWriteSignal.wait(mLock,remaining);
|
|
}
|
|
}
|
|
|
|
/**
|
|
Blocking read with and element removal.
|
|
@param key The key to read from.
|
|
@return Pointer at key, to be deleted by caller.
|
|
*/
|
|
D* get(const K &key, bool bRemove = true)
|
|
{
|
|
ScopedLock lock(mLock);
|
|
typename Map::iterator iter = mMap.find(key);
|
|
while (iter==mMap.end()) {
|
|
mWriteSignal.wait(mLock);
|
|
iter = mMap.find(key);
|
|
}
|
|
D* retVal = iter->second;
|
|
if (bRemove) { mMap.erase(iter); }
|
|
return retVal;
|
|
}
|
|
|
|
|
|
/**
|
|
Remove an entry and delete it.
|
|
@param key The key of the entry to delete.
|
|
@return True if it was actually found and deleted.
|
|
(pat) If you just want remove without deleting, see getNoBlock.
|
|
*/
|
|
bool remove(const K &key )
|
|
{
|
|
//ScopedLock lock(mModificationLock);
|
|
D* val = getNoBlock(key);
|
|
if (!val) return false;
|
|
delete val;
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
Non-blocking read. (pat) Actually, it blocks until the map is available.
|
|
@param key Key to read from.
|
|
@return Pointer at key or NULL if key not found.
|
|
*/
|
|
D* readNoBlock(const K& key) const
|
|
{
|
|
return Unconst(this)->getNoBlock(key,false);
|
|
// (pat 8-2013) Lets use common code.
|
|
//D* retVal=NULL;
|
|
//ScopedLock lock(mLock);
|
|
//typename Map::const_iterator iter = mMap.find(key);
|
|
//if (iter!=mMap.end()) retVal = iter->second;
|
|
//return retVal;
|
|
}
|
|
|
|
/**
|
|
Blocking read with a timeout.
|
|
@param key The key to read from.
|
|
@param timeout The blocking timeout in ms.
|
|
@return Pointer at key or NULL on timeout.
|
|
*/
|
|
D* read(const K &key, unsigned timeout) const
|
|
{
|
|
return Unconst(this)->get(key,timeout,false);
|
|
// (pat 8-2013) This commented out code suffered from a deadlock problem.
|
|
//if (timeout==0) return readNoBlock(key);
|
|
//ScopedLock lock(mLock);
|
|
//Timeval waitTime(timeout);
|
|
//typename Map::const_iterator iter = mMap.find(key);
|
|
//while ((iter==mMap.end()) && (!waitTime.passed())) {
|
|
// mWriteSignal.wait(mLock,waitTime.remaining());
|
|
// iter = mMap.find(key);
|
|
//}
|
|
//if (iter==mMap.end()) return NULL;
|
|
//D* retVal = iter->second;
|
|
//return retVal;
|
|
}
|
|
|
|
/**
|
|
Blocking read. Blocks until the key exists.
|
|
@param key The key to read from.
|
|
@return Pointer at key.
|
|
*/
|
|
D* read(const K &key) const
|
|
{
|
|
return Unconst(this)->get(key,false);
|
|
// (pat 8-2013) Lets use common code.
|
|
//ScopedLock lock(mLock);
|
|
//typename Map::const_iterator iter = mMap.find(key);
|
|
//while (iter==mMap.end()) {
|
|
// mWriteSignal.wait(mLock);
|
|
// iter = mMap.find(key);
|
|
//}
|
|
//D* retVal = iter->second;
|
|
//return retVal;
|
|
}
|
|
|
|
// pat added.
|
|
unsigned size() const { return mMap.size(); }
|
|
|
|
//Mutex &getModificationLock() { return mModificationLock; }
|
|
//void iterLock() { mModificationLock.lock(); }
|
|
//void iterUnlock() { mModificationLock.unlock(); }
|
|
|
|
#if USE_SCOPED_ITERATORS
|
|
typedef ScopedIteratorTemplate<Map,InterthreadMap<K,D>,std::pair<const K,D*> > ScopedIterator;
|
|
#endif
|
|
// WARNING: These iterators are not intrinsically thread safe.
|
|
// Caller must use ScopiedIterator or the modification lock or enclose the entire iteration in some higher level lock.
|
|
Mutex &qGetLock() { return mLock; }
|
|
typedef typename Map::iterator iterator;
|
|
typename Map::iterator begin() { return mMap.begin(); }
|
|
typename Map::iterator end() { return mMap.end(); }
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** This class is used to provide pointer-based comparison in priority_queues. */
|
|
template <class T> class PointerCompare {
|
|
|
|
public:
|
|
|
|
/** Compare the objects pointed to, not the pointers themselves. */
|
|
bool operator()(const T *v1, const T *v2)
|
|
{ return (*v1)>(*v2); }
|
|
|
|
};
|
|
|
|
|
|
|
|
/**
|
|
Priority queue for interthread operations.
|
|
Passes pointers to objects.
|
|
*/
|
|
template <class T, class C = std::vector<T*>, class Cmp = PointerCompare<T> > class InterthreadPriorityQueue {
|
|
|
|
protected:
|
|
|
|
std::priority_queue<T*,C,Cmp> mQ;
|
|
mutable Mutex mLock;
|
|
mutable Signal mWriteSignal;
|
|
|
|
public:
|
|
|
|
|
|
/** Clear the FIFO. */
|
|
void clear()
|
|
{
|
|
ScopedLock lock(mLock);
|
|
while (mQ.size()>0) {
|
|
T* ptr = mQ.top();
|
|
mQ.pop();
|
|
delete ptr;
|
|
}
|
|
}
|
|
|
|
|
|
~InterthreadPriorityQueue()
|
|
{
|
|
clear();
|
|
}
|
|
|
|
size_t size() const
|
|
{
|
|
ScopedLock lock(mLock);
|
|
return mQ.size();
|
|
}
|
|
|
|
|
|
/** Non-blocking read. */
|
|
T* readNoBlock()
|
|
{
|
|
ScopedLock lock(mLock);
|
|
T* retVal = NULL;
|
|
if (mQ.size()!=0) {
|
|
retVal = mQ.top();
|
|
mQ.pop();
|
|
}
|
|
return retVal;
|
|
}
|
|
|
|
/** Blocking read. */
|
|
T* read()
|
|
{
|
|
ScopedLock lock(mLock);
|
|
T* retVal;
|
|
while (mQ.size()==0) mWriteSignal.wait(mLock);
|
|
retVal = mQ.top();
|
|
mQ.pop();
|
|
return retVal;
|
|
}
|
|
|
|
/** Non-blocking write. */
|
|
void write(T* val)
|
|
{
|
|
// (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
|
|
ScopedLock lock(mLock);
|
|
mQ.push(val);
|
|
mWriteSignal.signal();
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
class Semaphore {
|
|
|
|
private:
|
|
|
|
bool mFlag;
|
|
Signal mSignal;
|
|
mutable Mutex mLock;
|
|
|
|
public:
|
|
|
|
Semaphore()
|
|
:mFlag(false)
|
|
{ }
|
|
|
|
void post()
|
|
{
|
|
ScopedLock lock(mLock);
|
|
mFlag=true;
|
|
mSignal.signal();
|
|
}
|
|
|
|
void get()
|
|
{
|
|
ScopedLock lock(mLock);
|
|
while (!mFlag) mSignal.wait(mLock);
|
|
mFlag=false;
|
|
}
|
|
|
|
bool semtry()
|
|
{
|
|
ScopedLock lock(mLock);
|
|
bool retVal = mFlag;
|
|
mFlag = false;
|
|
return retVal;
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
//@}
|
|
|
|
|
|
|
|
|
|
#endif
|
|
// vim: ts=4 sw=4
|