Added selectable TCP_WRITE_FLAG_COPY mode on a per-socket basis, updated lwIP driver for socket limit checks

This commit is contained in:
Joseph Henry
2017-09-08 16:13:56 -07:00
parent 05fec81757
commit 1d4f36a811
9 changed files with 208 additions and 129 deletions

View File

@@ -24,7 +24,17 @@
* of your own application.
*/
// lwIP network stack driver
/*
lwIP network stack driver
NOTES:
Calls made in this network stack driver may never block since all packet
processing (input and output) as well as timer processing (TCP mainly) is done
in a single execution context.
*/
#include <algorithm>
@@ -36,6 +46,8 @@
#include "netif/ethernet.h"
#include "lwip/etharp.h"
#include "priv/tcp_priv.h"
#if defined(LIBZT_IPV6)
#include "lwip/ethip6.h"
#include "lwip/nd6.h"
@@ -169,6 +181,64 @@ namespace ZeroTier
}
}
int lwIP::lwip_num_current_tcp_pcbs()
{
// TODO: These will likely need some sort of locking protection
int count = 0;
struct tcp_pcb *pcb_ptr = tcp_active_pcbs; // PCBs that can RX/TX data
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
count++;
DEBUG_ERROR("FOUND --- tcp_active_pcbs PCB COUNT = %d", count);
}
pcb_ptr = tcp_tw_pcbs; // PCBs in TIME-WAIT state
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
count++;
DEBUG_ERROR("FOUND --- tcp_tw_pcbs PCB COUNT = %d", count);
}
/* TODO
pcb_ptr = tcp_listen_pcbs;
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
count++;
DEBUG_ERROR("FOUND --- tcp_listen_pcbs PCB COUNT = %d", count);
}*/
pcb_ptr = tcp_bound_pcbs; // PCBs in a bound state
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
count++;
DEBUG_ERROR("FOUND --- tcp_bound_pcbs PCB COUNT = %d", count);
}
return count;
}
int lwIP::lwip_num_current_udp_pcbs()
{
// TODO: These will likely need some sort of locking protection
int count = 0;
struct udp_pcb *pcb_ptr = udp_pcbs;
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
count++;
DEBUG_ERROR("FOUND --- udp_pcbs PCB COUNT = %d", count);
}
return count;
}
int lwIP::lwip_num_current_raw_pcbs()
{
// TODO: These will likely need some sort of locking protection
int count = 0;
struct raw_pcb *pcb_ptr = raw_pcbs;
while(pcb_ptr) {
pcb_ptr = pcb_ptr->next;
count++;
DEBUG_ERROR("FOUND --- raw_pcbs PCB COUNT = %d", count);
}
return count;
}
int lwIP::lwip_add_dns_nameserver(struct sockaddr *addr)
{
return -1;
@@ -280,8 +350,7 @@ namespace ZeroTier
int lwIP::lwip_Socket(void **pcb, int socket_family, int socket_type, int protocol)
{
//DEBUG_INFO();
if(!can_provision_new_socket()) {
if(!can_provision_new_socket(socket_type)) {
DEBUG_ERROR("unable to create new socket due to limitation of network stack");
return -1;
}
@@ -340,49 +409,15 @@ namespace ZeroTier
tcp_poll(tpcb, lwip_cb_poll, LWIP_APPLICATION_POLL_FREQ);
tcp_arg(tpcb, vs);
//DEBUG_EXTRA(" pcb->state=%x", vs->TCP_pcb->state);
//if(vs->TCP_pcb->state != CLOSED) {
// DEBUG_INFO(" cannot connect using this PCB, PCB!=CLOSED");
// tap->sendReturnValue(tap->_phy.getDescriptor(rpcSock), -1, EAGAIN);
// return;
//}
if((err = tcp_connect(tpcb,&ba,port,lwip_cb_connected)) < 0)
{
if(err == ERR_ISCONN) {
// Already in connected state
errno = EISCONN;
return -1;
} if(err == ERR_USE) {
// Already in use
errno = EADDRINUSE;
return -1;
} if(err == ERR_VAL) {
// Invalid ipaddress parameter
errno = EINVAL;
return -1;
} if(err == ERR_RTE) {
// No route to host
errno = ENETUNREACH;
return -1;
} if(err == ERR_BUF) {
// No more ports available
errno = EAGAIN;
return -1;
}
if(err == ERR_MEM) {
// TODO: Doesn't describe the problem well, but closest match
errno = EAGAIN;
return -1;
}
errno = lwip_err_to_errno(err);
// We should only return a value if failure happens immediately
// Otherwise, we still need to wait for a callback from lwIP.
// - This is because an ERR_OK from tcp_connect() only verifies
// that the SYN packet was enqueued onto the stack properly,
// that's it!
// - Most instances of a retval for a connect() should happen
// in the nc_connect() and lwip_cb_err() callbacks!
DEBUG_ERROR("unable to connect");
errno = EAGAIN;
return -1;
}
}
@@ -391,12 +426,12 @@ namespace ZeroTier
int lwIP::lwip_Bind(VirtualTap *tap, VirtualSocket *vs, const struct sockaddr *addr, socklen_t addrlen)
{
// TODO: Check case for IP_ADDR_ANY
//DEBUG_EXTRA("vs=%p", vs);
ip_addr_t ba;
char addrstr[INET6_ADDRSTRLEN];
memset(addrstr, 0, INET6_ADDRSTRLEN);
int port = 0, err = 0;
#if defined(LIBZT_IPV4)
struct sockaddr_in *in4 = (struct sockaddr_in *)addr;
if(addr->sa_family == AF_INET) {
@@ -415,38 +450,23 @@ namespace ZeroTier
}
#endif
if(vs->socket_type == SOCK_DGRAM) {
err = udp_bind((struct udp_pcb*)vs->pcb, (const ip_addr_t *)&ba, port);
if(err == ERR_USE) {
if((err = udp_bind((struct udp_pcb*)vs->pcb, (const ip_addr_t *)&ba, port)) < 0) {
errno = lwip_err_to_errno(err);
err = -1;
errno = EADDRINUSE; // port in use
}
else {
// set the recv callback
// set callback
udp_recv((struct udp_pcb*)vs->pcb, lwip_cb_udp_recved, vs);
err = ERR_OK;
errno = ERR_OK; // success
}
}
else if (vs->socket_type == SOCK_STREAM) {
err = tcp_bind((struct tcp_pcb*)vs->pcb, (const ip_addr_t *)&ba, port);
if(err != ERR_OK) {
DEBUG_ERROR("err=%d", err);
if(err == ERR_USE){
err = -1;
errno = EADDRINUSE;
}
if(err == ERR_MEM){
err = -1;
errno = ENOMEM;
}
if(err == ERR_BUF){
err = -1;
errno = ENOMEM;
}
}
if((err = tcp_bind((struct tcp_pcb*)vs->pcb, (const ip_addr_t *)&ba, port)) < 0) {
errno = lwip_err_to_errno(err);
err = -1;
}
else {
err = ERR_OK;
errno = ERR_OK; // success
}
}
return err;
@@ -454,19 +474,25 @@ namespace ZeroTier
int lwIP::lwip_Listen(VirtualSocket *vs, int backlog)
{
//DEBUG_INFO("vs=%p", vs);
int err = 0;
struct tcp_pcb* listeningPCB;
#ifdef TCP_LISTEN_BACKLOG
listeningPCB = tcp_listen_with_backlog((struct tcp_pcb*)vs->pcb, backlog);
#else
listeningPCB = tcp_listen((struct tcp_pcb*)vs->pcb);
#endif
if(listeningPCB != NULL) {
if(listeningPCB) {
vs->pcb = listeningPCB;
tcp_accept(listeningPCB, lwip_cb_accept); // set callback
// set callback
tcp_accept(listeningPCB, lwip_cb_accept);
tcp_arg(listeningPCB, vs);
err = ERR_OK;
}
return 0;
else {
errno = ENOMEM;
err = -1;
}
return err;
}
VirtualSocket* lwIP::lwip_Accept(VirtualSocket *vs)
@@ -536,11 +562,10 @@ namespace ZeroTier
DEBUG_ERROR("no virtual socket");
return -1;
}
if(vs->socket_type == SOCK_DGRAM)
{
if(vs->socket_type == SOCK_DGRAM) {
// TODO: Packet re-assembly hasn't yet been tested with lwIP so UDP packets are limited to MTU-sized chunks
int udp_trans_len = std::min(len, (ssize_t)ZT_MAX_MTU);
//DEBUG_EXTRA("allocating pbuf chain of size=%d for UDP packet", udp_trans_len);
// DEBUG_EXTRA("allocating pbuf chain of size=%d for UDP packet", udp_trans_len);
struct pbuf * pb = pbuf_alloc(PBUF_TRANSPORT, udp_trans_len, PBUF_POOL);
if(!pb){
DEBUG_ERROR("unable to allocate new pbuf of size=%d", vs->TXbuf->count());
@@ -561,8 +586,7 @@ namespace ZeroTier
return udp_trans_len;
}
}
if(vs->socket_type == SOCK_STREAM)
{
if(vs->socket_type == SOCK_STREAM) {
// How much we are currently allowed to write to the VirtualSocket
ssize_t sndbuf = ((struct tcp_pcb*)vs->pcb)->snd_buf;
int err, r;
@@ -589,7 +613,8 @@ namespace ZeroTier
// Writes data pulled from the client's socket buffer to LWIP. This merely sends the
// data to LWIP to be enqueued and eventually sent to the network.
if(r > 0) {
err = tcp_write((struct tcp_pcb*)vs->pcb, vs->TXbuf->get_buf(), r, TCP_WRITE_FLAG_COPY);
err = tcp_write((struct tcp_pcb*)vs->pcb, vs->TXbuf->get_buf(), r, vs->copymode);
tcp_output((struct tcp_pcb*)vs->pcb);
if(err != ERR_OK) {
DEBUG_ERROR("error while writing to lwIP tcp_pcb, err=%d", err);
@@ -597,7 +622,15 @@ namespace ZeroTier
DEBUG_ERROR("lwIP out of memory");
return -1;
} else {
vs->TXbuf->consume(r); // success
if(vs->copymode & TCP_WRITE_FLAG_COPY) {
// since we copied the data (allocated pbufs), we can consume the buffer
vs->TXbuf->consume(r); // success
}
else {
// since we only processed the data by pointer reference we
// want to preserve it until it has been ACKed by the remote host
// (DO NOTHING)
}
return ERR_OK;
}
}
@@ -835,17 +868,16 @@ namespace ZeroTier
err_t lwIP::lwip_cb_sent(void* arg, struct tcp_pcb *PCB, u16_t len)
{
//DEBUG_EXTRA("pcb=%p", PCB);
/*
VirtualSocket *vs = (VirtualSocket *)arg;
Mutex::Lock _l(vs->tap->_tcpconns_m);
if(vs && len) {
int softmax = vs->socket_type == SOCK_STREAM ? ZT_TCP_TX_BUF_SZ : ZT_UDP_TX_BUF_SZ;
if(vs->TXbuf->count() < softmax) {
vs->tap->_phy.setNotifyReadable(vs->sock, true);
vs->tap->_phy.whack();
}
if(!vs){
DEBUG_ERROR("invalid vs for PCB=%p, len=%d", PCB, len);
}
if(!(vs->copymode & TCP_WRITE_FLAG_COPY)) {
// since we decided in lwip_Write() not to consume the buffere data, as it
// was not copied and was only used by pointer reference, we can now consume
// the data on the buffer since we've got an ACK back from the remote host
vs->TXbuf->consume(len);
}
*/
return ERR_OK;
}
@@ -875,79 +907,63 @@ namespace ZeroTier
VirtualSocket *vs = (VirtualSocket *)arg;
if(!vs){
DEBUG_ERROR("err=%d, invalid virtual socket", err);
errno = -1; // FIXME: Find more appropriate value
errno = -1;
}
DEBUG_ERROR("vs=%p, pcb=%p, fd=%d, err=%d", vs, vs->pcb, vs->app_fd, err);
vs->tap->Close(vs);
switch(err)
{
case ERR_MEM: // -1
DEBUG_ERROR("ERR_MEM->ENOMEM, Out of memory error.");
errno = ENOMEM;
break;
case ERR_BUF: // -2
DEBUG_ERROR("ERR_BUF->ENOBUFS, Buffer error.");
errno = ENOBUFS;
break;
case ERR_TIMEOUT: // -3
DEBUG_ERROR("ERR_TIMEOUT->ETIMEDOUT, Timeout.");
errno = ETIMEDOUT;
break;
case ERR_RTE: // -4
DEBUG_ERROR("ERR_RTE->ENETUNREACH, Routing problem.");
errno = ENETUNREACH;
break;
case ERR_INPROGRESS: // -5
DEBUG_ERROR("ERR_INPROGRESS->EINPROGRESS, Operation in progress.");
errno = EINPROGRESS;
break;
case ERR_VAL: // -6
DEBUG_ERROR("ERR_VAL->EINVAL, Illegal value.");
errno = EINVAL;
break;
case ERR_WOULDBLOCK: // -7
DEBUG_ERROR("ERR_WOULDBLOCK->EWOULDBLOCK, Operation would block.");
errno = EWOULDBLOCK;
break;
case ERR_USE: // -8
DEBUG_ERROR("ERR_USE->EADDRINUSE, Address in use.");
errno = EADDRINUSE;
break;
case ERR_ALREADY: // -9 ?
DEBUG_ERROR("ERR_ALREADY->EISCONN, Already connecting.");
errno = EISCONN;
break;
case ERR_ISCONN: // -10
DEBUG_ERROR("ERR_ISCONN->EISCONN, Already connected");
errno = EISCONN;
break;
case ERR_CONN: // -11 ?
DEBUG_ERROR("ERR_CONN->EISCONN, Not connected");
errno = EISCONN;
break;
case ERR_IF: // -12
DEBUG_ERROR("ERR_IF, Low-level netif error.");
errno = -1;
break;
case ERR_ABRT: // -13
DEBUG_ERROR("ERR_ABRT, Connection aborted.");
errno = -1;
break;
case ERR_RST: // -14
DEBUG_ERROR("ERR_RST, Connection reset.");
errno = -1;
break;
case ERR_CLSD: // -15
DEBUG_ERROR("ERR_CLSD, Connection closed.");
errno = -1;
break;
case ERR_ARG: // -16
DEBUG_ERROR("ERR_ARG, Illegal argument.");
errno = -1;
break;
default:
break;
}
errno = lwip_err_to_errno(err);
}
}