CS144-lab3

发布时间 2023-10-11 18:46:07作者: cerwang

Checkpoint 3 Writeup

该lab主要实现TCP发送方,细节比较多,具有一定难度,编写时需要从整体上理清设计思路,然后再实现具体的函数。

Timer

由于要实现TCP中的超时重传功能,所以需要在发送方维护一个定时器,但不需要自己使用计时函数,因为文档里说明了所有对时间的了解都是通过tick函数得到的,每次能从函数参数中得到已经流逝的时间。
通过阅读文档中发送方的行为,判断计时器应该完成的任务有:
开始计时、停止计时、判断是否正在计时、判断是否超时、RTO翻倍、RTO重置
因此本地应该维护四个成员初始RTO、当前RTO、当前时间、是否正在计时
下面是计时器的实现

class Timer
{
  uint64_t initial_RTO_ms_;
  uint64_t RTO_ms;
  uint64_t time_ms { 0 };
  bool is_timing { false };

public:
  Timer( uint64_t ms )
    : initial_RTO_ms_( ms )
    , RTO_ms( ms )
  {}
  void start()
  {
    time_ms = 0;
    is_timing = true;
  }
  void stop() { is_timing = false; }
  bool timing() { return is_timing; }
  bool expired()
  {
    return time_ms >= RTO_ms;
  }
  void tick( uint64_t ms_since_last_tick )
  {
    if ( is_timing ) {
      time_ms += ms_since_last_tick;
    }
  }
  void set_RTO() { RTO_ms = initial_RTO_ms_; }
  void double_RTO() { RTO_ms = 2 * RTO_ms; }
};

TCPSender

包含成员变量

  Wrap32 isn_;
  bool SYN_sent { false };
  bool FIN_sent { false };
  uint64_t initial_RTO_ms_;
  /*
  ? why 0 wrong , 1 right
  first SYN may be back off retranmission, so not 0
  win_size = 0 we must send a 1 sized message
  */
  uint64_t win_size { 1 };
  uint64_t ackno { 0 };
  uint64_t sentno { 0 };
  uint64_t retransmissions_count { 0 };
  uint64_t outbytes_count { 0 };
  std::queue<TCPSenderMessage> out_standing {};
  std::queue<TCPSenderMessage> sending_queue {};
  Timer timer;

其中Wrap32 isn_是一个32位的无符号整数,用于记录初始序列号,SYN_sent和FIN_sent用于记录是否发送过SYN和FIN,initial_RTO_ms_用于记录初始RTO,win_size用于记录窗口大小,ackno用于记录期望收到的ack(即已经确认的数据下一个序列号),sentno用于记录要发送的下一个序列号,retransmissions_count用于记录重传次数,outbytes_count用于记录已发送但未收到ack的字节数,out_standing用于记录已发送但未被确认的数据(队列的特性保证了out_standing队首报文一定是未确认的最早报文),sending_queue用于存放待发送的的数据,timer为定时器。
有一个细节是窗口大小初始值不应该为0,因为当窗口大小为0时,发送方无法判断这是到底是由于有容量但是SYN丢失还是接收方确实没容量,导致超时不会增加重传计数并加倍。

  • optional TCPSender::maybe_send()
    从发送队列sending_queue中取出一个TCPSenderMessage发送,如果计时器未运行,重启定时器

  • void TCPSender::push( Reader& outbound_stream )
    从outbound_stream中循环读取数据,将数据封装成TCPSenderMessage放入sending_queue中

    • 决定是否发送SYN,这里无论是SYN还是FIN报文都可以携带数据
    • 读取尽可能多但不超过窗口大小的payload,注意SYN和FIN报文也算入窗口大小,算入序列
    • 决定是否发送FIN
    • 直接丢弃空报文(在Reader无数据可读且未关闭的情况下,上述步骤可能会产生空报文)
    • 增加sentno,并把报文放入发送队列sending_queue和未确认队列out_standing
  • void TCPSender::receive( const TCPReceiverMessage& msg )
    接收接收方的ACK报文

    • 更新win_size
    • 丢弃ackno为空的报文和包含不可能存在的ackno的报文
    • 更新本地ackno,由接收方的性质可知TCPReceiverMessage的ackno之前的数据已全部接收,故只要大于本地ackno即可更新,并从未完成队列中清除已确认的报文,同时重置计时器RTO,若未完成队列非空,则还要启动计时器。
  • TCPSenderMessage TCPSender::send_empty_message() const
    发送空报文,可以用于检测程序是否正确

  • void TCPSender::tick( const size_t ms_since_last_tick )
    更新计时器计数值,若计时器超时,则重传未确认的数据(将out_standing队首报文放入sending_queue),重启计时器。如果窗口大小非零还要重传次数加一,RTO加倍。

