适配基于tfe-kmod的连接接收方式

This commit is contained in:
luqiuwen
2019-05-15 16:10:05 +08:00
parent 82844bed19
commit c1ff35bed7
2 changed files with 65 additions and 247 deletions

View File

@@ -47,6 +47,9 @@ do { MESA_handle_runtime_log(handler, RLOG_LV_DEBUG, __FUNCTION__, fmt, ##__VA_A
#define CHECK_OR_EXIT(condition, fmt, ...) \ #define CHECK_OR_EXIT(condition, fmt, ...) \
do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); exit(EXIT_FAILURE); } } while(0) \ do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); exit(EXIT_FAILURE); } } while(0) \
#define DIE(fmt, ...) \
do { TFE_LOG_ERROR(g_default_logger, "DIE: " fmt, ##__VA_ARGS__); abort(); } while(0) \
#define CHECK_OR_DIE(condition, fmt, ...) \ #define CHECK_OR_DIE(condition, fmt, ...) \
do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); abort(); } } while(0) \ do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); abort(); } } while(0) \

View File

@@ -7,19 +7,18 @@
#include <sys/errno.h> #include <sys/errno.h>
#include <pthread.h> #include <pthread.h>
#include <assert.h> #include <assert.h>
#include <event2/listener.h> #include <event2/listener.h>
#include <event2/util.h> #include <event2/util.h>
#include <event2/bufferevent.h>
#include <MESA/MESA_prof_load.h> #include <MESA/MESA_prof_load.h>
#include <tfe_stream.h> #include <tfe_stream.h>
#include <kni_acceptor.h> #include <kni_acceptor.h>
#include <proxy.h> #include <proxy.h>
#include <platform.h> #include <platform.h>
#ifndef TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT #ifndef TFE_CONFIG_KNI_SCM_SOCKET_FILE
#define TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT "/var/run/.tfe_kni_acceptor_handler" #define TFE_CONFIG_KNI_SCM_SOCKET_FILE "/var/run/.tfe_kmod_scm_socket"
#endif #endif
/* The KNI and TFE communicate with each other by UNIX-based socket, /* The KNI and TFE communicate with each other by UNIX-based socket,
@@ -49,43 +48,6 @@
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/ */
enum
{
KNI_TLV_MAGIC = 0x4d5a
};
enum KNI_TLV_TYPE
{
KNI_TLV_TYPE_PROTOCOL = 0x0001,
KNI_TLV_TYPE_KEYRING_ID = 0x0002
};
enum KNI_TLV_VALUE
{
KNI_TLV_VALUE_HTTP = 0x1,
KNI_TLV_VALUE_SSL = 0x2,
};
struct kni_tlv_header
{
uint16_t magic;
uint16_t counts;
} __attribute__((__packed__));
struct kni_tlv_info
{
uint16_t type;
uint16_t len;
union
{
uint8_t value_as_raw[0];
uint16_t value_as_uint16[0];
uint32_t value_as_uint32[0];
uint64_t value_as_uint64[0];
};
} __attribute__((__packed__));
struct kni_acceptor struct kni_acceptor
{ {
/* INPUT */ /* INPUT */
@@ -94,110 +56,15 @@ struct kni_acceptor
void * logger; void * logger;
/* CONFIG */ /* CONFIG */
char str_unixdomain_file[TFE_STRING_MAX]; char str_scm_socket_file[TFE_STRING_MAX];
/* PERSIST RUNTIME RESOURCE */ /* PERSIST RUNTIME RESOURCE */
int fd_unixdomain; int fd_scm_socket;
struct event_base * ev_base; struct event_base * ev_base;
struct evconnlistener * ev_listener; struct event * ev_scm_socket;
pthread_t thread; pthread_t thread;
/* PEER CONNECTION RESOUCE
* should close by __kni_conn_close() */
struct event * ev_kni_conn;
int fd_kni_conn;
pid_t pid_kni_conn;
}; };
void __kni_conn_close(struct kni_acceptor * ctx)
{
if (ctx->fd_kni_conn != 0)
{
close(ctx->fd_kni_conn);
ctx->fd_kni_conn = 0;
}
if (ctx->ev_kni_conn != NULL)
{
event_free(ctx->ev_kni_conn);
ctx->ev_kni_conn = NULL;
}
if (ctx->pid_kni_conn != 0) { ctx->pid_kni_conn = 0; }
}
static int __kni_parse_tlv_data(struct kni_acceptor * ctx,
struct tfe_proxy_accept_para * out_para, const char * data, size_t sz_data)
{
const char * __cursor = data;
size_t __left_data_len = sz_data;
/* Parse the STREAM header */
struct kni_tlv_header * tlv_header = (struct kni_tlv_header *) __cursor;
if (__left_data_len < sizeof(struct kni_tlv_header))
{
TFE_LOG_ERROR(ctx->logger, "Invalid KNI TLV header format, length is not enough.");
return -1;
}
if (tlv_header->magic != KNI_TLV_MAGIC)
{
TFE_LOG_ERROR(ctx->logger, "Invalid KNI TLV header magic number %x", tlv_header->magic);
return -2;
}
unsigned int nr_tlv_tuples = tlv_header->counts;
assert(nr_tlv_tuples != 0 && nr_tlv_tuples < 255);
__left_data_len -= sizeof(struct kni_tlv_header);
__cursor += sizeof(struct kni_tlv_header);
for (unsigned int iter_tlv_tuples = 0; iter_tlv_tuples < nr_tlv_tuples; iter_tlv_tuples++)
{
struct kni_tlv_info * tlv_info = (struct kni_tlv_info *) __cursor;
size_t sz_tlv_info = tlv_info->len + sizeof(struct kni_tlv_info);
if (__left_data_len < sz_tlv_info)
{
TFE_LOG_ERROR(ctx->logger, "TLV info declares invalid value length, Too long.");
return -3;
}
switch (tlv_info->type)
{
/* VALUE is uint32_t, length is 4 */
case KNI_TLV_TYPE_PROTOCOL:
{
uint32_t __value = tlv_info->value_as_uint32[0];
if (__value == KNI_TLV_VALUE_HTTP)
{
out_para->session_type = STREAM_PROTO_PLAIN;
}
if (__value == KNI_TLV_VALUE_SSL)
{
out_para->session_type = STREAM_PROTO_SSL;
}
assert(tlv_info->len == sizeof(uint32_t));
break;
}
/* VALUE is uint32_t, length is 4 */
case KNI_TLV_TYPE_KEYRING_ID:
{
uint32_t __value = tlv_info->value_as_uint32[0];
out_para->keyring_id = __value;
assert(tlv_info->len == sizeof(uint32_t));
break;
}
default: assert(0);
}
__cursor += sz_tlv_info;
__left_data_len -= sz_tlv_info;
}
return 0;
}
void __kni_event_cb(evutil_socket_t fd, short what, void * user) void __kni_event_cb(evutil_socket_t fd, short what, void * user)
{ {
struct kni_acceptor * __ctx = (struct kni_acceptor *) user; struct kni_acceptor * __ctx = (struct kni_acceptor *) user;
@@ -216,7 +83,7 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user)
constexpr static int __TRANS_FDS_MAX = 2; constexpr static int __TRANS_FDS_MAX = 2;
constexpr static int __CONTROLLEN = CMSG_SPACE(__TRANS_FDS_MAX * sizeof(int)); constexpr static int __CONTROLLEN = CMSG_SPACE(__TRANS_FDS_MAX * sizeof(int));
char __buffer[512] = {0}; char __buffer[4096] = {0};
struct iovec __iovec[1]; struct iovec __iovec[1];
struct msghdr __msghdr; struct msghdr __msghdr;
@@ -237,39 +104,37 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user)
{ {
return; return;
} }
else if (rd < 0) else if (rd <= 0)
{ {
TFE_LOG_ERROR(__ctx->logger, "Failed at recving fds from KNI connection: %s. ", strerror(errno)); TFE_LOG_ERROR(__ctx->logger, "failed at recvmsg from scm socket: %s. ", strerror(errno));
goto __close_kni_connection; goto __die;
}
else if (rd == 0)
{
TFE_LOG_INFO(__ctx->logger, "KNI disconnect (PID = %u). ", __ctx->pid_kni_conn);
goto __close_kni_connection;
} }
__cmsghdr = CMSG_FIRSTHDR(&__msghdr); __cmsghdr = CMSG_FIRSTHDR(&__msghdr);
if (unlikely(__cmsghdr == NULL)) if (unlikely(__cmsghdr == NULL))
{ {
TFE_LOG_ERROR(__ctx->logger, "Failed at fetch CMSG_FIRSTHDR() from incoming fds, close KNI connection."); TFE_LOG_ERROR(__ctx->logger, "failed at fetch CMSG_FIRSTHDR() from incoming fds.");
goto __close_kni_connection; goto __die;
} }
__fds = (int *) (CMSG_DATA(__cmsghdr)); __fds = (int *) (CMSG_DATA(__cmsghdr));
if (unlikely(__fds == NULL)) if (unlikely(__fds == NULL))
{ {
TFE_LOG_ERROR(__ctx->logger, "Failed at fetch CMSG_DATA() from incoming fds, close KNI connection."); TFE_LOG_ERROR(__ctx->logger, "failed at fetch CMSG_DATA() from incoming fds.");
goto __close_kni_connection; goto __die;
} }
#if 0
if (unlikely(__kni_parse_tlv_data(__ctx, &__accept_para, __buffer, (size_t) rd) < 0)) if (unlikely(__kni_parse_tlv_data(__ctx, &__accept_para, __buffer, (size_t) rd) < 0))
{ {
TFE_LOG_ERROR(__ctx->logger, "Failed at parsing TLV format, close KNI connection."); TFE_LOG_ERROR(__ctx->logger, "Failed at parsing TLV format, close KNI connection.");
goto __close_kni_connection; goto __close_kni_connection;
} }
#endif
__accept_para.downstream_fd = __fds[0]; __accept_para.downstream_fd = __fds[0];
__accept_para.upstream_fd = __fds[1]; __accept_para.upstream_fd = __fds[1];
__accept_para.session_type = STREAM_PROTO_SSL;
TFE_PROXY_STAT_INCREASE(STAT_FD_OPEN_BY_KNI_ACCEPT, 2); TFE_PROXY_STAT_INCREASE(STAT_FD_OPEN_BY_KNI_ACCEPT, 2);
if (tfe_proxy_fds_accept(__ctx->proxy, &__accept_para) < 0) if (tfe_proxy_fds_accept(__ctx->proxy, &__accept_para) < 0)
@@ -279,8 +144,9 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user)
return; return;
__close_kni_connection: __die:
__kni_conn_close(__ctx); DIE("Broken kni scm socket connection, abort.");
return;
__drop_recieved_fds: __drop_recieved_fds:
TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, 2); TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, 2);
@@ -288,88 +154,26 @@ __drop_recieved_fds:
if (__fds != NULL) evutil_closesocket(__fds[1]); if (__fds != NULL) evutil_closesocket(__fds[1]);
} }
void __kni_listener_accept_cb(struct evconnlistener * listener, evutil_socket_t fd, void * kni_acceptor_event_thread_entry(void * args)
struct sockaddr * sk_addr, int sk_len, void * user)
{
struct kni_acceptor * __ctx = (struct kni_acceptor *) user;
struct event * __event = NULL;
struct ucred __cr{};
socklen_t __cr_len = sizeof(struct ucred);
int ret = 0;
/* There is only one KNI process can connect to TFE.
* If ev_kni_conn is not NULL, there's already a KNI connected to TFE.
* We need to refuse this connection
*/
if (unlikely(__ctx->ev_kni_conn != NULL))
{
TFE_LOG_ERROR(__ctx->logger, "One KNI(PID = %d) has been connected to our program, "
"close this connection", __ctx->pid_kni_conn);
evutil_closesocket(fd);
return;
}
/* Get Peer's PID */
if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, (void *) &__cr, &__cr_len) < 0)
{
TFE_LOG_ERROR(__ctx->logger, "Failed at getsockopt(SO_PEERCRED) for fd %d, close this connection", fd);
goto __close_this_connection;
}
__event = event_new(__ctx->ev_base, fd, EV_READ | EV_PERSIST, __kni_event_cb, __ctx);
if (unlikely(__event == NULL))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at creating event for fd %d, close this connection.", fd);
goto __close_this_connection;
}
ret = event_add(__event, NULL);
if (unlikely(ret < 0))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at adding event to evbase, close this connection. ");
goto __close_this_connection;
}
__ctx->fd_kni_conn = fd;
__ctx->ev_kni_conn = __event;
__ctx->pid_kni_conn = __cr.pid;
TFE_LOG_INFO(__ctx->logger, "KNI connected (PID = %u, fd = %d). ", __ctx->pid_kni_conn, __ctx->fd_kni_conn);
return;
__close_this_connection:
__kni_conn_close(__ctx);
}
void * __kni_listener_thread_entry(void * args)
{ {
struct kni_acceptor * __ctx = (struct kni_acceptor *) args; struct kni_acceptor * __ctx = (struct kni_acceptor *) args;
assert(__ctx != NULL && __ctx->thread == pthread_self()); assert(__ctx != NULL && __ctx->thread == pthread_self());
TFE_LOG_DEBUG(__ctx->logger, "Starting KNI listener thread..."); TFE_LOG_INFO(__ctx->logger, "kni acceptor thread is running.");
event_base_dispatch(__ctx->ev_base); event_base_dispatch(__ctx->ev_base);
TFE_LOG_DEBUG(__ctx->logger, "Stoping KNI listener thread..."); DIE("kni acceptor thread is exited, abort.");
return (void *) NULL;
} }
void kni_acceptor_deinit(struct kni_acceptor * ctx) void kni_acceptor_deinit(struct kni_acceptor * ctx)
{ {
if (ctx != NULL && ctx->ev_listener != NULL)
{
evconnlistener_free(ctx->ev_listener);
}
if (ctx != NULL && ctx->ev_base != NULL) if (ctx != NULL && ctx->ev_base != NULL)
{ {
event_base_free(ctx->ev_base); event_base_free(ctx->ev_base);
} }
if (ctx != NULL && ctx->fd_unixdomain != 0) if (ctx != NULL && ctx->fd_scm_socket != 0)
{ {
close(ctx->fd_unixdomain); close(ctx->fd_scm_socket);
} }
if (ctx != NULL) if (ctx != NULL)
@@ -383,7 +187,7 @@ void kni_acceptor_deinit(struct kni_acceptor * ctx)
struct kni_acceptor * kni_acceptor_init(struct tfe_proxy * proxy, const char * profile, void * logger) struct kni_acceptor * kni_acceptor_init(struct tfe_proxy * proxy, const char * profile, void * logger)
{ {
struct kni_acceptor * __ctx = ALLOC(struct kni_acceptor, 1); struct kni_acceptor * __ctx = ALLOC(struct kni_acceptor, 1);
struct sockaddr_un __sockaddr_un; struct sockaddr_un __sockaddr_un{};
int ret = 0; int ret = 0;
__ctx->proxy = proxy; __ctx->proxy = proxy;
@@ -391,18 +195,11 @@ struct kni_acceptor * kni_acceptor_init(struct tfe_proxy * proxy, const char * p
__ctx->logger = logger; __ctx->logger = logger;
/* Read the unix domain socket file, this file is used to recieve fds from KNI */ /* Read the unix domain socket file, this file is used to recieve fds from KNI */
MESA_load_profile_string_def(profile, "kni", "uxdomain", __ctx->str_unixdomain_file, MESA_load_profile_string_def(profile, "kni", "scm_socket_file", __ctx->str_scm_socket_file,
sizeof(__ctx->str_unixdomain_file), TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT); sizeof(__ctx->str_scm_socket_file), TFE_CONFIG_KNI_SCM_SOCKET_FILE);
if (unlikely(unlink(__ctx->str_unixdomain_file) < 0))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at unlink undomain file %s: %s",
__ctx->str_unixdomain_file, strerror(errno));
goto __errout;
}
__sockaddr_un.sun_family = AF_UNIX; __sockaddr_un.sun_family = AF_UNIX;
strncpy(__sockaddr_un.sun_path, __ctx->str_unixdomain_file, sizeof(__sockaddr_un.sun_path)); strncpy(__sockaddr_un.sun_path, __ctx->str_scm_socket_file, sizeof(__sockaddr_un.sun_path));
/* Create new event base, this event base will be dispatched at separated thread */ /* Create new event base, this event base will be dispatched at separated thread */
__ctx->ev_base = event_base_new(); __ctx->ev_base = event_base_new();
@@ -412,25 +209,43 @@ struct kni_acceptor * kni_acceptor_init(struct tfe_proxy * proxy, const char * p
goto __errout; goto __errout;
} }
/* Create a listener */ __ctx->fd_scm_socket = socket(AF_UNIX, SOCK_DGRAM, 0);
__ctx->ev_listener = evconnlistener_new_bind(__ctx->ev_base, __kni_listener_accept_cb, __ctx, 0, if (unlikely(__ctx->fd_scm_socket < 0))
TFE_CONFIG_BACKLOG_DEFAULT, (struct sockaddr *) (&__sockaddr_un), sizeof(__sockaddr_un)); {
TFE_LOG_ERROR(__ctx->logger, "Failed at create scm socket fd: %s", strerror(errno));
goto __errout;
}
if (unlikely(__ctx->ev_listener == NULL)) ret = connect(__ctx->fd_scm_socket, (struct sockaddr *)&__sockaddr_un, sizeof(__sockaddr_un));
if (unlikely(ret < 0))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at connecting to %s: %s", __sockaddr_un.sun_path, strerror(errno));
goto __errout;
}
__ctx->ev_scm_socket = event_new(__ctx->ev_base, __ctx->fd_scm_socket, EV_READ | EV_PERSIST, __kni_event_cb, __ctx);
if (unlikely(__ctx->ev_scm_socket == NULL))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at setup READ event for scm socket fd %d.", __ctx->fd_scm_socket);
goto __errout;
}
ret = event_add(__ctx->ev_scm_socket, NULL);
if (unlikely(ret < 0))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at adding scm socket event to evbase. ");
goto __errout;
}
/* Create a thread to dispatch ctx->evbase */
ret = pthread_create(&__ctx->thread, NULL, kni_acceptor_event_thread_entry, (void *) __ctx);
if (unlikely(ret < 0))
{ {
TFE_LOG_ERROR(__ctx->logger, "Failed at creating evconnlistener."); TFE_LOG_ERROR(__ctx->logger, "Failed at creating event thread: %s", strerror(errno));
goto __errout; goto __errout;
} }
/* Create a thread to dispatch ctx->evbase */ TFE_LOG_INFO(__ctx->logger, "KNI scm socket file: %s", __ctx->str_scm_socket_file);
ret = pthread_create(&__ctx->thread, NULL, __kni_listener_thread_entry, (void *) __ctx);
if (ret < 0)
{
TFE_LOG_ERROR(__ctx->logger, "Failed at creating listener thread: %s", strerror(errno));
goto __errout;
}
TFE_LOG_INFO(__ctx->logger, "KNI acceptor unixdomain file: %s", __ctx->str_unixdomain_file);
return __ctx; return __ctx;
__errout: __errout: