feat(polling in stellar core): worker thread call polling_dispatch

This commit is contained in:
yangwei
2024-09-27 09:15:26 +08:00
parent b3769f0b9f
commit 7aeb5949ee
3 changed files with 10 additions and 3 deletions

View File

@@ -46,6 +46,7 @@ static void on_polling_dispatch(int topic_id __unused,
int stellar_polling_subscribe(struct stellar_polling_manager* polling_mgr, module_on_polling_func on_polling, void *polling_arg) int stellar_polling_subscribe(struct stellar_polling_manager* polling_mgr, module_on_polling_func on_polling, void *polling_arg)
{ {
if(polling_mgr==NULL || polling_mgr->mod_mgr == NULL)return -1;
polling_mgr->polling_topic_id=mq_schema_get_topic_id(stellar_module_manager_get_mq_schema(polling_mgr->mod_mgr), TOPIC_POLLING); polling_mgr->polling_topic_id=mq_schema_get_topic_id(stellar_module_manager_get_mq_schema(polling_mgr->mod_mgr), TOPIC_POLLING);
if(polling_mgr->polling_topic_id<0) if(polling_mgr->polling_topic_id<0)
{ {
@@ -58,11 +59,13 @@ int stellar_polling_subscribe(struct stellar_polling_manager* polling_mgr, modu
void stellar_polling_active(struct stellar_polling_manager *polling_mgr) void stellar_polling_active(struct stellar_polling_manager *polling_mgr)
{ {
if(polling_mgr==NULL || polling_mgr->mod_mgr == NULL)return;
mq_runtime_publish_message(stellar_module_manager_get_mq_runtime(polling_mgr->mod_mgr), polling_mgr->polling_topic_id, NULL); mq_runtime_publish_message(stellar_module_manager_get_mq_runtime(polling_mgr->mod_mgr), polling_mgr->polling_topic_id, NULL);
} }
void stellar_polling_dispatch(struct stellar_polling_manager *polling_mgr) void stellar_polling_dispatch(struct stellar_polling_manager *polling_mgr)
{ {
if(polling_mgr==NULL || polling_mgr->mod_mgr == NULL)return;
stellar_polling_active(polling_mgr); stellar_polling_active(polling_mgr);
mq_runtime_dispatch(stellar_module_manager_get_mq_runtime(polling_mgr->mod_mgr)); mq_runtime_dispatch(stellar_module_manager_get_mq_runtime(polling_mgr->mod_mgr));
mq_runtime_clean(stellar_module_manager_get_mq_runtime(polling_mgr->mod_mgr)); mq_runtime_clean(stellar_module_manager_get_mq_runtime(polling_mgr->mod_mgr));

View File

@@ -15,6 +15,7 @@ struct stellar_polling_manager
int polling_topic_id; int polling_topic_id;
}; };
//TODO: expose this function to polling_manager.h
void stellar_polling_dispatch(struct stellar_polling_manager *polling_mgr); void stellar_polling_dispatch(struct stellar_polling_manager *polling_mgr);
#ifdef __cplusplus #ifdef __cplusplus

View File

@@ -7,6 +7,7 @@
#include "packet_manager_internal.h" #include "packet_manager_internal.h"
#include "stellar/stellar.h" #include "stellar/stellar.h"
#include "stellar/module_manager.h" #include "stellar/module_manager.h"
#include "polling_manager_internal.h"
#define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__)
#define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__)
@@ -51,6 +52,9 @@ static void *worker_thread(void *arg)
struct packet_manager *pkt_mgr = st->pkt_mgr; struct packet_manager *pkt_mgr = st->pkt_mgr;
struct stellar_module_manager *mod_mgr = st->mod_mgr; struct stellar_module_manager *mod_mgr = st->mod_mgr;
struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema); struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema);
struct stellar_polling_manager *polling_mgr=stellar_module_get_polling_manager(mod_mgr);
snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id); snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id);
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
@@ -102,12 +106,11 @@ static void *worker_thread(void *arg)
{ {
packet_io_egress(pkt_io, thread_id, pkt, 1); packet_io_egress(pkt_io, thread_id, pkt, 1);
} }
stellar_polling_dispatch(polling_mgr);
// TODO polling
} }
idle_tasks: idle_tasks:
// TODO polling stellar_polling_dispatch(polling_mgr);
if (nr_pkt_rcv == 0) if (nr_pkt_rcv == 0)
{ {