缓存能跑起来了。

This commit is contained in:
zhengchao
2018-10-14 18:45:02 +08:00
parent 12d4370b3f
commit 294201ecd8
12 changed files with 86 additions and 42 deletions

View File

@@ -2,4 +2,4 @@ add_library(tango-cache-client src/cache_evbase_client.cpp src/ src/tango_cache_
target_link_libraries(tango-cache-client http)
target_include_directories(tango-cache-client PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
target_link_libraries(tango-cache-client libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static)
target_link_libraries(tango-cache-client MESA_handle_logger MESA_htable MESA_prof_load wiredcfg)
target_link_libraries(tango-cache-client MESA_handle_logger MESA_htable MESA_prof_load wiredLB)

View File

@@ -699,6 +699,7 @@ static int wired_load_balancer_init(struct tango_cache_instance *instance)
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
return -1;
}
wiredLB_set_opt(instance->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &instance->wiredlb_ha_port, sizeof(instance->wiredlb_ha_port));
wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override));
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1);
if(instance->wiredlb_override)
@@ -751,6 +752,9 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha
MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", instance->wiredlb_group, 64, "KAZAKHSTAN");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", instance->wiredlb_datacenter, 64, "ASTANA");
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &instance->wiredlb_override, 1);
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100);
instance->wiredlb_ha_port=(unsigned short)intval;
return 0;
}

View File

@@ -54,6 +54,7 @@ struct tango_cache_instance
char wiredlb_datacenter[64];
u_int32_t minio_port;
u_int32_t wiredlb_override;
u_int16_t wiredlb_ha_port;
struct event_base* evbase;
struct event timer_event;
struct cache_statistics statistic;

View File

@@ -1,24 +1,22 @@
[TANGO_CACHE]
#MINIO IP地址目前只支持一个
#Address of MINIO Servers
MINIO_IP_LIST=192.168.10.61-64;
MINIO_LISTEN_PORT=9000
#每个域名最多开启的链接数
MAX_CONNECTION_PER_HOST=10
#bucket的名称
CACHE_BUCKET_NAME=openbucket
#缓存最大占用的内存空间大小,超出空间时上传失败
#Upload failed when exceed max memory constraintt
MAX_USED_MEMORY_SIZE_MB=5120
#上传时Expires头部的过期时间单位秒最小601分钟
#Expire second of Upload header, 60s minimum.
CACHE_DEFAULT_TTL_SECOND=3600
#是否对对象的名称进行哈希,开启哈希有助于提高上传下载的速率
#Hash object name to speedup query.
CACHE_OBJECT_KEY_HASH_SWITCH=0
#WIRED LOAD BALANCER配置
#For WIRED LOAD BALANCER
#WIREDLB_OVERRIDE=1
#WIREDLB_TOPIC=
#WIREDLB_GROUP=

View File

@@ -3,7 +3,7 @@ add_executable(tfe src/key_keeper.cpp src/kni_acceptor.cpp src/ssl_stream.cpp sr
target_include_directories(tfe PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/external)
target_include_directories(tfe PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include/internal)
target_link_libraries(tfe common)
target_link_libraries(tfe common tango-cache-client)
target_link_libraries(tfe pthread dl
openssl-ssl-static
openssl-crypto-static

View File

@@ -22,6 +22,8 @@
#include <event2/thread.h>
#include <MESA/MESA_handle_logger.h>
#include <tango_cache_client.h>
#include <tfe_utils.h>
#include <tfe_future.h>
#include <tfe_stream.h>
@@ -305,16 +307,6 @@ int main(int argc, char *argv[])
g_default_proxy->evbase, g_default_logger, g_default_proxy->fs_handle);
CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit.");
/* PLUGIN INIT */
unsigned int plugin_iterator = 0;
for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator);
plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator))
{
ret = plugin_iter->on_init(g_default_proxy);
CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol);
TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol);
}
for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++)
{
g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy);
@@ -336,6 +328,17 @@ int main(int argc, char *argv[])
g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger);
CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. ");
/* PLUGIN INIT */
unsigned int plugin_iterator = 0;
for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator);
plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator))
{
ret = plugin_iter->on_init(g_default_proxy);
CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol);
TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol);
}
TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. ");
event_base_dispatch(g_default_proxy->evbase);

View File

