From ecb57f6c6c5e1ad579bf42c4f8176ed6027fea61 Mon Sep 17 00:00:00 2001
From: shizhendong
Date: Tue, 13 Aug 2024 14:08:33 +0800
Subject: [PATCH] feat: add stream_id and stream_url to session
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. Add summary statistics to pcap
---
 .../service/impl/ApplicationServiceImpl.java  | 10 +--
 .../asw/module/runner/entity/PcapEntity.java  |  1 +
 .../runner/service/impl/PcapServiceImpl.java  | 11 +++
 .../module/runner/util/PcapParserThread.java  | 79 +++++++++++++++----
 .../db/migration/V1.0.01__INIT_TABLES.sql     |  1 +
 5 files changed, 81 insertions(+), 21 deletions(-)

diff --git a/src/main/java/net/geedge/asw/module/app/service/impl/ApplicationServiceImpl.java b/src/main/java/net/geedge/asw/module/app/service/impl/ApplicationServiceImpl.java
index f334423..a197f00 100644
--- a/src/main/java/net/geedge/asw/module/app/service/impl/ApplicationServiceImpl.java
+++ b/src/main/java/net/geedge/asw/module/app/service/impl/ApplicationServiceImpl.java
@@ -236,11 +236,11 @@ public class ApplicationServiceImpl extends ServiceImpl
-                .map(fileName -> "\"" + fileName + "\"")
-                .collect(Collectors.joining("|", "source: (", ")"));
-        String param3 = String.format("_q=(filters:!(),query:(language:lucene,query:'%s'))", source);
+        String filter = pcapList.stream()
+                .map(PcapEntity::getId)
+                .map(pcapId -> "\"" + pcapId + "\"")
+                .collect(Collectors.joining("|", "pcap.id: (", ")"));
+        String param3 = String.format("_q=(filters:!(),query:(language:lucene,query:'%s'))", filter);
 
         String query = String.format("?%s&%s&%s", param1, param2, param3);
         String kibanaDiscoverUrl = baseUrl + "#" + query;
diff --git a/src/main/java/net/geedge/asw/module/runner/entity/PcapEntity.java b/src/main/java/net/geedge/asw/module/runner/entity/PcapEntity.java
index 9daa488..a98c290 100644
--- a/src/main/java/net/geedge/asw/module/runner/entity/PcapEntity.java
+++ b/src/main/java/net/geedge/asw/module/runner/entity/PcapEntity.java
@@ -22,6 +22,7 @@ public class PcapEntity {
     private Long size;
     private String md5;
     private String status;
+    private String summary;
     private Long createTimestamp;
     private String createUserId;
     private String workspaceId;
diff --git a/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java b/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java
index 72301f1..3fe6fdf 100644
--- a/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java
+++ b/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java
@@ -30,6 +30,7 @@ import net.geedge.asw.module.workspace.entity.WorkspaceEntity;
 import net.geedge.asw.module.workspace.service.IWorkspaceService;
 import org.apache.commons.io.FileUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.core.io.Resource;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -38,6 +39,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -48,6 +50,9 @@ import java.util.stream.Collectors;
 public class PcapServiceImpl extends ServiceImpl implements IPcapService {
 
     private static final Log log = Log.get();
 
+    @Value("${sharkdApi.host:127.0.0.1}")
+    private String sharkdApiHostAddr;
+
     @Autowired
     private IJobService jobService;
@@ -180,6 +185,11 @@ public class PcapServiceImpl extends ServiceImpl implements
     public void parse2session(String... ids) {
         List taskList = T.ListUtil.list(true);
         Long maxFileSize = 0L;
+
+        // parse thread config properties
+        Properties properties = new Properties();
+        properties.setProperty("sharkdApiHostAddr", this.sharkdApiHostAddr);
+
         for (String id : ids) {
             PcapEntity pcapEntity = this.getById(id);
             if (T.ObjectUtil.isNotNull(pcapEntity)) {
@@ -188,6 +198,7 @@ public class PcapServiceImpl extends ServiceImpl implements
 
                 PcapParserThread pcapParserThread = new PcapParserThread();
                 pcapParserThread.setPcapEntity(pcapEntity);
+                pcapParserThread.setProperties(properties);
                 taskList.add(pcapParserThread);
 
                 Long size = pcapEntity.getSize();
diff --git a/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java b/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java
index 5a30d4d..215ff24 100644
--- a/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java
+++ b/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java
@@ -22,10 +22,8 @@ import org.opensearch.client.opensearch.indices.ExistsRequest;
 import org.opensearch.client.opensearch.indices.IndexSettings;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static net.geedge.asw.module.runner.util.RunnerConstant.PcapStatus;
 
@@ -34,6 +32,8 @@ public class PcapParserThread implements Runnable {
 
     private Log log = Log.get();
 
+    private Properties properties;
+
     private PcapEntity pcapEntity;
 
     private IPcapService pcapService;
@@ -88,27 +88,33 @@ public class PcapParserThread implements Runnable {
 
             // geoip
             List ipList = jsonArray.stream()
-                    .flatMap(obj -> Stream.of(
-                            T.MapUtil.getStr((JSONObject) obj, "id.orig_h", ""),
-                            T.MapUtil.getStr((JSONObject) obj, "id.resp_h", "")
-                    ))
+                    .map(obj -> T.MapUtil.getStr((JSONObject) obj, "id.resp_h", ""))
                     .filter(s -> T.StrUtil.isNotEmpty(s))
                     .distinct()
                     .collect(Collectors.toList());
             Map geoipInfo = this.queryGeoip(ipList);
 
-            // add source&geoip_info field
-            String fileName = T.FileUtil.getName(pcapEntity.getPath());
+            // add custom field
+            String pcapId = pcapEntity.getId();
+            String pcapName = T.FileUtil.getName(pcapEntity.getPath());
+            Long tcpStream = 0L, udpStream = 0L;
+
+            String sharkdApiHostAddr = properties.getProperty("sharkdApiHostAddr", "127.0.0.1");
             for (Object obj : jsonArray) {
                 JSONObject pojo = (JSONObject) obj;
-                pojo.put("source", fileName);
+                pojo.put("pcap.id", pcapId);
+                pojo.put("pcap.name", pcapName);
 
-                String orig = T.MapUtil.getStr(pojo, "id.orig_h", "");
-                if (T.StrUtil.isNotEmpty(orig)) {
-                    JSONObject jsonObject = T.MapUtil.get(geoipInfo, orig, JSONObject.class, new JSONObject());
-                    pojo.put("id.orig_country", T.MapUtil.getStr(jsonObject, "country", ""));
-                    pojo.put("id.orig_asn", T.MapUtil.getStr(jsonObject, "asn", ""));
-                    pojo.put("id.orig_asname", T.MapUtil.getStr(jsonObject, "asname", ""));
+                String proto = T.MapUtil.getStr(pojo, "proto", "");
+                if (T.StrUtil.equalsIgnoreCase("tcp", proto)) {
+                    Long streamId = tcpStream++;
+                    pojo.put("pcap.tcp_stream", streamId);
+                    pojo.put("pcap.stream_url", String.format("http://%s/pcap/%s/tcp/%s", sharkdApiHostAddr, pcapId, streamId));
+                }
+                if (T.StrUtil.equalsIgnoreCase("udp", proto)) {
+                    Long streamId = udpStream++;
+                    pojo.put("pcap.udp_stream", streamId);
+                    pojo.put("pcap.stream_url", String.format("http://%s/pcap/%s/udp/%s", sharkdApiHostAddr, pcapId, streamId));
                 }
 
                 String resp = T.MapUtil.getStr(pojo, "id.resp_h", "");
@@ -120,6 +126,9 @@ public class PcapParserThread implements Runnable {
                 }
             }
 
+            // summary
+            this.statisticSummary(jsonArray);
+
             // opensearch
             this.uploadToOpenSearch(jsonArray);
         }
@@ -147,6 +156,44 @@ public class PcapParserThread implements Runnable {
         return map;
     }
 
+    /**
+     * statistic summary
+     *
+     * @param jsonArray
+     */
+    private void statisticSummary(JSONArray jsonArray) {
+        if (T.ObjectUtil.isEmpty(jsonArray)) {
+            log.warn("[statisticSummary] [data array is empty] [id: {}]", pcapEntity.getId());
+        } else {
+            Set services = new HashSet<>();
+            Long packets = 0L;
+            for (Object obj : jsonArray) {
+                JSONObject pojo = (JSONObject) obj;
+                long origPkts = pojo.getLongValue("orig_pkts", 0);
+                long respPkts = pojo.getLongValue("resp_pkts", 0);
+                packets = packets + (origPkts + respPkts);
+
+                services.add(pojo.getString("proto"));
+                services.add(pojo.getString("service"));
+            }
+
+            JSONObject first = (JSONObject) jsonArray.getFirst();
+            JSONObject last = (JSONObject) jsonArray.getLast();
+
+            Map m = T.MapUtil.builder()
+                    .put("startTimestamp", first.getBigDecimal("ts"))
+                    .put("endTimestamp", last.getBigDecimal("ts"))
+                    .put("sessions", jsonArray.size())
+                    .put("packets", packets)
+                    .put("services", services)
+                    .build();
+            pcapService.update(new LambdaUpdateWrapper()
+                    .set(PcapEntity::getSummary, T.JSONUtil.toJsonStr(m))
+                    .eq(PcapEntity::getId, pcapEntity.getId())
+            );
+        }
+    }
+
     /**
      * upload to opensearch
      *
diff --git a/src/main/resources/db/migration/V1.0.01__INIT_TABLES.sql b/src/main/resources/db/migration/V1.0.01__INIT_TABLES.sql
index 64094be..b32a2f8 100644
--- a/src/main/resources/db/migration/V1.0.01__INIT_TABLES.sql
+++ b/src/main/resources/db/migration/V1.0.01__INIT_TABLES.sql
@@ -227,6 +227,7 @@ CREATE TABLE `pcap` (
   `size` bigint(20) NOT NULL DEFAULT 0 COMMENT '文件大小',
   `md5` varchar(64) NOT NULL DEFAULT '' COMMENT '摘要值,根据文件md5值判断是否已上存在,存在则响应当前id',
   `status` varchar(64) NOT NULL DEFAULT '' COMMENT '状态,可选值 Uploaded,Analyzing,Completed',
+  `summary` varchar(512) NOT NULL DEFAULT '{}' COMMENT 'summary info in JSON format',
   `create_timestamp` bigint(20) NOT NULL COMMENT '创建时间戳',
   `create_user_id` varchar(64) NOT NULL COMMENT '创建人',
   `workspace_id` varchar(64) NOT NULL DEFAULT '' COMMENT '工作空间ID',
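
Note: with this patch, every session document indexed into OpenSearch carries pcap.id, pcap.name, a per-protocol stream counter (pcap.tcp_stream or pcap.udp_stream), and a pcap.stream_url built from the sharkd API host. With the default host and a hypothetical pcap id, the resulting URLs would look like the sketch below (illustrative values only):

    http://127.0.0.1/pcap/<pcap-id>/tcp/0
    http://127.0.0.1/pcap/<pcap-id>/udp/3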
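For reference, statisticSummary() stores its map in the new summary column of the pcap table as a JSON string, so a completed pcap row would hold a value shaped roughly like the following; the key names come from the patch, while the numbers and service list are invented for illustration:

    {"startTimestamp": 1723536513.374, "endTimestamp": 1723536555.821, "sessions": 42, "packets": 3817, "services": ["tcp", "udp", "dns", "http"]}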
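Configuration: the sharkd API host is injected from the Spring property sharkdApi.host and falls back to 127.0.0.1 when unset. A deployment could override it in application.properties; the address below is a made-up example:

    # hypothetical override of the 127.0.0.1 default
    sharkdApi.host=192.0.2.10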