μHAL (v2.7.9)
Part of the IPbus software repository
ProtocolUDP.cpp
Go to the documentation of this file.
1 /*
2 ---------------------------------------------------------------------------
3 
4  This file is part of uHAL.
5 
6  uHAL is a hardware access library and programming framework
7  originally developed for upgrades of the Level-1 trigger of the CMS
8  experiment at CERN.
9 
10  uHAL is free software: you can redistribute it and/or modify
11  it under the terms of the GNU General Public License as published by
12  the Free Software Foundation, either version 3 of the License, or
13  (at your option) any later version.
14 
15  uHAL is distributed in the hope that it will be useful,
16  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  GNU General Public License for more details.
19 
20  You should have received a copy of the GNU General Public License
21  along with uHAL. If not, see <http://www.gnu.org/licenses/>.
22 
23 
24  Andrew Rose, Imperial College, London
25  email: awr01 <AT> imperial.ac.uk
26 
27  Marc Magrans de Abril, CERN
28  email: marc.magrans.de.abril <AT> cern.ch
29 
30 ---------------------------------------------------------------------------
31 */
32 
33 #include "uhal/ProtocolUDP.hpp"
34 
35 
36 #include <exception>
37 #include <utility>
38 
39 #include <boost/bind/bind.hpp>
40 #include <boost/lambda/lambda.hpp>
41 #include <boost/asio/connect.hpp>
42 #include <boost/asio/write.hpp>
43 #include <boost/asio/read.hpp>
44 #include <boost/asio/placeholders.hpp>
45 #include "boost/date_time/posix_time/posix_time.hpp"
46 
47 #include "uhal/log/LogLevels.hpp"
51 #include "uhal/log/log.hpp"
52 #include "uhal/grammars/URI.hpp"
53 #include "uhal/Buffers.hpp"
54 #include "uhal/ProtocolIPbus.hpp"
55 
56 
57 namespace uhal
58 {
59  template < typename InnerProtocol >
60  UDP< InnerProtocol >::UDP ( const std::string& aId, const URI& aUri ) :
61  InnerProtocol ( aId , aUri ),
62  mIOservice ( ),
63  mSocket ( mIOservice , boost::asio::ip::udp::endpoint ( boost::asio::ip::udp::v4(), 0 ) ),
64  mEndpoint ( *boost::asio::ip::udp::resolver ( mIOservice ).resolve ( boost::asio::ip::udp::resolver::query ( boost::asio::ip::udp::v4() , aUri.mHostname , aUri.mPort ) ) ),
65  mDeadlineTimer ( mIOservice ),
66  mReplyMemory ( 1500 , 0x00000000 ),
67  mIOserviceWork ( mIOservice ),
68  mDispatchThread ( boost::bind ( &boost::asio::io_service::run , & ( mIOservice ) ) ),
69  mDispatchQueue(),
70  mReplyQueue(),
71  mPacketsInFlight ( 0 ),
72  mFlushDone ( true ),
73  mAsynchronousException ( NULL )
74  {
75  mDeadlineTimer.async_wait ( boost::bind ( &UDP::CheckDeadline, this ) );
76  }
77 
78 
79  template < typename InnerProtocol >
81  {
82  try
83  {
84  mSocket.close();
85 
86  while ( mSocket.is_open() )
87  {}
88 
89  mIOservice.stop();
90  mDispatchThread.join();
91  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
92  ClientInterface::returnBufferToPool ( mDispatchQueue );
94  }
95  catch ( const std::exception& aExc )
96  {
97  log ( Error() , "Exception " , Quote ( aExc.what() ) , " caught in " , Type<UDP< InnerProtocol > >(), " destructor" );
98  }
99  }
100 
101 
102 
103  template < typename InnerProtocol >
105  {
106  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
107 
108  if ( mAsynchronousException )
109  {
110  log ( *mAsynchronousException , "Rethrowing Asynchronous Exception from 'implementDispatch' method of " , Type<UDP< InnerProtocol > >() );
111  mAsynchronousException->throwAsDerivedType();
112  }
113 
114  if ( ! mSocket.is_open() )
115  {
116  connect();
117  }
118 
119 
120  if ( mDispatchBuffers || mPacketsInFlight == this->getMaxNumberOfBuffers() )
121  {
122  mDispatchQueue.push_back ( aBuffers );
123  }
124  else
125  {
126  mDispatchBuffers = aBuffers;
127  write ( );
128  }
129  }
130 
131 
132  template < typename InnerProtocol >
134  {
135  return (350 * 4);
136  }
137 
138 
139  template < typename InnerProtocol >
141  {
142  return (350 * 4);
143  }
144 
145 
146  template < typename InnerProtocol >
148  {
149  log ( Info() , "Creating new UDP socket for device " , Quote ( this->uri() ) , ", as it appears to have been closed..." );
150  //mSocket = boost::asio::ip::udp::socket ( mIOservice , boost::asio::ip::udp::endpoint ( boost::asio::ip::udp::v4(), 0 ) );
151  mSocket.open ( boost::asio::ip::udp::v4() );
152  // boost::asio::socket_base::non_blocking_io lNonBlocking ( true );
153  // mSocket.io_control ( lNonBlocking );
154  log ( Info() , "UDP socket created successfully." );
155  }
156 
157 
158  template < typename InnerProtocol >
160  {
161  NotifyConditionalVariable ( false );
162 
163  if ( !mDispatchBuffers )
164  {
165  log ( Error() , __PRETTY_FUNCTION__ , " called when 'mDispatchBuffers' was NULL" );
166  return;
167  }
168 
169  std::vector< boost::asio::const_buffer > lAsioSendBuffer;
170  lAsioSendBuffer.push_back ( boost::asio::const_buffer ( mDispatchBuffers->getSendBuffer() , mDispatchBuffers->sendCounter() ) );
171  log ( Debug() , "Sending " , Integer ( mDispatchBuffers->sendCounter() ) , " bytes" );
172  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
173 
174  // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
175  while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
176  {
177  log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
178  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
179  }
180 
181  mSocket.async_send_to ( lAsioSendBuffer , mEndpoint , boost::bind ( &UDP< InnerProtocol >::write_callback, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
182  mPacketsInFlight++;
183  }
184 
185 
186  template < typename InnerProtocol >
187  void UDP< InnerProtocol >::write_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
188  {
189  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
190 
191  if ( mAsynchronousException )
192  {
193  NotifyConditionalVariable ( true );
194  return;
195  }
196 
197  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
198  {
199  exception::UdpTimeout* lExc = new exception::UdpTimeout();
200  log ( *lExc , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for UDP send to target with URI: ", this->uri() );
201 
202  if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
203  {
204  log ( *lExc , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
205  }
206 
207  mAsynchronousException = lExc;
208  NotifyConditionalVariable ( true );
209  return;
210  }
211 
212  if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != mDispatchBuffers->sendCounter() ) )
213  {
214  mSocket.close();
215  exception::ASIOUdpError* lExc = new exception::ASIOUdpError();
216  if ( aErrorCode )
217  {
218  log ( *lExc , "Error ", Quote ( aErrorCode.message() ) , " encountered during send to UDP target with URI: " , this->uri() );
219  }
220  if ( aBytesTransferred != mDispatchBuffers->sendCounter() )
221  {
222  log ( *lExc , "Only ", Integer ( aBytesTransferred ) , " of " , Integer ( mDispatchBuffers->sendCounter() ) , " bytes transferred in UDP send to URI: " , this->uri() );
223  }
224  mAsynchronousException = lExc;
225  NotifyConditionalVariable ( true );
226  return;
227  }
228 
229  if ( mReplyBuffers )
230  {
231  mReplyQueue.push_back ( mDispatchBuffers );
232  }
233  else
234  {
235  mReplyBuffers = mDispatchBuffers;
236  read ( );
237  }
238 
239  if ( mDispatchQueue.size() && mPacketsInFlight != this->getMaxNumberOfBuffers() )
240  {
241  mDispatchBuffers = mDispatchQueue.front();
242  mDispatchQueue.pop_front();
243  write();
244  }
245  else
246  {
247  mDispatchBuffers.reset();
248  }
249  }
250 
251 
252  template < typename InnerProtocol >
254  {
255  if ( !mReplyBuffers )
256  {
257  log ( Error() , __PRETTY_FUNCTION__ , " called when 'mReplyBuffers' was NULL" );
258  NotifyConditionalVariable ( true );
259  return;
260  }
261 
262  std::vector<boost::asio::mutable_buffer> lAsioReplyBuffer ( 1 , boost::asio::mutable_buffer ( & ( mReplyMemory.at ( 0 ) ) , mReplyBuffers->replyCounter() ) );
263  log ( Debug() , "Expecting " , Integer ( mReplyBuffers->replyCounter() ) , " bytes in reply." );
264  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
265 
266  // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
267  while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
268  {
269  log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
270  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
271  }
272 
273  mSocket.async_receive ( lAsioReplyBuffer , 0 , boost::bind ( &UDP<InnerProtocol>::read_callback, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
274  }
275 
276 
277  template < typename InnerProtocol >
278  void UDP< InnerProtocol >::read_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
279  {
280  {
281  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
282  if ( mAsynchronousException )
283  {
284  NotifyConditionalVariable ( true );
285  return;
286  }
287 
288  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
289  {
290  mAsynchronousException = new exception::UdpTimeout();
291  log ( *mAsynchronousException , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for UDP receive from target with URI: ", this->uri() );
292 
293  if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
294  {
295  log ( *mAsynchronousException , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
296  }
297 
298  NotifyConditionalVariable ( true );
299  return;
300  }
301  }
302 
303  if ( !mReplyBuffers )
304  {
305  log ( Error() , __PRETTY_FUNCTION__ , " called when 'mReplyBuffers' was NULL" );
306  return;
307  }
308 
309  if ( aBytesTransferred != mReplyBuffers->replyCounter() )
310  {
311  log ( Error() , "Expected " , Integer ( mReplyBuffers->replyCounter() ) , "-byte UDP payload from target " , Quote ( this->uri() ) , ", but only received " , Integer ( aBytesTransferred ) , " bytes. Validating returned data to work out where error occurred." );
312  }
313 
314  if ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) )
315  {
316  mSocket.close();
317 
318  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
319  mAsynchronousException = new exception::ASIOUdpError();
320  log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during receive from UDP target with URI: " , this->uri() );
321 
322  NotifyConditionalVariable ( true );
323  return;
324  }
325 
326 
327  std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( mReplyBuffers->getReplyBuffer() );
328  uint8_t* lReplyBuf ( & ( mReplyMemory.at ( 0 ) ) );
329 
330  for ( std::deque< std::pair< uint8_t* , uint32_t > >::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
331  {
332  // Don't copy more of mReplyMemory than was written to, for cases when less data received than expected
333  if ( static_cast<uint32_t> ( lReplyBuf - ( & mReplyMemory.at ( 0 ) ) ) >= aBytesTransferred )
334  {
335  break;
336  }
337 
338  uint32_t lNrBytesToCopy = std::min ( lIt->second , static_cast<uint32_t> ( aBytesTransferred - ( lReplyBuf - ( & mReplyMemory.at ( 0 ) ) ) ) );
339  memcpy ( lIt->first, lReplyBuf, lNrBytesToCopy );
340  lReplyBuf += lNrBytesToCopy;
341  }
342 
343  try
344  {
345  if ( uhal::exception::exception* lExc = ClientInterface::validate ( mReplyBuffers ) ) //Control of the pointer has been passed back to the client interface
346  {
347  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
348  mAsynchronousException = lExc;
349  }
350  }
351  catch ( exception::exception& aExc )
352  {
353  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
354  mAsynchronousException = new exception::ValidationError ();
355  log ( *mAsynchronousException , "Exception caught during reply validation for UDP device with URI " , Quote ( this->uri() ) , "; what returned: " , Quote ( aExc.what() ) );
356  }
357 
358  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
359 
360  if ( mAsynchronousException )
361  {
362  NotifyConditionalVariable ( true );
363  return;
364  }
365 
366  if ( mReplyQueue.size() )
367  {
368  mReplyBuffers = mReplyQueue.front();
369  mReplyQueue.pop_front();
370  read();
371  }
372  else
373  {
374  mReplyBuffers.reset();
375  }
376 
377  mPacketsInFlight--;
378 
379  if ( !mDispatchBuffers && mDispatchQueue.size() && mPacketsInFlight != this->getMaxNumberOfBuffers() )
380  {
381  mDispatchBuffers = mDispatchQueue.front();
382  mDispatchQueue.pop_front();
383  write();
384  }
385 
386  if ( !mDispatchBuffers && !mReplyBuffers )
387  {
388  mDeadlineTimer.expires_from_now( boost::posix_time::seconds(60) );
389  NotifyConditionalVariable ( true );
390  }
391  }
392 
393 
394 
395  template < typename InnerProtocol >
397  {
398  // Check whether the deadline has passed. We compare the deadline against
399  // the current time since a new asynchronous operation may have moved the
400  // deadline before this actor had a chance to run.
401  boost::lock_guard<boost::mutex> lLock ( this->mTransportLayerMutex );
402 
403  if ( mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
404  {
405  // SETTING THE EXCEPTION HERE CAN APPEAR AS A TIMEOUT WHEN NONE ACTUALLY EXISTS
406  if ( mDispatchBuffers || mReplyBuffers )
407  {
408  log ( Warning() , "Closing UDP socket for URI " , Quote ( this->uri() ) , " since deadline has passed" );
409  }
410  else
411  {
412  log ( Debug() , "Closing UDP socket for URI " , Quote ( this->uri() ) , " since no communication in last 60 seconds" );
413  }
414 
415  // The deadline has passed. The socket is closed so that any outstanding
416  // asynchronous operations are cancelled.
417  mSocket.close();
418  // There is no longer an active deadline. The expiry is set to positive
419  // infinity so that the actor takes no action until a new deadline is set.
420  mDeadlineTimer.expires_at ( boost::posix_time::pos_infin );
421  }
422 
423  // Put the actor back to sleep.
424  mDeadlineTimer.async_wait ( boost::bind ( &UDP::CheckDeadline, this ) );
425  }
426 
427 
428  template < typename InnerProtocol >
430  {
431  WaitOnConditionalVariable();
432 
433  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
434  if ( mAsynchronousException )
435  {
436  mAsynchronousException->throwAsDerivedType();
437  }
438  }
439 
440 
441 
442  template < typename InnerProtocol >
444  {
445  log ( Warning() , "Closing Socket since exception detected." );
446 
447  if ( mSocket.is_open() )
448  {
449  try
450  {
451  mSocket.close();
452 
453  while ( mSocket.is_open() )
454  {}
455  }
456  catch ( ... )
457  {}
458  }
459 
460  NotifyConditionalVariable ( true );
461 
462  if ( mAsynchronousException )
463  {
464  delete mAsynchronousException;
465  mAsynchronousException = NULL;
466  }
467 
468  ClientInterface::returnBufferToPool ( mDispatchQueue );
469  ClientInterface::returnBufferToPool ( mReplyQueue );
470  mPacketsInFlight = 0;
471 
472  ClientInterface::returnBufferToPool ( mDispatchBuffers );
473  mDispatchBuffers.reset();
474  ClientInterface::returnBufferToPool ( mReplyBuffers );
475  mReplyBuffers.reset();
476 
477  InnerProtocol::dispatchExceptionHandler();
478  }
479 
480 
481  template < typename InnerProtocol >
483  {
484  {
485  boost::lock_guard<boost::mutex> lLock ( mConditionalVariableMutex );
486  mFlushDone = aValue;
487  }
488  mConditionalVariable.notify_one();
489  }
490 
491 
492  template < typename InnerProtocol >
494  {
495  boost::unique_lock<boost::mutex> lLock ( mConditionalVariableMutex );
496 
497  while ( !mFlushDone )
498  {
499  mConditionalVariable.wait ( lLock );
500  }
501  }
502 
503 
// Explicit instantiations: the member definitions above live in this .cpp file,
// so the UDP template is instantiated here for the two IPbus inner protocols used.
504  template class UDP< IPbus< 1 , 3 > >;
505  template class UDP< IPbus< 2 , 0 > >;
506 }
507 
uhal::Type
_Type< T > Type()
Definition: log_inserters.type.hxx:60
uhal::exception::exception::what
virtual const char * what() const
Function which returns the error message associated with an exception If no error message has previou...
Definition: exception.cpp:87
boost::shared_ptr
Definition: DerivedNodeFactory.hpp:52
uhal::UDP::getMaxReplySize
uint32_t getMaxReplySize()
Return the maximum size of reply packet based on the buffer size in the target.
Definition: ProtocolUDP.cpp:140
uhal::UDP::mDeadlineTimer
boost::asio::deadline_timer mDeadlineTimer
The mechanism for providing the time-out.
Definition: ProtocolUDP.hpp:194
boost
Definition: log.hpp:13
uhal::UDP::NotifyConditionalVariable
void NotifyConditionalVariable(const bool &aValue)
Function to set the value of a variable associated with a BOOST conditional-variable and then notify ...
Definition: ProtocolUDP.cpp:482
uhal::UDP::connect
void connect()
Set up the UDP socket.
Definition: ProtocolUDP.cpp:147
uhal::min
@ min
minutes past the hour formatted as two digits e.g.
Definition: log_inserters.time.hpp:63
uhal::exception::exception
An abstract base exception class, including an interface to throw as the derived type (for passing ex...
Definition: exception.hpp:71
uhal
Definition: HttpResponseGrammar.hpp:49
uhal::UDP::~UDP
virtual ~UDP()
Destructor.
Definition: ProtocolUDP.cpp:80
log_inserters.quote.hpp
uhal::UDP::read_callback
void read_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async receive This, then,...
Definition: ProtocolUDP.cpp:278
uhal::Info
InfoLevel Info
Definition: LogLevels.cpp:115
uhal::log
void log(FatalLevel &aFatal, const T0 &aArg0)
Function to add a log entry at Fatal level.
Definition: log.hxx:20
log_inserters.integer.hpp
uhal::Integer
_Integer< T, IntFmt<> > Integer(const T &aT)
Forward declare a function which creates an instance of the ultra-lightweight wrapper from an integer...
Definition: log_inserters.integer.hxx:43
uhal::UDP::CheckDeadline
void CheckDeadline()
Function called by the ASIO deadline timer.
Definition: ProtocolUDP.cpp:396
uhal::UDP
Transport protocol to transfer an IPbus buffer via UDP.
Definition: ProtocolUDP.hpp:85
uhal::Warning
WarningLevel Warning
Definition: LogLevels.cpp:79
uhal::UDP::implementDispatch
void implementDispatch(boost::shared_ptr< Buffers > aBuffers)
Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate ...
Definition: ProtocolUDP.cpp:104
ProtocolIPbus.hpp
uhal::UDP::read
void read()
Initialize performing the next UDP read operation In multi-threaded mode, this runs the ASIO async re...
Definition: ProtocolUDP.cpp:253
uhal::Error
ErrorLevel Error
Definition: LogLevels.cpp:61
uhal::UDP::Flush
virtual void Flush()
Concrete implementation of the synchronization function to block until all buffers have been sent,...
Definition: ProtocolUDP.cpp:429
uhal::UDP::write
void write()
Initialize performing the next UDP write operation In multi-threaded mode, this runs the ASIO async s...
Definition: ProtocolUDP.cpp:159
uhal::UDP::UDP
UDP(const UDP &aUDP)
Copy Constructor This creates a new socket, dispatch queue, dispatch thread, etc.
uhal::Quote
_Quote< T > Quote(const T &aT)
Definition: log_inserters.quote.hxx:49
uhal::tests::uri
std::string uri
Definition: test_single.cpp:89
uhal::UDP::WaitOnConditionalVariable
void WaitOnConditionalVariable()
Function to block a thread pending a BOOST conditional-variable and its associated regular variable.
Definition: ProtocolUDP.cpp:493
uhal::Debug
DebugLevel Debug
Definition: LogLevels.cpp:133
uhal::UDP::dispatchExceptionHandler
virtual void dispatchExceptionHandler()
Function which tidies up this protocol layer in the event of an exception.
Definition: ProtocolUDP.cpp:443
uhal::UDP::write_callback
void write_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async send This, then,...
Definition: ProtocolUDP.cpp:187
log.hpp
uhal::ClientInterface::returnBufferToPool
void returnBufferToPool(boost::shared_ptr< Buffers > &aBuffers)
Function to return a buffer to the buffer pool.
Definition: ClientInterface.cpp:221
LogLevels.hpp
URI.hpp
log_inserters.type.hpp
uhal::tests::write
c write(addr, xx[0])
uhal::URI
Struct to store a URI when parsed by boost spirit.
Definition: URI.hpp:50
uhal::UDP::getMaxSendSize
uint32_t getMaxSendSize()
Return the maximum size to be sent based on the buffer size in the target.
Definition: ProtocolUDP.cpp:133
uhal::ClientInterface::validate
virtual exception::exception * validate(boost::shared_ptr< Buffers > aBuffers)
Function which dispatch calls when the reply is received to check that the headers are as expected.
Definition: ClientInterface.cpp:188
Buffers.hpp
ProtocolUDP.hpp