Commit 233a403c by fudahua

feat: 新版本的下载

parent 8ff66409
...@@ -67,8 +67,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -67,8 +67,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
private static LogPak log = new LogPak(FlatQueryResultServiceImpl.class); private static LogPak log = new LogPak(FlatQueryResultServiceImpl.class);
/** csv / xls 下载目录 */ /** csv / xls 下载目录 */
// public static final String SAVE_FOLDER = "/usr/local/data-hook-file"; public static final String SAVE_FOLDER = "/usr/local/data-hook-file";
public static final String SAVE_FOLDER = "D:\\testorder"; // public static final String SAVE_FOLDER = "D:\\testorder";
public static final String HDFS_URL = "/data/hook"; public static final String HDFS_URL = "/data/hook";
...@@ -106,7 +106,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -106,7 +106,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
private FlatQueryResultServiceImpl() { private FlatQueryResultServiceImpl() {
log.debug("construct", "准备初始化 FlatQuery 查询服务"); log.debug("construct", "准备初始化 FlatQuery 查询服务");
// runDealHiveFile(3); runDealHiveFile(3);
// runDistTask(3); // runDistTask(3);
runDownloadTask(3); runDownloadTask(3);
runBalaDownloadTask(3); runBalaDownloadTask(3);
...@@ -667,8 +667,9 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -667,8 +667,9 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
distService.scheduleAtFixedRate(new Runnable() { distService.scheduleAtFixedRate(new Runnable() {
@Override @Override
public void run() { public void run() {
String traceId = UUID.randomUUID().toString();
ProviderLocalTag providerLocalTag = ProviderLocalTag.tag.get(); ProviderLocalTag providerLocalTag = ProviderLocalTag.tag.get();
providerLocalTag.traceId = UUID.randomUUID().toString(); providerLocalTag.traceId = traceId;
logger.info("执行hivefile"); logger.info("执行hivefile");
try{ try{
String lockKey="data:hook:hive"; String lockKey="data:hook:hive";
...@@ -683,6 +684,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -683,6 +684,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
} }
RedisUtil.unlock(lockKey); RedisUtil.unlock(lockKey);
downloadTasks.parallelStream().forEach(mid->{ downloadTasks.parallelStream().forEach(mid->{
ProviderLocalTag localTag = ProviderLocalTag.tag.get();
localTag.traceId = traceId;
//下载处理 //下载处理
takeFileNew(mid); takeFileNew(mid);
}); });
...@@ -1223,7 +1226,6 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -1223,7 +1226,6 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
xlsxFileInfo.yyyyMMdd=yyyyMMdd; xlsxFileInfo.yyyyMMdd=yyyyMMdd;
count.incrementAndGet(); count.incrementAndGet();
} }
logger.info("{}-count:{}",originalFilePath,count.get());
SXSSFWorkbook wb = xlsxFileInfo.workbook; // 内存中保留 100 行 SXSSFWorkbook wb = xlsxFileInfo.workbook; // 内存中保留 100 行
if (!endFlag) { if (!endFlag) {
...@@ -1263,6 +1265,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -1263,6 +1265,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
Integer limitSize = FileUtil.getLimitSize(); Integer limitSize = FileUtil.getLimitSize();
int c = count.get(); int c = count.get();
if (c>limitSize||endFlag) { if (c>limitSize||endFlag) {
logger.info("{}-count:{}",originalFilePath,count.get());
FileOutputStream fileOut = new FileOutputStream(originalFilePath); FileOutputStream fileOut = new FileOutputStream(originalFilePath);
wb.write(fileOut); wb.write(fileOut);
//fileOut.flush(); // SXSSFWorkbook 使用 auto-flush 模式 //fileOut.flush(); // SXSSFWorkbook 使用 auto-flush 模式
......
...@@ -6,6 +6,7 @@ import org.apache.hadoop.fs.Path; ...@@ -6,6 +6,7 @@ import org.apache.hadoop.fs.Path;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
public class hdfs { public class hdfs {
public static void main(String[] args) { public static void main(String[] args) {
...@@ -20,14 +21,20 @@ public class hdfs { ...@@ -20,14 +21,20 @@ public class hdfs {
// } // }
// HDFSUtil.getInstance().downloadFile("/data/emr/order-1.csv","D:\\testorder"); // HDFSUtil.getInstance().downloadFile("/data/emr/order-1.csv","D:\\testorder");
Integer[] arr=new Integer[]{1,2,3,4,5,6};
Arrays.asList(arr).parallelStream().forEach(mid->{
Thread thread = Thread.currentThread();
String name = thread.getName();
System.out.println(name);
});
try{ // try{
CSVReader csvReader = new CSVReader(new FileReader("C:\\Users\\hua\\Desktop\\part-00000-fa2dd286-1eda-452d-91a3-a222beb0f327-c000.csv")); // CSVReader csvReader = new CSVReader(new FileReader("C:\\Users\\hua\\Desktop\\part-00000-fa2dd286-1eda-452d-91a3-a222beb0f327-c000.csv"));
String[] cells = csvReader.readNext(); // String[] cells = csvReader.readNext();
System.out.println(cells); // System.out.println(cells);
}catch (Exception e) { // }catch (Exception e) {
//
} // }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment