μHAL (v2.8.17)
Part of the IPbus software repository
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ProtocolTCP.cpp
Go to the documentation of this file.
1/*
2---------------------------------------------------------------------------
3
4 This file is part of uHAL.
5
6 uHAL is a hardware access library and programming framework
7 originally developed for upgrades of the Level-1 trigger of the CMS
8 experiment at CERN.
9
10 uHAL is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation, either version 3 of the License, or
13 (at your option) any later version.
14
15 uHAL is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with uHAL. If not, see <http://www.gnu.org/licenses/>.
22
23
24 Andrew Rose, Imperial College, London
25 email: awr01 <AT> imperial.ac.uk
26
27 Marc Magrans de Abril, CERN
28 email: marc.magrans.de.abril <AT> cern.ch
29
30---------------------------------------------------------------------------
31*/
32
33#include "uhal/ProtocolTCP.hpp"
34
35
36#include <chrono>
37#include <mutex>
38#include <sys/time.h>
39
40#include <boost/asio/connect.hpp>
41#include <boost/asio/write.hpp>
42#include <boost/asio/read.hpp>
43#include <boost/asio/placeholders.hpp>
44#include <boost/date_time/posix_time/posix_time.hpp>
45
46#include "uhal/Buffers.hpp"
47#include "uhal/grammars/URI.hpp"
50#include "uhal/log/log.hpp"
56
57
58namespace uhal
59{
60
61 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
62 TCP< InnerProtocol, nr_buffers_per_send >::TCP ( const std::string& aId, const URI& aUri ) :
63 InnerProtocol ( aId , aUri ),
64 mMaxPayloadSize (350 * 4),
65 mIOservice ( ),
66 mSocket ( mIOservice ),
67 mEndpoint ( boost::asio::ip::tcp::resolver ( mIOservice ).resolve ( boost::asio::ip::tcp::resolver::query ( aUri.mHostname , aUri.mPort ) ) ),
68 mDeadlineTimer ( mIOservice ),
69 mIOserviceWork ( mIOservice ),
70 mDispatchThread ( [this] () { mIOservice.run(); } ),
71 mDispatchQueue(),
72 mReplyQueue(),
73 mPacketsInFlight ( 0 ),
74 mFlushStarted ( false ),
75 mFlushDone ( true ),
76 mAsynchronousException ( NULL )
77 {
78 mDeadlineTimer.async_wait ([this] (const boost::system::error_code&) { this->CheckDeadline(); });
79
80 // Extract value of 'max_payload_size' attribute, if present
81 for (const auto& lArg : aUri.mArguments) {
82 if (lArg.first == "max_payload_size") {
83 try {
84 mMaxPayloadSize = boost::lexical_cast<size_t>(lArg.second);
85 }
86 catch (const boost::bad_lexical_cast&) {
87 throw exception::InvalidURI("Client URI \"" + this->uri() + "\": Invalid value, \"" + lArg.second + "\", specified for attribute \"" + lArg.first + "\"");
88 }
89 log (Info(), "Client with URI ", Quote(this->uri()), ": Maximum UDP payload size set to ", std::to_string(mMaxPayloadSize), " bytes");
90 }
91 }
92 }
93
94
95 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
97 {
98 try
99 {
100 mSocket.close();
101
102 while ( mSocket.is_open() )
103 {}
104
105 mIOservice.stop();
106 mDispatchThread.join();
107 ClientInterface::returnBufferToPool ( mDispatchQueue );
108 for (size_t i = 0; i < mReplyQueue.size(); i++)
109 ClientInterface::returnBufferToPool ( mReplyQueue.at(i).first );
110 ClientInterface::returnBufferToPool ( mDispatchBuffers );
111 ClientInterface::returnBufferToPool ( mReplyBuffers.first );
112 }
113 catch ( const std::exception& aExc )
114 {
115 log ( Error() , "Exception " , Quote ( aExc.what() ) , " caught in " , Type<TCP< InnerProtocol , nr_buffers_per_send > >(), " destructor" );
116 }
117 }
118
119
120 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
121 void TCP< InnerProtocol , nr_buffers_per_send >::implementDispatch ( std::shared_ptr< Buffers > aBuffers )
122 {
123 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
124
125 if ( mAsynchronousException )
126 {
127 log ( *mAsynchronousException , "Rethrowing Asynchronous Exception from 'implementDispatch' method of " , Type<TCP< InnerProtocol , nr_buffers_per_send > >() );
128 mAsynchronousException->throwAsDerivedType();
129 }
130
131 if ( ! mSocket.is_open() )
132 {
133 connect();
134 }
135
136 mFlushStarted = false;
137 mDispatchQueue.push_back ( aBuffers );
138
139 if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= nr_buffers_per_send ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
140 {
141 write ( );
142 }
143 }
144
145
146 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
148 {
149 return mMaxPayloadSize;
150 }
151
152
153 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
155 {
156 return mMaxPayloadSize;
157 }
158
159
160 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
162 {
163 log ( Info() , "Attempting to create TCP connection to '" , mEndpoint->host_name() , "' port " , mEndpoint->service_name() , "." );
164 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
165 boost::system::error_code lErrorCode;
166 boost::asio::connect ( mSocket , mEndpoint , lErrorCode );
167
168 if ( lErrorCode )
169 {
170 mSocket.close();
171
172 while ( mSocket.is_open() )
173 {}
174
175 exception::TcpConnectionFailure lExc;
176 std::ostringstream oss;
177
178 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
179 {
180 oss << "Timeout (" << this->getBoostTimeoutPeriod().total_milliseconds() << " milliseconds) occurred when connecting to ";
181 }
182 else if ( lErrorCode == boost::asio::error::connection_refused )
183 {
184 oss << "Connection refused for ";
185 }
186 else
187 {
188 oss << "Error \"" << lErrorCode.message() << "\" encountered when connecting to ";
189 }
190
191 log ( lExc , oss.str() , ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
192 " at ", Quote ( this->mUri.mHostname + ":" + this->mUri.mPort ) , ". URI=" , Quote ( this->uri() ) );
193 throw lExc;
194 }
195
196 mSocket.set_option ( boost::asio::ip::tcp::no_delay ( true ) );
197 // boost::asio::socket_base::non_blocking_io lNonBlocking ( true );
198 // mSocket.io_control ( lNonBlocking );
199 log ( Info() , "TCP connection succeeded" );
200 }
201
202
203
204 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
206 {
207 NotifyConditionalVariable ( false );
208
209 std::vector< boost::asio::const_buffer > lAsioSendBuffer;
210 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( &mSendByteCounter , 4 ) );
211 mSendByteCounter = 0;
212 std::size_t lNrBuffersToSend = std::min ( mDispatchQueue.size(), nr_buffers_per_send );
213 mDispatchBuffers.reserve ( lNrBuffersToSend );
214
215 for ( std::size_t i = 0; i < lNrBuffersToSend; i++ )
216 {
217 mDispatchBuffers.push_back ( mDispatchQueue.front() );
218 mDispatchQueue.pop_front();
219 const std::shared_ptr<Buffers>& lBuffer = mDispatchBuffers.back();
220 mSendByteCounter += lBuffer->sendCounter();
221 lAsioSendBuffer.push_back ( boost::asio::const_buffer ( lBuffer->getSendBuffer() , lBuffer->sendCounter() ) );
222 }
223
224 log ( Debug() , "Sending " , Integer ( mSendByteCounter ) , " bytes from ", Integer ( mDispatchBuffers.size() ), " buffers" );
225 mSendByteCounter = htonl ( mSendByteCounter );
226
227 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
228
229 // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
230 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
231 {
232 log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
233 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
234 }
235
236 boost::asio::async_write ( mSocket , lAsioSendBuffer , [&] (const boost::system::error_code& e, std::size_t n) { this->write_callback(e, n); });
237 mPacketsInFlight += mDispatchBuffers.size();
238
239 SteadyClock_t::time_point lNow = SteadyClock_t::now();
240 if (mLastSendQueued > SteadyClock_t::time_point())
241 mInterSendTimeStats.add(mLastSendQueued, lNow);
242 mLastSendQueued = lNow;
243 }
244
245
246
247 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
248 void TCP< InnerProtocol , nr_buffers_per_send >::write_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
249 {
250 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
251 mSendByteCounter = ntohl ( mSendByteCounter );
252
253 if ( mAsynchronousException )
254 {
255 NotifyConditionalVariable ( true );
256 return;
257 }
258
259 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
260 {
261 mAsynchronousException = new exception::TcpTimeout();
262 log ( *mAsynchronousException , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for send to ",
263 ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: ", this->uri() );
264
265 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
266 {
267 log ( *mAsynchronousException , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
268 }
269
270 NotifyConditionalVariable ( true );
271 return;
272 }
273
274 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != ( mSendByteCounter+4 ) ) )
275 {
276 mAsynchronousException = new exception::ASIOTcpError();
277 log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during send to ",
278 ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: " , this->uri() );
279
280 try
281 {
282 mSocket.close();
283
284 while ( mSocket.is_open() )
285 {}
286 }
287 catch ( const std::exception& aExc )
288 {
289 log ( *mAsynchronousException , "Error closing TCP socket following the ASIO send error" );
290 }
291
292 if ( aBytesTransferred != ( mSendByteCounter + 4 ) )
293 {
294 log ( *mAsynchronousException , "Attempted to send " , Integer ( mSendByteCounter ) , " bytes to ",
295 ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
296 " with URI " , Quote ( this->uri() ) , ", but only sent " , Integer ( aBytesTransferred ) , " bytes" );
297 }
298
299 NotifyConditionalVariable ( true );
300 return;
301 }
302
303 if ( ! mReplyBuffers.first.empty() )
304 {
305 mReplyQueue.push_back ( std::make_pair(mDispatchBuffers, mLastSendQueued) );
306 }
307 else
308 {
309 mReplyBuffers = std::make_pair(mDispatchBuffers, mLastSendQueued);
310 read ( );
311 }
312
313 mDispatchBuffers.clear();
314
315 if ( ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
316 {
317 write();
318 }
319
320 }
321
322
323 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
325 {
326 std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
327 lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( &mReplyByteCounter , 4 ) );
328 log ( Debug() , "Getting reply byte counter" );
329 boost::asio::ip::tcp::endpoint lEndpoint;
330 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
331
332 // Patch for suspected bug in using boost asio with boost python; see https://svnweb.cern.ch/trac/cactus/ticket/323#comment:7
333 while ( mDeadlineTimer.expires_from_now() < boost::posix_time::microseconds ( 600 ) )
334 {
335 log ( Debug() , "Resetting deadline timer since it just got set to strange value, likely due to a bug within boost (expires_from_now was: ", mDeadlineTimer.expires_from_now() , ")." );
336 mDeadlineTimer.expires_from_now ( this->getBoostTimeoutPeriod() );
337 }
338
339 boost::asio::async_read ( mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly ( 4 ), [&] (const boost::system::error_code& e, std::size_t n) { this->read_callback(e, n); });
340 SteadyClock_t::time_point lNow = SteadyClock_t::now();
341 if (mLastRecvQueued > SteadyClock_t::time_point())
342 mInterRecvTimeStats.add(mLastRecvQueued, lNow);
343 mLastRecvQueued = lNow;
344 }
345
346
347
348
349
350 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
351 void TCP< InnerProtocol , nr_buffers_per_send >::read_callback ( const boost::system::error_code& aErrorCode , std::size_t aBytesTransferred )
352 {
353 std::size_t lNrReplyBuffers = 1;
354 uint32_t lRequestBytes = 0;
355 uint32_t lExpectedReplyBytes = 0;
356
357 for (const auto& lBuf: mReplyBuffers.first)
358 {
359 lNrReplyBuffers += lBuf->getReplyBuffer().size();
360 lRequestBytes += lBuf->sendCounter();
361 lExpectedReplyBytes += lBuf->replyCounter();
362 }
363
364 {
365 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
366 if ( mAsynchronousException )
367 {
368 NotifyConditionalVariable ( true );
369 return;
370 }
371
372 SteadyClock_t::time_point lNow = SteadyClock_t::now();
373
374 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
375 {
376 exception::TcpTimeout* lExc = new exception::TcpTimeout();
377 log ( *lExc , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " ms) occurred for receive (header) from ",
378 ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI '", this->uri(), "'. ",
379 Integer(mPacketsInFlight), " packets in flight, ", Integer(mReplyBuffers.first.size()), " in this chunk (",
380 Integer(lRequestBytes), "/", Integer(lExpectedReplyBytes), " bytes sent/expected). Last send / receive queued ",
381 std::chrono::duration<float, std::milli>(lNow - mLastSendQueued).count(), " / ",
382 std::chrono::duration<float, std::milli>(lNow - mLastRecvQueued).count(), " ms ago.");
383
384 log ( Error(), "Extra timeout-related info - round-trip times: ", mRTTStats);
385 log ( Error(), "Extra timeout-related info - send-recv times: ", mLSTStats);
386 log ( Error(), "Extra timeout-related info - inter-send times: ", mInterSendTimeStats);
387 log ( Error(), "Extra timeout-related info - inter-recv times: ", mInterRecvTimeStats);
388
389 if ( aErrorCode && aErrorCode != boost::asio::error::operation_aborted )
390 {
391 log ( *lExc , "ASIO reported an error: " , Quote ( aErrorCode.message() ) );
392 }
393
394 mAsynchronousException = lExc;
395 NotifyConditionalVariable ( true );
396 return;
397 }
398 }
399
400 if ( ( aErrorCode && ( aErrorCode != boost::asio::error::eof ) ) || ( aBytesTransferred != 4 ) )
401 {
402 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
403 mAsynchronousException = new exception::ASIOTcpError();
404
405 if ( aErrorCode )
406 {
407 log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during receive from ",
408 ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: " , this->uri() );
409 }
410
411 if ( aBytesTransferred != 4 )
412 {
413 log ( *mAsynchronousException, "Expected to receive 4-byte header in async read from ",
414 ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
415 " with URI " , Quote ( this->uri() ) , ", but only received " , Integer ( aBytesTransferred ) , " bytes" );
416 }
417
418 try
419 {
420 mSocket.close();
421 while ( mSocket.is_open() )
422 {}
423 }
424 catch ( const std::exception& aExc )
425 {
426 log ( *mAsynchronousException , "Error closing socket following ASIO read error" );
427 }
428
429 NotifyConditionalVariable ( true );
430 return;
431 }
432
433 SteadyClock_t::time_point lReadHeaderTimestamp = SteadyClock_t::now();
434 mRTTStats.add(mReplyBuffers.second, lReadHeaderTimestamp);
435 mLSTStats.add(mLastSendQueued, lReadHeaderTimestamp);
436
437 mReplyByteCounter = ntohl ( mReplyByteCounter );
438 log ( Debug() , "Byte Counter says " , Integer ( mReplyByteCounter ) , " bytes are coming" );
439 std::vector< boost::asio::mutable_buffer > lAsioReplyBuffer;
440
441 lAsioReplyBuffer.reserve ( lNrReplyBuffers );
442 log ( Debug() , "Expecting " , Integer ( lExpectedReplyBytes ) , " bytes in reply, for ", Integer ( mReplyBuffers.first.size() ), " buffers" );
443
444 for ( std::vector< std::shared_ptr<Buffers> >::const_iterator lBufIt = mReplyBuffers.first.begin(); lBufIt != mReplyBuffers.first.end(); lBufIt++ )
445 {
446 std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( ( *lBufIt )->getReplyBuffer() );
447
448 for ( std::deque< std::pair< uint8_t* , uint32_t > >::iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
449 {
450 lAsioReplyBuffer.push_back ( boost::asio::mutable_buffer ( lIt->first , lIt->second ) );
451 }
452 }
453
454 boost::system::error_code lErrorCode = boost::asio::error::would_block;
455 std::size_t lBytesTransferred = boost::asio::read ( mSocket , lAsioReplyBuffer , boost::asio::transfer_exactly ( mReplyByteCounter ), lErrorCode );
456
457 if ( ( lErrorCode && ( lErrorCode != boost::asio::error::eof ) ) || ( lBytesTransferred != mReplyByteCounter ) )
458 {
459 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
460 mSocket.close();
461
462 if ( mDeadlineTimer.expires_at () == boost::posix_time::pos_infin )
463 {
464 mAsynchronousException = new exception::TcpTimeout();
465 log ( *mAsynchronousException , "Timeout (" , Integer ( this->getBoostTimeoutPeriod().total_milliseconds() ) , " milliseconds) occurred for receive (chunk) from ",
466 ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: ", this->uri() );
467 }
468 else
469 {
470 mAsynchronousException = new exception::ASIOTcpError();
471 log ( *mAsynchronousException , "Error ", Quote ( aErrorCode.message() ) , " encountered during receive from ",
472 ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) , " with URI: " , this->uri() );
473 }
474
475 if ( lBytesTransferred != mReplyByteCounter )
476 {
477 log ( *mAsynchronousException, "Expected to receive " , Integer ( mReplyByteCounter ) , " bytes in read from ",
478 ( this->uri().find ( "chtcp-" ) == 0 ? "ControlHub" : "TCP server" ) ,
479 " with URI " , Quote ( this->uri() ) , ", but only received " , Integer ( lBytesTransferred ) , " bytes" );
480 }
481
482 NotifyConditionalVariable ( true );
483 return;
484 }
485
486
487 for (const auto& lBuf: mReplyBuffers.first)
488 {
489 try
490 {
491 if ( uhal::exception::exception* lExc = ClientInterface::validate ( lBuf ) ) //Control of the pointer has been passed back to the client interface
492 {
493 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
494 mAsynchronousException = lExc;
495 }
496 }
497 catch ( exception::exception& aExc )
498 {
499 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
500 mAsynchronousException = new exception::ValidationError ();
501 log ( *mAsynchronousException , "Exception caught during reply validation for TCP device with URI " , Quote ( this->uri() ) , "; what returned: " , Quote ( aExc.what() ) );
502 }
503
504 if ( mAsynchronousException )
505 {
506 NotifyConditionalVariable ( true );
507 return;
508 }
509 }
510
511 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
512 mPacketsInFlight -= mReplyBuffers.first.size();
513
514 if ( mReplyQueue.size() )
515 {
516 mReplyBuffers = mReplyQueue.front();
517 mReplyQueue.pop_front();
518 read();
519 }
520 else
521 {
522 mReplyBuffers.first.clear();
523 }
524
525 if ( mDispatchBuffers.empty() && ( mDispatchQueue.size() >= (mFlushStarted ? 1 : nr_buffers_per_send) ) && ( mPacketsInFlight < this->getMaxNumberOfBuffers() ) )
526 {
527 write();
528 }
529
530 if ( mDispatchBuffers.empty() && mReplyBuffers.first.empty() )
531 {
532 mDeadlineTimer.expires_from_now ( boost::posix_time::seconds(60) );
533 NotifyConditionalVariable ( true );
534 }
535 }
536
537
538
539 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
541 {
542 std::lock_guard<std::mutex> lLock ( this->mTransportLayerMutex );
543
544 // Check whether the deadline has passed. We compare the deadline against the current time since a new asynchronous operation may have moved the deadline before this actor had a chance to run.
545 if ( mDeadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
546 {
547 // SETTING THE EXCEPTION HERE CAN APPEAR AS A TIMEOUT WHEN NONE ACTUALLY EXISTS
548 if ( mDispatchBuffers.size() || mReplyBuffers.first.size() )
549 {
550 log ( Warning() , "Closing TCP socket for device with URI " , Quote ( this->uri() ) , " since deadline has passed" );
551 }
552 else
553 {
554 log ( Debug() , "Closing TCP socket for device with URI " , Quote ( this->uri() ) , " since no communication in 60 seconds" );
555 }
556
557 // The deadline has passed. The socket is closed so that any outstanding asynchronous operations are cancelled.
558 mSocket.close();
559 // There is no longer an active deadline. The expiry is set to positive infinity so that the actor takes no action until a new deadline is set.
560 mDeadlineTimer.expires_at ( boost::posix_time::pos_infin );
561 }
562
563 // Put the actor back to sleep.
564 mDeadlineTimer.async_wait ([this] (const boost::system::error_code&) { this->CheckDeadline(); });
565 }
566
567
568 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
570 {
571 {
572 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
573 mFlushStarted = true;
574
575 if ( mDispatchQueue.size() && mDispatchBuffers.empty() )
576 {
577 write();
578 }
579 }
580
581 WaitOnConditionalVariable();
582
583 std::lock_guard<std::mutex> lLock ( mTransportLayerMutex );
584
585 if ( mAsynchronousException )
586 {
587 mAsynchronousException->throwAsDerivedType();
588 }
589 }
590
591
592 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
594 {
595 if ( mSocket.is_open() )
596 {
597 log ( Warning() , "Closing TCP socket for device with URI " , Quote ( this->uri() ) , " since exception detected." );
598
599 try
600 {
601 mSocket.close();
602
603 while ( mSocket.is_open() )
604 {}
605 }
606 catch ( ... )
607 {}
608 }
609
610 NotifyConditionalVariable ( true );
611
612 if ( mAsynchronousException )
613 {
614 delete mAsynchronousException;
615 mAsynchronousException = NULL;
616 }
617
618 ClientInterface::returnBufferToPool ( mDispatchQueue );
619 for (size_t i = 0; i < mReplyQueue.size(); i++)
620 ClientInterface::returnBufferToPool ( mReplyQueue.at(i).first );
621 mPacketsInFlight = 0;
622 ClientInterface::returnBufferToPool ( mDispatchBuffers );
623 ClientInterface::returnBufferToPool ( mReplyBuffers.first );
624
625 mSendByteCounter = 0;
626 mReplyByteCounter = 0;
627
628 InnerProtocol::dispatchExceptionHandler();
629 }
630
631
632 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
634 {
635 {
636 std::lock_guard<std::mutex> lLock ( mConditionalVariableMutex );
637 mFlushDone = aValue;
638 }
639 mConditionalVariable.notify_one();
640 }
641
642 template < typename InnerProtocol , std::size_t nr_buffers_per_send >
644 {
645 std::unique_lock<std::mutex> lLock ( mConditionalVariableMutex );
646
647 while ( !mFlushDone )
648 {
649 mConditionalVariable.wait ( lLock );
650 }
651 }
652
653
// Explicit instantiations: the TCP member functions are defined in this .cpp file, so
// every combination used elsewhere has to be instantiated here.
// Direct TCP transport for IPbus<1,3> and IPbus<2,0>, with nr_buffers_per_send = 1:
654 template class TCP< IPbus< 1 , 3 > , 1 >;
655 template class TCP< IPbus< 2 , 0 > , 1 >;
656
// ControlHub-wrapped variants of the same two protocols, with nr_buffers_per_send = 3:
657 template class TCP< ControlHub < IPbus< 1 , 3 > > , 3 >;
658 template class TCP< ControlHub < IPbus< 2 , 0 > > , 3 >;
659}
660
\rst Wraps a Python iterator so that it can also be used as a C++ input iterator
Definition: pytypes.h:1102
void returnBufferToPool(std::shared_ptr< Buffers > &aBuffers)
Function to return a buffer to the buffer pool.
virtual exception::exception * validate(std::shared_ptr< Buffers > aBuffers)
Function which dispatch calls when the reply is received to check that the headers are as expected.
Transport protocol to transfer an IPbus buffer via TCP.
Definition: ProtocolTCP.hpp:84
uint32_t getMaxSendSize()
Return the maximum size to be sent based on the buffer size in the target.
virtual void dispatchExceptionHandler()
Function which tidies up this protocol layer in the event of an exception.
uint32_t getMaxReplySize()
Return the maximum size of reply packet based on the buffer size in the target.
void read_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async read This, then,...
void CheckDeadline()
Function called by the ASIO deadline timer.
virtual ~TCP()
Destructor.
Definition: ProtocolTCP.cpp:96
TCP(const TCP &aTCP)
Copy constructor. This creates a new socket, dispatch queue, dispatch thread, etc.
void WaitOnConditionalVariable()
Function to block a thread pending a BOOST conditional-variable and its associated regular variable.
void implementDispatch(std::shared_ptr< Buffers > aBuffers)
Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate ...
void NotifyConditionalVariable(const bool &aValue)
Function to set the value of a variable associated with a BOOST conditional-variable and then notify ...
virtual void Flush()
Concrete implementation of the synchronization function to block until all buffers have been sent,...
void connect()
Make the TCP connection.
void read()
Initialize performing the next TCP read operation. In multi-threaded mode, this runs the ASIO async re...
boost::asio::io_service mIOservice
The boost::asio::io_service used to create the connections.
void write()
Initialize performing the next TCP write operation. In multi-threaded mode, this runs the ASIO async w...
void write_callback(const boost::system::error_code &aErrorCode, std::size_t aBytesTransferred)
Callback function which is called upon completion of the ASIO async write This, then,...
An abstract base exception class, including an interface to throw as the derived type (for passing ex...
Definition: exception.hpp:71
virtual const char * what() const
Function which returns the error message associated with an exception If no error message has previou...
Definition: exception.cpp:87
std::string uri
Definition: test_single.cpp:89
DebugLevel Debug
Definition: LogLevels.cpp:133
_Quote< T > Quote(const T &aT)
_Integer< T, IntFmt<> > Integer(const T &aT)
Forward declare a function which creates an instance of the ultra-lightweight wrapper from an integer...
ErrorLevel Error
Definition: LogLevels.cpp:61
void log(FatalLevel &aFatal, const T0 &aArg0)
Function to add a log entry at Fatal level.
Definition: log.hxx:18
WarningLevel Warning
Definition: LogLevels.cpp:79
InfoLevel Info
Definition: LogLevels.cpp:115
_Type< T > Type()
Struct to store a URI when parsed by boost spirit.
Definition: URI.hpp:50