minetest/src/connection.h
2012-11-29 22:13:22 +02:00

641 lines
14 KiB
C++

/*
Minetest-c55
Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef CONNECTION_HEADER
#define CONNECTION_HEADER
#include "irrlichttypes_bloated.h"
#include "socket.h"
#include "exceptions.h"
#include "constants.h"
#include "util/pointer.h"
#include "util/container.h"
#include "util/thread.h"
#include <iostream>
#include <fstream>
namespace con
{
/*
Exceptions
*/
// Exception type NotFoundException.
// The message s is handed unchanged to the BaseException constructor.
class NotFoundException : public BaseException
{
public:
	NotFoundException(const char *s) : BaseException(s)
	{
	}
};
// Exception type PeerNotFoundException.
// The message s is handed unchanged to the BaseException constructor.
class PeerNotFoundException : public BaseException
{
public:
	PeerNotFoundException(const char *s) : BaseException(s)
	{
	}
};
// Exception type ConnectionException.
// The message s is handed unchanged to the BaseException constructor.
class ConnectionException : public BaseException
{
public:
	ConnectionException(const char *s) : BaseException(s)
	{
	}
};
// Exception type ConnectionBindFailed.
// The message s is handed unchanged to the BaseException constructor.
class ConnectionBindFailed : public BaseException
{
public:
	ConnectionBindFailed(const char *s) : BaseException(s)
	{
	}
};
/*class ThrottlingException : public BaseException
{
public:
ThrottlingException(const char *s):
BaseException(s)
{}
};*/
// Exception type InvalidIncomingDataException.
// The message s is handed unchanged to the BaseException constructor.
class InvalidIncomingDataException : public BaseException
{
public:
	InvalidIncomingDataException(const char *s) : BaseException(s)
	{
	}
};
// Exception type InvalidOutgoingDataException.
// The message s is handed unchanged to the BaseException constructor.
class InvalidOutgoingDataException : public BaseException
{
public:
	InvalidOutgoingDataException(const char *s) : BaseException(s)
	{
	}
};
// Exception type NoIncomingDataException.
// The message s is handed unchanged to the BaseException constructor.
class NoIncomingDataException : public BaseException
{
public:
	NoIncomingDataException(const char *s) : BaseException(s)
	{
	}
};
// Exception type ProcessedSilentlyException.
// The message s is handed unchanged to the BaseException constructor.
class ProcessedSilentlyException : public BaseException
{
public:
	ProcessedSilentlyException(const char *s) : BaseException(s)
	{
	}
};
// Largest value a 16-bit sequence number can take
#define SEQNUM_MAX 65535

/*
	Returns true if sequence number `higher` comes after `lower` in the
	wrapping 16-bit sequence space, false otherwise (including when the
	two are equal).

	Two numbers are compared by their distance: when the numeric gap is
	more than half of the sequence space, the numerically smaller one is
	considered to have wrapped around and therefore to be the later one.

	The comparison is antisymmetric: for a != b exactly one of
	seqnum_higher(a, b) and seqnum_higher(b, a) is true.
*/
inline bool seqnum_higher(u16 higher, u16 lower)
{
	if(higher > lower){
		// Numerically larger: really ahead unless the gap is so big that
		// `lower` must be a wrapped-around, newer number
		return (higher - lower) <= SEQNUM_MAX/2;
	}
	// Numerically smaller or equal: ahead only if `higher` has wrapped around
	return (lower - higher) > SEQNUM_MAX/2;
}
/*
	A packet's bytes together with its timing counters and peer address.
	Both constructors start the two timers at zero; address is
	default-constructed.
*/
struct BufferedPacket
{
	// Builds the data buffer from a pointer and a byte count
	BufferedPacket(u8 *a_data, u32 a_size):
		data(a_data, a_size),
		time(0.0),
		totaltime(0.0)
	{
	}

	// Builds the data buffer from a byte count only
	BufferedPacket(u32 a_size):
		data(a_size),
		time(0.0),
		totaltime(0.0)
	{
	}

	// Data of the packet, including headers
	SharedBuffer<u8> data;
	// Seconds from buffering the packet or re-sending
	float time;
	// Seconds from buffering the packet
	float totaltime;
	// Sender or destination
	Address address;
};
/*
	Adds the base headers to the data and makes a packet out of it.
	Two overloads: payload given as pointer + size, or as a SharedBuffer.
*/
BufferedPacket makePacket(
		Address &address,
		u8 *data,
		u32 datasize,
		u32 protocol_id,
		u16 sender_peer_id,
		u8 channel);
