diff --git a/plugin/protocol/http/CMakeLists.txt b/plugin/protocol/http/CMakeLists.txt index 2dc0054..93a2551 100644 --- a/plugin/protocol/http/CMakeLists.txt +++ b/plugin/protocol/http/CMakeLists.txt @@ -8,7 +8,7 @@ target_link_libraries(http common) target_link_libraries(http http-parser-static) target_link_libraries(http libevent-static) target_link_libraries(http z) -target_link_libraries(http brotlienc-static brotlidec-static) +target_link_libraries(http brotlienc-static brotlidec-static zstd-static) target_link_libraries(http MESA_prof_load) ### UNITTEST CASE diff --git a/plugin/protocol/http/include/internal/http_convert.h b/plugin/protocol/http/include/internal/http_convert.h index 3acf100..cdefb7f 100644 --- a/plugin/protocol/http/include/internal/http_convert.h +++ b/plugin/protocol/http/include/internal/http_convert.h @@ -1,6 +1,8 @@ #pragma once #include #include +#include +#include struct hf_content_uncompress; struct hf_content_compress; diff --git a/plugin/protocol/http/include/internal/http_half.h b/plugin/protocol/http/include/internal/http_half.h index 90af617..ae97b2c 100644 --- a/plugin/protocol/http/include/internal/http_half.h +++ b/plugin/protocol/http/include/internal/http_half.h @@ -31,6 +31,7 @@ struct http_header_private #define HTTP_ACCEPT_ENCODING_X_GZIP BV(5) #define HTTP_ACCEPT_ENCODING_X_BZIP2 BV(6) #define HTTP_ACCEPT_ENCODING_BR BV(7) +#define HTTP_ACCEPT_ENCODING_ZSTD BV(8) TAILQ_HEAD(http_header_private_list, http_header_private); struct http_half_private diff --git a/plugin/protocol/http/src/http_convert.cpp b/plugin/protocol/http/src/http_convert.cpp index 8cc57b6..c1f5574 100644 --- a/plugin/protocol/http/src/http_convert.cpp +++ b/plugin/protocol/http/src/http_convert.cpp @@ -17,6 +17,8 @@ struct hf_content_uncompress z_stream * z_stream_ptr; /* BR DECODER STATE */ BrotliDecoderState * brdec_state; + /* ZSTD DCTX */ + ZSTD_DCtx* dctx; unsigned char * chunk; size_t sz_chunk; @@ -27,6 +29,7 @@ struct hf_content_compress unsigned int content_encode; z_stream * z_stream_ptr; BrotliEncoderState * brenc_state; + ZSTD_CCtx* cctx; }; void hf_content_uncompress_destroy(struct hf_content_uncompress * cv_object) @@ -44,6 +47,12 @@ void hf_content_uncompress_destroy(struct hf_content_uncompress * cv_object) cv_object->brdec_state = NULL; } + if(cv_object->dctx != NULL) + { + ZSTD_freeDCtx(cv_object->dctx); + cv_object->dctx = NULL; + } + free(cv_object->chunk); free(cv_object); } @@ -93,6 +102,11 @@ struct hf_content_uncompress * hf_content_uncompress_create(unsigned int content cv_object->brdec_state = BrotliDecoderCreateInstance(NULL, NULL, NULL); if (unlikely(cv_object->brdec_state == NULL)) goto __errout; } + if (content_encode == HTTP_ACCEPT_ENCODING_ZSTD) + { + cv_object->dctx = ZSTD_createDCtx(); + if (unlikely(cv_object->dctx == NULL)) goto __errout; + } return cv_object; @@ -109,6 +123,12 @@ __errout: cv_object->brdec_state = NULL; } + if(cv_object->dctx != NULL) + { + ZSTD_freeDCtx(cv_object->dctx); + cv_object->dctx = NULL; + } + free(cv_object->chunk); free(cv_object); return NULL; @@ -195,6 +215,43 @@ static int __hf_content_uncompress_write_br(struct hf_content_uncompress * cv_ob } } +static int __hf_content_uncompress_write_zstd(struct hf_content_uncompress * cv_object, + struct http_half_private * hf_private, tfe_http_event http_ev, const unsigned char * data, size_t datalen) +{ + ZSTD_outBuffer output; + size_t available_in = datalen; + const unsigned char * next_in = data; + ZSTD_inBuffer input = {next_in, available_in, 0 }; + + + size_t available_out; + unsigned char * next_out; + + int ret; + for (;;) + { + available_out = cv_object->sz_chunk; + next_out = cv_object->chunk; + output = {next_out, available_out, 0 }; + + ret = ZSTD_decompressStream(cv_object->dctx, &output, &input); + size_t have = (unsigned int)cv_object->sz_chunk - (output.size - output.pos); + if (have > 0 && cv_object->data_cb != NULL) + { + cv_object->data_cb(hf_private, http_ev, cv_object->chunk, have, cv_object->data_cb_user); + } + if(ret >= 0) + { + return 1; + } + if (ret < 0 ) + { + TFE_LOG_ERROR(g_http_plugin->logger, "ZSTD_decompressStream() failed: errno = %d, %s", + ret, ZSTD_getErrorName(ret)); return -1; + } + } +} + int hf_content_uncompress_write(struct hf_content_uncompress * cv_object, struct http_half_private * hf_private, tfe_http_event http_ev, const unsigned char * data, size_t datalen) { @@ -209,6 +266,11 @@ int hf_content_uncompress_write(struct hf_content_uncompress * cv_object, return __hf_content_uncompress_write_br(cv_object, hf_private, http_ev, data, datalen); } + if (cv_object->content_encode == HTTP_ACCEPT_ENCODING_ZSTD) + { + return __hf_content_uncompress_write_zstd(cv_object, hf_private, http_ev, data, datalen); + } + assert(0); return -1; } @@ -253,6 +315,13 @@ struct hf_content_compress * hf_content_compress_create(unsigned int content_enc BrotliEncoderSetParameter(cv_object->brenc_state, BROTLI_PARAM_QUALITY, 3); } + if (cv_object->content_encode == HTTP_ACCEPT_ENCODING_ZSTD) + { + cv_object->cctx = ZSTD_createCCtx(); + if (unlikely(cv_object->cctx == NULL)) goto __errout; + ZSTD_CCtx_setParameter(cv_object->cctx, ZSTD_c_compressionLevel, 1); + ZSTD_CCtx_setParameter(cv_object->cctx, ZSTD_c_checksumFlag, 1); + } return cv_object; __errout: @@ -268,6 +337,12 @@ __errout: cv_object->brenc_state = NULL; } + if (cv_object->cctx) + { + ZSTD_freeCCtx(cv_object->cctx); + cv_object->cctx = NULL; + } + free(cv_object); return NULL; } @@ -374,6 +449,54 @@ static int __hf_content_compress_write_br(struct hf_content_compress * cv_object return 0; } +static int __hf_content_compress_write_zstd(struct hf_content_compress * cv_object, + const unsigned char * in_data, size_t sz_in_data, struct evbuffer * out_ev_buf, int end) +{ + struct evbuffer_iovec v[1]; + size_t __sz_reserve_chunk = sz_in_data > SZ_RESERVE_SPACE ? sz_in_data : SZ_RESERVE_SPACE; + int iov_count = evbuffer_reserve_space(out_ev_buf, __sz_reserve_chunk, v, 1); + if (iov_count != 1) return -1; + + ZSTD_outBuffer output = {0}; + const unsigned char * next_in = in_data; + size_t avail_in = sz_in_data; + ZSTD_inBuffer input = {next_in, avail_in, 0}; + + unsigned char * next_out = (unsigned char *)v[0].iov_base; + size_t avail_out = v[0].iov_len; + output = (ZSTD_outBuffer){next_out, avail_out, 0}; + + ZSTD_EndDirective const mode = end ? ZSTD_e_end : ZSTD_e_continue; + int ret = 0; + do + { + ret = ZSTD_compressStream2(cv_object->cctx, &output, &input, mode); + if (ZSTD_isError(ret)) + { + TFE_LOG_ERROR(g_http_plugin->logger, "ZSTD_compressStream2() error."); + return ret; + } + if (output.pos == output.size && output.pos != 0 && output.size != 0) + { + v[0].iov_len = v[0].iov_len - avail_out; + ret = evbuffer_commit_space(out_ev_buf, v, iov_count); + if(ret < 0) return -2; + + if(avail_out == 0) + { + iov_count = evbuffer_reserve_space(out_ev_buf, __sz_reserve_chunk, v, 1); + if(unlikely(iov_count != 1)) return -3; + + next_out = (unsigned char *) v[0].iov_base; + avail_out = (unsigned int) v[0].iov_len; + output = (ZSTD_outBuffer){next_out, avail_out, 0}; + } + } + } + while (avail_in > 0); + return 0; +} + int hf_content_compress_write(struct hf_content_compress * cv_object, const unsigned char * in_data, size_t sz_in_data, struct evbuffer * out_ev_buf, int end) { @@ -388,12 +511,22 @@ int hf_content_compress_write(struct hf_content_compress * cv_object, return __hf_content_compress_write_br(cv_object, in_data, sz_in_data, out_ev_buf, end); } + if (cv_object->content_encode == HTTP_ACCEPT_ENCODING_ZSTD) + { + return __hf_content_compress_write_zstd(cv_object, in_data, sz_in_data, out_ev_buf, end); + } + assert(0); return -1; } void hf_content_compress_destroy(hf_content_compress * cv_object) { + if (cv_object->cctx){ + ZSTD_freeCCtx(cv_object->cctx); + cv_object->cctx = NULL; + } + if (cv_object->brenc_state) { BrotliEncoderDestroyInstance(cv_object->brenc_state); cv_object->brenc_state = NULL; diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index e364856..b0e7ba1 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -73,6 +73,11 @@ uint16_t __hf_content_encoding_parse(const char * str_content_encoding) return HTTP_ACCEPT_ENCODING_BR; } + if(strcasestr(str_content_encoding, "zstd") != NULL) + { + return HTTP_ACCEPT_ENCODING_ZSTD; + } + return HTTP_ACCEPT_ENCODING_NONE; } @@ -86,6 +91,7 @@ const char * __hf_content_encoding_to_str(unsigned int encode) case HTTP_ACCEPT_ENCODING_BZIP2: return "bzip2"; case HTTP_ACCEPT_ENCODING_X_BZIP2: return "x-bzip2"; case HTTP_ACCEPT_ENCODING_BR: return "br"; + case HTTP_ACCEPT_ENCODING_ZSTD: return "zstd"; default: return ""; } } diff --git a/plugin/protocol/http/test/test_http_convert.cpp b/plugin/protocol/http/test/test_http_convert.cpp index 3f333b7..d348e53 100644 --- a/plugin/protocol/http/test/test_http_convert.cpp +++ b/plugin/protocol/http/test/test_http_convert.cpp @@ -317,6 +317,22 @@ TEST_P(HttpConvertCompress, MonkeyToGzipStreamWithNullEnd) EXPECT_EQ(memcmp(uncompress_buf.data(), monkey, sizeof(monkey)), 0); } +TEST_P(HttpConvertCompress, MonkeyToZstd) +{ + int ret = hf_content_compress_write(cv_compress_object, monkey, sizeof(monkey), buf, 1); + ASSERT_EQ(ret, 0); + + unsigned char * __raw_buf_ptr = evbuffer_pullup(buf, -1); + size_t __raw_buf_length = evbuffer_get_length(buf); + + ret = hf_content_uncompress_write(cv_uncompress_object, NULL, EV_HTTP_REQ_BODY_CONT, + __raw_buf_ptr, __raw_buf_length); + ASSERT_TRUE(__raw_buf_ptr != NULL); + ASSERT_EQ(ret, 1); + EXPECT_EQ(uncompress_buf.size(), sizeof(monkey)); + EXPECT_EQ(memcmp(uncompress_buf.data(), monkey, sizeof(monkey)), 0); +} + INSTANTIATE_TEST_CASE_P(HttpConvertCompressGroup, HttpConvertCompress, ::testing::Values(HTTP_ACCEPT_ENCODING_GZIP, HTTP_ACCEPT_ENCODING_DEFLATE, HTTP_ACCEPT_ENCODING_BR));