Added application polling logic to lwip_cb_poll()

This commit is contained in:
Joseph Henry
2017-09-11 17:01:59 -07:00
parent 67b7c7e2e2
commit 5512ede4fd

View File

@@ -57,6 +57,7 @@ void nd6_tmr(void);
err_t tapif_init(struct netif *netif) err_t tapif_init(struct netif *netif)
{ {
// we do the actual initialization in lwip_init_interface
return ERR_OK; return ERR_OK;
} }
@@ -96,7 +97,6 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p)
DEBUG_TRANS("len=%5d dst=%s [%s TX <-- %s] proto=0x%04x %s %s", totalLength, macBuf, nodeBuf, tap->nodeId().c_str(), DEBUG_TRANS("len=%5d dst=%s [%s TX <-- %s] proto=0x%04x %s %s", totalLength, macBuf, nodeBuf, tap->nodeId().c_str(),
ZeroTier::Utils::ntoh(ethhdr->type), beautify_eth_proto_nums(ZeroTier::Utils::ntoh(ethhdr->type)), flagbuf); ZeroTier::Utils::ntoh(ethhdr->type), beautify_eth_proto_nums(ZeroTier::Utils::ntoh(ethhdr->type)), flagbuf);
} }
return ERR_OK; return ERR_OK;
} }
@@ -129,6 +129,7 @@ namespace ZeroTier
tap->_mac.copyTo(tap->lwipdev.hwaddr, tap->lwipdev.hwaddr_len); tap->_mac.copyTo(tap->lwipdev.hwaddr, tap->lwipdev.hwaddr_len);
tap->lwipdev.flags = NETIF_FLAG_BROADCAST | NETIF_FLAG_ETHARP | NETIF_FLAG_IGMP | NETIF_FLAG_LINK_UP | NETIF_FLAG_UP; tap->lwipdev.flags = NETIF_FLAG_BROADCAST | NETIF_FLAG_ETHARP | NETIF_FLAG_IGMP | NETIF_FLAG_LINK_UP | NETIF_FLAG_UP;
netif_set_default(&(tap->lwipdev)); netif_set_default(&(tap->lwipdev));
netif_set_link_up(&(tap->lwipdev));
netif_set_up(&(tap->lwipdev)); netif_set_up(&(tap->lwipdev));
char macbuf[ZT_MAC_ADDRSTRLEN]; char macbuf[ZT_MAC_ADDRSTRLEN];
mac2str(macbuf, ZT_MAC_ADDRSTRLEN, tap->lwipdev.hwaddr); mac2str(macbuf, ZT_MAC_ADDRSTRLEN, tap->lwipdev.hwaddr);
@@ -167,12 +168,14 @@ namespace ZeroTier
netif_ip6_addr_set_state(&(tap->lwipdev6), 1, IP6_ADDR_TENTATIVE); netif_ip6_addr_set_state(&(tap->lwipdev6), 1, IP6_ADDR_TENTATIVE);
netif_set_default(&(tap->lwipdev6)); netif_set_default(&(tap->lwipdev6));
netif_set_up(&(tap->lwipdev6)); netif_set_link_up(&(tap->lwipdev6));
// state and flags // state and flags
tap->lwipdev6.state = tap; tap->lwipdev6.state = tap;
tap->lwipdev6.flags = NETIF_FLAG_LINK_UP | NETIF_FLAG_UP; tap->lwipdev6.flags = NETIF_FLAG_LINK_UP | NETIF_FLAG_UP;
netif_set_up(&(tap->lwipdev6));
char macbuf[ZT_MAC_ADDRSTRLEN]; char macbuf[ZT_MAC_ADDRSTRLEN];
mac2str(macbuf, ZT_MAC_ADDRSTRLEN, tap->lwipdev6.hwaddr); mac2str(macbuf, ZT_MAC_ADDRSTRLEN, tap->lwipdev6.hwaddr);
DEBUG_INFO("mac=%s, addr=%s", macbuf, ip.toString(ipbuf)); DEBUG_INFO("mac=%s, addr=%s", macbuf, ip.toString(ipbuf));
@@ -189,13 +192,11 @@ namespace ZeroTier
while(pcb_ptr) { while(pcb_ptr) {
pcb_ptr = pcb_ptr->next; pcb_ptr = pcb_ptr->next;
count++; count++;
DEBUG_ERROR("FOUND --- tcp_active_pcbs PCB COUNT = %d", count);
} }
pcb_ptr = tcp_tw_pcbs; // PCBs in TIME-WAIT state pcb_ptr = tcp_tw_pcbs; // PCBs in TIME-WAIT state
while(pcb_ptr) { while(pcb_ptr) {
pcb_ptr = pcb_ptr->next; pcb_ptr = pcb_ptr->next;
count++; count++;
DEBUG_ERROR("FOUND --- tcp_tw_pcbs PCB COUNT = %d", count);
} }
/* TODO /* TODO
pcb_ptr = tcp_listen_pcbs; pcb_ptr = tcp_listen_pcbs;
@@ -208,7 +209,6 @@ namespace ZeroTier
while(pcb_ptr) { while(pcb_ptr) {
pcb_ptr = pcb_ptr->next; pcb_ptr = pcb_ptr->next;
count++; count++;
DEBUG_ERROR("FOUND --- tcp_bound_pcbs PCB COUNT = %d", count);
} }
return count; return count;
} }
@@ -221,7 +221,6 @@ namespace ZeroTier
while(pcb_ptr) { while(pcb_ptr) {
pcb_ptr = pcb_ptr->next; pcb_ptr = pcb_ptr->next;
count++; count++;
DEBUG_ERROR("FOUND --- udp_pcbs PCB COUNT = %d", count);
} }
return count; return count;
} }
@@ -355,6 +354,7 @@ namespace ZeroTier
{ {
if(!can_provision_new_socket(socket_type)) { if(!can_provision_new_socket(socket_type)) {
DEBUG_ERROR("unable to create new socket due to limitation of network stack"); DEBUG_ERROR("unable to create new socket due to limitation of network stack");
errno = ENOMEM;
return -1; return -1;
} }
if(socket_type == SOCK_STREAM) { if(socket_type == SOCK_STREAM) {
@@ -368,6 +368,7 @@ namespace ZeroTier
*pcb = new_udp_PCB; *pcb = new_udp_PCB;
return ERR_OK; return ERR_OK;
} }
errno = ENOMEM;
return -1; return -1;
} }
@@ -380,7 +381,7 @@ namespace ZeroTier
#if defined(LIBZT_IPV4) #if defined(LIBZT_IPV4)
struct sockaddr_in *in4 = (struct sockaddr_in *)addr; struct sockaddr_in *in4 = (struct sockaddr_in *)addr;
if(addr->sa_family == AF_INET) { if(addr->sa_family == AF_INET && vs->socket_type == SOCK_STREAM) {
inet_ntop(AF_INET, &(in4->sin_addr), addrstr, INET_ADDRSTRLEN); inet_ntop(AF_INET, &(in4->sin_addr), addrstr, INET_ADDRSTRLEN);
DEBUG_EXTRA("connecting to %s : %d", addrstr, lwip_ntohs(in4->sin_port)); DEBUG_EXTRA("connecting to %s : %d", addrstr, lwip_ntohs(in4->sin_port));
} }
@@ -390,7 +391,7 @@ namespace ZeroTier
#if defined(LIBZT_IPV6) #if defined(LIBZT_IPV6)
struct sockaddr_in6 *in6 = (struct sockaddr_in6*)&addr; struct sockaddr_in6 *in6 = (struct sockaddr_in6*)&addr;
in6_to_ip6((ip6_addr *)&ba, in6); in6_to_ip6((ip6_addr *)&ba, in6);
if(addr->sa_family == AF_INET6) { if(addr->sa_family == AF_INET6 && vs->socket_type == SOCK_STREAM) {
inet_ntop(AF_INET6, &(in6->sin6_addr), addrstr, INET6_ADDRSTRLEN); inet_ntop(AF_INET6, &(in6->sin6_addr), addrstr, INET6_ADDRSTRLEN);
DEBUG_EXTRA("connecting to %s : %d", addrstr, lwip_ntohs(in6->sin6_port)); DEBUG_EXTRA("connecting to %s : %d", addrstr, lwip_ntohs(in6->sin6_port));
} }
@@ -403,7 +404,6 @@ namespace ZeroTier
udp_recv((struct udp_pcb*)vs->pcb, lwip_cb_udp_recved, vs); udp_recv((struct udp_pcb*)vs->pcb, lwip_cb_udp_recved, vs);
return ERR_OK; return ERR_OK;
} }
if(vs->socket_type == SOCK_STREAM) { if(vs->socket_type == SOCK_STREAM) {
struct tcp_pcb *tpcb = (struct tcp_pcb*)vs->pcb; struct tcp_pcb *tpcb = (struct tcp_pcb*)vs->pcb;
tcp_sent(tpcb, lwip_cb_sent); tcp_sent(tpcb, lwip_cb_sent);
@@ -411,9 +411,7 @@ namespace ZeroTier
tcp_err(tpcb, lwip_cb_err); tcp_err(tpcb, lwip_cb_err);
tcp_poll(tpcb, lwip_cb_poll, LWIP_APPLICATION_POLL_FREQ); tcp_poll(tpcb, lwip_cb_poll, LWIP_APPLICATION_POLL_FREQ);
tcp_arg(tpcb, vs); tcp_arg(tpcb, vs);
if((err = tcp_connect(tpcb,&ba,port,lwip_cb_connected)) < 0) {
if((err = tcp_connect(tpcb,&ba,port,lwip_cb_connected)) < 0)
{
errno = lwip_err_to_errno(err); errno = lwip_err_to_errno(err);
// We should only return a value if failure happens immediately // We should only return a value if failure happens immediately
// Otherwise, we still need to wait for a callback from lwIP. // Otherwise, we still need to wait for a callback from lwIP.
@@ -421,7 +419,7 @@ namespace ZeroTier
// that the SYN packet was enqueued onto the stack properly, // that the SYN packet was enqueued onto the stack properly,
// that's it! // that's it!
DEBUG_ERROR("unable to connect"); DEBUG_ERROR("unable to connect");
return -1; err = -1;
} }
} }
return err; return err;
@@ -559,19 +557,18 @@ namespace ZeroTier
int lwIP::lwip_Write(VirtualSocket *vs, void *data, ssize_t len) int lwIP::lwip_Write(VirtualSocket *vs, void *data, ssize_t len)
{ {
DEBUG_EXTRA("vs=%p, len=%d", vs, len);
int err = 0; int err = 0;
if(!vs) { if(!vs) {
DEBUG_ERROR("no virtual socket"); DEBUG_ERROR("no virtual socket");
return -1; return -1;
} }
DEBUG_EXTRA("fd=%d, vs=%p, len=%d", vs->app_fd, vs, len);
if(vs->socket_type == SOCK_DGRAM) { if(vs->socket_type == SOCK_DGRAM) {
// TODO: Packet re-assembly hasn't yet been tested with lwIP so UDP packets are limited to MTU-sized chunks // TODO: Packet re-assembly hasn't yet been tested with lwIP so UDP packets are limited to MTU-sized chunks
int udp_trans_len = std::min(len, (ssize_t)ZT_MAX_MTU); int udp_trans_len = std::min(len, (ssize_t)ZT_MAX_MTU);
// DEBUG_EXTRA("allocating pbuf chain of size=%d for UDP packet", udp_trans_len);
struct pbuf * pb = pbuf_alloc(PBUF_TRANSPORT, udp_trans_len, PBUF_POOL); struct pbuf * pb = pbuf_alloc(PBUF_TRANSPORT, udp_trans_len, PBUF_POOL);
if(!pb) { if(!pb) {
DEBUG_ERROR("unable to allocate new pbuf of size=%d", vs->TXbuf->count()); DEBUG_ERROR("unable to allocate new pbuf of len=%d", udp_trans_len);
return -1; return -1;
} }
memcpy(pb->payload, data, udp_trans_len); memcpy(pb->payload, data, udp_trans_len);
@@ -592,67 +589,78 @@ namespace ZeroTier
if(vs->socket_type == SOCK_STREAM) { if(vs->socket_type == SOCK_STREAM) {
// How much we are currently allowed to write to the VirtualSocket // How much we are currently allowed to write to the VirtualSocket
ssize_t sndbuf = ((struct tcp_pcb*)vs->pcb)->snd_buf; ssize_t sndbuf = ((struct tcp_pcb*)vs->pcb)->snd_buf;
int err, r; int r;
if(!sndbuf) { if(!sndbuf) {
// PCB send buffer is full, turn off readability notifications for the // PCB send buffer is full, turn off readability notifications for the
// corresponding PhySocket until lwip_cb_sent() is called and confirms that there is // corresponding PhySocket until lwip_cb_sent() is called and confirms that there is
// now space on the buffer // now space on the buffer
DEBUG_ERROR("lwIP stack is full, sndbuf==0"); DEBUG_ERROR("lwIP stack is full, sndbuf==0");
//vs->tap->_phy.setNotifyReadable(vs->sock, false); //vs->tap->_phy.setNotifyReadable(vs->sock, false);
return -1; err = -1;
} }
vs->_tx_m.lock();
int buf_w = vs->TXbuf->write((const unsigned char*)data, len); int buf_w = vs->TXbuf->write((const unsigned char*)data, len);
if (buf_w != len) { if (buf_w != len) {
// because we checked ZT_TCP_TX_BUF_SZ above, this should not happen DEBUG_ERROR("only wrote len=%d but expected to write len=%d to TX buffer", buf_w, len);
DEBUG_ERROR("TX wrote only %d but expected to write %d", buf_w, len);
handle_general_failure(); handle_general_failure();
return ZT_ERR_GENERAL_FAILURE; err = ZT_ERR_GENERAL_FAILURE;
} }
if(vs->TXbuf->count() <= 0) { if(vs->TXbuf->count() <= 0) {
return -1; // nothing to write err = -1; // nothing to write
} }
if(vs->sock) { if(!err) {
r = std::min((ssize_t)vs->TXbuf->count(), sndbuf); r = std::min((ssize_t)vs->TXbuf->count(), sndbuf);
// Writes data pulled from the client's socket buffer to LWIP. This merely sends the // Writes data pulled from the client's socket buffer to LWIP. This merely sends the
// data to LWIP to be enqueued and eventually sent to the network. // data to LWIP to be enqueued and eventually sent to the network.
if(r > 0) { if(r > 0) {
err = tcp_write((struct tcp_pcb*)vs->pcb, vs->TXbuf->get_buf(), r, vs->copymode); err = tcp_write((struct tcp_pcb*)vs->pcb, vs->TXbuf->get_buf(), r, vs->copymode);
tcp_output((struct tcp_pcb*)vs->pcb); tcp_output((struct tcp_pcb*)vs->pcb);
if(err != ERR_OK) { if(err != ERR_OK) {
DEBUG_ERROR("error while writing to lwIP tcp_pcb, err=%d", err); DEBUG_ERROR("error while writing to lwIP tcp_pcb, err=%d", err);
if(err == -1) if(err == ERR_MEM) {
DEBUG_ERROR("lwIP out of memory"); DEBUG_ERROR("lwIP out of memory");
return -1; }
err = -1;
} else { } else {
if(vs->copymode & TCP_WRITE_FLAG_COPY) { if(vs->copymode & TCP_WRITE_FLAG_COPY) {
// since we copied the data (allocated pbufs), we can consume the buffer // since we copied the data (allocated pbufs), we can consume the buffer
vs->TXbuf->consume(r); // success vs->TXbuf->consume(r); // success
DEBUG_TRANS("len=%5d tx_buf_len=%10d [VSTXBF --> NSLWIP]", err, vs->TXbuf->count());
} }
else { else {
// since we only processed the data by pointer reference we // since we only processed the data by pointer reference we
// want to preserve it until it has been ACKed by the remote host // want to preserve it until it has been ACKed by the remote host
// (DO NOTHING) // (DO NOTHING)
} }
return ERR_OK; err = ERR_OK;
} }
} }
} }
vs->_tx_m.unlock();
} }
return err; return err;
} }
int lwIP::lwip_Close(VirtualSocket *vs) int lwIP::lwip_Close(VirtualSocket *vs)
{ {
if(!vs) {
DEBUG_ERROR("invalid vs");
handle_general_failure();
return -1;
}
DEBUG_EXTRA("fd=%d, vs=%p", vs->app_fd, vs);
int err = 0; int err = 0;
errno = 0; errno = 0;
if(vs->socket_type == SOCK_DGRAM) { if(vs->socket_type == SOCK_DGRAM) {
udp_remove((struct udp_pcb*)vs->pcb); udp_remove((struct udp_pcb*)vs->pcb);
} }
// FIXME: check if already closed? vs->TCP_pcb->state != CLOSED if(vs->socket_type == SOCK_STREAM) {
// according to documentation, tcp_pcb is deallocated by the stack's own tcp code. do nothing
}
// TODO: check if already closed? vs->TCP_pcb->state != CLOSED
if(vs->pcb) { if(vs->pcb) {
if(((struct tcp_pcb*)vs->pcb)->state == SYN_SENT /*|| vs->TCP_pcb->state == CLOSE_WAIT*/) { if(((struct tcp_pcb*)vs->pcb)->state == SYN_SENT /*|| vs->TCP_pcb->state == CLOSE_WAIT*/) {
DEBUG_EXTRA("ignoring close request. invalid PCB state for this operation. sock=%p", vs->sock); // DEBUG_EXTRA("ignoring close request. invalid PCB state for this operation. sock=%p", vs->sock);
// TODO: errno = ?; // TODO: errno = ?;
return -1; return -1;
} }
@@ -666,9 +674,9 @@ namespace ZeroTier
tcp_poll(tpcb, NULL, 1); tcp_poll(tpcb, NULL, 1);
} }
else { else {
DEBUG_EXTRA("error while calling tcp_close() sock=%p", vs->sock); DEBUG_ERROR("error while calling tcp_close, fd=%d, vs=%p, pcb=%p", vs->app_fd, vs, vs->pcb);
errno = lwip_err_to_errno(err);
err = -1; err = -1;
// TODO: set errno
} }
} }
return err; return err;
@@ -731,8 +739,9 @@ namespace ZeroTier
vs->_rx_m.lock(); vs->_rx_m.lock();
// cycle through pbufs and write them to the RX buffer // cycle through pbufs and write them to the RX buffer
while(p != NULL) { while(p != NULL) {
if(p->len <= 0) if(p->len <= 0) {
break; break;
}
int avail = ZT_TCP_RX_BUF_SZ - vs->RXbuf->count(); int avail = ZT_TCP_RX_BUF_SZ - vs->RXbuf->count();
int len = p->len; int len = p->len;
if(avail < len) { if(avail < len) {
@@ -746,7 +755,7 @@ namespace ZeroTier
} }
if(tot) { if(tot) {
tcp_recved(PCB, tot); tcp_recved(PCB, tot);
DEBUG_TRANS("len=%5d buf_len=%13d [NSLWIP --> VSRXBF]", tot, vs->RXbuf->count()); DEBUG_TRANS("len=%5d rx_buf_len=%10d [NSLWIP --> VSRXBF]", tot, vs->RXbuf->count());
int w, write_attempt_sz = vs->RXbuf->count() < ZT_MAX_MTU ? vs->RXbuf->count() : ZT_MAX_MTU; int w, write_attempt_sz = vs->RXbuf->count() < ZT_MAX_MTU ? vs->RXbuf->count() : ZT_MAX_MTU;
if((w = write(vs->sdk_fd, vs->RXbuf->get_buf(), write_attempt_sz)) < 0) { if((w = write(vs->sdk_fd, vs->RXbuf->get_buf(), write_attempt_sz)) < 0) {
DEBUG_ERROR("write(fd=%d)=%d, errno=%d", vs->sdk_fd, w, errno); DEBUG_ERROR("write(fd=%d)=%d, errno=%d", vs->sdk_fd, w, errno);
@@ -754,11 +763,11 @@ namespace ZeroTier
if(w > 0) { if(w > 0) {
vs->RXbuf->consume(w); vs->RXbuf->consume(w);
if(w < write_attempt_sz) { if(w < write_attempt_sz) {
DEBUG_TRANS("len=%5d buf_len=%13d [VSRXBF --> APPFDS]", w, vs->RXbuf->count()); DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
DEBUG_ERROR("warning, intended to write %d bytes", write_attempt_sz); DEBUG_ERROR("intended to write len=%d, only wrote len=%d", write_attempt_sz, w);
} }
else { else {
DEBUG_TRANS("len=%5d buf_len=%13d [VSRXBF --> APPFDS]", w, vs->RXbuf->count()); DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
} }
} }
} }
@@ -876,10 +885,33 @@ namespace ZeroTier
DEBUG_ERROR("invalid vs for PCB=%p, len=%d", PCB, len); DEBUG_ERROR("invalid vs for PCB=%p, len=%d", PCB, len);
} }
if(!(vs->copymode & TCP_WRITE_FLAG_COPY)) { if(!(vs->copymode & TCP_WRITE_FLAG_COPY)) {
/*
From lwIP docs:
To achieve zero-copy on transmit, the data passed to the raw API must
remain unchanged until sent. Because the send- (or write-)functions return
when the packets have been enqueued for sending, data must be kept stable
after that, too.
This implies that PBUF_RAM/PBUF_POOL pbufs passed to raw-API send functions
must *not* be reused by the application unless their ref-count is 1.
For no-copy pbufs (PBUF_ROM/PBUF_REF), data must be kept unchanged, too,
but the stack/driver will/must copy PBUF_REF'ed data when enqueueing, while
PBUF_ROM-pbufs are just enqueued (as ROM-data is expected to never change).
Also, data passed to tcp_write without the copy-flag must not be changed!
Therefore, be careful which type of PBUF you use and if you copy TCP data
or not!
*/
// since we decided in lwip_Write() not to consume the buffere data, as it // since we decided in lwip_Write() not to consume the buffere data, as it
// was not copied and was only used by pointer reference, we can now consume // was not copied and was only used by pointer reference, we can now consume
// the data on the buffer since we've got an ACK back from the remote host // the data on the buffer since we've got an ACK back from the remote host
vs->_tx_m.lock();
vs->TXbuf->consume(len); vs->TXbuf->consume(len);
vs->_tx_m.unlock();
} }
return ERR_OK; return ERR_OK;
} }
@@ -902,6 +934,48 @@ namespace ZeroTier
err_t lwIP::lwip_cb_poll(void* arg, struct tcp_pcb *PCB) err_t lwIP::lwip_cb_poll(void* arg, struct tcp_pcb *PCB)
{ {
VirtualSocket *vs = (VirtualSocket *)arg;
DEBUG_EXTRA("fd=%d, vs=%p, PCB=%p", vs->app_fd, vs, PCB);
if(!vs) {
DEBUG_ERROR("invalid vs");
handle_general_failure();
return ERR_OK; // TODO: determine appropriate error value, if any
}
// --- Check buffers to see if we need to finish reading/writing anything ---
// TODO: Make a more generic form of each of these RX/TX blocks that can be shared
// between all polling callbacks and read write methods
// RX
vs->_rx_m.lock();
if(vs->RXbuf->count()) {
// this data has already been acknowledged via tcp_recved(), we merely need to
// move it off of the ringbuffer and into the client app
int w, write_attempt_sz = vs->RXbuf->count() < ZT_MAX_MTU ? vs->RXbuf->count() : ZT_MAX_MTU;
if((w = write(vs->sdk_fd, vs->RXbuf->get_buf(), write_attempt_sz)) < 0) {
DEBUG_ERROR("write(fd=%d)=%d, errno=%d", vs->sdk_fd, w, errno);
}
if(w > 0) {
vs->RXbuf->consume(w);
if(w < write_attempt_sz) {
DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
DEBUG_ERROR("intended to write len=%d, only wrote len=%d", write_attempt_sz, w);
}
else {
DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
}
}
}
vs->_rx_m.unlock();
// No need to lock the TX buffer since lwip_Write() will lock it for us
// TX
if(vs->TXbuf->count()) {
// we previously attempted to tcp_write(), but something went wrong, this
// is where we retry
lwipstack->lwip_Write(vs, vs->TXbuf->get_buf(), vs->TXbuf->count());
}
return ERR_OK; return ERR_OK;
} }