40#include <boost/asio/connect.hpp>
41#include <boost/asio/write.hpp>
42#include <boost/asio/read.hpp>
43#include <boost/asio/placeholders.hpp>
44#include "boost/date_time/posix_time/posix_time.hpp"
58 template <
typename InnerProtocol >
60 InnerProtocol ( aId , aUri ),
61 mMaxPayloadSize (350 * 4),
63 mSocket ( mIOservice ,
boost::asio::ip::udp::endpoint (
boost::asio::ip::udp::v4(), 0 ) ),
64 mEndpoint ( *
boost::asio::ip::udp::resolver ( mIOservice ).resolve (
boost::asio::ip::udp::resolver::query (
boost::asio::ip::udp::v4() , aUri.mHostname , aUri.mPort ) ) ),
65 mDeadlineTimer ( mIOservice ),
67 mIOserviceWork ( mIOservice ),
68 mDispatchThread ( [this] () {
mIOservice.run(); } ),
71 mPacketsInFlight ( 0 ),
73 mAsynchronousException ( NULL )
75 mDeadlineTimer.async_wait ([
this] (
const boost::system::error_code&) { this->CheckDeadline(); });
78 for (
const auto& lArg : aUri.mArguments) {
79 if (lArg.first ==
"max_payload_size") {
81 mMaxPayloadSize = boost::lexical_cast<size_t>(lArg.second);
83 catch (
const boost::bad_lexical_cast&) {
84 throw exception::InvalidURI(
"Client URI \"" + this->
uri() +
"\": Invalid value, \"" + lArg.second +
"\", specified for attribute \"" + lArg.first +
"\"");
86 log (
Info(),
"Client with URI ",
Quote(this->
uri()),
": Maximum UDP payload size set to ", std::to_string(mMaxPayloadSize),
" bytes");
89 throw exception::InvalidURI(
"Client URI \"" + this->
uri() +
"\" has unexpected attribute \"" + lArg.first +
"\"");
92 mReplyMemory.resize(mMaxPayloadSize + 20, 0x00000000);
96 template <
typename InnerProtocol >
103 while ( mSocket.is_open() )
107 mDispatchThread.join();
108 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
112 catch (
const std::exception& aExc )
120 template <
typename InnerProtocol >
123 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
125 if ( mAsynchronousException )
127 log ( *mAsynchronousException ,
"Rethrowing Asynchronous Exception from 'implementDispatch' method of " ,
Type<
UDP< InnerProtocol > >() );
128 mAsynchronousException->throwAsDerivedType();
131 if ( ! mSocket.is_open() )
137 if ( mDispatchBuffers || mPacketsInFlight == this->getMaxNumberOfBuffers() )
139 mDispatchQueue.push_back ( aBuffers );
143 mDispatchBuffers = aBuffers;
149 template <
typename InnerProtocol >
152 return mMaxPayloadSize;
156 template <
typename InnerProtocol >
159 return mMaxPayloadSize;
163 template <
typename InnerProtocol >
166 log (
Info() ,
"Creating new UDP socket for device " ,
Quote ( this->uri() ) ,
", as it appears to have been closed..." );
168 mSocket.open ( boost::asio::ip::udp::v4() );
171 log (
Info() ,
"UDP socket created successfully." );
175 template <
typename InnerProtocol >
178 NotifyConditionalVariable (
false );
180 if ( !mDispatchBuffers )
182 log (
Error() , __PRETTY_FUNCTION__ ,
" called when 'mDispatchBuffers' was NULL" );
186 std::vector< boost::asio::const_buffer > lAsioSendBuffer;
187 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( mDispatchBuffers->getSendBuffer() , mDispatchBuffers->sendCounter() ) );
188 log (
Debug() ,
"Sending " ,
Integer ( mDispatchBuffers->sendCounter() ) ,
" bytes" );
189 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
192 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
194 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() ,
")." );
195 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
198 mSocket.async_send_to ( lAsioSendBuffer , mEndpoint , [&] (
const boost::system::error_code& e, std::size_t n) { this->write_callback(e, n); });
203 template <
typename InnerProtocol >
206 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
208 if ( mAsynchronousException )
210 NotifyConditionalVariable (
true );
214 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
216 exception::UdpTimeout* lExc =
new exception::UdpTimeout();
217 log ( *lExc ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for UDP send to target with URI: ", this->uri() );
219 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
221 log ( *lExc ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
224 mAsynchronousException = lExc;
225 NotifyConditionalVariable (
true );
229 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != mDispatchBuffers->sendCounter() ) )
232 exception::ASIOUdpError* lExc =
new exception::ASIOUdpError();
235 log ( *lExc ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during send to UDP target with URI: " , this->uri() );
237 if ( aBytesTransferred != mDispatchBuffers->sendCounter() )
239 log ( *lExc ,
"Only ",
Integer ( aBytesTransferred ) ,
" of " ,
Integer ( mDispatchBuffers->sendCounter() ) ,
" bytes transferred in UDP send to URI: " , this->uri() );
241 mAsynchronousException = lExc;
242 NotifyConditionalVariable (
true );
248 mReplyQueue.push_back ( mDispatchBuffers );
252 mReplyBuffers = mDispatchBuffers;
256 if ( mDispatchQueue.size() && mPacketsInFlight != this->getMaxNumberOfBuffers() )
258 mDispatchBuffers = mDispatchQueue.front();
259 mDispatchQueue.pop_front();
264 mDispatchBuffers.reset();
269 template <
typename InnerProtocol >
272 if ( !mReplyBuffers )
274 log (
Error() , __PRETTY_FUNCTION__ ,
" called when 'mReplyBuffers' was NULL" );
275 NotifyConditionalVariable (
true );
279 std::vector<boost::asio::mutable_buffer> lAsioReplyBuffer ( 1 , boost::asio::mutable_buffer ( & ( mReplyMemory.at ( 0 ) ) , mReplyBuffers->replyCounter() ) );
280 log (
Debug() ,
"Expecting " ,
Integer ( mReplyBuffers->replyCounter() ) ,
" bytes in reply." );
281 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
284 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
286 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() ,
")." );
287 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
290 mSocket.async_receive ( lAsioReplyBuffer , 0 , [&] (
const boost::system::error_code& e, std::size_t n) { this->read_callback(e, n); });
294 template <
typename InnerProtocol >
298 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
299 if ( mAsynchronousException )
301 NotifyConditionalVariable (
true );
305 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
307 mAsynchronousException =
new exception::UdpTimeout();
308 log ( *mAsynchronousException ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for UDP receive from target with URI: ", this->uri() );
310 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
312 log ( *mAsynchronousException ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
315 NotifyConditionalVariable (
true );
320 if ( !mReplyBuffers )
322 log (
Error() , __PRETTY_FUNCTION__ ,
" called when 'mReplyBuffers' was NULL" );
326 if ( aBytesTransferred != mReplyBuffers->replyCounter() )
328 log (
Error() ,
"Expected " ,
Integer ( mReplyBuffers->replyCounter() ) ,
"-byte UDP payload from target " ,
Quote ( this->uri() ) ,
", but only received " ,
Integer ( aBytesTransferred ) ,
" bytes. Validating returned data to work out where error occurred." );
331 if ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) )
335 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
336 mAsynchronousException =
new exception::ASIOUdpError();
337 log ( *mAsynchronousException ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during receive from UDP target with URI: " , this->uri() );
339 NotifyConditionalVariable (
true );
344 std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( mReplyBuffers->getReplyBuffer() );
345 uint8_t* lReplyBuf ( & ( mReplyMemory.at ( 0 ) ) );
347 for (
const auto& lBuffer: lReplyBuffers)
350 if (
static_cast<uint32_t
> ( lReplyBuf - ( & mReplyMemory.at ( 0 ) ) ) >= aBytesTransferred )
353 uint32_t lNrBytesToCopy = std::min ( lBuffer.second ,
static_cast<uint32_t
> ( aBytesTransferred - ( lReplyBuf - ( & mReplyMemory.at ( 0 ) ) ) ) );
354 memcpy ( lBuffer.first, lReplyBuf, lNrBytesToCopy );
355 lReplyBuf += lNrBytesToCopy;
362 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
363 mAsynchronousException = lExc;
368 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
369 mAsynchronousException =
new exception::ValidationError ();
370 log ( *mAsynchronousException ,
"Exception caught during reply validation for UDP device with URI " ,
Quote ( this->uri() ) ,
"; what returned: " ,
Quote ( aExc.
what() ) );
373 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
375 if ( mAsynchronousException )
377 NotifyConditionalVariable (
true );
381 if ( mReplyQueue.size() )
383 mReplyBuffers = mReplyQueue.front();
384 mReplyQueue.pop_front();
389 mReplyBuffers.reset();
394 if ( !mDispatchBuffers && mDispatchQueue.size() && mPacketsInFlight != this->getMaxNumberOfBuffers() )
396 mDispatchBuffers = mDispatchQueue.front();
397 mDispatchQueue.pop_front();
401 if ( !mDispatchBuffers && !mReplyBuffers )
403 mDeadlineTimer.expires_from_now( boost::posix_time::seconds(60) );
404 NotifyConditionalVariable (
true );
410 template <
typename InnerProtocol >
416 std::lock_guard<std::mutex> lLock ( this->mTransportLayerMutex );
418 if ( mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
421 if ( mDispatchBuffers || mReplyBuffers )
423 log (
Warning() ,
"Closing UDP socket for URI " ,
Quote ( this->uri() ) ,
" since deadline has passed" );
427 log (
Debug() ,
"Closing UDP socket for URI " ,
Quote ( this->uri() ) ,
" since no communication in last 60 seconds" );
435 mDeadlineTimer.expires_at ( boost::posix_time::pos_infin );
439 mDeadlineTimer.async_wait ([
this] (
const boost::system::error_code&) { this->CheckDeadline(); });
443 template <
typename InnerProtocol >
446 WaitOnConditionalVariable();
448 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
449 if ( mAsynchronousException )
451 mAsynchronousException->throwAsDerivedType();
457 template <
typename InnerProtocol >
460 log (
Warning() ,
"Closing Socket since exception detected." );
462 if ( mSocket.is_open() )
468 while ( mSocket.is_open() )
475 NotifyConditionalVariable (
true );
477 if ( mAsynchronousException )
479 delete mAsynchronousException;
480 mAsynchronousException = NULL;
485 mPacketsInFlight = 0;
488 mDispatchBuffers.reset();
490 mReplyBuffers.reset();
492 InnerProtocol::dispatchExceptionHandler();
496 template <
typename InnerProtocol >
500 std::lock_guard<std::mutex> lLock ( mConditionalVariableMutex );
503 mConditionalVariable.notify_one();
507 template <
typename InnerProtocol >
510 std::unique_lock<std::mutex> lLock ( mConditionalVariableMutex );
512 while ( !mFlushDone )
514 mConditionalVariable.wait ( lLock );
void returnBufferToPool(std::shared_ptr< Buffers > &aBuffers)
Function to return a buffer to the buffer pool.
virtual exception::exception * validate(std::shared_ptr< Buffers > aBuffers)
Function which dispatch calls when the reply is received to check that the headers are as expected.
Transport protocol to transfer an IPbus buffer via UDP.
virtual void Flush()
Concrete implementation of the synchronization function to block until all buffers have been sent,...
void write_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async send This, then,...
virtual void dispatchExceptionHandler()
Function which tidies up this protocol layer in the event of an exception.
boost::asio::io_service mIOservice
The boost::asio::io_service used to create the connections.
void write()
Initialize performing the next UDP write operation In multi-threaded mode, this runs the ASIO async s...
void CheckDeadline()
Function called by the ASIO deadline timer.
void implementDispatch(std::shared_ptr< Buffers > aBuffers)
Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate ...
void read_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async receive This, then,...
void NotifyConditionalVariable(const bool &aValue)
Function to set the value of a variable associated with a BOOST conditional-variable and then notify ...
virtual ~UDP()
Destructor.
UDP(const UDP &aUDP)
Copy Constructor This creates a new socket, dispatch queue, dispatch thread, etc.
void WaitOnConditionalVariable()
Function to block a thread pending a BOOST conditional-variable and its associated regular variable.
void read()
Initialize performing the next UDP read operation In multi-threaded mode, this runs the ASIO async re...
void connect()
Set up the UDP socket.
uint32_t getMaxReplySize()
Return the maximum size of reply packet based on the buffer size in the target.
uint32_t getMaxSendSize()
Return the maximum size to be sent based on the buffer size in the target.
An abstract base exception class, including an interface to throw as the derived type (for passing ex...
virtual const char * what() const
Function which returns the error message associated with an exception If no error message has previou...
_Quote< T > Quote(const T &aT)
_Integer< T, IntFmt<> > Integer(const T &aT)
Forward declare a function which creates an instance of the ultra-lightweight wrapper from an integer...
void log(FatalLevel &aFatal, const T0 &aArg0)
Function to add a log entry at Fatal level.
Struct to store a URI when parsed by boost spirit.