39 #include <boost/bind/bind.hpp>
40 #include <boost/lambda/lambda.hpp>
41 #include <boost/asio/connect.hpp>
42 #include <boost/asio/write.hpp>
43 #include <boost/asio/read.hpp>
44 #include <boost/asio/placeholders.hpp>
45 #include "boost/date_time/posix_time/posix_time.hpp"
59 template <
typename InnerProtocol >
61 InnerProtocol ( aId , aUri ),
63 mSocket ( mIOservice ,
boost::asio::ip::udp::endpoint (
boost::asio::ip::udp::v4(), 0 ) ),
64 mEndpoint ( *
boost::asio::ip::udp::resolver ( mIOservice ).resolve (
boost::asio::ip::udp::resolver::query (
boost::asio::ip::udp::v4() , aUri.mHostname , aUri.mPort ) ) ),
65 mDeadlineTimer ( mIOservice ),
66 mReplyMemory ( 1500 , 0x00000000 ),
67 mIOserviceWork ( mIOservice ),
68 mDispatchThread (
boost::bind ( &
boost::asio::io_service::run , & ( mIOservice ) ) ),
71 mPacketsInFlight ( 0 ),
73 mAsynchronousException ( NULL )
79 template <
typename InnerProtocol >
86 while ( mSocket.is_open() )
90 mDispatchThread.join();
91 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
95 catch (
const std::exception& aExc )
103 template <
typename InnerProtocol >
106 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
108 if ( mAsynchronousException )
110 log ( *mAsynchronousException ,
"Rethrowing Asynchronous Exception from 'implementDispatch' method of " ,
Type<
UDP< InnerProtocol > >() );
111 mAsynchronousException->throwAsDerivedType();
114 if ( ! mSocket.is_open() )
120 if ( mDispatchBuffers || mPacketsInFlight == this->getMaxNumberOfBuffers() )
122 mDispatchQueue.push_back ( aBuffers );
126 mDispatchBuffers = aBuffers;
132 template <
typename InnerProtocol >
139 template <
typename InnerProtocol >
146 template <
typename InnerProtocol >
149 log (
Info() ,
"Creating new UDP socket for device " ,
Quote ( this->
uri() ) ,
", as it appears to have been closed..." );
151 mSocket.open ( boost::asio::ip::udp::v4() );
154 log (
Info() ,
"UDP socket created successfully." );
158 template <
typename InnerProtocol >
161 NotifyConditionalVariable (
false );
163 if ( !mDispatchBuffers )
165 log (
Error() , __PRETTY_FUNCTION__ ,
" called when 'mDispatchBuffers' was NULL" );
169 std::vector< boost::asio::const_buffer > lAsioSendBuffer;
170 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( mDispatchBuffers->getSendBuffer() , mDispatchBuffers->sendCounter() ) );
171 log (
Debug() ,
"Sending " ,
Integer ( mDispatchBuffers->sendCounter() ) ,
" bytes" );
172 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
175 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
177 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() ,
")." );
178 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
181 mSocket.async_send_to ( lAsioSendBuffer , mEndpoint , boost::bind ( &
UDP< InnerProtocol >::write_callback,
this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
186 template <
typename InnerProtocol >
189 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
191 if ( mAsynchronousException )
193 NotifyConditionalVariable (
true );
197 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
199 exception::UdpTimeout* lExc =
new exception::UdpTimeout();
200 log ( *lExc ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for UDP send to target with URI: ", this->
uri() );
202 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
204 log ( *lExc ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
207 mAsynchronousException = lExc;
208 NotifyConditionalVariable (
true );
212 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != mDispatchBuffers->sendCounter() ) )
215 exception::ASIOUdpError* lExc =
new exception::ASIOUdpError();
218 log ( *lExc ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during send to UDP target with URI: " , this->uri() );
220 if ( aBytesTransferred != mDispatchBuffers->sendCounter() )
222 log ( *lExc ,
"Only ",
Integer ( aBytesTransferred ) ,
" of " ,
Integer ( mDispatchBuffers->sendCounter() ) ,
" bytes transferred in UDP send to URI: " , this->uri() );
224 mAsynchronousException = lExc;
225 NotifyConditionalVariable (
true );
231 mReplyQueue.push_back ( mDispatchBuffers );
235 mReplyBuffers = mDispatchBuffers;
239 if ( mDispatchQueue.size() && mPacketsInFlight != this->getMaxNumberOfBuffers() )
241 mDispatchBuffers = mDispatchQueue.front();
242 mDispatchQueue.pop_front();
247 mDispatchBuffers.reset();
252 template <
typename InnerProtocol >
255 if ( !mReplyBuffers )
257 log (
Error() , __PRETTY_FUNCTION__ ,
" called when 'mReplyBuffers' was NULL" );
258 NotifyConditionalVariable (
true );
262 std::vector<boost::asio::mutable_buffer> lAsioReplyBuffer ( 1 , boost::asio::mutable_buffer ( & ( mReplyMemory.at ( 0 ) ) , mReplyBuffers->replyCounter() ) );
263 log (
Debug() ,
"Expecting " ,
Integer ( mReplyBuffers->replyCounter() ) ,
" bytes in reply." );
264 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
267 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
269 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() ,
")." );
270 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
273 mSocket.async_receive ( lAsioReplyBuffer , 0 , boost::bind ( &
UDP<InnerProtocol>::read_callback,
this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
277 template <
typename InnerProtocol >
281 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
282 if ( mAsynchronousException )
284 NotifyConditionalVariable (
true );
288 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
290 mAsynchronousException =
new exception::UdpTimeout();
291 log ( *mAsynchronousException ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for UDP receive from target with URI: ", this->
uri() );
293 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
295 log ( *mAsynchronousException ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
298 NotifyConditionalVariable (
true );
303 if ( !mReplyBuffers )
305 log (
Error() , __PRETTY_FUNCTION__ ,
" called when 'mReplyBuffers' was NULL" );
309 if ( aBytesTransferred != mReplyBuffers->replyCounter() )
311 log (
Error() ,
"Expected " ,
Integer ( mReplyBuffers->replyCounter() ) ,
"-byte UDP payload from target " ,
Quote ( this->
uri() ) ,
", but only received " ,
Integer ( aBytesTransferred ) ,
" bytes. Validating returned data to work out where error occurred." );
314 if ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) )
318 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
319 mAsynchronousException =
new exception::ASIOUdpError();
320 log ( *mAsynchronousException ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during receive from UDP target with URI: " , this->uri() );
322 NotifyConditionalVariable (
true );
327 std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( mReplyBuffers->getReplyBuffer() );
328 uint8_t* lReplyBuf ( & ( mReplyMemory.at ( 0 ) ) );
330 for ( std::deque< std::pair< uint8_t* , uint32_t > >::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
333 if (
static_cast<uint32_t
> ( lReplyBuf - ( & mReplyMemory.at ( 0 ) ) ) >= aBytesTransferred )
338 uint32_t lNrBytesToCopy =
std::min ( lIt->second ,
static_cast<uint32_t
> ( aBytesTransferred - ( lReplyBuf - ( & mReplyMemory.at ( 0 ) ) ) ) );
339 memcpy ( lIt->first, lReplyBuf, lNrBytesToCopy );
340 lReplyBuf += lNrBytesToCopy;
347 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
348 mAsynchronousException = lExc;
353 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
354 mAsynchronousException =
new exception::ValidationError ();
355 log ( *mAsynchronousException ,
"Exception caught during reply validation for UDP device with URI " ,
Quote ( this->
uri() ) ,
"; what returned: " ,
Quote ( aExc.
what() ) );
358 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
360 if ( mAsynchronousException )
362 NotifyConditionalVariable (
true );
366 if ( mReplyQueue.size() )
368 mReplyBuffers = mReplyQueue.front();
369 mReplyQueue.pop_front();
374 mReplyBuffers.reset();
379 if ( !mDispatchBuffers && mDispatchQueue.size() && mPacketsInFlight != this->getMaxNumberOfBuffers() )
381 mDispatchBuffers = mDispatchQueue.front();
382 mDispatchQueue.pop_front();
386 if ( !mDispatchBuffers && !mReplyBuffers )
388 mDeadlineTimer.expires_from_now( boost::posix_time::seconds(60) );
389 NotifyConditionalVariable (
true );
395 template <
typename InnerProtocol >
401 boost::lock_guard<boost::mutex> lLock ( this->mTransportLayerMutex );
403 if ( mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
406 if ( mDispatchBuffers || mReplyBuffers )
408 log (
Warning() ,
"Closing UDP socket for URI " ,
Quote ( this->
uri() ) ,
" since deadline has passed" );
412 log (
Debug() ,
"Closing UDP socket for URI " ,
Quote ( this->
uri() ) ,
" since no communication in last 60 seconds" );
420 mDeadlineTimer.expires_at ( boost::posix_time::pos_infin );
428 template <
typename InnerProtocol >
431 WaitOnConditionalVariable();
433 boost::lock_guard<boost::mutex> lLock ( mTransportLayerMutex );
434 if ( mAsynchronousException )
436 mAsynchronousException->throwAsDerivedType();
442 template <
typename InnerProtocol >
445 log (
Warning() ,
"Closing Socket since exception detected." );
447 if ( mSocket.is_open() )
453 while ( mSocket.is_open() )
460 NotifyConditionalVariable (
true );
462 if ( mAsynchronousException )
464 delete mAsynchronousException;
465 mAsynchronousException = NULL;
470 mPacketsInFlight = 0;
473 mDispatchBuffers.reset();
475 mReplyBuffers.reset();
477 InnerProtocol::dispatchExceptionHandler();
481 template <
typename InnerProtocol >
485 boost::lock_guard<boost::mutex> lLock ( mConditionalVariableMutex );
488 mConditionalVariable.notify_one();
492 template <
typename InnerProtocol >
495 boost::unique_lock<boost::mutex> lLock ( mConditionalVariableMutex );
497 while ( !mFlushDone )
499 mConditionalVariable.wait ( lLock );