feat: ASW-14 新增 pcap 解析接口

This commit is contained in:
shizhendong
2024-07-30 21:10:39 +08:00
parent 17328600aa
commit d9ec686bc7
12 changed files with 776 additions and 11 deletions

28
pom.xml
View File

@@ -143,6 +143,34 @@
<artifactId>simplemagic</artifactId>
<version>1.16</version>
</dependency>
<!--Feign client支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.51</version>
</dependency>
<!--opensearch-->
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-java</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-client</artifactId>
<version>2.12.0</version>
</dependency>
</dependencies>
<build>

View File

@@ -0,0 +1,61 @@
/*
* Copyright 2012-2024 The Feign Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package net.geedge.asw.common.config.feign;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONReader;
import feign.FeignException;
import feign.Response;
import feign.Util;
import feign.codec.Decoder;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Type;
import static feign.Util.ensureClosed;
/**
* @author changjin wei(魏昌进)
*/
public class Fastjson2Decoder implements Decoder {
private final JSONReader.Feature[] features;
public Fastjson2Decoder() {
this(new JSONReader.Feature[0]);
}
public Fastjson2Decoder(JSONReader.Feature[] features) {
this.features = features;
}
@Override
public Object decode(Response response, Type type) throws IOException, FeignException {
if (response.status() == 404 || response.status() == 204) return Util.emptyValueOf(type);
if (response.body() == null) return null;
Reader reader = response.body().asReader(response.charset());
try {
return JSON.parseObject(reader, type, features);
} catch (JSONException e) {
if (e.getCause() != null && e.getCause() instanceof IOException) {
throw IOException.class.cast(e.getCause());
}
throw e;
} finally {
ensureClosed(reader);
}
}
}

View File

@@ -0,0 +1,44 @@
/*
* Copyright 2012-2024 The Feign Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package net.geedge.asw.common.config.feign;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter;
import feign.RequestTemplate;
import feign.Util;
import feign.codec.EncodeException;
import feign.codec.Encoder;
import java.lang.reflect.Type;
/**
* @author changjin wei(魏昌进)
*/
public class Fastjson2Encoder implements Encoder {
private final JSONWriter.Feature[] features;
public Fastjson2Encoder() {
this(new JSONWriter.Feature[0]);
}
public Fastjson2Encoder(JSONWriter.Feature[] features) {
this.features = features;
}
@Override
public void encode(Object object, Type bodyType, RequestTemplate template) throws EncodeException {
template.body(JSON.toJSONBytes(object, features), Util.UTF_8);
}
}

View File

@@ -0,0 +1,254 @@
/*
* Copyright 2012-2024 The Feign Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package net.geedge.asw.common.config.feign;
import feign.*;
import feign.Request.Options;
import feign.Request.ProtocolVersion;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import static feign.Util.enumForName;
public class Http2Client implements Client, AsyncClient<Object> {
private final HttpClient client;
private final Map<Integer, SoftReference<HttpClient>> clients = new ConcurrentHashMap<>();
/**
* Creates the new Http2Client using following defaults:
* <ul>
* <li>Connect Timeout: 10 seconds, as {@link Request.Options#Options()} uses</li>
* <li>Follow all 3xx redirects</li>
* <li>HTTP 2</li>
* </ul>
*
* @see Request.Options#Options()
*/
public Http2Client() {
this(HttpClient.newBuilder()
.followRedirects(Redirect.ALWAYS)
.version(Version.HTTP_2)
.connectTimeout(Duration.ofMillis(10000))
.build());
}
public Http2Client(Options options) {
this(newClientBuilder(options)
.version(Version.HTTP_2)
.build());
}
public Http2Client(HttpClient client) {
this.client = Util.checkNotNull(client, "HttpClient must not be null");
}
@Override
public Response execute(Request request, Options options) throws IOException {
final HttpRequest httpRequest;
try {
httpRequest = newRequestBuilder(request, options)
.version(client.version())
.build();
} catch (URISyntaxException e) {
throw new IOException("Invalid uri " + request.url(), e);
}
HttpClient clientForRequest = getOrCreateClient(options);
HttpResponse<InputStream> httpResponse;
try {
httpResponse = clientForRequest.send(httpRequest, BodyHandlers.ofInputStream());
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
return toFeignResponse(request, httpResponse);
}
@Override
public CompletableFuture<Response> execute(Request request,
Options options,
Optional<Object> requestContext) {
HttpRequest httpRequest;
try {
httpRequest = newRequestBuilder(request, options).build();
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid uri " + request.url(), e);
}
HttpClient clientForRequest = getOrCreateClient(options);
CompletableFuture<HttpResponse<InputStream>> future =
clientForRequest.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofInputStream());
return future.thenApply(httpResponse -> toFeignResponse(request, httpResponse));
}
protected Response toFeignResponse(Request request, HttpResponse<InputStream> httpResponse) {
final OptionalLong length = httpResponse.headers().firstValueAsLong("Content-Length");
return Response.builder()
.protocolVersion(enumForName(ProtocolVersion.class, httpResponse.version()))
.body(httpResponse.body(), length.isPresent() ? (int) length.getAsLong() : null)
.reason(httpResponse.headers().firstValue("Reason-Phrase").orElse(null))
.request(request)
.status(httpResponse.statusCode())
.headers(castMapCollectType(httpResponse.headers().map()))
.build();
}
private HttpClient getOrCreateClient(Options options) {
if (doesClientConfigurationDiffer(options)) {
// create a new client from the existing one - but with connectTimeout and followRedirect
// settings from options
final int clientKey = createClientKey(options);
SoftReference<HttpClient> requestScopedSoftReference = clients.get(clientKey);
HttpClient requestScoped =
requestScopedSoftReference == null ? null : requestScopedSoftReference.get();
if (requestScoped == null) {
java.net.http.HttpClient.Builder builder = newClientBuilder(options)
.sslContext(client.sslContext())
.sslParameters(client.sslParameters())
.version(client.version());
client.authenticator().ifPresent(builder::authenticator);
client.cookieHandler().ifPresent(builder::cookieHandler);
client.executor().ifPresent(builder::executor);
client.proxy().ifPresent(builder::proxy);
requestScoped = builder.build();
clients.put(clientKey, new SoftReference<>(requestScoped));
}
return requestScoped;
}
return client;
}
private boolean doesClientConfigurationDiffer(Options options) {
if ((client.followRedirects() == Redirect.ALWAYS) != options.isFollowRedirects()) {
return true;
}
return client.connectTimeout()
.map(timeout -> timeout.toMillis() != options.connectTimeoutMillis())
.orElse(true);
}
/**
* Creates integer key that represents {@link Options} settings based on
* {@link Http2Client#doesClientConfigurationDiffer(Options)} method
*
* @param options value
* @return integer key
*/
public int createClientKey(feign.Request.Options options) {
int key = options.connectTimeoutMillis();
if (options.isFollowRedirects()) {
key |= 1 << 31; // connectTimeoutMillis always positive, so we can use first sign bit for
// isFollowRedirects flag
}
return key;
}
private static java.net.http.HttpClient.Builder newClientBuilder(Options options) {
return HttpClient
.newBuilder()
.followRedirects(options.isFollowRedirects() ? Redirect.ALWAYS : Redirect.NEVER)
.connectTimeout(Duration.ofMillis(options.connectTimeoutMillis()));
}
private Builder newRequestBuilder(Request request, Options options) throws URISyntaxException {
URI uri = new URI(request.url());
final BodyPublisher body;
final byte[] data = request.body();
if (data == null) {
body = BodyPublishers.noBody();
} else {
body = BodyPublishers.ofByteArray(data);
}
final Builder requestBuilder = HttpRequest.newBuilder()
.uri(uri)
.timeout(Duration.ofMillis(options.readTimeoutMillis()))
.version(client.version());
final Map<String, Collection<String>> headers = filterRestrictedHeaders(request.headers());
if (!headers.isEmpty()) {
requestBuilder.headers(asString(headers));
}
return requestBuilder.method(request.httpMethod().toString(), body);
}
/**
* There is a bunch o headers that the http2 client do not allow to be set.
*
* @see jdk.internal.net.http.common.Utils.DISALLOWED_HEADERS_SET
*/
private static final Set<String> DISALLOWED_HEADERS_SET;
static {
// A case insensitive TreeSet of strings.
final TreeSet<String> treeSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
treeSet.addAll(Set.of("connection", "content-length", "expect", "host", "upgrade"));
DISALLOWED_HEADERS_SET = Collections.unmodifiableSet(treeSet);
}
private Map<String, Collection<String>> filterRestrictedHeaders(Map<String, Collection<String>> headers) {
final Map<String, Collection<String>> filteredHeaders = headers.keySet()
.stream()
.filter(headerName -> !DISALLOWED_HEADERS_SET.contains(headerName))
.collect(Collectors.toMap(
Function.identity(),
headers::get));
filteredHeaders.computeIfAbsent("Accept", key -> List.of("*/*"));
return filteredHeaders;
}
private Map<String, Collection<String>> castMapCollectType(Map<String, List<String>> map) {
final Map<String, Collection<String>> result = new HashMap<>();
map.forEach((key, value) -> result.put(key, new HashSet<>(value)));
return result;
}
private String[] asString(Map<String, Collection<String>> headers) {
return headers.entrySet().stream()
.flatMap(entry -> entry.getValue()
.stream()
.map(value -> Arrays.asList(entry.getKey(), value))
.flatMap(List::stream))
.toArray(String[]::new);
}
}

