Move code scanning directories to dumpfile io

This commit is contained in:
luwenpeng
2024-04-30 15:47:45 +08:00
parent 95e0982fd1
commit bdf899cf01
6 changed files with 74 additions and 102 deletions

View File

@@ -1,5 +1,4 @@
add_subdirectory(log) add_subdirectory(log)
add_subdirectory(file)
add_subdirectory(timestamp) add_subdirectory(timestamp)
add_subdirectory(tuple) add_subdirectory(tuple)
add_subdirectory(packet) add_subdirectory(packet)

View File

@@ -1,3 +0,0 @@
add_library(file file_scan.cpp)
target_include_directories(file PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(file log)

View File

@@ -1,69 +0,0 @@
#include <errno.h>
#include <dirent.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "file_scan.h"
int file_scan(const char *dir, file_handle *cb, void *arg)
{
struct stat statbuf;
struct dirent *entry;
DIR *dp = opendir(dir);
if (NULL == dp)
{
FILE_SCAN_LOG_ERROR("opendir %s failed, %s", dir, strerror(errno));
return -1;
}
if (chdir(dir) == -1)
{
FILE_SCAN_LOG_ERROR("chdir %s failed, %s", dir, strerror(errno));
goto error_out;
}
while ((entry = readdir(dp)))
{
if (lstat(entry->d_name, &statbuf) == -1)
{
FILE_SCAN_LOG_ERROR("lstat %s failed, %s", entry->d_name, strerror(errno));
goto error_out;
}
if (S_IFDIR & statbuf.st_mode)
{
if (!strcmp(".", entry->d_name) || !strcmp("..", entry->d_name))
{
continue;
}
if (file_scan(entry->d_name, cb, arg) == -1)
{
goto error_out;
}
}
else
{
if (cb(entry->d_name, arg) == -1)
{
goto error_out;
}
}
}
if (chdir("..") == -1)
{
FILE_SCAN_LOG_ERROR("chdir .. failed, %s", strerror(errno));
goto error_out;
}
closedir(dp);
return 0;
error_out:
closedir(dp);
return -1;
}

View File

@@ -1,21 +0,0 @@
#ifndef _FILE_SCAN_H
#define _FILE_SCAN_H
#ifdef __cplusplus
extern "C"
{
#endif
#include "log.h"
#define FILE_SCAN_LOG_ERROR(format, ...) LOG_ERROR("file scan", format, ##__VA_ARGS__)
#define FILE_SCAN_LOG_DEBUG(format, ...) LOG_DEBUG("file scan", format, ##__VA_ARGS__)
typedef int file_handle(const char *file, void *arg);
int file_scan(const char *dir, file_handle *cb, void *arg);
#ifdef __cplusplus
}
#endif
#endif

View File

@@ -1,4 +1,4 @@
add_library(packet_io dumpfile_io.cpp marsio_io.cpp lock_free_queue.cpp packet_io.cpp) add_library(packet_io dumpfile_io.cpp marsio_io.cpp lock_free_queue.cpp packet_io.cpp)
target_include_directories(packet_io PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(packet_io PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(packet_io PUBLIC ${CMAKE_SOURCE_DIR}/src/stellar) target_include_directories(packet_io PUBLIC ${CMAKE_SOURCE_DIR}/src/stellar)
target_link_libraries(packet_io mrzcpd pcap packet file) target_link_libraries(packet_io mrzcpd pcap packet)

View File

@@ -4,12 +4,15 @@
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
#include <errno.h>
#include <dirent.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "macro.h" #include "macro.h"
#include "packet_priv.h"
#include "file_scan.h"
#include "lock_free_queue.h"
#include "dumpfile_io.h" #include "dumpfile_io.h"
#include "packet_priv.h"
#include "lock_free_queue.h"
#define MAX_PACKET_QUEUE_SIZE (4096 * 1000) #define MAX_PACKET_QUEUE_SIZE (4096 * 1000)
@@ -35,7 +38,70 @@ struct pcap_pkt
* Private API * Private API
******************************************************************************/ ******************************************************************************/
static void pcap_handle(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) typedef int file_handle(const char *file, void *arg);
static int scan_directory(const char *dir, file_handle *handler, void *arg)
{
struct stat statbuf;
struct dirent *entry;
DIR *dp = opendir(dir);
if (NULL == dp)
{
PACKET_IO_LOG_ERROR("opendir %s failed, %s", dir, strerror(errno));
return -1;
}
if (chdir(dir) == -1)
{
PACKET_IO_LOG_ERROR("chdir %s failed, %s", dir, strerror(errno));
goto error_out;
}
while ((entry = readdir(dp)))
{
if (lstat(entry->d_name, &statbuf) == -1)
{
PACKET_IO_LOG_ERROR("lstat %s failed, %s", entry->d_name, strerror(errno));
goto error_out;
}
if (S_IFDIR & statbuf.st_mode)
{
if (!strcmp(".", entry->d_name) || !strcmp("..", entry->d_name))
{
continue;
}
if (scan_directory(entry->d_name, handler, arg) == -1)
{
goto error_out;
}
}
else
{
if (handler(entry->d_name, arg) == -1)
{
goto error_out;
}
}
}
if (chdir("..") == -1)
{
PACKET_IO_LOG_ERROR("chdir .. failed, %s", strerror(errno));
goto error_out;
}
closedir(dp);
return 0;
error_out:
closedir(dp);
return -1;
}
static void pcap_packet_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes)
{ {
struct dumpfile_io *handle = (struct dumpfile_io *)user; struct dumpfile_io *handle = (struct dumpfile_io *)user;
@@ -76,7 +142,7 @@ static void pcap_handle(u_char *user, const struct pcap_pkthdr *h, const u_char
} }
} }
static int dumpfile_handle(const char *file, void *arg) static int dumpfile_handler(const char *file, void *arg)
{ {
char resolved_path[256]; char resolved_path[256];
char pcap_errbuf[PCAP_ERRBUF_SIZE]; char pcap_errbuf[PCAP_ERRBUF_SIZE];
@@ -91,7 +157,7 @@ static int dumpfile_handle(const char *file, void *arg)
PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf);
return -1; return -1;
} }
pcap_loop(handle->pcap, -1, pcap_handle, (u_char *)handle); pcap_loop(handle->pcap, -1, pcap_packet_handler, (u_char *)handle);
pcap_close(handle->pcap); pcap_close(handle->pcap);
PACKET_IO_LOG_STATE("dumpfile %s processed", resolved_path) PACKET_IO_LOG_STATE("dumpfile %s processed", resolved_path)
@@ -106,7 +172,7 @@ static void *dumpfile_thread(void *arg)
ATOMIC_SET(&handle->io_thread_is_runing, 1); ATOMIC_SET(&handle->io_thread_is_runing, 1);
PACKET_IO_LOG_STATE("dumpfile io thread is running"); PACKET_IO_LOG_STATE("dumpfile io thread is running");
file_scan(handle->directory, dumpfile_handle, arg); scan_directory(handle->directory, dumpfile_handler, arg);
while (ATOMIC_READ(&handle->io_thread_need_exit) == 0) while (ATOMIC_READ(&handle->io_thread_need_exit) == 0)
{ {