@@ -7,7 +7,7 @@ KAFKA_BROKERLIST=192.168.10.73:9092
[MAAT]
# 0:json 1: redis 2: iris
MAAT_INPUT_MODE=0
MAAT_INPUT_MODE=1
TABLE_INFO=./pangu_conf/table_info.conf
JSON_CFG_FILE=./pangu_conf/pangu_ctrl.json
STAT_FILE=./log/pangu_scan.status
@@ -17,3 +17,29 @@ MAAT_REDIS_SERVER=192.168.11.243
MAAT_REDIS_PORT=6379
MAAT_REDIS_DB_INDEX=4
EFFECT_INTERVAL_S=1
[TANGO_CACHE]
[TANGO_CACHE]
#Address of MINIO Servers
MINIO_IP_LIST=192.168.10.61-64;
MINIO_LISTEN_PORT=9000
MAX_CONNECTION_PER_HOST=10
CACHE_BUCKET_NAME=openbucket
#Upload failed when exceed max memory constraintt
MAX_USED_MEMORY_SIZE_MB=5120
#Expire second of Upload header, 60s minimum.
CACHE_DEFAULT_TTL_SECOND=3600
#Hash object name to speedup query.
CACHE_OBJECT_KEY_HASH_SWITCH=0
#For WIRED LOAD BALANCER
#WIREDLB_OVERRIDE=1
#WIREDLB_TOPIC=
#WIREDLB_GROUP=
#WIREDLB_DATACENTER=

View File

