Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
G
gic-spark-tag-4.0
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wangxiaokang
gic-spark-tag-4.0
Commits
742d13ef
Commit
742d13ef
authored
Mar 08, 2021
by
zhangyannao
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update
parent
292c03a3
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
701 additions
and
680 deletions
+701
-680
.gitignore
.gitignore
+2
-0
SparkTagProcessByEnterpriseId.java
...ain/java/com/gic/spark/SparkTagProcessByEnterpriseId.java
+6
-6
SparkTagProcessByTagGroupId.java
src/main/java/com/gic/spark/SparkTagProcessByTagGroupId.java
+6
-6
DataSourceMysql.java
...java/com/gic/spark/datasource/entity/DataSourceMysql.java
+2
-2
AbstractTagConsumRecordFilter.java
...a/com/gic/spark/filter/AbstractTagConsumRecordFilter.java
+53
-51
TagAverageDiscountFactorFilter.java
.../com/gic/spark/filter/TagAverageDiscountFactorFilter.java
+65
-63
TagConsumeCommodityFilter.java
.../java/com/gic/spark/filter/TagConsumeCommodityFilter.java
+28
-30
TagCouponFilter.java
src/main/java/com/gic/spark/filter/TagCouponFilter.java
+62
-57
TagCurrentCouponNumFilter.java
.../java/com/gic/spark/filter/TagCurrentCouponNumFilter.java
+85
-79
TagFirstConsumptionMoneyFilter.java
.../com/gic/spark/filter/TagFirstConsumptionMoneyFilter.java
+93
-87
TagFirstOnlineConsumptionStoreFilter.java
...ic/spark/filter/TagFirstOnlineConsumptionStoreFilter.java
+25
-24
TagHistoryConsumeCommodityFilter.java
...om/gic/spark/filter/TagHistoryConsumeCommodityFilter.java
+21
-25
TagHistoryConsumeTotalFilter.java
...va/com/gic/spark/filter/TagHistoryConsumeTotalFilter.java
+54
-52
TagProcessManager.java
src/main/java/com/gic/spark/tag/TagProcessManager.java
+176
-180
EsRequestUtil.java
src/main/java/com/gic/spark/util/EsRequestUtil.java
+23
-18
No files found.
.gitignore
0 → 100644
View file @
742d13ef
/target
/gic-spark-tag-4.0.iml
src/main/java/com/gic/spark/SparkTagProcessByEnterpriseId.java
View file @
742d13ef
...
@@ -5,8 +5,8 @@ import com.gic.spark.application.SparkEnvManager;
...
@@ -5,8 +5,8 @@ import com.gic.spark.application.SparkEnvManager;
import
com.gic.spark.config.SparkConfigManager
;
import
com.gic.spark.config.SparkConfigManager
;
import
com.gic.spark.tag.TagProcessManager
;
import
com.gic.spark.tag.TagProcessManager
;
import
java.util.
ArrayLis
t
;
import
java.util.
HashSe
t
;
import
java.util.
Lis
t
;
import
java.util.
Se
t
;
/**
/**
* @description:
* @description:
...
@@ -15,7 +15,7 @@ import java.util.List;
...
@@ -15,7 +15,7 @@ import java.util.List;
*/
*/
public
class
SparkTagProcessByEnterpriseId
{
public
class
SparkTagProcessByEnterpriseId
{
public
static
void
main
(
String
[]
args
){
public
static
void
main
(
String
[]
args
)
{
if
(
args
==
null
||
args
.
length
<
3
)
{
if
(
args
==
null
||
args
.
length
<
3
)
{
System
.
err
.
println
(
"invalidate input params:"
);
System
.
err
.
println
(
"invalidate input params:"
);
System
.
err
.
println
(
"1.is production: true/false"
);
System
.
err
.
println
(
"1.is production: true/false"
);
...
@@ -26,9 +26,9 @@ public class SparkTagProcessByEnterpriseId {
...
@@ -26,9 +26,9 @@ public class SparkTagProcessByEnterpriseId {
boolean
isProd
=
Boolean
.
parseBoolean
(
args
[
0
]);
boolean
isProd
=
Boolean
.
parseBoolean
(
args
[
0
]);
boolean
extractData
=
Boolean
.
parseBoolean
(
args
[
1
]);
boolean
extractData
=
Boolean
.
parseBoolean
(
args
[
1
]);
String
[]
arrs
=
args
[
2
].
split
(
","
);
String
[]
arrs
=
args
[
2
].
split
(
","
);
List
<
Integer
>
params
=
new
ArrayList
();
Set
<
Integer
>
params
=
new
HashSet
<>
();
for
(
String
s:
arrs
)
{
for
(
String
s
:
arrs
)
{
params
.
add
(
Integer
.
parseInt
(
s
));
params
.
add
(
Integer
.
parseInt
(
s
));
}
}
ShardingConfigManager
.
initDefualt
();
ShardingConfigManager
.
initDefualt
();
...
...
src/main/java/com/gic/spark/SparkTagProcessByTagGroupId.java
View file @
742d13ef
...
@@ -5,8 +5,8 @@ import com.gic.spark.application.SparkEnvManager;
...
@@ -5,8 +5,8 @@ import com.gic.spark.application.SparkEnvManager;
import
com.gic.spark.config.SparkConfigManager
;
import
com.gic.spark.config.SparkConfigManager
;
import
com.gic.spark.tag.TagProcessManager
;
import
com.gic.spark.tag.TagProcessManager
;
import
java.util.
ArrayLis
t
;
import
java.util.
HashSe
t
;
import
java.util.
Lis
t
;
import
java.util.
Se
t
;
/**
/**
* @description:
* @description:
...
@@ -15,7 +15,7 @@ import java.util.List;
...
@@ -15,7 +15,7 @@ import java.util.List;
*/
*/
public
class
SparkTagProcessByTagGroupId
{
public
class
SparkTagProcessByTagGroupId
{
public
static
void
main
(
String
[]
args
){
public
static
void
main
(
String
[]
args
)
{
if
(
args
==
null
||
args
.
length
<
3
)
{
if
(
args
==
null
||
args
.
length
<
3
)
{
System
.
err
.
println
(
"invalidate input params:"
);
System
.
err
.
println
(
"invalidate input params:"
);
System
.
err
.
println
(
"1.is production: true/false"
);
System
.
err
.
println
(
"1.is production: true/false"
);
...
@@ -26,9 +26,9 @@ public class SparkTagProcessByTagGroupId {
...
@@ -26,9 +26,9 @@ public class SparkTagProcessByTagGroupId {
boolean
isProd
=
Boolean
.
parseBoolean
(
args
[
0
]);
boolean
isProd
=
Boolean
.
parseBoolean
(
args
[
0
]);
boolean
extractData
=
Boolean
.
parseBoolean
(
args
[
1
]);
boolean
extractData
=
Boolean
.
parseBoolean
(
args
[
1
]);
String
[]
arrs
=
args
[
2
].
split
(
","
);
String
[]
arrs
=
args
[
2
].
split
(
","
);
List
<
Long
>
params
=
new
ArrayList
();
Set
<
Long
>
params
=
new
HashSet
<>
();
for
(
String
s:
arrs
)
{
for
(
String
s
:
arrs
)
{
params
.
add
(
Long
.
parseLong
(
s
));
params
.
add
(
Long
.
parseLong
(
s
));
}
}
ShardingConfigManager
.
initDefualt
();
ShardingConfigManager
.
initDefualt
();
...
...
src/main/java/com/gic/spark/datasource/entity/DataSourceMysql.java
View file @
742d13ef
...
@@ -43,7 +43,7 @@ public class DataSourceMysql extends DataSourceEntity {
...
@@ -43,7 +43,7 @@ public class DataSourceMysql extends DataSourceEntity {
@Override
@Override
public
String
getHiveTableName
()
{
public
String
getHiveTableName
()
{
return
getSchema
()
+
"."
+
datasource
.
getSchema
().
replaceAll
(
"\\."
,
"_"
)
+
"_"
+
targetTable
;
return
getSchema
()
+
"."
+
datasource
.
getSchema
().
replaceAll
(
"\\."
,
"_"
)
+
"_"
+
targetTable
;
}
}
@Override
@Override
...
@@ -51,7 +51,7 @@ public class DataSourceMysql extends DataSourceEntity {
...
@@ -51,7 +51,7 @@ public class DataSourceMysql extends DataSourceEntity {
SparkSession
sparkSession
=
SparkEnvManager
.
getInstance
().
getSparkSession
();
SparkSession
sparkSession
=
SparkEnvManager
.
getInstance
().
getSparkSession
();
Dataset
<
Row
>
sourceDataset
=
datasource
.
buildRddManager
().
getDatasetByEnterpriseIds
(
targetTable
,
enterpriseList
,
null
,
null
).
repartition
(
new
Column
(
"enterprise_id"
));
Dataset
<
Row
>
sourceDataset
=
datasource
.
buildRddManager
().
getDatasetByEnterpriseIds
(
targetTable
,
enterpriseList
,
null
,
null
).
repartition
(
new
Column
(
"enterprise_id"
));
SparkHiveUtil
.
createHivePartitionTable
(
sourceDataset
,
"enterprise_id"
,
getSchema
(),
datasource
.
getSchema
().
replaceAll
(
"\\."
,
"_"
)
+
"_"
+
targetTable
,
sparkSession
);
SparkHiveUtil
.
createHivePartitionTable
(
sourceDataset
,
"enterprise_id"
,
getSchema
(),
datasource
.
getSchema
().
replaceAll
(
"\\."
,
"_"
)
+
"_"
+
targetTable
,
sparkSession
);
HivePartitionUtil
.
saveDatasetToPartitionTable
(
sparkSession
,
sourceDataset
,
getHiveTableName
());
HivePartitionUtil
.
saveDatasetToPartitionTable
(
sparkSession
,
sourceDataset
,
getHiveTableName
());
...
...
src/main/java/com/gic/spark/filter/AbstractTagConsumRecordFilter.java
View file @
742d13ef
...
@@ -16,82 +16,84 @@ import java.util.List;
...
@@ -16,82 +16,84 @@ import java.util.List;
/**
/**
* @description:
* @description: 消费记录
* 消费记录
* @author: wangxk
* @author: wangxk
* @date: 2020/8/3
* @date: 2020/8/3
*/
*/
public
abstract
class
AbstractTagConsumRecordFilter
implements
BaseTagFilter
{
public
abstract
class
AbstractTagConsumRecordFilter
implements
BaseTagFilter
{
DataSourceHive
dataSourceHiveOrder
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_WDORDER_D
);
DataSourceHive
dataSourceHiveOrder
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_WDORDER_D
);
DataSourceHive
dataSourceHiveOrderItem
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D
);
DataSourceHive
dataSourceHiveOrderItem
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D
);
protected
static
JavaRDD
<
TrdVirtualOrderBean
>
statisticsTypeHandle
(
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderRdd
,
AbstractFilterRequest
request
)
{
protected
static
JavaRDD
<
TrdVirtualOrderBean
>
statisticsTypeHandle
(
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderRdd
,
AbstractFilterRequest
request
)
{
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
orderRdd
.
mapPartitions
(
data
->
{
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
orderRdd
.
mapPartitions
(
data
->
{
List
<
TrdVirtualOrderBean
>
result
=
new
ArrayList
();
List
<
TrdVirtualOrderBean
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
())
{
while
(
data
.
hasNext
())
{
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>
tp2
=
data
.
next
();
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>
tp2
=
data
.
next
();
TrdVirtualOrderBean
consumeRecordBean
=
tp2
.
_1
();
TrdVirtualOrderBean
consumeRecordBean
=
tp2
.
_1
();
switch
(
request
.
getStatisticsType
())
{
switch
(
request
.
getStatisticsType
())
{
case
COMMODITYBRAND:
case
COMMODITYBRAND:
if
(
tp2
.
_2
().
isPresent
())
{
if
(
tp2
.
_2
().
isPresent
())
{
for
(
String
ent_brand_id:
tp2
.
_2
().
get
())
{
for
(
String
ent_brand_id
:
tp2
.
_2
().
get
())
{
if
(
request
.
getStatisticsValList
().
contains
(
ent_brand_id
))
{
if
(
request
.
getStatisticsValList
().
contains
(
ent_brand_id
))
{
result
.
add
(
consumeRecordBean
);
result
.
add
(
consumeRecordBean
);
break
;
break
;
}
}
}
}
}
}
break
;
break
;
case
CHANNEL:
case
CHANNEL:
if
(
request
.
getStatisticsValList
().
contains
(
String
.
valueOf
(
consumeRecordBean
.
getStore_info_id
()))){
if
(
request
.
getStatisticsValList
().
contains
(
String
.
valueOf
(
consumeRecordBean
.
getStore_info_id
())))
{
result
.
add
(
consumeRecordBean
);
result
.
add
(
consumeRecordBean
);
}
}
break
;
break
;
case
MCUINFO:
case
MCUINFO:
if
(
request
.
getStatisticsValList
().
contains
(
String
.
valueOf
(
consumeRecordBean
.
getArea_id
()))){
if
(
request
.
getStatisticsValList
().
contains
(
String
.
valueOf
(
consumeRecordBean
.
getArea_id
())))
{
result
.
add
(
consumeRecordBean
);
result
.
add
(
consumeRecordBean
);
}
}
break
;
break
;
default
:
break
;
default
:
}
break
;
}
}
return
result
.
iterator
();
}
});
return
result
.
iterator
();
});
return
consumeRecordRDD
;
return
consumeRecordRDD
;
}
}
protected
static
boolean
checkTime
(
AbstractFilterRequestTime
request
,
long
consumeTime
)
{
protected
static
boolean
checkTime
(
AbstractFilterRequestTime
request
,
long
consumeTime
)
{
Boolean
result
=
false
;
Boolean
result
=
false
;
switch
(
request
.
getTimeRangeType
()){
switch
(
request
.
getTimeRangeType
())
{
case
FIXATION:
case
FIXATION:
if
(
consumeTime
>=
request
.
getBeginTime
().
getTime
()
if
(
consumeTime
>=
request
.
getBeginTime
().
getTime
()
&&
consumeTime
<=
request
.
getEndTime
().
getTime
())
&&
consumeTime
<=
request
.
getEndTime
().
getTime
())
result
=
true
;
result
=
true
;
break
;
break
;
case
LATELY:
case
LATELY:
switch
(
request
.
getYearMonthDayType
()){
switch
(
request
.
getYearMonthDayType
())
{
case
DAY:
case
DAY:
if
(
consumeTime
>
DateUtil
.
addNumForDay
(
new
Date
(),-
request
.
getTimeNum
()).
getTime
())
{
if
(
consumeTime
>
DateUtil
.
addNumForDay
(
new
Date
(),
-
request
.
getTimeNum
()).
getTime
())
{
result
=
true
;
result
=
true
;
}
}
break
;
break
;
case
MONTH:
case
MONTH:
if
(
consumeTime
>
DateUtil
.
addNumForMonth
(
new
Date
(),-
request
.
getTimeNum
()).
getTime
())
{
if
(
consumeTime
>
DateUtil
.
addNumForMonth
(
new
Date
(),
-
request
.
getTimeNum
()).
getTime
())
{
result
=
true
;
result
=
true
;
}
}
break
;
break
;
case
YEAR:
case
YEAR:
if
(
consumeTime
>
DateUtil
.
addNumForYear
(
new
Date
(),-
request
.
getTimeNum
()).
getTime
())
{
if
(
consumeTime
>
DateUtil
.
addNumForYear
(
new
Date
(),
-
request
.
getTimeNum
()).
getTime
())
{
result
=
true
;
result
=
true
;
}
}
break
;
break
;
default
:
break
;
default
:
break
;
}
}
break
;
break
;
default
:
break
;
default
:
break
;
}
}
return
result
;
return
result
;
}
}
}
}
src/main/java/com/gic/spark/filter/TagAverageDiscountFactorFilter.java
View file @
742d13ef
...
@@ -6,7 +6,6 @@ import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
...
@@ -6,7 +6,6 @@ import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import
com.gic.spark.entity.bean.TrdEcuSalesLabelBean
;
import
com.gic.spark.entity.bean.TrdEcuSalesLabelBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeDoubleRequest
;
import
com.gic.spark.entity.request.TagConsumeDoubleRequest
;
import
com.gic.spark.entity.request.TagConsumeRequest
;
import
com.gic.spark.util.CommonUtil
;
import
com.gic.spark.util.CommonUtil
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.api.java.Optional
;
import
org.apache.spark.api.java.Optional
;
...
@@ -16,21 +15,23 @@ import java.util.ArrayList;
...
@@ -16,21 +15,23 @@ import java.util.ArrayList;
import
java.util.List
;
import
java.util.List
;
/**
/**
* @description:
* @description: 平均折扣率
* 平均折扣率
* @author: wangxk
* @author: wangxk
* @date: 2020/5/7
* @date: 2020/5/7
*/
*/
public
class
TagAverageDiscountFactorFilter
extends
AbstractTagConsumFilter
{
public
class
TagAverageDiscountFactorFilter
extends
AbstractTagConsumFilter
{
private
static
TagAverageDiscountFactorFilter
instance
;
private
static
TagAverageDiscountFactorFilter
instance
;
public
static
TagAverageDiscountFactorFilter
getInstance
()
{
public
static
TagAverageDiscountFactorFilter
getInstance
()
{
if
(
null
==
instance
)
{
if
(
null
==
instance
)
{
instance
=
new
TagAverageDiscountFactorFilter
();
instance
=
new
TagAverageDiscountFactorFilter
();
}
}
return
instance
;
return
instance
;
}
}
private
TagAverageDiscountFactorFilter
(){}
private
TagAverageDiscountFactorFilter
()
{
}
@Override
@Override
...
@@ -43,66 +44,67 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
...
@@ -43,66 +44,67 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeDoubleRequest
consumeRequest
=(
TagConsumeDoubleRequest
)
request
;
TagConsumeDoubleRequest
consumeRequest
=
(
TagConsumeDoubleRequest
)
request
;
JavaRDD
<
TrdEcuSalesLabelBean
>
salesLabelRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveSalesLabel
.
getDatasetByEntId
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
JavaRDD
<
TrdEcuSalesLabelBean
>
salesLabelRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveSalesLabel
.
getDatasetByEntId
(
enterpriseId
),
TrdEcuSalesLabelBean
.
class
).
javaRDD
();
JavaRDD
<
TrdEcuBrandLabelBean
>
brandLabelRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveBrandLabel
.
getDatasetByEntId
(
enterpriseId
),
TrdEcuBrandLabelBean
.
class
).
javaRDD
();
JavaRDD
<
TrdEcuBrandLabelBean
>
brandLabelRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveBrandLabel
.
getDatasetByEntId
(
enterpriseId
),
TrdEcuBrandLabelBean
.
class
).
javaRDD
();
JavaRDD
<
Tuple2
<
TrdEcuSalesLabelBean
,
Optional
<
Iterable
<
TrdEcuBrandLabelBean
>>>>
labelRDD
=
salesLabelRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
JavaRDD
<
Tuple2
<
TrdEcuSalesLabelBean
,
Optional
<
Iterable
<
TrdEcuBrandLabelBean
>>>>
labelRDD
=
salesLabelRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
leftOuterJoin
(
brandLabelRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
())
.
leftOuterJoin
(
brandLabelRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
())
.
map
(
data
->
data
.
_2
());
.
map
(
data
->
data
.
_2
());
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
statisticsTypeHandle
(
labelRDD
,
consumeRequest
);
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
=
statisticsTypeHandle
(
labelRDD
,
consumeRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
JavaRDD
<
Long
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
reduceByKey
((
x
,
y
)->{
.
reduceByKey
((
x
,
y
)
->
{
x
.
setReceive_amt
(
x
.
getReceive_amt
()+
y
.
getReceive_amt
());
x
.
setReceive_amt
(
x
.
getReceive_amt
()
+
y
.
getReceive_amt
());
x
.
setPay_amt
(
x
.
getPay_amt
()+
y
.
getPay_amt
());
x
.
setPay_amt
(
x
.
getPay_amt
()
+
y
.
getPay_amt
());
x
.
setTotal_amt
(
x
.
getTotal_amt
()+
y
.
getTotal_amt
());
x
.
setTotal_amt
(
x
.
getTotal_amt
()
+
y
.
getTotal_amt
());
return
x
;
return
x
;
})
})
.
mapPartitions
(
data
->{
.
mapPartitions
(
data
->
{
List
<
Long
>
result
=
new
ArrayList
();
List
<
Long
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
while
(
data
.
hasNext
())
{
Tuple2
<
Long
,
TrdEcuSalesLabelBean
>
tp2
=
data
.
next
();
Tuple2
<
Long
,
TrdEcuSalesLabelBean
>
tp2
=
data
.
next
();
double
avgDiscountRate
=
1
==
configStatus
?
CommonUtil
.
isEmptyDouble2double
(
tp2
.
_2
().
getPay_amt
())/
CommonUtil
.
isEmptyDouble2double
(
tp2
.
_2
().
getTotal_amt
())
double
avgDiscountRate
=
1
==
configStatus
?
CommonUtil
.
isEmptyDouble2double
(
tp2
.
_2
().
getPay_amt
())
/
CommonUtil
.
isEmptyDouble2double
(
tp2
.
_2
().
getTotal_amt
())
:
CommonUtil
.
isEmptyDouble2double
(
tp2
.
_2
().
getReceive_amt
())/
CommonUtil
.
isEmptyDouble2double
(
tp2
.
_2
().
getTotal_amt
());
:
CommonUtil
.
isEmptyDouble2double
(
tp2
.
_2
().
getReceive_amt
())
/
CommonUtil
.
isEmptyDouble2double
(
tp2
.
_2
().
getTotal_amt
());
switch
(
consumeRequest
.
getNumberType
()){
switch
(
consumeRequest
.
getNumberType
())
{
case
gt:
case
gt:
if
(
avgDiscountRate
>
consumeRequest
.
getBeginNum
()){
if
(
avgDiscountRate
>
consumeRequest
.
getBeginNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
gte:
case
gte:
if
(
avgDiscountRate
>=
consumeRequest
.
getBeginNum
()){
if
(
avgDiscountRate
>=
consumeRequest
.
getBeginNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
lt:
case
lt:
if
(
avgDiscountRate
<
consumeRequest
.
getEndNum
()){
if
(
avgDiscountRate
<
consumeRequest
.
getEndNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
lte:
case
lte:
if
(
avgDiscountRate
<=
consumeRequest
.
getEndNum
()){
if
(
avgDiscountRate
<=
consumeRequest
.
getEndNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
eq:
case
eq:
if
(
avgDiscountRate
==
consumeRequest
.
getEqualNum
()){
if
(
avgDiscountRate
==
consumeRequest
.
getEqualNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
between:
case
between:
if
(
avgDiscountRate
>=
consumeRequest
.
getBeginNum
()
if
(
avgDiscountRate
>=
consumeRequest
.
getBeginNum
()
&&
avgDiscountRate
<=
consumeRequest
.
getEndNum
()){
&&
avgDiscountRate
<=
consumeRequest
.
getEndNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
default
:
break
;
default
:
}
break
;
}
}
return
result
.
iterator
();
}
});
return
result
.
iterator
();
});
return
ecuRdd
;
return
ecuRdd
;
}
}
}
}
src/main/java/com/gic/spark/filter/TagConsumeCommodityFilter.java
View file @
742d13ef
package
com
.
gic
.
spark
.
filter
;
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TrdVirtualOrderBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderItemBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderItemBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeCommodityRequest
;
import
com.gic.spark.entity.request.TagConsumeCommodityRequest
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.DateUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.api.java.JavaPairRDD
;
import
org.apache.spark.api.java.JavaPairRDD
;
...
@@ -25,14 +23,14 @@ import java.util.List;
...
@@ -25,14 +23,14 @@ import java.util.List;
* @author: wangxk
* @author: wangxk
* @date: 2020/8/12
* @date: 2020/8/12
*/
*/
public
class
TagConsumeCommodityFilter
extends
AbstractTagConsumRecordFilter
{
public
class
TagConsumeCommodityFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagConsumeCommodityFilter
instance
;
private
static
TagConsumeCommodityFilter
instance
;
public
static
TagConsumeCommodityFilter
getInstance
()
{
public
static
TagConsumeCommodityFilter
getInstance
()
{
if
(
null
==
instance
)
{
if
(
null
==
instance
)
{
instance
=
new
TagConsumeCommodityFilter
();
instance
=
new
TagConsumeCommodityFilter
();
}
}
return
instance
;
return
instance
;
}
}
...
@@ -47,37 +45,37 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
...
@@ -47,37 +45,37 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
TagConsumeCommodityRequest
commodityRequest
=
(
TagConsumeCommodityRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
Dataset
<
Row
>
OrderItemDS
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
);
Dataset
<
Row
>
OrderItemDS
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
);
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderAndItemRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderAndItemRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
.
leftOuterJoin
(
OrderItemDS
.
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
()
.
leftOuterJoin
(
OrderItemDS
.
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
()
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
)))
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
)))
.
groupByKey
())
.
groupByKey
())
.
map
(
data
->
data
.
_2
());
.
map
(
data
->
data
.
_2
());
consumeRecordRDD
=
statisticsTypeHandle
(
orderAndItemRdd
,
commodityRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
orderAndItemRdd
,
commodityRequest
);
JavaPairRDD
<
Long
,
Long
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
JavaPairRDD
<
Long
,
Long
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
filter
(
data
->
checkTime
(
commodityRequest
,
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()))
.
filter
(
data
->
checkTime
(
commodityRequest
,
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
.
getEcu_id
()));
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
.
getEcu_id
()));
JavaPairRDD
<
Long
,
Long
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
OrderItemDS
,
TrdVirtualOrderItemBean
.
class
).
javaRDD
()
JavaPairRDD
<
Long
,
Long
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
OrderItemDS
,
TrdVirtualOrderItemBean
.
class
).
javaRDD
()
.
filter
(
data
->
{
.
filter
(
data
->
{
if
(
StringUtils
.
isNotEmpty
(
data
.
getSku_code
())
if
(
StringUtils
.
isNotEmpty
(
data
.
getSku_code
())
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
getSku_code
()))
{
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
getSku_code
()))
{
return
true
;
return
true
;
}
}
return
false
;
return
false
;
}).
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_order_id
(),
data
.
getVirtual_order_id
()))
}).
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_order_id
(),
data
.
getVirtual_order_id
()))
.
reduceByKey
((
x
,
y
)->
x
);
.
reduceByKey
((
x
,
y
)
->
x
);
JavaRDD
<
Long
>
ecuRdd
=
orderRdd
.
leftOuterJoin
(
orderItemRDD
)
JavaRDD
<
Long
>
ecuRdd
=
orderRdd
.
leftOuterJoin
(
orderItemRDD
)
.
filter
(
data
->
data
.
_2
().
_2
().
isPresent
())
.
filter
(
data
->
data
.
_2
().
_2
().
isPresent
())
.
map
(
data
->
data
.
_2
().
_1
()).
distinct
();
.
map
(
data
->
data
.
_2
().
_1
()).
distinct
();
return
ecuRdd
;
return
ecuRdd
;
}
}
}
}
src/main/java/com/gic/spark/filter/TagCouponFilter.java
View file @
742d13ef
...
@@ -4,12 +4,13 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
...
@@ -4,12 +4,13 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
import
com.gic.spark.datasource.entity.DataSourceSharding
;
import
com.gic.spark.datasource.entity.DataSourceSharding
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagCouponBean
;
import
com.gic.spark.entity.bean.TagCouponBean
;
import
com.gic.spark.entity.request.TagCouponRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagCouponRequest
;
import
com.gic.spark.util.AppEnvUtil
;
import
com.gic.spark.util.AppEnvUtil
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.ConstantUtil
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.sql.Column
;
import
org.apache.spark.sql.Column
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.HashSet
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.List
;
...
@@ -22,15 +23,18 @@ import java.util.Set;
...
@@ -22,15 +23,18 @@ import java.util.Set;
*/
*/
public
class
TagCouponFilter
implements
BaseTagFilter
{
public
class
TagCouponFilter
implements
BaseTagFilter
{
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MARKETING_SHARDING
,
ConstantUtil
.
TAB_COUPON_LOG
);
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MARKETING_SHARDING
,
ConstantUtil
.
TAB_COUPON_LOG
);
private
static
TagCouponFilter
instance
;
private
static
TagCouponFilter
instance
;
public
static
TagCouponFilter
getInstance
(){
if
(
null
==
instance
){
public
static
TagCouponFilter
getInstance
()
{
instance
=
new
TagCouponFilter
();
if
(
null
==
instance
)
{
instance
=
new
TagCouponFilter
();
}
}
return
instance
;
return
instance
;
}
}
private
TagCouponFilter
(){}
private
TagCouponFilter
()
{
}
@Override
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
...
@@ -41,57 +45,58 @@ public class TagCouponFilter implements BaseTagFilter {
...
@@ -41,57 +45,58 @@ public class TagCouponFilter implements BaseTagFilter {
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagCouponRequest
couponRequest
=(
TagCouponRequest
)
request
;
TagCouponRequest
couponRequest
=
(
TagCouponRequest
)
request
;
JavaRDD
<
TagCouponBean
>
couponBeanRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceSharding
.
getDatasetByEnterpriseId
(
enterpriseId
)
JavaRDD
<
TagCouponBean
>
couponBeanRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceSharding
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"status"
).
isNotNull
()),
TagCouponBean
.
class
).
javaRDD
()
.
filter
(
new
Column
(
"status"
).
isNotNull
()),
TagCouponBean
.
class
).
javaRDD
()
.
filter
(
data
->
couponRequest
.
getCouponList
().
contains
(
data
.
getCoupon_id
()));
.
filter
(
data
->
couponRequest
.
getCouponList
().
contains
(
data
.
getCoupon_id
()));
JavaRDD
<
Long
>
ecuRDD
=
couponBeanRDD
.
mapPartitions
(
data
->{
JavaRDD
<
Long
>
ecuRDD
=
couponBeanRDD
.
mapPartitions
(
data
->
{
Set
<
Long
>
result
=
new
HashSet
();
Set
<
Long
>
result
=
new
HashSet
();
while
(
data
.
hasNext
()){
while
(
data
.
hasNext
())
{
TagCouponBean
couponBean
=
data
.
next
();
TagCouponBean
couponBean
=
data
.
next
();
switch
(
couponRequest
.
getCouponType
()){
switch
(
couponRequest
.
getCouponType
())
{
case
UNCLAIMED:
case
UNCLAIMED:
if
(
couponBean
.
getStatus
()==
3
){
if
(
couponBean
.
getStatus
()
==
3
)
{
result
.
add
(
couponBean
.
getEcu_Id
());
result
.
add
(
couponBean
.
getEcu_Id
());
}
}
break
;
break
;
case
NO_CANCEL:
case
NO_CANCEL:
if
(
couponBean
.
getStatus
()==
4
if
(
couponBean
.
getStatus
()
==
4
&&
couponBean
.
getEffect_End_Time
().
getTime
()>
System
.
currentTimeMillis
()){
&&
couponBean
.
getEffect_End_Time
().
getTime
()
>
System
.
currentTimeMillis
())
{
result
.
add
(
couponBean
.
getEcu_Id
());
result
.
add
(
couponBean
.
getEcu_Id
());
}
}
break
;
break
;
case
NO_CANCEL_EXPIRES:
case
NO_CANCEL_EXPIRES:
if
(
couponBean
.
getStatus
()==
4
if
(
couponBean
.
getStatus
()
==
4
&&
couponBean
.
getEffect_End_Time
().
getTime
()<
System
.
currentTimeMillis
()){
&&
couponBean
.
getEffect_End_Time
().
getTime
()
<
System
.
currentTimeMillis
())
{
result
.
add
(
couponBean
.
getEcu_Id
());
result
.
add
(
couponBean
.
getEcu_Id
());
}
}
break
;
break
;
case
CANCEL:
case
CANCEL:
if
(
couponBean
.
getStatus
()==
7
){
if
(
couponBean
.
getStatus
()
==
7
)
{
result
.
add
(
couponBean
.
getEcu_Id
());
result
.
add
(
couponBean
.
getEcu_Id
());
}
}
break
;
break
;
case
OCCUPY:
case
OCCUPY:
if
(
couponBean
.
getStatus
()==
8
){
if
(
couponBean
.
getStatus
()
==
8
)
{
result
.
add
(
couponBean
.
getEcu_Id
());
result
.
add
(
couponBean
.
getEcu_Id
());
}
}
break
;
break
;
case
DONATION_IN:
case
DONATION_IN:
if
(
couponBean
.
getStatus
()==
9
){
if
(
couponBean
.
getStatus
()
==
9
)
{
result
.
add
(
couponBean
.
getEcu_Id
());
result
.
add
(
couponBean
.
getEcu_Id
());
}
}
break
;
break
;
case
YET_DONATION:
case
YET_DONATION:
if
(
couponBean
.
getStatus
()==
10
){
if
(
couponBean
.
getStatus
()
==
10
)
{
result
.
add
(
couponBean
.
getEcu_Id
());
result
.
add
(
couponBean
.
getEcu_Id
());
}
}
break
;
break
;
default
:
break
;
default
:
}
break
;
}
}
return
result
.
iterator
();
}
});
return
result
.
iterator
();
});
return
ecuRDD
;
return
ecuRDD
;
}
}
...
...
src/main/java/com/gic/spark/filter/TagCurrentCouponNumFilter.java
View file @
742d13ef
...
@@ -4,8 +4,8 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
...
@@ -4,8 +4,8 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
import
com.gic.spark.datasource.entity.DataSourceSharding
;
import
com.gic.spark.datasource.entity.DataSourceSharding
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagCouponBean
;
import
com.gic.spark.entity.bean.TagCouponBean
;
import
com.gic.spark.entity.request.TagCouponRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagCouponRequest
;
import
com.gic.spark.util.AppEnvUtil
;
import
com.gic.spark.util.AppEnvUtil
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.ConstantUtil
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.api.java.JavaRDD
;
...
@@ -17,121 +17,127 @@ import java.util.List;
...
@@ -17,121 +17,127 @@ import java.util.List;
import
java.util.Set
;
import
java.util.Set
;
/**
/**
* @description:
* @description: 当前卡券数
* 当前卡券数
* @author: wangxk
* @author: wangxk
* @date: 2020/4/20
* @date: 2020/4/20
*/
*/
public
class
TagCurrentCouponNumFilter
implements
BaseTagFilter
{
public
class
TagCurrentCouponNumFilter
implements
BaseTagFilter
{
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MARKETING_SHARDING
,
ConstantUtil
.
TAB_COUPON_LOG
);
private
DataSourceSharding
dataSourceSharding
=
new
DataSourceSharding
(
AppEnvUtil
.
MARKETING_SHARDING
,
ConstantUtil
.
TAB_COUPON_LOG
);
private
static
TagCurrentCouponNumFilter
instance
;
private
static
TagCurrentCouponNumFilter
instance
;
public
static
TagCurrentCouponNumFilter
getInstance
()
{
public
static
TagCurrentCouponNumFilter
getInstance
()
{
if
(
null
==
instance
)
{
if
(
null
==
instance
)
{
instance
=
new
TagCurrentCouponNumFilter
();
instance
=
new
TagCurrentCouponNumFilter
();
}
}
return
instance
;
return
instance
;
}
}
private
TagCurrentCouponNumFilter
(){};
private
TagCurrentCouponNumFilter
()
{
}
;
@Override
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
System
.
out
.
println
(
"dataSourceSharding==>"
+
dataSourceSharding
);
System
.
out
.
println
(
"dataSourceSharding==>"
+
dataSourceSharding
);
result
.
add
(
dataSourceSharding
);
result
.
add
(
dataSourceSharding
);
return
result
;
return
result
;
}
}
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagCouponRequest
couponRequest
=(
TagCouponRequest
)
request
;
TagCouponRequest
couponRequest
=
(
TagCouponRequest
)
request
;
JavaRDD
<
TagCouponBean
>
couponBeanRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceSharding
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagCouponBean
.
class
).
javaRDD
()
JavaRDD
<
TagCouponBean
>
couponBeanRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceSharding
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagCouponBean
.
class
).
javaRDD
()
.
filter
(
data
->{
.
filter
(
data
->
{
if
(
null
!=
data
.
getStatus
()
if
(
null
!=
data
.
getStatus
()
&&
null
!=
data
.
getEffect_End_Time
()){
&&
null
!=
data
.
getEffect_End_Time
())
{
return
(
data
.
getStatus
()==
3
||
data
.
getStatus
()==
4
)
return
(
data
.
getStatus
()
==
3
||
data
.
getStatus
()
==
4
)
&&
data
.
getEffect_End_Time
().
getTime
()>
System
.
currentTimeMillis
();
&&
data
.
getEffect_End_Time
().
getTime
()
>
System
.
currentTimeMillis
();
}
}
return
false
;
return
false
;
});
});
JavaRDD
<
Long
>
ecuRDD
=
couponBeanRDD
.
mapPartitionsToPair
(
data
->{
JavaRDD
<
Long
>
ecuRDD
=
couponBeanRDD
.
mapPartitionsToPair
(
data
->
{
List
<
Tuple2
<
String
,
Integer
>>
result
=
new
ArrayList
();
List
<
Tuple2
<
String
,
Integer
>>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
while
(
data
.
hasNext
())
{
TagCouponBean
couponBean
=
data
.
next
();
TagCouponBean
couponBean
=
data
.
next
();
switch
(
couponRequest
.
getDomainType
()){
switch
(
couponRequest
.
getDomainType
())
{
case
ECU_INFO:
case
ECU_INFO:
result
.
add
(
Tuple2
.
apply
(
couponBean
.
getEcu_Id
().
toString
(),
1
));
result
.
add
(
Tuple2
.
apply
(
couponBean
.
getEcu_Id
().
toString
(),
1
));
break
;
break
;
case
ACU_INFO:
case
ACU_INFO:
if
(
null
!=
couponBean
.
getAcu_Id
()&&
couponRequest
.
getCuVals
().
contains
(
couponBean
.
getAcu_Id
())){
if
(
null
!=
couponBean
.
getAcu_Id
()
&&
couponRequest
.
getCuVals
().
contains
(
couponBean
.
getAcu_Id
()))
{
result
.
add
(
Tuple2
.
apply
(
couponBean
.
getAcu_Id
()+
"&"
+
couponBean
.
getEcu_Id
(),
1
));
result
.
add
(
Tuple2
.
apply
(
couponBean
.
getAcu_Id
()
+
"&"
+
couponBean
.
getEcu_Id
(),
1
));
}
}
break
;
break
;
case
SCU_INFO:
case
SCU_INFO:
if
(
null
!=
couponBean
.
getAcu_Id
()&&
couponRequest
.
getCuVals
().
contains
(
couponBean
.
getScu_Id
())){
if
(
null
!=
couponBean
.
getAcu_Id
()
&&
couponRequest
.
getCuVals
().
contains
(
couponBean
.
getScu_Id
()))
{
result
.
add
(
Tuple2
.
apply
(
couponBean
.
getScu_Id
()+
"&"
+
couponBean
.
getEcu_Id
(),
1
));
result
.
add
(
Tuple2
.
apply
(
couponBean
.
getScu_Id
()
+
"&"
+
couponBean
.
getEcu_Id
(),
1
));
}
}
break
;
break
;
case
MCU_INFO:
case
MCU_INFO:
if
(
null
!=
couponBean
.
getMcu_Id
()&&
couponRequest
.
getCuVals
().
contains
(
couponBean
.
getMcu_Id
())){
if
(
null
!=
couponBean
.
getMcu_Id
()
&&
couponRequest
.
getCuVals
().
contains
(
couponBean
.
getMcu_Id
()))
{
result
.
add
(
Tuple2
.
apply
(
couponBean
.
getMcu_Id
()+
"&"
+
couponBean
.
getEcu_Id
(),
1
));
result
.
add
(
Tuple2
.
apply
(
couponBean
.
getMcu_Id
()
+
"&"
+
couponBean
.
getEcu_Id
(),
1
));
}
}
break
;
break
;
default
:
break
;
default
:
}
break
;
}
}
return
result
.
iterator
();
}
}).
reduceByKey
((
x
,
y
)->
x
+
y
)
return
result
.
iterator
();
.
mapPartitions
(
data
->{
}).
reduceByKey
((
x
,
y
)
->
x
+
y
)
Set
<
Long
>
result
=
new
HashSet
();
.
mapPartitions
(
data
->
{
while
(
data
.
hasNext
()){
Set
<
Long
>
result
=
new
HashSet
();
Tuple2
<
String
,
Integer
>
tp2
=
data
.
next
();
while
(
data
.
hasNext
())
{
switch
(
couponRequest
.
getDomainType
()){
Tuple2
<
String
,
Integer
>
tp2
=
data
.
next
();
case
ECU_INFO:
switch
(
couponRequest
.
getDomainType
())
{
if
(
numberComputeHandle
(
couponRequest
,
tp2
.
_2
())){
case
ECU_INFO:
result
.
add
(
Long
.
parseLong
(
tp2
.
_1
()));
if
(
numberComputeHandle
(
couponRequest
,
tp2
.
_2
()))
{
}
result
.
add
(
Long
.
parseLong
(
tp2
.
_1
()));
break
;
}
default
:
break
;
if
(
numberComputeHandle
(
couponRequest
,
tp2
.
_2
())){
default
:
result
.
add
(
Long
.
parseLong
(
tp2
.
_1
().
split
(
"&"
)[
1
]));
if
(
numberComputeHandle
(
couponRequest
,
tp2
.
_2
()))
{
}
result
.
add
(
Long
.
parseLong
(
tp2
.
_1
().
split
(
"&"
)[
1
]));
break
;
}
}
break
;
}
}
return
result
.
iterator
();
}
});
return
result
.
iterator
();
});
return
ecuRDD
;
return
ecuRDD
;
}
}
public
static
boolean
numberComputeHandle
(
TagCouponRequest
couponRequest
,
int
num
){
public
static
boolean
numberComputeHandle
(
TagCouponRequest
couponRequest
,
int
num
)
{
boolean
result
=
false
;
boolean
result
=
false
;
switch
(
couponRequest
.
getNumberType
()){
switch
(
couponRequest
.
getNumberType
())
{
case
eq:
case
eq:
result
=
num
==
couponRequest
.
getEqualNum
();
result
=
num
==
couponRequest
.
getEqualNum
();
break
;
break
;
case
lt:
case
lt:
result
=
num
<
couponRequest
.
getEndNum
();
result
=
num
<
couponRequest
.
getEndNum
();
break
;
break
;
case
lte:
case
lte:
result
=
num
<=
couponRequest
.
getEndNum
();
result
=
num
<=
couponRequest
.
getEndNum
();
break
;
break
;
case
gt:
case
gt:
result
=
num
>
couponRequest
.
getBeginNum
();
result
=
num
>
couponRequest
.
getBeginNum
();
break
;
break
;
case
gte:
case
gte:
result
=
num
>=
couponRequest
.
getBeginNum
();
result
=
num
>=
couponRequest
.
getBeginNum
();
break
;
break
;
case
between:
case
between:
result
=
num
>=
couponRequest
.
getBeginNum
()&&
num
<=
couponRequest
.
getEndNum
();
result
=
num
>=
couponRequest
.
getBeginNum
()
&&
num
<=
couponRequest
.
getEndNum
();
break
;
default
:
break
;
break
;
default
:
break
;
}
}
return
result
;
return
result
;
}
}
...
...
src/main/java/com/gic/spark/filter/TagFirstConsumptionMoneyFilter.java
View file @
742d13ef
...
@@ -7,12 +7,15 @@ import com.gic.spark.entity.request.AbstractFilterRequest;
...
@@ -7,12 +7,15 @@ import com.gic.spark.entity.request.AbstractFilterRequest;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
import
com.gic.spark.util.CommonUtil
;
import
com.gic.spark.util.DateUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.spark.api.java.
*
;
import
org.apache.spark.api.java.
JavaRDD
;
import
org.apache.spark.api.java.Optional
;
import
org.apache.spark.api.java.Optional
;
import
org.apache.spark.sql.Row
;
import
org.apache.spark.sql.Row
;
import
scala.Tuple2
;
import
scala.Tuple2
;
import
java.util.*
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
/**
* @description:
* @description:
...
@@ -24,13 +27,15 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
...
@@ -24,13 +27,15 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
private
static
TagFirstConsumptionMoneyFilter
instance
;
private
static
TagFirstConsumptionMoneyFilter
instance
;
public
static
TagFirstConsumptionMoneyFilter
getInstance
()
{
public
static
TagFirstConsumptionMoneyFilter
getInstance
()
{
if
(
null
==
instance
)
{
if
(
null
==
instance
)
{
instance
=
new
TagFirstConsumptionMoneyFilter
();
instance
=
new
TagFirstConsumptionMoneyFilter
();
}
}
return
instance
;
return
instance
;
}
}
private
TagFirstConsumptionMoneyFilter
(){}
private
TagFirstConsumptionMoneyFilter
()
{
}
@Override
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
...
@@ -41,94 +46,95 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
...
@@ -41,94 +46,95 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
TagConsumeAmountRequest
consumeAmountRequest
=
(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
Row
>
virtualOrderItemRdd
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
).
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
();
JavaRDD
<
Row
>
virtualOrderItemRdd
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
).
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
();
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
.
leftOuterJoin
(
virtualOrderItemRdd
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
))).
groupByKey
())
.
leftOuterJoin
(
virtualOrderItemRdd
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
))).
groupByKey
())
.
map
(
data
->
data
.
_2
());
.
map
(
data
->
data
.
_2
());
consumeRecordRDD
=
statisticsTypeHandle
(
orderRdd
,
consumeAmountRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
orderRdd
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
groupByKey
()
.
groupByKey
()
.
mapPartitions
(
data
->
{
.
mapPartitions
(
data
->
{
List
<
Long
>
result
=
new
ArrayList
();
List
<
Long
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
())
{
while
(
data
.
hasNext
())
{
Tuple2
<
Long
,
Iterable
<
TrdVirtualOrderBean
>>
tp2
=
data
.
next
();
Tuple2
<
Long
,
Iterable
<
TrdVirtualOrderBean
>>
tp2
=
data
.
next
();
double
firstConsumAmount
=
0
;
double
firstConsumAmount
=
0
;
TrdVirtualOrderBean
firstConsumeAmountBean
=
null
;
TrdVirtualOrderBean
firstConsumeAmountBean
=
null
;
Map
<
String
,
TrdVirtualOrderBean
>
effectiveOrderMap
=
new
HashMap
();
Map
<
String
,
TrdVirtualOrderBean
>
effectiveOrderMap
=
new
HashMap
();
List
<
TrdVirtualOrderBean
>
noEffectiveOrderList
=
new
ArrayList
();
List
<
TrdVirtualOrderBean
>
noEffectiveOrderList
=
new
ArrayList
();
tp2
.
_2
().
forEach
(
element
->
{
tp2
.
_2
().
forEach
(
element
->
{
if
(
element
.
getPay_amt
()>
0
)
{
if
(
element
.
getPay_amt
()
>
0
)
{
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
}
else
{
}
else
{
noEffectiveOrderList
.
add
(
element
);
noEffectiveOrderList
.
add
(
element
);
}
}
});
});
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->
{
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->
{
TrdVirtualOrderBean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
TrdVirtualOrderBean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
)))
{
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
)))
{
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()
+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()+
noEffectiveOrder
.
getPay_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()
+
noEffectiveOrder
.
getPay_amt
());
}
}
});
});
for
(
TrdVirtualOrderBean
amountBean:
effectiveOrderMap
.
values
())
{
for
(
TrdVirtualOrderBean
amountBean
:
effectiveOrderMap
.
values
())
{
if
(
null
==
firstConsumeAmountBean
)
{
if
(
null
==
firstConsumeAmountBean
)
{
firstConsumeAmountBean
=
amountBean
;
firstConsumeAmountBean
=
amountBean
;
}
else
{
}
else
{
if
(
DateUtil
.
stringToDate
(
amountBean
.
getReceipts_time
()).
getTime
()
if
(
DateUtil
.
stringToDate
(
amountBean
.
getReceipts_time
()).
getTime
()
<
DateUtil
.
stringToDate
(
firstConsumeAmountBean
.
getReceipts_time
()).
getTime
())
{
<
DateUtil
.
stringToDate
(
firstConsumeAmountBean
.
getReceipts_time
()).
getTime
())
{
firstConsumeAmountBean
=
amountBean
;
firstConsumeAmountBean
=
amountBean
;
}
}
}
}
}
}
firstConsumAmount
=
null
!=
firstConsumeAmountBean
?
firstConsumAmount
=
null
!=
firstConsumeAmountBean
?
(
configStatus
==
1
?
firstConsumeAmountBean
.
getPaid_amt
():
firstConsumeAmountBean
.
getPay_amt
())
(
configStatus
==
1
?
firstConsumeAmountBean
.
getPaid_amt
()
:
firstConsumeAmountBean
.
getPay_amt
())
:
firstConsumAmount
;
:
firstConsumAmount
;
switch
(
consumeAmountRequest
.
getNumberType
()){
switch
(
consumeAmountRequest
.
getNumberType
())
{
case
between:
case
between:
if
(
firstConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()
if
(
firstConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()
&&
firstConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
&&
firstConsumAmount
<=
consumeAmountRequest
.
getEndNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
lt:
case
lt:
if
(
firstConsumAmount
<
consumeAmountRequest
.
getEndNum
()){
if
(
firstConsumAmount
<
consumeAmountRequest
.
getEndNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
gt:
case
gt:
if
(
firstConsumAmount
>
consumeAmountRequest
.
getBeginNum
()){
if
(
firstConsumAmount
>
consumeAmountRequest
.
getBeginNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
eq:
case
eq:
if
(
firstConsumAmount
==
consumeAmountRequest
.
getEqualNum
()){
if
(
firstConsumAmount
==
consumeAmountRequest
.
getEqualNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
lte:
case
lte:
if
(
firstConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
if
(
firstConsumAmount
<=
consumeAmountRequest
.
getEndNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
case
gte:
case
gte:
if
(
firstConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()){
if
(
firstConsumAmount
>=
consumeAmountRequest
.
getBeginNum
())
{
result
.
add
(
tp2
.
_1
());
result
.
add
(
tp2
.
_1
());
}
}
break
;
break
;
default
:
break
;
default
:
}
break
;
}
}
return
result
.
iterator
();
}
});
return
result
.
iterator
();
});
return
ecuRdd
;
return
ecuRdd
;
}
}
}
}
src/main/java/com/gic/spark/filter/TagFirstOnlineConsumptionStoreFilter.java
View file @
742d13ef
...
@@ -20,18 +20,19 @@ import java.util.List;
...
@@ -20,18 +20,19 @@ import java.util.List;
* @author: wangxk
* @author: wangxk
* @date: 2020/8/11
* @date: 2020/8/11
*/
*/
public
class
TagFirstOnlineConsumptionStoreFilter
extends
AbstractTagConsumRecordFilter
{
public
class
TagFirstOnlineConsumptionStoreFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagFirstOnlineConsumptionStoreFilter
instance
;
private
static
TagFirstOnlineConsumptionStoreFilter
instance
;
public
static
TagFirstOnlineConsumptionStoreFilter
getInstance
()
{
public
static
TagFirstOnlineConsumptionStoreFilter
getInstance
()
{
if
(
null
==
instance
)
{
if
(
null
==
instance
)
{
instance
=
new
TagFirstOnlineConsumptionStoreFilter
();
instance
=
new
TagFirstOnlineConsumptionStoreFilter
();
}
}
return
instance
;
return
instance
;
}
}
private
TagFirstOnlineConsumptionStoreFilter
(){}
private
TagFirstOnlineConsumptionStoreFilter
()
{
}
@Override
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
...
@@ -43,30 +44,30 @@ public class TagFirstOnlineConsumptionStoreFilter extends AbstractTagConsumRecor
...
@@ -43,30 +44,30 @@ public class TagFirstOnlineConsumptionStoreFilter extends AbstractTagConsumRecor
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
TagConsumeStoreRequest
storeRequest
=
(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
Row
>
virtualOrderItemRdd
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
).
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
();
JavaRDD
<
Row
>
virtualOrderItemRdd
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
).
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
();
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
.
leftOuterJoin
(
virtualOrderItemRdd
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
))).
groupByKey
())
.
leftOuterJoin
(
virtualOrderItemRdd
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
))).
groupByKey
())
.
map
(
data
->
data
.
_2
());
.
map
(
data
->
data
.
_2
());
consumeRecordRDD
=
statisticsTypeHandle
(
orderRdd
,
storeRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
orderRdd
,
storeRequest
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()
!=
1
&&
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
())
&&
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
())
&&
null
!=
data
.
getShop_id
())
&&
null
!=
data
.
getShop_id
())
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
Tuple2
.
apply
(
data
.
getReceipts_time
(),
data
.
getShop_id
())))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
Tuple2
.
apply
(
data
.
getReceipts_time
(),
data
.
getShop_id
())))
.
reduceByKey
((
x
,
y
)->
{
.
reduceByKey
((
x
,
y
)
->
{
if
(
DateUtil
.
strToDate
(
x
.
_1
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()
if
(
DateUtil
.
strToDate
(
x
.
_1
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()
<
DateUtil
.
strToDate
(
y
.
_1
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
())
{
<
DateUtil
.
strToDate
(
y
.
_1
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
())
{
return
x
;
return
x
;
}
else
{
}
else
{
return
y
;
return
y
;
}
}
}).
filter
(
data
->
storeRequest
.
getStoreList
().
contains
(
String
.
valueOf
(
data
.
_2
().
_2
())))
}).
filter
(
data
->
storeRequest
.
getStoreList
().
contains
(
String
.
valueOf
(
data
.
_2
().
_2
())))
.
map
(
data
->
data
.
_1
());
.
map
(
data
->
data
.
_1
());
return
ecuRdd
;
return
ecuRdd
;
}
}
}
}
src/main/java/com/gic/spark/filter/TagHistoryConsumeCommodityFilter.java
View file @
742d13ef
package
com
.
gic
.
spark
.
filter
;
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TrdVirtualOrderBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderItemBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderItemBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeCommodityRequest
;
import
com.gic.spark.entity.request.TagConsumeCommodityRequest
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.api.java.JavaPairRDD
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.api.java.Optional
;
import
org.apache.spark.api.java.Optional
;
import
org.apache.spark.sql.Dataset
;
import
org.apache.spark.sql.Dataset
;
...
@@ -25,13 +21,13 @@ import java.util.List;
...
@@ -25,13 +21,13 @@ import java.util.List;
* @author: wangxk
* @author: wangxk
* @date: 2020/8/12
* @date: 2020/8/12
*/
*/
public
class
TagHistoryConsumeCommodityFilter
extends
AbstractTagConsumRecordFilter
{
public
class
TagHistoryConsumeCommodityFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagHistoryConsumeCommodityFilter
instance
;
private
static
TagHistoryConsumeCommodityFilter
instance
;
public
static
TagHistoryConsumeCommodityFilter
getInstance
()
{
public
static
TagHistoryConsumeCommodityFilter
getInstance
()
{
if
(
null
==
instance
)
{
if
(
null
==
instance
)
{
instance
=
new
TagHistoryConsumeCommodityFilter
();
instance
=
new
TagHistoryConsumeCommodityFilter
();
}
}
return
instance
;
return
instance
;
}
}
...
@@ -46,27 +42,27 @@ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFil
...
@@ -46,27 +42,27 @@ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFil
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
TagConsumeCommodityRequest
commodityRequest
=
(
TagConsumeCommodityRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
Dataset
<
Row
>
OrderItemDS
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
);
Dataset
<
Row
>
OrderItemDS
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
);
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderAndItemRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderAndItemRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
.
leftOuterJoin
(
OrderItemDS
.
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
()
.
leftOuterJoin
(
OrderItemDS
.
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
()
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
)))
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
)))
.
groupByKey
())
.
groupByKey
())
.
map
(
data
->
data
.
_2
());
.
map
(
data
->
data
.
_2
());
consumeRecordRDD
=
statisticsTypeHandle
(
orderAndItemRdd
,
commodityRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
orderAndItemRdd
,
commodityRequest
);
JavaRDD
<
TrdVirtualOrderItemBean
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
OrderItemDS
,
TrdVirtualOrderItemBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderItemBean
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
OrderItemDS
,
TrdVirtualOrderItemBean
.
class
).
javaRDD
();
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
.
getEcu_id
()))
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
.
getEcu_id
()))
.
leftOuterJoin
(
orderItemRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_order_id
(),
data
.
getSku_code
()))
.
leftOuterJoin
(
orderItemRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_order_id
(),
data
.
getSku_code
()))
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
_2
())
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
_2
())
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
_2
())))
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
_2
())))
.
filter
(
data
->
data
.
_2
().
_2
().
isPresent
())
.
filter
(
data
->
data
.
_2
().
_2
().
isPresent
())
.
map
(
data
->
data
.
_2
().
_1
())
.
map
(
data
->
data
.
_2
().
_1
())
.
distinct
();
.
distinct
();
return
ecuRdd
;
return
ecuRdd
;
}
}
}
}
src/main/java/com/gic/spark/filter/TagHistoryConsumeTotalFilter.java
View file @
742d13ef
...
@@ -19,18 +19,19 @@ import java.util.List;
...
@@ -19,18 +19,19 @@ import java.util.List;
* @author: wangxk
* @author: wangxk
* @date: 2020/8/6
* @date: 2020/8/6
*/
*/
public
class
TagHistoryConsumeTotalFilter
extends
AbstractTagConsumRecordFilter
{
public
class
TagHistoryConsumeTotalFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagHistoryConsumeTotalFilter
instance
;
private
static
TagHistoryConsumeTotalFilter
instance
;
public
static
TagHistoryConsumeTotalFilter
getInstance
()
{
public
static
TagHistoryConsumeTotalFilter
getInstance
()
{
if
(
null
==
instance
)
{
if
(
null
==
instance
)
{
instance
=
new
TagHistoryConsumeTotalFilter
();
instance
=
new
TagHistoryConsumeTotalFilter
();
}
}
return
instance
;
return
instance
;
}
}
private
TagHistoryConsumeTotalFilter
(){}
private
TagHistoryConsumeTotalFilter
()
{
}
@Override
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
...
@@ -42,58 +43,59 @@ public class TagHistoryConsumeTotalFilter extends AbstractTagConsumRecordFilter{
...
@@ -42,58 +43,59 @@ public class TagHistoryConsumeTotalFilter extends AbstractTagConsumRecordFilter{
@Override
@Override
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
public
JavaRDD
<
Long
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
TagConsumeAmountRequest
consumeAmountRequest
=
(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrder
.
getDatasetByEntId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
JavaRDD
<
Row
>
virtualOrderItemRdd
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
).
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
();
JavaRDD
<
Row
>
virtualOrderItemRdd
=
dataSourceHiveOrderItem
.
getDatasetByEntId
(
enterpriseId
).
select
(
"virtual_order_id"
,
"ent_brand_id"
).
javaRDD
();
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
JavaRDD
<
Tuple2
<
TrdVirtualOrderBean
,
Optional
<
Iterable
<
String
>>>>
orderRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
))
.
leftOuterJoin
(
virtualOrderItemRdd
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
))).
groupByKey
())
.
leftOuterJoin
(
virtualOrderItemRdd
.
mapToPair
(
row
->
Tuple2
.
apply
(
row
.
getLong
(
0
),
row
.
getString
(
1
))).
groupByKey
())
.
map
(
data
->
data
.
_2
());
.
map
(
data
->
data
.
_2
());
consumeRecordRDD
=
statisticsTypeHandle
(
orderRdd
,
consumeAmountRequest
);
consumeRecordRDD
=
statisticsTypeHandle
(
orderRdd
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
configStatus
==
1
?
data
.
getPaid_amt
():
data
.
getPay_amt
()))
JavaRDD
<
Long
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
configStatus
==
1
?
data
.
getPaid_amt
()
:
data
.
getPay_amt
()))
.
reduceByKey
((
x
,
y
)->
x
+
y
)
.
reduceByKey
((
x
,
y
)
->
x
+
y
)
.
filter
(
data
->{
.
filter
(
data
->
{
boolean
result
=
false
;
boolean
result
=
false
;
switch
(
consumeAmountRequest
.
getNumberType
()){
switch
(
consumeAmountRequest
.
getNumberType
())
{
case
between:
case
between:
if
(
data
.
_2
()>=
consumeAmountRequest
.
getBeginNum
()
if
(
data
.
_2
()
>=
consumeAmountRequest
.
getBeginNum
()
&&
data
.
_2
()<=
consumeAmountRequest
.
getEndNum
()){
&&
data
.
_2
()
<=
consumeAmountRequest
.
getEndNum
())
{
result
=
true
;
result
=
true
;
}
}
break
;
break
;
case
lt:
case
lt:
if
(
data
.
_2
()<
consumeAmountRequest
.
getEndNum
()){
if
(
data
.
_2
()
<
consumeAmountRequest
.
getEndNum
())
{
result
=
true
;
result
=
true
;
}
}
break
;
break
;
case
gt:
case
gt:
if
(
data
.
_2
()>
consumeAmountRequest
.
getBeginNum
()){
if
(
data
.
_2
()
>
consumeAmountRequest
.
getBeginNum
())
{
result
=
true
;
result
=
true
;
}
}
break
;
break
;
case
eq:
case
eq:
if
(
data
.
_2
()==
consumeAmountRequest
.
getEqualNum
()){
if
(
data
.
_2
()
==
consumeAmountRequest
.
getEqualNum
())
{
result
=
true
;
result
=
true
;
}
}
break
;
break
;
case
lte:
case
lte:
if
(
data
.
_2
()<=
consumeAmountRequest
.
getEndNum
()){
if
(
data
.
_2
()
<=
consumeAmountRequest
.
getEndNum
())
{
result
=
true
;
result
=
true
;
}
}
break
;
break
;
case
gte:
case
gte:
if
(
data
.
_2
()>=
consumeAmountRequest
.
getBeginNum
()){
if
(
data
.
_2
()
>=
consumeAmountRequest
.
getBeginNum
())
{
result
=
true
;
result
=
true
;
}
}
break
;
break
;
default
:
break
;
default
:
}
break
;
return
result
;
}
}).
map
(
data
->
data
.
_1
());
return
result
;
}).
map
(
data
->
data
.
_1
());
return
ecuRdd
;
return
ecuRdd
;
}
}
}
}
src/main/java/com/gic/spark/tag/TagProcessManager.java
View file @
742d13ef
...
@@ -16,13 +16,14 @@ import com.gic.spark.entity.TagGroupInfo;
...
@@ -16,13 +16,14 @@ import com.gic.spark.entity.TagGroupInfo;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.table.TabDataActuallyPaidConfig
;
import
com.gic.spark.entity.table.TabDataActuallyPaidConfig
;
import
com.gic.spark.entity.table.TabSceneCrowd
;
import
com.gic.spark.entity.table.TabSceneCrowd
;
import
com.gic.spark.filter.
*
;
import
com.gic.spark.filter.
BaseTagFilter
;
import
com.gic.spark.util.*
;
import
com.gic.spark.util.*
;
import
com.google.common.base.Joiner
;
import
com.google.common.base.Joiner
;
import
com.google.common.collect.Lists
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.TaskContext
;
import
org.apache.spark.TaskContext
;
import
org.apache.spark.api.java.*
;
import
org.apache.spark.api.java.JavaPairRDD
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.api.java.JavaSparkContext
;
import
org.apache.spark.api.java.Optional
;
import
org.apache.spark.api.java.Optional
;
import
org.apache.spark.sql.Column
;
import
org.apache.spark.sql.Column
;
import
org.apache.spark.sql.Dataset
;
import
org.apache.spark.sql.Dataset
;
...
@@ -46,8 +47,6 @@ import java.sql.SQLException;
...
@@ -46,8 +47,6 @@ import java.sql.SQLException;
import
java.sql.Timestamp
;
import
java.sql.Timestamp
;
import
java.util.*
;
import
java.util.*
;
import
static
com
.
gic
.
spark
.
datasource
.
mysql
.
MysqlDatasource
.
JDBC_OPTIONS
;
/**
/**
* @description:
* @description:
* @author: wangxk
* @author: wangxk
...
@@ -55,7 +54,7 @@ import static com.gic.spark.datasource.mysql.MysqlDatasource.JDBC_OPTIONS;
...
@@ -55,7 +54,7 @@ import static com.gic.spark.datasource.mysql.MysqlDatasource.JDBC_OPTIONS;
*/
*/
public
class
TagProcessManager
{
public
class
TagProcessManager
{
private
List
<
SceneCrowdDTO
>
sceneCrowdDTOList
=
new
ArrayList
();
private
List
<
SceneCrowdDTO
>
sceneCrowdDTOList
=
new
ArrayList
();
private
MysqlRddManager
member4RddManager
;
private
MysqlRddManager
member4RddManager
;
private
MysqlRddManager
enterprise4RddManager
;
private
MysqlRddManager
enterprise4RddManager
;
private
DataSourceSharding
memberSharding4Datasource
;
private
DataSourceSharding
memberSharding4Datasource
;
...
@@ -63,112 +62,110 @@ public class TagProcessManager {
...
@@ -63,112 +62,110 @@ public class TagProcessManager {
private
MysqlDatasource
enterprise4Datasource
=
null
;
private
MysqlDatasource
enterprise4Datasource
=
null
;
private
boolean
isProduction
;
private
boolean
isProduction
;
private
static
TagProcessManager
instance
;
private
static
TagProcessManager
instance
;
public
static
TagProcessManager
getInstance
(){
if
(
null
==
instance
){
public
static
TagProcessManager
getInstance
()
{
instance
=
new
TagProcessManager
();
if
(
null
==
instance
)
{
instance
=
new
TagProcessManager
();
}
}
return
instance
;
return
instance
;
}
}
private
TagProcessManager
(){}
private
TagProcessManager
()
{
}
public
boolean
isProduction
()
{
public
boolean
isProduction
()
{
return
isProduction
;
return
isProduction
;
}
}
public
void
init
(
boolean
isProd
){
public
void
init
(
boolean
isProd
)
{
this
.
isProduction
=
isProd
;
this
.
isProduction
=
isProd
;
if
(
isProduction
)
{
if
(
isProduction
)
{
enterprise4Datasource
=
SparkConfigManager
.
getInstance
().
getDatasource
(
"4.0-enterprise"
);
enterprise4Datasource
=
SparkConfigManager
.
getInstance
().
getDatasource
(
"4.0-enterprise"
);
member4Datasource
=
SparkConfigManager
.
getInstance
().
getDatasourceByIdAndSchema
(
"4.0-prodDs"
,
"gic_member_4_0"
);
member4Datasource
=
SparkConfigManager
.
getInstance
().
getDatasourceByIdAndSchema
(
"4.0-prodDs"
,
"gic_member_4_0"
);
}
else
{
}
else
{
member4Datasource
=
new
MysqlDatasource
();
member4Datasource
=
new
MysqlDatasource
();
member4Datasource
.
setJdbcUrl
(
MysqlDatasource
.
devDatasource
.
getJdbcUrl
().
replaceAll
(
"gic3_test"
,
"gic-member4.0"
));
member4Datasource
.
setJdbcUrl
(
MysqlDatasource
.
devDatasource
.
getJdbcUrl
().
replaceAll
(
"gic3_test"
,
"gic-member4.0"
));
member4Datasource
.
setUser
(
"cdb_outerroot"
);
member4Datasource
.
setUser
(
"cdb_outerroot"
);
member4Datasource
.
setPassword
(
"@09ui%sbc09"
);
member4Datasource
.
setPassword
(
"@09ui%sbc09"
);
enterprise4Datasource
=
new
MysqlDatasource
();
enterprise4Datasource
=
new
MysqlDatasource
();
enterprise4Datasource
.
setJdbcUrl
(
MysqlDatasource
.
devDatasource
.
getJdbcUrl
().
replaceAll
(
"gic3_test"
,
"gic-enterprise4.0"
));
enterprise4Datasource
.
setJdbcUrl
(
MysqlDatasource
.
devDatasource
.
getJdbcUrl
().
replaceAll
(
"gic3_test"
,
"gic-enterprise4.0"
));
enterprise4Datasource
.
setUser
(
"cdb_outerroot"
);
enterprise4Datasource
.
setUser
(
"cdb_outerroot"
);
enterprise4Datasource
.
setPassword
(
"@09ui%sbc09"
);
enterprise4Datasource
.
setPassword
(
"@09ui%sbc09"
);
}
}
member4RddManager
=
member4Datasource
.
buildRddManager
();
member4RddManager
=
member4Datasource
.
buildRddManager
();
enterprise4RddManager
=
enterprise4Datasource
.
buildRddManager
();
enterprise4RddManager
=
enterprise4Datasource
.
buildRddManager
();
memberSharding4Datasource
=
new
DataSourceSharding
(
AppEnvUtil
.
MEMBER_SHARDING_4
,
ConstantUtil
.
TAB_ENTERPRISE_USER
);
memberSharding4Datasource
=
new
DataSourceSharding
(
AppEnvUtil
.
MEMBER_SHARDING_4
,
ConstantUtil
.
TAB_ENTERPRISE_USER
);
List
<
TabSceneCrowd
>
sceneCrowdList
=
member4RddManager
.
getPojo
(
"tab_scene_crowd"
,
TabSceneCrowd
.
class
,
null
)
List
<
TabSceneCrowd
>
sceneCrowdList
=
member4RddManager
.
getPojo
(
"tab_scene_crowd"
,
TabSceneCrowd
.
class
,
null
)
.
filter
(
new
Column
(
"delete_flag"
).
equalTo
(
0
))
.
filter
(
new
Column
(
"delete_flag"
).
equalTo
(
0
))
.
filter
(
new
Column
(
"valid_flag"
).
equalTo
(
1
))
.
filter
(
new
Column
(
"valid_flag"
).
equalTo
(
1
))
.
javaRDD
()
.
javaRDD
()
.
filter
(
data
->{
.
filter
(
data
->
{
if
(
null
!=
data
.
getReal_Time
()){
if
(
null
!=
data
.
getReal_Time
())
{
if
(
data
.
getReal_Time
()==
2
){
if
(
data
.
getReal_Time
()
==
2
)
{
return
true
;
return
true
;
}
else
if
(
data
.
getReal_Time
()==
3
){
}
else
if
(
data
.
getReal_Time
()
==
3
)
{
return
true
;
return
true
;
}
}
}
}
return
false
;
return
false
;
})
})
.
collect
();
.
collect
();
for
(
TabSceneCrowd
sceneCrowd:
sceneCrowdList
){
for
(
TabSceneCrowd
sceneCrowd
:
sceneCrowdList
)
{
LinkedList
<
TagConditionGroupDTO
>
conditionGroupDTOList
=
JSONObject
.
parseObject
(
sceneCrowd
.
getTag_Condition_Group_Info
(),
new
TypeReference
<
LinkedList
<
TagConditionGroupDTO
>>(){});
LinkedList
<
TagConditionGroupDTO
>
conditionGroupDTOList
=
JSONObject
.
parseObject
(
sceneCrowd
.
getTag_Condition_Group_Info
(),
new
TypeReference
<
LinkedList
<
TagConditionGroupDTO
>>()
{
sceneCrowdDTOList
.
add
(
new
SceneCrowdDTO
(
sceneCrowd
,
conditionGroupDTOList
));
});
sceneCrowdDTOList
.
add
(
new
SceneCrowdDTO
(
sceneCrowd
,
conditionGroupDTOList
));
}
}
List
<
TabDataActuallyPaidConfig
>
dataActuallyPaidConfigList
=
enterprise4RddManager
.
getPojo
(
"tab_data_actually_paid_config"
,
TabDataActuallyPaidConfig
.
class
,
null
)
List
<
TabDataActuallyPaidConfig
>
dataActuallyPaidConfigList
=
enterprise4RddManager
.
getPojo
(
"tab_data_actually_paid_config"
,
TabDataActuallyPaidConfig
.
class
,
null
)
.
filter
(
new
Column
(
"status"
).
equalTo
(
1
))
.
filter
(
new
Column
(
"status"
).
equalTo
(
1
))
.
collectAsList
();
.
collectAsList
();
dataActuallyPaidConfigList
.
forEach
(
data
->
CommonUtil
.
dataActuallyPaidConfigMap
.
put
(
data
.
getEnterprise_Id
(),
data
));
dataActuallyPaidConfigList
.
forEach
(
data
->
CommonUtil
.
dataActuallyPaidConfigMap
.
put
(
data
.
getEnterprise_Id
(),
data
));
}
}
public
void
setEnterpriseId
(
List
<
Integer
>
enterpriseIdList
){
for
(
int
i
=
sceneCrowdDTOList
.
size
()-
1
;
i
>=
0
;
i
--){
public
void
setEnterpriseId
(
Set
<
Integer
>
enterpriseIdList
)
{
if
(!
enterpriseIdList
.
contains
(
sceneCrowdDTOList
.
get
(
i
).
getEnterprise_Id
())){
sceneCrowdDTOList
.
removeIf
(
sceneCrowdDTO
->
!
enterpriseIdList
.
contains
(
sceneCrowdDTO
.
getEnterprise_Id
()));
sceneCrowdDTOList
.
remove
(
i
);
}
}
}
}
public
void
setTagGroupId
(
List
<
Long
>
tagGroupIdList
){
for
(
int
i
=
sceneCrowdDTOList
.
size
()-
1
;
i
>=
0
;
i
--){
public
void
setTagGroupId
(
Set
<
Long
>
tagGroupIdList
)
{
if
(!
tagGroupIdList
.
contains
(
sceneCrowdDTOList
.
get
(
i
).
getId
())){
sceneCrowdDTOList
.
removeIf
(
sceneCrowdDTO
->
!
tagGroupIdList
.
contains
(
sceneCrowdDTO
.
getId
()));
sceneCrowdDTOList
.
remove
(
i
);
}
}
}
}
public
void
process
(
boolean
extractData
){
public
void
process
(
boolean
extractData
)
{
Map
<
Integer
,
List
<
TagProcessEntity
>>
tagGroupByEnterpriseMap
=
new
HashMap
<>();
Map
<
Integer
,
List
<
TagProcessEntity
>>
tagGroupByEnterpriseMap
=
new
HashMap
<>();
Map
<
Long
,
BaseTagFilter
>
tagIdToFilterMap
=
new
HashMap
();
Map
<
Long
,
BaseTagFilter
>
tagIdToFilterMap
=
new
HashMap
();
for
(
SceneCrowdDTO
sceneCrowdDTO:
sceneCrowdDTOList
){
for
(
SceneCrowdDTO
sceneCrowdDTO
:
sceneCrowdDTOList
)
{
LinkedList
<
TagConditionGroupDTO
>
conditionGroupDTOS
=
sceneCrowdDTO
.
getConditionGroupDTOList
();
LinkedList
<
TagConditionGroupDTO
>
conditionGroupDTOS
=
sceneCrowdDTO
.
getConditionGroupDTOList
();
for
(
int
i
=
0
;
i
<
conditionGroupDTOS
.
size
();
i
++){
for
(
int
i
=
0
;
i
<
conditionGroupDTOS
.
size
();
i
++)
{
TagProcessEntity
entity
=
new
TagProcessEntity
();
TagProcessEntity
entity
=
new
TagProcessEntity
();
entity
.
enterpriseId
=
sceneCrowdDTO
.
getEnterprise_Id
();
entity
.
enterpriseId
=
sceneCrowdDTO
.
getEnterprise_Id
();
entity
.
tagGroupId
=
sceneCrowdDTO
.
getId
();
entity
.
tagGroupId
=
sceneCrowdDTO
.
getId
();
entity
.
realTime
=
sceneCrowdDTO
.
getReal_Time
();
entity
.
realTime
=
sceneCrowdDTO
.
getReal_Time
();
entity
.
level
=
i
+
1
;
entity
.
level
=
i
+
1
;
entity
.
tagList
=
conditionGroupDTOS
.
get
(
i
).
getConditionInfos
();
entity
.
tagList
=
conditionGroupDTOS
.
get
(
i
).
getConditionInfos
();
for
(
TagConditionDTO
conditionDTO:
entity
.
tagList
){
//将tag同filter进行映射
for
(
TagConditionDTO
conditionDTO
:
entity
.
tagList
)
{
//将tag同filter进行映射
if
(
conditionDTO
.
getRealTime
()==
0
){
if
(
conditionDTO
.
getRealTime
()
==
0
)
{
BaseTagFilter
tagFilter
=
TagFilterFactory
.
getInstance
().
getTagFilter
(
conditionDTO
);
BaseTagFilter
tagFilter
=
TagFilterFactory
.
getInstance
().
getTagFilter
(
conditionDTO
);
if
(
null
!=
tagFilter
){
if
(
null
!=
tagFilter
)
{
tagIdToFilterMap
.
put
(
conditionDTO
.
getTagId
(),
tagFilter
);
tagIdToFilterMap
.
put
(
conditionDTO
.
getTagId
(),
tagFilter
);
}
}
}
}
}
}
if
(!
tagGroupByEnterpriseMap
.
containsKey
(
sceneCrowdDTO
.
getEnterprise_Id
())){
if
(!
tagGroupByEnterpriseMap
.
containsKey
(
sceneCrowdDTO
.
getEnterprise_Id
()))
{
tagGroupByEnterpriseMap
.
put
(
sceneCrowdDTO
.
getEnterprise_Id
(),
new
ArrayList
());
tagGroupByEnterpriseMap
.
put
(
sceneCrowdDTO
.
getEnterprise_Id
(),
new
ArrayList
());
}
tagGroupByEnterpriseMap
.
get
(
sceneCrowdDTO
.
getEnterprise_Id
()).
add
(
entity
);
}
}
tagGroupByEnterpriseMap
.
get
(
sceneCrowdDTO
.
getEnterprise_Id
()).
add
(
entity
);
}
}
}
...
@@ -186,7 +183,7 @@ public class TagProcessManager {
...
@@ -186,7 +183,7 @@ public class TagProcessManager {
}
}
}
}
}
}
DataSourceManager
.
getInstance
().
addSourceEntity
(
memberSharding4Datasource
,
enterpriseTagEntry
.
getKey
().
intValue
());
DataSourceManager
.
getInstance
().
addSourceEntity
(
memberSharding4Datasource
,
enterpriseTagEntry
.
getKey
().
intValue
());
}
}
if
(
extractData
)
{
if
(
extractData
)
{
...
@@ -197,59 +194,59 @@ public class TagProcessManager {
...
@@ -197,59 +194,59 @@ public class TagProcessManager {
//处理标签数据
//处理标签数据
JavaSparkContext
jsc
=
SparkEnvManager
.
getInstance
().
getJsc
();
JavaSparkContext
jsc
=
SparkEnvManager
.
getInstance
().
getJsc
();
List
<
Long
>
sceneCrowdIdList
=
new
ArrayList
();
List
<
Long
>
sceneCrowdIdList
=
new
ArrayList
();
for
(
Map
.
Entry
<
Integer
,
List
<
TagProcessEntity
>>
enterpriseTagEntry
:
tagGroupByEnterpriseMap
.
entrySet
())
{
for
(
Map
.
Entry
<
Integer
,
List
<
TagProcessEntity
>>
enterpriseTagEntry
:
tagGroupByEnterpriseMap
.
entrySet
())
{
Integer
enterpriseId
=
enterpriseTagEntry
.
getKey
();
Integer
enterpriseId
=
enterpriseTagEntry
.
getKey
();
String
indexName
=
EsRequestUtil
.
getESIindexName
(
enterpriseId
,
this
.
isProduction
());
String
indexName
=
EsRequestUtil
.
getESIindexName
(
enterpriseId
,
this
.
isProduction
());
JavaPairRDD
<
Long
,
String
>
memberGroupRdd
=
null
;
JavaPairRDD
<
Long
,
String
>
memberGroupRdd
=
null
;
for
(
TagProcessEntity
entity:
enterpriseTagEntry
.
getValue
())
{
for
(
TagProcessEntity
entity
:
enterpriseTagEntry
.
getValue
())
{
for
(
TagConditionDTO
conditionDTO:
entity
.
tagList
)
{
for
(
TagConditionDTO
conditionDTO
:
entity
.
tagList
)
{
if
(
tagIdToFilterMap
.
containsKey
(
conditionDTO
.
getTagId
()))
{
if
(
tagIdToFilterMap
.
containsKey
(
conditionDTO
.
getTagId
()))
{
BaseTagFilter
tagFilter
=
tagIdToFilterMap
.
get
(
conditionDTO
.
getTagId
());
BaseTagFilter
tagFilter
=
tagIdToFilterMap
.
get
(
conditionDTO
.
getTagId
());
AbstractFilterRequest
filterRequest
=
TagValueParser
.
parseFilterValue
(
conditionDTO
,
enterpriseId
);
AbstractFilterRequest
filterRequest
=
TagValueParser
.
parseFilterValue
(
conditionDTO
,
enterpriseId
);
final
String
groupId
=
entity
.
tagGroupId
+
"_"
+
conditionDTO
.
getTagId
()
+
"_"
+
entity
.
level
;
final
String
groupId
=
entity
.
tagGroupId
+
"_"
+
conditionDTO
.
getTagId
()
+
"_"
+
entity
.
level
;
JavaPairRDD
<
Long
,
String
>
filterRdd
=
tagFilter
.
filterValidMember
(
enterpriseId
,
filterRequest
).
mapToPair
(
data
->
Tuple2
.
apply
(
data
,
groupId
));
JavaPairRDD
<
Long
,
String
>
filterRdd
=
tagFilter
.
filterValidMember
(
enterpriseId
,
filterRequest
).
mapToPair
(
data
->
Tuple2
.
apply
(
data
,
groupId
));
System
.
out
.
println
(
"filterRdd==>"
+
filterRdd
.
count
());
System
.
out
.
println
(
"filterRdd==>"
+
filterRdd
.
count
());
if
(
null
==
memberGroupRdd
)
{
if
(
null
==
memberGroupRdd
)
{
memberGroupRdd
=
filterRdd
;
memberGroupRdd
=
filterRdd
;
}
else
{
}
else
{
memberGroupRdd
=
memberGroupRdd
.
union
(
filterRdd
);
memberGroupRdd
=
memberGroupRdd
.
union
(
filterRdd
);
}
}
sceneCrowdIdList
.
add
(
entity
.
tagGroupId
);
sceneCrowdIdList
.
add
(
entity
.
tagGroupId
);
}
}
}
}
}
}
if
(
null
!=
memberGroupRdd
)
{
if
(
null
!=
memberGroupRdd
)
{
JavaPairRDD
<
Long
,
Long
>
userRdd
=
memberSharding4Datasource
.
getDatasetByEnterpriseId
(
enterpriseId
).
select
(
"id"
).
javaRDD
()
JavaPairRDD
<
Long
,
Long
>
userRdd
=
memberSharding4Datasource
.
getDatasetByEnterpriseId
(
enterpriseId
).
select
(
"id"
).
javaRDD
()
.
mapToPair
(
data
->
Tuple2
.
apply
((
Long
)
data
.
getAs
(
"id"
),(
Long
)
data
.
getAs
(
"id"
)))
.
mapToPair
(
data
->
Tuple2
.
apply
((
Long
)
data
.
getAs
(
"id"
),
(
Long
)
data
.
getAs
(
"id"
)))
.
reduceByKey
((
x
,
y
)->
x
);
.
reduceByKey
((
x
,
y
)
->
x
);
JavaPairRDD
<
Long
,
String
>
updateMemberGroupRdd
=
userRdd
.
leftOuterJoin
(
memberGroupRdd
.
reduceByKey
((
x
,
y
)->
x
+
" "
+
y
))
JavaPairRDD
<
Long
,
String
>
updateMemberGroupRdd
=
userRdd
.
leftOuterJoin
(
memberGroupRdd
.
reduceByKey
((
x
,
y
)
->
x
+
" "
+
y
))
.
mapPartitionsToPair
(
data
->
{
.
mapPartitionsToPair
(
data
->
{
List
<
Tuple2
<
Long
,
String
>>
result
=
new
ArrayList
();
List
<
Tuple2
<
Long
,
String
>>
result
=
new
ArrayList
();
while
(
data
.
hasNext
())
{
while
(
data
.
hasNext
())
{
Tuple2
<
Long
,
Optional
<
String
>>
tp2
=
data
.
next
().
_2
();
Tuple2
<
Long
,
Optional
<
String
>>
tp2
=
data
.
next
().
_2
();
Long
id
=
tp2
.
_1
();
Long
id
=
tp2
.
_1
();
JSONObject
json
=
new
JSONObject
();
JSONObject
json
=
new
JSONObject
();
json
.
put
(
"id"
,
id
);
json
.
put
(
"id"
,
id
);
json
.
put
(
"sceneTags_b"
,
tp2
.
_2
().
isPresent
()?
tp2
.
_2
().
get
():
""
);
json
.
put
(
"sceneTags_b"
,
tp2
.
_2
().
isPresent
()
?
tp2
.
_2
().
get
()
:
""
);
result
.
add
(
Tuple2
.
apply
(
id
,
json
.
toString
()));
result
.
add
(
Tuple2
.
apply
(
id
,
json
.
toString
()));
}
}
return
result
.
iterator
();
return
result
.
iterator
();
});
});
updateMemberGroupRdd
.
foreach
(
data
->
System
.
out
.
println
(
"id==>"
+
data
.
_1
()+
"sceneTags_b==>"
+
data
.
_2
()));
updateMemberGroupRdd
.
foreach
(
data
->
System
.
out
.
println
(
"id==>"
+
data
.
_1
()
+
"sceneTags_b==>"
+
data
.
_2
()));
System
.
out
.
println
(
"updateMemberGroupRdd==>"
+
updateMemberGroupRdd
.
count
());
System
.
out
.
println
(
"updateMemberGroupRdd==>"
+
updateMemberGroupRdd
.
count
());
JavaPairRDD
<
Long
,
String
>
cacheMemberGroupRdd
=
updateMemberGroupRdd
.
cache
();
JavaPairRDD
<
Long
,
String
>
cacheMemberGroupRdd
=
updateMemberGroupRdd
.
cache
();
updateIndex
(
cacheMemberGroupRdd
,
indexName
);
updateIndex
(
cacheMemberGroupRdd
,
indexName
);
saveToHive
(
cacheMemberGroupRdd
,
enterpriseId
);
saveToHive
(
cacheMemberGroupRdd
,
enterpriseId
);
cacheMemberGroupRdd
.
unpersist
();
cacheMemberGroupRdd
.
unpersist
();
...
@@ -258,14 +255,14 @@ public class TagProcessManager {
...
@@ -258,14 +255,14 @@ public class TagProcessManager {
//处理混合标签
//处理混合标签
JavaPairRDD
<
Long
,
String
>
searchRDD
=
null
;
JavaPairRDD
<
Long
,
String
>
searchRDD
=
null
;
for
(
TagProcessEntity
mixEntity:
enterpriseTagEntry
.
getValue
())
{
for
(
TagProcessEntity
mixEntity
:
enterpriseTagEntry
.
getValue
())
{
if
(
mixEntity
.
realTime
==
3
)
{
if
(
mixEntity
.
realTime
==
3
)
{
Long
tagGroupId
=
mixEntity
.
tagGroupId
;
Long
tagGroupId
=
mixEntity
.
tagGroupId
;
String
query
=
EsRequestUtil
.
getIndexParam
(
enterpriseId
,
tagGroupId
,
this
.
isProduction
);
String
query
=
EsRequestUtil
.
getIndexParam
(
enterpriseId
,
tagGroupId
,
this
.
isProduction
);
if
(
StringUtils
.
isNotEmpty
(
query
))
{
if
(
StringUtils
.
isNotEmpty
(
query
))
{
Map
<
String
,
String
>
conf
=
new
HashMap
();
Map
<
String
,
String
>
conf
=
new
HashMap
();
conf
.
put
(
"es.nodes"
,
AppEnvUtil
.
ES_NODES
);
conf
.
put
(
"es.nodes"
,
AppEnvUtil
.
ES_NODES
);
conf
.
put
(
"es.resource"
,
indexName
+
"/mapper_type"
);
conf
.
put
(
"es.resource"
,
indexName
+
"/mapper_type"
);
...
@@ -274,25 +271,24 @@ public class TagProcessManager {
...
@@ -274,25 +271,24 @@ public class TagProcessManager {
JavaPairRDD
<
Long
,
String
>
esRdd
=
JavaEsSpark
.
esRDD
(
jsc
,
conf
)
JavaPairRDD
<
Long
,
String
>
esRdd
=
JavaEsSpark
.
esRDD
(
jsc
,
conf
)
.
mapToPair
(
data
->
{
.
mapToPair
(
data
->
{
String
sceneTagsB
=
tagGroupId
.
toString
();
String
sceneTagsB
=
tagGroupId
.
toString
();
if
(
null
!=
data
.
_2
().
get
(
"sceneTags_b"
))
{
if
(
null
!=
data
.
_2
().
get
(
"sceneTags_b"
))
{
sceneTagsB
=
sceneTagsB
+
" "
+
data
.
_2
().
get
(
"sceneTags_b"
);
sceneTagsB
=
sceneTagsB
+
" "
+
data
.
_2
().
get
(
"sceneTags_b"
);
}
}
return
Tuple2
.
apply
((
Long
)
data
.
_2
.
get
(
"id"
),
sceneTagsB
);
return
Tuple2
.
apply
((
Long
)
data
.
_2
.
get
(
"id"
),
sceneTagsB
);
});
});
if
(
null
==
searchRDD
)
{
if
(
null
==
searchRDD
){
searchRDD
=
esRdd
;
searchRDD
=
esRdd
;
}
else
{
}
else
{
searchRDD
=
searchRDD
.
union
(
esRdd
);
searchRDD
=
searchRDD
.
union
(
esRdd
);
}
}
}
}
}
}
}
}
if
(
null
!=
searchRDD
)
{
if
(
null
!=
searchRDD
)
{
JavaPairRDD
<
Long
,
String
>
groupRDD
=
searchRDD
.
repartition
(
100
).
reduceByKey
((
x
,
y
)->
x
+
" "
+
y
)
JavaPairRDD
<
Long
,
String
>
groupRDD
=
searchRDD
.
repartition
(
100
).
reduceByKey
((
x
,
y
)
->
x
+
" "
+
y
)
.
mapPartitionsToPair
(
data
->
{
.
mapPartitionsToPair
(
data
->
{
List
<
Tuple2
<
Long
,
String
>>
list
=
new
ArrayList
();
List
<
Tuple2
<
Long
,
String
>>
list
=
new
ArrayList
();
while
(
data
.
hasNext
())
{
while
(
data
.
hasNext
())
{
...
@@ -311,30 +307,30 @@ public class TagProcessManager {
...
@@ -311,30 +307,30 @@ public class TagProcessManager {
return
list
.
iterator
();
return
list
.
iterator
();
});
});
updateIndex
(
groupRDD
,
indexName
);
updateIndex
(
groupRDD
,
indexName
);
}
}
}
}
}
}
private
void
saveToHive
(
JavaPairRDD
<
Long
,
String
>
updateMemberGroup
,
Integer
enterpriseId
)
{
private
void
saveToHive
(
JavaPairRDD
<
Long
,
String
>
updateMemberGroup
,
Integer
enterpriseId
)
{
JavaRDD
<
TagGroupInfo
>
tagGroupInfoRDD
=
updateMemberGroup
.
mapPartitions
(
data
->
{
JavaRDD
<
TagGroupInfo
>
tagGroupInfoRDD
=
updateMemberGroup
.
mapPartitions
(
data
->
{
List
<
TagGroupInfo
>
tagGroupInfoList
=
new
ArrayList
();
List
<
TagGroupInfo
>
tagGroupInfoList
=
new
ArrayList
();
while
(
data
.
hasNext
())
{
while
(
data
.
hasNext
())
{
Tuple2
<
Long
,
String
>
tp2
=
data
.
next
();
Tuple2
<
Long
,
String
>
tp2
=
data
.
next
();
JSONObject
json
=
JSONObject
.
parseObject
(
tp2
.
_2
());
JSONObject
json
=
JSONObject
.
parseObject
(
tp2
.
_2
());
if
(
StringUtils
.
isNotEmpty
(
json
.
getString
(
"sceneTags_b"
)))
{
if
(
StringUtils
.
isNotEmpty
(
json
.
getString
(
"sceneTags_b"
)))
{
tagGroupInfoList
.
add
(
new
TagGroupInfo
(
json
.
getLong
(
"id"
),
enterpriseId
,
json
.
getString
(
"sceneTags_b"
)));
tagGroupInfoList
.
add
(
new
TagGroupInfo
(
json
.
getLong
(
"id"
),
enterpriseId
,
json
.
getString
(
"sceneTags_b"
)));
}
}
}
}
return
tagGroupInfoList
.
iterator
();
return
tagGroupInfoList
.
iterator
();
});
});
System
.
out
.
println
(
"tagGroupInfoRDD.count==>"
+
tagGroupInfoRDD
.
count
());
System
.
out
.
println
(
"tagGroupInfoRDD.count==>"
+
tagGroupInfoRDD
.
count
());
SparkSession
sparkSession
=
SparkEnvManager
.
getInstance
().
getSparkSession
();
SparkSession
sparkSession
=
SparkEnvManager
.
getInstance
().
getSparkSession
();
Dataset
<
Row
>
tagGroupInfoDataset
=
sparkSession
.
createDataFrame
(
tagGroupInfoRDD
,
TagGroupInfo
.
class
);
Dataset
<
Row
>
tagGroupInfoDataset
=
sparkSession
.
createDataFrame
(
tagGroupInfoRDD
,
TagGroupInfo
.
class
);
tagGroupInfoDataset
.
createOrReplaceTempView
(
"tag_group_info_tmp"
);
tagGroupInfoDataset
.
createOrReplaceTempView
(
"tag_group_info_tmp"
);
sparkSession
.
sql
(
String
.
format
(
"INSERT overwrite table %s.tag_group_info PARTITION(enterprise_id = '%s') SELECT id,tag_group_info from tag_group_info_tmp"
,
isProduction
?
"tag4_prod"
:
"tag4_test"
,
enterpriseId
));
sparkSession
.
sql
(
String
.
format
(
"INSERT overwrite table %s.tag_group_info PARTITION(enterprise_id = '%s') SELECT id,tag_group_info from tag_group_info_tmp"
,
isProduction
?
"tag4_prod"
:
"tag4_test"
,
enterpriseId
));
}
}
...
@@ -359,36 +355,36 @@ public class TagProcessManager {
...
@@ -359,36 +355,36 @@ public class TagProcessManager {
ESShardPartitioner
partitioner
=
new
ESShardPartitioner
(
settings
);
ESShardPartitioner
partitioner
=
new
ESShardPartitioner
(
settings
);
updateMemberGroup
.
partitionBy
(
partitioner
).
map
(
data
->
data
.
_2
()).
foreachPartition
(
data
->
{
updateMemberGroup
.
partitionBy
(
partitioner
).
map
(
data
->
data
.
_2
()).
foreachPartition
(
data
->
{
Settings
newSettings
=
new
PropertiesSettings
().
load
(
settings
);
Settings
newSettings
=
new
PropertiesSettings
().
load
(
settings
);
EsRDDWriter
<
String
>
writer
=
EsRddFactory
.
createJsonWriter
(
newSettings
.
save
(),
false
);
EsRDDWriter
<
String
>
writer
=
EsRddFactory
.
createJsonWriter
(
newSettings
.
save
(),
false
);
writer
.
write
(
TaskContext
.
get
(),
JavaConversions
.
asScalaIterator
(
data
));
writer
.
write
(
TaskContext
.
get
(),
JavaConversions
.
asScalaIterator
(
data
));
});
});
}
}
private
void
updateSceneCrowd
(
List
<
Long
>
sceneCrowdIdList
){
private
void
updateSceneCrowd
(
List
<
Long
>
sceneCrowdIdList
)
{
Connection
connection
=
member4Datasource
.
getConnection
();
Connection
connection
=
member4Datasource
.
getConnection
();
PreparedStatement
ps
=
null
;
PreparedStatement
ps
=
null
;
try
{
try
{
ps
=
DaoHelper
.
getPreparedStatement
(
connection
,
"update tab_scene_crowd set update_group_time=? where id=?"
);
ps
=
DaoHelper
.
getPreparedStatement
(
connection
,
"update tab_scene_crowd set update_group_time=? where id=?"
);
for
(
Long
id:
sceneCrowdIdList
)
{
for
(
Long
id
:
sceneCrowdIdList
)
{
ps
.
setTimestamp
(
1
,
new
Timestamp
(
System
.
currentTimeMillis
()));
ps
.
setTimestamp
(
1
,
new
Timestamp
(
System
.
currentTimeMillis
()));
ps
.
setLong
(
2
,
id
);
ps
.
setLong
(
2
,
id
);
ps
.
addBatch
();
ps
.
addBatch
();
}
}
ps
.
executeBatch
();
ps
.
executeBatch
();
}
catch
(
SQLException
e
)
{
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
}
finally
{
}
finally
{
if
(
null
!=
ps
)
{
if
(
null
!=
ps
)
{
try
{
try
{
ps
.
close
();
ps
.
close
();
}
catch
(
SQLException
e
)
{
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
}
}
}
}
if
(
null
!=
connection
)
{
if
(
null
!=
connection
)
{
try
{
try
{
connection
.
close
();
connection
.
close
();
}
catch
(
SQLException
e
)
{
}
catch
(
SQLException
e
)
{
...
@@ -399,16 +395,16 @@ public class TagProcessManager {
...
@@ -399,16 +395,16 @@ public class TagProcessManager {
}
}
private
void
mixTagProcess
(
Map
<
Integer
,
List
<
TagProcessEntity
>>
tagGroupByEnterpriseMap
){
private
void
mixTagProcess
(
Map
<
Integer
,
List
<
TagProcessEntity
>>
tagGroupByEnterpriseMap
)
{
//处理混合标签
//处理混合标签
JavaSparkContext
jsc
=
SparkEnvManager
.
getInstance
().
getJsc
();
JavaSparkContext
jsc
=
SparkEnvManager
.
getInstance
().
getJsc
();
for
(
Map
.
Entry
<
Integer
,
List
<
TagProcessEntity
>>
enterpriseMixTagEntry
:
tagGroupByEnterpriseMap
.
entrySet
())
{
for
(
Map
.
Entry
<
Integer
,
List
<
TagProcessEntity
>>
enterpriseMixTagEntry
:
tagGroupByEnterpriseMap
.
entrySet
())
{
Integer
enterpriseId
=
enterpriseMixTagEntry
.
getKey
();
Integer
enterpriseId
=
enterpriseMixTagEntry
.
getKey
();
String
indexName
=
EsRequestUtil
.
getESIindexName
(
enterpriseId
,
this
.
isProduction
());
String
indexName
=
EsRequestUtil
.
getESIindexName
(
enterpriseId
,
this
.
isProduction
());
JavaPairRDD
<
Long
,
String
>
searchRDD
=
null
;
JavaPairRDD
<
Long
,
String
>
searchRDD
=
null
;
for
(
TagProcessEntity
mixEntity:
enterpriseMixTagEntry
.
getValue
())
{
for
(
TagProcessEntity
mixEntity
:
enterpriseMixTagEntry
.
getValue
())
{
Long
tagGroupId
=
mixEntity
.
tagGroupId
;
Long
tagGroupId
=
mixEntity
.
tagGroupId
;
Map
<
String
,
String
>
conf
=
new
HashMap
();
Map
<
String
,
String
>
conf
=
new
HashMap
();
conf
.
put
(
"es.nodes"
,
AppEnvUtil
.
ES_NODES
);
conf
.
put
(
"es.nodes"
,
AppEnvUtil
.
ES_NODES
);
...
@@ -418,23 +414,23 @@ public class TagProcessManager {
...
@@ -418,23 +414,23 @@ public class TagProcessManager {
JavaPairRDD
<
Long
,
String
>
esRdd
=
JavaEsSpark
.
esRDD
(
jsc
,
conf
)
JavaPairRDD
<
Long
,
String
>
esRdd
=
JavaEsSpark
.
esRDD
(
jsc
,
conf
)
.
mapToPair
(
data
->
{
.
mapToPair
(
data
->
{
String
sceneTagsB
=
tagGroupId
.
toString
();
String
sceneTagsB
=
tagGroupId
.
toString
();
if
(
null
!=
data
.
_2
().
get
(
"sceneTags_b"
))
{
if
(
null
!=
data
.
_2
().
get
(
"sceneTags_b"
))
{
sceneTagsB
=
sceneTagsB
+
" "
+
data
.
_2
().
get
(
"sceneTags_b"
);
sceneTagsB
=
sceneTagsB
+
" "
+
data
.
_2
().
get
(
"sceneTags_b"
);
}
}
return
Tuple2
.
apply
((
Long
)
data
.
_2
.
get
(
"id"
),
sceneTagsB
);
return
Tuple2
.
apply
((
Long
)
data
.
_2
.
get
(
"id"
),
sceneTagsB
);
});
});
if
(
null
==
searchRDD
)
{
if
(
null
==
searchRDD
)
{
searchRDD
=
esRdd
;
searchRDD
=
esRdd
;
}
else
{
}
else
{
searchRDD
=
searchRDD
.
union
(
esRdd
);
searchRDD
=
searchRDD
.
union
(
esRdd
);
}
}
}
}
if
(
null
!=
searchRDD
)
{
if
(
null
!=
searchRDD
)
{
JavaPairRDD
<
Long
,
String
>
groupRDD
=
searchRDD
.
repartition
(
100
).
reduceByKey
((
x
,
y
)->
x
+
" "
+
y
)
JavaPairRDD
<
Long
,
String
>
groupRDD
=
searchRDD
.
repartition
(
100
).
reduceByKey
((
x
,
y
)
->
x
+
" "
+
y
)
.
mapPartitionsToPair
(
data
->
{
.
mapPartitionsToPair
(
data
->
{
List
<
Tuple2
<
Long
,
String
>>
list
=
new
ArrayList
();
List
<
Tuple2
<
Long
,
String
>>
list
=
new
ArrayList
();
while
(
data
.
hasNext
())
{
while
(
data
.
hasNext
())
{
...
@@ -453,7 +449,7 @@ public class TagProcessManager {
...
@@ -453,7 +449,7 @@ public class TagProcessManager {
return
list
.
iterator
();
return
list
.
iterator
();
});
});
updateIndex
(
groupRDD
,
indexName
);
updateIndex
(
groupRDD
,
indexName
);
}
}
}
}
}
}
...
...
src/main/java/com/gic/spark/util/EsRequestUtil.java
View file @
742d13ef
package
com
.
gic
.
spark
.
util
;
package
com
.
gic
.
spark
.
util
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.JSONObject
;
import
org.apache.http.HttpResponse
;
import
org.apache.http.HttpResponse
;
import
org.apache.http.client.ClientProtocolException
;
import
org.apache.http.client.ClientProtocolException
;
...
@@ -7,7 +8,10 @@ import org.apache.http.client.methods.HttpGet;
...
@@ -7,7 +8,10 @@ import org.apache.http.client.methods.HttpGet;
import
org.apache.http.client.methods.HttpPost
;
import
org.apache.http.client.methods.HttpPost
;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.Map
;
import
java.util.Map
;
import
java.util.Set
;
/**
/**
* Created by paste on 2018/7/21 16:29
* Created by paste on 2018/7/21 16:29
...
@@ -16,26 +20,26 @@ import java.util.Map;
...
@@ -16,26 +20,26 @@ import java.util.Map;
*/
*/
public
class
EsRequestUtil
{
public
class
EsRequestUtil
{
public
static
String
getESIindexName
(
Integer
enterpriseId
,
boolean
isProduction
)
{
public
static
String
getESIindexName
(
Integer
enterpriseId
,
boolean
isProduction
)
{
String
url
=
isProduction
?
"https://ideal.demogic.com/member-config/member/index-version/"
+
enterpriseId
String
url
=
isProduction
?
"https://ideal.demogic.com/member-config/member/index-version/"
+
enterpriseId
:
"https://four.gicdev.com/member-config/member/index-version/"
+
enterpriseId
;
:
"https://four.gicdev.com/member-config/member/index-version/"
+
enterpriseId
;
HttpResponse
response
=
getHttpResponseByGet
(
url
,
enterpriseId
);
HttpResponse
response
=
getHttpResponseByGet
(
url
,
enterpriseId
);
int
responseCode
=
response
.
getStatusLine
().
getStatusCode
();
int
responseCode
=
response
.
getStatusLine
().
getStatusCode
();
if
(
responseCode
==
200
)
{
if
(
responseCode
==
200
)
{
return
HttpClient
.
getResponseString
(
response
);
return
HttpClient
.
getResponseString
(
response
);
}
}
return
null
;
return
null
;
}
}
public
static
HttpResponse
getHttpResponseByGet
(
String
url
,
Integer
enterpriseId
)
{
public
static
HttpResponse
getHttpResponseByGet
(
String
url
,
Integer
enterpriseId
)
{
org
.
apache
.
http
.
client
.
HttpClient
httpClient
=
HttpClient
.
getHttpClient
();
org
.
apache
.
http
.
client
.
HttpClient
httpClient
=
HttpClient
.
getHttpClient
();
HttpGet
get
=
new
HttpGet
(
url
);
HttpGet
get
=
new
HttpGet
(
url
);
get
.
setHeader
(
"sign"
,
""
+
enterpriseId
);
get
.
setHeader
(
"sign"
,
""
+
enterpriseId
);
get
.
setHeader
(
"route"
,
"/spark"
);
get
.
setHeader
(
"route"
,
"/spark"
);
get
.
setHeader
(
"project"
,
"memberfour"
);
get
.
setHeader
(
"project"
,
"memberfour"
);
get
.
setHeader
(
"isControl"
,
"true"
);
get
.
setHeader
(
"isControl"
,
"true"
);
get
.
setHeader
(
"Refer"
,
url
);
get
.
setHeader
(
"Refer"
,
url
);
try
{
try
{
HttpResponse
response
=
httpClient
.
execute
(
get
);
HttpResponse
response
=
httpClient
.
execute
(
get
);
return
response
;
return
response
;
...
@@ -48,16 +52,16 @@ public class EsRequestUtil {
...
@@ -48,16 +52,16 @@ public class EsRequestUtil {
return
null
;
return
null
;
}
}
public
static
String
getIndexParam
(
Integer
enterpriseId
,
Long
id
,
boolean
isProd
)
{
public
static
String
getIndexParam
(
Integer
enterpriseId
,
Long
id
,
boolean
isProd
)
{
String
url
=
isProd
?
"https://four.gicdev.com/member-label/get-member-size-by-condition?sceneCrowdId="
+
id
+
"&enterpriseId="
+
enterpriseId
String
url
=
isProd
?
"https://four.gicdev.com/member-label/get-member-size-by-condition?sceneCrowdId="
+
id
+
"&enterpriseId="
+
enterpriseId
:
"https://four.gicdev.com/member-label/get-member-size-by-condition?sceneCrowdId="
+
id
+
"&enterpriseId="
+
enterpriseId
;
:
"https://four.gicdev.com/member-label/get-member-size-by-condition?sceneCrowdId="
+
id
+
"&enterpriseId="
+
enterpriseId
;
HttpPost
httpPost
=
new
HttpPost
(
url
);
HttpPost
httpPost
=
new
HttpPost
(
url
);
httpPost
.
addHeader
(
"Content-Type"
,
"application/json;charset=UTF-8"
);
httpPost
.
addHeader
(
"Content-Type"
,
"application/json;charset=UTF-8"
);
httpPost
.
addHeader
(
"project"
,
"member-tag"
);
httpPost
.
addHeader
(
"project"
,
"member-tag"
);
httpPost
.
addHeader
(
"sign"
,
"1129"
);
httpPost
.
addHeader
(
"sign"
,
"1129"
);
httpPost
.
addHeader
(
"route"
,
"get-member-size-by-condition"
);
httpPost
.
addHeader
(
"route"
,
"get-member-size-by-condition"
);
HttpResponse
response
=
null
;
HttpResponse
response
=
null
;
try
{
try
{
response
=
HttpClient
.
getHttpClient
().
execute
(
httpPost
);
response
=
HttpClient
.
getHttpClient
().
execute
(
httpPost
);
...
@@ -71,6 +75,7 @@ public class EsRequestUtil {
...
@@ -71,6 +75,7 @@ public class EsRequestUtil {
}
}
public
static
void
main
(
String
[]
args
)
throws
IOException
{
public
static
void
main
(
String
[]
args
)
throws
IOException
{
System
.
out
.
println
(
getIndexParam
(
1129
,
183676455182233606L
,
true
));
// System.out.println(getIndexParam(1129,183676455182233606L,true));
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment