Commit 9ba0f533 by xupeng

修正代码,增加保存到es支持

parent e6f56eb1
......@@ -91,6 +91,23 @@
<version>1.1.8</version>
</dependency>
<!-- es -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.4.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.4.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.4.2</version>
</dependency>
</dependencies>
<build>
......
......@@ -18,8 +18,8 @@ public class EsSuggestKeywordInfo implements Serializable {
private Long weekClickCount;
private Long weekCartCount;
private Double yearClickRatio;
private Float yearCartRatio;
private Float weekClickRatio;
private Double yearCartRatio;
private Double weekClickRatio;
private Double weekCartRatio;
private Boolean isBrand;
private Boolean isCategory;
......@@ -27,7 +27,7 @@ public class EsSuggestKeywordInfo implements Serializable {
private Boolean isSensitive;
private Integer manualValue;
private Double wordRank;
private Float wordABRank;
private Double wordABRank;
private String keywordVersion;
private Boolean isEuropeWord;
private String suggestTags;
......
package com.secoo.so.suggest.es;
public class ESException extends Exception {
private static final long serialVersionUID = -4947060289056203488L;
private String msg;
private int code = 500;
public ESException(String msg) {
super(msg);
this.msg = msg;
}
public ESException(String msg, Throwable e) {
super(msg, e);
this.msg = msg;
}
public ESException(String msg, int code) {
super(msg);
this.msg = msg;
this.code = code;
}
public ESException(String msg, int code, Throwable e) {
super(msg, e);
this.msg = msg;
this.code = code;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
}
package com.secoo.so.suggest.es;
import com.alibaba.fastjson.JSON;
import com.secoo.so.suggest.util.CollectionUtils;
import com.secoo.so.suggest.util.ObjectUtils;
import com.secoo.so.suggest.util.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
public class EsClient {
private static final Logger logger = LoggerFactory.getLogger(EsClient.class);
public static final String SCHEMA_HTTP = "http"; // 默认http
public static final String SCHEMA_HTTPS = "https";
/**
* host地址作为key,缓存client
*/
private static final Map<String, EsClient> clientMap = new HashMap<>();
private static boolean uniqueConnectTimeConfig = false;
private static boolean uniqueConnectNumConfig = false;
private static int CONNECT_TIME_OUT = 1000;
private static int SOCKET_TIME_OUT = 30000;
private static int CONNECTION_REQUEST_TIME_OUT = 500;
private static int MAX_CONNECT_NUM = 100;
private static int MAX_CONNECT_PER_ROUTE = 100;
private String schema; // http | https
private String host; // 主机地址,如果是集群: 主机名加分号分隔
private int port; // 端口号
private String authUser;
private String authPassword;
private RestClientBuilder builder;
private RestClient restClient;
private RestHighLevelClient restHighLevelClient;
private EsClient(String url) {
this(url, null, null);
}
private EsClient(String url, String username, String password) {
if (StringUtils.isBlank(url)) {
throw new IllegalArgumentException("url may not be blank");
}
String hostText = url;
String schema = null;
int schemeIdx = url.indexOf("://");
if (schemeIdx > 0) {
schema = url.substring(0, schemeIdx);
hostText = url.substring(schemeIdx + 3);
}
int port = 80;
int portIdx = hostText.lastIndexOf(":");
if (portIdx > 0) {
try {
port = Integer.parseInt(hostText.substring(portIdx + 1));
} catch (NumberFormatException var7) {
throw new IllegalArgumentException("Invalid HTTP host: " + hostText);
}
hostText = hostText.substring(0, portIdx);
}
this.init(schema, hostText, port, username, password);
}
private EsClient(String schema, String host, int port) {
this(schema, host, port, null, null);
}
private EsClient(String schema, String host, int port, String username, String password) {
this.init(schema, host, port, username, password);
}
private void init(String schema, String host, int port, String username, String password) {
this.schema = StringUtils.isNotBlank(schema) ? schema : SCHEMA_HTTP;
this.host = host;
this.port = port;
this.authUser = username;
this.authPassword = password;
String hostList = this.host;
String[] split = hostList.split(";");
HttpHost[] hostArray = new HttpHost[split.length];
for (int i = 0; i < split.length; i++) {
hostArray[i] = new HttpHost(split[i], this.port, this.schema);
}
// 设置超时时间1小时
this.builder = RestClient.builder(hostArray).setMaxRetryTimeoutMillis(60 * 60 * 1000);
if (uniqueConnectTimeConfig) {
setConnectTimeOutConfig(this.builder);
}
if (uniqueConnectNumConfig) {
setMultiConnectConfig(this.builder);
}
// 设置账户密码
if (StringUtils.isNotBlank(this.authUser, this.authPassword)) {
setDefaultCredentialsProvider(this.builder, this.authUser, this.authPassword);
}
this.restClient = builder.build();
this.restHighLevelClient = new RestHighLevelClient(builder);
logger.info("build es client success: schema = {}, host = {}, port = {}", this.schema, this.host, this.port);
}
private static String buildEsClientUniqueId(String schema, String host, int port, String authUser, String authPassword) {
return (StringUtils.isNotBlank(schema) ? schema : SCHEMA_HTTP) + "|" + host + "|" + port + "|" + authUser + "|" + authPassword;
}
private String getEsClientUniqueId() {
return buildEsClientUniqueId(this.schema, this.host, this.port, this.authUser, this.authPassword);
}
public static synchronized EsClient getEsClient(String schema, String host, int port) {
return getEsClient(schema, schema, port, null, null);
}
public static synchronized EsClient getEsClient(String schema, String host, int port, String authUser, String authPassword) {
String uniqueId = buildEsClientUniqueId(schema, host, port, authUser, authPassword);
if (!clientMap.containsKey(uniqueId)) {
clientMap.put(uniqueId, buildEsClient(schema, host, port));
}
return clientMap.get(uniqueId);
}
public static EsClient buildEsClient(String url) {
return new EsClient(url);
}
public static EsClient buildEsClient(String url, String user, String password) {
return new EsClient(url, user, password);
}
public static EsClient buildEsClient(String schema, String host, int port) {
return new EsClient(schema, host, port);
}
public static EsClient buildEsClient(String schema, String host, int port, String user, String password) {
return new EsClient(schema, host, port, user, password);
}
/**
* 配置账号密码
*/
private static void setDefaultCredentialsProvider(RestClientBuilder builder, String authUser, String authPassword) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(authUser, authPassword));
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder;
});
}
/**
* 主要关于异步httpclient的连接延时配置
*/
private static void setConnectTimeOutConfig(RestClientBuilder builder) {
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(
RequestConfig.Builder builder) {
builder.setConnectTimeout(CONNECT_TIME_OUT);
builder.setSocketTimeout(SOCKET_TIME_OUT);
builder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIME_OUT);
return builder;
}
});
}
/**
* 主要关于异步httpclient的连接数配置
*/
private static void setMultiConnectConfig(RestClientBuilder builder) {
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.setMaxConnTotal(MAX_CONNECT_NUM);
httpAsyncClientBuilder
.setMaxConnPerRoute(MAX_CONNECT_PER_ROUTE);
return httpAsyncClientBuilder;
}
});
}
public void close() {
EsClient.close(this);
}
/**
* 关闭资源
*/
public static void close(EsClient esClient) {
if (esClient != null) {
try {
String uniqueId = esClient.getEsClientUniqueId();
if (clientMap.containsKey(esClient)) {
clientMap.remove(uniqueId);
}
if (esClient.restHighLevelClient != null) {
esClient.restHighLevelClient.close();
}
if (esClient.restClient != null) {
esClient.restClient.close();
}
} catch (Exception e) {
logger.error("close esClient error.", e);
}
}
}
/**
* 单条插入(使用es自动生成的id)
*
* @param indexName
* @param object
*/
public void addWithoutId(String indexName, Object object) {
IndexRequest indexRequest = new IndexRequest(indexName, indexName);
indexRequest.source(JSON.toJSONString(object), XContentType.JSON);
try {
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
logger.error("insert object error, index={}, object={}", indexName, JSON.toJSONString(object), e);
} catch (RuntimeException e) {
logger.error("insert object error, index={}, object={}", indexName, JSON.toJSONString(object), e);
throw e;
}
}
public void add(String indexName, EsObject esObject) {
if (esObject.getObject() == null) {
return;
}
String objectJson = null;
if (esObject.getObject() instanceof String) {
objectJson = (String) esObject.getObject();
// TODO: check is json
} else {
objectJson = JSON.toJSONString(esObject.getObject());
}
//这里原本是非的关系,但为了支持id为null的情况,改为且
if (esObject.getId() != null && esObject.getId().getBytes().length >= 512) {
logger.warn("es insert error, id is too long, must be no longer than 512 bytes. esObject : {}", JSON.toJSONString(esObject));
return;
}
try {
switch (esObject.getOpType()) {
case UPDATE:
UpdateRequest updateRequest = new UpdateRequest(indexName, indexName,
esObject.getId());
updateRequest.doc(objectJson, XContentType.JSON);
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
break;
case CREATE:
case INDEX:
IndexRequest indexRequest = new IndexRequest(indexName, indexName);
indexRequest.opType(esObject.getOpType());
if (StringUtils.isNotBlank(esObject.getId())) {
indexRequest.id(esObject.getId());
}
indexRequest.source(objectJson, XContentType.JSON);
IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
break;
}
} catch (IOException e) {
logger.error("add<{}> object error, index={}, object= " + JSON.toJSONString(esObject), esObject.getOpType().getLowercase(), indexName, e);
} catch (RuntimeException e) {
logger.error("add<{}> object error, index={}, object= " + JSON.toJSONString(esObject), esObject.getOpType().getLowercase(), indexName, e);
throw e;
}
}
public void batchDelete(String indexName, String typeName, List<String> idList) {
if (CollectionUtils.isNotEmpty(idList)) {
List<EsObject> objList = new ArrayList<>();
for (String id : idList) {
objList.add(new EsObject(id, null, EsObject.DELETE));
}
this.safeBatch(indexName, typeName, objList);
}
}
public void safeBatch(String indexName, List<EsObject> objectList) {
safeBatch(indexName, indexName, objectList);
}
public void safeBatch(String indexName, String typeName, List<EsObject> objectList) {
try {
batch(indexName, typeName, objectList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* 支持重试的批量处理,重试次数自己指定
*
* @param indexName
* @param objectList
* @param retryCount
*/
public void batchSupportRetry(String indexName, List<EsObject> objectList, Integer retryCount) {
try {
batch(indexName, indexName, objectList);
} catch (IOException | RuntimeException e) {
logger.error(e.getMessage(), e);
} catch (ESException ese) {
logger.error(ese.getMsg(), ese);
if (retryCount == 0) {
logger.error("Batch execute index over, cause run out of retries!");
} else {
retryCount--;
logger.info("Batch execute index retry begin by count<{}>", retryCount);
batchSupportRetry(indexName, objectList, retryCount);
}
}
}
/**
* 批量插入
* <p>
* bulk 可以执行的操作类型:
* <p>
* (1)delete:删除一个文档,只要1个json串就可以了
* (2)create:PUT /index/type/id/_create,强制创建
* (3)index:普通的put操作,可以是创建文档,也可以是全量替换文档
* (4)update:执行的partial update操作
*
* @param indexName
* @param typeName
* @param objectList
*/
public void batch(String indexName, String typeName, List<EsObject> objectList) throws RuntimeException, IOException, ESException {
if (objectList != null) {
BulkRequest bulkRequest = new BulkRequest();
List<String> idList = new ArrayList<>();
int count = 1;
int size = objectList.size();
for (EsObject esObject : objectList) {
// 去掉原有的id为空的判断
if (esObject.getObject() == null && !EsObject.DELETE.equals(esObject.getOpType())) {
continue;
}
String objectJson = null;
if (esObject.getObject() instanceof String) {
objectJson = (String) esObject.getObject();
// TODO: check is json
} else {
objectJson = JSON.toJSONString(esObject.getObject());
}
//这里原本是非的关系,但为了支持id为null的情况,改为且
if (esObject.getId() != null && esObject.getId().getBytes().length >= 512) {
logger.warn("es insert error, id is too long, must be no longer than 512 bytes. esObject : {}", JSON.toJSONString(esObject));
continue;
}
switch (esObject.getOpType()) {
case DELETE:
DeleteRequest deleteRequest = new DeleteRequest(indexName, typeName, esObject.getId());
bulkRequest.add(deleteRequest);
break;
case UPDATE:
UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, esObject.getId());
updateRequest.docAsUpsert(true);
updateRequest.doc(objectJson, XContentType.JSON);
bulkRequest.add(updateRequest);
break;
case CREATE:
case INDEX:
default:
IndexRequest indexRequest = new IndexRequest(indexName, typeName);
if (esObject.getOpType() != null) {
indexRequest.opType(esObject.getOpType());
}
if (StringUtils.isNotBlank(esObject.getId())) {
indexRequest.id(esObject.getId());
}
indexRequest.source(objectJson, XContentType.JSON);
bulkRequest.add(indexRequest);
break;
}
idList.add(esObject.getId());
// 每5000条批量插入一次
if (count % 5000 == 0 || count == size) {
long start = System.currentTimeMillis();
try {
BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response.hasFailures()) {
//取第一个返回错误的抛出
Iterator<BulkItemResponse> responseIt = response.iterator();
while (responseIt.hasNext()) {
BulkItemResponse responseItem = responseIt.next();
if (responseItem.isFailed()) {
logger.error("batch insert to index<{}> error: {}, cost = {}ms", responseItem.getIndex(), responseItem.getFailureMessage(), (System.currentTimeMillis() - start));
// throw new ESException(responseItem.getFailureMessage(), responseItem.getFailure().getStatus().getStatus());
}
}
} else {
logger.info("batch insert to index<{}> process ( {} / {} ) response status: {}, cost = {}ms", indexName, count, size, response.status(), (System.currentTimeMillis() - start));
}
} catch (IOException e) {
logger.error("batch insert error idList: {}", JSON.toJSONString(idList));
logger.error("batch insert error, index={}, type={}, cost = {}ms", indexName, typeName, (System.currentTimeMillis() - start), e);
throw e;
} catch (RuntimeException e) {
logger.error("batch insert error idList: {}", JSON.toJSONString(idList));
logger.error("batch insert error, index={}, type={}, cost = {}ms", indexName, typeName, (System.currentTimeMillis() - start), e);
throw e;
}
bulkRequest = new BulkRequest();
idList = new ArrayList<>();
}
count++;
}
}
}
/**
* 单条删除
*
* @param indexName
* @param id
*/
public void delete(String indexName, String id) {
DeleteRequest deleteRequest = new DeleteRequest(indexName, indexName, id);
try {
DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
logger.error("delete id<{}> from {} error.", id, indexName, e);
}
}
/**
* 单条删除
*
* @param indexName
* @param id
*/
public void delete(String indexName, String indexType, String id) {
DeleteRequest deleteRequest = new DeleteRequest(indexName, indexType, id);
try {
DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
logger.error("delete id<{}> from {} - {} error.", id, indexName, indexType, e);
}
}
public void deleteByQuery(String indexName, QueryBuilder queryBuilder) {
deleteByQuery(indexName, queryBuilder, 0);
}
/**
* 根据查询删除数据
*
* @param indexName
* @param queryBuilder
* @param scroll
*/
public void deleteByQuery(String indexName, QueryBuilder queryBuilder, int scroll) {
String requestBody = null;
try {
if (null == queryBuilder) {
return;
}
requestBody = queryBuilder.toString();
if (StringUtils.isBlank(requestBody)) {
return;
}
requestBody = "{\"query\":" + requestBody + "}";
String endpoint = "/" + indexName + "/_delete_by_query?conflicts=proceed";
if (scroll != 0) {
endpoint += "&scroll_size=" + scroll;
}
Request request = new Request(RestRequest.Method.POST.toString(), endpoint);
request.setEntity(new NStringEntity(requestBody, ContentType.APPLICATION_JSON));
Response response = restHighLevelClient.getLowLevelClient().performRequest(request);
logger.info("delete success, response {} ", EntityUtils.toString(response.getEntity()));
} catch (Exception e) {
logger.error("delete index<{}> from {} {}error.", indexName, requestBody, e.getMessage());
}
}
/**
* 查询是否存在field=value的document
*/
public Boolean checkExist(String indexName, String field, Object... value) {
if (StringUtils.isNotBlank(indexName) && StringUtils.isNotBlank(field) && value.length > 0) {
try {
return searchCount(indexName, QueryBuilders.termsQuery(field, value)) > 0;
} catch (Exception e) {
logger.error("checkExist field: {} = {} in {} error.", field, value, indexName, e);
}
}
return false;
}
/**
* 根据字段值列表查询(in)
*
* @author xupeng
* @date: 2019-01-02
*/
public List<Map<String, Object>> findByValueList(String indexName, String field, Object value) {
List<Object> valueList = new ArrayList<>();
if (value != null) {
valueList.add(value);
}
return findByValueList(indexName, field, valueList);
}
/**
* 不支持回调的查询方法
*
* @param indexName
* @param queryBuilder
* @return
*/
public List<Map<String, Object>> search(String indexName, QueryBuilder queryBuilder) {
return search(indexName, queryBuilder, null, null, 5000, null);
}
/**
* 直接返回具体对象
*
* @param indexName
* @param queryBuilder
* @param clazz
* @param <T>
* @return
*/
public <T> List<T> search(String indexName, QueryBuilder queryBuilder, Class<T> clazz) {
return ObjectUtils.listToObjects(search(indexName, queryBuilder), clazz);
}
public List<Map<String, Object>> search(String indexName, QueryBuilder queryBuilder, EsSearchCallback callback) {
return search(indexName, queryBuilder, null, callback, 5000, null);
}
public <T> List<T> search(String indexName, QueryBuilder queryBuilder, EsSearchCallback callback, Class<T> clazz) {
return ObjectUtils.listToObjects(search(indexName, queryBuilder, callback), clazz);
}
public <T> List<T> search(String indexName, QueryBuilder queryBuilder, EsSearchCallback callback, int limit, Class<T> clazz) {
return ObjectUtils.listToObjects(search(indexName, queryBuilder, null, callback, limit, null), clazz);
}
public List<Map<String, Object>> search(String indexName, QueryBuilder queryBuilder, EsSearchCallback callback, int limit) {
return search(indexName, queryBuilder, null, callback, limit, null);
}
public List<Map<String, Object>> search(String indexName, QueryBuilder queryBuilder, String[] storeFields) {
return search(indexName, queryBuilder, storeFields, null, 5000, null);
}
public <T> List<T> search(String indexName, QueryBuilder queryBuilder, String[] storeFields, Class<T> clazz) {
return ObjectUtils.listToObjects(search(indexName, queryBuilder, storeFields, null, 5000, null), clazz);
}
public <T> List<T> search(String indexName, QueryBuilder queryBuilder, String[] storeFields, SortBuilder[] sortBuilders, Class<T> clazz) {
return ObjectUtils.listToObjects(search(indexName, queryBuilder, storeFields, null, 5000, sortBuilders), clazz);
}
/**
* 根据QueryBuilder查询,直接回调函数中直接处理数据
*
* @author xupeng
* @date: 2019-01-25
*/
public List<Map<String, Object>> search(String indexName, QueryBuilder queryBuilder, String[] storeFields, EsSearchCallback callback, int limit, SortBuilder[] sortBuilders) {
List<Map<String, Object>> resultList = new ArrayList<>();
List<String> scrollIdList = new ArrayList<>();
if (StringUtils.isNotBlank(indexName) && queryBuilder != null) {
try {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource().query(queryBuilder);
if (null != storeFields && storeFields.length != 0) {
searchSourceBuilder.fetchSource(storeFields, null);
}
if (null != sortBuilders && sortBuilders.length != 0) {
for (SortBuilder sortBuilder : sortBuilders) {
searchSourceBuilder.sort(sortBuilder);
}
}
searchSourceBuilder.size(limit);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
searchRequest.source(searchSourceBuilder);
Scroll scroll = new Scroll(TimeValue.timeValueSeconds(60));
searchRequest.scroll(scroll);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
long count = 0;
while (searchResponse != null) {
// 记录scrollId
String scrollId = searchResponse.getScrollId();
if (StringUtils.isNotBlank(scrollId) && !scrollIdList.contains(scrollId)) {
scrollIdList.add(scrollId);
}
// 获取上次查询的结果
if (searchResponse.getHits().getHits().length > 0) {
count = count + searchResponse.getHits().getHits().length;
searchResponse.getHits().forEach(documentFields -> {
Map map = documentFields.getSourceAsMap();
map.put("_id", documentFields.getId());
map.put("es_id", documentFields.getId());
resultList.add(documentFields.getSourceAsMap());
});
if (null != callback) {
try {
callback.callback(resultList);
//清空resultList
resultList.clear();
} catch (Exception e) {
logger.error("search index<{}> EsSearchCallback process error : {}", indexName, e.getMessage());
}
}
// 执行下一次查询
if (StringUtils.isNotBlank(scrollId)
&& searchResponse.getHits().getTotalHits() > count) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);
searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
} else {
break;
}
} else {
// 没有结果结束查询
break;
}
}
} catch (Exception e) {
logger.error("search index<{}> error, QueryBuilder = {} .", indexName, JSON.toJSONString(queryBuilder), e);
} finally {
clearScroll(scrollIdList);
}
}
return resultList;
}
/**
* 根据QueryBuilder查询,查询limit限制的数据条数,不支持排序
*
* @param indexName
* @param queryBuilder
* @param limit
* @return
*/
public List<Map<String, Object>> search(String indexName, QueryBuilder queryBuilder, int limit) {
return search(indexName, queryBuilder, limit, null);
}
public <T> List<T> search(Class<T> clazz, String indexName, QueryBuilder queryBuilder, int limit) {
return ObjectUtils.listToObjects(search(indexName, queryBuilder, limit), clazz);
}
public <T> List<T> search(Class<T> clazz, String indexName, QueryBuilder queryBuilder, int limit, SortBuilder... sortBuilders) {
return ObjectUtils.listToObjects(search(indexName, queryBuilder, limit, sortBuilders), clazz);
}
/**
* 根据QueryBuilder查询,查询limit限制的数据条数,支持排序
*
* @param indexName
* @param queryBuilder
* @param limit
* @param sortBuilders
* @return
*/
public List<Map<String, Object>> search(String indexName, QueryBuilder queryBuilder, int limit, SortBuilder... sortBuilders) {
List<Map<String, Object>> resultList = new ArrayList<>();
if (StringUtils.isNotBlank(indexName) && queryBuilder != null) {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource().query(queryBuilder);
searchSourceBuilder.size(limit);
if (null != sortBuilders && sortBuilders.length != 0) {
for (SortBuilder sortBuilder : sortBuilders) {
searchSourceBuilder.sort(sortBuilder);
}
}
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse == null || searchResponse.getHits().getHits().length < 0) {
return resultList;
}
searchResponse.getHits().forEach(documentFields -> {
// 直接使用source返回的map会丢失es_id
Map<String, Object> sourceAsMap = documentFields.getSourceAsMap();
sourceAsMap.put("_id", documentFields.getId());
sourceAsMap.put("es_id", documentFields.getId());
resultList.add(sourceAsMap);
});
} catch (IOException e) {
logger.error("search index<{}> error, QueryBuilder = {} .", indexName, JSON.toJSONString(queryBuilder), e);
}
return resultList;
}
return resultList;
}
public Map<String, Object> getOne(String indexName, QueryBuilder queryBuilder, SortBuilder... sortBuilders) {
List<Map<String, Object>> list = search(indexName, queryBuilder, 1, sortBuilders);
if (CollectionUtils.isNotEmpty(list)) {
return list.get(0);
}
return null;
}
public <T> T getOne(Class<T> clazz, String indexName, QueryBuilder queryBuilder, SortBuilder... sortBuilders) {
try {
return ObjectUtils.mapToObjectWithJSON(getOne(indexName, queryBuilder, sortBuilders), clazz);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
}
}
public Map<String, Object> getById(String indexName, String id) {
return getOne(indexName, QueryBuilders.idsQuery().addIds(id));
}
public <T> T getById(String indexName, String id, Class<T> clazz) {
try {
return ObjectUtils.mapToObject(getById(indexName, id), clazz);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
}
}
/**
* 根据QueryBuilder查询
*
* @author xupeng
* @date: 2019-01-25
*/
public long searchCount(String indexName, QueryBuilder queryBuilder) {
if (StringUtils.isNotBlank(indexName) && queryBuilder != null) {
try {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource().query(queryBuilder);
searchSourceBuilder.size(0);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse != null) {
return searchResponse.getHits().getTotalHits();
}
} catch (Exception e) {
logger.error("search index<{}> count error, QueryBuilder = {} .", indexName, JSON.toJSONString(queryBuilder), e);
}
}
return 0;
}
/**
* 根据字段值列表查询(in)
*
* @author xupeng
* @date: 2019-01-25
*/
public List<Map<String, Object>> findByValueList(String indexName, String field, List<Object> valueList) {
if (StringUtils.isNotBlank(indexName) && StringUtils.isNotBlank(field)
&& valueList != null && !valueList.isEmpty()) {
// 剔除value里的null
List<Object> fixValueList = new ArrayList<>();
valueList.forEach(value -> {
if (value != null && (!(value instanceof String) || ((String) value).trim().length() > 0)) {
fixValueList.add(value);
}
});
if (fixValueList != null && !fixValueList.isEmpty()) {
QueryBuilder queryBuilder = QueryBuilders.termsQuery(field, fixValueList);
return search(indexName, queryBuilder);
}
}
return null;
}
public <T> List<T> findListByValueList(String indexName, String field, List<Object> valueList, Class<T> clazz) {
List<T> resultList = new ArrayList<>();
List<Map<String, Object>> mapList = findByValueList(indexName, field, valueList);
if (CollectionUtils.isNotEmpty(mapList)) {
mapList.forEach(map -> {
resultList.add(JSON.parseObject(JSON.toJSONString(map), clazz));
});
}
return resultList;
}
/**
* 清除滚动ID
*/
public boolean clearScroll(List<String> scrollIdList) {
if (scrollIdList != null && !scrollIdList.isEmpty()) {
try {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.scrollIds(scrollIdList);
ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
return clearScrollResponse.isSucceeded();
} catch (Exception e) {
logger.error("clearScroll error: {} ", JSON.toJSONString(scrollIdList), e);
}
}
return true;
}
/**
* 清除滚动ID
*/
public boolean clearScroll(String scrollId) {
if (StringUtils.isNotBlank(scrollId)) {
try {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
return clearScrollResponse.isSucceeded();
} catch (Exception e) {
logger.error("clearScroll error: {} ", scrollId, e);
}
}
return true;
}
}
package com.secoo.so.suggest.es;
import org.elasticsearch.action.DocWriteRequest;
import java.io.Serializable;
public class EsObject implements Serializable {
private static final long serialVersionUID = -3593470306368703625L;
public static final DocWriteRequest.OpType INDEX = DocWriteRequest.OpType.INDEX;
public static final DocWriteRequest.OpType CREATE = DocWriteRequest.OpType.CREATE;
public static final DocWriteRequest.OpType UPDATE = DocWriteRequest.OpType.UPDATE;
public static final DocWriteRequest.OpType DELETE = DocWriteRequest.OpType.DELETE;
private String id;
private Object object;
private DocWriteRequest.OpType opType = DocWriteRequest.OpType.INDEX;
public EsObject() {
}
public EsObject(String id, Object object) {
this.id = id;
this.object = object;
}
/**
* DocWriteRequest.OpType.INDEX : 覆盖更新
* DocWriteRequest.OpType.UPDATE : 只更新发送的字段
*
* @author xupeng
* @date: 2019-01-23
*/
public EsObject(String id, Object object, DocWriteRequest.OpType opType) {
this.id = id;
this.object = object;
this.opType = opType;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Object getObject() {
return object;
}
public void setObject(Object object) {
this.object = object;
}
public DocWriteRequest.OpType getOpType() {
return opType;
}
public void setOpType(DocWriteRequest.OpType opType) {
this.opType = opType;
}
}
package com.secoo.so.suggest.es;
import java.util.List;
import java.util.Map;
/**
*
* 分页查询时可以使用的回调接口
**/
public interface EsSearchCallback {
/**
* 分页查询时的callback,如果有该实现,search接口最终将返回空数据
* @param results
*/
public void callback(List<Map<String, Object>> results);
}
......@@ -8,6 +8,8 @@ import com.secoo.so.suggest.entity.BrandInfo;
import com.secoo.so.suggest.entity.CategoryInfo;
import com.secoo.so.suggest.entity.EsSuggestKeywordInfo;
import com.secoo.so.suggest.entity.SearchKeywordInfo;
import com.secoo.so.suggest.es.EsClient;
import com.secoo.so.suggest.es.EsObject;
import com.secoo.so.suggest.util.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
......@@ -194,8 +196,7 @@ public class SuggestTask {
Long warningCount = ConfigUtil.getLong("suggestTask.searchWordWarningCount", 1000000);
if (count < warningCount) {
log.warn("search word count is too little: count={}, warningCount={}, send warning", count, warningCount);
List<String> phones = StringUtils.splitToList(ConfigUtil.getString("suggestTask.warningPhones"), ",");
FeiShuUtil.sendMessage("suggest-task异常", "搜索词数量过低,不执行索引", phones);
FeiShuUtil.sendMessage("suggest-task异常", "搜索词数量过低,不执行索引");
return;
}
......@@ -221,8 +222,9 @@ public class SuggestTask {
// 过滤词
List<EsSuggestKeywordInfo> suggestKeywordInfoList = new ArrayList<>();
int processCount = 0;
int totalCount = esSuggestKeywordMap.values().size();
int totalCount = esSuggestKeywordMap.values().size();
for (EsSuggestKeywordInfo suggestKeywordInfo : esSuggestKeywordMap.values()) {
// 不过滤的suggest词,计算分值写es
if (!isFilterSuggestKeyword(suggestKeywordInfo)) {
......@@ -242,10 +244,12 @@ public class SuggestTask {
}
// 保存到es
// saveSuggestKeywordToEs(suggestKeywordInfoList);
saveSuggestKeywordToEs(suggestKeywordInfoList);
// for test
saveSuggestKeywordToFile(suggestKeywordInfoList);
if ("true".equalsIgnoreCase(System.getProperty("suggest.saveToFile"))) {
// for test
saveSuggestKeywordToFile(suggestKeywordInfoList);
}
}
/**
......@@ -305,6 +309,28 @@ public class SuggestTask {
private static void saveSuggestKeywordToEs(List<EsSuggestKeywordInfo> suggestKeywordInfoList) {
if (CollectionUtils.isNotEmpty(suggestKeywordInfoList)) {
String esUrl = ConfigUtil.getString("suggestTask.es.url");
String esUser = ConfigUtil.getString("suggestTask.es.user");
String esPassword = ConfigUtil.getString("suggestTask.es.password");
String esIndex = ConfigUtil.getString("suggestTask.es.index");
String esType = ConfigUtil.getString("suggestTask.es.type");
int esBatchSize = ConfigUtil.getInt("suggestTask.es.batchSize", 2000);
EsClient esClient = EsClient.buildEsClient(esUrl, esUser, esPassword);
List<List<EsSuggestKeywordInfo>> subLists = CollectionUtils.splitList(suggestKeywordInfoList, esBatchSize);
for (List<EsSuggestKeywordInfo> subList : subLists) {
List<EsObject> esList = new ArrayList<>();
for (EsSuggestKeywordInfo esSuggestKeywordInfo : subList) {
esList.add(new EsObject(StringUtils.md5(esSuggestKeywordInfo.getKeyword()), esSuggestKeywordInfo));
}
try {
esClient.batch(esIndex, esType, esList);
} catch (Exception e) {
log.error("saveSuggestKeywordToEs error", e);
FeiShuUtil.sendMessage("suggest-task save to es 异常", "suggest-task save to es 异常");
}
}
}
}
......@@ -349,8 +375,10 @@ public class SuggestTask {
return true;
}
// 过滤掉太长的词
if (suggestKeywordInfo.getKeyword().length() <= 1 || suggestKeywordInfo.getKeyword().length() > 50) {
// 过滤掉太短、太长的词
if (StringUtils.isBlank(suggestKeywordInfo.getKeyword())
|| suggestKeywordInfo.getKeyword().length() <= 1
|| StringUtils.getByteLength(suggestKeywordInfo.getKeyword()) > 50) {
return true;
}
......@@ -414,10 +442,10 @@ public class SuggestTask {
private static void processEsSuggestKeywordInfo(EsSuggestKeywordInfo suggestKeywordInfo, SearchKeywordInfo searchKeywordInfo) {
// 年点击加购率
suggestKeywordInfo.setYearClickRatio(CalculateUtils.calculateRatio(suggestKeywordInfo.getYearClickCount(), suggestKeywordInfo.getYearCount()));
suggestKeywordInfo.setYearCartRatio(CalculateUtils.calculateRatio(suggestKeywordInfo.getYearCartCount(), suggestKeywordInfo.getYearCount()).floatValue());
suggestKeywordInfo.setYearCartRatio(CalculateUtils.calculateRatio(suggestKeywordInfo.getYearCartCount(), suggestKeywordInfo.getYearCount()));
// 周点击加购率
suggestKeywordInfo.setWeekClickRatio(CalculateUtils.calculateRatio(suggestKeywordInfo.getWeekClickCount().intValue(), suggestKeywordInfo.getWeekCount().intValue()).floatValue());
suggestKeywordInfo.setWeekClickRatio(CalculateUtils.calculateRatio(suggestKeywordInfo.getWeekClickCount().intValue(), suggestKeywordInfo.getWeekCount().intValue()));
suggestKeywordInfo.setWeekCartRatio(CalculateUtils.calculateRatio(suggestKeywordInfo.getWeekCartCount().intValue(), suggestKeywordInfo.getWeekCount().intValue()));
......@@ -443,11 +471,11 @@ public class SuggestTask {
}
private static void calculateWordRank(EsSuggestKeywordInfo suggestKeywordInfo) {
public static Double calculateWordRank(EsSuggestKeywordInfo suggestKeywordInfo) {
Double wordRank = 10000.0;
// 长度因子
wordRank += 3000 * CalculateUtils.calculateLengthFactor(suggestKeywordInfo.getKeyword().length());
wordRank += 3000 * CalculateUtils.calculateLengthFactor(StringUtils.getByteLength(suggestKeywordInfo.getKeyword()));
// 年数量因子
wordRank += 2000 * CalculateUtils.calculateCountFactor(suggestKeywordInfo.getYearCount(), 1);
// 周数量因子
......@@ -475,9 +503,10 @@ public class SuggestTask {
}
suggestKeywordInfo.setWordRank(wordRank);
return wordRank;
}
private static void calculateWordABRank(EsSuggestKeywordInfo suggestKeywordInfo, SearchKeywordInfo searchKeywordInfo) {
public static Double calculateWordABRank(EsSuggestKeywordInfo suggestKeywordInfo, SearchKeywordInfo searchKeywordInfo) {
// 月点击加购率
Double monthClickRatio = CalculateUtils.calculateRatio(searchKeywordInfo.getMonthProductClickUv().intValue(), searchKeywordInfo.getMonthUv().intValue());
......@@ -510,7 +539,7 @@ public class SuggestTask {
Double wordABRank = 10000.0;
// 长度因子
wordABRank += 3000 * CalculateUtils.calculateLengthFactor(suggestKeywordInfo.getKeyword().length());
wordABRank += 3000 * CalculateUtils.calculateLengthFactor(StringUtils.getByteLength(suggestKeywordInfo.getKeyword()));
// 月数量因子
wordABRank += 2000 * CalculateUtils.calculateCountFactor(searchKeywordInfo.getMonthUv().intValue(), 4);
......@@ -542,7 +571,8 @@ public class SuggestTask {
if (suggestKeywordInfo.getIsManual() && suggestKeywordInfo.getManualValue() > 0) {
wordABRank *= Math.sqrt(suggestKeywordInfo.getManualValue() * 1.0);
}
suggestKeywordInfo.setWordABRank(wordABRank.floatValue());
suggestKeywordInfo.setWordABRank(wordABRank);
return wordABRank;
}
......@@ -560,7 +590,7 @@ public class SuggestTask {
return;
}
// 新词加分大小 类似于 人工干预值
suggestKeywordInfo.setWordABRank(new Double(suggestKeywordInfo.getWordABRank() * Math.sqrt(5.0)).floatValue());
suggestKeywordInfo.setWordABRank(new Double(suggestKeywordInfo.getWordABRank() * Math.sqrt(5.0)));
}
......@@ -585,6 +615,7 @@ public class SuggestTask {
@Override
public void run() {
List<SearchKeywordInfo> searchKeywordInfoList = DwDataSource.querySearchKeywordInfoList(startId, endId);
log.info("start process startId:{}, endId:{}, count:{}", startId, endId, searchKeywordInfoList.size());
if (CollectionUtils.isNotEmpty(searchKeywordInfoList)) {
processSearchKeyword(this.esSuggestKeywordMap, searchKeywordInfoList, startTime);
}
......
package com.secoo.so.suggest.util;
import com.alibaba.fastjson.JSON;
import com.secoo.so.suggest.config.ConfigUtil;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
......@@ -31,6 +32,9 @@ public class FeiShuUtil {
private static ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
public static void sendMessage(String title, String message) {
sendMessage(title, message, null);
}
public static void sendMessage(String title, String message, List<String> phones) {
if (StringUtils.isBlank(message)) {
......@@ -44,6 +48,10 @@ public class FeiShuUtil {
params.put("body", Collections.singletonList(message));
params.put("phones", phones);
if(CollectionUtils.isEmpty(phones)){
phones = StringUtils.splitToList(ConfigUtil.getString("suggestTask.warningPhones"), ",");
}
final String fTitle = title;
final String fMessage = message;
final String fPhones = StringUtils.join(phones, ",");
......
......@@ -1466,6 +1466,41 @@ public final class ObjectUtils {
return true;
}
/**
* 集合Map转具体对象集合
*
* @param sourceList
* @param clazz
* @param <T>
* @return
*/
public static <T> List<T> listToObjects(List<Map<String, Object>> sourceList, Class<T> clazz) {
if (CollectionUtils.isEmpty(sourceList)) {
return new ArrayList<>();
}
try {
String json = JSON.toJSONString(sourceList);
return JSON.parseArray(json, clazz);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ArrayList<>();
}
}
public static <T> T mapToObjectWithJSON(Map<String, Object> sourceMap, Class<T> calzz) {
if (CollectionUtils.isEmpty(sourceMap)) {
return null;
}
try {
String json = JSON.toJSONString(sourceMap);
return JSON.parseObject(json, calzz);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
}
}
public static void main(String[] args) {
/*String json = "{\"name\":\"TMev344\",\"cluster_name\":\"elasticsearch\",\"cluster_uuid\":\"cRwLaMCuRkmZvpnYrcQ6mQ\",\"version\":{\"number\":\"6.5.0\",\"build_flavor\":\"default\",\"build_type\":\"zip\",\"build_hash\":\"816e6f6\",\"build_date\":\"2018-11-09T18:58:36.352602Z\",\"build_snapshot\":false,\"lucene_version\":\"7.5.0\",\"minimum_wire_compatibility_version\":\"5.6.0\",\"minimum_index_compatibility_version\":\"5.0.0\"},\"tagline\":\"You Know, for Search\"}";
Map<String, String> paramMap = parseJsonStrToAbsolutePathKeyMap(json);
......
......@@ -8,6 +8,8 @@ import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Matcher;
......@@ -2035,4 +2037,68 @@ public abstract class StringUtils {
return false;
}
/**
* 计算中英文字符串的字节长度 <br/>
* 一个中文占3个字节
*
* @param str
* @return int 字符串的字节长度
*/
public static int getByteLength(String str) {
return getByteLength(str, "UTF-8");
}
/**
* 计算中英文字符串的字节长度 <br/>
* 一个中文占3个字节
*
* @param str
* @return int 字符串的字节长度
*/
public static int getByteLength(String str, String charset) {
if (str == null || str.length() == 0) {
return 0;
}
try {
return str.getBytes(charset).length;
} catch (UnsupportedEncodingException e) {
System.out.println("计算中英文字符串的字节长度失败,");
e.printStackTrace();
}
return 0;
}
/**
* 32位md5加密
*/
public static String md5(String str) {
if (str == null) {
return null;
}
String result = "";
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
md5.update(str.getBytes("UTF-8"));
byte b[] = md5.digest();
StringBuffer buf = new StringBuffer("");
int i = 0;
for (int offset = 0; offset < b.length; offset++) {
i = b[offset];
if (i < 0) {
i += 256;
}
if (i < 16) {
buf.append("0");
}
buf.append(Integer.toHexString(i));
}
result = buf.toString();
} catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
System.out.println("encode md5 error");
e.printStackTrace();
}
return result;
}
}
......@@ -4,7 +4,13 @@ suggestTask.ManualFolder=/data/pssmaster/corpus_set/suggest_corpus/manual
suggestTask.SensitiveFolder=/data/pssmaster/corpus_set/suggest_corpus/sensitive
suggestTask.EuropeWordFolder=/data/pssmaster/corpus_set/suggest_corpus/europe_word
suggestTask.batchSize=10000
suggestTask.threadPoolSize=20
suggestTask.threadPoolSize=10
suggestTask.searchWordWarningCount=1000000
suggestTask.suggestTagMaxSize=5
suggestTask.warningPhones=13426233960
suggestTask.es.url=http://bigdataescluster.secoolocal.com:9200
suggestTask.es.user=search
suggestTask.es.password=search5z0NvEn1D
suggestTask.es.index=search_suggest_index
suggestTask.es.type=search_suggest_type
suggestTask.es.batchSize=2000
......@@ -6,4 +6,10 @@ suggestTask.EuropeWordFolder=/data/pssmaster/corpus_set/suggest_corpus/europe_wo
suggestTask.batchSize=10000
suggestTask.threadPoolSize=10
suggestTask.suggestTagMaxSize=5
suggestTask.searchWordWarningCount=1000000
\ No newline at end of file
suggestTask.searchWordWarningCount=1000000
suggestTask.es.url=http://10.0.254.139:9200
suggestTask.es.user=suggest
suggestTask.es.password=suggest456
suggestTask.es.index=search_suggest_index
suggestTask.es.type=search_suggest_type
suggestTask.es.batchSize=2000
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment