This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/platform/src/proxy.cpp

245 lines
7.0 KiB
C++
Raw Normal View History

/*
* Proxy engine, built around libevent 2.x.
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <assert.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_ssl.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <MESA/MESA_handle_logger.h>
#include <tfe_utils.h>
#include <tfe_future.h>
#include <tfe_stream.h>
#include <platform.h>
#include <proxy.h>
#include <kni_acceptor.h>
#include <tcp_stream.h>
#include <MESA/MESA_prof_load.h>
static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGPIPE, SIGUSR1};
/* Global Resource */
void * g_default_logger = NULL;
struct tfe_proxy * g_default_proxy = NULL;
/* Per thread resource */
thread_local unsigned int __currect_thread_id = 0;
thread_local void * __currect_default_logger = NULL;
struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx)
{
unsigned int min_thread_id = 0;
unsigned int min_load = 0;
for(unsigned int tid = 0; tid < ctx->nr_work_threads; tid++)
{
struct tfe_thread_ctx * thread_ctx = ctx->work_threads[tid];
min_thread_id = min_load > thread_ctx->load ? tid : min_thread_id;
min_load = min_load > thread_ctx->load ? thread_ctx->load : min_load;
}
ctx->work_threads[min_thread_id]->load++;
return ctx->work_threads[min_thread_id];
}
void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx)
{
thread_ctx->load--;
}
int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para)
{
tfe_thread_ctx * worker_thread_ctx = tfe_proxy_thread_ctx_acquire(ctx);
struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx);
tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &para->session_type, sizeof(para->session_type));
/* FOR DEBUG */
if (para->passthrough || ctx->tcp_all_passthrough)
{
bool __true = true;
enum tfe_stream_proto __session_type = STREAM_PROTO_PLAIN;
tfe_stream_option_set(stream, TFE_STREAM_OPT_PASSTHROUGH, &__true, sizeof(__true));
tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type));
}
tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd);
TFE_LOG_DEBUG(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accepted",
stream, para->downstream_fd, para->upstream_fd, para->session_type);
return 0;
}
void tfe_proxy_loopbreak(tfe_proxy * ctx)
{
event_base_loopbreak(ctx->evbase);
}
void tfe_proxy_free(tfe_proxy * ctx)
{
return;
}
static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
{
return;
}
static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg)
{
tfe_proxy * ctx = (tfe_proxy *) arg;
switch (fd)
{
case SIGTERM:
case SIGQUIT:
case SIGINT:
case SIGHUP:
break;
case SIGUSR1:
break;
case SIGPIPE:
TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n");
break;
default:
TFE_LOG_ERROR(ctx->logger, "Warning: Received unexpected signal %i\n", fd);
break;
}
}
static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg)
{
tfe_proxy * ctx = (tfe_proxy *) arg;
(void)fd;
(void)what;
}
static void * __thread_ctx_entry(void * arg)
{
struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg;
struct timeval timer_delay = {60, 0};
struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL);
if (unlikely(ev == NULL))
{
TFE_LOG_ERROR(g_default_logger, "Failed at creating dummy event for thread %u", ctx->thread_id);
exit(EXIT_FAILURE);
}
evtimer_add(ev, &timer_delay);
ctx->running = 1;
__currect_thread_id = ctx->thread_id;
TFE_LOG_INFO(g_default_logger, "Thread %u is running...", ctx->thread_id);
event_base_dispatch(ctx->evbase);
event_free(ev);
return (void *)NULL;
}
struct tfe_thread_ctx * __thread_ctx_create(struct tfe_proxy * proxy, unsigned int thread_id)
{
struct tfe_thread_ctx * __thread_ctx = ALLOC(struct tfe_thread_ctx, 1);
assert(__thread_ctx != NULL);
__thread_ctx->thread_id = thread_id;
__thread_ctx->evbase = event_base_new();
int ret = pthread_create(&__thread_ctx->thr, NULL, __thread_ctx_entry, (void *)__thread_ctx);
if (unlikely(ret < 0))
{
TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d: %s", strerror(errno));
goto __errout;
}
return __thread_ctx;
__errout:
if (__thread_ctx != NULL && __thread_ctx->evbase != NULL) event_base_free(__thread_ctx->evbase);
if (__thread_ctx != NULL) free(__thread_ctx);
return NULL;
}
int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile)
{
MESA_load_profile_uint_def(profile, "main", "nr_worker_threads", &proxy->nr_work_threads, 1);
MESA_load_profile_uint_def(profile, "debug", "passthrough_all_tcp", &proxy->tcp_all_passthrough, 0);
return 0;
}
#define CHECK_OR_EXIT(condition, fmt, ...) \
do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); exit(EXIT_FAILURE); } } while(0) \
int main(int argc, char *argv[])
{
const char* main_profile="./conf/tfe.conf";
g_default_logger = MESA_create_runtime_log_handle("log/tfe.log", RLOG_LV_DEBUG);
if (unlikely(g_default_logger == NULL))
{
TFE_LOG_ERROR(g_default_logger, "Failed at creating default logger: %s", "log/tfe.log");
exit(EXIT_FAILURE);
}
future_promise_library_init();
/* PROXY INSTANCE */
g_default_proxy = ALLOC(struct tfe_proxy, 1);
assert(g_default_proxy);
/* CONFIG */
int ret = tfe_proxy_config(g_default_proxy, main_profile);
CHECK_OR_EXIT(ret == 0, "Failed at loading profile %s, Exit.", main_profile);
/* LOGGER */
g_default_proxy->logger = g_default_logger;
/* MAIN THREAD EVBASE */
g_default_proxy->evbase = event_base_new();
CHECK_OR_EXIT(g_default_proxy->evbase, "Failed at creating evbase for main thread. Exit.");
/* GC EVENT */
g_default_proxy->gcev = event_new(g_default_proxy->evbase, -1, EV_PERSIST, __gc_handler_cb, g_default_proxy);
CHECK_OR_EXIT(g_default_proxy->gcev, "Failed at creating GC event. Exit. ");
/* SSL INIT */
g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl",
g_default_proxy->evbase, g_default_logger, NULL);
CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit.");
/* MODULE INIT */
g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger);
CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. ");
struct timeval gc_delay = {60, 0};
evtimer_add(g_default_proxy->gcev , &gc_delay);
/* WORKER THREAD */
for(unsigned tid = 0; tid < g_default_proxy->nr_work_threads; tid++)
{
g_default_proxy->work_threads[tid] = __thread_ctx_create(g_default_proxy, tid);
CHECK_OR_EXIT(g_default_proxy->work_threads[tid], "Failed at creating thread %u", tid);
}
TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. ");
event_base_dispatch(g_default_proxy->evbase);
return 0;
}