40#include <boost/asio/connect.hpp>
41#include <boost/asio/write.hpp>
42#include <boost/asio/read.hpp>
43#include <boost/asio/placeholders.hpp>
44#include <boost/date_time/posix_time/posix_time.hpp>
61 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
63 InnerProtocol ( aId , aUri ),
64 mMaxPayloadSize (350 * 4),
66 mSocket ( mIOservice ),
67 mEndpoint (
boost::asio::ip::tcp::resolver ( mIOservice ).resolve (
boost::asio::ip::tcp::resolver::query ( aUri.mHostname , aUri.mPort ) ) ),
68 mDeadlineTimer ( mIOservice ),
69 mIOserviceWork ( mIOservice ),
70 mDispatchThread ( [this] () {
mIOservice.run(); } ),
73 mPacketsInFlight ( 0 ),
74 mFlushStarted (
false ),
76 mAsynchronousException ( NULL )
78 mDeadlineTimer.async_wait ([
this] (
const boost::system::error_code&) { this->CheckDeadline(); });
81 for (
const auto& lArg : aUri.mArguments) {
82 if (lArg.first ==
"max_payload_size") {
84 mMaxPayloadSize = boost::lexical_cast<size_t>(lArg.second);
86 catch (
const boost::bad_lexical_cast&) {
87 throw exception::InvalidURI(
"Client URI \"" + this->
uri() +
"\": Invalid value, \"" + lArg.second +
"\", specified for attribute \"" + lArg.first +
"\"");
89 log (
Info(),
"Client with URI ",
Quote(this->
uri()),
": Maximum UDP payload size set to ", std::to_string(mMaxPayloadSize),
" bytes");
95 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
102 while ( mSocket.is_open() )
106 mDispatchThread.join();
108 for (
size_t i = 0; i < mReplyQueue.size(); i++)
113 catch (
const std::exception& aExc )
120 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
123 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
125 if ( mAsynchronousException )
128 mAsynchronousException->throwAsDerivedType();
131 if ( ! mSocket.is_open() )
136 mFlushStarted =
false;
137 mDispatchQueue.push_back ( aBuffers );
139 if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= nr_buffers_per_send ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
146 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
149 return mMaxPayloadSize;
153 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
156 return mMaxPayloadSize;
160 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
163 log (
Info() ,
"Attempting to create TCP connection to '" , mEndpoint->host_name() ,
"' port " , mEndpoint->service_name() ,
"." );
164 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
165 boost::system::error_code lErrorCode;
166 boost::asio::connect ( mSocket , mEndpoint , lErrorCode );
172 while ( mSocket.is_open() )
175 exception::TcpConnectionFailure lExc;
176 std::ostringstream oss;
178 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
180 oss <<
"Timeout (" << this->getBoostTimeoutPeriod().total_milliseconds() <<
" milliseconds) occurred when connecting to ";
182 else if ( lErrorCode == boost::asio::error::connection_refused )
184 oss <<
"Connection refused for ";
188 oss <<
"Error \"" << lErrorCode.message() <<
"\" encountered when connecting to ";
191 log ( lExc , oss.str() , ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
192 " at ",
Quote ( this->mUri.mHostname +
":" + this->mUri.mPort ) ,
". URI=" ,
Quote ( this->uri() ) );
196 mSocket.set_option ( boost::asio::ip::tcp::no_delay (
true ) );
199 log (
Info() ,
"TCP connection succeeded" );
204 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
207 NotifyConditionalVariable (
false );
209 std::vector< boost::asio::const_buffer > lAsioSendBuffer;
210 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( &mSendByteCounter , 4 ) );
211 mSendByteCounter = 0;
212 std::size_t lNrBuffersToSend = std::min ( mDispatchQueue.size(), nr_buffers_per_send );
213 mDispatchBuffers.reserve ( lNrBuffersToSend );
215 for ( std::size_t i = 0; i < lNrBuffersToSend; i++ )
217 mDispatchBuffers.push_back ( mDispatchQueue.front() );
218 mDispatchQueue.pop_front();
219 const std::shared_ptr<Buffers>& lBuffer = mDispatchBuffers.back();
220 mSendByteCounter += lBuffer->sendCounter();
221 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( lBuffer->getSendBuffer() , lBuffer->sendCounter() ) );
224 log (
Debug() ,
"Sending " ,
Integer ( mSendByteCounter ) ,
" bytes from ",
Integer ( mDispatchBuffers.size() ),
" buffers" );
225 mSendByteCounter = htonl ( mSendByteCounter );
227 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
230 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
232 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() ,
")." );
233 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
236 boost::asio::async_write ( mSocket , lAsioSendBuffer , [&] (
const boost::system::error_code& e, std::size_t n) { this->write_callback(e, n); });
237 mPacketsInFlight += mDispatchBuffers.size();
239 SteadyClock_t::time_point lNow = SteadyClock_t::now();
240 if (mLastSendQueued > SteadyClock_t::time_point())
241 mInterSendTimeStats.add(mLastSendQueued, lNow);
242 mLastSendQueued = lNow;
247 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
250 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
251 mSendByteCounter = ntohl ( mSendByteCounter );
253 if ( mAsynchronousException )
255 NotifyConditionalVariable (
true );
259 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
261 mAsynchronousException =
new exception::TcpTimeout();
262 log ( *mAsynchronousException ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for send to ",
263 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: ", this->uri() );
265 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
267 log ( *mAsynchronousException ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
270 NotifyConditionalVariable (
true );
274 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != ( mSendByteCounter+4 ) ) )
276 mAsynchronousException =
new exception::ASIOTcpError();
277 log ( *mAsynchronousException ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during send to ",
278 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: " , this->uri() );
284 while ( mSocket.is_open() )
287 catch (
const std::exception& aExc )
289 log ( *mAsynchronousException ,
"Error closing TCP socket following the ASIO send error" );
292 if ( aBytesTransferred != ( mSendByteCounter + 4 ) )
294 log ( *mAsynchronousException ,
"Attempted to send " ,
Integer ( mSendByteCounter ) ,
" bytes to ",
295 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
296 " with URI " ,
Quote ( this->uri() ) ,
", but only sent " ,
Integer ( aBytesTransferred ) ,
" bytes" );
299 NotifyConditionalVariable (
true );
303 if ( ! mReplyBuffers.first.empty() )
305 mReplyQueue.push_back ( std::make_pair(mDispatchBuffers, mLastSendQueued) );
309 mReplyBuffers = std::make_pair(mDispatchBuffers, mLastSendQueued);
313 mDispatchBuffers.clear();
315 if ( ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
323 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
326 std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
327 lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( &mReplyByteCounter , 4 ) );
328 log (
Debug() ,
"Getting reply byte counter" );
329 boost::asio::ip::tcp::endpoint lEndpoint;
330 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
333 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
335 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() ,
")." );
336 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
339 boost::asio::async_read ( mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly ( 4 ), [&] (
const boost::system::error_code& e, std::size_t n) { this->read_callback(e, n); });
340 SteadyClock_t::time_point lNow = SteadyClock_t::now();
341 if (mLastRecvQueued > SteadyClock_t::time_point())
342 mInterRecvTimeStats.add(mLastRecvQueued, lNow);
343 mLastRecvQueued = lNow;
350 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
353 std::size_t lNrReplyBuffers = 1;
354 uint32_t lRequestBytes = 0;
355 uint32_t lExpectedReplyBytes = 0;
357 for (
const auto& lBuf: mReplyBuffers.first)
359 lNrReplyBuffers += lBuf->getReplyBuffer().size();
360 lRequestBytes += lBuf->sendCounter();
361 lExpectedReplyBytes += lBuf->replyCounter();
365 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
366 if ( mAsynchronousException )
368 NotifyConditionalVariable (
true );
372 SteadyClock_t::time_point lNow = SteadyClock_t::now();
374 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
376 exception::TcpTimeout* lExc =
new exception::TcpTimeout();
377 log ( *lExc ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" ms) occurred for receive (header) from ",
378 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI '", this->uri(),
"'. ",
379 Integer(mPacketsInFlight),
" packets in flight, ",
Integer(mReplyBuffers.first.size()),
" in this chunk (",
380 Integer(lRequestBytes),
"/",
Integer(lExpectedReplyBytes),
" bytes sent/expected). Last send / receive queued ",
381 std::chrono::duration<float, std::milli>(lNow - mLastSendQueued).count(),
" / ",
382 std::chrono::duration<float, std::milli>(lNow - mLastRecvQueued).count(),
" ms ago.");
384 log (
Error(),
"Extra timeout-related info - round-trip times: ", mRTTStats);
385 log (
Error(),
"Extra timeout-related info - send-recv times: ", mLSTStats);
386 log (
Error(),
"Extra timeout-related info - inter-send times: ", mInterSendTimeStats);
387 log (
Error(),
"Extra timeout-related info - inter-recv times: ", mInterRecvTimeStats);
389 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
391 log ( *lExc ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
394 mAsynchronousException = lExc;
395 NotifyConditionalVariable (
true );
400 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != 4 ) )
402 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
403 mAsynchronousException =
new exception::ASIOTcpError();
407 log ( *mAsynchronousException ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during receive from ",
408 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: " , this->uri() );
411 if ( aBytesTransferred != 4 )
413 log ( *mAsynchronousException,
"Expected to receive 4-byte header in async read from ",
414 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
415 " with URI " ,
Quote ( this->uri() ) ,
", but only received " ,
Integer ( aBytesTransferred ) ,
" bytes" );
421 while ( mSocket.is_open() )
424 catch (
const std::exception& aExc )
426 log ( *mAsynchronousException ,
"Error closing socket following ASIO read error" );
429 NotifyConditionalVariable (
true );
433 SteadyClock_t::time_point lReadHeaderTimestamp = SteadyClock_t::now();
434 mRTTStats.add(mReplyBuffers.second, lReadHeaderTimestamp);
435 mLSTStats.add(mLastSendQueued, lReadHeaderTimestamp);
437 mReplyByteCounter = ntohl ( mReplyByteCounter );
438 log (
Debug() ,
"Byte Counter says " ,
Integer ( mReplyByteCounter ) ,
" bytes are coming" );
439 std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
441 lAsioReplyBuffer.reserve ( lNrReplyBuffers );
442 log (
Debug() ,
"Expecting " ,
Integer ( lExpectedReplyBytes ) ,
" bytes in reply, for ",
Integer ( mReplyBuffers.first.size() ),
" buffers" );
444 for ( std::vector< std::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
446 std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( ( *lBufIt )->getReplyBuffer() );
448 for ( std::deque< std::pair< uint8_t* , uint32_t > >
::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
450 lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( lIt->first , lIt->second ) );
454 boost::system::error_code lErrorCode = boost::asio::error::would_block;
455 std::size_t lBytesTransferred = boost::asio::read ( mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly ( mReplyByteCounter ), lErrorCode );
457 if ( ( lErrorCode && ( lErrorCode != boost::asio::error::eof ) ) || ( lBytesTransferred != mReplyByteCounter ) )
459 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
462 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
464 mAsynchronousException =
new exception::TcpTimeout();
465 log ( *mAsynchronousException ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for receive (chunk) from ",
466 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: ", this->uri() );
// Report the failure of the blocking payload read issued just above.
// That read stores its status in lErrorCode (the same value the enclosing
// condition tests). aErrorCode is the status of the earlier async header
// read, which was already checked before this point, so its message would
// not describe this failure.
470 mAsynchronousException =
new exception::ASIOTcpError();
471 log ( *mAsynchronousException ,
"Error ",
Quote ( lErrorCode.message() ) ,
" encountered during receive from ",
472 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: " , this->uri() );
475 if ( lBytesTransferred != mReplyByteCounter )
477 log ( *mAsynchronousException,
"Expected to receive " ,
Integer ( mReplyByteCounter ) ,
" bytes in read from ",
478 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
479 " with URI " ,
Quote ( this->uri() ) ,
", but only received " ,
Integer ( lBytesTransferred ) ,
" bytes" );
482 NotifyConditionalVariable (
true );
487 for (
const auto& lBuf: mReplyBuffers.first)
493 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
494 mAsynchronousException = lExc;
499 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
500 mAsynchronousException =
new exception::ValidationError ();
501 log ( *mAsynchronousException ,
"Exception caught during reply validation for TCP device with URI " ,
Quote ( this->uri() ) ,
"; what returned: " ,
Quote ( aExc.
what() ) );
504 if ( mAsynchronousException )
506 NotifyConditionalVariable (
true );
511 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
512 mPacketsInFlight -= mReplyBuffers.first.size();
514 if ( mReplyQueue.size() )
516 mReplyBuffers = mReplyQueue.front();
517 mReplyQueue.pop_front();
522 mReplyBuffers.first.clear();
525 if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
530 if ( mDispatchBuffers.empty() && mReplyBuffers.first.empty() )
532 mDeadlineTimer.expires_from_now ( boost::posix_time::seconds(60) );
533 NotifyConditionalVariable (
true );
539 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
542 std::lock_guard<std::mutex> lLock ( this->mTransportLayerMutex );
545 if ( mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
548 if ( mDispatchBuffers.size() || mReplyBuffers.first.size() )
550 log (
Warning() ,
"Closing TCP socket for device with URI " ,
Quote ( this->uri() ) ,
" since deadline has passed" );
554 log (
Debug() ,
"Closing TCP socket for device with URI " ,
Quote ( this->uri() ) ,
" since no communication in 60 seconds" );
560 mDeadlineTimer.expires_at ( boost::posix_time::pos_infin );
564 mDeadlineTimer.async_wait ([
this] (
const boost::system::error_code&) { this->CheckDeadline(); });
568 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
572 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
573 mFlushStarted =
true;
575 if ( mDispatchQueue.size() && mDispatchBuffers.empty() )
581 WaitOnConditionalVariable();
583 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
585 if ( mAsynchronousException )
587 mAsynchronousException->throwAsDerivedType();
592 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
595 if ( mSocket.is_open() )
597 log (
Warning() ,
"Closing TCP socket for device with URI " ,
Quote ( this->uri() ) ,
" since exception detected." );
603 while ( mSocket.is_open() )
610 NotifyConditionalVariable (
true );
612 if ( mAsynchronousException )
614 delete mAsynchronousException;
615 mAsynchronousException = NULL;
619 for (
size_t i = 0; i < mReplyQueue.size(); i++)
621 mPacketsInFlight = 0;
625 mSendByteCounter = 0;
626 mReplyByteCounter = 0;
628 InnerProtocol::dispatchExceptionHandler();
632 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
636 std::lock_guard<std::mutex> lLock ( mConditionalVariableMutex );
639 mConditionalVariable.notify_one();
642 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
645 std::unique_lock<std::mutex> lLock ( mConditionalVariableMutex );
647 while ( !mFlushDone )
649 mConditionalVariable.wait ( lLock );
Wraps a Python iterator so that it can also be used as a C++ input iterator
void returnBufferToPool(std::shared_ptr< Buffers > &aBuffers)
Function to return a buffer to the buffer pool.
virtual exception::exception * validate(std::shared_ptr< Buffers > aBuffers)
Function called by dispatch when the reply is received, to check that the headers are as expected.
Transport protocol to transfer an IPbus buffer via TCP.
uint32_t getMaxSendSize()
Return the maximum size to be sent based on the buffer size in the target.
virtual void dispatchExceptionHandler()
Function which tidies up this protocol layer in the event of an exception.
uint32_t getMaxReplySize()
Return the maximum size of reply packet based on the buffer size in the target.
void read_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async read. This, then,...
void CheckDeadline()
Function called by the ASIO deadline timer.
virtual ~TCP()
Destructor.
TCP(const TCP &aTCP)
Copy constructor. This creates a new socket, dispatch queue, dispatch thread, etc.
void WaitOnConditionalVariable()
Function to block a thread pending a BOOST conditional-variable and its associated regular variable.
void implementDispatch(std::shared_ptr< Buffers > aBuffers)
Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate ...
void NotifyConditionalVariable(const bool &aValue)
Function to set the value of a variable associated with a BOOST conditional-variable and then notify ...
virtual void Flush()
Concrete implementation of the synchronization function to block until all buffers have been sent,...
void connect()
Make the TCP connection.
void read()
Initialize performing the next TCP read operation. In multi-threaded mode, this runs the ASIO async re...
boost::asio::io_service mIOservice
The boost::asio::io_service used to create the connections.
void write()
Initialize performing the next TCP write operation. In multi-threaded mode, this runs the ASIO async w...
void write_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async write. This, then,...
An abstract base exception class, including an interface to throw as the derived type (for passing ex...
virtual const char * what() const
Function which returns the error message associated with an exception. If no error message has previou...
_Quote< T > Quote(const T &aT)
_Integer< T, IntFmt<> > Integer(const T &aT)
Forward declare a function which creates an instance of the ultra-lightweight wrapper from an integer...
void log(FatalLevel &aFatal, const T0 &aArg0)
Function to add a log entry at Fatal level.
Struct to store a URI when parsed by boost spirit.