HTTP业务层增加subscribe ID扫描功能。
This commit is contained in:
@@ -157,6 +157,7 @@ struct tfe_stream_addr * tfe_stream_addr_create_by_fd(int fd, enum tfe_conn_dir
|
||||
//Follow function's returned pointer should be passed to free to release the allocated storage when it is no longer needed.
|
||||
char* tfe_string_addr_create_by_fd(int fd, enum tfe_conn_dir dir);
|
||||
char * tfe_stream_addr_to_str(const struct tfe_stream_addr * addr);
|
||||
int tfe_stream_addr_str_split(char* addr_str, const char** sip, const char** sport, const char** dip, const char** dport);
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,10 +1,42 @@
|
||||
#include "tfe_types.h"
|
||||
#include "tfe_utils.h"
|
||||
#include <string.h>
|
||||
const char * tfe_stream_conn_dir_to_str(enum tfe_conn_dir dir)
|
||||
{
|
||||
return (dir == CONN_DIR_DOWNSTREAM) ? "downstream" : "upstream";
|
||||
}
|
||||
int tfe_stream_addr_str_split(char* addr_str, const char** sip, const char** sport, const char** dip, const char** dport)
|
||||
{
|
||||
const char* seps=" ";
|
||||
char* saveptr=NULL, *subtoken=NULL, *str=NULL;
|
||||
int i=0;
|
||||
for (str = addr_str; ; str = NULL)
|
||||
{
|
||||
subtoken = strtok_r(str, seps, &saveptr);
|
||||
if (subtoken == NULL)
|
||||
break;
|
||||
switch(i)
|
||||
{
|
||||
case 0:
|
||||
if(sip!=NULL) *sip=subtoken;
|
||||
break;
|
||||
case 1:
|
||||
if(sport!=NULL) *sport=subtoken;
|
||||
break;
|
||||
case 2:
|
||||
if(dip!=NULL) *dip=subtoken;
|
||||
break;
|
||||
case 3:
|
||||
if(dport!=NULL) *dport=subtoken;
|
||||
break;
|
||||
default:
|
||||
return -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
||||
}
|
||||
char * tfe_stream_addr_to_str(const struct tfe_stream_addr * addr)
|
||||
{
|
||||
char * __str_ret = NULL;
|
||||
|
||||
@@ -19,6 +19,17 @@ maat_redis_port_range=6379-6379
|
||||
maat_redis_db_index=4
|
||||
effect_interval_s=1
|
||||
|
||||
[dynamic_maat]
|
||||
# 0:json 1: redis 2: iris
|
||||
maat_input_mode=1
|
||||
table_info=resource/pangu/dynamic_maat_table_info.conf
|
||||
json_cfg_file=resource/
|
||||
stat_file=log/pangu_dyn_scan.status
|
||||
maat_redis_server=192.168.11.243
|
||||
maat_redis_port_range=6379-6379
|
||||
maat_redis_db_index=4
|
||||
effect_interval_s=1
|
||||
|
||||
[tango_cache]
|
||||
enable_cache=1
|
||||
#minio ip, as wiredlb required
|
||||
|
||||
@@ -50,6 +50,7 @@ enum scan_table
|
||||
PXY_CTRL_HTTP_REQ_BODY,
|
||||
PXY_CTRL_HTTP_RES_HDR,
|
||||
PXY_CTRL_HTTP_RES_BODY,
|
||||
PXY_CTRL_SUBSCRIBE_ID,
|
||||
__SCAN_TABLE_MAX
|
||||
};
|
||||
|
||||
@@ -69,6 +70,8 @@ enum pangu_http_stat
|
||||
struct pangu_rt
|
||||
{
|
||||
Maat_feather_t maat;
|
||||
Maat_feather_t dyn_maat;
|
||||
int subscribe_id_table_id;
|
||||
struct pangu_logger * send_logger;
|
||||
void * local_logger;
|
||||
int log_level;
|
||||
@@ -326,7 +329,53 @@ void trusted_CA_update_finish_cb(void* u_para)
|
||||
}
|
||||
}
|
||||
}
|
||||
static int get_column_pos(const char* line, int column_seq, size_t *offset, size_t *len)
|
||||
{
|
||||
const char* seps=" \t";
|
||||
char* saveptr=NULL, *subtoken=NULL, *str=NULL;
|
||||
char* dup_line=tfe_strdup(line);
|
||||
int i=0, ret=-1;
|
||||
for (str = dup_line; ; str = NULL)
|
||||
{
|
||||
subtoken = strtok_r(str, seps, &saveptr);
|
||||
if (subtoken == NULL)
|
||||
break;
|
||||
if(i==column_seq-1)
|
||||
{
|
||||
*offset=subtoken-dup_line;
|
||||
*len=strlen(subtoken);
|
||||
ret=0;
|
||||
break;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
free(dup_line);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void subscribe_id_dup_cb(int table_id, MAAT_PLUGIN_EX_DATA* to, MAAT_PLUGIN_EX_DATA* from, long argl, void* argp)
|
||||
{
|
||||
*to = tfe_strdup((char*)*from);
|
||||
return;
|
||||
}
|
||||
void subscribe_id_new_cb(int table_id, const char* key, const char* table_line, MAAT_PLUGIN_EX_DATA* ad, long argl, void* argp)
|
||||
{
|
||||
int ret=0;
|
||||
size_t subscribe_id_offset, len;
|
||||
ret=get_column_pos(table_line, 7, &subscribe_id_offset, &len);
|
||||
if(ret<0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
*ad=ALLOC(char, len+1);
|
||||
memcpy(*ad, table_line+subscribe_id_offset, len);
|
||||
return;
|
||||
}
|
||||
void subscribe_id_free_cb(int table_id, MAAT_PLUGIN_EX_DATA* ad, long argl, void* argp)
|
||||
{
|
||||
free(*ad);
|
||||
*ad=NULL;
|
||||
}
|
||||
|
||||
int pangu_http_init(struct tfe_proxy * proxy)
|
||||
{
|
||||
@@ -354,6 +403,7 @@ int pangu_http_init(struct tfe_proxy * proxy)
|
||||
{
|
||||
goto error_out;
|
||||
}
|
||||
|
||||
const char * table_name[__SCAN_TABLE_MAX];
|
||||
table_name[PXY_CTRL_IP] = "PXY_CTRL_IP";
|
||||
table_name[PXY_CTRL_HTTP_URL] = "PXY_CTRL_HTTP_URL";
|
||||
@@ -361,6 +411,7 @@ int pangu_http_init(struct tfe_proxy * proxy)
|
||||
table_name[PXY_CTRL_HTTP_REQ_BODY] = "PXY_CTRL_HTTP_REQ_BODY";
|
||||
table_name[PXY_CTRL_HTTP_RES_HDR] = "PXY_CTRL_HTTP_RES_HDR";
|
||||
table_name[PXY_CTRL_HTTP_RES_BODY] = "PXY_CTRL_HTTP_RES_BODY";
|
||||
table_name[PXY_CTRL_SUBSCRIBE_ID] = "PXY_CTRL_SUBSCRIBE_ID";
|
||||
for (int i = 0; i < __SCAN_TABLE_MAX; i++)
|
||||
{
|
||||
g_pangu_rt->scan_table_id[i] = Maat_table_register(g_pangu_rt->maat, table_name[i]);
|
||||
@@ -370,6 +421,49 @@ int pangu_http_init(struct tfe_proxy * proxy)
|
||||
goto error_out;
|
||||
}
|
||||
}
|
||||
table_id=Maat_table_register(g_pangu_rt->maat, "PXY_OBJ_TRUSTED_CA_CERT");
|
||||
if(table_id<0)
|
||||
{
|
||||
TFE_LOG_INFO(NULL, "Pangu HTTP register table PXY_OBJ_TRUSTED_CA_CERT failed.");
|
||||
goto error_out;
|
||||
}
|
||||
Maat_table_callback_register(g_pangu_rt->maat, table_id,
|
||||
trusted_CA_update_start_cb,
|
||||
trusted_CA_update_cert_cb,
|
||||
trusted_CA_update_finish_cb,
|
||||
g_pangu_rt);
|
||||
|
||||
table_id=Maat_table_register(g_pangu_rt->maat, "PXY_OBJ_TRUSTED_CA_CRL");
|
||||
if(table_id<0)
|
||||
{
|
||||
TFE_LOG_INFO(NULL, "Pangu HTTP register table PXY_OBJ_TRUSTED_CA_CRL failed.");
|
||||
goto error_out;
|
||||
}
|
||||
Maat_table_callback_register(g_pangu_rt->maat, table_id,
|
||||
trusted_CA_update_start_cb,
|
||||
trusted_CA_update_crl_cb,
|
||||
trusted_CA_update_finish_cb,
|
||||
g_pangu_rt);
|
||||
|
||||
g_pangu_rt->dyn_maat = create_maat_feather(profile, "DYNAMIC_MAAT", g_pangu_rt->thread_num, g_pangu_rt->local_logger);
|
||||
if (!g_pangu_rt->maat)
|
||||
{
|
||||
goto error_out;
|
||||
}
|
||||
g_pangu_rt->subscribe_id_table_id=Maat_table_register(g_pangu_rt->dyn_maat, "IPD_DYN_SUBSCIBE_IP");
|
||||
temp=Maat_plugin_EX_register(g_pangu_rt->dyn_maat,
|
||||
g_pangu_rt->subscribe_id_table_id,
|
||||
subscribe_id_new_cb,
|
||||
subscribe_id_free_cb,
|
||||
subscribe_id_dup_cb,
|
||||
NULL,
|
||||
0,
|
||||
NULL);
|
||||
if(temp!=0)
|
||||
{
|
||||
TFE_LOG_ERROR(NULL, "Pangu HTTP Dynamic Maat IPD_DYN_SUBSCIBE_IP EX data register failed.");
|
||||
goto error_out;
|
||||
}
|
||||
|
||||
char page_path[256];
|
||||
memset(page_path, 0, sizeof(page_path));
|
||||
@@ -400,30 +494,7 @@ int pangu_http_init(struct tfe_proxy * proxy)
|
||||
}
|
||||
TFE_LOG_INFO(NULL, "Tango Cache Enabled.");
|
||||
}
|
||||
table_id=Maat_table_register(g_pangu_rt->maat, "PXY_OBJ_TRUSTED_CA_CERT");
|
||||
if(table_id<0)
|
||||
{
|
||||
TFE_LOG_INFO(NULL, "Pangu HTTP register table PXY_OBJ_TRUSTED_CA_CERT failed.");
|
||||
goto error_out;
|
||||
}
|
||||
Maat_table_callback_register(g_pangu_rt->maat, table_id,
|
||||
trusted_CA_update_start_cb,
|
||||
trusted_CA_update_cert_cb,
|
||||
trusted_CA_update_finish_cb,
|
||||
g_pangu_rt);
|
||||
|
||||
table_id=Maat_table_register(g_pangu_rt->maat, "PXY_OBJ_TRUSTED_CA_CRL");
|
||||
if(table_id<0)
|
||||
{
|
||||
TFE_LOG_INFO(NULL, "Pangu HTTP register table PXY_OBJ_TRUSTED_CA_CRL failed.");
|
||||
goto error_out;
|
||||
}
|
||||
Maat_table_callback_register(g_pangu_rt->maat, table_id,
|
||||
trusted_CA_update_start_cb,
|
||||
trusted_CA_update_crl_cb,
|
||||
trusted_CA_update_finish_cb,
|
||||
g_pangu_rt);
|
||||
|
||||
TFE_LOG_INFO(NULL, "Pangu HTTP init success.");
|
||||
return 0;
|
||||
|
||||
@@ -1315,13 +1386,47 @@ void pangu_on_http_begin(const struct tfe_stream * stream,
|
||||
struct pangu_http_ctx * ctx = *(struct pangu_http_ctx **) pme;
|
||||
struct Maat_rule_t result[MAX_SCAN_RESULT];
|
||||
struct ipaddr sapp_addr;
|
||||
int hit_cnt = 0;
|
||||
int hit_cnt = 0, scan_ret=0;
|
||||
UNUSED int tmp=0;
|
||||
assert(ctx == NULL);
|
||||
ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_SESSION]));
|
||||
ctx = pangu_http_ctx_new(thread_id);
|
||||
char* addr_string=tfe_stream_addr_to_str(stream->addr);
|
||||
const char* sip=NULL, *dip=NULL;
|
||||
char* source_subscribe_id=NULL, *dest_subscribe_id=NULL;
|
||||
|
||||
tmp=tfe_stream_addr_str_split(addr_string, &sip, NULL, &dip, NULL);
|
||||
assert(tmp==0);
|
||||
source_subscribe_id=(char*)Maat_plugin_get_EX_data(g_pangu_rt->dyn_maat, g_pangu_rt->subscribe_id_table_id, sip);
|
||||
dest_subscribe_id=(char*)Maat_plugin_get_EX_data(g_pangu_rt->dyn_maat, g_pangu_rt->subscribe_id_table_id, dip);
|
||||
|
||||
if(source_subscribe_id!=NULL)
|
||||
{
|
||||
scan_ret = Maat_full_scan_string(g_pangu_rt->maat, g_pangu_rt->scan_table_id[PXY_CTRL_SUBSCRIBE_ID],
|
||||
CHARSET_UTF8, sip, strlen(sip),
|
||||
result+hit_cnt, NULL, MAX_SCAN_RESULT-hit_cnt,
|
||||
&(ctx->scan_mid), (int) thread_id);
|
||||
if(scan_ret>0)
|
||||
{
|
||||
hit_cnt+=scan_ret;
|
||||
}
|
||||
}
|
||||
if(dest_subscribe_id!=NULL)
|
||||
{
|
||||
scan_ret = Maat_full_scan_string(g_pangu_rt->maat, g_pangu_rt->scan_table_id[PXY_CTRL_SUBSCRIBE_ID],
|
||||
CHARSET_UTF8, dip, strlen(dip),
|
||||
result+hit_cnt, NULL, MAX_SCAN_RESULT-hit_cnt,
|
||||
&(ctx->scan_mid), (int) thread_id);
|
||||
if(scan_ret>0)
|
||||
{
|
||||
hit_cnt+=scan_ret;
|
||||
}
|
||||
}
|
||||
|
||||
addr_tfe2sapp(stream->addr, &sapp_addr);
|
||||
hit_cnt = Maat_scan_proto_addr(g_pangu_rt->maat, g_pangu_rt->scan_table_id[PXY_CTRL_IP], &sapp_addr, 0,
|
||||
result, MAX_SCAN_RESULT, &(ctx->scan_mid), (int) thread_id);
|
||||
hit_cnt += Maat_scan_proto_addr(g_pangu_rt->maat, g_pangu_rt->scan_table_id[PXY_CTRL_IP], &sapp_addr, 0,
|
||||
result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, &(ctx->scan_mid), (int) thread_id);
|
||||
|
||||
|
||||
if (hit_cnt > 0)
|
||||
{
|
||||
@@ -1333,6 +1438,9 @@ void pangu_on_http_begin(const struct tfe_stream * stream,
|
||||
}
|
||||
|
||||
*pme = ctx;
|
||||
free(addr_string);
|
||||
free(source_subscribe_id);
|
||||
free(dest_subscribe_id);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
19
resource/pangu/dynamic_maat_table_info.conf
Normal file
19
resource/pangu/dynamic_maat_table_info.conf
Normal file
@@ -0,0 +1,19 @@
|
||||
#each collumn seperate with '\t'
|
||||
#id (0~65535)
|
||||
#name string
|
||||
#type one of ip,expr,expr_plus,digest,intval,compile or plugin
|
||||
#src_charset one of GBK,BIG5,UNICODE,UTF8
|
||||
#dst_charset combined by GBK,BIG5,UNICODE,UTF8,seperate with '/'
|
||||
#do_merege [yes/no]
|
||||
#cross cache [number]
|
||||
#quick mode [quickon/quickoff], default [quickoff]
|
||||
#For ip/intval/digest/compile/group
|
||||
#id name type
|
||||
#
|
||||
#For plugin table
|
||||
#id name type json_descr
|
||||
#
|
||||
#For expr/expr_plus Table
|
||||
#id name type src_charset dst_charset do_merge cross_cache quick_mode
|
||||
|
||||
1 IPD_DYN_SUBSCIBE_IP plugin {"key":4,"valid":9}
|
||||
@@ -23,9 +23,10 @@
|
||||
5 PXY_CTRL_HTTP_REQ_BODY expr UTF8 GBK/UNICODE/UTF8 yes 128 quickoff
|
||||
6 PXY_CTRL_HTTP_RES_HDR expr_plus UTF8 UTF8 UTF8 yes 0 quickoff
|
||||
7 PXY_CTRL_HTTP_RES_BODY expr UTF8 GBK/UNICODE/UTF8 yes 128 quickoff
|
||||
8 PXY_CACHE_COMPILE compile escape --
|
||||
9 PXY_CACHE_GROUP group --
|
||||
10 PXY_CACHE_HTTP_URL expr UTF8 UTF8 yes 0 quickoff
|
||||
11 PXY_CACHE_HTTP_COOKIE expr UTF8 UTF8 yes 0 quickoff
|
||||
12 PXY_OBJ_TRUSTED_CA_CERT plugin {"valid":4,"foreign":"3"}
|
||||
13 PXY_OBJ_TRUSTED_CA_CRL plugin {"valid":4,"foreign":"3"}
|
||||
8 PXY_CTRL_SUBSCRIBE_ID expr UTF8 UTF8 yes 0 quickon
|
||||
9 PXY_CACHE_COMPILE compile escape --
|
||||
10 PXY_CACHE_GROUP group --
|
||||
11 PXY_CACHE_HTTP_URL expr UTF8 UTF8 yes 0 quickoff
|
||||
12 PXY_CACHE_HTTP_COOKIE expr UTF8 UTF8 yes 0 quickoff
|
||||
13 PXY_OBJ_TRUSTED_CA_CERT plugin {"valid":4,"foreign":"3"}
|
||||
14 PXY_OBJ_TRUSTED_CA_CRL plugin {"valid":4,"foreign":"3"}
|
||||
Reference in New Issue
Block a user