Switch to MEM_LIBC_MALLOC usage in lwIP. Added event callbacks. Use of finer-grained locks in RX logic. CRCs disabled on inbound packets
This commit is contained in:
116
src/lwIP.cpp
116
src/lwIP.cpp
@@ -31,6 +31,7 @@
|
||||
*/
|
||||
|
||||
#include <vector>
|
||||
#include <queue>
|
||||
|
||||
#include "MAC.hpp"
|
||||
|
||||
@@ -46,7 +47,6 @@
|
||||
#include "lwip/memp.h"
|
||||
#include "lwip/sys.h"
|
||||
#include "lwip/tcp.h"
|
||||
#include "lwip/priv/tcp_priv.h" /* for tcp_debug_print_pcbs() */
|
||||
#include "lwip/timeouts.h"
|
||||
#include "lwip/stats.h"
|
||||
#include "lwip/ethip6.h"
|
||||
@@ -65,9 +65,9 @@ void ms_sleep(unsigned long ms)
|
||||
}
|
||||
#endif
|
||||
|
||||
std::queue<struct pbuf *> rx_queue;
|
||||
|
||||
ZeroTier::Mutex _rx_input_lock_m;
|
||||
struct pbuf* lwip_frame_rxbuf[LWIP_MAX_GUARDED_RX_BUF_SZ];
|
||||
int lwip_frame_rxbuf_tot = 0;
|
||||
|
||||
bool main_loop_exited = false;
|
||||
bool lwip_driver_initialized = false;
|
||||
@@ -103,14 +103,19 @@ void my_tcpip_callback(void *arg)
|
||||
if (main_loop_exited) {
|
||||
return;
|
||||
}
|
||||
ZeroTier::Mutex::Lock _l(_rx_input_lock_m);
|
||||
err_t err = ERR_OK;
|
||||
int loop_score = LWIP_FRAMES_HANDLED_PER_CORE_CALL; // max num of packets to read per polling call
|
||||
// TODO: Optimize (use Ringbuffer)
|
||||
int pkt_num = 0;
|
||||
int count_initial = lwip_frame_rxbuf_tot;
|
||||
while (lwip_frame_rxbuf_tot > 0 && loop_score > 0) {
|
||||
struct pbuf *p = lwip_frame_rxbuf[pkt_num];
|
||||
pkt_num++;
|
||||
while (loop_score > 0) {
|
||||
// TODO: Swap this block out for a thread-safe container
|
||||
_rx_input_lock_m.lock();
|
||||
if (rx_queue.size() == 0) {
|
||||
_rx_input_lock_m.unlock();
|
||||
return;
|
||||
}
|
||||
struct pbuf *p = rx_queue.front();
|
||||
rx_queue.pop();
|
||||
_rx_input_lock_m.unlock();
|
||||
// Packet routing logic. Inputs packet into correct lwip netif interface depending on protocol type
|
||||
struct ip_hdr *iphdr;
|
||||
switch (((struct eth_hdr *)p->payload)->type)
|
||||
@@ -120,8 +125,8 @@ void my_tcpip_callback(void *arg)
|
||||
for (size_t i=0; i<lwip_netifs.size(); i++) {
|
||||
if (lwip_netifs[i]->output_ip6 &&
|
||||
lwip_netifs[i]->output_ip6 == ethip6_output) {
|
||||
if (lwip_netifs[i]->input(p, lwip_netifs[i]) != ERR_OK) {
|
||||
DEBUG_ERROR("packet input error (ipv6, p=%p, netif=%p)", p, &lwip_netifs[i]);
|
||||
if ((err = lwip_netifs[i]->input(p, lwip_netifs[i])) != ERR_OK) {
|
||||
DEBUG_ERROR("packet input error (ipv6, p=%p, netif=%p)=%d", p, &lwip_netifs[i], err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -134,8 +139,8 @@ void my_tcpip_callback(void *arg)
|
||||
lwip_netifs[i]->output == etharp_output) {
|
||||
if (lwip_netifs[i]->ip_addr.u_addr.ip4.addr == iphdr->dest.addr ||
|
||||
ip4_addr_isbroadcast_u32(iphdr->dest.addr, lwip_netifs[i])) {
|
||||
if (lwip_netifs[i]->input(p, lwip_netifs[i]) != ERR_OK) {
|
||||
DEBUG_ERROR("packet input error (ipv4, p=%p, netif=%p)", p, &lwip_netifs[i]);
|
||||
if ((err = lwip_netifs[i]->input(p, lwip_netifs[i])) != ERR_OK) {
|
||||
DEBUG_ERROR("packet input error (ipv4, p=%p, netif=%p)=%d", p, &lwip_netifs[i], err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -146,8 +151,8 @@ void my_tcpip_callback(void *arg)
|
||||
for (size_t i=0; i<lwip_netifs.size(); i++) {
|
||||
if (lwip_netifs[i]->state) {
|
||||
pbuf_ref(p);
|
||||
if (lwip_netifs[i]->input(p, lwip_netifs[i]) != ERR_OK) {
|
||||
DEBUG_ERROR("packet input error (arp, p=%p, netif=%p)", p, &lwip_netifs[i]);
|
||||
if ((err = lwip_netifs[i]->input(p, lwip_netifs[i])) != ERR_OK) {
|
||||
DEBUG_ERROR("packet input error (arp, p=%p, netif=%p)=%d", p, &lwip_netifs[i], err);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -157,19 +162,19 @@ void my_tcpip_callback(void *arg)
|
||||
default:
|
||||
break;
|
||||
}
|
||||
lwip_frame_rxbuf_tot--;
|
||||
loop_score--;
|
||||
}
|
||||
int count_final = lwip_frame_rxbuf_tot;
|
||||
// Move pbuf frame pointer address buffer by the number of frames successfully fed into the stack core
|
||||
if (count_initial - count_final > 0) {
|
||||
memmove(lwip_frame_rxbuf, lwip_frame_rxbuf + count_final, count_initial - count_final);
|
||||
}
|
||||
}
|
||||
|
||||
// main thread which starts the initialization process
|
||||
static void main_lwip_driver_loop(void *arg)
|
||||
{
|
||||
#if defined(__linux__)
|
||||
pthread_setname_np(pthread_self(), "lwip_driver_loop");
|
||||
#endif
|
||||
#if defined(__APPLE__)
|
||||
pthread_setname_np("lwip_driver_loop");
|
||||
#endif
|
||||
sys_sem_t sem;
|
||||
LWIP_UNUSED_ARG(arg);
|
||||
if (sys_sem_new(&sem, 0) != ERR_OK) {
|
||||
@@ -178,8 +183,6 @@ static void main_lwip_driver_loop(void *arg)
|
||||
tcpip_init(tcpip_init_done, &sem);
|
||||
has_already_been_initialized = true;
|
||||
sys_sem_wait(&sem);
|
||||
//DEBUG_INFO("stack thread init complete");
|
||||
|
||||
while(lwip_driver_initialized) {
|
||||
#if defined(_WIN32)
|
||||
ms_sleep(LWIP_GUARDED_BUF_CHECK_INTERVAL*hibernationDelayMultiplier);
|
||||
@@ -301,12 +304,19 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p)
|
||||
DEBUG_TRANS("len=%5d dst=%s [%s TX <-- %s] proto=0x%04x %s", totalLength, macBuf, nodeBuf, tap->nodeId().c_str(),
|
||||
ZeroTier::Utils::ntoh(ethhdr->type), flagbuf);
|
||||
}
|
||||
|
||||
|
||||
return ERR_OK;
|
||||
}
|
||||
|
||||
void lwip_eth_rx(ZeroTier::VirtualTap *tap, const ZeroTier::MAC &from, const ZeroTier::MAC &to, unsigned int etherType,
|
||||
const void *data, unsigned int len)
|
||||
{
|
||||
if (!lwip_netifs.size()) {
|
||||
DEBUG_ERROR("there are no netifs set up to handle this packet. ignoring.");
|
||||
return;
|
||||
}
|
||||
|
||||
struct pbuf *p,*q;
|
||||
struct eth_hdr ethhdr;
|
||||
from.copyTo(ethhdr.src.addr, 6);
|
||||
@@ -326,41 +336,35 @@ void lwip_eth_rx(ZeroTier::VirtualTap *tap, const ZeroTier::MAC &from, const Zer
|
||||
DEBUG_TRANS("len=%5d dst=%s [%s RX --> %s] proto=0x%04x %s", len, macBuf, nodeBuf, tap->nodeId().c_str(),
|
||||
ZeroTier::Utils::ntoh(ethhdr.type), flagbuf);
|
||||
}
|
||||
|
||||
p = pbuf_alloc(PBUF_RAW, len+sizeof(struct eth_hdr), PBUF_POOL);
|
||||
if (p != NULL) {
|
||||
const char *dataptr = reinterpret_cast<const char *>(data);
|
||||
// First pbuf gets ethernet header at start
|
||||
q = p;
|
||||
if (q->len < sizeof(ethhdr)) {
|
||||
DEBUG_ERROR("dropped packet: first pbuf smaller than ethernet header");
|
||||
return;
|
||||
}
|
||||
memcpy(q->payload,ðhdr,sizeof(ethhdr));
|
||||
memcpy((char*)q->payload + sizeof(ethhdr),dataptr,q->len - sizeof(ethhdr));
|
||||
dataptr += q->len - sizeof(ethhdr);
|
||||
// Remaining pbufs (if any) get rest of data
|
||||
while ((q = q->next)) {
|
||||
memcpy(q->payload,dataptr,q->len);
|
||||
dataptr += q->len;
|
||||
}
|
||||
}
|
||||
else {
|
||||
DEBUG_ERROR("dropped packet: no pbufs available");
|
||||
|
||||
p = pbuf_alloc(PBUF_RAW, len+sizeof(struct eth_hdr), PBUF_RAM);
|
||||
if (!p) {
|
||||
DEBUG_ERROR("dropped packet: unable to allocate memory for pbuf");
|
||||
return;
|
||||
}
|
||||
if (!lwip_netifs.size()) {
|
||||
DEBUG_ERROR("there are no netifs set up to handle this packet. ignoring.");
|
||||
// First pbuf gets ethernet header at start
|
||||
q = p;
|
||||
if (q->len < sizeof(ethhdr)) {
|
||||
DEBUG_ERROR("dropped packet: first pbuf smaller than ethernet header");
|
||||
return;
|
||||
}
|
||||
ZeroTier::Mutex::Lock _l(_rx_input_lock_m);
|
||||
if (lwip_frame_rxbuf_tot == LWIP_MAX_GUARDED_RX_BUF_SZ) {
|
||||
DEBUG_ERROR("dropped packet -- guarded receive buffer full, adjust MAX_GUARDED_RX_BUF_SZ or LWIP_GUARDED_BUF_CHECK_INTERVAL");
|
||||
const char *dataptr = reinterpret_cast<const char *>(data);
|
||||
memcpy(q->payload,ðhdr,sizeof(ethhdr));
|
||||
int remainingPayloadSpace = q->len - sizeof(ethhdr);
|
||||
memcpy((char*)q->payload + sizeof(ethhdr),dataptr,remainingPayloadSpace);
|
||||
dataptr += remainingPayloadSpace;
|
||||
// Remaining pbufs (if any) get rest of data
|
||||
while ((q = q->next)) {
|
||||
memcpy(q->payload,dataptr,q->len);
|
||||
dataptr += q->len;
|
||||
}
|
||||
_rx_input_lock_m.lock();
|
||||
if (rx_queue.size() >= LWIP_MAX_GUARDED_RX_BUF_SZ) {
|
||||
DEBUG_INFO("dropped packet: rx_queue is full (>= %d)", LWIP_MAX_GUARDED_RX_BUF_SZ);
|
||||
return;
|
||||
}
|
||||
//pbuf_ref(p); // Increment reference to allow user application to copy data from buffer -- Will be automatically deallocated by socket API
|
||||
lwip_frame_rxbuf[lwip_frame_rxbuf_tot] = p;
|
||||
lwip_frame_rxbuf_tot += 1;
|
||||
rx_queue.push(p);
|
||||
_rx_input_lock_m.unlock();
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -377,9 +381,9 @@ void lwip_start_dhcp(void *netif)
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
static void netif_status_callback(struct netif *netif)
|
||||
{
|
||||
/*
|
||||
DEBUG_INFO("n=%p, %c%c, %d, o=%p, o6=%p, mc=%x:%x:%x:%x:%x:%x, hwln=%d, st=%p, flgs=%d\n",
|
||||
netif,
|
||||
netif->name[0],
|
||||
@@ -397,8 +401,8 @@ static void netif_status_callback(struct netif *netif)
|
||||
netif->state,
|
||||
netif->flags
|
||||
);
|
||||
*/
|
||||
}
|
||||
*/
|
||||
|
||||
ZeroTier::MAC _mac;
|
||||
|
||||
@@ -455,7 +459,7 @@ void lwip_init_interface(void *tapref, const ZeroTier::MAC &mac, const ZeroTier:
|
||||
IP4_ADDR(&gw,127,0,0,1);
|
||||
ipaddr.addr = *((u32_t *)ip.rawIpData());
|
||||
netmask.addr = *((u32_t *)ip.netmask().rawIpData());
|
||||
netif_set_status_callback(lwipdev, netif_status_callback);
|
||||
//netif_set_status_callback(lwipdev, netif_status_callback);
|
||||
netif_add(lwipdev, &ipaddr, &netmask, &gw, NULL, netif_init_4, tcpip_input);
|
||||
lwipdev->state = tapref;
|
||||
snprintf(macbuf, ZTS_MAC_ADDRSTRLEN, "%02x:%02x:%02x:%02x:%02x:%02x",
|
||||
@@ -475,7 +479,7 @@ void lwip_init_interface(void *tapref, const ZeroTier::MAC &mac, const ZeroTier:
|
||||
netif_create_ip6_linklocal_address(lwipdev, 1);
|
||||
netif_ip6_addr_set_state(lwipdev, 0, IP6_ADDR_TENTATIVE);
|
||||
netif_ip6_addr_set_state(lwipdev, 1, IP6_ADDR_TENTATIVE);
|
||||
netif_set_status_callback(lwipdev, netif_status_callback);
|
||||
//netif_set_status_callback(lwipdev, netif_status_callback);
|
||||
netif_set_default(lwipdev);
|
||||
netif_set_up(lwipdev);
|
||||
netif_set_link_up(lwipdev);
|
||||
|
||||
Reference in New Issue
Block a user