4 Commits

Author SHA1 Message Date
wangchengcheng
d544eb8752 fix:foundTime值根据common_end_timestamp_ms填写 2024-04-15 11:17:44 +08:00
wangchengcheng
4962a40f97 fix:support oos address list 2024-01-31 22:52:22 +08:00
wangchengcheng
ed07a3dbfb style:remove redundant logic 2024-01-22 10:44:26 +08:00
wangchengcheng
4cda43724d feat:adapt to percent log structure 2024-01-18 19:31:05 +08:00
22 changed files with 489 additions and 325 deletions

View File

@@ -5,8 +5,8 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
<version>230907</version>
<artifactId>log-stream-doublewrite</artifactId>
<version>24.01</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>

View File

@@ -0,0 +1,35 @@
none=0
Monitor=1
monitor=1
Intercept=2
intercept=2
NoIntercept=3
nointercept=3
ActiveDefence=4
activedefence=4
WANNAT=8
wannat=8
Reject=16
reject=16
Deny=16
deny=16
Shaping=32
shaping=32
Manipulate=48
manipulate=48
ServiceChaining=64
servicechaining=64
Allow=96
allow=96
Bypass=96
bypass=96
Shunt=128
shunt=128
Statistics=129
statistics=129
redirect=48
replace=48
hijack=48
insert=48
edit_element=48
run_script=48

View File

@@ -0,0 +1,68 @@
#session-record
tcp_c2s_ip_fragments=common_c2s_ipfrag_num
tcp_s2c_ip_fragments=common_s2c_ipfrag_num
tcp_c2s_o3_pkts=common_c2s_tcp_unorder_num
tcp_s2c_o3_pkts=common_s2c_tcp_unorder_num
http_response_latency_ms=http_response_latency_ms
http_session_duration_ms=http_session_duration_ms
security_rule_list=security_rule_list
monitor_rule_list=monitor_rule_list
tcp_handshake_latency_ms=common_establish_latency_ms
mail_protocol_type=mail_protocol_type
mail_account=mail_account
mail_password=mail_passwd
mail_from_cmd=mail_from_cmd
mail_to_cmd=mail_to_cmd
mail_from=mail_from
mail_to=mail_to
mail_cc=mail_cc
mail_bcc=mail_bcc
mail_subject=mail_subject
mail_subject_charset=mail_subject_charset
mail_attachment_name=mail_attachment_name
mail_attachment_name_charset=mail_attachment_name_charset
mail_eml_file=mail_eml_file
dns_message_id=dns_message_id
dns_qr=dns_qr
dns_opcode=dns_opcode
dns_aa=dns_aa
dns_tc=dns_tc
dns_rd=dns_rd
dns_ra=dns_ra
dns_rcode=dns_rcode
dns_qdcount=dns_qdcount
dns_ancount=dns_ancount
dns_nscount=dns_nscount
dns_arcount=dns_arcount
dns_qname=dns_qname
dns_qtype=dns_qtype
dns_qclass=dns_qclass
dns_cname=dns_cname
dns_sub=dns_sub
dns_rr=dns_rr
ssl_version=ssl_version
ssl_sni=ssl_sni
ssl_san=ssl_san
ssl_cn=ssl_cn
ssl_handshake_latency_ms=ssl_con_latency_ms
ssl_ja3_hash=ssl_ja3_hash
ssl_cert_issuer=ssl_cert_issuer
ssl_cert_subject=ssl_cert_subject
quic_version=quic_version
quic_sni=quic_sni
quic_user_agent=quic_user_agent
ftp_account=ftp_account
ftp_url=ftp_url
ftp_link_type=ftp_link_type
http_proxy_flag=http_proxy_flag
http_sequence=http_sequence
tcp_client_isn=common_tcp_client_isn
tcp_server_isn=common_tcp_server_isn
sent_pkts=common_c2s_pkt_num
received_pkts=common_s2c_pkt_num
app=common_app_label
out_link_id=common_egress_link_id
in_link_id=common_ingress_link_id
duration_ms=common_con_duration_ms
http_request_line=http_request_line
http_response_line=http_response_line

View File

@@ -0,0 +1,4 @@
tcp_c2s_ip_fragments=common_c2s_ipfrag_num
tcp_s2c_ip_fragments=common_s2c_ipfrag_num
tcp_c2s_o3_pkts=common_c2s_tcp_unorder_num
tcp_s2c_o3_pkts=common_s2c_tcp_unorder_num

View File

@@ -0,0 +1,42 @@
#security-event
http_request_body=http_request_body
http_response_body=http_response_body
http_response_latency_ms=http_response_latency_ms
http_session_duration_ms=http_session_duration_ms
security_rule_list=security_rule_list
monitor_rule_list=monitor_rule_list
tcp_handshake_latency_ms=common_establish_latency_ms
#proxy-event
http_action_file_size=http_action_file_size
doh_url=doh_url
doh_host=doh_host
doh_cookie=doh_cookie
doh_referer=doh_referer
doh_user_agent=doh_user_agent
doh_version=doh_version
doh_message_id=doh_message_id
doh_qr=doh_qr
doh_opcode=doh_opcode
doh_aa=doh_aa
doh_tc=doh_tc
doh_rd=doh_rd
doh_ra=doh_ra
doh_rcode=doh_rcode
doh_qdcount=doh_qdcount
doh_ancount=doh_ancount
doh_nscount=doh_nscount
doh_arcount=doh_arcount
doh_qname=doh_qname
doh_qtype=doh_qtype
doh_qclass=doh_qclass
doh_cname=doh_cname
doh_sub=doh_sub
doh_rr=doh_rr
proxy_rule_list=proxy_rule_list

