μHAL (v2.6.5)
Part of the IPbus software repository
ProtocolTCP.cpp
Go to the documentation of this file.
1 /*
2 ---------------------------------------------------------------------------
3 
4  This file is part of uHAL.
5 
6  uHAL is a hardware access library and programming framework
7  originally developed for upgrades of the Level-1 trigger of the CMS
8  experiment at CERN.
9 
10  uHAL is free software: you can redistribute it and/or modify
11  it under the terms of the GNU General Public License as published by
12  the Free Software Foundation, either version 3 of the License, or
13  (at your option) any later version.
14 
15  uHAL is distributed in the hope that it will be useful,
16  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  GNU General Public License for more details.
19 
20  You should have received a copy of the GNU General Public License
21  along with uHAL. If not, see <http://www.gnu.org/licenses/>.
22 
23 
24  Andrew Rose, Imperial College, London
25  email: awr01 <AT> imperial.ac.uk
26 
27  Marc Magrans de Abril, CERN
28  email: marc.magrans.de.abril <AT> cern.ch
29 
30 ---------------------------------------------------------------------------
31 */
32 
33 #include "uhal/ProtocolTCP.hpp"
34 
35 
36 #include <sys/time.h>
37 
38 #include <boost/bind/bind.hpp>
39 #include <boost/lambda/lambda.hpp>
40 #include <boost/asio/connect.hpp>
41 #include <boost/asio/write.hpp>
42 #include <boost/asio/read.hpp>
43 #include <boost/asio/placeholders.hpp>
44 #include "boost/date_time/posix_time/posix_time.hpp"
45 #include <boost/chrono/chrono_io.hpp>
46 
47 #include "uhal/Buffers.hpp"
48 #include "uhal/grammars/URI.hpp"
49 #include "uhal/IPbusInspector.hpp"
50 #include "uhal/log/LogLevels.hpp"
51 #include "uhal/log/log.hpp"
55 #include "uhal/ProtocolIPbus.hpp"
57 
58 
59 namespace uhal
60 {
61 
62  //--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
63  //--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
64 
// Constructs the TCP transport layer on top of InnerProtocol.
//   aId  : forwarded unchanged to the InnerProtocol constructor.
//   aUri : forwarded to InnerProtocol; its mHostname and mPort fields are also used to resolve the endpoint.
// All members are set up in the member-initialiser list; the body only registers the deadline-timer handler.
65  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
66  TCP< InnerProtocol, nr_buffers_per_send >::TCP ( const std::string& aId, const URI& aUri ) :
67  InnerProtocol ( aId , aUri ),
68  mIOservice ( ),
69  mSocket ( mIOservice ),
    // Host name and port are resolved here, at construction time; no connection is attempted in this constructor.
70  mEndpoint ( boost::asio::ip::tcp::resolver ( mIOservice ).resolve ( boost::asio::ip::tcp::resolver::query ( aUri.mHostname , aUri.mPort ) ) ),
71  mDeadlineTimer ( mIOservice ),
72  // mReplyMemory ( 65536 , 0x00000000 ),
    // NOTE(review): mIOserviceWork is presumably an io_service::work object that keeps run() from returning while idle - confirm in the header.
73  mIOserviceWork ( mIOservice ),
    // The dispatch thread executes mIOservice.run(), i.e. it is the thread on which the ASIO handlers are invoked.
74  mDispatchThread ( boost::bind ( &boost::asio::io_service::run , & ( mIOservice ) ) ),
75  mDispatchQueue(),
76  mReplyQueue(),
77  mPacketsInFlight ( 0 ),
78  mFlushStarted ( false ),
    // mFlushDone starts as true, so a wait on the condition variable returns immediately until a write clears it.
79  mFlushDone ( true ),
80  mAsynchronousException ( NULL )
81  {
    // Register CheckDeadline as the handler for mDeadlineTimer; CheckDeadline re-registers itself on every call.
82  mDeadlineTimer.async_wait ( boost::bind ( &TCP::CheckDeadline, this ) );
83  }
84 
85 
// Destructor: closes the socket, stops the I/O service and joins the dispatch thread.
// Any std::exception raised during teardown is logged and not propagated.
// NOTE(review): listing line 87 (the signature) is absent from this extract; the member index places
// "virtual ~TCP()" at ProtocolTCP.cpp:87, which is why this body is identified as the destructor.
86  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
88  {
89  try
90  {
91  mSocket.close();
92 
    // Spins until the socket reports that it is closed.
93  while ( mSocket.is_open() )
94  {}
95 
    // Stop the I/O service first so that io_service::run returns and the join below can complete.
96  mIOservice.stop();
97  mDispatchThread.join();
    // NOTE(review): listing lines 98 and 100-102 are absent from this extract, so the body of the loop
    // below (and any statements around it) is not visible here - consult the repository before editing.
99  for (size_t i = 0; i < mReplyQueue.size(); i++)
103  }
104  catch ( const std::exception& aExc )
105  {
106  log ( Error() , "Exception " , Quote ( aExc.what() ) , " caught at " , ThisLocation() );
107  }
108  }
109 
110 
// Queues one buffer for sending and starts a write if the transport is idle.
// NOTE(review): listing line 112 (the signature) is absent from this extract; the member index lists
// "void implementDispatch(boost::shared_ptr< Buffers > aBuffers)" and the body uses aBuffers - confirm.
111  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
113  {
    // Held for the whole function.
114  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
115 
    // NOTE(review): listing line 116 is absent; presumably it is the test on mAsynchronousException that
    // guards the block below, which logs and rethrows a stored exception - confirm.
117  {
118  log ( *mAsynchronousException , "Rethrowing Asynchronous Exception from " , ThisLocation() );
119  mAsynchronousException->ThrowAsDerivedType();
120  }
121 
    // The connection is opened lazily, on the first dispatch that finds the socket closed.
122  if ( ! mSocket.is_open() )
123  {
124  connect();
125  }
126 
127  mFlushStarted = false;
128  mDispatchQueue.push_back ( aBuffers );
129 
    // Start a write only if no send is in progress (mDispatchBuffers empty), at least nr_buffers_per_send
    // buffers are queued, and the number of packets in flight is below getMaxNumberOfBuffers().
130  if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= nr_buffers_per_send ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
131  {
132  write ( );
133  }
134  }
135 
136 
// Returns the constant 350 * 4 (= 1400).
// NOTE(review): listing line 138 (the signature) is absent from this extract. The member index lists
// uint32_t getMaxSendSize() and uint32_t getMaxReplySize(); this is presumably one of the two - confirm which.
137  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
139  {
140  return (350 * 4);
141  }
142 
143 
// Returns the constant 350 * 4 (= 1400).
// NOTE(review): listing line 145 (the signature) is absent from this extract. The member index lists
// uint32_t getMaxSendSize() and uint32_t getMaxReplySize(); this is presumably one of the two - confirm which.
144  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
146  {
147  return (350 * 4);
148  }
149 
150 
// Opens the TCP connection to mEndpoint using a synchronous boost::asio::connect.
// On failure the socket is closed and an exception::TcpConnectionFailure is logged and thrown;
// on success TCP_NODELAY is enabled on the socket.
// NOTE(review): listing line 152 (the signature) is absent from this extract; the member index lists
// "void connect()" ("Make the TCP connection") - confirm.
151  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
153  {
154  log ( Info() , "Attempting to create TCP connection to '" , mEndpoint->host_name() , "' port " , mEndpoint->service_name() , "." );
    // Arm the deadline before the blocking connect call.
155  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
156  boost::system::error_code lErrorCode;
157  boost::asio::connect ( mSocket , mEndpoint , lErrorCode );
158 
159  if ( lErrorCode )
160  {
161  mSocket.close();
162 
163  while ( mSocket.is_open() )
164  {}
165 
166  exception::TcpConnectionFailure lExc;
167  std::ostringstream oss;
168 
    // An expiry of pos_infin is what CheckDeadline sets after it has closed the socket, so it is used
    // here to tell a timeout apart from other connection errors.
169  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
170  {
171  oss << "Timeout (" << this->getBoostTimeoutPeriod().total_milliseconds() << " milliseconds) occurred when connecting to ";
172  }
173  else if ( lErrorCode == boost::asio::error::connection_refused )
174  {
175  oss << "Connection refused for ";
176  }
177  else
178  {
179  oss << "Error \"" << lErrorCode.message() << "\" encountered when connecting to ";
180  }
181 
    // URIs that start with "chtcp-" are reported as "ControlHub", all others as "TCP server".
182  log ( lExc , oss.str() , ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
183  " at ", Quote ( this->mUri.mHostname + ":" + this->mUri.mPort ) , ". URI=" , Quote ( this->uri() ) );
184  throw lExc;
185  }
186 
187  mSocket.set_option ( boost::asio::ip::tcp::no_delay ( true ) );
188  // boost::asio::socket_base::non_blocking_io lNonBlocking ( true );
189  // mSocket.io_control ( lNonBlocking );
190  log ( Info() , "TCP connection succeeded" );
191  }
192 
193 
194 
// Starts the next asynchronous send: moves up to nr_buffers_per_send buffers from mDispatchQueue into
// mDispatchBuffers, builds a gather list consisting of the 4-byte mSendByteCounter word followed by each
// buffer's send payload, arms the deadline timer and calls boost::asio::async_write with write_callback
// as the completion handler.
// NOTE(review): listing line 196 (the signature) is absent from this extract; the member index lists
// "void write()" - confirm. No lock is taken in the visible lines; the visible callers hold
// mTransportLayerMutex when calling write().
195  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
197  {
    // Mark the flush as not done while a send is outstanding.
198  NotifyConditionalVariable ( false );
199 
200  std::vector< boost::asio::const_buffer > lAsioSendBuffer;
    // The buffer entry refers to the member itself, so the value transmitted is whatever mSendByteCounter
    // holds when the write is performed, not its value on this line.
201  lAsioSendBuffer.push_back ( boost::asio::const_buffer ( &mSendByteCounter , 4 ) );
202  mSendByteCounter = 0;
203  std::size_t lNrBuffersToSend = std::min ( mDispatchQueue.size(), nr_buffers_per_send );
204  mDispatchBuffers.reserve ( lNrBuffersToSend );
205 
206  for ( std::size_t i = 0; i < lNrBuffersToSend; i++ )
207  {
208  mDispatchBuffers.push_back ( mDispatchQueue.front() );
209  mDispatchQueue.pop_front();
210  const boost::shared_ptr<Buffers>& lBuffer = mDispatchBuffers.back();
211  mSendByteCounter += lBuffer->sendCounter();
212  lAsioSendBuffer.push_back ( boost::asio::const_buffer ( lBuffer->getSendBuffer() , lBuffer->sendCounter() ) );
213  }
214 
215  log ( Debug() , "Sending " , Integer ( mSendByteCounter ) , " bytes from ", Integer ( mDispatchBuffers.size() ), " buffers" );
    // NOTE(review): listing line 216 is absent from this extract; a statement may be missing here.
217 
218  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
219 
220  // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
221  while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
222  {
223  log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
224  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
225  }
226 
227  boost::asio::async_write ( mSocket , lAsioSendBuffer , boost::bind ( &TCP< InnerProtocol , nr_buffers_per_send >::write_callback, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
    // NOTE(review): listing line 228 is absent from this extract; a statement may be missing here.
229 
230  SteadyClock_t::time_point lNow = SteadyClock_t::now();
    // NOTE(review): listing line 232 - presumably the statement controlled by this if - is absent from this
    // extract. As printed, the assignment on listing line 233 looks conditional; confirm against the repository.
231  if (mLastSendQueued > SteadyClock_t::time_point())
233  mLastSendQueued = lNow;
234  }
235 
236 
237 
// Completion handler for the async_write started by write().
//   aErrorCode        : status reported by ASIO for the write.
//   aBytesTransferred : number of bytes written; expected to equal mSendByteCounter + 4.
// Errors are not thrown from here: they are stored in mAsynchronousException and the condition variable
// is notified. On success the sent buffers are either queued on mReplyQueue (a receive is already in
// progress) or a read is started, and a further write is started if enough buffers are queued.
238  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
239  void TCP< InnerProtocol , nr_buffers_per_send >::write_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
240  {
    // Held for the whole handler.
241  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
    // NOTE(review): listing lines 242 and 244 are absent from this extract. Line 244 is presumably the
    // condition guarding the early return below (as printed, the block is unconditional) - confirm.
243 
245  {
246  NotifyConditionalVariable ( true );
247  return;
248  }
249 
    // An expiry of pos_infin is what CheckDeadline sets after closing the socket, i.e. the deadline passed.
250  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
251  {
252  mAsynchronousException = new exception::TcpTimeout();
253  log ( *mAsynchronousException , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for send to ",
254  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: ", this->uri() );
255 
    // operation_aborted is not reported separately on the timeout path.
256  if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
257  {
258  log ( *mAsynchronousException , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
259  }
260 
261  NotifyConditionalVariable ( true );
262  return;
263  }
264 
    // eof is tolerated as an error code; any other error, or a byte count different from the payload plus
    // the 4-byte counter word, is treated as a failed send.
265  if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != ( mSendByteCounter+4 ) ) )
266  {
267  mAsynchronousException = new exception::ASIOTcpError();
268  log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during send to ",
269  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: " , this->uri() );
270 
271  try
272  {
273  mSocket.close();
274 
275  while ( mSocket.is_open() )
276  {}
277  }
278  catch ( const std::exception& aExc )
279  {
280  log ( *mAsynchronousException , "Error closing TCP socket following the ASIO send error" );
281  }
282 
283  if ( aBytesTransferred != ( mSendByteCounter + 4 ) )
284  {
285  log ( *mAsynchronousException , "Attempted to send " , Integer ( mSendByteCounter ) , " bytes to ",
286  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
287  " with URI " , Quote ( this->uri() ) , ", but only sent " , Integer ( aBytesTransferred ) , " bytes" );
288  }
289 
290  NotifyConditionalVariable ( true );
291  return;
292  }
293 
    // A non-empty mReplyBuffers means a receive is already in progress, so the buffers just sent are
    // queued, together with the time the send was queued, until that receive completes.
294  if ( ! mReplyBuffers.first.empty() )
295  {
296  mReplyQueue.push_back ( std::make_pair(mDispatchBuffers, mLastSendQueued) );
297  }
298  else
299  {
    // NOTE(review): listing line 300 is absent from this extract; presumably it makes the buffers just
    // sent the current mReplyBuffers before the read is started - confirm.
301  read ( );
302  }
303 
304  mDispatchBuffers.clear();
305 
    // While a flush is in progress a single queued buffer is enough to trigger the next write;
    // otherwise nr_buffers_per_send buffers must be queued.
306  if ( ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
307  {
308  write();
309  }
310 
311  }
312 
313 
// Starts the next asynchronous receive: arms the deadline timer and calls boost::asio::async_read for
// exactly 4 bytes into mReplyByteCounter, with read_callback as the completion handler.
// NOTE(review): listing line 315 (the signature) is absent from this extract; the member index lists
// "void read()" - confirm. No lock is taken in the visible lines; the visible callers hold
// mTransportLayerMutex when calling read().
314  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
316  {
317  // std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( mReplyBuffers->getReplyBuffer() );
318  // std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
319  // lAsioReplyBuffer.reserve ( lReplyBuffers.size() +1 );
320  // for ( std::deque< std::pair< uint8_t* , uint32_t > >::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
321  // {
322  // lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( lIt->first , lIt->second ) );
323  // }
324  std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
    // Only the 4-byte counter word is requested here; the payload itself is read inside read_callback.
325  lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( &mReplyByteCounter , 4 ) );
326  log ( Debug() , "Getting reply byte counter" );
    // lEndpoint is declared but not used in the visible lines of this function.
327  boost::asio::ip::tcp::endpoint lEndpoint;
328  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
329 
330  // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
331  while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
332  {
333  log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
334  mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
335  }
336 
337  boost::asio::async_read ( mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly ( 4 ), boost::bind ( &TCP< InnerProtocol , nr_buffers_per_send >::read_callback, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
338 
339  SteadyClock_t::time_point lNow = SteadyClock_t::now();
    // NOTE(review): listing line 341 - presumably the statement controlled by this if - is absent from this
    // extract. As printed, the assignment on listing line 342 looks conditional; confirm against the repository.
340  if (mLastRecvQueued > SteadyClock_t::time_point())
342  mLastRecvQueued = lNow;
343  }
344 
345 
346 
347 
348 
349  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
350  void TCP< InnerProtocol , nr_buffers_per_send >::read_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
351  {
352  std::size_t lNrReplyBuffers = 1;
353  uint32_t lRequestBytes = 0;
354  uint32_t lExpectedReplyBytes = 0;
355 
356  for ( std::vector< boost::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
357  {
358  lNrReplyBuffers += ( *lBufIt )->getReplyBuffer().size();
359  lRequestBytes += ( *lBufIt )->sendCounter();
360  lExpectedReplyBytes += ( *lBufIt )->replyCounter();
361  }
362 
363  {
364  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
366  {
367  NotifyConditionalVariable ( true );
368  return;
369  }
370 
371  SteadyClock_t::time_point lNow = SteadyClock_t::now();
372 
373  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
374  {
375  exception::TcpTimeout* lExc = new exception::TcpTimeout();
376  log ( *lExc , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " ms) occurred for receive (header) from ",
377  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI '", this->uri(), "'. ",
378  Integer(mPacketsInFlight), " packets in flight, ", Integer(mReplyBuffers.first.size()), " in this chunk (",
379  Integer(lRequestBytes), "/", Integer(lExpectedReplyBytes), " bytes sent/expected). Last send / receive queued ",
380  boost::chrono::duration<float, boost::milli>(lNow - mLastSendQueued).count(), " / ",
381  boost::chrono::duration<float, boost::milli>(lNow - mLastRecvQueued), " ago.");
382 
383  log ( Error(), "Extra timeout-related info - round-trip times: ", mRTTStats);
384  log ( Error(), "Extra timeout-related info - send-recv times: ", mLSTStats);
385  log ( Error(), "Extra timeout-related info - inter-send times: ", mInterSendTimeStats);
386  log ( Error(), "Extra timeout-related info - inter-recv times: ", mInterRecvTimeStats);
387 
388  if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
389  {
390  log ( *lExc , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
391  }
392 
393  mAsynchronousException = lExc;
394  NotifyConditionalVariable ( true );
395  return;
396  }
397  }
398 
399  if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != 4 ) )
400  {
401  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
402  mAsynchronousException = new exception::ASIOTcpError();
403 
404  if ( aErrorCode )
405  {
406  log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during receive from ",
407  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: " , this->uri() );
408  }
409 
410  if ( aBytesTransferred != 4 )
411  {
412  log ( *mAsynchronousException, "Expected to receive 4-byte header in async read from ",
413  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
414  " with URI " , Quote ( this->uri() ) , ", but only received " , Integer ( aBytesTransferred ) , " bytes" );
415  }
416 
417  try
418  {
419  mSocket.close();
420  while ( mSocket.is_open() )
421  {}
422  }
423  catch ( const std::exception& aExc )
424  {
425  log ( *mAsynchronousException , "Error closing socket following ASIO read error" );
426  }
427 
428  NotifyConditionalVariable ( true );
429  return;
430  }
431 
432  SteadyClock_t::time_point lReadHeaderTimestamp = SteadyClock_t::now();
433  mRTTStats.add(mReplyBuffers.second, lReadHeaderTimestamp);
434  mLSTStats.add(mLastSendQueued, lReadHeaderTimestamp);
435 
437  log ( Debug() , "Byte Counter says " , Integer ( mReplyByteCounter ) , " bytes are coming" );
438  std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
439 
440  lAsioReplyBuffer.reserve ( lNrReplyBuffers );
441  log ( Debug() , "Expecting " , Integer ( lExpectedReplyBytes ) , " bytes in reply, for ", Integer ( mReplyBuffers.first.size() ), " buffers" );
442 
443  for ( std::vector< boost::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
444  {
445  std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( ( *lBufIt )->getReplyBuffer() );
446 
447  for ( std::deque< std::pair< uint8_t* , uint32_t > >::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
448  {
449  lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( lIt->first , lIt->second ) );
450  }
451  }
452 
453  boost::system::error_code lErrorCode = boost::asio::error::would_block;
454  std::size_t lBytesTransferred = boost::asio::read ( mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly ( mReplyByteCounter ), lErrorCode );
455 
456  if ( ( lErrorCode && ( lErrorCode != boost::asio::error::eof ) ) || ( lBytesTransferred != mReplyByteCounter ) )
457  {
458  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
459  mSocket.close();
460 
461  if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
462  {
463  mAsynchronousException = new exception::TcpTimeout();
464  log ( *mAsynchronousException , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for receive (chunk) from ",
465  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: ", this->uri() );
466  }
467  else
468  {
469  mAsynchronousException = new exception::ASIOTcpError();
470  log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during receive from ",
471  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: " , this->uri() );
472  }
473 
474  if ( lBytesTransferred != mReplyByteCounter )
475  {
476  log ( *mAsynchronousException, "Expected to receive " , Integer ( mReplyByteCounter ) , " bytes in read from ",
477  ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
478  " with URI " , Quote ( this->uri() ) , ", but only received " , Integer ( lBytesTransferred ) , " bytes" );
479  }
480 
481  NotifyConditionalVariable ( true );
482  return;
483  }
484 
485 
486  for ( std::vector< boost::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
487  {
488  try
489  {
490  if ( uhal::exception::exception* lExc = ClientInterface::validate ( *lBufIt ) ) //Control of the pointer has been passed back to the client interface
491  {
492  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
493  mAsynchronousException = lExc;
494  }
495  }
496  catch ( exception::exception& aExc )
497  {
498  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
499  mAsynchronousException = new exception::ValidationError ();
500  log ( *mAsynchronousException , "Exception caught during reply validation for TCP device with URI " , Quote ( this->uri() ) , "; what returned: " , Quote ( aExc.what() ) );
501  }
502 
504  {
505  NotifyConditionalVariable ( true );
506  return;
507  }
508  }
509 
510  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
511  mPacketsInFlight -= mReplyBuffers.first.size();
512 
513  if ( mReplyQueue.size() )
514  {
515  mReplyBuffers = mReplyQueue.front();
516  mReplyQueue.pop_front();
517  read();
518  }
519  else
520  {
521  mReplyBuffers.first.clear();
522  }
523 
524  if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
525  {
526  write();
527  }
528 
529  if ( mDispatchBuffers.empty() && mReplyBuffers.first.empty() )
530  {
531  mDeadlineTimer.expires_from_now ( boost::posix_time::seconds(60) );
532  NotifyConditionalVariable ( true );
533  }
534  }
535 
536 
537 
// Deadline-timer handler. If the deadline has passed it closes the socket and parks the timer at
// pos_infin; in all cases it re-registers itself on the timer.
// NOTE(review): listing line 539 (the signature) is absent from this extract; the member index lists
// "void CheckDeadline()" and the body re-registers &TCP::CheckDeadline - confirm.
538  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
540  {
541  boost::lock_guard<boost::mutex> lLock ( this->mTransportLayerMutex );
542 
543  // Check whether the deadline has passed. We compare the deadline against the current time since a new asynchronous operation may have moved the deadline before this actor had a chance to run.
544  if ( mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
545  {
546  // SETTING THE EXCEPTION HERE CAN APPEAR AS A TIMEOUT WHEN NONE ACTUALLY EXISTS
    // A send or receive in progress is logged as a warning; an idle expiry is logged at debug level.
547  if ( mDispatchBuffers.size() || mReplyBuffers.first.size() )
548  {
549  log ( Warning() , "Closing TCP socket for device with URI " , Quote ( this->uri() ) , " since deadline has passed" );
550  }
551  else
552  {
553  log ( Debug() , "Closing TCP socket for device with URI " , Quote ( this->uri() ) , " since no communication in 60 seconds" );
554  }
555 
556  // The deadline has passed. The socket is closed so that any outstanding asynchronous operations are cancelled.
557  mSocket.close();
558  // There is no longer an active deadline. The expiry is set to positive infinity so that the actor takes no action until a new deadline is set.
    // The pos_infin value is also what the connect/read/write error paths test for to recognise a timeout.
559  mDeadlineTimer.expires_at ( boost::posix_time::pos_infin );
560  }
561 
562  // Put the actor back to sleep.
563  mDeadlineTimer.async_wait ( boost::bind ( &TCP::CheckDeadline, this ) );
564  }
565 
566 
// Marks a flush as started and, if buffers are queued and no send is in progress, starts a write.
// NOTE(review): listing line 568 (the signature) is absent from this extract; the member index lists
// "virtual void Flush()" and the body sets mFlushStarted - confirm.
567  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
569  {
    // The lock is scoped to this inner block only.
570  {
571  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
572  mFlushStarted = true;
573 
574  if ( mDispatchQueue.size() && mDispatchBuffers.empty() )
575  {
576  write();
577  }
578  }
579 
    // NOTE(review): listing line 580 is absent from this extract; presumably this is where the function
    // blocks until the flush is done (cf. WaitOnConditionalVariable in the member index) - confirm.
581 
582  boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
583 
    // NOTE(review): listing line 584 is absent; presumably it is the test on mAsynchronousException that
    // guards the rethrow below - confirm.
585  {
586  mAsynchronousException->ThrowAsDerivedType();
587  }
588  }
589 
590 
591 
// Tidies up this layer after an exception: closes the socket if it is open, notifies the condition
// variable, discards the stored exception, resets the counters and then calls
// InnerProtocol::dispatchExceptionHandler().
// NOTE(review): listing line 593 (the signature) is absent from this extract; the member index lists
// "virtual void dispatchExceptionHandler()" and the body chains to the InnerProtocol version - confirm.
592  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
594  {
595  if ( mSocket.is_open() )
596  {
597  log ( Warning() , "Closing TCP socket for device with URI " , Quote ( this->uri() ) , " since exception detected." );
598 
599  try
600  {
601  mSocket.close();
602 
603  while ( mSocket.is_open() )
604  {}
605  }
    // Failures while closing are deliberately ignored.
606  catch ( ... )
607  {}
608  }
609 
610  NotifyConditionalVariable ( true );
611 
    // NOTE(review): listing line 612 is absent; presumably it is the test on mAsynchronousException that
    // guards the delete below - confirm.
613  {
614  delete mAsynchronousException;
615  mAsynchronousException = NULL;
616  }
617 
    // NOTE(review): listing lines 618, 620 and 622-623 are absent from this extract, so the body of the
    // loop below and the neighbouring statements are not visible here. As printed, the assignment on
    // listing line 621 looks like the loop body; confirm against the repository.
619  for (size_t i = 0; i < mReplyQueue.size(); i++)
621  mPacketsInFlight = 0;
624 
625  mSendByteCounter = 0;
626  mReplyByteCounter = 0;
627 
628  InnerProtocol::dispatchExceptionHandler();
629  }
630 
631 
632  //--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
633  //--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
634 
635 
// Sets mFlushDone to aValue under mConditionalVariableMutex, then wakes one waiter on mConditionalVariable.
// NOTE(review): listing line 637 (the signature) is absent from this extract; the member index lists
// "void NotifyConditionalVariable(const bool &aValue)" and the body uses aValue - confirm.
636  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
638  {
    // The mutex is released before notifying.
639  {
640  boost::lock_guard<boost::mutex> lLock ( mConditionalVariableMutex );
641  mFlushDone = aValue;
642  }
643  mConditionalVariable.notify_one();
644  }
645 
// Blocks the calling thread on mConditionalVariable until mFlushDone is true.
// NOTE(review): listing line 647 (the signature) is absent from this extract; the member index lists
// "void WaitOnConditionalVariable()" - confirm.
646  template < typename InnerProtocol , std::size_t nr_buffers_per_send >
648  {
649  boost::unique_lock<boost::mutex> lLock ( mConditionalVariableMutex );
650 
    // The flag is re-tested after every wake-up, so a wake-up while mFlushDone is still false just waits again.
651  while ( !mFlushDone )
652  {
653  mConditionalVariable.wait ( lLock );
654  }
655  }
656 
657 
    // Explicit instantiations of TCP for the protocol stacks used with it: plain IPbus with
    // nr_buffers_per_send = 1, and ControlHub-wrapped IPbus with nr_buffers_per_send = 3.
    // The member functions are defined in this .cpp file, so these instantiations are what make them
    // available to other translation units.
658  template class TCP< IPbus< 1 , 3 > , 1 >;
659  template class TCP< IPbus< 2 , 0 > , 1 >;
660 
661  template class TCP< ControlHub < IPbus< 1 , 3 > > , 3 >;
662  template class TCP< ControlHub < IPbus< 2 , 0 > > , 3 >;
663 }
664 
boost::condition_variable mConditionalVariable
A conditional variable for blocking the main thread until the variable with which it is associated is...
std::deque< std::pair< std::vector< boost::shared_ptr< Buffers > >, SteadyClock_t::time_point > > mReplyQueue
The list of buffers still awaiting a reply.
void returnBufferToPool(boost::shared_ptr< Buffers > &aBuffers)
Function to return a buffer to the buffer pool.
void CheckDeadline()
Function called by the ASIO deadline timer.
std::deque< boost::shared_ptr< Buffers > > mDispatchQueue
The list of buffers still waiting to be sent.
uint32_t mPacketsInFlight
Counter of how many writes have been sent, for which no reply has yet been received.
void read_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async read. It then checks the queue to see if there are more packets to be sent and, if so, calls write.
minutes past the hour formatted as two digits e.g.
void write()
Initialize performing the next TCP write operation. In multi-threaded mode, this runs the ASIO async w...
virtual void Flush()
Concrete implementation of the synchronization function to block until all buffers have been sent...
boost::asio::deadline_timer mDeadlineTimer
The mechanism for providing the time-out.
TimeIntervalStats mInterSendTimeStats
bool mFlushStarted
Boolean specifying whether or not the main thread is within TCP::Flush method. Its value checked by t...
void NotifyConditionalVariable(const bool &aValue)
Function to set the value of a variable associated with a BOOST conditional-variable and then notify ...
uhal::exception::exception * mAsynchronousException
A pointer to an exception object for passing exceptions from the worker thread to the main thread...
SteadyClock_t::time_point mLastRecvQueued
SteadyClock_t::time_point mLastSendQueued
WarningLevel Warning
Definition: LogLevels.cpp:79
boost::asio::ip::tcp::resolver::iterator mEndpoint
A boost::asio TCP resolver iterator referring to the resolved endpoint - used by the delayed (open-on-first-use) connect...
An abstract base exception class providing an interface to a throw/ThrowAsDerivedType mechanism which...
Definition: exception.hpp:89
TimeIntervalStats mRTTStats
uint32_t mSendByteCounter
Variable storing "number of bytes to follow" field for the TCP chunk currently being sent...
boost::mutex mConditionalVariableMutex
A mutex for use by the conditional variable.
ErrorLevel Error
Definition: LogLevels.cpp:61
void read()
Initialize performing the next TCP read operation. In multi-threaded mode, this runs the ASIO async re...
void WaitOnConditionalVariable()
Function to block a thread pending a BOOST conditional-variable and its associated regular variable...
boost::mutex mTransportLayerMutex
A MutEx lock used to make sure the access functions are thread safe.
_Quote< T > Quote(const T &aT)
void write_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async write. It then makes a call to read to read back the reply to what has just been sent.
virtual const char * what() const
Function which returns the error message associated with an exception If no error message has previou...
Definition: exception.cpp:116
void connect()
Make the TCP connection.
TimeIntervalStats mLSTStats
TimeIntervalStats mInterRecvTimeStats
std::pair< std::vector< boost::shared_ptr< Buffers > >, SteadyClock_t::time_point > mReplyBuffers
The buffers containing the payloads for the receive operation that's currently in progress...
#define ThisLocation()
uint32_t getMaxSendSize()
Return the maximum size to be sent based on the buffer size in the target.
boost::thread mDispatchThread
The Worker thread in Multi-threaded mode.
virtual ~TCP()
Destructor.
Definition: ProtocolTCP.cpp:87
TCP(const TCP &aTCP)
Copy Constructor This creates a new socket, dispatch queue, dispatch thread, etc. ...
virtual void dispatchExceptionHandler()
Function which tidies up this protocol layer in the event of an exception.
std::string uri
Definition: test_single.cpp:89
DebugLevel Debug
Definition: LogLevels.cpp:133
boost::asio::io_service mIOservice
The boost::asio::io_service used to create the connections.
virtual exception::exception * validate(boost::shared_ptr< Buffers > aBuffers)
Function which dispatch calls when the reply is received to check that the headers are as expected...
void add(const Clock_t::time_point &aT1, const Clock_t::time_point &aT2)
Transport protocol to transfer an IPbus buffer via TCP.
Definition: ProtocolTCP.hpp:85
void implementDispatch(boost::shared_ptr< Buffers > aBuffers)
Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate ...
InfoLevel Info
Definition: LogLevels.cpp:114
uint32_t mReplyByteCounter
Variable used to store "number of bytes to follow" field for the next/current TCP chunk being receive...
std::vector< boost::shared_ptr< Buffers > > mDispatchBuffers
The buffers containing the payload for the send operation that's currently in progress.
bool mFlushDone
A variable associated with the conditional variable which specifies whether all packets have been sen...
uint32_t getMaxReplySize()
Return the maximum size of reply packet based on the buffer size in the target.
_Integer< T, IntFmt<> > Integer(const T &aT)
Forward declare a function which creates an instance of the ultra-lightweight wrapper from an integer...
boost::asio::ip::tcp::socket mSocket
The boost::asio TCP socket through which the operation will be performed.
Struct to store a URI when parsed by boost spirit.
Definition: URI.hpp:49