BufferedPacket makePacket(
		Address &address,
		SharedBuffer<u8> &data,
		u32 protocol_id,
		u16 sender_peer_id,
		u8 channel);
/*
	Adds the TYPE_ORIGINAL header to the data and returns the result.
*/
SharedBuffer<u8> makeOriginalPacket(SharedBuffer<u8> data);
/*
	Splits data into chunks and adds a TYPE_SPLIT header to each of them.
	Returns the list of resulting chunks.
*/
core::list<SharedBuffer<u8> > makeSplitPacket(SharedBuffer<u8> data,
		u32 chunksize_max, u16 seqnum);
/*
	Depending on size, makes a TYPE_ORIGINAL or a TYPE_SPLIT packet.
	split_seqnum is incremented if a split packet is made.
*/
core::list<SharedBuffer<u8> > makeAutoSplitPacket(SharedBuffer<u8> data,
		u32 chunksize_max, u16 &split_seqnum);
/*
	Adds the TYPE_RELIABLE header (carrying seqnum) to the data and
	returns the result.
*/
SharedBuffer<u8> makeReliablePacket(SharedBuffer<u8> data, u16 seqnum);
struct IncomingSplitPacket
{
IncomingSplitPacket()
{
time = 0.0;
reliable = false;
}
// Key is chunk number, value is data without headers
core::map<u16, SharedBuffer<u8> > chunks;
u32 chunk_count;
float time; // Seconds from adding
bool reliable; // If true, isn't deleted on timeout
bool allReceived()
{
return (chunks.size() == chunk_count);
}
};
/*
=== NOTES ===
A packet is sent through a channel to a peer with a basic header:
	TODO: Should we have a receiver_peer_id also?
	Header (7 bytes):
	[0] u32 protocol_id
	[4] u16 sender_peer_id
	[6] u8 channel
sender_peer_id:
	Unique to each peer.
	Value 0 is reserved for making new connections.
	Value 1 is reserved for the server.
channel:
	The lower the number, the higher the priority is.
	Only channels 0, 1 and 2 exist.
*/
// Size in bytes of the basic header described above (4 + 2 + 1)
#define BASE_HEADER_SIZE 7
// sender_peer_id value reserved for making new connections
#define PEER_ID_INEXISTENT 0
// sender_peer_id value reserved for the server
#define PEER_ID_SERVER 1
// Number of channels; valid channel numbers are 0, 1 and 2
#define CHANNEL_COUNT 3
/*
Packet types:
CONTROL: This is a packet used by the protocol.
- When this is processed, nothing is handed to the user.
	Header (2 bytes):
	[0] u8 type
	[1] u8 controltype
controltype and data description:
	CONTROLTYPE_ACK
		[2] u16 seqnum
	CONTROLTYPE_SET_PEER_ID
		[2] u16 peer_id_new
	CONTROLTYPE_PING
	- There is no actual reply, but this can be sent in a reliable
	  packet to get a reply
	CONTROLTYPE_DISCO
	- NOTE(review): no payload is documented here; presumably a
	  disconnect notification - confirm in connection.cpp
*/
// Value of the type byte for CONTROL packets
#define TYPE_CONTROL 0
// Values of the controltype byte
#define CONTROLTYPE_ACK 0
#define CONTROLTYPE_SET_PEER_ID 1
#define CONTROLTYPE_PING 2
#define CONTROLTYPE_DISCO 3
/*
ORIGINAL: This is a plain packet with no control and no error
checking at all.
- When this is processed, it is directly handed to the user.
	Header (1 byte):
	[0] u8 type
*/
// Value of the type byte for ORIGINAL packets
#define TYPE_ORIGINAL 1
// Size in bytes of the ORIGINAL header described above
#define ORIGINAL_HEADER_SIZE 1
/*
SPLIT: These are sequences of packets forming one bigger piece of
data.
- When processed and all the chunks with chunk_num 0...chunk_count-1
  are present (this should be buffered), the resulting data shall be
  directly handed to the user.
- If the data fails to come up in a reasonable time, the buffer shall
  be silently discarded.
- These can be sent as-is or on top of a RELIABLE packet stream.
	Header (7 bytes):
	[0] u8 type
	[1] u16 seqnum
	[3] u16 chunk_count
	[5] u16 chunk_num
*/
// Value of the type byte for SPLIT packets
#define TYPE_SPLIT 2
/*
RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
and they shall be delivered in the same order as sent. This is done
with a buffer in the receiving and transmitting end.
- When this is processed, the contents of each packet are recursively
  processed as packets.
	Header (3 bytes):
	[0] u8 type
	[1] u16 seqnum
*/
// Value of the type byte for RELIABLE packets
#define TYPE_RELIABLE 3
// Size in bytes of the RELIABLE header described above (1 + 2)
#define RELIABLE_HEADER_SIZE 3
// NOTE(review): 65500 is close to SEQNUM_MAX (65535); presumably chosen
// so that sequence number wrap-around happens soon after start - confirm
//#define SEQNUM_INITIAL 0x10
#define SEQNUM_INITIAL 65500
// Iterator into the packet list of a ReliablePacketBuffer; this is the
// type handed out by findPacket() and notFound()
typedef core::list<BufferedPacket>::Iterator RPBSearchResult;

/*
	A buffer which stores reliable packets and sorts them internally
	for fast access to the smallest one.

	NOTE(review): the member functions are only declared here; see the
	implementation file for their exact semantics.
*/
class ReliablePacketBuffer
{
public:
	void print();
	bool empty();
	u32 size();

	// Lookup by sequence number
	// NOTE(review): presumably findPacket() yields notFound() when no
	// packet has the given seqnum - confirm
	RPBSearchResult findPacket(u16 seqnum);
	RPBSearchResult notFound();

	u16 getFirstSeqnum();
	BufferedPacket popFirst();
	BufferedPacket popSeqnum(u16 seqnum);
	void insert(BufferedPacket &p);

	// Timeout handling
	void incrementTimeouts(float dtime);
	void resetTimedOuts(float timeout);
	bool anyTotaltimeReached(float timeout);
	core::list<BufferedPacket> getTimedOuts(float timeout);

private:
	core::list<BufferedPacket> m_list;
};
/*
	A buffer for reconstructing split packets
*/
class IncomingSplitBuffer
{
public:
	~IncomingSplitBuffer();

	/*
		Returns a reference counted buffer of length != 0 when a full
		split packet is constructed. If not, returns one of length 0.
	*/
	SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);

	void removeUnreliableTimedOuts(float dtime, float timeout);

private:
	// Key is seqnum; values are held by raw pointer
	// NOTE(review): presumably owned by this buffer and released in the
	// destructor - confirm
	core::map<u16, IncomingSplitPacket*> m_buf;
};
class Connection;
/*
	Per-channel state: sequence number counters and packet buffers.
*/
struct Channel
{
	Channel();
	~Channel();

	// Sequence number counters
	u16 next_outgoing_seqnum;
	u16 next_incoming_seqnum;
	u16 next_outgoing_split_seqnum;

	// Buffers incoming packets that arrive in the wrong order
	ReliablePacketBuffer incoming_reliables;

	// Buffers sent packets so that the sender can re-send them
	// if no ACK is received
	ReliablePacketBuffer outgoing_reliables;

	IncomingSplitBuffer incoming_splits;
};
class Peer;
/*
	Abstract callback interface for peer additions and removals.
*/
class PeerHandler
{
public:
	PeerHandler() {}
	virtual ~PeerHandler() {}

	// Called after the Peer has been inserted into the Connection's
	// peer container.
	virtual void peerAdded(Peer *peer) = 0;

	// Called before the Peer has been removed from the Connection's
	// peer container.
	virtual void deletingPeer(Peer *peer, bool timeout) = 0;
};
/*
	State kept for one remote peer: its channels, address, id, timers
	and send-rate bookkeeping. All data members are public.
*/
class Peer
{
public:
	Peer(u16 a_id, Address a_address);
	virtual ~Peer();

	/*
		Calculates avg_rtt and resend_timeout.
		rtt=-1 only recalculates resend_timeout
	*/
	void reportRTT(float rtt);

	Channel channels[CHANNEL_COUNT];

