增加HTTP业务层和缓存运行状态统计。业务层状态输出到tfe.fieldstat,缓存输出到cache.fieldstat。增加缓存开关。
This commit is contained in:
@@ -1,9 +1,13 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
#include <MESA/field_stat2.h>
|
||||||
|
#include <event2/event.h>
|
||||||
|
|
||||||
struct tfe_proxy;
|
struct tfe_proxy;
|
||||||
|
|
||||||
const char * tfe_proxy_default_conffile();
|
const char * tfe_proxy_default_conffile();
|
||||||
const char * tfe_proxy_default_logger();
|
const char * tfe_proxy_default_logger();
|
||||||
unsigned int tfe_proxy_get_thread_count();
|
unsigned int tfe_proxy_get_work_thread_count();
|
||||||
struct event_base * tfe_proxy_get_evbase(unsigned int thread_id);
|
struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id);
|
||||||
|
struct event_base * tfe_proxy_get_gc_evbase(void);
|
||||||
|
screen_stat_handle_t tfe_proxy_get_fs_handle(void);
|
||||||
|
|
||||||
|
|||||||
@@ -73,9 +73,10 @@ do { MESA_handle_runtime_log(handler, RLOG_LV_DEBUG, "tfe", fmt, ##__VA_ARGS__);
|
|||||||
})
|
})
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define ATOMIC_INC(x) __atomic_fetch_add(x,1,__ATOMIC_RELAXED)
|
#define ATOMIC_INC(x) __atomic_fetch_add(x,1,__ATOMIC_RELAXED)
|
||||||
#define ATOMIC_DEC(x) __atomic_fetch_sub(x,1,__ATOMIC_RELAXED)
|
#define ATOMIC_DEC(x) __atomic_fetch_sub(x,1,__ATOMIC_RELAXED)
|
||||||
#define ATOMIC_READ(x) __atomic_fetch_add(x,0,__ATOMIC_RELAXED)
|
#define ATOMIC_READ(x) __atomic_fetch_add(x,0,__ATOMIC_RELAXED)
|
||||||
|
#define ATOMIC_ADD(x, y) __atomic_fetch_add(x,y,__ATOMIC_RELAXED)
|
||||||
|
|
||||||
|
|
||||||
#ifndef MAX
|
#ifndef MAX
|
||||||
|
|||||||
@@ -8,13 +8,12 @@
|
|||||||
#include <openssl/x509.h>
|
#include <openssl/x509.h>
|
||||||
#include <openssl/x509v3.h>
|
#include <openssl/x509v3.h>
|
||||||
|
|
||||||
#include <MESA/field_stat2.h>
|
|
||||||
|
|
||||||
struct ssl_stream;
|
struct ssl_stream;
|
||||||
|
|
||||||
struct ssl_mgr;
|
struct ssl_mgr;
|
||||||
struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, struct event_base * ev_base_gc,
|
struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, struct event_base * ev_base_gc,
|
||||||
void * logger, screen_stat_handle_t fs);
|
void * logger);
|
||||||
void ssl_manager_destroy(struct ssl_mgr * mgr);
|
void ssl_manager_destroy(struct ssl_mgr * mgr);
|
||||||
|
|
||||||
struct ssl_stream * ssl_upstream_create_result_release_stream(future_result_t * result);
|
struct ssl_stream * ssl_upstream_create_result_release_stream(future_result_t * result);
|
||||||
|
|||||||
@@ -194,29 +194,35 @@ static void * __thread_ctx_entry(void * arg)
|
|||||||
return (void *)NULL;
|
return (void *)NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct tfe_thread_ctx * __thread_ctx_create(struct tfe_proxy * proxy, unsigned int thread_id)
|
|
||||||
|
void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy)
|
||||||
{
|
{
|
||||||
struct tfe_thread_ctx * __thread_ctx = ALLOC(struct tfe_thread_ctx, 1);
|
unsigned int i=0;
|
||||||
assert(__thread_ctx != NULL);
|
for(i=0; i<proxy->nr_work_threads;i++)
|
||||||
|
|
||||||
__thread_ctx->thread_id = thread_id;
|
|
||||||
__thread_ctx->evbase = event_base_new();
|
|
||||||
|
|
||||||
int ret = pthread_create(&__thread_ctx->thr, NULL, __thread_ctx_entry, (void *)__thread_ctx);
|
|
||||||
if (unlikely(ret < 0))
|
|
||||||
{
|
{
|
||||||
TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d: %s",errno, strerror(errno));
|
proxy->work_threads[i]=ALLOC(struct tfe_thread_ctx, 1);
|
||||||
goto __errout;
|
proxy->work_threads[i]->thread_id = i;
|
||||||
|
proxy->work_threads[i]->evbase = event_base_new();
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
return __thread_ctx;
|
}
|
||||||
|
int tfe_proxy_work_thread_run(struct tfe_proxy * proxy)
|
||||||
__errout:
|
{
|
||||||
if (__thread_ctx != NULL && __thread_ctx->evbase != NULL) event_base_free(__thread_ctx->evbase);
|
struct tfe_thread_ctx * __thread_ctx=NULL;
|
||||||
if (__thread_ctx != NULL) free(__thread_ctx);
|
unsigned int i=0;
|
||||||
return NULL;
|
int ret=0;
|
||||||
|
for(i=0; i<proxy->nr_work_threads;i++)
|
||||||
|
{
|
||||||
|
__thread_ctx=proxy->work_threads[i];
|
||||||
|
ret = pthread_create(&__thread_ctx->thr, NULL, __thread_ctx_entry, (void *)__thread_ctx);
|
||||||
|
if (unlikely(ret < 0))
|
||||||
|
{
|
||||||
|
TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d, error %d: %s", i, errno, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile)
|
int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile)
|
||||||
{
|
{
|
||||||
/* Worker threads */
|
/* Worker threads */
|
||||||
@@ -304,7 +310,7 @@ int main(int argc, char *argv[])
|
|||||||
|
|
||||||
/* SSL INIT */
|
/* SSL INIT */
|
||||||
g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl",
|
g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl",
|
||||||
g_default_proxy->evbase, g_default_logger, g_default_proxy->fs_handle);
|
g_default_proxy->evbase, g_default_logger);
|
||||||
CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit.");
|
CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit.");
|
||||||
|
|
||||||
for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++)
|
for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++)
|
||||||
@@ -317,15 +323,9 @@ int main(int argc, char *argv[])
|
|||||||
struct timeval gc_delay = {2, 0};
|
struct timeval gc_delay = {2, 0};
|
||||||
evtimer_add(g_default_proxy->gcev , &gc_delay);
|
evtimer_add(g_default_proxy->gcev , &gc_delay);
|
||||||
|
|
||||||
/* WORKER THREAD */
|
/* WORKER THREAD CTX Create */
|
||||||
//TODO: Split ctx_create functioin to create and Run.
|
tfe_proxy_work_thread_create_ctx(g_default_proxy);
|
||||||
for(unsigned tid = 0; tid < g_default_proxy->nr_work_threads; tid++)
|
|
||||||
{
|
|
||||||
g_default_proxy->work_threads[tid] = __thread_ctx_create(g_default_proxy, tid);
|
|
||||||
CHECK_OR_EXIT(g_default_proxy->work_threads[tid], "Failed at creating thread %u", tid);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* ACCEPTOR INIT */
|
/* ACCEPTOR INIT */
|
||||||
g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger);
|
g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger);
|
||||||
CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. ");
|
CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. ");
|
||||||
@@ -341,21 +341,31 @@ int main(int argc, char *argv[])
|
|||||||
TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol);
|
TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ret=tfe_proxy_work_thread_run(g_default_proxy);
|
||||||
|
CHECK_OR_EXIT(ret==0, "Failed at creating thread. Exit.");
|
||||||
|
|
||||||
TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. ");
|
TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. ");
|
||||||
event_base_dispatch(g_default_proxy->evbase);
|
event_base_dispatch(g_default_proxy->evbase);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
unsigned int tfe_proxy_get_thread_count(void)
|
unsigned int tfe_proxy_get_work_thread_count(void)
|
||||||
{
|
{
|
||||||
return g_default_proxy->nr_work_threads;
|
return g_default_proxy->nr_work_threads;
|
||||||
}
|
}
|
||||||
struct event_base * tfe_proxy_get_evbase(unsigned int thread_id)
|
struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id)
|
||||||
{
|
{
|
||||||
assert(thread_id<g_default_proxy->nr_work_threads);
|
assert(thread_id<g_default_proxy->nr_work_threads);
|
||||||
return g_default_proxy->work_threads[thread_id]->evbase;
|
return g_default_proxy->work_threads[thread_id]->evbase;
|
||||||
}
|
}
|
||||||
|
struct event_base * tfe_proxy_get_gc_evbase(void)
|
||||||
|
{
|
||||||
|
return g_default_proxy->evbase;
|
||||||
|
}
|
||||||
|
|
||||||
|
screen_stat_handle_t tfe_proxy_get_fs_handle(void)
|
||||||
|
{
|
||||||
|
return g_default_proxy->fs_handle;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -27,6 +27,7 @@
|
|||||||
#include <tfe_stream.h>
|
#include <tfe_stream.h>
|
||||||
#include <tfe_utils.h>
|
#include <tfe_utils.h>
|
||||||
#include <tfe_future.h>
|
#include <tfe_future.h>
|
||||||
|
#include <tfe_proxy.h>
|
||||||
#include <key_keeper.h>
|
#include <key_keeper.h>
|
||||||
#include <ssl_sess_cache.h>
|
#include <ssl_sess_cache.h>
|
||||||
#include <ssl_utils.h>
|
#include <ssl_utils.h>
|
||||||
@@ -207,6 +208,22 @@ struct fs_spec
|
|||||||
enum ssl_stream_stat id;
|
enum ssl_stream_stat id;
|
||||||
const char* name;
|
const char* name;
|
||||||
};
|
};
|
||||||
|
/*
|
||||||
|
* Garbage collection handler.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ssl_stream_gc_cb(evutil_socket_t fd, short what, void * arg)
|
||||||
|
{
|
||||||
|
struct ssl_mgr *mgr=(struct ssl_mgr *)arg;
|
||||||
|
int i=0;
|
||||||
|
ssl_sess_cache_stat(mgr->up_sess_cache, &(mgr->stat_val[SSL_UP_CACHE_SZ]), &(mgr->stat_val[SSL_UP_CACHE_QUERY]), &(mgr->stat_val[SSL_UP_CACHE_HIT]));
|
||||||
|
ssl_sess_cache_stat(mgr->down_sess_cache, &(mgr->stat_val[SSL_DOWN_CACHE_SZ]), &(mgr->stat_val[SSL_DOWN_CACHE_QUERY]), &(mgr->stat_val[SSL_DOWN_CACHE_HIT]));
|
||||||
|
for(i=0;i<SSL_STAT_MAX;i++)
|
||||||
|
{
|
||||||
|
FS_operate(mgr->fs_handle, mgr->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(mgr->stat_val[i])));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
void ssl_stat_init(struct ssl_mgr * mgr)
|
void ssl_stat_init(struct ssl_mgr * mgr)
|
||||||
{
|
{
|
||||||
int i=0;
|
int i=0;
|
||||||
@@ -288,6 +305,11 @@ void ssl_stat_init(struct ssl_mgr * mgr)
|
|||||||
FS_CALC_CURRENT,
|
FS_CALC_CURRENT,
|
||||||
"dtkt_hit");
|
"dtkt_hit");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here.
|
||||||
|
mgr->gcev = event_new(mgr->ev_base_gc, -1, EV_PERSIST, ssl_stream_gc_cb, mgr);
|
||||||
|
evtimer_add(mgr->gcev, &gc_delay);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
static SSL * downstream_ssl_create(struct ssl_mgr * mgr, struct keyring * crt);
|
static SSL * downstream_ssl_create(struct ssl_mgr * mgr, struct keyring * crt);
|
||||||
@@ -424,27 +446,11 @@ void ssl_manager_destroy(struct ssl_mgr * mgr)
|
|||||||
}
|
}
|
||||||
free(mgr);
|
free(mgr);
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
* Garbage collection handler.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ssl_stream_gc_cb(evutil_socket_t fd, short what, void * arg)
|
|
||||||
{
|
|
||||||
struct ssl_mgr *mgr=(struct ssl_mgr *)arg;
|
|
||||||
int i=0;
|
|
||||||
ssl_sess_cache_stat(mgr->up_sess_cache, &(mgr->stat_val[SSL_UP_CACHE_SZ]), &(mgr->stat_val[SSL_UP_CACHE_QUERY]), &(mgr->stat_val[SSL_UP_CACHE_HIT]));
|
|
||||||
ssl_sess_cache_stat(mgr->down_sess_cache, &(mgr->stat_val[SSL_DOWN_CACHE_SZ]), &(mgr->stat_val[SSL_DOWN_CACHE_QUERY]), &(mgr->stat_val[SSL_DOWN_CACHE_HIT]));
|
|
||||||
for(i=0;i<SSL_STAT_MAX;i++)
|
|
||||||
{
|
|
||||||
FS_operate(mgr->fs_handle, mgr->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(mgr->stat_val[i])));
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section,
|
struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section,
|
||||||
struct event_base * ev_base_gc, void * logger, screen_stat_handle_t fs)
|
struct event_base * ev_base_gc, void * logger)
|
||||||
{
|
{
|
||||||
struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here.
|
|
||||||
unsigned char key_name[]="!mesalab-tfe3a~&";
|
unsigned char key_name[]="!mesalab-tfe3a~&";
|
||||||
unsigned char aes_key_def[]={0xC5,0xAC,0xC1,0xA6,0xB2,0xBB,0xCA,0xC7,0xE3,0xBE,0xE3,0xB2,0xC6,0xA3,0xB1,0xB9
|
unsigned char aes_key_def[]={0xC5,0xAC,0xC1,0xA6,0xB2,0xBB,0xCA,0xC7,0xE3,0xBE,0xE3,0xB2,0xC6,0xA3,0xB1,0xB9
|
||||||
,0xA3,0xAC,0xB6,0xF8,0xCA,0xC7,0xD1,0xDB,0xBE,0xA6,0xC0,0xEF,0xD3,0xD0,0xB9,0x84};
|
,0xA3,0xAC,0xB6,0xF8,0xCA,0xC7,0xD1,0xDB,0xBE,0xA6,0xC0,0xEF,0xD3,0xD0,0xB9,0x84};
|
||||||
@@ -456,7 +462,6 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section
|
|||||||
char version_str[TFE_SYMBOL_MAX];
|
char version_str[TFE_SYMBOL_MAX];
|
||||||
mgr->logger = logger;
|
mgr->logger = logger;
|
||||||
mgr->ev_base_gc=ev_base_gc;
|
mgr->ev_base_gc=ev_base_gc;
|
||||||
mgr->fs_handle=fs;
|
|
||||||
MESA_load_profile_string_def(ini_profile, section, "ssl_min_version", version_str, sizeof(version_str), "ssl3");
|
MESA_load_profile_string_def(ini_profile, section, "ssl_min_version", version_str, sizeof(version_str), "ssl3");
|
||||||
mgr->ssl_min_version = sslver_str2num(version_str);
|
mgr->ssl_min_version = sslver_str2num(version_str);
|
||||||
MESA_load_profile_string_def(ini_profile, section, "ssl_max_version", version_str, sizeof(version_str), "tls12");
|
MESA_load_profile_string_def(ini_profile, section, "ssl_max_version", version_str, sizeof(version_str), "tls12");
|
||||||
@@ -538,15 +543,10 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section
|
|||||||
}
|
}
|
||||||
|
|
||||||
memcpy(mgr->ssl_session_context, "mesa-tfe", sizeof(mgr->ssl_session_context));
|
memcpy(mgr->ssl_session_context, "mesa-tfe", sizeof(mgr->ssl_session_context));
|
||||||
mgr->fs_handle=fs;
|
mgr->fs_handle=tfe_proxy_get_fs_handle();
|
||||||
ssl_stat_init(mgr);
|
ssl_stat_init(mgr);
|
||||||
|
|
||||||
mgr->gcev = event_new(mgr->ev_base_gc, -1, EV_PERSIST, ssl_stream_gc_cb, mgr);
|
|
||||||
if (!mgr->gcev)
|
|
||||||
{
|
|
||||||
goto error_out;
|
|
||||||
}
|
|
||||||
evtimer_add(mgr->gcev, &gc_delay);
|
|
||||||
|
|
||||||
return mgr;
|
return mgr;
|
||||||
|
|
||||||
@@ -1490,6 +1490,7 @@ retry:
|
|||||||
|
|
||||||
if (ctx->retries++ >= MAX_NET_RETRIES)
|
if (ctx->retries++ >= MAX_NET_RETRIES)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
struct tfe_stream_addr* addr=tfe_stream_addr_create_by_fd(fd, ctx->s_stream->dir);
|
struct tfe_stream_addr* addr=tfe_stream_addr_create_by_fd(fd, ctx->s_stream->dir);
|
||||||
char* addr_string=tfe_stream_addr_to_str(addr);
|
char* addr_string=tfe_stream_addr_to_str(addr);
|
||||||
TFE_LOG_ERROR(logger, "Failed to shutdown %s SSL connection cleanly: %s "
|
TFE_LOG_ERROR(logger, "Failed to shutdown %s SSL connection cleanly: %s "
|
||||||
@@ -1498,6 +1499,7 @@ retry:
|
|||||||
addr_string, fd);
|
addr_string, fd);
|
||||||
tfe_stream_addr_free(addr);
|
tfe_stream_addr_free(addr);
|
||||||
free(addr_string);
|
free(addr_string);
|
||||||
|
*/
|
||||||
if(ctx->s_stream->dir==CONN_DIR_DOWNSTREAM)
|
if(ctx->s_stream->dir==CONN_DIR_DOWNSTREAM)
|
||||||
{
|
{
|
||||||
ATOMIC_INC(&(mgr->stat_val[SSL_DOWN_DIRTY_CLOSED]));
|
ATOMIC_INC(&(mgr->stat_val[SSL_DOWN_DIRTY_CLOSED]));
|
||||||
|
|||||||
@@ -53,6 +53,17 @@ enum scan_table
|
|||||||
__SCAN_TABLE_MAX
|
__SCAN_TABLE_MAX
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum pangu_http_stat
|
||||||
|
{
|
||||||
|
STAT_SESSION,
|
||||||
|
STAT_LOG_NUM,
|
||||||
|
STAT_ACTION_MONIT,
|
||||||
|
STAT_ACTION_REJECT,
|
||||||
|
STAT_ACTION_REDIRECT,
|
||||||
|
STAT_ACTION_REPLACE,
|
||||||
|
STAT_ACTION_WHITELSIT,
|
||||||
|
__PG_STAT_MAX
|
||||||
|
};
|
||||||
struct pangu_rt
|
struct pangu_rt
|
||||||
{
|
{
|
||||||
Maat_feather_t maat;
|
Maat_feather_t maat;
|
||||||
@@ -64,7 +75,15 @@ struct pangu_rt
|
|||||||
ctemplate::Template * tpl_403, * tpl_404, * tpl_451;
|
ctemplate::Template * tpl_403, * tpl_404, * tpl_451;
|
||||||
char * reject_page;
|
char * reject_page;
|
||||||
int page_size;
|
int page_size;
|
||||||
|
|
||||||
|
int cache_enabled;
|
||||||
struct cache_handle* cache;
|
struct cache_handle* cache;
|
||||||
|
|
||||||
|
screen_stat_handle_t fs_handle;
|
||||||
|
long long stat_val[__PG_STAT_MAX];
|
||||||
|
int fs_id[__PG_STAT_MAX];
|
||||||
|
struct event_base* gc_evbase;
|
||||||
|
struct event* gcev;
|
||||||
};
|
};
|
||||||
struct pangu_rt * g_pangu_rt;
|
struct pangu_rt * g_pangu_rt;
|
||||||
|
|
||||||
@@ -148,13 +167,51 @@ error_out:
|
|||||||
Maat_burn_feather(target);
|
Maat_burn_feather(target);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
static void pangu_http_gc_cb(evutil_socket_t fd, short what, void * arg)
|
||||||
|
{
|
||||||
|
int i=0;
|
||||||
|
|
||||||
|
for(i=0;i<__PG_STAT_MAX;i++)
|
||||||
|
{
|
||||||
|
FS_operate(g_pangu_rt->fs_handle, g_pangu_rt->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(g_pangu_rt->stat_val[i])));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void pangu_http_stat_init(struct pangu_rt * pangu_runtime)
|
||||||
|
{
|
||||||
|
int i=0;
|
||||||
|
struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here.
|
||||||
|
const char* spec[__PG_STAT_MAX]={0};
|
||||||
|
spec[STAT_SESSION]="http_sess";
|
||||||
|
spec[STAT_LOG_NUM]="log_num";
|
||||||
|
spec[STAT_ACTION_MONIT]="monit";
|
||||||
|
spec[STAT_ACTION_REJECT]="reject";
|
||||||
|
spec[STAT_ACTION_REDIRECT]="redirect";
|
||||||
|
spec[STAT_ACTION_REPLACE]="replace";
|
||||||
|
spec[STAT_ACTION_WHITELSIT]="whitelist";
|
||||||
|
|
||||||
|
for(i=0;i<__PG_STAT_MAX;i++)
|
||||||
|
{
|
||||||
|
if(spec[i]!=NULL)
|
||||||
|
{
|
||||||
|
pangu_runtime->fs_id[i]=FS_register(pangu_runtime->fs_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, spec[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
g_pangu_rt->gcev = event_new(pangu_runtime->gc_evbase, -1, EV_PERSIST, pangu_http_gc_cb, NULL);
|
||||||
|
evtimer_add(g_pangu_rt->gcev, &gc_delay);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int pangu_http_init(struct tfe_proxy * proxy)
|
int pangu_http_init(struct tfe_proxy * proxy)
|
||||||
{
|
{
|
||||||
const char * profile = "./pangu_conf/pangu_pxy.conf";
|
const char * profile = "./pangu_conf/pangu_pxy.conf";
|
||||||
const char * logfile = "./log/pangu_pxy.log";
|
const char * logfile = "./log/pangu_pxy.log";
|
||||||
g_pangu_rt = ALLOC(struct pangu_rt, 1);
|
g_pangu_rt = ALLOC(struct pangu_rt, 1);
|
||||||
g_pangu_rt->thread_num = tfe_proxy_get_thread_count();
|
g_pangu_rt->thread_num = tfe_proxy_get_work_thread_count();
|
||||||
|
g_pangu_rt->gc_evbase=tfe_proxy_get_gc_evbase();
|
||||||
MESA_load_profile_int_def(profile, "DEBUG", "LOG_LEVEL", &(g_pangu_rt->log_level), 0);
|
MESA_load_profile_int_def(profile, "DEBUG", "LOG_LEVEL", &(g_pangu_rt->log_level), 0);
|
||||||
g_pangu_rt->local_logger = MESA_create_runtime_log_handle(logfile, g_pangu_rt->log_level);
|
g_pangu_rt->local_logger = MESA_create_runtime_log_handle(logfile, g_pangu_rt->log_level);
|
||||||
g_pangu_rt->send_logger = pangu_log_handle_create(profile, "LOG", g_pangu_rt->local_logger);
|
g_pangu_rt->send_logger = pangu_log_handle_create(profile, "LOG", g_pangu_rt->local_logger);
|
||||||
@@ -162,6 +219,11 @@ int pangu_http_init(struct tfe_proxy * proxy)
|
|||||||
{
|
{
|
||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
|
g_pangu_rt->fs_handle = tfe_proxy_get_fs_handle();
|
||||||
|
pangu_http_stat_init(g_pangu_rt);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
g_pangu_rt->maat = create_maat_feather(profile, "MAAT", g_pangu_rt->thread_num, g_pangu_rt->local_logger);
|
g_pangu_rt->maat = create_maat_feather(profile, "MAAT", g_pangu_rt->thread_num, g_pangu_rt->local_logger);
|
||||||
if (!g_pangu_rt->maat)
|
if (!g_pangu_rt->maat)
|
||||||
{
|
{
|
||||||
@@ -199,7 +261,17 @@ int pangu_http_init(struct tfe_proxy * proxy)
|
|||||||
"./pangu_conf/template/HTTP451.html");
|
"./pangu_conf/template/HTTP451.html");
|
||||||
g_pangu_rt->tpl_451 = ctemplate::Template::GetTemplate(page_path, ctemplate::DO_NOT_STRIP);
|
g_pangu_rt->tpl_451 = ctemplate::Template::GetTemplate(page_path, ctemplate::DO_NOT_STRIP);
|
||||||
|
|
||||||
g_pangu_rt->cache = create_web_cache_handle(profile, "TANGO_CACHE", g_pangu_rt->local_logger);
|
MESA_load_profile_int_def(profile, "TANGO_CACHE", "enable_cache", &(g_pangu_rt->cache_enabled), 1);
|
||||||
|
if(g_pangu_rt->cache_enabled)
|
||||||
|
{
|
||||||
|
g_pangu_rt->cache = create_web_cache_handle(profile, "TANGO_CACHE", g_pangu_rt->gc_evbase, g_pangu_rt->local_logger);
|
||||||
|
if(!g_pangu_rt->cache)
|
||||||
|
{
|
||||||
|
TFE_LOG_INFO(NULL, "Tango Cache init failed.");
|
||||||
|
goto error_out;
|
||||||
|
}
|
||||||
|
TFE_LOG_INFO(NULL, "Tango Cache Enabled.");
|
||||||
|
}
|
||||||
TFE_LOG_INFO(NULL, "Pangu HTTP init success.");
|
TFE_LOG_INFO(NULL, "Pangu HTTP init success.");
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
@@ -207,25 +279,6 @@ error_out:
|
|||||||
TFE_LOG_ERROR(NULL, "Pangu HTTP init failed.");
|
TFE_LOG_ERROR(NULL, "Pangu HTTP init failed.");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
static void _wrap_std_field_write(struct tfe_http_half * half, enum tfe_http_std_field field_id, const char * value)
|
|
||||||
{
|
|
||||||
struct http_field_name tmp_name;
|
|
||||||
tmp_name.field_id = field_id;
|
|
||||||
tmp_name.field_name = NULL;
|
|
||||||
tfe_http_field_write(half, &tmp_name, value);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
#if 0
|
|
||||||
static void _wrap_non_std_field_write(struct tfe_http_half * half, const char* field_name, const char * value)
|
|
||||||
{
|
|
||||||
struct http_field_name tmp_name;
|
|
||||||
tmp_name.field_id=TFE_HTTP_UNKNOWN_FIELD;
|
|
||||||
//todo remove force convert after tfe_http.h improved.
|
|
||||||
tmp_name.field_name=(char*)field_name;
|
|
||||||
tfe_http_field_write(half, &tmp_name, value);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
struct replace_ctx
|
struct replace_ctx
|
||||||
{
|
{
|
||||||
@@ -450,7 +503,7 @@ static void cache_query_on_succ(future_result_t * result, void * user)
|
|||||||
tfe_http_session_resume(ctx->ref_session);
|
tfe_http_session_resume(ctx->ref_session);
|
||||||
|
|
||||||
ctx->cached_response=tfe_http_session_response_create(ctx->ref_session, 200);
|
ctx->cached_response=tfe_http_session_response_create(ctx->ref_session, 200);
|
||||||
tfe_http_nonstd_field_write(ctx->cached_response, "X-Cache-Lookup", "Hit From MESA-TFE");
|
tfe_http_nonstd_field_write(ctx->cached_response, "X-Cache-Lookup", "Hit From TFE");
|
||||||
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_TYPE, meta->content_type);
|
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_TYPE, meta->content_type);
|
||||||
snprintf(temp, sizeof(temp), "%lu", meta->content_length);
|
snprintf(temp, sizeof(temp), "%lu", meta->content_length);
|
||||||
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_LENGTH, temp);
|
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_LENGTH, temp);
|
||||||
@@ -469,8 +522,8 @@ static void cache_query_on_succ(future_result_t * result, void * user)
|
|||||||
case CACHE_QUERY_RESULT_END:
|
case CACHE_QUERY_RESULT_END:
|
||||||
assert(ctx->cached_response!=NULL);
|
assert(ctx->cached_response!=NULL);
|
||||||
ctx->cache_query_status=WEB_CACHE_HIT;
|
ctx->cache_query_status=WEB_CACHE_HIT;
|
||||||
tfe_http_half_write_body_end(ctx->cached_response);
|
|
||||||
printf("cache query hit: %s\n", ctx->ref_session->req->req_spec.url);
|
printf("cache query hit: %s\n", ctx->ref_session->req->req_spec.url);
|
||||||
|
tfe_http_half_write_body_end(ctx->cached_response);
|
||||||
//ownership has been transferred to http session, set to NULL.
|
//ownership has been transferred to http session, set to NULL.
|
||||||
ctx->cached_response=NULL;
|
ctx->cached_response=NULL;
|
||||||
assert(ctx->cache_result_actual_sz==ctx->cache_result_declared_sz);
|
assert(ctx->cache_result_actual_sz==ctx->cache_result_declared_sz);
|
||||||
@@ -846,16 +899,26 @@ void enforce_control_policy(const struct tfe_stream * stream, const struct tfe_h
|
|||||||
switch (ctx->action)
|
switch (ctx->action)
|
||||||
{
|
{
|
||||||
case PG_ACTION_NONE:
|
case PG_ACTION_NONE:
|
||||||
|
break;
|
||||||
case PG_ACTION_MONIT:
|
case PG_ACTION_MONIT:
|
||||||
|
ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_MONIT]));
|
||||||
//send log on close.
|
//send log on close.
|
||||||
break;
|
break;
|
||||||
case PG_ACTION_REJECT: http_reject(session, events, ctx);
|
case PG_ACTION_REJECT:
|
||||||
|
http_reject(session, events, ctx);
|
||||||
|
ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_REJECT]));
|
||||||
break;
|
break;
|
||||||
case PG_ACTION_REDIRECT: http_redirect(session, events, ctx);
|
case PG_ACTION_REDIRECT:
|
||||||
|
http_redirect(session, events, ctx);
|
||||||
|
ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_REDIRECT]));
|
||||||
break;
|
break;
|
||||||
case PG_ACTION_REPLACE: http_replace(stream, session, events, body_frag, frag_size, ctx);
|
case PG_ACTION_REPLACE:
|
||||||
|
http_replace(stream, session, events, body_frag, frag_size, ctx);
|
||||||
|
ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_REPLACE]));
|
||||||
break;
|
break;
|
||||||
case PG_ACTION_WHITELIST: tfe_http_session_detach(session);
|
case PG_ACTION_WHITELIST:
|
||||||
|
tfe_http_session_detach(session);
|
||||||
|
ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_WHITELSIT]));
|
||||||
break;
|
break;
|
||||||
default: assert(0);
|
default: assert(0);
|
||||||
break;
|
break;
|
||||||
@@ -911,6 +974,7 @@ void pangu_on_http_begin(const struct tfe_stream * stream,
|
|||||||
struct ipaddr sapp_addr;
|
struct ipaddr sapp_addr;
|
||||||
int hit_cnt = 0;
|
int hit_cnt = 0;
|
||||||
assert(ctx == NULL);
|
assert(ctx == NULL);
|
||||||
|
ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_SESSION]));
|
||||||
ctx = pangu_http_ctx_new(thread_id);
|
ctx = pangu_http_ctx_new(thread_id);
|
||||||
addr_tfe2sapp(stream->addr, &sapp_addr);
|
addr_tfe2sapp(stream->addr, &sapp_addr);
|
||||||
hit_cnt = Maat_scan_proto_addr(g_pangu_rt->maat, g_pangu_rt->scan_table_id[PXY_CTRL_IP], &sapp_addr, 0,
|
hit_cnt = Maat_scan_proto_addr(g_pangu_rt->maat, g_pangu_rt->scan_table_id[PXY_CTRL_IP], &sapp_addr, 0,
|
||||||
@@ -933,7 +997,7 @@ void pangu_on_http_end(const struct tfe_stream * stream,
|
|||||||
const struct tfe_http_session * session, unsigned int thread_id, void ** pme)
|
const struct tfe_http_session * session, unsigned int thread_id, void ** pme)
|
||||||
{
|
{
|
||||||
struct pangu_http_ctx * ctx = *(struct pangu_http_ctx **) pme;
|
struct pangu_http_ctx * ctx = *(struct pangu_http_ctx **) pme;
|
||||||
int i=0, j=0;
|
int i=0, j=0,ret=0;
|
||||||
if(ctx->action == PG_ACTION_REPLACE && ctx->rep_ctx->actually_replaced==0)
|
if(ctx->action == PG_ACTION_REPLACE && ctx->rep_ctx->actually_replaced==0)
|
||||||
{
|
{
|
||||||
for(i=0; i< ctx->n_enforce; i++)
|
for(i=0; i< ctx->n_enforce; i++)
|
||||||
@@ -957,7 +1021,8 @@ void pangu_on_http_end(const struct tfe_stream * stream,
|
|||||||
struct pangu_log log_msg = {.stream=stream, .http=session, .result=ctx->enforce_rules, .result_num=ctx->n_enforce};
|
struct pangu_log log_msg = {.stream=stream, .http=session, .result=ctx->enforce_rules, .result_num=ctx->n_enforce};
|
||||||
if (ctx->action != PG_ACTION_NONE&& !(ctx->action == PG_ACTION_REPLACE && ctx->n_enforce==1 && ctx->rep_ctx->actually_replaced==0))
|
if (ctx->action != PG_ACTION_NONE&& !(ctx->action == PG_ACTION_REPLACE && ctx->n_enforce==1 && ctx->rep_ctx->actually_replaced==0))
|
||||||
{
|
{
|
||||||
pangu_send_log(g_pangu_rt->send_logger, &log_msg);
|
ret=pangu_send_log(g_pangu_rt->send_logger, &log_msg);
|
||||||
|
ATOMIC_ADD(&(g_pangu_rt->stat_val[STAT_LOG_NUM]), ret);
|
||||||
}
|
}
|
||||||
pangu_http_ctx_free(ctx);
|
pangu_http_ctx_free(ctx);
|
||||||
*pme = NULL;
|
*pme = NULL;
|
||||||
@@ -984,16 +1049,17 @@ void pangu_on_http_data(const struct tfe_stream * stream, const struct tfe_http_
|
|||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if(g_pangu_rt->cache_enabled)
|
||||||
if(events & EV_HTTP_REQ_HDR && !ctx->resume_from_cache_query)
|
|
||||||
{
|
{
|
||||||
cache_query(session, thread_id, ctx);
|
if(events & EV_HTTP_REQ_HDR && !ctx->resume_from_cache_query)
|
||||||
}
|
{
|
||||||
if(!tfe_http_in_request(events))
|
cache_query(session, thread_id, ctx);
|
||||||
{
|
}
|
||||||
cache_update(session, events, body_frag, frag_size, thread_id, ctx);
|
if(!tfe_http_in_request(events))
|
||||||
}
|
{
|
||||||
|
cache_update(session, events, body_frag, frag_size, thread_id, ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ struct json_spec
|
|||||||
const char *log_filed_name;
|
const char *log_filed_name;
|
||||||
enum tfe_http_std_field field_id;
|
enum tfe_http_std_field field_id;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct pangu_logger
|
struct pangu_logger
|
||||||
{
|
{
|
||||||
char local_ip_str[TFE_SYMBOL_MAX];
|
char local_ip_str[TFE_SYMBOL_MAX];
|
||||||
@@ -149,6 +148,7 @@ int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg)
|
|||||||
cJSON *common_obj=NULL, *per_hit_obj=NULL;
|
cJSON *common_obj=NULL, *per_hit_obj=NULL;
|
||||||
char* log_payload=NULL;
|
char* log_payload=NULL;
|
||||||
int kafka_status=0;
|
int kafka_status=0;
|
||||||
|
int send_cnt=0;
|
||||||
time_t cur_time;
|
time_t cur_time;
|
||||||
char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0};
|
char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0};
|
||||||
char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0};
|
char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0};
|
||||||
@@ -235,8 +235,9 @@ int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg)
|
|||||||
{
|
{
|
||||||
TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error()));
|
TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error()));
|
||||||
}
|
}
|
||||||
|
send_cnt++;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON_free(common_obj);
|
cJSON_free(common_obj);
|
||||||
return 0;
|
return send_cnt;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,32 +7,222 @@
|
|||||||
#include <tfe_http.h>
|
#include <tfe_http.h>
|
||||||
#include <tfe_utils.h>
|
#include <tfe_utils.h>
|
||||||
|
|
||||||
|
#include <MESA/MESA_prof_load.h>
|
||||||
|
#include <MESA/field_stat2.h>
|
||||||
|
|
||||||
#include <event2/event.h>
|
#include <event2/event.h>
|
||||||
#include <event2/buffer.h>
|
#include <event2/buffer.h>
|
||||||
|
|
||||||
|
enum cache_stat_field
|
||||||
|
{
|
||||||
|
STAT_CACHE_QUERY,
|
||||||
|
STAT_CACHE_QUERY_NOT_APPLICABLE,
|
||||||
|
STAT_CACHE_QUERY_HIT,
|
||||||
|
STAT_CACHE_QUERY_BYTES,
|
||||||
|
STAT_CACHE_OVERRIDE_QUERY,
|
||||||
|
STAT_CACHE_OVERRIDE_HIT,
|
||||||
|
STAT_CACHE_OVERRIDE_BYTES,
|
||||||
|
STAT_CACHE_QUERY_ERR,
|
||||||
|
STAT_CACHE_UPLOAD_CNT,
|
||||||
|
STAT_CACHE_UPLOAD_OVERRIDE,
|
||||||
|
STAT_CACHE_UPLOAD_FORBIDEN,
|
||||||
|
STAT_CACHE_UPLOAD_ABANDON,
|
||||||
|
STAT_CACHE_UPLOAD_ERR,
|
||||||
|
STAT_CACHE_UPLOAD_BYTES,
|
||||||
|
STAT_CACHE_MEMORY,
|
||||||
|
STAT_CACHE_ACTIVE_SESSION,
|
||||||
|
|
||||||
|
STAT_CACHE_QUERY_HIT_OJB_SIZE,
|
||||||
|
STAT_CACHE_UPLOAD_OBJ_SIZE,
|
||||||
|
STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE,
|
||||||
|
STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE,
|
||||||
|
__CACHE_STAT_MAX
|
||||||
|
};
|
||||||
struct cache_handle
|
struct cache_handle
|
||||||
{
|
{
|
||||||
unsigned int thread_count;
|
unsigned int thread_count;
|
||||||
|
int cache_undefined_obj_enabled;
|
||||||
|
size_t cache_undefined_obj_min_size;
|
||||||
|
int minimum_cache_seconds;
|
||||||
struct tango_cache_instance **clients;
|
struct tango_cache_instance **clients;
|
||||||
|
|
||||||
|
screen_stat_handle_t fs_handle;
|
||||||
|
long long stat_val[__CACHE_STAT_MAX];
|
||||||
|
int fs_id[__CACHE_STAT_MAX];
|
||||||
|
struct event_base* gc_evbase;
|
||||||
|
struct event* gcev;
|
||||||
};
|
};
|
||||||
struct cache_update_context
|
struct cache_update_context
|
||||||
{
|
{
|
||||||
|
struct cache_handle* ref_cache_handle;
|
||||||
struct tango_cache_ctx * write_ctx;
|
struct tango_cache_ctx * write_ctx;
|
||||||
};
|
};
|
||||||
struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, void *logger)
|
static void cache_gc_cb(evutil_socket_t fd, short what, void * arg)
|
||||||
{
|
{
|
||||||
struct cache_handle* handle=ALLOC(struct cache_handle, 1);
|
struct cache_handle* cache=(struct cache_handle *)arg;
|
||||||
handle->thread_count=tfe_proxy_get_thread_count();
|
struct cache_statistics client_stat_sum, client_stat;
|
||||||
handle->clients=ALLOC(struct tango_cache_instance *, handle->thread_count);
|
memset(&client_stat_sum, 0, sizeof(client_stat_sum));
|
||||||
int i=0;
|
long long *val_sum = (long long *)&client_stat_sum;
|
||||||
for(i=0; i<handle->thread_count; i++)
|
long long *val = NULL;
|
||||||
|
int i=0, j=0;
|
||||||
|
for(i=0; i<cache->thread_count;i++)
|
||||||
{
|
{
|
||||||
handle->clients[i]=tango_cache_instance_new(tfe_proxy_get_evbase(i), profile_path, section, logger);
|
tango_cache_get_statistics(cache->clients[i], &client_stat);
|
||||||
|
val=(long long*)&client_stat;
|
||||||
|
for(j=0; j<sizeof(client_stat)/sizeof(long long); j++)
|
||||||
|
{
|
||||||
|
val_sum[j]+=val[j];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return handle;
|
for(i=0;i<__CACHE_STAT_MAX;i++)
|
||||||
|
{
|
||||||
|
if(ATOMIC_READ(&(cache->stat_val[i]))!=0)
|
||||||
|
{
|
||||||
|
switch(i)
|
||||||
|
{
|
||||||
|
case STAT_CACHE_UPLOAD_BYTES:
|
||||||
|
case STAT_CACHE_QUERY_BYTES:
|
||||||
|
case STAT_CACHE_OVERRIDE_BYTES:
|
||||||
|
//translate bytes to mega bytes.
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))/(1024*1024));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i])));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY], 0, FS_OP_SET, client_stat_sum.get_recv_num);
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_num);
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_ERR], 0, FS_OP_SET, client_stat_sum.get_error_num);
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num);
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ERR], 0, FS_OP_SET, client_stat_sum.put_error_num);
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024));
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION], 0, FS_OP_SET, client_stat_sum.session_num);
|
||||||
|
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ABANDON], 0, FS_OP_SET, client_stat_sum.totaldrop_num);
|
||||||
|
FS_passive_output(cache->fs_handle);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
struct cache_stat_sepc
|
||||||
|
{
|
||||||
|
const char* name;
|
||||||
|
enum field_dsp_style_t style;
|
||||||
|
enum field_calc_algo calc_type;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void set_stat_spec(struct cache_stat_sepc* spec, const char* name, enum field_dsp_style_t style, enum field_calc_algo calc_type)
|
||||||
|
{
|
||||||
|
spec->name=name;
|
||||||
|
spec->style=style;
|
||||||
|
spec->calc_type=calc_type;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
void cache_stat_init(struct cache_handle* cache)
|
||||||
|
{
|
||||||
|
const char* fieldstat_output="./cache.fieldstat";
|
||||||
|
const char* app_name="tango_cache";
|
||||||
|
const char* obj_size_bins_KB="10,100,1000,10000";
|
||||||
|
|
||||||
|
int value=0, i=0;
|
||||||
|
screen_stat_handle_t fs_handle=NULL;
|
||||||
|
fs_handle=FS_create_handle();
|
||||||
|
FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, strlen(fieldstat_output)+1);
|
||||||
|
value=1;
|
||||||
|
FS_set_para(fs_handle, PRINT_MODE, &value, sizeof(value));
|
||||||
|
value=0;
|
||||||
|
FS_set_para(fs_handle, CREATE_THREAD, &value, sizeof(value));
|
||||||
|
FS_set_para(fs_handle, APP_NAME, app_name, strlen(app_name)+1);
|
||||||
|
FS_set_para(fs_handle, HISTOGRAM_GLOBAL_BINS, obj_size_bins_KB, strlen(obj_size_bins_KB)+1);
|
||||||
|
|
||||||
|
cache->fs_handle=fs_handle;
|
||||||
|
|
||||||
|
struct cache_stat_sepc spec[__CACHE_STAT_MAX];
|
||||||
|
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_QUERY], "cache_query",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_QUERY_NOT_APPLICABLE], "qry_not_allow",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_QUERY_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_QUERY_BYTES], "downloaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_QUERY], "or_qry",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_BYTES], "or_download(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_QUERY_ERR], "query_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_upload",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_upload",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "upload_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "upload_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_UPLOAD_ERR], "upload_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYTES], "uploaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_MEMORY], "used_mem(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||||
|
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], "or_hit_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_QUERY_HIT_OJB_SIZE], "hitted_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_UPLOAD_OBJ_SIZE], "cached_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||||
|
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], "or_cached(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||||
|
|
||||||
|
|
||||||
|
for(i=0;i<__CACHE_STAT_MAX;i++)
|
||||||
|
{
|
||||||
|
cache->fs_id[i]=FS_register(cache->fs_handle, spec[i].style, spec[i].calc_type, spec[i].name);
|
||||||
|
}
|
||||||
|
// value=cache->fs_id[STAT_CACHE_QUERY_HIT];
|
||||||
|
// FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value));
|
||||||
|
|
||||||
|
FS_register_ratio(cache->fs_handle,
|
||||||
|
cache->fs_id[STAT_CACHE_QUERY_HIT],
|
||||||
|
cache->fs_id[STAT_CACHE_QUERY],
|
||||||
|
1,
|
||||||
|
FS_STYLE_STATUS,
|
||||||
|
FS_CALC_CURRENT,
|
||||||
|
"cache_hit");
|
||||||
|
|
||||||
|
value=cache->fs_id[STAT_CACHE_OVERRIDE_HIT];
|
||||||
|
FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value));
|
||||||
|
|
||||||
|
FS_register_ratio(cache->fs_handle,
|
||||||
|
cache->fs_id[STAT_CACHE_OVERRIDE_HIT],
|
||||||
|
cache->fs_id[STAT_CACHE_OVERRIDE_QUERY],
|
||||||
|
1,
|
||||||
|
FS_STYLE_STATUS,
|
||||||
|
FS_CALC_CURRENT,
|
||||||
|
"override_hit");
|
||||||
|
|
||||||
|
|
||||||
|
FS_start(cache->fs_handle);
|
||||||
|
|
||||||
|
struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here.
|
||||||
|
cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, cache_gc_cb, cache);
|
||||||
|
evtimer_add(cache->gcev, &gc_delay);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, void *logger)
|
||||||
|
{
|
||||||
|
struct cache_handle* cache=ALLOC(struct cache_handle, 1);
|
||||||
|
int temp=0;
|
||||||
|
cache->thread_count=tfe_proxy_get_work_thread_count();
|
||||||
|
cache->clients=ALLOC(struct tango_cache_instance *, cache->thread_count);
|
||||||
|
int i=0;
|
||||||
|
for(i=0; i<cache->thread_count; i++)
|
||||||
|
{
|
||||||
|
cache->clients[i]=tango_cache_instance_new(tfe_proxy_get_work_thread_evbase(i), profile_path, section, logger);
|
||||||
|
if(cache->clients[i]==NULL)
|
||||||
|
{
|
||||||
|
goto error_out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1);
|
||||||
|
MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024);
|
||||||
|
cache->cache_undefined_obj_min_size=temp;
|
||||||
|
MESA_load_profile_int_def(profile_path, section, "cache_minimum_time_override", &(cache->minimum_cache_seconds), 60*5);
|
||||||
|
|
||||||
|
cache->gc_evbase=gc_evbase;
|
||||||
|
cache_stat_init(cache);
|
||||||
|
return cache;
|
||||||
|
error_out:
|
||||||
|
free(cache);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static char* read_http1_hdr(const char* hdr, const char* field_name)
|
static char* read_http1_hdr(const char* hdr, const char* field_name)
|
||||||
@@ -99,18 +289,85 @@ size_t cache_query_result_get_data(future_result_t * result, const unsigned char
|
|||||||
*pp_data=(const unsigned char*)cache_result->data_frag;
|
*pp_data=(const unsigned char*)cache_result->data_frag;
|
||||||
return cache_result->size;
|
return cache_result->size;
|
||||||
}
|
}
|
||||||
|
struct cache_query_context
|
||||||
|
{
|
||||||
|
struct cache_handle* ref_handle;
|
||||||
|
int is_undefined_obj;
|
||||||
|
struct future* f_tango_cache_fetch;
|
||||||
|
};
|
||||||
|
void cache_query_ctx_free_cb(void* p)
|
||||||
|
{
|
||||||
|
struct cache_query_context* ctx=(struct cache_query_context*)p;
|
||||||
|
future_destroy(ctx->f_tango_cache_fetch);
|
||||||
|
ctx->f_tango_cache_fetch=NULL;
|
||||||
|
free(ctx);
|
||||||
|
}
|
||||||
|
static void wrap_cache_query_on_succ(future_result_t * result, void * user)
|
||||||
|
{
|
||||||
|
struct promise * p = (struct promise *) user;
|
||||||
|
struct cache_query_context* ctx=(struct cache_query_context*)promise_get_ctx(p);
|
||||||
|
struct tango_cache_result* _result=tango_cache_read_result(result);
|
||||||
|
enum cache_query_result_type type=cache_query_result_get_type(result);
|
||||||
|
|
||||||
|
switch(_result->type)
|
||||||
|
{
|
||||||
|
case RESULT_TYPE_HEADER:
|
||||||
|
if(ctx->is_undefined_obj)
|
||||||
|
{
|
||||||
|
ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_OVERRIDE_HIT]));
|
||||||
|
FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], 0, FS_OP_SET, _result->tlength/1024);
|
||||||
|
}
|
||||||
|
ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_HIT]));
|
||||||
|
FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_QUERY_HIT_OJB_SIZE], 0, FS_OP_SET, _result->tlength/1024);
|
||||||
|
break;
|
||||||
|
case RESULT_TYPE_END:
|
||||||
|
case RESULT_TYPE_MISS:
|
||||||
|
//last call.
|
||||||
|
promise_dettach_ctx(p);
|
||||||
|
cache_query_ctx_free_cb(ctx);
|
||||||
|
break;
|
||||||
|
case RESULT_TYPE_BODY:
|
||||||
|
ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_BYTES]), _result->size);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
promise_success(p, result);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
static void wrap_cache_query_on_fail(enum e_future_error err, const char * what, void * user)
|
||||||
|
{
|
||||||
|
struct promise * p = (struct promise *) user;
|
||||||
|
struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p);
|
||||||
|
promise_failed(p, err, what);
|
||||||
|
cache_query_ctx_free_cb(ctx);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsigned int thread_id,
|
enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsigned int thread_id,
|
||||||
const struct tfe_http_half * request, struct future* f)
|
const struct tfe_http_half * request, struct future* f)
|
||||||
{
|
{
|
||||||
struct request_freshness req_fresshness;
|
struct request_freshness req_fresshness;
|
||||||
enum cache_pending_action get_action;
|
enum cache_pending_action get_action;
|
||||||
int ret=0;
|
struct cache_query_context* query_ctx=NULL;
|
||||||
|
struct promise* p=NULL;
|
||||||
|
struct future* _f=NULL;
|
||||||
|
int ret=0, is_undefined_obj=0;
|
||||||
get_action=tfe_cache_get_pending(request, &req_fresshness);
|
get_action=tfe_cache_get_pending(request, &req_fresshness);
|
||||||
switch(get_action)
|
switch(get_action)
|
||||||
{
|
{
|
||||||
case UNDEFINED:
|
case UNDEFINED:
|
||||||
case FORBIDDEN:
|
if(!handle->cache_undefined_obj_enabled)
|
||||||
|
{
|
||||||
|
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE]));
|
||||||
|
return WEB_CACHE_NOT_APPLICABLE;
|
||||||
|
}
|
||||||
|
is_undefined_obj=1;
|
||||||
|
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY]));
|
||||||
|
break;
|
||||||
|
case FORBIDDEN:
|
||||||
|
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE]));
|
||||||
return WEB_CACHE_NOT_APPLICABLE;
|
return WEB_CACHE_NOT_APPLICABLE;
|
||||||
case VERIFY:
|
case VERIFY:
|
||||||
case ALLOWED:
|
case ALLOWED:
|
||||||
@@ -124,7 +381,13 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig
|
|||||||
memset(&meta, 0, sizeof(meta));
|
memset(&meta, 0, sizeof(meta));
|
||||||
meta.url=request->req_spec.url;
|
meta.url=request->req_spec.url;
|
||||||
memcpy(&(meta.get), &req_fresshness, sizeof(meta.get));
|
memcpy(&(meta.get), &req_fresshness, sizeof(meta.get));
|
||||||
ret=tango_cache_fetch_object(handle->clients[thread_id], f, &meta);
|
query_ctx=ALLOC(struct cache_query_context, 1);
|
||||||
|
query_ctx->ref_handle=handle;
|
||||||
|
query_ctx->is_undefined_obj=is_undefined_obj;
|
||||||
|
p=future_to_promise(f);
|
||||||
|
promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb);
|
||||||
|
query_ctx->f_tango_cache_fetch=future_create("wrap_cache_qry", wrap_cache_query_on_succ, wrap_cache_query_on_fail, p);
|
||||||
|
ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta);
|
||||||
assert(ret==0);
|
assert(ret==0);
|
||||||
return WEB_CACHE_QUERING;
|
return WEB_CACHE_QUERING;
|
||||||
}
|
}
|
||||||
@@ -138,24 +401,28 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
|||||||
struct tango_cache_ctx *write_ctx=NULL;
|
struct tango_cache_ctx *write_ctx=NULL;
|
||||||
char buffer[TFE_STRING_MAX];
|
char buffer[TFE_STRING_MAX];
|
||||||
const char* value=NULL;
|
const char* value=NULL;
|
||||||
int i=0;
|
int i=0, is_undefined_obj=0;
|
||||||
size_t content_len=0;
|
size_t content_len=0;
|
||||||
if(session->resp->resp_spec.content_length!=NULL)
|
if(!session->resp->resp_spec.content_length)
|
||||||
{
|
{
|
||||||
sscanf(session->resp->resp_spec.content_length, "%lu", &content_len);
|
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN]));
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
sscanf(session->resp->resp_spec.content_length, "%lu", &content_len);
|
||||||
put_action=tfe_cache_put_pending(session->resp, &resp_freshness);
|
put_action=tfe_cache_put_pending(session->resp, &resp_freshness);
|
||||||
switch(put_action){
|
switch(put_action){
|
||||||
case FORBIDDEN:
|
case FORBIDDEN:
|
||||||
case VERIFY:
|
case VERIFY:
|
||||||
|
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN]));
|
||||||
return NULL;
|
return NULL;
|
||||||
case ALLOWED:
|
case ALLOWED:
|
||||||
break;
|
break;
|
||||||
case UNDEFINED:
|
case UNDEFINED:
|
||||||
if(content_len<100*1024)
|
if(handle->cache_undefined_obj_enabled && content_len<handle->cache_undefined_obj_min_size)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
is_undefined_obj=1;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
assert(0);
|
assert(0);
|
||||||
@@ -170,19 +437,26 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
|||||||
i++;
|
i++;
|
||||||
memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness));
|
memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness));
|
||||||
write_ctx=tango_cache_update_start(handle->clients[thread_id], NULL, &meta);
|
write_ctx=tango_cache_update_start(handle->clients[thread_id], NULL, &meta);
|
||||||
if(write_ctx==NULL)
|
if(write_ctx==NULL)//exceed maximum cache memory size.
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
if(is_undefined_obj)
|
||||||
|
{
|
||||||
|
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_OVERRIDE]));
|
||||||
|
FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);
|
||||||
|
}
|
||||||
|
FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);
|
||||||
update_ctx=ALLOC(struct cache_update_context, 1);
|
update_ctx=ALLOC(struct cache_update_context, 1);
|
||||||
update_ctx->write_ctx=write_ctx;
|
update_ctx->write_ctx=write_ctx;
|
||||||
|
update_ctx->ref_cache_handle=handle;
|
||||||
return update_ctx;
|
return update_ctx;
|
||||||
|
|
||||||
}
|
}
|
||||||
void web_cache_update(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size)
|
void web_cache_update(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size)
|
||||||
{
|
{
|
||||||
tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size);
|
tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size);
|
||||||
|
ATOMIC_ADD(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOAD_BYTES]), frag_size);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
void web_cache_update_end(struct cache_update_context* ctx)
|
void web_cache_update_end(struct cache_update_context* ctx)
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ enum cache_query_status
|
|||||||
WEB_CACHE_HIT
|
WEB_CACHE_HIT
|
||||||
};
|
};
|
||||||
struct cache_handle;
|
struct cache_handle;
|
||||||
struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, void *logger);
|
struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, void *logger);
|
||||||
struct cached_meta
|
struct cached_meta
|
||||||
{
|
{
|
||||||
size_t content_length;
|
size_t content_length;
|
||||||
|
|||||||
Reference in New Issue
Block a user