Unfinished callback improvements, consolidated netif4 and netif6

This commit is contained in:
Joseph Henry
2019-02-14 17:27:16 -08:00
parent c8e6662d24
commit a43d1d04e8
9 changed files with 515 additions and 267 deletions

View File

@@ -113,40 +113,81 @@ OneService *service;
static jmethodID _userCallbackMethodRef = NULL;
#endif
void (*_userEventCallbackFunc)(uint64_t, int);
void (*_userEventCallbackFunc)(struct zts_callback_msg *);
extern moodycamel::ConcurrentQueue<std::pair<uint64_t, int>*> _callbackMsgQueue;
moodycamel::ConcurrentQueue<struct zts_callback_msg*> _callbackMsgQueue;
//////////////////////////////////////////////////////////////////////////////
// Internal ZeroTier Service Controls (user application shall not use these)//
//////////////////////////////////////////////////////////////////////////////
void postEvent(uint64_t id, int eventCode)
void postEvent(int eventCode, void *arg)
{
// Queue callback event messages from other threads (such as lwIP driver)
_callbackMsgQueue.enqueue(new std::pair<uint64_t, int>(id, eventCode));
struct zts_callback_msg *msg = new zts_callback_msg();
msg->eventCode = eventCode;
if (NODE_EVENT_TYPE(eventCode)) {
msg->node = (struct zts_node_details*)arg;
} if (NETWORK_EVENT_TYPE(eventCode)) {
msg->network = (struct zts_network_details*)arg;
} if (NETIF_EVENT_TYPE(eventCode)) {
msg->netif = (struct zts_netif_details*)arg;
} if (ROUTE_EVENT_TYPE(eventCode)) {
msg->route = (struct zts_virtual_network_route*)arg;
} if (PATH_EVENT_TYPE(eventCode)) {
msg->path = (struct zts_physical_path*)arg;
} if (PEER_EVENT_TYPE(eventCode)) {
msg->peer = (struct zts_peer_details*)arg;
} if (ADDR_EVENT_TYPE(eventCode)) {
msg->addr = (struct zts_addr_details*)arg;
}
_callbackMsgQueue.enqueue(msg);
}
void _process_callback_event_helper(uint64_t nwid, int eventCode)
void postEvent(int eventCode) {
postEvent(eventCode, (void*)0);
}
void freeEvent(struct zts_callback_msg *msg)
{
if (msg->node) { delete msg->node; }
if (msg->network) { delete msg->network; }
if (msg->netif) { delete msg->netif; }
if (msg->route) { delete msg->route; }
if (msg->path) { delete msg->path; }
if (msg->peer) { delete msg->peer; }
if (msg->addr) { delete msg->addr; }
}
void _process_callback_event_helper(struct zts_callback_msg *msg)
{
#ifdef SDK_JNI
if(_userCallbackMethodRef) {
JNIEnv *env;
jint rs = jvm->AttachCurrentThread(&env, NULL);
assert (rs == JNI_OK);
env->CallVoidMethod(objRef, _userCallbackMethodRef, nwid, eventCode);
if (NODE_EVENT_TYPE(msg->eventCode)) {
arg = msg->networkId;
}
if (NODE_EVENT_TYPE(msg->eventCode)) {
arg = msg->nodeId;
}
if (NODE_EVENT_TYPE(msg->eventCode)) {
arg = msg->nodeId;
}
env->CallVoidMethod(objRef, _userCallbackMethodRef, msg->networkId, msg->eventCode);
}
#else
if (_userEventCallbackFunc) {
_userEventCallbackFunc(nwid, eventCode);
_userEventCallbackFunc(msg);
}
#endif
}
void _process_callback_event(uint64_t nwid, int eventCode)
void _process_callback_event(struct zts_callback_msg *msg)
{
_callback_lock.lock();
_process_callback_event_helper(nwid, eventCode);
_process_callback_event_helper(msg);
_callback_lock.unlock();
}
@@ -215,11 +256,11 @@ void *_zts_run_callbacks(void *thread_id)
#endif
while (_run_callbacks || _callbackMsgQueue.size_approx() > 0)
{
std::pair<uint64_t, int> *msg;
for (int j = 0; j != 32; j++) { // TODO: Check size of queue
struct zts_callback_msg *msg;
int sz = _callbackMsgQueue.size_approx();
for (int j = 0; j < sz; j++) {
if (_callbackMsgQueue.try_dequeue(msg)) {
// DEBUG_INFO("deqeueuing front: %llx,%d", msg->first, msg->second);
_process_callback_event(msg->first, msg->second);
_process_callback_event(msg);
delete msg;
}
}
@@ -272,12 +313,12 @@ void *_zts_run_service(void *arg)
switch(service->run()) {
case OneService::ONE_STILL_RUNNING:
case OneService::ONE_NORMAL_TERMINATION:
postEvent((uint64_t)0, ZTS_EVENT_NODE_NORMAL_TERMINATION);
postEvent(ZTS_EVENT_NODE_NORMAL_TERMINATION);
break;
case OneService::ONE_UNRECOVERABLE_ERROR:
DEBUG_ERROR("fatal error: %s", service->fatalErrorMessage().c_str());
err = true;
postEvent((uint64_t)0, ZTS_EVENT_NODE_UNRECOVERABLE_ERROR);
postEvent(ZTS_EVENT_NODE_UNRECOVERABLE_ERROR);
break;
case OneService::ONE_IDENTITY_COLLISION: {
err = true;
@@ -290,7 +331,7 @@ void *_zts_run_service(void *arg)
OSUtils::rm((_path + ZT_PATH_SEPARATOR_S + "identity.secret").c_str());
OSUtils::rm((_path + ZT_PATH_SEPARATOR_S + "identity.public").c_str());
}
postEvent((uint64_t)0, ZTS_EVENT_NODE_IDENTITY_COLLISION);
postEvent(ZTS_EVENT_NODE_IDENTITY_COLLISION);
} continue; // restart!
}
break; // terminate loop -- normally we don't keep restarting
@@ -300,7 +341,7 @@ void *_zts_run_service(void *arg)
delete service;
service = (OneService *)0;
_service_lock.unlock();
postEvent((uint64_t)0, ZTS_EVENT_NODE_DOWN);
postEvent(ZTS_EVENT_NODE_DOWN);
} catch ( ... ) {
DEBUG_ERROR("unexpected exception starting ZeroTier instance");
}
@@ -411,7 +452,7 @@ zts_err_t zts_deorbit(uint64_t moonWorldId)
#ifdef SDK_JNI
#endif
zts_err_t zts_start(const char *path, void (*callback)(uint64_t, int), int port)
zts_err_t zts_start(const char *path, void (*callback)(struct zts_callback_msg*), int port)
{
Mutex::Lock _l(_service_lock);
lwip_driver_init();