diff --git a/conf/pangu/pangu_pxy.conf b/conf/pangu/pangu_pxy.conf index 4b60256..edbe3cc 100644 --- a/conf/pangu/pangu_pxy.conf +++ b/conf/pangu/pangu_pxy.conf @@ -5,6 +5,7 @@ log_level=10 nic_name=eth4 entrance_id=0 kafka_brokerlist=10.4.34.10:9092,10.4.34.11:9092,10.4.34.12:9092,10.4.34.13:9092,10.4.34.14:9092,10.4.34.15:9092,10.4.34.16:9092,10.4.34.17:9092,10.4.34.18:9092,10.4.34.19:9092 +kafka_topic=policy-event-log #Addresses of minio. Format is defined by WiredLB. minio_ip_list=10.4.35.42-46; minio_listen_port=9000 diff --git a/plugin/business/pangu-http/src/pangu_logger.cpp b/plugin/business/pangu-http/src/pangu_logger.cpp index 4468e32..d366293 100644 --- a/plugin/business/pangu-http/src/pangu_logger.cpp +++ b/plugin/business/pangu-http/src/pangu_logger.cpp @@ -38,7 +38,7 @@ struct pangu_logger rd_kafka_topic_t* kafka_topic; pthread_mutex_t mutex; char brokerlist[TFE_STRING_MAX]; - const char* topic_name; + char topic_name[TFE_STRING_MAX]; void* local_logger; @@ -157,9 +157,9 @@ struct pangu_logger* pangu_log_handle_create(const char* profile, const char* s { TFE_LOG_ERROR(local_logger,"Pangu log init failed. Cannot create lafka handle with brokerlist: %s.", instance->brokerlist); goto error_out; - } + } - instance->topic_name="PXY-HTTP-LOG"; + MESA_load_profile_string_def(profile, section,"KAFKA_TOPIC", instance->topic_name, sizeof(instance->topic_name), "POLICY-EVENT-LOG"); instance->kafka_topic = rd_kafka_topic_new(instance->kafka_handle,instance->topic_name, NULL); log_file_upload_para=cache_evbase_parameter_new(profile, section, local_logger); instance->log_file_upload_instance=cache_evbase_instance_new(log_file_upload_para, local_logger); diff --git a/plugin/protocol/http2/include/internal/http2_stream.h b/plugin/protocol/http2/include/internal/http2_stream.h index 3882d08..2de2ce6 100644 --- a/plugin/protocol/http2/include/internal/http2_stream.h +++ b/plugin/protocol/http2/include/internal/http2_stream.h @@ -59,7 +59,7 @@ struct tfe_h2_half_private int by_stream; char * url_storage; - struct tfe_h2_payload body; + struct tfe_h2_payload h2_payload; struct tfe_h2_header header; struct tfe_h2_header promised; diff --git a/plugin/protocol/http2/src/http2_stream.cpp b/plugin/protocol/http2/src/http2_stream.cpp index ed7f080..2df3863 100644 --- a/plugin/protocol/http2/src/http2_stream.cpp +++ b/plugin/protocol/http2/src/http2_stream.cpp @@ -385,25 +385,25 @@ h2_half_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, i { int xret = -1; struct tfe_h2_half_private * resp = nghttp2_to_half_private(half); - struct tfe_h2_payload *body = &resp->body; + struct tfe_h2_payload *body = &resp->h2_payload; if (buff == NULL && size == 0){ if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){ - xret = deflate_write(&body->deflate, NULL, 0, resp->body.evbuf_body, body->gzip, 1); + xret = deflate_write(&body->deflate, NULL, 0, resp->h2_payload.evbuf_body, body->gzip, 1); } resp->message_state = H2_READ_STATE_COMPLETE; goto finish; } - if (resp->body.evbuf_body == NULL){ - resp->body.evbuf_body = evbuffer_new(); + if (resp->h2_payload.evbuf_body == NULL){ + resp->h2_payload.evbuf_body = evbuffer_new(); } if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){ xret = deflate_write(&body->deflate, (const uint8_t *)buff, size, - resp->body.evbuf_body, body->gzip, 0); + resp->h2_payload.evbuf_body, body->gzip, 0); }else{ - xret = evbuffer_add(resp->body.evbuf_body, buff, size); + xret = evbuffer_add(resp->h2_payload.evbuf_body, buff, size); } finish: @@ -415,7 +415,7 @@ void delete_stream_half_data(struct tfe_h2_half_private **data, { if (*data){ - struct tfe_h2_payload *body = &((*data)->body); + struct tfe_h2_payload *body = &((*data)->h2_payload); inflate_finished(&body->inflate); deflate_finished(&body->deflate); @@ -450,7 +450,7 @@ void h2_half_ops_free(struct tfe_http_half * half) int h2_half_ops_body_begin(struct tfe_http_half * half, int by_stream) { struct tfe_h2_half_private * resp = nghttp2_to_half_private(half); - struct tfe_h2_payload *body = &resp->body; + struct tfe_h2_payload *body = &resp->h2_payload; assert(body->evbuf_body == NULL); @@ -482,14 +482,14 @@ int h2_half_ops_body_data(struct tfe_http_half * h2_response, const unsigned cha { int xret = -1; struct tfe_h2_half_private * h2_resp_priv = nghttp2_to_half_private(h2_response); - struct tfe_h2_payload *body = &h2_resp_priv->body; + struct tfe_h2_payload *body = &h2_resp_priv->h2_payload; if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){ xret = deflate_write(&body->deflate, (const uint8_t *)data, sz_data, - h2_resp_priv->body.evbuf_body, body->gzip, 0); + h2_resp_priv->h2_payload.evbuf_body, body->gzip, 0); }else{ - xret = evbuffer_add(h2_resp_priv->body.evbuf_body, data, sz_data); + xret = evbuffer_add(h2_resp_priv->h2_payload.evbuf_body, data, sz_data); } return xret; } @@ -592,9 +592,9 @@ tfe_half_private_init(enum tfe_http_direction direction, int32_t stream_id, headers_init(&half_private->header); headers_init(&half_private->promised); - half_private->body.evbuf_body = evbuffer_new(); - half_private->body.gzip = HTTP2_CONTENT_ENCODING_NONE; - half_private->body.padlen = 0; + half_private->h2_payload.evbuf_body = evbuffer_new(); + half_private->h2_payload.gzip = HTTP2_CONTENT_ENCODING_NONE; + half_private->h2_payload.padlen = 0; half_private->stream_id = stream_id; half_private->session = session; @@ -624,7 +624,7 @@ struct tfe_http_half * h2_ops_response_create(struct tfe_http_session * session, resp->method_or_status = resp_code; if (stream->resp) - resp->body.gzip = stream->resp->body.gzip; + resp->h2_payload.gzip = stream->resp->h2_payload.gzip; return &resp->half_public; } @@ -689,7 +689,7 @@ nghttp2_server_frame_submit_response(struct tfe_h2_stream *h2_stream_info, if (h2_header->nvlen <= 0) return ACTION_FORWARD_DATA; - struct tfe_h2_payload *body = &pangu_resp->body; + struct tfe_h2_payload *body = &pangu_resp->h2_payload; char str_sz_evbuf_body[TFE_STRING_MAX]; snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body)); @@ -730,7 +730,7 @@ server_frame_submit_data(struct tfe_h2_stream *connection, stream_action = nghttp2_server_frame_submit_response(connection, h2_session); }else{ int rv = -1; - struct tfe_h2_payload *body = &resp->body; + struct tfe_h2_payload *body = &resp->h2_payload; nghttp2_data_provider upstream_data_provider; upstream_data_provider.source.ptr = (void *)body; @@ -1076,7 +1076,7 @@ nghttp2_submit_end_stream_payload(struct tfe_h2_stream *h2_stream_info, resp->event_cb_user); } } - struct tfe_h2_payload *payload = &resp->body; + struct tfe_h2_payload *payload = &resp->h2_payload; payload->flags |= NGHTTP2_FLAG_END_STREAM; resp->body_state = H2_READ_STATE_COMPLETE; resp->message_state = H2_READ_STATE_COMPLETE; @@ -1115,7 +1115,7 @@ nghttp2_submit_frame_data(struct tfe_h2_stream *h2_stream_info,const nghttp2_fra goto finish; } resp = h2_session->resp; - resp->body.padlen = frame->data.padlen; + resp->h2_payload.padlen = frame->data.padlen; if (resp->body_state != H2_READ_STATE_COMPLETE){ nghttp2_submit_end_stream_payload(h2_stream_info, h2_session); } @@ -1703,7 +1703,7 @@ nghttp2_fill_up_header(nghttp2_session *ngh2_session, const nghttp2_frame *frame } if (field.field_id == TFE_HTTP_CONT_ENCODING) { - half->body.gzip = method_to_str_idx((const char *)value); + half->h2_payload.gzip = method_to_str_idx((const char *)value); } h2_header = &half->header; tfe_h2_header_add_field(h2_header, &field, (const char *)value, 1); @@ -1737,7 +1737,7 @@ nghttp2_fill_up_promise(nghttp2_session *ngh2_session, const nghttp2_frame *fram } if (field.field_id == TFE_HTTP_CONT_ENCODING) { - resp->body.gzip = method_to_str_idx((const char *)value); + resp->h2_payload.gzip = method_to_str_idx((const char *)value); } headers = &resp->promised; tfe_h2_header_add_field(headers, &field, (const char *)value, 1); @@ -1862,11 +1862,11 @@ nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, goto finish; } resp = h2_session->resp; - evbuffer_add(resp->body.evbuf_body, input, input_len); + evbuffer_add(resp->h2_payload.evbuf_body, input, input_len); - if (resp->body.gzip != HTTP2_CONTENT_ENCODING_NONE){ + if (resp->h2_payload.gzip != HTTP2_CONTENT_ENCODING_NONE){ ret = inflate_read(input, input_len, &uncompr, &uncompr_len, - &resp->body.inflate, resp->body.gzip); + &resp->h2_payload.inflate, resp->h2_payload.gzip); if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){ input = (const uint8_t*)uncompr; input_len = uncompr_len; @@ -1881,9 +1881,9 @@ nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, resp->event_cb_user); } if (flags == NGHTTP2_FLAG_END_STREAM){ - resp->body.flags = 0; + resp->h2_payload.flags = 0; }else{ - resp->body.flags = flags; + resp->h2_payload.flags = flags; } resp->body_state = H2_READ_STATE_READING; } @@ -1893,9 +1893,9 @@ nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, resp->event_cb_user); } if (flags == NGHTTP2_FLAG_END_STREAM){ - resp->body.flags = 0; + resp->h2_payload.flags = 0; }else{ - resp->body.flags = flags; + resp->h2_payload.flags = flags; } } if (uncompr_len) FREE(&uncompr); @@ -2000,7 +2000,7 @@ static ssize_t nghttp2_client_select_padding_callback(nghttp2_session *session, if (!resp) return frame->hd.length; - return (ssize_t)MIN(max_payloadlen, frame->hd.length + (resp->body.padlen)); + return (ssize_t)MIN(max_payloadlen, frame->hd.length + (resp->h2_payload.padlen)); } static int @@ -2176,12 +2176,12 @@ nghttp2_server_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, goto finish; } req = h2_session->req; - req->body.flags = flags; - evbuffer_add(req->body.evbuf_body, input, input_len); + req->h2_payload.flags = flags; + evbuffer_add(req->h2_payload.evbuf_body, input, input_len); - if (req->body.gzip != HTTP2_CONTENT_ENCODING_NONE){ + if (req->h2_payload.gzip != HTTP2_CONTENT_ENCODING_NONE){ ret = inflate_read(input, input_len, &uncompr, &uncompr_len, - &req->body.inflate, req->body.gzip); + &req->h2_payload.inflate, req->h2_payload.gzip); if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){ input = (const uint8_t*)uncompr; input_len = uncompr_len;