Added standardization target for code style, fixed LWIP_DEBUG redefinition warning, fixed rare tcp_close() and cb_poll() bugs

This commit is contained in:
Joseph Henry
2017-09-13 22:34:25 -07:00
parent 0dc390ddcf
commit 3bec79314e
8 changed files with 188 additions and 151 deletions

View File

@@ -28,13 +28,13 @@
#include <sys/syscall.h>
#include <sys/types.h>
#define ZT_DEBUG_LEVEL 6 // Set this to adjust what you'd like to see in the debug traces
#define ZT_DEBUG_LEVEL 5 // Set this to adjust what you'd like to see in the debug traces
#define ZT_MSG_TEST 1 // For use in selftest
#define ZT_MSG_ERROR 2 // Errors
#define ZT_MSG_INFO 3 // Information which is generally useful to any developer
#define ZT_MSG_EXTRA 4 // If nothing in your world makes sense
#define ZT_MSG_TRANSFER 5 // RX/TX specific statements
#define ZT_MSG_TRANSFER 4 // RX/TX specific statements
#define ZT_MSG_EXTRA 5 // If nothing in your world makes sense
#define ZT_MSG_FLOW 6 // High-level flow messages
#define ZT_COLOR true

View File

@@ -73,8 +73,6 @@
*/
#define LWIP_DBG_HALT 0x08U
#define LWIP_DEBUG 0
//#define LWIP_DBG_TYPES_ON LWIP_DBG_TRACE | LWIP_DBG_STATE | LWIP_DBG_FRESH
#define LWIP_DBG_MIN_LEVEL LWIP_DBG_LEVEL_ALL

View File

