ONPOSIX  2.0
 All Classes Namespaces Files Functions Variables Enumerator Friends Macros Pages
PosixDescriptor.hpp
Go to the documentation of this file.
1 /*
2  * PosixDescriptor.hpp
3  *
4  * Copyright (C) 2012 Evidence Srl - www.evidence.eu.com
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #ifndef POSIXDESCRIPTOR_HPP_
22 #define POSIXDESCRIPTOR_HPP_
23 
24 #include <fcntl.h>
25 #include <stdlib.h>
26 #include <strings.h>
27 #include <unistd.h>
28 #include <sys/ioctl.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/un.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/ip.h>
35 #include <string>
36 #include <stdexcept>
37 #include <iostream>
38 #include <queue>
39 
40 #include "Logger.hpp"
41 #include "Buffer.hpp"
42 #include "AbstractThread.hpp"
43 #include "PosixMutex.hpp"
44 #include "PosixCondition.hpp"
45 
46 // Linux-specific methods are enabled; comment out the next line to disable them:
47 #define ONPOSIX_LINUX_SPECIFIC
48 
49 namespace onposix {
50 
51 /**
52  * \brief Abstraction of a POSIX descriptor.
53  *
54  * This is an abstract class for the concept of Posix descriptor.
55  * The descriptor can correspond to a file (class FileDescriptor)
56  * or to a socket (class StreamSocketServerDescriptor).
57  */
59 
60  /**
61  * \brief Single asynchronous operation
62  *
63  * This data structure contains information about a single pending
64  * asynchronous operation.
65  */
66  struct job {
67  /**
68  * \brief Type of scheduled async operation
69  */
70  enum {
71  NONE = 0, ///< No operation scheduled
72  READ_BUFFER = 1, ///< Read operation on a Buffer
73  READ_VOID = 2, ///< Read operation on a void*
74  WRITE_BUFFER = 3, ///< Write operation on a Buffer
75  WRITE_VOID = 4 ///< Write operation on a void*
76  } job_type_;
77 
78  /// Size (in bytes) of data to be read/written
79  size_t size_;
80 
81  /**
82  * \brief Handler in case of read/write operation on a
83  * Buffer
84  */
85  void (*buff_handler_) (Buffer* b, size_t size);
86 
87  /**
88  * \brief Buffer in case of read/write operation on a Buffer
89  */
 // NOTE(review): listing line 90 (the member documented just above) is
 // missing from this extract; restore it from the original header.
91 
92  /**
93  * \brief Handler in case of read/write operation on a void*
94  */
95  void (*void_handler_) (void* b, size_t size);
96 
97  /**
98  * \brief void* pointer in case of read/write operation on a
99  * void*
100  */
 // NOTE(review): listing line 101 (the member documented just above) is
 // missing from this extract; restore it from the original header.
102  };
103 
104  /**
105  * \brief Class for synchronization between the main thread and the worker thread.
106  *
107  * This data structure is in charge of keeping a queue of pending asynchronous operations
108  * (shared between the main thread and the worker thread) and synchronize the two threads.
109  */
110  class shared_queue {
111 
112  /**
113  * \brief Queue of all pending operations.
114  */
115  std::queue<job*> queue_;
116 
117  /**
118  * \brief Signal the worker to not block anymore.
119  *
120  * This variable tells the worker thread that it must flush all
121  * pending operations and not block on the condition variable
122  * because the descriptor is going to be closed.
123  */
 // NOTE(review): listing line 124 (the declaration documented above;
 // the methods below refer to it as flush_and_close_) is missing from
 // this extract.
125 
126  /**
127  * \brief Mutex to avoid contentions.
128  *
129  * This mutex protects accesses to queue_ and flush_and_close_
130  * when asynchronous operations are scheduled.
131  */
 // NOTE(review): listing line 132 (the declaration documented above;
 // the methods below refer to it as lock_) is missing from this extract.
133 
134  /**
135  * \brief Condition for not empty queue.
136  *
137  * This condition signals the worker thread that there is
138  * work to be carried out (i.e., the queue is not empty).
139  */
 // NOTE(review): listing line 140 (the declaration documented above) is
 // missing from this extract.
141 
142  /**
143  * \brief Condition for empty queue.
144  *
145  * This condition signals the main thread that the queue is
146  * empty. Used by the worker thread to signal the main
147  * thread when the descriptor is going to be closed.
148  */
 // NOTE(review): listing line 149 (the declaration documented above) is
 // missing from this extract.
150 
151  public:
152  /// Constructor
 // NOTE(review): listing line 153 (the constructor itself) is missing
 // from this extract.
154 
155  /**
156  * \brief Add an asynchronous operation
157  *
158  * Pushes the job pointer on queue_ while holding lock_.
159  * @param j asynchronous operation
 */
160  inline void push(struct job* j){
161  lock_.lock();
162  queue_.push(j);
 // NOTE(review): listing line 163 is missing from this extract; a
 // statement executed under the lock is not visible here.
164  lock_.unlock();
165  }
166 
167  /**
168  * \brief Signal the worker that there are new operations
169  *
170  * This method is used by the main thread to signal the worker
171  * that there are new operations in the queue.
172  */
173  inline void signal_not_empty(){
174  lock_.lock();
 // NOTE(review): listing line 175 is missing from this extract;
 // presumably the signal on the "not empty" condition — confirm.
176  lock_.unlock();
177  }
178 
179  /**
180  * \brief Signal the main thread that the queue is empty
181  *
182  * This method is used by the worker to signal the main thread
183  * that the queue is empty and all pending operations have
184  * been carried out.
185  */
186  inline void signal_empty(){
187  lock_.lock();
 // NOTE(review): listing line 188 is missing from this extract;
 // presumably the signal on the "empty" condition — confirm.
189  lock_.unlock();
190  }
191 
192  /**
193  * \brief Wait until there are new operations
194  *
195  * This method is used by the worker to wait until there are
196  * new operations in the queue.
197  */
198  inline void wait_not_empty(){
199  lock_.lock();
 // NOTE(review): listing line 200 is missing from this extract;
 // presumably the wait on the "not empty" condition — confirm.
201  lock_.unlock();
202  }
203 
204  /**
205  * \brief Wait until the queue is empty
206  *
207  * This method is used by the main thread to wait until the
208  * queue is empty to close the descriptor.
209  */
210  inline void wait_empty(){
211  lock_.lock();
 // NOTE(review): listing line 212 is missing from this extract;
 // presumably the wait on the "empty" condition — confirm.
213  lock_.unlock();
214  }
215 
216  /**
217  * \brief Signal that the descriptor is going to be closed
218  *
219  * This method is used to let the main thread signal the worker
220  * that the descriptor is going to be closed, so it must flush
221  * all pending operations and not block on wait_not_empty()
222  * anymore. It only sets flush_and_close_ (under lock_); it does
223  * not wake the worker by itself.
 */
224  inline void set_flush_and_close(){
225  lock_.lock();
226  flush_and_close_ = true;
227  lock_.unlock();
228  }
229 
230  /**
231  * \brief Pop the next operation from the queue.
232  *
233  * This method is used by the worker to pop the next
234  * operation from the queue.
235  * @param close Pointer to a boolean used as return variable to
236  * tell the worker if the descriptor is going to be closed
 * (it receives the current value of flush_and_close_).
237  * @return pointer to a job instance allocated in the heap; 0
238  * if the queue is empty.
239  */
240  inline job* pop (bool* close){
241  job* ret = 0;
242  lock_.lock();
243  if (!queue_.empty()){
244  ret = queue_.front();
245  queue_.pop();
246  }
 // Checked again after the pop: true also when the last job was just removed.
247  if (queue_.empty()){
 // NOTE(review): listing line 248 (the body of this branch) is missing
 // from this extract.
249  }
250  *close = flush_and_close_;
251  lock_.unlock();
252  return ret;
253  }
254  };
255 
256  /**
257  * \brief Worker thread to perform asynchronous operations.
258  *
259  * This class is used to run asynchronous operations (i.e.,
260  * read and write). These operations are run on a different thread.
261  */
262  class Worker: public AbstractThread {
263 
264  /// Disable the default constructor (declared private, no definition visible here)
265  Worker();
266 
267  /**
268  * \brief Method automatically called by start()
269  *
270  * This method is automatically called by start() which,
271  * in turn, is called by startAsyncOperation()
 * NOTE(review): in the visible part of this header start() is called
 * from PosixDescriptor::async_read()/async_write(); confirm which
 * description is current.
272  */
273  void run();
274 
275  /**
276  * \brief File descriptor
277  *
278  * This is a pointer to the same PosixDescriptor that "owns"
279  * this instance of Worker.
280  * The pointer is needed to perform the operation
281  * (i.e., read or write).
282  */
 // NOTE(review): listing line 283 (the declaration documented above;
 // the constructor initializes it as des_) is missing from this extract.
284 
285  /**
286  * \brief Pointer to the shared queue
287  *
288  * This variable points to the shared_queue used for asynchronous
289  * operations and synchronization between the main thread
290  * and the worker thread.
291  */
 // NOTE(review): listing line 292 (the declaration documented above;
 // the constructor initializes it as queue_) is missing from this extract.
293 
294  public:
295 
296  /**
297  * \brief Constructor.
298  *
299  * It just initializes the variables.
300  * @param q Pointer to the shared_queue for synchronization and
301  * pending jobs
302  * @param des Pointer to the PosixDescriptor that "owns"
303  * this worker
304  */
 // NOTE(review): listing line 305 (the constructor signature) is missing
 // from this extract; only its initializer list is visible below.
306  des_(des), queue_(q) {}
307 
 // NOTE(review): listing line 308 is missing from this extract; the
 // closing brace below belongs to a definition whose opening line is
 // not visible here.
309  }
310 
 /// Schedule a read (read_operation == true) or write on a Buffer; defined elsewhere.
