go-redis

參考 go-redis.下載

go get github.com/go-redis/redis

Redis Cluster 測試程式

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	var client *redis.ClusterClient

	client = redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:    []string{"192.168.3.1:7000", "192.168.3.2:7000", "192.168.3.3:7000"},
		Password: "mimimi123",
	})

	//client.HSet("usrProfile_hash:628584079XXXX", "locations", "24.747,121.734;24.757,121.834")
	psersonModelToRedis(client, "")

	personResult := client.HGetAll("usrProfile_hash:628584079XXXX")
	personModel, _ := personResult.Result()
	fmt.Println(personModel["id"])
	fmt.Println(personModel["locations"])
	fmt.Println(personModel["profiles"])
	fmt.Println(personModel["labels"])
	fmt.Println(personModel["locations"])

}

func psersonModelToRedis(client *redis.ClusterClient, psersonStr string) {
	//HDFS : id | locations | urls | kvs | profiles | ids | labels
	personModel := make(map[string]interface{})
	personModel["id"] = "test_0001"
	personModel["locations"] = "24.747,121.734;24.757,121.834"
	personModel["urls"] = "init-p01st.push.apple.com/bag;log.tbs.qq.com/ajax?c=dl&k=be407331de19a5ff5a010cc6bc37a330;mon.snssdk.com/monitor/appmonitor/v2/settings?os_api=24&device_type=SM-N9208&ssmix=a&manifest_version_code=202&dpi=420&region=TW&carrier_region=TW&app_name=trill&version_name=2.0.2&timezone_offset=28800&is_my_cn=1&ac=4g&update_version_code=2020&channel=go"
	personModel["kvs"] = "4000019,1;4000001,1"
	personModel["profiles"] = "3000003,1;3000004,Feature Phone;3000010,2900"
	personModel["ids"] = "ifa:625a61936d485e"
	personModel["labels"] = "10071,9;11760,8;10058,9"
	client.HMSet("usrProfile_hash:628584079XXXX", personModel)
}

func pingRedisCluster(client *redis.ClusterClient) bool {
	err := client.Ping().Err()
	if err == nil {
		return true
	} else {
		return false
	}
}

讀取 HDFS 檔案後,寫資料到 Redis

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/colinmarc/hdfs"
	"github.com/go-redis/redis"
)

/*
	HDFS_DIR=/user/miuser/data/model/20181024_person HDFS_NAMENODE=192.168.3.1:8020 ./enrich-backupToRedis
	hdfsDirPath := "/user/miuser/data/model/20181024_person"
	hdfsClient, _ := hdfs.New("192.168.3.1:8020")
*/

func main() {
	start := time.Now()
	fmt.Println("HDFS to Redis start Time : " + start.Format("20060102150405"))
	var client *redis.ClusterClient
	hdfsDirPath := os.Getenv("HDFS_DIR")
	hdfsNameNode := os.Getenv("HDFS_NAMENODE")
	hdfsClient, _ := hdfs.New(hdfsNameNode)
	hdfsFiles, _ := hdfsClient.ReadDir(hdfsDirPath)

	client = redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:    []string{"192.168.3.1:7000", "192.168.3.2:7000", "192.168.3.3:7000"},
		Password: "test",
	})

	if !pingRedisCluster(client) {
		fmt.Println("Connect RedisCluster Fail ! ")
		os.Exit(1)
	}
	var exeCount = 0
	var m sync.Mutex
	var wg sync.WaitGroup
	wg.Add(len(hdfsFiles))

	for _, hdfsFile := range hdfsFiles {
		var hdfsnm string
		hdfsnm = hdfsFile.Name()

		go func() {
			defer wg.Done()
			if strings.HasPrefix(hdfsnm, "part") {
				hdfsFile := hdfsDirPath + "/" + hdfsnm
				file, _ := hdfsClient.Open(hdfsFile)
				scanner := bufio.NewScanner(file)
				for scanner.Scan() {
					lineData := scanner.Text()
					personModelToRedis(client, lineData)
					m.Lock()
					exeCount++
					m.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	end := time.Now()
	executeTime := end.Sub(start)
	fmt.Println("HDFS to Redis end Time : " + end.Format("20060102150405"))
	fmt.Printf("HDFS to Redis executeTime : %v , executeCount : %d ", executeTime, exeCount)
}

func personModelToRedis(client *redis.ClusterClient, psersonStr string) {
	//HDFS : 0,0;1;2(certset),id,last | locations | urls | kvs | profiles | ids | labels
	personModel := make(map[string]interface{})
	modelPart := strings.Split(psersonStr, "|")
	personPart := strings.Split(modelPart[0], ",")
	personID := personPart[2]
	personModel["certset"] = personPart[1]
	personModel["id"] = personID
	personModel["last"] = personPart[3]
	personModel["locations"] = modelPart[1]
	personModel["urls"] = modelPart[2]
	personModel["kvs"] = modelPart[3]
	personModel["profiles"] = modelPart[4]
	personModel["ids"] = modelPart[5]
	personModel["labels"] = modelPart[6]
	client.HMSet("usrProfile_"+personID, personModel)
}

func pingRedisCluster(client *redis.ClusterClient) bool {
	err := client.Ping().Err()
	if err == nil {
		return true
	} else {
		return false
	}
}

打包成可在 linux 上執行的執行檔

env GOOS=linux GOARCH=amd64 go build example1.go

給環境變數的執行方式

HDFS_DIR=/user/miuser/data/model/20181024_person HDFS_NAMENODE=192.168.3.65:8020 ./enrich-backupToRedis