View File

@@ -0,0 +1,48 @@
package net.geedge.asw.module.runner.client;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.log.Log;
import feign.Feign;
import feign.form.FormEncoder;
import net.geedge.asw.common.config.feign.Fastjson2Decoder;
import net.geedge.asw.common.config.feign.Fastjson2Encoder;
import net.geedge.asw.common.config.feign.Http2Client;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FeignClientConfiguration {
private static final Log log = Log.get();
@Value("${zeek.url:127.0.0.1:8086}")
private String zeekUrl;
@Value("${geoip.url:127.0.0.1:8087}")
private String geoipUrl;
@Bean("zeekClient")
public ZeekClient zeekClient() {
String url = UrlBuilder.ofHttp(zeekUrl).toString();
log.info("[zeekClient] [url: {}]", url);
return Feign.builder()
.encoder(new FormEncoder())
.decoder(new Fastjson2Decoder())
.client(new Http2Client())
.target(ZeekClient.class, url);
}
@Bean("geoipClient")
public GeoipClient geoipClient() {
String url = UrlBuilder.ofHttp(geoipUrl).toString();
log.info("[geoipClient] [url: {}]", url);
return Feign.builder()
.encoder(new Fastjson2Encoder())
.decoder(new Fastjson2Decoder())
.client(new Http2Client())
.target(GeoipClient.class, url);
}
}

View File

@@ -0,0 +1,14 @@
package net.geedge.asw.module.runner.client;
import com.alibaba.fastjson2.JSONArray;
import feign.Param;
import feign.RequestLine;
import org.springframework.cloud.openfeign.FeignClient;
@FeignClient(name = "geoipClient")
public interface GeoipClient {
@RequestLine("GET /geoip?ips={ip}")
JSONArray geoip(@Param("ip") String ipAddress);
}

View File

