diff --git a/dynamic_complement/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java b/dynamic_complement/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java index 98e95dd..9800a86 100644 --- a/dynamic_complement/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java +++ b/dynamic_complement/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java @@ -32,10 +32,12 @@ public class FlumeSubscriberApp implements Interceptor { private static Connection connection; private String hbaseZookeeperIp; private String hbaseTableName; + private int updateHBaseTime; - public FlumeSubscriberApp(String hbaseZookeeperIp, String hbaseTableName) { + public FlumeSubscriberApp(String hbaseZookeeperIp, String hbaseTableName, int updateHBaseTime) { this.hbaseZookeeperIp = hbaseZookeeperIp; this.hbaseTableName = hbaseTableName; + this.updateHBaseTime = updateHBaseTime; } @@ -70,7 +72,7 @@ public class FlumeSubscriberApp implements Interceptor { e.printStackTrace(); } } - }, 0, 50000); + }, 0, updateHBaseTime * 1000); } @@ -211,11 +213,12 @@ public class FlumeSubscriberApp implements Interceptor { public static class FlumeDynamicAppBuilder implements Interceptor.Builder { private String hbaseZookeeperIp; private String hbaseTableName; + private int updateHBaseTime; @Override public Interceptor build() { - return new FlumeSubscriberApp(hbaseZookeeperIp, hbaseTableName); + return new FlumeSubscriberApp(hbaseZookeeperIp, hbaseTableName, updateHBaseTime); } @Override @@ -240,6 +243,16 @@ public class FlumeSubscriberApp implements Interceptor { logger.error("FlumeSubscriberApp Get hbaseTableName is error : " + e); } + try { + this.updateHBaseTime = context.getInteger("updateHBaseTime", 30); + Preconditions.checkNotNull("".equals(updateHBaseTime), "updateHBaseTime must be set!!"); + logger.info("FlumeSubscriberApp Read updateHBaseTime from configuration : " + updateHBaseTime); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("FlumeSubscriberApp updateHBaseTime invalid", e); + } catch (Exception e) { + logger.error("FlumeSubscriberApp Get updateHBaseTime is error : " + e); + } + } }