diff --git a/include/libzt.h b/include/libzt.h index 6807fad..742ea84 100644 --- a/include/libzt.h +++ b/include/libzt.h @@ -30,6 +30,8 @@ #include #include #include +#include +#include /****************************************************************************/ /* For SOCK_RAW support, it will initially be modeled after linux's API, so */ @@ -484,6 +486,11 @@ namespace ZeroTier int zts_get_pico_socket(int fd, struct pico_socket **s); #endif +/* + * Whether we can add a new socket or not. Depends on stack in use + */ +bool can_provision_new_socket(); + /** * Returns the number of sockets either already provisioned or waiting to be * Some network stacks may have a limit on the number of sockets that they can diff --git a/src/SocketTap.cpp b/src/SocketTap.cpp index 274671c..88bcd49 100644 --- a/src/SocketTap.cpp +++ b/src/SocketTap.cpp @@ -266,13 +266,12 @@ namespace ZeroTier { if(!conn) return; if(len){ - Write(conn, data, len); } return; } - void SocketTap::phyOnUnixWritable(PhySocket *sock,void **uptr,bool stack_invoked) + void SocketTap::phyOnUnixWritable(PhySocket *sock, void **uptr, bool stack_invoked) { if(sock) Read(sock,uptr,stack_invoked); @@ -283,6 +282,9 @@ namespace ZeroTier { /****************************************************************************/ int SocketTap::Connect(Connection *conn, int fd, const struct sockaddr *addr, socklen_t addrlen) { +#if defined(NO_STACK) + return -1; +#endif Mutex::Lock _l(_tcpconns_m); #if defined(STACK_PICO) if(picostack) @@ -296,6 +298,9 @@ namespace ZeroTier { } int SocketTap::Bind(Connection *conn, int fd, const struct sockaddr *addr, socklen_t addrlen) { +#if defined(NO_STACK) + return -1; +#endif Mutex::Lock _l(_tcpconns_m); #if defined(STACK_PICO) if(picostack) @@ -309,21 +314,37 @@ namespace ZeroTier { } int SocketTap::Listen(Connection *conn, int fd, int backlog) { -#if defined(STACK_PICO) +#if defined(NO_STACK) + return -1; +#endif Mutex::Lock _l(_tcpconns_m); +#if defined(STACK_PICO) if(picostack) return picostack->pico_Listen(conn, fd, backlog); return ZT_ERR_GENERAL_FAILURE; +#endif +#if defined(STACK_LWIP) + if(lwipstack) + return lwipstack->lwip_Listen(conn, backlog); + return ZT_ERR_GENERAL_FAILURE; #endif return ZT_ERR_GENERAL_FAILURE; } Connection* SocketTap::Accept(Connection *conn) { -#if defined(STACK_PICO) +#if defined(NO_STACK) + return NULL; +#endif Mutex::Lock _l(_tcpconns_m); +#if defined(STACK_PICO) if(picostack) return picostack->pico_Accept(conn); return NULL; +#endif +#if defined(STACK_LWIP) + if(lwipstack) + return lwipstack->lwip_Accept(conn); + return NULL; #endif return NULL; } @@ -332,12 +353,17 @@ namespace ZeroTier { #if defined(STACK_PICO) if(picostack) return picostack->pico_Read(this, sock, (Connection*)uptr, stack_invoked); +#endif +#if defined(STACK_LWIP) + if(lwipstack) + return lwipstack->lwip_Read((Connection*)*(_phy.getuptr(sock)), stack_invoked); #endif return -1; } int SocketTap::Write(Connection *conn, void *data, ssize_t len) { - if(conn->socket_type == SOCK_RAW) { // we don't want to use a stack, just VL2 + // VL2, SOCK_RAW, no network stack + if(conn->socket_type == SOCK_RAW) { struct ether_header *eh = (struct ether_header *) data; MAC src_mac; MAC dest_mac; @@ -350,6 +376,10 @@ namespace ZeroTier { #if defined(STACK_PICO) if(picostack) return picostack->pico_Write(conn, data, len); +#endif +#if defined(STACK_LWIP) + if(lwipstack) + return lwipstack->lwip_Write(conn, data, len); #endif return -1; } @@ -387,6 +417,10 @@ namespace ZeroTier { break; } } +#endif +#if defined(STACK_LWIP) + if(lwipstack) + lwipstack->lwip_Close(conn); #endif return 0; // TODO } diff --git a/src/SocketTap.hpp b/src/SocketTap.hpp index dc28e68..b8edd69 100644 --- a/src/SocketTap.hpp +++ b/src/SocketTap.hpp @@ -137,23 +137,23 @@ namespace ZeroTier { /* * For moving data onto the ZeroTier virtual wire */ - void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int, - const void *,unsigned int); + void (*_handler)(void *, void *, uint64_t, const MAC &, const MAC &, unsigned int, unsigned int, + const void *, unsigned int); /* * Signals us to close the TcpConnection associated with this PhySocket */ - void phyOnUnixClose(PhySocket *sock,void **uptr); + void phyOnUnixClose(PhySocket *sock, void **uptr); /* * Notifies us that there is data to be read from an application's socket */ - void phyOnUnixData(PhySocket *sock,void **uptr,void *data,ssize_t len); + void phyOnUnixData(PhySocket *sock, void **uptr, void *data, ssize_t len); /* * Notifies us that we can write to an application's socket */ - void phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked); + void phyOnUnixWritable(PhySocket *sock, void **uptr, bool lwip_invoked); /****************************************************************************/ /* Vars */ diff --git a/src/libzt.cpp b/src/libzt.cpp index 9b2b502..e1bc65a 100644 --- a/src/libzt.cpp +++ b/src/libzt.cpp @@ -689,6 +689,7 @@ int zts_bind(ZT_BIND_SIG) { else { tap->_Connections.push_back(conn); err = tap->Bind(conn, fd, addr, addrlen); + conn->tap = tap; if(err == 0) { // success ZeroTier::unmap.erase(fd); ZeroTier::fdmap[fd] = new std::pair(conn, tap); @@ -724,7 +725,6 @@ Linux: [ ] [EOPNOTSUPP] The socket is not of a type that supports the listen() operation. */ int zts_listen(ZT_LISTEN_SIG) { -#if defined(STACK_PICO) DEBUG_EXTRA("fd = %d", fd); int err = 0; if(fd < 0) { @@ -757,8 +757,6 @@ int zts_listen(ZT_LISTEN_SIG) { ZeroTier::_multiplexer_lock.unlock(); } return err; -#endif - return 0; } /* @@ -776,68 +774,63 @@ Darwin: [ ] [ENFILE] The system file table is full. */ int zts_accept(ZT_ACCEPT_SIG) { -#if defined(STACK_PICO) DEBUG_EXTRA("fd = %d", fd); int err = 0; if(fd < 0) { errno = EBADF; return -1; } - else - { - // +1 since we'll be creating a new pico_socket when we accept the connection - if(pico_ntimers()+1 >= PICO_MAX_TIMERS) { - DEBUG_ERROR("cannot provision additional socket due to limitation of PICO_MAX_TIMERS."); - errno = EMFILE; - err = -1; - } - ZeroTier::_multiplexer_lock.lock(); - std::pair *p = ZeroTier::fdmap[fd]; - if(!p) { - DEBUG_ERROR("unable to locate connection pair (did you zts_bind())?"); - errno = EBADF; - err = -1; - } - else { - ZeroTier::Connection *conn = p->first; - ZeroTier::SocketTap *tap = p->second; - - // BLOCKING: loop and keep checking until we find a newly accepted connection - int f_err, blocking = 1; - if ((f_err = fcntl(fd, F_GETFL, 0)) < 0) { - DEBUG_ERROR("fcntl error, err = %s, errno = %d", f_err, errno); - err = -1; - } - else { - blocking = !(f_err & O_NONBLOCK); - } - if(!err) { - ZeroTier::Connection *accepted_conn; - if(!blocking) { // non-blocking - DEBUG_EXTRA("EWOULDBLOCK, not a real error, assuming non-blocking mode"); - errno = EWOULDBLOCK; - err = -1; - accepted_conn = tap->Accept(conn); - } - else { // blocking - while(true) { - usleep(ZT_ACCEPT_RECHECK_DELAY * 1000); - accepted_conn = tap->Accept(conn); - if(accepted_conn) - break; // accepted fd = err - } - } - if(accepted_conn) { - ZeroTier::fdmap[accepted_conn->app_fd] = new std::pair(accepted_conn, tap); - err = accepted_conn->app_fd; - } - } - } - ZeroTier::_multiplexer_lock.unlock(); + // +1 since we'll be creating a new pico_socket when we accept the connection + if(!can_provision_new_socket()) { + DEBUG_ERROR("cannot provision additional socket due to limitation of network stack"); + errno = EMFILE; + return -1; } + ZeroTier::_multiplexer_lock.lock(); + std::pair *p = ZeroTier::fdmap[fd]; + if(!p) { + DEBUG_ERROR("unable to locate connection pair (did you zts_bind())?"); + errno = EBADF; + err = -1; + } + else { + ZeroTier::Connection *conn = p->first; + ZeroTier::SocketTap *tap = p->second; + + // BLOCKING: loop and keep checking until we find a newly accepted connection + int f_err, blocking = 1; + if ((f_err = fcntl(fd, F_GETFL, 0)) < 0) { + DEBUG_ERROR("fcntl error, err = %s, errno = %d", f_err, errno); + err = -1; + } + else { + blocking = !(f_err & O_NONBLOCK); + } + + if(!err) { + ZeroTier::Connection *accepted_conn; + if(!blocking) { // non-blocking + DEBUG_EXTRA("EWOULDBLOCK, not a real error, assuming non-blocking mode"); + errno = EWOULDBLOCK; + err = -1; + accepted_conn = tap->Accept(conn); + } + else { // blocking + while(true) { + usleep(ZT_ACCEPT_RECHECK_DELAY * 1000); + accepted_conn = tap->Accept(conn); + if(accepted_conn) + break; // accepted fd = err + } + } + if(accepted_conn) { + ZeroTier::fdmap[accepted_conn->app_fd] = new std::pair(accepted_conn, tap); + err = accepted_conn->app_fd; + } + } + } + ZeroTier::_multiplexer_lock.unlock(); return err; -#endif - return 0; } @@ -1301,12 +1294,12 @@ ssize_t zts_recvmsg(ZT_RECVMSG_SIG) } int zts_read(ZT_READ_SIG) { - //DEBUG_INFO("fd = %d", fd); + DEBUG_INFO("fd = %d", fd); return read(fd, buf, len); } int zts_write(ZT_WRITE_SIG) { - //DEBUG_INFO("fd = %d", fd); + DEBUG_INFO("fd = %d", fd); return write(fd, buf, len); } @@ -1695,6 +1688,20 @@ int zts_get_pico_socket(int fd, struct pico_socket **s) } #endif +bool can_provision_new_socket() +{ +#if defined(STACK_PICO) + if(pico_ntimers()+1 >= PICO_MAX_TIMERS) { + return false; + } + return true; +#endif +#if defined(STACK_LWIP) + // TODO: Add check here (see lwipopts.h) + return true; +#endif +} + int zts_nsockets() { ZeroTier::_multiplexer_lock.unlock(); diff --git a/src/lwIP.cpp b/src/lwIP.cpp index 32a0172..cab89e3 100644 --- a/src/lwIP.cpp +++ b/src/lwIP.cpp @@ -26,6 +26,8 @@ // lwIP network stack driver +#include + #include "libzt.h" #include "SocketTap.hpp" #include "Utilities.hpp" @@ -176,7 +178,7 @@ namespace ZeroTier void lwIP::lwip_rx(SocketTap *tap, const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len) { - DEBUG_INFO(); + DEBUG_INFO("etherType=%x, len=%d", etherType, len); struct pbuf *p,*q; if (!tap->_enabled) return; @@ -224,8 +226,11 @@ namespace ZeroTier int lwIP::lwip_Socket(void **pcb, int socket_family, int socket_type, int protocol) { - // TODO: check lwIP timers, and max sockets DEBUG_INFO(); + if(!can_provision_new_socket()) { + DEBUG_ERROR("unable to create new socket due to limitation of network stack"); + return -1; + } if(socket_type == SOCK_STREAM) { struct tcp_pcb *new_tcp_PCB = tcp_new(); *pcb = new_tcp_PCB; @@ -236,16 +241,101 @@ namespace ZeroTier *pcb = new_udp_PCB; return ERR_OK; } - if(socket_type == SOCK_RAW) { - DEBUG_ERROR("SOCK_RAW, not currently supported."); - return -1; - } return -1; } int lwIP::lwip_Connect(Connection *conn, int fd, const struct sockaddr *addr, socklen_t addrlen) { DEBUG_INFO(); + ip_addr_t ba; + char addrstr[INET6_ADDRSTRLEN]; + int port = 0, err = 0; + +#if defined(LIBZT_IPV4) + struct sockaddr_in *in4; + if(addr->sa_family == AF_INET) { + in4 = (struct sockaddr_in *)addr; + inet_ntop(AF_INET, &(in4->sin_addr), addrstr, INET_ADDRSTRLEN); + DEBUG_INFO("%s:%d", addrstr, lwip_ntohs(in4->sin_port)); + } + ba = convert_ip(in4); + port = lwip_ntohs(in4->sin_port); +#endif +#if defined(LIBZT_IPV6) + struct sockaddr_in6 *in6 = (struct sockaddr_in6*)&addr; + in6_to_ip6((ip6_addr *)&ba, in6); + if(addr->sa_family == AF_INET6) { + struct sockaddr_in6 *connaddr6 = (struct sockaddr_in6 *)addr; + inet_ntop(AF_INET6, &(connaddr6->sin6_addr), addrstr, INET6_ADDRSTRLEN); + DEBUG_INFO("%s:%d", addrstr, lwip_ntohs(connaddr6->sin6_port)); + } +#endif + + DEBUG_INFO("addr=%s", addrstr); + + if(conn->socket_type == SOCK_DGRAM) { + // Generates no network traffic + if((err = udp_connect((struct udp_pcb*)conn->pcb,(ip_addr_t *)&ba,port)) < 0) { + DEBUG_ERROR("error while connecting to with UDP"); + } + udp_recv((struct udp_pcb*)conn->pcb, nc_udp_recved, conn); + return ERR_OK; + } + + if(conn->socket_type == SOCK_STREAM) { + struct tcp_pcb *tpcb = (struct tcp_pcb*)conn->pcb; + tcp_sent(tpcb, nc_sent); + tcp_recv(tpcb, nc_recved); + tcp_err(tpcb, nc_err); + tcp_poll(tpcb, nc_poll, LWIP_APPLICATION_POLL_FREQ); + tcp_arg(tpcb, conn); + + //DEBUG_EXTRA(" pcb->state=%x", conn->TCP_pcb->state); + //if(conn->TCP_pcb->state != CLOSED) { + // DEBUG_INFO(" cannot connect using this PCB, PCB!=CLOSED"); + // tap->sendReturnValue(tap->_phy.getDescriptor(rpcSock), -1, EAGAIN); + // return; + //} + if((err = tcp_connect(tpcb,&ba,port,nc_connected)) < 0) + { + if(err == ERR_ISCONN) { + // Already in connected state + errno = EISCONN; + return -1; + } if(err == ERR_USE) { + // Already in use + errno = EADDRINUSE; + return -1; + } if(err == ERR_VAL) { + // Invalid ipaddress parameter + errno = EINVAL; + return -1; + } if(err == ERR_RTE) { + // No route to host + errno = ENETUNREACH; + return -1; + } if(err == ERR_BUF) { + // No more ports available + errno = EAGAIN; + return -1; + } + if(err == ERR_MEM) { + // TODO: Doesn't describe the problem well, but closest match + errno = EAGAIN; + return -1; + } + // We should only return a value if failure happens immediately + // Otherwise, we still need to wait for a callback from lwIP. + // - This is because an ERR_OK from tcp_connect() only verifies + // that the SYN packet was enqueued onto the stack properly, + // that's it! + // - Most instances of a retval for a connect() should happen + // in the nc_connect() and nc_err() callbacks! + DEBUG_ERROR("unable to connect"); + errno = EAGAIN; + return -1; + } + } } int lwIP::lwip_Bind(SocketTap *tap, Connection *conn, int fd, const struct sockaddr *addr, socklen_t addrlen) @@ -256,20 +346,14 @@ namespace ZeroTier int port = 0, err = 0; #if defined(LIBZT_IPV4) - DEBUG_ERROR("A"); struct sockaddr_in *in4; if(addr->sa_family == AF_INET) { - DEBUG_ERROR("A"); in4 = (struct sockaddr_in *)addr; - DEBUG_ERROR("A"); inet_ntop(AF_INET, &(in4->sin_addr), addrstr, INET_ADDRSTRLEN); - DEBUG_ERROR("A"); DEBUG_INFO("%s:%d", addrstr, lwip_ntohs(in4->sin_port)); } ba = convert_ip(in4); port = lwip_ntohs(in4->sin_port); - DEBUG_INFO("port=%d", port); - DEBUG_INFO("port=%d", lwip_ntohs(port)); #endif #if defined(LIBZT_IPV6) struct sockaddr_in6 *in6 = (struct sockaddr_in6*)&addr; @@ -318,28 +402,199 @@ namespace ZeroTier return err; } - int lwIP::lwip_Listen(SocketTap *tap, PhySocket *sock, PhySocket *rpcSock, void **uptr, struct listen_st *listen_rpc) + int lwIP::lwip_Listen(Connection *conn, int backlog) { - DEBUG_INFO(); - // to be implemented + DEBUG_INFO("conn=%p", conn); + struct tcp_pcb* listeningPCB; +#ifdef TCP_LISTEN_BACKLOG + listeningPCB = tcp_listen_with_backlog((struct tcp_pcb*)conn->pcb, backlog); +#else + listeningPCB = tcp_listen((struct tcp_pcb*)conn->pcb); +#endif + if(listeningPCB != NULL) { + conn->pcb = listeningPCB; + tcp_accept(listeningPCB, nc_accept); // set callback + tcp_arg(listeningPCB, conn); + //fcntl(tap->_phy.getDescriptor(conn->sock), F_SETFL, O_NONBLOCK); + } + return 0; } - int lwIP::lwip_Read(SocketTap *tap, PhySocket *sock, void **uptr, bool lwip_invoked) + Connection* lwIP::lwip_Accept(Connection *conn) { - DEBUG_EXTRA(); - // to be implemented + DEBUG_EXTRA("conn=%p", conn); + if(!conn) { + DEBUG_ERROR("invalid conn"); + handle_general_failure(); + return NULL; + } + // Retreive first of queued Connections from parent connection + Connection *new_conn = NULL; + DEBUG_INFO("locking..."); + Mutex::Lock _l(conn->tap->_tcpconns_m); + DEBUG_INFO("locked."); + if(conn->_AcceptedConnections.size()) { + new_conn = conn->_AcceptedConnections.front(); + conn->_AcceptedConnections.pop(); + } + return new_conn; } - int lwIP::lwip_Write(SocketTap *tap, Connection *conn) + int lwIP::lwip_Read(Connection *conn, bool lwip_invoked) + { + DEBUG_EXTRA("conn=%p", conn); + if(!conn) { + DEBUG_ERROR("no connection"); + return -1; + } + if(!lwip_invoked) { + DEBUG_INFO("!lwip_invoked"); + conn->tap->_tcpconns_m.lock(); + conn->_rx_m.lock(); + } + if(conn->RXbuf->count()) { + int max = conn->socket_type == SOCK_STREAM ? ZT_STACK_TCP_SOCKET_RX_SZ : ZT_STACK_TCP_SOCKET_RX_SZ; + int wr = std::min((ssize_t)max, (ssize_t)conn->RXbuf->count()); + int n = conn->tap->_phy.streamSend(conn->sock, conn->RXbuf->get_buf(), wr); + char str[22]; + memcpy(str, conn->RXbuf->get_buf(), 22); + DEBUG_INFO("string = %s", str); + DEBUG_INFO("n =%d", n); + conn->RXbuf->consume(n); + + //if(n == max) + //{ + //if(conn->socket_type == SOCK_DGRAM){ + // conn->tap->_phy.setNotifyWritable(conn->sock, false); + //} + if(conn->socket_type == SOCK_STREAM) { // Only acknolwedge receipt of TCP packets + tcp_recved((struct tcp_pcb*)conn->pcb, n); + DEBUG_TRANS("TCP RX %ld bytes", n); + } + //} + } + if(conn->RXbuf->count() == 0) { + DEBUG_INFO("wrote everything"); + conn->tap->_phy.setNotifyWritable(conn->sock, false); // nothing else to send to the app + } + if(!lwip_invoked) { + DEBUG_INFO("unlocking..."); + conn->tap->_tcpconns_m.unlock(); + conn->_rx_m.unlock(); + } + + /* + int payload_sz, addr_sz_offset = sizeof(struct sockaddr_storage); + memcpy(&payload_sz, conn->rxbuf + addr_sz_offset, sizeof(int)); // OPT: + // extract address + struct sockaddr_storage addr; + memcpy(&addr, conn->rxbuf, addr_sz_offset); + */ + } + + int lwIP::lwip_Write(Connection *conn, void *data, ssize_t len) { DEBUG_EXTRA("conn=%p", (void*)&conn); - // to be implemented + if(!conn) { + DEBUG_ERROR("no connection"); + return -1; + } + if(conn->socket_type == SOCK_DGRAM) + { + // TODO: Packet re-assembly hasn't yet been tested with lwIP so UDP packets are limited to MTU-sized chunks + int udp_trans_len = std::min((ssize_t)conn->TXbuf->count(), (ssize_t)ZT_MAX_MTU); + DEBUG_EXTRA("allocating pbuf chain of size=%d for UDP packet, txsz=%d", udp_trans_len, conn->TXbuf->count()); + struct pbuf * pb = pbuf_alloc(PBUF_TRANSPORT, udp_trans_len, PBUF_POOL); + if(!pb){ + DEBUG_ERROR("unable to allocate new pbuf of size=%d", conn->TXbuf->count()); + return -1; + } + memcpy(pb->payload, conn->TXbuf->get_buf(), udp_trans_len); + int err = udp_send((struct udp_pcb*)conn->pcb, pb); + + if(err == ERR_MEM) { + DEBUG_ERROR("error sending packet. out of memory"); + } else if(err == ERR_RTE) { + DEBUG_ERROR("could not find route to destinations address"); + } else if(err != ERR_OK) { + DEBUG_ERROR("error sending packet - %d", err); + } else { + conn->TXbuf->consume(udp_trans_len); // success + } + pbuf_free(pb); + return ERR_OK; + } + if(conn->socket_type == SOCK_STREAM) + { + // How much we are currently allowed to write to the connection + ssize_t sndbuf = ((struct tcp_pcb*)conn->pcb)->snd_buf; + int err, sz, r; + + if(!sndbuf) { + // PCB send buffer is full, turn off readability notifications for the + // corresponding PhySocket until nc_sent() is called and confirms that there is + // now space on the buffer + DEBUG_ERROR(" LWIP stack is full, sndbuf == 0"); + conn->tap->_phy.setNotifyReadable(conn->sock, false); + return -1; + } + if(conn->TXbuf->count() <= 0) + return -1; // Nothing to write + + //if(!conn->listening) + // tcp_output(conn->TCP_pcb); + + if(conn->sock) { + r = std::min((ssize_t)conn->TXbuf->count(), sndbuf); + // Writes data pulled from the client's socket buffer to LWIP. This merely sends the + // data to LWIP to be enqueued and eventually sent to the network. + if(r > 0) { + err = tcp_write((struct tcp_pcb*)conn->pcb, conn->TXbuf->get_buf(), r, TCP_WRITE_FLAG_COPY); + tcp_output((struct tcp_pcb*)conn->pcb); + if(err != ERR_OK) { + DEBUG_ERROR(" error while writing to PCB, err=%d", err); + if(err == -1) + DEBUG_ERROR("out of memory"); + return -1; + } else { + conn->TXbuf->consume(r); // success + return ERR_OK; + } + } + } + } } - int lwIP::lwip_Close(SocketTap *tap, PhySocket *sock, Connection *conn) + int lwIP::lwip_Close(Connection *conn) { DEBUG_INFO(); - // to be implemented + + if(conn->socket_type == SOCK_DGRAM) { + udp_remove((struct udp_pcb*)conn->pcb); + } + // FIXME: check if already closed? conn->TCP_pcb->state != CLOSED + if(conn->pcb) { + //DEBUG_EXTRA("conn=%p, sock=%p, PCB->state = %d", + // (void*)&conn, (void*)&sock, conn->TCP_pcb->state); + if(((struct tcp_pcb*)conn->pcb)->state == SYN_SENT /*|| conn->TCP_pcb->state == CLOSE_WAIT*/) { + DEBUG_EXTRA("ignoring close request. invalid PCB state for this operation. sock=%p", conn->sock); + return -1; + } + // DEBUG_BLANK("__tcp_close(...)"); + struct tcp_pcb* tpcb = (struct tcp_pcb*)conn->pcb; + if(tcp_close(tpcb) == ERR_OK) { + // Unregister callbacks for this PCB + tcp_arg(tpcb, NULL); + tcp_recv(tpcb, NULL); + tcp_err(tpcb, NULL); + tcp_sent(tpcb, NULL); + tcp_poll(tpcb, NULL, 1); + } + else { + DEBUG_EXTRA("error while calling tcp_close() sock=%p", conn->sock); + } + } + return 0; } /****************************************************************************/ @@ -349,15 +604,85 @@ namespace ZeroTier err_t lwIP::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf *p, err_t err) { DEBUG_INFO(); - // to be implemented + Connection *conn = (Connection *)arg; + int tot = 0; + + if(!conn) { + DEBUG_ERROR("no connection"); + return ERR_OK; // FIXME: Determine if this is correct behaviour expected by the stack + } + + //Mutex::Lock _l(conn->tap->_tcpconns_m); + //Mutex::Lock _l2(conn->_rx_m); + + DEBUG_INFO("locking..."); + + conn->tap->_tcpconns_m.lock(); + conn->_rx_m.lock(); + + DEBUG_INFO("locked."); + + struct pbuf* q = p; + if(p == NULL) { + if(((struct tcp_pcb*)conn->pcb)->state == CLOSE_WAIT) { + // FIXME: Implement? + } + DEBUG_INFO("p == NULL"); + return ERR_ABRT; + } + + // Cycle through pbufs and write them to the RX buffer + // The RX buffer will be emptied via phyOnUnixWritable() + while(p != NULL) { + if(p->len <= 0) + break; + int avail = ZT_TCP_RX_BUF_SZ - conn->RXbuf->count(); + int len = p->len; + if(avail < len) { + DEBUG_ERROR("not enough room (%d bytes) on RX buffer", avail); + } + memcpy(conn->RXbuf->get_buf(), p->payload, len); + conn->RXbuf->produce(len); + p = p->next; + tot += len; + } + DEBUG_INFO("tot=%d", tot); + + conn->tap->_tcpconns_m.unlock(); + conn->_rx_m.unlock(); + + if(tot) { + conn->tap->_phy.setNotifyWritable(conn->sock, true); + //conn->tap->phyOnUnixWritable(conn->sock, NULL, true); // to app + } + pbuf_free(q); return ERR_OK; } err_t lwIP::nc_accept(void *arg, struct tcp_pcb *newPCB, err_t err) { - DEBUG_INFO(); - // to be implemented - return -1; + Connection *conn = (Connection*)arg; + DEBUG_INFO("conn=%p", conn); + //Mutex::Lock _l(conn->tap->_tcpconns_m); + // create and populate new Connection object + Connection *new_conn = new Connection(); + new_conn->socket_type = SOCK_STREAM; + new_conn->pcb = newPCB; + new_conn->tap = conn->tap; + new_conn->sock = conn->tap->_phy.wrapSocket(new_conn->sdk_fd, new_conn); + //memcpy(new_conn->tap->_phy.getuptr(new_conn->sock), new_conn, sizeof(conn)); + DEBUG_INFO("new_conn=%p", new_conn); + // add new Connection object to parent connection so that we can find it via lwip_Accept() + conn->_AcceptedConnections.push(new_conn); + // set callbacks + tcp_arg(newPCB, new_conn); + tcp_recv(newPCB, nc_recved); + tcp_err(newPCB, nc_err); + tcp_sent(newPCB, nc_sent); + tcp_poll(newPCB, nc_poll, 1); + // let lwIP know that it can queue additional incoming connections + tcp_accepted((struct tcp_pcb*)conn->pcb); + return 0; } void lwIP::nc_udp_recved(void * arg, struct udp_pcb * upcb, struct pbuf * p, const ip_addr_t * addr, u16_t port) @@ -365,31 +690,113 @@ namespace ZeroTier DEBUG_INFO(); // to be implemented } - + err_t lwIP::nc_sent(void* arg, struct tcp_pcb *PCB, u16_t len) { DEBUG_EXTRA("pcb=%p", (void*)&PCB); - // to be implemented + Connection *conn = (Connection *)arg; + Mutex::Lock _l(conn->tap->_tcpconns_m); + if(conn && len) { + int softmax = conn->socket_type == SOCK_STREAM ? ZT_TCP_TX_BUF_SZ : ZT_UDP_TX_BUF_SZ; + if(conn->TXbuf->count() < softmax) { + conn->tap->_phy.setNotifyReadable(conn->sock, true); + conn->tap->_phy.whack(); + } + } return ERR_OK; } - + err_t lwIP::nc_connected(void *arg, struct tcp_pcb *PCB, err_t err) { DEBUG_ATTN("pcb=%p", (void*)&PCB); - // to be implemented - return ERR_OK; + Connection *conn = (Connection *)arg; + if(conn) + return ERR_OK; + return -1; + // FIXME: check stack for expected return values } err_t lwIP::nc_poll(void* arg, struct tcp_pcb *PCB) { - DEBUG_INFO(); - // to be implemented return ERR_OK; } void lwIP::nc_err(void *arg, err_t err) { - DEBUG_INFO(); - // to be implemented + DEBUG_ERROR("err=%d", err); + Connection *conn = (Connection *)arg; + Mutex::Lock _l(conn->tap->_tcpconns_m); + + if(!conn){ + DEBUG_ERROR("conn==NULL"); + errno = -1; // FIXME: Find more appropriate value + } + int fd = conn->tap->_phy.getDescriptor(conn->sock); + DEBUG_ERROR("conn=%p, pcb=%p, err=%d", conn, conn->pcb, err); + DEBUG_ERROR("closing connection"); + conn->tap->Close(conn); + switch(err) + { + case ERR_MEM: + DEBUG_ERROR("ERR_MEM->ENOMEM"); + errno = ENOMEM; + break; + case ERR_BUF: + DEBUG_ERROR("ERR_BUF->ENOBUFS"); + errno = ENOBUFS; + break; + case ERR_TIMEOUT: + DEBUG_ERROR("ERR_TIMEOUT->ETIMEDOUT"); + errno = ETIMEDOUT; + break; + case ERR_RTE: + DEBUG_ERROR("ERR_RTE->ENETUNREACH"); + errno = ENETUNREACH; + break; + case ERR_INPROGRESS: + DEBUG_ERROR("ERR_INPROGRESS->EINPROGRESS"); + errno = EINPROGRESS; + break; + case ERR_VAL: + DEBUG_ERROR("ERR_VAL->EINVAL"); + errno = EINVAL; + break; + case ERR_WOULDBLOCK: + DEBUG_ERROR("ERR_WOULDBLOCK->EWOULDBLOCK"); + errno = EWOULDBLOCK; + break; + case ERR_USE: + DEBUG_ERROR("ERR_USE->EADDRINUSE"); + errno = EADDRINUSE; + break; + case ERR_ISCONN: + DEBUG_ERROR("ERR_ISCONN->EISCONN"); + errno = EISCONN; + break; + case ERR_ABRT: + DEBUG_ERROR("ERR_ABRT->ECONNREFUSED"); + errno = ECONNREFUSED; + break; + + // TODO: Below are errors which don't have a standard errno correlate + + case ERR_RST: + //l->tap->sendReturnValue(fd, -1, -1); + break; + case ERR_CLSD: + //l->tap->sendReturnValue(fd, -1, -1); + break; + case ERR_CONN: + //l->tap->sendReturnValue(fd, -1, -1); + break; + case ERR_ARG: + //l->tap->sendReturnValue(fd, -1, -1); + break; + case ERR_IF: + //l->tap->sendReturnValue(fd, -1, -1); + break; + default: + break; + } } } diff --git a/src/lwIP.hpp b/src/lwIP.hpp index 9cbf556..8e566bb 100644 --- a/src/lwIP.hpp +++ b/src/lwIP.hpp @@ -187,10 +187,11 @@ namespace ZeroTier { int lwip_Socket(void **pcb, int socket_family, int socket_type, int protocol); int lwip_Connect(Connection *conn, int fd, const struct sockaddr *addr, socklen_t addrlen); int lwip_Bind(SocketTap *tap, Connection *conn, int fd, const struct sockaddr *addr, socklen_t addrlen); - int lwip_Listen(SocketTap *tap, PhySocket *sock, PhySocket *rpcSock, void **uptr, struct listen_st *listen_rpc); - int lwip_Read(SocketTap *tap, PhySocket *sock, void **uptr, bool lwip_invoked); - int lwip_Write(SocketTap *tap, Connection *conn); - int lwip_Close(SocketTap *tap, PhySocket *sock, Connection *conn); + int lwip_Listen(Connection *conn, int backlog); + Connection* lwip_Accept(Connection *conn); + int lwip_Read(Connection *conn, bool lwip_invoked); + int lwip_Write(Connection *conn, void *data, ssize_t len); + int lwip_Close(Connection *conn); static err_t nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf *p, err_t err); static err_t nc_accept(void *arg, struct tcp_pcb *newPCB, err_t err); diff --git a/src/picoTCP.cpp b/src/picoTCP.cpp index 2594c81..0edecbe 100644 --- a/src/picoTCP.cpp +++ b/src/picoTCP.cpp @@ -506,7 +506,7 @@ namespace ZeroTier { int picoTCP::pico_Socket(struct pico_socket **p, int socket_family, int socket_type, int protocol) { int err = 0; - if(pico_ntimers() >= PICO_MAX_TIMERS) { + if(!can_provision_new_socket()) { DEBUG_ERROR("cannot create additional socket, see PICO_MAX_TIMERS. current = %d", pico_ntimers()); errno = EMFILE; err = -1;