Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
G
gic-spark-tag-4.0
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wangxiaokang
gic-spark-tag-4.0
Commits
a4178b7a
Commit
a4178b7a
authored
Aug 21, 2020
by
guos
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
会员标签4.0
parent
b600c7ae
Show whitespace changes
Inline
Side-by-side
Showing
39 changed files
with
53 additions
and
51 deletions
+53
-51
DataSourceEntity.java
...ava/com/gic/spark/datasource/entity/DataSourceEntity.java
+4
-0
DataSourceHive.java
.../java/com/gic/spark/datasource/entity/DataSourceHive.java
+6
-8
AbstractTagConsumFilter.java
...in/java/com/gic/spark/filter/AbstractTagConsumFilter.java
+1
-1
AbstractTagConsumRecordFilter.java
...a/com/gic/spark/filter/AbstractTagConsumRecordFilter.java
+1
-1
BaseTagFilter.java
src/main/java/com/gic/spark/filter/BaseTagFilter.java
+1
-1
TagAboutExpireIntegralFilter.java
...va/com/gic/spark/filter/TagAboutExpireIntegralFilter.java
+1
-1
TagAccumulatedIntegralFilter.java
...va/com/gic/spark/filter/TagAccumulatedIntegralFilter.java
+1
-1
TagAssociatedPurchaseRateFilter.java
...com/gic/spark/filter/TagAssociatedPurchaseRateFilter.java
+1
-1
TagAverageDiscountFactorFilter.java
.../com/gic/spark/filter/TagAverageDiscountFactorFilter.java
+1
-1
TagConsumeCommodityFilter.java
.../java/com/gic/spark/filter/TagConsumeCommodityFilter.java
+2
-2
TagConsumeTimeFilter.java
src/main/java/com/gic/spark/filter/TagConsumeTimeFilter.java
+1
-1
TagConsumeTotalFilter.java
...main/java/com/gic/spark/filter/TagConsumeTotalFilter.java
+1
-1
TagConsumptionSleepDaysFilter.java
...a/com/gic/spark/filter/TagConsumptionSleepDaysFilter.java
+1
-1
TagConsumptionTimeFilter.java
...n/java/com/gic/spark/filter/TagConsumptionTimeFilter.java
+1
-1
TagCouponFilter.java
src/main/java/com/gic/spark/filter/TagCouponFilter.java
+1
-1
TagCurrentCouponNumFilter.java
.../java/com/gic/spark/filter/TagCurrentCouponNumFilter.java
+1
-1
TagFirstConsumeCommodityFilter.java
.../com/gic/spark/filter/TagFirstConsumeCommodityFilter.java
+2
-2
TagFirstConsumeTimeFilter.java
.../java/com/gic/spark/filter/TagFirstConsumeTimeFilter.java
+1
-1
TagFirstConsumptionChannelFilter.java
...om/gic/spark/filter/TagFirstConsumptionChannelFilter.java
+1
-1
TagFirstConsumptionMoneyFilter.java
.../com/gic/spark/filter/TagFirstConsumptionMoneyFilter.java
+1
-1
TagFirstOfflineConsumptionStoreFilter.java
...c/spark/filter/TagFirstOfflineConsumptionStoreFilter.java
+1
-1
TagFirstOnlineConsumptionStoreFilter.java
...ic/spark/filter/TagFirstOnlineConsumptionStoreFilter.java
+1
-1
TagHistoryConsumeCommodityFilter.java
...om/gic/spark/filter/TagHistoryConsumeCommodityFilter.java
+2
-2
TagHistoryConsumeTotalFilter.java
...va/com/gic/spark/filter/TagHistoryConsumeTotalFilter.java
+1
-1
TagHistoryOfflineConsumptionStoreFilter.java
...spark/filter/TagHistoryOfflineConsumptionStoreFilter.java
+1
-1
TagHistoryOnlineConsumptionStoreFilter.java
.../spark/filter/TagHistoryOnlineConsumptionStoreFilter.java
+1
-1
TagLatelyConsumeCommodityFilter.java
...com/gic/spark/filter/TagLatelyConsumeCommodityFilter.java
+1
-1
TagLatelyConsumeTimeFilter.java
...java/com/gic/spark/filter/TagLatelyConsumeTimeFilter.java
+1
-1
TagLatelyConsumptionChannelFilter.java
...m/gic/spark/filter/TagLatelyConsumptionChannelFilter.java
+1
-1
TagLatelyConsumptionMoneyFilter.java
...com/gic/spark/filter/TagLatelyConsumptionMoneyFilter.java
+1
-1
TagLatelyOnlineConsumptionStoreFilter.java
...c/spark/filter/TagLatelyOnlineConsumptionStoreFilter.java
+1
-1
TagLowestSingleConsumptionMoneyFilter.java
...c/spark/filter/TagLowestSingleConsumptionMoneyFilter.java
+1
-1
TagOfflineConsumptionStoreFilter.java
...om/gic/spark/filter/TagOfflineConsumptionStoreFilter.java
+1
-1
TagOnlineConsumptionStoreFilter.java
...com/gic/spark/filter/TagOnlineConsumptionStoreFilter.java
+1
-1
TagPerCustomerTransactionFilter.java
...com/gic/spark/filter/TagPerCustomerTransactionFilter.java
+1
-1
TagRecentlyOfflineConsumptionStoreFilter.java
...park/filter/TagRecentlyOfflineConsumptionStoreFilter.java
+1
-1
TagTopSingleConsumptionMoneyFilter.java
.../gic/spark/filter/TagTopSingleConsumptionMoneyFilter.java
+1
-1
TagFilterFactory.java
src/main/java/com/gic/spark/tag/TagFilterFactory.java
+2
-2
TagProcessManager.java
src/main/java/com/gic/spark/tag/TagProcessManager.java
+3
-3
No files found.
src/main/java/com/gic/spark/datasource/entity/DataSourceEntity.java
View file @
a4178b7a
...
@@ -32,6 +32,10 @@ public abstract class DataSourceEntity {
...
@@ -32,6 +32,10 @@ public abstract class DataSourceEntity {
return
SparkEnvManager
.
getInstance
().
getSparkSession
().
sql
(
String
.
format
(
"select * from %s where enterprise_id='%s'"
,
getHiveTableName
(),
enterpriseId
));
return
SparkEnvManager
.
getInstance
().
getSparkSession
().
sql
(
String
.
format
(
"select * from %s where enterprise_id='%s'"
,
getHiveTableName
(),
enterpriseId
));
}
}
public
Dataset
<
Row
>
getDatasetByEntId
(
Integer
enterpriseId
)
{
return
SparkEnvManager
.
getInstance
().
getSparkSession
().
sql
(
String
.
format
(
"select * from %s where ent_id='%s'"
,
getHiveTableName
(),
enterpriseId
));
}
public
abstract
void
extractDataToPartitionedHiveTable
(
List
<
Integer
>
enterpriseList
);
public
abstract
void
extractDataToPartitionedHiveTable
(
List
<
Integer
>
enterpriseList
);
}
}
src/main/java/com/gic/spark/datasource/entity/DataSourceHive.java
View file @
a4178b7a
...
@@ -47,16 +47,14 @@ public class DataSourceHive extends DataSourceEntity {
...
@@ -47,16 +47,14 @@ public class DataSourceHive extends DataSourceEntity {
@Override
@Override
public
void
extractDataToPartitionedHiveTable
(
List
<
Integer
>
enterpriseList
)
{
public
void
extractDataToPartitionedHiveTable
(
List
<
Integer
>
enterpriseList
)
{
SparkSession
sparkSession
=
SparkEnvManager
.
getInstance
().
getSparkSession
();
SparkSession
sparkSession
=
SparkEnvManager
.
getInstance
().
getSparkSession
();
Dataset
<
Row
>
sourceDataset
=
sparkSession
//
Dataset<Row> sourceDataset = sparkSession
.
sql
(
String
.
format
(
"select * from %s where enterprise
_id in (%s)"
,
sourceTable
,
Joiner
.
on
(
"','"
).
join
(
enterpriseList
)))
// .sql(String.format("select * from %s where ent
_id in (%s)", sourceTable, Joiner.on("','").join(enterpriseList)))
.
repartition
(
new
Column
(
"enterprise
_id"
));
// .repartition(new Column("ent
_id"));
if
(
table
.
equals
(
"ads_gic_trd_ecu_sales_label_d"
))
{
Dataset
<
Row
>
sourceDataset
=
sparkSession
sourceDataset
=
sparkSession
.
sql
(
String
.
format
(
"select * from %s where ent_id in (%s) distribute by ent_id "
,
sourceTable
,
Joiner
.
on
(
"','"
).
join
(
enterpriseList
)));
.
sql
(
String
.
format
(
"select * from %s where enterprise_id in (%s) distribute by enterprise_id "
,
sourceTable
,
Joiner
.
on
(
"','"
).
join
(
enterpriseList
)));
}
SparkHiveUtil
.
createHivePartitionTable
(
sourceDataset
,
"ent
erprise
_id"
,
getSchema
(),
schema
.
replaceAll
(
"\\."
,
"_"
)
+
"_"
+
table
,
sparkSession
);
SparkHiveUtil
.
createHivePartitionTable
(
sourceDataset
,
"ent_id"
,
getSchema
(),
schema
.
replaceAll
(
"\\."
,
"_"
)
+
"_"
+
table
,
sparkSession
);
HivePartitionUtil
.
saveDatasetToPartitionTable
(
sparkSession
,
sourceDataset
,
getHiveTableName
());
HivePartitionUtil
.
saveDatasetToPartitionTable
(
sparkSession
,
sourceDataset
,
getHiveTableName
());
}
}
...
...
src/main/java/com/gic/spark/filter/AbstractTagConsumFilter.java
View file @
a4178b7a
...
@@ -14,7 +14,7 @@ import java.util.List;
...
@@ -14,7 +14,7 @@ import java.util.List;
* @author: wangxk
* @author: wangxk
* @date: 2020/5/7
* @date: 2020/5/7
*/
*/
public
abstract
class
AbstractTagConsumFilter
implements
TagFilter
{
public
abstract
class
AbstractTagConsumFilter
implements
Base
TagFilter
{
DataSourceHive
dataSourceHive
=
new
DataSourceHive
(
ConstantUtil
.
ADS_GIC_TRD_ECU_SALES_LABEL_D
);
DataSourceHive
dataSourceHive
=
new
DataSourceHive
(
ConstantUtil
.
ADS_GIC_TRD_ECU_SALES_LABEL_D
);
protected
static
JavaRDD
<
TrdEcuSalesLabelBean
>
statisticsTypeHandle
(
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
,
TagConsumeRequest
consumeRequest
){
protected
static
JavaRDD
<
TrdEcuSalesLabelBean
>
statisticsTypeHandle
(
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
,
TagConsumeRequest
consumeRequest
){
...
...
src/main/java/com/gic/spark/filter/AbstractTagConsumRecordFilter.java
View file @
a4178b7a
...
@@ -19,7 +19,7 @@ import java.util.List;
...
@@ -19,7 +19,7 @@ import java.util.List;
* @author: wangxk
* @author: wangxk
* @date: 2020/8/3
* @date: 2020/8/3
*/
*/
public
abstract
class
AbstractTagConsumRecordFilter
implements
TagFilter
{
public
abstract
class
AbstractTagConsumRecordFilter
implements
BaseTagFilter
{
DataSourceHive
dataSourceHive
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_WDORDER_D
);
DataSourceHive
dataSourceHive
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_WDORDER_D
);
...
...
src/main/java/com/gic/spark/filter/TagFilter.java
→
src/main/java/com/gic/spark/filter/
Base
TagFilter.java
View file @
a4178b7a
...
@@ -11,7 +11,7 @@ import java.util.List;
...
@@ -11,7 +11,7 @@ import java.util.List;
* <p>
* <p>
* TODO: class description
* TODO: class description
*/
*/
public
interface
TagFilter
{
public
interface
Base
TagFilter
{
List
<
DataSourceEntity
>
necessarySourceList
();
List
<
DataSourceEntity
>
necessarySourceList
();
...
...
src/main/java/com/gic/spark/filter/TagAboutExpireIntegralFilter.java
View file @
a4178b7a
...
@@ -15,7 +15,7 @@ import java.util.List;
...
@@ -15,7 +15,7 @@ import java.util.List;
* @author: wangxk
* @author: wangxk
* @date: 2020/4/24
* @date: 2020/4/24
*/
*/
public
class
TagAboutExpireIntegralFilter
implements
TagFilter
{
public
class
TagAboutExpireIntegralFilter
implements
Base
TagFilter
{
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MEMBER_SHARDING_4
,
ConstantUtil
.
TAB_INTEGRAL_CU_CHANGE_LOG
);
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MEMBER_SHARDING_4
,
ConstantUtil
.
TAB_INTEGRAL_CU_CHANGE_LOG
);
private
static
TagAboutExpireIntegralFilter
instance
;
private
static
TagAboutExpireIntegralFilter
instance
;
...
...
src/main/java/com/gic/spark/filter/TagAccumulatedIntegralFilter.java
View file @
a4178b7a
...
@@ -26,7 +26,7 @@ import java.util.Set;
...
@@ -26,7 +26,7 @@ import java.util.Set;
* @author: wangxk
* @author: wangxk
* @date: 2020/4/24
* @date: 2020/4/24
*/
*/
public
class
TagAccumulatedIntegralFilter
implements
TagFilter
{
public
class
TagAccumulatedIntegralFilter
implements
Base
TagFilter
{
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MEMBER_SHARDING_4
,
ConstantUtil
.
TAB_INTEGRAL_CU_CHANGE_LOG
);
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MEMBER_SHARDING_4
,
ConstantUtil
.
TAB_INTEGRAL_CU_CHANGE_LOG
);
private
static
TagAccumulatedIntegralFilter
instance
;
private
static
TagAccumulatedIntegralFilter
instance
;
...
...
src/main/java/com/gic/spark/filter/TagAssociatedPurchaseRateFilter.java
View file @
a4178b7a
...
@@ -37,7 +37,7 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
...
@@ -37,7 +37,7 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
()
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
()
.
mapPartitions
(
data
->{
.
mapPartitions
(
data
->{
...
...
src/main/java/com/gic/spark/filter/TagAverageDiscountFactorFilter.java
View file @
a4178b7a
...
@@ -39,7 +39,7 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
...
@@ -39,7 +39,7 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
()
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
()
.
mapPartitions
(
data
->{
.
mapPartitions
(
data
->{
...
...
src/main/java/com/gic/spark/filter/TagConsumeCommodityFilter.java
View file @
a4178b7a
...
@@ -46,14 +46,14 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
...
@@ -46,14 +46,14 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
JavaPairRDD
<
Long
,
Long
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
JavaPairRDD
<
Long
,
Long
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
filter
(
data
->
checkTime
(
commodityRequest
,
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()))
.
filter
(
data
->
checkTime
(
commodityRequest
,
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
.
getEcu_id
()));
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
.
getEcu_id
()));
JavaPairRDD
<
Long
,
Long
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrderItem
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderItemBean
.
class
).
javaRDD
()
JavaPairRDD
<
Long
,
Long
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderItemBean
.
class
).
javaRDD
()
.
filter
(
data
->{
.
filter
(
data
->{
if
(
StringUtils
.
isNotEmpty
(
data
.
getSku_code
())
if
(
StringUtils
.
isNotEmpty
(
data
.
getSku_code
())
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
getSku_code
())){
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
getSku_code
())){
...
...
src/main/java/com/gic/spark/filter/TagConsumeTimeFilter.java
View file @
a4178b7a
...
@@ -42,7 +42,7 @@ public class TagConsumeTimeFilter extends AbstractTagConsumRecordFilter{
...
@@ -42,7 +42,7 @@ public class TagConsumeTimeFilter extends AbstractTagConsumRecordFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
)
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
TrdVirtualOrderBean
.
class
).
javaRDD
();
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
...
...
src/main/java/com/gic/spark/filter/TagConsumeTotalFilter.java
View file @
a4178b7a
...
@@ -41,7 +41,7 @@ public class TagConsumeTotalFilter extends AbstractTagConsumRecordFilter {
...
@@ -41,7 +41,7 @@ public class TagConsumeTotalFilter extends AbstractTagConsumRecordFilter {
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
src/main/java/com/gic/spark/filter/TagConsumptionSleepDaysFilter.java
View file @
a4178b7a
...
@@ -38,7 +38,7 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
...
@@ -38,7 +38,7 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
.
getSleep_days
()))
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
.
getSleep_days
()))
.
reduceByKey
((
x
,
y
)->
x
>
y
?
y:
x
)
.
reduceByKey
((
x
,
y
)->
x
>
y
?
y:
x
)
...
...
src/main/java/com/gic/spark/filter/TagConsumptionTimeFilter.java
View file @
a4178b7a
...
@@ -36,7 +36,7 @@ public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{
...
@@ -36,7 +36,7 @@ public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
.
getOrder_times
())).
reduceByKey
((
x
,
y
)->
x
+
y
)
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
.
getOrder_times
())).
reduceByKey
((
x
,
y
)->
x
+
y
)
.
mapPartitions
(
data
->{
.
mapPartitions
(
data
->{
...
...
src/main/java/com/gic/spark/filter/TagCouponFilter.java
View file @
a4178b7a
...
@@ -20,7 +20,7 @@ import java.util.Set;
...
@@ -20,7 +20,7 @@ import java.util.Set;
* @author: wangxk
* @author: wangxk
* @date: 2020/4/29
* @date: 2020/4/29
*/
*/
public
class
TagCouponFilter
implements
TagFilter
{
public
class
TagCouponFilter
implements
Base
TagFilter
{
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MARKETING_SHARDING
,
ConstantUtil
.
TAB_COUPON_LOG
);
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MARKETING_SHARDING
,
ConstantUtil
.
TAB_COUPON_LOG
);
private
static
TagCouponFilter
instance
;
private
static
TagCouponFilter
instance
;
...
...
src/main/java/com/gic/spark/filter/TagCurrentCouponNumFilter.java
View file @
a4178b7a
...
@@ -22,7 +22,7 @@ import java.util.Set;
...
@@ -22,7 +22,7 @@ import java.util.Set;
* @author: wangxk
* @author: wangxk
* @date: 2020/4/20
* @date: 2020/4/20
*/
*/
public
class
TagCurrentCouponNumFilter
implements
TagFilter
{
public
class
TagCurrentCouponNumFilter
implements
Base
TagFilter
{
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MARKETING_SHARDING
,
ConstantUtil
.
TAB_COUPON_LOG
);
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MARKETING_SHARDING
,
ConstantUtil
.
TAB_COUPON_LOG
);
private
static
TagCurrentCouponNumFilter
instance
;
private
static
TagCurrentCouponNumFilter
instance
;
...
...
src/main/java/com/gic/spark/filter/TagFirstConsumeCommodityFilter.java
View file @
a4178b7a
...
@@ -46,7 +46,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
...
@@ -46,7 +46,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
JavaPairRDD
<
Long
,
Long
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
JavaPairRDD
<
Long
,
Long
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
...
@@ -60,7 +60,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
...
@@ -60,7 +60,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
}
}
})
})
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
_2
().
getVirtual_id
(),
data
.
_1
()));
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
_2
().
getVirtual_id
(),
data
.
_1
()));
JavaPairRDD
<
Long
,
Long
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrderItem
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderItemBean
.
class
).
javaRDD
()
JavaPairRDD
<
Long
,
Long
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderItemBean
.
class
).
javaRDD
()
.
filter
(
data
->{
.
filter
(
data
->{
if
(
StringUtils
.
isNotEmpty
(
data
.
getSku_code
())
if
(
StringUtils
.
isNotEmpty
(
data
.
getSku_code
())
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
getSku_code
())){
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
getSku_code
())){
...
...
src/main/java/com/gic/spark/filter/TagFirstConsumeTimeFilter.java
View file @
a4178b7a
...
@@ -41,7 +41,7 @@ public class TagFirstConsumeTimeFilter extends AbstractTagConsumRecordFilter {
...
@@ -41,7 +41,7 @@ public class TagFirstConsumeTimeFilter extends AbstractTagConsumRecordFilter {
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
)
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
TrdVirtualOrderBean
.
class
).
javaRDD
();
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
...
...
src/main/java/com/gic/spark/filter/TagFirstConsumptionChannelFilter.java
View file @
a4178b7a
...
@@ -37,7 +37,7 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil
...
@@ -37,7 +37,7 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeChannelRequest
channelRequest
=(
TagConsumeChannelRequest
)
request
;
TagConsumeChannelRequest
channelRequest
=(
TagConsumeChannelRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
channelRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
channelRequest
);
...
...
src/main/java/com/gic/spark/filter/TagFirstConsumptionMoneyFilter.java
View file @
a4178b7a
...
@@ -39,7 +39,7 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
...
@@ -39,7 +39,7 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
src/main/java/com/gic/spark/filter/TagFirstOfflineConsumptionStoreFilter.java
View file @
a4178b7a
...
@@ -42,7 +42,7 @@ public class TagFirstOfflineConsumptionStoreFilter extends AbstractTagConsumReco
...
@@ -42,7 +42,7 @@ public class TagFirstOfflineConsumptionStoreFilter extends AbstractTagConsumReco
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
...
...
src/main/java/com/gic/spark/filter/TagFirstOnlineConsumptionStoreFilter.java
View file @
a4178b7a
...
@@ -41,7 +41,7 @@ public class TagFirstOnlineConsumptionStoreFilter extends AbstractTagConsumRecor
...
@@ -41,7 +41,7 @@ public class TagFirstOnlineConsumptionStoreFilter extends AbstractTagConsumRecor
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
...
...
src/main/java/com/gic/spark/filter/TagHistoryConsumeCommodityFilter.java
View file @
a4178b7a
...
@@ -46,10 +46,10 @@ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFil
...
@@ -46,10 +46,10 @@ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFil
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
JavaRDD
<
TrdVirtualOrderItemBean
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrderItem
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderItemBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderItemBean
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderItemBean
.
class
).
javaRDD
();
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
.
getEcu_id
()))
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
.
getEcu_id
()))
.
leftOuterJoin
(
orderItemRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_order_id
(),
data
.
getSku_code
()))
.
leftOuterJoin
(
orderItemRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_order_id
(),
data
.
getSku_code
()))
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
_2
())
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
_2
())
...
...
src/main/java/com/gic/spark/filter/TagHistoryConsumeTotalFilter.java
View file @
a4178b7a
...
@@ -40,7 +40,7 @@ public class TagHistoryConsumeTotalFilter extends AbstractTagConsumRecordFilter{
...
@@ -40,7 +40,7 @@ public class TagHistoryConsumeTotalFilter extends AbstractTagConsumRecordFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
src/main/java/com/gic/spark/filter/TagHistoryOfflineConsumptionStoreFilter.java
View file @
a4178b7a
...
@@ -40,7 +40,7 @@ public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRe
...
@@ -40,7 +40,7 @@ public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRe
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
...
...
src/main/java/com/gic/spark/filter/TagHistoryOnlineConsumptionStoreFilter.java
View file @
a4178b7a
...
@@ -40,7 +40,7 @@ public class TagHistoryOnlineConsumptionStoreFilter extends AbstractTagConsumRec
...
@@ -40,7 +40,7 @@ public class TagHistoryOnlineConsumptionStoreFilter extends AbstractTagConsumRec
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
...
...
src/main/java/com/gic/spark/filter/TagLatelyConsumeCommodityFilter.java
View file @
a4178b7a
...
@@ -46,7 +46,7 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt
...
@@ -46,7 +46,7 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
JavaPairRDD
<
Long
,
Long
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
JavaPairRDD
<
Long
,
Long
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
...
...
src/main/java/com/gic/spark/filter/TagLatelyConsumeTimeFilter.java
View file @
a4178b7a
...
@@ -40,7 +40,7 @@ public class TagLatelyConsumeTimeFilter extends AbstractTagConsumRecordFilter{
...
@@ -40,7 +40,7 @@ public class TagLatelyConsumeTimeFilter extends AbstractTagConsumRecordFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
)
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
TrdVirtualOrderBean
.
class
).
javaRDD
();
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
...
...
src/main/java/com/gic/spark/filter/TagLatelyConsumptionChannelFilter.java
View file @
a4178b7a
...
@@ -40,7 +40,7 @@ public class TagLatelyConsumptionChannelFilter extends AbstractTagConsumRecordFi
...
@@ -40,7 +40,7 @@ public class TagLatelyConsumptionChannelFilter extends AbstractTagConsumRecordFi
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeChannelRequest
channelRequest
=(
TagConsumeChannelRequest
)
request
;
TagConsumeChannelRequest
channelRequest
=(
TagConsumeChannelRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
channelRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
channelRequest
);
...
...
src/main/java/com/gic/spark/filter/TagLatelyConsumptionMoneyFilter.java
View file @
a4178b7a
...
@@ -42,7 +42,7 @@ public class TagLatelyConsumptionMoneyFilter extends AbstractTagConsumRecordFil
...
@@ -42,7 +42,7 @@ public class TagLatelyConsumptionMoneyFilter extends AbstractTagConsumRecordFil
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
src/main/java/com/gic/spark/filter/TagLatelyOnlineConsumptionStoreFilter.java
View file @
a4178b7a
...
@@ -41,7 +41,7 @@ public class TagLatelyOnlineConsumptionStoreFilter extends AbstractTagConsumReco
...
@@ -41,7 +41,7 @@ public class TagLatelyOnlineConsumptionStoreFilter extends AbstractTagConsumReco
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
...
...
src/main/java/com/gic/spark/filter/TagLowestSingleConsumptionMoneyFilter.java
View file @
a4178b7a
...
@@ -40,7 +40,7 @@ public class TagLowestSingleConsumptionMoneyFilter extends AbstractTagConsumReco
...
@@ -40,7 +40,7 @@ public class TagLowestSingleConsumptionMoneyFilter extends AbstractTagConsumReco
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
src/main/java/com/gic/spark/filter/TagOfflineConsumptionStoreFilter.java
View file @
a4178b7a
...
@@ -43,7 +43,7 @@ public class TagOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFil
...
@@ -43,7 +43,7 @@ public class TagOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFil
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
storeRequest
.
setYearMonthDayType
(
YearMonthDayType
.
DAY
);
storeRequest
.
setYearMonthDayType
(
YearMonthDayType
.
DAY
);
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
...
...
src/main/java/com/gic/spark/filter/TagOnlineConsumptionStoreFilter.java
View file @
a4178b7a
...
@@ -38,7 +38,7 @@ public class TagOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilt
...
@@ -38,7 +38,7 @@ public class TagOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilt
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
storeRequest
.
setYearMonthDayType
(
YearMonthDayType
.
DAY
);
storeRequest
.
setYearMonthDayType
(
YearMonthDayType
.
DAY
);
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
...
...
src/main/java/com/gic/spark/filter/TagPerCustomerTransactionFilter.java
View file @
a4178b7a
...
@@ -37,7 +37,7 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
...
@@ -37,7 +37,7 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
()
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
()
.
mapPartitions
(
data
->{
.
mapPartitions
(
data
->{
...
...
src/main/java/com/gic/spark/filter/TagRecentlyOfflineConsumptionStoreFilter.java
View file @
a4178b7a
...
@@ -40,7 +40,7 @@ public class TagRecentlyOfflineConsumptionStoreFilter extends AbstractTagConsumR
...
@@ -40,7 +40,7 @@ public class TagRecentlyOfflineConsumptionStoreFilter extends AbstractTagConsumR
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
...
...
src/main/java/com/gic/spark/filter/TagTopSingleConsumptionMoneyFilter.java
View file @
a4178b7a
...
@@ -39,7 +39,7 @@ public class TagTopSingleConsumptionMoneyFilter extends AbstractTagConsumRecord
...
@@ -39,7 +39,7 @@ public class TagTopSingleConsumptionMoneyFilter extends AbstractTagConsumRecord
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnt
erprise
Id
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
src/main/java/com/gic/spark/tag/TagFilterFactory.java
View file @
a4178b7a
...
@@ -23,8 +23,8 @@ public class TagFilterFactory {
...
@@ -23,8 +23,8 @@ public class TagFilterFactory {
}
}
private
TagFilterFactory
(){}
private
TagFilterFactory
(){}
public
TagFilter
getTagFilter
(
TagConditionDTO
conditionDTO
){
public
Base
TagFilter
getTagFilter
(
TagConditionDTO
conditionDTO
){
TagFilter
tagFilter
=
null
;
Base
TagFilter
tagFilter
=
null
;
if
(
StringUtils
.
isNotEmpty
(
conditionDTO
.
getTagEsFieldName
())){
if
(
StringUtils
.
isNotEmpty
(
conditionDTO
.
getTagEsFieldName
())){
switch
(
conditionDTO
.
getTagEsFieldName
())
{
switch
(
conditionDTO
.
getTagEsFieldName
())
{
//积分信息
//积分信息
...
...
src/main/java/com/gic/spark/tag/TagProcessManager.java
View file @
a4178b7a
...
@@ -129,7 +129,7 @@ public class TagProcessManager {
...
@@ -129,7 +129,7 @@ public class TagProcessManager {
// }
// }
Map
<
Integer
,
List
<
TagProcessEntity
>>
tagGroupByEnterpriseMap
=
new
HashMap
<>();
Map
<
Integer
,
List
<
TagProcessEntity
>>
tagGroupByEnterpriseMap
=
new
HashMap
<>();
Map
<
Long
,
TagFilter
>
tagIdToFilterMap
=
new
HashMap
();
Map
<
Long
,
Base
TagFilter
>
tagIdToFilterMap
=
new
HashMap
();
for
(
SceneCrowdDTO
sceneCrowdDTO:
sceneCrowdDTOList
){
for
(
SceneCrowdDTO
sceneCrowdDTO:
sceneCrowdDTOList
){
if
(
sceneCrowdDTO
.
getReal_Time
()==
2
){
if
(
sceneCrowdDTO
.
getReal_Time
()==
2
){
...
@@ -143,7 +143,7 @@ public class TagProcessManager {
...
@@ -143,7 +143,7 @@ public class TagProcessManager {
entity
.
tagList
=
conditionGroupDTOS
.
get
(
i
).
getConditionInfos
();
entity
.
tagList
=
conditionGroupDTOS
.
get
(
i
).
getConditionInfos
();
for
(
TagConditionDTO
conditionDTO:
entity
.
tagList
){
//将tag同filter进行映射
for
(
TagConditionDTO
conditionDTO:
entity
.
tagList
){
//将tag同filter进行映射
TagFilter
tagFilter
=
TagFilterFactory
.
getInstance
().
getTagFilter
(
conditionDTO
);
Base
TagFilter
tagFilter
=
TagFilterFactory
.
getInstance
().
getTagFilter
(
conditionDTO
);
if
(
null
!=
tagFilter
){
if
(
null
!=
tagFilter
){
tagIdToFilterMap
.
put
(
conditionDTO
.
getTagId
(),
tagFilter
);
tagIdToFilterMap
.
put
(
conditionDTO
.
getTagId
(),
tagFilter
);
...
@@ -198,7 +198,7 @@ public class TagProcessManager {
...
@@ -198,7 +198,7 @@ public class TagProcessManager {
for
(
TagProcessEntity
entity:
enterpriseTagEntry
.
getValue
()){
for
(
TagProcessEntity
entity:
enterpriseTagEntry
.
getValue
()){
for
(
TagConditionDTO
conditionDTO:
entity
.
tagList
){
for
(
TagConditionDTO
conditionDTO:
entity
.
tagList
){
if
(
tagIdToFilterMap
.
containsKey
(
conditionDTO
.
getTagId
())){
if
(
tagIdToFilterMap
.
containsKey
(
conditionDTO
.
getTagId
())){
TagFilter
tagFilter
=
tagIdToFilterMap
.
get
(
conditionDTO
.
getTagId
());
Base
TagFilter
tagFilter
=
tagIdToFilterMap
.
get
(
conditionDTO
.
getTagId
());
AbstractFilterRequest
filterRequest
=
TagValueParser
.
parseFilterValue
(
conditionDTO
);
AbstractFilterRequest
filterRequest
=
TagValueParser
.
parseFilterValue
(
conditionDTO
);
final
String
groupId
=
entity
.
tagGroupId
+
"_"
+
conditionDTO
.
getTagId
()
+
"_"
+
entity
.
level
;
final
String
groupId
=
entity
.
tagGroupId
+
"_"
+
conditionDTO
.
getTagId
()
+
"_"
+
entity
.
level
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment