μHAL (v2.7.9)
Part of the IPbus software repository
ProtocolTCP.cpp
Go to the documentation of this file.
1 /*
2 ---------------------------------------------------------------------------
3 
4  This file is part of uHAL.
5 
6  uHAL is a hardware access library and programming framework
7  originally developed for upgrades of the Level-1 trigger of the CMS
8  experiment at CERN.
9 
10  uHAL is free software: you can redistribute it and/or modify
11  it under the terms of the GNU General Public License as published by
12  the Free Software Foundation, either version 3 of the License, or
13  (at your option) any later version.
14 
15  uHAL is distributed in the hope that it will be useful,
16  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  GNU General Public License for more details.
19 
20  You should have received a copy of the GNU General Public License
21  along with uHAL. If not, see <http://www.gnu.org/licenses/>.
22 
23 
24  Andrew Rose, Imperial College, London
25  email: awr01 <AT> imperial.ac.uk
26 
27  Marc Magrans de Abril, CERN
28  email: marc.magrans.de.abril <AT> cern.ch
29 
30 ---------------------------------------------------------------------------
31 */
32 
33 #include "uhal/ProtocolTCP.hpp"
34 
35 
36 #include <sys/time.h>
37 
38 #include <boost/bind/bind.hpp>
39 #include <boost/lambda/lambda.hpp>
40 #include <boost/asio/connect.hpp>
41 #include <boost/asio/write.hpp>
42 #include <boost/asio/read.hpp>
43 #include <boost/asio/placeholders.hpp>
44 #include <boost/date_time/posix_time/posix_time.hpp>
45 #include <boost/chrono/chrono_io.hpp>
46 
47 #include "uhal/Buffers.hpp"
48 #include "uhal/grammars/URI.hpp"
49 #include "uhal/IPbusInspector.hpp"
50 #include "uhal/log/LogLevels.hpp"
51 #include "uhal/log/log.hpp"
55 #include "uhal/ProtocolIPbus.hpp"
57 
58 
59 namespace uhal
60 {
61 
62  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
63  TCP< InnerProtocol, nr_buffers_per_send >::TCP ( const std::string& aId, const URI& aUri ) :
64  InnerProtocol ( aId , aUri ),
65  mIOservice ( ),
66  mSocket ( mIOservice ),
67  mEndpoint ( boost::asio::ip::tcp::resolver ( mIOservice ).resolve ( boost::asio::ip::tcp::resolver::query ( aUri.mHostname , aUri.mPort ) ) ),
68  mDeadlineTimer ( mIOservice ),
69  mIOserviceWork ( mIOservice ),
70  mDispatchThread ( boost::bind ( &boost::asio::io_service::run , & ( mIOservice ) ) ),
71  mDispatchQueue(),
72  mReplyQueue(),
73  mPacketsInFlight ( 0 ),
74  mFlushStarted ( false ),
75  mFlushDone ( true ),
76  mAsynchronousException ( NULL )
77  {
78  mDeadlineTimer.async_wait ( boost::bind ( &TCP::CheckDeadline, this ) );
79  }
80 
81 
82  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
84  {
85  try
86  {
87  mSocket.close();
88 
89  while ( mSocket.is_open() )
90  {}
91 
92  mIOservice.stop();
93  mDispatchThread.join();
94  ClientInterface::returnBufferToPool ( mDispatchQueue );
95  for (size_t i = 0; i < mReplyQueue.size(); i++)
96  ClientInterface::returnBufferToPool ( mReplyQueue.at(i).first );
97  ClientInterface::returnBufferToPool ( mDispatchBuffers );
98  ClientInterface::returnBufferToPool ( mReplyBuffers.first );
99  }
100  catch ( const std::exception& aExc )
101  {
102  log ( Error() , "Exception " , Quote ( aExc.what() ) , " caught in " , Type<TCP< InnerProtocol , nr_buffers_per_send > >(), " destructor" );
103  }
104  }
105 
106 
107  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
109  {
110  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
111 
112  if ( mAsynchronousException )
113  {
114  log ( *mAsynchronousException , "Rethrowing Asynchronous Exception from 'implementDispatch' method of " , Type<TCP< InnerProtocol , nr_buffers_per_send > >() );
115  mAsynchronousException->throwAsDerivedType();
116  }
117 
118  if ( ! mSocket.is_open() )
119  {
120  connect();
121  }
122 
123  mFlushStarted = false;
124  mDispatchQueue.push_back ( aBuffers );
125 
126  if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= nr_buffers_per_send ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
127  {
128  write ( );
129  }
130  }
131 
132 
133  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
135  {
136  return (350 * 4);
137  }
138 
139 
140  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
142  {
143  return (350 * 4);
144  }
145 
146 
147  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
149  {
150  log ( Info() , "Attempting to create TCP connection to '" , mEndpoint->host_name() , "' port " , mEndpoint->service_name() , "." );
151  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
152  boost::system::error_code lErrorCode;
153  boost::asio::connect ( mSocket , mEndpoint , lErrorCode );
154 
155  if ( lErrorCode )
156  {
157  mSocket.close();
158 
159  while ( mSocket.is_open() )
160  {}
161 
162  exception::TcpConnectionFailure lExc;
163  std::ostringstream oss;
164 
165  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
166  {
167  oss << "Timeout (" << this->getBoostTimeoutPeriod().total_milliseconds() << " milliseconds) occurred when connecting to ";
168  }
169  else if ( lErrorCode == boost::asio::error::connection_refused )
170  {
171  oss << "Connection refused for ";
172  }
173  else
174  {
175  oss << "Error \"" << lErrorCode.message() << "\" encountered when connecting to ";
176  }
177 
178  log ( lExc , oss.str() , ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
179  " at ", Quote ( this->mUri.mHostname + ":" + this->mUri.mPort ) , ". URI=" , Quote ( this->uri() ) );
180  throw lExc;
181  }
182 
183  mSocket.set_option ( boost::asio::ip::tcp::no_delay ( true ) );
184  // boost::asio::socket_base::non_blocking_io lNonBlocking ( true );
185  // mSocket.io_control ( lNonBlocking );
186  log ( Info() , "TCP connection succeeded" );
187  }
188 
189 
190 
191  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
193  {
194  NotifyConditionalVariable ( false );
195 
196  std::vector< boost::asio::const_buffer > lAsioSendBuffer;
197  lAsioSendBuffer.push_back ( boost::asio::const_buffer ( &mSendByteCounter , 4 ) );
198  mSendByteCounter = 0;
199  std::size_t lNrBuffersToSend = std::min ( mDispatchQueue.size(), nr_buffers_per_send );
200  mDispatchBuffers.reserve ( lNrBuffersToSend );
201 
202  for ( std::size_t i = 0; i < lNrBuffersToSend; i++ )
203  {
204  mDispatchBuffers.push_back ( mDispatchQueue.front() );
205  mDispatchQueue.pop_front();
206  const boost::shared_ptr<Buffers>& lBuffer = mDispatchBuffers.back();
207  mSendByteCounter += lBuffer->sendCounter();
208  lAsioSendBuffer.push_back ( boost::asio::const_buffer ( lBuffer->getSendBuffer() , lBuffer->sendCounter() ) );
209  }
210 
211  log ( Debug() , "Sending " , Integer ( mSendByteCounter ) , " bytes from ", Integer ( mDispatchBuffers.size() ), " buffers" );
212  mSendByteCounter = htonl ( mSendByteCounter );
213 
214  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
215 
216  // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
217  while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
218  {
219  log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
220  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
221  }
222 
223  boost::asio::async_write ( mSocket , lAsioSendBuffer , boost::bind ( &TCP< InnerProtocol , nr_buffers_per_send >::write_callback, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
224  mPacketsInFlight += mDispatchBuffers.size();
225 
226  SteadyClock_t::time_point lNow = SteadyClock_t::now();
227  if (mLastSendQueued > SteadyClock_t::time_point())
228  mInterSendTimeStats.add(mLastSendQueued, lNow);
229  mLastSendQueued = lNow;
230  }
231 
232 
233 
234  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
235  void TCP< InnerProtocol , nr_buffers_per_send >::write_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
236  {
237  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
238  mSendByteCounter = ntohl ( mSendByteCounter );
239 
240  if ( mAsynchronousException )
241  {
242  NotifyConditionalVariable ( true );
243  return;
244  }
245 
246  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
247  {
248  mAsynchronousException = new exception::TcpTimeout();
249  log ( *mAsynchronousException , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for send to ",
250  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: ", this->uri() );
251 
252  if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
253  {
254  log ( *mAsynchronousException , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
255  }
256 
257  NotifyConditionalVariable ( true );
258  return;
259  }
260 
261  if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != ( mSendByteCounter+4 ) ) )
262  {
263  mAsynchronousException = new exception::ASIOTcpError();
264  log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during send to ",
265  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: " , this->uri() );
266 
267  try
268  {
269  mSocket.close();
270 
271  while ( mSocket.is_open() )
272  {}
273  }
274  catch ( const std::exception& aExc )
275  {
276  log ( *mAsynchronousException , "Error closing TCP socket following the ASIO send error" );
277  }
278 
279  if ( aBytesTransferred != ( mSendByteCounter + 4 ) )
280  {
281  log ( *mAsynchronousException , "Attempted to send " , Integer ( mSendByteCounter ) , " bytes to ",
282  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
283  " with URI " , Quote ( this->uri() ) , ", but only sent " , Integer ( aBytesTransferred ) , " bytes" );
284  }
285 
286  NotifyConditionalVariable ( true );
287  return;
288  }
289 
290  if ( ! mReplyBuffers.first.empty() )
291  {
292  mReplyQueue.push_back ( std::make_pair(mDispatchBuffers, mLastSendQueued) );
293  }
294  else
295  {
296  mReplyBuffers = std::make_pair(mDispatchBuffers, mLastSendQueued);
297  read ( );
298  }
299 
300  mDispatchBuffers.clear();
301 
302  if ( ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
303  {
304  write();
305  }
306 
307  }
308 
309 
310  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
312  {
313  std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
314  lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( &mReplyByteCounter , 4 ) );
315  log ( Debug() , "Getting reply byte counter" );
316  boost::asio::ip::tcp::endpoint lEndpoint;
317  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
318 
319  // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
320  while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
321  {
322  log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
323  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
324  }
325 
326  boost::asio::async_read ( mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly ( 4 ), boost::bind ( &TCP< InnerProtocol , nr_buffers_per_send >::read_callback, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
327 
328  SteadyClock_t::time_point lNow = SteadyClock_t::now();
329  if (mLastRecvQueued > SteadyClock_t::time_point())
330  mInterRecvTimeStats.add(mLastRecvQueued, lNow);
331  mLastRecvQueued = lNow;
332  }
333 
334 
335 
336 
337 
338  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
339  void TCP< InnerProtocol , nr_buffers_per_send >::read_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
340  {
341  std::size_t lNrReplyBuffers = 1;
342  uint32_t lRequestBytes = 0;
343  uint32_t lExpectedReplyBytes = 0;
344 
345  for ( std::vector< boost::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
346  {
347  lNrReplyBuffers += ( *lBufIt )->getReplyBuffer().size();
348  lRequestBytes += ( *lBufIt )->sendCounter();
349  lExpectedReplyBytes += ( *lBufIt )->replyCounter();
350  }
351 
352  {
353  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
354  if ( mAsynchronousException )
355  {
356  NotifyConditionalVariable ( true );
357  return;
358  }
359 
360  SteadyClock_t::time_point lNow = SteadyClock_t::now();
361 
362  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
363  {
364  exception::TcpTimeout* lExc = new exception::TcpTimeout();
365  log ( *lExc , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " ms) occurred for receive (header) from ",
366  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI '", this->uri(), "'. ",
367  Integer(mPacketsInFlight), " packets in flight, ", Integer(mReplyBuffers.first.size()), " in this chunk (",
368  Integer(lRequestBytes), "/", Integer(lExpectedReplyBytes), " bytes sent/expected). Last send / receive queued ",
369  boost::chrono::duration<float, boost::milli>(lNow - mLastSendQueued).count(), " / ",
370  boost::chrono::duration<float, boost::milli>(lNow - mLastRecvQueued), " ago.");
371 
372  log ( Error(), "Extra timeout-related info - round-trip times: ", mRTTStats);
373  log ( Error(), "Extra timeout-related info - send-recv times: ", mLSTStats);
374  log ( Error(), "Extra timeout-related info - inter-send times: ", mInterSendTimeStats);
375  log ( Error(), "Extra timeout-related info - inter-recv times: ", mInterRecvTimeStats);
376 
377  if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
378  {
379  log ( *lExc , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
380  }
381 
382  mAsynchronousException = lExc;
383  NotifyConditionalVariable ( true );
384  return;
385  }
386  }
387 
388  if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != 4 ) )
389  {
390  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
391  mAsynchronousException = new exception::ASIOTcpError();
392 
393  if ( aErrorCode )
394  {
395  log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during receive from ",
396  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: " , this->uri() );
397  }
398 
399  if ( aBytesTransferred != 4 )
400  {
401  log ( *mAsynchronousException, "Expected to receive 4-byte header in async read from ",
402  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
403  " with URI " , Quote ( this->uri() ) , ", but only received " , Integer ( aBytesTransferred ) , " bytes" );
404  }
405 
406  try
407  {
408  mSocket.close();
409  while ( mSocket.is_open() )
410  {}
411  }
412  catch ( const std::exception& aExc )
413  {
414  log ( *mAsynchronousException , "Error closing socket following ASIO read error" );
415  }
416 
417  NotifyConditionalVariable ( true );
418  return;
419  }
420 
421  SteadyClock_t::time_point lReadHeaderTimestamp = SteadyClock_t::now();
422  mRTTStats.add(mReplyBuffers.second, lReadHeaderTimestamp);
423  mLSTStats.add(mLastSendQueued, lReadHeaderTimestamp);
424 
425  mReplyByteCounter = ntohl ( mReplyByteCounter );
426  log ( Debug() , "Byte Counter says " , Integer ( mReplyByteCounter ) , " bytes are coming" );
427  std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
428 
429  lAsioReplyBuffer.reserve ( lNrReplyBuffers );
430  log ( Debug() , "Expecting " , Integer ( lExpectedReplyBytes ) , " bytes in reply, for ", Integer ( mReplyBuffers.first.size() ), " buffers" );
431 
432  for ( std::vector< boost::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
433  {
434  std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( ( *lBufIt )->getReplyBuffer() );
435 
436  for ( std::deque< std::pair< uint8_t* , uint32_t > >::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
437  {
438  lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( lIt->first , lIt->second ) );
439  }
440  }
441 
442  boost::system::error_code lErrorCode = boost::asio::error::would_block;
443  std::size_t lBytesTransferred = boost::asio::read ( mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly ( mReplyByteCounter ), lErrorCode );
444 
445  if ( ( lErrorCode && ( lErrorCode != boost::asio::error::eof ) ) || ( lBytesTransferred != mReplyByteCounter ) )
446  {
447  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
448  mSocket.close();
449 
450  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
451  {
452  mAsynchronousException = new exception::TcpTimeout();
453  log ( *mAsynchronousException , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for receive (chunk) from ",
454  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: ", this->uri() );
455  }
456  else
457  {
458  mAsynchronousException = new exception::ASIOTcpError();
459  log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during receive from ",
460  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: " , this->uri() );
461  }
462 
463  if ( lBytesTransferred != mReplyByteCounter )
464  {
465  log ( *mAsynchronousException, "Expected to receive " , Integer ( mReplyByteCounter ) , " bytes in read from ",
466  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
467  " with URI " , Quote ( this->uri() ) , ", but only received " , Integer ( lBytesTransferred ) , " bytes" );
468  }
469 
470  NotifyConditionalVariable ( true );
471  return;
472  }
473 
474 
475  for ( std::vector< boost::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
476  {
477  try
478  {
479  if ( uhal::exception::exception* lExc = ClientInterface::validate ( *lBufIt ) ) //Control of the pointer has been passed back to the client interface
480  {
481  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
482  mAsynchronousException = lExc;
483  }
484  }
485  catch ( exception::exception& aExc )
486  {
487  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
488  mAsynchronousException = new exception::ValidationError ();
489  log ( *mAsynchronousException , "Exception caught during reply validation for TCP device with URI " , Quote ( this->uri() ) , "; what returned: " , Quote ( aExc.what() ) );
490  }
491 
492  if ( mAsynchronousException )
493  {
494  NotifyConditionalVariable ( true );
495  return;
496  }
497  }
498 
499  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
500  mPacketsInFlight -= mReplyBuffers.first.size();
501 
502  if ( mReplyQueue.size() )
503  {
504  mReplyBuffers = mReplyQueue.front();
505  mReplyQueue.pop_front();
506  read();
507  }
508  else
509  {
510  mReplyBuffers.first.clear();
511  }
512 
513  if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
514  {
515  write();
516  }
517 
518  if ( mDispatchBuffers.empty() && mReplyBuffers.first.empty() )
519  {
520  mDeadlineTimer.expires_from_now ( boost::posix_time::seconds(60) );
521  NotifyConditionalVariable ( true );
522  }
523  }
524 
525 
526 
527  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
529  {
530  boost::lock_guard<boost::mutex> lLock ( this->mTransportLayerMutex );
531 
532  // Check whether the deadline has passed. We compare the deadline against the current time since a new asynchronous operation may have moved the deadline before this actor had a chance to run.
533  if ( mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
534  {
535  // SETTING THE EXCEPTION HERE CAN APPEAR AS A TIMEOUT WHEN NONE ACTUALLY EXISTS
536  if ( mDispatchBuffers.size() || mReplyBuffers.first.size() )
537  {
538  log ( Warning() , "Closing TCP socket for device with URI " , Quote ( this->uri() ) , " since deadline has passed" );
539  }
540  else
541  {
542  log ( Debug() , "Closing TCP socket for device with URI " , Quote ( this->uri() ) , " since no communication in 60 seconds" );
543  }
544 
545  // The deadline has passed. The socket is closed so that any outstanding asynchronous operations are cancelled.
546  mSocket.close();
547  // There is no longer an active deadline. The expiry is set to positive infinity so that the actor takes no action until a new deadline is set.
548  mDeadlineTimer.expires_at ( boost::posix_time::pos_infin );
549  }
550 
551  // Put the actor back to sleep.
552  mDeadlineTimer.async_wait ( boost::bind ( &TCP::CheckDeadline, this ) );
553  }
554 
555 
556  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
558  {
559  {
560  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
561  mFlushStarted = true;
562 
563  if ( mDispatchQueue.size() && mDispatchBuffers.empty() )
564  {
565  write();
566  }
567  }
568 
569  WaitOnConditionalVariable();
570 
571  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
572 
573  if ( mAsynchronousException )
574  {
575  mAsynchronousException->throwAsDerivedType();
576  }
577  }
578 
579 
580  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
582  {
583  if ( mSocket.is_open() )
584  {
585  log ( Warning() , "Closing TCP socket for device with URI " , Quote ( this->uri() ) , " since exception detected." );
586 
587  try
588  {
589  mSocket.close();
590 
591  while ( mSocket.is_open() )
592  {}
593  }
594  catch ( ... )
595  {}
596  }
597 
598  NotifyConditionalVariable ( true );
599 
600  if ( mAsynchronousException )
601  {
602  delete mAsynchronousException;
603  mAsynchronousException = NULL;
604  }
605 
606  ClientInterface::returnBufferToPool ( mDispatchQueue );
607  for (size_t i = 0; i < mReplyQueue.size(); i++)
608  ClientInterface::returnBufferToPool ( mReplyQueue.at(i).first );
609  mPacketsInFlight = 0;
610  ClientInterface::returnBufferToPool ( mDispatchBuffers );
611  ClientInterface::returnBufferToPool ( mReplyBuffers.first );
612 
613  mSendByteCounter = 0;
614  mReplyByteCounter = 0;
615 
616  InnerProtocol::dispatchExceptionHandler();
617  }
618 
619 
620  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
622  {
623  {
624  boost::lock_guard<boost::mutex> lLock ( mConditionalVariableMutex );
625  mFlushDone = aValue;
626  }
627  mConditionalVariable.notify_one();
628  }
629 
630  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
632  {
633  boost::unique_lock<boost::mutex> lLock ( mConditionalVariableMutex );
634 
635  while ( !mFlushDone )
636  {
637  mConditionalVariable.wait ( lLock );
638  }
639  }
640 
641 
642  template class TCP< IPbus< 1 , 3 > , 1 >;
643  template class TCP< IPbus< 2 , 0 > , 1 >;
644 
645  template class TCP< ControlHub < IPbus< 1 , 3 > > , 3 >;
646  template class TCP< ControlHub < IPbus< 2 , 0 > > , 3 >;
647 }
648 
uhal::Type
_Type< T > Type()
Definition: log_inserters.type.hxx:60
uhal::exception::exception::what
virtual const char * what() const
Function which returns the error message associated with an exception If no error message has previou...
Definition: exception.cpp:87
uhal::TCP::Flush
virtual void Flush()
Concrete implementation of the synchronization function to block until all buffers have been sent,...
Definition: ProtocolTCP.cpp:557
uhal::TCP::dispatchExceptionHandler
virtual void dispatchExceptionHandler()
Function which tidies up this protocol layer in the event of an exception.
Definition: ProtocolTCP.cpp:581
boost::shared_ptr
Definition: DerivedNodeFactory.hpp:52
uhal::TCP::~TCP
virtual ~TCP()
Destructor.
Definition: ProtocolTCP.cpp:83
uhal::TCP::mDeadlineTimer
boost::asio::deadline_timer mDeadlineTimer
The mechanism for providing the time-out.
Definition: ProtocolTCP.hpp:197
uhal::TCP::write_callback
void write_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async write This, then,...
Definition: ProtocolTCP.cpp:235
ProtocolTCP.hpp
boost
Definition: log.hpp:13
uhal::TCP::connect
void connect()
Make the TCP connection.
Definition: ProtocolTCP.cpp:148
uhal::min
@ min
minutes past the hour formatted as two digits e.g.
Definition: log_inserters.time.hpp:63
uhal::TCP::read_callback
void read_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async read This, then,...
Definition: ProtocolTCP.cpp:339
uhal::TCP
Transport protocol to transfer an IPbus buffer via TCP.
Definition: ProtocolTCP.hpp:85
uhal::TCP::getMaxSendSize
uint32_t getMaxSendSize()
Return the maximum size to be sent based on the buffer size in the target.
Definition: ProtocolTCP.cpp:134
uhal::exception::exception
An abstract base exception class, including an interface to throw as the derived type (for passing ex...
Definition: exception.hpp:71
uhal::TCP::implementDispatch
void implementDispatch(boost::shared_ptr< Buffers > aBuffers)
Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate ...
Definition: ProtocolTCP.cpp:108
uhal
Definition: HttpResponseGrammar.hpp:49
ProtocolControlHub.hpp
IPbusInspector.hpp
log_inserters.quote.hpp
uhal::Info
InfoLevel Info
Definition: LogLevels.cpp:115
uhal::log
void log(FatalLevel &aFatal, const T0 &aArg0)
Function to add a log entry at Fatal level.
Definition: log.hxx:20
log_inserters.integer.hpp
uhal::Integer
_Integer< T, IntFmt<> > Integer(const T &aT)
Forward declare a function which creates an instance of the ultra-lightweight wrapper from an integer...
Definition: log_inserters.integer.hxx:43
uhal::Warning
WarningLevel Warning
Definition: LogLevels.cpp:79
ProtocolIPbus.hpp
uhal::Error
ErrorLevel Error
Definition: LogLevels.cpp:61
uhal::Quote
_Quote< T > Quote(const T &aT)
Definition: log_inserters.quote.hxx:49
uhal::TCP::CheckDeadline
void CheckDeadline()
Function called by the ASIO deadline timer.
Definition: ProtocolTCP.cpp:528
uhal::tests::uri
std::string uri
Definition: test_single.cpp:89
uhal::Debug
DebugLevel Debug
Definition: LogLevels.cpp:133
uhal::TCP::TCP
TCP(const TCP &aTCP)
Copy Constructor This creates a new socket, dispatch queue, dispatch thread, etc.
uhal::TCP::getMaxReplySize
uint32_t getMaxReplySize()
Return the maximum size of reply packet based on the buffer size in the target.
Definition: ProtocolTCP.cpp:141
log.hpp
uhal::TCP::write
void write()
Initialize performing the next TCP write operation In multi-threaded mode, this runs the ASIO async w...
Definition: ProtocolTCP.cpp:192
uhal::TCP::WaitOnConditionalVariable
void WaitOnConditionalVariable()
Function to block a thread pending a BOOST conditional-variable and its associated regular variable.
Definition: ProtocolTCP.cpp:631
uhal::ClientInterface::returnBufferToPool
void returnBufferToPool(boost::shared_ptr< Buffers > &aBuffers)
Function to return a buffer to the buffer pool.
Definition: ClientInterface.cpp:221
LogLevels.hpp
URI.hpp
log_inserters.type.hpp
uhal::tests::write
c write(addr, xx[0])
uhal::URI
Struct to store a URI when parsed by boost spirit.
Definition: URI.hpp:50
uhal::TCP::read
void read()
Initialize performing the next TCP read operation In multi-threaded mode, this runs the ASIO async re...
Definition: ProtocolTCP.cpp:311
uhal::TCP::NotifyConditionalVariable
void NotifyConditionalVariable(const bool &aValue)
Function to set the value of a variable associated with a BOOST conditional-variable and then notify ...
Definition: ProtocolTCP.cpp:621
uhal::ClientInterface::validate
virtual exception::exception * validate(boost::shared_ptr< Buffers > aBuffers)
Function which dispatch calls when the reply is received to check that the headers are as expected.
Definition: ClientInterface.cpp:188
Buffers.hpp