Commit b406349b by zhaoyanchao

1. 空格分隔后的词是相同的集合的词, 在存入es 时进行合并。 如:“nike 鞋 男” 和 “nike 男 鞋” 合并为同一记录

2. 效率考虑,对于空格分隔后长度大于5的集合不做处理:5个词的可能排列为 5! = 120 个,6个词则为 6! = 720 个,效率较低,出现概率也较低
parent 3b53306e
package main
import (
"container/list"
"strings"
"github.com/mozillazg/go-pinyin"
"fmt"
"strconv"
)
type ENV struct {
DataWareDB string
ErpDB string
......@@ -28,4 +36,59 @@ var prod_env = &ENV {
SensitiveFolder: "/data/pssmaster/corpus_set/suggest_corpus/sensitive"}
var RUN_ENV = prod_env
\ No newline at end of file
var RUN_ENV = test_env
/************************* 下面是 util 方法 *****************************/
// factorial returns n! for n >= 0.
//
// Any n below 2 (including zero and negative input) yields 1, so the function
// always terminates; the previous recursive form never reached its base case
// for n <= 0. The result overflows int64 for n > 20.
func factorial(n int64) int64 {
	result := int64(1)
	for i := int64(2); i <= n; i++ {
		result *= i
	}
	return result
}
// permutation appends every ordering of arr[begin:] (with arr[:begin] held
// fixed) to lst. Each ordering is stored as one string: the elements of arr
// joined by a single space.
//
// arr is permuted in place while recursing, but every swap is undone, so arr
// has its original order again when the call returns.
func permutation(arr []string, begin int, lst *list.List) {
	if begin == len(arr) {
		// All positions are fixed: record this ordering.
		lst.PushBack(strings.Join(arr, " "))
		return
	}
	for pos := begin; pos < len(arr); pos++ {
		// Put the candidate at the current position, permute the rest,
		// then swap back so the next candidate starts from the same state.
		arr[begin], arr[pos] = arr[pos], arr[begin]
		permutation(arr, begin+1, lst)
		arr[begin], arr[pos] = arr[pos], arr[begin]
	}
}
// convertToPinyin transliterates str rune by rune: each rune is passed to
// pinyin.LazyPinyin, and whatever it returns is concatenated. A rune for which
// the library returns nothing is copied through unchanged.
func convertToPinyin(str string) string {
	parts := make([]string, 0, len(str))
	for _, r := range str {
		ch := string(r)
		py := strings.Join(pinyin.LazyPinyin(ch, pinyin.NewArgs()), "")
		if py == "" {
			// No pinyin produced for this rune: keep it as is.
			py = ch
		}
		parts = append(parts, py)
	}
	return strings.Join(parts, "")
}
// cleanKeyword normalizes a raw keyword:
//   - converts it with the package-level t2s converter (traditional Chinese
//     to simplified Chinese),
//   - lower-cases it,
//   - trims it and collapses every run of whitespace into one space.
//
// A conversion error is printed and processing continues with whatever the
// converter returned.
func cleanKeyword(keyword string) string {
	converted, err := t2s.Convert(keyword)
	if err != nil {
		fmt.Println(err)
	}
	lowered := strings.ToLower(converted)
	words := strings.Fields(strings.TrimSpace(lowered))
	return strings.Join(words, " ")
}
// strToInt parses a base-10 integer into an int32.
//
// The empty string and the two-character marker `\N` (presumably the NULL
// placeholder of the exported data; confirm against the producer) map to 0.
// Any parse failure, including a value that does not fit in int32, is printed
// and also yields 0. Previously the value was parsed as 64-bit and then
// truncated, so out-of-range input silently wrapped to an unrelated number.
func strToInt(str string) int32 {
	if str == "" || str == "\\N" {
		return 0
	}
	v, err := strconv.ParseInt(str, 10, 32)
	if err != nil {
		fmt.Println(err)
		return 0
	}
	return int32(v)
}
\ No newline at end of file
package main
import (
"bufio"
"context"
"crypto/md5"
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/liuzl/gocc"
"github.com/olivere/elastic"
"io"
"io/ioutil"
"log"
"math"
"os"
"strings"
"sync"
"time"
"unicode"
"unicode/utf8"
"container/list"
)
// Word is one search-suggestion entry. addWord passes it to the bulk index
// request as the document body, so the json tags are the indexed field names.
type Word struct {
// Keyword is the normalized keyword text; it is also the wordMap key and the
// input to the md5 document id in addWord.
Keyword string `json:"keyword"`
// KeywordPinYin is convertToPinyin(Keyword), filled in by processWord.
KeywordPinYin string `json:"keywordPinYin"`
// Year*/Week* counts come from the statistics row read in queryIndex
// (search count, product-click count and add-to-cart count per period).
YearClickCount int32 `json:"yearClickCount"`
YearCartCount int32 `json:"yearCartCount"`
YearCount int32 `json:"yearCount"`
WeekClickCount int32 `json:"weekClickCount"`
WeekCartCount int32 `json:"weekCartCount"`
WeekCount int32 `json:"weekCount"`
// The ratios are computed by processWord as count / period search count and
// then weighted there (cart ratios x3, weekly click ratio x2).
YearClickRatio float64 `json:"yearClickRatio"`
YearCartRatio float64 `json:"yearCartRatio"`
WeekClickRatio float64 `json:"weekClickRatio"`
WeekCartRatio float64 `json:"weekCartRatio"`
// The Is* flags are set by processWord when Keyword is a key of brandMap,
// categoryMap, manualMap or sensitiveMap respectively.
IsBrand bool `json:"isBrand"`
IsCategory bool `json:"isCategory"`
IsManual bool `json:"isManual"`
IsSensitive bool `json:"isSensitive"`
// ManualValue is the manualMap value for Keyword when IsManual is set.
ManualValue int32 `json:"manualValue"`
// WordRank is the score computed by calculateWordRank.
WordRank float64 `json:"wordRank"`
// KeywordVersion is the row's p_day column for database words, or dateStr for
// words added by checkUnusedData.
KeywordVersion string `json:"keywordVersion"`
}
// wordMap holds every *Word that passed filtering, keyed by Word.Keyword
// (stored in addWord).
var wordMap sync.Map
// brandMap maps a cleaned brand name (English or Chinese) to its brand id.
var brandMap = make(map[string]int)
// categoryMap maps a cleaned category name to its category id.
var categoryMap = make(map[string]int)
// manualMap maps a cleaned, manually curated keyword to its configured value.
var manualMap = make(map[string]int32)
// sensitiveMap is the set of cleaned sensitive keywords.
var sensitiveMap = make(map[string]bool)
// now is captured once at program start; dateStr is its unpadded
// year-month-day form (for example 2019-3-7).
var now = time.Now()
var dateStr = fmt.Sprintf("%d-%d-%d",now.Year(),now.Month(),now.Day())
// t2s is the gocc converter created with the "t2s" configuration; the
// construction error is discarded here.
var t2s, _ = gocc.New("t2s")
// prefixFilterArr lists keyword prefixes that isFilterWord rejects.
var prefixFilterArr = []string{"https://", "http://", "dg", "d & g", "dolce&gabbana",
"dolce & gabbana", "杜嘉班纳", "避孕", "情趣", "cucci", "乒乓球", "cuccl", "gucii"}
// readhistory records whether the historical rows are still being read; main
// sets it to false once all readers have finished.
var readhistory = true
// main rebuilds the search-suggestion index: it connects to the statistics
// database and Elasticsearch, loads the brand/category/manual/sensitive
// dictionaries, indexes the statistics table in id ranges of 10000 rows (one
// goroutine per range), then adds dictionary words that never appeared in the
// statistics, and finally flushes the bulk processor.
//
// Setup failures are fatal: continuing after them only led to a nil
// dereference further down.
func main() {
	startTime := time.Now()

	datawareDB, err := sql.Open("mysql", RUN_ENV.DataWareDB)
	if err != nil {
		log.Fatalf("open dataware db: %v", err)
	}
	defer datawareDB.Close()
	datawareDB.SetConnMaxLifetime(10 * time.Minute)
	datawareDB.SetMaxOpenConns(50)
	datawareDB.SetMaxIdleConns(50)

	var client *elastic.Client
	if RUN_ENV.EsUser != "" {
		client, err = elastic.NewClient(elastic.SetURL(RUN_ENV.EsInfo), elastic.SetBasicAuth(RUN_ENV.EsUser, RUN_ENV.EsPassword))
	} else {
		client, err = elastic.NewClient(elastic.SetURL(RUN_ENV.EsInfo))
	}
	if err != nil {
		log.Fatalf("create elasticsearch client: %v", err)
	}

	bulkProcessor, err := elastic.NewBulkProcessorService(client).
		Workers(50).
		BulkActions(5000).
		FlushInterval(500 * time.Millisecond).
		After(after).
		Do(context.Background())
	if err != nil {
		log.Fatalf("start bulk processor: %v", err)
	}

	loadErpDB()
	loadManual(RUN_ENV.ManualFolder)
	loadSensitive(RUN_ENV.SensitiveFolder)

	arr := queryInfo(datawareDB)
	count := arr[1] / 10000
	log.Printf("maxId/10000=%d\n", count)

	// queryIndex registers itself on wg only after its goroutine has started,
	// so wg.Wait() alone could return before any worker was counted. The
	// launched group is incremented here, before each goroutine is spawned,
	// which makes the wait reliable.
	var launched, wg sync.WaitGroup
	for i := 0; i <= count; i++ {
		launched.Add(1)
		go func(idFlag int) {
			defer launched.Done()
			queryIndex(idFlag, datawareDB, bulkProcessor, &wg)
		}(i * 10000)
	}
	launched.Wait()
	wg.Wait()
	fmt.Println("all thread has read maps")
	readhistory = false

	checkUnusedData(bulkProcessor)
	if err = bulkProcessor.Flush(); err != nil {
		log.Print(err.Error())
	}
	fmt.Printf("Cost %d ms\n", time.Since(startTime).Nanoseconds()/1e6)
}
// checkUnusedData indexes every brand, category and manual keyword that is not
// yet present in wordMap. Such words get an otherwise empty Word whose version
// is dateStr.
func checkUnusedData(bulkProcessor *elastic.BulkProcessor) {
	// Union of the three dictionaries, so a word present in several of them is
	// handled once.
	candidates := make(map[string]bool, len(brandMap)+len(categoryMap)+len(manualMap))
	for name := range brandMap {
		candidates[name] = true
	}
	for name := range categoryMap {
		candidates[name] = true
	}
	for name := range manualMap {
		candidates[name] = true
	}

	for name := range candidates {
		if _, found := wordMap.Load(name); found {
			continue
		}
		addWord(&Word{Keyword: name, KeywordVersion: dateStr}, bulkProcessor)
	}
}
// addWord fills in the derived fields of w and, unless isFilterWord rejects
// it, stores it in wordMap and queues it on the bulk processor. The document
// id is the hex md5 of the keyword, so re-adding a keyword overwrites its
// document.
func addWord(w *Word, processor *elastic.BulkProcessor) {
	processWord(w)
	if isFilterWord(w) {
		return
	}
	wordMap.Store(w.Keyword, w)

	digest := md5.Sum([]byte(w.Keyword))
	docID := fmt.Sprintf("%x", digest)
	request := elastic.NewBulkIndexRequest().
		Index("search_suggest_index").
		Type("search_suggest_type").
		Id(docID).
		Doc(w)
	processor.Add(request)
}
// loadErpDB fills brandMap and categoryMap from the ERP database.
//
// Both the English and the Chinese brand name are registered under the brand
// id; names are normalized with cleanKeyword first. Any database error panics,
// as before. Result sets are now closed and their iteration error is checked,
// so a connection failure halfway through no longer looks like a short but
// complete result.
func loadErpDB() {
	db, err := sql.Open("mysql", RUN_ENV.ErpDB)
	if err != nil {
		// Going on with a nil handle would only crash on the first query.
		panic(err.Error())
	}
	defer db.Close()

	brandResults, err := db.Query("select id,en_name,ch_name from secooErpDB.t_product_brand where is_del = 0 and enabled = 1")
	if err != nil {
		panic(err.Error())
	}
	defer brandResults.Close()
	for brandResults.Next() {
		var id int
		var enName string
		var chName string
		if err = brandResults.Scan(&id, &enName, &chName); err != nil {
			panic(err.Error())
		}
		brandMap[cleanKeyword(enName)] = id
		brandMap[cleanKeyword(chName)] = id
	}
	if err = brandResults.Err(); err != nil {
		panic(err.Error())
	}

	categoryResults, err := db.Query("select id,name from secooErpDB.t_product_category where is_del = 0 and enabled = 1")
	if err != nil {
		panic(err.Error())
	}
	defer categoryResults.Close()
	for categoryResults.Next() {
		var id int
		var name string
		if err = categoryResults.Scan(&id, &name); err != nil {
			panic(err.Error())
		}
		categoryMap[cleanKeyword(name)] = id
	}
	if err = categoryResults.Err(); err != nil {
		panic(err.Error())
	}

	fmt.Println(brandMap)
	fmt.Println(categoryMap)
}
func loadManual(folder string) {
files, _ := ioutil.ReadDir(folder)
for _,file := range files {
if !file.IsDir() {
fi, err := os.Open(folder + "/" + file.Name())
if err != nil { fmt.Print(err) }
fmt.Println(file.Name())
br := bufio.NewReader(fi)
for {
bytes, _, e := br.ReadLine()
if e == io.EOF { break }
line := string(bytes)
arr := strings.Split(line, "|")
manualMap[cleanKeyword(arr[0])] = strToInt(arr[1])
}
}
}
fmt.Println("manualMap loaded")
fmt.Println(manualMap)
}
func loadSensitive(folder string) {
files, _ := ioutil.ReadDir(folder)
for _,file := range files {
if !file.IsDir() {
fi, err := os.Open(folder + "/" + file.Name())
if err != nil {
fmt.Print(err)
}
br := bufio.NewReader(fi)
for {
bytes, _, e := br.ReadLine()
if e == io.EOF { break }
line := string(bytes)
key := cleanKeyword(line)
sensitiveMap[cleanKeyword(key)] = true
}
}
}
fmt.Println(sensitiveMap)
}
// queryInfo returns []int{rowCount, maxId} for the keyword statistics table.
//
// max(id) is NULL when the table is empty; it is scanned into a nullable value
// and reported as 0 in that case instead of failing the scan. Query, scan and
// iteration errors panic, as before.
func queryInfo(db *sql.DB) []int {
	countResults, err := db.Query("select count(*),max(id) from app_search_keyword_year_week_p_day ")
	if err != nil {
		panic(err.Error())
	}
	defer countResults.Close()

	var count int
	var maxID sql.NullInt64
	for countResults.Next() {
		if err = countResults.Scan(&count, &maxID); err != nil {
			panic(err.Error())
		}
		fmt.Printf("count=%d,maxId=%d\n", count, maxID.Int64)
	}
	if err = countResults.Err(); err != nil {
		panic(err.Error())
	}
	return []int{count, int(maxID.Int64)}
}
// queryIndex indexes the statistics rows with idFlag <= id < idFlag+10000.
//
// For every row with a non-empty keyword it builds a Word, merges it with an
// already known Word for the same keyword (or for the same set of
// space-separated terms in another order, see existSameWord) and hands it to
// addWord.
//
// The function registers itself on wg and signals Done when it returns. Done
// is now deferred straight away so that it also runs on early returns.
// NOTE(review): because Add happens here rather than in the caller, a caller
// that starts this with `go` must not rely on wg.Wait() alone.
func queryIndex(idFlag int, db *sql.DB, bulkProcessor *elastic.BulkProcessor, wg *sync.WaitGroup) {
	wg.Add(1)
	defer wg.Done()

	// Paging in a loop could return duplicate rows, so each call queries an
	// explicit lower/upper id bound instead.
	var sqlStr = fmt.Sprintf("select * from app_search_keyword_year_week_p_day where id >= %d and id < %d", idFlag, idFlag+10000)
	results, err := db.Query(sqlStr)
	if err != nil {
		// results is nil here; iterating it would panic.
		log.Print(err.Error())
		return
	}
	defer results.Close()

	for results.Next() {
		var (
			id                    int
			keyword               sql.NullString
			yearPv                sql.NullInt64
			yearProductClickCount sql.NullInt64
			yearAddCartCount      sql.NullInt64
			weekPv                sql.NullInt64
			weekProductClickCount sql.NullInt64
			weekAddCartCount      sql.NullInt64
			pDay                  string
		)
		err = results.Scan(&id, &keyword, &yearPv, &yearProductClickCount, &yearAddCartCount, &weekPv, &weekProductClickCount, &weekAddCartCount, &pDay)
		if err != nil {
			// A row that failed to scan holds partial data; do not index it.
			log.Print(err.Error())
			continue
		}
		if !keyword.Valid || keyword.String == "" {
			continue
		}

		key := cleanKeyword(keyword.String)
		// NULL counters have Int64 == 0, so they need no special casing.
		var w = &Word{
			Keyword:        key,
			YearCount:      int32(yearPv.Int64),
			YearClickCount: int32(yearProductClickCount.Int64),
			// YearCartCount was left unset before, which zeroed the yearly
			// cart ratio and its share of the rank for every word.
			YearCartCount:  int32(yearAddCartCount.Int64),
			WeekCount:      int32(weekPv.Int64),
			WeekClickCount: int32(weekProductClickCount.Int64),
			WeekCartCount:  int32(weekAddCartCount.Int64),
			KeywordVersion: pDay,
		}
		if v, isExist := wordMap.Load(key); isExist {
			merge(w, v)
		} else if v, isExist := existSameWord(key); isExist {
			fmt.Println("find same word, now is:" + w.Keyword + " exist is:" + v.(*Word).Keyword)
			merge(w, v)
		}
		addWord(w, bulkProcessor)
	}
	if err = results.Err(); err != nil {
		log.Print(err.Error())
	}
	fmt.Printf("%d done.\n", idFlag)
}
// existSameWord looks in wordMap for a keyword made of the same
// space-separated terms as keyword, in any order, and returns the stored value
// of the first ordering found.
//
// A single-term keyword is looked up directly. Keywords with more than five
// terms are not searched at all, because the number of orderings grows
// factorially.
func existSameWord(keyword string) (value interface{}, ok bool) {
	terms := strings.Fields(keyword)
	switch n := len(terms); {
	case n == 1:
		return wordMap.Load(keyword)
	case n > 5:
		return nil, false
	}

	orderings := list.New()
	permutation(terms, 0, orderings)
	for e := orderings.Front(); e != nil; e = e.Next() {
		if found, hit := wordMap.Load(e.Value); hit {
			return found, true
		}
	}
	return nil, false
}
// merge folds an already stored Word (v, which must hold a *Word) into word:
// word takes over the stored keyword spelling, and the six yearly and weekly
// counters of the stored entry are added to word's own.
func merge(word *Word, v interface{}) {
	existing := v.(*Word)

	word.Keyword = existing.Keyword

	word.YearCount += existing.YearCount
	word.YearCartCount += existing.YearCartCount
	word.YearClickCount += existing.YearClickCount

	word.WeekCount += existing.WeekCount
	word.WeekCartCount += existing.WeekCartCount
	word.WeekClickCount += existing.WeekClickCount
}
// after is the bulk processor's post-commit callback. It only reports a
// commit-level error; the requests and the response are not inspected.
func after(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err == nil {
		return
	}
	fmt.Printf("bulk commit failed, err: %v\n", err)
}
// processWord fills in every derived field of w: the pinyin form, the four
// click/cart ratios (with their weights), the dictionary flags and finally the
// rank.
func processWord(w *Word) {
	w.KeywordPinYin = convertToPinyin(w.Keyword)

	// Raw ratios: count divided by the number of searches in the period.
	w.YearClickRatio = calculateRatio(w.YearClickCount, w.YearCount)
	w.YearCartRatio = calculateRatio(w.YearCartCount, w.YearCount)
	w.WeekClickRatio = calculateRatio(w.WeekClickCount, w.WeekCount)
	w.WeekCartRatio = calculateRatio(w.WeekCartCount, w.WeekCount)

	// Weight the ratios that carry real data (non-default values).
	if w.YearCount != 0 && w.YearCartCount != 0 {
		w.YearCartRatio *= 3
	}
	if w.WeekCount != 0 && w.WeekCartCount != 0 {
		w.WeekCartRatio *= 3
	}
	if w.WeekCount != 0 && w.WeekClickCount != 0 {
		w.WeekClickRatio *= 2
	}

	// Dictionary membership; a flag is only ever switched on here.
	if _, known := brandMap[w.Keyword]; known {
		w.IsBrand = true
	}
	if manualValue, known := manualMap[w.Keyword]; known {
		w.IsManual = true
		w.ManualValue = manualValue
	}
	if _, known := categoryMap[w.Keyword]; known {
		w.IsCategory = true
	}
	if _, known := sensitiveMap[w.Keyword]; known {
		w.IsSensitive = true
	}

	calculateWordRank(w)
}
// isFilterWord reports whether w must be left out of the index (true means
// "drop it"). The checks run in a fixed order and the first match decides.
func isFilterWord(w *Word) bool {
	byteLen := len(w.Keyword)
	switch {
	case w.IsSensitive:
		// Sensitive words are always dropped.
		return true
	case utf8.RuneCountInString(w.Keyword) <= 1 || byteLen > 60:
		// Too short (at most one character) or too long; a Chinese character
		// takes 3 bytes.
		return true
	case byteLen > 6 && isAllDigit(w.Keyword):
		// Looks like a product id (those consist of 7 digits).
		return true
	case w.IsBrand || w.IsCategory || w.IsManual:
		// Brand, category and manually curated words skip the remaining
		// filters.
		return false
	case w.YearCount == 0 || w.YearClickCount == 0:
		// No searches or no clicks over the year.
		return true
	}

	// Blacklisted prefixes.
	for _, prefix := range prefixFilterArr {
		if strings.HasPrefix(w.Keyword, prefix) {
			return true
		}
	}

	// Hot words (more than 50 searches in the year or more than 5 in the
	// week) are kept only with a high cart or click ratio.
	if isHotSearchWord(w) {
		return !isHighCartRatio(w)
	}
	// Rarely searched words are kept only with a high click ratio or at least
	// one add-to-cart.
	return !isHighClickRatio(w)
}
// isAllDigit reports whether every rune of str is a Unicode decimal digit
// (unicode.IsDigit, so non-ASCII digits count too). The empty string yields
// true.
func isAllDigit(str string) bool {
	allDigits := true
	// Each element is a rune, i.e. one decoded Unicode code point.
	for _, r := range []rune(str) {
		if !unicode.IsDigit(r) {
			allDigits = false
			break
		}
	}
	return allDigits
}
// isHotSearchWord reports whether w was searched more than 50 times in the
// year or more than 5 times in the week.
func isHotSearchWord(w *Word) bool {
	if w.YearCount > 50 {
		return true
	}
	return w.WeekCount > 5
}
// isHighCartRatio reports whether any (weighted) cart ratio of w exceeds 0.025
// or any (weighted) click ratio exceeds 0.1.
func isHighCartRatio(w *Word) bool {
	highCart := w.YearCartRatio > 0.025 || w.WeekCartRatio > 0.025
	highClick := w.YearClickRatio > 0.1 || w.WeekClickRatio > 0.1
	return highCart || highClick
}
// isHighClickRatio decides whether a rarely searched word converts well enough
// to keep. Words with fewer than 5 yearly searches, no yearly add-to-cart and
// a yearly click ratio below 0.6 are rejected outright; otherwise a click
// ratio above 0.2 (yearly or weekly) or at least one yearly add-to-cart is
// enough.
func isHighClickRatio(w *Word) bool {
	tooThin := w.YearCount < 5 && w.YearCartCount == 0 && w.YearClickRatio < 0.6
	if tooThin {
		return false
	}
	if w.YearClickRatio > 0.2 || w.WeekClickRatio > 0.2 {
		return true
	}
	return w.YearCartCount >= 1
}
// calculateRatio returns numerator/denominator as a float64, or 0 when either
// operand is 0 (which also guards against division by zero).
func calculateRatio(numerator int32, denominator int32) float64 {
	if numerator != 0 && denominator != 0 {
		return float64(numerator) / float64(denominator)
	}
	return 0
}
// calculateWordRank computes w.WordRank. It starts from a base of 10000, adds
// weighted length, search-count and ratio factors, and then applies
// multipliers for brand words (1.8), category words (1.2) and manual words
// with a positive value (square root of that value).
func calculateWordRank(w *Word) {
	score := 10000.0

	// Shorter keywords score higher; length is measured in bytes.
	score += 3000 * calculateLengthFactor(len(w.Keyword))

	// Search volume; the weekly count is scaled by 52 to compare with a year.
	score += 2000 * calculateCountFactor(w.YearCount, 1)
	score += 2000 * calculateCountFactor(w.WeekCount, 52)

	// Click and cart ratios, each boosted by its absolute count.
	score += 3000 * calculateRatioFactor(w.YearClickRatio, w.YearClickCount)
	score += 3000 * calculateRatioFactor(w.WeekClickRatio, w.WeekClickCount)
	score += 3000 * calculateRatioFactor(w.YearCartRatio, w.YearCartCount)
	score += 3000 * calculateRatioFactor(w.WeekCartRatio, w.WeekCartCount)

	if w.IsBrand {
		score *= 1.8
	}
	if w.IsCategory {
		score *= 1.2
	}
	if w.IsManual && w.ManualValue > 0 {
		score *= math.Sqrt(float64(w.ManualValue))
	}

	w.WordRank = score
}
// calculateLengthFactor turns a text length into a length factor of
// 1 / (2*length + 1): 1 for length 0, decreasing as the text gets longer.
func calculateLengthFactor(length int) float64 {
	divisor := float64(2*length + 1)
	return 1.0 / divisor
}
// calculateRatioFactor converts a conversion ratio into a heat factor,
// log10(sqrt(ratio + 10)), scaled by a multiplier that grows with the absolute
// count behind the ratio:
//
//	count <= 1   -> 1.0      50..99   -> 1.8
//	2..9         -> 1.2      100..199 -> 2.0
//	10..19       -> 1.4      200..499 -> 2.2
//	20..49       -> 1.6      >= 500   -> 2.5
func calculateRatioFactor(ratio float64, count int32) float64 {
	multiplier := 1.0
	switch {
	case count >= 500:
		multiplier = 2.5
	case count >= 200:
		multiplier = 2.2
	case count >= 100:
		multiplier = 2.0
	case count >= 50:
		multiplier = 1.8
	case count >= 20:
		multiplier = 1.6
	case count >= 10:
		multiplier = 1.4
	case count > 1:
		multiplier = 1.2
	}
	return math.Log10(math.Sqrt(ratio+10)) * multiplier
}
// calculateCountFactor converts a search count into a heat factor:
// log10(sqrt(count*rank + 10)). rank scales the count (for example 52 to put a
// weekly count on a yearly footing).
//
// The product is computed in 64 bits. In int32 a large count times rank
// wrapped around, which gave a wrong factor and, when the wrapped value was
// negative, NaN.
func calculateCountFactor(count int32, rank int32) float64 {
	weighted := int64(count)*int64(rank) + 10
	return math.Log10(math.Sqrt(float64(weighted)))
}
......@@ -63,9 +63,7 @@ func main() {
startTime := time.Now()
datawareDB, err := sql.Open("mysql", RUN_ENV.DataWareDB)
if err != nil {
log.Print(err.Error())
}
if err != nil { log.Print(err.Error()) }
var client *elastic.Client
if RUN_ENV.EsUser != "" {
......@@ -142,7 +140,6 @@ func addWord(keyword string, processor *elastic.BulkProcessor) {
func loadErpDB() {
//db, err := sql.Open("mysql", "so_Erp_R:5RgzudyyFlApTmve@tcp(192.168.50.40:3306)/secooErpDB")
db, err := sql.Open("mysql", RUN_ENV.ErpDB)
if err != nil { log.Print(err.Error()) }
defer db.Close()
......@@ -247,19 +244,19 @@ func queryIndex(idFlag int, db *sql.DB, bulkProcessor *elastic.BulkProcessor, wg
for results.Next() {
var id int
var keyword sql.NullString
var year_pv sql.NullInt64
var year_product_click_count sql.NullInt64
var year_add_cart_count sql.NullInt64
var week_pv sql.NullInt64
var week_product_click_count sql.NullInt64
var week_add_cart_count sql.NullInt64
var p_day string
err = results.Scan(&id, &keyword, &year_pv, &year_product_click_count, &year_add_cart_count, &week_pv, &week_product_click_count, &week_add_cart_count, &p_day)
var yearPv sql.NullInt64
var yearProductClickCount sql.NullInt64
var yearAddCartCount sql.NullInt64
var weekPv sql.NullInt64
var weekProductClickCount sql.NullInt64
var weekAddCartCount sql.NullInt64
var pDay string
err = results.Scan(&id, &keyword, &yearPv, &yearProductClickCount, &yearAddCartCount, &weekPv, &weekProductClickCount, &weekAddCartCount, &pDay)
if err != nil { log.Print(err.Error()) }
if keyword.Valid && len(keyword.String) > 0 && keyword.String != "" {
key := cleanKeyword(keyword.String)
if v, isExist := m.Load(key); (!isExist && len(key) > 0) || (isExist && int32(year_pv.Int64) > v.(int32)) {
if v, isExist := m.Load(key); (!isExist && len(key) > 0) || (isExist && int32(yearPv.Int64) > v.(int32)) {
var w = new(Word)
w.Keyword = key
w.YearCount = 0
......@@ -268,14 +265,14 @@ func queryIndex(idFlag int, db *sql.DB, bulkProcessor *elastic.BulkProcessor, wg
w.WeekCount = 0
w.WeekClickCount = 0
w.WeekCartCount = 0
w.KeywordVersion = p_day
w.KeywordVersion = pDay
if year_pv.Valid { w.YearCount = int32(year_pv.Int64) }
if year_product_click_count.Valid { w.YearClickCount = int32(year_product_click_count.Int64) }
if year_add_cart_count.Valid { w.YearCartCount = int32(year_add_cart_count.Int64) }
if week_pv.Valid { w.WeekCount = int32(week_pv.Int64) }
if week_product_click_count.Valid { w.WeekClickCount = int32(week_product_click_count.Int64) }
if week_add_cart_count.Valid { w.WeekCartCount = int32(week_add_cart_count.Int64) }
if yearPv.Valid { w.YearCount = int32(yearPv.Int64) }
if yearProductClickCount.Valid { w.YearClickCount = int32(yearProductClickCount.Int64) }
if yearAddCartCount.Valid { w.YearCartCount = int32(yearAddCartCount.Int64) }
if weekPv.Valid { w.WeekCount = int32(weekPv.Int64) }
if weekProductClickCount.Valid { w.WeekClickCount = int32(weekProductClickCount.Int64) }
if weekAddCartCount.Valid { w.WeekCartCount = int32(weekAddCartCount.Int64) }
processWord(w)
if !isFilterWord(w) {
......@@ -340,9 +337,10 @@ func processWord(w *Word) {
calculateWordRank(w)
}
// traditional chinese to simple chinese
// chinese trim
// english remove redudent blank char
func cleanKeyword(keyword string) string {
fmt.Println(keyword)
out, err := t2s.Convert(keyword)
if err != nil { fmt.Println(err) }
keyword = strings.TrimSpace(strings.ToLower(out))
......@@ -467,4 +465,4 @@ func calculateCountFactor(count int32, rank int32) float64 {
//根据搜索次数,转换为热度因子
count = count * rank + 10
return math.Log10(math.Sqrt(float64(count)))
}
\ No newline at end of file
}
package main
import (
"sync"
"fmt"
"github.com/liuzl/gocc"
"strings"
)
var tmap sync.Map
func main() {
var t2s, _ = gocc.New("t2s")
var _, err = t2s.Convert("中國")
if err != nil { fmt.Println("succ")}
//var t2s, _ = gocc.New("t2s")
//var _, err = t2s.Convert("中國")
//if err != nil { fmt.Println("succ")}
var s = "意尔康 男 鞋"
//var s = "意尔康 男 鞋"
//var re, _ = regexp.Compile("\\s+")
//var st = re.ReplaceAllLiteralString(s," ")
fmt.Println(strings.Join(strings.Fields(s),""))
//fields := strings.Fields(s)
//t := time.Now()
//fmt.Println(factorial(5))
//fmt.Print(time.Now().Unix() -t.Unix() )
fmt.Println(H)
//var lst = new(list.List)
//for i := 1; i < 10 ; i++ {
// lst.PushBack(i)
//}
//for p := lst.Front(); p != nil ; p = p.Next() {
// fmt.Println(p.Value)
//}
add()
var val,_ = tmap.Load("a")
fmt.Print(val)
}
// add stores the pair "a" -> "b" in tmap, reads it straight back and prints
// the value.
func add() {
	tmap.Store("a", "b")
	stored, _ := tmap.Load("a")
	fmt.Print(stored)
}
//// 求阶乘
//func factorial(n int64) int64 {
// if n == 1 { return 1}
// return n * factorial(n-1)
//}
//
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment