IT++ Logo
tcp.cpp
Go to the documentation of this file.
1 
32 #include <itpp/protocol/tcp.h>
33 #include <itpp/base/itfile.h>
34 #include <limits>
35 #include <cstdlib>
36 #include <ctime>
37 
39 
40 #ifdef _MSC_VER
41 #pragma warning(disable:4355)
42 #endif
43 
44 namespace itpp
45 {
46 
47 // -------------------- Default parameters ----------------------------------
48 
49 // TCP sender and receiver
50 
51 #define TCP_HEADERLENGTH 40
52 
53 // TCP sender
54 
55 #define TCP_VERSION kReno
56 #define TCP_SMSS 1460
57 #define TCP_INITIALCWNDREL 2 // related to MSS
58 #define TCP_INITIALSSTHRESHREL 1 // related to MaxCWnd
59 #define TCP_MAXCWNDREL 32 // related to MSS
60 #define TCP_DUPACKS 3
61 #define TCP_INITIALRTT 1
62 const double TCP_STIMERGRAN = 0.2;
63 const double TCP_SWSATIMERVALUE = 0.2;
64 #define TCP_MAXBACKOFF 64
65 const double TCP_MAXRTO = std::numeric_limits<double>::max();
66 #define TCP_IMMEDIATEBACKOFFRESET false
67 #define TCP_TIMESTAMPS false
68 #define TCP_KARN true
69 #define TCP_NAGLE false
70 #define TCP_GOBACKN true
71 #define TCP_FLIGHTSIZERECOVERY false
72 #define TCP_RENOCONSERVATION true
73 #define TCP_CAREFULSSTHRESHREDUCTION true
74 #define TCP_IGNOREDUPACKONTORECOVERY true
75 #define TCP_CAREFULMULFASTRTXAVOIDANCE true
76 #define TCP_RESTARTAFTERIDLE true
77 
78 // TCP receiver
79 
80 #define TCP_RMSS 1460
81 const int TCP_BUFFERSIZE = std::numeric_limits<int>::max() / 4;
82 #define TCP_DELAYEDACK true
83 const double TCP_ACKDELAYTIME = 0.2;
84 #define TCP_SENDPERIODICACKS false
85 #define TCP_STRICTPERIODICACKS false
86 #define TCP_PERIODICACKINTERVAL 1
87 #define TCP_ACKSCHEDULINGDELAY 0
88 #define TCP_ACKBUFFERWRITE false
89 #define TCP_ACKBUFFERREAD true
90 const int TCP_MAXUSERBLOCKSIZE = std::numeric_limits<int>::max() / 4;
91 #define TCP_MINUSERBLOCKSIZE 1
92 #define TCP_USERBLOCKPROCDELAY 0
93 
94 // TCP generator
95 
96 #define TCPGEN_BLOCKSIZE 1460
97 
98 // TCP applications
99 
100 #define TCPAPP_MAXNOOFACTIVEAPPS 500
101 #define TCPAPP_DISTSTATARRAYSIZE 100
102 #define TCPAPP_DISTSTATMAXGOODPUT 1000
103 #define TCPAPP_DISTSTATMAXTRANSFERTIME 10000
104 #define TCPAPP_CONDMEANSTATARRAYSIZE 100
105 #define TCPAPP_CONDMEANSTATMAXREQSIZE 100000
106 
107 
108 
109 inline int min(int opd1, int opd2)
110 {
111  return (opd1 < opd2) ? opd1 : opd2;
112 }
113 
114 
115 inline int max(int opd1, int opd2)
116 {
117  return (opd1 > opd2) ? opd1 : opd2;
118 }
119 
120 
121 // round is used to map a double value (e.g. RTO in TTCPSender) to the
122 // next higher value of a certain granularity (e.g. timer granularity).
123 inline double round(const double value, const double granularity)
124 {
125  return (std::ceil(value / granularity) * granularity);
126 }
127 
128 // -------------------- TCP_Segment ----------------------------------------
129 
131  seq_begin(),
132  seq_end()
133 {
134 }
135 
136 TCP_Segment::TCP_Segment(const Sequence_Number &sn_begin, const Sequence_Number &sn_end) :
137  seq_begin(sn_begin),
138  seq_end(sn_end)
139 {
140  it_assert(seq_begin <= seq_end, "TCP_Segment::TCP_Segment, end byte " + to_str(seq_end.value()) +
141  " < begin byte " + to_str(seq_begin.value()));
142 }
143 
144 
145 TCP_Segment::TCP_Segment(const TCP_Segment &segment) :
146  seq_begin(segment.seq_begin),
147  seq_end(segment.seq_end)
148 {
149 }
150 
151 
152 TCP_Segment &TCP_Segment::operator=(const TCP_Segment &segment)
153 {
154  this->seq_begin = segment.seq_begin;
155  this->seq_end = segment.seq_end;
156 
157  return *this;
158 }
159 
160 
161 void TCP_Segment::combine(const TCP_Segment &segment)
162 {
163  it_assert(can_be_combined(segment), "TCP_Segment::CombineWith, segments cannot be combined");
164 
165  seq_begin = min(seq_begin, segment.seq_begin);
166  seq_end = max(seq_end, segment.seq_end);
167 }
168 
169 
170 std::ostream & operator<<(std::ostream &os, const TCP_Segment &segment)
171 {
172  os << "(" << segment.seq_begin << "," << segment.seq_end << ")";
173  return os;
174 }
175 
176 
177 // -------------------- TCP_Packet ----------------------------------------
179  fSegment(),
180  fACK(),
181  fWnd(0),
182  fSessionId(0),
183  fInfo(0)
184 {
185 }
186 
187 
188 TCP_Packet::TCP_Packet(const TCP_Packet &packet) :
189  fSegment(packet.fSegment),
190  fACK(packet.fACK),
191  fWnd(packet.fWnd),
192  fSessionId(packet.fSessionId),
193  fInfo(0)
194 {
195  std::cout << "TCP_Packet::TCP_Packet ############" << " ";
196 
197  if (packet.fInfo != 0) {
198  std::cout << "TCP_Packet::TCP_Packet rhs.fInfo ###########" << " ";
199  fInfo = new TDebugInfo(*packet.fInfo);
200  }
201 }
202 
203 
205 {
206  delete fInfo;
207 }
208 
209 
210 TCP_Packet & TCP_Packet::clone() const
211 {
212  return *new TCP_Packet(*this);
213 }
214 
215 
216 void TCP_Packet::set_info(unsigned ssThresh, unsigned recWnd, unsigned cWnd,
217  double estRTT, Sequence_Number sndUna,
218  Sequence_Number sndNxt, bool isRtx)
219 {
220  if (fInfo == 0) {
221  fInfo = new TDebugInfo;
222  }
223 
224  fInfo->fSSThresh = ssThresh;
225  fInfo->fRecWnd = recWnd;
226  fInfo->fCWnd = cWnd;
227  fInfo->fRTTEstimate = estRTT;
228  fInfo->fSndUna = sndUna;
229  fInfo->fSndNxt = sndNxt;
230  fInfo->fRtxFlag = isRtx;
231 }
232 
233 
234 void TCP_Packet::print_header(std::ostream &) const
235 {
236  std::cout << "Hello!\n";
237 
238  std::cout << "Ses = " << get_session_id() << " ";
239 
240  std::cout << "Segment = " << get_segment() << " "
241  << "ACK = " << get_ACK() << " "
242  << "Wnd = " << get_wnd() << " ";
243 
244  std::cout << "DestPort = " << fDestinationPort << " "
245  << "SourcePort = " << fSourcePort << " ";
246 
247 
248  if (fInfo != 0) {
249  std::cout << "SndSSThresh = " << fInfo->fSSThresh << " ";
250  std::cout << "RecWnd = " << fInfo->fRecWnd << " ";
251  std::cout << "SndCWnd = " << fInfo->fCWnd << " ";
252  std::cout << "RTTEstimate = " << fInfo->fRTTEstimate << " ";
253  std::cout << "RtxFlag = " << fInfo->fRtxFlag;
254  }
255  else
256  std::cout << "fInfo = " << fInfo << " ";
257 
258  std::cout << std::endl;
259 
260 }
261 
262 
263 
264 std::ostream & operator<<(std::ostream & out, TCP_Packet & msg)
265 {
266  msg.print_header(out);
267  return out;
268 }
269 
270 
271 // -------------------- TCP_Sender ----------------------------------------
272 TCP_Sender::TCP_Sender(int label) :
273  fLabel(label),
274  fTCPVersion(TCP_VERSION),
275  fMSS(TCP_SMSS),
276  fTCPIPHeaderLength(TCP_HEADERLENGTH),
277  fInitialRTT(TCP_INITIALRTT),
278  fInitialCWnd(0), // default initialization see below
279  fInitialSSThresh(0), // default initialization see below
280  fMaxCWnd(0), // default initialization see below
281  fDupACKThreshold(TCP_DUPACKS),
282  fTimerGranularity(TCP_STIMERGRAN),
283  fMaxRTO(TCP_MAXRTO),
284  fMaxBackoff(TCP_MAXBACKOFF),
285  fImmediateBackoffReset(TCP_IMMEDIATEBACKOFFRESET),
286  fKarn(TCP_KARN),
287  fGoBackN(TCP_GOBACKN),
288  fFlightSizeRecovery(TCP_FLIGHTSIZERECOVERY),
289  fRenoConservation(TCP_RENOCONSERVATION),
290  fCarefulSSThreshReduction(TCP_CAREFULSSTHRESHREDUCTION),
291  fIgnoreDupACKOnTORecovery(TCP_IGNOREDUPACKONTORECOVERY),
292  fCarefulMulFastRtxAvoidance(TCP_CAREFULMULFASTRTXAVOIDANCE),
293  fNagle(TCP_NAGLE),
294  fSWSATimerValue(TCP_SWSATIMERVALUE),
295  fRestartAfterIdle(TCP_RESTARTAFTERIDLE),
296  fDebug(false),
297  fTrace(false),
298  fSessionId(0),
299  fRtxTimer(*this, &TCP_Sender::HandleRtxTimeout),
300  fSWSATimer(*this, &TCP_Sender::HandleSWSATimeout)/*,*/
301 {
302 
303  // default values and parameter check for MaxCWND, InitCWND, InitSSThresh
304  if (fMaxCWnd == 0) {
305  fMaxCWnd = (unsigned)(TCP_MAXCWNDREL * fMSS);
306  }
307  else if (fMaxCWnd < fMSS) {
308  // throw (UL_CException("TCP_Sender::TCP_Sender",
309  // "MaxCWnd must be >= MSS"));
310  }
311 
312  if (fInitialCWnd == 0) {
313  fInitialCWnd = (unsigned)(TCP_INITIALCWNDREL * fMSS);
314  }
315  else if ((fInitialCWnd < fMSS) || (fInitialCWnd > fMaxCWnd)) {
316  // throw (UL_CException("TCP_Sender::TCP_Sender",
317  // "initial CWnd must be >= MSS and <= MaxCWnd"));
318  }
319 
320  if ((fInitialSSThresh == 0) && (fMaxCWnd >= 2 * fMSS)) {
321  fInitialSSThresh = (unsigned)(TCP_INITIALSSTHRESHREL * fMaxCWnd);
322  }
323  else if ((fInitialSSThresh < 2*fMSS) || (fInitialCWnd > fMaxCWnd)) {
324  // throw (UL_CException("TCP_Sender::TCP_Sender",
325  // "initial CWnd must be >= 2*MSS and <= MaxCWnd"));
326  }
327 
328  setup();
329 
330  InitStatistics();
331 
332 
333  tcp_send.set_name("TCP Send");
334  tcp_receive_ack.forward(this, &TCP_Sender::ReceiveMessageFromNet);
335  tcp_receive_ack.set_name("TCP ACK");
336  tcp_socket_write.forward(this, &TCP_Sender::HandleUserMessageIndication);
337  tcp_socket_write.set_name("SocketWrite");
338  tcp_release.forward(this, &TCP_Sender::release);
339  tcp_release.set_name("Release");
340 
341 }
342 
343 
345 {
346 }
347 
348 void TCP_Sender::set_debug(const bool enable_debug)
349 {
350  fDebug = enable_debug;
351  tcp_send.set_debug(enable_debug);
352 }
353 
354 void TCP_Sender::set_debug(bool enable_debug, bool enable_signal_debug)
355 {
356  fDebug = enable_debug;
357  tcp_send.set_debug(enable_signal_debug);
358 }
359 
360 void TCP_Sender::set_trace(const bool enable_trace)
361 {
362  fTrace = enable_trace;
363 }
364 
365 void TCP_Sender::set_label(int label)
366 {
367  fLabel = label;
368 }
369 
370 void TCP_Sender::setup()
371 {
372  fSndUna = 0;
373  fSndNxt = 0;
374  fSndMax = 0;
375  fMaxRecWnd = 0;
376  fRecWnd = fMaxCWnd;
377  fUserNxt = 0;
378  fCWnd = fInitialCWnd;
379  fSSThresh = fInitialSSThresh;
380  fRecoveryDupACK = 0;
381  fRecoveryTO = 0;
382  fDupACKCnt = 0;
383 
384  // timers
385  fBackoff = 1;
386  fPendingBackoffReset = false;
387  fLastSendTime = Event_Queue::now();
388 
389  // RTT measurement
390  fTimUna = 0;
391  fSRTT = 0;
392  fRTTVar = 0;
393  fRTTEstimate = fInitialRTT;
394  fRTTMPending = false;
395  fRTTMByte = 0;
396 
397  CWnd_val.set_size(1000);
398  CWnd_val.zeros();
399  CWnd_time.set_size(1000);
400  CWnd_time.zeros();
401  CWnd_val(0) = fInitialCWnd;
402  CWnd_time(0) = 0;
403  CWnd_index = 1;
404 
405  SSThresh_val.set_size(1000);
406  SSThresh_val.zeros();
407  SSThresh_time.set_size(1000);
408  SSThresh_time.zeros();
409  SSThresh_val(0) = fInitialSSThresh;
410  SSThresh_time(0) = 0;
411  SSThresh_index = 1;
412 
413  sent_seq_num_val.set_size(1000);
414  sent_seq_num_val.zeros();
415  sent_seq_num_time.set_size(1000);
416  sent_seq_num_time.zeros();
417  sent_seq_num_val(0) = 0;
418  sent_seq_num_time(0) = 0;
419  sent_seq_num_index = 1;
420 
421  sender_recv_ack_seq_num_val.set_size(1000);
422  sender_recv_ack_seq_num_val.zeros();
423  sender_recv_ack_seq_num_time.set_size(1000);
424  sender_recv_ack_seq_num_time.zeros();
425  sender_recv_ack_seq_num_val(0) = 0;
426  sender_recv_ack_seq_num_time(0) = 0;
427  sender_recv_ack_seq_num_index = 1;
428 
429  RTTEstimate_val.set_size(1000);
430  RTTEstimate_val.zeros();
431  RTTEstimate_time.set_size(1000);
432  RTTEstimate_time.zeros();
433  RTTEstimate_val(0) = fInitialRTT;
434  RTTEstimate_time(0) = 0;
435  RTTEstimate_index = 1;
436 
437  RTTsample_val.set_size(1000);
438  RTTsample_val.zeros();
439  RTTsample_time.set_size(1000);
440  RTTsample_time.zeros();
441  RTTsample_val(0) = 0;
442  RTTsample_time(0) = 0;
443  RTTsample_index = 1;
444 
445 }
446 
447 std::string TCP_Sender::GenerateFilename()
448 {
449  time_t rawtime;
450 #ifndef _MSC_VER
451  struct tm *timeinfo;
452  timeinfo = localtime(&rawtime);
453 #else
454  time(&rawtime);
455  struct tm _timeinfo;
456  struct tm *timeinfo = &_timeinfo;
457  localtime_s(timeinfo, &rawtime);
458 #endif
459  std::ostringstream filename_stream;
460  filename_stream << "trace_tcp_sender_u" << fLabel
461  << "_" << 1900 + timeinfo->tm_year
462  << "_" << timeinfo->tm_mon
463  << "_" << timeinfo->tm_mday
464  << "__" << timeinfo->tm_hour
465  << "_" << timeinfo->tm_min
466  << "_" << timeinfo->tm_sec
467  << "_.it";
468  return filename_stream.str();
469 }
470 
471 
472 void TCP_Sender::release(std::string file)
473 {
474  std::string filename;
475  fSessionId++;
476 
477  fRtxTimer.Reset();
478  fSWSATimer.Reset();
479 
480  if (fTrace) {
481  if (file == "")
482  filename = GenerateFilename();
483  else
484  filename = file;
485 
486  save_trace(filename);
487  }
488 }
489 
490 
491 void TCP_Sender::InitStatistics()
492 {
493  fNumberOfTimeouts = 0;
494  fNumberOfIdleTimeouts = 0;
495  fNumberOfFastRetransmits = 0;
496  fNumberOfRTTMeasurements = 0;
497  fNumberOfReceivedACKs = 0;
498 }
499 
500 
501 void TCP_Sender::StopTransientPhase()
502 {
503  InitStatistics();
504 }
505 
506 
507 void TCP_Sender::HandleUserMessageIndication(itpp::Packet *user_data_p)
508 {
509  if (fDebug) {
510  std::cout << "TCP_Sender::HandleUserMessageIndication"
511  << " byte_size=" << user_data_p->bit_size() / 8
512  << " ptr=" << user_data_p
513  << " time=" << Event_Queue::now() << std::endl;
514  }
515 
516  SocketWriteQueue.push(user_data_p);
517 
518  SendNewData(); // will call GetMessage (via GetNextSegmentSize)
519  // if new data can be sent
520 }
521 
522 
523 void TCP_Sender::ReceiveMessageFromNet(itpp::Packet *msg)
524 {
525  TCP_Packet & packet = (TCP_Packet &) * msg;
526 
527  if (fDebug) {
528  std::cout << "TCP_Sender::ReceiveMessageFromNet"
529  << " byte_size=" << msg->bit_size() / 8
530  << " ptr=" << msg
531  << " time=" << Event_Queue::now() << std::endl;
532  }
533 
534  if ((packet.get_session_id() == fSessionId) && // ACK of current session
535  (packet.get_ACK() >= fSndUna)) { // ACK is OK
536  HandleACK(packet);
537  }
538 
539  delete &packet;
540 }
541 
542 
543 void TCP_Sender::HandleACK(TCP_Packet &msg)
544 {
545  it_assert(msg.get_ACK() <= fSndMax, "TCP_Sender::HandleACK, received ACK > SndMax at ");
546 
547  fNumberOfReceivedACKs++;
548 
549  if (fTrace) {
550  TraceACKedSeqNo(msg.get_ACK());
551  }
552 
553  if (fDebug) {
554  std::cout << "sender " << fLabel << ": "
555  << "receive ACK: "
556  << " t = " << Event_Queue::now() << ", "
557  << msg << std::endl;
558  }
559 
560  // update receiver advertised window size
561  fRecWnd = msg.get_wnd();
562  fMaxRecWnd = max(fRecWnd, fMaxRecWnd);
563 
564  if (msg.get_ACK() == fSndUna) { // duplicate ACK
565 
566  bool ignoreDupACK = (fSndMax == fSndUna); // no outstanding data
567 
568  if (fIgnoreDupACKOnTORecovery) {
569  // don't count dupacks during TO recovery!
570  if (fCarefulMulFastRtxAvoidance) { // see RFC 2582, Section 5
571  // like in Solaris
572  ignoreDupACK = ignoreDupACK || (fSndUna <= fRecoveryTO);
573  }
574  else {
575  // like in ns
576  ignoreDupACK = ignoreDupACK || (fSndUna < fRecoveryTO);
577  }
578  }
579 
580  if (!ignoreDupACK) {
581  fDupACKCnt++; // count the number of duplicate ACKs
582 
583  if (fDupACKCnt == fDupACKThreshold) {
584  // dupack threshold is reached
585  fNumberOfFastRetransmits++;
586 
587  fRecoveryDupACK = fSndMax;
588 
589  ReduceSSThresh(); // halve ssthresh (in most cases)
590 
591  if ((fTCPVersion == kReno) || (fTCPVersion == kNewReno)) {
592  fCWnd = fSSThresh;
593  }
594  else if (fTCPVersion == kTahoe) {
595  fCWnd = fMSS;
596  }
597 
598  if (fTCPVersion == kReno || fTCPVersion == kNewReno) {
599  // conservation of packets:
600  if (fRenoConservation) {
601  fCWnd += fDupACKThreshold * fMSS;
602  }
603  }
604  else if (fTCPVersion == kTahoe) {
605  if (fGoBackN) {
606  fSndNxt = fSndUna; // Go-Back-N (like in ns)
607  }
608  }
609 
610  UnaRetransmit(); // initiate retransmission
611  }
612  else if (fDupACKCnt > fDupACKThreshold) {
613  if (fTCPVersion == kReno || fTCPVersion == kNewReno) {
614  // conservation of packets
615  // CWnd may exceed MaxCWnd during fast recovery,
616  // however, the result of SendWindow() is always <= MaxCwnd
617  if (fRenoConservation) {
618  fCWnd += fMSS;
619  }
620  }
621  }
622  }
623  }
624  else { // new ACK
625  Sequence_Number oldSndUna = fSndUna; // required for NewReno partial ACK
626  fSndUna = msg.get_ACK();
627  fSndNxt = max(fSndNxt, fSndUna); // required in case of "Go-Back-N"
628 
629  // reset retransmission timer
630 
631  if ((fSndUna > fTimUna) && fRtxTimer.IsPending()) {
632  // seq. no. for which rtx timer is running has been received
633  fRtxTimer.Reset();
634  }
635 
636  // backoff reset
637 
638  if (fImmediateBackoffReset) {
639  fBackoff = 1;
640  }
641  else {
642  if (fPendingBackoffReset) {
643  fBackoff = 1;
644  fPendingBackoffReset = false;
645  }
646  else if (fBackoff > 1) {
647  // reset backoff counter only on next new ACK (this is probably
648  // the way to operate intended by Karn)
649  fPendingBackoffReset = true;
650  }
651  }
652 
653  // RTT measurement
654 
655  if ((fSndUna > fRTTMByte) && fRTTMPending) {
656  UpdateRTTVariables(Event_Queue::now() - fRTTMStartTime);
657  fRTTMPending = false;
658  }
659 
660  // update CWnd and reset dupack counter
661 
662  if (fDupACKCnt >= fDupACKThreshold) {
663  // we are in fast recovery
664  if (fTCPVersion == kNewReno && fSndUna < fRecoveryDupACK) {
665  // New Reno partial ACK handling
666  if (fRenoConservation) {
667  fCWnd = max(fMSS, fCWnd - (fSndUna - oldSndUna) + fMSS);
668  }
669  UnaRetransmit(); // start retransmit immediately
670  }
671  else {
672  FinishFastRecovery();
673  }
674  }
675  else {
676  // no fast recovery
677  fDupACKCnt = 0;
678  if (fCWnd < fSSThresh) {
679  // slow start phase
680  fCWnd = min(fCWnd + fMSS, fMaxCWnd);
681  }
682  else {
683  // congestion avoidance phase
684  fCWnd += max(fMSS * fMSS / fCWnd, 1); // RFC 2581
685  fCWnd = min(fCWnd, fMaxCWnd);
686  }
687  }
688  } // new ACK
689 
690  SendNewData(); // try to send new data (even in the case that a retransmit
691  // had to be performed)
692 
693  if (fTrace) {
694  TraceCWnd();
695  }
696 }
697 
698 
699 void TCP_Sender::SendNewData(bool skipSWSA)
700 {
701  unsigned nextSegmentSize;
702 
703  it_assert(fSndUna <= fSndNxt, "TCP_Sender::SendNewData, SndUna > SndNxt in sender " + to_str(fLabel) + "!");
704 
705  if (fRestartAfterIdle) {
706  IdleCheck();
707  }
708 
709  bool sillyWindowAvoidanceFailed = false;
710 
711  while (!sillyWindowAvoidanceFailed &&
712  ((nextSegmentSize = GetNextSegmentSize(fSndNxt)) > 0)) {
713  // there is new data to send and window is large enough
714 
715  // SWSA and Nagle (RFC 1122): assume PUSH to be set
716  unsigned queuedUnsent = fUserNxt - fSndNxt;
717  unsigned usableWindow = max(0, (fSndUna + SendWindow()) - fSndNxt);
718 
719  if (((unsigned)min(queuedUnsent, usableWindow) >= fMSS) ||
720  ((!fNagle || (fSndUna == fSndNxt)) &&
721  ((queuedUnsent <= usableWindow) || // Silly W. A.
722  ((unsigned)min(queuedUnsent, usableWindow) >= fMaxRecWnd / 2)
723  )
724  ) ||
725  skipSWSA
726  ) {
727  // Silly Window Syndrome Avoidance (SWSA) and Nagle passed
728 
729  TCP_Segment nextSegment(fSndNxt, fSndNxt + nextSegmentSize);
730  TCP_Packet & msg = * new TCP_Packet();
731 
732  msg.set_segment(nextSegment);
733  msg.set_session_id(fSessionId);
734  msg.set_destination_port(fLabel); // The dest and src port are set to the same
735  msg.set_source_port(fLabel); // number for simplicity.
736  msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength));
737 
738  if (fDebug) {
739  std::cout << "TCP_Sender::SendNewData,"
740  << " nextSegmentSize=" << nextSegmentSize
741  << " fTCPIPHeaderLength=" << fTCPIPHeaderLength
742  << " byte_size=" << msg.bit_size() / 8
743  << " ptr=" << &msg
744  << " time=" << Event_Queue::now() << std::endl;
745  }
746 
747  // no RTT measurement for retransmitted segments
748  // changes on Dec. 13. 2002 (Ga, Bo, Scharf)
749 
750  if (!fRTTMPending && fSndNxt >= fSndMax) { // ##Bo##
751  fRTTMStartTime = Event_Queue::now();
752  fRTTMByte = nextSegment.begin();
753  fRTTMPending = true;
754  }
755 
756  fSndNxt += nextSegmentSize;
757  fSndMax = max(fSndNxt, fSndMax);
758 
759  // reset SWSA timer if necessary
760  if (skipSWSA) {
761  skipSWSA = false;
762  }
763  else if (fSWSATimer.IsPending()) {
764  fSWSATimer.Reset();
765  }
766 
767  // set rtx timer if necessary
768  if (!fRtxTimer.IsPending()) {
769  SetRtxTimer();
770  }
771 
772 
773  if (fDebug) {
774  msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate,
775  fSndUna, fSndNxt, false);
776  std::cout << "sender " << fLabel
777  << ": send new data: "
778  << " t = " << Event_Queue::now() << ", "
779  << msg << std::endl;
780  }
781 
782  SendMsg(msg);
783  }
784  else {
785  sillyWindowAvoidanceFailed = true;
786  // set SWSA timer
787  if (!fSWSATimer.IsPending()) {
788  fSWSATimer.Set(fSWSATimerValue);
789  }
790  }
791  }
792 
793  // set timers in case that no new data could have been sent
794  if (!fRtxTimer.IsPending()) {
795  if (fSndMax > fSndUna) { // there is outstanding data
796  if (!fImmediateBackoffReset && fPendingBackoffReset) {
797  // backoff is reset if no new data could have been sent since last
798  // (successfull) retransmission; this is useful in case of
799  // Reno recovery and multiple losses to avoid that in
800  // the (unavoidable) series of timeouts the timer value
801  // increases exponentially as this is not the intention
802  // of the delayed backoff reset in Karn's algorithm
803  fBackoff = 1;
804  fPendingBackoffReset = false;
805  }
806  SetRtxTimer();
807  }
808  }
809 }
810 
811 
812 void TCP_Sender::UnaRetransmit()
813 {
814  // resend after timeout or fast retransmit
815  unsigned nextSegmentSize = GetNextSegmentSize(fSndUna);
816 
817  if (nextSegmentSize > 0) {
818  TCP_Segment nextSegment(fSndUna, fSndUna + nextSegmentSize);
819  TCP_Packet & msg = *new TCP_Packet();
820  msg.set_segment(nextSegment);
821  msg.set_session_id(fSessionId);
822  msg.set_destination_port(fLabel); // The dest and src port are set to the same
823  msg.set_source_port(fLabel); // number for simplicity.
824  msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength));
825 
826  fSndNxt = max(fSndNxt, fSndUna + nextSegmentSize);
827  fSndMax = max(fSndNxt, fSndMax);
828 
829  // The RTT measurement is cancelled if the RTTM byte has a sequence
830  // number higher or equal than the first retransmitted byte as
831  // the ACK for the RTTM byte will be delayed by the rtx for at least
832  // one round
833  if (fKarn && (nextSegment.begin() <= fRTTMByte) && fRTTMPending) {
834  fRTTMPending = false;
835  }
836 
837  SetRtxTimer();
838 
839  if (fDebug) {
840  msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate,
841  fSndUna, fSndNxt, true);
842  std::cout << "sender " << fLabel;
843  if (fDupACKCnt >= fDupACKThreshold) {
844  std::cout << ": fast rtx: ";
845  }
846  else {
847  std::cout << ": TO rtx: ";
848  }
849  std::cout << " t = " << Event_Queue::now() << ", "
850  << msg << std::endl;
851  }
852 
853  SendMsg(msg);
854  }
855  else {
856  // throw(UL_CException("TCP_Sender::UnaRetransmit", "no bytes to send"));
857  }
858 }
859 
860 
861 void TCP_Sender::FinishFastRecovery()
862 {
863  if (fTCPVersion == kTahoe) {
864  fDupACKCnt = 0;
865  }
866  else if (fTCPVersion == kReno) {
867  // Reno fast recovery
868  fDupACKCnt = 0;
869  if (fFlightSizeRecovery) {
870  fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh);
871  }
872  else {
873  fCWnd = fSSThresh;
874  }
875  }
876  else if (fTCPVersion == kNewReno) {
877  // New Reno fast recovery
878  // "Set CWnd to ... min (ssthresh, FlightSize + MSS)
879  // ... or ssthresh" (RFC 2582)
880  if (fFlightSizeRecovery) {
881  fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh);
882  }
883  else {
884  fCWnd = fSSThresh;
885  }
886  fDupACKCnt = 0;
887  }
888 }
889 
890 
891 void TCP_Sender::ReduceSSThresh()
892 {
893  if (fCarefulSSThreshReduction) {
894  // If Reno conservation is enabled the amount of
895  // outstanding data ("flight size") might be rather large
896  // and even larger than twice the old ssthresh value;
897  // so this corresponds more to the ns behaviour where always cwnd is
898  // taken instead of flight size.
899  fSSThresh = max(2 * fMSS,
900  min(min(fCWnd, fSndMax - fSndUna), fRecWnd) / 2);
901  }
902  else {
903  // use filght size / 2 as recommended in RFC 2581
904  fSSThresh = max(2 * fMSS, min(fSndMax - fSndUna, fRecWnd) / 2);
905  }
906 
907  it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleACK, internal error: SndSSThresh is > MaxCWnd");
908 
909  if (fTrace) {
910  TraceSSThresh();
911  }
912 }
913 
914 
915 void TCP_Sender::SendMsg(TCP_Packet &msg)
916 {
917  if (fTrace) {
918  TraceSentSeqNo(msg.get_segment().end());
919  }
920 
921  if (fRestartAfterIdle) {
922  fLastSendTime = Event_Queue::now(); // needed for idle detection
923  }
924 
925  tcp_send(&msg);
926 }
927 
928 
929 void TCP_Sender::IdleCheck()
930 {
931  // idle detection according to Jacobson, SIGCOMM, 1988:
932  // sender is currently idle and nothing has been send since RTO
933 
934  if (fSndMax == fSndUna && Event_Queue::now() - fLastSendTime > CalcRTOValue()) {
935  fCWnd = fInitialCWnd; // see RFC2581
936 
937  fNumberOfIdleTimeouts++;
938 
939  if (fTrace) {
940  TraceCWnd();
941  }
942 
943  if (fDebug) {
944  std::cout << "sender " << fLabel
945  << ": idle timeout: "
946  << "t = " << Event_Queue::now()
947  << ", SndNxt = " << fSndNxt
948  << ", SndUna = " << fSndUna
949  << ", Backoff = " << fBackoff
950  << std::endl;
951  }
952  }
953 }
954 
955 
956 void TCP_Sender::HandleRtxTimeout(Ttype)
957 {
958  fNumberOfTimeouts++;
959 
960  // update backoff
961  fBackoff = min(fMaxBackoff, fBackoff * 2);
962  if (!fImmediateBackoffReset) {
963  fPendingBackoffReset = false;
964  }
965 
966  if (fDupACKCnt >= fDupACKThreshold) {
967  FinishFastRecovery(); // reset dup ACK cnt and CWnd
968  }
969  else if (fDupACKCnt > 0) {
970  fDupACKCnt = 0; // don't allow dupack action during TO recovery
971  }
972 
973  // update CWnd and SSThresh
974  ReduceSSThresh(); // halve ssthresh (in most cases)
975  fCWnd = fMSS; // not initial CWnd, see RFC 2581
976 
977  it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleRtxTimeout, internal error: SndSSThresh is > MaxCWnd");
978 
979  fRecoveryTO = fSndMax;
980 
981  if (fGoBackN) {
982  // go back N is mainly relevant in the case of multiple losses
983  // which would lead to a series of timeouts without resetting sndnxt
984  fSndNxt = fSndUna;
985  }
986 
987  if (fDebug) {
988  std::cout << "sender " << fLabel
989  << ": rtx timeout: "
990  << "t = " << Event_Queue::now()
991  << ", SndNxt = " << fSndNxt
992  << ", SndUna = " << fSndUna
993  << std::endl;
994  }
995 
996  if (fTrace) {
997  TraceCWnd();
998  }
999 
1000  UnaRetransmit(); // initiate retransmission
1001 }
1002 
1003 
1004 void TCP_Sender::HandleSWSATimeout(Ttype)
1005 {
1006  SendNewData(true);
1007 }
1008 
1009 
1010 unsigned TCP_Sender::GetNextSegmentSize(const Sequence_Number & begin)
1011 {
1012  // try to get new user messages if available and necessary
1013  while ((fUserNxt < begin + fMSS) && (!SocketWriteQueue.empty())) {
1014  itpp::Packet *packet_p = SocketWriteQueue.front();
1015  SocketWriteQueue.pop();
1016  fUserNxt += (unsigned) packet_p->bit_size() / 8;
1017  delete packet_p;
1018  }
1019 
1020  Sequence_Number end = min(min(fUserNxt, begin + fMSS),
1021  fSndUna + SendWindow());
1022 
1023  if (fDebug) {
1024  std::cout << "TCP_Sender::GetNextSegmentSize,"
1025  << " fUserNxt=" << fUserNxt
1026  << " begin_seq_num=" << begin
1027  << " fMSS=" << fMSS
1028  << " fSndUna=" << fSndUna
1029  << " SendWindow()=" << SendWindow()
1030  << " end_seq_num=" << end
1031  << " time=" << Event_Queue::now() << std::endl;
1032  }
1033 
1034  return max(0, end - begin);
1035 }
1036 
1037 
1038 unsigned TCP_Sender::SendWindow() const
1039 {
1040  return min(fRecWnd, min(fMaxCWnd, fCWnd));
1041 }
1042 
1043 
1044 double TCP_Sender::CalcRTOValue() const
1045 {
1046  static const double factor = 1 + 1e-8;
1047  // to avoid "simultaneous" TO/receive ACK events in case of const. RTT
1048 
1049  double rto = fBackoff * fRTTEstimate * factor;
1050 
1051  if (rto > fMaxRTO) {
1052  rto = fMaxRTO;
1053  }
1054 
1055  return rto;
1056 }
1057 
1058 
1059 void TCP_Sender::SetRtxTimer()
1060 {
1061  double rto = CalcRTOValue();
1062  fRtxTimer.Set(rto);
1063  fTimUna = fSndUna;
1064  if (fDebug) {
1065  std::cout << "sender " << fLabel
1066  << ": set rtx timer: "
1067  << "t = " << Event_Queue::now()
1068  << ", RTO = " << rto
1069  << ", Backoff = " << fBackoff
1070  << ", TimUna = " << fTimUna
1071  << std::endl;
1072  }
1073 }
1074 
1075 
1076 void TCP_Sender::UpdateRTTVariables(double sampleRTT)
1077 {
1078  if (fSRTT == 0) {
1079  fSRTT = sampleRTT;
1080  fRTTVar = sampleRTT / 2;
1081  }
1082  else {
1083  // see, e.g., Comer for the values used as weights
1084  fSRTT = 0.875 * fSRTT + 0.125 * sampleRTT;
1085  fRTTVar = 0.75 * fRTTVar + 0.25 * fabs(sampleRTT - fSRTT);
1086  }
1087 
1088  fRTTEstimate = round(fSRTT + 4 * fRTTVar, fTimerGranularity);
1089 
1090  if (fTrace) {
1091  TraceRTTVariables(sampleRTT);
1092  }
1093 
1094  fNumberOfRTTMeasurements++;
1095 }
1096 
1097 
1098 void TCP_Sender::TraceRTTVariables(double sampleRTT)
1099 {
1100  if (fDebug) {
1101  std::cout << "sender " << fLabel
1102  << ": RTT update: "
1103  << "t = " << Event_Queue::now()
1104  << ", sample = " << sampleRTT
1105  << ", SRTT = " << fSRTT
1106  << ", RTTVar = " << fRTTVar
1107  << ", RTTEstimate = " << fRTTEstimate
1108  << std::endl;
1109  }
1110 
1111  if (RTTsample_index >= RTTsample_time.size()) {
1112  RTTsample_time.set_size(2*RTTsample_time.size(), true);
1113  RTTsample_val.set_size(2*RTTsample_val.size(), true);
1114  }
1115  RTTsample_val(RTTsample_index) = sampleRTT;
1116  RTTsample_time(RTTsample_index) = Event_Queue::now();
1117  RTTsample_index++;
1118 
1119  if (RTTEstimate_index >= RTTEstimate_time.size()) {
1120  RTTEstimate_time.set_size(2*RTTEstimate_time.size(), true);
1121  RTTEstimate_val.set_size(2*RTTEstimate_val.size(), true);
1122  }
1123  RTTEstimate_val(RTTEstimate_index) = fRTTEstimate;
1124  RTTEstimate_time(RTTEstimate_index) = Event_Queue::now();
1125  RTTEstimate_index++;
1126 }
1127 
1128 
1129 void TCP_Sender::TraceCWnd()
1130 {
1131  if (fDebug) {
1132  std::cout << "sender " << fLabel
1133  << " t = " << Event_Queue::now()
1134  << " cwnd = " << fCWnd << std::endl;
1135  }
1136  if (CWnd_index >= CWnd_time.size()) {
1137  CWnd_time.set_size(2*CWnd_time.size(), true);
1138  CWnd_val.set_size(2*CWnd_val.size(), true);
1139  }
1140  CWnd_val(CWnd_index) = fCWnd;
1141  CWnd_time(CWnd_index) = Event_Queue::now();
1142  CWnd_index++;
1143 
1144 }
1145 
1146 void TCP_Sender::TraceSSThresh()
1147 {
1148  if (fDebug) {
1149  std::cout << "sender " << fLabel
1150  << " t = " << Event_Queue::now()
1151  << " cwnd = " << fSSThresh << std::endl;
1152  }
1153  if (SSThresh_index >= SSThresh_time.size()) {
1154  SSThresh_time.set_size(2*SSThresh_time.size(), true);
1155  SSThresh_val.set_size(2*SSThresh_val.size(), true);
1156  }
1157  SSThresh_val(SSThresh_index) = fSSThresh;
1158  SSThresh_time(SSThresh_index) = Event_Queue::now();
1159  SSThresh_index++;
1160 
1161 }
1162 
1163 void TCP_Sender::TraceSentSeqNo(const Sequence_Number sn)
1164 {
1166  if (fDebug) {
1167  std::cout << "sender " << fLabel
1168  << " t = " << Event_Queue::now()
1169  << " sent = " << sn
1170  << std::endl;
1171  }
1172  if (sent_seq_num_index >= sent_seq_num_time.size()) {
1173  sent_seq_num_time.set_size(2*sent_seq_num_time.size(), true);
1174  sent_seq_num_val.set_size(2*sent_seq_num_val.size(), true);
1175  }
1176  sent_seq_num_val(sent_seq_num_index) = sn.value();
1177  sent_seq_num_time(sent_seq_num_index) = Event_Queue::now();
1178  sent_seq_num_index++;
1179 }
1180 
1181 
1182 void TCP_Sender::TraceACKedSeqNo(const Sequence_Number sn)
1183 {
1184  if (fDebug) {
1185  std::cout << "sender " << fLabel
1186  << " t = " << Event_Queue::now()
1187  << " ACK = " << sn
1188  << std::endl;
1189  }
1190 
1191  if (sender_recv_ack_seq_num_index >= sender_recv_ack_seq_num_time.size()) {
1192  sender_recv_ack_seq_num_time.set_size(2*sender_recv_ack_seq_num_time.size(), true);
1193  sender_recv_ack_seq_num_val.set_size(2*sender_recv_ack_seq_num_val.size(), true);
1194  }
1195  sender_recv_ack_seq_num_val(sender_recv_ack_seq_num_index) = sn.value();
1196  sender_recv_ack_seq_num_time(sender_recv_ack_seq_num_index) = Event_Queue::now();
1197  sender_recv_ack_seq_num_index++;
1198 }
1199 
1200 
1201 void TCP_Sender::save_trace(std::string filename)
1202 {
1203 
1204  CWnd_val.set_size(CWnd_index, true);
1205  CWnd_time.set_size(CWnd_index, true);
1206 
1207  SSThresh_val.set_size(SSThresh_index, true);
1208  SSThresh_time.set_size(SSThresh_index, true);
1209 
1210  sent_seq_num_val.set_size(sent_seq_num_index, true);
1211  sent_seq_num_time.set_size(sent_seq_num_index, true);
1212 
1213  sender_recv_ack_seq_num_val.set_size(sender_recv_ack_seq_num_index, true);
1214  sender_recv_ack_seq_num_time.set_size(sender_recv_ack_seq_num_index, true);
1215 
1216  RTTEstimate_val.set_size(RTTEstimate_index, true);
1217  RTTEstimate_time.set_size(RTTEstimate_index, true);
1218 
1219  RTTsample_val.set_size(RTTsample_index, true);
1220  RTTsample_time.set_size(RTTsample_index, true);
1221 
1222  if (fDebug) {
1223  std::cout << "CWnd_val" << CWnd_val << std::endl;
1224  std::cout << "CWnd_time" << CWnd_time << std::endl;
1225  std::cout << "CWnd_index" << CWnd_index << std::endl;
1226 
1227  std::cout << "SSThresh_val" << SSThresh_val << std::endl;
1228  std::cout << "SSThresh_time" << SSThresh_time << std::endl;
1229  std::cout << "SSThresh_index" << SSThresh_index << std::endl;
1230 
1231  std::cout << "sent_seq_num_val" << sent_seq_num_val << std::endl;
1232  std::cout << "sent_seq_num_time" << sent_seq_num_time << std::endl;
1233  std::cout << "sent_seq_num_index" << sent_seq_num_index << std::endl;
1234 
1235  std::cout << "sender_recv_ack_seq_num_val" << sender_recv_ack_seq_num_val << std::endl;
1236  std::cout << "sender_recv_ack_seq_num_time" << sender_recv_ack_seq_num_time << std::endl;
1237  std::cout << "sender_recv_ack_seq_num_index" << sender_recv_ack_seq_num_index << std::endl;
1238 
1239  std::cout << "RTTEstimate_val" << RTTEstimate_val << std::endl;
1240  std::cout << "RTTEstimate_time" << RTTEstimate_time << std::endl;
1241  std::cout << "RTTEstimate_index" << RTTEstimate_index << std::endl;
1242 
1243  std::cout << "RTTsample_val" << RTTsample_val << std::endl;
1244  std::cout << "RTTsample_time" << RTTsample_time << std::endl;
1245  std::cout << "RTTsample_index" << RTTsample_index << std::endl;
1246 
1247  std::cout << "TCP_Sender::saving to file: " << filename << std::endl;
1248  }
1249 
1250  it_file ff2;
1251  ff2.open(filename);
1252 
1253  ff2 << Name("CWnd_val") << CWnd_val;
1254  ff2 << Name("CWnd_time") << CWnd_time;
1255  ff2 << Name("CWnd_index") << CWnd_index;
1256 
1257  ff2 << Name("SSThresh_val") << SSThresh_val;
1258  ff2 << Name("SSThresh_time") << SSThresh_time;
1259  ff2 << Name("SSThresh_index") << SSThresh_index;
1260 
1261  ff2 << Name("sent_seq_num_val") << sent_seq_num_val;
1262  ff2 << Name("sent_seq_num_time") << sent_seq_num_time;
1263  ff2 << Name("sent_seq_num_index") << sent_seq_num_index;
1264 
1265  ff2 << Name("sender_recv_ack_seq_num_val") << sender_recv_ack_seq_num_val;
1266  ff2 << Name("sender_recv_ack_seq_num_time") << sender_recv_ack_seq_num_time;
1267  ff2 << Name("sender_recv_ack_seq_num_index") << sender_recv_ack_seq_num_index;
1268 
1269  ff2 << Name("RTTEstimate_val") << RTTEstimate_val;
1270  ff2 << Name("RTTEstimate_time") << RTTEstimate_time;
1271  ff2 << Name("RTTEstimate_index") << RTTEstimate_index;
1272 
1273  ff2 << Name("RTTsample_val") << RTTsample_val;
1274  ff2 << Name("RTTsample_time") << RTTsample_time;
1275  ff2 << Name("RTTsample_index") << RTTsample_index;
1276 
1277  ff2.flush();
1278  ff2.close();
1279 }
1280 
1281 
1282 void TCP_Sender::print_item(std::ostream &, const std::string & keyword)
1283 {
1284  if (keyword == "Label") {
1285  std::cout << fLabel;
1286  }
1287  else if (keyword == "CWnd") {
1288  std::cout << fCWnd;
1289  }
1290  else if (keyword == "SSThresh") {
1291  std::cout << fSSThresh;
1292  }
1293  else if (keyword == "SRTT") {
1294  std::cout << fSRTT;
1295  }
1296  else if (keyword == "RTTvar") {
1297  std::cout << fRTTVar;
1298  }
1299  else if (keyword == "Backoff") {
1300  std::cout << fBackoff;
1301  }
1302  else if (keyword == "RTO") {
1303  std::cout << CalcRTOValue();
1304  }
1305  else if (keyword == "NoOfFastRets") {
1306  std::cout << fNumberOfFastRetransmits;
1307  }
1308  else if (keyword == "NoOfRetTOs") {
1309  std::cout << fNumberOfTimeouts;
1310  }
1311  else if (keyword == "NoOfIdleTOs") {
1312  std::cout << fNumberOfIdleTimeouts;
1313  }
1314  else if (keyword == "NoOfRTTMs") {
1315  std::cout << fNumberOfRTTMeasurements;
1316  }
1317  else if (keyword == "NoOfRecACKs") {
1318  std::cout << fNumberOfReceivedACKs;
1319  }
1320  else {
1321  }
1322 }
1323 
1324 
1325 // -------------------- TCP_Receiver_Buffer ----------------------------------------
1326 TCP_Receiver_Buffer::TCP_Receiver_Buffer() :
1327  fFirstByte()
1328 {
1329 }
1330 
1331 
1332 TCP_Receiver_Buffer::TCP_Receiver_Buffer(const TCP_Receiver_Buffer & rhs) :
1333  fFirstByte(rhs.fFirstByte),
1334  fBufList(rhs.fBufList)
1335 {
1336 }
1337 
1338 
1339 void TCP_Receiver_Buffer::reset()
1340 {
1341  fBufList.clear();
1342  fFirstByte = 0;
1343 }
1344 
1345 
1346 TCP_Receiver_Buffer::~TCP_Receiver_Buffer()
1347 {
1348 }
1349 
1350 
1351 void TCP_Receiver_Buffer::write(TCP_Segment newBlock)
1352 {
1353  // error cases
1354  it_assert(newBlock.begin() <= newBlock.end(), "TCP_Receiver_Buffer::Write, no valid segment");
1355 
1356  // cut blocks beginning before fFirstByte
1357  if (newBlock.begin() < fFirstByte) {
1358  if (newBlock.end() > fFirstByte) {
1359  newBlock.set_begin(fFirstByte);
1360  }
1361  else {
1362  return; //// TODO: Is this strange?
1363  }
1364  }
1365 
1366  if (newBlock.length() == 0) { // empty block, nothing to do
1367  return;
1368  }
1369 
1370  if (fBufList.empty() || (newBlock.begin() > fBufList.back().end())) {
1371  // new block is behind last block in buffer
1372  fBufList.push_back(newBlock);
1373  }
1374  else {
1375  // skip list entries if beginning of newBlock > end of current one
1376  // (search for correct list position)
1377  std::list<TCP_Segment>::iterator iter;
1378  iter = fBufList.begin();
1379  while (newBlock.begin() > iter->end()) {
1380  iter++;
1381  it_assert(iter != fBufList.end(), "TCP_Receiver_Buffer::Write, internal error");
1382  }
1383 
1384  TCP_Segment & exBlock = *iter;
1385 
1386  if (exBlock.can_be_combined(newBlock)) {
1387  // overlapping or contiguous blocks -> combine
1388  exBlock.combine(newBlock);
1389 
1390  // check following blocks
1391  iter++;
1392  while ((iter != fBufList.end()) &&
1393  exBlock.can_be_combined(*iter)) {
1394  exBlock.combine(*iter);
1395  iter = fBufList.erase(iter);
1396  }
1397  }
1398  else {
1399  // no overlap, newBlock lies between two existing list entries
1400  // new list entry has to be created
1401 
1402  fBufList.insert(iter, newBlock);
1403  }
1404  }
1405 
1406  it_assert(!fBufList.empty() && fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Write, internal error");
1407 
1408 }
1409 
1410 
1411 // The amount of data read from the buffer is given as parameter. It has
1412 // to be less than or equal to the size of the first block stored. This
1413 // mean the caller of Read should first check how much data is available
1414 // by calling FirstBlockSize.
1415 void TCP_Receiver_Buffer::read(unsigned noOfBytes)
1416 {
1417  it_assert(first_block_size() > 0, "TCP_Receiver_Buffer::Read, No block to read");
1418  it_assert(noOfBytes <= first_block_size(), "TCP_Receiver_Buffer::Read, submitted block size not valid");
1419 
1420 
1421  if (noOfBytes < first_block_size()) {
1422  fBufList.front().set_begin(fBufList.front().begin() + noOfBytes);
1423  }
1424  else { // first block will be read completely
1425  fBufList.pop_front();
1426  }
1427  fFirstByte += noOfBytes;
1428 
1429  it_assert(fBufList.empty() || fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Read, internal error");
1430 }
1431 
1432 
1433 // FirstBlockSize returns the size of the first block stored in the
1434 // buffer or 0 if the buffer is empty
1435 unsigned TCP_Receiver_Buffer::first_block_size() const
1436 {
1437  if (!fBufList.empty() && (fBufList.front().begin() == fFirstByte)) {
1438  return fBufList.front().length();
1439  }
1440  else {
1441  return 0;
1442  }
1443 }
1444 
1445 
1446 std::ostream & TCP_Receiver_Buffer::info(std::ostream &os, int detail) const
1447 {
1448  os << "receiver buffer information" << std::endl
1449  << "number of blocks: " << fBufList.size() << std::endl
1450  << "first byte stored: " << fFirstByte << std::endl
1451  << "last byte stored +1: " << last_byte() << std::endl
1452  << "next byte expected: " << next_expected() << std::endl;
1453 
1454  if (detail > 0) {
1455  os << "segments in receiver buffer:" << std::endl;
1456 
1457  typedef std::list<TCP_Segment>::const_iterator LI;
1458  for (LI i = fBufList.begin(); i != fBufList.end(); ++i) {
1459  const TCP_Segment & block = *i;
1460  os << ". segment: " << block << std::endl;
1461  }
1462 
1463  }
1464 
1465  return os;
1466 }
1467 
1468 
1469 // -------------------- TCP_Receiver ----------------------------------------
1470 TCP_Receiver::TCP_Receiver(int label) :
1471  fReceiverBuffer(),
1472  fLabel(label),
1473  fTCPIPHeaderLength(TCP_HEADERLENGTH),
1474  fMSS(TCP_RMSS),
1475  fBufferSize(TCP_BUFFERSIZE),
1476  fDelayedACK(TCP_DELAYEDACK),
1477  fACKDelayTime(TCP_ACKDELAYTIME),
1478  fSendPeriodicACKs(TCP_SENDPERIODICACKS),
1479  fStrictPeriodicACKs(TCP_STRICTPERIODICACKS),
1480  fPeriodicACKInterval(TCP_PERIODICACKINTERVAL),
1481  fACKSchedulingDelay(TCP_ACKSCHEDULINGDELAY),
1482  fACKOnBufferWrite(TCP_ACKBUFFERWRITE),
1483  fACKOnBufferRead(TCP_ACKBUFFERREAD),
1484  fMaxUserBlockSize(TCP_MAXUSERBLOCKSIZE),
1485  fMinUserBlockSize(TCP_MINUSERBLOCKSIZE),
1486  fUserBlockProcDelay(TCP_USERBLOCKPROCDELAY),
1487  fTrace(false),
1488  fDebug(false),
1489  fSessionId(0),
1490  fDelayedACKTimer(*this, &TCP_Receiver::DelayedACKHandler),
1491  fPeriodicACKTimer(*this, &TCP_Receiver::PeriodicACKHandler),
1492  fACKSchedulingTimer(*this, &TCP_Receiver::SendACKMessage),
1493  fWaitingACKMsg(0),
1494  fUserBlockProcTimer(*this, &TCP_Receiver::HandleEndOfProcessing)
1495 {
1496  fUserMessage = NULL;
1497 
1498 
1499  if (!fACKOnBufferRead && !fACKOnBufferWrite) {
1500  // throw(UL_CException("TCP_Receiver::TCP_Receiver",
1501  // "ACKs must be sent on buffer read or write or both"));
1502  }
1503 
1504  setup();
1505 
1506  tcp_receive.forward(this, &TCP_Receiver::ReceiveMessageFromNet);
1507  tcp_receive.set_name("TCP Receive");
1508  tcp_send_ack.set_name("TCP send ACK");
1509  tcp_new_data.set_name("TCP New Data");
1510  tcp_release.forward(this, &TCP_Receiver::release);
1511  tcp_release.set_name("TCP Release");
1512 
1513 }
1514 
1515 
1516 TCP_Receiver::~TCP_Receiver()
1517 {
1518  delete fWaitingACKMsg;
1519  delete fUserMessage;
1520 }
1521 
1522 
1523 void TCP_Receiver::set_debug(const bool enable_debug)
1524 {
1525  fDebug = enable_debug;
1526  tcp_send_ack.set_debug(enable_debug);
1527  tcp_new_data.set_debug();
1528 }
1529 
1530 void TCP_Receiver::set_debug(bool enable_debug, bool enable_signal_debug)
1531 {
1532  fDebug = enable_debug;
1533  tcp_send_ack.set_debug(enable_signal_debug);
1534  tcp_new_data.set_debug();
1535 }
1536 
1537 void TCP_Receiver::set_trace(const bool enable_trace)
1538 {
1539  fTrace = enable_trace;
1540 }
1541 
1542 
1543 
1544 void TCP_Receiver::setup()
1545 {
1546  fAdvRcvWnd = 0;
1547  fAdvRcvNxt = 0;
1548 
1549  if (fSendPeriodicACKs) {
1550  fPeriodicACKTimer.Set(fPeriodicACKInterval);
1551  }
1552 
1553  fReceiverBuffer.reset();
1554 
1555  received_seq_num_val.set_size(1000);
1556  received_seq_num_val.zeros();
1557  received_seq_num_time.set_size(1000);
1558  received_seq_num_time.zeros();
1559  received_seq_num_val(0) = 0;
1560  received_seq_num_time(0) = 0;
1561  received_seq_num_index = 1;
1562 }
1563 
1564 std::string TCP_Receiver::GenerateFilename()
1565 {
1566  time_t rawtime;
1567 #ifndef _MSC_VER
1568  struct tm *timeinfo;
1569  timeinfo = localtime(&rawtime);
1570 #else
1571  time(&rawtime);
1572  struct tm _timeinfo;
1573  struct tm *timeinfo = &_timeinfo;
1574  localtime_s(timeinfo, &rawtime);
1575 #endif
1576  std::ostringstream filename_stream;
1577  filename_stream << "trace_tcp_receiver_u" << fLabel
1578  << "_" << 1900 + timeinfo->tm_year
1579  << "_" << timeinfo->tm_mon
1580  << "_" << timeinfo->tm_mday
1581  << "__" << timeinfo->tm_hour
1582  << "_" << timeinfo->tm_min
1583  << "_" << timeinfo->tm_sec
1584  << "_.it";
1585  return filename_stream.str();
1586 }
1587 
1588 void TCP_Receiver::release(std::string file)
1589 {
1590  std::string filename;
1591  fSessionId++;
1592 
1593  if (fWaitingACKMsg != 0) {
1594  delete fWaitingACKMsg;
1595  fWaitingACKMsg = 0;
1596  }
1597  if (fUserMessage != 0) {
1598  delete fUserMessage;
1599  fUserMessage = 0;
1600  }
1601 
1602  fUserBlockProcTimer.Reset();
1603  fDelayedACKTimer.Reset();
1604  fPeriodicACKTimer.Reset();
1605  fACKSchedulingTimer.Reset();
1606 
1607  if (fTrace) {
1608  if (file == "")
1609  filename = GenerateFilename();
1610  else
1611  filename = file;
1612 
1613  save_trace(filename);
1614  }
1615 }
1616 
1617 
1618 void TCP_Receiver::ReceiveMessageFromNet(itpp::Packet *msg)
1619 {
1620  TCP_Packet & packet = (TCP_Packet &) * msg;
1621  if (packet.get_destination_port() == fLabel) {
1622  if (packet.get_session_id() == fSessionId) {
1623  ReceiveDataPacket(packet);
1624  }
1625  else {
1626  it_warning("Received a TCP packet with wrong SessionId");
1627  std::cout << "TCP_Receiver::ReceiveMessageFromNet, "
1628  << "fLabel= " << fLabel
1629  << "fSessionId= " << fSessionId << std::endl;
1630  std::cout << "packet=" << packet
1631  << ", next exp. = " << fReceiverBuffer.next_expected()
1632  << std::endl;
1633  exit(0);
1634  }
1635  }
1636  else {
1637  it_warning("Received a TCP packet with label");
1638  exit(0);
1639  }
1640 }
1641 
1642 
1643 void TCP_Receiver::ReceiveDataPacket(TCP_Packet &msg)
1644 {
1645  TCP_Segment segment = msg.get_segment();
1646 
1647  bool isOutOfOrder = (segment.begin() > fReceiverBuffer.next_expected()) ||
1648  (segment.end() <= fReceiverBuffer.next_expected());
1649 
1650  if (fDebug) {
1651  std::cout << "TCP_Receiver::ReceiveDataPacket receiver: " << fLabel << ": "
1652  << "receive msg: "
1653  << "t = " << Event_Queue::now()
1654  << ", next exp. = " << fReceiverBuffer.next_expected()
1655  << ", " << msg << std::endl;
1656  }
1657 
1658  if (fTrace) {
1659  TraceReceivedSeqNo(segment.end());
1660  }
1661 
1662  it_assert(segment.end() <= fReceiverBuffer.first_byte() + fBufferSize, "TCP_Receiver::ReceiveTCPPacket, packet exceeds window at ");
1663  it_assert(segment.begin() < segment.end(), "TCP_Receiver::ReceiveTCPPacket, silly packet received at ");
1664 
1665  fReceiverBuffer.write(segment);
1666 
1667  if (isOutOfOrder) {
1668  SendACK(true); // create dupack conditionless
1669  }
1670  else {
1671  if (fACKOnBufferWrite) {
1672  SendACK(false);
1673  }
1674  IndicateUserMessage();
1675  }
1676 
1677  delete &msg;
1678 }
1679 
1680 
1681 void TCP_Receiver::IndicateUserMessage()
1682 {
1683  if (fUserMessage == 0) {
1684  // receive a block
1685  unsigned noOfBytes = min(fReceiverBuffer.first_block_size(),
1686  fMaxUserBlockSize);
1687 
1688  if (fDebug) {
1689  std::cout << "TCP_Receiver::IndicateUserMessage "
1690  << "t = " << Event_Queue::now()
1691  << " noOfBytes = " << noOfBytes
1692  << " firstBlock = " << fReceiverBuffer.first_block_size()
1693  << std::endl;
1694  }
1695 
1696  if (noOfBytes >= fMinUserBlockSize) {
1697  fUserMessage = new Packet();
1698  fUserMessage->set_bit_size(8*noOfBytes);
1699  fUserBlockProcTimer.Set(fUserBlockProcDelay);
1700  }
1701  }
1702 }
1703 
1704 
1705 bool TCP_Receiver::is_user_message_available()
1706 {
1707  if (fUserMessage != 0) {
1708  return true;
1709  }
1710 
1711  unsigned noOfBytes = min(fReceiverBuffer.first_block_size(),
1712  fMaxUserBlockSize);
1713 
1714  if (noOfBytes >= fMinUserBlockSize) {
1715  fUserMessage = new Packet();
1716  fUserMessage->set_bit_size(8*noOfBytes);
1717  return true;
1718  }
1719  else {
1720  return false;
1721  }
1722 }
1723 
1724 
1725 itpp::Packet & TCP_Receiver::get_user_message()
1726 {
1727  it_assert(fUserMessage != 0, "TCP_Receiver::GetUserMessage, no message available");
1728  if (fDebug) {
1729  std::cout << "TCP_Receiver::GetUserMessage "
1730  << "receiver: " << fLabel << ": "
1731  << "read from buffer: "
1732  << "t = " << Event_Queue::now()
1733  << ", user msg length = " << (fUserMessage->bit_size() / 8)
1734  << ", first byte = " << fReceiverBuffer.first_byte()
1735  << ", first block size = " << fReceiverBuffer.first_block_size()
1736  << std::endl;
1737  }
1738 
1739  fReceiverBuffer.read(fUserMessage->bit_size() / 8);
1740  if (fACKOnBufferRead) {
1741  SendACK(false); // send acknowledgement
1742  }
1743 
1744  itpp::Packet & msg = *fUserMessage;
1745  fUserMessage = 0;
1746 
1747  if (fReceiverBuffer.first_block_size() > 0) {
1748  IndicateUserMessage();
1749  }
1750 
1751  return msg;
1752 }
1753 
1754 
1755 
1756 void TCP_Receiver::HandleEndOfProcessing(Ttype)
1757 {
1758  it_assert(fUserMessage != 0, "TCP_Receiver::HandleEndOfProcessing, no message available");
1759 
1760 
1761  tcp_new_data(fLabel);
1762 }
1763 
1764 
1765 void TCP_Receiver::DelayedACKHandler(Ttype)
1766 {
1767  if (fDebug) {
1768  std::cout << "TCP_Receiver::DelayedACKHandler "
1769  << "receiver " << fLabel
1770  << ": delACK TO: "
1771  << "t = " << Event_Queue::now() << std::endl;
1772  }
1773 
1774  SendACK(true);
1775 }
1776 
1777 
1778 void TCP_Receiver::PeriodicACKHandler(Ttype)
1779 {
1780  if (fDebug) {
1781  std::cout << "TCP_Receiver::PeriodicACKHandler"
1782  << "receiver " << fLabel
1783  << ": periodicACK TO: "
1784  << "t = " << Event_Queue::now() << std::endl;
1785  }
1786 
1787  SendACK(true);
1788 }
1789 
1790 
1791 void TCP_Receiver::SendACK(bool sendConditionless)
1792 {
1793  // sendConditionless is set
1794  // ... if packet was received out of order or
1795  // ... if delayed ACK timer has expired
1796 
1797  // Bei eingeschaltetem "delayed ACK" wird ein ACK nur
1798  // gesendet, wenn das Fenster um 2MSS oder 35% der
1799  // maximalen Fenstergroesse verschoben worden ist
1800  // ... oder nach delayed ACK Timeout
1801  // ... oder wenn es das ACK fur ein Out of Order Segment ist
1802  // ... oder (in der Realitat), wenn ich auch was zu senden habe.
1803 
1804  if (sendConditionless || !fDelayedACK ||
1805  (fReceiverBuffer.next_expected() - fAdvRcvNxt >= (int)(2 * fMSS)) ||
1806  (fReceiverBuffer.next_expected() - fAdvRcvNxt >=
1807  (int)(0.35 * fBufferSize))) {
1808  // Remark: RFC2581 recommends to acknowledge every second
1809  // packet conditionless (without setting this as a requirement)
1810  // in order to avoid excessive ack delays when the receiver MSS
1811  // is larger than the sender MSS. In this uni-directional
1812  // implementation, the receiver's MSS is not actively
1813  // used for sending but only for deciding when acknowledgments
1814  // have to be returned. Thus, the best solution to account for
1815  // RFC2581 is to set the receiver's MSS always equal to the
1816  // sender's MSS.
1817 
1818  // Receiver Silly Window Syndrome Avoidance:
1819 
1820  if (fAdvRcvNxt + fAdvRcvWnd + min(fBufferSize / 2, fMSS)
1821  <= fReceiverBuffer.first_byte() + fBufferSize) {
1822  // Die rechte Grenze des Empfangerfensters wird nur anders angezeigt
1823  // als beim letzten ACK, wenn sie sich seither um mindestens
1824  // min (BufferSize/ 2, MSS) geandert hat.
1825  fAdvRcvWnd = fBufferSize - fReceiverBuffer.first_block_size();
1826  }
1827  else {
1828  fAdvRcvWnd = fAdvRcvNxt + fAdvRcvWnd - fReceiverBuffer.next_expected();
1829  }
1830 
1831  fAdvRcvNxt = fReceiverBuffer.next_expected();
1832 
1833  if (fSendPeriodicACKs &&
1834  (!fStrictPeriodicACKs || !fPeriodicACKTimer.IsPending())) {
1835  fPeriodicACKTimer.Set(fPeriodicACKInterval);
1836  }
1837 
1838  if (fDelayedACK && fDelayedACKTimer.IsPending()) {
1839  fDelayedACKTimer.Reset();
1840  }
1841 
1842  ScheduleACKMessage();
1843  }
1844  else {
1845  if (!fDelayedACKTimer.IsPending()) {
1846  fDelayedACKTimer.Set(fACKDelayTime);
1847  if (fDebug) {
1848  std::cout << "TCP_Receiver::SendACK"
1849  << "receiver " << fLabel
1850  << ": set delACK timer: "
1851  << "t = " << Event_Queue::now() << std::endl;
1852  }
1853  }
1854  }
1855 }
1856 
1857 
1858 void TCP_Receiver::ScheduleACKMessage()
1859 {
1860  if (fWaitingACKMsg == 0) {
1861  fWaitingACKMsg = new TCP_Packet;
1862  }
1863 
1864  fWaitingACKMsg->set_ACK(fAdvRcvNxt);
1865  fWaitingACKMsg->set_wnd(fAdvRcvWnd);
1866  fWaitingACKMsg->set_session_id(fSessionId);
1867  fWaitingACKMsg->set_destination_port(fLabel);
1868  fWaitingACKMsg->set_source_port(fLabel);
1869  fWaitingACKMsg->set_bit_size(8*fTCPIPHeaderLength);
1870 
1871  if (fACKSchedulingDelay > 0) {
1872  if (!fACKSchedulingTimer.IsPending()) {
1873  fACKSchedulingTimer.Set(fACKSchedulingDelay);
1874  }
1875  }
1876  else {
1877  SendACKMessage(Event_Queue::now());
1878  }
1879 }
1880 
1881 
1882 void TCP_Receiver::SendACKMessage(Ttype)
1883 {
1884  it_assert(fWaitingACKMsg != 0, "TCP_Receiver::SendACKMessage, no ACK message waiting");
1885 
1886  if (fDebug) {
1887  std::cout << "TCP_Receiver::SendACKMessage Ack sent"
1888  << "receiver " << fLabel
1889  << ": send ACK: "
1890  << "t = " << Event_Queue::now()
1891  << ", " << (*fWaitingACKMsg)
1892  << " byte_size=" << fWaitingACKMsg->bit_size() / 8
1893  << " ptr=" << fWaitingACKMsg << std::endl;
1894  }
1895 
1896  tcp_send_ack(fWaitingACKMsg);
1897 
1898  fWaitingACKMsg = 0;
1899 }
1900 
1901 
1902 void TCP_Receiver::TraceReceivedSeqNo(const Sequence_Number &sn)
1903 {
1904  if (fDebug) {
1905  std::cout << "TCP_Receiver::TraceReceivedSeqNo "
1906  << "receiver " << fLabel
1907  << " t = " << Event_Queue::now()
1908  << " sn = " << sn << std::endl;
1909  }
1910  if (received_seq_num_index >= received_seq_num_time.size()) {
1911  received_seq_num_time.set_size(2*received_seq_num_time.size(), true);
1912  received_seq_num_val.set_size(2*received_seq_num_val.size(), true);
1913  }
1914  received_seq_num_val(received_seq_num_index) = sn.value();
1915  received_seq_num_time(received_seq_num_index) = Event_Queue::now();
1916  received_seq_num_index++;
1917 }
1918 
1919 
1920 void TCP_Receiver::save_trace(std::string filename)
1921 {
1922 
1923  received_seq_num_val.set_size(received_seq_num_index, true);
1924  received_seq_num_time.set_size(received_seq_num_index, true);
1925 
1926  if (fDebug) {
1927  std::cout << "received_seq_num_val" << received_seq_num_val << std::endl;
1928  std::cout << "received_seq_num_time" << received_seq_num_time << std::endl;
1929  std::cout << "received_seq_num_index" << received_seq_num_index << std::endl;
1930  std::cout << "TCP_Receiver::saving to file: " << filename << std::endl;
1931  }
1932 
1933  it_file ff2;
1934  ff2.open(filename);
1935 
1936  ff2 << Name("received_seq_num_val") << received_seq_num_val;
1937  ff2 << Name("received_seq_num_time") << received_seq_num_time;
1938  ff2 << Name("received_seq_num_index") << received_seq_num_index;
1939 
1940  ff2.flush();
1941  ff2.close();
1942 }
1943 
1944 
1945 } //namespace itpp
1946 
1947 #ifdef _MSC_VER
1948 #pragma warning(default:4355)
1949 #endif
1950 
SourceForge Logo

Generated on Sat Jul 6 2013 10:54:24 for IT++ by Doxygen 1.8.2