311  void startAsyncOperation (bool read_operation,
312  void (*handler) (Buffer* b, size_t size),
313  Buffer* buff, size_t size);
314 
 /// Schedule a read (read_operation == true) or write on a void* area; defined elsewhere.
315  void startAsyncOperation (bool read_operation,
316  void (*handler) (void* b, size_t size),
317  void* buff, size_t size);
318 
319  };
320 
321  /**
322  * \brief Pointer to the worker that performs asynchronous operations.
323  *
324  * The worker is allocated on the heap in the constructors
325  * (2 standard + 1 copy) and deallocated in the destructor.
326  */
328 
329  /**
330  * \brief Pointer to the shared_queue for synchronization with the
331  * worker thread
332  *
333  * This data structure is allocated on the heap in the constructors
334  * (2 standard + 1 copy) and deallocated in the destructor.
335  */
337 
338  /**
339  * \brief If the worker thread has been already started.
340  *
341  * This variable is modified in async_read() and async_write();
342  */
344 
345  /**
346  * \brief Private constructor used by derived classes
347  *
348  * It allocates queue_ and worker_.
349  * @param fd File descriptor number returned by open(), socket(),
350  * accept(), etc.
351  */
352  PosixDescriptor(int fd): queue_(0), worker_started_(false), fd_(fd) {
353  queue_ = new shared_queue;
354  worker_ = new Worker (queue_, this);
355  }
356 
 // Pipe and AsyncThread are granted access to the non-public members.
