38 #include <boost/bind/bind.hpp>
39 #include <boost/lambda/lambda.hpp>
40 #include <boost/asio/connect.hpp>
41 #include <boost/asio/write.hpp>
42 #include <boost/asio/read.hpp>
43 #include <boost/asio/placeholders.hpp>
44 #include <boost/date_time/posix_time/posix_time.hpp>
45 #include <boost/chrono/chrono_io.hpp>
// Member-initialiser list, presumably of the class constructor (the signature
// line is not part of this extract; aId and aUri are used as its arguments).
62 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
// Forward the identifier and URI to the wrapped InnerProtocol base.
64 InnerProtocol ( aId , aUri ),
// The socket is bound to this object's io_service.
66 mSocket ( mIOservice ),
// Resolve the URI's hostname/port once, using a temporary resolver.
67 mEndpoint (
boost::asio::ip::tcp::resolver ( mIOservice ).resolve (
boost::asio::ip::tcp::resolver::query ( aUri.mHostname , aUri.mPort ) ) ),
// The deadline timer shares the same io_service.
68 mDeadlineTimer ( mIOservice ),
// NOTE(review): presumably an io_service::work object that keeps run() from
// returning while idle - confirm against the member's declared type.
69 mIOserviceWork ( mIOservice ),
// Constructed with a bound call to io_service::run on mIOservice.
70 mDispatchThread (
boost::bind ( &
boost::asio::io_service::run , & ( mIOservice ) ) ),
// Nothing has been sent yet and no flush has been requested.
73 mPacketsInFlight ( 0 ),
74 mFlushStarted ( false ),
// No asynchronous error is pending.
76 mAsynchronousException ( NULL )
// Presumably the destructor (signature and braces are not part of this
// extract). Visible steps: loop on the socket state, join the dispatch thread,
// walk the reply queue, and catch std::exception.
82 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
// Repeats for as long as the socket reports open (loop body not visible here).
89 while ( mSocket.is_open() )
// Wait for the thread that was constructed around io_service::run to finish.
93 mDispatchThread.join();
// Visit every entry still held in the reply queue (per-entry action not visible here).
95 for (
size_t i = 0; i < mReplyQueue.size(); i++)
// Handler for std::exception (handler body not visible in this extract).
100 catch (
const std::exception& aExc )
// Presumably the dispatch entry point that queues a buffer object (aBuffers)
// for sending; the signature is not part of this extract.
107 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
// Serialise against the other users of mTransportLayerMutex.
110 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
// An error recorded earlier by asynchronous code is rethrown to the caller here.
112 if ( mAsynchronousException )
115 mAsynchronousException->throwAsDerivedType();
// Socket currently closed (the action taken is not visible in this extract).
118 if ( ! mSocket.is_open() )
// New data is being queued, so any earlier flush request no longer applies.
123 mFlushStarted =
false;
124 mDispatchQueue.push_back ( aBuffers );
// Condition for starting a send: no send is in progress, at least
// nr_buffers_per_send buffers are queued, and the in-flight count is below
// the maximum (the guarded statement is not visible in this extract).
126 if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= nr_buffers_per_send ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
// NOTE(review): only the template header of this definition is present in this
// extract; its signature and body are not visible.
133 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
// NOTE(review): only the template header of this definition is present in this
// extract; its signature and body are not visible.
140 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
// Presumably the connect routine (signature not part of this extract): tries
// to connect mSocket to mEndpoint with the deadline timer armed, and on failure
// builds and logs a TcpConnectionFailure whose text names the cause.
147 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
150 log (
Info() ,
"Attempting to create TCP connection to '" , mEndpoint->host_name() ,
"' port " , mEndpoint->service_name() ,
"." );
// Arm the deadline timer with the configured timeout before connecting.
151 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
// The error_code overload reports failure through lErrorCode.
152 boost::system::error_code lErrorCode;
153 boost::asio::connect ( mSocket , mEndpoint , lErrorCode );
// Repeats for as long as the socket reports open (loop body not visible here).
159 while ( mSocket.is_open() )
162 exception::TcpConnectionFailure lExc;
163 std::ostringstream oss;
// An expiry of +infinity is treated as "the deadline fired"; presumably the
// deadline check elsewhere in this file sets that value - confirm.
165 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
167 oss <<
"Timeout (" << this->getBoostTimeoutPeriod().total_milliseconds() <<
" milliseconds) occurred when connecting to ";
169 else if ( lErrorCode == boost::asio::error::connection_refused )
171 oss <<
"Connection refused for ";
// Any other failure: report the message carried by the error code.
175 oss <<
"Error \"" << lErrorCode.message() <<
"\" encountered when connecting to ";
// A URI starting with "chtcp-" is described as "ControlHub", anything else as
// "TCP server".
178 log ( lExc , oss.str() , ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
179 " at ",
Quote ( this->mUri.mHostname +
":" + this->mUri.mPort ) ,
". URI=" ,
Quote ( this->
uri() ) );
// Set the TCP no_delay option on the socket.
183 mSocket.set_option ( boost::asio::ip::tcp::no_delay (
true ) );
186 log (
Info() ,
"TCP connection succeeded" );
// Presumably the routine that starts a send (signature not part of this
// extract). Visible steps: build a gather list made of the 4-byte byte counter
// followed by the send buffers of up to nr_buffers_per_send queued entries, arm
// the deadline timer, and update the in-flight count and inter-send statistics.
// The call that actually issues the write is not visible in this extract.
191 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
// NOTE(review): presumably marks the flush as not yet done - the flag update
// inside NotifyConditionalVariable is not visible in this extract.
194 NotifyConditionalVariable (
false );
196 std::vector< boost::asio::const_buffer > lAsioSendBuffer;
// The first gather element refers to mSendByteCounter itself; its value is
// filled in below.
197 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( &mSendByteCounter , 4 ) );
198 mSendByteCounter = 0;
// Take at most nr_buffers_per_send entries from the dispatch queue.
199 std::size_t lNrBuffersToSend =
std::min ( mDispatchQueue.size(), nr_buffers_per_send );
200 mDispatchBuffers.reserve ( lNrBuffersToSend );
202 for ( std::size_t i = 0; i < lNrBuffersToSend; i++ )
// Move the oldest queued entry into the set currently being dispatched.
204 mDispatchBuffers.push_back ( mDispatchQueue.front() );
205 mDispatchQueue.pop_front();
// lBuffer is declared on a line not present in this extract; presumably it
// refers to the entry just moved.
207 mSendByteCounter += lBuffer->sendCounter();
208 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( lBuffer->getSendBuffer() , lBuffer->sendCounter() ) );
211 log (
Debug() ,
"Sending " ,
Integer ( mSendByteCounter ) ,
" bytes from ",
Integer ( mDispatchBuffers.size() ),
" buffers" );
// The counter is converted with htonl before it goes on the wire.
212 mSendByteCounter = htonl ( mSendByteCounter );
214 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
// Workaround described by the log message below: re-arm the timer while the
// remaining time reads back as under 600 microseconds.
217 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
219 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() ,
")." );
220 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
// Everything in this dispatch set now counts as in flight.
224 mPacketsInFlight += mDispatchBuffers.size();
// Record the interval since the previous send was queued; skipped while
// mLastSendQueued still equals a default-constructed time_point.
226 SteadyClock_t::time_point lNow = SteadyClock_t::now();
227 if (mLastSendQueued > SteadyClock_t::time_point())
228 mInterSendTimeStats.add(mLastSendQueued, lNow);
229 mLastSendQueued = lNow;
// Presumably the completion handler of the send (signature not part of this
// extract; it uses aErrorCode and aBytesTransferred). Visible steps: stop on a
// pending exception, a timeout, or a send error / short write; otherwise hand
// the dispatched buffers over to the reply side and possibly start another send.
234 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
237 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
// Undo the htonl applied before sending so the counter can be compared below.
238 mSendByteCounter = ntohl ( mSendByteCounter );
// An error is already pending: signal the waiter.
240 if ( mAsynchronousException )
242 NotifyConditionalVariable (
true );
// An expiry of +infinity is treated as "the deadline fired".
246 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
248 mAsynchronousException =
new exception::TcpTimeout();
249 log ( *mAsynchronousException ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for send to ",
250 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: ", this->
uri() );
// Also report the ASIO error unless it is operation_aborted.
252 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
254 log ( *mAsynchronousException ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
257 NotifyConditionalVariable (
true );
// Failure if an error other than eof was reported, or if the byte count is not
// the payload plus the 4-byte counter.
261 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != ( mSendByteCounter+4 ) ) )
263 mAsynchronousException =
new exception::ASIOTcpError();
264 log ( *mAsynchronousException ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during send to ",
265 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: " , this->uri() );
// Repeats for as long as the socket reports open (loop body not visible here).
271 while ( mSocket.is_open() )
274 catch (
const std::exception& aExc )
276 log ( *mAsynchronousException ,
"Error closing TCP socket following the ASIO send error" );
// For a short write, add the expected and actual byte counts to the report.
279 if ( aBytesTransferred != ( mSendByteCounter + 4 ) )
281 log ( *mAsynchronousException ,
"Attempted to send " ,
Integer ( mSendByteCounter ) ,
" bytes to ",
282 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
283 " with URI " ,
Quote ( this->
uri() ) ,
", but only sent " ,
Integer ( aBytesTransferred ) ,
" bytes" );
286 NotifyConditionalVariable (
true );
// If a reply chunk is already current, queue this one behind it together with
// its send timestamp; otherwise it becomes the current chunk.
290 if ( ! mReplyBuffers.first.empty() )
292 mReplyQueue.push_back ( std::make_pair(mDispatchBuffers, mLastSendQueued) );
296 mReplyBuffers = std::make_pair(mDispatchBuffers, mLastSendQueued);
// The dispatch set is empty again, i.e. no send is in progress.
300 mDispatchBuffers.clear();
// Condition for the next send: the queue holds at least one entry once a flush
// has started (otherwise at least nr_buffers_per_send), and the in-flight count
// is below the maximum. The guarded statement is not visible in this extract.
302 if ( ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
// Presumably the routine that starts receiving a reply (signature not part of
// this extract). Visible steps: set up a 4-byte buffer over mReplyByteCounter,
// arm the deadline timer and update the inter-receive statistics. The call
// that actually issues the read is not visible in this extract.
310 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
313 std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
// Only the 4-byte reply byte counter is targeted here.
314 lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( &mReplyByteCounter , 4 ) );
315 log (
Debug() ,
"Getting reply byte counter" );
// NOTE(review): lEndpoint is not used in any line visible in this extract.
316 boost::asio::ip::tcp::endpoint lEndpoint;
317 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
// Workaround described by the log message below: re-arm the timer while the
// remaining time reads back as under 600 microseconds.
320 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
322 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() ,
")." );
323 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
// Record the interval since the previous receive was queued; skipped while
// mLastRecvQueued still equals a default-constructed time_point.
328 SteadyClock_t::time_point lNow = SteadyClock_t::now();
329 if (mLastRecvQueued > SteadyClock_t::time_point())
330 mInterRecvTimeStats.add(mLastRecvQueued, lNow);
331 mLastRecvQueued = lNow;
// Presumably the completion handler of the 4-byte header read (signature not
// part of this extract; it uses aErrorCode and aBytesTransferred). Visible
// steps: total up what the current reply chunk expects, stop on a pending
// exception, a timeout or a header read error, then read the payload
// synchronously into the chunk's reply buffers, record any failure as
// mAsynchronousException, update the bookkeeping, and finally either continue
// with the next chunk / send or signal completion.
338 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
// Starts at 1; the per-entry reply buffer counts are added below. Used only as
// the reserve() size for the payload buffer list.
341 std::size_t lNrReplyBuffers = 1;
342 uint32_t lRequestBytes = 0;
343 uint32_t lExpectedReplyBytes = 0;
// Sum request/reply sizes over every entry of the current chunk.
345 for ( std::vector<
boost::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
347 lNrReplyBuffers += ( *lBufIt )->getReplyBuffer().size();
348 lRequestBytes += ( *lBufIt )->sendCounter();
349 lExpectedReplyBytes += ( *lBufIt )->replyCounter();
353 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
// An error is already pending: signal the waiter.
354 if ( mAsynchronousException )
356 NotifyConditionalVariable (
true );
360 SteadyClock_t::time_point lNow = SteadyClock_t::now();
// An expiry of +infinity is treated as "the deadline fired".
362 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
364 exception::TcpTimeout* lExc =
new exception::TcpTimeout();
365 log ( *lExc ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" ms) occurred for receive (header) from ",
366 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI '", this->
uri(),
"'. ",
367 Integer(mPacketsInFlight),
" packets in flight, ",
Integer(mReplyBuffers.first.size()),
" in this chunk (",
368 Integer(lRequestBytes),
"/",
Integer(lExpectedReplyBytes),
" bytes sent/expected). Last send / receive queued ",
369 boost::chrono::duration<float, boost::milli>(lNow - mLastSendQueued).count(),
" / ",
370 boost::chrono::duration<float, boost::milli>(lNow - mLastRecvQueued),
" ago.");
// Dump the timing statistics to help diagnose the timeout.
372 log (
Error(),
"Extra timeout-related info - round-trip times: ", mRTTStats);
373 log (
Error(),
"Extra timeout-related info - send-recv times: ", mLSTStats);
374 log (
Error(),
"Extra timeout-related info - inter-send times: ", mInterSendTimeStats);
375 log (
Error(),
"Extra timeout-related info - inter-recv times: ", mInterRecvTimeStats);
// Also report the ASIO error unless it is operation_aborted.
377 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
379 log ( *lExc ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
382 mAsynchronousException = lExc;
383 NotifyConditionalVariable (
true );
// Header read failed: an error other than eof, or not exactly 4 bytes.
388 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != 4 ) )
390 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
391 mAsynchronousException =
new exception::ASIOTcpError();
395 log ( *mAsynchronousException ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during receive from ",
396 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: " , this->uri() );
399 if ( aBytesTransferred != 4 )
401 log ( *mAsynchronousException,
"Expected to receive 4-byte header in async read from ",
402 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
403 " with URI " ,
Quote ( this->
uri() ) ,
", but only received " ,
Integer ( aBytesTransferred ) ,
" bytes" );
// Repeats for as long as the socket reports open (loop body not visible here).
409 while ( mSocket.is_open() )
412 catch (
const std::exception& aExc )
414 log ( *mAsynchronousException ,
"Error closing socket following ASIO read error" );
417 NotifyConditionalVariable (
true );
// Header received: record the round-trip time (measured from the send
// timestamp stored with this chunk) and the time since the last send was queued.
421 SteadyClock_t::time_point lReadHeaderTimestamp = SteadyClock_t::now();
422 mRTTStats.add(mReplyBuffers.second, lReadHeaderTimestamp);
423 mLSTStats.add(mLastSendQueued, lReadHeaderTimestamp);
// The counter is converted with ntohl before use.
425 mReplyByteCounter = ntohl ( mReplyByteCounter );
426 log (
Debug() ,
"Byte Counter says " ,
Integer ( mReplyByteCounter ) ,
" bytes are coming" );
427 std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
429 lAsioReplyBuffer.reserve ( lNrReplyBuffers );
430 log (
Debug() ,
"Expecting " ,
Integer ( lExpectedReplyBytes ) ,
" bytes in reply, for ",
Integer ( mReplyBuffers.first.size() ),
" buffers" );
// Build the scatter list from every (pointer, length) pair of every entry in
// the current chunk.
432 for ( std::vector<
boost::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
434 std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( ( *lBufIt )->getReplyBuffer() );
436 for ( std::deque< std::pair< uint8_t* , uint32_t > >::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
438 lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( lIt->first , lIt->second ) );
// Synchronous read of exactly mReplyByteCounter bytes; the outcome is reported
// through lErrorCode, not through the handler argument aErrorCode.
442 boost::system::error_code lErrorCode = boost::asio::error::would_block;
443 std::size_t lBytesTransferred = boost::asio::read ( mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly ( mReplyByteCounter ), lErrorCode );
// Payload read failed: an error other than eof, or a short read.
445 if ( ( lErrorCode && ( lErrorCode != boost::asio::error::eof ) ) || ( lBytesTransferred != mReplyByteCounter ) )
447 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
450 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
452 mAsynchronousException =
new exception::TcpTimeout();
453 log ( *mAsynchronousException ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for receive (chunk) from ",
454 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: ", this->
uri() );
458 mAsynchronousException =
new exception::ASIOTcpError();
// Report lErrorCode, the result of the synchronous payload read above.
// aErrorCode belongs to the earlier header read, which has already passed its
// own check by this point, so its message would not describe this failure.
459 log ( *mAsynchronousException ,
"Error ",
Quote ( lErrorCode.message() ) ,
" encountered during receive from ",
460 ( this->uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
" with URI: " , this->uri() );
463 if ( lBytesTransferred != mReplyByteCounter )
465 log ( *mAsynchronousException,
"Expected to receive " ,
Integer ( mReplyByteCounter ) ,
" bytes in read from ",
466 ( this->
uri().find (
"chtcp-" ) == 0 ?
"ControlHub" :
"TCP server" ) ,
467 " with URI " ,
Quote ( this->
uri() ) ,
", but only received " ,
Integer ( lBytesTransferred ) ,
" bytes" );
470 NotifyConditionalVariable (
true );
// Per-entry processing of the current chunk; going by the log text further
// down this is presumably the reply validation (loop body not visible here).
475 for ( std::vector<
boost::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
// NOTE(review): lExc is declared on a line not present in this extract.
481 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
482 mAsynchronousException = lExc;
// Wrap a caught exception (aExc) in a ValidationError that quotes its what().
487 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
488 mAsynchronousException =
new exception::ValidationError ();
489 log ( *mAsynchronousException ,
"Exception caught during reply validation for TCP device with URI " ,
Quote ( this->
uri() ) ,
"; what returned: " ,
Quote ( aExc.
what() ) );
492 if ( mAsynchronousException )
494 NotifyConditionalVariable (
true );
// Success path: this chunk's entries are no longer in flight.
499 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
500 mPacketsInFlight -= mReplyBuffers.first.size();
// Promote the next queued chunk if there is one; otherwise clear the current one.
502 if ( mReplyQueue.size() )
504 mReplyBuffers = mReplyQueue.front();
505 mReplyQueue.pop_front();
510 mReplyBuffers.first.clear();
// Condition for the next send: no send in progress, the queue holds at least
// one entry once a flush has started (otherwise at least nr_buffers_per_send),
// and the in-flight count is below the maximum. The guarded statement is not
// visible in this extract.
513 if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
// Nothing is being sent or awaited: set the deadline 60 seconds ahead and
// signal the waiter.
518 if ( mDispatchBuffers.empty() && mReplyBuffers.first.empty() )
520 mDeadlineTimer.expires_from_now ( boost::posix_time::seconds(60) );
521 NotifyConditionalVariable (
true );
// Presumably the deadline-timer handler (signature not part of this extract):
// when the deadline has passed it logs why the socket is being closed and then
// sets the timer's expiry to +infinity.
527 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
530 boost::lock_guard<boost::mutex> lLock ( this->mTransportLayerMutex );
// Act only if the expiry time is not in the future.
533 if ( mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
// With a send or a reply outstanding the message is a warning; otherwise the
// debug message about 60 seconds without communication is used.
536 if ( mDispatchBuffers.size() || mReplyBuffers.first.size() )
538 log (
Warning() ,
"Closing TCP socket for device with URI " ,
Quote ( this->
uri() ) ,
" since deadline has passed" );
542 log (
Debug() ,
"Closing TCP socket for device with URI " ,
Quote ( this->
uri() ) ,
" since no communication in 60 seconds" );
// +infinity is the value that other code in this file compares against to
// detect a timeout.
548 mDeadlineTimer.expires_at ( boost::posix_time::pos_infin );
// Presumably the flush routine (signature not part of this extract): marks the
// flush as started, waits on the condition variable, then rethrows any error
// recorded by asynchronous code.
556 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
560 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
// From here on the send conditions elsewhere accept a single queued buffer.
561 mFlushStarted =
true;
// Buffers are queued and no send is in progress (the guarded statement is not
// visible in this extract).
563 if ( mDispatchQueue.size() && mDispatchBuffers.empty() )
// Block until the flush-done flag is set.
569 WaitOnConditionalVariable();
571 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
// Surface an asynchronously recorded error to the caller.
573 if ( mAsynchronousException )
575 mAsynchronousException->throwAsDerivedType();
// Presumably the exception-cleanup hook (signature not part of this extract;
// it ends by calling InnerProtocol::dispatchExceptionHandler). Visible steps:
// log and loop on the open socket, signal the waiter, delete the stored
// exception, and reset the in-flight and byte counters.
580 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
583 if ( mSocket.is_open() )
585 log (
Warning() ,
"Closing TCP socket for device with URI " ,
Quote ( this->
uri() ) ,
" since exception detected." );
// Repeats for as long as the socket reports open (loop body not visible here).
591 while ( mSocket.is_open() )
598 NotifyConditionalVariable (
true );
// The stored exception is deleted here and the pointer cleared.
600 if ( mAsynchronousException )
602 delete mAsynchronousException;
603 mAsynchronousException = NULL;
// Visit every entry of the reply queue (per-entry action not visible here).
607 for (
size_t i = 0; i < mReplyQueue.size(); i++)
609 mPacketsInFlight = 0;
613 mSendByteCounter = 0;
614 mReplyByteCounter = 0;
// Hand over to the wrapped protocol's own handler.
616 InnerProtocol::dispatchExceptionHandler();
// Presumably NotifyConditionalVariable (signature not part of this extract):
// takes mConditionalVariableMutex and wakes one waiter on mConditionalVariable.
// NOTE(review): the flag update between these two statements is not visible in
// this extract.
620 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
624 boost::lock_guard<boost::mutex> lLock ( mConditionalVariableMutex );
627 mConditionalVariable.notify_one();
// Presumably WaitOnConditionalVariable (signature not part of this extract):
// blocks the caller until mFlushDone is true.
630 template <
typename InnerProtocol , std::
size_t nr_buffers_per_send >
// A unique_lock is passed to the condition variable's wait call below.
633 boost::unique_lock<boost::mutex> lLock ( mConditionalVariableMutex );
// Re-check the flag after every wake-up.
635 while ( !mFlushDone )
637 mConditionalVariable.wait ( lLock );