明确什么时候 active() 返回 false?
bool TCPConnection::active() const {
// 流出错了,肯定返回 false
if (_sender.stream_in().error() || _receiver.stream_out().error()) {
return false;
// 没出错,但入向流或出向流没结束,返回 true
} else if (!inbound_finish() || !outbound_finish()) {
return true;
// 入向流和出向流都结束了,但需要 linger,返回 true
} else if (_linger_after_streams_finish) {
return true;
// 入向流和出向流都结束了,也不需要 linger,说明正常关闭了,返回 false
} else {
return false;
}
}
明确什么时候要将 _linger_after_streams_finish 置为 false 来表示被动关闭?
if (inbound_finish() &&
!(_sender.stream_in().eof() && _sender.next_seqno_absolute() == _sender.stream_in().bytes_written() + 2)) {
_linger_after_streams_finish = false;
}
TCPConnection 设计
class TCPConnection {
private:
...
size_t _time_since_last_segment_received{0};
void send_segments(bool ack = false);
void send_rst_segment();
bool inbound_finish() const;
bool outbound_finish() const;
void unclean_shudown();
void clean_shutdown();
public:
...
};
私有成员函数实现
send_segments()
// 将 TCPSender 输出队列中的报文段送入 TCPConnection 的输出队列中,如果传入参数为 true,则必会发至少一个报文段
void TCPConnection::send_segments(bool ack) {
if (_sender.segments_out().empty() && ack) {
_sender.send_empty_segment();
}
while (!_sender.segments_out().empty()) {
TCPSegment seg = _sender.segments_out().front();
_sender.segments_out().pop();
if (_receiver.ackno().has_value()) {
seg.header().ack = true;
seg.header().ackno = _receiver.ackno().value();
seg.header().win = _receiver.window_size() > numeric_limits<uint16_t>::max()
? numeric_limits<uint16_t>::max()
: _receiver.window_size();
}
segments_out().push(seg);
}
}
send_rst_segment()
// 发送 rst 报文
void TCPConnection::send_rst_segment() {
_sender.send_empty_segment();
TCPSegment seg = _sender.segments_out().front();
_sender.segments_out().pop();
seg.header().rst = true;
segments_out().push(seg);
unclean_shudown();
}
inbound_finish()
// 入向流终止,即 TCPReceiver 处于 FIN_RECV 状态
bool TCPConnection::inbound_finish() const { return _receiver.stream_out().input_ended(); }
outbound_finish()
// 出向流终止,即 TCPSender 处于 FIN_ACKED 状态
bool TCPConnection::outbound_finish() const {
return _sender.stream_in().eof() && (_sender.next_seqno_absolute() == _sender.stream_in().bytes_written() + 2) && (_sender.bytes_in_flight() == 0);
}
unclean_shudown()
// 不正常终止
void TCPConnection::unclean_shudown() {
_sender.stream_in().set_error();
_receiver.stream_out().set_error();
}
clean_shutdown()
// 正常终止
void TCPConnection::clean_shutdown() { _linger_after_streams_finish = false; }
简单函数实现
size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }
size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }
size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }
size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; }
void TCPConnection::connect() {
_sender.fill_window();
send_segments();
}
size_t TCPConnection::write(const string &data) {
size_t len = _sender.stream_in().write(data);
_sender.fill_window();
send_segments();
return len;
}
void TCPConnection::end_input_stream() {
_sender.stream_in().end_input();
_sender.fill_window();
send_segments();
}
TCPConnection::~TCPConnection() {
try {
if (active()) {
cerr << "Warning: Unclean shutdown of TCPConnection\n";
send_rst_segment();
}
} catch (const exception &e) {
std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
}
}
segment_received() 设计
既要负责收,也要负责发
void TCPConnection::segment_received(const TCPSegment &seg) {
// 接收到报文段,重置时间
_time_since_last_segment_received = 0;
// 如果是 rst 报文段,不正常关闭
if (seg.header().rst) {
unclean_shudown();
return;
}
// 告知 _receiver
_receiver.segment_received(seg);
// 判断被动关闭
if (inbound_finish() &&
!(_sender.stream_in().eof() && _sender.next_seqno_absolute() == _sender.stream_in().bytes_written() + 2)) {
_linger_after_streams_finish = false;
}
// 如果接收到的报文段 ack 被置位,告知 _sender
if (seg.header().ack) {
_sender.ack_received(seg.header().ackno, seg.header().win);
}
// 特判 keep-alive 报文
if (_receiver.ackno().has_value() && (seg.length_in_sequence_space() == 0) &&
seg.header().seqno == _receiver.ackno().value() - 1) {
send_segments(true);
return;
}
// 如果占据序列号,一定要回送 ack 报文
if (seg.length_in_sequence_space() > 0) {
_sender.fill_window();
send_segments(true);
// 如果不占据序列号,且发过 syn,尝试发外向流中的数据,若外向流中无数据也不强制回送 ack
} else if (_sender.next_seqno_absolute() > 0) {
_sender.fill_window();
send_segments();
}
}
tick() 设计
void TCPConnection::tick(const size_t ms_since_last_tick) {
// 累计从接收到上一个报文段到现在的时间
_time_since_last_segment_received += ms_since_last_tick;
// 告知 _sender,可能会超时导致重发报文段
_sender.tick(ms_since_last_tick);
// 重传次数过多,发送 rst 报文段直接结束
if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
send_rst_segment();
return;
}
// 发送可能会有的重发报文
send_segments();
// 如果处于 linger 状态,且入向流和出向流终止,并且到达指定时间可以正常关闭
if (_linger_after_streams_finish && inbound_finish() && outbound_finish() &&
time_since_last_segment_received() >= 10 * _cfg.rt_timeout) {
clean_shutdown();
}
}