357  friend class Pipe;
358  friend class AsyncThread;
359 
360 protected:
361  /**
362  * \brief Number of the file descriptor.
363  *
364  * This is the return value of open(), socket() or accept().
365  */
366  int fd_;
367 
 // Declarations only; the definitions are not part of this header listing.
 // NOTE(review): presumably the low-level helpers behind read()/write()
 // and the worker thread — confirm in the implementation file.
368  int do_read (void* p, size_t size);
369  int do_write (const void* p, size_t size);
370 
371  /**
372  * \brief Constructor
373  *
374  * It allocates queue_ and worker_.
375  */
 // NOTE(review): listing line 376 (the constructor signature and its
 // initializer list, if any) is missing from this extract; only the
 // body is visible below.
377  queue_ = new shared_queue;
378  worker_ = new Worker (queue_, this);
379  }
380 
381 public:
382  /**
383  * \brief Destructor.
384  *
385  * It closes the file descriptor and deallocates queue_ and
386  * worker_.
387  */
388  virtual ~PosixDescriptor() {
389  DEBUG("Destroying descriptor...");
390  DEBUG("Closing desciptor...");
391  close();
392  DEBUG("delete thread...");
393  delete(worker_);
394  delete(queue_);
395 
396  DEBUG("Descriptor succesfully destroyed. Let's move on!");
397  }
398 
399 
400  /**
401  * \brief Run asynchronous read operation
402  *
403  * This method schedules an asynchronous read operation.
404  * The operation is internally run on a different thread.
405  * @param handler Function to be run when the read operation has
406  * finished.
407  * This function will have two parameters: a pointer to the Buffer
408  * where data has been saved, and the number of bytes actually read.
409  * @param b Pointer to the Buffer to be provided to the handler
410  * function as argument
411  * @param size Number of bytes to be read
412  */
413  inline void async_read(void (*handler)(Buffer* b, size_t size),
414  Buffer* b,
415  size_t size){
416  DEBUG("async_read() called!");
417  if (!worker_started_){
418  worker_->start();
419  worker_started_ = true;
420  }
421  worker_->startAsyncOperation(true, handler, b, size);
422  }
423 
424  /**
425  * \brief Run asynchronous read operation
426  *
427  * This method schedules an asynchronous read operation.
428  * The operation is internally run on a different thread.
429  * @param handler Function to be run when the read operation has
430  * finished.
431  * This function will have two parameters: a void* where data
432  * has been saved, and the number of bytes actually read.
433  * @param b Pointer to be provided to the handler function as
434  * argument
435  * @param size Number of bytes to be read
436  */
437  inline void async_read(void (*handler)(void* b, size_t size),
438  void* b,
439  size_t size){
440  DEBUG("async_read() called!");
441  if (!worker_started_){
442  worker_->start();
443  worker_started_ = true;
444  }
445  worker_->startAsyncOperation(true, handler, b, size);
446  }
447 
448  /**
449  * \brief Run asynchronous write operation
450  *
451  * This method schedules an asynchronous write operation.
452  * The operation is internally run on a different thread.
453  * @param handler Function to be run when the write operation has
454  * finished.
455  * This function will have two parameters: a pointer to the Buffer
456  * where original data was stored, and the number of bytes actually
457  * written.
458  * @param b Pointer to the Buffer to be provided to the handler
459  * function as argument
460  * @param size Number of bytes to be written.
461  */
462  inline void async_write(void (*handler)(Buffer* b, size_t size),
463  Buffer* b,
464  size_t size){
465  if (!worker_started_){
466  worker_->start();
467  worker_started_ = true;
468  }
469  worker_->startAsyncOperation(false, handler, b, size);
470  }
471 
472  /**
473  * \brief Run asynchronous write operation
474  *
475  * This method schedules an asynchronous write operation.
476  * The operation is internally run on a different thread.
477  * @param handler Function to be run when the write operation has
478  * finished.
479  * This function will have two parameters: a void* where original
480  * data was stored, and the number of bytes actually written.
481  * @param b Pointer to be provided to the handler function as
482  * argument
483  * @param size Number of bytes to be written
484  */
485  inline void async_write(void (*handler)(void* b, size_t size),
486  void* b,
487  size_t size){
488  if (!worker_started_){
489  worker_->start();
490  worker_started_ = true;
491  }
492  worker_->startAsyncOperation(false, handler, b, size);
493  }
494 
 // Read/write overloads; only declared here (the definitions are not
 // part of this header listing).
 // NOTE(review): presumably the blocking counterparts of
 // async_read()/async_write() — confirm in the implementation file.
