#include #include #include #include #include #include #include "kafka.h" #include "mpack.h" #include "tsg_proxy_logger.h" #include "tfe_scan.h" struct json_spec { const char *log_filed_name; enum tfe_http_std_field field_id; }; struct proxy_logger { int entry_id; unsigned int en_sendlog; void* local_logger; unsigned long long send_cnt; unsigned long long random_drop; unsigned long long user_abort; char local_log_path[TFE_STRING_MAX]; struct cache_evbase_instance * log_file_upload_instance; }; enum _log_action //Bigger action number is prior. { LG_ACTION_NONE = 0x00, LG_ACTION_MONIT = 0x01, LG_ACTION_FORWARD = 0x02, /* N/A */ LG_ACTION_REJECT = 0x10, LG_ACTION_DROP = 0x20, /* N/A */ LG_ACTION_MANIPULATE = 0x30, LG_ACTION_RATELIMIT = 0x40, /* N/A */ LG_ACTION_WHITELIST = 0x60, LG_ACTION_SHUNT = 0x80, __LG_ACTION_MAX }; #define get_time_ms(tv) ((long long)(tv.tv_sec) * 1000 + (long long)(tv.tv_usec) / 1000) #include "uuid_v4.h" UUIDv4::UUIDGenerator uuidGenerator; void get_http_body_uuid(char *uuid) { UUIDv4::UUID uid = uuidGenerator.getUUID(); uid.str(uuid); return; } size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body) { mpack_writer_t writer; char *mpack_data=NULL, *data=NULL; size_t mpack_size=0, datalen=0; mpack_writer_init_growable(&writer, &mpack_data, &mpack_size); mpack_build_map(&writer); mpack_write_cstr(&writer, "uuid"); mpack_write_cstr(&writer, uuid); mpack_write_cstr(&writer, "fileType"); mpack_write_cstr(&writer, "txt"); mpack_write_cstr(&writer, "combineMode"); mpack_write_cstr(&writer, "seek"); mpack_write_cstr(&writer, "offset"); mpack_write_u64(&writer, 0); mpack_write_cstr(&writer, "lastChunkFlag"); mpack_write_u32(&writer, 1); datalen = evbuffer_get_length(http_body); if(datalen > 0) { data = (char *)evbuffer_pullup(http_body, datalen); mpack_write_cstr(&writer, "chunk"); mpack_start_bin(&writer, datalen); mpack_write_bytes(&writer, (const char *)data, datalen); mpack_finish_bin(&writer); } mpack_write_cstr(&writer, "length"); mpack_write_u64(&writer, datalen); mpack_complete_map(&writer); // mpack_init_map mpack_error_t errorno=mpack_writer_destroy(&writer); if(errorno!=mpack_ok) { TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid); } kafka_send2(tfe_get_kafka_handle(), TOPIC_FILE_STREAM, mpack_data, mpack_size); free(mpack_data); mpack_data = NULL; mpack_size = 0; return datalen; } struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) { struct proxy_logger* instance=ALLOC(struct proxy_logger,1); instance->local_logger=local_logger; MESA_load_profile_uint_def(profile, section, "en_sendlog", &instance->en_sendlog, 1); TFE_LOG_INFO(local_logger, "Tsg-Pxy sendlog : %s", instance->en_sendlog ? "ENABLE" : "DISABLE"); return instance; } int tfe_get_format_host(cJSON *common_obj, const char *req_spec_host) { unsigned int port; char *format_host=ALLOC(char, strlen(req_spec_host)+1); sscanf(req_spec_host, "%[^:]:%u", format_host, &port); cJSON_AddStringToObject(common_obj, "http_host", format_host); cJSON_AddStringToObject(common_obj, "server_fqdn", format_host); FREE(&format_host); return 0; } int tfe_get_integer_by_cmsg(cJSON *common_obj, struct tfe_cmsg * cmsg, enum tfe_cmsg_tlv_type type, const char *keyword) { uint16_t opt_out_size = 0; unsigned int integer = 0; int ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&integer, sizeof(integer), &opt_out_size); if(ret == 0 && type == TFE_CMSG_COMMON_DIRECTION) { const char *direction = (integer == 69) ? "Outbound" : "Inbound"; cJSON_AddStringToObject(common_obj, keyword, direction); } if (ret == 0 && type != TFE_CMSG_COMMON_DIRECTION) { cJSON_AddNumberToObject(common_obj, keyword, integer); } return 0; } int tfe_get_string_by_cmsg(cJSON *common_obj, struct tfe_cmsg * cmsg, enum tfe_cmsg_tlv_type type, const char *keyword) { char opt_val[128]={0}; uint16_t opt_out_size = 0; int ret=tfe_cmsg_get_value(cmsg, type, (unsigned char *)opt_val, sizeof(opt_val), &opt_out_size); if (ret == 0 && opt_out_size > 0) { cJSON_AddStringToObject(common_obj, keyword, opt_val); } return 0; } size_t tfe_get_c2s_byte_num(const struct tfe_stream *stream, size_t c2s_byte_num) { size_t rewrite_c2s_byte_num = 0; int ret = tfe_stream_info_get(stream, INFO_FROM_DOWNSTREAM_RX_OFFSET, &rewrite_c2s_byte_num, sizeof(rewrite_c2s_byte_num)); if(ret != 0) { rewrite_c2s_byte_num = c2s_byte_num == 0 ? rewrite_c2s_byte_num : c2s_byte_num; } return rewrite_c2s_byte_num; } size_t tfe_get_s2c_byte_num(const struct tfe_stream *stream, size_t s2c_byte_num) { size_t ret=0, rewrite_s2c_byte_num =0; ret = tfe_stream_info_get(stream, INFO_FROM_UPSTREAM_RX_OFFSET, &rewrite_s2c_byte_num, sizeof(rewrite_s2c_byte_num)); if(ret !=0) { rewrite_s2c_byte_num = s2c_byte_num == 0 ? rewrite_s2c_byte_num : s2c_byte_num; } return rewrite_s2c_byte_num; } int tfe_upload_http_body(struct proxy_logger* handle, cJSON *common_obj, struct evbuffer *http_body, char *uuid, const char *keyword) { size_t datalen=0; if(uuid[0] != '\0') { cJSON_AddStringToObject(common_obj, keyword, uuid); } else { get_http_body_uuid(uuid); datalen=file_bucket_upload_once(handle, uuid, http_body); if(datalen>0) { cJSON_AddStringToObject(common_obj, keyword, uuid); } else { TFE_LOG_ERROR(handle->local_logger, "Upload %s failed.", keyword); } } return 0; } int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) { const struct tfe_http_session* http=log_msg->http; const struct tfe_stream_addr* addr=log_msg->stream->addr; const char* tmp_val=NULL; cJSON *common_obj=NULL, *per_hit_obj=NULL; char* log_payload=NULL; int send_cnt=0; struct timeval cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; const char *app_proto[]= {"unkonw","http1", "http2"}; const char *manipulate_action_map[]= {"redirect","block","replace","hijack","insert","edit_element","run_script"}; const char *panggu_action_map[__LG_ACTION_MAX]; panggu_action_map[LG_ACTION_MONIT]="monitor"; panggu_action_map[LG_ACTION_REJECT]="deny"; panggu_action_map[LG_ACTION_WHITELIST]="allow"; struct json_spec req_fields[]={ {"http_cookie", TFE_HTTP_COOKIE}, {"http_referer", TFE_HTTP_REFERER}, {"http_user_agent", TFE_HTTP_USER_AGENT}, {"http_request_content_type", TFE_HTTP_CONT_TYPE}, {"http_request_content_length", TFE_HTTP_CONT_LENGTH}}; struct json_spec resp_fields[]={ {"http_response_content_type", TFE_HTTP_CONT_TYPE}, {"http_response_content_length", TFE_HTTP_CONT_LENGTH}, {"http_set_cookie", TFE_HTTP_SET_COOKIE}}; if (!handle->en_sendlog) { return 0; } common_obj=cJSON_CreateObject(); gettimeofday(&cur_time, NULL); cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", get_time_ms(http->start_time)); cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", get_time_ms(cur_time)); struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(log_msg->stream); if (cmsg != NULL) { tfe_get_string_by_cmsg(common_obj, cmsg, TFE_CMSG_STREAM_TRACE_ID, "session_id"); tfe_get_string_by_cmsg(common_obj, cmsg, TFE_CMSG_SRC_SUB_ID, "subscriber_id"); tfe_get_string_by_cmsg(common_obj, cmsg, TFE_CMSG_SRC_IMSI_STR, "imsi"); tfe_get_string_by_cmsg(common_obj, cmsg, TFE_CMSG_SRC_IMEI_STR, "imei"); tfe_get_string_by_cmsg(common_obj, cmsg, TFE_CMSG_SRC_PHONE_NUM_STR, "phone_number"); tfe_get_string_by_cmsg(common_obj, cmsg, TFE_CMSG_SRC_APN_STR, "apn"); tfe_get_integer_by_cmsg(common_obj, cmsg, TFE_CMSG_INCOMING_LINK_ID, "in_link_id"); tfe_get_integer_by_cmsg(common_obj, cmsg, TFE_CMSG_OUTGOING_LINK_ID, "out_link_id"); tfe_get_integer_by_cmsg(common_obj, cmsg, TFE_CMSG_COMMON_DIRECTION, "direction"); } tfe_get_library_tags(log_msg->stream, common_obj, TFE_CMSG_SRC_IP_TAGS_IDS_STR, "client_ip_tags"); tfe_get_library_tags(log_msg->stream, common_obj, TFE_CMSG_DST_IP_TAGS_IDS_STR, "server_ip_tags"); tfe_get_library_tags(log_msg->stream, common_obj, TFE_CMSG_FQDN_TAGS_IDS_STR, "server_fqdn_tags"); if (http->req) { char *request_line=NULL; struct tfe_http_req_spec req_spec=http->req->req_spec; asprintf(&request_line, "%s %s HTTP/%d.%d", http_std_method_to_string(req_spec.method), req_spec.url, http->req->major_version, http->req->minor_version); cJSON_AddStringToObject(common_obj, "http_request_line", request_line); free(request_line); } if (http->resp) { char *response_line=NULL; struct tfe_http_resp_spec resp_spec=http->resp->resp_spec; asprintf(&response_line, "HTTP/%d.%d %d OK", http->major_version, http->minor_version, resp_spec.resp_code); cJSON_AddStringToObject(common_obj, "http_response_line", response_line); cJSON_AddNumberToObject(common_obj, "http_status_code", resp_spec.resp_code); free(response_line); } switch(addr->addrtype) { case TFE_ADDR_STREAM_TUPLE4_V4: cJSON_AddNumberToObject(common_obj, "address_type", 4); inet_ntop(AF_INET, &addr->tuple4_v4->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET, &addr->tuple4_v4->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "client_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "server_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "client_port", ntohs(addr->tuple4_v4->source)); cJSON_AddNumberToObject(common_obj, "server_port", ntohs(addr->tuple4_v4->dest)); break; case TFE_ADDR_STREAM_TUPLE4_V6: cJSON_AddNumberToObject(common_obj, "address_type", 6); inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "client_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "server_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "client_port", ntohs(addr->tuple4_v6->source)); cJSON_AddNumberToObject(common_obj, "server_port", ntohs(addr->tuple4_v6->dest)); break; default: break; } cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP"); cJSON_AddStringToObject(common_obj, "ip_protocol", "tcp"); cJSON_AddStringToObject(common_obj, "sled_ip", tfe_get_sled_ip()); cJSON_AddNumberToObject(common_obj, "t_vsys_id", tfe_get_vsys_id()); cJSON_AddStringToObject(common_obj, "device_id", tfe_get_device_id()); cJSON_AddNumberToObject(common_obj, "sent_bytes", tfe_get_c2s_byte_num(log_msg->stream, log_msg->c2s_byte_num)); cJSON_AddNumberToObject(common_obj, "received_bytes", tfe_get_s2c_byte_num(log_msg->stream, log_msg->s2c_byte_num)); cJSON_AddStringToObject(common_obj, "http_url", http->req->req_spec.url); tfe_get_format_host(common_obj, http->req->req_spec.host); if (tfe_get_device_tag()) { cJSON_AddStringToObject(common_obj, "device_tag", tfe_get_device_tag()); } for(size_t i=0;ireq, req_fields[i].field_id); if(tmp_val!=NULL) { cJSON_AddStringToObject(common_obj,req_fields[i].log_filed_name, tmp_val); } } for(size_t i=0;iresp!=NULL;i++) { tmp_val=tfe_http_std_field_read(http->resp, resp_fields[i].field_id); if(tmp_val!=NULL) { cJSON_AddStringToObject(common_obj,resp_fields[i].log_filed_name, tmp_val); } } #define FILE_CHUNK_UUID_LEN 40 char http_req_uuid[FILE_CHUNK_UUID_LEN]={0}; char http_resp_uuid[FILE_CHUNK_UUID_LEN]={0}; for(size_t i=0; iresult_num; i++) { if(log_msg->result[i].do_log!=1) continue; if(log_msg->req_body!=NULL) { tfe_upload_http_body(handle, common_obj, log_msg->req_body, http_req_uuid, "http_request_body"); } if(log_msg->resp_body!=NULL) { tfe_upload_http_body(handle, common_obj, log_msg->resp_body, http_resp_uuid, "http_response_body"); } } int j=0, enable_monit=0; int monit_config_id[16]={0}; for(size_t i=0; iresult_num; i++) { if(log_msg->result[i].action == LG_ACTION_MONIT) { monit_config_id[j]=log_msg->result[i].config_id; j++; } } for(size_t i=0; iresult_num; i++) { TFE_LOG_DEBUG(handle->local_logger, "URL: %s, policy_id: %lld, service: %d, do_log:%d", http->req->req_spec.url, log_msg->result[i].config_id, log_msg->result[i].service_id, log_msg->result[i].do_log); if(log_msg->result[i].do_log==0) { continue; } if(log_msg->result[i].action == LG_ACTION_MONIT && enable_monit==1) { continue; } cJSON *proxy_rule_list=NULL; int config_id[1]={0}; per_hit_obj=cJSON_Duplicate(common_obj, 1); if(log_msg->result[i].action == LG_ACTION_MONIT) { proxy_rule_list = cJSON_CreateIntArray(monit_config_id, j); enable_monit=1; } else { config_id[0]=log_msg->result[i].config_id; proxy_rule_list = cJSON_CreateIntArray(config_id, 1); } cJSON_AddItemToObject(per_hit_obj, "proxy_rule_list", proxy_rule_list); cJSON_AddNumberToObject(per_hit_obj, "vsys_id", log_msg->result[i].vsys_id); if(log_msg->result[i].action == LG_ACTION_MANIPULATE) { cJSON_AddStringToObject(per_hit_obj, "proxy_action", manipulate_action_map[log_msg->action]); cJSON_AddNumberToObject(per_hit_obj, "http_action_file_size", log_msg->inject_sz); } else { cJSON_AddStringToObject(per_hit_obj, "proxy_action", panggu_action_map[(unsigned char)(log_msg->result[i].action)]); } log_payload = cJSON_PrintUnformatted(per_hit_obj); TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); if (kafka_send(tfe_get_kafka_handle(), TOPIC_PROXY_EVENT, log_payload, strlen(log_payload)) == 0) { send_cnt++; } free(log_payload); cJSON_Delete(per_hit_obj); } cJSON_Delete(common_obj); return send_cnt; }