简介
本次实验要求实现的是一个流重组器(Stream Reassembler)。在 lab0 中我们实现了顺序流的结构 ByteStream,在实际 TCP 连接中,两端的 socket 中各有一个 ByteStream,用来存储从对方接收到的字节流,供主机进程读取数据。在真实的 TCP 通信中,这些字节流的到来是乱序的,另外还可能会发生重传、丢失、重叠等情况。因此本实验设计的流重组器功能就是将这些杂乱的数据重新整理成顺序排列的信息,确保存入 ByteStream 的是正确的字节流。
实现
ByteStream 的结构大致如下所示,第一段是已被用户进程读出的字节流,第二段是存储在 ByteStream 中还未被读出的,第三段是 ByteStream 中目前最多还可以写入的。第二三段的长度和就是 ByteStream 的总容量。若收到的信息全部位于第一、第二或第四段,则直接丢弃。若收到的信息有一部分位于第四段,则将超出的部分截断。将收到的信息中 first unassembled 之前的数据写入 ByteStream。接着,检查缓冲区中的消息串是否可以写入 ByteStream,若可以写入,则写入,并将其从缓存区移出。
ByteStream
若消息完全落在 first unassembled 之后,则将其插入缓存区。缓存区是一个 set 集合,新插入的数据应该要顺序排放,同时不重叠,不重复,需要在插入时进行处理。
关键代码实现如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 void StreamReassembler::insert_substr_waiting (const struct Node& node) { if (_substr_waiting.empty()) { _substr_waiting.insert(node); _bytes_unassembled += node.data.size(); return ; } struct Node tmp = node; auto it = _substr_waiting.lower_bound(node); size_t x=tmp.index, sz = tmp.data.size(); if (it != _substr_waiting.begin()) { it--; if (x < it->index + it->data.size()) { if (x+sz >= it->index + it->data.size()) return ; tmp.data = it->data + tmp.data.substr(it->index+it->data.size()-x); tmp.index = it->index; x = tmp.index; sz = tmp.data.size(); _substr_waiting.erase(it++); _bytes_unassembled -= it->data.size(); } else { it++; } } while (it != _substr_waiting.end() && x + sz > it->index) { if (x >= it->index && x + sz < it->index + it->data.size()) return ; if (x + sz < it->index + it->data.size()) { tmp.data += it->data.substr(x + sz - it->index); } _bytes_unassembled -= it->data.size(); _substr_waiting.erase(it++); } _substr_waiting.insert(tmp); _bytes_unassembled += tmp.data.size(); } void StreamReassembler::push_substring (const string &data, const size_t index, const bool eof) { struct Node node { data , index}; size_t first_unread = _output.bytes_read(); size_t first_unassembled = _output.bytes_written(); size_t first_unacceptance = first_unread + _capacity; if (index + data.size() < first_unassembled || index >= first_unacceptance) { return ; } if (index +data.size() > first_unacceptance) { node.data = node.data.substr(0 , first_unacceptance-index); } if (index <= first_unassembled) { _output.write(node.data.substr(first_unacceptance-index)); auto it = _substr_waiting.begin(); while (it->index <= _output.bytes_written()) { if (it->index + it->data.size() > node.index + node.data.size()) { _output.write(it->data.substr(_output.bytes_written()-it->index)); } _bytes_unassembled -= it->data.size(); _substr_waiting.erase(it); it++; } } else { insert_substr_waiting(node); } if (eof) { _flag_eof = true ; _pos_eof = index + data.size(); } if (_flag_eof && _output.bytes_written() == _pos_eof) { _output.end_input(); } } size_t StreamReassembler::unassembled_bytes () const { return _bytes_unassembled; }bool StreamReassembler::empty () const { return _bytes_unassembled==0 ; }