495  int read (Buffer* b, size_t size);
496  int read (void* p, size_t size);
497  int write (Buffer* b, size_t size);
498  int write (const void* p, size_t size);
499  int write (const std::string& s);
500 
501  /**
502  * \brief Method to close the descriptor.
503  *
504  * Note: currently there is no method to re-open the descriptor.
505  * In case the worker thread has been started, it signals the worker
506  * that it must not block on wait anymore (through
507  * set_flush_and_close()); then it unblocks the worker (through
508  * signal_not_empty()). Finally the underlying descriptor is closed
 * with ::close().
509  */
510  inline virtual void close(){
511  if (worker_started_){
 // NOTE(review): no ';' after DEBUG(...) on the next line — this relies
 // on how the macro expands; confirm against Logger.hpp.
512  DEBUG("Flushing pending data...")
 // NOTE(review): listing lines 513-514 are missing from this extract;
 // presumably the set_flush_and_close()/signal_not_empty() calls
 // described above — confirm against the original header.
515  queue_->wait_empty();
 // NOTE(review): listing line 516 is missing from this extract.
517  }
518  ::close(fd_);
519  }
520 
521 
522 
523  /**
524  * \brief Method to get descriptor number.
525  *
526  * @return Descriptor number.
527  */
528  inline int getDescriptorNumber() const {
529  return fd_;
530  }
531 
532  /**
533  * \brief Copy constructor.
534  *
535  * The copy constructor is called to copy an existing object to
536  * another object that is being constructed.
537  * Examples:
538  * \code
539  * PosixDescriptor p1;
540  * PosixDescriptor p2 = p1;
541  * PosixDescriptor p3 (p1);
542  * \endcode
543  * The new object gets its own descriptor, obtained by ::dup() on the
 * source descriptor, and its own queue_ and worker_.
544  * @exception runtime_error if the ::dup() returns an error
545  */
 // NOTE(review): listing line 546 (the constructor signature and its
 // initializer list, if any) is missing from this extract; only the
 // body is visible below.
