ONPOSIX  2.0
 All Classes Namespaces Files Functions Variables Enumerator Friends Macros Pages
PosixPrioritySharedQueue.hpp
Go to the documentation of this file.
1 #ifndef POSIXPRIORITYSHAREDQUEUE_HPP_
2 #define POSIXPRIORITYSHAREDQUEUE_HPP_
3 
4 #include <pthread.h>
5 #include <map>
6 #include <queue>
7 #include <assert.h>
8 #include "PosixMutex.hpp"
9 
10 namespace onposix {
11 
12 /**
13  * \brief Thread-safe FIFO priority queue class.
14  *
15  * No aging technique is implemented, so low-priority elements
16  * can starve. The class is noncopyable. \n
17  * The template parameters are:
18  * <ul>
19  * <li> T is the type of the element in the queue
20  * <li> _Priority is the priority type (shall support the less operator)
21  * </ul>
22  */
23 template<typename T, typename _Priority = int>
25 
 // NOTE(review): this listing jumps over numbered lines 24, 31-32 and 35-36,
 // so the class head and some declarations are not visible in this excerpt.
 // TODO confirm against the real header before relying on it.

 // One FIFO queue per registered priority. std::map iterates its keys in
 // ascending order, which is the order pop() scans the queues in.
26  std::map< _Priority, std::queue<T> > queues_;
 // Condition variable pop() waits on while globalSize_ is zero; push()
 // signals it after storing an element.
27  pthread_cond_t empty_;
 // Locked by every member function defined in this file; mutable so that
 // the const member size() can lock it too.
28  mutable pthread_mutex_t mutex_;
 // Total number of stored elements, summed over all priorities.
29  size_t globalSize_;
30 
33 
34 public:
37 
 // Registers a priority; an already registered priority is left untouched.
38  void addQueue(const _Priority& prio);
39 
 // Stores a copy of data under prio. The element is dropped without any
 // notification when prio has not been registered through addQueue().
40  void push(const T& data, const _Priority& prio);
41 
 // Blocks until at least one element is stored, then removes and returns
 // the front element of the first non-empty queue in ascending key order.
42  T pop();
43 
 // Replaces the whole map with an empty one, so the registered priorities
 // are removed together with the stored elements.
44  void clear();
45 
46 
 // Returns the total element count while holding the mutex.
47  size_t size() const;
48 };
49 
50 /**
51  * \brief Constructor. Initialize the queue.
52  *
53  * @exception runtime_error if the initialization fails.
54  */
55 template<typename T, typename _Priority>
57  globalSize_(0)
58 {
 // NOTE(review): numbered line 56 (the constructor declarator) is missing
 // from this listing; only the member initializer and the body are shown.
 //
 // NOTE(review): POSIX specifies that pthread_mutex_init and
 // pthread_cond_init report failure through their return value and does not
 // say they set errno, so strerror(errno) may describe an unrelated error.
 // Consider keeping the return value and passing it to strerror instead.
 //
 // NOTE(review): std::runtime_error, std::string, strerror and errno are used
 // here, but <stdexcept>, <string>, <cstring> and <cerrno> are not among the
 // visible includes; presumably they arrive through PosixMutex.hpp - confirm.
59  if (pthread_mutex_init(&mutex_, NULL) != 0)
60  throw std::runtime_error(std::string("Mutex initialization: ") +
61  strerror(errno));
 // NOTE(review): when this second initialization fails, the mutex that was
 // initialized just above is not destroyed before the exception leaves the
 // constructor, and the destructor does not run for a partially built object.
62  if (pthread_cond_init(&empty_, NULL) != 0)
63  throw std::runtime_error(
64  std::string("Condition variable initialization: ") +
65  strerror(errno));
66 }
67 
68 /**
69  * \brief Destructor. Clean up the resources.
70  */
71 template<typename T, typename _Priority>
73 {
 // NOTE(review): numbered line 72 (the destructor declarator) is missing
 // from this listing.
 //
 // Both destroy calls return 0 on success, so the negated result is true
 // when the resource was released. VERIFY_ASSERTION is not defined in this
 // file; presumably it checks its argument while always evaluating it -
 // TODO confirm where the macro is declared.
74  VERIFY_ASSERTION(!pthread_mutex_destroy(&mutex_));
75  VERIFY_ASSERTION(!pthread_cond_destroy(&empty_));
76 }
77 
78 /**
79  * \brief Adds new priority.
80  *
81  * If the priority already exists, this function does nothing.
82  * \param prio The priority to be added.
83  */
84 template<typename T, typename _Priority>
86 {
 // NOTE(review): numbered line 85 (the function declarator) is missing
 // from this listing.
 //
 // PthreadMutexLocker is declared outside this file; presumably it locks
 // mutex_ here and unlocks it when lock goes out of scope - confirm in
 // PosixMutex.hpp.
87  PthreadMutexLocker lock(mutex_);
 // Only insert when the key is absent, so an existing queue and its
 // elements are never replaced.
 //
 // NOTE(review): since C++11 the parameters of std::make_pair are forwarding
 // references. With the template arguments spelled out explicitly the first
 // parameter becomes an rvalue reference to _Priority, which cannot bind to
 // the const lvalue prio, so this call is rejected by C++11 and later
 // compilers. Dropping the explicit arguments, or constructing a
 // std::pair directly, avoids the problem.
88  if (queues_.find(prio) == queues_.end())
89  queues_.insert(std::make_pair<_Priority, std::queue<T> >(
90  prio, std::queue<T>()));
91 }
92 
93 /**
94  * \brief Insert a new element in the queue.
95  *
96  * @param data The element to be added.
97  * @param prio The priority of the element.
98  */
99 template<typename T, typename _Priority>
101  const _Priority& prio)
102 {
 // NOTE(review): numbered line 100 (the start of the function declarator)
 // is missing from this listing.
 //
 // NOTE(review): the mutex is locked and unlocked by hand here, whereas the
 // other member functions use PthreadMutexLocker. If the std::queue push
 // below throws (for example on allocation failure or from the copy of T),
 // neither unlock call is reached and mutex_ stays locked.
103  pthread_mutex_lock(&mutex_);
 // Elements are accepted only for priorities registered with addQueue().
 // The key is looked up twice: once by find() and again by operator[].
104  if (queues_.find(prio) != queues_.end()){
105  queues_[prio].push(data);
106  ++globalSize_;
 // The mutex is released before signalling, so a thread woken from
 // pop() does not immediately block on mutex_ again.
107  pthread_mutex_unlock(&mutex_);
108  pthread_cond_signal(&empty_);
109  return;
110  }
 // Unknown priority: data is discarded and the caller gets no indication.
111  pthread_mutex_unlock(&mutex_);
112 }
113 
114 /**
115  * \brief Extract an element from the queue.
116  *
117  * If the queue is empty the calling thread is blocked.
118  * @return The oldest element stored under the highest priority, i.e. the smallest key that currently has elements.
119  */
120 template<typename T, typename _Priority>
122 {
 // NOTE(review): numbered line 121 (the function declarator) is missing
 // from this listing.
123  PthreadMutexLocker lock(mutex_);
 // pthread_cond_wait releases mutex_ while the thread sleeps and holds it
 // again when it returns. The condition is re-tested in a loop because a
 // wake-up does not guarantee that an element is still available.
124  while (!globalSize_)
125  pthread_cond_wait(&empty_, &mutex_);
 // Walk the priorities in ascending key order and serve the first queue
 // that holds an element, so the smallest key has the highest priority.
126  typename std::map<_Priority, std::queue<T> >::iterator it = queues_.begin();
127  typename std::map<_Priority, std::queue<T> >::iterator itEnd =
128  queues_.end();
129  for(; it != itEnd; ++it){
130  if (!(it->second).empty()){
 // Copy the front element before removing it from its queue.
131  T data = (it->second).front();
132  (it->second).pop();
133  --globalSize_;
134  return data;
135  }
136  }
 // Not expected to be reached: a non-zero globalSize_ means some queue
 // holds an element.
 // NOTE(review): this fallback requires T to be default-constructible, and
 // when assert is compiled out (NDEBUG) a default-constructed T would be
 // returned if the counter and the queues ever disagreed.
137  assert(false);
138  //Just to silent the compiler
139  T data;
140  return data;
141 }
142 
143 /**
144  * \brief Empties the queue.
145  *
146  * In order to efficiently accomplish its task,
147  * this function exchanges its content with an empty queue using the specialized
148  * version of swap() implemented for the STL container std::map.
149  */
150 template<typename T, typename _Priority>
152 {
 // NOTE(review): numbered line 151 (the function declarator) is missing
 // from this listing.
153  PthreadMutexLocker lock(mutex_);
 // After the swap queues_ is the empty map, and the old content is
 // destroyed together with the local variable at the end of this scope.
 // NOTE(review): the registered priorities are removed along with the
 // elements, so push() drops every element until addQueue() is called
 // again - confirm this is the intended meaning of clear().
154  std::map<_Priority, std::queue<T> > empty;
155  std::swap(queues_, empty);
156  globalSize_ = 0;
157 }
158 
159 /**
160  * \brief The current size of the queue.
161  *
162  * @return The queue size.
163  */
164 template<typename T, typename _Priority>
166 {
 // NOTE(review): numbered line 165 (the function declarator) is missing
 // from this listing.
 //
 // The counter is read under the mutex; the value can already be outdated
 // once the caller receives it if other threads push or pop concurrently.
167  PthreadMutexLocker lock(mutex_);
168  return globalSize_;
169 }
170 
171 } /* onposix */
172 
173 #endif /* POSIXPRIORITYSHAREDQUEUE_HPP_ */