@@ -282,6 +282,11 @@ nativetest:
## Misc ##
##############################################################################
standardize:
vera++ --transform trim_right src/*.cpp
vera++ --transform trim_right src/*.hpp
vera++ --transform trim_right include/*.cpp
clean:
-rm -rf $(BUILD)/*
-find . -type f \( -name '*.a' -o -name '*.o' -o -name '*.so' -o -name '*.o.d' -o -name '*.out' -o -name '*.log' -o -name '*.dSYM' \) -delete

View File

@@ -294,6 +294,11 @@ nativetest:
## Misc ##
##############################################################################
standardize:
vera++ --transform trim_right src/*.cpp
vera++ --transform trim_right src/*.hpp
vera++ --transform trim_right include/*.cpp
clean:
-rm -rf $(BUILD)/*
-find . -type f \( -name '*.a' -o -name '*.o' -o -name '*.so' -o -name '*.o.d' -o -name '*.out' -o -name '*.log' -o -name '*.dSYM' \) -delete

View File

@@ -1086,6 +1086,7 @@ int zts_close(ZT_CLOSE_SIG)
ZeroTier::VirtualSocket *vs = get_virtual_socket(fd);
if(!vs) {
DEBUG_ERROR("no vs found for fd=%d", fd);
handle_general_failure();
errno = EBADF;
return -1;
}
@@ -2157,6 +2158,9 @@ ZeroTier::VirtualSocket *get_virtual_socket(int fd)
if(p) {
vs = p->first;
}
else {
DEBUG_ERROR("unable to locate virtual socket");
}
}
ZeroTier::_multiplexer_lock.unlock();
return vs;
@@ -2378,7 +2382,7 @@ void *zts_start_service(void *thread_id) {
void handle_general_failure() {
#ifdef ZT_EXIT_ON_GENERAL_FAIL
DEBUG_ERROR("exiting (ZT_EXIT_ON_GENERAL_FAIL==1)");
//exit(-1);
exit(-1);
#endif
}

View File

@@ -26,11 +26,11 @@
/*
lwIP network stack driver
lwIP network stack driver
NOTES:
Calls made in this network stack driver may never block since all packet
Calls made in this network stack driver may never block since all packet
processing (input and output) as well as timer processing (TCP mainly) is done
in a single execution context.
@@ -83,8 +83,10 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p)
src_mac.setTo(ethhdr->src.addr, 6);
dest_mac.setTo(ethhdr->dest.addr, 6);
tap->_handler(tap->_arg,NULL,tap->_nwid,src_mac,dest_mac,
ZeroTier::Utils::ntoh((uint16_t)ethhdr->type),0,buf + sizeof(struct eth_hdr),totalLength - sizeof(struct eth_hdr));
char *data = buf + sizeof(struct eth_hdr);
int len = totalLength - sizeof(struct eth_hdr);
int proto = ZeroTier::Utils::ntoh((uint16_t)ethhdr->type);
tap->_handler(tap->_arg, NULL, tap->_nwid, src_mac, dest_mac, proto, 0, data, len);
if(ZT_DEBUG_LEVEL >= ZT_MSG_TRANSFER) {
char flagbuf[32];
@@ -94,7 +96,7 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p)
ZeroTier::MAC mac;
mac.setTo(ethhdr->dest.addr, 6);
mac.toAddress(tap->_nwid).toString(nodeBuf);
DEBUG_TRANS("len=%5d dst=%s [%s TX <-- %s] proto=0x%04x %s %s", totalLength, macBuf, nodeBuf, tap->nodeId().c_str(),
DEBUG_TRANS("len=%5d dst=%s [%s TX <-- %s] proto=0x%04x %s %s", totalLength, macBuf, nodeBuf, tap->nodeId().c_str(),
ZeroTier::Utils::ntoh(ethhdr->type), beautify_eth_proto_nums(ZeroTier::Utils::ntoh(ethhdr->type)), flagbuf);
}
return ERR_OK;
@@ -104,7 +106,7 @@ namespace ZeroTier
{
void lwIP::lwip_init_interface(VirtualTap *tap, const InetAddress &ip)
{
/* NOTE: It is a known issue that when assigned more than one IP address via
/* NOTE: It is a known issue that when assigned more than one IP address via
Central, this interface will be unable to transmit (including ARP). */
Mutex::Lock _l(tap->_ips_m);
@@ -127,7 +129,11 @@ namespace ZeroTier
tap->lwipdev.linkoutput = lwip_eth_tx;
tap->lwipdev.hwaddr_len = 6;
tap->_mac.copyTo(tap->lwipdev.hwaddr, tap->lwipdev.hwaddr_len);
tap->lwipdev.flags = NETIF_FLAG_BROADCAST | NETIF_FLAG_ETHARP | NETIF_FLAG_IGMP | NETIF_FLAG_LINK_UP | NETIF_FLAG_UP;
tap->lwipdev.flags = NETIF_FLAG_BROADCAST
| NETIF_FLAG_ETHARP
| NETIF_FLAG_IGMP
| NETIF_FLAG_LINK_UP
| NETIF_FLAG_UP;
netif_set_default(&(tap->lwipdev));
netif_set_link_up(&(tap->lwipdev));
netif_set_up(&(tap->lwipdev));
@@ -157,30 +163,31 @@ namespace ZeroTier
//struct netif *netif, const ip6_addr_t *ip6addr, s8_t *chosen_idx
//netif_add_ip6_address();
// linklocal
tap->lwipdev6.ip6_autoconfig_enabled = 1;
netif_create_ip6_linklocal_address(&(tap->lwipdev6), 1);
netif_ip6_addr_set_state(&(tap->lwipdev6), 0, IP6_ADDR_TENTATIVE);
netif_ip6_addr_set_state(&(tap->lwipdev6), 0, IP6_ADDR_TENTATIVE);
// manually config addresses
ip6_addr_copy(ip_2_ip6(tap->lwipdev6.ip6_addr[1]), addr6);
netif_ip6_addr_set_state(&(tap->lwipdev6), 1, IP6_ADDR_TENTATIVE);
netif_ip6_addr_set_state(&(tap->lwipdev6), 1, IP6_ADDR_TENTATIVE);
netif_set_default(&(tap->lwipdev6));
netif_set_link_up(&(tap->lwipdev6));
// state and flags
tap->lwipdev6.state = tap;
tap->lwipdev6.flags = NETIF_FLAG_LINK_UP | NETIF_FLAG_UP;
tap->lwipdev6.flags = NETIF_FLAG_LINK_UP
| NETIF_FLAG_UP;
netif_set_up(&(tap->lwipdev6));
netif_set_up(&(tap->lwipdev6));
char macbuf[ZT_MAC_ADDRSTRLEN];
mac2str(macbuf, ZT_MAC_ADDRSTRLEN, tap->lwipdev6.hwaddr);
DEBUG_INFO("mac=%s, addr=%s", macbuf, ip.toString(ipbuf));
DEBUG_INFO("mac=%s, addr=%s", macbuf, ip.toString(ipbuf));
}
#endif
#endif
}
}
@@ -192,7 +199,7 @@ namespace ZeroTier
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
count++;
}
}
pcb_ptr = tcp_tw_pcbs; // PCBs in TIME-WAIT state
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
@@ -217,11 +224,11 @@ namespace ZeroTier
{
// TODO: These will likely need some sort of locking protection
int count = 0;
struct udp_pcb *pcb_ptr = udp_pcbs;
struct udp_pcb *pcb_ptr = udp_pcbs;
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
count++;
}
}
return count;
}
@@ -230,17 +237,22 @@ namespace ZeroTier
// TODO: These will likely need some sort of locking protection
/*
int count = 0;
struct raw_pcb *pcb_ptr = raw_pcbs;
struct raw_pcb *pcb_ptr = raw_pcbs;
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
count++;
DEBUG_ERROR("FOUND --- raw_pcbs PCB COUNT = %d", count);
}
}
return count;
*/
return 0;
}
int lwIP::lwip_num_total_pcbs()
{
return lwip_num_current_raw_pcbs() + lwip_num_current_udp_pcbs() + lwip_num_current_tcp_pcbs();
}
int lwIP::lwip_add_dns_nameserver(struct sockaddr *addr)
{
return -1;
@@ -253,7 +265,6 @@ namespace ZeroTier
void lwIP::lwip_loop(VirtualTap *tap)
{
// DEBUG_INFO();
uint64_t prev_tcp_time = 0, prev_discovery_time = 0;
while(tap->_run)
{
@@ -272,7 +283,7 @@ namespace ZeroTier
if (since_tcp >= LWIP_TCP_TIMER_INTERVAL) {
prev_tcp_time = now;
tcp_tmr();
}
}
else {
tcp_remaining = LWIP_TCP_TIMER_INTERVAL - since_tcp;
}
@@ -292,7 +303,8 @@ namespace ZeroTier
}
}
void lwIP::lwip_eth_rx(VirtualTap *tap, const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len)
void lwIP::lwip_eth_rx(VirtualTap *tap, const MAC &from, const MAC &to, unsigned int etherType,
const void *data, unsigned int len)
{
struct pbuf *p,*q;
if (!tap->_enabled){
@@ -320,7 +332,7 @@ namespace ZeroTier
memcpy(q->payload,dataptr,q->len);
dataptr += q->len;
}
}
}
if(ZT_DEBUG_LEVEL >= ZT_MSG_TRANSFER) {
char flagbuf[32];
memset(&flagbuf, 0, 32);
@@ -329,7 +341,7 @@ namespace ZeroTier
ZeroTier::MAC mac;
mac.setTo(ethhdr.src.addr, 6);
mac.toAddress(tap->_nwid).toString(nodeBuf);
DEBUG_TRANS("len=%5d dst=%s [%s RX --> %s] proto=0x%04x %s %s", len, macBuf, nodeBuf, tap->nodeId().c_str(),
DEBUG_TRANS("len=%5d dst=%s [%s RX --> %s] proto=0x%04x %s %s", len, macBuf, nodeBuf, tap->nodeId().c_str(),
ZeroTier::Utils::ntoh(ethhdr.type), beautify_eth_proto_nums(ZeroTier::Utils::ntoh(ethhdr.type)), flagbuf);
}
else {
@@ -353,7 +365,7 @@ namespace ZeroTier
int lwIP::lwip_Socket(void **pcb, int socket_family, int socket_type, int protocol)
{
if(!can_provision_new_socket(socket_type)) {
DEBUG_ERROR("unable to create new socket due to limitation of network stack");
DEBUG_ERROR("unable to create socket due to limitation of network stack, PCBs=%d", lwip_num_total_pcbs());
errno = ENOMEM;
return -1;
}
@@ -374,30 +386,28 @@ namespace ZeroTier
int lwIP::lwip_Connect(VirtualSocket *vs, const struct sockaddr *addr, socklen_t addrlen)
{
//DEBUG_INFO();
ip_addr_t ba;
char addrstr[INET6_ADDRSTRLEN];
int port = 0, err = 0;
#if defined(LIBZT_IPV4)
struct sockaddr_in *in4 = (struct sockaddr_in *)addr;
if(addr->sa_family == AF_INET && vs->socket_type == SOCK_STREAM) {
inet_ntop(AF_INET, &(in4->sin_addr), addrstr, INET_ADDRSTRLEN);
inet_ntop(AF_INET, &(in4->sin_addr), addrstr, INET_ADDRSTRLEN);
DEBUG_EXTRA("connecting to %s : %d", addrstr, lwip_ntohs(in4->sin_port));
}
ba = convert_ip(in4);
ba = convert_ip(in4);
port = lwip_ntohs(in4->sin_port);
#endif
#if defined(LIBZT_IPV6)
struct sockaddr_in6 *in6 = (struct sockaddr_in6*)&addr;
in6_to_ip6((ip6_addr *)&ba, in6);
if(addr->sa_family == AF_INET6 && vs->socket_type == SOCK_STREAM) {
if(addr->sa_family == AF_INET6 && vs->socket_type == SOCK_STREAM) {
inet_ntop(AF_INET6, &(in6->sin6_addr), addrstr, INET6_ADDRSTRLEN);
DEBUG_EXTRA("connecting to %s : %d", addrstr, lwip_ntohs(in6->sin6_port));
}
#endif
if(vs->socket_type == SOCK_DGRAM) {
// Generates no network traffic
// generates no network traffic
if((err = udp_connect((struct udp_pcb*)vs->pcb,(ip_addr_t *)&ba,port)) < 0) {
DEBUG_ERROR("error while connecting to with UDP");
}
@@ -410,7 +420,7 @@ namespace ZeroTier
tcp_recv(tpcb, lwip_cb_tcp_recved);
tcp_err(tpcb, lwip_cb_err);
tcp_poll(tpcb, lwip_cb_poll, LWIP_APPLICATION_POLL_FREQ);
tcp_arg(tpcb, vs);
tcp_arg(tpcb, vs);
if((err = tcp_connect(tpcb,&ba,port,lwip_cb_connected)) < 0) {
errno = lwip_err_to_errno(err);
// We should only return a value if failure happens immediately
@@ -421,14 +431,13 @@ namespace ZeroTier
DEBUG_ERROR("unable to connect");
err = -1;
}
}
}
return err;
}
int lwIP::lwip_Bind(VirtualTap *tap, VirtualSocket *vs, const struct sockaddr *addr, socklen_t addrlen)
{
// TODO: Check case for IP_ADDR_ANY
//DEBUG_EXTRA("vs=%p", vs);
ip_addr_t ba;
char addrstr[INET6_ADDRSTRLEN];
memset(addrstr, 0, INET6_ADDRSTRLEN);
@@ -436,16 +445,16 @@ namespace ZeroTier
#if defined(LIBZT_IPV4)
struct sockaddr_in *in4 = (struct sockaddr_in *)addr;
if(addr->sa_family == AF_INET) {
inet_ntop(AF_INET, &(in4->sin_addr), addrstr, INET_ADDRSTRLEN);
inet_ntop(AF_INET, &(in4->sin_addr), addrstr, INET_ADDRSTRLEN);
DEBUG_EXTRA("binding to %s : %d", addrstr, lwip_ntohs(in4->sin_port));
}
ba = convert_ip(in4);
ba = convert_ip(in4);
port = lwip_ntohs(in4->sin_port);
#endif
#if defined(LIBZT_IPV6)
struct sockaddr_in6 *in6 = (struct sockaddr_in6*)addr;
in6_to_ip6((ip6_addr *)&ba, in6);
if(addr->sa_family == AF_INET6) {
if(addr->sa_family == AF_INET6) {
inet_ntop(AF_INET6, &(in6->sin6_addr), addrstr, INET6_ADDRSTRLEN);
DEBUG_EXTRA("binding to %s : %d", addrstr, lwip_ntohs(in6->sin6_port));
}
@@ -458,7 +467,7 @@ namespace ZeroTier
else {
// set callback
udp_recv((struct udp_pcb*)vs->pcb, lwip_cb_udp_recved, vs);
err = ERR_OK;
err = ERR_OK;
}
}
else if (vs->socket_type == SOCK_STREAM) {
@@ -467,7 +476,7 @@ namespace ZeroTier
err = -1;
}
else {
err = ERR_OK;
err = ERR_OK;
}
}
return err;
@@ -485,7 +494,7 @@ namespace ZeroTier
if(listeningPCB) {
vs->pcb = listeningPCB;
// set callback
tcp_accept(listeningPCB, lwip_cb_accept);
tcp_accept(listeningPCB, lwip_cb_accept);
tcp_arg(listeningPCB, vs);
err = ERR_OK;
}
@@ -498,7 +507,6 @@ namespace ZeroTier
VirtualSocket* lwIP::lwip_Accept(VirtualSocket *vs)
{
//DEBUG_INFO();
if(!vs) {
DEBUG_ERROR("invalid virtual socket");
handle_general_failure();
@@ -525,27 +533,19 @@ namespace ZeroTier
if(!lwip_invoked) {
DEBUG_INFO("!lwip_invoked");
vs->tap->_tcpconns_m.lock();
vs->_rx_m.lock();
vs->_rx_m.lock();
}
if(vs->RXbuf->count()) {
int max = vs->socket_type == SOCK_STREAM ? ZT_STACK_TCP_SOCKET_RX_SZ : ZT_STACK_TCP_SOCKET_RX_SZ;
int wr = std::min((ssize_t)max, (ssize_t)vs->RXbuf->count());
if(vs->socket_type == SOCK_STREAM && vs->RXbuf->count()) {
handle_general_failure();
int wr = std::min((ssize_t)ZT_STACK_TCP_SOCKET_RX_SZ, (ssize_t)vs->RXbuf->count());
int n = vs->tap->_phy.streamSend(vs->sock, vs->RXbuf->get_buf(), wr);
char str[22];
memcpy(str, vs->RXbuf->get_buf(), 22);
vs->RXbuf->consume(n);
if(vs->socket_type == SOCK_DGRAM)
{
// TODO
}
if(vs->socket_type == SOCK_STREAM) { // Only acknolwedge receipt of TCP packets
if(n > 0) {
vs->RXbuf->consume(n);
tcp_recved((struct tcp_pcb*)vs->pcb, n);
DEBUG_TRANS("TCP RX %d bytes", n);
}
}
if(vs->RXbuf->count() == 0) {
DEBUG_INFO("wrote everything");
vs->tap->_phy.setNotifyWritable(vs->sock, false); // nothing else to send to the app
}
if(!lwip_invoked) {
@@ -562,7 +562,8 @@ namespace ZeroTier
DEBUG_ERROR("no virtual socket");
return -1;
}
DEBUG_EXTRA("fd=%d, vs=%p, len=%d", vs->app_fd, vs, len);
DEBUG_EXTRA("fd=%d, vs=%p, pcb=%p, pcb->state=%d, len=%d",
vs->app_fd, vs, (struct tcp_pcb*)(vs->pcb), ((struct tcp_pcb*)(vs->pcb))->state, len);
if(vs->socket_type == SOCK_DGRAM) {
// TODO: Packet re-assembly hasn't yet been tested with lwIP so UDP packets are limited to MTU-sized chunks
int udp_trans_len = std::min(len, (ssize_t)ZT_MAX_MTU);
@@ -573,14 +574,14 @@ namespace ZeroTier
}
memcpy(pb->payload, data, udp_trans_len);
int err = udp_send((struct udp_pcb*)vs->pcb, pb);
if(err == ERR_MEM) {
DEBUG_ERROR("error sending packet. out of memory");
} else if(err == ERR_RTE) {
DEBUG_ERROR("could not find route to destinations address");
} else if(err != ERR_OK) {
DEBUG_ERROR("error sending packet - %d", err);
}
}
pbuf_free(pb);
if(err == ERR_OK) {
return udp_trans_len;
@@ -589,7 +590,6 @@ namespace ZeroTier
if(vs->socket_type == SOCK_STREAM) {
// How much we are currently allowed to write to the VirtualSocket
ssize_t sndbuf = ((struct tcp_pcb*)vs->pcb)->snd_buf;
int r;
if(!sndbuf) {
// PCB send buffer is full, turn off readability notifications for the
// corresponding PhySocket until lwip_cb_sent() is called and confirms that there is
@@ -609,7 +609,7 @@ namespace ZeroTier
err = -1; // nothing to write
}
if(!err) {
r = std::min((ssize_t)vs->TXbuf->count(), sndbuf);
int r = std::min((ssize_t)vs->TXbuf->count(), sndbuf);
// Writes data pulled from the client's socket buffer to LWIP. This merely sends the
// data to LWIP to be enqueued and eventually sent to the network.
if(r > 0) {
@@ -628,7 +628,7 @@ namespace ZeroTier
DEBUG_TRANS("len=%5d tx_buf_len=%10d [VSTXBF --> NSLWIP]", err, vs->TXbuf->count());
}
else {
// since we only processed the data by pointer reference we
// since we only processed the data by pointer reference we
// want to preserve it until it has been ACKed by the remote host
// (DO NOTHING)
}
@@ -655,28 +655,38 @@ namespace ZeroTier
udp_remove((struct udp_pcb*)vs->pcb);
}
if(vs->socket_type == SOCK_STREAM) {
// according to documentation, tcp_pcb is deallocated by the stack's own tcp code. do nothing
}
// TODO: check if already closed? vs->TCP_pcb->state != CLOSED
if(vs->pcb) {
if(((struct tcp_pcb*)vs->pcb)->state == SYN_SENT /*|| vs->TCP_pcb->state == CLOSE_WAIT*/) {
// DEBUG_EXTRA("ignoring close request. invalid PCB state for this operation. sock=%p", vs->sock);
// TODO: errno = ?;
return -1;
}
struct tcp_pcb* tpcb = (struct tcp_pcb*)vs->pcb;
if(tcp_close(tpcb) == ERR_OK) {
if(vs->pcb) {
struct tcp_pcb* tpcb = (struct tcp_pcb*)vs->pcb;
if(tpcb->state == CLOSED) {
DEBUG_ERROR("pcb is in CLOSED state");
// calling tcp_close() here would be redundant
return 0;
}
if(tpcb->state == CLOSE_WAIT) {
DEBUG_ERROR("pcb is in CLOSE_WAIT state");
// calling tcp_close() here would be redundant
}
if(tpcb->state > TIME_WAIT) {
DEBUG_ERROR("warning, pcb=%p is in an invalid state=%d", vs->pcb, tpcb->state);
handle_general_failure();
err = -1;
}
// unregister callbacks for this PCB
tcp_arg(tpcb, NULL);
tcp_recv(tpcb, NULL);
tcp_err(tpcb, NULL);
tcp_sent(tpcb, NULL);
tcp_poll(tpcb, NULL, 1);
}
else {
DEBUG_ERROR("error while calling tcp_close, fd=%d, vs=%p, pcb=%p", vs->app_fd, vs, vs->pcb);
errno = lwip_err_to_errno(err);
err = -1;
tcp_arg(tpcb, NULL);
if(tpcb->state == LISTEN) {
tcp_accept(tpcb, NULL);
}
else {
tcp_recv(tpcb, NULL);
tcp_sent(tpcb, NULL);
tcp_poll(tpcb, NULL, 0);
tcp_err(tpcb, NULL);
}
if((err = tcp_close(tpcb)) < 0) {
DEBUG_ERROR("error while calling tcp_close, fd=%d, vs=%p, pcb=%p", vs->app_fd, vs, vs->pcb);
errno = lwip_err_to_errno(err);
err = -1;
}
}
}
return err;
@@ -691,7 +701,7 @@ namespace ZeroTier
if(how == SHUT_WR) {
shut_tx = 1;
}
if(how == SHUT_RDWR) {
if(how == SHUT_RDWR) {
shut_rx = 1;
shut_tx = 1;
}
@@ -707,12 +717,12 @@ namespace ZeroTier
// write data from processed packets from the stack to the client app
/*
With the raw API, tcp_recv() sets up to receive data via a callback function. Your callback
is delivered chains of pbufs as they become available. You have to manage extracting data
from the pbuf chain, and don't forget to watch out for multiple pbufs in a single callback:
the 'tot_len' field indicates the total length of data in the pbuf chain. You must call
tcp_recved() to tell LWIP when you have processed the received data. As with the netconn API,
you may receive more or less data than you want, and will have to either wait for further
With the raw API, tcp_recv() sets up to receive data via a callback function. Your callback
is delivered chains of pbufs as they become available. You have to manage extracting data
from the pbuf chain, and don't forget to watch out for multiple pbufs in a single callback:
the 'tot_len' field indicates the total length of data in the pbuf chain. You must call
tcp_recved() to tell LWIP when you have processed the received data. As with the netconn API,
you may receive more or less data than you want, and will have to either wait for further
callbacks, or hold onto excess data for later processing.
http://lwip.wikia.com/wiki/Receiving_data_with_LWIP
@@ -724,16 +734,13 @@ namespace ZeroTier
int tot = 0;
if(!vs) {
DEBUG_ERROR("no virtual socket");
return ERR_OK; // FIXME: Determine if this is correct behaviour expected by the stack
handle_general_failure();
return ERR_OK;
}
struct pbuf* q = p;
if(p == NULL) {
/*
if(((struct tcp_pcb*)vs->pcb)->state == CLOSE_WAIT) {
// FIXME: Implement?
}
*/
return ERR_ABRT; // close connection
DEBUG_INFO("p=0x0 for pcb=%p, vs->pcb=%p, this indicates a closure. No need to call tcp_close()", PCB, vs->pcb);
return ERR_ABRT;
}
vs->tap->_tcpconns_m.lock();
vs->_rx_m.lock();
@@ -747,7 +754,7 @@ namespace ZeroTier
if(avail < len) {
DEBUG_ERROR("not enough room (%d bytes) on RX buffer", avail);
}
// place new incoming data on ringbuffer before we try to send it to the app
// place new incoming data on ringbuffer before we try to send it to the app
memcpy(vs->RXbuf->get_buf(), p->payload, len);
vs->RXbuf->produce(len);
p = p->next;
@@ -764,7 +771,7 @@ namespace ZeroTier
vs->RXbuf->consume(w);
if(w < write_attempt_sz) {
DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
DEBUG_ERROR("intended to write len=%d, only wrote len=%d", write_attempt_sz, w);
DEBUG_EXTRA("intended to write len=%d, only wrote len=%d", write_attempt_sz, w);
}
else {
DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
@@ -783,7 +790,6 @@ namespace ZeroTier
// callback from stack to notify driver of the successful acceptance of a connection
err_t lwIP::lwip_cb_accept(void *arg, struct tcp_pcb *newPCB, err_t err)
{
//DEBUG_INFO();
VirtualSocket *vs = (VirtualSocket*)arg;
struct sockaddr_storage ss;
#if defined(LIBZT_IPV4)
@@ -796,7 +802,7 @@ namespace ZeroTier
// TODO: check this
memcpy(&(in6->sin6_addr.s6_addr), &(newPCB->remote_ip), sizeof(int32_t)*4);
in6->sin6_port = newPCB->remote_port;
#endif
#endif
VirtualSocket *new_vs = new VirtualSocket();
new_vs->socket_type = SOCK_STREAM;
new_vs->pcb = newPCB;
@@ -812,10 +818,10 @@ namespace ZeroTier
tcp_sent(newPCB, lwip_cb_sent);
tcp_poll(newPCB, lwip_cb_poll, 1);
// let lwIP know that it can queue additional incoming PCBs
tcp_accepted((struct tcp_pcb*)vs->pcb);
tcp_accepted((struct tcp_pcb*)vs->pcb);
return 0;
}
// copy processed datagram to app socket
void lwIP::lwip_cb_udp_recved(void * arg, struct udp_pcb * upcb, struct pbuf * p, const ip_addr_t * addr, u16_t port)
{
@@ -832,16 +838,16 @@ namespace ZeroTier
struct pbuf* q = p;
struct sockaddr_storage ss;
#if defined(LIBZT_IPV4)
#if defined(LIBZT_IPV4)
struct sockaddr_in *in4 = (struct sockaddr_in *)&ss;
in4->sin_addr.s_addr = addr->addr;
in4->sin_port = port;
#endif
#if defined(LIBZT_IPV6)
#endif
#if defined(LIBZT_IPV6)
struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;
memcpy(&(in6->sin6_addr.s6_addr), &(addr->addr), sizeof(int32_t)*4);
in6->sin6_port = port;
#endif
#endif
char udp_payload_buf[ZT_SOCKET_MSG_BUF_SZ];
char *msg_ptr = udp_payload_buf;
@@ -935,46 +941,57 @@ namespace ZeroTier
err_t lwIP::lwip_cb_poll(void* arg, struct tcp_pcb *PCB)
{
VirtualSocket *vs = (VirtualSocket *)arg;
DEBUG_EXTRA("fd=%d, vs=%p, PCB=%p", vs->app_fd, vs, PCB);
if(!vs) {
DEBUG_ERROR("invalid vs");
handle_general_failure();
return ERR_OK; // TODO: determine appropriate error value, if any
}
// --- Check buffers to see if we need to finish reading/writing anything ---
// TODO: Make a more generic form of each of these RX/TX blocks that can be shared
// between all polling callbacks and read write methods
// RX
vs->_rx_m.lock();
if(vs->RXbuf->count()) {
// this data has already been acknowledged via tcp_recved(), we merely need to
// move it off of the ringbuffer and into the client app
int w, write_attempt_sz = vs->RXbuf->count() < ZT_MAX_MTU ? vs->RXbuf->count() : ZT_MAX_MTU;
if((w = write(vs->sdk_fd, vs->RXbuf->get_buf(), write_attempt_sz)) < 0) {
DEBUG_ERROR("write(fd=%d)=%d, errno=%d", vs->sdk_fd, w, errno);
}
if(w > 0) {
vs->RXbuf->consume(w);
if(w < write_attempt_sz) {
DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
DEBUG_ERROR("intended to write len=%d, only wrote len=%d", write_attempt_sz, w);
}
else {
DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
}
}
if(vs->socket_type == SOCK_DGRAM) {
DEBUG_INFO("fd=%d, vs=%p, pcb=%p", vs->app_fd, vs, PCB, vs->pcb);
}
vs->_rx_m.unlock();
if(vs->socket_type == SOCK_STREAM) {
DEBUG_INFO("fd=%d, vs=%p, PCB=%p, vs->pcb=%p, vs->pcb->state=%d", vs->app_fd, vs, PCB, (struct tcp_pcb*)(vs->pcb), ((struct tcp_pcb*)(vs->pcb))->state);
if(((struct tcp_pcb*)(vs->pcb))->state == CLOSE_WAIT) {
DEBUG_EXTRA("pcb->state=CLOSE_WAIT. do nothing");
return ERR_OK;
}
if(((struct tcp_pcb*)(vs->pcb))->state == CLOSED) {
DEBUG_EXTRA("pcb->state=CLOSED. do nothing");
return ERR_OK;
}
// --- Check buffers to see if we need to finish reading/writing anything ---
// No need to lock the TX buffer since lwip_Write() will lock it for us
// TX
if(vs->TXbuf->count()) {
// we previously attempted to tcp_write(), but something went wrong, this
// is where we retry
lwipstack->lwip_Write(vs, vs->TXbuf->get_buf(), vs->TXbuf->count());
// TODO: Make a more generic form of each of these RX/TX blocks that can be shared
// between all polling callbacks and read write methods
// RX
vs->_rx_m.lock();
if(vs->RXbuf->count()) {
// this data has already been acknowledged via tcp_recved(), we merely need to
// move it off of the ringbuffer and into the client app
int w, write_attempt_sz = vs->RXbuf->count() < ZT_MAX_MTU ? vs->RXbuf->count() : ZT_MAX_MTU;
if((w = write(vs->sdk_fd, vs->RXbuf->get_buf(), write_attempt_sz)) < 0) {
DEBUG_ERROR("write(fd=%d)=%d, errno=%d", vs->sdk_fd, w, errno);
}
if(w > 0) {
vs->RXbuf->consume(w);
if(w < write_attempt_sz) {
DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
DEBUG_EXTRA("intended to write len=%d, only wrote len=%d", write_attempt_sz, w);
}
else {
DEBUG_TRANS("len=%5d rx_buf_len=%10d [VSRXBF --> APPFDS]", w, vs->RXbuf->count());
}
}
}
vs->_rx_m.unlock();
// No need to lock the TX buffer since lwip_Write() will lock it for us
// TX
if(vs->TXbuf->count()) {
// we previously attempted to tcp_write(), but something went wrong, this
// is where we retry
lwipstack->lwip_Write(vs, vs->TXbuf->get_buf(), vs->TXbuf->count());
}
}
return ERR_OK;
}
@@ -982,12 +999,16 @@ namespace ZeroTier
void lwIP::lwip_cb_err(void *arg, err_t err)
{
VirtualSocket *vs = (VirtualSocket *)arg;
if(!vs){
if(!vs) {
DEBUG_ERROR("err=%d, invalid virtual socket", err);
errno = -1;
}
DEBUG_ERROR("vs=%p, pcb=%p, fd=%d, err=%d", vs, vs->pcb, vs->app_fd, err);
vs->tap->Close(vs);
if(vs->socket_type == SOCK_STREAM) {
DEBUG_ERROR("vs=%p, pcb=%p, pcb->state=%d, fd=%d, err=%d", vs, vs->pcb, ((struct tcp_pcb*)(vs->pcb))->state, vs->app_fd, err);
}
if(vs->socket_type == SOCK_DGRAM) {
DEBUG_ERROR("vs=%p, pcb=%p, fd=%d, err=%d", vs, vs->pcb, vs->app_fd, err);
}
switch(err)
{
case ERR_MEM: // -1
@@ -1028,7 +1049,7 @@ namespace ZeroTier
break;
case ERR_ABRT: // -13
DEBUG_ERROR("ERR_ABRT, Connection aborted.");
break;
break;
case ERR_RST: // -14
DEBUG_ERROR("ERR_RST, Connection reset.");
break;

View File

@@ -216,6 +216,11 @@ namespace ZeroTier {
*/
static int lwip_num_current_raw_pcbs();
/*
* Returns the total number of PCBs of any time or state
*/
int lwip_num_total_pcbs();
/*
* Registers a DNS nameserver with the network stack
*/

View File

@@ -1150,7 +1150,6 @@ void udp_client_sustained_4(UDP_UNIT_TEST_SIG_4)
int num_to_send = 10;
for(int i=0; i<num_to_send; i++) {
sleep(1);
// tx
if((w = SENDTO(fd, msg.c_str(), strlen(msg.c_str()), 0, (struct sockaddr *)remote_addr, sizeof(*remote_addr))) < 0) {
DEBUG_ERROR("error sending packet, err=%d", errno);