Commit e8e2e515 by 陶光胜

Merge remote-tracking branch 'remotes/origin/developer'

# Conflicts:
#	gic-cloud-data-hook-service/src/main/java/com/gic/cloud/data/hook/service/impl/FlatQueryResultServiceImpl.java
Parents: b455278f, 05b46302
@@ -34,6 +34,8 @@ public class HiveHelper implements ApplicationContextAware {
private static DataSource balaSearchSource = null;
private static DataSource bigDataDownloadSource = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
log.debug("setApplicationContext", "准备初始化 Hive 连接池");
......@@ -41,6 +43,7 @@ public class HiveHelper implements ApplicationContextAware {
downloadSource = (DataSource)applicationContext.getBean("downloadHiveSource");
balaDownloadSource = (DataSource)applicationContext.getBean("balaDownloadHiveSource");
balaSearchSource = (DataSource)applicationContext.getBean("balaSearchHiveSource");
bigDataDownloadSource = (DataSource)applicationContext.getBean("bigTaskdownloadHiveSource");
}
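A side note on the wiring above: the raw `(DataSource)` casts can be avoided with Spring's typed `getBean(String, Class)` overload (available since Spring 2.5); a minimal sketch using the same bean names:

```java
// Typed lookup; behaviour identical to the casts above.
downloadSource = applicationContext.getBean("downloadHiveSource", DataSource.class);
bigDataDownloadSource = applicationContext.getBean("bigTaskdownloadHiveSource", DataSource.class);
```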
/** Obtain the Hive data source
@@ -56,10 +59,6 @@ public class HiveHelper implements ApplicationContextAware {
public static Connection getHiveConnection() {
Connection conn = null;
try {
// String url = "jdbc:hive2://115.159.205.44:10015/data_test";
// //String url = "jdbc:hive2://10.0.0.3:10015/data_test";
// Class.forName("org.apache.hive.jdbc.HiveDriver");
// conn = DriverManager.getConnection(url, "hadoop", "");
conn = source.getConnection();
return conn;
} catch (Exception ex) {
@@ -78,10 +77,6 @@ public class HiveHelper implements ApplicationContextAware {
public static Connection getDownloadHiveConnection() {
Connection conn = null;
try {
// String url = "jdbc:hive2://115.159.205.44:10015/data_test";
// //String url = "jdbc:hive2://10.0.0.3:10015/data_test";
// Class.forName("org.apache.hive.jdbc.HiveDriver");
// conn = DriverManager.getConnection(url, "hadoop", "");
conn = downloadSource.getConnection();
return conn;
} catch (Exception ex) {
@@ -102,10 +97,6 @@ public class HiveHelper implements ApplicationContextAware {
public static Connection getBalaDownloadHiveConnection() {
Connection conn = null;
try {
// String url = "jdbc:hive2://115.159.205.44:10015/data_test";
// //String url = "jdbc:hive2://10.0.0.3:10015/data_test";
// Class.forName("org.apache.hive.jdbc.HiveDriver");
// conn = DriverManager.getConnection(url, "hadoop", "");
conn = balaDownloadSource.getConnection();
return conn;
} catch (Exception ex) {
@@ -125,10 +116,6 @@ public class HiveHelper implements ApplicationContextAware {
public static Connection getBalaSearchHiveConnection() {
Connection conn = null;
try {
// String url = "jdbc:hive2://115.159.205.44:10015/data_test";
// //String url = "jdbc:hive2://10.0.0.3:10015/data_test";
// Class.forName("org.apache.hive.jdbc.HiveDriver");
// conn = DriverManager.getConnection(url, "hadoop", "");
conn = balaSearchSource.getConnection();
return conn;
} catch (Exception ex) {
@@ -144,6 +131,25 @@ public class HiveHelper implements ApplicationContextAware {
} // TRY CATCH OVER
}
public static Connection getBigDataDownloadHiveConnection() {
Connection conn = null;
try {
conn = bigDataDownloadSource.getConnection();
return conn;
} catch (Exception ex) {
DingtalkMessageUtil.sendAlertMessage("Connection alert: timed out acquiring a connection", "https://oapi.dingtalk.com/robot/send?access_token=157ad00c2b6491f2f0aac1d89121e4bd2d82d9d33cad0596b88dacfdc12fe455");
ex.printStackTrace();
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
throw new RuntimeException("Failed to acquire connection");
} // TRY CATCH OVER
}
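All five connection getters now repeat the same acquire/alert/rethrow boilerplate, and the `conn.close()` inside the catch block above is dead code: if `getConnection()` throws, `conn` is still null. A possible consolidation, sketched only (`getConnectionFrom` is a hypothetical helper; the webhook URL is the one already used above):

```java
/** Hypothetical shared helper: borrow from the given pool, alert DingTalk and rethrow on failure. */
private static Connection getConnectionFrom(DataSource ds, String poolName) {
    try {
        return ds.getConnection();
    } catch (Exception ex) {
        DingtalkMessageUtil.sendAlertMessage("Connection alert: timed out acquiring a connection from " + poolName,
                "https://oapi.dingtalk.com/robot/send?access_token=157ad00c2b6491f2f0aac1d89121e4bd2d82d9d33cad0596b88dacfdc12fe455");
        throw new RuntimeException("Failed to acquire Hive connection from " + poolName, ex);
    }
}

// getBigDataDownloadHiveConnection() would then reduce to:
// return getConnectionFrom(bigDataDownloadSource, "bigDataDownload");
```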
public static Connection getSearchHiveConnection(List<String> enterpriseIds){
Config config = ConfigService.getConfig("application");
String customRouterEnterpriseId = config.getProperty("customRouterEnterpriseId", "");
@@ -176,6 +176,9 @@ public class CsvResultSetHelper implements ResultSetHelper {
if (tmpResult != null && tmpResult.length() > 0) {
//tmpResult = DecryptUtils.getInstance().decrypt(tmpResult);
tmpResult = DecryptUtils.getInstance().decrypt(tmpResult);
// values beginning with "0" (e.g. card numbers): append a tab so spreadsheet tools keep the leading zero
if (tmpResult.startsWith("0")) {
tmpResult = tmpResult + "\t";
}
System.out.println("tmpResult = " + tmpResult);
} // IF OVER
result.add(tmpResult);
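For context on the new `startsWith("0")` branch: presumably a guard for values such as card numbers, where spreadsheet programs parse a bare `0012` as the number 12 and silently drop the leading zero; the trailing tab keeps the cell textual. A standalone illustration (not project code):

```java
// A bare "0012" in a CSV cell is displayed as 12 by Excel; the trailing
// tab stops the cell from being parsed as a number, preserving the zero.
String raw = "0012";
String cell = raw.startsWith("0") ? raw + "\t" : raw;
System.out.println("[" + cell + "]"); // the tab is kept inside the brackets
```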
@@ -44,6 +44,7 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -65,6 +66,10 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
public static final List<String> CARD = Arrays.asList("card_num", "receive_card_num");
private static final Integer maxFields = 20;
private static final Map<String, String> bigTaskRunningMap = new ConcurrentHashMap<>();
@Autowired
FlatQueryTableDao flatQueryTableDao;
@Autowired
@@ -74,11 +79,13 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
/** Download task conditions associated with self-service metric queries */
protected List<FlatQueryTaskCondition> taskConditions = Lists.newArrayList();
protected List<FlatQueryTaskCondition> bigTaskConditions = Lists.newArrayList();
private FlatQueryResultServiceImpl() {
log.debug("construct", "准备初始化 FlatQuery 查询服务");
runDownloadTask(3);
runBalaDownloadTask(3);
runBigDataDownloadTask(3);
runApplyTask(5); // check task status every 5 seconds
}
@@ -565,7 +572,18 @@
log.debug("Self-service metrics, currently executing task:", JSON.toJSONString(taskConditions.get(i)));
if (taskConditions.get(i).getBuildPermitted().equals(Global.YES)
&& !taskConditions.get(i).getEnterpriseIds().contains("ff8080816dd0385e016ddca436d01fe1")) {
if (taskConditions.get(i).getAllFields().size() >= maxFields) {
// wide task (maxFields or more columns): reroute to the big-task lane
bigTaskConditions.add(taskConditions.get(i));
taskConditions.remove(i);
i--; // compensate for the removal so the next condition is not skipped
continue;
} else if (bigTaskRunningMap.isEmpty()) {
// the big-task lane is idle, so a normal task may use it as well
bigTaskConditions.add(taskConditions.get(i));
taskConditions.remove(i);
i--;
continue;
}
try {
connection = HiveHelper.getDownloadHiveConnection();
condition = taskConditions.remove(i); // remove and take the first task condition
break;
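The dispatch rule added here, in words: a task selecting `maxFields` (20) or more columns is rerouted to `bigTaskConditions`; and while the big-task lane is idle (`bigTaskRunningMap` empty), normal tasks are routed there as well. Restated as a predicate for readability (`shouldRouteToBigLane` is hypothetical, not in the source):

```java
// Hypothetical restatement of the routing decision above.
private static boolean shouldRouteToBigLane(FlatQueryTaskCondition task,
                                            Map<String, String> runningBigTasks) {
    boolean isWide = task.getAllFields().size() >= maxFields; // 20+ selected columns
    boolean bigLaneIdle = runningBigTasks.isEmpty();          // nothing running on the big lane
    return isWide || bigLaneIdle;
}
```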
@@ -594,7 +612,54 @@
}, interval*1000, interval*1000, TimeUnit.MILLISECONDS);
}
/** Download task execution timer */
//private Timer downloadTaskTimer = new Timer();
ScheduledExecutorService bigDataDownloadService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("bigDataDownloadTimer-%d").daemon(true).build());
/** Big-task download lane; when it is idle, normal tasks may enter as well */
private void runBigDataDownloadTask(Integer interval) {
bigDataDownloadService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Connection connection = null;
try{
if (bigTaskConditions != null && bigTaskConditions.size() > 0) {
FlatQueryTaskCondition condition = null;
//FlatQueryTaskCondition condition = taskConditions.remove(0); // remove and take the first task condition
for (int i=0; i<bigTaskConditions.size(); i++ ) {
log.debug("自助指标当前正在执行的任务为:", JSON.toJSONString(bigTaskConditions.get(i)));
if (bigTaskConditions.get(i).getBuildPermitted().equals(Global.YES)) {
try{
// acquire the connection first, mirroring the normal-task loop above,
// so the task is not lost from the queue if the pool times out
connection = HiveHelper.getBigDataDownloadHiveConnection();
condition = bigTaskConditions.remove(i); // remove and take this task condition
bigTaskRunningMap.put(condition.getTaskId(), condition.getTaskId());
break;
}catch (Exception e){
log.debug("Failed to acquire connection:", e.getMessage());
e.printStackTrace();
continue;
}
} // IF OVER
} // FOR OVER
if (condition != null) { // guard: the loop may exit without a permitted task
takeFile(condition, connection);
bigTaskRunningMap.remove(condition.getTaskId());
}
} // ignore when there is no task
}catch (Exception e){
log.debug("自助指标下载异常", e.getMessage());
e.printStackTrace();
} finally {
if(connection != null){
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}, interval*1000, interval*1000, TimeUnit.MILLISECONDS);
}
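`bigTaskRunningMap` is effectively a concurrent set of in-flight task ids (each key mapped to itself). On Java 8+, a set view states that intent more directly, and a `try/finally` would clear the marker even if `takeFile` throws; a sketch, not a drop-in change:

```java
// Hypothetical alternative to the map-as-set idiom (Java 8+).
private static final Set<String> bigTasksInFlight = ConcurrentHashMap.newKeySet();

// in the worker loop:
// bigTasksInFlight.add(condition.getTaskId());
// try {
//     takeFile(condition, connection);
// } finally {
//     bigTasksInFlight.remove(condition.getTaskId()); // cleared even on failure
// }
```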
/** Download task execution timer */
//private Timer balaDownloadTaskTimer = new Timer();
@@ -33,6 +33,20 @@
<property name="maxWait">
<value>120000</value>
</property>
<property name="validationQuery" value="SELECT 1"/>
<!-- 借出连接时不要测试,否则很影响性能 -->
<property name="testOnBorrow" value="false"/>
<!-- 每30秒运行一次空闲连接回收器 -->
<property name="timeBetweenEvictionRunsMillis" value="30000"/>
<!-- 池中的连接空闲30分钟后被回收 -->
<property name="minEvictableIdleTimeMillis" value="1800000"/>
<!-- 在每次空闲连接回收器线程(如果有)运行时检查的连接数量 -->
<property name="numTestsPerEvictionRun" value="10"/>
<!-- 连接泄漏回收参数,当可用连接数少于3个时才执行 -->
<property name="removeAbandoned" value="true"/>
<!-- 连接泄漏回收参数,180秒,泄露的连接可以被删除的超时值 -->
<property name="removeAbandonedTimeout" value="180"/>
</bean>
<bean class="org.apache.commons.dbcp.BasicDataSource" id="downloadHiveSource" destroy-method="close">
@@ -56,6 +70,20 @@
<property name="maxWait">
<value>120000</value>
</property>
<property name="validationQuery" value="SELECT 1"/>
<!-- 借出连接时不要测试,否则很影响性能 -->
<property name="testOnBorrow" value="false"/>
<!-- 每30秒运行一次空闲连接回收器 -->
<property name="timeBetweenEvictionRunsMillis" value="30000"/>
<!-- 池中的连接空闲30分钟后被回收 -->
<property name="minEvictableIdleTimeMillis" value="1800000"/>
<!-- 在每次空闲连接回收器线程(如果有)运行时检查的连接数量 -->
<property name="numTestsPerEvictionRun" value="10"/>
<!-- 连接泄漏回收参数,当可用连接数少于3个时才执行 -->
<property name="removeAbandoned" value="true"/>
<!-- 连接泄漏回收参数,180秒,泄露的连接可以被删除的超时值 -->
<property name="removeAbandonedTimeout" value="180"/>
</bean>
<bean class="org.apache.commons.dbcp.BasicDataSource" id="balaDownloadHiveSource" destroy-method="close">
@@ -79,6 +107,20 @@
<property name="maxWait">
<value>120000</value>
</property>
<property name="validationQuery" value="SELECT 1"/>
<!-- 借出连接时不要测试,否则很影响性能 -->
<property name="testOnBorrow" value="false"/>
<!-- 每30秒运行一次空闲连接回收器 -->
<property name="timeBetweenEvictionRunsMillis" value="30000"/>
<!-- 池中的连接空闲30分钟后被回收 -->
<property name="minEvictableIdleTimeMillis" value="1800000"/>
<!-- 在每次空闲连接回收器线程(如果有)运行时检查的连接数量 -->
<property name="numTestsPerEvictionRun" value="10"/>
<!-- 连接泄漏回收参数,当可用连接数少于3个时才执行 -->
<property name="removeAbandoned" value="true"/>
<!-- 连接泄漏回收参数,180秒,泄露的连接可以被删除的超时值 -->
<property name="removeAbandonedTimeout" value="180"/>
</bean>
<bean class="org.apache.commons.dbcp.BasicDataSource" id="balaSearchHiveSource" destroy-method="close">
@@ -102,6 +144,57 @@
<property name="maxWait">
<value>120000</value>
</property>
<property name="validationQuery" value="SELECT 1"/>
<!-- 借出连接时不要测试,否则很影响性能 -->
<property name="testOnBorrow" value="false"/>
<!-- 每30秒运行一次空闲连接回收器 -->
<property name="timeBetweenEvictionRunsMillis" value="30000"/>
<!-- 池中的连接空闲30分钟后被回收 -->
<property name="minEvictableIdleTimeMillis" value="1800000"/>
<!-- 在每次空闲连接回收器线程(如果有)运行时检查的连接数量 -->
<property name="numTestsPerEvictionRun" value="10"/>
<!-- 连接泄漏回收参数,当可用连接数少于3个时才执行 -->
<property name="removeAbandoned" value="true"/>
<!-- 连接泄漏回收参数,180秒,泄露的连接可以被删除的超时值 -->
<property name="removeAbandonedTimeout" value="180"/>
</bean>
<bean class="org.apache.commons.dbcp.BasicDataSource" id="bigTaskdownloadHiveSource" destroy-method="close">
<property name="driverClassName" value="org.apache.hive.jdbc.HiveDriver" />
<!--<property name="url" value="jdbc:hive2://115.159.205.44:10015/data_test" />-->
<property name="url" value="${hive.bigTaskdownload.url}" />
<property name="username" value="${hive.username}" />
<property name="password" value="" />
<property name="maxActive">
<value>20</value>
</property>
<property name="maxIdle">
<value>5</value>
</property>
<property name="minIdle">
<value>1</value>
</property>
<property name="testWhileIdle">
<value>true</value>
</property>
<property name="maxWait">
<value>120000</value>
</property>
<property name="validationQuery" value="SELECT 1"/>
<!-- 借出连接时不要测试,否则很影响性能 -->
<property name="testOnBorrow" value="false"/>
<!-- 每30秒运行一次空闲连接回收器 -->
<property name="timeBetweenEvictionRunsMillis" value="30000"/>
<!-- 池中的连接空闲30分钟后被回收 -->
<property name="minEvictableIdleTimeMillis" value="1800000"/>
<!-- 在每次空闲连接回收器线程(如果有)运行时检查的连接数量 -->
<property name="numTestsPerEvictionRun" value="10"/>
<!-- 连接泄漏回收参数,当可用连接数少于3个时才执行 -->
<property name="removeAbandoned" value="true"/>
<!-- 连接泄漏回收参数,180秒,泄露的连接可以被删除的超时值 -->
<property name="removeAbandonedTimeout" value="180"/>
</bean>
</beans>
\ No newline at end of file