Commit 09f0b713 by wangyulong

1. Feature data optimization

parent 8d11f657
@@ -131,7 +131,7 @@
<dependency>
<groupId>com.secoo.so.common</groupId>
<artifactId>common-data-api</artifactId>
-<version>1.9.8</version>
+<version>1.9.9</version>
</dependency>
......
package com.secoo.search.common;
import org.apache.hadoop.io.Text;
public class Const {
public static final Text KEY_WORD = new Text("key_word");
public static final Text SEARCH_PV = new Text("search_pv");
public static final Text SEARCH_UV = new Text("search_uv");
}
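Const centralizes the MapWritable field names: MapWritable keys must themselves be Writable, and sharing one Text instance per field keeps the mapper's puts and the reducer's gets in sync. A minimal standalone sketch (not part of the commit; the class name and sample values are illustrative) of how a record round-trips through these constants:

    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class ConstDemo {
        public static void main(String[] args) {
            // build a record the way the new mapper does
            MapWritable record = new MapWritable();
            record.put(Const.KEY_WORD, new Text("dress"));
            record.put(Const.SEARCH_PV, new Text("42"));
            record.put(Const.SEARCH_UV, new Text("17"));

            // read it back the way the new reducer does; the lookup works
            // because Text implements equals/hashCode over its byte content
            Writable pv = record.get(Const.SEARCH_PV);
            System.out.println(String.valueOf(pv)); // prints 42
        }
    }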
@@ -3,6 +3,7 @@ package com.secoo.search.job.keyword;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -48,7 +49,7 @@ public class KeywordFeatureExtractJob extends Configured implements Tool {
job.setMapperClass(KeywordFeatureExtractMap.class);
job.setMapOutputKeyClass(Text.class);
-job.setMapOutputValueClass(NullWritable.class);
+job.setMapOutputValueClass(MapWritable.class);
job.setInputFormatClass(TextInputFormat.class);
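Swapping the map output value class in the driver is required, not cosmetic: Hadoop validates the classes declared on the Job at runtime rather than the Mapper's generics, so leaving NullWritable here while the mapper emits MapWritable fails with "Type mismatch in value from map". A sketch of the full type wiring this commit implies, using the standard org.apache.hadoop.mapreduce.Job API (the job name string is illustrative):

    Job job = Job.getInstance(getConf(), "keyword-feature-extract");
    job.setMapperClass(KeywordFeatureExtractMap.class);
    job.setReducerClass(KeywordFeatureExtractReduce.class);
    // map side: Text keyword -> MapWritable record (this commit's change)
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(MapWritable.class);
    // reduce side: NullWritable key -> tab-separated feature line
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);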
......
package com.secoo.search.job.keyword;
+import com.secoo.search.common.Const;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
@@ -13,9 +14,9 @@ import java.io.IOException;
* map emits each keyword as the key; the records are processed further in reduce
* @author zhaoyanchao
*/
-public class KeywordFeatureExtractMap extends Mapper<Object, Text, Text, NullWritable> {
+public class KeywordFeatureExtractMap extends Mapper<Object, Text, Text, MapWritable> {
-private static final int COLS = 1;
+private static final int COLS = 3;
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
@@ -25,9 +26,17 @@ public class KeywordFeatureExtractMap extends Mapper<Object, Text, Text, NullWri
if (items.length == COLS){
-if (StringUtils.isNotBlank(items[0]) && !filter(items[0])){
-context.write(new Text(items[0].trim()), NullWritable.get());
+String keyWord = items[0];
+String searchPv = items[1];
+String searchUv = items[2];
+if (StringUtils.isBlank(keyWord) || filter(keyWord) || StringUtils.isBlank(searchPv) || StringUtils.isBlank(searchUv)){
+return;
+}
+MapWritable mapWritable = new MapWritable();
+mapWritable.put(Const.KEY_WORD, new Text(keyWord));
+mapWritable.put(Const.SEARCH_PV, new Text(searchPv));
+mapWritable.put(Const.SEARCH_UV, new Text(searchUv));
+context.write(new Text(keyWord), mapWritable);
}
}
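One hedged way to pin down the new three-column contract is an Apache MRUnit test. The sketch below is not part of the commit, and it assumes the elided split in map() uses the same tab delimiter the Hive tables declare:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mrunit.mapreduce.MapDriver;
    import org.junit.Test;

    public class KeywordFeatureExtractMapTest {
        @Test
        public void emitsOneMapWritablePerValidRow() throws Exception {
            // expected record, built with the shared Const keys
            MapWritable expected = new MapWritable();
            expected.put(Const.KEY_WORD, new Text("dress"));
            expected.put(Const.SEARCH_PV, new Text("42"));
            expected.put(Const.SEARCH_UV, new Text("17"));

            MapDriver.<Object, Text, Text, MapWritable>newMapDriver(new KeywordFeatureExtractMap())
                    .withInput(new LongWritable(0), new Text("dress\t42\t17"))
                    .withOutput(new Text("dress"), expected)
                    // relies on MapWritable.equals (present on Hadoop 2.x and later)
                    .runTest();
        }
    }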
......
package com.secoo.search.job.keyword;
+import com.secoo.search.common.Const;
import com.secoo.so.common.constant.Environment;
import com.secoo.so.common.query.NormalQueryPlanServiceImpl;
import com.secoo.so.common.query.QueryFeature;
import com.secoo.so.common.query.QueryPlanService;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
/**
* @author zhaoyanchao
*/
-public class KeywordFeatureExtractReduce extends Reducer<Text, NullWritable, NullWritable, Text> {
+public class KeywordFeatureExtractReduce extends Reducer<Text, MapWritable, NullWritable, Text> {
-QueryPlanService planService;
+private QueryPlanService planService;
private static final String FIELD_SPLITOR = "\t";
@Override
@@ -29,28 +33,42 @@ public class KeywordFeatureExtractReduce extends Reducer<Text, NullWritable, Nu
}
@Override
-protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
-if (key == null ) {
+protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
+if (key == null) {
return;
}
-String keyword = key.toString();
-if (StringUtils.isBlank(keyword)) {
-return;
+Iterator<MapWritable> iterator = values.iterator();
+while (iterator.hasNext()){
+MapWritable mapWritable = iterator.next();
+Writable keywordW = mapWritable.get(Const.KEY_WORD);
+Writable searchPvW = mapWritable.get(Const.SEARCH_PV);
+Writable searchUvW = mapWritable.get(Const.SEARCH_UV);
+if (keywordW == null || searchPvW == null || searchUvW == null){
+continue;
+}
+String keyword = String.valueOf(keywordW);
+String searchPv = String.valueOf(searchPvW);
+String searchUv = String.valueOf(searchUvW);
+// call SQP to semantically parse the query term
+QueryFeature queryFeature = planService.extractQueryFeature(keyword);
+String cateStr = getListStr(queryFeature.getCateIds(),5);
+String brandStr = getListStr(queryFeature.getBrandIds(),3);
+Integer queryWordSize = queryFeature.getQueryWordSize();
+StringBuilder record = new StringBuilder();
+record.append(keyword).append(FIELD_SPLITOR)
+.append(cateStr).append(FIELD_SPLITOR)
+.append(brandStr).append(FIELD_SPLITOR)
+.append(queryFeature.getGender()).append(FIELD_SPLITOR)
+.append(queryFeature.getContainsOtherWord()).append(FIELD_SPLITOR)
+.append(queryWordSize).append(FIELD_SPLITOR)
+.append(searchPv).append(FIELD_SPLITOR)
+.append(searchUv).append(FIELD_SPLITOR);
+context.write(NullWritable.get(),new Text(record.toString()));
+// avoid putting too much load on SQP
+Thread.sleep(5);
}
-QueryFeature queryFeature = planService.extractQueryFeature(keyword);
-String cateStr = getListStr(queryFeature.getCateIds(),5);
-String brandStr = getListStr(queryFeature.getBrandIds(),3);
-String record = new StringBuilder().append(keyword).append(FIELD_SPLITOR)
-.append(cateStr).append(FIELD_SPLITOR)
-.append(brandStr).append(FIELD_SPLITOR)
-.append(queryFeature.getGender()).append(FIELD_SPLITOR)
-.append(queryFeature.getContainsOtherWord()).append(FIELD_SPLITOR).toString();
-context.write(NullWritable.get(),new Text(record.toString()));
-// avoid putting too much load on SQP
-Thread.sleep(5);
}
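Note the cost of the per-record throttle: for a keyword set the size of the old top-100000 extract, Thread.sleep(5) adds about 100,000 × 5 ms = 500 s, roughly 8 minutes of pure sleep divided across however many reducers run. That budget is worth rechecking now that the new pv > 3 filter changes the keyword count.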
/** the input list may be null
......
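The body of getListStr is elided above. From its call sites (cateIds padded to 5 fields, brandIds to 3) and the fixed-width bigint columns in the Hive DDL, a plausible reconstruction is sketched below; the 0 placeholder, the Long element type, and the truncate-or-pad behavior are assumptions, not taken from the commit:

    /** The input list may be null; always emits exactly n tab-separated fields. */
    private String getListStr(List<Long> ids, int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            // pad with 0 when the list is null or shorter than n (assumed placeholder)
            Long id = (ids != null && i < ids.size()) ? ids.get(i) : 0L;
            sb.append(id);
            if (i < n - 1) {
                sb.append(FIELD_SPLITOR);
            }
        }
        return sb.toString();
    }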
@@ -13,7 +13,10 @@ query_brand_2 bigint comment 'recognized brand 2',
query_brand_3 bigint comment 'recognized brand 3',
query_gender tinyint comment 'recognized gender: 1 = male, 2 = female, 0 = none',
-query_contains_other_word tinyint comment 'whether the query contains other words'
+query_contains_other_word tinyint comment 'whether the query contains other words',
+query_word_size tinyint comment 'number of query tokens',
+query_search_pv bigint comment 'query search count (pv)',
+query_search_uv bigint comment 'query searcher count (uv)'
) comment 'raw query features'
row format delimited fields terminated by '\t'
stored as textfile;
\ No newline at end of file
create external table if not exists secoo_search.search_data_original_query_last_year
(
keyword string comment 'search keyword',
search_pv bigint comment 'search pv count',
search_uv bigint comment 'search uv count'
) comment 'search keywords with pv greater than 3 over the past two months'
row format delimited fields terminated by '\t'
stored as textfile;
\ No newline at end of file
INSERT overwrite TABLE secoo_search.search_data_original_query_last_year
SELECT
T.key_word,
T.search_pv,
T.search_uv
FROM
(
SELECT
key_word,
count(DISTINCT request_id) AS search_pv,
count(DISTINCT search_device_id) AS search_uv
FROM secoo_fact_hour.fact_search_detail_union_p_hour_inrc
WHERE p_day >= date_sub(${yesterday}, 60) AND p_day <= ${yesterday}
AND key_word != ''
AND search_device_id IS NOT NULL
AND request_id IS NOT NULL
GROUP BY key_word
) T
WHERE T.search_pv > 3
\ No newline at end of file
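This query fixes the metric definitions the new columns rely on: search_pv counts distinct request_id per keyword (one search request = one pv) and search_uv counts distinct search_device_id, both over the 60-day window ending at ${yesterday}. The pv > 3 cut sits in the outer query because the aggregate alias cannot be referenced in the inner WHERE clause.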
# create the raw keyword data
hive -e "create external table if not exists secoo_search.search_data_original_query_last_year
(
keyword string comment 'search keyword'
) comment 'search keywords from the past year'
row format delimited fields terminated by '\t'
stored as textfile;"
## extract the most recent p_day=xxxx-xx-xx partition
recent_keyword_day=`hive -e "show partitions secoo_app.app_search_keyword_year_week_p_day" |tail -n 2 | head -n 1`
recent_keyword_day=${recent_keyword_day:6:10}
echo $recent_keyword_day
# extract keywords into the input table
hive -e "insert overwrite table secoo_search.search_data_original_query_last_year
select M.keyword
from
(
select T.keyword,T.year_cnt
from (
select
keyword,
sum(year_pv) as year_cnt
from secoo_app.app_search_keyword_year_week_p_day
where p_day = '$recent_keyword_day'
group by keyword
) T
order by T.year_cnt desc limit 100000
) M;"
# delete the previous output files
work_dir="/data/zhaoyanchao/java/shell/query_feature/"
# create the search query source data table
hive -e "drop table secoo_search.search_data_original_query_last_year;"
hive -f "$work_dir"create_query_original_table.sql
# write data into the search query source data table
today_param=$1
delta_day=1
yesterday=`date -d "${today_param} -$delta_day day" "+%Y-%m-%d"`
echo ${yesterday}
hive --hivevar yesterday="'$yesterday'" -f "$work_dir"insert_query_original_table.sql
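The nested quoting in --hivevar yesterday="'$yesterday'" is deliberate: the shell strips the double quotes and embeds single quotes into the value, so the bare ${yesterday} references inside insert_query_original_table.sql expand to a quoted date literal such as '2020-01-02' in date_sub() and the p_day comparisons.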
# delete the previous output files
hdfs dfs -rm -r hdfs://tesla-cluster/apps/hive/warehouse/secoo_search.db/search_data_query_original_feature
# extract features into the output table
-yarn jar search-model-data-1.0-SNAPSHOT-jar-with-dependencies.jar com.secoo.search.job.keyword.KeywordFeatureExtractJob
+work_jar_dir="/data/soft/data-warehouse_jar/"
+yarn jar "${work_jar_dir}"search-model-data-1.0-SNAPSHOT-jar-with-dependencies.jar com.secoo.search.job.keyword.KeywordFeatureExtractJob
yarn jar /data/soft/data-warehouse_jar/search-model-data-1.0-SNAPSHOT-jar-with-dependencies.jar com.secoo.search.job.keyword.KeywordFeatureExtractJob