data-hook / gic-cloud · Commit 2388ed1f

Authored Apr 25, 2023 by fudahua

Merge branch 'fix-2023-04' into 'developer'

Fix 2023 04

See merge request !98

Parents: 1d6a105e, 9a40bf8f

Showing 4 changed files with 97 additions and 6 deletions (+97, -6)
Files changed:
  HDFSUtil.java                    (...c/main/java/com/gic/cloud/data/hook/service/HDFSUtil.java)    +27  -1
  DownloadTaskServiceImpl.java     (...cloud/data/hook/service/impl/DownloadTaskServiceImpl.java)    +5   -0
  FlatQueryResultServiceImpl.java  (...ud/data/hook/service/impl/FlatQueryResultServiceImpl.java)    +65  -5
  ExceTest2.java                   (...ud-data-hook-service/src/test/java/com/gic/ExceTest2.java)    +0   -0
gic-cloud-data-hook-service/src/main/java/com/gic/cloud/data/hook/service/HDFSUtil.java

@@ -16,6 +16,8 @@ public class HDFSUtil {
     private static HDFSUtil hdfsUtil = null;
     private static FileSystem fileSystem = null;
+    public static final String HDFS_URL = "/data/hook";
     public static HDFSUtil getInstance() {
         if (hdfsUtil == null) {
             synchronized (HDFSUtil.class) {

@@ -47,11 +49,35 @@ public class HDFSUtil {
      */
     public boolean downloadFile(String srcPath, String toPath) {
         try {
-            fileSystem.copyToLocalFile(true, new Path(srcPath), new Path(toPath));
+            Config appConfig = ConfigService.getAppConfig();
+            Integer delFlag = appConfig.getIntProperty("del.hive.flag", 1);
+            fileSystem.copyToLocalFile(delFlag.intValue() == 1 ? true : false, new Path(srcPath), new Path(toPath));
             return true;
         } catch (IOException e) {
             logger.info("下载失败:{}", e);
             return false;
         }
     }
+
+    public boolean deleteFile(String srcPath) {
+        try {
+            fileSystem.delete(new Path(srcPath), true);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return true;
+    }
+
+    public String getHdfsName(String id) {
+        //下载文件
+        String dirName = "hdfs" + id;
+        return dirName;
+    }
+
+    public String getHdfsPath(String id) {
+        //下载文件
+        String dirName = getHdfsName(id);
+        String path = HDFS_URL + "/" + dirName;
+        return path;
+    }
 }
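Note: downloadFile() previously always passed delSrc = true to FileSystem.copyToLocalFile(), so the HDFS source was deleted after every copy; it is now driven by the del.hive.flag config property (default 1, i.e. the old behaviour). The new deleteFile/getHdfsName/getHdfsPath helpers centralize the per-task HDFS directory naming reused elsewhere in this commit. A minimal usage sketch, not part of the commit; "task-123" is a hypothetical task id and the local folder mirrors SAVE_FOLDER from FlatQueryResultServiceImpl:

    HDFSUtil hdfs = HDFSUtil.getInstance();

    String dirName = hdfs.getHdfsName("task-123");    // "hdfstask-123"
    String hdfsPath = hdfs.getHdfsPath("task-123");   // "/data/hook/hdfstask-123"

    // Copy the task's HDFS directory to the local export folder. Whether the HDFS
    // copy is removed afterwards now depends on the "del.hive.flag" property
    // (1 = delete the source, anything else = keep it).
    boolean ok = hdfs.downloadFile(hdfsPath, "/usr/local/data-hook-file/" + dirName);

    // Explicit cleanup of the HDFS directory, e.g. once the task itself is deleted.
    hdfs.deleteFile(hdfsPath);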
gic-cloud-data-hook-service/src/main/java/com/gic/cloud/data/hook/service/impl/DownloadTaskServiceImpl.java

@@ -10,6 +10,7 @@ import com.gic.cloud.data.hook.api.entity.DownloadTaskStatus;
 import com.gic.cloud.data.hook.api.entity.FreeQueryTaskCondition;
 import com.gic.cloud.data.hook.api.entity.Global;
 import com.gic.cloud.data.hook.api.service.IDownloadTaskService;
+import com.gic.cloud.data.hook.service.HDFSUtil;
 import com.gic.cloud.data.hook.service.MysqlHelper;
 import com.gic.cloud.data.hook.service.dao.DownloadRecordDao;
 import com.gic.cloud.data.hook.service.dao.DownloadTaskDao;

@@ -165,6 +166,10 @@ public class DownloadTaskServiceImpl implements IDownloadTaskService {
             }
             this.downloadTaskDao.deleteDownloadTask(task.getId());
             CloudFileUtil.delFileByUrl(task.getFilePath());
+            String hdfsPath = HDFSUtil.getInstance().getHdfsPath(task.getId());
+            HDFSUtil.getInstance().deleteFile(hdfsPath);
             return true;
         } else return false;
     }
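Note: the behavioural change here is cleanup on task deletion: besides removing the database record and the exported cloud file, the task's HDFS working directory is now deleted through the new HDFSUtil helpers. A sketch of the resulting sequence, with names taken from the diff (the surrounding service method is only partially visible in this hunk):

    this.downloadTaskDao.deleteDownloadTask(task.getId());   // remove the DB record
    CloudFileUtil.delFileByUrl(task.getFilePath());          // remove the exported cloud file
    String hdfsPath = HDFSUtil.getInstance().getHdfsPath(task.getId());
    HDFSUtil.getInstance().deleteFile(hdfsPath);             // remove /data/hook/hdfs<taskId>
    // deleteFile() swallows IOExceptions (printStackTrace) and still returns true,
    // so a failed HDFS cleanup does not stop the deletion from reporting success.
    return true;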
gic-cloud-data-hook-service/src/main/java/com/gic/cloud/data/hook/service/impl/FlatQueryResultServiceImpl.java

@@ -78,7 +78,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
     public static final String SAVE_FOLDER = "/usr/local/data-hook-file";
 //    public static final String SAVE_FOLDER = "D:\\testorder";
-    public static final String HDFS_URL = "/data/hook";
+//    public static final String HDFS_URL = "/data/hook";
     public static final String LOCK_KEY = "data:hook:hive";
@@ -930,8 +930,10 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
             task.setDownloadWay(-1);
             task.setFieldSize(condition.getAllFields().size());
             //下载文件
-            String dirName = "hdfs" + task.getId();
-            String path = HDFS_URL + "/" + dirName;
+            String dirName = HDFSUtil.getInstance().getHdfsName(task.getId());
+            String path = HDFSUtil.getInstance().getHdfsPath(task.getId());
+            HDFSUtil.getInstance().getHdfsName(task.getId());
             try {
                 StopWatch stopWatch = StopWatch.create("down");
                 stopWatch.start();
@@ -945,9 +947,10 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
                 logger.info("下载耗时:{}", stopWatch.getLastTaskTimeMillis());
                 stopWatch.start();
                 List<String> xlsxFiles = new ArrayList<>();
+                AtomicInteger totalCount = new AtomicInteger(0);
                 AtomicInteger count = new AtomicInteger(0);
                 AtomicReference<XlsxFileInfo> currentFile = new AtomicReference<>();
-                readCsvFile(condition, dirName, (cells, titles, firstFlag) -> {
+                readJsonFile(condition, dirName, (cells, titles, firstFlag) -> {
                     if (count.get() == 0) {
                         XlsxFileInfo xlsxFileInfo = new XlsxFileInfo();
                         xlsxFileInfo.filepath = SAVE_FOLDER + "/" + task.getId() + xlsxFiles.size() + ".xlsx";

@@ -957,9 +960,12 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
                     saveXlsSplitNew(currentFile.get().filepath, cells, titles, currentFile.get(), count, false, queryDataType);
                 });
                 //结束
+                Integer limitSize = FileUtil.getLimitSize();
+                Integer total = (xlsxFiles.size() - 1) * limitSize + count.get();
                 saveXlsSplitNew(currentFile.get().filepath, null, null, currentFile.get(), count, true, queryDataType);
                 stopWatch.stop();
-                logger.info("写入本地excel耗时:{}", stopWatch.getLastTaskTimeMillis());
+                logger.info("写入本地excel耗时:{}, 数量: {}-》{}", stopWatch.getLastTaskTimeMillis(), task.getAmount(), total);
                 stopWatch.start();
                 //是否压缩
                 boolean zipFlag = (xlsxFiles.size() > 1) ? true : false;
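Note: total is reconstructed from the split files rather than counted while writing. The formula assumes every xlsx file except the last holds exactly limitSize rows (the split threshold returned by FileUtil.getLimitSize()), while count holds the rows written into the current, last file. A worked example with a hypothetical limit of 1,000,000 rows per file:

    int limitSize = 1_000_000;     // assumed value of FileUtil.getLimitSize()
    int filesWritten = 3;          // xlsxFiles.size()
    int rowsInLastFile = 250_000;  // count.get()
    int total = (filesWritten - 1) * limitSize + rowsInLastFile;   // 2_250_000

The extended log line ("写入本地excel耗时" means "time spent writing local excel", "数量" means "count") prints this total alongside task.getAmount(), presumably so the recorded and actually written row counts can be compared.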
@@ -986,6 +992,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
                 task.setFilePath(cloudFileUrl);
             } catch (Exception e) {
                 logger.info("异常:{}", e);
                 task.setStatus(DownloadTaskStatus.ERROR);
@@ -1071,6 +1078,59 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
             csvReader.close();
         } catch (Exception e) {
             logger.info("读取异常:{}", e);
+            throw new RuntimeException(e);
         }
     }
+
+    /**
+     * 读物文件
+     * @param dirName
+     * @param func
+     */
+    private void readJsonFile(FlatQueryTaskCondition condition, String dirName, DownloadFunc func) {
+        File file = new File(SAVE_FOLDER + "/" + dirName);
+        File[] files = file.listFiles();
+        List<FlatQueryCondition> titles = null;
+        List<FlatQueryCondition> conditions = condition.getConditions();
+        List<String> keys = conditions.stream().map(mid -> mid.getFieldMark()).collect(Collectors.toList());
+        Map<String, FlatQueryCondition> columnInfoMap = conditions.stream().collect(Collectors.toMap(mid -> mid.getFieldMark(), mid -> mid));
+        List<File> fileList = Arrays.stream(files).sorted(Comparator.comparing(File::getName)).collect(Collectors.toList());
+        for (File midFile : fileList) {
+            if (!midFile.getName().endsWith("json")) {
+                continue;
+            }
+            try {
+                BufferedReader reader = new BufferedReader(new FileReader(midFile));
+                boolean first = true;
+                Exception exception = null;
+                try {
+                    String line = reader.readLine();
+                    while (line != null) {
+                        List<String> cellList = new ArrayList<>();
+                        JSONObject jsonObject = JSONObject.parseObject(line);
+                        for (String key : keys) {
+                            String cellVal = jsonObject.getString(key);
+                            cellList.add(cellVal);
+                        }
+                        String[] cells = cellList.toArray(new String[]{});
+                        func.deal(cells, conditions, first);
+                        first = false;
+                        line = reader.readLine();
+                    }
+                    logger.info("读取结束:{}", midFile.getName());
+                } catch (Exception e) {
+                    exception = e;
+                } finally {
+                    reader.close();
+                }
+                if (exception != null) {
+                    throw exception;
+                }
+            } catch (Exception e) {
+                logger.info("读取异常:{}", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }
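Note: readJsonFile() replaces the earlier CSV-based reader at the call site in the @@ -945 hunk. It walks SAVE_FOLDER/dirName, processes every *.json file in name order, parses each line as a JSON object, projects it onto the column order given by the query conditions (getFieldMark), and hands each row to the DownloadFunc callback together with the condition list and a first-row flag. The DownloadFunc interface itself is not part of this diff; the sketch below is inferred from the call sites (func.deal(cells, conditions, first) and the lambda (cells, titles, firstFlag) -> { ... }) and is an assumption, not the project's actual declaration:

    // Hypothetical reconstruction of the callback type, inferred from usage only.
    @FunctionalInterface
    interface DownloadFunc {
        /**
         * @param cells     one row of values, ordered like the query conditions
         * @param titles    the column definitions (the FlatQueryCondition list)
         * @param firstFlag true only for the first row read from each source file
         */
        void deal(String[] cells, List<FlatQueryCondition> titles, boolean firstFlag);
    }

Under that assumption, the lambda passed in the @@ -945 hunk simply routes each row into saveXlsSplitNew, starting a new xlsx file whenever count is back at zero.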
gic-cloud-data-hook-service/src/test/java/com/gic/ExceTest2.java

(Diff collapsed in the original view; contents not shown.)