lab4

发布时间 2023-03-28 23:01:34作者: lyc2002

明确什么时候 active() 返回 false?

bool TCPConnection::active() const {
    // 流出错了,肯定返回 false
    if (_sender.stream_in().error() || _receiver.stream_out().error()) {
        return false;
    // 没出错,但入向流或出向流没结束,返回 true
    } else if (!inbound_finish() || !outbound_finish()) {
        return true;
    // 入向流和出向流都结束了,但需要 linger,返回 true
    } else if (_linger_after_streams_finish) {
        return true;
    // 入向流和出向流都结束了,也不需要 linger,说明正常关闭了,返回 false
    } else {
        return false;
    }
}

明确什么时候要将 _linger_after_streams_finish 置为 false 来表示被动关闭?

if (inbound_finish() && 
    !(_sender.stream_in().eof() && _sender.next_seqno_absolute() == _sender.stream_in().bytes_written() + 2)) {
    _linger_after_streams_finish = false;
}

TCPConnection 设计

class TCPConnection {
  private:
    ...
        
    size_t _time_since_last_segment_received{0};

    void send_segments(bool ack = false);
    void send_rst_segment();
    bool inbound_finish() const;
    bool outbound_finish() const;
    void unclean_shudown();
    void clean_shutdown();

  public:
   ...
};

私有成员函数实现

send_segments()

// 将 TCPSender 输出队列中的报文段送入 TCPConnection 的输出队列中,如果传入参数为 true,则必会发至少一个报文段
void TCPConnection::send_segments(bool ack) {
    if (_sender.segments_out().empty() && ack) {
        _sender.send_empty_segment();
    }

    while (!_sender.segments_out().empty()) {
        TCPSegment seg = _sender.segments_out().front();
        _sender.segments_out().pop();
        if (_receiver.ackno().has_value()) {
            seg.header().ack = true;
            seg.header().ackno = _receiver.ackno().value();
            seg.header().win = _receiver.window_size() > numeric_limits<uint16_t>::max()
                                   ? numeric_limits<uint16_t>::max()
                                   : _receiver.window_size();
        }
        segments_out().push(seg);
    }
}

send_rst_segment()

// 发送 rst 报文
void TCPConnection::send_rst_segment() {
    _sender.send_empty_segment();
    TCPSegment seg = _sender.segments_out().front();
    _sender.segments_out().pop();
    seg.header().rst = true;
    segments_out().push(seg);
    unclean_shudown();
}

inbound_finish()

// 入向流终止,即 TCPReceiver 处于 FIN_RECV 状态
bool TCPConnection::inbound_finish() const { return _receiver.stream_out().input_ended(); }

outbound_finish()

// 出向流终止,即 TCPSender 处于 FIN_ACKED 状态
bool TCPConnection::outbound_finish() const {
    return _sender.stream_in().eof() && (_sender.next_seqno_absolute() == _sender.stream_in().bytes_written() + 2) && (_sender.bytes_in_flight() == 0);
}

unclean_shudown()

// 不正常终止
void TCPConnection::unclean_shudown() {
    _sender.stream_in().set_error();
    _receiver.stream_out().set_error();
}

clean_shutdown()

// 正常终止
void TCPConnection::clean_shutdown() { _linger_after_streams_finish = false; }

简单函数实现

size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }

size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }

size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }

size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; }

void TCPConnection::connect() {
    _sender.fill_window();
    send_segments();
}

size_t TCPConnection::write(const string &data) {
    size_t len = _sender.stream_in().write(data);
    _sender.fill_window();
    send_segments();
    return len;
}

void TCPConnection::end_input_stream() {
    _sender.stream_in().end_input();
    _sender.fill_window();
    send_segments();
}

TCPConnection::~TCPConnection() {
    try {
        if (active()) {
            cerr << "Warning: Unclean shutdown of TCPConnection\n";
            send_rst_segment();
        }
    } catch (const exception &e) {
        std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
    }
}

segment_received() 设计

既要负责收,也要负责发

void TCPConnection::segment_received(const TCPSegment &seg) {
    // 接收到报文段,重置时间
    _time_since_last_segment_received = 0;

    // 如果是 rst 报文段,不正常关闭
    if (seg.header().rst) {
        unclean_shudown();
        return;
    }

    // 告知 _receiver
    _receiver.segment_received(seg);

    // 判断被动关闭
    if (inbound_finish() &&
        !(_sender.stream_in().eof() && _sender.next_seqno_absolute() == _sender.stream_in().bytes_written() + 2)) {
        _linger_after_streams_finish = false;
    }

    // 如果接收到的报文段 ack 被置位,告知 _sender
    if (seg.header().ack) {
        _sender.ack_received(seg.header().ackno, seg.header().win);
    }

    // 特判 keep-alive 报文
    if (_receiver.ackno().has_value() && (seg.length_in_sequence_space() == 0) &&
        seg.header().seqno == _receiver.ackno().value() - 1) {
        send_segments(true);
        return;
    }

    // 如果占据序列号,一定要回送 ack 报文
    if (seg.length_in_sequence_space() > 0) {
        _sender.fill_window();
        send_segments(true);
    
    // 如果不占据序列号,且发过 syn,尝试发外向流中的数据,若外向流中无数据也不强制回送 ack
    } else if (_sender.next_seqno_absolute() > 0) {
        _sender.fill_window();
        send_segments();
    }
}

tick() 设计

void TCPConnection::tick(const size_t ms_since_last_tick) {
    // 累计从接收到上一个报文段到现在的时间
    _time_since_last_segment_received += ms_since_last_tick;
    
    // 告知 _sender,可能会超时导致重发报文段
    _sender.tick(ms_since_last_tick);
    
    // 重传次数过多,发送 rst 报文段直接结束
    if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
        send_rst_segment();
        return;
    }
    
    // 发送可能会有的重发报文
    send_segments();

    // 如果处于 linger 状态,且入向流和出向流终止,并且到达指定时间可以正常关闭
    if (_linger_after_streams_finish && inbound_finish() && outbound_finish() &&
        time_since_last_segment_received() >= 10 * _cfg.rt_timeout) {
        clean_shutdown();
    }
}