1.支持从pangu_pxy.conf中读取kafka_topic

2.修改http2变量名
This commit is contained in:
fengweihao
2019-06-04 12:03:23 +08:00
parent d272087565
commit 78d5d473ac
4 changed files with 38 additions and 37 deletions

View File

@@ -5,6 +5,7 @@ log_level=10
nic_name=eth4
entrance_id=0
kafka_brokerlist=10.4.34.10:9092,10.4.34.11:9092,10.4.34.12:9092,10.4.34.13:9092,10.4.34.14:9092,10.4.34.15:9092,10.4.34.16:9092,10.4.34.17:9092,10.4.34.18:9092,10.4.34.19:9092
kafka_topic=policy-event-log
#Addresses of minio. Format is defined by WiredLB.
minio_ip_list=10.4.35.42-46;
minio_listen_port=9000

View File

@@ -38,7 +38,7 @@ struct pangu_logger
rd_kafka_topic_t* kafka_topic;
pthread_mutex_t mutex;
char brokerlist[TFE_STRING_MAX];
const char* topic_name;
char topic_name[TFE_STRING_MAX];
void* local_logger;
@@ -159,7 +159,7 @@ struct pangu_logger* pangu_log_handle_create(const char* profile, const char* s
goto error_out;
}
instance->topic_name="PXY-HTTP-LOG";
MESA_load_profile_string_def(profile, section,"KAFKA_TOPIC", instance->topic_name, sizeof(instance->topic_name), "POLICY-EVENT-LOG");
instance->kafka_topic = rd_kafka_topic_new(instance->kafka_handle,instance->topic_name, NULL);
log_file_upload_para=cache_evbase_parameter_new(profile, section, local_logger);
instance->log_file_upload_instance=cache_evbase_instance_new(log_file_upload_para, local_logger);

View File

@@ -59,7 +59,7 @@ struct tfe_h2_half_private
int by_stream;
char * url_storage;
struct tfe_h2_payload body;
struct tfe_h2_payload h2_payload;
struct tfe_h2_header header;
struct tfe_h2_header promised;

View File

