38 #include <boost/bind/bind.hpp> 39 #include <boost/lambda/lambda.hpp> 40 #include <boost/asio/connect.hpp> 41 #include <boost/asio/write.hpp> 42 #include <boost/asio/read.hpp> 43 #include <boost/asio/placeholders.hpp> 44 #include "boost/date_time/posix_time/posix_time.hpp" 45 #include <boost/chrono/chrono_io.hpp> 51 #include "uhal/log/log.hpp" 65 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
67 InnerProtocol ( aId , aUri ),
69 mSocket ( mIOservice ),
70 mEndpoint (
boost::asio::ip::tcp::resolver ( mIOservice ).resolve (
boost::asio::ip::tcp::resolver::query ( aUri.mHostname , aUri.mPort ) ) ),
71 mDeadlineTimer ( mIOservice ),
73 mIOserviceWork ( mIOservice ),
74 mDispatchThread (
boost::bind ( &
boost::asio::io_service::run , & ( mIOservice ) ) ),
77 mPacketsInFlight ( 0 ),
78 mFlushStarted ( false ),
80 mAsynchronousException ( NULL )
86 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
104 catch (
const std::exception& aExc )
111 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
137 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
144 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
151 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
154 log (
Info() ,
"Attempting to create TCP connection to '" ,
mEndpoint->host_name() ,
"' port " ,
mEndpoint->service_name() ,
"." );
155 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
156 boost::system::error_code lErrorCode;
166 exception::TcpConnectionFailure lExc;
167 std::ostringstream oss;
169 if (
mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
171 oss <<
"Timeout (" << this->getBoostTimeoutPeriod().total_milliseconds() <<
" milliseconds) occurred when connecting to ";
173 else if ( lErrorCode == boost::asio::error::connection_refused )
175 oss <<
"Connection refused for ";
179 oss <<
"Error \"" << lErrorCode.message() <<
"\" encountered when connecting to ";
182 log ( lExc , oss.str() , ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
183 " at ",
Quote ( this->mUri.mHostname +
":" + this->mUri.mPort ) ,
". URI=" ,
Quote ( this->
uri() ) );
187 mSocket.set_option ( boost::asio::ip::tcp::no_delay (
true ) );
190 log (
Info() ,
"TCP connection succeeded" );
195 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
200 std::vector< boost::asio::const_buffer > lAsioSendBuffer;
201 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( &
mSendByteCounter , 4 ) );
206 for ( std::size_t i = 0; i < lNrBuffersToSend; i++ )
212 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( lBuffer->getSendBuffer() , lBuffer->sendCounter() ) );
218 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
221 while (
mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
223 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ",
mDeadlineTimer.expires_from_now() ,
")." );
224 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
230 SteadyClock_t::time_point lNow = SteadyClock_t::now();
238 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
250 if (
mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
253 log ( *
mAsynchronousException ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for send to ",
254 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: ", this->
uri() );
256 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
265 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != (
mSendByteCounter+4 ) ) )
269 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: " , this->
uri() );
278 catch (
const std::exception& aExc )
286 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
287 " with URI " ,
Quote ( this->
uri() ) ,
", but only sent " ,
Integer ( aBytesTransferred ) ,
" bytes" );
314 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
324 std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
325 lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( &
mReplyByteCounter , 4 ) );
326 log (
Debug() ,
"Getting reply byte counter" );
327 boost::asio::ip::tcp::endpoint lEndpoint;
328 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
331 while (
mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
333 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ",
mDeadlineTimer.expires_from_now() ,
")." );
334 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
339 SteadyClock_t::time_point lNow = SteadyClock_t::now();
349 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
352 std::size_t lNrReplyBuffers = 1;
353 uint32_t lRequestBytes = 0;
354 uint32_t lExpectedReplyBytes = 0;
358 lNrReplyBuffers += ( *lBufIt )->getReplyBuffer().size();
359 lRequestBytes += ( *lBufIt )->sendCounter();
360 lExpectedReplyBytes += ( *lBufIt )->replyCounter();
371 SteadyClock_t::time_point lNow = SteadyClock_t::now();
373 if (
mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
375 exception::TcpTimeout* lExc =
new exception::TcpTimeout();
376 log ( *lExc ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" ms) occurred for receive (header) from ",
377 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI '", this->
uri(),
"'. ",
379 Integer(lRequestBytes),
"/",
Integer(lExpectedReplyBytes),
" bytes sent/expected). Last send / receive queued ",
380 boost::chrono::duration<float, boost::milli>(lNow -
mLastSendQueued).count(),
" / ",
381 boost::chrono::duration<float, boost::milli>(lNow -
mLastRecvQueued),
" ago.");
383 log (
Error(),
"Extra timeout-related info - round-trip times: ",
mRTTStats);
384 log (
Error(),
"Extra timeout-related info - send-recv times: ",
mLSTStats);
388 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
390 log ( *lExc ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
399 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != 4 ) )
407 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: " , this->
uri() );
410 if ( aBytesTransferred != 4 )
413 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
414 " with URI " ,
Quote ( this->
uri() ) ,
", but only received " ,
Integer ( aBytesTransferred ) ,
" bytes" );
423 catch (
const std::exception& aExc )
432 SteadyClock_t::time_point lReadHeaderTimestamp = SteadyClock_t::now();
438 std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
440 lAsioReplyBuffer.reserve ( lNrReplyBuffers );
445 std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( ( *lBufIt )->getReplyBuffer() );
447 for ( std::deque< std::pair< uint8_t* , uint32_t > >::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
449 lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( lIt->first , lIt->second ) );
453 boost::system::error_code lErrorCode = boost::asio::error::would_block;
454 std::size_t lBytesTransferred = boost::asio::read (
mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly (
mReplyByteCounter ), lErrorCode );
456 if ( ( lErrorCode && ( lErrorCode != boost::asio::error::eof ) ) || ( lBytesTransferred !=
mReplyByteCounter ) )
461 if (
mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
464 log ( *
mAsynchronousException ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for receive (chunk) from ",
465 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: ", this->
uri() );
471 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: " , this->
uri() );
477 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
478 " with URI " ,
Quote ( this->
uri() ) ,
", but only received " ,
Integer ( lBytesTransferred ) ,
" bytes" );
531 mDeadlineTimer.expires_from_now ( boost::posix_time::seconds(60) );
538 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
544 if (
mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
549 log (
Warning() ,
"Closing TCP socket for device with URI " ,
Quote ( this->
uri() ) ,
" since deadline has passed" );
553 log (
Debug() ,
"Closing TCP socket for device with URI " ,
Quote ( this->
uri() ) ,
" since no communication in 60 seconds" );
567 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
592 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
597 log (
Warning() ,
"Closing TCP socket for device with URI " ,
Quote ( this->
uri() ) ,
" since exception detected." );
628 InnerProtocol::dispatchExceptionHandler();
636 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
646 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
boost::condition_variable mConditionalVariable
A conditional variable for blocking the main thread until the variable with which it is associated is...
std::deque< std::pair< std::vector< boost::shared_ptr< Buffers > >, SteadyClock_t::time_point > > mReplyQueue
The list of buffers still awaiting a reply.
void returnBufferToPool(boost::shared_ptr< Buffers > &aBuffers)
Function to return a buffer to the buffer pool.
void CheckDeadline()
Function called by the ASIO deadline timer.
std::deque< boost::shared_ptr< Buffers > > mDispatchQueue
The list of buffers still waiting to be sent.
uint32_t mPacketsInFlight
Counter of how many writes have been sent, for which no reply has yet been received.
void read_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async read This, then, checks the queue to see if there are more packets to be sent and if so, calls write.
minutes past the hour formatted as two digits e.g.
void write()
Initialize performing the next TCP write operation In multi-threaded mode, this runs the ASIO async w...
virtual void Flush()
Concrete implementation of the synchronization function to block until all buffers have been sent...
boost::asio::deadline_timer mDeadlineTimer
The mechanism for providing the time-out.
TimeIntervalStats mInterSendTimeStats
bool mFlushStarted
Boolean specifying whether or not the main thread is within TCP::Flush method. Its value checked by t...
void NotifyConditionalVariable(const bool &aValue)
Function to set the value of a variable associated with a BOOST conditional-variable and then notify ...
uhal::exception::exception * mAsynchronousException
A pointer to an exception object for passing exceptions from the worker thread to the main thread...
SteadyClock_t::time_point mLastRecvQueued
SteadyClock_t::time_point mLastSendQueued
boost::asio::ip::tcp::resolver::iterator mEndpoint
A shared pointer to a boost::asio tcp endpoint - used by the delayed (open-on-first-use) connect...
An abstract base exception class providing an interface to a throw/ThrowAsDerivedType mechanism which...
TimeIntervalStats mRTTStats
uint32_t mSendByteCounter
Variable storing "number of bytes to follow" field for the TCP chunk currently being sent...
boost::mutex mConditionalVariableMutex
A mutex for use by the conditional variable.
void read()
Initialize performing the next TCP read operation In multi-threaded mode, this runs the ASIO async re...
void WaitOnConditionalVariable()
Function to block a thread pending a BOOST conditional-variable and its associated regular variable...
boost::mutex mTransportLayerMutex
A MutEx lock used to make sure the access functions are thread safe.
_Quote< T > Quote(const T &aT)
void write_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async write This, then, makes a call to read to read back the reply to what has just been sent.
virtual const char * what() const
Function which returns the error message associated with an exception If no error message has previou...
void connect()
Make the TCP connection.
TimeIntervalStats mLSTStats
TimeIntervalStats mInterRecvTimeStats
std::pair< std::vector< boost::shared_ptr< Buffers > >, SteadyClock_t::time_point > mReplyBuffers
The buffers containing the payloads for the receive operation that's currently in progress...
uint32_t getMaxSendSize()
Return the maximum size to be sent based on the buffer size in the target.
boost::thread mDispatchThread
The Worker thread in Multi-threaded mode.
virtual ~TCP()
Destructor.
TCP(const TCP &aTCP)
Copy Constructor This creates a new socket, dispatch queue, dispatch thread, etc. ...
virtual void dispatchExceptionHandler()
Function which tidies up this protocol layer in the event of an exception.
boost::asio::io_service mIOservice
The boost::asio::io_service used to create the connections.
virtual exception::exception * validate(boost::shared_ptr< Buffers > aBuffers)
Function which dispatch calls when the reply is received to check that the headers are as expected...
void add(const Clock_t::time_point &aT1, const Clock_t::time_point &aT2)
Transport protocol to transfer an IPbus buffer via TCP.
void implementDispatch(boost::shared_ptr< Buffers > aBuffers)
Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate ...
uint32_t mReplyByteCounter
Variable used to store "number of bytes to follow" field for the next/current TCP chunk being receive...
std::vector< boost::shared_ptr< Buffers > > mDispatchBuffers
The buffers containing the payload for the send operation that's currently in progress.
bool mFlushDone
A variable associated with the conditional variable which specifies whether all packets have been sen...
uint32_t getMaxReplySize()
Return the maximum size of reply packet based on the buffer size in the target.
_Integer< T, IntFmt<> > Integer(const T &aT)
Forward declare a function which creates an instance of the ultra-lightweight wrapper from an integer...
boost::asio::ip::tcp::socket mSocket
A shared pointer to a boost::asio tcp socket through which the operation will be performed.
Struct to store a URI when parsed by boost spirit.