fixed longstanding buffer logic bug

This commit is contained in:
Joseph Henry
2017-03-20 07:54:08 -07:00
parent 2f57ea600a
commit db3bf75b7d
6 changed files with 55 additions and 105 deletions

View File

@@ -57,7 +57,6 @@ namespace ZeroTier {
void pico_init_interface(NetconEthernetTap *tap, const InetAddress &ip)
{
picoTCP_stack *stack = tap->picostack;
DEBUG_INFO();
if (std::find(picotap->_ips.begin(),picotap->_ips.end(),ip) == picotap->_ips.end()) {
picotap->_ips.push_back(ip);
std::sort(picotap->_ips.begin(),picotap->_ips.end());
@@ -128,7 +127,6 @@ namespace ZeroTier {
// After this step, buffer will be emptied periodically by pico_handleRead()
void pico_cb_tcp_read(NetconEthernetTap *tap, struct pico_socket *s)
{
DEBUG_INFO();
Connection *conn = tap->getConnection(s);
if(conn) {
int r;
@@ -171,8 +169,6 @@ namespace ZeroTier {
//
void pico_cb_udp_read(NetconEthernetTap *tap, struct pico_socket *s)
{
DEBUG_INFO();
Connection *conn = tap->getConnection(s);
if(conn) {
@@ -190,16 +186,14 @@ namespace ZeroTier {
// RX
int r = tap->picostack->__pico_socket_recvfrom(s, tmpbuf, SDK_MTU, (void *)&peer.ip4.addr, &port);
DEBUG_FLOW(" [ RXBUF <- STACK] Receiving (%d) from stack, copying to receving buffer", r);
//DEBUG_FLOW(" [ RXBUF <- STACK] Receiving (%d) from stack, copying to receving buffer", r);
// Mutex::Lock _l2(tap->_rx_buf_m);
// struct sockaddr_in6 addr_in6;
// addr_in6.sin6_addr.s6_addr;
// addr_in6.sin6_port = Utils::ntoh(s->remote_port);
// DEBUG_ATTN("remote_port=%d, local_port=%d", s->remote_port, Utils::ntoh(s->local_port));
picotap->_rx_buf_m.lock();
if(conn->rxsz == DEFAULT_UDP_RX_BUF_SZ) { // if UDP buffer full
//DEBUG_FLOW(" [ RXBUF <- STACK] UDP RX buffer full. Discarding oldest payload segment");
memmove(conn->rxbuf, conn->rxbuf + SDK_MTU, DEFAULT_UDP_RX_BUF_SZ - SDK_MTU);
@@ -214,19 +208,22 @@ namespace ZeroTier {
payload_pos = addr_pos + sizeof(struct sockaddr_storage) + sizeof(r);
memcpy(addr_pos, &addr_in, sizeof(struct sockaddr_storage));
memcpy(payload_pos, tmpbuf, r); // write payload to app's socket
// Adjust buffer size
if(r) {
conn->rxsz += SDK_MTU;
memcpy(sz_pos, &r, sizeof(r));
tap->phyOnUnixWritable(conn->sock, NULL, true);
//tap->_phy.setNotifyWritable(conn->sock, false);
}
if (r < 0) {
if (r < 0) {
DEBUG_ERROR("unable to read from picosock=%p", s);
}
memcpy(payload_pos, tmpbuf, r); // write payload to app's socket
picotap->_rx_buf_m.unlock();
// TODO: Revisit logic
if(r)
tap->phyOnUnixWritable(conn->sock, NULL, true);
//DEBUG_EXTRA(" Copied onto rxbuf (%d) from stack socket", r);
picotap->_rx_buf_m.unlock();
return;
}
}
@@ -239,8 +236,6 @@ namespace ZeroTier {
DEBUG_ERROR("invalid connection");
if(!conn->txsz)
return;
DEBUG_INFO("txsz=%d bytes ready to be written", conn->txsz);
// Only called from a locked context, no need to lock anything
if(conn->txsz > 0) {
int r, max_write_len = conn->txsz < SDK_MTU ? conn->txsz : SDK_MTU;
@@ -262,7 +257,6 @@ namespace ZeroTier {
// Main callback for TCP connections
void pico_cb_socket_activity(uint16_t ev, struct pico_socket *s)
{
DEBUG_INFO();
int err;
Mutex::Lock _l(picotap->_tcpconns_m);
Connection *conn = picotap->getConnection(s);
@@ -313,7 +307,6 @@ namespace ZeroTier {
}
return;
}
// Read from picoTCP socket
if (ev & PICO_SOCK_EV_RD) {
if(conn->type==SOCK_STREAM)
@@ -352,7 +345,6 @@ namespace ZeroTier {
// -----------------------------------------
int pico_eth_send(struct pico_device *dev, void *buf, int len)
{
//DEBUG_INFO("len=%d", len);
struct pico_eth_hdr *ethhdr;
ethhdr = (struct pico_eth_hdr *)buf;
@@ -376,7 +368,6 @@ namespace ZeroTier {
// It will then periodically be transfered into the network stack via pico_eth_poll()
void pico_rx(NetconEthernetTap *tap, const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len)
{
DEBUG_INFO();
// Since picoTCP only allows the reception of frames from within the polling function, we
// must enqueue each frame into a memory structure shared by both threads. This structure will
Mutex::Lock _l(tap->_pico_frame_rxbuf_m);
@@ -392,7 +383,7 @@ namespace ZeroTier {
while(newlen > (MAX_PICO_FRAME_RX_BUF_SZ-tap->pico_frame_rxbuf_tot) && ethhdr.proto == 56710)
{
mylen = 0;
DEBUG_ERROR(" [ ZTWIRE -> FBUF ] not enough space left on RX frame buffer, dropping oldest packet in buffer");
//DEBUG_FLOW(" [ ZTWIRE -> FBUF ] not enough space left on RX frame buffer, dropping oldest packet in buffer");
/*
memcpy(&mylen, picotap->pico_frame_rxbuf, sizeof(len));
memmove(tap->pico_frame_rxbuf, tap->pico_frame_rxbuf + mylen, MAX_PICO_FRAME_RX_BUF_SZ-mylen); // shift buffer
@@ -406,28 +397,6 @@ namespace ZeroTier {
memcpy(tap->pico_frame_rxbuf + tap->pico_frame_rxbuf_tot + sizeof(newlen) + sizeof(ethhdr), data, len); // frame data
tap->pico_frame_rxbuf_tot += newlen;
DEBUG_FLOW(" [ ZTWIRE -> FBUF ] Move FRAME(sz=%d) into FBUF(sz=%d), data_len=%d", newlen, picotap->pico_frame_rxbuf_tot, len);
/*
char graph[GRAPH_BUF_SZ];
gengraph(&graph, GRAPH_BUF_SZ, '|', 0.6);
DEBUG_FLOW(graph);
*/
//}
//else
//{
/*
if(newlen > (MAX_PICO_FRAME_RX_BUF_SZ-tap->pico_frame_rxbuf_tot)) {
DEBUG_ERROR("dropping packet (len = %d) - not enough space left on RX frame buffer", len);
return;
}
memcpy(tap->pico_frame_rxbuf + tap->pico_frame_rxbuf_tot, &newlen, sizeof(newlen)); // size of frame + meta
memcpy(tap->pico_frame_rxbuf + tap->pico_frame_rxbuf_tot + sizeof(newlen), &ethhdr, sizeof(ethhdr)); // new eth header
memcpy(tap->pico_frame_rxbuf + tap->pico_frame_rxbuf_tot + sizeof(newlen) + sizeof(ethhdr), data, len); // frame data
tap->pico_frame_rxbuf_tot += newlen;
DEBUG_FLOW(" [ ZTWIRE -> FBUF ] Moved FRAME(sz=%d) into FBUF(sz=%d), data_len=%d, ethhdr.proto=%d", newlen, picotap->pico_frame_rxbuf_tot, len, ethhdr.proto);
*/
//}
}
// Called periodically by the stack, this removes data from the locked memory buffer (FBUF) and feeds it into the stack.
@@ -440,19 +409,18 @@ namespace ZeroTier {
// -----------------------------------------
int pico_eth_poll(struct pico_device *dev, int loop_score)
{
//DEBUG_ERROR();
// OPTIMIZATION: The copy logic and/or buffer structure should be reworked for better performance after the BETA
// NetconEthernetTap *tap = (NetconEthernetTap*)netif->state;
Mutex::Lock _l(picotap->_pico_frame_rxbuf_m);
unsigned char frame[SDK_MTU];
int len;
while (picotap->pico_frame_rxbuf_tot > 0 && loop_score > 0) {
DEBUG_INFO(" [ FBUF -> STACK] Frame buffer SZ=%d", picotap->pico_frame_rxbuf_tot);
//DEBUG_FLOW(" [ FBUF -> STACK] Frame buffer SZ=%d", picotap->pico_frame_rxbuf_tot);
memset(frame, 0, sizeof(frame));
len = 0;
memcpy(&len, picotap->pico_frame_rxbuf, sizeof(len)); // get frame len
if(len >= 0) {
DEBUG_FLOW(" [ FBUF -> STACK] Moving FRAME of size (%d) from FBUF(sz=%d) into stack",len, picotap->pico_frame_rxbuf_tot-len);
//DEBUG_FLOW(" [ FBUF -> STACK] Moving FRAME of size (%d) from FBUF(sz=%d) into stack",len, picotap->pico_frame_rxbuf_tot-len);
memcpy(frame, picotap->pico_frame_rxbuf + sizeof(len), len-(sizeof(len)) ); // get frame data
memmove(picotap->pico_frame_rxbuf, picotap->pico_frame_rxbuf + len, MAX_PICO_FRAME_RX_BUF_SZ-len); // shift buffer
picotap->picostack->__pico_stack_recv(dev, (uint8_t*)frame, (len-sizeof(len)));
@@ -470,7 +438,6 @@ namespace ZeroTier {
// Creates a new pico_socket and Connection object to represent a new connection to be.
Connection *pico_handleSocket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc)
{
DEBUG_INFO();
struct pico_socket * psock;
int protocol, protocol_version;
@@ -494,33 +461,33 @@ namespace ZeroTier {
*uptr = newConn;
newConn->type = socket_rpc->socket_type;
newConn->sock = sock;
/*
/*
int res = 0;
int sendbuff = UNIX_SOCK_BUF_SIZE;
socklen_t optlen = sizeof(sendbuff);
res = setsockopt(picotap->_phy.getDescriptor(sock), SOL_SOCKET, SO_RCVBUF, &sendbuff, sizeof(sendbuff));
if(res == -1)
DEBUG_ERROR("Error while setting RX buffer limits");
//DEBUG_ERROR("Error while setting RX buffer limits");
res = setsockopt(picotap->_phy.getDescriptor(sock), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
if(res == -1)
DEBUG_ERROR("Error while setting TX buffer limits");
//DEBUG_ERROR("Error while setting TX buffer limits");
// Get buffer size
// optlen = sizeof(sendbuff);
// res = getsockopt(picotap->_phy.getDescriptor(sock), SOL_SOCKET, SO_SNDBUF, &sendbuff, &optlen);
// DEBUG_INFO("buflen=%d", sendbuff);
*/
*/
newConn->local_addr = NULL;
// newConn->peer_addr = NULL;
newConn->picosock = psock;
picotap->_Connections.push_back(newConn);
memset(newConn->rxbuf, 0, DEFAULT_UDP_RX_BUF_SZ);
return newConn;
}
else {
else
DEBUG_ERROR("failed to create pico_socket");
}
return NULL;
}
@@ -533,7 +500,6 @@ namespace ZeroTier {
// -----------------------------------------
void pico_handleWrite(Connection *conn)
{
//DEBUG_INFO();
if(!conn || !conn->picosock) {
DEBUG_ERROR(" invalid connection");
return;
@@ -579,15 +545,14 @@ namespace ZeroTier {
}
if(conn->type == SOCK_DGRAM) {
max = DEFAULT_UDP_TX_BUF_SZ;
//DEBUG_TRANS("[UDP TX] ---> :: {TX: %.3f%%, RX: %.3f%%, physock=%p} :: %d bytes",
// (float)conn->txsz / (float)max, (float)conn->rxsz / max, conn->sock, r);
DEBUG_TRANS("[UDP TX] ---> :: {TX: %.3f%%, RX: %.3f%%, physock=%p} :: %d bytes",
(float)conn->txsz / (float)max, (float)conn->rxsz / max, conn->sock, r);
}
}
// Instructs the stack to connect to a remote host
void pico_handleConnect(PhySocket *sock, PhySocket *rpcSock, Connection *conn, struct connect_st* connect_rpc)
{
//DEBUG_INFO();
if(conn->picosock) {
struct sockaddr_in *addr = (struct sockaddr_in *) &connect_rpc->addr;
int ret;
@@ -612,15 +577,13 @@ namespace ZeroTier {
memcpy(&(conn->peer_addr), &connect_rpc->addr, sizeof(struct sockaddr_storage));
if(ret == PICO_ERR_EPROTONOSUPPORT) {
if(ret == PICO_ERR_EPROTONOSUPPORT)
DEBUG_ERROR("PICO_ERR_EPROTONOSUPPORT");
}
if(ret == PICO_ERR_EINVAL) {
if(ret == PICO_ERR_EINVAL)
DEBUG_ERROR("PICO_ERR_EINVAL");
}
if(ret == PICO_ERR_EHOSTUNREACH) {
if(ret == PICO_ERR_EHOSTUNREACH)
DEBUG_ERROR("PICO_ERR_EHOSTUNREACH");
}
picotap->sendReturnValue(picotap->_phy.getDescriptor(rpcSock), 0, ERR_OK);
}
}
@@ -628,7 +591,6 @@ namespace ZeroTier {
// Instructs the stack to bind to a given address
void pico_handleBind(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct bind_st *bind_rpc)
{
DEBUG_INFO();
Connection *conn = picotap->getConnection(sock);
if(!sock) {
DEBUG_ERROR("invalid connection");
@@ -706,26 +668,35 @@ namespace ZeroTier {
// -----------------------------------------
void pico_handleRead(PhySocket *sock,void **uptr,bool lwip_invoked)
{
DEBUG_INFO();
if(!lwip_invoked) {
// The stack thread writes to RXBUF as well
picotap->_tcpconns_m.lock();
picotap->_rx_buf_m.lock();
}
int tot = 0, n = -1;
int tot = 0, n = -1, write_attempts = 0;
Connection *conn = picotap->getConnection(sock);
if(conn && conn->rxsz) {
float max = conn->type == SOCK_STREAM ? (float)DEFAULT_TCP_RX_BUF_SZ : (float)DEFAULT_UDP_RX_BUF_SZ;
if(conn->type==SOCK_DGRAM) {
//DEBUG_FLOW(" [ ZTSOCK <- RXBUF] attempting write, RXBUF(%d)", conn->rxsz);
//DEBUG_ERROR(" [ ZTSOCK <- RXBUF] attempting write, RXBUF(%d)", conn->rxsz);
// Try to write SDK_MTU-sized chunk to app socket
while(tot < SDK_MTU) {
write_attempts++;
n = picotap->_phy.streamSend(conn->sock, (conn->rxbuf)+tot, SDK_MTU);
tot += n;
DEBUG_FLOW(" [ ZTSOCK <- RXBUF] wrote = %d, total = %d", n, tot);
//DEBUG_ERROR(" [ ZTSOCK <- RXBUF] wrote = %d, total = %d, errno=%d", n, tot, errno);
// If socket is unavailable, attempt to write N times before giving up
if(errno==35) {
if(write_attempts == 1024) {
n = SDK_MTU; // say we wrote it, even though we didn't (drop packet)
tot = SDK_MTU;
}
}
}
// DEBUG_EXTRA("SOCK_DGRAM, conn=%p, physock=%p", conn, sock);
int payload_sz, addr_sz_offset = sizeof(struct sockaddr_storage);
@@ -765,18 +736,17 @@ namespace ZeroTier {
picotap->_phy.setNotifyWritable(sock, false);
}
}
//picotap->_phy.whack();
if(!lwip_invoked) {
picotap->_tcpconns_m.unlock();
picotap->_rx_buf_m.unlock();
}
DEBUG_FLOW(" [ ZTSOCK <- RXBUF] Emitted (%d) from RXBUF(%d) to socket", tot, conn->rxsz);
}
// Closes a pico_socket
void pico_handleClose(PhySocket *sock)
{
DEBUG_INFO();
/*
int ret;
if(conn && conn->picosock) {