μHAL (v2.8.17)
Part of the IPbus software repository
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ProtocolUDP.cpp
Go to the documentation of this file.
1/*
2---------------------------------------------------------------------------
3
4 This file is part of uHAL.
5
6 uHAL is a hardware access library and programming framework
7 originally developed for upgrades of the Level-1 trigger of the CMS
8 experiment at CERN.
9
10 uHAL is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation, either version 3 of the License, or
13 (at your option) any later version.
14
15 uHAL is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with uHAL. If not, see <http://www.gnu.org/licenses/>.
22
23
24 Andrew Rose, Imperial College, London
25 email: awr01 <AT> imperial.ac.uk
26
27 Marc Magrans de Abril, CERN
28 email: marc.magrans.de.abril <AT> cern.ch
29
30---------------------------------------------------------------------------
31*/
32
33#include "uhal/ProtocolUDP.hpp"
34
35
36#include <exception>
37#include <mutex>
38#include <utility>
39
40#include <boost/asio/connect.hpp>
41#include <boost/asio/write.hpp>
42#include <boost/asio/read.hpp>
43#include <boost/asio/placeholders.hpp>
44#include "boost/date_time/posix_time/posix_time.hpp"
45
50#include "uhal/log/log.hpp"
51#include "uhal/grammars/URI.hpp"
52#include "uhal/Buffers.hpp"
54
55
56namespace uhal
57{
58 template < typename InnerProtocol >
59 UDP< InnerProtocol >::UDP ( const std::string& aId, const URI& aUri ) :
60 InnerProtocol ( aId , aUri ),
61 mMaxPayloadSize (350 * 4),
62 mIOservice ( ),
63 mSocket ( mIOservice , boost::asio::ip::udp::endpoint ( boost::asio::ip::udp::v4(), 0 ) ),
64 mEndpoint ( *boost::asio::ip::udp::resolver ( mIOservice ).resolve ( boost::asio::ip::udp::resolver::query ( boost::asio::ip::udp::v4() , aUri.mHostname , aUri.mPort ) ) ),
65 mDeadlineTimer ( mIOservice ),
66 mReplyMemory ( ),
67 mIOserviceWork ( mIOservice ),
68 mDispatchThread ( [this] () { mIOservice.run(); } ),
69 mDispatchQueue(),
70 mReplyQueue(),
71 mPacketsInFlight ( 0 ),
72 mFlushDone ( true ),
73 mAsynchronousException ( NULL )
74 {
75 mDeadlineTimer.async_wait ([this] (const boost::system::error_code&) { this->CheckDeadline(); });
76
77 // Extract value of 'max_payload_size' attribute, if present
78 for (const auto& lArg : aUri.mArguments) {
79 if (lArg.first == "max_payload_size") {
80 try {
81 mMaxPayloadSize = boost::lexical_cast<size_t>(lArg.second);
82 }
83 catch (const boost::bad_lexical_cast&) {
84 throw exception::InvalidURI("Client URI \"" + this->uri() + "\": Invalid value, \"" + lArg.second + "\", specified for attribute \"" + lArg.first + "\"");
85 }
86 log (Info(), "Client with URI ", Quote(this->uri()), ": Maximum UDP payload size set to ", std::to_string(mMaxPayloadSize), " bytes");
87 }
88 else
89 throw exception::InvalidURI("Client URI \"" + this->uri() + "\" has unexpected attribute \"" + lArg.first + "\"");
90 }
91
92 mReplyMemory.resize(mMaxPayloadSize + 20, 0x00000000);
93 }
94
95
// NOTE(review): the declarator that belongs between the template header and the opening
// brace (listing line 97) is absent from this extract, as is listing line 110. The member
// index at the end of the page records `virtual ~UDP()` as defined at ProtocolUDP.cpp:97
// and the log message below says "destructor", so this body is presumably the
// destructor -- confirm against the upstream file before editing.
//
// Tear-down sequence: close the socket, stop the io_service, join the dispatch thread and
// return the queued dispatch buffers to the pool. A std::exception raised on the way is
// logged at Error level instead of being propagated.
96 template < typename InnerProtocol >
98 {
99 try
100 {
101 mSocket.close();
102
// Spin until the socket reports that it is closed
103 while ( mSocket.is_open() )
104 {}
105
// The dispatch thread runs mIOservice.run(); the service is stopped before the join
106 mIOservice.stop();
107 mDispatchThread.join();
108 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
109 ClientInterface::returnBufferToPool ( mDispatchQueue );
111 }
112 catch ( const std::exception& aExc )
113 {
114 log ( Error() , "Exception " , Quote ( aExc.what() ) , " caught in " , Type<UDP< InnerProtocol > >(), " destructor" );
115 }
116 }
117
118
119
120 template < typename InnerProtocol >
121 void UDP< InnerProtocol >::implementDispatch ( std::shared_ptr< Buffers > aBuffers )
122 {
123 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
124
125 if ( mAsynchronousException )
126 {
127 log ( *mAsynchronousException , "Rethrowing Asynchronous Exception from 'implementDispatch' method of " , Type<UDP< InnerProtocol > >() );
128 mAsynchronousException->throwAsDerivedType();
129 }
130
131 if ( ! mSocket.is_open() )
132 {
133 connect();
134 }
135
136
137 if ( mDispatchBuffers || mPacketsInFlight == this->getMaxNumberOfBuffers() )
138 {
139 mDispatchQueue.push_back ( aBuffers );
140 }
141 else
142 {
143 mDispatchBuffers = aBuffers;
144 write ( );
145 }
146 }
147
148
// NOTE(review): the declarator (listing line 150) is absent from this extract. The member
// index at the end of the page lists `uint32_t getMaxSendSize()` and
// `uint32_t getMaxReplySize()`; this body is presumably one of the two -- confirm against
// the upstream file.
//
// Returns mMaxPayloadSize, i.e. the default set in the constructor or the value of the
// 'max_payload_size' URI attribute.
149 template < typename InnerProtocol >
151 {
152 return mMaxPayloadSize;
153 }
154
155
// NOTE(review): the declarator (listing line 157) is absent from this extract. The member
// index at the end of the page lists `uint32_t getMaxSendSize()` and
// `uint32_t getMaxReplySize()`; this body is presumably one of the two -- confirm against
// the upstream file.
//
// Returns mMaxPayloadSize, i.e. the default set in the constructor or the value of the
// 'max_payload_size' URI attribute.
156 template < typename InnerProtocol >
158 {
159 return mMaxPayloadSize;
160 }
161
162
// NOTE(review): the declarator (listing line 164) is absent from this extract. The member
// index at the end of the page lists `void connect()` ("Set up the UDP socket"), and
// implementDispatch calls connect() when the socket is not open, so this body is
// presumably connect() -- confirm against the upstream file.
//
// Re-opens mSocket as an IPv4 UDP socket and logs before and after doing so.
163 template < typename InnerProtocol >
165 {
166 log ( Info() , "Creating new UDP socket for device " , Quote ( this->uri() ) , ", as it appears to have been closed..." );
167 //mSocket = boost::asio::ip::udp::socket ( mIOservice , boost::asio::ip::udp::endpoint ( boost::asio::ip::udp::v4(), 0 ) );
168 mSocket.open ( boost::asio::ip::udp::v4() );
169 // boost::asio::socket_base::non_blocking_io lNonBlocking ( true );
170 // mSocket.io_control ( lNonBlocking );
171 log ( Info() , "UDP socket created successfully." );
172 }
173
174
// NOTE(review): the declarator (listing line 176) is absent from this extract. The member
// index at the end of the page lists `void write()`, and this body starts the send whose
// completion handler is write_callback, so it is presumably write() -- confirm against the
// upstream file.
//
// Starts an asynchronous send of mDispatchBuffers to mEndpoint, arms the deadline timer
// with the timeout period, and increments mPacketsInFlight. Every call site visible in
// this file already holds mTransportLayerMutex; no lock is taken here.
175 template < typename InnerProtocol >
177 {
// Presumably marks the flush as not yet complete -- confirm against NotifyConditionalVariable
178 NotifyConditionalVariable ( false );
179
180 if ( !mDispatchBuffers )
181 {
182 log ( Error() , __PRETTY_FUNCTION__ , " called when 'mDispatchBuffers' was NULL" );
183 return;
184 }
185
// A single const_buffer covering the sendCounter() bytes of the send buffer
186 std::vector< boost::asio::const_buffer > lAsioSendBuffer;
187 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( mDispatchBuffers->getSendBuffer() , mDispatchBuffers->sendCounter() ) );
188 log ( Debug() , "Sending " , Integer ( mDispatchBuffers->sendCounter() ) , " bytes" );
189 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
190
191 // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
192 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
193 {
194 log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
195 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
196 }
197
// The completion handler only forwards to write_callback on this object
198 mSocket.async_send_to ( lAsioSendBuffer , mEndpoint , [&] (const boost::system::error_code& e, std::size_t n) { this->write_callback(e, n); });
199 mPacketsInFlight++;
200 }
201
202
203 template < typename InnerProtocol >
204 void UDP< InnerProtocol >::write_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
205 {
206 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
207
208 if ( mAsynchronousException )
209 {
210 NotifyConditionalVariable ( true );
211 return;
212 }
213
214 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
215 {
216 exception::UdpTimeout* lExc = new exception::UdpTimeout();
217 log ( *lExc , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for UDP send to target with URI: ", this->uri() );
218
219 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
220 {
221 log ( *lExc , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
222 }
223
224 mAsynchronousException = lExc;
225 NotifyConditionalVariable ( true );
226 return;
227 }
228
229 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != mDispatchBuffers->sendCounter() ) )
230 {
231 mSocket.close();
232 exception::ASIOUdpError* lExc = new exception::ASIOUdpError();
233 if ( aErrorCode )
234 {
235 log ( *lExc , "Error ", Quote ( aErrorCode.message() ) , " encountered during send to UDP target with URI: " , this->uri() );
236 }
237 if ( aBytesTransferred != mDispatchBuffers->sendCounter() )
238 {
239 log ( *lExc , "Only ", Integer ( aBytesTransferred ) , " of " , Integer ( mDispatchBuffers->sendCounter() ) , " bytes transferred in UDP send to URI: " , this->uri() );
240 }
241 mAsynchronousException = lExc;
242 NotifyConditionalVariable ( true );
243 return;
244 }
245
246 if ( mReplyBuffers )
247 {
248 mReplyQueue.push_back ( mDispatchBuffers );
249 }
250 else
251 {
252 mReplyBuffers = mDispatchBuffers;
253 read ( );
254 }
255
256 if ( mDispatchQueue.size() && mPacketsInFlight != this->getMaxNumberOfBuffers() )
257 {
258 mDispatchBuffers = mDispatchQueue.front();
259 mDispatchQueue.pop_front();
260 write();
261 }
262 else
263 {
264 mDispatchBuffers.reset();
265 }
266 }
267
268
// NOTE(review): the declarator (listing line 270) is absent from this extract. The member
// index at the end of the page lists `void read()`, and this body starts the receive whose
// completion handler is read_callback, so it is presumably read() -- confirm against the
// upstream file.
//
// Starts an asynchronous receive into mReplyMemory for the replyCounter() bytes expected
// by mReplyBuffers, and arms the deadline timer with the timeout period. Every call site
// visible in this file already holds mTransportLayerMutex; no lock is taken here.
269 template < typename InnerProtocol >
271 {
272 if ( !mReplyBuffers )
273 {
274 log ( Error() , __PRETTY_FUNCTION__ , " called when 'mReplyBuffers' was NULL" );
275 NotifyConditionalVariable ( true );
276 return;
277 }
278
// A single mutable_buffer over the start of mReplyMemory, sized to the expected reply
279 std::vector<boost::asio::mutable_buffer> lAsioReplyBuffer ( 1 , boost::asio::mutable_buffer ( & ( mReplyMemory.at ( 0 ) ) , mReplyBuffers->replyCounter() ) );
280 log ( Debug() , "Expecting " , Integer ( mReplyBuffers->replyCounter() ) , " bytes in reply." );
281 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
282
283 // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
284 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
285 {
286 log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
287 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
288 }
289
// The completion handler only forwards to read_callback on this object
290 mSocket.async_receive ( lAsioReplyBuffer , 0 , [&] (const boost::system::error_code& e, std::size_t n) { this->read_callback(e, n); });
291 }
292
293
// Completion handler for the asynchronous receive started by read().
//
// aErrorCode        : status reported for the receive.
// aBytesTransferred : number of bytes reported as received into mReplyMemory.
//
// Failures are not thrown from here: an exception object is stored in
// mAsynchronousException and any waiter is woken with NotifyConditionalVariable(true).
//
// Sequence:
//   1. under mTransportLayerMutex: bail out if a failure is already pending, or record a
//      UdpTimeout if the deadline timer expiry is +infinity;
//   2. check for a missing reply buffer, a size mismatch and an ASIO error;
//   3. copy the received bytes from mReplyMemory into the reply buffer chunks;
//   4. validate the reply via ClientInterface::validate;
//   5. under mTransportLayerMutex: start the next receive / send, or, when nothing is
//      left to do, set the deadline to 60 seconds and wake the waiter.
//
// NOTE(review): steps 2-4 read mReplyBuffers (and step 2 may close mSocket) without
// holding mTransportLayerMutex -- confirm that this is intentional.
294 template < typename InnerProtocol >
295 void UDP< InnerProtocol >::read_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
296 {
// Inner scope: the lock is released again at the matching closing brace
297 {
298 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
299 if ( mAsynchronousException )
300 {
301 NotifyConditionalVariable ( true );
302 return;
303 }
304
// An expiry of +infinity on the deadline timer is treated as "this receive timed out"
305 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
306 {
307 mAsynchronousException = new exception::UdpTimeout();
308 log ( *mAsynchronousException , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for UDP receive from target with URI: ", this->uri() );
309
// operation_aborted is not reported as an additional ASIO error
310 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
311 {
312 log ( *mAsynchronousException , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
313 }
314
315 NotifyConditionalVariable ( true );
316 return;
317 }
318 }
319
320 if ( !mReplyBuffers )
321 {
322 log ( Error() , __PRETTY_FUNCTION__ , " called when 'mReplyBuffers' was NULL" );
323 return;
324 }
325
// A size mismatch is only logged here; processing continues so that validation can run
326 if ( aBytesTransferred != mReplyBuffers->replyCounter() )
327 {
328 log ( Error() , "Expected " , Integer ( mReplyBuffers->replyCounter() ) , "-byte UDP payload from target " , Quote ( this->uri() ) , ", but only received " , Integer ( aBytesTransferred ) , " bytes. Validating returned data to work out where error occurred." );
329 }
330
// Any ASIO error other than eof closes the socket and is recorded as an ASIOUdpError
331 if ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) )
332 {
333 mSocket.close();
334
335 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
336 mAsynchronousException = new exception::ASIOUdpError();
337 log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during receive from UDP target with URI: " , this->uri() );
338
339 NotifyConditionalVariable ( true );
340 return;
341 }
342
343
// Scatter the contiguous bytes in mReplyMemory over the (pointer, size) chunks of the reply buffer
344 std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( mReplyBuffers->getReplyBuffer() );
345 uint8_t* lReplyBuf ( & ( mReplyMemory.at ( 0 ) ) );
346
347 for (const auto& lBuffer: lReplyBuffers)
348 {
349 // Don't copy more of mReplyMemory than was written to, for cases when less data received than expected
350 if ( static_cast<uint32_t> ( lReplyBuf - ( & mReplyMemory.at ( 0 ) ) ) >= aBytesTransferred )
351 break;
352
// Copy the whole chunk, or only the bytes that remain if fewer were received
353 uint32_t lNrBytesToCopy = std::min ( lBuffer.second , static_cast<uint32_t> ( aBytesTransferred - ( lReplyBuf - ( & mReplyMemory.at ( 0 ) ) ) ) );
354 memcpy ( lBuffer.first, lReplyBuf, lNrBytesToCopy );
355 lReplyBuf += lNrBytesToCopy;
356 }
357
// validate() is called without the lock; the lock is taken only to store its result
358 try
359 {
360 if ( uhal::exception::exception* lExc = ClientInterface::validate ( mReplyBuffers ) ) //Control of the pointer has been passed back to the client interface
361 {
362 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
363 mAsynchronousException = lExc;
364 }
365 }
366 catch ( exception::exception& aExc )
367 {
368 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
369 mAsynchronousException = new exception::ValidationError ();
370 log ( *mAsynchronousException , "Exception caught during reply validation for UDP device with URI " , Quote ( this->uri() ) , "; what returned: " , Quote ( aExc.what() ) );
371 }
372
// Held until the end of the function; the earlier lock_guards were confined to inner scopes
373 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
374
375 if ( mAsynchronousException )
376 {
377 NotifyConditionalVariable ( true );
378 return;
379 }
380
// Start the receive for the next queued reply, or clear the "receive in progress" slot
381 if ( mReplyQueue.size() )
382 {
383 mReplyBuffers = mReplyQueue.front();
384 mReplyQueue.pop_front();
385 read();
386 }
387 else
388 {
389 mReplyBuffers.reset();
390 }
391
392 mPacketsInFlight--;
393
// With one packet fewer in flight, a queued send may now be started
394 if ( !mDispatchBuffers && mDispatchQueue.size() && mPacketsInFlight != this->getMaxNumberOfBuffers() )
395 {
396 mDispatchBuffers = mDispatchQueue.front();
397 mDispatchQueue.pop_front();
398 write();
399 }
400
// Nothing is being sent or received: set the 60-second idle deadline and wake the waiter
401 if ( !mDispatchBuffers && !mReplyBuffers )
402 {
403 mDeadlineTimer.expires_from_now( boost::posix_time::seconds(60) );
404 NotifyConditionalVariable ( true );
405 }
406 }
407
408
409
// NOTE(review): the declarator (listing line 411) is absent from this extract. The member
// index at the end of the page lists `void CheckDeadline()` ("Function called by the ASIO
// deadline timer"), and the handler registered at the end of this body calls
// CheckDeadline(), so this is presumably that function -- confirm against the upstream file.
//
// Deadline actor: under mTransportLayerMutex, closes the socket once the timer's expiry
// time has been reached and sets the expiry to +infinity, then re-registers itself on the
// timer. The send/receive callbacks in this file test for that +infinity expiry to detect
// a timeout.
410 template < typename InnerProtocol >
412 {
413 // Check whether the deadline has passed. We compare the deadline against
414 // the current time since a new asynchronous operation may have moved the
415 // deadline before this actor had a chance to run.
416 std::lock_guard<std::mutex> lLock ( this->mTransportLayerMutex );
417
418 if ( mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
419 {
420 // SETTING THE EXCEPTION HERE CAN APPEAR AS A TIMEOUT WHEN NONE ACTUALLY EXISTS
// Only the log level and message differ between the two cases; the socket is closed in both
421 if ( mDispatchBuffers || mReplyBuffers )
422 {
423 log ( Warning() , "Closing UDP socket for URI " , Quote ( this->uri() ) , " since deadline has passed" );
424 }
425 else
426 {
427 log ( Debug() , "Closing UDP socket for URI " , Quote ( this->uri() ) , " since no communication in last 60 seconds" );
428 }
429
430 // The deadline has passed. The socket is closed so that any outstanding
431 // asynchronous operations are cancelled.
432 mSocket.close();
433 // There is no longer an active deadline. The expiry is set to positive
434 // infinity so that the actor takes no action until a new deadline is set.
435 mDeadlineTimer.expires_at ( boost::posix_time::pos_infin );
436 }
437
438 // Put the actor back to sleep.
439 mDeadlineTimer.async_wait ([this] (const boost::system::error_code&) { this->CheckDeadline(); });
440 }
441
442
// NOTE(review): the declarator (listing line 444) is absent from this extract. The member
// index at the end of the page lists `virtual void Flush()` ("... block until all buffers
// have been sent ..."); this body is presumably that function -- confirm against the
// upstream file.
//
// Blocks in WaitOnConditionalVariable(), then, under mTransportLayerMutex, rethrows any
// recorded asynchronous exception via throwAsDerivedType().
443 template < typename InnerProtocol >
445 {
446 WaitOnConditionalVariable();
447
448 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
449 if ( mAsynchronousException )
450 {
451 mAsynchronousException->throwAsDerivedType();
452 }
453 }
454
455
456
// NOTE(review): the declarator (listing line 458) is absent from this extract, as is
// listing line 484. The member index at the end of the page lists
// `virtual void dispatchExceptionHandler()` ("... tidies up this protocol layer in the
// event of an exception"), and this body ends by calling
// InnerProtocol::dispatchExceptionHandler(), so it is presumably that override -- confirm
// against the upstream file.
//
// Resets the transport after a failure: closes the socket, wakes any waiter, deletes the
// stored exception, returns queued and in-progress buffers to the pool, zeroes
// mPacketsInFlight and then delegates to the inner protocol's handler.
//
// NOTE(review): no lock on mTransportLayerMutex is taken in the lines visible here --
// confirm whether callers are expected to hold it.
457 template < typename InnerProtocol >
459 {
460 log ( Warning() , "Closing Socket since exception detected." );
461
462 if ( mSocket.is_open() )
463 {
// Best effort: anything thrown while closing the socket is deliberately ignored
464 try
465 {
466 mSocket.close();
467
468 while ( mSocket.is_open() )
469 {}
470 }
471 catch ( ... )
472 {}
473 }
474
475 NotifyConditionalVariable ( true );
476
// The exception object is owned through this raw pointer, so it is deleted here
477 if ( mAsynchronousException )
478 {
479 delete mAsynchronousException;
480 mAsynchronousException = NULL;
481 }
482
483 ClientInterface::returnBufferToPool ( mDispatchQueue );
485 mPacketsInFlight = 0;
486
487 ClientInterface::returnBufferToPool ( mDispatchBuffers );
488 mDispatchBuffers.reset();
489 ClientInterface::returnBufferToPool ( mReplyBuffers );
490 mReplyBuffers.reset();
491
492 InnerProtocol::dispatchExceptionHandler();
493 }
494
495
// NOTE(review): the declarator (listing line 497) is absent from this extract. The member
// index at the end of the page lists `void NotifyConditionalVariable(const bool &aValue)`,
// and this body reads `aValue`, so it is presumably that function -- confirm against the
// upstream file.
//
// Stores aValue in mFlushDone while holding mConditionalVariableMutex, then notifies one
// waiter on mConditionalVariable after the mutex has been released.
496 template < typename InnerProtocol >
498 {
// Inner scope: the mutex is released before the notification is issued
499 {
500 std::lock_guard<std::mutex> lLock ( mConditionalVariableMutex );
501 mFlushDone = aValue;
502 }
503 mConditionalVariable.notify_one();
504 }
505
506
// NOTE(review): the declarator (listing line 508) is absent from this extract. The member
// index at the end of the page lists `void WaitOnConditionalVariable()`; this body is
// presumably that function -- confirm against the upstream file.
//
// Blocks the calling thread on mConditionalVariable until mFlushDone is true.
507 template < typename InnerProtocol >
509 {
510 std::unique_lock<std::mutex> lLock ( mConditionalVariableMutex );
511
// The flag is re-tested after every wake-up, so a wake-up with the flag still false waits again
512 while ( !mFlushDone )
513 {
514 mConditionalVariable.wait ( lLock );
515 }
516 }
517
518
// Explicit instantiations of the UDP class template for the two IPbus inner protocols
// named here.
519 template class UDP< IPbus< 1 , 3 > >;
520 template class UDP< IPbus< 2 , 0 > >;
521}
522
void returnBufferToPool(std::shared_ptr< Buffers > &aBuffers)
Function to return a buffer to the buffer pool.
virtual exception::exception * validate(std::shared_ptr< Buffers > aBuffers)
Function which dispatch calls when the reply is received to check that the headers are as expected.
Transport protocol to transfer an IPbus buffer via UDP.
Definition: ProtocolUDP.hpp:83
virtual void Flush()
Concrete implementation of the synchronization function to block until all buffers have been sent,...
void write_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async send. This, then,...
virtual void dispatchExceptionHandler()
Function which tidies up this protocol layer in the event of an exception.
boost::asio::io_service mIOservice
The boost::asio::io_service used to create the connections.
void write()
Initialize performing the next UDP write operation. In multi-threaded mode, this runs the ASIO async s...
void CheckDeadline()
Function called by the ASIO deadline timer.
void implementDispatch(std::shared_ptr< Buffers > aBuffers)
Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate ...
void read_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async receive. This, then,...
void NotifyConditionalVariable(const bool &aValue)
Function to set the value of a variable associated with a BOOST conditional-variable and then notify ...
virtual ~UDP()
Destructor.
Definition: ProtocolUDP.cpp:97
UDP(const UDP &aUDP)
Copy Constructor This creates a new socket, dispatch queue, dispatch thread, etc.
void WaitOnConditionalVariable()
Function to block a thread pending a BOOST conditional-variable and its associated regular variable.
void read()
Initialize performing the next UDP read operation. In multi-threaded mode, this runs the ASIO async re...
void connect()
Set up the UDP socket.
uint32_t getMaxReplySize()
Return the maximum size of reply packet based on the buffer size in the target.
uint32_t getMaxSendSize()
Return the maximum size to be sent based on the buffer size in the target.
An abstract base exception class, including an interface to throw as the derived type (for passing ex...
Definition: exception.hpp:71
virtual const char * what() const
Function which returns the error message associated with an exception If no error message has previou...
Definition: exception.cpp:87
std::string uri
Definition: test_single.cpp:89
DebugLevel Debug
Definition: LogLevels.cpp:133
_Quote< T > Quote(const T &aT)
_Integer< T, IntFmt<> > Integer(const T &aT)
Forward declare a function which creates an instance of the ultra-lightweight wrapper from an integer...
ErrorLevel Error
Definition: LogLevels.cpp:61
void log(FatalLevel &aFatal, const T0 &aArg0)
Function to add a log entry at Fatal level.
Definition: log.hxx:18
WarningLevel Warning
Definition: LogLevels.cpp:79
InfoLevel Info
Definition: LogLevels.cpp:115
_Type< T > Type()
Struct to store a URI when parsed by boost spirit.
Definition: URI.hpp:50