misc buffer-math bugfixes + increased performance
This commit is contained in:
@@ -39,7 +39,6 @@
|
||||
#include "pico_eth.h"
|
||||
|
||||
namespace ZeroTier {
|
||||
|
||||
// This may be removed in production
|
||||
void check_buffer_states(Connection *conn)
|
||||
{
|
||||
@@ -158,8 +157,8 @@ namespace ZeroTier {
|
||||
do {
|
||||
int avail = DEFAULT_TCP_RX_BUF_SZ - conn->rxsz;
|
||||
if(avail) {
|
||||
// r = tap->picostack->__pico_socket_read(s, conn->rxbuf + (conn->rxsz), ZT_MAX_MTU);
|
||||
r = tap->picostack->__pico_socket_recvfrom(s, conn->rxbuf + (conn->rxsz), ZT_MAX_MTU, (void *)&peer.ip4.addr, &port);
|
||||
// r = tap->picostack->__pico_socket_read(s, conn->rxbuf + (conn->rxsz), SDK_MTU);
|
||||
r = tap->picostack->__pico_socket_recvfrom(s, conn->rxbuf + (conn->rxsz), SDK_MTU, (void *)&peer.ip4.addr, &port);
|
||||
// DEBUG_ATTN("received packet (%d byte) from %08X:%u", r, long_be2(peer.ip4.addr), short_be(port));
|
||||
tap->_phy.setNotifyWritable(conn->sock, true);
|
||||
//DEBUG_EXTRA("read=%d", r);
|
||||
@@ -192,23 +191,22 @@ namespace ZeroTier {
|
||||
{
|
||||
Connection *conn = tap->getConnection(s);
|
||||
if(conn) {
|
||||
|
||||
|
||||
uint16_t port = 0;
|
||||
union {
|
||||
struct pico_ip4 ip4;
|
||||
struct pico_ip6 ip6;
|
||||
} peer;
|
||||
|
||||
char tmpbuf[ZT_MAX_MTU];
|
||||
int tot = 0;
|
||||
char tmpbuf[SDK_MTU];
|
||||
unsigned char *addr_pos, *sz_pos, *payload_pos;
|
||||
struct sockaddr_in addr_in;
|
||||
addr_in.sin_addr.s_addr = peer.ip4.addr;
|
||||
addr_in.sin_port = port;
|
||||
|
||||
// RX
|
||||
int r = tap->picostack->__pico_socket_recvfrom(s, tmpbuf, ZT_MAX_MTU, (void *)&peer.ip4.addr, &port);
|
||||
DEBUG_EXTRA("read=%d", r);
|
||||
int r = tap->picostack->__pico_socket_recvfrom(s, tmpbuf, SDK_MTU, (void *)&peer.ip4.addr, &port);
|
||||
DEBUG_FLOW(" [ RXBUF <- STACK] Receiving (%d) from stack, copying to receving buffer", r);
|
||||
|
||||
// Mutex::Lock _l2(tap->_rx_buf_m);
|
||||
// struct sockaddr_in6 addr_in6;
|
||||
@@ -216,23 +214,25 @@ namespace ZeroTier {
|
||||
// addr_in6.sin6_port = Utils::ntoh(s->remote_port);
|
||||
// DEBUG_ATTN("remote_port=%d, local_port=%d", s->remote_port, Utils::ntoh(s->local_port));
|
||||
|
||||
picotap->_rx_buf_m.lock();
|
||||
|
||||
if(conn->rxsz == DEFAULT_UDP_RX_BUF_SZ) { // if UDP buffer full
|
||||
DEBUG_INFO("UDP RX buffer full. Discarding oldest payload segment");
|
||||
memmove(conn->rxbuf, conn->rxbuf + ZT_MAX_MTU, DEFAULT_UDP_RX_BUF_SZ - ZT_MAX_MTU);
|
||||
addr_pos = conn->rxbuf + (DEFAULT_UDP_RX_BUF_SZ - ZT_MAX_MTU); // TODO:
|
||||
DEBUG_FLOW(" [ RXBUF <- STACK] UDP RX buffer full. Discarding oldest payload segment");
|
||||
memmove(conn->rxbuf, conn->rxbuf + SDK_MTU, DEFAULT_UDP_RX_BUF_SZ - SDK_MTU);
|
||||
addr_pos = conn->rxbuf + (DEFAULT_UDP_RX_BUF_SZ - SDK_MTU); // TODO:
|
||||
sz_pos = addr_pos + sizeof(struct sockaddr_storage);
|
||||
conn->rxsz -= ZT_MAX_MTU;
|
||||
conn->rxsz -= SDK_MTU;
|
||||
}
|
||||
else {
|
||||
addr_pos = conn->rxbuf + conn->rxsz; // where we'll prepend the size of the address
|
||||
sz_pos = addr_pos + sizeof(struct sockaddr_storage);
|
||||
}
|
||||
payload_pos = addr_pos + sizeof(struct sockaddr_storage) + sizeof(tot);
|
||||
payload_pos = addr_pos + sizeof(struct sockaddr_storage) + sizeof(r);
|
||||
memcpy(addr_pos, &addr_in, sizeof(struct sockaddr_storage));
|
||||
|
||||
// Adjust buffer size
|
||||
if(r) {
|
||||
conn->rxsz += ZT_MAX_MTU;
|
||||
conn->rxsz += SDK_MTU;
|
||||
memcpy(sz_pos, &r, sizeof(r));
|
||||
tap->phyOnUnixWritable(conn->sock, NULL, true);
|
||||
//tap->_phy.setNotifyWritable(conn->sock, false);
|
||||
@@ -241,6 +241,10 @@ namespace ZeroTier {
|
||||
DEBUG_ERROR("unable to read from picosock=%p", s);
|
||||
}
|
||||
memcpy(payload_pos, tmpbuf, r); // write payload to app's socket
|
||||
//DEBUG_EXTRA(" Copied onto rxbuf (%d) from stack socket", r);
|
||||
|
||||
picotap->_rx_buf_m.unlock();
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -257,7 +261,7 @@ namespace ZeroTier {
|
||||
|
||||
// Only called from a locked context, no need to lock anything
|
||||
if(conn->txsz > 0) {
|
||||
int r, max_write_len = conn->txsz < ZT_MAX_MTU ? conn->txsz : ZT_MAX_MTU;
|
||||
int r, max_write_len = conn->txsz < SDK_MTU ? conn->txsz : SDK_MTU;
|
||||
if((r = tap->picostack->__pico_socket_write(s, &conn->txbuf, max_write_len)) < 0) {
|
||||
DEBUG_ERROR("unable to write to picosock=%p", s);
|
||||
return;
|
||||
@@ -393,10 +397,6 @@ namespace ZeroTier {
|
||||
// Since picoTCP only allows the reception of frames from within the polling function, we
|
||||
// must enqueue each frame into a memory structure shared by both threads. This structure will
|
||||
Mutex::Lock _l(tap->_pico_frame_rxbuf_m);
|
||||
if(len > ((1024 * 1024) - tap->pico_frame_rxbuf_tot)) {
|
||||
DEBUG_ERROR("dropping packet (len = %d) - not enough space left on RX frame buffer", len);
|
||||
return;
|
||||
}
|
||||
//if(len != memcpy(pico_frame_rxbuf, data, len)) {
|
||||
// DEBUG_ERROR("dropping packet (len = %d) - unable to copy contents of frame to RX frame buffer", len);
|
||||
// return;
|
||||
@@ -407,14 +407,18 @@ namespace ZeroTier {
|
||||
from.copyTo(ethhdr.saddr, 6);
|
||||
to.copyTo(ethhdr.daddr, 6);
|
||||
ethhdr.proto = Utils::hton((uint16_t)etherType);
|
||||
int newlen = len+sizeof(struct pico_eth_hdr);
|
||||
//
|
||||
memcpy(tap->pico_frame_rxbuf + tap->pico_frame_rxbuf_tot, &newlen, sizeof(newlen)); // size of frame
|
||||
|
||||
int newlen = len + sizeof(int) + sizeof(struct pico_eth_hdr);
|
||||
if(newlen > (MAX_PICO_FRAME_RX_BUF_SZ-tap->pico_frame_rxbuf_tot)) {
|
||||
DEBUG_ERROR("dropping packet (len = %d) - not enough space left on RX frame buffer", len);
|
||||
return;
|
||||
}
|
||||
memcpy(tap->pico_frame_rxbuf + tap->pico_frame_rxbuf_tot, &newlen, sizeof(newlen)); // size of frame + meta
|
||||
memcpy(tap->pico_frame_rxbuf + tap->pico_frame_rxbuf_tot + sizeof(newlen), ðhdr, sizeof(ethhdr)); // new eth header
|
||||
memcpy(tap->pico_frame_rxbuf + tap->pico_frame_rxbuf_tot + sizeof(newlen) + sizeof(ethhdr), data, len); // frame data
|
||||
tap->pico_frame_rxbuf_tot += len + sizeof(len) + sizeof(ethhdr);
|
||||
// DEBUG_INFO("RX frame buffer %3f full", (float)pico_frame_rxbuf_tot / (float)(1024 * 1024));
|
||||
// DEBUG_INFO("len=%d", len);
|
||||
|
||||
tap->pico_frame_rxbuf_tot += newlen;
|
||||
DEBUG_FLOW(" [ ZTWIRE -> FBUF ] Moved FRAME(sz=%d) into FBUF(sz=%d), data_len=%d, ethhdr.proto=%d", newlen, picotap->pico_frame_rxbuf_tot, len, ethhdr.proto);
|
||||
}
|
||||
|
||||
// Called periodically by the stack, this removes data from the locked memory buffer and feeds it into the stack.
|
||||
@@ -431,25 +435,25 @@ namespace ZeroTier {
|
||||
// OPTIMIZATION: The copy logic and/or buffer structure should be reworked for better performance after the BETA
|
||||
// NetconEthernetTap *tap = (NetconEthernetTap*)netif->state;
|
||||
Mutex::Lock _l(picotap->_pico_frame_rxbuf_m);
|
||||
unsigned char frame[ZT_MAX_MTU];
|
||||
uint32_t len;
|
||||
|
||||
unsigned char frame[SDK_MTU];
|
||||
int len;
|
||||
//DEBUG_INFO(" [ FBUF -> STACK] Frame buffer SZ=%d", picotap->pico_frame_rxbuf_tot);
|
||||
while (picotap->pico_frame_rxbuf_tot > 0) {
|
||||
memset(frame, 0, sizeof(frame));
|
||||
len = 0;
|
||||
memcpy(&len, picotap->pico_frame_rxbuf, sizeof(len)); // get frame len
|
||||
/*
|
||||
if(len > ZT_MAX_MTU * 10) // FIXME: Remove or update {
|
||||
DEBUG_ERROR("len seems to be an unreasonable value, dumping entire buffer...");
|
||||
memset(picotap->pico_frame_rxbuf, 0, MAX_PICO_FRAME_RX_BUF_SZ);
|
||||
picotap->pico_frame_rxbuf_tot = 0;
|
||||
|
||||
if(len >= 0) {
|
||||
DEBUG_FLOW(" [ FBUF -> STACK] Moving FRAME of size (%d) from FBUF(sz=%d) into stack",len, picotap->pico_frame_rxbuf_tot-len);
|
||||
memcpy(frame, picotap->pico_frame_rxbuf + sizeof(len), len-(sizeof(len)) ); // get frame data
|
||||
//memset(picotap->pico_frame_rxbuf, 0, len); // FIXME: Candidate for removal
|
||||
memmove(picotap->pico_frame_rxbuf, picotap->pico_frame_rxbuf + len, MAX_PICO_FRAME_RX_BUF_SZ-len);
|
||||
picotap->picostack->__pico_stack_recv(dev, (uint8_t*)frame, (len-sizeof(len)));
|
||||
picotap->pico_frame_rxbuf_tot-=len;
|
||||
}
|
||||
*/
|
||||
if(len > 0) {
|
||||
memcpy(frame, picotap->pico_frame_rxbuf + sizeof(len), len); // get frame data
|
||||
memmove(picotap->pico_frame_rxbuf, picotap->pico_frame_rxbuf + sizeof(len) + len, ZT_MAX_MTU-(sizeof(len) + len));
|
||||
picotap->picostack->__pico_stack_recv(dev, (uint8_t*)frame, len);
|
||||
picotap->pico_frame_rxbuf_tot-=(sizeof(len) + len);
|
||||
else {
|
||||
DEBUG_ERROR("Skipping frame of size (%d)",len);
|
||||
exit(0);
|
||||
}
|
||||
loop_score--;
|
||||
}
|
||||
@@ -487,6 +491,7 @@ namespace ZeroTier {
|
||||
// newConn->peer_addr = NULL;
|
||||
newConn->picosock = psock;
|
||||
picotap->_Connections.push_back(newConn);
|
||||
memset(newConn->rxbuf, 0, DEFAULT_UDP_RX_BUF_SZ);
|
||||
return newConn;
|
||||
}
|
||||
else {
|
||||
@@ -510,7 +515,7 @@ namespace ZeroTier {
|
||||
return;
|
||||
}
|
||||
|
||||
int max, r, max_write_len = conn->txsz < ZT_MAX_MTU ? conn->txsz : ZT_MAX_MTU;
|
||||
int max, r, max_write_len = conn->txsz < SDK_MTU ? conn->txsz : SDK_MTU;
|
||||
if((r = picotap->picostack->__pico_socket_write(conn->picosock, &conn->txbuf, max_write_len)) < 0) {
|
||||
DEBUG_ERROR("unable to write to picosock=%p, r=%d", (conn->picosock), r);
|
||||
return;
|
||||
@@ -682,37 +687,48 @@ namespace ZeroTier {
|
||||
picotap->_rx_buf_m.lock();
|
||||
}
|
||||
|
||||
DEBUG_ATTN();
|
||||
Connection *conn = picotap->getConnection(sock);
|
||||
if(conn && conn->rxsz) {
|
||||
float max = conn->type == SOCK_STREAM ? (float)DEFAULT_TCP_RX_BUF_SZ : (float)DEFAULT_UDP_RX_BUF_SZ;
|
||||
int n = -1;
|
||||
// extract address and payload size info
|
||||
|
||||
if(conn->type==SOCK_DGRAM) {
|
||||
n = picotap->_phy.streamSend(conn->sock, conn->rxbuf, ZT_MAX_MTU);
|
||||
DEBUG_EXTRA("SOCK_DGRAM, conn=%p, physock=%p", conn, sock);
|
||||
|
||||
// Try to write SDK_MTU-sized chunk to app socket
|
||||
int total_written = 0;
|
||||
while(total_written < SDK_MTU) {
|
||||
n = picotap->_phy.streamSend(conn->sock, (conn->rxbuf)+total_written, SDK_MTU);
|
||||
total_written += n;
|
||||
}
|
||||
|
||||
//DEBUG_EXTRA("SOCK_DGRAM, conn=%p, physock=%p", conn, sock);
|
||||
int payload_sz, addr_sz_offset = sizeof(struct sockaddr_storage);
|
||||
memcpy(&payload_sz, conn->rxbuf + addr_sz_offset, sizeof(int));
|
||||
struct sockaddr_storage addr;
|
||||
memcpy(&addr, conn->rxbuf, addr_sz_offset);
|
||||
// adjust buffer
|
||||
if(conn->rxsz-n > 0) // If more remains on buffer
|
||||
memcpy(conn->rxbuf, conn->rxbuf+ZT_MAX_MTU, conn->rxsz - ZT_MAX_MTU);
|
||||
conn->rxsz -= ZT_MAX_MTU;
|
||||
DEBUG_FLOW(" [ ZTSOCK <- RXBUF] Copying data from receiving buffer to ZT-controlled app socket (n=%d, payload_sz=%d)", n, payload_sz);
|
||||
if(conn->rxsz-n > 0) { // If more remains on buffer
|
||||
memcpy(conn->rxbuf, conn->rxbuf+SDK_MTU, conn->rxsz - SDK_MTU);
|
||||
DEBUG_FLOW(" [ ZTSOCK <- RXBUF] Data(%d) still on buffer, moving it up by one MTU", conn->rxsz-n);
|
||||
////memset(conn->rxbuf, 0, DEFAULT_UDP_RX_BUF_SZ);
|
||||
////conn->rxsz=SDK_MTU;
|
||||
}
|
||||
conn->rxsz -= SDK_MTU;
|
||||
DEBUG_FLOW(" [ ZTSOCK <- RXBUF] conn->rxsz=%d", conn->rxsz);
|
||||
}
|
||||
|
||||
if(conn->type==SOCK_STREAM) {
|
||||
n = picotap->_phy.streamSend(conn->sock, conn->rxbuf, conn->rxsz);
|
||||
DEBUG_EXTRA("SOCK_STREAM, conn=%p, physock=%p, n=%d", conn, sock, n);
|
||||
//DEBUG_EXTRA("SOCK_STREAM, conn=%p, physock=%p, n=%d", conn, sock, n);
|
||||
if(conn->rxsz-n > 0) // If more remains on buffer
|
||||
memcpy(conn->rxbuf, conn->rxbuf+n, conn->rxsz - n);
|
||||
conn->rxsz -= n;
|
||||
}
|
||||
if(n) {
|
||||
if(conn->type==SOCK_STREAM) {
|
||||
DEBUG_TRANS("[TCP RX] <--- :: {TX: %.3f%%, RX: %.3f%%, physock=%p} :: %d bytes",
|
||||
(float)conn->txsz / max, (float)conn->rxsz / max, conn->sock, n);
|
||||
//DEBUG_TRANS("[TCP RX] <--- :: {TX: %.3f%%, RX: %.3f%%, physock=%p} :: %d bytes",
|
||||
// (float)conn->txsz / max, (float)conn->rxsz / max, conn->sock, n);
|
||||
}
|
||||
if(conn->rxsz == 0) {
|
||||
picotap->_phy.setNotifyWritable(sock, false);
|
||||
|
||||
Reference in New Issue
Block a user