/* * Copyright 2008, 2011 Free Software Foundation, Inc. * Copyright 2014 Range Networks, Inc. * * This software is distributed under the terms of the GNU Affero Public License. * See the COPYING file in the main directory for details. * * This use of this software may be subject to additional restrictions. * See the LEGAL file in the main directory for details. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ #ifndef INTERTHREAD_H #define INTERTHREAD_H #include "Defines.h" #include "Timeval.h" #include "Threads.h" #include "LinkedLists.h" #include #include #include #include /**@defgroup Templates for interthread mechanisms. */ //@{ // A list designed for pointers so we can use get methods that return NULL on error. template class PtrList : public std::list { //typedef typename std::list type_t; public: typedef typename std::list::iterator iter_t; // Like pop_front but return the value, or NULL if none. T* pop_frontr() { if (this->empty()) { return NULL; } T* result = this->front(); this->pop_front(); return result; } // Like pop_back but return the value, or NULL if none. T* pop_backr() { if (this->empty()) { return NULL; } T* result = this->back(); this->pop_back(); return result; } // These functions are solely for use by InterthreadQueue, necessitated by backward compatibility with PointerFIFO. void put(T*v) { this->push_back(v); } T* get() { return this->pop_frontr(); } }; // (pat 8-2013) The scoped iterators are a great idea and worked fine, but it is not KISS, so I stopped using it. // If you want to iterate through an InterthreadQueue or InterthreadMap, just get the lock using the appropropriate qGetLock method. #if USE_SCOPED_ITERATORS // This ScopedIterator locks the container for as long as the iterator exists. // You have to create the ScopedIterator using the thread-safe container object, example: // ThreadSafeMap mymap; // ScopedIterator it(mymap); // for (it = mymap.begin(); it != mymap.end(); it++) {} // Then begin() and end() are passed through to the normal underlying iterator. // An alternative implementation would have been to modify begin and end to return scoped iterators, // but that is more complicated and would probably require multiple locks/unlocks on the Mutex. template class ScopedIteratorTemplate : public BaseType::iterator { Mutex &mLockRef; public: ScopedIteratorTemplate(DerivedType &wOwner) : mLockRef(wOwner.qGetLock()) { mLockRef.lock(); } ~ScopedIteratorTemplate() { mLockRef.unlock(); } void operator=(typename BaseType::iterator it) { this->BaseType::iterator::operator=(it); } void operator++() { this->BaseType::iterator::operator++(); } // ++prefix void operator++(int) { this->BaseType::iterator::operator++(0); } // postfix++ ValueType& operator*() { return this->BaseType::iterator::operator*(); } ValueType* operator->() { return this->BaseType::iterator::operator->(); } }; #endif // (pat) There was a transition period from the old to the new InterthreadQueue when the new // one was named InterthreadQueue2, and that still exists in some versions of the SGSN/GPRS code. #define InterthreadQueue2 InterthreadQueue // (pat) The original InterthreadQueue had a complicated threading problem that this version fixed. // I started out using this new version only in GPRS and SGSN, for fear of breaking something in GSM, // but in release 4 I removed the old version above. // 5-2013: Changed the ultimate base class to a PtrList and added ScopedIterator. // 8-2013: Removed the ScopedIterator even though it is an elegant solution, because it is easy to // just use the internal lock directly when one needs to iterate through one of these. //template class InterthreadQueue { template > class InterthreadQueue { //protected: Fifo mQ; // (pat) DO NOT USE mLock and mWriteSignal; instead use mLockPointer and mWriteSignalPointer. // That allows us to connect two InterthreadQueue together such that a single thread can wait on either. mutable Mutex mLock, *mLockPointer; mutable Signal mWriteSignal, *mWriteSignalPointer; protected: public: InterthreadQueue() : mLockPointer(&mLock), mWriteSignalPointer(&mWriteSignal) {} // This connects the two InterthreadQueue permanently so they use the same lock and Signal. // Subsequently you can use iqWaitForEither. void iqConnect(InterthreadQueue &other) { mLockPointer = other.mLockPointer; mWriteSignalPointer = other.mWriteSignalPointer; } // (pat) This provides a client the ability to lock the InterthreadQueue and iterate it. Mutex &qGetLock() const { return mLock; } typedef typename Fifo::iterator iterator; typedef typename Fifo::const_iterator const_iterator; iterator begin() { assert(mLock.lockcnt()); return mQ.begin(); } iterator end() { assert(mLock.lockcnt()); return mQ.end(); } const_iterator begin() const { assert(mLock.lockcnt()); return mQ.begin(); } const_iterator end() const { assert(mLock.lockcnt()); return mQ.end(); } #if USE_SCOPED_ITERATORS // The Iterator locks the InterthreadQueue until the Iterator falls out of scope. // Semantics are different from normal C++ iterators - the begin,end,erase methods are in // the Iterator, not the base type. // Use like this: // InterthreadQueue::ScopedIterator sit(someInterthreadQueue); // for (T*val = sit.front(); val = *sit; sit++) ... // if (something) val.erase(); typedef typename Fifo::iterator iterator; class ScopedIterator { typedef InterthreadQueue BaseType_t; typedef typename Fifo::iterator iterator_t; BaseType_t &mParent; iterator_t mit; public: ScopedIterator(BaseType_t&wParent) : mParent(wParent) { mParent.mLockPointer->lock(); } ~ScopedIterator() { mParent.mLockPointer->unlock(); } // Regular old iterators in case you want to use em. iterator_t begin() { return mParent.mQ.begin(); } iterator_t end() { return mParent.mQ.end(); } // Accessors and operators. Accessors move the iterator, eg, using front() rewinds iter to begin(). T* current() { return mit != end() ? *mit : NULL; } T* front() { mit = begin(); return current(); } T* next() { if (mit != end()) { ++mit; } return current(); } // Erase current element and advance the iterator forward. void erase() { if (mit != end()) mit = mParent.mQ.erase(mit); } T* operator++() { return next(); } // prefix ++ T* operator++(int) { T*result = current(); next(); return result; } // postfix ++ T* operator*() { return current(); } // And here is random access in case you want it. // Note that this is inefficient, so dont use it unless you know the queue is small. // This is inside ScopedIterator so that the entire InterthreadQueue is locked while you do whatever it is you are doing. // Eg: { InterthreadQueue::ScopedIterator sit(myinterthreadqueue); for (unsigned i=0; i<10; i++) { T*foo = sit[i]; ... } } T* operator[](unsigned ind) { unsigned i = 0; for (iterator_t itr = begin(); itr != end(); itr++) { if (i++ == ind) return *itr; } return NULL; // Out of bounds. } }; #endif /** Delete contents. */ void clear() { ScopedLock lock(*mLockPointer); while (mQ.size()>0) delete (T*)mQ.get(); } /** Empty the queue, but don't delete. */ void flushNoDelete() { ScopedLock lock(*mLockPointer); while (mQ.size()>0) mQ.get(); } ~InterthreadQueue() { clear(); } size_t size() const { ScopedLock lock(*mLockPointer); return mQ.size(); } size_t totalSize() const // pat added { ScopedLock lock(*mLockPointer); return mQ.totalSize(); } // Wait for something on either of the two queues connected by iqConnect. Kind of hokey, but it works. Timeout is in msecs. void iqWaitForEither(InterthreadQueue &other, unsigned timeout) { ScopedLock lock(*mLockPointer); if (timeout) { Timeval waitTime(timeout); while (mQ.size() == 0 && other.mQ.size() == 0) { mWriteSignalPointer->wait(*mLockPointer,waitTime.remaining()); } } else { // Wait forever. while (mQ.size() == 0 && other.mQ.size() == 0) { mWriteSignalPointer->wait(*mLockPointer); } } } // (pat 8-2013) Removed. Bad idea to use this name - conflicts with wait() in InterthreadQueueWithWait //void wait() { // (pat 7-25-2013) Added. Wait for something to appear in the queue. // ScopedLock lock(*mLockPointer); // while (mQ.size() == 0) { // mWriteSignalPointer->wait(*mLockPointer); // } //} /** Blocking read from back of queue. @return Pointer to object (will not be NULL). */ T* read() { ScopedLock lock(*mLockPointer); T* retVal = (T*)mQ.get(); while (retVal==NULL) { mWriteSignalPointer->wait(*mLockPointer); retVal = (T*)mQ.get(); } return retVal; } /** Non-blocking peek at the first element; returns NULL if empty. */ T* front() const { ScopedLock lock(*mLockPointer); return (T*) (mQ.size() ? mQ.front() : NULL); } /** Blocking read with a timeout. @param timeout The read timeout in ms. @return Pointer to object or NULL on timeout. */ T* read(unsigned timeout) { if (timeout==0) return readNoBlock(); Timeval waitTime(timeout); ScopedLock lock(*mLockPointer); while (mQ.size()==0) { long remaining = waitTime.remaining(); // (pat) How high do we expect the precision here to be? I dont think they used precision timers, // so dont try to wait if the remainder is just a few msecs. if (remaining < 2) { return NULL; } mWriteSignalPointer->wait(*mLockPointer,remaining); } T* retVal = (T*)mQ.get(); return retVal; } /** Non-blocking read. aka pop_front. @return Pointer to object or NULL if FIFO is empty. */ T* readNoBlock() { ScopedLock lock(*mLockPointer); return (T*)mQ.get(); } /** Non-blocking write. aka push_back */ void write(T* val) { // (pat) The Mutex mLock must be released before signaling the mWriteSignal condition. // This is an implicit requirement of pthread_cond_wait() called from signal(). // If you do not do that, the InterthreadQueue read() function cannot start // because the mutex is still locked by the thread calling the write(), // so the read() thread yields its immediate execution opportunity. // This recurs (and the InterthreadQueue fills up with data) // until the read thread's accumulated temporary priority causes it to // get a second pre-emptive activation over the writing thread, // resulting in bursts of activity by the read thread. { ScopedLock lock(*mLockPointer); mQ.put(val); } mWriteSignalPointer->signal(); } /** Non-block write to the front of the queue. aka push_front */ void write_front(T* val) // pat added { // (pat) See comments above. { ScopedLock lock(*mLockPointer); mQ.push_front(val); } mWriteSignalPointer->signal(); } }; /** Pointer FIFO for interthread operations. */ // Pat thinks this should be combined with InterthreadQueue by simply moving the wait method there. template class InterthreadQueueWithWait { protected: PointerFIFO mQ; mutable Mutex mLock; mutable Signal mWriteSignal; mutable Signal mReadSignal; virtual void freeElement(T* element) const { delete element; }; public: /** Delete contents. */ void clear() { ScopedLock lock(mLock); while (mQ.size()>0) freeElement((T*)mQ.get()); mReadSignal.signal(); } virtual ~InterthreadQueueWithWait() { clear(); } size_t size() const { ScopedLock lock(mLock); return mQ.size(); } /** Blocking read. @return Pointer to object (will not be NULL). */ T* read() { ScopedLock lock(mLock); T* retVal = (T*)mQ.get(); while (retVal==NULL) { mWriteSignal.wait(mLock); retVal = (T*)mQ.get(); } mReadSignal.signal(); return retVal; } /** Blocking read with a timeout. @param timeout The read timeout in ms. @return Pointer to object or NULL on timeout. */ T* read(unsigned timeout) { if (timeout==0) return readNoBlock(); Timeval waitTime(timeout); ScopedLock lock(mLock); // (pat 8-2013) This commented out code has a deadlock problem. //while ((mQ.size()==0) && (!waitTime.passed())) // mWriteSignal.wait(mLock,waitTime.remaining()); while (mQ.size()==0) { long remaining = waitTime.remaining(); // (pat) How high do we expect the precision here to be? I dont think they are used as precision timers, // so dont try to wait if the remainder is just a few msecs. if (remaining < 2) { return NULL; } mWriteSignal.wait(mLock,remaining); } T* retVal = (T*)mQ.get(); if (retVal!=NULL) mReadSignal.signal(); return retVal; } /** Non-blocking read. @return Pointer to object or NULL if FIFO is empty. */ T* readNoBlock() { ScopedLock lock(mLock); T* retVal = (T*)mQ.get(); if (retVal!=NULL) mReadSignal.signal(); return retVal; } /** Non-blocking write. */ void write(T* val) { { ScopedLock lock(mLock); mQ.put(val); } mWriteSignal.signal(); } /** Wait until the queue falls below a low water mark. */ // (pat) This function suffers from the same problem as documented // at InterthreadQueue.write(), but I am not fixing it because I cannot test it. // The caller of this function will eventually get to run, just not immediately // after the mReadSignal condition is fulfilled. void wait(size_t sz=0) { ScopedLock lock(mLock); while (mQ.size()>sz) mReadSignal.wait(mLock); } }; // (pat) Same as an InterthreadMap but the mapped type is "D" instead of "D*"; // the only difference is that we cannot automatically delete the content on destruction (no clear method). template class InterthreadMap1 { public: typedef std::map Map; protected: Map mMap; mutable Mutex mLock; Signal mWriteSignal; public: // User can over-ride this method if they want to delete type D elements. virtual void vdelete(D) {} ~InterthreadMap1() { ScopedLock lock(mLock); typename Map::iterator iter = mMap.begin(); while (iter != mMap.end()) { vdelete(iter->second); ++iter; } mMap.clear(); } /** Non-blocking write. WARNING: This deletes any pre-existing element! @param key The index to write to. @param wData Pointer to data, not to be deleted until removed from the map. */ void write(const K &key, D wData) { ScopedLock lock(mLock); typename Map::iterator iter = mMap.find(key); if (iter!=mMap.end()) { vdelete(iter->second); iter->second = wData; } else { mMap[key] = wData; } mWriteSignal.broadcast(); } /** Identical to readNoBlock but with element removal. @param key Key to read from. @return Pointer at key or NULL if key not found, to be deleted by caller. */ bool getNoBlock(const K& key, D &result, bool bRemove = true) { ScopedLock lock(mLock); typename Map::iterator iter = mMap.find(key); if (iter==mMap.end()) return false; result = iter->second; if (bRemove) { mMap.erase(iter); } return true; } /** Blocking read with a timeout and element removal. @param key The key to read from. @param timeout The blocking timeout in ms. @return Pointer at key or NULL on timeout, to be deleted by caller. */ bool get(const K &key, D &result, unsigned timeout, bool bRemove = true) { if (timeout==0) return getNoBlock(key,result,bRemove); ScopedLock lock(mLock); Timeval waitTime(timeout); while (1) { typename Map::iterator iter = mMap.find(key); if (iter!=mMap.end()) { result = iter->second; if (bRemove) { mMap.erase(iter); } return true; } long remaining = waitTime.remaining(); if (remaining < 2) { return false; } mWriteSignal.wait(mLock,remaining); } } /** Blocking read with and element removal. @param key The key to read from. @return Pointer at key, to be deleted by caller. This always returns true. */ bool get(const K &key, D &result, bool bRemove = true) { ScopedLock lock(mLock); typename Map::iterator iter = mMap.find(key); while (iter==mMap.end()) { mWriteSignal.wait(mLock); iter = mMap.find(key); } result = iter->second; if (bRemove) { mMap.erase(iter); } return true; } /** Remove an entry and delete it. @param key The key of the entry to delete. @return True if it was actually found and deleted. (pat) If you just want remove without deleting, see getNoBlock. */ bool remove(const K &key ) { D val; if (getNoBlock(key,val,true)) { vdelete(val); return true; } else { return false; } } /** Non-blocking read. (pat) Actually, it blocks until the map is available. @param key Key to read from. @return Pointer at key or NULL if key not found. */ D readNoBlock(const K& key) const { D result = NULL; Unconst(this)->getNoBlock(key,result,false); return result; } /** Blocking read with a timeout. @param key The key to read from. @param timeout The blocking timeout in ms. @return Pointer at key or NULL on timeout. */ D read(const K &key, unsigned timeout) const { D result = NULL; Unconst(this)->get(key,result,timeout,false); return result; } /** Blocking read. Blocks until the key exists. @param key The key to read from. @return Pointer at key. */ D read(const K &key) const { D result; Unconst(this)->get(key,result,false); return result; } // pat added. unsigned size() const { ScopedLock(mLock); return mMap.size(); } // WARNING: These iterators are not intrinsically thread safe. // Caller must use ScopedIterator or the modification lock or enclose the entire iteration in some higher level lock. Mutex &qGetLock() { return mLock; } typedef typename Map::iterator iterator; typename Map::iterator begin() { return mMap.begin(); } typename Map::iterator end() { return mMap.end(); } }; // (pat) The original InterthreadMap works only for pointers; now it is derived from the more general version above. /** Thread-safe map of pointers to class D, keyed by class K. */ template class InterthreadMap : public InterthreadMap1 { void vdelete(D* foo) { delete foo; } }; /** This class is used to provide pointer-based comparison in priority_queues. */ // The priority_queue sorts the largest element to the 'top' of the priority_queue, which is the back of the underlying vector. template class PointerCompare { public: /** Compare the objects pointed to, not the pointers themselves. */ bool operator()(const T *v1, const T *v2) { return (*v1)>(*v2); } }; /** Priority queue for interthread operations. Passes pointers to objects. */ template , class Cmp = PointerCompare > class InterthreadPriorityQueue { protected: std::priority_queue mQ; mutable Mutex mLock; mutable Signal mWriteSignal; // Assumes caller holds the lock. T*ipqGet() { if (!mQ.size()) { return NULL; } T*result = mQ.top(); mQ.pop(); return result; } public: /** Clear the FIFO. */ void clear() { ScopedLock lock(mLock); while (mQ.size()>0) { T* ptr = mQ.top(); mQ.pop(); delete ptr; } } ~InterthreadPriorityQueue() { clear(); } size_t size() const { ScopedLock lock(mLock); return mQ.size(); } /** Non-blocking read. */ T* readNoBlock() { ScopedLock lock(mLock); return ipqGet(); } /** Blocking read. */ T* read() { ScopedLock lock(mLock); while (mQ.size()==0) mWriteSignal.wait(mLock); return ipqGet(); } T* read(unsigned timeout) { if (timeout==0) return readNoBlock(); Timeval waitTime(timeout); ScopedLock lock(mLock); while (mQ.size()==0) { long remaining = waitTime.remaining(); // (pat) How high do we expect the precision here to be? I dont think they used precision timers, // so dont try to wait if the remainder is just a few msecs. if (remaining < 2) { return NULL; } mWriteSignal.wait(mLock,remaining); } return ipqGet(); } // pat added 4-2014. Return but do not pop the top element, if any, or NULL. T* peek() { ScopedLock lock(mLock); return mQ.size() ? mQ.top() : NULL; } /** Non-blocking write. */ void write(T* val) { { ScopedLock lock(mLock); mQ.push(val); } mWriteSignal.signal(); } }; class Semaphore { private: bool mFlag; Signal mSignal; mutable Mutex mLock; public: Semaphore() :mFlag(false) { } void post() { ScopedLock lock(mLock); mFlag=true; mSignal.signal(); } void get() { ScopedLock lock(mLock); while (!mFlag) mSignal.wait(mLock); mFlag=false; } bool semtry() { ScopedLock lock(mLock); bool retVal = mFlag; mFlag = false; return retVal; } }; //@} #endif // vim: ts=4 sw=4