diff --git a/cache/CMakeLists.txt b/cache/CMakeLists.txt index dea93d4..98e4e08 100644 --- a/cache/CMakeLists.txt +++ b/cache/CMakeLists.txt @@ -2,4 +2,4 @@ add_library(tango-cache-client src/cache_evbase_client.cpp src/ src/tango_cache_ target_link_libraries(tango-cache-client http) target_include_directories(tango-cache-client PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_link_libraries(tango-cache-client libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static) -target_link_libraries(tango-cache-client MESA_handle_logger MESA_htable MESA_prof_load wiredcfg) +target_link_libraries(tango-cache-client MESA_handle_logger MESA_htable MESA_prof_load wiredLB) diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp index 7e24969..2038b35 100644 --- a/cache/src/tango_cache_client.cpp +++ b/cache/src/tango_cache_client.cpp @@ -699,6 +699,7 @@ static int wired_load_balancer_init(struct tango_cache_instance *instance) MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n"); return -1; } + wiredLB_set_opt(instance->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &instance->wiredlb_ha_port, sizeof(instance->wiredlb_ha_port)); wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override)); wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1); if(instance->wiredlb_override) @@ -751,6 +752,9 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", instance->wiredlb_group, 64, "KAZAKHSTAN"); MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", instance->wiredlb_datacenter, 64, "ASTANA"); MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &instance->wiredlb_override, 1); + + MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100); + instance->wiredlb_ha_port=(unsigned short)intval; return 0; } diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h index 9e855bf..12a2fee 100644 --- a/cache/src/tango_cache_client_in.h +++ b/cache/src/tango_cache_client_in.h @@ -54,6 +54,7 @@ struct tango_cache_instance char wiredlb_datacenter[64]; u_int32_t minio_port; u_int32_t wiredlb_override; + u_int16_t wiredlb_ha_port; struct event_base* evbase; struct event timer_event; struct cache_statistics statistic; diff --git a/cache/test/pangu_tg_cahce.conf b/cache/test/pangu_tg_cahce.conf index 91579ad..a193059 100644 --- a/cache/test/pangu_tg_cahce.conf +++ b/cache/test/pangu_tg_cahce.conf @@ -1,24 +1,22 @@ [TANGO_CACHE] -#MINIO IP地址,目前只支持一个 +#Address of MINIO Servers MINIO_IP_LIST=192.168.10.61-64; MINIO_LISTEN_PORT=9000 -#每个域名最多开启的链接数 MAX_CONNECTION_PER_HOST=10 -#bucket的名称 CACHE_BUCKET_NAME=openbucket -#缓存最大占用的内存空间大小,超出空间时上传失败 +#Upload failed when exceed max memory constraintt MAX_USED_MEMORY_SIZE_MB=5120 -#上传时Expires头部的过期时间,单位秒,最小60(1分钟) +#Expire second of Upload header, 60s minimum. CACHE_DEFAULT_TTL_SECOND=3600 -#是否对对象的名称进行哈希,开启哈希有助于提高上传下载的速率 +#Hash object name to speedup query. CACHE_OBJECT_KEY_HASH_SWITCH=0 -#WIRED LOAD BALANCER配置 +#For WIRED LOAD BALANCER #WIREDLB_OVERRIDE=1 #WIREDLB_TOPIC= #WIREDLB_GROUP= diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index 0aef968..a617d14 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -3,7 +3,7 @@ add_executable(tfe src/key_keeper.cpp src/kni_acceptor.cpp src/ssl_stream.cpp sr target_include_directories(tfe PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/external) target_include_directories(tfe PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include/internal) -target_link_libraries(tfe common) +target_link_libraries(tfe common tango-cache-client) target_link_libraries(tfe pthread dl openssl-ssl-static openssl-crypto-static diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 68c0472..e986594 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -22,6 +22,8 @@ #include #include +#include + #include #include #include @@ -305,16 +307,6 @@ int main(int argc, char *argv[]) g_default_proxy->evbase, g_default_logger, g_default_proxy->fs_handle); CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit."); - /* PLUGIN INIT */ - unsigned int plugin_iterator = 0; - for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator); - plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator)) - { - ret = plugin_iter->on_init(g_default_proxy); - CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol); - TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol); - } - for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) { g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy); @@ -336,6 +328,17 @@ int main(int argc, char *argv[]) g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); + /* PLUGIN INIT */ + unsigned int plugin_iterator = 0; + for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator); + plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator)) + { + ret = plugin_iter->on_init(g_default_proxy); + CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol); + TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol); + } + + TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. "); event_base_dispatch(g_default_proxy->evbase); diff --git a/plugin/business/pangu-http/conf/pangu_pxy.conf b/plugin/business/pangu-http/conf/pangu_pxy.conf index 0424123..080e7e3 100644 --- a/plugin/business/pangu-http/conf/pangu_pxy.conf +++ b/plugin/business/pangu-http/conf/pangu_pxy.conf @@ -7,7 +7,7 @@ KAFKA_BROKERLIST=192.168.10.73:9092 [MAAT] # 0:json 1: redis 2: iris -MAAT_INPUT_MODE=0 +MAAT_INPUT_MODE=1 TABLE_INFO=./pangu_conf/table_info.conf JSON_CFG_FILE=./pangu_conf/pangu_ctrl.json STAT_FILE=./log/pangu_scan.status @@ -17,3 +17,29 @@ MAAT_REDIS_SERVER=192.168.11.243 MAAT_REDIS_PORT=6379 MAAT_REDIS_DB_INDEX=4 EFFECT_INTERVAL_S=1 + +[TANGO_CACHE] +[TANGO_CACHE] +#Address of MINIO Servers +MINIO_IP_LIST=192.168.10.61-64; +MINIO_LISTEN_PORT=9000 + +MAX_CONNECTION_PER_HOST=10 + +CACHE_BUCKET_NAME=openbucket + +#Upload failed when exceed max memory constraintt +MAX_USED_MEMORY_SIZE_MB=5120 + +#Expire second of Upload header, 60s minimum. +CACHE_DEFAULT_TTL_SECOND=3600 + +#Hash object name to speedup query. +CACHE_OBJECT_KEY_HASH_SWITCH=0 + +#For WIRED LOAD BALANCER +#WIREDLB_OVERRIDE=1 +#WIREDLB_TOPIC= +#WIREDLB_GROUP= +#WIREDLB_DATACENTER= + diff --git a/plugin/business/pangu-http/pangu_http.cpp b/plugin/business/pangu-http/pangu_http.cpp index e93152b..52d5738 100644 --- a/plugin/business/pangu-http/pangu_http.cpp +++ b/plugin/business/pangu-http/pangu_http.cpp @@ -1,5 +1,6 @@ #include "pangu_logger.h" #include "pattern_replace.h" +#include "pangu_web_cache.h" #include #include @@ -198,7 +199,7 @@ int pangu_http_init(struct tfe_proxy * proxy) "./pangu_conf/template/HTTP451.html"); g_pangu_rt->tpl_451 = ctemplate::Template::GetTemplate(page_path, ctemplate::DO_NOT_STRIP); - g_pangu_rt->cache = create_web_cache_handle(profile, "CACHE_SERVER", g_pangu_rt->local_logger); + g_pangu_rt->cache = create_web_cache_handle(profile, "TANGO_CACHE", g_pangu_rt->local_logger); TFE_LOG_INFO(NULL, "Pangu HTTP init success."); return 0; @@ -287,7 +288,7 @@ static struct pangu_http_ctx * pangu_http_ctx_new(unsigned int thread_id) static void pangu_http_ctx_free(struct pangu_http_ctx * ctx) { - if (!ctx->rep_ctx) + if (ctx->rep_ctx) { http_repl_ctx_free(ctx->rep_ctx); ctx->rep_ctx = NULL; @@ -297,26 +298,26 @@ static void pangu_http_ctx_free(struct pangu_http_ctx * ctx) Maat_clean_status(&(ctx->mid)); ctx->mid = NULL; - if(!ctx->sp) + if(ctx->sp) { Maat_stream_scan_string_end(&(ctx->sp)); } - if(!ctx->cache_update_ctx) + if(ctx->cache_update_ctx) { web_cache_update_end(ctx->cache_update_ctx); ctx->cache_update_ctx=NULL; } - if(!ctx->cached_header) + if(ctx->cached_header) { cache_query_free_meta(ctx->cached_header); ctx->cached_header=NULL; } - if(!ctx->cached_body) + if(ctx->cached_body) { evbuffer_free(ctx->cached_body); ctx->cached_body=NULL; } - if(!ctx->f_cache_query) + if(ctx->f_cache_query) { future_destroy(ctx->f_cache_query); ctx->f_cache_query=NULL; @@ -811,6 +812,7 @@ void enforce_control_policy(const struct tfe_stream * stream, const struct tfe_h } switch (ctx->action) { + case PG_ACTION_NONE: case PG_ACTION_MONIT: //send log on close. break; @@ -829,12 +831,12 @@ void enforce_control_policy(const struct tfe_stream * stream, const struct tfe_h } void cache_query(const struct tfe_http_session * session, unsigned int thread_id, struct pangu_http_ctx * ctx) { - ctx->ref_session=session; ctx->f_cache_query=future_create("cache_query", cache_query_on_succ, cache_query_on_fail, ctx); ctx->cache_query_status=async_web_cache_query(g_pangu_rt->cache, thread_id, session->req, ctx->f_cache_query); if(ctx->cache_query_status==WEB_CACHE_QUERING) { - tfe_http_session_suspend(session); + ctx->ref_session=tfe_http_session_allow_write(session); + tfe_http_session_suspend(ctx->ref_session); } else { diff --git a/plugin/business/pangu-http/pangu_web_cache.cpp b/plugin/business/pangu-http/pangu_web_cache.cpp index c1f7a06..c195eeb 100644 --- a/plugin/business/pangu-http/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/pangu_web_cache.cpp @@ -1,7 +1,12 @@ -#include "tango_cache_pending.h" +#include "pangu_web_cache.h" +#include +#include + #include #include +#include + #include #include @@ -30,7 +35,7 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha return handle; } -char* read_http1_hdr(const char* hdr, const char* field_name) +static char* read_http1_hdr(const char* hdr, const char* field_name) { const char *p=NULL, *q=NULL; char* value=NULL; @@ -50,7 +55,7 @@ char* read_http1_hdr(const char* hdr, const char* field_name) { return NULL; } - value=calloc(sizeof(char), (q-p+1)); + value=(char*) calloc(sizeof(char), (q-p+1)); memcpy(value, p, q-p); return value; } @@ -95,8 +100,8 @@ struct cached_meta* cache_query_result_get_header(future_result_t * result) return NULL; } meta= ALLOC(struct cached_meta, 1); - meta->content_length=read_http1_hdr(cache_result->data_frag, "content-length"); - meta->content_type=read_http1_hdr(cache_result->data_frag, "content-type"); + meta->content_length=read_http1_hdr((const char*)cache_result->data_frag, "content-length"); + meta->content_type=read_http1_hdr((const char*)cache_result->data_frag, "content-type"); return meta; } void cache_query_result_append_data(struct evbuffer* buf, future_result_t * result) @@ -118,8 +123,8 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig { case UNDEFINED: case FORBIDDEN: - case VERIFY: return WEB_CACHE_NOT_APPLICABLE; + case VERIFY: case ALLOWED: break; default: @@ -129,8 +134,8 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig struct tango_cache_meta meta; memset(&meta, 0, sizeof(meta)); - meta->url=request->req_spec.url; - memcpy(&(meta->put), req_fresshness, sizeof(meta->put)); + meta.url=request->req_spec.url; + memcpy(&(meta.put), &req_fresshness, sizeof(meta.put)); ret=tango_cache_fetch_object(handle->clients[thread_id], f, &meta); assert(ret==0); return WEB_CACHE_QUERING; @@ -159,7 +164,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, snprintf(buffer, sizeof(buffer), "content-type:%s",session->resp->resp_spec.content_length); meta.std_hdr[i]=buffer; i++; - memcpy(&meta.put, resp_freshness, sizeof(resp_freshness)); + memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness)); write_ctx=tango_cache_update_start(handle->clients[thread_id], NULL, &meta); if(write_ctx==NULL) { @@ -173,7 +178,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, } void web_cache_update(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size) { - tango_cache_update_frag_data(ctx->write_ctx, body_frag, frag_size); + tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size); return; } void web_cache_update_end(struct cache_update_context* ctx) diff --git a/plugin/business/pangu-http/pangu_web_cache.h b/plugin/business/pangu-http/pangu_web_cache.h index 94ef024..1f13f15 100644 --- a/plugin/business/pangu-http/pangu_web_cache.h +++ b/plugin/business/pangu-http/pangu_web_cache.h @@ -15,8 +15,8 @@ struct cache_handle; struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, void *logger); struct cached_meta { - const char* content_length; - const char* content_type; + char* content_length; + char* content_type; }; struct cached_meta* cache_query_result_get_header(future_result_t * result); void cache_query_free_meta(struct cached_meta* meta); diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index 50d3117..e025f56 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -322,7 +322,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) tfe_http_event event = (hf_direction == TFE_HTTP_REQUEST) ? EV_HTTP_REQ_HDR : EV_HTTP_RESP_HDR; if (hf_private->event_cb) { - hf_private->event_cb(hf_private, EV_HTTP_REQ_HDR, NULL, 0, hf_private->event_cb_user); + hf_private->event_cb(hf_private, event, NULL, 0, hf_private->event_cb_user); } /* The setup of user stream option indicates that the way to handle the request/response has diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index c404190..911dd8c 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -139,6 +139,11 @@ add_library(MESA_htable SHARED IMPORTED GLOBAL) set_property(TARGET MESA_htable PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libMESA_htable.so) set_property(TARGET MESA_htable PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) +add_library(wiredLB SHARED IMPORTED GLOBAL) +set_property(TARGET wiredLB PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libWiredLB.so) +set_property(TARGET wiredLB PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) + + add_library(maatframe SHARED IMPORTED GLOBAL) set_property(TARGET maatframe PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libmaatframe.so) set_property(TARGET maatframe PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR})