完成pangu http发送业务日志功能开发。

This commit is contained in:
zhengchao
2018-09-15 17:52:06 +08:00
parent 254f3fbbd1
commit d83e978643
3 changed files with 181 additions and 160 deletions

View File

@@ -201,11 +201,11 @@ struct pangu_http_ctx
char* action_para;
scan_status_t mid;
stream_para_t sp;
struct Maat_rule_t* exec_rule;
struct Maat_rule_t* monit_rule;
int monit_num;
char* exec_para;
struct Maat_rule_t* enforce_rules;
size_t n_enforce;
char* enforce_para;
struct replace_ctx *rep_ctx;
int thread_id;
};
@@ -230,11 +230,14 @@ static void pangu_http_ctx_free(struct pangu_http_ctx* ctx)
ctx->rep_ctx->http_body=NULL;
//todo destroy http_half;
assert(ctx->rep_ctx->replacing==NULL);
FREE(&(ctx->rep_ctx));
FREE(&ctx->rep_ctx);
}
FREE(&ctx->enforce_rules);
FREE(&ctx->enforce_para);
Maat_clean_status(&(ctx->mid));
assert(ctx->sp==NULL);
ctx->mid=NULL;
free(ctx);
FREE(&ctx);
}
inline void addr_tfe2sapp(const struct tfe_stream_addr* tfe_addr, struct ipaddr* sapp_addr)
{
@@ -243,41 +246,31 @@ inline void addr_tfe2sapp(const struct tfe_stream_addr* tfe_addr, struct ipaddr*
return;
}
/*
20180909记录
1const char * tfe_http_field_iterate(const struct tfe_http_half * half, void * interator, struct http_field_name * name); void**?
2)http入口函数返回值增加 enum tfe_bussiness_action
3)struct http_field_name中的field_name内存是如何分配到
4http对上层要暴露evbuffer吗
5 uint64_t cont_len;
uint64_t cont_range_from;
uint64_t cont_range_to;
需要保留吗accept_encoding?
6entry函数中缺少thread id;
7) plugin init and deinit cb;
8) add http session open and close cb;
9)enum tfe_bussiness_action pangu_http_entry(const struct tfe_stream * stream, const struct tfe_http_session * session,
uint64_t event, const char* body_frag, size_t frag_size, unsigned int thread_id void ** pme)
*/
static enum pangu_action decide_ctrl_action(const Maat_rule_t* hit_result,int cnt,const Maat_rule_t**enforce_rule)
//enforce_rules[0] contains execute action.
static enum pangu_action decide_ctrl_action(const struct Maat_rule_t* hit_rules,size_t n_hit,
struct Maat_rule_t**enforce_rules, size_t* n_enforce)
{
int i=0;
const Maat_rule_t* tmp_rule=hit_result;
enum pangu_action tmp_action=PG_ACTION_NONE;
for(i=0;i<cnt;i++)
size_t n_monit=0, exist_enforce_num=0,i=0;
const struct Maat_rule_t* prior_rule=hit_rules;
struct Maat_rule_t monit_rule[n_hit];
enum pangu_action prior_action=PG_ACTION_NONE;
for(i=0;i<n_hit;i++)
{
if((enum pangu_action)hit_result[i].action>tmp_action)
if((enum pangu_action)hit_rules[i].action==PG_ACTION_MONIT)
{
tmp_rule=hit_result+i;
tmp_action=(enum pangu_action)hit_result[i].action;
memcpy(monit_rule+n_monit,hit_rules+i, sizeof(struct Maat_rule_t));
n_monit++;
}
else if((enum pangu_action)hit_result[i].action==tmp_action)
if((enum pangu_action)hit_rules[i].action>prior_action)
{
if(hit_result[i].config_id<tmp_rule->config_id)
prior_rule=hit_rules+i;
prior_action=(enum pangu_action)hit_rules[i].action;
}
else if((enum pangu_action)hit_rules[i].action==prior_action)
{
if(hit_rules[i].config_id<prior_rule->config_id)
{
tmp_rule=hit_result+i;
prior_rule=hit_rules+i;
}
}
@@ -286,8 +279,26 @@ static enum pangu_action decide_ctrl_action(const Maat_rule_t* hit_result,int cn
continue;
}
}
*enforce_rule=tmp_rule;
return tmp_action;
if(prior_action==PG_ACTION_WHITELIST)
{
return PG_ACTION_WHITELIST;
}
exist_enforce_num=*n_enforce;
if(prior_action==PG_ACTION_MONIT)
{
*n_enforce+=n_monit;
}
else
{
*n_enforce+=n_monit+1;
}
*enforce_rules=(struct Maat_rule_t*)realloc(*enforce_rules, sizeof(struct Maat_rule_t)*(*n_enforce));
memcpy(*enforce_rules+exist_enforce_num, prior_rule, sizeof(struct Maat_rule_t));
memcpy(*enforce_rules+exist_enforce_num+1, monit_rule, n_monit*sizeof(struct Maat_rule_t));
return prior_action;
}
//https://github.com/AndiDittrich/HttpErrorPages
static void html_generate(const char* enforce_para, char** page_buff,size_t *page_size)
@@ -556,7 +567,7 @@ void http_replace(const struct tfe_stream * stream, const struct tfe_http_sessio
{
ctx->rep_ctx=rep_ctx=ALLOC(struct replace_ctx, 1);
rep_ctx->rule=ALLOC(struct replace_rule, MAX_EDIT_ZONE_NUM);
rep_ctx->n_rule=format_replace_rule(ctx->exec_para, rep_ctx->rule, MAX_EDIT_ZONE_NUM);
rep_ctx->n_rule=format_replace_rule(ctx->enforce_para, rep_ctx->rule, MAX_EDIT_ZONE_NUM);
}
if(events&EV_HTTP_REQ_HDR)
{
@@ -653,17 +664,17 @@ static void http_reject(const struct tfe_http_session * session, uint64_t events
char cont_len_str[TFE_STRING_MAX];
struct tfe_http_session* to_write_sess=NULL;
ret=sscanf(ctx->exec_para,"code=%d;",&resp_code);
ret=sscanf(ctx->enforce_para,"code=%d;",&resp_code);
if(ret!=1)
{
TFE_LOG_ERROR(g_pangu_rt->local_logger, "Invalid reject rule %d paramter %s",
ctx->exec_rule->config_id, ctx->exec_para);
ctx->enforce_rules->config_id, ctx->enforce_para);
goto error_out;
}
to_write_sess=tfe_http_session_allow_write(session);
response=tfe_http_session_response_create(to_write_sess, resp_code);
html_generate(ctx->exec_para, &page_buff, &page_size);
html_generate(ctx->enforce_para, &page_buff, &page_size);
_wrap_std_field_write(response, TFE_HTTP_CONT_TYPE, "text/html; charset=utf-8");
snprintf(cont_len_str,sizeof(cont_len_str), "%lu", page_size);
_wrap_std_field_write(response, TFE_HTTP_CONT_LENGTH, cont_len_str);
@@ -680,12 +691,12 @@ static void http_redirect(const struct tfe_http_session * session, uint64_t even
char* url=NULL;
struct tfe_http_half* response=NULL;
struct tfe_http_session* to_write=NULL;
url=ALLOC(char, ctx->exec_rule->serv_def_len);
ret=sscanf(ctx->exec_para,"code=%d%[^;];url=%*[^;];",&resp_code,url);
url=ALLOC(char, ctx->enforce_rules->serv_def_len);
ret=sscanf(ctx->enforce_para,"code=%d%[^;];url=%*[^;];",&resp_code,url);
if(ret!=2)
{
TFE_LOG_ERROR(g_pangu_rt->local_logger, "Invalid redirect rule %d paramter %s",
ctx->exec_rule->config_id, ctx->exec_para);
ctx->enforce_rules->config_id, ctx->enforce_para);
goto error_out;
}
@@ -707,10 +718,9 @@ enum pangu_action http_scan(const struct tfe_http_session * session, uint64_t ev
const char* field_val=NULL;
struct http_field_name field_name;
struct Maat_rule_t result[MAX_SCAN_RESULT];
const struct Maat_rule_t* choosen=NULL;
char buff[TFE_STRING_MAX], *p=NULL;
int scan_ret=0, hit_cnt=0, table_id=0, read_rule_ret=0;
unsigned int i=0;
int scan_ret=0, table_id=0, read_rule_ret=0;
size_t hit_cnt=0, i=0;
if(events&EV_HTTP_REQ_HDR)
{
scan_ret=Maat_full_scan_string(g_pangu_rt->maat, g_pangu_rt->scan_table_id[PXY_CTRL_HTTP_URL],
@@ -765,26 +775,24 @@ enum pangu_action http_scan(const struct tfe_http_session * session, uint64_t ev
}
if(hit_cnt>0)
{
ctx->action=decide_ctrl_action(result, hit_cnt, &choosen);
ctx->exec_rule=ALLOC(struct Maat_rule_t, 1);
memcpy(ctx->exec_rule, choosen, sizeof(struct Maat_rule_t));
if(ctx->exec_rule->serv_def_len>MAX_SERVICE_DEFINE_LEN)
ctx->action=decide_ctrl_action(result, hit_cnt, &ctx->enforce_rules, &ctx->n_enforce);
if(ctx->enforce_rules[0].serv_def_len>MAX_SERVICE_DEFINE_LEN)
{
ctx->exec_para=ALLOC(char, ctx->exec_rule->serv_def_len);
read_rule_ret=Maat_read_rule(g_pangu_rt->maat, ctx->exec_rule,
MAAT_RULE_SERV_DEFINE, ctx->exec_para, ctx->exec_rule->serv_def_len);
assert(read_rule_ret== ctx->exec_rule->serv_def_len);
ctx->enforce_para=ALLOC(char, ctx->enforce_rules->serv_def_len);
read_rule_ret=Maat_read_rule(g_pangu_rt->maat, ctx->enforce_rules+0,
MAAT_RULE_SERV_DEFINE, ctx->enforce_para, ctx->enforce_rules[0].serv_def_len);
assert(read_rule_ret== ctx->enforce_rules[0].serv_def_len);
}
if(hit_cnt>1)
{
p=buff;
for(i=0;i<(unsigned int)hit_cnt;i++)
for(i=0;i<hit_cnt;i++)
{
p+=snprintf(p, sizeof(buff)-(p-buff), "%d:", result[i].config_id);
}
*p='\0';
TFE_LOG_INFO(g_pangu_rt->local_logger, "Multiple rules matched: url=%s num=%d ids=%s execute=%d.",
session->req->req_spec.url, hit_cnt, buff, ctx->exec_rule->config_id);
TFE_LOG_INFO(g_pangu_rt->local_logger, "Multiple rules matched: url=%s num=%lu ids=%s execute=%d.",
session->req->req_spec.url, hit_cnt, buff, ctx->enforce_rules[0].config_id);
}
}
return ctx->action;
@@ -796,7 +804,6 @@ void pangu_on_http_begin(const struct tfe_stream * stream,
{
struct pangu_http_ctx* ctx=*(struct pangu_http_ctx**)pme;
struct Maat_rule_t result[MAX_SCAN_RESULT];
const struct Maat_rule_t* choosen=NULL;
struct ipaddr sapp_addr;
int hit_cnt=0;
assert(ctx==NULL);
@@ -806,7 +813,7 @@ void pangu_on_http_begin(const struct tfe_stream * stream,
result, MAX_SCAN_RESULT, &(ctx->mid), (int)thread_id);
if(hit_cnt>0)
{
ctx->action=decide_ctrl_action(result, hit_cnt, &choosen);
ctx->action=decide_ctrl_action(result, hit_cnt, &ctx->enforce_rules, &ctx->n_enforce);
}
if(ctx->action==PG_ACTION_WHITELIST)
{
@@ -820,10 +827,10 @@ void pangu_on_http_end(const struct tfe_stream * stream,
{
struct pangu_http_ctx* ctx=*(struct pangu_http_ctx**)pme;
struct pangu_log log_msg={.stream=stream, .http=session, .result=ctx->exec_rule, .result_num=1};
struct pangu_log log_msg={.stream=stream, .http=session, .result=ctx->enforce_rules, .result_num=1};
if(ctx->action!=PG_ACTION_NONE)
{
pangu_log_send(g_pangu_rt->send_logger, &log_msg, NULL, 0);
pangu_log_send(g_pangu_rt->send_logger, &log_msg);
}
pangu_http_ctx_free(ctx);
*pme=NULL;

View File

@@ -12,11 +12,13 @@
#include <unistd.h>
#include <sys/ioctl.h>
#include <net/if.h>
#include <pthread.h>
#include <errno.h>
struct json_spec
{
int json_type;
const char *name;
const char *log_filed_name;
enum tfe_http_std_field field_id;
};
struct pangu_logger
@@ -28,9 +30,8 @@ struct pangu_logger
void* global_logger;
rd_kafka_t *kafka_handle;
rd_kafka_topic_t* kafka_topic;
pthread_mutex_t mutex;
char brokerlist[TFE_STRING_MAX];
struct json_spec opt2json[LOG_OPT_MAX];
const char* topic_name;
void* local_logger;
@@ -74,7 +75,6 @@ error:
static rd_kafka_t * create_kafka_handle(const char* brokerlist)
{
int i = 0;
char kafka_errstr[1024];
rd_kafka_t *handle=NULL;
rd_kafka_conf_t *rdkafka_conf = NULL;
@@ -103,25 +103,11 @@ static rd_kafka_t * create_kafka_handle(const char* brokerlist)
struct pangu_logger* pangu_log_handle_create(const char* profile, const char* section, void* local_logger)
{
int ret=-1,i=0;
char addr_string[TFE_SYMBOL_MAX]={0},local_msg_dir[TFE_STRING_MAX]={0};
int ret=-1;
char nic_name[64]={0};
unsigned int ip_buff[TFE_SYMBOL_MAX];
struct pangu_logger* instance=ALLOC(struct pangu_logger,1);
instance->global_logger=local_logger;
instance->opt2json[LOG_OPT_HTTP_C2S_ISN] = {cJSON_Number,"isn"};
instance->opt2json[LOG_OPT_HTTP_PROXY_FLAG] = {cJSON_Number,"proxy_flag"};
instance->opt2json[LOG_OPT_HTTP_URL] = {cJSON_String,"url"};
instance->opt2json[LOG_OPT_HTTP_COOKIE] = {cJSON_String,"cookie"};
instance->opt2json[LOG_OPT_HTTP_REFERER] = {cJSON_String,"referer"};
instance->opt2json[LOG_OPT_HTTP_UA] = {cJSON_String,"user_agent"};
instance->opt2json[LOG_OPT_HTTP_REQ_LINE] = {cJSON_String,"req_line"};
instance->opt2json[LOG_OPT_HTTP_RES_LINE] = {cJSON_String,"res_line"};
instance->opt2json[LOG_OPT_HTTP_SET_COOKIE] = {cJSON_String,"set_cookie"};
instance->opt2json[LOG_OPT_HTTP_CONTENT_TYPE] = {cJSON_String,"content_type"};
instance->opt2json[LOG_OPT_HTTP_CONTENT_LEN] = {cJSON_String,"content_len"};
instance->local_logger=local_logger;
TFE_LOG_ERROR(local_logger,"Pangu log is inititating from %s section %s.", profile, section);
@@ -150,16 +136,116 @@ struct pangu_logger* pangu_log_handle_create(const char* profile, const char* s
}
instance->topic_name="PXY_HTTP_LOG";
instance->kafka_topic = rd_kafka_topic_new(instance->kafka_handle,instance->topic_name, NULL);
pthread_mutex_init(&(instance->mutex), NULL);
return instance;
error_out:
free(instance);
return NULL;
}
int pangu_send_log(struct pangu_logger* logger, const struct pangu_log* log_msg,struct opt_unit* log_opt,int opt_num)
static const char * _wrap_std_field_read(struct tfe_http_half * half, enum tfe_http_std_field field_id)
{
struct http_field_name tmp_name;
tmp_name.field_id=field_id;
tmp_name.field_name=NULL;
return tfe_http_field_read(half, &tmp_name);
}
int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg)
{
const struct tfe_http_session* http=log_msg->http;
const struct tfe_stream_addr* addr=log_msg->stream->addr;
const char* tmp_val=NULL;
cJSON *common_obj=NULL, *per_hit_obj=NULL;
char* log_payload=NULL;
int kafka_status=0;
time_t cur_time;
char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0};
char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0};
struct json_spec req_fields[]={ {"cookie", TFE_HTTP_COOKIE},
{"referer", TFE_HTTP_REFERER},
{"user_agent", TFE_HTTP_USER_AGENT} };
struct json_spec resp_fields[]={ {"content_type", TFE_HTTP_CONT_TYPE},
{"content_len", TFE_HTTP_CONT_LENGTH} };
common_obj=cJSON_CreateObject();
cur_time = time(NULL);
cJSON_AddNumberToObject(common_obj, "found_time", cur_time);
cJSON_AddNumberToObject(common_obj, "recv_time", cur_time);
switch(addr->addrtype)
{
case TFE_ADDR_IPV4:
cJSON_AddNumberToObject(common_obj, "addr_type", 4);
inet_ntop(AF_INET, &addr->ipv4->saddr, src_ip_str, sizeof(src_ip_str));
inet_ntop(AF_INET, &addr->ipv4->daddr, dst_ip_str, sizeof(dst_ip_str));
cJSON_AddStringToObject(common_obj, "s_ip", src_ip_str);
cJSON_AddStringToObject(common_obj, "d_ip", dst_ip_str);
cJSON_AddNumberToObject(common_obj, "s_port", ntohs(addr->ipv4->source));
cJSON_AddNumberToObject(common_obj, "d_port", ntohs(addr->ipv4->dest));
cJSON_AddStringToObject(common_obj, "trans_proto", "IPv4_TCP");
break;
case TFE_ADDR_IPV6:
cJSON_AddNumberToObject(common_obj, "addr_type", 6);
inet_ntop(AF_INET6, &addr->ipv6->saddr, src_ip_str, sizeof(src_ip_str));
inet_ntop(AF_INET6, &addr->ipv6->daddr, dst_ip_str, sizeof(dst_ip_str));
cJSON_AddStringToObject(common_obj, "s_ip", src_ip_str);
cJSON_AddStringToObject(common_obj, "d_ip", dst_ip_str);
cJSON_AddNumberToObject(common_obj, "s_port", ntohs(addr->ipv6->source));
cJSON_AddNumberToObject(common_obj, "d_port", ntohs(addr->ipv6->dest));
cJSON_AddStringToObject(common_obj, "trans_proto", "IPv6_TCP");
break;
default:
break;
}
cJSON_AddNumberToObject(common_obj, "direction", 0); //0域内->域外1域外->域内描述的是CLIENT_IP信息
cJSON_AddNumberToObject(common_obj, "stream_dir", 3); //1:c2s, 2:s2c, 3:double
cJSON_AddStringToObject(common_obj, "cap_ip", handle->local_ip_str);
cJSON_AddNumberToObject(common_obj, "entrance_id", handle->entry_id);
cJSON_AddNumberToObject(common_obj, "device_id", 0);
cJSON_AddStringToObject(common_obj, "user_region", "null");
cJSON_AddStringToObject(common_obj, "url", http->req->req_spec.url);
for(size_t i=0;i<sizeof(req_fields)/sizeof(struct json_spec);i++)
{
tmp_val=_wrap_std_field_read(http->req, req_fields[i].field_id);
if(tmp_val!=NULL)
{
cJSON_AddStringToObject(common_obj,req_fields[i].log_filed_name, tmp_val);
}
}
for(size_t i=0;i<sizeof(resp_fields)/sizeof(struct json_spec);i++)
{
tmp_val=_wrap_std_field_read(http->req, resp_fields[i].field_id);
if(tmp_val!=NULL)
{
cJSON_AddStringToObject(common_obj,resp_fields[i].log_filed_name, tmp_val);
}
}
for(int i=0; i<log_msg->result_num; i++)
{
if(log_msg->result[i].do_log==0)
{
continue;
}
per_hit_obj=cJSON_Duplicate(common_obj, 1);
cJSON_AddNumberToObject(per_hit_obj, "cfg_id", log_msg->result[i].config_id);
cJSON_AddNumberToObject(per_hit_obj, "service", log_msg->result[i].service_id);
log_payload = cJSON_Print(per_hit_obj);
kafka_status = rd_kafka_produce(handle->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
log_payload, strlen(log_payload), NULL, 0, NULL);
free(log_payload);
cJSON_free(per_hit_obj);
if(kafka_status<0)
{
TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error()));
}
}
cJSON_free(common_obj);
return 0;
}

View File

@@ -4,77 +4,6 @@
#include <tfe_http.h>
#include <MESA/Maat_rule.h>
enum pangu_log_opt
{
//Shared log options
LOG_OPT_SCENE_FILE=1, //IP pcap/Mail content
LOG_OPT_STREAM_INFO, // data is a struct stream_info *, size =8
LOG_OPT_MAAT_RULE, //duplicate option is allowed.
//Following are options for the respective protocol
LOG_OPT_HTTP_REQ_LINE,
LOG_OPT_HTTP_REQ_HDR,
LOG_OPT_HTTP_REQ_BODY,
LOG_OPT_HTTP_RES_LINE,
LOG_OPT_HTTP_RES_HDR,
LOG_OPT_HTTP_RES_BODY,
LOG_OPT_HTTP_URL,
LOG_OPT_HTTP_C2S_ISN, //size=4
LOG_OPT_HTTP_PROXY_FLAG, //size=4 ,0 or 1
LOG_OPT_HTTP_SEQ, //size=4
LOG_OPT_HTTP_COOKIE,
LOG_OPT_HTTP_REFERER,
LOG_OPT_HTTP_UA,
LOG_OPT_HTTP_SET_COOKIE,
LOG_OPT_HTTP_CONTENT_LEN,
LOG_OPT_HTTP_CONTENT_TYPE,
LOG_OPT_HTTP_USER_DEFINE, //key:value+ '\0' ,e.g. "Server:nginx"
LOG_OPT_MAIL_PROTO,//string:"pop3","smtp" or "imap4"
LOG_OPT_MAIL_FROM,
LOG_OPT_MAIL_TO,
LOG_OPT_MAIL_SUBJECT,
LOG_OPT_MAIL_EML,
LOG_OPT_DNS_RD, //Shared with FD and JC
LOG_OPT_DNS_QTYPE, //Shared with FD and JC
LOG_OPT_DNS_QCLASS, //Shared with FD and JC
LOG_OPT_DNS_OPCODE, //Shared with FD and JC
LOG_OPT_DNS_QNAME, //Shared with FD and JC
LOG_OPT_DNS_CHEAT_TYPE, //Only in FD
LOG_OPT_DNS_CHEAT_RCODE, //Only in FD
LOG_OPT_DNS_CHEAT_STRATEGY, //Only in FD
LOG_OPT_DNS_CHEAT_RECORD, //Only in FD
LOG_OPT_DNS_CHEAT_TTL, //Only in FD
LOG_OPT_DNS_QR, //Only in JC
LOG_OPT_DNS_RA, //Only in JC
LOG_OPT_DNS_RR, //Only in JC
LOG_OPT_DNS_TTL, //Only in JC
LOG_OPT_DNS_DNS_SUB, //Only in JC, size=sizeof(int) 0-DNS,1-DNSSEC
LOG_OPT_FTP_URL,
LOG_OPT_MAX
};
typedef enum _soq_action
{
SOQ_ACTION_BLOCK,
SOQ_ACTION_MONITOR,
SOQ_ACTION_CONTINUE,
SOQ_ACTION_ABORT
}soq_action_t;
struct opt_unit
{
enum pangu_log_opt opt_type;
int opt_len;
const void* opt_value;
};
struct pangu_log
{
const struct tfe_stream *stream;
@@ -82,11 +11,10 @@ struct pangu_log
const Maat_rule_t*result;
int result_num;
};
struct pangu_logger* logger;
struct pangu_logger;
struct pangu_logger* pangu_log_handle_create(const char* profile, const char* section, void* local_logger);
//return 0 if SUCCESS, otherwise return -1
int pangu_log_send(struct pangu_logger* logger, const pangu_log* log_msg, struct opt_unit* log_opt, int opt_num);
int pangu_log_send(struct pangu_logger* logger, const pangu_log* log_msg);