	Address address;          // Address of the peer
	u16 id;                   // Unique id of the peer
	float timeout_counter;    // Seconds from last receive
	float ping_timer;         // Ping timer
	float resend_timeout;     // This is changed dynamically
	float avg_rtt;            // Updated when an ACK is received

	// This is set to true when the peer has actually sent something
	// with the id we have given to it
	bool has_sent_with_id;

	// NOTE(review): undocumented in this header; presumably send-rate
	// limiting state - confirm in the implementation
	float m_sendtime_accu;
	float m_max_packets_per_second;
	int m_num_sent;
	int m_max_num_sent;

	// Updated from configuration by Connection
	float congestion_control_aim_rtt;
	float congestion_control_max_rate;
	float congestion_control_min_rate;
};
/*
Connection
*/
/*
	One packet together with the peer id, channel number and reliability
	flag it was created with. The constructor stores its four arguments
	in the members of the same name.
*/
struct OutgoingPacket
{
	u16 peer_id;
	u8 channelnum;
	SharedBuffer<u8> data;
	bool reliable;

	OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
			bool reliable_):
		peer_id(peer_id_), channelnum(channelnum_),
		data(data_), reliable(reliable_)
	{}
};
// Discriminator stored in ConnectionEvent::type
enum ConnectionEventType
{
	CONNEVENT_NONE,          // type of a default-constructed event
	CONNEVENT_DATA_RECEIVED, // set by ConnectionEvent::dataReceived()
	CONNEVENT_PEER_ADDED,    // set by ConnectionEvent::peerAdded()
	CONNEVENT_PEER_REMOVED,  // set by ConnectionEvent::peerRemoved()
	CONNEVENT_BIND_FAILED    // set by ConnectionEvent::bindFailed()
};
struct ConnectionEvent
{
enum ConnectionEventType type;
u16 peer_id;
Buffer<u8> data;
bool timeout;
Address address;
ConnectionEvent(): type(CONNEVENT_NONE) {}
std::string describe()
{
switch(type){
case CONNEVENT_NONE:
return "CONNEVENT_NONE";
case CONNEVENT_DATA_RECEIVED:
return "CONNEVENT_DATA_RECEIVED";
case CONNEVENT_PEER_ADDED:
return "CONNEVENT_PEER_ADDED";
case CONNEVENT_PEER_REMOVED:
return "CONNEVENT_PEER_REMOVED";
case CONNEVENT_BIND_FAILED:
return "CONNEVENT_BIND_FAILED";
}
return "Invalid ConnectionEvent";
}
void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
{
type = CONNEVENT_DATA_RECEIVED;
peer_id = peer_id_;
data = data_;
}
void peerAdded(u16 peer_id_, Address address_)
{
type = CONNEVENT_PEER_ADDED;
peer_id = peer_id_;
address = address_;
}
void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
{
type = CONNEVENT_PEER_REMOVED;
peer_id = peer_id_;
timeout = timeout_;
address = address_;
}
void bindFailed()
{
type = CONNEVENT_BIND_FAILED;
}
};
// Discriminator stored in ConnectionCommand::type
enum ConnectionCommandType
{
	CONNCMD_NONE,        // type of a default-constructed command
	CONNCMD_SERVE,       // set by ConnectionCommand::serve()
	CONNCMD_CONNECT,     // set by ConnectionCommand::connect()
	CONNCMD_DISCONNECT,  // set by ConnectionCommand::disconnect()
	CONNCMD_SEND,        // set by ConnectionCommand::send()
	CONNCMD_SEND_TO_ALL, // set by ConnectionCommand::sendToAll()
	CONNCMD_DELETE_PEER  // set by ConnectionCommand::deletePeer()
};
struct ConnectionCommand
{
enum ConnectionCommandType type;
u16 port;
Address address;
u16 peer_id;
u8 channelnum;
Buffer<u8> data;
bool reliable;
ConnectionCommand(): type(CONNCMD_NONE) {}
void serve(u16 port_)
{
type = CONNCMD_SERVE;
port = port_;
}
void connect(Address address_)
{
type = CONNCMD_CONNECT;
address = address_;
}
void disconnect()
{
type = CONNCMD_DISCONNECT;
}
void send(u16 peer_id_, u8 channelnum_,
SharedBuffer<u8> data_, bool reliable_)
{
type = CONNCMD_SEND;
peer_id = peer_id_;
channelnum = channelnum_;
data = data_;
reliable = reliable_;
}
void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
{
type = CONNCMD_SEND_TO_ALL;
channelnum = channelnum_;
data = data_;
reliable = reliable_;
}
void deletePeer(u16 peer_id_)
{
type = CONNCMD_DELETE_PEER;
peer_id = peer_id_;
}
};
/*
	Connection: derives from SimpleThread and holds the socket, the peer
	container and the event, command and outgoing-packet queues.

	NOTE(review): only three small accessors are defined inline here;
	everything else is declared only. Presumably the public calls hand
	work to the thread through the command queue and results come back
	through the event queue - confirm in the implementation file.
*/
class Connection: public SimpleThread
{
public:
	Connection(u32 protocol_id, u32 max_packet_size, float timeout);
	Connection(u32 protocol_id, u32 max_packet_size, float timeout,
			PeerHandler *peerhandler);
	~Connection();