View File

@@ -1,68 +0,0 @@
recv_time=common_recv_time
log_id=common_log_id
start_timestamp_ms=common_start_timestamp_ms
end_timestamp_ms=common_end_timestamp_ms
processing_time=common_processing_time
device_id=common_device_id
data_center=common_data_center
sled_ip=common_sled_ip
device_tag=common_device_tag
client_ip=common_client_ip
client_port=common_client_port
client_asn=common_client_asn
subscriber_id=common_subscriber_id
imei=common_imei
imsi=common_imsi
phone_number=common_phone_number
server_ip=common_server_ip
server_port=common_server_port
server_asn=common_server_asn
address_type=common_address_type
http_url=http_url
http_host=http_host
http_request_line=http_request_line
http_response_line=http_response_line
http_request_body=http_request_body
http_response_body=http_response_body
http_cookie=http_cookie
http_referer=http_referer
http_user_agent=http_user_agent
http_request_content_length=http_request_content_length
http_request_content_type=http_request_content_type
http_response_content_length=http_response_content_length
http_response_content_type=http_response_content_type
http_set_cookie=http_set_cookie
http_version=http_version
http_action_file_size=http_action_file_size
doh_url=doh_url
doh_host=doh_host
doh_cookie=doh_cookie
doh_referer=doh_referer
doh_user_agent=doh_user_agent
doh_version=doh_version
doh_message_id=doh_message_id
doh_qr=doh_qr
doh_opcode=doh_opcode
doh_aa=doh_aa
doh_tc=doh_tc
doh_rd=doh_rd
doh_ra=doh_ra
doh_rcode=doh_rcode
doh_qdcount=doh_qdcount
doh_ancount=doh_ancount
doh_nscount=doh_nscount
doh_arcount=doh_arcount
doh_qname=doh_qname
doh_qtype=doh_qtype
doh_qclass=doh_qclass
doh_cname=doh_cname
doh_sub=doh_sub
doh_rr=doh_rr
client_geolocation=common_client_location
server_geolocation=common_server_location
ip_protocol=common_l4_protocol
sent_bytes=common_c2s_byte_num
received_bytes=common_s2c_byte_num
decoded_as=common_schema_type
proxy_rule_list=proxy_rule_list
session_id=common_stream_trace_id

View File

@@ -1,7 +1,7 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
#source.kafka.servers=192.168.44.12:9094
source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
source.kafka.servers=192.168.44.12:9094
#source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#百分点输出kafka地址
percent.sink.kafka.servers=192.168.44.12:9094
#文件源数据topic输出kafka地址
@@ -24,14 +24,16 @@ nacos.server=192.168.44.12:8848
nacos.schema.namespace=P19
#schema data id名称
nacos.schema.data.id=proxy_event.json
nacos.schema.data.id=session_record.json
#--------------------------------Kafka消费/生产配置------------------------------#
#kafka 接收数据topic
source.kafka.topic=SESSION-RECORD
sink.percent.kafka.topic=PERCENT-RECORD
sink.percent.kafka.topic.session=PERCENT-SESSION-RECORD
sink.percent.kafka.topic.security=PERCENT-SECURITY-RECORD
sink.percent.kafka.topic.proxy=PERCENT-POLICY-RECORD
sink.file.data.kafka.topic=test-file-data
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
@@ -46,34 +48,31 @@ transform.parallelism=1
deal.file.parallelism=1
sink.file.data.parallelism=1
sink.percent.parallelism=1
sink.percent.session.parallelism=1
sink.percent.security.parallelism=1
sink.percent.proxy.parallelism=1
#数据中心,取值范围(0-31)
data.center.id.num=0
#hbase 更新时间如填写0则不更新缓存
hbase.tick.tuple.freq.secs=180
#--------------------------------默认值配置------------------------------#
#生产者压缩模式 none or snappy
producer.kafka.compression.type=snappy
#------------------------------------OOS配置------------------------------------#
#oos地址
oos.servers=10.3.45.100:8057
oos.servers=172.18.10.153:8058,172.18.10.154:8058,172.18.10.155:8058,172.18.10.156:8058,172.18.10.157:8058
#prometheus-httpserver
prometheus.pushgateway.address=192.168.44.12:9091
pushgateway.statistics.time=300
deal.file.statistics.time=60
#------------------------------------knowledge配置------------------------------------#
knowledge.execution.minutes=600
knowledge.base.uri=http://192.168.44.67:9999
knowledge.base.uri=http://192.168.44.12:9999
knowledge.base.path=/v1/knowledge_base
ip.user.defined.kd.id=004390bc-3135-4a6f-a492-3662ecb9e289
ip.builtin.kd.id=64af7077-eb9b-4b8f-80cf-2ceebc89bea9
asn.builtin.kd.id=f9f6bc91-2142-4673-8249-e097c00fe1ea
hos.url=http://192.168.44.67:9098/hos/traffic_file_bucket/
hos.url=http://192.168.44.12:9098/hos/traffic_file_bucket/

View File

@@ -19,10 +19,22 @@ server_ip=common_server_ip
server_port=common_server_port
server_asn=common_server_asn
address_type=common_address_type
out_link_id=common_egress_link_id
in_link_id=common_ingress_link_id
client_geolocation=common_client_location
server_geolocation=common_server_location
app=common_app_label
ip_protocol=common_l4_protocol
sent_pkts=common_c2s_pkt_num
received_pkts=common_s2c_pkt_num
sent_bytes=common_c2s_byte_num
received_bytes=common_s2c_byte_num
tcp_client_isn=common_tcp_client_isn
tcp_server_isn=common_tcp_server_isn
decoded_as=common_schema_type
session_id=common_stream_trace_id
http_url=http_url
http_host=http_host
http_request_body=http_request_body
http_response_body=http_response_body
http_proxy_flag=http_proxy_flag
http_sequence=http_sequence
http_cookie=http_cookie
@@ -34,8 +46,6 @@ http_response_content_length=http_response_content_length
http_response_content_type=http_response_content_type
http_set_cookie=http_set_cookie
http_version=http_version
http_response_latency_ms=http_response_latency_ms
http_session_duration_ms=http_session_duration_ms
mail_protocol_type=mail_protocol_type
mail_account=mail_account
mail_password=mail_passwd
@@ -68,26 +78,6 @@ dns_qclass=dns_qclass
dns_cname=dns_cname
dns_sub=dns_sub
dns_rr=dns_rr
quic_version=quic_version
quic_sni=quic_sni
quic_user_agent=quic_user_agent
ftp_account=ftp_account
ftp_url=ftp_url
ftp_link_type=ftp_link_type
out_link_id=common_egress_link_id
in_link_id=common_ingress_link_id
client_geolocation=common_client_location
server_geolocation=common_server_location
app=common_app_label
ip_protocol=common_l4_protocol
sent_pkts=common_c2s_pkt_num
received_pkts=common_s2c_pkt_num
sent_bytes=common_c2s_byte_num
received_bytes=common_s2c_byte_num
tcp_client_isn=common_tcp_client_isn
tcp_server_isn=common_tcp_server_isn
decoded_as=common_schema_type
session_id=common_stream_trace_id
ssl_version=ssl_version
ssl_sni=ssl_sni
ssl_san=ssl_san
@@ -96,6 +86,49 @@ ssl_handshake_latency_ms=ssl_con_latency_ms
ssl_ja3_hash=ssl_ja3_hash
ssl_cert_issuer=ssl_cert_issuer
ssl_cert_subject=ssl_cert_subject
quic_version=quic_version
quic_sni=quic_sni
quic_user_agent=quic_user_agent
ftp_account=ftp_account
ftp_url=ftp_url
ftp_link_type=ftp_link_type
#security-event
http_request_body=http_request_body
http_response_body=http_response_body
http_response_latency_ms=http_response_latency_ms
http_session_duration_ms=http_session_duration_ms
security_rule_list=security_rule_list
monitor_rule_list=monitor_rule_list
tcp_handshake_latency_ms=common_establish_latency_ms
#proxy-event
http_action_file_size=http_action_file_size
doh_url=doh_url
doh_host=doh_host
doh_cookie=doh_cookie
doh_referer=doh_referer
doh_user_agent=doh_user_agent
doh_version=doh_version
doh_message_id=doh_message_id
doh_qr=doh_qr
doh_opcode=doh_opcode
doh_aa=doh_aa
doh_tc=doh_tc
doh_rd=doh_rd
doh_ra=doh_ra
doh_rcode=doh_rcode
doh_qdcount=doh_qdcount
doh_ancount=doh_ancount
doh_nscount=doh_nscount
doh_arcount=doh_arcount
doh_qname=doh_qname
doh_qtype=doh_qtype
doh_qclass=doh_qclass
doh_cname=doh_cname
doh_sub=doh_sub
doh_rr=doh_rr
proxy_rule_list=proxy_rule_list
#session-record
tcp_c2s_ip_fragments=common_c2s_ipfrag_num
tcp_s2c_ip_fragments=common_s2c_ipfrag_num
tcp_c2s_o3_pkts=common_c2s_tcp_unorder_num
tcp_s2c_o3_pkts=common_s2c_tcp_unorder_num

View File

@@ -83,8 +83,10 @@ public class FlowWriteConfig {
public static final Integer BUFFER_TIMEOUT = ConfigurationsUtils.getIntProperty(propDefault, "buffer.timeout");
public static final Integer DEAL_FILE_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "deal.file.parallelism");
public static final Integer SINK_FILE_DATA_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.file.data.parallelism");
public static final Integer SINK_PERCENT_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.parallelism");
public static final Integer SINK_PERCENT_SESSION_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.session.parallelism");
public static final Integer SINK_PERCENT_SECURITY_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.security.parallelism");
public static final Integer SINK_PERCENT_PROXY_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.proxy.parallelism");
/**
* HBase
*/
@@ -134,6 +136,11 @@ public class FlowWriteConfig {
public static final String SINK_PERCENT_KAFKA_TOPIC = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic");
public static final String SINK_FILE_DATA_SINK_KAFKA_TOPIC = ConfigurationsUtils.getStringProperty(propService, "sink.file.data.kafka.topic");
public static final String SINK_PERCENT_KAFKA_TOPIC_SESSION = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic.session");
public static final String SINK_PERCENT_KAFKA_TOPIC_SECURITY = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic.security");
public static final String SINK_PERCENT_KAFKA_TOPIC_PROXY = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic.proxy");
/**
* connection kafka
*/

View File

@@ -6,7 +6,7 @@ import com.zdjizhi.common.FlowWriteConfig;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.PushGateway;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
@@ -15,13 +15,15 @@ import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
public class SendCountProcess extends ProcessFunction<Tuple5<Long, Long, Long, Long, Long>, String> {
public class SendCountProcess extends ProcessFunction<Tuple7<Long, Long, Long, Long, Long, Long, Long>, String> {
private static final Log logger = LogFactory.get();
private long recordCount = 0L;
private long failedCount = 0L;
private long httpRequestCount = 0L;
private long httpResponseCount = 0L;
private long mailEmlCount = 0L;
private long securityCount = 0L;
private long proxyCount = 0L;
static final Gauge recordCountsGauge = Gauge.build()
@@ -37,6 +39,10 @@ public class SendCountProcess extends ProcessFunction<Tuple5<Long, Long, Long, L
.name("httpResponseCount").labelNames("ServerName", "Duration").help("The general intput log volume, the unit is slip").register();
static final Gauge mailEmlCountCountGauge = Gauge.build()
.name("mailEmlCount").labelNames("ServerName", "Duration").help("The general intput log volume, the unit is slip").register();
static final Gauge securityCountGauge = Gauge.build()
.name("securityCount").labelNames("ServerName", "Duration").help("The general intput log volume, the unit is slip").register();
static final Gauge proxyCountGauge = Gauge.build()
.name("proxyCount").labelNames("ServerName", "Duration").help("The general intput log volume, the unit is slip").register();
@Override
public void open(Configuration parameters) {
@@ -53,6 +59,9 @@ public class SendCountProcess extends ProcessFunction<Tuple5<Long, Long, Long, L
httpRequestCountGauge.labels("httpRequestCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(httpRequestCount);
httpResponseCountGauge.labels("httpResponseCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(httpResponseCount);
mailEmlCountCountGauge.labels("mailEmlCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(mailEmlCount);
securityCountGauge.labels("securityCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(securityCount);
proxyCountGauge.labels("proxyCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(proxyCount);
try {
//将指标推送至pushgateway
push();
@@ -72,13 +81,15 @@ public class SendCountProcess extends ProcessFunction<Tuple5<Long, Long, Long, L
}
@Override
public void processElement(Tuple5<Long, Long, Long, Long, Long> value, Context ctx, Collector<String> out) {
public void processElement(Tuple7<Long, Long, Long, Long, Long, Long, Long> value, Context ctx, Collector<String> out) {
try {
recordCount = recordCount + value.f0;
failedCount = failedCount + value.f1;
httpRequestCount = httpRequestCount + value.f2;
httpResponseCount = httpResponseCount + value.f3;
mailEmlCount = mailEmlCount + value.f4;
securityCount = securityCount + value.f5;
proxyCount = proxyCount + value.f6;
} catch (Exception e) {
logger.error("统计指标处理失败,原因为" + e);

View File

@@ -2,6 +2,7 @@ package com.zdjizhi.operator.map;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.tools.general.ConfigurationsUtils;
@@ -12,6 +13,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Properties;
@@ -23,17 +25,27 @@ public class TypeMapCompleted extends ProcessFunction<String, JSONObject> {
private static final Log logger = LogFactory.get();
private ConvertRecordToPERCENT convertRecordToPERCENT;
Properties Prop = new Properties();
Properties actionProp = new Properties();
private HashMap<String, Integer> actionMap = new HashMap<String, Integer>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) {
Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties"));
} else if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) {
Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("proxy_event_mapping_table.properties"));
}
Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("session_record_mapping_table.properties"));
convertRecordToPERCENT = new ConvertRecordToPERCENT(Prop);
logger.info("session_record_mapping_table.properties日志加载成功");
} catch (Exception e) {
logger.error("session_record_mapping_table.properties日志加载失败,失败原因为:" + e);
}
try {
actionProp.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("action_definition.properties"));
for (String key : actionProp.stringPropertyNames()) {
final String action = actionProp.getProperty(key);
actionMap.put(key, Integer.valueOf(action));
}
logger.info(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载成功");
} catch (Exception e) {
logger.error(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载失败,失败原因为:" + e);
@@ -44,22 +56,26 @@ public class TypeMapCompleted extends ProcessFunction<String, JSONObject> {
public void processElement(String message, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) {
try {
JSONObject record = JSONObject.parseObject(message);
JSONObject jsonObject = null;
if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) {
if (record.containsKey("security_rule_list") || record.containsKey("monitor_rule_list")) {
jsonObject = convertRecordToPERCENT.convertToPERCENT(record);
}
}
if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) {
if (record.containsKey("proxy_rule_list")) {
jsonObject = convertRecordToPERCENT.convertToPERCENT(record);
}
}
JSONObject jsonObject = convertRecordToPERCENT.convertToPERCENT(record);
if (jsonObject != null) {
jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
if (record.containsKey("security_rule_list")) {
jsonObject.put("common_policy_id", JSONArray.from(record.get("security_rule_list")).get(0));
jsonObject.put("common_action", actionMap.get(record.get("security_action").toString().replace(" ","")));
}
if (record.containsKey("monitor_rule_list")) {
jsonObject.put("common_policy_id", JSONArray.from(record.get("monitor_rule_list")).get(0));
jsonObject.put("common_action", 1);
}
if (record.containsKey("proxy_rule_list")) {
jsonObject.put("common_policy_id", JSONArray.from(record.get("proxy_rule_list")).get(0));
jsonObject.put("common_action", actionMap.get(record.get("proxy_action").toString().replace(" ","")));
}
jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
TransForm.transformLog(jsonObject);
MetaUtil.typeTransform(jsonObject);
out.collect(jsonObject);

View File

@@ -0,0 +1,40 @@
package com.zdjizhi.operator.percent;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.tools.general.ConfigurationsUtils;
import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class PercentProxyProcess extends ProcessFunction<JSONObject, String> {
private static final Log logger = LogFactory.get();
Properties prop = new Properties();
private ConvertRecordToPERCENT convertRecordToPERCENT;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_proxy_event.properties"));
convertRecordToPERCENT = new ConvertRecordToPERCENT(prop);
logger.info("percent_proxy_event.properties日志加载成功");
} catch (Exception e) {
logger.error("percent_proxy_event.properties日志加载失败,失败原因为:" + e);
}
}
@Override
public void processElement(JSONObject value, Context ctx, Collector<String> out) {
try {
out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value)));
} catch (Exception e) {
logger.error("删减proxy_event日志字段失败,失败原因为:{},数据为:{}", e, value);
}
}
}

View File

@@ -0,0 +1,42 @@
package com.zdjizhi.operator.percent;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.tools.general.ConfigurationsUtils;
import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class PercentSecurityProcess extends ProcessFunction<JSONObject, String> {
private static final Log logger = LogFactory.get();
Properties prop = new Properties();
private ConvertRecordToPERCENT convertRecordToPERCENT;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_security_event.properties"));
convertRecordToPERCENT = new ConvertRecordToPERCENT(prop);
logger.info("percent_security_event.properties日志加载成功");
} catch (Exception e) {
logger.error("percent_security_event.properties日志加载失败,失败原因为:" + e);
}
}
@Override
public void processElement(JSONObject value, Context ctx, Collector<String> out) {
try {
out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value)));
} catch (Exception e) {
logger.error("删减security_event日志字段失败,失败原因为:{},数据为:{}", e, value);
}
}
}

View File

@@ -0,0 +1,40 @@
package com.zdjizhi.operator.percent;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.tools.general.ConfigurationsUtils;
import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class PercentSessionProcess extends ProcessFunction<JSONObject, String> {
private static final Log logger = LogFactory.get();
Properties prop = new Properties();
private ConvertRecordToPERCENT convertRecordToPERCENT;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_session_record.properties"));
convertRecordToPERCENT = new ConvertRecordToPERCENT(prop);
logger.info("percent_session_record.properties日志加载成功");
} catch (Exception e) {
logger.error("percent_session_record.properties日志加载失败,失败原因为:" + e);
}
}
@Override
public void processElement(JSONObject value, Context ctx, Collector<String> out) {
try {
out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value)));
} catch (Exception e) {
logger.error("删减percent_session日志字段失败,失败原因为:{},数据为:{}", e, value);
}
}
}

View File

@@ -10,12 +10,13 @@ import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.pojo.FileMeta;
import com.zdjizhi.common.pojo.SourceList;
import com.zdjizhi.tools.general.FileEdit;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
@@ -26,10 +27,16 @@ import java.util.TimerTask;
* @Description:
* @date 2023/0928
*/
public class DealFileProcessFunction extends ProcessFunction<JSONObject, String> {
public class DealFileProcess extends ProcessFunction<JSONObject, JSONObject> {
private static final Log logger = LogFactory.get();
public static final OutputTag<Tuple5<Long, Long, Long, Long, Long>> dealFileMetircTag = new OutputTag<Tuple5<Long, Long, Long, Long, Long>>("DealFileMetircTag") {
public static final OutputTag<Tuple7<Long, Long, Long, Long, Long, Long, Long>> dealFileMetircTag = new OutputTag<Tuple7<Long, Long, Long, Long, Long, Long, Long>>("DealFileMetircTag") {
};
public static final OutputTag<JSONObject> percentSecurityTag = new OutputTag<JSONObject>("percentSecurityTag") {
};
public static final OutputTag<JSONObject> percentProxyTag = new OutputTag<JSONObject>("percentProxyTag") {
};
private String rpUrlValue;
@@ -53,6 +60,9 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
private long httpResponseCount = 0L;
private long mailEmlCount = 0L;
private boolean metricSendFlag = true;
private long securityCount = 0L;
private long proxyCount = 0L;
private Random random;
//初始化侧输流的标记
@@ -62,6 +72,7 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
random = new Random();
Timer timer = new Timer();
//注册定时器
timer.schedule(new TimerTask() {
@@ -77,18 +88,20 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
@SuppressWarnings("unchecked")
@Override
public void processElement(JSONObject message, Context context, Collector<String> collector) throws Exception {
public void processElement(JSONObject message, Context context, Collector<JSONObject> collector) throws Exception {
try {
//定时向下游推送指标
if (metricSendFlag) {
metricSendFlag = false;
if (recordCount > 0 || failedCount > 0 || httpRequestCount > 0 || httpResponseCount > 0 || mailEmlCount > 0) {
context.output(dealFileMetircTag, Tuple5.of(recordCount, failedCount, httpRequestCount, httpResponseCount, mailEmlCount));
context.output(dealFileMetircTag, Tuple7.of(recordCount, failedCount, httpRequestCount, httpResponseCount, mailEmlCount, securityCount, proxyCount));
recordCount = 0L;
failedCount = 0L;
httpRequestCount = 0;
httpResponseCount = 0;
mailEmlCount = 0L;
securityCount = 0L;
proxyCount = 0L;
}
}
recordCount++;
@@ -102,37 +115,42 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
sPort = (int) message.get("common_client_port");
dIp = (String) message.get("common_server_ip");
dPort = (int) message.get("common_server_port");
foundTime = (long) message.get("common_recv_time");
foundTime = (long) message.get("common_end_timestamp_ms");
schemaType = (String) message.get("common_schema_type");
domain = (String) message.getOrDefault("common_server_domain", "");
account = (String) message.getOrDefault("common_subscribe_id", "");
FileMeta fileMeta = new FileMeta();
JSONArray jsonarray = new JSONArray();
final String[] oosArr = FlowWriteConfig.OOS_SERVERS.split(",");
final int i = random.nextInt(oosArr.length);
if (StringUtil.isNotBlank(rqUrlValue)) {
String fileId = FileEdit.getFileId(rqUrlValue, "_1");
message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId));
message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId,oosArr[i]));
SourceList request = new SourceList();
request.setSource_oss_path(FlowWriteConfig.HOS_URL+rqUrlValue);
request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId));
request.setSource_oss_path(FlowWriteConfig.HOS_URL + rqUrlValue);
request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId,oosArr[i]));
jsonarray.add(request);
httpRequestCount++;
}
if (StringUtil.isNotBlank(rpUrlValue)) {
String fileId = FileEdit.getFileId(rpUrlValue, "_2");
message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId));
message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId,oosArr[i]));
SourceList response = new SourceList();
response.setSource_oss_path(FlowWriteConfig.HOS_URL+rpUrlValue);
response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId));
response.setSource_oss_path(FlowWriteConfig.HOS_URL + rpUrlValue);
response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId,oosArr[i]));
jsonarray.add(response);
httpResponseCount++;
}
if (StringUtil.isNotBlank(emailUrlValue)) {
String fileId = FileEdit.getFileId(emailUrlValue, "_9");
message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId));
message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId,oosArr[i]));
SourceList emailFile = new SourceList();
emailFile.setSource_oss_path(FlowWriteConfig.HOS_URL+emailUrlValue);
emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId));
emailFile.setSource_oss_path(FlowWriteConfig.HOS_URL + emailUrlValue);
emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId,oosArr[i]));
jsonarray.add(emailFile);
mailEmlCount++;
}
@@ -143,7 +161,15 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000));
context.output(metaToKafa, JSONObject.toJSONString(fileMeta));
}
collector.collect(JSONObject.toJSONString(message));
if (message.containsKey("security_rule_list") || message.containsKey("monitor_rule_list")) {
context.output(percentSecurityTag, message);
securityCount++;
} else if (message.containsKey("proxy_rule_list")) {
context.output(percentProxyTag, message);
proxyCount++;
}
collector.collect(message);
}
} catch (RuntimeException e) {
logger.error("处理带有非结构结构化字段的日志出错:" + e + "\n" + message);

View File

@@ -30,9 +30,9 @@ public class KafkaProducer {
return properties;
}
public static FlinkKafkaProducer<String> getPercentKafkaProducer() {
public static FlinkKafkaProducer<String> getPercentKafkaProducer(String kafkaTopic) {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC,
kafkaTopic,
new SimpleStringSchema(),
createProducerConfig(FlowWriteConfig.PERCENT_SINK_KAFKA_SERVERS), Optional.empty());

View File

@@ -1,18 +1,14 @@
package com.zdjizhi.tools.general;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.tools.ordinary.MD5Utils;
import static com.zdjizhi.common.FlowWriteConfig.judgeFileType;
/**
* 文件字段操作工具
*/
public class FileEdit {
public static String getFileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String schemaType, String fileId) {
public static String getFileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String schemaType, String fileId,String oosServer) {
String fileType = null;
if (schemaType.equals("HTTP")) {
fileType = "html";
@@ -21,26 +17,20 @@ public class FileEdit {
fileType = "eml";
}
return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account;
return "http://" + oosServer + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account;
}
public static String getFileDownloadUrl(String fileId) {
return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/download?file_id=" + fileId;
public static String getFileDownloadUrl(String fileId,String oosServer) {
return "http://" + oosServer + "/v3/download?file_id=" + fileId;
}
public static String getFileType(String url) {
String[] split = url.split("\\.");
return split[split.length - 1];
}
public static String getFileId(String filename, String fileSuffix) throws Exception {
// String[] arr = url.split("/");
// String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_"));
// String prefix = MD5Utils.md5Encode(filename);
// String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf("."));
return filename + fileSuffix;
String prefix = MD5Utils.md5Encode(filename);
return prefix + fileSuffix;
}
}

View File

@@ -7,7 +7,6 @@ import com.alibaba.fastjson2.*;
import com.geedgenetworks.utils.IpLookupV2;
import com.geedgenetworks.utils.StringUtil;
import com.google.common.base.Joiner;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.pojo.KnowlegeBaseMeta;
import com.zdjizhi.tools.connections.http.HttpClientService;
@@ -204,9 +203,4 @@ public class IpLookupUtils {
}
return knowlegeBaseMeta;
}
public static void main(String[] args) {
final String countryLookup = IpLookupUtils.getIpLookup().asnLookup("10.64.10.7");
System.out.println(countryLookup);
}
}

View File

@@ -1,19 +1,18 @@
package com.zdjizhi.tools.logtransformation;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import java.util.*;
public class ConvertRecordToPERCENT {
private Properties securityProp;
private Properties prop;
private HashMap<String, String> recordSchema;
public ConvertRecordToPERCENT(Properties securityProp) {
this.securityProp = securityProp;
public ConvertRecordToPERCENT(Properties prop) {
this.prop = prop;
final HashMap<String, String> schemaMap = new HashMap<String, String>();
for (String key : securityProp.stringPropertyNames()) {
final String schema = securityProp.getProperty(key);
for (String key : prop.stringPropertyNames()) {
final String schema = prop.getProperty(key);
schemaMap.put(key, schema);
}
this.recordSchema = schemaMap;
@@ -27,30 +26,19 @@ public class ConvertRecordToPERCENT {
}
}
if (record.containsKey("security_rule_list")) {
percent.put("common_policy_id", (Integer) JSONArray.from(record.get("security_rule_list")).get(0));
percent.put("common_action", fillingCommonAction((String) record.get("security_action")));
}
if (record.containsKey("monitor_rule_list")) {
percent.put("common_policy_id", (Integer) JSONArray.from(record.get("monitor_rule_list")).get(0));
percent.put("common_action", 1);
}
if (record.containsKey("proxy_rule_list")){
percent.put("common_policy_id", (Integer) JSONArray.from(record.get("proxy_rule_list")).get(0));
percent.put("common_action", fillingCommonAction((String) record.get("proxy_action")));
}
//填充common_start_time、common_end_time
percent.put("common_start_time",record.get("start_timestamp_ms"));
percent.put("common_end_time",record.get("end_timestamp_ms"));
percent.put("common_start_time", (long) record.get("start_timestamp_ms") / 1000);
percent.put("common_end_time", (long) record.get("end_timestamp_ms") / 1000);
//填充common_sessions
percent.put("common_sessions", 1);
//填充common_internal_ip、common_external_ip、common_direction、common_stream_dir
if (record.containsKey("flags")) {
final int flags = (int) record.get("flags");
if (flags > 0) {
if ((8L & flags) == 8L && (16L & flags) != 16L) {
if ((8L & flags) == 8L && (16L & flags) != 16L) {
percent.put("common_internal_ip", record.get("common_client_ip"));
percent.put("common_external_ip", record.get("common_server_ip"));
percent.put("common_direction", 69);
@@ -59,6 +47,7 @@ public class ConvertRecordToPERCENT {
percent.put("common_external_ip", record.get("common_client_ip"));
percent.put("common_direction", 73);
}
if ((8192L & flags) == 8192L && (16384L & flags) == 16384L) {
percent.put("common_stream_dir", 3);
} else if ((8192L & flags) == 8192L) {
@@ -71,55 +60,13 @@ public class ConvertRecordToPERCENT {
return percent;
}
private int fillingCommonAction(String action) {
int number = 0;
switch (action) {
case "none":
number = 0;
break;
case "Monitor":
number = 1;
break;
case "Intercept":
number = 2;
break;
case "No Intercept":
number = 3;
break;
case "Active Defence":
number = 4;
break;
case "WAN NAT":
number = 8;
break;
case "Reject":
case "Deny":
number = 16;
break;
case "Shaping":
number = 32;
break;
case "Manipulate":
number = 48;
break;
case "Service Chaining":
number = 64;
break;
case "Allow":
case "Bypass":
number = 96;
break;
case "Shunt":
number = 128;
break;
case "Statistics":
number = 129;
break;
default:
number = 0;
public JSONObject removeFields(JSONObject record) {
for (Map.Entry<String, String> entry : recordSchema.entrySet()) {
if (record.containsKey(entry.getValue())) {
record.remove(entry.getValue());
}
}
return number;
return record;
}
}

View File

@@ -7,15 +7,16 @@ import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.operator.count.SendCountProcess;
import com.zdjizhi.operator.map.MapCompleted;
import com.zdjizhi.operator.percent.PercentProxyProcess;
import com.zdjizhi.operator.percent.PercentSecurityProcess;
import com.zdjizhi.operator.map.TypeMapCompleted;
import com.zdjizhi.operator.process.DealFileProcessFunction;
import com.zdjizhi.operator.percent.PercentSessionProcess;
import com.zdjizhi.operator.process.DealFileProcess;
import com.zdjizhi.tools.connections.kafka.KafkaConsumer;
import com.zdjizhi.tools.connections.kafka.KafkaProducer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class LogFlowWriteTopology {
private static final Log logger = LogFactory.get();
@@ -37,12 +38,21 @@ public class LogFlowWriteTopology {
}
//处理带有非结构化文件字段的数据
SingleOutputStreamOperator<String> dealFileProcessFunction = completedStream.process(new DealFileProcessFunction()).name("DealFileProcessFunction").uid("DealFile-ProcessFunction").setParallelism(FlowWriteConfig.DEAL_FILE_PARALLELISM);
SingleOutputStreamOperator<JSONObject> dealFileProcessFunction = completedStream.process(new DealFileProcess()).name("DealFileProcess").uid("DealFile-Process").setParallelism(FlowWriteConfig.DEAL_FILE_PARALLELISM);
//补全后的数据发送至百分点的kafka
dealFileProcessFunction.addSink(KafkaProducer.getPercentKafkaProducer()).name("ToPercentKafka").uid("To-Percent-Kafka").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM);
final SingleOutputStreamOperator<String> percentSecurityProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentSecurityTag).process(new PercentSecurityProcess()).name("PercentSecurityProcess").uid("Percent-Security-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_SECURITY_PARALLELISM);
final SingleOutputStreamOperator<String> percentProxyProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentProxyTag).process(new PercentProxyProcess()).name("PercentProxyProcess").uid("Percent-Proxy-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_PROXY_PARALLELISM);
final SingleOutputStreamOperator<String> percentSessionProcess = dealFileProcessFunction.process(new PercentSessionProcess()).name("PercentSessionProcess").uid("Percent-Session-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_SESSION_PARALLELISM);
percentSecurityProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SECURITY)).name("sendSecurityEvent").uid("send-Security-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PROXY_PARALLELISM);
percentProxyProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_PROXY)).name("sendProxyEvent").uid("send-Proxy-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_SECURITY_PARALLELISM);
percentSessionProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SESSION)).name("sendSessionRECORD").uid("send-Session-RECORD").setParallelism(FlowWriteConfig.SINK_PERCENT_SESSION_PARALLELISM);
//文件元数据发送至TRAFFIC-FILE-METADATA
dealFileProcessFunction.getSideOutput(DealFileProcessFunction.metaToKafa).addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta").uid("to-Traffic-FileMeta").setParallelism(FlowWriteConfig.SINK_FILE_DATA_PARALLELISM);
dealFileProcessFunction.getSideOutput(DealFileProcessFunction.dealFileMetircTag).process(new SendCountProcess()).name("SendCountProcess").uid("Send-Count-Process").setParallelism(1);
dealFileProcessFunction.getSideOutput(DealFileProcess.metaToKafa).addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta").uid("to-Traffic-FileMeta").setParallelism(FlowWriteConfig.SINK_FILE_DATA_PARALLELISM);
dealFileProcessFunction.getSideOutput(DealFileProcess.dealFileMetircTag).process(new SendCountProcess()).name("SendCountProcess").uid("Send-Count-Process").setParallelism(1);
try {
environment.execute(args[0]);
} catch (Exception e) {

View File

@@ -1,75 +0,0 @@
package com.zdjizhi.hdfs;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @author qidaijie
* @Package com.zdjizhi.tools.connections.hadoop
* @Description:
* @date 2022/11/217:57
*/
public class FileUtilsTest {
private static final Log logger = LogFactory.get();
private static FileSystem fileSystem;
static {
Configuration configuration = new Configuration();
try {
configuration.set("fs.defaultFS","hdfs://ns1");
configuration.set("hadoop.proxyuser.root.hosts","*");
configuration.set("hadoop.proxyuser.root.groups","*");
configuration.set("ha.zookeeper.quorum","192.168.44.83:2181,192.168.44.84:2181,192.168.44.85:2181");
configuration.set("dfs.nameservices","ns1");
configuration.set("dfs.ha.namenodes.ns1","nn1,nn2");
configuration.set("dfs.namenode.rpc-address.ns1.nn1","192.168.44.85:9000");
configuration.set("dfs.namenode.rpc-address.ns1.nn2","192.168.44.86:9000");
configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
//创建fileSystem,用于连接hdfs
fileSystem = FileSystem.get(configuration);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
public void mkdir() throws Exception{
fileSystem.mkdirs(new Path("/knowledgebase/test"));
}
@Test
public void create() throws Exception{
FSDataOutputStream outputStream = fileSystem.create(new Path("/knowledgebase/test/test.txt"));
outputStream.write("Hello World".getBytes());
outputStream.flush();
outputStream.close();
}
@Test
public void cat() throws Exception{
FSDataInputStream inputStream = fileSystem.open(new Path("/knowledgebase/test/test.txt"));
IOUtils.copyBytes(inputStream, System.out, 1024);
inputStream.close();
}
@Test
public void rename() throws Exception{
fileSystem.rename(new Path("/knowledgebase/test/test.txt"), new Path("/knowledgebase/test/test1.txt"));
}
@Test
public void delete() throws Exception{
fileSystem.delete(new Path("/knowledgebase/test"),true);//是否递归删除
}
}

View File

@@ -1,7 +1,10 @@
package com.zdjizhi.schema;
import com.zdjizhi.tools.general.FileEdit;
import com.zdjizhi.tools.ordinary.MD5Utils;
public class Test {
public static void main(String[] args) {
System.out.println(26286 & 2132321);
public static void main(String[] args) throws Exception {
System.out.println(FileEdit.getFileId("4856d031-521e-4426-a03a-42ca4bb96028", "_9"));
}
}