@@ -1,5 +1,6 @@
#include "pangu_logger.h"
#include "pattern_replace.h"
#include "pangu_web_cache.h"
#include <tfe_proxy.h>
#include <tfe_stream.h>
@@ -198,7 +199,7 @@ int pangu_http_init(struct tfe_proxy * proxy)
"./pangu_conf/template/HTTP451.html");
g_pangu_rt->tpl_451 = ctemplate::Template::GetTemplate(page_path, ctemplate::DO_NOT_STRIP);
g_pangu_rt->cache = create_web_cache_handle(profile, "CACHE_SERVER", g_pangu_rt->local_logger);
g_pangu_rt->cache = create_web_cache_handle(profile, "TANGO_CACHE", g_pangu_rt->local_logger);
TFE_LOG_INFO(NULL, "Pangu HTTP init success.");
return 0;
@@ -287,7 +288,7 @@ static struct pangu_http_ctx * pangu_http_ctx_new(unsigned int thread_id)
static void pangu_http_ctx_free(struct pangu_http_ctx * ctx)
{
if (!ctx->rep_ctx)
if (ctx->rep_ctx)
{
http_repl_ctx_free(ctx->rep_ctx);
ctx->rep_ctx = NULL;
@@ -297,26 +298,26 @@ static void pangu_http_ctx_free(struct pangu_http_ctx * ctx)
Maat_clean_status(&(ctx->mid));
ctx->mid = NULL;
if(!ctx->sp)
if(ctx->sp)
{
Maat_stream_scan_string_end(&(ctx->sp));
}
if(!ctx->cache_update_ctx)
if(ctx->cache_update_ctx)
{
web_cache_update_end(ctx->cache_update_ctx);
ctx->cache_update_ctx=NULL;
}
if(!ctx->cached_header)
if(ctx->cached_header)
{
cache_query_free_meta(ctx->cached_header);
ctx->cached_header=NULL;
}
if(!ctx->cached_body)
if(ctx->cached_body)
{
evbuffer_free(ctx->cached_body);
ctx->cached_body=NULL;
}
if(!ctx->f_cache_query)
if(ctx->f_cache_query)
{
future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL;
@@ -811,6 +812,7 @@ void enforce_control_policy(const struct tfe_stream * stream, const struct tfe_h
}
switch (ctx->action)
{
case PG_ACTION_NONE:
case PG_ACTION_MONIT:
//send log on close.
break;
@@ -829,12 +831,12 @@ void enforce_control_policy(const struct tfe_stream * stream, const struct tfe_h
}
void cache_query(const struct tfe_http_session * session, unsigned int thread_id, struct pangu_http_ctx * ctx)
{
ctx->ref_session=session;
ctx->f_cache_query=future_create("cache_query", cache_query_on_succ, cache_query_on_fail, ctx);
ctx->cache_query_status=async_web_cache_query(g_pangu_rt->cache, thread_id, session->req, ctx->f_cache_query);
if(ctx->cache_query_status==WEB_CACHE_QUERING)
{
tfe_http_session_suspend(session);
ctx->ref_session=tfe_http_session_allow_write(session);
tfe_http_session_suspend(ctx->ref_session);
}
else
{

View File

@@ -1,7 +1,12 @@
#include "tango_cache_pending.h"
#include "pangu_web_cache.h"
#include <tango_cache_pending.h>
#include <tango_cache_client.h>
#include <tfe_proxy.h>
#include <tfe_http.h>
#include <tfe_utils.h>
#include <event2/event.h>
#include <event2/buffer.h>
@@ -30,7 +35,7 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha
return handle;
}
char* read_http1_hdr(const char* hdr, const char* field_name)
static char* read_http1_hdr(const char* hdr, const char* field_name)
{
const char *p=NULL, *q=NULL;
char* value=NULL;
@@ -50,7 +55,7 @@ char* read_http1_hdr(const char* hdr, const char* field_name)
{
return NULL;
}
value=calloc(sizeof(char), (q-p+1));
value=(char*) calloc(sizeof(char), (q-p+1));
memcpy(value, p, q-p);
return value;
}
@@ -95,8 +100,8 @@ struct cached_meta* cache_query_result_get_header(future_result_t * result)
return NULL;
}
meta= ALLOC(struct cached_meta, 1);
meta->content_length=read_http1_hdr(cache_result->data_frag, "content-length");
meta->content_type=read_http1_hdr(cache_result->data_frag, "content-type");
meta->content_length=read_http1_hdr((const char*)cache_result->data_frag, "content-length");
meta->content_type=read_http1_hdr((const char*)cache_result->data_frag, "content-type");
return meta;
}
void cache_query_result_append_data(struct evbuffer* buf, future_result_t * result)
@@ -118,8 +123,8 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig
{
case UNDEFINED:
case FORBIDDEN:
case VERIFY:
return WEB_CACHE_NOT_APPLICABLE;
case VERIFY:
case ALLOWED:
break;
default:
@@ -129,8 +134,8 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig
struct tango_cache_meta meta;
memset(&meta, 0, sizeof(meta));
meta->url=request->req_spec.url;
memcpy(&(meta->put), req_fresshness, sizeof(meta->put));
meta.url=request->req_spec.url;
memcpy(&(meta.put), &req_fresshness, sizeof(meta.put));
ret=tango_cache_fetch_object(handle->clients[thread_id], f, &meta);
assert(ret==0);
return WEB_CACHE_QUERING;
@@ -159,7 +164,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
snprintf(buffer, sizeof(buffer), "content-type:%s",session->resp->resp_spec.content_length);
meta.std_hdr[i]=buffer;
i++;
memcpy(&meta.put, resp_freshness, sizeof(resp_freshness));
memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness));
write_ctx=tango_cache_update_start(handle->clients[thread_id], NULL, &meta);
if(write_ctx==NULL)
{
@@ -173,7 +178,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
}
void web_cache_update(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size)
{
tango_cache_update_frag_data(ctx->write_ctx, body_frag, frag_size);
tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size);
return;
}
void web_cache_update_end(struct cache_update_context* ctx)

View File

@@ -15,8 +15,8 @@ struct cache_handle;
struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, void *logger);
struct cached_meta
{
const char* content_length;
const char* content_type;
char* content_length;
char* content_type;
};
struct cached_meta* cache_query_result_get_header(future_result_t * result);
void cache_query_free_meta(struct cached_meta* meta);

View File

@@ -322,7 +322,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
tfe_http_event event = (hf_direction == TFE_HTTP_REQUEST) ? EV_HTTP_REQ_HDR : EV_HTTP_RESP_HDR;
if (hf_private->event_cb)
{
hf_private->event_cb(hf_private, EV_HTTP_REQ_HDR, NULL, 0, hf_private->event_cb_user);
hf_private->event_cb(hf_private, event, NULL, 0, hf_private->event_cb_user);
}
/* The setup of user stream option indicates that the way to handle the request/response has

View File

@@ -139,6 +139,11 @@ add_library(MESA_htable SHARED IMPORTED GLOBAL)
set_property(TARGET MESA_htable PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libMESA_htable.so)
set_property(TARGET MESA_htable PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR})
add_library(wiredLB SHARED IMPORTED GLOBAL)
set_property(TARGET wiredLB PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libWiredLB.so)
set_property(TARGET wiredLB PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR})
add_library(maatframe SHARED IMPORTED GLOBAL)
set_property(TARGET maatframe PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libmaatframe.so)
set_property(TARGET maatframe PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR})