tcp_received() fix

This commit is contained in:
Joseph Henry
2017-09-07 18:09:19 -07:00
parent e18c1ccf80
commit 4fd2db7dd6
10 changed files with 223 additions and 124 deletions

View File

@@ -57,13 +57,11 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p)
ZeroTier::VirtualTap *tap = (ZeroTier::VirtualTap*)netif->state;
bufptr = buf;
// Copy data from each pbuf, one at a time
for(q = p; q != NULL; q = q->next) {
memcpy(bufptr, q->payload, q->len);
bufptr += q->len;
totalLength += q->len;
}
// Split ethernet header and feed into handler
struct eth_hdr *ethhdr;
ethhdr = (struct eth_hdr *)buf;
@@ -290,6 +288,7 @@ namespace ZeroTier
if(socket_type == SOCK_STREAM) {
struct tcp_pcb *new_tcp_PCB = tcp_new();
*pcb = new_tcp_PCB;
tcp_nagle_disable(new_tcp_PCB);
return ERR_OK;
}
if(socket_type == SOCK_DGRAM) {
@@ -455,7 +454,7 @@ namespace ZeroTier
int lwIP::lwip_Listen(VirtualSocket *vs, int backlog)
{
DEBUG_INFO("vs=%p", vs);
//DEBUG_INFO("vs=%p", vs);
struct tcp_pcb* listeningPCB;
#ifdef TCP_LISTEN_BACKLOG
listeningPCB = tcp_listen_with_backlog((struct tcp_pcb*)vs->pcb, backlog);
@@ -466,19 +465,20 @@ namespace ZeroTier
vs->pcb = listeningPCB;
tcp_accept(listeningPCB, lwip_cb_accept); // set callback
tcp_arg(listeningPCB, vs);
//fcntl(tap->_phy.getDescriptor(vs->sock), F_SETFL, O_NONBLOCK);
}
return 0;
}
VirtualSocket* lwIP::lwip_Accept(VirtualSocket *vs)
{
//DEBUG_INFO();
if(!vs) {
DEBUG_ERROR("invalid virtual socket");
handle_general_failure();
return NULL;
}
// Retrieve the first queued VirtualSocket from the parent VirtualSocket
// TODO: check multithreaded behaviour
VirtualSocket *new_vs = NULL;
if(vs->_AcceptedConnections.size()) {
new_vs = vs->_AcceptedConnections.front();
@@ -538,7 +538,6 @@ namespace ZeroTier
}
if(vs->socket_type == SOCK_DGRAM)
{
//DEBUG_ERROR("socket_type==SOCK_DGRAM");
// TODO: Packet re-assembly hasn't yet been tested with lwIP so UDP packets are limited to MTU-sized chunks
int udp_trans_len = std::min(len, (ssize_t)ZT_MAX_MTU);
//DEBUG_EXTRA("allocating pbuf chain of size=%d for UDP packet", udp_trans_len);
@@ -564,7 +563,6 @@ namespace ZeroTier
}
if(vs->socket_type == SOCK_STREAM)
{
DEBUG_ERROR("socket_type==SOCK_STREAM");
// How much we are currently allowed to write to the VirtualSocket
ssize_t sndbuf = ((struct tcp_pcb*)vs->pcb)->snd_buf;
int err, r;
@@ -572,15 +570,16 @@ namespace ZeroTier
// PCB send buffer is full, turn off readability notifications for the
// corresponding PhySocket until lwip_cb_sent() is called and confirms that there is
// now space on the buffer
DEBUG_ERROR("lwIP stack is full, sndbuf == 0");
vs->tap->_phy.setNotifyReadable(vs->sock, false);
DEBUG_ERROR("lwIP stack is full, sndbuf==0");
//vs->tap->_phy.setNotifyReadable(vs->sock, false);
return -1;
}
int buf_w = vs->TXbuf->write((const unsigned char*)data, len);
if (buf_w != len) {
// because we checked ZT_TCP_TX_BUF_SZ above, this should not happen
DEBUG_ERROR("TX wrote only %d but expected to write %d", buf_w, len);
exit(0);
handle_general_failure();
return ZT_ERR_GENERAL_FAILURE;
}
if(vs->TXbuf->count() <= 0) {
return -1; // nothing to write
@@ -609,49 +608,61 @@ namespace ZeroTier
/*
 * Close the lwIP protocol control block (PCB) attached to a VirtualSocket.
 *
 * SOCK_DGRAM sockets: the UDP PCB is handed to udp_remove() and the function
 * returns. The TCP path below must not run for them, because it would
 * reinterpret the just-removed UDP PCB as a struct tcp_pcb.
 *
 * Other sockets (SOCK_STREAM): the PCB is closed with tcp_close() and, on
 * success, the callbacks registered on it are cleared. A PCB that is still in
 * SYN_SENT is left untouched and the request is rejected.
 *
 * @param vs VirtualSocket whose PCB should be closed
 * @return 0 on success (or when there is no PCB to close), -1 if vs is NULL,
 *         if the PCB is in SYN_SENT, or if tcp_close() reports an error
 *
 * errno is reset to 0 on entry; no specific errno is set on failure yet.
 */
int lwIP::lwip_Close(VirtualSocket *vs)
{
	//DEBUG_INFO();
	if(!vs) {
		DEBUG_ERROR("invalid virtual socket");
		return -1;
	}
	int err = 0;
	errno = 0;
	if(vs->socket_type == SOCK_DGRAM) {
		if(vs->pcb) {
			udp_remove((struct udp_pcb*)vs->pcb);
		}
		// Nothing TCP-related to do for a datagram socket
		return err;
	}
	// FIXME: check if already closed? vs->TCP_pcb->state != CLOSED
	if(vs->pcb) {
		struct tcp_pcb* tpcb = (struct tcp_pcb*)vs->pcb;
		//DEBUG_EXTRA("vs=%p, sock=%p, PCB->state = %d",
		//	conn, sock, vs->TCP_pcb->state);
		if(tpcb->state == SYN_SENT /*|| vs->TCP_pcb->state == CLOSE_WAIT*/) {
			DEBUG_EXTRA("ignoring close request. invalid PCB state for this operation. sock=%p", vs->sock);
			// TODO: errno = ?;
			return -1;
		}
		if(tcp_close(tpcb) == ERR_OK) {
			// Unregister callbacks for this PCB
			// NOTE(review): the lwIP raw API docs state that the PCB is deallocated
			// by the stack after tcp_close(); confirm that touching tpcb here is
			// safe for every state this function can be called in.
			tcp_arg(tpcb, NULL);
			tcp_recv(tpcb, NULL);
			tcp_err(tpcb, NULL);
			tcp_sent(tpcb, NULL);
			tcp_poll(tpcb, NULL, 1);
		}
		else {
			DEBUG_EXTRA("error while calling tcp_close() sock=%p", vs->sock);
			err = -1;
			// TODO: set errno
		}
	}
	// Report a tcp_close() failure to the caller instead of always returning 0
	return err;
}
/****************************************************************************/
/* Callbacks from lwIP stack */
/****************************************************************************/
// write data from processed packets from the stack to the client app
/*
With the raw API, tcp_recv() sets up to receive data via a callback function. Your callback
is delivered chains of pbufs as they become available. You have to manage extracting data
from the pbuf chain, and don't forget to watch out for multiple pbufs in a single callback:
the 'tot_len' field indicates the total length of data in the pbuf chain. You must call
tcp_recved() to tell LWIP when you have processed the received data. As with the netconn API,
you may receive more or less data than you want, and will have to either wait for further
callbacks, or hold onto excess data for later processing.
http://lwip.wikia.com/wiki/Receiving_data_with_LWIP
*/
err_t lwIP::lwip_cb_tcp_recved(void *arg, struct tcp_pcb *PCB, struct pbuf *p, err_t err)
{
DEBUG_INFO();
//DEBUG_INFO();
VirtualSocket *vs = (VirtualSocket *)arg;
int tot = 0;
if(!vs) {
DEBUG_ERROR("no virtual socket");
return ERR_OK; // FIXME: Determine if this is correct behaviour expected by the stack
}
struct pbuf* q = p;
if(p == NULL) {
/*
@@ -659,13 +670,10 @@ namespace ZeroTier
// FIXME: Implement?
}
*/
DEBUG_INFO("p == NULL");
return ERR_ABRT;
return ERR_ABRT; // close connection
}
vs->tap->_tcpconns_m.lock();
vs->_rx_m.lock();
// cycle through pbufs and write them to the RX buffer
while(p != NULL) {
if(p->len <= 0)
@@ -675,38 +683,51 @@ namespace ZeroTier
if(avail < len) {
DEBUG_ERROR("not enough room (%d bytes) on RX buffer", avail);
}
// get it on the buffer, fast!
// place new incoming data on ringbuffer before we try to send it to the app
memcpy(vs->RXbuf->get_buf(), p->payload, len);
vs->RXbuf->produce(len);
p = p->next;
tot += len;
}
DEBUG_INFO("tot=%d", tot);
if(tot) {
tcp_recved(PCB, tot);
DEBUG_TRANS("len=%5d buf_len=%13d [NSLWIP --> VSRXBF]", tot, vs->RXbuf->count());
int w, write_attempt_sz = vs->RXbuf->count() < ZT_MAX_MTU ? vs->RXbuf->count() : ZT_MAX_MTU;
if((w = write(vs->sdk_fd, vs->RXbuf->get_buf(), write_attempt_sz)) < 0) {
perror("write");
DEBUG_ERROR("write(fd=%d)=%d, errno=%d", vs->sdk_fd, w, errno);
}
if(w > 0) {
DEBUG_INFO("write_attempt_sz=%d, w=%d", write_attempt_sz, w);
vs->RXbuf->consume(w);
if(w < write_attempt_sz) {
DEBUG_TRANS("len=%5d buf_len=%13d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
DEBUG_ERROR("warning, intended to write %d bytes", write_attempt_sz);
}
else {
DEBUG_TRANS("len=%5d buf_len=%13d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
}
}
//vs->tap->_phy.setNotifyWritable(vs->sock, true);
//vs->tap->phyOnUnixWritable(vs->sock, NULL, true); // to app
}
else {
DEBUG_EXTRA("warning, wrote 0 bytes");
}
vs->tap->_tcpconns_m.unlock();
vs->_rx_m.unlock();
pbuf_free(q);
return ERR_OK;
}
/*
NSLWIP network_stack_lwip
NSPICO network_stack_pico
NSRXBF network_stack_pico guarded frame buffer RX
ZTVIRT zt_virtual_wire
APPFDS app_fd
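VSRXBF VirtualSocket RX buffer (vs->RXbuf): data from the stack waiting to be written to app_fd
VSTXBF VirtualSocket TX buffer (vs->TXbuf): data read from app_fd waiting to be sent by the stack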
*/
err_t lwIP::lwip_cb_accept(void *arg, struct tcp_pcb *newPCB, err_t err)
{
//DEBUG_INFO();
VirtualSocket *vs = (VirtualSocket*)arg;
struct sockaddr_storage ss;
#if defined(LIBZT_IPV4)
@@ -742,7 +763,7 @@ namespace ZeroTier
// copy processed datagram to app socket
void lwIP::lwip_cb_udp_recved(void * arg, struct udp_pcb * upcb, struct pbuf * p, const ip_addr_t * addr, u16_t port)
{
DEBUG_EXTRA("arg(vs)=%p, pcb=%p, port=%d)", arg, upcb, port);
//DEBUG_EXTRA("arg(vs)=%p, pcb=%p, port=%d)", arg, upcb, port);
VirtualSocket *vs = (VirtualSocket *)arg;
if(!vs) {
DEBUG_ERROR("invalid virtual socket");
@@ -799,9 +820,11 @@ namespace ZeroTier
pbuf_free(q);
}
// callback from stack to notify driver that data was sent
err_t lwIP::lwip_cb_sent(void* arg, struct tcp_pcb *PCB, u16_t len)
{
DEBUG_EXTRA("pcb=%p", PCB);
//DEBUG_EXTRA("pcb=%p", PCB);
/*
VirtualSocket *vs = (VirtualSocket *)arg;
Mutex::Lock _l(vs->tap->_tcpconns_m);
if(vs && len) {
@@ -811,6 +834,7 @@ namespace ZeroTier
vs->tap->_phy.whack();
}
}
*/
return ERR_OK;
}
@@ -837,76 +861,79 @@ namespace ZeroTier
void lwIP::lwip_cb_err(void *arg, err_t err)
{
DEBUG_ERROR("err=%d", err);
VirtualSocket *vs = (VirtualSocket *)arg;
if(!vs){
DEBUG_ERROR("vs==NULL");
DEBUG_ERROR("err=%d, invalid virtual socket", err);
errno = -1; // FIXME: Find more appropriate value
}
Mutex::Lock _l(vs->tap->_tcpconns_m);
int fd = vs->tap->_phy.getDescriptor(vs->sock);
DEBUG_ERROR("vs=%p, pcb=%p, fd=%d, err=%d", vs, vs->pcb, fd, err);
DEBUG_ERROR("closing virtual socket");
DEBUG_ERROR("vs=%p, pcb=%p, fd=%d, err=%d", vs, vs->pcb, vs->app_fd, err);
vs->tap->Close(vs);
switch(err)
{
case ERR_MEM:
DEBUG_ERROR("ERR_MEM->ENOMEM");
case ERR_MEM: // -1
DEBUG_ERROR("ERR_MEM->ENOMEM, Out of memory error.");
errno = ENOMEM;
break;
case ERR_BUF:
DEBUG_ERROR("ERR_BUF->ENOBUFS");
case ERR_BUF: // -2
DEBUG_ERROR("ERR_BUF->ENOBUFS, Buffer error.");
errno = ENOBUFS;
break;
case ERR_TIMEOUT:
DEBUG_ERROR("ERR_TIMEOUT->ETIMEDOUT");
case ERR_TIMEOUT: // -3
DEBUG_ERROR("ERR_TIMEOUT->ETIMEDOUT, Timeout.");
errno = ETIMEDOUT;
break;
case ERR_RTE:
DEBUG_ERROR("ERR_RTE->ENETUNREACH");
case ERR_RTE: // -4
DEBUG_ERROR("ERR_RTE->ENETUNREACH, Routing problem.");
errno = ENETUNREACH;
break;
case ERR_INPROGRESS:
DEBUG_ERROR("ERR_INPROGRESS->EINPROGRESS");
case ERR_INPROGRESS: // -5
DEBUG_ERROR("ERR_INPROGRESS->EINPROGRESS, Operation in progress.");
errno = EINPROGRESS;
break;
case ERR_VAL:
DEBUG_ERROR("ERR_VAL->EINVAL");
case ERR_VAL: // -6
DEBUG_ERROR("ERR_VAL->EINVAL, Illegal value.");
errno = EINVAL;
break;
case ERR_WOULDBLOCK:
DEBUG_ERROR("ERR_WOULDBLOCK->EWOULDBLOCK");
case ERR_WOULDBLOCK: // -7
DEBUG_ERROR("ERR_WOULDBLOCK->EWOULDBLOCK, Operation would block.");
errno = EWOULDBLOCK;
break;
case ERR_USE:
DEBUG_ERROR("ERR_USE->EADDRINUSE");
case ERR_USE: // -8
DEBUG_ERROR("ERR_USE->EADDRINUSE, Address in use.");
errno = EADDRINUSE;
break;
case ERR_ISCONN:
DEBUG_ERROR("ERR_ISvs->EISCONN");
case ERR_ALREADY: // -9 ?
DEBUG_ERROR("ERR_ALREADY->EISCONN, Already connecting.");
errno = EISCONN;
break;
case ERR_ABRT:
DEBUG_ERROR("ERR_ABRT->ECONNREFUSED");
errno = ECONNREFUSED;
case ERR_ISCONN: // -10
DEBUG_ERROR("ERR_ISCONN->EISCONN, Already connected");
errno = EISCONN;
break;
// TODO: Below are errors which don't have a standard errno correlate
case ERR_RST:
// -1
case ERR_CONN: // -11 ?
DEBUG_ERROR("ERR_CONN->EISCONN, Not connected");
errno = EISCONN;
break;
case ERR_CLSD:
// -1
case ERR_IF: // -12
DEBUG_ERROR("ERR_IF, Low-level netif error.");
errno = -1;
break;
case ERR_CONN:
// -1
case ERR_ABRT: // -13
DEBUG_ERROR("ERR_ABRT, Connection aborted.");
errno = -1;
break;
case ERR_RST: // -14
DEBUG_ERROR("ERR_RST, Connection reset.");
errno = -1;
break;
case ERR_ARG:
// -1
case ERR_CLSD: // -15
DEBUG_ERROR("ERR_CLSD, Connection closed.");
errno = -1;
break;
case ERR_IF:
// -1
case ERR_ARG: // -16
DEBUG_ERROR("ERR_ARG, Illegal argument.");
errno = -1;
break;
default:
break;