diff --git a/src/lwIP.cpp b/src/lwIP.cpp index dee0e09..5ba9ad3 100644 --- a/src/lwIP.cpp +++ b/src/lwIP.cpp @@ -57,7 +57,8 @@ void nd6_tmr(void); err_t tapif_init(struct netif *netif) { - return ERR_OK; + // we do the actual initialization in lwip_init_interface + return ERR_OK; } err_t lwip_eth_tx(struct netif *netif, struct pbuf *p) @@ -96,7 +97,6 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p) DEBUG_TRANS("len=%5d dst=%s [%s TX <-- %s] proto=0x%04x %s %s", totalLength, macBuf, nodeBuf, tap->nodeId().c_str(), ZeroTier::Utils::ntoh(ethhdr->type), beautify_eth_proto_nums(ZeroTier::Utils::ntoh(ethhdr->type)), flagbuf); } - return ERR_OK; } @@ -129,6 +129,7 @@ namespace ZeroTier tap->_mac.copyTo(tap->lwipdev.hwaddr, tap->lwipdev.hwaddr_len); tap->lwipdev.flags = NETIF_FLAG_BROADCAST | NETIF_FLAG_ETHARP | NETIF_FLAG_IGMP | NETIF_FLAG_LINK_UP | NETIF_FLAG_UP; netif_set_default(&(tap->lwipdev)); + netif_set_link_up(&(tap->lwipdev)); netif_set_up(&(tap->lwipdev)); char macbuf[ZT_MAC_ADDRSTRLEN]; mac2str(macbuf, ZT_MAC_ADDRSTRLEN, tap->lwipdev.hwaddr); @@ -167,12 +168,14 @@ namespace ZeroTier netif_ip6_addr_set_state(&(tap->lwipdev6), 1, IP6_ADDR_TENTATIVE); netif_set_default(&(tap->lwipdev6)); - netif_set_up(&(tap->lwipdev6)); + netif_set_link_up(&(tap->lwipdev6)); // state and flags tap->lwipdev6.state = tap; tap->lwipdev6.flags = NETIF_FLAG_LINK_UP | NETIF_FLAG_UP; + netif_set_up(&(tap->lwipdev6)); + char macbuf[ZT_MAC_ADDRSTRLEN]; mac2str(macbuf, ZT_MAC_ADDRSTRLEN, tap->lwipdev6.hwaddr); DEBUG_INFO("mac=%s, addr=%s", macbuf, ip.toString(ipbuf)); @@ -189,13 +192,11 @@ namespace ZeroTier while(pcb_ptr) { pcb_ptr = pcb_ptr->next; count++; - DEBUG_ERROR("FOUND --- tcp_active_pcbs PCB COUNT = %d", count); } pcb_ptr = tcp_tw_pcbs; // PCBs in TIME-WAIT state while(pcb_ptr) { pcb_ptr = pcb_ptr->next; count++; - DEBUG_ERROR("FOUND --- tcp_tw_pcbs PCB COUNT = %d", count); } /* TODO pcb_ptr = tcp_listen_pcbs; @@ -208,7 +209,6 @@ namespace ZeroTier while(pcb_ptr) { pcb_ptr = pcb_ptr->next; count++; - DEBUG_ERROR("FOUND --- tcp_bound_pcbs PCB COUNT = %d", count); } return count; } @@ -221,7 +221,6 @@ namespace ZeroTier while(pcb_ptr) { pcb_ptr = pcb_ptr->next; count++; - DEBUG_ERROR("FOUND --- udp_pcbs PCB COUNT = %d", count); } return count; } @@ -355,6 +354,7 @@ namespace ZeroTier { if(!can_provision_new_socket(socket_type)) { DEBUG_ERROR("unable to create new socket due to limitation of network stack"); + errno = ENOMEM; return -1; } if(socket_type == SOCK_STREAM) { @@ -368,6 +368,7 @@ namespace ZeroTier *pcb = new_udp_PCB; return ERR_OK; } + errno = ENOMEM; return -1; } @@ -380,7 +381,7 @@ namespace ZeroTier #if defined(LIBZT_IPV4) struct sockaddr_in *in4 = (struct sockaddr_in *)addr; - if(addr->sa_family == AF_INET) { + if(addr->sa_family == AF_INET && vs->socket_type == SOCK_STREAM) { inet_ntop(AF_INET, &(in4->sin_addr), addrstr, INET_ADDRSTRLEN); DEBUG_EXTRA("connecting to %s : %d", addrstr, lwip_ntohs(in4->sin_port)); } @@ -390,7 +391,7 @@ namespace ZeroTier #if defined(LIBZT_IPV6) struct sockaddr_in6 *in6 = (struct sockaddr_in6*)&addr; in6_to_ip6((ip6_addr *)&ba, in6); - if(addr->sa_family == AF_INET6) { + if(addr->sa_family == AF_INET6 && vs->socket_type == SOCK_STREAM) { inet_ntop(AF_INET6, &(in6->sin6_addr), addrstr, INET6_ADDRSTRLEN); DEBUG_EXTRA("connecting to %s : %d", addrstr, lwip_ntohs(in6->sin6_port)); } @@ -403,17 +404,14 @@ namespace ZeroTier udp_recv((struct udp_pcb*)vs->pcb, lwip_cb_udp_recved, vs); return ERR_OK; } - if(vs->socket_type == SOCK_STREAM) { struct tcp_pcb *tpcb = (struct tcp_pcb*)vs->pcb; tcp_sent(tpcb, lwip_cb_sent); tcp_recv(tpcb, lwip_cb_tcp_recved); tcp_err(tpcb, lwip_cb_err); tcp_poll(tpcb, lwip_cb_poll, LWIP_APPLICATION_POLL_FREQ); - tcp_arg(tpcb, vs); - - if((err = tcp_connect(tpcb,&ba,port,lwip_cb_connected)) < 0) - { + tcp_arg(tpcb, vs); + if((err = tcp_connect(tpcb,&ba,port,lwip_cb_connected)) < 0) { errno = lwip_err_to_errno(err); // We should only return a value if failure happens immediately // Otherwise, we still need to wait for a callback from lwIP. @@ -421,7 +419,7 @@ namespace ZeroTier // that the SYN packet was enqueued onto the stack properly, // that's it! DEBUG_ERROR("unable to connect"); - return -1; + err = -1; } } return err; @@ -559,19 +557,18 @@ namespace ZeroTier int lwIP::lwip_Write(VirtualSocket *vs, void *data, ssize_t len) { - DEBUG_EXTRA("vs=%p, len=%d", vs, len); int err = 0; if(!vs) { DEBUG_ERROR("no virtual socket"); return -1; } + DEBUG_EXTRA("fd=%d, vs=%p, len=%d", vs->app_fd, vs, len); if(vs->socket_type == SOCK_DGRAM) { // TODO: Packet re-assembly hasn't yet been tested with lwIP so UDP packets are limited to MTU-sized chunks int udp_trans_len = std::min(len, (ssize_t)ZT_MAX_MTU); - // DEBUG_EXTRA("allocating pbuf chain of size=%d for UDP packet", udp_trans_len); struct pbuf * pb = pbuf_alloc(PBUF_TRANSPORT, udp_trans_len, PBUF_POOL); - if(!pb){ - DEBUG_ERROR("unable to allocate new pbuf of size=%d", vs->TXbuf->count()); + if(!pb) { + DEBUG_ERROR("unable to allocate new pbuf of len=%d", udp_trans_len); return -1; } memcpy(pb->payload, data, udp_trans_len); @@ -592,67 +589,78 @@ namespace ZeroTier if(vs->socket_type == SOCK_STREAM) { // How much we are currently allowed to write to the VirtualSocket ssize_t sndbuf = ((struct tcp_pcb*)vs->pcb)->snd_buf; - int err, r; + int r; if(!sndbuf) { // PCB send buffer is full, turn off readability notifications for the // corresponding PhySocket until lwip_cb_sent() is called and confirms that there is // now space on the buffer DEBUG_ERROR("lwIP stack is full, sndbuf==0"); //vs->tap->_phy.setNotifyReadable(vs->sock, false); - return -1; + err = -1; } + vs->_tx_m.lock(); int buf_w = vs->TXbuf->write((const unsigned char*)data, len); if (buf_w != len) { - // because we checked ZT_TCP_TX_BUF_SZ above, this should not happen - DEBUG_ERROR("TX wrote only %d but expected to write %d", buf_w, len); + DEBUG_ERROR("only wrote len=%d but expected to write len=%d to TX buffer", buf_w, len); handle_general_failure(); - return ZT_ERR_GENERAL_FAILURE; + err = ZT_ERR_GENERAL_FAILURE; } if(vs->TXbuf->count() <= 0) { - return -1; // nothing to write + err = -1; // nothing to write } - if(vs->sock) { + if(!err) { r = std::min((ssize_t)vs->TXbuf->count(), sndbuf); // Writes data pulled from the client's socket buffer to LWIP. This merely sends the // data to LWIP to be enqueued and eventually sent to the network. if(r > 0) { - err = tcp_write((struct tcp_pcb*)vs->pcb, vs->TXbuf->get_buf(), r, vs->copymode); tcp_output((struct tcp_pcb*)vs->pcb); if(err != ERR_OK) { DEBUG_ERROR("error while writing to lwIP tcp_pcb, err=%d", err); - if(err == -1) + if(err == ERR_MEM) { DEBUG_ERROR("lwIP out of memory"); - return -1; + } + err = -1; } else { if(vs->copymode & TCP_WRITE_FLAG_COPY) { // since we copied the data (allocated pbufs), we can consume the buffer vs->TXbuf->consume(r); // success + DEBUG_TRANS("len=%5d tx_buf_len=%10d [VSTXBF --> NSLWIP]", err, vs->TXbuf->count()); } else { // since we only processed the data by pointer reference we // want to preserve it until it has been ACKed by the remote host // (DO NOTHING) } - return ERR_OK; + err = ERR_OK; } } } + vs->_tx_m.unlock(); } return err; } int lwIP::lwip_Close(VirtualSocket *vs) { + if(!vs) { + DEBUG_ERROR("invalid vs"); + handle_general_failure(); + return -1; + } + DEBUG_EXTRA("fd=%d, vs=%p", vs->app_fd, vs); int err = 0; errno = 0; if(vs->socket_type == SOCK_DGRAM) { udp_remove((struct udp_pcb*)vs->pcb); } - // FIXME: check if already closed? vs->TCP_pcb->state != CLOSED + if(vs->socket_type == SOCK_STREAM) { + // according to documentation, tcp_pcb is deallocated by the stack's own tcp code. do nothing + } + // TODO: check if already closed? vs->TCP_pcb->state != CLOSED if(vs->pcb) { if(((struct tcp_pcb*)vs->pcb)->state == SYN_SENT /*|| vs->TCP_pcb->state == CLOSE_WAIT*/) { - DEBUG_EXTRA("ignoring close request. invalid PCB state for this operation. sock=%p", vs->sock); + // DEBUG_EXTRA("ignoring close request. invalid PCB state for this operation. sock=%p", vs->sock); // TODO: errno = ?; return -1; } @@ -666,9 +674,9 @@ namespace ZeroTier tcp_poll(tpcb, NULL, 1); } else { - DEBUG_EXTRA("error while calling tcp_close() sock=%p", vs->sock); + DEBUG_ERROR("error while calling tcp_close, fd=%d, vs=%p, pcb=%p", vs->app_fd, vs, vs->pcb); + errno = lwip_err_to_errno(err); err = -1; - // TODO: set errno } } return err; @@ -731,8 +739,9 @@ namespace ZeroTier vs->_rx_m.lock(); // cycle through pbufs and write them to the RX buffer while(p != NULL) { - if(p->len <= 0) + if(p->len <= 0) { break; + } int avail = ZT_TCP_RX_BUF_SZ - vs->RXbuf->count(); int len = p->len; if(avail < len) { @@ -746,7 +755,7 @@ namespace ZeroTier } if(tot) { tcp_recved(PCB, tot); - DEBUG_TRANS("len=%5d buf_len=%13d [NSLWIP --> VSRXBF]", tot, vs->RXbuf->count()); + DEBUG_TRANS("len=%5d rx_buf_len=%10d [NSLWIP --> VSRXBF]", tot, vs->RXbuf->count()); int w, write_attempt_sz = vs->RXbuf->count() < ZT_MAX_MTU ? vs->RXbuf->count() : ZT_MAX_MTU; if((w = write(vs->sdk_fd, vs->RXbuf->get_buf(), write_attempt_sz)) < 0) { DEBUG_ERROR("write(fd=%d)=%d, errno=%d", vs->sdk_fd, w, errno); @@ -754,11 +763,11 @@ namespace ZeroTier if(w > 0) { vs->RXbuf->consume(w); if(w < write_attempt_sz) { - DEBUG_TRANS("len=%5d buf_len=%13d [VSRXBF --> APPFDS]", w, vs->RXbuf->count()); - DEBUG_ERROR("warning, intended to write %d bytes", write_attempt_sz); + DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count()); + DEBUG_ERROR("intended to write len=%d, only wrote len=%d", write_attempt_sz, w); } else { - DEBUG_TRANS("len=%5d buf_len=%13d [VSRXBF --> APPFDS]", w, vs->RXbuf->count()); + DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count()); } } } @@ -876,10 +885,33 @@ namespace ZeroTier DEBUG_ERROR("invalid vs for PCB=%p, len=%d", PCB, len); } if(!(vs->copymode & TCP_WRITE_FLAG_COPY)) { + /* + From lwIP docs: + + To achieve zero-copy on transmit, the data passed to the raw API must + remain unchanged until sent. Because the send- (or write-)functions return + when the packets have been enqueued for sending, data must be kept stable + after that, too. + + This implies that PBUF_RAM/PBUF_POOL pbufs passed to raw-API send functions + must *not* be reused by the application unless their ref-count is 1. + + For no-copy pbufs (PBUF_ROM/PBUF_REF), data must be kept unchanged, too, + but the stack/driver will/must copy PBUF_REF'ed data when enqueueing, while + PBUF_ROM-pbufs are just enqueued (as ROM-data is expected to never change). + + Also, data passed to tcp_write without the copy-flag must not be changed! + + Therefore, be careful which type of PBUF you use and if you copy TCP data + or not! + */ + // since we decided in lwip_Write() not to consume the buffere data, as it // was not copied and was only used by pointer reference, we can now consume // the data on the buffer since we've got an ACK back from the remote host + vs->_tx_m.lock(); vs->TXbuf->consume(len); + vs->_tx_m.unlock(); } return ERR_OK; } @@ -902,6 +934,48 @@ namespace ZeroTier err_t lwIP::lwip_cb_poll(void* arg, struct tcp_pcb *PCB) { + VirtualSocket *vs = (VirtualSocket *)arg; + DEBUG_EXTRA("fd=%d, vs=%p, PCB=%p", vs->app_fd, vs, PCB); + if(!vs) { + DEBUG_ERROR("invalid vs"); + handle_general_failure(); + return ERR_OK; // TODO: determine appropriate error value, if any + } + + // --- Check buffers to see if we need to finish reading/writing anything --- + + // TODO: Make a more generic form of each of these RX/TX blocks that can be shared + // between all polling callbacks and read write methods + + // RX + vs->_rx_m.lock(); + if(vs->RXbuf->count()) { + // this data has already been acknowledged via tcp_recved(), we merely need to + // move it off of the ringbuffer and into the client app + int w, write_attempt_sz = vs->RXbuf->count() < ZT_MAX_MTU ? vs->RXbuf->count() : ZT_MAX_MTU; + if((w = write(vs->sdk_fd, vs->RXbuf->get_buf(), write_attempt_sz)) < 0) { + DEBUG_ERROR("write(fd=%d)=%d, errno=%d", vs->sdk_fd, w, errno); + } + if(w > 0) { + vs->RXbuf->consume(w); + if(w < write_attempt_sz) { + DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count()); + DEBUG_ERROR("intended to write len=%d, only wrote len=%d", write_attempt_sz, w); + } + else { + DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count()); + } + } + } + vs->_rx_m.unlock(); + + // No need to lock the TX buffer since lwip_Write() will lock it for us + // TX + if(vs->TXbuf->count()) { + // we previously attempted to tcp_write(), but something went wrong, this + // is where we retry + lwipstack->lwip_Write(vs, vs->TXbuf->get_buf(), vs->TXbuf->count()); + } return ERR_OK; }