@@ -0,0 +1,88 @@
package net.geedge.asw.module.runner.client;
import cn.hutool.log.Log;
import net.geedge.asw.common.util.T;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.ssl.SSLContextBuilder;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.InfoResponse;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
@Component
public class OpenSearchClientConfiguration {
private static final Log log = Log.get();
@Value("${opensearch.url:127.0.0.1:7200}")
private String hostAndPort;
@Value("${opensearch.username:admin}")
private String username;
@Value("${opensearch.password:G1egG2U4NrjHRzV}")
private String password;
@Bean("openSearchClient")
public OpenSearchClient openSearchClient() {
try {
if (T.StrUtil.hasEmpty(this.hostAndPort, this.username, this.password)) {
throw new IllegalArgumentException("OpenSearchClient init info cannot be empty.");
}
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
// Setup SSL context to trust all certificates
SSLContext sslContext = SSLContextBuilder.create()
.loadTrustMaterial((chain, authType) -> true)
.build();
String[] split = this.hostAndPort.split(":");
String host = split[0];
Integer port = Integer.valueOf(split[1]);
SSLContext finalSslContext = sslContext;
RestClientBuilder builder = RestClient.builder(
new HttpHost(host, port, "https"))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
.setSSLContext(finalSslContext)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setDefaultIOReactorConfig(
IOReactorConfig.custom()
.setIoThreadCount(1)
.build()
);
return httpAsyncClientBuilder;
});
RestClient restClient = builder.build();
OpenSearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
OpenSearchClient client = new OpenSearchClient(transport);
InfoResponse info = client.info();
log.info("[openSearchClient] [url: {}] [{}]", this.hostAndPort, info.version().distribution() + ": " + info.version().number());
return client;
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | IOException e) {
log.error("[openSearchClient] [error] [url: {}]", this.hostAndPort);
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,18 @@
package net.geedge.asw.module.runner.client;
import com.alibaba.fastjson2.JSONArray;
import feign.Headers;
import feign.Param;
import feign.RequestLine;
import org.springframework.cloud.openfeign.FeignClient;
import java.io.File;
@FeignClient(name = "zeekClient")
public interface ZeekClient {
@RequestLine("POST /upload")
@Headers("Content-Type: multipart/form-data")
JSONArray parser(@Param("pcap") File file);
}

View File

@@ -98,7 +98,7 @@ public class PcapController {
public R parse2session(String[] ids) {
T.VerifyUtil.is(ids).notEmpty();
// pcapService.parse2session(ids);
pcapService.parse2session(ids);
return R.ok();
}
}

View File

@@ -22,6 +22,7 @@ import net.geedge.asw.module.runner.service.IJobService;
import net.geedge.asw.module.runner.service.IPcapService;
import net.geedge.asw.module.runner.service.IPlaybookService;
import net.geedge.asw.module.runner.service.IRunnerService;
import net.geedge.asw.module.runner.util.PcapParserThread;
import net.geedge.asw.module.runner.util.RunnerConstant;
import net.geedge.asw.module.workbook.service.IWorkbookResourceService;
import net.geedge.asw.module.workbook.util.WorkbookConstant;
@@ -35,6 +36,11 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@Service
public class PcapServiceImpl extends ServiceImpl<PcapDao, PcapEntity> implements IPcapService {
@@ -169,7 +175,56 @@ public class PcapServiceImpl extends ServiceImpl<PcapDao, PcapEntity> implements
@Override
public void parse2session(String... ids) {
List<Runnable> taskList = T.ListUtil.list(true);
Long maxFileSize = 0L;
for (String id : ids) {
PcapEntity pcapEntity = this.getById(id);
if (T.ObjectUtil.isNotNull(pcapEntity)) {
PcapParserThread pcapParserThread = new PcapParserThread();
pcapParserThread.setPcapEntity(pcapEntity);
taskList.add(pcapParserThread);
Long size = pcapEntity.getSize();
if (size > maxFileSize) {
maxFileSize = size;
}
}
}
if (T.CollUtil.isNotEmpty(taskList)) {
List<CompletableFuture<Void>> futures = taskList.stream()
.map(task -> CompletableFuture.runAsync(task))
.collect(Collectors.toList());
CompletableFuture<Void> allTasksFuture = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
try {
allTasksFuture.get(this.calculateParseThreadTimeout(maxFileSize), TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error(e, "[parse2session] [error]");
throw new ASWException(RCode.ERROR);
}
}
}
/**
* calculate Parse Thread Timeout
*
* @param size
* @return
*/
private long calculateParseThreadTimeout(Long size) {
// 小于 1MB 的文件,超时时间为 1分钟
if (size <= 1048576) {
return 60;
// 小于10MB的文件超时时间为 3分钟
} else if (size <= 1048576 * 10) {
return 60 * 3;
// 其他,超时时间为 10分钟
} else {
return 60 * 10;
}
}
}

View File

@@ -1,13 +1,31 @@
package net.geedge.asw.module.runner.util;
import cn.hutool.log.Log;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.Data;
import net.geedge.asw.common.config.SpringContextUtils;
import net.geedge.asw.common.util.T;
import net.geedge.asw.module.runner.client.GeoipClient;
import net.geedge.asw.module.runner.client.ZeekClient;
import net.geedge.asw.module.runner.entity.PcapEntity;
import net.geedge.asw.module.runner.service.IPcapService;
import org.apache.commons.lang3.time.StopWatch;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.opensearch.indices.IndexSettings;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static net.geedge.asw.module.runner.util.RunnerConstant.PcapStatus;
@@ -19,8 +37,15 @@ public class PcapParserThread implements Runnable {
private PcapEntity pcapEntity;
private IPcapService pcapService;
private ZeekClient zeekClient;
private GeoipClient geoipClient;
private OpenSearchClient openSearchClient;
private void init() {
pcapService = SpringContextUtils.getBean(IPcapService.class);
zeekClient = (ZeekClient) SpringContextUtils.getBean("zeekClient");
geoipClient = (GeoipClient) SpringContextUtils.getBean("geoipClient");
openSearchClient = (OpenSearchClient) SpringContextUtils.getBean("openSearchClient");
}
@Override
@@ -33,25 +58,21 @@ public class PcapParserThread implements Runnable {
StopWatch sw = new StopWatch();
sw.start();
try {
log.info("job pcap parser run start");
// init
this.init();
// parsing
this.updateStatus(PcapStatus.PARSING.getValue());
// parser
this.parser();
// indexed
this.updateStatus(PcapStatus.INDEXED.getValue());
log.info("job pcap parser run end");
} catch (Exception e) {
// error
this.updateStatus(PcapStatus.ERROR.getValue());
log.error(e, "job pcap parser error, pcap: {}", pcapEntity.getId());
log.error(e, "job pcap parser error, id: {}", pcapEntity.getId());
} finally {
sw.stop();
log.info("job pcap parser end. Run Time: {}", sw.toString());
log.info("job pcap parser end. id: {} Run Time: {}", pcapEntity.getId(), sw.toString());
}
}
@@ -59,9 +80,130 @@ public class PcapParserThread implements Runnable {
* parser
*/
private void parser() {
String id = pcapEntity.getId();
String path = pcapEntity.getPath();
// TODO
// zeek
JSONArray jsonArray = zeekClient.parser(T.FileUtil.file(pcapEntity.getPath()));
if (log.isDebugEnabled()) {
log.debug("[parse] [zeek parse pcap file] [size: {}]", jsonArray.size());
}
// geoip
List<String> ipList = jsonArray.stream()
.flatMap(obj -> Stream.of(
T.MapUtil.getStr((JSONObject) obj, "id.orig_h", ""),
T.MapUtil.getStr((JSONObject) obj, "id.resp_h", "")
))
.filter(s -> T.StrUtil.isNotEmpty(s))
.distinct()
.collect(Collectors.toList());
Map<String, JSONObject> geoipInfo = this.queryGeoip(ipList);
// add source&geoip_info field
String fileName = T.FileUtil.getName(pcapEntity.getPath());
for (Object obj : jsonArray) {
JSONObject pojo = (JSONObject) obj;
pojo.put("source", fileName);
String orig = T.MapUtil.getStr(pojo, "id.orig_h", "");
if (T.StrUtil.isNotEmpty(orig)) {
JSONObject jsonObject = T.MapUtil.get(geoipInfo, orig, JSONObject.class, new JSONObject());
pojo.put("id.orig_country", T.MapUtil.getStr(jsonObject, "country", ""));
pojo.put("id.orig_asn", T.MapUtil.getStr(jsonObject, "asn", ""));
pojo.put("id.orig_asname", T.MapUtil.getStr(jsonObject, "asname", ""));
}
String resp = T.MapUtil.getStr(pojo, "id.resp_h", "");
if (T.StrUtil.isNotEmpty(resp)) {
JSONObject jsonObject = T.MapUtil.get(geoipInfo, resp, JSONObject.class, new JSONObject());
pojo.put("id.resp_country", T.MapUtil.getStr(jsonObject, "country", ""));
pojo.put("id.resp_asn", T.MapUtil.getStr(jsonObject, "asn", ""));
pojo.put("id.resp_asname", T.MapUtil.getStr(jsonObject, "asname", ""));
}
}
// opensearch
this.uploadToOpenSearch(jsonArray);
}
/**
* query geoip
*
* @param ipList
* @return
*/
private Map<String, JSONObject> queryGeoip(List<String> ipList) {
JSONArray result = new JSONArray();
int batchSize = 100;
for (int i = 0; i < ipList.size(); i += batchSize) {
List<String> currentBatch = ipList.subList(i, Math.min(i + batchSize, ipList.size()));
String queryParam = currentBatch.stream().collect(Collectors.joining(","));
JSONArray array = geoipClient.geoip(queryParam);
result.addAll(array);
}
Map<String, JSONObject> map = result.stream().collect(
Collectors.toMap(
obj -> T.MapUtil.getStr((JSONObject) obj, "ip"),
obj -> (JSONObject) obj
));
return map;
}
/**
* upload to opensearch
*
* @param jsonArray
*/
private void uploadToOpenSearch(JSONArray jsonArray) {
String pcapPath = pcapEntity.getPath();
String md5Hex = T.DigestUtil.md5Hex(T.FileUtil.file(pcapPath));
String indexName = String.format("session-%s", md5Hex);
try {
// check if index exists
boolean indexExists = openSearchClient.indices()
.exists(new ExistsRequest.Builder().index(indexName).build())
.value();
if (log.isDebugEnabled()) {
log.debug("[uploadToOpenSearch] [index: {}] [exists: {}]", indexName, indexExists);
}
// if index exists, delete
if (indexExists) {
openSearchClient.indices().delete(new DeleteIndexRequest.Builder().index(indexName).build());
log.debug("[uploadToOpenSearch] [index: {}] [deleted]", indexName);
}
// create index with default settings
openSearchClient.indices().create(
new CreateIndexRequest.Builder()
.index(indexName)
.settings(new IndexSettings.Builder().build())
.build()
);
// upload data in bulk
BulkRequest.Builder br = new BulkRequest.Builder();
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = (JSONObject) jsonArray.get(i);
String id = String.valueOf(i);
br.operations(op -> op.index(
idx -> idx.index(indexName)
.id(id)
.document(jsonObject)
));
}
BulkResponse result = openSearchClient.bulk(br.build());
// log errors, if any
if (result.errors()) {
log.error("[uploadToOpenSearch] [bulk had errors]");
for (BulkResponseItem item : result.items()) {
if (item.error() != null) {
log.error("[uploadToOpenSearch] [error reason]", item.error().reason());
}
}
}
} catch (IOException e) {
log.error("[uploadToOpenSearch] [error] [index: {}]", indexName);
throw new RuntimeException("Failed to upload data to OpenSearch", e);
}
}
/**

View File

@@ -35,7 +35,20 @@ spring:
enabled: true
main:
allow-circular-references: true
feign:
client:
config:
default: # 这里用default就是全局配置如果是写服务名称则是针对某个微服务的配置
connectTimeout: 5000
readTimeout: 5000
loggerLevel: BASIC # 日志级别 NONE|BASIC|HEADERS|FULL
httpclient:
enabled: true
max-connections: 200 # httpclient处理的最大连接数量
max-connections-per-route: 50 # 单个路径连接的最大数量
connection-timeout: 2000 # 超时等待
server:
# port: 2023
servlet: