feat: session 增加 stream_id,stream_url

1. pcap 新增 summary 统计信息
This commit is contained in:
shizhendong
2024-08-13 14:08:33 +08:00
parent 58b38fbe91
commit ecb57f6c6c
5 changed files with 81 additions and 21 deletions

View File

@@ -236,11 +236,11 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationDao, Applicat
String param1 = String.format("_a=(discover:(columns:!(_source),isDirty:!f,sort:!()),metadata:(indexPattern:'%s',view:discover))", workspaceId);
String param2 = "_g=(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:now-15m,to:now))";
String source = pcapList.stream()
.map(PcapEntity::getName)
.map(fileName -> "\"" + fileName + "\"")
.collect(Collectors.joining("|", "source: (", ")"));
String param3 = String.format("_q=(filters:!(),query:(language:lucene,query:'%s'))", source);
String filter = pcapList.stream()
.map(PcapEntity::getId)
.map(pcapId -> "\"" + pcapId + "\"")
.collect(Collectors.joining("|", "pcap.id: (", ")"));
String param3 = String.format("_q=(filters:!(),query:(language:lucene,query:'%s'))", filter);
String query = String.format("?%s&%s&%s", param1, param2, param3);
String kibanaDiscoverUrl = baseUrl + "#" + query;

View File

