Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
G
gic-cloud
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
data-hook
gic-cloud
Commits
1c590255
Commit
1c590255
authored
Feb 22, 2021
by
陶光胜
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'developer' into 'master'
Developer See merge request
!13
parents
bb3a6891
7daf15c6
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
193 additions
and
7 deletions
+193
-7
FlatQueryResultServiceImpl.java
...ud/data/hook/service/impl/FlatQueryResultServiceImpl.java
+193
-7
No files found.
gic-cloud-data-hook-service/src/main/java/com/gic/cloud/data/hook/service/impl/FlatQueryResultServiceImpl.java
View file @
1c590255
...
@@ -67,6 +67,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
...
@@ -67,6 +67,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
private
FlatQueryResultServiceImpl
()
{
private
FlatQueryResultServiceImpl
()
{
log
.
debug
(
"construct"
,
"准备初始化 FlatQuery 查询服务"
);
log
.
debug
(
"construct"
,
"准备初始化 FlatQuery 查询服务"
);
runDownloadTask
(
3
);
runDownloadTask
(
3
);
runBalaDownloadTask
(
3
);
runApplyTask
(
5
);
// 每5秒钟进行任务状态检测
runApplyTask
(
5
);
// 每5秒钟进行任务状态检测
}
}
...
@@ -514,7 +515,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
...
@@ -514,7 +515,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
//FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件
//FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件
for
(
int
i
=
0
;
i
<
taskConditions
.
size
();
i
++
)
{
for
(
int
i
=
0
;
i
<
taskConditions
.
size
();
i
++
)
{
log
.
debug
(
"自助指标当前正在执行的任务为:"
,
JSON
.
toJSONString
(
taskConditions
.
get
(
i
)));
log
.
debug
(
"自助指标当前正在执行的任务为:"
,
JSON
.
toJSONString
(
taskConditions
.
get
(
i
)));
if
(
taskConditions
.
get
(
i
).
getBuildPermitted
().
equals
(
Global
.
YES
))
{
if
(
taskConditions
.
get
(
i
).
getBuildPermitted
().
equals
(
Global
.
YES
)
&&
!
taskConditions
.
get
(
i
).
getEnterpriseIds
().
contains
(
"ff8080816dd0385e016ddca436d01fe1"
))
{
condition
=
taskConditions
.
remove
(
i
);
// 移除并获取第一个任务条件
condition
=
taskConditions
.
remove
(
i
);
// 移除并获取第一个任务条件
break
;
break
;
}
// IF OVER
}
// IF OVER
...
@@ -536,12 +538,196 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
...
@@ -536,12 +538,196 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
0
,
0
,
condition
.
getAuthStoreIdList
());
condition
.
getAuthStoreIdList
());
Connection
conn
=
null
;
Connection
conn
=
HiveHelper
.
getDownloadHiveConnection
();
if
(
condition
.
getEnterpriseIds
().
contains
(
"ff8080816dd0385e016ddca436d01fe1"
)){
log
.
debug
(
"runDownloadTask.run"
,
"获取商户连接:"
+
task
.
getId
());
conn
=
HiveHelper
.
getBalaDownloadHiveConnection
();
if
(
conn
!=
null
)
{
}
else
{
try
{
conn
=
HiveHelper
.
getDownloadHiveConnection
();
Statement
stat
=
conn
.
createStatement
();
}
// stat.setQueryTimeout(60 * 1000);
stat
.
execute
(
"REFRESH TABLE "
+
condition
.
getTableId
());
// 强制刷新表结构
ResultSet
rs
=
stat
.
executeQuery
(
fullQuery
);
// 生成指定格式下载元文件
String
originalFilePath
=
""
;
if
(
task
.
getFormat
().
equals
(
DownloadFileFormat
.
CSV
))
{
// 如果指定为 CSV 格式
log
.
debug
(
"runDownloadTask.run"
,
"准备生成自助指标下载文件 "
+
condition
.
getTaskId
()
+
".csv"
);
originalFilePath
=
SAVE_FOLDER
+
"/"
+
condition
.
getTaskId
()
+
".csv"
;
File
tmp
=
new
File
(
originalFilePath
);
if
(
tmp
.
exists
())
{
// 删除可能存在的文件
tmp
.
delete
();
}
//CSVWriter csvWriter = new CSVWriter(new FileWriter(csvPath), '\t');
OutputStreamWriter
out
=
new
OutputStreamWriter
(
new
FileOutputStream
(
originalFilePath
),
Charset
.
forName
(
"GBK"
));
ResultSetHelper
helper
=
new
CsvResultSetHelper
(
task
.
getQueryDataType
()
==
QueryDataType
.
FULL
?
CsvDataFilterMode
.
DECRYPT
:
CsvDataFilterMode
.
DESENSI
,
condition
.
getDecryptFilters
());
CSVWriter
writer
=
new
CSVWriter
(
out
,
','
);
writer
.
setResultService
(
helper
);
writer
.
writeAll
(
rs
,
true
);
writer
.
close
();
out
.
close
();
//记得关闭资源
log
.
debug
(
"runDownloadTask.run"
,
"已生成自助指标下载文件 "
+
condition
.
getTaskId
()
+
".csv"
);
}
else
{
// 如果指定为 XLS 格式
log
.
debug
(
"runDownloadTask.run"
,
"准备生成自助指标下载文件 "
+
condition
.
getTaskId
()
+
".xlsx"
);
originalFilePath
=
SAVE_FOLDER
+
"/"
+
condition
.
getTaskId
()
+
".xlsx"
;
SXSSFWorkbook
wb
=
new
SXSSFWorkbook
(
100
);
// 内存中保留 100 行
Sheet
sheet
=
wb
.
createSheet
();
Row
row
=
sheet
.
createRow
(
0
);
Cell
cell
;
for
(
int
j
=
0
;
j
<
rs
.
getMetaData
().
getColumnCount
();
++
j
)
{
// 遍历创建表头
String
colName
=
rs
.
getMetaData
().
getColumnLabel
(
j
+
1
);
cell
=
row
.
createCell
(
j
);
cell
.
setCellValue
(
colName
);
}
// 遍历输出行
int
rowCount
=
0
;
while
(
rs
.
next
())
{
rowCount
++;
row
=
sheet
.
createRow
(
rowCount
);
for
(
int
j
=
0
;
j
<
rs
.
getMetaData
().
getColumnCount
();
++
j
)
{
//String c = rs.getString(j + 1);
//row.createCell(j).setCellValue(c);
String
cName
=
rs
.
getMetaData
().
getColumnName
(
j
+
1
);
List
<
String
>
cFilters
=
condition
.
getDecryptFilters
();
if
(
task
.
getQueryDataType
()
==
QueryDataType
.
FULL
&&
cFilters
.
contains
(
cName
))
{
String
tmpResult
=
rs
.
getString
(
j
+
1
);
if
(
StringUtils
.
isNotBlank
(
tmpResult
))
tmpResult
=
DecryptUtils
.
getInstance
().
decrypt
(
tmpResult
);
row
.
createCell
(
j
).
setCellValue
(
tmpResult
);
}
else
{
int
cType
=
rs
.
getMetaData
().
getColumnType
(
j
+
1
);
switch
(
cType
)
{
case
Types
.
TIMESTAMP
:
row
.
createCell
(
j
).
setCellValue
(
rs
.
getTimestamp
(
j
+
1
)
!=
null
?
datetimeFormatter
.
format
(
rs
.
getTimestamp
(
j
+
1
))
:
""
);
break
;
case
Types
.
DATE
:
row
.
createCell
(
j
).
setCellValue
(
rs
.
getDate
(
j
+
1
)
!=
null
?
dateFormatter
.
format
(
rs
.
getDate
(
j
+
1
))
:
""
);
break
;
case
Types
.
TIME
:
row
.
createCell
(
j
).
setCellValue
(
rs
.
getTimestamp
(
j
+
1
)
!=
null
?
timeFormatter
.
format
(
rs
.
getTimestamp
(
j
+
1
))
:
""
);
break
;
default
:
if
(
cFilters
.
contains
(
cName
)){
row
.
createCell
(
j
).
setCellValue
(
"******"
);
}
else
{
row
.
createCell
(
j
).
setCellValue
(
rs
.
getString
(
j
+
1
));
}
break
;
}
}
// IF ELSE OVER
}
// FOR OVER
}
// WHILE OVER
FileOutputStream
fileOut
=
new
FileOutputStream
(
originalFilePath
);
wb
.
write
(
fileOut
);
//fileOut.flush(); // SXSSFWorkbook 使用 auto-flush 模式
fileOut
.
close
();
//wb.close();
wb
.
dispose
();
// SXSSFWorkbook 没有 close 方法
log
.
debug
(
"runDownloadTask.run"
,
"已生成自助指标下载文件 "
+
condition
.
getTaskId
()
+
".xlsx"
);
}
// IF ELSE OVER
// 如果指定压缩,则使用之
//if (task.getFormat().equals("zip")) {
if
(
task
.
getUseCompress
().
equals
(
Global
.
YES
))
{
log
.
debug
(
"runDownloadTask.run"
,
"准备生成自助指标压缩文件 "
+
condition
.
getTaskId
()
+
".zip"
);
String
zipFilePath
=
SAVE_FOLDER
+
"/"
+
condition
.
getTaskId
()
+
".zip"
;
File
zipFile
=
new
File
(
zipFilePath
);
ZipOutputStream
zos
=
null
;
byte
[]
buf
=
new
byte
[
1024
];
int
length
=
0
;
try
{
OutputStream
os
=
new
FileOutputStream
(
zipFilePath
);
BufferedOutputStream
bos
=
new
BufferedOutputStream
(
os
);
zos
=
new
ZipOutputStream
(
bos
);
zos
.
setLevel
(
6
);
// 压缩率选择 0-9
InputStream
is
=
new
FileInputStream
(
originalFilePath
);
BufferedInputStream
bis
=
new
BufferedInputStream
(
is
);
zos
.
putNextEntry
(
new
ZipEntry
(
originalFilePath
.
substring
(
originalFilePath
.
lastIndexOf
(
"/"
)
+
1
)));
while
((
length
=
bis
.
read
(
buf
))
>
0
)
{
zos
.
write
(
buf
,
0
,
length
);
}
bis
.
close
();
is
.
close
();
//bos.close();
//os.close();
log
.
debug
(
"runDownloadTask.run"
,
"已生成自助指标压缩文件 "
+
condition
.
getTaskId
()
+
".zip"
);
}
catch
(
Exception
ex2
)
{
throw
ex2
;
}
finally
{
zos
.
closeEntry
();
zos
.
close
();
}
}
task
.
setStatus
(
DownloadTaskStatus
.
COMPLISHED
);
task
.
setOverTime
(
new
Date
());
String
taskFileExt
=
task
.
getUseCompress
().
equals
(
Global
.
YES
)
?
".zip"
:
task
.
getFormat
().
equals
(
DownloadFileFormat
.
CSV
)
?
".csv"
:
".xlsx"
;
task
.
setFilePath
(
task
.
getId
()
+
taskFileExt
);
DownloadTaskServiceImpl
.
getInstance
().
updateDownloadTask
(
task
);
}
catch
(
Exception
ex
)
{
ex
.
printStackTrace
();
// 标记任务异常
task
.
setStatus
(
DownloadTaskStatus
.
ERROR
);
task
.
setOverTime
(
new
Date
());
DownloadTaskServiceImpl
.
getInstance
().
updateDownloadTask
(
task
);
}
finally
{
try
{
conn
.
close
();
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
}
}
}
// IF OVER
}
// IF OVER
}
// 没有任务则忽略
}
catch
(
Exception
e
){
log
.
debug
(
"自助指标下载异常"
,
e
.
getMessage
());
e
.
printStackTrace
();
}
}
// run Define Over
},
interval
*
1000
,
interval
*
1000
);
// 配置中的值为毫秒
}
/** 下载任务执行计时器 */
private
Timer
balaDownloadTaskTimer
=
new
Timer
();
/** 启动自助指标查询计划任务 */
private
void
runBalaDownloadTask
(
Integer
interval
)
{
this
.
balaDownloadTaskTimer
.
schedule
(
new
TimerTask
()
{
@Override
public
void
run
()
{
try
{
if
(
taskConditions
!=
null
&&
taskConditions
.
size
()
>
0
)
{
FlatQueryTaskCondition
condition
=
null
;
//FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件
for
(
int
i
=
0
;
i
<
taskConditions
.
size
();
i
++
)
{
log
.
debug
(
"自助指标当前正在执行的任务为:"
,
JSON
.
toJSONString
(
taskConditions
.
get
(
i
)));
if
(
taskConditions
.
get
(
i
).
getBuildPermitted
().
equals
(
Global
.
YES
)
&&
taskConditions
.
get
(
i
).
getEnterpriseIds
().
contains
(
"ff8080816dd0385e016ddca436d01fe1"
))
{
condition
=
taskConditions
.
remove
(
i
);
// 移除并获取第一个任务条件
break
;
}
// IF OVER
}
// FOR OVER
if
(
condition
!=
null
)
{
// 更新任务状态
DownloadTask
task
=
DownloadTaskServiceImpl
.
getInstance
().
getDownloadTaskById
(
condition
.
getTaskId
());
task
.
setStatus
(
DownloadTaskStatus
.
BUILDING
);
DownloadTaskServiceImpl
.
getInstance
().
updateDownloadTask
(
task
);
log
.
debug
(
"runDownloadTask.run"
,
"自助指标下载任务执行:"
+
task
.
getId
());
String
fullQuery
=
buildFlatQuerySQL
(
false
,
// 下载用途
condition
.
getTableId
(),
condition
.
getEnterpriseIds
(),
condition
.
getConditions
(),
condition
.
getOrderField
(),
condition
.
getOrderDir
(),
condition
.
getExecDistinct
(),
0
,
condition
.
getAuthStoreIdList
());
Connection
conn
=
HiveHelper
.
getBalaDownloadHiveConnection
();
if
(
conn
!=
null
)
{
if
(
conn
!=
null
)
{
try
{
try
{
Statement
stat
=
conn
.
createStatement
();
Statement
stat
=
conn
.
createStatement
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment