39 #include <boost/bind/bind.hpp> 40 #include <boost/lambda/lambda.hpp> 41 #include <boost/asio/connect.hpp> 42 #include <boost/asio/write.hpp> 43 #include <boost/asio/read.hpp> 44 #include <boost/asio/placeholders.hpp> 45 #include "boost/date_time/posix_time/posix_time.hpp" 51 #include "uhal/log/log.hpp" 63 template <
typename InnerProtocol >
65 InnerProtocol ( aId , aUri ),
67 mSocket ( mIOservice ,
boost::asio::ip::udp::endpoint (
boost::asio::ip::udp::v4(), 0 ) ),
68 mEndpoint ( *
boost::asio::ip::udp::resolver ( mIOservice ).resolve (
boost::asio::ip::udp::resolver::query (
boost::asio::ip::udp::v4() , aUri.mHostname , aUri.mPort ) ) ),
69 mDeadlineTimer ( mIOservice ),
70 mReplyMemory ( 1500 , 0x00000000 ),
71 mIOserviceWork ( mIOservice ),
72 mDispatchThread (
boost::bind ( &
boost::asio::io_service::run , & ( mIOservice ) ) ),
75 mPacketsInFlight ( 0 ),
79 mAsynchronousException ( NULL )
85 template <
typename InnerProtocol >
101 catch (
const std::exception& aExc )
109 template <
typename InnerProtocol >
139 template <
typename InnerProtocol >
146 template <
typename InnerProtocol >
153 template <
typename InnerProtocol >
156 log (
Info() ,
"Creating new UDP socket for device " ,
Quote ( this->
uri() ) ,
", as it appears to have been closed..." );
158 mSocket.open ( boost::asio::ip::udp::v4() );
161 log (
Info() ,
"UDP socket created successfully." );
165 template <
typename InnerProtocol >
172 log (
Error() , __PRETTY_FUNCTION__ ,
" called when 'mDispatchBuffers' was NULL" );
185 std::vector< boost::asio::const_buffer > lAsioSendBuffer;
188 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
191 while (
mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
193 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ",
mDeadlineTimer.expires_from_now() ,
")." );
194 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
202 template <
typename InnerProtocol >
218 if (
mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
220 exception::UdpTimeout* lExc =
new exception::UdpTimeout();
221 log ( *lExc ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for UDP send to target with URI: ", this->
uri() );
223 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
225 log ( *lExc ,
"ASIO reported an error: " ,
Quote ( aErrorCode.message() ) );
233 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred !=
mDispatchBuffers->sendCounter() ) )
236 exception::ASIOUdpError* lExc =
new exception::ASIOUdpError();
239 log ( *lExc ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during send to UDP target with URI: " , this->
uri() );
243 log ( *lExc ,
"Only ",
Integer ( aBytesTransferred ) ,
" of " ,
Integer (
mDispatchBuffers->sendCounter() ) ,
" bytes transferred in UDP send to URI: " , this->
uri() );
275 template <
typename InnerProtocol >
280 log (
Error() , __PRETTY_FUNCTION__ ,
" called when 'mReplyBuffers' was NULL" );
285 std::vector<boost::asio::mutable_buffer> lAsioReplyBuffer ( 1 , boost::asio::mutable_buffer ( & (
mReplyMemory.at ( 0 ) ) ,
mReplyBuffers->replyCounter() ) );
287 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
290 while (
mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
292 log (
Debug() ,
"Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ",
mDeadlineTimer.expires_from_now() ,
")." );
293 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
300 template <
typename InnerProtocol >
311 if (
mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
314 log ( *
mAsynchronousException ,
"Timeout (" ,
Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) ,
" milliseconds) occurred for UDP receive from target with URI: ", this->
uri() );
316 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
328 log (
Error() , __PRETTY_FUNCTION__ ,
" called when 'mReplyBuffers' was NULL" );
334 log (
Error() ,
"Expected " ,
Integer (
mReplyBuffers->replyCounter() ) ,
"-byte UDP payload from target " ,
Quote ( this->
uri() ) ,
", but only received " ,
Integer ( aBytesTransferred ) ,
" bytes. Validating returned data to work out where error occurred." );
337 if ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) )
343 log ( *
mAsynchronousException ,
"Error ",
Quote ( aErrorCode.message() ) ,
" encountered during receive from UDP target with URI: " , this->
uri() );
365 std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers (
mReplyBuffers->getReplyBuffer() );
368 for ( std::deque< std::pair< uint8_t* , uint32_t > >::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
371 if ( static_cast<uint32_t> ( lReplyBuf - ( &
mReplyMemory.at ( 0 ) ) ) >= aBytesTransferred )
376 uint32_t lNrBytesToCopy =
std::min ( lIt->second , static_cast<uint32_t> ( aBytesTransferred - ( lReplyBuf - ( &
mReplyMemory.at ( 0 ) ) ) ) );
377 memcpy ( lIt->first, lReplyBuf, lNrBytesToCopy );
378 lReplyBuf += lNrBytesToCopy;
428 mDeadlineTimer.expires_from_now( boost::posix_time::seconds(60) );
435 template <
typename InnerProtocol >
443 if (
mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
448 log (
Warning() ,
"Closing UDP socket for URI " ,
Quote ( this->
uri() ) ,
" since deadline has passed" );
452 log (
Debug() ,
"Closing UDP socket for URI " ,
Quote ( this->
uri() ) ,
" since no communication in last 60 seconds" );
469 template <
typename InnerProtocol >
483 template <
typename InnerProtocol >
486 log (
Warning() ,
"Closing Socket since exception detected." );
518 InnerProtocol::dispatchExceptionHandler();
525 template <
typename InnerProtocol >
536 template <
typename InnerProtocol >
std::deque< boost::shared_ptr< Buffers > > mDispatchQueue
The list of buffers still waiting to be sent.
void returnBufferToPool(boost::shared_ptr< Buffers > &aBuffers)
Function to return a buffer to the buffer pool.
uint32_t getMaxSendSize()
Return the maximum size to be sent based on the buffer size in the target.
uint32_t getMaxReplySize()
Return the maximum size of reply packet based on the buffer size in the target.
Transport protocol to transfer an IPbus buffer via UDP.
boost::asio::io_service mIOservice
The boost::asio::io_service used to create the connections.
minutes past the hour formatted as two digits e.g.
uint32_t mPacketsInFlight
Counter of how many writes have been sent, for which no reply has yet been received.
void CheckDeadline()
Function called by the ASIO deadline timer.
void NotifyConditionalVariable(const bool &aValue)
Function to set the value of a variable associated with a BOOST conditional-variable and then notify ...
boost::asio::deadline_timer mDeadlineTimer
The mechanism for providing the time-out.
An abstract base exception class providing an interface to a throw/ThrowAsDerivedType mechanism which...
boost::shared_ptr< Buffers > mReplyBuffers
The receive operation currently in progress or the next to be done.
virtual ~UDP()
Destructor.
void read_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async receive. This, then...
virtual void dispatchExceptionHandler()
Function which tidies up this protocol layer in the event of an exception.
boost::shared_ptr< Buffers > mDispatchBuffers
The send operation currently in progress.
boost::mutex mTransportLayerMutex
A MutEx lock used to make sure the access functions are thread safe.
_Quote< T > Quote(const T &aT)
virtual void Flush()
Concrete implementation of the synchronization function to block until all buffers have been sent...
boost::condition_variable mConditionalVariable
A conditional variable for blocking the main thread until the variable with which it is associated is...
UDP(const UDP &aUDP)
Copy constructor. This creates a new socket, dispatch queue, dispatch thread, etc. ...
boost::asio::ip::udp::socket mSocket
The boost::asio UDP socket through which the operation will be performed.
void connect()
Set up the UDP socket.
boost::thread mDispatchThread
The Worker thread in Multi-threaded mode.
virtual const char * what() const
Function which returns the error message associated with an exception. If no error message has previou...
void write_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async send. This, then, makes a call to read to read back the reply to what has just been sent.
void write()
Initialize performing the next UDP write operation. In multi-threaded mode, this runs the ASIO async s...
void read()
Initialize performing the next UDP read operation. In multi-threaded mode, this runs the ASIO async re...
std::vector< uint8_t > mReplyMemory
A block of memory into which we write replies, before copying them to their final destination...
std::deque< boost::shared_ptr< Buffers > > mReplyQueue
The list of buffers still awaiting a reply.
bool mFlushDone
A variable associated with the conditional variable which specifies whether all packets have been sen...
boost::asio::ip::udp::endpoint mEndpoint
The boost::asio UDP endpoint - used in the ASIO send and receive functions (UDP has...
void implementDispatch(boost::shared_ptr< Buffers > aBuffers)
Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate ...
virtual exception::exception * validate(boost::shared_ptr< Buffers > aBuffers)
Function which dispatch calls when the reply is received to check that the headers are as expected...
uhal::exception::exception * mAsynchronousException
A pointer to an exception object for passing exceptions from the worker thread to the main thread...
void WaitOnConditionalVariable()
Function to block a thread pending a BOOST conditional-variable and its associated regular variable...
boost::mutex mConditionalVariableMutex
A mutex for use by the conditional variable.
_Integer< T, IntFmt<> > Integer(const T &aT)
Forward declare a function which creates an instance of the ultra-lightweight wrapper from an integer...
Struct to store a URI when parsed by boost spirit.