@@ -22,6 +22,7 @@ public class PcapEntity {
private Long size;
private String md5;
private String status;
private String summary;
private Long createTimestamp;
private String createUserId;
private String workspaceId;

View File

@@ -30,6 +30,7 @@ import net.geedge.asw.module.workspace.entity.WorkspaceEntity;
import net.geedge.asw.module.workspace.service.IWorkspaceService;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -38,6 +39,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -48,6 +50,9 @@ import java.util.stream.Collectors;
public class PcapServiceImpl extends ServiceImpl<PcapDao, PcapEntity> implements IPcapService {
private static final Log log = Log.get();
@Value("${sharkdApi.host:127.0.0.1}")
private String sharkdApiHostAddr;
@Autowired
private IJobService jobService;
@@ -180,6 +185,11 @@ public class PcapServiceImpl extends ServiceImpl<PcapDao, PcapEntity> implements
public void parse2session(String... ids) {
List<Runnable> taskList = T.ListUtil.list(true);
Long maxFileSize = 0L;
// parse thread config properties
Properties properties = new Properties();
properties.setProperty("sharkdApiHostAddr", this.sharkdApiHostAddr);
for (String id : ids) {
PcapEntity pcapEntity = this.getById(id);
if (T.ObjectUtil.isNotNull(pcapEntity)) {
@@ -188,6 +198,7 @@ public class PcapServiceImpl extends ServiceImpl<PcapDao, PcapEntity> implements
PcapParserThread pcapParserThread = new PcapParserThread();
pcapParserThread.setPcapEntity(pcapEntity);
pcapParserThread.setProperties(properties);
taskList.add(pcapParserThread);
Long size = pcapEntity.getSize();

View File

@@ -22,10 +22,8 @@ import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.opensearch.indices.IndexSettings;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static net.geedge.asw.module.runner.util.RunnerConstant.PcapStatus;
@@ -34,6 +32,8 @@ public class PcapParserThread implements Runnable {
private Log log = Log.get();
private Properties properties;
private PcapEntity pcapEntity;
private IPcapService pcapService;
@@ -88,27 +88,33 @@ public class PcapParserThread implements Runnable {
// geoip
List<String> ipList = jsonArray.stream()
.flatMap(obj -> Stream.of(
T.MapUtil.getStr((JSONObject) obj, "id.orig_h", ""),
T.MapUtil.getStr((JSONObject) obj, "id.resp_h", "")
))
.map(obj -> T.MapUtil.getStr((JSONObject) obj, "id.resp_h", ""))
.filter(s -> T.StrUtil.isNotEmpty(s))
.distinct()
.collect(Collectors.toList());
Map<String, JSONObject> geoipInfo = this.queryGeoip(ipList);
// add source&geoip_info field
String fileName = T.FileUtil.getName(pcapEntity.getPath());
// add custom field
String pcapId = pcapEntity.getId();
String pcapName = T.FileUtil.getName(pcapEntity.getPath());
Long tcpStream = 0L, udpStream = 0L;
String sharkdApiHostAddr = properties.getProperty("sharkdApiHostAddr", "127.0.0.1");
for (Object obj : jsonArray) {
JSONObject pojo = (JSONObject) obj;
pojo.put("source", fileName);
pojo.put("pcap.id", pcapId);
pojo.put("pcap.name", pcapName);
String orig = T.MapUtil.getStr(pojo, "id.orig_h", "");
if (T.StrUtil.isNotEmpty(orig)) {
JSONObject jsonObject = T.MapUtil.get(geoipInfo, orig, JSONObject.class, new JSONObject());
pojo.put("id.orig_country", T.MapUtil.getStr(jsonObject, "country", ""));
pojo.put("id.orig_asn", T.MapUtil.getStr(jsonObject, "asn", ""));
pojo.put("id.orig_asname", T.MapUtil.getStr(jsonObject, "asname", ""));
String proto = T.MapUtil.getStr(pojo, "proto", "");
if (T.StrUtil.equalsIgnoreCase("tcp", proto)) {
Long streamId = tcpStream++;
pojo.put("pcap.tcp_stream", streamId);
pojo.put("pcap.stream_url", String.format("http://%s/pcap/%s/tcp/%s", sharkdApiHostAddr, pcapId, streamId));
}
if (T.StrUtil.equalsIgnoreCase("udp", proto)) {
Long streamId = udpStream++;
pojo.put("pcap.udp_stream", streamId);
pojo.put("pcap.stream_url", String.format("http://%s/pcap/%s/udp/%s", sharkdApiHostAddr, pcapId, streamId));
}
String resp = T.MapUtil.getStr(pojo, "id.resp_h", "");
@@ -120,6 +126,9 @@ public class PcapParserThread implements Runnable {
}
}
// summary
this.statisticSummary(jsonArray);
// opensearch
this.uploadToOpenSearch(jsonArray);
}
@@ -147,6 +156,44 @@ public class PcapParserThread implements Runnable {
return map;
}
/**
 * Computes summary statistics over the parsed session records and persists
 * them to the pcap row's {@code summary} column as a JSON string.
 * <p>
 * The summary contains: start/end timestamps (taken from the first and last
 * records — assumes the array is sorted ascending by {@code ts}; TODO confirm
 * upstream ordering), total session count, total packet count
 * ({@code orig_pkts + resp_pkts} summed over all records), and the distinct
 * set of protocols/services observed.
 *
 * @param jsonArray parsed session records; each element is a {@code JSONObject}
 *                  with {@code ts}, {@code orig_pkts}, {@code resp_pkts},
 *                  {@code proto} and (optionally) {@code service} fields
 */
private void statisticSummary(JSONArray jsonArray) {
    if (T.ObjectUtil.isEmpty(jsonArray)) {
        log.warn("[statisticSummary] [data array is empty] [id: {}]", pcapEntity.getId());
        return;
    }
    Set<String> services = new HashSet<>();
    // primitive long avoids per-iteration autoboxing of the accumulator
    long packets = 0L;
    for (Object obj : jsonArray) {
        JSONObject pojo = (JSONObject) obj;
        packets += pojo.getLongValue("orig_pkts", 0) + pojo.getLongValue("resp_pkts", 0);
        // skip null/empty values so a record without a detected service does
        // not inject a literal null entry into the persisted summary JSON
        String proto = pojo.getString("proto");
        if (T.StrUtil.isNotEmpty(proto)) {
            services.add(proto);
        }
        String service = pojo.getString("service");
        if (T.StrUtil.isNotEmpty(service)) {
            services.add(service);
        }
    }
    JSONObject first = (JSONObject) jsonArray.getFirst();
    JSONObject last = (JSONObject) jsonArray.getLast();
    Map<Object, Object> m = T.MapUtil.builder()
            .put("startTimestamp", first.getBigDecimal("ts"))
            .put("endTimestamp", last.getBigDecimal("ts"))
            .put("sessions", jsonArray.size())
            .put("packets", packets)
            .put("services", services)
            .build();
    pcapService.update(new LambdaUpdateWrapper<PcapEntity>()
            .set(PcapEntity::getSummary, T.JSONUtil.toJsonStr(m))
            .eq(PcapEntity::getId, pcapEntity.getId())
    );
}
/**
* upload to opensearch
*

View File

@@ -227,6 +227,7 @@ CREATE TABLE `pcap` (
`size` bigint(20) NOT NULL DEFAULT 0 COMMENT '文件大小',
`md5` varchar(64) NOT NULL DEFAULT '' COMMENT '摘要值,根据文件md5值判断是否已上存在存在则响应当前id',
`status` varchar(64) NOT NULL DEFAULT '' COMMENT '状态,可选值 UploadedAnalyzingCompleted',
`summary` varchar(512) NOT NULL DEFAULT '{}' COMMENT '摘要信息, JSON 格式',
`create_timestamp` bigint(20) NOT NULL COMMENT '创建时间戳',
`create_user_id` varchar(64) NOT NULL COMMENT '创建人',
`workspace_id` varchar(64) NOT NULL DEFAULT '' COMMENT '工作空间ID',