547  fd_ = ::dup(src.fd_);
548  if (fd_ < 0) {
549  ERROR("Bad file descriptor");
550  throw std::runtime_error("PosixDescriptor: error in copy constructor");
551  }
552  DEBUG("Creating worker (stopped)");
553  queue_ = new shared_queue;
554  worker_ = new Worker (queue_, this);
555  }
556 
557  /**
558  * \brief Assignment operator.
559  *
560  * The assignment operator is called to copy an existing object to
561  * another object that is already existing as well.
562  * Examples:
563  * \code
564  * PosixDescriptor p1, p2;
565  * p2 = p1;
566  * \endcode
 * The source descriptor is duplicated onto this object's descriptor
 * number with ::dup2(); queue_ and worker_ are left untouched.
567  * @exception runtime_error if the ::dup2() returns an error
568  */
 // NOTE(review): listing line 569 (the operator signature) is missing
 // from this extract; only the body is visible below.
570  if (::dup2(src.fd_, fd_) < 0) {
571  ERROR("Bad file descriptor");
572  throw std::runtime_error("PosixDescriptor: error in operator=");
573  }
574  return *this;
575  }
576 
577 #ifdef ONPOSIX_LINUX_SPECIFIC
578 
579  /**
580  * \brief Method to flush this specific descriptor
581  */
582  inline bool flush(){
583  if (syncfs(fd_) < 0)
584  return false;
585  else
586  return true;
587  }
588 
589  /**
590  * \brief Ioctl on the file descriptor
591  */
592  inline int ioctl(int request){
593  return ::ioctl(fd_, request);
594  }
595 
596  /**
597  * \brief Ioctl on the file descriptor
598  */
599  inline int ioctl(int request, void* argp){
600  return ::ioctl(fd_, request, argp);
601  }
602 #endif /* ONPOSIX_LINUX_SPECIFIC */
603 };
604 
605 
606 } /* onposix */
607 
608 #endif /* POSIXDESCRIPTOR_HPP_ */