/* TCPSender constructor (uses a random ISN if none given) */
TCPSender::TCPSender( uint64_t initial_RTO_ms, optional<Wrap32> fixed_isn )
  : isn_( fixed_isn.value_or( Wrap32 { random_device()() } ) )
  , initial_RTO_ms_( initial_RTO_ms )
  , timer( initial_RTO_ms )
{}

uint64_t TCPSender::sequence_numbers_in_flight() const
{
  // Your code here.
  return outbytes_count;
}

uint64_t TCPSender::consecutive_retransmissions() const
{
  // Your code here.
  return retransmissions_count;
}

optional<TCPSenderMessage> TCPSender::maybe_send()
{
  // Your code here.
  if ( sending_queue.empty() ) {
    return std::nullopt;
  }
  if ( !timer.timing() ) {
    timer.start();
  }
  TCPSenderMessage tosend = sending_queue.front();
  sending_queue.pop();
  return tosend;
}

void TCPSender::push( Reader& outbound_stream )
{
  // Your code here.
  // using temp var to ensure no back off rto in tick
  uint64_t cur_win_size = ( win_size > 0 ? win_size : 1 );
  while ( outbytes_count < cur_win_size ) {
    /*
    why inside the loop? wrong outside(cannt fill window, in fact like a random size)
    message contains Buffer, Buffer contans a shared_ptr<string> ,we must push each ptr(different) to
    string(different), rather than put the same ptr to only one thing(may not same as changed)
    */
    TCPSenderMessage tosend;
    if ( !SYN_sent ) {
      SYN_sent = true;
      tosend.SYN = true;
      outbytes_count++;
    }
    tosend.seqno = Wrap32::wrap( sentno, isn_ );
    read( outbound_stream, min( cur_win_size - outbytes_count, TCPConfig::MAX_PAYLOAD_SIZE ), tosend.payload );
    outbytes_count += tosend.payload.size();

    if ( !FIN_sent && outbound_stream.is_finished() && outbytes_count < cur_win_size ) {
      FIN_sent = true;
      tosend.FIN = true;
      outbytes_count++;
    }

    if ( !tosend.sequence_length() ) {
      break;
    }

    sentno += tosend.sequence_length();
    sending_queue.push( tosend );
    out_standing.push( tosend );

    // !
    if ( FIN_sent || outbound_stream.bytes_buffered() == 0 ) {
      break;
    }
  }
}

TCPSenderMessage TCPSender::send_empty_message() const
{
  // Your code here.
  return TCPSenderMessage { Wrap32::wrap( sentno, isn_ ), false, {}, false };
}

void TCPSender::receive( const TCPReceiverMessage& msg )
{
  win_size = msg.window_size;
  if ( msg.ackno.has_value() ) {
    auto ackno_recv = msg.ackno.value().unwrap( isn_, sentno );
    if ( ackno_recv > sentno ) {
      return;
    }
    if ( ackno_recv > ackno ) {
      ackno = ackno_recv;
      while ( !out_standing.empty() ) {
        TCPSenderMessage& first_msg = out_standing.front();
        // only having been acked, the message can be removed from out_standing
        if ( first_msg.seqno.unwrap( isn_, sentno ) + first_msg.sequence_length() <= ackno ) {
          outbytes_count -= first_msg.sequence_length();
          out_standing.pop();
        } else {
          break;
        }
      }
      timer.set_RTO();
      retransmissions_count = 0;
      if ( out_standing.empty() ) 
        timer.stop();
      else 
        timer.start();
    }
  }
}

void TCPSender::tick( const size_t ms_since_last_tick )
{
  // Your code here.
  timer.tick( ms_since_last_tick );
  if ( timer.expired() ) {
    sending_queue.push( out_standing.front() );
    if ( win_size != 0 ) {
      retransmissions_count++;
      timer.double_RTO();
    }
    timer.start();
  }
}