diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index 9577012..66069d8 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -42,6 +42,15 @@ cmsg_port=2475 watchdog_switch=1 watchdog_port=2476 +[watchdog_tfe] +# The worker thread updates the timestamp every two seconds +# The watchdog thread checks the timestamp every second +enable=1 +timeout_seconds=5 +statistics_window=20 +timeout_cnt_as_fail=3 +timeout_debug=0 + [ssl] ssl_ja3_debug=0 ssl_ja3_table=PXY_SSL_FINGERPRINT diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index 71912ea..388c8dd 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -4,7 +4,7 @@ find_package(NFNETLINK REQUIRED) add_executable(tfe src/acceptor_kni_v1.cpp src/acceptor_kni_v2.cpp src/acceptor_kni_v3.cpp src/ssl_stream.cpp src/key_keeper.cpp src/ssl_fetch_cert.cpp src/ssl_sess_cache.cpp src/ssl_sess_ticket.cpp src/ssl_service_cache.cpp src/ssl_trusted_cert_storage.cpp src/ev_root_ca_metadata.cpp src/ssl_utils.cpp - src/tcp_stream.cpp src/main.cpp src/proxy.cpp src/sender_scm.cpp src/watchdog_kni.cpp src/ssl_ja3.cpp) + src/tcp_stream.cpp src/main.cpp src/proxy.cpp src/sender_scm.cpp src/watchdog_kni.cpp src/watchdog_tfe.cpp src/ssl_ja3.cpp) target_include_directories(tfe PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/external) target_include_directories(tfe PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include/internal) diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 00371ca..92540ac 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -11,6 +11,7 @@ struct tfe_thread_ctx { struct tfe_proxy *proxy; pthread_t thr; + int readable_tid; unsigned int thread_id; unsigned int load; diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index 5b91e94..5a71deb 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -118,6 +118,7 @@ struct tfe_proxy struct acceptor_kni_v3 * kni_v3_acceptor; struct sender_scm * scm_sender; struct watchdog_kni * watchdog_kni; + struct watchdog_tfe * watchdog_tfe; /* DEBUG OPTIONS */ unsigned int tcp_all_passthrough; diff --git a/platform/include/internal/watchdog_tfe.h b/platform/include/internal/watchdog_tfe.h new file mode 100644 index 0000000..4e555b6 --- /dev/null +++ b/platform/include/internal/watchdog_tfe.h @@ -0,0 +1,7 @@ +#ifndef TFE_WATCHDOG_TFE_H +#define TFE_WATCHDOG_TFE_H + +struct watchdog_tfe; +struct watchdog_tfe *watchdog_tfe_create(struct tfe_proxy *proxy, const char *profile, void *logger); + +#endif //TFE_WATCHDOG_TFE_H \ No newline at end of file diff --git a/platform/src/acceptor_kni_v3.cpp b/platform/src/acceptor_kni_v3.cpp index 61cf031..290167f 100644 --- a/platform/src/acceptor_kni_v3.cpp +++ b/platform/src/acceptor_kni_v3.cpp @@ -313,7 +313,7 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s raw_payload_len = nfq_get_payload(nfa, &raw_payload); if ((unsigned int)raw_payload_len <= (MIN(sizeof(struct iphdr), sizeof(struct ip6_hdr)) + sizeof(struct tcphdr))) { - TFE_LOG_ERROR(g_default_logger, "Failed at nfq_get_payload(), paylod len %d too small, less than %d", raw_payload_len, (MIN(sizeof(struct iphdr), sizeof(struct ip6_hdr)) + sizeof(struct tcphdr))); + TFE_LOG_ERROR(g_default_logger, "Failed at nfq_get_payload(), paylod len %d too small, less than %lu", raw_payload_len, (MIN(sizeof(struct iphdr), sizeof(struct ip6_hdr)) + sizeof(struct tcphdr))); tfe_hexdump2file(stderr, "Failed at parsing payload, payload len too small", raw_payload, (unsigned int)raw_payload_len); goto end; } @@ -353,7 +353,7 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s // check if there is a tcp options if (pktinfo.tcphdr_len <= sizeof(struct tcphdr)) { - TFE_LOG_ERROR(g_default_logger, "Failed at parser TCP header, TCP header len %d too small, less than %d", pktinfo.tcphdr_len, sizeof(struct tcphdr)); + TFE_LOG_ERROR(g_default_logger, "Failed at parser TCP header, TCP header len %d too small, less than %lu", pktinfo.tcphdr_len, sizeof(struct tcphdr)); tfe_hexdump2file(stderr, "Failed at parsing TCP header, TCP header len too small", raw_payload, (unsigned int)raw_payload_len); goto end; } diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index acdd45d..d3b895e 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -47,6 +48,7 @@ #include #include #include +#include #include /* Breakpad */ @@ -238,7 +240,7 @@ static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) while (ATOMIC_READ(&(ctx->proxy->make_work_thread_sleep)) > 0) { - TFE_LOG_ERROR(g_default_logger, "recv SIGUSR1, make worker thread %d sleep", ctx->thread_id); + TFE_LOG_ERROR(g_default_logger, "recv SIGUSR1, make worker thread[%d] %d sleep", ctx->thread_id, ctx->readable_tid); sleep(1); } } @@ -290,6 +292,7 @@ static void * tfe_work_thread(void * arg) { struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; struct timeval timer_delay = {2, 0}; + ctx->readable_tid = syscall(SYS_gettid); struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, ctx); if (unlikely(ev == NULL)) @@ -534,12 +537,12 @@ void tfe_proxy_acceptor_init(struct tfe_proxy * proxy, const char * profile) static void usage(char *cmd) { fprintf(stderr, "USAGE: %s [OPTIONS]\n", cmd); - fprintf(stderr, " -v -- show version\n"); - fprintf(stderr, " -g -- generate coredump\n"); - fprintf(stderr, " -h -- show help info\n\n"); + fprintf(stderr, " -v -- show version\n"); + fprintf(stderr, " -g -- generate coredump\n"); + fprintf(stderr, " -h -- show help info\n\n"); fprintf(stderr, "kill -s SIGHUP $pid -- reload zlog configure\n"); fprintf(stderr, "kill -s SIGUSR1 $pid -- make worker thread sleep\n"); - fprintf(stderr, "kill -s SIGUSR2 $pid -- wake worker thread form sleep\n"); + fprintf(stderr, "kill -s SIGUSR2 $pid -- wake worker thread from sleep\n"); } int main(int argc, char * argv[]) @@ -680,6 +683,10 @@ int main(int argc, char * argv[]) g_default_proxy->watchdog_kni = watchdog_kni_create(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->watchdog_kni != NULL, "Failed at creating KNI watchdog, Exit."); + /* Watchdog TFE */ + g_default_proxy->watchdog_tfe = watchdog_tfe_create(g_default_proxy, main_profile, g_default_logger); + CHECK_OR_EXIT(g_default_proxy->watchdog_tfe != NULL, "Failed at creating TFE watchdog, Exit."); + TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized, Version: %s.", __tfe_version); /* If TFE is run by systemd's notify, then tell the systemd our tfe is ready. diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index 865e623..7b154bb 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -2039,7 +2039,7 @@ void ssl_stream_free(struct ssl_stream * s_stream, struct event_base * evbase, s if (s_stream->dir == CONN_DIR_UPSTREAM) { size_t rx_offset_this_time = 0; - int ret = tfe_stream_info_get(s_stream->tcp_stream, INFO_FROM_UPSTREAM_RX_OFFSET, &rx_offset_this_time, sizeof(rx_offset_this_time)); + tfe_stream_info_get(s_stream->tcp_stream, INFO_FROM_UPSTREAM_RX_OFFSET, &rx_offset_this_time, sizeof(rx_offset_this_time)); const char * sni = (s_stream->up_parts.client_hello && s_stream->up_parts.client_hello->sni) ? s_stream->up_parts.client_hello->sni : "null"; TFE_LOG_DEBUG(g_default_logger, "ssl up stream close, rx_offset:%d, sni:%s", rx_offset_this_time, sni); } diff --git a/platform/src/watchdog_kni.cpp b/platform/src/watchdog_kni.cpp index b467ef7..6238760 100644 --- a/platform/src/watchdog_kni.cpp +++ b/platform/src/watchdog_kni.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -202,37 +203,19 @@ retry: void * watchdog_kni_thread(void * arg) { + char thread_name[16]; + snprintf(thread_name, sizeof(thread_name), "watchdog:kni"); + prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); + struct watchdog_kni * __ctx = (struct watchdog_kni *)arg; while(event_base_dispatch(__ctx->ev_base) >= 0) {} - DIE("watchdog thread is terminated."); -} - -static void health_check_for_thread_worker(evutil_socket_t fd, short what, void * arg) -{ - struct tfe_proxy *proxy = (struct tfe_proxy *)arg; - struct timespec now; - time_t temp; - - clock_gettime(CLOCK_MONOTONIC, &now); - - for (unsigned int i = 0; i < proxy->nr_work_threads; i++) - { - temp = ATOMIC_READ(&(proxy->work_threads[i]->lastime)); - if (temp + 2 + 2 + 1 < now.tv_sec) - { - TFE_LOG_ERROR(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %ld, Worker thread no reply, Exit ! ! ! ", now.tv_sec, proxy->work_threads[i]->thread_id, temp); - abort(); - } - // TFE_LOG_DEBUG(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %lds ", now.tv_sec, proxy->work_threads[i]->thread_id, temp); - } + DIE("Watchdog KNI thread is terminated."); } struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * profile, void * logger) { struct watchdog_kni * __ctx = ALLOC(struct watchdog_kni, 1); int ret = 0; - struct event *ev = NULL; - struct timeval timer_delay = {2, 0}; __ctx->proxy = proxy; __ctx->profile = profile; @@ -274,17 +257,6 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * errno = 0; goto __errout; } - - ev = event_new(__ctx->ev_base, -1, EV_PERSIST, health_check_for_thread_worker, proxy); - if (unlikely(ev == NULL)) - { - TFE_LOG_ERROR(__ctx->logger, "Failed at creating health check event for worker thread"); - /* after log, reset errno */ - errno = 0; - goto __errout; - } - evtimer_add(ev, &timer_delay); - watchdog_kni_reset(__ctx); watchdog_kni_try_connect(__ctx); @@ -298,7 +270,7 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * goto __errout; } - TFE_LOG_INFO(__ctx->logger, "KNI watchdog module init successfully."); + TFE_LOG_INFO(__ctx->logger, "Watchdog KNI module init successfully."); return __ctx; __errout: diff --git a/platform/src/watchdog_tfe.cpp b/platform/src/watchdog_tfe.cpp new file mode 100644 index 0000000..9cbbee6 --- /dev/null +++ b/platform/src/watchdog_tfe.cpp @@ -0,0 +1,152 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +struct watchdog_tfe +{ + struct tfe_proxy *proxy; + struct event_base *ev_base; + pthread_t pthread; + const char *profile; + void *logger; + + unsigned int enable; + unsigned int timeout_seconds; + unsigned int statistics_window; + unsigned int timeout_cnt_as_fail; + unsigned int timeout_debug; + + unsigned int cur_time_window_fail_cnt; + time_t cur_time_window_begin; + time_t cur_time_window_end; +}; + +void *watchdog_tfe_thread(void *arg) +{ + char thread_name[16]; + snprintf(thread_name, sizeof(thread_name), "watchdog:tfe"); + prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); + + struct watchdog_tfe *__ctx = (struct watchdog_tfe *)arg; + while (event_base_dispatch(__ctx->ev_base) >= 0) + { + } + DIE("Watchdog TFE thread is terminated."); +} + +static void watchdog_tfe_thread_handle(evutil_socket_t fd, short what, void *arg) +{ + struct tfe_proxy *proxy = (struct tfe_proxy *)arg; + struct watchdog_tfe *__ctx = proxy->watchdog_tfe; + struct timespec now; + time_t temp; + + clock_gettime(CLOCK_MONOTONIC, &now); + + if (now.tv_sec > __ctx->cur_time_window_end) + { + __ctx->cur_time_window_begin = now.tv_sec; + __ctx->cur_time_window_end = now.tv_sec + __ctx->statistics_window; + __ctx->cur_time_window_fail_cnt = 0; + } + + for (unsigned int i = 0; i < proxy->nr_work_threads; i++) + { + temp = ATOMIC_READ(&(proxy->work_threads[i]->lastime)); + if (temp + __ctx->timeout_seconds < now.tv_sec) + { + if (__ctx->timeout_debug) + { + TFE_LOG_ERROR(__ctx->logger, "Current timestamp is %ld, Worker thread[%d] tid %d timestamp is %ld, Worker thread timeout, Exit !!!", + now.tv_sec, proxy->work_threads[i]->thread_id, proxy->work_threads[i]->readable_tid, temp); + abort(); + } + else + { + __ctx->cur_time_window_fail_cnt++; + TFE_LOG_ERROR(__ctx->logger, "Current timestamp is %ld, Worker thread[%d] tid %d timestamp is %ld, Worker thread timeout, fail count %d !!!", + now.tv_sec, proxy->work_threads[i]->thread_id, proxy->work_threads[i]->readable_tid, temp, __ctx->cur_time_window_fail_cnt); + if (__ctx->cur_time_window_fail_cnt >= __ctx->timeout_cnt_as_fail) + { + TFE_LOG_ERROR(__ctx->logger, "Frome %ld to %ld, there are %d timeouts of the worker threads, Exit !!!", + __ctx->cur_time_window_begin, __ctx->cur_time_window_end, __ctx->cur_time_window_fail_cnt); + exit(-1); + } + } + } + } +} + +struct watchdog_tfe *watchdog_tfe_create(struct tfe_proxy *proxy, const char *profile, void *logger) +{ + struct watchdog_tfe *__ctx = ALLOC(struct watchdog_tfe, 1); + int ret = 0; + struct event *ev = NULL; + // The worker thread updates the timestamp every two seconds + // The watchdog thread checks the timestamp every second + struct timeval timer_delay = {1, 0}; + + __ctx->proxy = proxy; + __ctx->profile = profile; + __ctx->logger = logger; + + MESA_load_profile_uint_def(profile, "watchdog_tfe", "enable", &(__ctx->enable), 1); + MESA_load_profile_uint_def(profile, "watchdog_tfe", "timeout_seconds", &(__ctx->timeout_seconds), 5); + MESA_load_profile_uint_def(profile, "watchdog_tfe", "statistics_window", &(__ctx->statistics_window), 20); + MESA_load_profile_uint_def(profile, "watchdog_tfe", "timeout_cnt_as_fail", &(__ctx->timeout_cnt_as_fail), 3); + MESA_load_profile_uint_def(profile, "watchdog_tfe", "timeout_debug", &(__ctx->timeout_debug), 0); + + if (!__ctx->enable) + { + return __ctx; + } + + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + __ctx->cur_time_window_begin = now.tv_sec; + __ctx->cur_time_window_end = now.tv_sec + __ctx->statistics_window; + __ctx->cur_time_window_fail_cnt = 0; + + __ctx->ev_base = event_base_new(); + if (!__ctx->ev_base) + { + TFE_LOG_ERROR(__ctx->logger, "Fail to create event base: %s", strerror(errno)); + errno = 0; + goto errout; + } + + ev = event_new(__ctx->ev_base, -1, EV_PERSIST, watchdog_tfe_thread_handle, proxy); + if (unlikely(ev == NULL)) + { + TFE_LOG_ERROR(__ctx->logger, "Fail to create tfe watchdog event"); + errno = 0; + goto errout; + } + evtimer_add(ev, &timer_delay); + + ret = pthread_create(&__ctx->pthread, NULL, watchdog_tfe_thread, (void *)__ctx); + if (unlikely(ret < 0)) + { + TFE_LOG_ERROR(__ctx->logger, "Fail to create tfe watchdog thread: %s", strerror(errno)); + errno = 0; + goto errout; + } + + TFE_LOG_INFO(__ctx->logger, "Watchdog TFE module init successfully."); + return __ctx; + +errout: + return NULL; +}; \ No newline at end of file