ONPOSIX  2.0
 All Classes Namespaces Files Functions Variables Enumerator Friends Macros Pages
PosixDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Descriptor.cpp
3  *
4  * Copyright (C) 2012 Evidence Srl - www.evidence.eu.com
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include "PosixDescriptor.hpp"
22 
23 namespace onposix {
24 
25 /**
26  * \brief Function to start an asynchronous operation
27  *
28  * This method allows to start an asynchronous operation, either read or write.
29  * @param read_operation Specifies if it is a read (true) or write (false) operation
30  * @param handler Function to be run at the end of the operation.
31  * This function will have as arguments a Buffer* where data is stored and the
32  * number of bytes actually transferred.
33  * @param buff Buffer where data must be copied to (for a read operation) or where
34  * data is contained (for a write operation)
35  * @param size Amount of bytes to be transferred
36  */
38  void (*handler) (Buffer* b, size_t size),
39  Buffer* buff, size_t size)
40 {
41 
42  DEBUG("Async operation started with buffer*");
43  struct job* j = new job;
44  j->size_ = size;
45  j->buff_handler_ = handler;
46  j->buff_buffer_ = buff;
47  if (read_operation)
49  else
51 
52  queue_->push(j);
53 }
54 
55 
56 
57 /**
58  * \brief Function so start an asynchronous operation
59  *
60  * This method allows to start an asynchronous operation, either read or write.
61  * @param read_operation Specifies if it is a read (true) or write (false) operation
62  * @param handler Function to be run at the end of the operation.
63  * This function will have as arguments a void* where data is stored and the
64  * number of bytes actually transferred.
65  * @param buff void* where data must be copied to (for a read operation) or where
66  * data is contained (for a write operation)
67  * @param size Amount of bytes to be transferred
68  */
70  void (*handler) (void* b, size_t size),
71  void* buff, size_t size)
72 {
73 
74  DEBUG("Async operation started with void*");
75  struct job* j = new job;
76  j->size_ = size;
77  j->void_handler_ = handler;
78  j->void_buffer_ = buff;
79  if (read_operation)
81  else
83 
84  queue_->push(j);
85 }
86 
87 /**
88  * \brief Function run on the separate thread.
89  *
90  * This is the function automatically called by start() which, in turn, is called by
91  * startAsyncOperation().
92  * The function is run by the worker thread. It performs the read/write operation
93  * and invokes the handler.
94  * @exception It throws runtime_error in case no operation has been scheduled
95  */
97 {
98  DEBUG("Worker running");
99  for (;;) {
100  DEBUG("===================");
101  DEBUG("New cycle");
102  bool close;
103  job* j = queue_->pop(&close);
104  if (j != 0){
105  int n;
106  DEBUG("Found one item in queue");
107  DEBUG("Need to read " << j->size_ << " bytes");
108  DEBUG("File descriptor = " << des_->getDescriptorNumber());
109 
110  if (j->job_type_ == job::READ_BUFFER)
111  n = des_->do_read(j->buff_buffer_->getBuffer(), j->size_);
112  else if (j->job_type_ == job::READ_VOID)
113  n = des_->do_read(j->void_buffer_, j->size_);
114  else if (j->job_type_ == job::WRITE_BUFFER)
115  n = des_->do_write(j->buff_buffer_->getBuffer(), j->size_);
116  else if (j->job_type_ == job::WRITE_VOID)
117  n = des_->do_write(j->void_buffer_, j->size_);
118  else {
119  ERROR("Handler called without operation!");
120  throw std::runtime_error ("Async error");
121  }
122  DEBUG("Read " << n << " bytes");
123 
124  DEBUG("Calling handler");
125  if ((j->job_type_ == job::READ_BUFFER) || (j->job_type_ == job::WRITE_BUFFER))
126  j->buff_handler_(j->buff_buffer_, n);
127  else
128  j->void_handler_(j->void_buffer_, n);
129  delete j;
130  } else {
131  queue_->signal_empty();
132  if (!close) {
133  DEBUG("No data in queue");
135  } else {
136  DEBUG("Exiting!!");
137  pthread_exit(0);
138  }
139  }
140  }
141 }
142 
143 
144 /**
145  * \brief Low-level read
146  *
147  * This method is private because it is meant to be used through the other read()
148  * methods.
149  * Note: it can block the caller, because it continues reading until the given
150  * number of bytes have been read.
151  * @param buffer Pointer to the buffer where read bytes must be stored
152  * @param size Number of bytes to be read
153  * @exception runtime_error if the ::read() returns an error
154  * @return The number of actually read bytes or -1 in case of error
155  */
156 int PosixDescriptor::do_read (void* buffer, size_t size)
157 {
158  size_t remaining = size;
159  while (remaining > 0) {
160  ssize_t ret = ::read (fd_, ((char*)buffer)+(size-remaining),
161  remaining);
162  if (ret == 0)
163  // End of file reached
164  break;
165  else if (ret < 0) {
166  throw std::runtime_error ("Read error");
167  return -1;
168  }
169  remaining -= ret;
170  }
171  return (size-remaining);
172 }
173 
174 
175 /**
176  * \brief Method to read from the descriptor and fill a buffer.
177  *
178  * Note: this method may block current thread if data is not available.
179  * The buffer is filled with the read data.
180  * @param b Pointer to the buffer to be filled
181  * @param size Number of bytes that must be read
182  * @return -1 in case of error; the number of bytes read otherwise
183  */
184 int PosixDescriptor::read (Buffer* b, size_t size)
185 {
186  if (b->getSize() == 0 || size > b->getSize()) {
187  ERROR("Buffer size not enough!");
188  return -1;
189  }
190  int ret = do_read(b->getBuffer(), size);
191  return ret;
192 }
193 
194 /**
195  * \brief Method to read from the descriptor.
196  *
197  * Note: this method may block current thread if data is not available.
198  * The buffer is filled with the read data.
199  * @param p Pointer to the memory space to be filled
200  * @param size Number of bytes that must be read
201  * @return -1 in case of error; the number of bytes read otherwise
202  */
203 int PosixDescriptor::read (void* p, size_t size)
204 {
205  int ret = do_read(p, size);
206  return ret;
207 }
208 
209 
210 
211 
212 /**
213  * \brief Low-level write
214  *
215  * This method is private because it is meant to be used through the other
216  * write() methods.
217  * Note: it can block the caller, because it continues writing until the
218  * given number of bytes have been written.
219  * @param buffer Pointer to the buffer containing bytes to be written
220  * @param size Number of bytes to be written
221  * @exception runtime_error if the ::write() returns 0 or an error
222  * @return The number of actually written bytes or -1 in case of error
223  */
224 int PosixDescriptor::do_write (const void* buffer, size_t size)
225 {
226  size_t remaining = size;
227  while (remaining > 0) {
228  ssize_t ret = ::write (fd_,
229  ((char*)buffer)+(size-remaining), remaining);
230  if (ret == 0)
231  // Cannot write more
232  break;
233  else if (ret < 0) {
234  throw std::runtime_error ("Write error");
235  return -1;
236  }
237  remaining -= ret;
238  }
239  return (size-remaining);
240 }
241 
242 
243 /**
244  * \brief Method to write data in a buffer to the descriptor.
245  *
246  * Note: this method may block current thread if data cannot be written.
247  * @param b Pointer to the buffer to be filled
248  * @param size Number of bytes that must be written
249  * @return -1 in case of error; the number of bytes read otherwise
250  */
251 int PosixDescriptor::write (Buffer* b, size_t size)
252 {
253  if (b->getSize() == 0 || size > b->getSize()) {
254  ERROR("Buffer size not enough!");
255  return -1;
256  }
257  return do_write(reinterpret_cast<const void*> (b->getBuffer()),
258  size);
259 }
260 
261 /**
262  * \brief Method to write to the descriptor.
263  *
264  * Note: this method may block current thread if data cannot be written.
265  * @param p Pointer to the memory space containing data
266  * @param size Number of bytes that must be written
267  * @return -1 in case of error; the number of bytes read otherwise
268  */
269 int PosixDescriptor::write (const void* p, size_t size)
270 {
271  return do_write(p, size);
272 }
273 
274 
275 /**
276  * \brief Method to write a string to the descriptor.
277  *
278  * Note: this method may block current thread if data cannot be written.
279  * @param string to be written
280  * @return -1 in case of error; the number of bytes read otherwise
281  */
282 int PosixDescriptor::write (const std::string& s)
283 {
284  return do_write(reinterpret_cast<const void*> (s.c_str()), s.size());
285 }
286 
287 } /* onposix */