在流量转发模块中增加DEFER数据的识别功能,避免DEFER的数据重复镜像。
* 原实现没有其他插件DEFER数据的情况,导致其他插件执行DEFER动作时重复镜像被DEFER的数据; * 现修正,在数据入口处读取本次回调的流OFFSET值,当数据被DEFER时,仅发送本次调用增加的数据。
This commit is contained in:
@@ -11,6 +11,11 @@ struct traffic_mirror_me
|
|||||||
{
|
{
|
||||||
struct profile_table_ex_data * profile_ex_data;
|
struct profile_table_ex_data * profile_ex_data;
|
||||||
struct traffic_mirror_rebuild * rebuild_ctx;
|
struct traffic_mirror_rebuild * rebuild_ctx;
|
||||||
|
|
||||||
|
/* Make the DEFER data not to mirror twice
|
||||||
|
* TODO: the size of (size_t) is enough for a tcp stream offset ? */
|
||||||
|
size_t downstream_rx_offset_mirrored;
|
||||||
|
size_t upstream_tx_offset_mirrored;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct traffic_mirror_instance __g_traffic_mirror_instance;
|
struct traffic_mirror_instance __g_traffic_mirror_instance;
|
||||||
@@ -635,8 +640,60 @@ enum tfe_stream_action traffic_mirror_on_data_cb(const struct tfe_stream * strea
|
|||||||
enum tfe_conn_dir dir, const unsigned char * data, size_t len, void ** pme)
|
enum tfe_conn_dir dir, const unsigned char * data, size_t len, void ** pme)
|
||||||
{
|
{
|
||||||
struct traffic_mirror_me * me = (struct traffic_mirror_me *)(*pme);
|
struct traffic_mirror_me * me = (struct traffic_mirror_me *)(*pme);
|
||||||
traffic_mirror_rebuild_data(me->rebuild_ctx, thread_id, (const char *) data, (size_t) len, dir);
|
|
||||||
|
/* Rx offset of this callback */
|
||||||
|
size_t rx_offset_this_time;
|
||||||
|
size_t rx_offset_mirrored;
|
||||||
|
enum tfe_stream_info rx_offset_type;
|
||||||
|
|
||||||
|
/* Need to mirrored data */
|
||||||
|
const unsigned char * ptr_data_need_to_mirrored;
|
||||||
|
size_t sz_data_need_to_mirrored = 0;
|
||||||
|
|
||||||
|
if (dir == CONN_DIR_DOWNSTREAM)
|
||||||
|
{
|
||||||
|
rx_offset_type = INFO_FROM_DOWNSTREAM_RX_OFFSET;
|
||||||
|
rx_offset_mirrored = me->downstream_rx_offset_mirrored;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
rx_offset_type = INFO_FROM_UPSTREAM_RX_OFFSET;
|
||||||
|
rx_offset_mirrored = me->upstream_tx_offset_mirrored;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Get the offset of this callback */
|
||||||
|
int ret = tfe_stream_info_get(stream, rx_offset_type, &rx_offset_this_time, sizeof(rx_offset_this_time));
|
||||||
|
if (unlikely(ret < 0))
|
||||||
|
{
|
||||||
|
TFE_STREAM_LOG_ERROR(stream, "Failed at fetch rx offset, detached.");
|
||||||
|
goto errout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Mirrored offset must be larger than rx_offset,
|
||||||
|
* Otherwise, there is a hole in data stream */
|
||||||
|
assert(rx_offset_mirrored >= rx_offset_this_time);
|
||||||
|
assert(rx_offset_this_time + len >= rx_offset_mirrored);
|
||||||
|
|
||||||
|
sz_data_need_to_mirrored = len - (rx_offset_mirrored - rx_offset_this_time);
|
||||||
|
ptr_data_need_to_mirrored = data + (len - sz_data_need_to_mirrored);
|
||||||
|
|
||||||
|
/* Don't need to mirrored, the data has been mirrored in DEFER state */
|
||||||
|
if (sz_data_need_to_mirrored == 0)
|
||||||
|
goto out;
|
||||||
|
|
||||||
|
/* Update the mirrored offset */
|
||||||
|
if (dir == CONN_DIR_DOWNSTREAM) me->downstream_rx_offset_mirrored += sz_data_need_to_mirrored;
|
||||||
|
else me->upstream_tx_offset_mirrored += sz_data_need_to_mirrored;
|
||||||
|
|
||||||
|
traffic_mirror_rebuild_data(me->rebuild_ctx, thread_id, (const char *) ptr_data_need_to_mirrored,
|
||||||
|
(size_t) sz_data_need_to_mirrored, dir);
|
||||||
|
|
||||||
|
out:
|
||||||
return ACTION_FORWARD_DATA;
|
return ACTION_FORWARD_DATA;
|
||||||
|
|
||||||
|
errout:
|
||||||
|
tfe_stream_detach(stream);
|
||||||
|
return ACTION_FORWARD_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
void traffic_mirror_on_close_cb(const struct tfe_stream * stream, unsigned int thread_id,
|
void traffic_mirror_on_close_cb(const struct tfe_stream * stream, unsigned int thread_id,
|
||||||
|
|||||||
Reference in New Issue
Block a user