lab3

发布时间 2023-03-22 21:10:58作者: lyc2002

明确函数作用

  • fill_window() 只关心 _segments_out 和 _outstanding_segments 这两个数据结构,它要做的就是构造报文并将其添加到这两个数据结构中,添加到 _segments_out 中就表示发送了,添加到 _outstanding_segments 中就表示这个报文段是未确认的

  • ack_received() 只关心 _outstanding_segments,它要做的是根据传入的 ackno 从 _outstanding_segments 中丢掉已经确认的报文段

  • tick() 只关心 _outstanding_segments,它要做的是累计时间,若超时的话,重传 _outstanding_segments 中最早的报文段

  • bytes_in_flight() 指明了 _outstanding_segments 占据的序列号长度

TCPSender 设计

class TCPSender {
  private:
    ...
        
    // 指明当前 TCPSender 所处状态,方便程序编写,_st 初始化为 CLOSED 状态
    enum class SenderState { CLOSED = 0, SYN_SENT, SYN_ACKED, FIN_SENT, FIN_ACKED, ERROR };
    SenderState _st{SenderState::CLOSED};
    
    // 使用队列来存储未解决的报文段
    std::queue<TCPSegment> _outstanding_segments{};
    
    // _next_ackno 表示 TCPSender 希望被 ack 的第一个绝对序列号,也就是说 _next_seqno - _next_ackno 即为 bytes_in_flight()
    uint64_t _next_ackno{0};  
    
    // 对端窗口大小,初始化为 1
    size_t _window_size{1};
    
    // 重传计时器
    Timer _timer;
    
    // 私有成员函数,发送一个新报文段,fill_window() 使用
    void send_new_segment(const TCPSegment &seg);

  public:
    ...
};

Timer 设计

具体逻辑

只有一个计时器,该计时器记录的时间是最早加入 _outstanding_segments 中的那个报文段的时间,连续重传的次数也是重传该报文段的次数,故 Timer 类设计如下

class Timer {
  private:
    bool _state; // 指示计时器当前状态,true 是启动,false 是关闭
    unsigned int _initial_retransmission_timeout; // 初始 rto
    unsigned int _cur_retransmission_timeout; // 当前 rto
    unsigned int _time; // 累计时间
    unsigned int _consecutive_retransmissions; // 连续重传次数

  public:
    Timer(unsigned int retx_timeout)
        : _state(false)
        , _initial_retransmission_timeout(retx_timeout)
        , _cur_retransmission_timeout(retx_timeout)
        , _time(0)
        , _consecutive_retransmissions(0) {}
    
    void run() {
        _state = true;
        _time = 0;
    }
    void stop() { _state = false; }
    bool state() const { return _state; }
    
    bool expired(const size_t ms_since_last_tick) {
        _time += ms_since_last_tick;
        return _time >= _cur_retransmission_timeout;
    }
    
    void double_rto() { _cur_retransmission_timeout *= 2; }
    void reset_rto() { _cur_retransmission_timeout = _initial_retransmission_timeout; }
    
    unsigned int consecutive_retx() const { return _consecutive_retransmissions; }
    void inc_consecutive_retx() { _consecutive_retransmissions++; }
    void reset_consecutive_retx() { _consecutive_retransmissions = 0; }
};

Timer 如何使用

在 fill_window() 中

在 ack_received() 中


在 tick() 中

fill_window() 设计

发送一个新报文段

void TCPSender::send_new_segment(const TCPSegment &seg) {
    // 对于不占据序列长度的报文段不予发送
    if (seg.length_in_sequence_space() > 0) {
        segments_out().push(seg);
        _outstanding_segments.push(seg);
        // 发送了新报文段,就要更新 _next_seqno
        _next_seqno += seg.length_in_sequence_space();
        // 如果计时器未打开,就打开它
        if (_timer.state() == false) {
            _timer.run();
        }
    }
}

具体逻辑

