修改边属性逻辑错误

This commit is contained in:
wanglihui
2020-07-01 14:44:45 +08:00
parent 7e8f4d763e
commit 2d543b3df9
5 changed files with 95 additions and 73 deletions

View File

@@ -66,22 +66,20 @@ public class BaseClickhouseData {
DruidPooledConnection connection = manger.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
// HashSet<String> fqdnSet = new HashSet<>();
while (resultSet.next()) {
// String commonSchemaType = resultSet.getString("common_schema_type");
// String fqdnName = commonSchemaGetFqdn(commonSchemaType,resultSet);
String fqdnName = resultSet.getString("FQDN");
// fqdnSet.add(fqdnName);
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
BaseDocument newDoc = new BaseDocument();
newDoc.setKey(fqdnName);
newDoc.addAttribute("FQDN_NAME", fqdnName);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
int i = Math.abs(fqdnName.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
ArrayList<BaseDocument> documentList = vFqdnMap.getOrDefault(i, new ArrayList<>());
documentList.add(newDoc);
if (isDomain(fqdnName)){
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
BaseDocument newDoc = new BaseDocument();
newDoc.setKey(fqdnName);
newDoc.addAttribute("FQDN_NAME", fqdnName);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
int i = Math.abs(fqdnName.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
ArrayList<BaseDocument> documentList = vFqdnMap.getOrDefault(i, new ArrayList<>());
documentList.add(newDoc);
}
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse v_FQDN时间" + (last - start));
@@ -154,29 +152,31 @@ public class BaseClickhouseData {
while (resultSet.next()) {
String commonSchemaType = resultSet.getString("common_schema_type");
String vFqdn = resultSet.getString("FQDN");
String vIp = resultSet.getString("common_server_ip");
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
long countTotal = resultSet.getLong("COUNT_TOTAL");
String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
if (isDomain(vFqdn)){
String vIp = resultSet.getString("common_server_ip");
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
long countTotal = resultSet.getLong("COUNT_TOTAL");
String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
String key = vFqdn + "-" + vIp;
BaseEdgeDocument newDoc = new BaseEdgeDocument();
newDoc.setKey(key);
newDoc.setFrom("FQDN/" + vFqdn);
newDoc.setTo("IP/" + vIp);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
newDoc.addAttribute("COUNT_TOTAL", countTotal);
newDoc.addAttribute("DIST_CIP_RECENT", distCipRecents);
newDoc.addAttribute("DIST_CIP_TOTAL", distCipRecents);
String key = vFqdn + "-" + vIp;
BaseEdgeDocument newDoc = new BaseEdgeDocument();
newDoc.setKey(key);
newDoc.setFrom("FQDN/" + vFqdn);
newDoc.setTo("IP/" + vIp);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
newDoc.addAttribute("COUNT_TOTAL", countTotal);
newDoc.addAttribute("DIST_CIP_RECENT", distCipRecents);
newDoc.addAttribute("DIST_CIP_TOTAL", distCipRecents);
int hashMod = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = eFqdnAddressIpMap.getOrDefault(hashMod, new HashMap());
int hashMod = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = eFqdnAddressIpMap.getOrDefault(hashMod, new HashMap());
HashMap<String, BaseEdgeDocument> schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>());
schemaHashMap.put(commonSchemaType, newDoc);
documentHashMap.put(key, schemaHashMap);
HashMap<String, BaseEdgeDocument> schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>());
schemaHashMap.put(commonSchemaType, newDoc);
documentHashMap.put(key, schemaHashMap);
}
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间" + (last - start));
@@ -203,24 +203,26 @@ public class BaseClickhouseData {
String commonSchemaType = resultSet.getString("common_schema_type");
String vIp = resultSet.getString("common_client_ip");
String vFqdn = resultSet.getString("FQDN");
String key = vIp + "-" + vFqdn;
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
long countTotal = resultSet.getLong("COUNT_TOTAL");
if (isDomain(vFqdn)){
String key = vIp + "-" + vFqdn;
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
long countTotal = resultSet.getLong("COUNT_TOTAL");
BaseEdgeDocument newDoc = new BaseEdgeDocument();
newDoc.setKey(key);
newDoc.setFrom("IP/" + vIp);
newDoc.setTo("FQDN/" + vFqdn);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
newDoc.addAttribute("COUNT_TOTAL", countTotal);
int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = eIpVisitFqdnMap.getOrDefault(i, new HashMap());
BaseEdgeDocument newDoc = new BaseEdgeDocument();
newDoc.setKey(key);
newDoc.setFrom("IP/" + vIp);
newDoc.setTo("FQDN/" + vFqdn);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
newDoc.addAttribute("COUNT_TOTAL", countTotal);
int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = eIpVisitFqdnMap.getOrDefault(i, new HashMap());
HashMap<String, BaseEdgeDocument> schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>());
schemaHashMap.put(commonSchemaType, newDoc);
documentHashMap.put(key, schemaHashMap);
HashMap<String, BaseEdgeDocument> schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>());
schemaHashMap.put(commonSchemaType, newDoc);
documentHashMap.put(key, schemaHashMap);
}
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间" + (last - start));
@@ -235,6 +237,7 @@ public class BaseClickhouseData {
}
}
@Deprecated
private static String commonSchemaGetFqdn(String commonSchemaType, ResultSet resultSet) {
String vFqdn = "";
try {

View File

@@ -10,37 +10,45 @@ public class BaseUpdateEtl {
BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument();
Set<String> schemaSets = newEdgeDocumentSchemaMap.keySet();
Map<String, Object> properties = newBaseEdgeDocument.getProperties();
for (String schema : schemaSets) {
BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentSchemaMap.get(schema);
setSchemaCnt(schema,schemaEdgeDoc,newBaseEdgeDocument);
if (newBaseEdgeDocument.getKey() != null){
Map<String, Object> properties = newBaseEdgeDocument.getProperties();
if (!properties.isEmpty()){
setFoundTime(properties,schemaEdgeDoc);
setDistinctClientIpBySchema(properties,schemaEdgeDoc);
}else {
Map<String, Object> properties = schemaEdgeDoc.getProperties();
properties.remove("COUNT_TOTAL");
newBaseEdgeDocument = schemaEdgeDoc;
properties = schemaEdgeDoc.getProperties();
}
setSchemaCnt(schema,schemaEdgeDoc,properties);
}
properties.remove("COUNT_TOTAL");
addSchemaProperty(properties);
newBaseEdgeDocument.setProperties(properties);
return newBaseEdgeDocument;
}
public static BaseEdgeDocument mergeIp2FqdnBySchema(HashMap<String, BaseEdgeDocument> newEdgeDocumentMap){
BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument();
Set<String> schemaSets = newEdgeDocumentMap.keySet();
Map<String, Object> properties = newBaseEdgeDocument.getProperties();
for (String schema : schemaSets) {
BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentMap.get(schema);
setSchemaCnt(schema,schemaEdgeDoc,newBaseEdgeDocument);
if (newBaseEdgeDocument.getKey() != null){
Map<String, Object> properties = newBaseEdgeDocument.getProperties();
if (!properties.isEmpty()){
setFoundTime(properties,schemaEdgeDoc);
}else {
Map<String, Object> properties = schemaEdgeDoc.getProperties();
properties.remove("COUNT_TOTAL");
newBaseEdgeDocument = schemaEdgeDoc;
properties = schemaEdgeDoc.getProperties();
}
setSchemaCnt(schema,schemaEdgeDoc,properties);
}
properties.remove("COUNT_TOTAL");
addSchemaProperty(properties);
newBaseEdgeDocument.setProperties(properties);
return newBaseEdgeDocument;
}
@@ -53,11 +61,21 @@ public class BaseUpdateEtl {
setDistinctClientIpByHistory(newEdgeDocument,edgeDocument);
}
private static void addSchemaProperty(Map<String, Object> properties){
if (!properties.containsKey("TLS_CNT_TOTAL")){
properties.put("TLS_CNT_TOTAL",0L);
properties.put("TLS_CNT_RECENT",new long[7]);
}else if (!properties.containsKey("HTTP_CNT_TOTAL")){
properties.put("HTTP_CNT_TOTAL",0L);
properties.put("HTTP_CNT_RECENT",new long[7]);
}
}
private static void setDistinctClientIpByHistory(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){
ArrayList<String> distCipTotal = (ArrayList<String>) edgeDocument.getAttribute("DIST_CIP_TOTAL");
String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]);
String[] distCipRecentsSrc = (String[]) newEdgeDocument.getAttribute("DIST_CIP_RECENT");
Object[] distCipRecentsSrc = (Object[])newEdgeDocument.getAttribute("DIST_CIP_RECENT");
if (distCipTotalsSrc.length == 30) {
Object[] distCipTotals = mergeClientIp(distCipTotalsSrc, distCipRecentsSrc);
edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals);
@@ -88,8 +106,8 @@ public class BaseUpdateEtl {
edgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal);
}
private static Object[] mergeClientIp(String[] distCipTotalsSrc,String[] distCipRecentsSrc){
HashSet<String> dIpSet = new HashSet<>();
private static Object[] mergeClientIp(Object[] distCipTotalsSrc,Object[] distCipRecentsSrc){
HashSet<Object> dIpSet = new HashSet<>();
dIpSet.addAll(Arrays.asList(distCipRecentsSrc));
dIpSet.addAll(Arrays.asList(distCipTotalsSrc));
Object[] distCipTotals = dIpSet.toArray();
@@ -103,7 +121,7 @@ public class BaseUpdateEtl {
String[] schemaDistCipRecents = (String[]) schemaEdgeDoc.getAttribute("DIST_CIP_RECENT");
String[] distCipRecents = (String[]) properties.get("DIST_CIP_RECENT");
Object[] mergeClientIp = mergeClientIp(schemaDistCipRecents, distCipRecents);
properties.put("DIST_CIP_RECENT",mergeClientIp);
properties.put("DIST_CIP_RECENT", mergeClientIp);
properties.put("DIST_CIP_TOTAL",mergeClientIp);
}
@@ -116,21 +134,21 @@ public class BaseUpdateEtl {
properties.put("LAST_FOUND_TIME",schemaLastFoundTime>lastFoundTime?schemaLastFoundTime:lastFoundTime);
}
private static void setSchemaCnt(String schema,BaseEdgeDocument schemaEdgeDoc,BaseEdgeDocument newBaseEdgeDocument){
private static void setSchemaCnt(String schema,BaseEdgeDocument schemaEdgeDoc,Map<String, Object> properties){
switch (schema) {
case "HTTP":
long httpCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString());
newBaseEdgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCntTotal);
properties.put("HTTP_CNT_TOTAL", httpCntTotal);
long[] httpCntRecentsDst = new long[7];
httpCntRecentsDst[0] = httpCntTotal;
newBaseEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst);
properties.put("HTTP_CNT_RECENT", httpCntRecentsDst);
break;
case "SSL":
long tlsCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString());
newBaseEdgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCntTotal);
properties.put("TLS_CNT_TOTAL", tlsCntTotal);
long[] tlsCntRecentsDst = new long[7];
tlsCntRecentsDst[0] = tlsCntTotal;
newBaseEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst);
properties.put("TLS_CNT_RECENT", tlsCntRecentsDst);
break;
}
}

View File

@@ -50,7 +50,8 @@ public class UpdateEFqdnAddressIp implements Runnable {
LOG.info("更新R_LOCATE_FQDN2IP:" + i);
}
} catch (Exception e) {
LOG.error(e.getMessage());
e.printStackTrace();
LOG.error(e.toString());
}
}
}

View File

@@ -54,7 +54,7 @@ public class UpdateEIpVisitFqdn implements Runnable {
LOG.info("更新R_VISIT_IP2FQDN:" + i);
}
} catch (Exception e) {
LOG.error(e.getMessage());
LOG.error(e.toString());
}
}
}

View File

@@ -1,5 +1,5 @@
#arangoDB参数配置
arangoDB.host=192.168.40.127
arangoDB.host=192.168.40.182
arangoDB.port=8529
arangoDB.user=root
arangoDB.password=111111
@@ -13,5 +13,5 @@ update.arango.batch=10000
thread.pool.number=10
thread.await.termination.time=10
read.clickhouse.max.time=1593162456
read.clickhouse.max.time=1593582211
read.clickhouse.min.time=1592879247