Expand C API and simplify NodeService
This commit is contained in:
211
src/Events.cpp
211
src/Events.cpp
@@ -4,7 +4,7 @@
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file in the project's root directory.
|
||||
*
|
||||
* Change Date: 2025-01-01
|
||||
* Change Date: 2026-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2.0 of the Apache License.
|
||||
@@ -14,49 +14,57 @@
|
||||
/**
|
||||
* @file
|
||||
*
|
||||
* Callback event processing logic
|
||||
* Callback event creation and distribution to user application
|
||||
*/
|
||||
|
||||
#include "concurrentqueue.h"
|
||||
|
||||
#ifdef ZTS_ENABLE_JAVA
|
||||
#include <jni.h>
|
||||
#endif
|
||||
#include "Events.hpp"
|
||||
|
||||
#include "Constants.hpp"
|
||||
#include "Debug.hpp"
|
||||
#include "Events.hpp"
|
||||
#include "Node.hpp"
|
||||
#include "NodeService.hpp"
|
||||
#include "OSUtils.hpp"
|
||||
#include "ZeroTierSockets.h"
|
||||
#include "concurrentqueue.h"
|
||||
|
||||
#define NODE_EVENT_TYPE(code) code >= ZTS_EVENT_NODE_UP&& code <= ZTS_EVENT_NODE_NORMAL_TERMINATION
|
||||
#define NETWORK_EVENT_TYPE(code) \
|
||||
code >= ZTS_EVENT_NETWORK_NOT_FOUND&& code <= ZTS_EVENT_NETWORK_UPDATE
|
||||
#define STACK_EVENT_TYPE(code) code >= ZTS_EVENT_STACK_UP&& code <= ZTS_EVENT_STACK_DOWN
|
||||
#define NETIF_EVENT_TYPE(code) code >= ZTS_EVENT_NETIF_UP&& code <= ZTS_EVENT_NETIF_LINK_DOWN
|
||||
#define PEER_EVENT_TYPE(code) code >= ZTS_EVENT_PEER_DIRECT&& code <= ZTS_EVENT_PEER_PATH_DEAD
|
||||
#define ROUTE_EVENT_TYPE(code) code >= ZTS_EVENT_ROUTE_ADDED&& code <= ZTS_EVENT_ROUTE_REMOVED
|
||||
#define ADDR_EVENT_TYPE(code) code >= ZTS_EVENT_ADDR_ADDED_IP4&& code <= ZTS_EVENT_ADDR_REMOVED_IP6
|
||||
#ifdef ZTS_ENABLE_JAVA
|
||||
#include <jni.h>
|
||||
#endif
|
||||
|
||||
#ifdef ZTS_ENABLE_PYTHON
|
||||
#include "Python.h"
|
||||
#include "Python.h"
|
||||
PythonDirectorCallbackClass* _userEventCallback = NULL;
|
||||
void PythonDirectorCallbackClass::on_zerotier_event(struct zts_callback_msg* msg)
|
||||
void PythonDirectorCallbackClass::on_zerotier_event(zts_event_msg_t* msg)
|
||||
{
|
||||
}
|
||||
#endif
|
||||
|
||||
#define ZTS_NODE_EVENT(code) code >= ZTS_EVENT_NODE_UP&& code <= ZTS_EVENT_NODE_FATAL_ERROR
|
||||
#define ZTS_NETWORK_EVENT(code) \
|
||||
code >= ZTS_EVENT_NETWORK_NOT_FOUND&& code <= ZTS_EVENT_NETWORK_UPDATE
|
||||
#define ZTS_STACK_EVENT(code) code >= ZTS_EVENT_STACK_UP&& code <= ZTS_EVENT_STACK_DOWN
|
||||
#define ZTS_NETIF_EVENT(code) code >= ZTS_EVENT_NETIF_UP&& code <= ZTS_EVENT_NETIF_LINK_DOWN
|
||||
#define ZTS_PEER_EVENT(code) code >= ZTS_EVENT_PEER_DIRECT&& code <= ZTS_EVENT_PEER_PATH_DEAD
|
||||
#define ZTS_ROUTE_EVENT(code) code >= ZTS_EVENT_ROUTE_ADDED&& code <= ZTS_EVENT_ROUTE_REMOVED
|
||||
#define ZTS_ADDR_EVENT(code) code >= ZTS_EVENT_ADDR_ADDED_IP4&& code <= ZTS_EVENT_ADDR_REMOVED_IP6
|
||||
#define ZTS_STORE_EVENT(code) \
|
||||
code >= ZTS_EVENT_STORE_IDENTITY_SECRET&& code <= ZTS_EVENT_STORE_NETWORK
|
||||
|
||||
namespace ZeroTier {
|
||||
|
||||
extern NodeService* service;
|
||||
extern NodeService* zts_service;
|
||||
|
||||
// Global state variable shared between Socket, Control, Event and NodeService logic.
|
||||
uint8_t _serviceStateFlags;
|
||||
// Global state variable shared between Socket, Control, Event and
|
||||
// NodeService logic.
|
||||
volatile uint8_t service_state = 0;
|
||||
int last_state_check;
|
||||
|
||||
#define RESET_FLAGS() service_state = 0;
|
||||
#define SET_FLAGS(f) service_state |= f;
|
||||
#define CLR_FLAGS(f) service_state &= ~f;
|
||||
#define GET_FLAGS(f) ((service_state & f) > 0)
|
||||
|
||||
// Lock to guard access to callback function pointers.
|
||||
Mutex _callbackLock;
|
||||
Mutex events_m;
|
||||
|
||||
#ifdef ZTS_ENABLE_PINVOKE
|
||||
void (*_userEventCallback)(void*);
|
||||
@@ -65,45 +73,74 @@ void (*_userEventCallback)(void*);
|
||||
void (*_userEventCallback)(void*);
|
||||
#endif
|
||||
|
||||
moodycamel::ConcurrentQueue<struct zts_callback_msg*> _callbackMsgQueue;
|
||||
moodycamel::ConcurrentQueue<zts_event_msg_t*> _callbackMsgQueue;
|
||||
|
||||
void _enqueueEvent(int16_t eventCode, void* arg)
|
||||
void Events::run()
|
||||
{
|
||||
struct zts_callback_msg* msg = new zts_callback_msg();
|
||||
msg->eventCode = eventCode;
|
||||
while (getState(ZTS_STATE_CALLBACKS_RUNNING) || _callbackMsgQueue.size_approx() > 0) {
|
||||
zts_event_msg_t* msg;
|
||||
size_t sz = _callbackMsgQueue.size_approx();
|
||||
for (size_t j = 0; j < sz; j++) {
|
||||
if (_callbackMsgQueue.try_dequeue(msg)) {
|
||||
events_m.lock();
|
||||
sendToUser(msg);
|
||||
events_m.unlock();
|
||||
delete msg;
|
||||
}
|
||||
}
|
||||
zts_util_delay(ZTS_CALLBACK_PROCESSING_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
if (NODE_EVENT_TYPE(eventCode)) {
|
||||
msg->node = (struct zts_node_details*)arg;
|
||||
void Events::enqueue(int16_t event_code, const void* arg, int len)
|
||||
{
|
||||
if (! _enabled) {
|
||||
return;
|
||||
}
|
||||
if (NETWORK_EVENT_TYPE(eventCode)) {
|
||||
msg->network = (struct zts_network_details*)arg;
|
||||
zts_event_msg_t* msg = new zts_event_msg_t();
|
||||
msg->event_code = event_code;
|
||||
|
||||
if (ZTS_NODE_EVENT(event_code)) {
|
||||
msg->node = (zts_node_info_t*)arg;
|
||||
msg->len = sizeof(zts_node_info_t);
|
||||
}
|
||||
if (STACK_EVENT_TYPE(eventCode)) {
|
||||
if (ZTS_NETWORK_EVENT(event_code)) {
|
||||
msg->network = (zts_net_info_t*)arg;
|
||||
msg->len = sizeof(zts_net_info_t);
|
||||
}
|
||||
if (ZTS_STACK_EVENT(event_code)) {
|
||||
/* nothing to convey to user */
|
||||
}
|
||||
if (NETIF_EVENT_TYPE(eventCode)) {
|
||||
msg->netif = (struct zts_netif_details*)arg;
|
||||
if (ZTS_NETIF_EVENT(event_code)) {
|
||||
msg->netif = (zts_netif_info_t*)arg;
|
||||
msg->len = sizeof(zts_netif_info_t);
|
||||
}
|
||||
if (ROUTE_EVENT_TYPE(eventCode)) {
|
||||
msg->route = (struct zts_virtual_network_route*)arg;
|
||||
if (ZTS_ROUTE_EVENT(event_code)) {
|
||||
msg->route = (zts_route_info_t*)arg;
|
||||
msg->len = sizeof(zts_route_info_t);
|
||||
}
|
||||
if (PEER_EVENT_TYPE(eventCode)) {
|
||||
msg->peer = (struct zts_peer_details*)arg;
|
||||
if (ZTS_PEER_EVENT(event_code)) {
|
||||
msg->peer = (zts_peer_info_t*)arg;
|
||||
msg->len = sizeof(zts_peer_info_t);
|
||||
}
|
||||
if (ADDR_EVENT_TYPE(eventCode)) {
|
||||
msg->addr = (struct zts_addr_details*)arg;
|
||||
if (ZTS_ADDR_EVENT(event_code)) {
|
||||
msg->addr = (zts_addr_info_t*)arg;
|
||||
msg->len = sizeof(zts_addr_info_t);
|
||||
}
|
||||
if (ZTS_STORE_EVENT(event_code)) {
|
||||
msg->cache = (void*)arg;
|
||||
msg->len = len;
|
||||
}
|
||||
|
||||
if (msg && _callbackMsgQueue.size_approx() > 1024) {
|
||||
// Rate-limit number of events
|
||||
_freeEvent(msg);
|
||||
destroy(msg);
|
||||
}
|
||||
else {
|
||||
_callbackMsgQueue.enqueue(msg);
|
||||
}
|
||||
}
|
||||
|
||||
void _freeEvent(struct zts_callback_msg* msg)
|
||||
void Events::destroy(zts_event_msg_t* msg)
|
||||
{
|
||||
if (! msg) {
|
||||
return;
|
||||
@@ -128,9 +165,9 @@ void _freeEvent(struct zts_callback_msg* msg)
|
||||
}
|
||||
}
|
||||
|
||||
void _passDequeuedEventToUser(struct zts_callback_msg* msg)
|
||||
void Events::sendToUser(zts_event_msg_t* msg)
|
||||
{
|
||||
bool bShouldStopCallbackThread = (msg->eventCode == ZTS_EVENT_STACK_DOWN);
|
||||
bool bShouldStopCallbackThread = (msg->event_code == ZTS_EVENT_STACK_DOWN);
|
||||
#ifdef ZTS_ENABLE_PYTHON
|
||||
PyGILState_STATE state = PyGILState_Ensure();
|
||||
_userEventCallback->on_zerotier_event(msg);
|
||||
@@ -139,24 +176,24 @@ void _passDequeuedEventToUser(struct zts_callback_msg* msg)
|
||||
#ifdef ZTS_ENABLE_JAVA
|
||||
if (_userCallbackMethodRef) {
|
||||
JNIEnv* env;
|
||||
#if defined(__ANDROID__)
|
||||
#if defined(__ANDROID__)
|
||||
jint rs = jvm->AttachCurrentThread(&env, NULL);
|
||||
#else
|
||||
#else
|
||||
jint rs = jvm->AttachCurrentThread((void**)&env, NULL);
|
||||
#endif
|
||||
#endif
|
||||
assert(rs == JNI_OK);
|
||||
uint64_t arg = 0;
|
||||
uint64_t id = 0;
|
||||
if (NODE_EVENT_TYPE(msg->eventCode)) {
|
||||
if (ZTS_NODE_EVENT(msg->event_code)) {
|
||||
id = msg->node ? msg->node->address : 0;
|
||||
}
|
||||
if (NETWORK_EVENT_TYPE(msg->eventCode)) {
|
||||
if (ZTS_NETWORK_EVENT(msg->event_code)) {
|
||||
id = msg->network ? msg->network->nwid : 0;
|
||||
}
|
||||
if (PEER_EVENT_TYPE(msg->eventCode)) {
|
||||
if (ZTS_PEER_EVENT(msg->event_code)) {
|
||||
id = msg->peer ? msg->peer->address : 0;
|
||||
}
|
||||
env->CallVoidMethod(objRef, _userCallbackMethodRef, id, msg->eventCode);
|
||||
env->CallVoidMethod(objRef, _userCallbackMethodRef, id, msg->event_code);
|
||||
}
|
||||
#endif // ZTS_ENABLE_JAVA
|
||||
#ifdef ZTS_ENABLE_PINVOKE
|
||||
@@ -169,53 +206,47 @@ void _passDequeuedEventToUser(struct zts_callback_msg* msg)
|
||||
_userEventCallback(msg);
|
||||
}
|
||||
#endif
|
||||
_freeEvent(msg);
|
||||
destroy(msg);
|
||||
if (bShouldStopCallbackThread) {
|
||||
/* Ensure last possible callback ZTS_EVENT_STACK_DOWN is
|
||||
delivered before callback thread is finally stopped. */
|
||||
_clrState(ZTS_STATE_CALLBACKS_RUNNING);
|
||||
clrState(ZTS_STATE_CALLBACKS_RUNNING);
|
||||
}
|
||||
}
|
||||
|
||||
bool _isCallbackRegistered()
|
||||
bool Events::hasCallback()
|
||||
{
|
||||
_callbackLock.lock();
|
||||
events_m.lock();
|
||||
bool retval = false;
|
||||
#ifdef ZTS_ENABLE_JAVA
|
||||
retval = (jvm && objRef && _userCallbackMethodRef);
|
||||
#else
|
||||
retval = _userEventCallback;
|
||||
#endif
|
||||
_callbackLock.unlock();
|
||||
events_m.unlock();
|
||||
return retval;
|
||||
}
|
||||
|
||||
void _clearRegisteredCallback()
|
||||
void Events::clrCallback()
|
||||
{
|
||||
_callbackLock.lock();
|
||||
events_m.lock();
|
||||
#ifdef ZTS_ENABLE_JAVA
|
||||
objRef = NULL;
|
||||
_userCallbackMethodRef = NULL;
|
||||
#else
|
||||
_userEventCallback = NULL;
|
||||
#endif
|
||||
_callbackLock.unlock();
|
||||
events_m.unlock();
|
||||
}
|
||||
|
||||
int _canPerformServiceOperation()
|
||||
int Events::canPerformServiceOperation()
|
||||
{
|
||||
return service && service->isRunning() && service->getNode() && service->getNode()->online()
|
||||
&& ! _getState(ZTS_STATE_FREE_CALLED);
|
||||
return zts_service && zts_service->isRunning() && ! getState(ZTS_STATE_FREE_CALLED);
|
||||
}
|
||||
|
||||
#define RESET_FLAGS() _serviceStateFlags = 0;
|
||||
#define SET_FLAGS(f) _serviceStateFlags |= f;
|
||||
#define CLR_FLAGS(f) _serviceStateFlags &= ~f;
|
||||
#define GET_FLAGS(f) ((_serviceStateFlags & f) > 0)
|
||||
|
||||
void _setState(uint8_t newFlags)
|
||||
void Events::setState(uint8_t newFlags)
|
||||
{
|
||||
if ((newFlags ^ _serviceStateFlags) & ZTS_STATE_NET_SERVICE_RUNNING) {
|
||||
if ((newFlags ^ service_state) & ZTS_STATE_NET_SERVICE_RUNNING) {
|
||||
return; // No effect. Not allowed to set this flag manually
|
||||
}
|
||||
SET_FLAGS(newFlags);
|
||||
@@ -228,7 +259,7 @@ void _setState(uint8_t newFlags)
|
||||
}
|
||||
}
|
||||
|
||||
void _clrState(uint8_t newFlags)
|
||||
void Events::clrState(uint8_t newFlags)
|
||||
{
|
||||
if (newFlags & ZTS_STATE_NET_SERVICE_RUNNING) {
|
||||
return; // No effect. Not allowed to set this flag manually
|
||||
@@ -243,39 +274,19 @@ void _clrState(uint8_t newFlags)
|
||||
}
|
||||
}
|
||||
|
||||
bool _getState(uint8_t testFlags)
|
||||
bool Events::getState(uint8_t testFlags)
|
||||
{
|
||||
return testFlags & _serviceStateFlags;
|
||||
return testFlags & service_state;
|
||||
}
|
||||
|
||||
#if defined(__WINDOWS__)
|
||||
DWORD WINAPI _runCallbacks(LPVOID thread_id)
|
||||
#else
|
||||
void* _runCallbacks(void* thread_id)
|
||||
#endif
|
||||
void Events::enable()
|
||||
{
|
||||
#if defined(__APPLE__)
|
||||
pthread_setname_np(ZTS_EVENT_CALLBACK_THREAD_NAME);
|
||||
#endif
|
||||
while (_getState(ZTS_STATE_CALLBACKS_RUNNING) || _callbackMsgQueue.size_approx() > 0) {
|
||||
struct zts_callback_msg* msg;
|
||||
size_t sz = _callbackMsgQueue.size_approx();
|
||||
for (size_t j = 0; j < sz; j++) {
|
||||
if (_callbackMsgQueue.try_dequeue(msg)) {
|
||||
_callbackLock.lock();
|
||||
_passDequeuedEventToUser(msg);
|
||||
_callbackLock.unlock();
|
||||
delete msg;
|
||||
}
|
||||
}
|
||||
zts_delay_ms(ZTS_CALLBACK_PROCESSING_INTERVAL);
|
||||
}
|
||||
#if ZTS_ENABLE_JAVA
|
||||
JNIEnv* env;
|
||||
jint rs = jvm->DetachCurrentThread();
|
||||
pthread_exit(0);
|
||||
#endif
|
||||
return NULL;
|
||||
_enabled = true;
|
||||
}
|
||||
|
||||
void Events::disable()
|
||||
{
|
||||
_enabled = false;
|
||||
}
|
||||
|
||||
} // namespace ZeroTier
|
||||
|
||||
Reference in New Issue
Block a user