对于 fill_window() 函数

  • 在 CLOSED 状态下只发送不带有 payload 的 SYN 报文,然后到 SYN_SENT 状态
  • 在 SYN_ACKED 状态下发送带有 payload 的报文,如果 stream_in().eof() 且对端窗口仍有空间就发送 FIN 报文,然后到 FIN_SENT 状态
  • 其他状态不需要操作
void TCPSender::fill_window() {
    switch (_st) {
        case SenderState::CLOSED: {
            TCPSegment seg;
            seg.header().seqno = next_seqno();
            seg.header().syn = true;

            send_new_segment(seg);
            
            _st = SenderState::SYN_SENT;

            break;
        }
        case SenderState::SYN_ACKED: {
            // 如果对端窗口大小为 0 的话看作是 1
            size_t sequence_space = _window_size == 0 ? 1 : _window_size;
            
            // 只要有空间就尝试从字节流中读取数据,打包成新报文发送
            while (bytes_in_flight() < sequence_space) {
                TCPSegment seg;
                
                seg.header().seqno = next_seqno();
                size_t len = min(TCPConfig::MAX_PAYLOAD_SIZE, sequence_space - bytes_in_flight());
                seg.payload() = Buffer(stream_in().read(len));

                if (stream_in().eof() && bytes_in_flight() + seg.length_in_sequence_space() < sequence_space) {
                    seg.header().fin = true;
                    _st = SenderState::FIN_SENT;
                }
                
                send_new_segment(seg);
                
                // 如果已经发送了 FIN 报文或构造的报文所占据的序列长度为 0 就要跳出
                if (_st == SenderState::FIN_SENT || seg.length_in_sequence_space() == 0) {
                    break;
                }
            }

            break;
        }
        case SenderState::FIN_ACKED: {
            break;
        }
        case SenderState::ERROR: {
            break;
        }
        default: {
            break;
        }
    }
}

ack_received() 设计

对于 ack_received() 函数

  • 在 SYN_SENT 状态下,如果 SYN 报文段被确认,就到 SYN_ACKED 状态
  • 在 FIN_SENT 状态下,如果 FIN 报文段被确认,就到 FIN_ACKED 状态
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    uint64_t next_ackno = unwrap(ackno, _isn, _next_ackno); // receiver 已经确认的绝对序列号 + 1
    _window_size = window_size;
    
    // 如果 next_ackno > next_seqno_absolute(),显然出错,因为 Sender 还没发
    // 如果 next_ackno <= _next_ackno,确认不了任何 _outstanding_segment
    if (next_ackno > next_seqno_absolute() || next_ackno <= _next_ackno) return; 

    while (!_outstanding_segments.empty()) {
        TCPSegment tmp = _outstanding_segments.front();
        // 要确保最早的报文段整个能被确认,才 pop
        if (_next_ackno + tmp.length_in_sequence_space() <= next_ackno) {
            _outstanding_segments.pop();

            _timer.reset_rto();
            _timer.reset_consecutive_retx();

            _next_ackno += tmp.length_in_sequence_space();

            if (tmp.header().syn) {
                _st = SenderState::SYN_ACKED;
            } else if (tmp.header().fin) {
                _st = SenderState::FIN_ACKED;
            }
        } else {
            break;
        }
    }

    if (_outstanding_segments.empty()) {
        _timer.stop();
    } else {
        _timer.run();
    }
}

tick() 设计

对于 tick() 函数

累计时间,如果超时就重传

void TCPSender::tick(const size_t ms_since_last_tick) {
    if (_timer.state() && _timer.expired(ms_since_last_tick)) {
        TCPSegment tmp = _outstanding_segments.front();
        segments_out().push(tmp);
        if (_window_size > 0) {
            _timer.inc_consecutive_retx();
            _timer.double_rto();
        }
        _timer.run();
    }
}

其他函数

uint64_t TCPSender::bytes_in_flight() const { return _next_seqno - _next_ackno; }

unsigned int TCPSender::consecutive_retransmissions() const { return _timer.consecutive_retx(); }

void TCPSender::send_empty_segment() {
    TCPSegment seg;
    seg.header().seqno = next_seqno();
    segments_out().push(seg);
}