@@ -385,25 +385,25 @@ h2_half_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, i
{
int xret = -1;
struct tfe_h2_half_private * resp = nghttp2_to_half_private(half);
struct tfe_h2_payload *body = &resp->body;
struct tfe_h2_payload *body = &resp->h2_payload;
if (buff == NULL && size == 0){
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){
xret = deflate_write(&body->deflate, NULL, 0, resp->body.evbuf_body, body->gzip, 1);
xret = deflate_write(&body->deflate, NULL, 0, resp->h2_payload.evbuf_body, body->gzip, 1);
}
resp->message_state = H2_READ_STATE_COMPLETE;
goto finish;
}
if (resp->body.evbuf_body == NULL){
resp->body.evbuf_body = evbuffer_new();
if (resp->h2_payload.evbuf_body == NULL){
resp->h2_payload.evbuf_body = evbuffer_new();
}
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){
xret = deflate_write(&body->deflate, (const uint8_t *)buff, size,
resp->body.evbuf_body, body->gzip, 0);
resp->h2_payload.evbuf_body, body->gzip, 0);
}else{
xret = evbuffer_add(resp->body.evbuf_body, buff, size);
xret = evbuffer_add(resp->h2_payload.evbuf_body, buff, size);
}
finish:
@@ -415,7 +415,7 @@ void delete_stream_half_data(struct tfe_h2_half_private **data,
{
if (*data){
struct tfe_h2_payload *body = &((*data)->body);
struct tfe_h2_payload *body = &((*data)->h2_payload);
inflate_finished(&body->inflate);
deflate_finished(&body->deflate);
@@ -450,7 +450,7 @@ void h2_half_ops_free(struct tfe_http_half * half)
int h2_half_ops_body_begin(struct tfe_http_half * half, int by_stream)
{
struct tfe_h2_half_private * resp = nghttp2_to_half_private(half);
struct tfe_h2_payload *body = &resp->body;
struct tfe_h2_payload *body = &resp->h2_payload;
assert(body->evbuf_body == NULL);
@@ -482,14 +482,14 @@ int h2_half_ops_body_data(struct tfe_http_half * h2_response, const unsigned cha
{
int xret = -1;
struct tfe_h2_half_private * h2_resp_priv = nghttp2_to_half_private(h2_response);
struct tfe_h2_payload *body = &h2_resp_priv->body;
struct tfe_h2_payload *body = &h2_resp_priv->h2_payload;
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){
xret = deflate_write(&body->deflate, (const uint8_t *)data, sz_data,
h2_resp_priv->body.evbuf_body, body->gzip, 0);
h2_resp_priv->h2_payload.evbuf_body, body->gzip, 0);
}else{
xret = evbuffer_add(h2_resp_priv->body.evbuf_body, data, sz_data);
xret = evbuffer_add(h2_resp_priv->h2_payload.evbuf_body, data, sz_data);
}
return xret;
}
@@ -592,9 +592,9 @@ tfe_half_private_init(enum tfe_http_direction direction, int32_t stream_id,
headers_init(&half_private->header);
headers_init(&half_private->promised);
half_private->body.evbuf_body = evbuffer_new();
half_private->body.gzip = HTTP2_CONTENT_ENCODING_NONE;
half_private->body.padlen = 0;
half_private->h2_payload.evbuf_body = evbuffer_new();
half_private->h2_payload.gzip = HTTP2_CONTENT_ENCODING_NONE;
half_private->h2_payload.padlen = 0;
half_private->stream_id = stream_id;
half_private->session = session;
@@ -624,7 +624,7 @@ struct tfe_http_half * h2_ops_response_create(struct tfe_http_session * session,
resp->method_or_status = resp_code;
if (stream->resp)
resp->body.gzip = stream->resp->body.gzip;
resp->h2_payload.gzip = stream->resp->h2_payload.gzip;
return &resp->half_public;
}
@@ -689,7 +689,7 @@ nghttp2_server_frame_submit_response(struct tfe_h2_stream *h2_stream_info,
if (h2_header->nvlen <= 0)
return ACTION_FORWARD_DATA;
struct tfe_h2_payload *body = &pangu_resp->body;
struct tfe_h2_payload *body = &pangu_resp->h2_payload;
char str_sz_evbuf_body[TFE_STRING_MAX];
snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body));
@@ -730,7 +730,7 @@ server_frame_submit_data(struct tfe_h2_stream *connection,
stream_action = nghttp2_server_frame_submit_response(connection, h2_session);
}else{
int rv = -1;
struct tfe_h2_payload *body = &resp->body;
struct tfe_h2_payload *body = &resp->h2_payload;
nghttp2_data_provider upstream_data_provider;
upstream_data_provider.source.ptr = (void *)body;
@@ -1076,7 +1076,7 @@ nghttp2_submit_end_stream_payload(struct tfe_h2_stream *h2_stream_info,
resp->event_cb_user);
}
}
struct tfe_h2_payload *payload = &resp->body;
struct tfe_h2_payload *payload = &resp->h2_payload;
payload->flags |= NGHTTP2_FLAG_END_STREAM;
resp->body_state = H2_READ_STATE_COMPLETE;
resp->message_state = H2_READ_STATE_COMPLETE;
@@ -1115,7 +1115,7 @@ nghttp2_submit_frame_data(struct tfe_h2_stream *h2_stream_info,const nghttp2_fra
goto finish;
}
resp = h2_session->resp;
resp->body.padlen = frame->data.padlen;
resp->h2_payload.padlen = frame->data.padlen;
if (resp->body_state != H2_READ_STATE_COMPLETE){
nghttp2_submit_end_stream_payload(h2_stream_info, h2_session);
}
@@ -1703,7 +1703,7 @@ nghttp2_fill_up_header(nghttp2_session *ngh2_session, const nghttp2_frame *frame
}
if (field.field_id == TFE_HTTP_CONT_ENCODING)
{
half->body.gzip = method_to_str_idx((const char *)value);
half->h2_payload.gzip = method_to_str_idx((const char *)value);
}
h2_header = &half->header;
tfe_h2_header_add_field(h2_header, &field, (const char *)value, 1);
@@ -1737,7 +1737,7 @@ nghttp2_fill_up_promise(nghttp2_session *ngh2_session, const nghttp2_frame *fram
}
if (field.field_id == TFE_HTTP_CONT_ENCODING)
{
resp->body.gzip = method_to_str_idx((const char *)value);
resp->h2_payload.gzip = method_to_str_idx((const char *)value);
}
headers = &resp->promised;
tfe_h2_header_add_field(headers, &field, (const char *)value, 1);
@@ -1862,11 +1862,11 @@ nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
goto finish;
}
resp = h2_session->resp;
evbuffer_add(resp->body.evbuf_body, input, input_len);
evbuffer_add(resp->h2_payload.evbuf_body, input, input_len);
if (resp->body.gzip != HTTP2_CONTENT_ENCODING_NONE){
if (resp->h2_payload.gzip != HTTP2_CONTENT_ENCODING_NONE){
ret = inflate_read(input, input_len, &uncompr, &uncompr_len,
&resp->body.inflate, resp->body.gzip);
&resp->h2_payload.inflate, resp->h2_payload.gzip);
if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){
input = (const uint8_t*)uncompr;
input_len = uncompr_len;
@@ -1881,9 +1881,9 @@ nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
resp->event_cb_user);
}
if (flags == NGHTTP2_FLAG_END_STREAM){
resp->body.flags = 0;
resp->h2_payload.flags = 0;
}else{
resp->body.flags = flags;
resp->h2_payload.flags = flags;
}
resp->body_state = H2_READ_STATE_READING;
}
@@ -1893,9 +1893,9 @@ nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
resp->event_cb_user);
}
if (flags == NGHTTP2_FLAG_END_STREAM){
resp->body.flags = 0;
resp->h2_payload.flags = 0;
}else{
resp->body.flags = flags;
resp->h2_payload.flags = flags;
}
}
if (uncompr_len) FREE(&uncompr);
@@ -2000,7 +2000,7 @@ static ssize_t nghttp2_client_select_padding_callback(nghttp2_session *session,
if (!resp)
return frame->hd.length;
return (ssize_t)MIN(max_payloadlen, frame->hd.length + (resp->body.padlen));
return (ssize_t)MIN(max_payloadlen, frame->hd.length + (resp->h2_payload.padlen));
}
static int
@@ -2176,12 +2176,12 @@ nghttp2_server_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
goto finish;
}
req = h2_session->req;
req->body.flags = flags;
evbuffer_add(req->body.evbuf_body, input, input_len);
req->h2_payload.flags = flags;
evbuffer_add(req->h2_payload.evbuf_body, input, input_len);
if (req->body.gzip != HTTP2_CONTENT_ENCODING_NONE){
if (req->h2_payload.gzip != HTTP2_CONTENT_ENCODING_NONE){
ret = inflate_read(input, input_len, &uncompr, &uncompr_len,
&req->body.inflate, req->body.gzip);
&req->h2_payload.inflate, req->h2_payload.gzip);
if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){
input = (const uint8_t*)uncompr;
input_len = uncompr_len;