feature: adding Kafka infrastructure
This commit is contained in:
@@ -8,6 +8,7 @@ extern "C"
|
||||
|
||||
#include <sched.h>
|
||||
|
||||
#include "kafka.h"
|
||||
#include "policy.h"
|
||||
#include "timestamp.h"
|
||||
#include "packet_io.h"
|
||||
@@ -99,6 +100,7 @@ struct sce_ctx
|
||||
int cpu_affinity_mask[MAX_THREAD_NUM];
|
||||
|
||||
cpu_set_t coremask;
|
||||
struct kafka *kfk;
|
||||
struct timestamp *ts;
|
||||
struct packet_io *io;
|
||||
struct global_metrics *metrics;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <MESA/field_stat2.h>
|
||||
#include <MESA/MESA_prof_load.h>
|
||||
|
||||
#include "log.h"
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
#include "sce.h"
|
||||
#include "log.h"
|
||||
#include "kafka.h"
|
||||
#include "global_metrics.h"
|
||||
|
||||
char *memdup(const char *src, int len)
|
||||
@@ -87,6 +88,12 @@ struct sce_ctx *sce_ctx_create(const char *profile)
|
||||
CPU_SET(cpu_id, &sce_ctx->coremask);
|
||||
}
|
||||
|
||||
sce_ctx->kfk = kafka_create(profile);
|
||||
if (sce_ctx->kfk == NULL)
|
||||
{
|
||||
goto error_out;
|
||||
}
|
||||
|
||||
sce_ctx->ts = timestamp_new(sce_ctx->ts_update_interval_ms);
|
||||
sce_ctx->metrics = global_metrics_create(profile, sce_ctx->nr_worker_threads);
|
||||
if (sce_ctx->metrics == NULL)
|
||||
@@ -126,6 +133,7 @@ void sce_ctx_destory(struct sce_ctx *sce_ctx)
|
||||
policy_enforcer_destory(sce_ctx->enforcer);
|
||||
global_metrics_destory(sce_ctx->metrics);
|
||||
timestamp_free(sce_ctx->ts);
|
||||
kafka_destroy(sce_ctx->kfk);
|
||||
|
||||
free(sce_ctx);
|
||||
sce_ctx = NULL;
|
||||
|
||||
Reference in New Issue
Block a user