#include #include #include #include #include #include #include "kafka.h" #include "mpack.h" #include "tsg_proxy_logger.h" struct json_spec { const char *log_filed_name; enum tfe_http_std_field field_id; }; struct proxy_logger { int entry_id; unsigned int en_sendlog; void* local_logger; unsigned long long send_cnt; unsigned long long random_drop; unsigned long long user_abort; char local_log_path[TFE_STRING_MAX]; struct cache_evbase_instance * log_file_upload_instance; }; enum _log_action //Bigger action number is prior. { LG_ACTION_NONE = 0x00, LG_ACTION_MONIT = 0x01, LG_ACTION_FORWARD = 0x02, /* N/A */ LG_ACTION_REJECT = 0x10, LG_ACTION_DROP = 0x20, /* N/A */ LG_ACTION_MANIPULATE = 0x30, LG_ACTION_RATELIMIT = 0x40, /* N/A */ LG_ACTION_WHITELIST = 0x60, LG_ACTION_SHUNT = 0x80, __LG_ACTION_MAX }; #define get_time_ms(tv) ((long long)(tv.tv_sec) * 1000 + (long long)(tv.tv_usec) / 1000) #include "uuid_v4.h" UUIDv4::UUIDGenerator uuidGenerator; void get_http_body_uuid(char *uuid) { UUIDv4::UUID uid = uuidGenerator.getUUID(); uid.str(uuid); return; } size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body) { mpack_writer_t writer; char *mpack_data=NULL, *data=NULL; size_t mpack_size=0, datalen=0; mpack_writer_init_growable(&writer, &mpack_data, &mpack_size); mpack_build_map(&writer); mpack_write_cstr(&writer, "uuid"); mpack_write_cstr(&writer, uuid); mpack_write_cstr(&writer, "fileType"); mpack_write_cstr(&writer, "txt"); mpack_write_cstr(&writer, "combineMode"); mpack_write_cstr(&writer, "seek"); mpack_write_cstr(&writer, "offset"); mpack_write_u64(&writer, 0); mpack_write_cstr(&writer, "lastChunkFlag"); mpack_write_u32(&writer, 1); datalen = evbuffer_get_length(http_body); if(datalen > 0) { data = (char *)evbuffer_pullup(http_body, datalen); mpack_write_cstr(&writer, "chunk"); mpack_start_bin(&writer, datalen); mpack_write_bytes(&writer, (const char *)data, datalen); mpack_finish_bin(&writer); } mpack_write_cstr(&writer, "length"); mpack_write_u64(&writer, datalen); mpack_complete_map(&writer); // mpack_init_map mpack_error_t errorno=mpack_writer_destroy(&writer); if(errorno!=mpack_ok) { TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid); } kafka_send2(tfe_get_kafka_handle(), TOPIC_FILE_STREAM, mpack_data, mpack_size); free(mpack_data); mpack_data = NULL; mpack_size = 0; return datalen; } struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) { struct proxy_logger* instance=ALLOC(struct proxy_logger,1); instance->local_logger=local_logger; MESA_load_profile_uint_def(profile, section, "en_sendlog", &instance->en_sendlog, 1); TFE_LOG_INFO(local_logger, "Tsg-Pxy sendlog : %s", instance->en_sendlog ? "ENABLE" : "DISABLE"); return instance; } static int get_ip_client_geolocation(struct tfe_cmsg * cmsg, cJSON *per_hit_obj) { unsigned int i=0, j=0; char opt_val[128]={0}; uint16_t opt_out_size; const char *client_geo_area_map[] = {"client_country","client_super_administrative_area","client_administrative_area","client_sub_administrative_area"}; for(i=TFE_CMSG_SRC_REGION_STR; i <= TFE_CMSG_DST_SUBDIVISION_STR; i+=2) { memset(opt_val, 0, sizeof(opt_val)); int ret = tfe_cmsg_get_value(cmsg, (enum tfe_cmsg_tlv_type)i, (unsigned char *)opt_val, sizeof(opt_val), &opt_out_size); if (ret == 0) { cJSON_AddStringToObject(per_hit_obj, client_geo_area_map[j], opt_val); } j++; } return 0; } static int get_ip_server_geolocation(struct tfe_cmsg * cmsg, cJSON *per_hit_obj) { unsigned int i=0, j=0; char opt_val[128]={0}; uint16_t opt_out_size; const char *server_geo_area_map[] = {"server_country","server_super_administrative_area","server_administrative_area","server_sub_administrative_area"}; for(i=TFE_CMSG_DST_REGION_STR; i <= TFE_CMSG_DST_SUBDIVISION_STR; i+=2) { memset(opt_val, 0, sizeof(opt_val)); int ret = tfe_cmsg_get_value(cmsg, (enum tfe_cmsg_tlv_type)i, (unsigned char *)opt_val, sizeof(opt_val), &opt_out_size); if (ret == 0) { cJSON_AddStringToObject(per_hit_obj, server_geo_area_map[j], opt_val); } j++; } return 0; } int proxy_add_host_to_object(cJSON *common_obj, const char *req_spec_host) { unsigned int port; char *format_host=ALLOC(char, strlen(req_spec_host)+1); sscanf(req_spec_host, "%[^:]:%u", format_host, &port); cJSON_AddStringToObject(common_obj, "http_host", format_host); cJSON_AddStringToObject(common_obj, "server_fqdn", format_host); FREE(&format_host); return 0; } int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) { const struct tfe_http_session* http=log_msg->http; const struct tfe_stream_addr* addr=log_msg->stream->addr; const char* tmp_val=NULL; cJSON *common_obj=NULL, *per_hit_obj=NULL; char* log_payload=NULL; int send_cnt=0; struct timeval cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; const char *app_proto[]= {"unkonw","http1", "http2"}; const char *manipulate_action_map[]= {"redirect","block","replace","hijack","insert","edit_element","run_script"}; const char *panggu_action_map[__LG_ACTION_MAX]; panggu_action_map[LG_ACTION_MONIT]="monitor"; panggu_action_map[LG_ACTION_REJECT]="deny"; panggu_action_map[LG_ACTION_WHITELIST]="allow"; struct json_spec req_fields[]={ {"http_cookie", TFE_HTTP_COOKIE}, {"http_referer", TFE_HTTP_REFERER}, {"http_user_agent", TFE_HTTP_USER_AGENT}, {"http_request_content_type", TFE_HTTP_CONT_TYPE}, {"http_request_content_length", TFE_HTTP_CONT_LENGTH}}; struct json_spec resp_fields[]={ {"http_response_content_type", TFE_HTTP_CONT_TYPE}, {"http_response_content_length", TFE_HTTP_CONT_LENGTH}, {"http_set_cookie", TFE_HTTP_SET_COOKIE}}; if (!handle->en_sendlog) { return 0; } common_obj=cJSON_CreateObject(); gettimeofday(&cur_time, NULL); cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", get_time_ms(http->start_time)); cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", get_time_ms(cur_time)); char source_subscribe_id[64]={0}; char opt_val[24]={0}; uint16_t opt_out_size; struct tfe_cmsg * cmsg = tfe_stream_get0_cmsg(log_msg->stream); if (cmsg!=NULL) { int ret=tfe_cmsg_get_value(cmsg, TFE_CMSG_STREAM_TRACE_ID, (unsigned char *) opt_val, sizeof(opt_val), &opt_out_size); if (ret==0) { cJSON_AddStringToObject(common_obj, "session_id", opt_val); } ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_SRC_SUB_ID, (unsigned char *)source_subscribe_id, sizeof(source_subscribe_id), &opt_out_size); if (ret==0) { cJSON_AddStringToObject(common_obj, "subscriber_id", source_subscribe_id); } } if (http->req) { char *request_line=NULL; struct tfe_http_req_spec req_spec=http->req->req_spec; asprintf(&request_line, "%s %s HTTP/%d.%d", http_std_method_to_string(req_spec.method), req_spec.url, http->major_version, http->minor_version); cJSON_AddStringToObject(common_obj, "http_request_line", request_line); free(request_line); } if (http->resp) { char *response_line=NULL; struct tfe_http_resp_spec resp_spec=http->resp->resp_spec; asprintf(&response_line, "HTTP/%d.%d %d OK", http->major_version, http->minor_version, resp_spec.resp_code); cJSON_AddStringToObject(common_obj, "http_response_line", response_line); cJSON_AddNumberToObject(common_obj, "http_status_code", resp_spec.resp_code); free(response_line); } switch(addr->addrtype) { case TFE_ADDR_STREAM_TUPLE4_V4: cJSON_AddNumberToObject(common_obj, "address_type", 4); inet_ntop(AF_INET, &addr->tuple4_v4->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET, &addr->tuple4_v4->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "client_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "server_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "client_port", ntohs(addr->tuple4_v4->source)); cJSON_AddNumberToObject(common_obj, "server_port", ntohs(addr->tuple4_v4->dest)); break; case TFE_ADDR_STREAM_TUPLE4_V6: cJSON_AddNumberToObject(common_obj, "address_type", 6); inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "client_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "server_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "client_port", ntohs(addr->tuple4_v6->source)); cJSON_AddNumberToObject(common_obj, "server_port", ntohs(addr->tuple4_v6->dest)); break; default: break; } size_t ret=0, c2s_byte_num = 0, s2c_byte_num =0; ret = tfe_stream_info_get(log_msg->stream, INFO_FROM_DOWNSTREAM_RX_OFFSET, &c2s_byte_num, sizeof(c2s_byte_num)); if(ret != 0) { c2s_byte_num = log_msg->c2s_byte_num == 0 ? c2s_byte_num : log_msg->c2s_byte_num; } ret = tfe_stream_info_get(log_msg->stream, INFO_FROM_UPSTREAM_RX_OFFSET, &s2c_byte_num, sizeof(s2c_byte_num)); if(ret !=0) { s2c_byte_num = log_msg->s2c_byte_num == 0 ? s2c_byte_num : log_msg->s2c_byte_num; } cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP"); cJSON_AddStringToObject(common_obj, "ip_protocol", "tcp"); cJSON_AddNumberToObject(common_obj, "out_link_id", 0); cJSON_AddNumberToObject(common_obj, "in_link_id", 0); cJSON_AddStringToObject(common_obj, "sled_ip", tfe_get_sled_ip()); cJSON_AddNumberToObject(common_obj, "t_vsys_id", tfe_get_vsys_id()); cJSON_AddStringToObject(common_obj, "device_id", tfe_get_device_id()); cJSON_AddNumberToObject(common_obj, "sent_bytes", c2s_byte_num); cJSON_AddNumberToObject(common_obj, "received_bytes", s2c_byte_num); cJSON_AddStringToObject(common_obj, "http_url", http->req->req_spec.url); proxy_add_host_to_object(common_obj, http->req->req_spec.host); if (tfe_get_device_tag()) { cJSON_AddStringToObject(common_obj, "device_tag", tfe_get_device_tag()); } for(size_t i=0;ireq, req_fields[i].field_id); if(tmp_val!=NULL) { cJSON_AddStringToObject(common_obj,req_fields[i].log_filed_name, tmp_val); } } for(size_t i=0;iresp!=NULL;i++) { tmp_val=tfe_http_std_field_read(http->resp, resp_fields[i].field_id); if(tmp_val!=NULL) { cJSON_AddStringToObject(common_obj,resp_fields[i].log_filed_name, tmp_val); } } #define FILE_CHUNK_UUID_LEN 40 char uuid[FILE_CHUNK_UUID_LEN]={0}; size_t datalen=0; for(size_t i=0; iresult_num; i++) { if(log_msg->result[i].do_log!=1) continue; if(log_msg->req_body!=NULL) { if(uuid[0] != '\0') { cJSON_AddStringToObject(common_obj, "http_request_body", uuid); } else { get_http_body_uuid(uuid); datalen=file_bucket_upload_once(handle, uuid, log_msg->req_body); if(datalen>0) { cJSON_AddStringToObject(common_obj, "http_request_body", uuid); } else { TFE_LOG_ERROR(handle->local_logger, "Upload req_body failed."); } } } if(log_msg->resp_body!=NULL) { if(uuid[0] != '\0') { cJSON_AddStringToObject(common_obj, "http_response_body", uuid); } else { get_http_body_uuid(uuid); datalen=file_bucket_upload_once(handle, uuid, log_msg->resp_body); if(datalen>0) { cJSON_AddStringToObject(common_obj, "http_response_body", uuid); } else { TFE_LOG_ERROR(handle->local_logger, "Upload resp_body failed."); } } } } int j=0, enable_monit=0; int monit_config_id[16]={0}; for(size_t i=0; iresult_num; i++) { if(log_msg->result[i].action == LG_ACTION_MONIT) { monit_config_id[j]=log_msg->result[i].config_id; j++; } } for(size_t i=0; iresult_num; i++) { TFE_LOG_DEBUG(handle->local_logger, "URL: %s, policy_id: %lld, service: %d, do_log:%d", http->req->req_spec.url, log_msg->result[i].config_id, log_msg->result[i].service_id, log_msg->result[i].do_log); if(log_msg->result[i].do_log==0) { continue; } if(log_msg->result[i].action == LG_ACTION_MONIT && enable_monit==1) { continue; } cJSON *proxy_rule_list=NULL; int config_id[1]={0}; per_hit_obj=cJSON_Duplicate(common_obj, 1); if(log_msg->result[i].action == LG_ACTION_MONIT) { proxy_rule_list = cJSON_CreateIntArray(monit_config_id, j); enable_monit=1; } else { config_id[0]=log_msg->result[i].config_id; proxy_rule_list = cJSON_CreateIntArray(config_id, 1); } cJSON_AddItemToObject(per_hit_obj, "proxy_rule_list", proxy_rule_list); cJSON_AddNumberToObject(per_hit_obj, "vsys_id", log_msg->result[i].vsys_id); if(log_msg->result[i].action == LG_ACTION_MANIPULATE) { cJSON_AddStringToObject(per_hit_obj, "proxy_action", manipulate_action_map[log_msg->action]); cJSON_AddNumberToObject(per_hit_obj, "http_action_file_size", log_msg->inject_sz); } else { cJSON_AddStringToObject(per_hit_obj, "proxy_action", panggu_action_map[(unsigned char)(log_msg->result[i].action)]); } if (cmsg!=NULL) { uint64_t src_asn=0, dst_asn=0; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_SRC_ASN_VAL, (unsigned char *)&src_asn, sizeof(src_asn), &opt_out_size); if (ret == 0) { cJSON_AddNumberToObject(per_hit_obj, "client_asn", src_asn); } ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_DST_ASN_VAL, (unsigned char *)&dst_asn, sizeof(dst_asn), &opt_out_size); if (ret == 0) { cJSON_AddNumberToObject(per_hit_obj, "server_asn", dst_asn); } get_ip_client_geolocation(cmsg, per_hit_obj); get_ip_server_geolocation(cmsg, per_hit_obj); } log_payload = cJSON_PrintUnformatted(per_hit_obj); TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); if (kafka_send(tfe_get_kafka_handle(), TOPIC_PROXY_EVENT, log_payload, strlen(log_payload)) == 0) { send_cnt++; } free(log_payload); cJSON_Delete(per_hit_obj); } cJSON_Delete(common_obj); return send_cnt; }