增加 写入hbase间隔 配置

This commit is contained in:
qidaijie
2020-04-14 13:37:30 +08:00
parent 7b89f1acb3
commit aa1fc748df

View File

@@ -32,10 +32,12 @@ public class FlumeSubscriberApp implements Interceptor {
private static Connection connection;
private String hbaseZookeeperIp;
private String hbaseTableName;
private int updateHBaseTime;
public FlumeSubscriberApp(String hbaseZookeeperIp, String hbaseTableName) {
public FlumeSubscriberApp(String hbaseZookeeperIp, String hbaseTableName, int updateHBaseTime) {
this.hbaseZookeeperIp = hbaseZookeeperIp;
this.hbaseTableName = hbaseTableName;
this.updateHBaseTime = updateHBaseTime;
}
@@ -70,7 +72,7 @@ public class FlumeSubscriberApp implements Interceptor {
e.printStackTrace();
}
}
}, 0, 50000);
}, 0, updateHBaseTime * 1000);
}
@@ -211,11 +213,12 @@ public class FlumeSubscriberApp implements Interceptor {
public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
private String hbaseZookeeperIp;
private String hbaseTableName;
private int updateHBaseTime;
@Override
public Interceptor build() {
return new FlumeSubscriberApp(hbaseZookeeperIp, hbaseTableName);
return new FlumeSubscriberApp(hbaseZookeeperIp, hbaseTableName, updateHBaseTime);
}
@Override
@@ -240,6 +243,16 @@ public class FlumeSubscriberApp implements Interceptor {
logger.error("FlumeSubscriberApp Get hbaseTableName is error : " + e);
}
try {
this.updateHBaseTime = context.getInteger("updateHBaseTime", 30);
Preconditions.checkNotNull("".equals(updateHBaseTime), "updateHBaseTime must be set!!");
logger.info("FlumeSubscriberApp Read updateHBaseTime from configuration : " + updateHBaseTime);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("FlumeSubscriberApp updateHBaseTime invalid", e);
} catch (Exception e) {
logger.error("FlumeSubscriberApp Get updateHBaseTime is error : " + e);
}
}
}