Added a lock-free queue to lwIP driver for RX packet processing. Added checks to force strict ordering of callback events

This commit is contained in:
Joseph Henry
2019-02-07 14:11:17 -08:00
parent 52a7e9229e
commit 1f8d3030c8
4 changed files with 89 additions and 70 deletions

View File

@@ -56,6 +56,9 @@
#include "Controls.hpp"
extern void postEvent(uint64_t id, int eventCode);
#include "concurrentqueue.h"
moodycamel::ConcurrentQueue<struct ZeroTier::zts_sorted_packet*> rx_queue;
#if defined(_WIN32)
#include <time.h>
#endif
@@ -67,17 +70,19 @@ extern void postEvent(uint64_t id, int eventCode);
namespace ZeroTier {
bool main_loop_exited = false;
bool lwip_driver_initialized = false;
bool has_already_been_initialized = false;
bool _has_exited = false;
int hibernationDelayMultiplier = 1;
extern bool _run_lwip_tcpip;
Mutex lwip_driver_m;
Mutex driver_m;
std::queue<struct zts_sorted_packet*> rx_queue;
Mutex _rx_input_lock_m;
void lwip_sleep(long ms)
{
#if defined(_WIN32)
Sleep(ms*hibernationDelayMultiplier);
#else
usleep(ms*1000*hibernationDelayMultiplier);
#endif
}
void lwip_hibernate_driver()
{
@@ -94,53 +99,46 @@ static void tcpip_init_done(void *arg)
{
sys_sem_t *sem;
sem = (sys_sem_t *)arg;
lwip_driver_initialized = true;
_run_lwip_tcpip = true;
postEvent((uint64_t)0, ZTS_EVENT_NETWORK_STACK_UP);
driver_m.unlock();
sys_sem_signal(sem);
}
void my_tcpip_callback(void *arg)
{
if (main_loop_exited) {
if (!_run_lwip_tcpip) {
return;
}
err_t err = ERR_OK;
int loop_score = LWIP_FRAMES_HANDLED_PER_CORE_CALL; // max num of packets to read per polling call
while (loop_score > 0) {
struct zts_sorted_packet *sp;
while (loop_score > 0 && rx_queue.size_approx() > 0) {
// TODO: Swap this block out for a thread-safe container
_rx_input_lock_m.lock();
if (rx_queue.size() == 0) {
_rx_input_lock_m.unlock();
return;
}
struct zts_sorted_packet *sp = rx_queue.front();
struct pbuf *p = sp->p;
rx_queue.pop();
_rx_input_lock_m.unlock();
// Feed packet into appropriate lwIP netif
if (sp->p && sp->n) {
if ((err = sp->n->input(sp->p, sp->n)) != ERR_OK) {
DEBUG_ERROR("packet input error (p=%p, n=%p)=%d", p, sp->n, err);
pbuf_free(p);
struct pbuf *p;
if (rx_queue.try_dequeue(sp)) {
p = sp->p;
// Feed packet into appropriate lwIP netif
if (sp->p && sp->n) {
if ((err = sp->n->input(sp->p, sp->n)) != ERR_OK) {
DEBUG_ERROR("packet input error (p=%p, n=%p)=%d", p, sp->n, err);
pbuf_free(p);
}
sp->p = NULL;
}
sp->p = NULL;
delete sp;
sp = NULL;
}
delete sp;
sp = NULL;
loop_score--;
}
}
// Main thread which starts the initialization process
static void main_lwip_driver_loop(void *arg)
{
#if defined(__linux__)
pthread_setname_np(pthread_self(), "lwipDriver");
pthread_setname_np(pthread_self(), ZTS_LWIP_DRIVER_THREAD_NAME);
#endif
#if defined(__APPLE__)
pthread_setname_np("lwipDriver");
pthread_setname_np(ZTS_LWIP_DRIVER_THREAD_NAME);
#endif
sys_sem_t sem;
LWIP_UNUSED_ARG(arg);
@@ -148,53 +146,63 @@ static void main_lwip_driver_loop(void *arg)
DEBUG_ERROR("failed to create semaphore");
}
tcpip_init(tcpip_init_done, &sem);
has_already_been_initialized = true;
sys_sem_wait(&sem);
while(lwip_driver_initialized) {
#if defined(_WIN32)
Sleep(LWIP_GUARDED_BUF_CHECK_INTERVAL*hibernationDelayMultiplier);
#else
usleep(LWIP_GUARDED_BUF_CHECK_INTERVAL*1000*hibernationDelayMultiplier);
#endif
// Main loop
while(_run_lwip_tcpip) {
lwip_sleep(LWIP_GUARDED_BUF_CHECK_INTERVAL);
// Handle incoming packets from the core's thread context.
// If you feed frames into the core directly you will violate the core's thread model
tcpip_callback_with_block(my_tcpip_callback, NULL, 1);
}
main_loop_exited = true;
_run_lwip_tcpip = false;
_has_exited = true;
}
bool lwip_is_up()
{
Mutex::Lock _l(lwip_driver_m);
return _run_lwip_tcpip;
}
bool lwip_has_previously_shutdown()
{
Mutex::Lock _l(lwip_driver_m);
return _has_exited;
}
// Initialize the lwIP stack
void lwip_driver_init()
{
driver_m.lock(); // Unlocked from callback indicating completion of driver init
if (has_already_been_initialized || lwip_driver_initialized) {
// Already initialized, skip
driver_m.unlock();
return;
} if (main_loop_exited) {
DEBUG_ERROR("stack has previously been shutdown an cannot be restarted.");
driver_m.unlock();
if (lwip_is_up()) {
return;
}
if (lwip_has_previously_shutdown()) {
return;
}
Mutex::Lock _l(lwip_driver_m);
#if defined(_WIN32)
sys_init(); // Required for win32 init of critical sections
#endif
void *st = sys_thread_new("main_thread", main_lwip_driver_loop,
void *st = sys_thread_new(ZTS_LWIP_DRIVER_THREAD_NAME, main_lwip_driver_loop,
NULL, DEFAULT_THREAD_STACKSIZE, DEFAULT_THREAD_PRIO);
}
void lwip_driver_shutdown()
{
if (main_loop_exited) {
if (lwip_has_previously_shutdown()) {
return;
}
lwip_driver_initialized = false;
// Give the stack time to call the frame feed callback one last time before shutting everything down
int callbackInterval = LWIP_GUARDED_BUF_CHECK_INTERVAL*hibernationDelayMultiplier*1000;
usleep(callbackInterval*3);
while(!main_loop_exited) {
usleep(LWIP_GUARDED_BUF_CHECK_INTERVAL*1000);
Mutex::Lock _l(lwip_driver_m);
// Set flag to stop sending frames into the core
_run_lwip_tcpip = false;
// Wait until the main lwIP thread has exited
while (!_has_exited) { lwip_sleep(LWIP_GUARDED_BUF_CHECK_INTERVAL); }
// After we're certain the stack isn't processing anymore traffic,
// start dequeing from the RX queue. This queue should be rejecting
// new frames at this point.
struct zts_sorted_packet *sp;
for (int i = 0; i < ZTS_LWIP_MAX_RX_QUEUE_LEN; i++) {
if (rx_queue.try_dequeue(sp)) {
delete sp;
}
}
/*
if (tcpip_shutdown() == ERR_OK) {
@@ -248,7 +256,7 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p)
int len = totalLength - sizeof(struct eth_hdr);
int proto = Utils::ntoh((uint16_t)ethhdr->type);
tap->_handler(tap->_arg, NULL, tap->_nwid, src_mac, dest_mac, proto, 0, data, len);
/*
if (ZT_MSG_TRANSFER == true) {
char flagbuf[32];
memset(&flagbuf, 0, 32);
@@ -262,19 +270,22 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p)
DEBUG_TRANS("len=%5d dst=%s [%s TX <-- %s] proto=0x%04x %s", totalLength, macBuf, nodeBuf, tap->nodeId().c_str(),
Utils::ntoh(ethhdr->type), flagbuf);
}
*/
return ERR_OK;
}
void lwip_eth_rx(VirtualTap *tap, const MAC &from, const MAC &to, unsigned int etherType,
const void *data, unsigned int len)
{
if (!_run_lwip_tcpip) {
return;
}
struct pbuf *p,*q;
struct eth_hdr ethhdr;
from.copyTo(ethhdr.src.addr, 6);
to.copyTo(ethhdr.dest.addr, 6);
ethhdr.type = Utils::hton((uint16_t)etherType);
/*
if (ZT_MSG_TRANSFER == true) {
char flagbuf[32];
memset(&flagbuf, 0, 32);
@@ -288,7 +299,7 @@ void lwip_eth_rx(VirtualTap *tap, const MAC &from, const MAC &to, unsigned int e
DEBUG_TRANS("len=%5d dst=%s [%s RX --> %s] proto=0x%04x %s", len, macBuf, nodeBuf, tap->nodeId().c_str(),
Utils::ntoh(ethhdr.type), flagbuf);
}
*/
if (etherType == 0x0800 || etherType == 0x0806) { // ip4 or ARP
if (!tap->netif4) {
DEBUG_ERROR("dropped packet: no netif to accept this packet (etherType=%x) on this vtap (%p)", etherType, tap);
@@ -326,9 +337,8 @@ void lwip_eth_rx(VirtualTap *tap, const MAC &from, const MAC &to, unsigned int e
dataptr += q->len;
}
_rx_input_lock_m.lock();
if (rx_queue.size() >= LWIP_MAX_GUARDED_RX_BUF_SZ) {
DEBUG_INFO("dropped packet: rx_queue is full (>= %d)", LWIP_MAX_GUARDED_RX_BUF_SZ);
if (rx_queue.size_approx() >= ZTS_LWIP_MAX_RX_QUEUE_LEN) {
DEBUG_INFO("dropped packet: rx_queue is full (>= %d)", ZTS_LWIP_MAX_RX_QUEUE_LEN);
// TODO: Test performance scenarios: dropping this packet, dropping oldest front packet
pbuf_free(p);
p = NULL;
@@ -353,8 +363,7 @@ void lwip_eth_rx(VirtualTap *tap, const MAC &from, const MAC &to, unsigned int e
DEBUG_ERROR("dropped packet: unhandled (etherType=%x)", etherType);
break;
}
rx_queue.push(sp);
_rx_input_lock_m.unlock();
rx_queue.enqueue(sp);
}
static void print_netif_info(struct netif *netif) {