diff --git a/pom.xml b/pom.xml index 72215a6..6d1f963 100644 --- a/pom.xml +++ b/pom.xml @@ -143,6 +143,34 @@ simplemagic 1.16 + + + + org.springframework.cloud + spring-cloud-starter-openfeign + 3.1.5 + + + + com.alibaba.fastjson2 + fastjson2 + 2.0.51 + + + + + + org.opensearch.client + opensearch-java + 2.12.0 + + + + org.opensearch.client + opensearch-rest-client + 2.12.0 + + diff --git a/src/main/java/net/geedge/asw/common/config/feign/Fastjson2Decoder.java b/src/main/java/net/geedge/asw/common/config/feign/Fastjson2Decoder.java new file mode 100644 index 0000000..67d8183 --- /dev/null +++ b/src/main/java/net/geedge/asw/common/config/feign/Fastjson2Decoder.java @@ -0,0 +1,61 @@ +/* + * Copyright 2012-2024 The Feign Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package net.geedge.asw.common.config.feign; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONException; +import com.alibaba.fastjson2.JSONReader; +import feign.FeignException; +import feign.Response; +import feign.Util; +import feign.codec.Decoder; + +import java.io.IOException; +import java.io.Reader; +import java.lang.reflect.Type; + +import static feign.Util.ensureClosed; + +/** + * @author changjin wei(魏昌进) + */ +public class Fastjson2Decoder implements Decoder { + + private final JSONReader.Feature[] features; + + public Fastjson2Decoder() { + this(new JSONReader.Feature[0]); + } + + public Fastjson2Decoder(JSONReader.Feature[] features) { + this.features = features; + } + + @Override + public Object decode(Response response, Type type) throws IOException, FeignException { + if (response.status() == 404 || response.status() == 204) return Util.emptyValueOf(type); + if (response.body() == null) return null; + Reader reader = response.body().asReader(response.charset()); + try { + return JSON.parseObject(reader, type, features); + } catch (JSONException e) { + if (e.getCause() != null && e.getCause() instanceof IOException) { + throw IOException.class.cast(e.getCause()); + } + throw e; + } finally { + ensureClosed(reader); + } + } +} \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/common/config/feign/Fastjson2Encoder.java b/src/main/java/net/geedge/asw/common/config/feign/Fastjson2Encoder.java new file mode 100644 index 0000000..a2a8de8 --- /dev/null +++ b/src/main/java/net/geedge/asw/common/config/feign/Fastjson2Encoder.java @@ -0,0 +1,44 @@ +/* + * Copyright 2012-2024 The Feign Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package net.geedge.asw.common.config.feign; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONWriter; +import feign.RequestTemplate; +import feign.Util; +import feign.codec.EncodeException; +import feign.codec.Encoder; + +import java.lang.reflect.Type; + +/** + * @author changjin wei(魏昌进) + */ +public class Fastjson2Encoder implements Encoder { + + private final JSONWriter.Feature[] features; + + public Fastjson2Encoder() { + this(new JSONWriter.Feature[0]); + } + + public Fastjson2Encoder(JSONWriter.Feature[] features) { + this.features = features; + } + + @Override + public void encode(Object object, Type bodyType, RequestTemplate template) throws EncodeException { + template.body(JSON.toJSONBytes(object, features), Util.UTF_8); + } +} \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/common/config/feign/Http2Client.java b/src/main/java/net/geedge/asw/common/config/feign/Http2Client.java new file mode 100644 index 0000000..7932975 --- /dev/null +++ b/src/main/java/net/geedge/asw/common/config/feign/Http2Client.java @@ -0,0 +1,254 @@ +/* + * Copyright 2012-2024 The Feign Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package net.geedge.asw.common.config.feign; + +import feign.*; +import feign.Request.Options; +import feign.Request.ProtocolVersion; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.ref.SoftReference; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpClient.Redirect; +import java.net.http.HttpClient.Version; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublisher; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpRequest.Builder; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static feign.Util.enumForName; + +public class Http2Client implements Client, AsyncClient { + + private final HttpClient client; + + private final Map> clients = new ConcurrentHashMap<>(); + + /** + * Creates the new Http2Client using following defaults: + *
    + *
  • Connect Timeout: 10 seconds, as {@link Request.Options#Options()} uses
  • + *
  • Follow all 3xx redirects
  • + *
  • HTTP 2
  • + *
+ * + * @see Request.Options#Options() + */ + public Http2Client() { + this(HttpClient.newBuilder() + .followRedirects(Redirect.ALWAYS) + .version(Version.HTTP_2) + .connectTimeout(Duration.ofMillis(10000)) + .build()); + } + + public Http2Client(Options options) { + this(newClientBuilder(options) + .version(Version.HTTP_2) + .build()); + } + + public Http2Client(HttpClient client) { + this.client = Util.checkNotNull(client, "HttpClient must not be null"); + } + + @Override + public Response execute(Request request, Options options) throws IOException { + final HttpRequest httpRequest; + try { + httpRequest = newRequestBuilder(request, options) + .version(client.version()) + .build(); + } catch (URISyntaxException e) { + throw new IOException("Invalid uri " + request.url(), e); + } + + HttpClient clientForRequest = getOrCreateClient(options); + HttpResponse httpResponse; + try { + httpResponse = clientForRequest.send(httpRequest, BodyHandlers.ofInputStream()); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + + return toFeignResponse(request, httpResponse); + } + + @Override + public CompletableFuture execute(Request request, + Options options, + Optional requestContext) { + HttpRequest httpRequest; + try { + httpRequest = newRequestBuilder(request, options).build(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid uri " + request.url(), e); + } + + HttpClient clientForRequest = getOrCreateClient(options); + CompletableFuture> future = + clientForRequest.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofInputStream()); + return future.thenApply(httpResponse -> toFeignResponse(request, httpResponse)); + } + + protected Response toFeignResponse(Request request, HttpResponse httpResponse) { + final OptionalLong length = httpResponse.headers().firstValueAsLong("Content-Length"); + + return Response.builder() + .protocolVersion(enumForName(ProtocolVersion.class, httpResponse.version())) + .body(httpResponse.body(), length.isPresent() ? (int) length.getAsLong() : null) + .reason(httpResponse.headers().firstValue("Reason-Phrase").orElse(null)) + .request(request) + .status(httpResponse.statusCode()) + .headers(castMapCollectType(httpResponse.headers().map())) + .build(); + } + + private HttpClient getOrCreateClient(Options options) { + if (doesClientConfigurationDiffer(options)) { + // create a new client from the existing one - but with connectTimeout and followRedirect + // settings from options + final int clientKey = createClientKey(options); + + SoftReference requestScopedSoftReference = clients.get(clientKey); + HttpClient requestScoped = + requestScopedSoftReference == null ? null : requestScopedSoftReference.get(); + + if (requestScoped == null) { + java.net.http.HttpClient.Builder builder = newClientBuilder(options) + .sslContext(client.sslContext()) + .sslParameters(client.sslParameters()) + .version(client.version()); + client.authenticator().ifPresent(builder::authenticator); + client.cookieHandler().ifPresent(builder::cookieHandler); + client.executor().ifPresent(builder::executor); + client.proxy().ifPresent(builder::proxy); + requestScoped = builder.build(); + clients.put(clientKey, new SoftReference<>(requestScoped)); + } + return requestScoped; + } + return client; + } + + private boolean doesClientConfigurationDiffer(Options options) { + if ((client.followRedirects() == Redirect.ALWAYS) != options.isFollowRedirects()) { + return true; + } + return client.connectTimeout() + .map(timeout -> timeout.toMillis() != options.connectTimeoutMillis()) + .orElse(true); + } + + /** + * Creates integer key that represents {@link Options} settings based on + * {@link Http2Client#doesClientConfigurationDiffer(Options)} method + * + * @param options value + * @return integer key + */ + public int createClientKey(feign.Request.Options options) { + int key = options.connectTimeoutMillis(); + if (options.isFollowRedirects()) { + key |= 1 << 31; // connectTimeoutMillis always positive, so we can use first sign bit for + // isFollowRedirects flag + } + return key; + } + + private static java.net.http.HttpClient.Builder newClientBuilder(Options options) { + return HttpClient + .newBuilder() + .followRedirects(options.isFollowRedirects() ? Redirect.ALWAYS : Redirect.NEVER) + .connectTimeout(Duration.ofMillis(options.connectTimeoutMillis())); + } + + private Builder newRequestBuilder(Request request, Options options) throws URISyntaxException { + URI uri = new URI(request.url()); + + final BodyPublisher body; + final byte[] data = request.body(); + if (data == null) { + body = BodyPublishers.noBody(); + } else { + body = BodyPublishers.ofByteArray(data); + } + + final Builder requestBuilder = HttpRequest.newBuilder() + .uri(uri) + .timeout(Duration.ofMillis(options.readTimeoutMillis())) + .version(client.version()); + + final Map> headers = filterRestrictedHeaders(request.headers()); + if (!headers.isEmpty()) { + requestBuilder.headers(asString(headers)); + } + + return requestBuilder.method(request.httpMethod().toString(), body); + } + + /** + * There is a bunch o headers that the http2 client do not allow to be set. + * + * @see jdk.internal.net.http.common.Utils.DISALLOWED_HEADERS_SET + */ + private static final Set DISALLOWED_HEADERS_SET; + + static { + // A case insensitive TreeSet of strings. + final TreeSet treeSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + treeSet.addAll(Set.of("connection", "content-length", "expect", "host", "upgrade")); + DISALLOWED_HEADERS_SET = Collections.unmodifiableSet(treeSet); + } + + private Map> filterRestrictedHeaders(Map> headers) { + final Map> filteredHeaders = headers.keySet() + .stream() + .filter(headerName -> !DISALLOWED_HEADERS_SET.contains(headerName)) + .collect(Collectors.toMap( + Function.identity(), + headers::get)); + + filteredHeaders.computeIfAbsent("Accept", key -> List.of("*/*")); + + return filteredHeaders; + } + + private Map> castMapCollectType(Map> map) { + final Map> result = new HashMap<>(); + map.forEach((key, value) -> result.put(key, new HashSet<>(value))); + return result; + } + + private String[] asString(Map> headers) { + return headers.entrySet().stream() + .flatMap(entry -> entry.getValue() + .stream() + .map(value -> Arrays.asList(entry.getKey(), value)) + .flatMap(List::stream)) + .toArray(String[]::new); + } +} \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/module/runner/client/FeignClientConfiguration.java b/src/main/java/net/geedge/asw/module/runner/client/FeignClientConfiguration.java new file mode 100644 index 0000000..da5488f --- /dev/null +++ b/src/main/java/net/geedge/asw/module/runner/client/FeignClientConfiguration.java @@ -0,0 +1,48 @@ +package net.geedge.asw.module.runner.client; + +import cn.hutool.core.net.url.UrlBuilder; +import cn.hutool.log.Log; +import feign.Feign; +import feign.form.FormEncoder; +import net.geedge.asw.common.config.feign.Fastjson2Decoder; +import net.geedge.asw.common.config.feign.Fastjson2Encoder; +import net.geedge.asw.common.config.feign.Http2Client; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class FeignClientConfiguration { + + private static final Log log = Log.get(); + + @Value("${zeek.url:127.0.0.1:8086}") + private String zeekUrl; + + @Value("${geoip.url:127.0.0.1:8087}") + private String geoipUrl; + + + @Bean("zeekClient") + public ZeekClient zeekClient() { + String url = UrlBuilder.ofHttp(zeekUrl).toString(); + log.info("[zeekClient] [url: {}]", url); + return Feign.builder() + .encoder(new FormEncoder()) + .decoder(new Fastjson2Decoder()) + .client(new Http2Client()) + .target(ZeekClient.class, url); + } + + @Bean("geoipClient") + public GeoipClient geoipClient() { + String url = UrlBuilder.ofHttp(geoipUrl).toString(); + log.info("[geoipClient] [url: {}]", url); + return Feign.builder() + .encoder(new Fastjson2Encoder()) + .decoder(new Fastjson2Decoder()) + .client(new Http2Client()) + .target(GeoipClient.class, url); + } + +} \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/module/runner/client/GeoipClient.java b/src/main/java/net/geedge/asw/module/runner/client/GeoipClient.java new file mode 100644 index 0000000..94aed4e --- /dev/null +++ b/src/main/java/net/geedge/asw/module/runner/client/GeoipClient.java @@ -0,0 +1,14 @@ +package net.geedge.asw.module.runner.client; + +import com.alibaba.fastjson2.JSONArray; +import feign.Param; +import feign.RequestLine; +import org.springframework.cloud.openfeign.FeignClient; + +@FeignClient(name = "geoipClient") +public interface GeoipClient { + + @RequestLine("GET /geoip?ips={ip}") + JSONArray geoip(@Param("ip") String ipAddress); + +} \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/module/runner/client/OpenSearchClientConfiguration.java b/src/main/java/net/geedge/asw/module/runner/client/OpenSearchClientConfiguration.java new file mode 100644 index 0000000..6199b33 --- /dev/null +++ b/src/main/java/net/geedge/asw/module/runner/client/OpenSearchClientConfiguration.java @@ -0,0 +1,88 @@ +package net.geedge.asw.module.runner.client; + +import cn.hutool.log.Log; +import net.geedge.asw.common.util.T; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.ssl.SSLContextBuilder; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.core.InfoResponse; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; + +@Component +public class OpenSearchClientConfiguration { + + private static final Log log = Log.get(); + + @Value("${opensearch.url:127.0.0.1:7200}") + private String hostAndPort; + + @Value("${opensearch.username:admin}") + private String username; + + @Value("${opensearch.password:G1egG2U4NrjHRzV}") + private String password; + + @Bean("openSearchClient") + public OpenSearchClient openSearchClient() { + try { + if (T.StrUtil.hasEmpty(this.hostAndPort, this.username, this.password)) { + throw new IllegalArgumentException("OpenSearchClient init info cannot be empty."); + } + + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + + // Setup SSL context to trust all certificates + SSLContext sslContext = SSLContextBuilder.create() + .loadTrustMaterial((chain, authType) -> true) + .build(); + + String[] split = this.hostAndPort.split(":"); + String host = split[0]; + Integer port = Integer.valueOf(split[1]); + SSLContext finalSslContext = sslContext; + RestClientBuilder builder = RestClient.builder( + new HttpHost(host, port, "https")) + .setHttpClientConfigCallback(httpAsyncClientBuilder -> { + httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider) + .setSSLContext(finalSslContext) + .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .setDefaultIOReactorConfig( + IOReactorConfig.custom() + .setIoThreadCount(1) + .build() + ); + return httpAsyncClientBuilder; + }); + + RestClient restClient = builder.build(); + OpenSearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); + OpenSearchClient client = new OpenSearchClient(transport); + + InfoResponse info = client.info(); + log.info("[openSearchClient] [url: {}] [{}]", this.hostAndPort, info.version().distribution() + ": " + info.version().number()); + return client; + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | IOException e) { + log.error("[openSearchClient] [error] [url: {}]", this.hostAndPort); + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/module/runner/client/ZeekClient.java b/src/main/java/net/geedge/asw/module/runner/client/ZeekClient.java new file mode 100644 index 0000000..479e1ee --- /dev/null +++ b/src/main/java/net/geedge/asw/module/runner/client/ZeekClient.java @@ -0,0 +1,18 @@ +package net.geedge.asw.module.runner.client; + +import com.alibaba.fastjson2.JSONArray; +import feign.Headers; +import feign.Param; +import feign.RequestLine; +import org.springframework.cloud.openfeign.FeignClient; + +import java.io.File; + +@FeignClient(name = "zeekClient") +public interface ZeekClient { + + @RequestLine("POST /upload") + @Headers("Content-Type: multipart/form-data") + JSONArray parser(@Param("pcap") File file); + +} \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/module/runner/controller/PcapController.java b/src/main/java/net/geedge/asw/module/runner/controller/PcapController.java index 63d01b3..2fc287e 100644 --- a/src/main/java/net/geedge/asw/module/runner/controller/PcapController.java +++ b/src/main/java/net/geedge/asw/module/runner/controller/PcapController.java @@ -98,7 +98,7 @@ public class PcapController { public R parse2session(String[] ids) { T.VerifyUtil.is(ids).notEmpty(); - // pcapService.parse2session(ids); + pcapService.parse2session(ids); return R.ok(); } } \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java b/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java index 1aace99..d9ee2a3 100644 --- a/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java +++ b/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java @@ -22,6 +22,7 @@ import net.geedge.asw.module.runner.service.IJobService; import net.geedge.asw.module.runner.service.IPcapService; import net.geedge.asw.module.runner.service.IPlaybookService; import net.geedge.asw.module.runner.service.IRunnerService; +import net.geedge.asw.module.runner.util.PcapParserThread; import net.geedge.asw.module.runner.util.RunnerConstant; import net.geedge.asw.module.workbook.service.IWorkbookResourceService; import net.geedge.asw.module.workbook.util.WorkbookConstant; @@ -35,6 +36,11 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; @Service public class PcapServiceImpl extends ServiceImpl implements IPcapService { @@ -169,7 +175,56 @@ public class PcapServiceImpl extends ServiceImpl implements @Override public void parse2session(String... ids) { + List taskList = T.ListUtil.list(true); + Long maxFileSize = 0L; + for (String id : ids) { + PcapEntity pcapEntity = this.getById(id); + if (T.ObjectUtil.isNotNull(pcapEntity)) { + PcapParserThread pcapParserThread = new PcapParserThread(); + pcapParserThread.setPcapEntity(pcapEntity); + taskList.add(pcapParserThread); + Long size = pcapEntity.getSize(); + if (size > maxFileSize) { + maxFileSize = size; + } + } + } + if (T.CollUtil.isNotEmpty(taskList)) { + List> futures = taskList.stream() + .map(task -> CompletableFuture.runAsync(task)) + .collect(Collectors.toList()); + + CompletableFuture allTasksFuture = CompletableFuture.allOf( + futures.toArray(new CompletableFuture[0]) + ); + + try { + allTasksFuture.get(this.calculateParseThreadTimeout(maxFileSize), TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error(e, "[parse2session] [error]"); + throw new ASWException(RCode.ERROR); + } + } + } + + /** + * calculate Parse Thread Timeout + * + * @param size + * @return + */ + private long calculateParseThreadTimeout(Long size) { + // 小于 1MB 的文件,超时时间为 1分钟 + if (size <= 1048576) { + return 60; + // 小于10MB的文件,超时时间为 3分钟 + } else if (size <= 1048576 * 10) { + return 60 * 3; + // 其他,超时时间为 10分钟 + } else { + return 60 * 10; + } } } \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java b/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java index 527443f..8cb0a85 100644 --- a/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java +++ b/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java @@ -1,13 +1,31 @@ package net.geedge.asw.module.runner.util; import cn.hutool.log.Log; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import lombok.Data; import net.geedge.asw.common.config.SpringContextUtils; import net.geedge.asw.common.util.T; +import net.geedge.asw.module.runner.client.GeoipClient; +import net.geedge.asw.module.runner.client.ZeekClient; import net.geedge.asw.module.runner.entity.PcapEntity; import net.geedge.asw.module.runner.service.IPcapService; import org.apache.commons.lang3.time.StopWatch; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.BulkResponse; +import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.indices.DeleteIndexRequest; +import org.opensearch.client.opensearch.indices.ExistsRequest; +import org.opensearch.client.opensearch.indices.IndexSettings; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static net.geedge.asw.module.runner.util.RunnerConstant.PcapStatus; @@ -19,8 +37,15 @@ public class PcapParserThread implements Runnable { private PcapEntity pcapEntity; private IPcapService pcapService; + private ZeekClient zeekClient; + private GeoipClient geoipClient; + private OpenSearchClient openSearchClient; + private void init() { pcapService = SpringContextUtils.getBean(IPcapService.class); + zeekClient = (ZeekClient) SpringContextUtils.getBean("zeekClient"); + geoipClient = (GeoipClient) SpringContextUtils.getBean("geoipClient"); + openSearchClient = (OpenSearchClient) SpringContextUtils.getBean("openSearchClient"); } @Override @@ -33,25 +58,21 @@ public class PcapParserThread implements Runnable { StopWatch sw = new StopWatch(); sw.start(); try { - log.info("job pcap parser run start"); // init this.init(); - // parsing this.updateStatus(PcapStatus.PARSING.getValue()); // parser this.parser(); // indexed this.updateStatus(PcapStatus.INDEXED.getValue()); - - log.info("job pcap parser run end"); } catch (Exception e) { // error this.updateStatus(PcapStatus.ERROR.getValue()); - log.error(e, "job pcap parser error, pcap: {}", pcapEntity.getId()); + log.error(e, "job pcap parser error, id: {}", pcapEntity.getId()); } finally { sw.stop(); - log.info("job pcap parser end. Run Time: {}", sw.toString()); + log.info("job pcap parser end. id: {} Run Time: {}", pcapEntity.getId(), sw.toString()); } } @@ -59,9 +80,130 @@ public class PcapParserThread implements Runnable { * parser */ private void parser() { - String id = pcapEntity.getId(); - String path = pcapEntity.getPath(); - // TODO + // zeek + JSONArray jsonArray = zeekClient.parser(T.FileUtil.file(pcapEntity.getPath())); + if (log.isDebugEnabled()) { + log.debug("[parse] [zeek parse pcap file] [size: {}]", jsonArray.size()); + } + + // geoip + List ipList = jsonArray.stream() + .flatMap(obj -> Stream.of( + T.MapUtil.getStr((JSONObject) obj, "id.orig_h", ""), + T.MapUtil.getStr((JSONObject) obj, "id.resp_h", "") + )) + .filter(s -> T.StrUtil.isNotEmpty(s)) + .distinct() + .collect(Collectors.toList()); + Map geoipInfo = this.queryGeoip(ipList); + + // add source&geoip_info field + String fileName = T.FileUtil.getName(pcapEntity.getPath()); + for (Object obj : jsonArray) { + JSONObject pojo = (JSONObject) obj; + pojo.put("source", fileName); + + String orig = T.MapUtil.getStr(pojo, "id.orig_h", ""); + if (T.StrUtil.isNotEmpty(orig)) { + JSONObject jsonObject = T.MapUtil.get(geoipInfo, orig, JSONObject.class, new JSONObject()); + pojo.put("id.orig_country", T.MapUtil.getStr(jsonObject, "country", "")); + pojo.put("id.orig_asn", T.MapUtil.getStr(jsonObject, "asn", "")); + pojo.put("id.orig_asname", T.MapUtil.getStr(jsonObject, "asname", "")); + } + + String resp = T.MapUtil.getStr(pojo, "id.resp_h", ""); + if (T.StrUtil.isNotEmpty(resp)) { + JSONObject jsonObject = T.MapUtil.get(geoipInfo, resp, JSONObject.class, new JSONObject()); + pojo.put("id.resp_country", T.MapUtil.getStr(jsonObject, "country", "")); + pojo.put("id.resp_asn", T.MapUtil.getStr(jsonObject, "asn", "")); + pojo.put("id.resp_asname", T.MapUtil.getStr(jsonObject, "asname", "")); + } + } + + // opensearch + this.uploadToOpenSearch(jsonArray); + } + + /** + * query geoip + * + * @param ipList + * @return + */ + private Map queryGeoip(List ipList) { + JSONArray result = new JSONArray(); + int batchSize = 100; + for (int i = 0; i < ipList.size(); i += batchSize) { + List currentBatch = ipList.subList(i, Math.min(i + batchSize, ipList.size())); + String queryParam = currentBatch.stream().collect(Collectors.joining(",")); + JSONArray array = geoipClient.geoip(queryParam); + result.addAll(array); + } + Map map = result.stream().collect( + Collectors.toMap( + obj -> T.MapUtil.getStr((JSONObject) obj, "ip"), + obj -> (JSONObject) obj + )); + return map; + } + + /** + * upload to opensearch + * + * @param jsonArray + */ + private void uploadToOpenSearch(JSONArray jsonArray) { + String pcapPath = pcapEntity.getPath(); + String md5Hex = T.DigestUtil.md5Hex(T.FileUtil.file(pcapPath)); + String indexName = String.format("session-%s", md5Hex); + + try { + // check if index exists + boolean indexExists = openSearchClient.indices() + .exists(new ExistsRequest.Builder().index(indexName).build()) + .value(); + if (log.isDebugEnabled()) { + log.debug("[uploadToOpenSearch] [index: {}] [exists: {}]", indexName, indexExists); + } + // if index exists, delete + if (indexExists) { + openSearchClient.indices().delete(new DeleteIndexRequest.Builder().index(indexName).build()); + log.debug("[uploadToOpenSearch] [index: {}] [deleted]", indexName); + } + + // create index with default settings + openSearchClient.indices().create( + new CreateIndexRequest.Builder() + .index(indexName) + .settings(new IndexSettings.Builder().build()) + .build() + ); + + // upload data in bulk + BulkRequest.Builder br = new BulkRequest.Builder(); + for (int i = 0; i < jsonArray.size(); i++) { + JSONObject jsonObject = (JSONObject) jsonArray.get(i); + String id = String.valueOf(i); + br.operations(op -> op.index( + idx -> idx.index(indexName) + .id(id) + .document(jsonObject) + )); + } + BulkResponse result = openSearchClient.bulk(br.build()); + // log errors, if any + if (result.errors()) { + log.error("[uploadToOpenSearch] [bulk had errors]"); + for (BulkResponseItem item : result.items()) { + if (item.error() != null) { + log.error("[uploadToOpenSearch] [error reason]", item.error().reason()); + } + } + } + } catch (IOException e) { + log.error("[uploadToOpenSearch] [error] [index: {}]", indexName); + throw new RuntimeException("Failed to upload data to OpenSearch", e); + } } /** diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a4a79ad..31cf25a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -35,7 +35,20 @@ spring: enabled: true main: allow-circular-references: true - + +feign: + client: + config: + default: # 这里用default就是全局配置,如果是写服务名称,则是针对某个微服务的配置 + connectTimeout: 5000 + readTimeout: 5000 + loggerLevel: BASIC # 日志级别 NONE|BASIC|HEADERS|FULL + httpclient: + enabled: true + max-connections: 200 # httpclient处理的最大连接数量 + max-connections-per-route: 50 # 单个路径连接的最大数量 + connection-timeout: 2000 # 超时等待 + server: # port: 2023 servlet: