diff --git a/src/Options.h b/src/Options.h index 1f26c4a..569c871 100644 --- a/src/Options.h +++ b/src/Options.h @@ -42,6 +42,7 @@ #define ZTS_SERVICE_THREAD_NAME "ZeroTierServiceThread" #define ZTS_EVENT_CALLBACK_THREAD_NAME "ZeroTierEventCallbackThread" +#define ZTS_LWIP_DRIVER_THREAD_NAME "lwipDriver" ////////////////////////////////////////////////////////////////////////////// // lwIP behaviour (tcpip driver) // @@ -58,9 +59,9 @@ #define LWIP_GUARDED_BUF_CHECK_INTERVAL 5 /** - * Number of frame pointers that can be cached waiting for receipt into core + * Number of packets that can be queued for ingress into the lwIP core */ -#define LWIP_MAX_GUARDED_RX_BUF_SZ 1024 +#define ZTS_LWIP_MAX_RX_QUEUE_LEN 1024 ////////////////////////////////////////////////////////////////////////////// // Service behaviour // diff --git a/src/Service.cpp b/src/Service.cpp index abc829a..a23315d 100644 --- a/src/Service.cpp +++ b/src/Service.cpp @@ -827,6 +827,9 @@ public: inline void generateEventMsgs() { + if (!lwip_is_up()) { + return; // Don't process peer status events unless the stack is up. + } // Generate messages to be dequeued by the callback message thread #if ZTS_NETWORK_CALLBACKS Mutex::Lock _l(_nets_m); @@ -870,7 +873,7 @@ public: ZT_PeerList *pl = _node->peers(); if (pl) { for(unsigned long i=0;ipeerCount;++i) { - if (!peerCache.count(pl->peers[i].address)) { // Add first entry + if (!peerCache.count(pl->peers[i].address)) { if (pl->peers[i].pathCount > 0) { _callbackMsgQueue.enqueue(new std::pair(pl->peers[i].address, ZTS_EVENT_PEER_P2P)); } @@ -885,6 +888,7 @@ public: _callbackMsgQueue.enqueue(new std::pair(pl->peers[i].address, ZTS_EVENT_PEER_RELAY)); } } + // Update our cache with most recently observed path count peerCache[pl->peers[i].address] = pl->peers[i].pathCount; } } diff --git a/src/lwipDriver.cpp b/src/lwipDriver.cpp index faf95d0..907ce34 100644 --- a/src/lwipDriver.cpp +++ b/src/lwipDriver.cpp @@ -56,6 +56,9 @@ #include "Controls.hpp" extern void postEvent(uint64_t id, int eventCode); +#include "concurrentqueue.h" +moodycamel::ConcurrentQueue rx_queue; + #if defined(_WIN32) #include #endif @@ -67,17 +70,19 @@ extern void postEvent(uint64_t id, int eventCode); namespace ZeroTier { -bool main_loop_exited = false; -bool lwip_driver_initialized = false; -bool has_already_been_initialized = false; +bool _has_exited = false; int hibernationDelayMultiplier = 1; - extern bool _run_lwip_tcpip; +Mutex lwip_driver_m; -Mutex driver_m; - -std::queue rx_queue; -Mutex _rx_input_lock_m; +void lwip_sleep(long ms) +{ +#if defined(_WIN32) + Sleep(ms*hibernationDelayMultiplier); +#else + usleep(ms*1000*hibernationDelayMultiplier); +#endif +} void lwip_hibernate_driver() { @@ -94,53 +99,46 @@ static void tcpip_init_done(void *arg) { sys_sem_t *sem; sem = (sys_sem_t *)arg; - lwip_driver_initialized = true; _run_lwip_tcpip = true; postEvent((uint64_t)0, ZTS_EVENT_NETWORK_STACK_UP); - driver_m.unlock(); sys_sem_signal(sem); } void my_tcpip_callback(void *arg) { - if (main_loop_exited) { + if (!_run_lwip_tcpip) { return; } err_t err = ERR_OK; int loop_score = LWIP_FRAMES_HANDLED_PER_CORE_CALL; // max num of packets to read per polling call - while (loop_score > 0) { + struct zts_sorted_packet *sp; + while (loop_score > 0 && rx_queue.size_approx() > 0) { // TODO: Swap this block out for a thread-safe container - _rx_input_lock_m.lock(); - if (rx_queue.size() == 0) { - _rx_input_lock_m.unlock(); - return; - } - struct zts_sorted_packet *sp = rx_queue.front(); - struct pbuf *p = sp->p; - rx_queue.pop(); - _rx_input_lock_m.unlock(); - // Feed packet into appropriate lwIP netif - if (sp->p && sp->n) { - if ((err = sp->n->input(sp->p, sp->n)) != ERR_OK) { - DEBUG_ERROR("packet input error (p=%p, n=%p)=%d", p, sp->n, err); - pbuf_free(p); + struct pbuf *p; + if (rx_queue.try_dequeue(sp)) { + p = sp->p; + // Feed packet into appropriate lwIP netif + if (sp->p && sp->n) { + if ((err = sp->n->input(sp->p, sp->n)) != ERR_OK) { + DEBUG_ERROR("packet input error (p=%p, n=%p)=%d", p, sp->n, err); + pbuf_free(p); + } + sp->p = NULL; } - sp->p = NULL; + delete sp; + sp = NULL; } - delete sp; - sp = NULL; loop_score--; } } -// Main thread which starts the initialization process static void main_lwip_driver_loop(void *arg) { #if defined(__linux__) - pthread_setname_np(pthread_self(), "lwipDriver"); + pthread_setname_np(pthread_self(), ZTS_LWIP_DRIVER_THREAD_NAME); #endif #if defined(__APPLE__) - pthread_setname_np("lwipDriver"); + pthread_setname_np(ZTS_LWIP_DRIVER_THREAD_NAME); #endif sys_sem_t sem; LWIP_UNUSED_ARG(arg); @@ -148,53 +146,63 @@ static void main_lwip_driver_loop(void *arg) DEBUG_ERROR("failed to create semaphore"); } tcpip_init(tcpip_init_done, &sem); - has_already_been_initialized = true; sys_sem_wait(&sem); - while(lwip_driver_initialized) { -#if defined(_WIN32) - Sleep(LWIP_GUARDED_BUF_CHECK_INTERVAL*hibernationDelayMultiplier); -#else - usleep(LWIP_GUARDED_BUF_CHECK_INTERVAL*1000*hibernationDelayMultiplier); -#endif + // Main loop + while(_run_lwip_tcpip) { + lwip_sleep(LWIP_GUARDED_BUF_CHECK_INTERVAL); // Handle incoming packets from the core's thread context. // If you feed frames into the core directly you will violate the core's thread model tcpip_callback_with_block(my_tcpip_callback, NULL, 1); } - main_loop_exited = true; - _run_lwip_tcpip = false; + _has_exited = true; +} + +bool lwip_is_up() +{ + Mutex::Lock _l(lwip_driver_m); + return _run_lwip_tcpip; +} + +bool lwip_has_previously_shutdown() +{ + Mutex::Lock _l(lwip_driver_m); + return _has_exited; } -// Initialize the lwIP stack void lwip_driver_init() { - driver_m.lock(); // Unlocked from callback indicating completion of driver init - if (has_already_been_initialized || lwip_driver_initialized) { - // Already initialized, skip - driver_m.unlock(); - return; - } if (main_loop_exited) { - DEBUG_ERROR("stack has previously been shutdown an cannot be restarted."); - driver_m.unlock(); + if (lwip_is_up()) { return; } + if (lwip_has_previously_shutdown()) { + return; + } + Mutex::Lock _l(lwip_driver_m); #if defined(_WIN32) sys_init(); // Required for win32 init of critical sections #endif - void *st = sys_thread_new("main_thread", main_lwip_driver_loop, + void *st = sys_thread_new(ZTS_LWIP_DRIVER_THREAD_NAME, main_lwip_driver_loop, NULL, DEFAULT_THREAD_STACKSIZE, DEFAULT_THREAD_PRIO); } void lwip_driver_shutdown() { - if (main_loop_exited) { + if (lwip_has_previously_shutdown()) { return; } - lwip_driver_initialized = false; - // Give the stack time to call the frame feed callback one last time before shutting everything down - int callbackInterval = LWIP_GUARDED_BUF_CHECK_INTERVAL*hibernationDelayMultiplier*1000; - usleep(callbackInterval*3); - while(!main_loop_exited) { - usleep(LWIP_GUARDED_BUF_CHECK_INTERVAL*1000); + Mutex::Lock _l(lwip_driver_m); + // Set flag to stop sending frames into the core + _run_lwip_tcpip = false; + // Wait until the main lwIP thread has exited + while (!_has_exited) { lwip_sleep(LWIP_GUARDED_BUF_CHECK_INTERVAL); } + // After we're certain the stack isn't processing anymore traffic, + // start dequeing from the RX queue. This queue should be rejecting + // new frames at this point. + struct zts_sorted_packet *sp; + for (int i = 0; i < ZTS_LWIP_MAX_RX_QUEUE_LEN; i++) { + if (rx_queue.try_dequeue(sp)) { + delete sp; + } } /* if (tcpip_shutdown() == ERR_OK) { @@ -248,7 +256,7 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p) int len = totalLength - sizeof(struct eth_hdr); int proto = Utils::ntoh((uint16_t)ethhdr->type); tap->_handler(tap->_arg, NULL, tap->_nwid, src_mac, dest_mac, proto, 0, data, len); -/* + if (ZT_MSG_TRANSFER == true) { char flagbuf[32]; memset(&flagbuf, 0, 32); @@ -262,19 +270,22 @@ err_t lwip_eth_tx(struct netif *netif, struct pbuf *p) DEBUG_TRANS("len=%5d dst=%s [%s TX <-- %s] proto=0x%04x %s", totalLength, macBuf, nodeBuf, tap->nodeId().c_str(), Utils::ntoh(ethhdr->type), flagbuf); } -*/ + return ERR_OK; } void lwip_eth_rx(VirtualTap *tap, const MAC &from, const MAC &to, unsigned int etherType, const void *data, unsigned int len) { + if (!_run_lwip_tcpip) { + return; + } struct pbuf *p,*q; struct eth_hdr ethhdr; from.copyTo(ethhdr.src.addr, 6); to.copyTo(ethhdr.dest.addr, 6); ethhdr.type = Utils::hton((uint16_t)etherType); -/* + if (ZT_MSG_TRANSFER == true) { char flagbuf[32]; memset(&flagbuf, 0, 32); @@ -288,7 +299,7 @@ void lwip_eth_rx(VirtualTap *tap, const MAC &from, const MAC &to, unsigned int e DEBUG_TRANS("len=%5d dst=%s [%s RX --> %s] proto=0x%04x %s", len, macBuf, nodeBuf, tap->nodeId().c_str(), Utils::ntoh(ethhdr.type), flagbuf); } -*/ + if (etherType == 0x0800 || etherType == 0x0806) { // ip4 or ARP if (!tap->netif4) { DEBUG_ERROR("dropped packet: no netif to accept this packet (etherType=%x) on this vtap (%p)", etherType, tap); @@ -326,9 +337,8 @@ void lwip_eth_rx(VirtualTap *tap, const MAC &from, const MAC &to, unsigned int e dataptr += q->len; } - _rx_input_lock_m.lock(); - if (rx_queue.size() >= LWIP_MAX_GUARDED_RX_BUF_SZ) { - DEBUG_INFO("dropped packet: rx_queue is full (>= %d)", LWIP_MAX_GUARDED_RX_BUF_SZ); + if (rx_queue.size_approx() >= ZTS_LWIP_MAX_RX_QUEUE_LEN) { + DEBUG_INFO("dropped packet: rx_queue is full (>= %d)", ZTS_LWIP_MAX_RX_QUEUE_LEN); // TODO: Test performance scenarios: dropping this packet, dropping oldest front packet pbuf_free(p); p = NULL; @@ -353,8 +363,7 @@ void lwip_eth_rx(VirtualTap *tap, const MAC &from, const MAC &to, unsigned int e DEBUG_ERROR("dropped packet: unhandled (etherType=%x)", etherType); break; } - rx_queue.push(sp); - _rx_input_lock_m.unlock(); + rx_queue.enqueue(sp); } static void print_netif_info(struct netif *netif) { diff --git a/src/lwipDriver.hpp b/src/lwipDriver.hpp index e9c2fda..e8ea7b4 100644 --- a/src/lwipDriver.hpp +++ b/src/lwipDriver.hpp @@ -77,6 +77,11 @@ void lwip_hibernate_driver(); */ void lwip_wake_driver(); +/** + * Returns whether the lwIP network stack is up and ready to process traffic + */ +bool lwip_is_up(); + /** * @brief Initialize network stack semaphores, threads, and timers. *