#include #include #include #include #include #include #include "pangu_logger.h" struct json_spec { const char *log_filed_name; enum tfe_http_std_field field_id; }; struct pangu_logger { int entry_id; unsigned int en_sendlog; const char *device_id; const char *data_center; void* local_logger; unsigned long long send_cnt; unsigned long long random_drop; unsigned long long user_abort; char local_log_path[TFE_STRING_MAX]; tfe_kafka_logger_t *kafka_logger; struct cache_evbase_instance * log_file_upload_instance; }; enum _log_action //Bigger action number is prior. { LG_ACTION_NONE = 0x00, LG_ACTION_MONIT = 0x01, LG_ACTION_FORWARD = 0x02, /* N/A */ LG_ACTION_REJECT = 0x10, LG_ACTION_DROP = 0x20, /* N/A */ LG_ACTION_MANIPULATE = 0x30, LG_ACTION_RATELIMIT = 0x40, /* N/A */ LG_ACTION_LOOP = 0x60, /* N/A */ LG_ACTION_WHITELIST = 0x80, __LG_ACTION_MAX }; struct pangu_logger* pangu_log_handle_create(const char* profile, const char* section, void* local_logger) { struct tango_cache_parameter *log_file_upload_para=NULL; struct pangu_logger* instance=ALLOC(struct pangu_logger,1); instance->local_logger=local_logger; TFE_LOG_INFO(local_logger,"Pangu log is inititating from %s section %s.", profile, section); MESA_load_profile_int_def(profile, section, "ENTRANCE_ID",&(instance->entry_id),0); MESA_load_profile_uint_def(profile, section, "en_sendlog", &instance->en_sendlog, 1); TFE_LOG_INFO(local_logger, "Pangu sendlog : %s", instance->en_sendlog ? "ENABLE" : "DISABLE"); if (!instance->en_sendlog) { return instance; } instance->device_id = (const char *)tfe_bussiness_resouce_get(DEVICE_ID); instance->data_center = (const char *)tfe_bussiness_resouce_get(DATA_CENTER); instance->kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER); if (instance->kafka_logger && !instance->kafka_logger->enable) { TFE_LOG_ERROR(local_logger, "Pangu sendlog ENABLE, but tfe kafka logger DISABLED."); goto error_out; } log_file_upload_para=cache_evbase_parameter_new(profile, section, local_logger); if (log_file_upload_para == NULL) { TFE_LOG_ERROR(local_logger, "Pangu failed to new cache evbase parameter."); goto error_out; } instance->log_file_upload_instance=cache_evbase_instance_new(log_file_upload_para, local_logger); return instance; error_out: free(instance); return NULL; } int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg) { const struct tfe_http_session* http=log_msg->http; const struct tfe_stream_addr* addr=log_msg->stream->addr; const char* tmp_val=NULL; cJSON *common_obj=NULL, *per_hit_obj=NULL; char* log_payload=NULL; int kafka_status=0; int send_cnt=0; int tmp=0; time_t cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; const char *app_proto[]= {"unkonw","http1", "http2"}; const char *manipulate_action_map[]= {"redirect","block","replace","hijack","insert"}; const char *panggu_action_map[__LG_ACTION_MAX]; panggu_action_map[LG_ACTION_MONIT]="monitor"; panggu_action_map[LG_ACTION_REJECT]="deny"; panggu_action_map[LG_ACTION_WHITELIST]="allow"; struct json_spec req_fields[]={ {"http_cookie", TFE_HTTP_COOKIE}, {"http_referer", TFE_HTTP_REFERER}, {"http_user_agent", TFE_HTTP_USER_AGENT} }; struct json_spec resp_fields[]={ {"http_content_type", TFE_HTTP_CONT_TYPE}, {"http_content_length", TFE_HTTP_CONT_LENGTH}, {"http_set_cookie", TFE_HTTP_SET_COOKIE}}; if (!handle->en_sendlog) { return 0; } common_obj=cJSON_CreateObject(); cur_time = time(NULL); cJSON_AddNumberToObject(common_obj, "common_start_time", cur_time); cJSON_AddNumberToObject(common_obj, "common_end_time", cur_time); cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); cJSON_AddStringToObject(common_obj, "common_schema_type", "HTTP"); unsigned int common_direction=0; char opt_val[24] = { 0 }; uint16_t opt_out_size; struct tfe_cmsg * cmsg = tfe_stream_get0_cmsg(log_msg->stream); if (cmsg!=NULL) { int ret=tfe_cmsg_get_value(cmsg, TFE_CMSG_STREAM_TRACE_ID, (unsigned char *) opt_val, sizeof(opt_val), &opt_out_size); if (ret==0) { cJSON_AddNumberToObject(common_obj, "common_stream_trace_id", atoll(opt_val)); } ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_COMMON_DIRECTION, (unsigned char *)&common_direction, sizeof(common_direction), &opt_out_size); if (ret==0) { cJSON_AddNumberToObject(common_obj, "common_direction", common_direction); //0:域内->域外,1:域外->域内,描述的是CLIENT_IP信息 } } if (http->req) { char *request_line=NULL; struct tfe_http_req_spec req_spec=http->req->req_spec; asprintf(&request_line, "%s %s HTTP/%d.%d", http_std_method_to_string(req_spec.method), req_spec.url, http->major_version, http->minor_version); cJSON_AddStringToObject(common_obj, "http_request_line", request_line); free(request_line); } if (http->resp) { char *response_line=NULL; struct tfe_http_resp_spec resp_spec=http->resp->resp_spec; asprintf(&response_line, "HTTP/%d.%d %d OK", http->major_version, http->minor_version, resp_spec.resp_code); cJSON_AddStringToObject(common_obj, "http_response_line", response_line); free(response_line); } switch(addr->addrtype) { case TFE_ADDR_STREAM_TUPLE4_V4: cJSON_AddNumberToObject(common_obj, "common_address_type", 4); inet_ntop(AF_INET, &addr->tuple4_v4->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET, &addr->tuple4_v4->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "common_client_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "common_server_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "common_client_port", ntohs(addr->tuple4_v4->source)); cJSON_AddNumberToObject(common_obj, "common_server_port", ntohs(addr->tuple4_v4->dest)); cJSON_AddStringToObject(common_obj, "common_l4_protocol", "IPv4_TCP"); break; case TFE_ADDR_STREAM_TUPLE4_V6: cJSON_AddNumberToObject(common_obj, "common_address_type", 6); inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "common_client_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "common_server_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "common_client_port", ntohs(addr->tuple4_v6->source)); cJSON_AddNumberToObject(common_obj, "common_server_port", ntohs(addr->tuple4_v6->dest)); cJSON_AddStringToObject(common_obj, "common_l4_protocol", "IPv6_TCP"); break; default: break; } size_t c2s_byte_num = 0, s2c_byte_num =0; tfe_stream_info_get(log_msg->stream, INFO_FROM_DOWNSTREAM_RX_OFFSET, &c2s_byte_num, sizeof(c2s_byte_num)); tfe_stream_info_get(log_msg->stream, INFO_FROM_UPSTREAM_RX_OFFSET, &s2c_byte_num, sizeof(s2c_byte_num)); cJSON_AddNumberToObject(common_obj, "common_link_id", 0); cJSON_AddNumberToObject(common_obj, "common_stream_dir", 3); //1:c2s, 2:s2c, 3:double cJSON_AddStringToObject(common_obj, "common_sled_ip", handle->kafka_logger->local_ip_str); cJSON_AddNumberToObject(common_obj, "common_entrance_id", handle->entry_id); cJSON_AddStringToObject(common_obj, "common_device_id", handle->device_id); cJSON_AddNumberToObject(common_obj, "common_c2s_byte_num", c2s_byte_num); cJSON_AddNumberToObject(common_obj, "common_s2c_byte_num", s2c_byte_num); cJSON_AddStringToObject(common_obj, "http_url", http->req->req_spec.url); cJSON_AddStringToObject(common_obj, "http_host", http->req->req_spec.host); if(handle->data_center) { cJSON_AddStringToObject(common_obj, "common_data_center", handle->data_center); } for(size_t i=0;ireq, req_fields[i].field_id); if(tmp_val!=NULL) { cJSON_AddStringToObject(common_obj,req_fields[i].log_filed_name, tmp_val); } } for(size_t i=0;iresp!=NULL;i++) { tmp_val=tfe_http_std_field_read(http->resp, resp_fields[i].field_id); if(tmp_val!=NULL) { cJSON_AddStringToObject(common_obj,resp_fields[i].log_filed_name, tmp_val); } } char log_file_upload_path[TFE_STRING_MAX]={0}, cont_type_whole[TFE_STRING_MAX]={0}; struct tango_cache_meta_put meta; char* log_file_key=NULL;; const char* cont_type_val; if(log_msg->req_body!=NULL && log_msg->result[0].do_log==1) { memset(&meta, 0, sizeof(meta)); asprintf(&log_file_key, "%s.reqbody", http->req->req_spec.url); meta.url=log_file_key; cont_type_val=tfe_http_std_field_read(http->req, TFE_HTTP_CONT_TYPE); if(cont_type_val!=NULL) { snprintf(cont_type_whole, sizeof(cont_type_whole), "Content-Type:%s", cont_type_val); meta.std_hdr[0]=cont_type_whole; } tmp=cache_evbase_upload_once_evbuf(handle->log_file_upload_instance, NULL, log_msg->req_body, &meta, log_file_upload_path, sizeof(log_file_upload_path)); if(tmp==0) { cJSON_AddStringToObject(common_obj, "http_request_body", log_file_upload_path); } else { TFE_LOG_ERROR(handle->local_logger, "Upload req_body failed."); } free(log_file_key); } if(log_msg->resp_body!=NULL && log_msg->result[0].do_log==1) { memset(&meta, 0, sizeof(meta)); asprintf(&log_file_key, "%s.respbody", http->req->req_spec.url); meta.url=log_file_key; cont_type_val=tfe_http_std_field_read(http->resp, TFE_HTTP_CONT_TYPE); if(cont_type_val!=NULL) { snprintf(cont_type_whole, sizeof(cont_type_whole), "Content-Type:%s", cont_type_val); meta.std_hdr[0]=cont_type_whole; } tmp=cache_evbase_upload_once_evbuf(handle->log_file_upload_instance, NULL, log_msg->resp_body, &meta, log_file_upload_path, sizeof(log_file_upload_path)); if(tmp==0) { cJSON_AddStringToObject(common_obj, "http_response_body", log_file_upload_path); } else { TFE_LOG_ERROR(handle->local_logger, "Upload resp_body failed."); } free(log_file_key); } for(size_t i=0; iresult_num; i++) { TFE_LOG_DEBUG(handle->local_logger, "URL: %s, policy_id: %d, service: %d, do_log:%d", http->req->req_spec.url, log_msg->result[i].config_id, log_msg->result[i].service_id, log_msg->result[i].do_log); if(log_msg->result[i].do_log==0) { continue; } per_hit_obj=cJSON_Duplicate(common_obj, 1); cJSON_AddNumberToObject(per_hit_obj, "common_policy_id", log_msg->result[i].config_id); cJSON_AddNumberToObject(per_hit_obj, "common_service", log_msg->result[i].service_id); cJSON_AddNumberToObject(per_hit_obj, "common_action", LG_ACTION_MANIPULATE); if(log_msg->result[i].action == LG_ACTION_MANIPULATE) { cJSON_AddStringToObject(per_hit_obj, "common_sub_action", manipulate_action_map[log_msg->action]); cJSON_AddNumberToObject(per_hit_obj, "http_action_file_size", log_msg->inject_sz); } else { cJSON_AddStringToObject(per_hit_obj, "common_sub_action", panggu_action_map[(unsigned char)(log_msg->result[i].action)]); } if(log_msg->location_client) { cJSON_AddStringToObject(per_hit_obj, "common_client_location", log_msg->location_client); } if(log_msg->location_server) { cJSON_AddStringToObject(per_hit_obj, "common_server_location", log_msg->location_server); } log_payload = cJSON_PrintUnformatted(per_hit_obj); TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload)); free(log_payload); cJSON_Delete(per_hit_obj); if(kafka_status<0) { TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); } send_cnt++; } cJSON_Delete(common_obj); return send_cnt; }