#include #include #include #include #include #include #include #include "mpack.h" #include "tsg_proxy_logger.h" struct json_spec { const char *log_filed_name; enum tfe_http_std_field field_id; }; struct proxy_logger { int entry_id; unsigned int en_sendlog; const char *device_id; const char *effective_device_tag; void* local_logger; unsigned long long send_cnt; unsigned long long random_drop; unsigned long long user_abort; char local_log_path[TFE_STRING_MAX]; tfe_kafka_logger_t *kafka_logger; struct cache_evbase_instance * log_file_upload_instance; }; enum _log_action //Bigger action number is prior. { LG_ACTION_NONE = 0x00, LG_ACTION_MONIT = 0x01, LG_ACTION_FORWARD = 0x02, /* N/A */ LG_ACTION_REJECT = 0x10, LG_ACTION_DROP = 0x20, /* N/A */ LG_ACTION_MANIPULATE = 0x30, LG_ACTION_RATELIMIT = 0x40, /* N/A */ LG_ACTION_WHITELIST = 0x60, LG_ACTION_SHUNT = 0x80, __LG_ACTION_MAX }; #define get_time_ms(tv) ((long long)(tv.tv_sec) * 1000 + (long long)(tv.tv_usec) / 1000) #include "uuid_v4.h" UUIDv4::UUIDGenerator uuidGenerator; void get_http_body_uuid(char *uuid) { UUIDv4::UUID uid = uuidGenerator.getUUID(); uid.str(uuid); return; } size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body) { int kafka_status=0; mpack_writer_t writer; char *mpack_data=NULL, *data=NULL; size_t mpack_size=0, datalen=0; mpack_writer_init_growable(&writer, &mpack_data, &mpack_size); mpack_build_map(&writer); mpack_write_cstr(&writer, "uuid"); mpack_write_cstr(&writer, uuid); mpack_write_cstr(&writer, "fileType"); mpack_write_cstr(&writer, "txt"); mpack_write_cstr(&writer, "combineMode"); mpack_write_cstr(&writer, "seek"); mpack_write_cstr(&writer, "offset"); mpack_write_u64(&writer, 0); mpack_write_cstr(&writer, "lastChunkFlag"); mpack_write_u32(&writer, 1); datalen = evbuffer_get_length(http_body); if(datalen > 0) { data = (char *)evbuffer_pullup(http_body, datalen); mpack_write_cstr(&writer, "chunk"); mpack_start_bin(&writer, datalen); mpack_write_bytes(&writer, (const char *)data, datalen); mpack_finish_bin(&writer); } mpack_write_cstr(&writer, "length"); mpack_write_u64(&writer, datalen); mpack_complete_map(&writer); // mpack_init_map mpack_error_t errorno=mpack_writer_destroy(&writer); if(errorno!=mpack_ok) { TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid); } kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_BUCKET, mpack_data, mpack_size); if(kafka_status<0) { TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); } free(mpack_data); mpack_data = NULL; mpack_size = 0; return datalen; } struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) { struct proxy_logger* instance=ALLOC(struct proxy_logger,1); instance->local_logger=local_logger; TFE_LOG_INFO(local_logger,"Tsg-Pxy log is inititating from %s section %s.", profile, section); MESA_load_profile_uint_def(profile, section, "en_sendlog", &instance->en_sendlog, 1); TFE_LOG_INFO(local_logger, "Tsg-Pxy sendlog : %s", instance->en_sendlog ? "ENABLE" : "DISABLE"); if (!instance->en_sendlog) { return instance; } instance->device_id = (const char *)tfe_bussiness_resouce_get(DEVICE_ID); instance->effective_device_tag = (const char *)tfe_bussiness_resouce_get(EFFECTIVE_DEVICE_TAG); instance->kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER); if (instance->kafka_logger && !instance->kafka_logger->enable) { TFE_LOG_ERROR(local_logger, "Tsg-Pxy sendlog ENABLE, but tfe kafka logger DISABLED."); goto error_out; } return instance; error_out: free(instance); return NULL; } int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) { const struct tfe_http_session* http=log_msg->http; const struct tfe_stream_addr* addr=log_msg->stream->addr; const char* tmp_val=NULL; cJSON *common_obj=NULL, *per_hit_obj=NULL; char* log_payload=NULL; int kafka_status=0; int send_cnt=0; struct timeval cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; const char *app_proto[]= {"unkonw","http1", "http2"}; const char *manipulate_action_map[]= {"redirect","block","replace","hijack","insert","edit_element","run_script"}; const char *panggu_action_map[__LG_ACTION_MAX]; panggu_action_map[LG_ACTION_MONIT]="monitor"; panggu_action_map[LG_ACTION_REJECT]="deny"; panggu_action_map[LG_ACTION_WHITELIST]="allow"; struct json_spec req_fields[]={ {"http_cookie", TFE_HTTP_COOKIE}, {"http_referer", TFE_HTTP_REFERER}, {"http_user_agent", TFE_HTTP_USER_AGENT}, {"http_request_content_type", TFE_HTTP_CONT_TYPE}, {"http_request_content_length", TFE_HTTP_CONT_LENGTH}}; struct json_spec resp_fields[]={ {"http_response_content_type", TFE_HTTP_CONT_TYPE}, {"http_response_content_length", TFE_HTTP_CONT_LENGTH}, {"http_set_cookie", TFE_HTTP_SET_COOKIE}}; if (!handle->en_sendlog) { return 0; } common_obj=cJSON_CreateObject(); gettimeofday(&cur_time, NULL); cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", get_time_ms(http->start_time)); cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", get_time_ms(cur_time)); char source_subscribe_id[64]={0}; char opt_val[24]={0}; uint16_t opt_out_size; struct tfe_cmsg * cmsg = tfe_stream_get0_cmsg(log_msg->stream); if (cmsg!=NULL) { int ret=tfe_cmsg_get_value(cmsg, TFE_CMSG_STREAM_TRACE_ID, (unsigned char *) opt_val, sizeof(opt_val), &opt_out_size); if (ret==0) { cJSON_AddStringToObject(common_obj, "session_id", opt_val); } ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_SRC_SUB_ID, (unsigned char *)source_subscribe_id, sizeof(source_subscribe_id), &opt_out_size); if (ret==0) { cJSON_AddStringToObject(common_obj, "subscriber_id", source_subscribe_id); } } if (http->req) { char *request_line=NULL; struct tfe_http_req_spec req_spec=http->req->req_spec; asprintf(&request_line, "%s %s HTTP/%d.%d", http_std_method_to_string(req_spec.method), req_spec.url, http->major_version, http->minor_version); cJSON_AddStringToObject(common_obj, "http_request_line", request_line); free(request_line); } if (http->resp) { char *response_line=NULL; struct tfe_http_resp_spec resp_spec=http->resp->resp_spec; asprintf(&response_line, "HTTP/%d.%d %d OK", http->major_version, http->minor_version, resp_spec.resp_code); cJSON_AddStringToObject(common_obj, "http_response_line", response_line); cJSON_AddNumberToObject(common_obj, "http_status_code", resp_spec.resp_code); free(response_line); } switch(addr->addrtype) { case TFE_ADDR_STREAM_TUPLE4_V4: cJSON_AddNumberToObject(common_obj, "address_type", 4); inet_ntop(AF_INET, &addr->tuple4_v4->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET, &addr->tuple4_v4->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "client_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "server_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "client_port", ntohs(addr->tuple4_v4->source)); cJSON_AddNumberToObject(common_obj, "server_port", ntohs(addr->tuple4_v4->dest)); break; case TFE_ADDR_STREAM_TUPLE4_V6: cJSON_AddNumberToObject(common_obj, "address_type", 6); inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "client_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "server_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "client_port", ntohs(addr->tuple4_v6->source)); cJSON_AddNumberToObject(common_obj, "server_port", ntohs(addr->tuple4_v6->dest)); break; default: break; } size_t ret=0, c2s_byte_num = 0, s2c_byte_num =0; ret = tfe_stream_info_get(log_msg->stream, INFO_FROM_DOWNSTREAM_RX_OFFSET, &c2s_byte_num, sizeof(c2s_byte_num)); if(ret != 0) { c2s_byte_num = log_msg->c2s_byte_num; } ret = tfe_stream_info_get(log_msg->stream, INFO_FROM_UPSTREAM_RX_OFFSET, &s2c_byte_num, sizeof(s2c_byte_num)); if(ret !=0) { s2c_byte_num = log_msg->s2c_byte_num; } cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP"); cJSON_AddNumberToObject(common_obj, "out_link_id", 0); cJSON_AddNumberToObject(common_obj, "in_link_id", 0); cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str); cJSON_AddNumberToObject(common_obj, "t_vsys_id", handle->kafka_logger->t_vsys_id); cJSON_AddStringToObject(common_obj, "device_id", handle->device_id); cJSON_AddNumberToObject(common_obj, "sent_bytes", c2s_byte_num); cJSON_AddNumberToObject(common_obj, "received_bytes", s2c_byte_num); cJSON_AddStringToObject(common_obj, "http_url", http->req->req_spec.url); cJSON_AddStringToObject(common_obj, "http_host", http->req->req_spec.host); cJSON_AddStringToObject(common_obj, "server_fqdn", http->req->req_spec.host); if(handle->effective_device_tag) { cJSON_AddStringToObject(common_obj, "device_tag", handle->effective_device_tag); } for(size_t i=0;ireq, req_fields[i].field_id); if(tmp_val!=NULL) { cJSON_AddStringToObject(common_obj,req_fields[i].log_filed_name, tmp_val); } } for(size_t i=0;iresp!=NULL;i++) { tmp_val=tfe_http_std_field_read(http->resp, resp_fields[i].field_id); if(tmp_val!=NULL) { cJSON_AddStringToObject(common_obj,resp_fields[i].log_filed_name, tmp_val); } } #define FILE_CHUNK_UUID_LEN 40 char uuid[FILE_CHUNK_UUID_LEN]={0}; size_t datalen=0; for(size_t i=0; iresult_num; i++) { if(log_msg->result[i].do_log!=1) continue; if(log_msg->req_body!=NULL) { if(uuid[0] != '\0') { cJSON_AddStringToObject(common_obj, "http_request_body", uuid); } else { get_http_body_uuid(uuid); datalen=file_bucket_upload_once(handle, uuid, log_msg->req_body); if(datalen>0) { cJSON_AddStringToObject(common_obj, "http_request_body", uuid); } else { TFE_LOG_ERROR(handle->local_logger, "Upload req_body failed."); } } } if(log_msg->resp_body!=NULL) { if(uuid[0] != '\0') { cJSON_AddStringToObject(common_obj, "http_response_body", uuid); } else { get_http_body_uuid(uuid); datalen=file_bucket_upload_once(handle, uuid, log_msg->resp_body); if(datalen>0) { cJSON_AddStringToObject(common_obj, "http_response_body", uuid); } else { TFE_LOG_ERROR(handle->local_logger, "Upload resp_body failed."); } } } } for(size_t i=0; iresult_num; i++) { TFE_LOG_DEBUG(handle->local_logger, "URL: %s, policy_id: %lld, service: %d, do_log:%d", http->req->req_spec.url, log_msg->result[i].config_id, log_msg->result[i].service_id, log_msg->result[i].do_log); if(log_msg->result[i].do_log==0) { continue; } cJSON *proxy_rule_list=NULL; int config_id[1]={0}; per_hit_obj=cJSON_Duplicate(common_obj, 1); config_id[0]=log_msg->result[i].config_id; proxy_rule_list = cJSON_CreateIntArray(config_id, 1); cJSON_AddItemToObject(per_hit_obj, "proxy_rule_list", proxy_rule_list); cJSON_AddNumberToObject(per_hit_obj, "vsys_id", log_msg->result[i].vsys_id); if(log_msg->result[i].action == LG_ACTION_MANIPULATE) { cJSON_AddStringToObject(per_hit_obj, "proxy_action", manipulate_action_map[log_msg->action]); cJSON_AddNumberToObject(per_hit_obj, "http_action_file_size", log_msg->inject_sz); } else { cJSON_AddStringToObject(per_hit_obj, "proxy_action", panggu_action_map[(unsigned char)(log_msg->result[i].action)]); } if(log_msg->location_client) { cJSON_AddStringToObject(per_hit_obj, "client_geolocation", log_msg->location_client); } if(log_msg->location_server) { cJSON_AddStringToObject(per_hit_obj, "server_geolocation", log_msg->location_server); } if(log_msg->asn_client) { cJSON_AddStringToObject(common_obj, "client_asn", log_msg->asn_client); } if (log_msg->asn_server) { cJSON_AddStringToObject(common_obj, "server_asn", log_msg->asn_server); } log_payload = cJSON_PrintUnformatted(per_hit_obj); TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload)); free(log_payload); cJSON_Delete(per_hit_obj); if(kafka_status<0) { TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); } send_cnt++; } cJSON_Delete(common_obj); return send_cnt; }