Commit 868c00a1 by davidstark

go 任务建立git项目

parents
package main
import (
"bufio"
"context"
"crypto/md5"
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/liuzl/gocc"
"github.com/olivere/elastic"
"github.com/mozillazg/go-pinyin"
"io"
"io/ioutil"
"log"
"math"
"os"
"strconv"
"strings"
"sync"
"time"
"unicode"
"unicode/utf8"
)
type Word struct {
Keyword string `json:"keyword"`
KeywordPinYin string `json:"keywordPinYin"`
YearClickCount int32 `json:"yearClickCount"`
YearCartCount int32 `json:"yearCartCount"`
YearCount int32 `json:"yearCount"`
WeekClickCount int32 `json:"weekClickCount"`
WeekCartCount int32 `json:"weekCartCount"`
WeekCount int32 `json:"weekCount"`
YearClickRatio float64 `json:"yearClickRatio"`
YearCartRatio float64 `json:"yearCartRatio"`
WeekClickRatio float64 `json:"weekClickRatio"`
WeekCartRatio float64 `json:"weekCartRatio"`
IsBrand bool `json:"isBrand"`
IsCategory bool `json:"isCategory"`
IsManual bool `json:"isManual"`
IsSensitive bool `json:"isSensitive"`
ManualValue int32 `json:"manualValue"`
WordRank float64 `json:"wordRank"`
KeywordVersion string `json:"keywordVersion"`
}
const BRAND_TYPE = "1"
const CATEGORY_TYPE = "2"
const MANUAL_TYPE = "3"
var wordMap sync.Map
var brandMap = make(map[string]int)
var categoryMap = make(map[string]int)
var manualMap = make(map[string]int32)
var sensitiveMap = make(map[string]bool)
var usedMap sync.Map
var now = time.Now()
var dateStr = fmt.Sprintf("%d-%d-%d",now.Year(),now.Month(),now.Day())
var t2s, _ = gocc.New("t2s")
var prefixFilterArr = []string{"https://", "http://", "dg", "d & g", "dolce&gabbana",
"dolce & gabbana", "杜嘉班纳", "避孕", "情趣", "cucci", "乒乓球", "cuccl", "gucii"}
var testDatawareDBInfo = "root:1234@tcp(localhost:3306)/secooErpDB"
var testErpDBInfo = "3306_test:iS6CXpYqgZ8Mhjui@tcp(10.4.3.223:3306)/secooErpDB"
var testESInfo = "http://localhost:9200"
var testManualFolder = "D:\\Code\\suggest_corpus-20180801\\manual"
var testSensitiveFolder = "D:\\Code\\suggest_corpus-20180801\\sensitive"
func main() {
startTime := time.Now()
//db, err := sql.Open("mysql", "Search_DataWar_R:pY1P9zUj9x1M65ot5szo@tcp(secooDataWarehouse.slave.com:3306)/secooDataWarehouse")
db, err := sql.Open("mysql", testDatawareDBInfo)
if err != nil { log.Print(err.Error()) }
client, err := elastic.NewClient(elastic.SetURL(testESInfo))
// http://bigdataescluster.secoolocal.com:9200"
//elastic.SetBasicAuth("search", "search5z0NvEn1D"))
if err != nil { log.Print(err.Error()) }
bulkProcessor, err := elastic.NewBulkProcessorService(client).
Workers(50).
BulkActions(10000).
FlushInterval(1 * time.Second).
After(after).
Do(context.Background())
if err != nil { log.Print(err.Error()) }
loadErpDB()
manualFolder := testManualFolder
sensitiveFolder := testSensitiveFolder
loadManual(manualFolder)
loadSensitive(sensitiveFolder)
var wg sync.WaitGroup
arr := queryInfo(db)
if arr[0] > 1000000 {
count := arr[1] / 10000
log.Printf("maxId/10000=%d\n", count)
for i := 0; i <= count; i++ {
go queryIndex(i*10000, db, bulkProcessor, &wg, wordMap)
}
}
wg.Wait()
fmt.Println("all thread has read maps")
checkUnusedData(bulkProcessor)
err = bulkProcessor.Flush()
if err != nil { log.Print(err.Error()) }
defer db.Close()
fmt.Printf("Cost %d ms\n", time.Since(startTime).Nanoseconds()/1e6)
}
func checkUnusedData(bulkProcessor *elastic.BulkProcessor) {
for brand := range brandMap {
if _, exist := usedMap.Load(brand + BRAND_TYPE); !exist {
addWord(brand,bulkProcessor,BRAND_TYPE)
}
}
for category := range categoryMap {
if _, exist := usedMap.Load(category + CATEGORY_TYPE); !exist {
addWord(category,bulkProcessor,CATEGORY_TYPE)
}
}
for manual := range manualMap {
if _, exist := usedMap.Load(manual + MANUAL_TYPE); !exist {
addWord(manual,bulkProcessor,MANUAL_TYPE)
}
}
}
func addWord(keyword string, processor *elastic.BulkProcessor, wordType string) {
var w = new(Word)
switch wordType {
case BRAND_TYPE: w.IsBrand = true
case CATEGORY_TYPE: w.IsCategory = true
case MANUAL_TYPE: { w.IsManual = true; w.ManualValue = manualMap[keyword]}
}
w.Keyword = keyword
w.KeywordVersion = dateStr
processWord(w)
if !isFilterWord(w) {
id := fmt.Sprintf("%x", md5.Sum([]byte(w.Keyword)))
req := elastic.NewBulkIndexRequest().
Index("search_suggest_index").
Type("search_suggest_type").Id(id).Doc(w)
processor.Add(req)
}
}
func productEnv() {
testDatawareDBInfo = "Search_DataWar_R:pY1P9zUj9x1M65ot5szo@tcp(secooDataWarehouse.slave.com:3306)/secooDataWarehouse"
testErpDBInfo = "3306_test:iS6CXpYqgZ8Mhjui@tcp(10.4.3.223:3306)/secooErpDB"
testESInfo = "http://bigdataescluster.secoolocal.com:9200"
testManualFolder = "/data/pssmaster/corpus_set/suggest_corpus/manual"
testSensitiveFolder = "/data/pssmaster/corpus_set/suggest_corpus/sensitive"
}
func loadErpDB() {
//db, err := sql.Open("mysql", "so_Erp_R:5RgzudyyFlApTmve@tcp(192.168.50.40:3306)/secooErpDB")
db, err := sql.Open("mysql", testErpDBInfo)
if err != nil { log.Print(err.Error()) }
defer db.Close()
var brandQuery = fmt.Sprintf("select id,en_name,ch_name from secooErpDB.t_product_brand where is_del = 0 and enabled = 1")
brandResults, err := db.Query(brandQuery)
if err != nil { panic(err.Error()) }
for brandResults.Next() {
var id int
var enName string
var chName string
err = brandResults.Scan(&id, &enName, &chName)
if err != nil { panic(err.Error()) }
brandMap[cleanKeyword(enName)] = id
brandMap[cleanKeyword(chName)] = id
}
var categoryQuery = fmt.Sprintf("select id,name from secooErpDB.t_product_category where is_del = 0 and enabled = 1")
categoryResults, err := db.Query(categoryQuery)
if err != nil { panic(err.Error()) }
for categoryResults.Next() {
var id int
var name string
err = categoryResults.Scan(&id, &name)
if err != nil { panic(err.Error()) }
categoryMap[cleanKeyword(name)] = id
}
fmt.Println(brandMap)
fmt.Println(categoryMap)
}
func loadManual(folder string) {
files, _ := ioutil.ReadDir(folder)
for _,file := range files {
if !file.IsDir() {
fi, err := os.Open(folder + "/" + file.Name())
if err != nil { fmt.Print(err) }
fmt.Println(file.Name())
br := bufio.NewReader(fi)
for {
bytes, _, e := br.ReadLine()
if e == io.EOF { break }
line := string(bytes)
arr := strings.Split(line, "|")
manualMap[cleanKeyword(arr[0])] = strToInt(arr[1])
}
}
}
fmt.Println("manualMap loaded")
fmt.Println(manualMap)
}
func loadSensitive(folder string) {
files, _ := ioutil.ReadDir(folder)
for _,file := range files {
if !file.IsDir() {
fi, err := os.Open(folder + "/" + file.Name())
if err != nil {
fmt.Print(err)
}
br := bufio.NewReader(fi)
for {
bytes, _, e := br.ReadLine()
if e == io.EOF { break }
line := string(bytes)
key := cleanKeyword(line)
sensitiveMap[cleanKeyword(key)] = true
}
}
}
fmt.Println(sensitiveMap)
}
func queryInfo(db *sql.DB) []int {
countResults, err := db.Query("select count(*),max(id) from app_search_keyword_year_week_p_day")
if err != nil { panic(err.Error()) }
var count int
var maxId int
for countResults.Next() {
err = countResults.Scan(&count, &maxId)
if err != nil {
panic(err.Error())
}
fmt.Printf("count=%d,maxId=%d\n", count, maxId)
}
return []int{count, maxId}
}
func queryIndex(idFlag int, db *sql.DB, bulkProcessor *elastic.BulkProcessor, wg *sync.WaitGroup, m sync.Map) {
wg.Add(1)
fmt.Printf("%d doing.\n", idFlag)
// 循环时可能查询到重复数据,应该以id 的上下界来查询
var sqlStr = fmt.Sprintf("select * from app_search_keyword_year_week_p_day where id >= %d and id < %d", idFlag, idFlag + 10000)
results, err := db.Query(sqlStr)
if err != nil { log.Print(err.Error()) }
for results.Next() {
var id int
var keyword sql.NullString
var year_pv sql.NullInt64
var year_product_click_count sql.NullInt64
var year_add_cart_count sql.NullInt64
var week_pv sql.NullInt64
var week_product_click_count sql.NullInt64
var week_add_cart_count sql.NullInt64
var p_day string
err = results.Scan(&id, &keyword, &year_pv, &year_product_click_count, &year_add_cart_count, &week_pv, &week_product_click_count, &week_add_cart_count, &p_day)
if err != nil { log.Print(err.Error()) }
if keyword.Valid && len(keyword.String) > 0 && keyword.String != "" {
key := cleanKeyword(keyword.String)
if v, isExist := m.Load(key); (!isExist && len(key) > 0) || (isExist && int32(year_pv.Int64) > v.(int32)) {
var w = new(Word)
w.Keyword = key
w.YearCount = 0
w.YearClickCount = 0
w.YearCartCount = 0
w.WeekCount = 0
w.WeekClickCount = 0
w.WeekCartCount = 0
w.KeywordVersion = p_day
if year_pv.Valid { w.YearCount = int32(year_pv.Int64) }
if year_product_click_count.Valid { w.YearClickCount = int32(year_product_click_count.Int64) }
if year_add_cart_count.Valid { w.YearCartCount = int32(year_add_cart_count.Int64) }
if week_pv.Valid { w.WeekCount = int32(week_pv.Int64) }
if week_product_click_count.Valid { w.WeekClickCount = int32(week_product_click_count.Int64) }
if week_add_cart_count.Valid { w.WeekCartCount = int32(week_add_cart_count.Int64) }
processWord(w)
if !isFilterWord(w) {
m.Store(key, w.YearCount)
id := fmt.Sprintf("%x", md5.Sum([]byte(w.Keyword)))
req := elastic.NewBulkIndexRequest().
Index("search_suggest_index").
Type("search_suggest_type").Id(id).Doc(w)
bulkProcessor.Add(req)
}
}
}
}
fmt.Printf("%d done.\n", idFlag)
defer wg.Done()
}
func after(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
if err != nil { fmt.Printf("bulk commit failed, err: %v\n", err) }
fmt.Printf("commit successfully, len(requests)=%d\n", len(requests))
}
func processWord(w *Word) {
w.KeywordPinYin = convertToPinyin(w.Keyword)
w.YearClickRatio = calculateRatio(w.YearClickCount, w.YearCount)
w.YearCartRatio = calculateRatio(w.YearCartCount, w.YearCount)
w.WeekClickRatio = calculateRatio(w.WeekClickCount, w.WeekCount)
w.WeekCartRatio = calculateRatio(w.WeekCartCount, w.WeekCount)
// 非默认值,加权
if w.YearCount != 0 && w.YearCartCount != 0 {
w.YearCartRatio *= 3
}
// 非默认值,加权
if w.WeekCount != 0 && w.WeekCartCount != 0 {
w.WeekCartRatio *= 3
}
// 非默认值,加权
if w.WeekCount != 0 && w.WeekClickCount != 0 {
w.WeekClickRatio *= 2
}
if _, isExist := brandMap[w.Keyword]; isExist {
w.IsBrand = true
usedMap.Store(w.Keyword + BRAND_TYPE, true)
}
if _, isExist := manualMap[w.Keyword]; isExist {
w.IsManual = true
w.ManualValue = manualMap[w.Keyword]
usedMap.Store(w.Keyword + MANUAL_TYPE,true)
}
if _, isExist := categoryMap[w.Keyword]; isExist {
w.IsCategory = true
usedMap.Store(w.Keyword + CATEGORY_TYPE, true)
}
if _, isExist := sensitiveMap[w.Keyword]; isExist {
w.IsSensitive = true
}
calculateWordRank(w)
}
func cleanKeyword(keyword string) string {
out, err := t2s.Convert(keyword)
if err != nil { fmt.Println(err) }
keyword = strings.TrimSpace(strings.ToLower(out))
return strings.Join(strings.Fields(keyword)," ")
}
func strToInt(str string) int32 {
if str == "\\N" || str == "" { return 0 }
v, err := strconv.ParseInt(str, 10, 64)
if err != nil { fmt.Println(err) }
return int32(v)
}
func convertToPinyin(str string) string {
var ret string
for _, v := range str {
s := strings.Join(pinyin.LazyPinyin(string(v), pinyin.NewArgs()), "")
if len(s) > 0 {
ret += s
} else {
ret += string(v)
}
}
return ret
}
func isFilterWord(w *Word) bool {
// 敏感词过滤
if w.IsSensitive { return true }
// 过滤掉太长的词 每个中文字占3个byte
if utf8.RuneCountInString(w.Keyword) <= 1 || len(w.Keyword) > 60 { return true }
// 过滤掉商品id,商品id是有7位数字组成
if len(w.Keyword) > 6 && isAllDigit(w.Keyword) { return true }
// 品牌词 类目词 人工干预词 不做过滤
if w.IsBrand || w.IsCategory || w.IsManual { return false }
// 年数据过滤
if w.YearCount == 0 || w.YearClickCount == 0 { return true }
// 前缀过滤
for _, v := range prefixFilterArr {
if strings.HasPrefix(w.Keyword, v) { return true }
}
// 判断是否是热搜词 一年内搜索次数大于50或者一周内搜索次数大于5
if isHotSearchWord(w) {
// 搜索次数比较多 转化率或者点击率较高的 不过滤
return !isHighCartRatio(w)
} else {
// 搜索次数不多 但是转化率很高的 或者有加购 不过滤
return !isHighClickRatio(w)
}
}
func isAllDigit(str string) bool {
for _, x := range str {
// x 的类型是 rune 其实就是对应字符的 utf8 编码
if !unicode.IsDigit(x) { return false }
}
return true
}
func isHotSearchWord(w *Word) bool {
return w.YearCount > 50 || w.WeekCount > 5
}
func isHighCartRatio(w *Word) bool {
return w.YearCartRatio > 0.025 || w.WeekCartRatio > 0.025 || w.YearClickRatio > 0.1 || w.WeekClickRatio > 0.1
}
func isHighClickRatio(w *Word) bool {
if w.YearCount < 5 && w.YearCartCount == 0 && w.YearClickRatio < 0.6 { return false }
return w.YearClickRatio > 0.2 || w.WeekClickRatio > 0.2 || w.YearCartCount >= 1
}
func calculateRatio(numerator int32, denominator int32) float64 {
if numerator == 0 || denominator == 0 { return 0 }
return float64(float64(numerator) / float64(denominator))
}
func calculateWordRank(w *Word) {
wordRank := 10000.0
wordRank += 3000 * calculateLengthFactor(len(w.Keyword))
wordRank += 2000 * calculateCountFactor(w.YearCount, 1)
wordRank += 2000 * calculateCountFactor(w.WeekCount, 52)
wordRank += 3000 * calculateRatioFactor(w.YearClickRatio, w.YearClickCount)
wordRank += 3000 * calculateRatioFactor(w.WeekClickRatio, w.WeekClickCount)
wordRank += 3000 * calculateRatioFactor(w.YearCartRatio, w.YearCartCount)
wordRank += 3000 * calculateRatioFactor(w.WeekCartRatio, w.WeekCartCount)
if w.IsBrand { wordRank *= 1.8 }
if w.IsCategory { wordRank *= 1.2 }
if w.IsManual && w.ManualValue > 0 { wordRank *= math.Sqrt(float64(w.ManualValue)) }
w.WordRank = wordRank
}
func calculateLengthFactor(length int) float64 {
//根据文本长度转换为长度因子
return float64(1.0 / float64(2 * length + 1))
}
func calculateRatioFactor(ratio float64, count int32) float64 {
var rank float64
switch {
case count > 1 && count < 10 : rank = 1.2
case count >= 10 && count < 20 : rank = 1.4
case count >= 20 && count < 50 : rank = 1.6
case count >= 50 && count < 100 : rank = 1.8
case count >= 100 && count < 200 : rank = 2.0
case count >= 200 && count < 500 : rank = 2.2
case count >= 500 : rank = 2.5
default:rank = 1.0
}
//根据搜索转化率,转换为热度因子
return math.Log10(math.Sqrt(ratio + 10)) * rank
}
func calculateCountFactor(count int32, rank int32) float64 {
//根据搜索次数,转换为热度因子
count = count * rank + 10
return math.Log10(math.Sqrt(float64(count)))
}
\ No newline at end of file
package main
import (
"time"
"fmt"
)
type T struct {
Keyword string `json:"keyword"`
KeywordPinYin string `json:"keywordPinYin"`
YearClickCount int32 `json:"yearClickCount"`
YearCartCount int32 `json:"yearCartCount"`
YearCount int32 `json:"yearCount"`
WeekClickCount int32 `json:"weekClickCount"`
WeekCartCount int32 `json:"weekCartCount"`
WeekCount int32 `json:"weekCount"`
YearClickRatio float64 `json:"yearClickRatio"`
YearCartRatio float64 `json:"yearCartRatio"`
WeekClickRatio float64 `json:"weekClickRatio"`
WeekCartRatio float64 `json:"weekCartRatio"`
IsBrand bool `json:"isBrand"`
IsCategory bool `json:"isCategory"`
IsManual bool `json:"isManual"`
IsSensitive bool `json:"isSensitive"`
ManualValue int32 `json:"manualValue"`
WordRank float64 `json:"wordRank"`
KeywordVersion string `json:"keywordVersion"`
}
func main() {
var w = new(T)
var t = time.Now()
w.KeywordVersion = fmt.Sprintf("%d-%d-%d",t.Year(),t.Month(),t.Day())
fmt.Println(w.KeywordVersion)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment