Minor code corrections

This commit is contained in:
sfan5 2024-01-05 11:03:19 +01:00
parent 84d4647329
commit eeb873b23c
3 changed files with 36 additions and 36 deletions

@ -194,7 +194,7 @@ u32 ReliablePacketBuffer::size()
return m_list.size(); return m_list.size();
} }
RPBSearchResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum) ReliablePacketBuffer::FindResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum)
{ {
for (auto it = m_list.begin(); it != m_list.end(); ++it) { for (auto it = m_list.begin(); it != m_list.end(); ++it) {
if ((*it)->getSeqnum() == seqnum) if ((*it)->getSeqnum() == seqnum)
@ -232,7 +232,7 @@ BufferedPacketPtr ReliablePacketBuffer::popFirst()
BufferedPacketPtr ReliablePacketBuffer::popSeqnum(u16 seqnum) BufferedPacketPtr ReliablePacketBuffer::popSeqnum(u16 seqnum)
{ {
MutexAutoLock listlock(m_list_mutex); MutexAutoLock listlock(m_list_mutex);
RPBSearchResult r = findPacketNoLock(seqnum); auto r = findPacketNoLock(seqnum);
if (r == m_list.end()) { if (r == m_list.end()) {
LOG(dout_con<<"Sequence number: " << seqnum LOG(dout_con<<"Sequence number: " << seqnum
<< " not found in reliable buffer"<<std::endl); << " not found in reliable buffer"<<std::endl);
@ -325,15 +325,21 @@ void ReliablePacketBuffer::insert(BufferedPacketPtr &p_ptr, u16 next_expected)
) )
{ {
/* if this happens your maximum transfer window may be to big */ /* if this happens your maximum transfer window may be to big */
fprintf(stderr, char buf[200];
snprintf(buf, sizeof(buf),
"Duplicated seqnum %d non matching packet detected:\n", "Duplicated seqnum %d non matching packet detected:\n",
seqnum); seqnum);
fprintf(stderr, "Old: seqnum: %05d size: %04zu, address: %s\n", warningstream << buf;
snprintf(buf, sizeof(buf),
"Old: seqnum: %05d size: %04zu, address: %s\n",
i->getSeqnum(), i->size(), i->getSeqnum(), i->size(),
i->address.serializeString().c_str()); i->address.serializeString().c_str());
fprintf(stderr, "New: seqnum: %05d size: %04zu, address: %s\n", warningstream << buf;
snprintf(buf, sizeof(buf),
"New: seqnum: %05d size: %04zu, address: %s\n",
p.getSeqnum(), p.size(), p.getSeqnum(), p.size(),
p.address.serializeString().c_str()); p.address.serializeString().c_str());
warningstream << buf << std::flush;
throw IncomingDataCorruption("duplicated packet isn't same as original one"); throw IncomingDataCorruption("duplicated packet isn't same as original one");
} }
} }
@ -357,11 +363,11 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime)
} }
} }
std::list<ConstSharedPtr<BufferedPacket>> std::vector<ConstSharedPtr<BufferedPacket>>
ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets) ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets)
{ {
MutexAutoLock listlock(m_list_mutex); MutexAutoLock listlock(m_list_mutex);
std::list<ConstSharedPtr<BufferedPacket>> timed_outs; std::vector<ConstSharedPtr<BufferedPacket>> timed_outs;
for (auto &packet : m_list) { for (auto &packet : m_list) {
// resend time scales exponentially with each cycle // resend time scales exponentially with each cycle
const float pkt_timeout = timeout * powf(RESEND_SCALE_BASE, packet->resend_count); const float pkt_timeout = timeout * powf(RESEND_SCALE_BASE, packet->resend_count);
@ -504,9 +510,9 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacketPtr &p_ptr, bool reli
void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
{ {
MutexAutoLock listlock(m_map_mutex);
std::vector<u16> remove_queue; std::vector<u16> remove_queue;
{ {
MutexAutoLock listlock(m_map_mutex);
for (const auto &i : m_buf) { for (const auto &i : m_buf) {
IncomingSplitPacket *p = i.second; IncomingSplitPacket *p = i.second;
// Reliable ones are not removed by timeout // Reliable ones are not removed by timeout
@ -518,10 +524,10 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
} }
} }
for (u16 j : remove_queue) { for (u16 j : remove_queue) {
MutexAutoLock listlock(m_map_mutex);
LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl); LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl);
delete m_buf[j]; auto it = m_buf.find(j);
m_buf.erase(j); delete it->second;
m_buf.erase(it);
} }
} }
@ -967,7 +973,7 @@ void Peer::Drop()
delete this; delete this;
} }
UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) : UDPPeer::UDPPeer(session_t a_id, Address a_address, Connection* connection) :
Peer(a_address,a_id,connection) Peer(a_address,a_id,connection)
{ {
for (Channel &channel : channels) for (Channel &channel : channels)
@ -1134,8 +1140,6 @@ bool UDPPeer::processReliableSendCommand(
FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error"); FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
} }
// DO NOT REMOVE n_queued! It avoids a deadlock of async locked
// 'log_message_mutex' and 'm_list_mutex'.
u32 n_queued = chan.outgoing_reliables_sent.size(); u32 n_queued = chan.outgoing_reliables_sent.size();
LOG(dout_con<<m_connection->getDesc() LOG(dout_con<<m_connection->getDesc()
@ -1339,7 +1343,7 @@ PeerHelper Connection::getPeerNoEx(session_t peer_id)
} }
/* find peer_id for address */ /* find peer_id for address */
u16 Connection::lookupPeer(Address& sender) session_t Connection::lookupPeer(const Address& sender)
{ {
MutexAutoLock peerlock(m_peers_mutex); MutexAutoLock peerlock(m_peers_mutex);
std::map<u16, Peer*>::iterator j; std::map<u16, Peer*>::iterator j;
@ -1559,7 +1563,7 @@ float Connection::getLocalStat(rate_stat_type type)
return retval; return retval;
} }
u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) session_t Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
{ {
// Somebody wants to make a new connection // Somebody wants to make a new connection

@ -250,8 +250,6 @@ private:
for fast access to the smallest one. for fast access to the smallest one.
*/ */
typedef std::list<BufferedPacketPtr>::iterator RPBSearchResult;
class ReliablePacketBuffer class ReliablePacketBuffer
{ {
public: public:
@ -264,7 +262,7 @@ public:
void insert(BufferedPacketPtr &p_ptr, u16 next_expected); void insert(BufferedPacketPtr &p_ptr, u16 next_expected);
void incrementTimeouts(float dtime); void incrementTimeouts(float dtime);
std::list<ConstSharedPtr<BufferedPacket>> getTimedOuts(float timeout, u32 max_packets); std::vector<ConstSharedPtr<BufferedPacket>> getTimedOuts(float timeout, u32 max_packets);
void print(); void print();
bool empty(); bool empty();
@ -272,7 +270,9 @@ public:
private: private:
RPBSearchResult findPacketNoLock(u16 seqnum); typedef std::list<BufferedPacketPtr>::iterator FindResult;
FindResult findPacketNoLock(u16 seqnum);
std::list<BufferedPacketPtr> m_list; std::list<BufferedPacketPtr> m_list;
@ -743,9 +743,9 @@ public:
protected: protected:
PeerHelper getPeerNoEx(session_t peer_id); PeerHelper getPeerNoEx(session_t peer_id);
u16 lookupPeer(Address& sender); session_t lookupPeer(const Address& sender);
u16 createPeer(Address& sender, MTProtocols protocol, int fd); session_t createPeer(Address& sender, MTProtocols protocol, int fd);
UDPPeer* createServerPeer(Address& sender); UDPPeer* createServerPeer(Address& sender);
bool deletePeer(session_t peer_id, bool timeout); bool deletePeer(session_t peer_id, bool timeout);

@ -44,8 +44,6 @@ namespace con
// TODO: Clean this up. // TODO: Clean this up.
#define LOG(a) a #define LOG(a) a
#define WINDOW_SIZE 5
static inline session_t readPeerId(const u8 *packetdata) static inline session_t readPeerId(const u8 *packetdata)
{ {
return readU16(&packetdata[4]); return readU16(&packetdata[4]);
@ -519,9 +517,9 @@ void ConnectionSendThread::serve(Address bind_address)
void ConnectionSendThread::connect(Address address) void ConnectionSendThread::connect(Address address)
{ {
LOG(dout_con << m_connection->getDesc() << " connecting to " dout_con << m_connection->getDesc() << " connecting to ";
<< address.serializeString() address.print(dout_con);
<< ":" << address.getPort() << std::endl); dout_con << std::endl;
UDPPeer *peer = m_connection->createServerPeer(address); UDPPeer *peer = m_connection->createServerPeer(address);
@ -529,11 +527,10 @@ void ConnectionSendThread::connect(Address address)
m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address)); m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
Address bind_addr; Address bind_addr;
if (address.isIPv6()) if (address.isIPv6())
bind_addr.setAddress((IPv6AddressBytes *) NULL); bind_addr.setAddress(static_cast<IPv6AddressBytes*>(nullptr));
else else
bind_addr.setAddress(0, 0, 0, 0); bind_addr.setAddress(static_cast<u32>(0));
m_connection->m_udpSocket.Bind(bind_addr); m_connection->m_udpSocket.Bind(bind_addr);
@ -951,9 +948,9 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
session_t peer_id = readPeerId(*packetdata); session_t peer_id = readPeerId(*packetdata);
u8 channelnum = readChannel(*packetdata); u8 channelnum = readChannel(*packetdata);
if (channelnum > CHANNEL_COUNT - 1) { if (channelnum >= CHANNEL_COUNT) {
LOG(derr_con << m_connection->getDesc() LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid channel " << (u32)channelnum << std::endl); << "Receive(): Invalid channel " << (int)channelnum << std::endl);
return; return;
} }
@ -1008,15 +1005,14 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
peer->ResetTimeout(); peer->ResetTimeout();
} }
Channel *channel = nullptr; auto *udpPeer = dynamic_cast<UDPPeer *>(&peer);
if (dynamic_cast<UDPPeer *>(&peer)) { if (!udpPeer) {
channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
} else {
LOG(derr_con << m_connection->getDesc() LOG(derr_con << m_connection->getDesc()
<< "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!" << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
" Ignoring." << std::endl); " Ignoring." << std::endl);
return; return;
} }
Channel *channel = &udpPeer->channels[channelnum];
channel->UpdateBytesReceived(received_size); channel->UpdateBytesReceived(received_size);