	void * Thread();

	/* Interface */

	ConnectionEvent getEvent();
	ConnectionEvent waitEvent(u32 timeout_ms);
	void putCommand(ConnectionCommand &c);

	// Stores the given value in m_bc_receive_timeout
	void SetTimeoutMs(int timeout)
	{
		m_bc_receive_timeout = timeout;
	}

	void Serve(unsigned short port);
	void Connect(Address address);
	bool Connected();
	void Disconnect();
	u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
	void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
	void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
	void RunTimeouts(float dtime); // dummy

	// Returns m_peer_id
	u16 GetPeerID()
	{
		return m_peer_id;
	}

	Address GetPeerAddress(u16 peer_id);
	float GetPeerAvgRTT(u16 peer_id);
	void DeletePeer(u16 peer_id);

private:
	void putEvent(ConnectionEvent &e);
	void processCommand(ConnectionCommand &c);

	void send(float dtime);
	void receive();
	void runTimeouts(float dtime);

	void serve(u16 port);
	void connect(Address address);
	void disconnect();
	void sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
	void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
	void sendAsPacket(u16 peer_id, u8 channelnum,
			SharedBuffer<u8> data, bool reliable);
	void rawSendAsPacket(u16 peer_id, u8 channelnum,
			SharedBuffer<u8> data, bool reliable);
	void rawSend(const BufferedPacket &packet);

	Peer* getPeer(u16 peer_id);
	Peer* getPeerNoEx(u16 peer_id);
	core::list<Peer*> getPeers();

	bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);

	// Returns next data from a buffer if possible
	// If found, returns true; if not, false.
	// If found, sets peer_id and dst
	bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
			SharedBuffer<u8> &dst);

	/*
		Processes a packet with the basic header stripped out.
		Parameters:
			packetdata: Data in packet (with no base headers)
			peer_id: peer id of the sender of the packet in question
			channelnum: channel on which the packet was sent
			reliable: true if recursing into a reliable packet
	*/
	SharedBuffer<u8> processPacket(Channel *channel,
			SharedBuffer<u8> packetdata, u16 peer_id,
			u8 channelnum, bool reliable);

	bool deletePeer(u16 peer_id, bool timeout);

	// Queues
	Queue<OutgoingPacket> m_outgoing_queue;
	MutexedQueue<ConnectionEvent> m_event_queue;
	MutexedQueue<ConnectionCommand> m_command_queue;

	// Values taken by the constructors (protocol id, max packet size,
	// timeout), followed by the socket and our own peer id
	u32 m_protocol_id;
	u32 m_max_packet_size;
	float m_timeout;
	UDPSocket m_socket;
	u16 m_peer_id;

	// Peer container, keyed by peer id
	// NOTE(review): presumably guarded by m_peers_mutex - confirm
	core::map<u16, Peer*> m_peers;
	JMutex m_peers_mutex;

	// Backwards compatibility
	PeerHandler *m_bc_peerhandler;
	int m_bc_receive_timeout;

	// Stores the given id in m_peer_id
	void SetPeerID(u16 id)
	{
		m_peer_id = id;
	}

	// Returns m_protocol_id
	u32 GetProtocolID()
	{
		return m_protocol_id;
	}

	void PrintInfo(std::ostream &out);
	void PrintInfo();
	std::string getDesc();
	u16 m_indentation;
};
} // namespace
#endif