From 737613d2eeca36e1fd5f37a28910f59781b62892 Mon Sep 17 00:00:00 2001 From: shizhendong Date: Thu, 29 Aug 2024 10:40:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20ASW-53=20opensearch=20index=20=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E6=97=B6=E9=85=8D=E7=BD=AE=E8=87=AA=E5=AE=9A=E4=B9=89?= =?UTF-8?q?=E6=98=A0=E5=B0=84=20mapping=20settings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/runner/util/PcapParserThread.java | 104 +++++++++++++++++- 1 file changed, 98 insertions(+), 6 deletions(-) diff --git a/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java b/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java index a3329b7..9cf6588 100644 --- a/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java +++ b/src/main/java/net/geedge/asw/module/runner/util/PcapParserThread.java @@ -17,6 +17,7 @@ import net.geedge.asw.module.runner.service.IPcapService; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.time.StopWatch; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.mapping.Property; import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.BulkResponse; import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; @@ -248,12 +249,103 @@ public class PcapParserThread implements Runnable { } // create index with default settings - openSearchClient.indices().create( - new CreateIndexRequest.Builder() - .index(indexName) - .settings(new IndexSettings.Builder().build()) - .build() - ); +// openSearchClient.indices().create( +// new CreateIndexRequest.Builder() +// .index(indexName) +// .settings(new IndexSettings.Builder().build()) +// .build() +// ); + CreateIndexRequest.Builder createIndexRequestBuilder = new CreateIndexRequest.Builder() + .index(indexName) + .settings(new IndexSettings.Builder().build()) + .mappings(m -> m.properties("conn_state", Property.of(p -> p.keyword(k -> k))) + .properties("dns", Property.of(p -> + p.object(o -> o.properties("AA", Property.of(p2 -> p2.boolean_(b -> b))) + .properties("RA", Property.of(p2 -> p2.boolean_(b -> b))) + .properties("RD", Property.of(p2 -> p2.boolean_(b -> b))) + .properties("TC", Property.of(p2 -> p2.boolean_(b -> b))) + .properties("TTLs", Property.of(p2 -> p2.long_(l -> l))) + .properties("Z", Property.of(p2 -> p2.long_(l -> l))) + .properties("answers", Property.of(p2 -> p2.keyword(k -> k))) + .properties("qclass", Property.of(p2 -> p2.long_(l -> l))) + .properties("qclass_name", Property.of(p2 -> p2.keyword(k -> k))) + .properties("qtype", Property.of(p2 -> p2.long_(l -> l))) + .properties("qtype_name", Property.of(p2 -> p2.keyword(k -> k))) + .properties("query", Property.of(p2 -> p2.keyword(k -> k))) + .properties("rcode", Property.of(p2 -> p2.long_(l -> l))) + .properties("rcode_name", Property.of(p2 -> p2.keyword(k -> k))) + .properties("rejected", Property.of(p2 -> p2.boolean_(b -> b))) + .properties("rtt", Property.of(p2 -> p2.float_(f -> f))) + .properties("trans_id", Property.of(p2 -> p2.long_(l -> l)))) + ) + ) + .properties("duration", Property.of(p -> p.float_(f -> f))) + .properties("history", Property.of(p -> p.keyword(k -> k))) + .properties("http", Property.of(p -> + p.object(o -> o.properties("host", Property.of(p2 -> p2.text(t -> t))) + .properties("method", Property.of(p2 -> p2.keyword(k -> k))) + .properties("orig_fuids", Property.of(p2 -> p2.keyword(k -> k))) + .properties("request_body_len", Property.of(p2 -> p2.long_(l -> l))) + .properties("resp_fuids", Property.of(p2 -> p2.keyword(k -> k))) + .properties("resp_mime_types", Property.of(p2 -> p2.keyword(k -> k))) + .properties("response_body_len", Property.of(p2 -> p2.long_(l -> l))) + .properties("status_code", Property.of(p2 -> p2.long_(l -> l))) + .properties("status_msg", Property.of(p2 -> p2.keyword(k -> k))) + .properties("trans_depth", Property.of(p2 -> p2.long_(l -> l))) + .properties("uri", Property.of(p2 -> p2.text(t -> t))) + .properties("user_agent", Property.of(p2 -> p2.text(t -> t))) + .properties("version", Property.of(p2 -> p2.keyword(k -> k)))) + ) + ) + .properties("id", Property.of(p -> + p.object(o -> o.properties("orig_h", Property.of(p2 -> p2.keyword(k -> k))) + .properties("orig_p", Property.of(p2 -> p2.long_(l -> l))) + .properties("resp_asn", Property.of(p2 -> p2.keyword(k -> k))) + .properties("resp_asname", Property.of(p2 -> p2.keyword(k -> k))) + .properties("resp_country", Property.of(p2 -> p2.keyword(k -> k))) + .properties("resp_domain", Property.of(p2 -> p2.keyword(k -> k))) + .properties("resp_h", Property.of(p2 -> p2.keyword(k -> k))) + .properties("resp_p", Property.of(p2 -> p2.long_(l -> l)))) + ) + ) + .properties("local_orig", Property.of(p -> p.boolean_(b -> b))) + .properties("local_resp", Property.of(p -> p.boolean_(b -> b))) + .properties("missed_bytes", Property.of(p -> p.long_(l -> l))) + .properties("orig_bytes", Property.of(p -> p.long_(l -> l))) + .properties("orig_ip_bytes", Property.of(p -> p.long_(l -> l))) + .properties("orig_pkts", Property.of(p -> p.long_(l -> l))) + .properties("pcap", Property.of(p -> + p.object(o -> o.properties("id", Property.of(p2 -> p2.keyword(k -> k))) + .properties("name", Property.of(p2 -> p2.keyword(k -> k))) + .properties("stream_url", Property.of(p2 -> p2.keyword(k -> k))) + .properties("tcp_stream", Property.of(p2 -> p2.long_(l -> l))) + .properties("udp_stream", Property.of(p2 -> p2.long_(l -> l)))) + ) + ) + .properties("proto", Property.of(p -> p.keyword(k -> k))) + .properties("resp_bytes", Property.of(p -> p.long_(l -> l))) + .properties("resp_ip_bytes", Property.of(p -> p.long_(l -> l))) + .properties("resp_pkts", Property.of(p -> p.long_(l -> l))) + .properties("service", Property.of(p -> p.keyword(k -> k))) + .properties("ssl", Property.of(p -> + p.object(o -> o.properties("cert_chain_fps", Property.of(p2 -> p2.keyword(k -> k))) + .properties("cipher", Property.of(p2 -> p2.keyword(k -> k))) + .properties("curve", Property.of(p2 -> p2.keyword(k -> k))) + .properties("established", Property.of(p2 -> p2.boolean_(b -> b))) + .properties("next_protocol", Property.of(p2 -> p2.keyword(k -> k))) + .properties("resumed", Property.of(p2 -> p2.boolean_(b -> b))) + .properties("server_name", Property.of(p2 -> p2.keyword(k -> k))) + .properties("sni_matches_cert", Property.of(p2 -> p2.boolean_(b -> b))) + .properties("ssl_history", Property.of(p2 -> p2.keyword(k -> k))) + .properties("validation_status", Property.of(p2 -> p2.keyword(k -> k))) + .properties("version", Property.of(p2 -> p2.keyword(k -> k)))) + ) + ) + .properties("ts", Property.of(p -> p.float_(f -> f))) + .properties("tunnel_parents", Property.of(p -> p.text(t -> t))) + .properties("uid", Property.of(p -> p.keyword(k -> k))) + ); + openSearchClient.indices().create(createIndexRequestBuilder.build()); // upload data in bulk BulkRequest.Builder br = new BulkRequest.Builder();