项目背景:监控敏感词推送机上的敏感词,以及推送机需要推送到的目标机上的敏感词是否已经更新。
代码目录结构:
目录结构
插件程序名toolpligin.go:
package toolpligin
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net/http"
"os"
"log"
"os/exec"
"strconv"
"strings"
"time"
)
// Info, Warning and Error are the package-level loggers. They are nil until
// init assigns them.
var Info, Warning, Error *log.Logger
// Treeid_ip is the JSON envelope decoded by OnlineGetIpSlice: a status code,
// a message and a flat list of IP strings.
//
// The tags must be written as `json:"name"` with no space after the colon;
// a tag such as `json: "msg"` is malformed and ignored by encoding/json.
type Treeid_ip struct {
	Status int      `json:"status"`
	Msg    string   `json:"msg"`
	Data   []string `json:"data"`
}
// Res_node is one element of the per-IP list inside Ip_node.Data. It carries
// no struct tags, so encoding/json matches the keys "id", "link" and "idc"
// to these fields case-insensitively when decoding.
type Res_node struct {
	Id        int
	Link, Idc string
}
// Ip_node is the JSON envelope decoded by OnlineGetIpnode. Data maps an IP
// string to the list of nodes reported for that IP.
//
// The tags must be written as `json:"name"` with no space after the colon;
// a tag such as `json: "msg"` is malformed and ignored by encoding/json.
type Ip_node struct {
	Status int                   `json:"status"`
	Msg    string                `json:"msg"`
	Data   map[string][]Res_node `json:"data"`
}
// Null_node is the alternative envelope OnlineGetIpnode tries when the
// payload does not decode as Ip_node: here "data" is a JSON array rather than
// an object.
//
// The tags must be written as `json:"name"` with no space after the colon;
// a tag such as `json: "msg"` is malformed and ignored by encoding/json.
type Null_node struct {
	Status int      `json:"status"`
	Msg    string   `json:"msg"`
	Data   []string `json:"data"`
}
// Package-level state shared by the file readers below.
var (
	// localfiletimemapvalue is filled and returned by Getlocalfiletimevalue.
	localfiletimemapvalue = map[string]string{}
	// ipslice is filled and returned by GetIpSlice.
	ipslice = []string{}
)
// fileipvalue is the path of the locally cached IP list.
const fileipvalue = "../conf/ip.txt"

// filetimevalue is the path of the locally stored file timestamps.
const filetimevalue = "../conf/filetimevalue.txt"
// init opens (creating it if needed) ../log/errors.log in append mode and
// points the Info, Warning and Error loggers at it. The process exits via
// log.Fatalln when the file cannot be opened.
func init() {
	const logFlags = log.Ldate | log.Ltime | log.Lshortfile

	logFile, openErr := os.OpenFile("../log/errors.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if openErr != nil {
		log.Fatalln("打开日志文件失败:", openErr)
	}

	// All three loggers share the same file and differ only in prefix.
	Info = log.New(logFile, "Info:", logFlags)
	Warning = log.New(logFile, "Warning:", logFlags)
	Error = log.New(logFile, "Error:", logFlags)
}
// RunRcmdBackstring runs rcmdstring through "/bin/bash -c" and returns what
// the command wrote to standard output.
//
// When the command fails, the failure is logged through Error and the
// function returns the literal string "error" together with an error whose
// text is the run error followed by ": " and the captured standard error.
func RunRcmdBackstring(rcmdstring string) (string, error) {
	var stdoutBuf, stderrBuf bytes.Buffer

	shell := exec.Command("/bin/bash", "-c", rcmdstring)
	shell.Stdout, shell.Stderr = &stdoutBuf, &stderrBuf

	if runErr := shell.Run(); runErr != nil {
		failure := errors.New(fmt.Sprint(runErr) + ": " + stderrBuf.String())
		Error.Println(failure)
		return "error", failure
	}
	return stdoutBuf.String(), nil
}
// InboolSlice reports whether needle is ABSENT from haystack: it returns
// false as soon as an element equal to needle is found and true otherwise
// (including for an empty or nil haystack).
//
// NOTE: the result is the opposite of what the name suggests; callers rely on
// this, so it must not be "corrected" without updating them.
func InboolSlice(haystack []bool, needle bool) bool {
	absent := true
	for i := 0; i < len(haystack) && absent; i++ {
		if haystack[i] == needle {
			absent = false
		}
	}
	return absent
}
// RequestAlarm sends an alarm by issuing an HTTP GET to the falcon alarm
// endpoint with title, alarmgroup and content as query parameters.
//
// A request failure is logged and the function returns; a response other than
// 200 is logged as an error. Nothing is returned to the caller.
//
// NOTE(review): the parameters are interpolated into the query string without
// URL-escaping, and the default http client has no timeout. Both are left
// unchanged here; confirm whether callers can pass characters such as '&'.
func RequestAlarm(title, alarmgroup, content string) {
	alarm_url := fmt.Sprintf("http://falcon.search.weibo.com/falcon/intfs/alarm/send?title=%s&group=%s&content=%s", title, alarmgroup, content)
	resp, err := http.Get(alarm_url)
	if err != nil {
		// resp is nil on error, so it must not be touched (the original
		// deferred resp.Body.Close() here and panicked).
		Error.Println(err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		Info.Println("报警发送完成")
		return
	}
	Error.Printf("报警发送失败,状态码:%d\n", resp.StatusCode)
}
// Writefiletimevalue replaces the content of conffilename with one
// "name:value" line per entry of writeremotefiletimemapvalue.
//
// The file is truncated (and created with mode 0600 if it does not exist yet).
// Open, write and close failures are logged through Error; nothing is
// returned. Line order follows Go map iteration and is therefore unspecified.
func Writefiletimevalue(conffilename string, writeremotefiletimemapvalue map[string]string) {
	f, err := os.OpenFile(conffilename, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0600)
	if err != nil { // failed to open the file
		Error.Println(err)
		return
	}
	// Close only after a successful open, and surface a failed close because
	// it can mean the data never reached the disk.
	defer func() {
		if cerr := f.Close(); cerr != nil {
			Error.Printf("写入文件出错:%s\n", cerr)
		}
	}()

	// Assemble everything first so the file is written with a single call.
	var content bytes.Buffer
	for key, value := range writeremotefiletimemapvalue {
		content.WriteString(key + ":" + value + "\n")
	}
	if _, err := f.Write(content.Bytes()); err != nil {
		Error.Printf("写入文件出错:%s\n", err)
	}
}
// Comparefiletimevalue compares the values of the two maps for every key
// that exists in both. Values are parsed as integers; it returns false as soon
// as one pair differs by more than 120, and true otherwise.
//
// Keys present in only one map are ignored. A value that fails to parse is
// logged through Error and then treated as 0.
func Comparefiletimevalue(firstfiletimevalue, secondfiletimevalue map[string]string) bool {
	const maxDrift = 120.0

	for name, rawFirst := range firstfiletimevalue {
		rawSecond, present := secondfiletimevalue[name]
		if !present {
			continue
		}

		firstStamp, err := strconv.Atoi(rawFirst)
		if err != nil {
			Error.Println(err)
		}
		secondStamp, err := strconv.Atoi(rawSecond)
		if err != nil {
			Error.Println(err)
		}

		if math.Abs(float64(secondStamp-firstStamp)) > maxDrift {
			return false
		}
	}
	return true
}
// Sendmail sends a plain-text mail whose content is body, using fixed sender,
// recipients and subject. A send failure is logged together with its cause.
//
// targetip and ipvalue are accepted but not used by this function.
//
// NOTE(review): gomail is not listed in the visible import block of this
// file; the import must exist for this function to compile.
// NOTE(security): the SMTP password is hard-coded and certificate
// verification is disabled (InsecureSkipVerify); both should be reviewed.
func Sendmail(targetip, ipvalue, body string) {
	m := gomail.NewMessage()                                // the mail being built
	m.SetHeader("From", "search-cowork@staff.sina.com.cn") // sender
	m.SetHeader("To", "xingdong@staff.weibo.com", "shujian@staff.weibo.com", "zhiping7@staff.weibo.com") // recipients
	m.SetHeader("Subject", "删除敏感词推送机上的ip")                  // subject
	m.SetBody("text/plain", body)                           // content

	// Arguments: mail server host, port, account name, account password.
	d := gomail.NewDialer("mail.staff.sina.com.cn", 25, "search-cowork", "1234.com1")
	d.TLSConfig = &tls.Config{InsecureSkipVerify: true} // TLS config that skips verification

	if err := d.DialAndSend(m); err != nil {
		// Inside this package the logger is plain Info; the original wrote
		// toolpligin.Info, which does not resolve here, and dropped err.
		Info.Printf("try send a mail failed: %v\n", err)
	}
}
// Getlocalfiletimevalue reads the file named by filetimevalue, where each
// line has the form "name:value", stores every pair in the package-level
// localfiletimemapvalue map and returns that map.
//
// If the file cannot be opened the error is logged and the map is returned as
// it currently is. Blank lines are skipped; lines without a ':' are logged and
// skipped. A last line without a trailing newline is still read. Entries
// already in the map from earlier calls are kept.
func Getlocalfiletimevalue() map[string]string {
	file, err := os.Open(filetimevalue)
	if err != nil {
		Error.Println("打开md5文件出错")
		// Reading from the nil file used to end in an index-out-of-range
		// panic; there is nothing to read, so stop here.
		return localfiletimemapvalue
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		ftimevalue := strings.TrimSpace(scanner.Text())
		if ftimevalue == "" {
			continue
		}
		filetimeslice := strings.Split(ftimevalue, ":")
		if len(filetimeslice) < 2 {
			Error.Printf("%s: malformed line %q\n", filetimevalue, ftimevalue)
			continue
		}
		localfiletimemapvalue[filetimeslice[0]] = filetimeslice[1]
	}
	if err := scanner.Err(); err != nil {
		Error.Println(err)
	}
	return localfiletimemapvalue
}
//数据结构:{"status":0,"msg":"","data":["10.77.104.179","10.185.27.125","10.182.16.99","10.182.16.100","10.182.16.101","10.182.16.102","10.182.16.103","10.182.16.104","10.182.16.105","10.182.16.106","10.185.27.123","10.185.27.124"]}
func OnlineGetIpSlice() []string {
url := "http://falcon.search.weibo.com/falcons/intfs/Node/getServers?tree_id=946"
// 超时时间:5秒
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Get(url)
if err != nil {
ipslice := GetIpSlice()
return ipslice
}
defer resp.Body.Close()
var buffer [512]byte
result := bytes.NewBuffer(nil)
for {
n, err := resp.Body.Read(buffer[0:])
result.Write(buffer[0:n])
if err != nil && err == io.EOF {
break
} else if err != nil {
panic(err)
}
}
res := result.String()
bodyresult := new(Treeid_ip)
json_err := json.Unmarshal([]byte(res),bodyresult)
if json_err != nil{
Error.Println(json_err)
}
Writeipfilevalue(fileipvalue,bodyresult.Data)
return bodyresult.Data
}
// GetIpSlice reads ../conf/ip.txt (one IP per line) and returns the entries
// as a slice. The package-level ipslice is replaced with the same result.
//
// Every call starts from an empty list, so repeated calls no longer
// accumulate duplicates. Blank lines are skipped and a last line without a
// trailing newline is still read. If the file cannot be opened the error is
// logged and an empty slice is returned.
func GetIpSlice() []string {
	ips := make([]string, 0)

	file, err := os.Open("../conf/ip.txt")
	if err != nil {
		Error.Println("打开ip文件出错")
		// Reading from the nil file used to spin forever on the same error.
		ipslice = ips
		return ips
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		if iplinevalue := strings.TrimSpace(scanner.Text()); iplinevalue != "" {
			ips = append(ips, iplinevalue)
		}
	}
	if err := scanner.Err(); err != nil {
		Error.Println(err)
	}

	ipslice = ips
	return ips
}
// Writeipfilevalue replaces the content of ipfilename with the entries of
// sourceipslice, one per line.
//
// The file is truncated (and created with mode 0600 if it does not exist yet).
// Open, write and close failures are logged through Error; nothing is
// returned. An empty slice leaves the file empty.
func Writeipfilevalue(ipfilename string, sourceipslice []string) {
	f, err := os.OpenFile(ipfilename, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0600)
	if err != nil { // failed to open the file
		Error.Println(err)
		return
	}
	// Close only after a successful open, and surface a failed close because
	// it can mean the data never reached the disk.
	defer func() {
		if cerr := f.Close(); cerr != nil {
			Error.Printf("写入ip文件出错:%s\n", cerr)
		}
	}()

	if len(sourceipslice) == 0 {
		return
	}
	// Every entry is terminated by a newline, written with a single call.
	content := strings.Join(sourceipslice, "\n") + "\n"
	if _, err := f.Write([]byte(content)); err != nil {
		Error.Printf("写入ip文件出错:%s\n", err)
	}
}
// OnlineGetIpnode issues a GET to newnodeurl+Ip (5 second timeout), decodes
// the JSON answer and returns the Idc of the first node listed for Ip.
//
// newnodeurl must be the URL prefix WITHOUT the IP: this function appends Ip
// itself.
//
// Expected payload, for example:
//
//	{"status":0,"msg":"success","data":{"10.41.16.168":[{"id":2048,"link":"...","idc":"dbl"}]}}
//
// Return values:
//   - the Idc string of the first node when the payload lists nodes for Ip;
//   - "0" when the payload does not decode as Ip_node, or lists no node for Ip;
//   - "" when the request or reading the body fails, so that callers comparing
//     against "0" do not mistake a network problem for "no node".
//
// NOTE(review): a body that is not JSON at all (e.g. an HTML error page) still
// yields "0", as in the original code; confirm that is intended.
func OnlineGetIpnode(newnodeurl string, Ip string) string {
	// Timeout: 5 seconds.
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(newnodeurl + Ip)
	if err != nil {
		// resp is nil on error; touching resp.Body here used to panic.
		fmt.Println(err)
		return ""
	}
	defer resp.Body.Close()

	result := bytes.NewBuffer(nil)
	if _, err := io.Copy(result, resp.Body); err != nil {
		// The hand-written read loop never terminated on a non-EOF error.
		fmt.Println(err)
		return ""
	}

	nodebodyresult := new(Ip_node)
	if json_err := json.Unmarshal(result.Bytes(), nodebodyresult); json_err != nil {
		// Not the map shape. Try the array shape only to report payloads that
		// match neither; the result is "0" in both cases.
		if nulljson_err := json.Unmarshal(result.Bytes(), new(Null_node)); nulljson_err != nil {
			fmt.Println(nulljson_err)
		}
		return "0"
	}

	nodes := nodebodyresult.Data[Ip]
	if len(nodes) == 0 {
		// No entry for this IP: indexing [0] would panic.
		return "0"
	}
	return nodes[0].Idc
}
主程序名monitor_sensitive.go:
package main
//2023-03-15,goroutine数量做限制,限制为20
import (
"./toolpligin"
"fmt"
"strings"
"time"
"sync"
)
// filetimevalue is the path of the locally stored file timestamps.
const filetimevalue = "../conf/filetimevalue.txt"

// Package-level state of the monitor.
var (
	// Timestamp maps assigned by runmain and SecondCompare.
	localfiletimemapvalue  = map[string]string{}
	remotefiletimemapvalue = map[string]string{}
	targetfiletimemapvalue = map[string]string{}

	// mingancislices lists the file names whose timestamps are queried.
	mingancislices = []string{
		"sensitive_word_dict",
		"sensitive_word_list_new.dict",
		"sensitive_word_list_new_H.dict",
	}

	lock sync.Mutex
	// wg is incremented by runmain per target IP and waited on by main.
	wg sync.WaitGroup

	// nodeinfourl is the URL prefix of the node lookup; an IP is appended.
	nodeinfourl = "http://falcon.search.weibo.com/falcons/intfs/server/getTreesByIps?ips="
)
// GetRcmdfiletimevalue runs "date +%s -r <file>" on targetip through rcmd for
// every file name in mingancislices and returns
//
//   - a map from file name to the second line of the command output, and
//   - a slice that holds targetip when the output contained "EXEC_ERR".
//
// As soon as an output contains "EXEC_ERR" an alarm is sent and the function
// returns what it has collected so far. A file is skipped when the command
// fails, when the first output line contains "-c0", or when the output has no
// second line (the original indexed it unconditionally and panicked).
//
// NOTE(review): the meaning of the first output line and of "-c0" comes from
// rcmd and is not visible in this file — confirm against rcmd's output format.
func GetRcmdfiletimevalue(targetip string) (map[string]string, []string) {
	tmpfiletimemapvalue := make(map[string]string)
	RcmdExecErrIpclise := make([]string, 0)

	for _, minganci := range mingancislices {
		filetimecommand := "date +%s -r /data1/apache2/config/" + minganci
		filetimercmdstring := "/data1/rcmd_v2/bin/rcmd -h " + targetip + " -c \"" + filetimecommand + "\""

		remotemd5stringvalue, err := toolpligin.RunRcmdBackstring(filetimercmdstring)
		if err != nil {
			// On failure the returned string is a placeholder, not command
			// output, so there is nothing to parse for this file.
			toolpligin.Error.Println(err)
			continue
		}

		comslice := strings.Split(remotemd5stringvalue, "\n")
		if strings.Contains(comslice[0], "-c0") {
			continue
		}

		// "EXEC_ERR" on any line means the command did not run on the target.
		if strings.Contains(remotemd5stringvalue, "EXEC_ERR") {
			toolpligin.RequestAlarm(fmt.Sprintf("ip执行失败%s", targetip), "op_search_sms", fmt.Sprintf("ip执行失败%s", targetip))
			RcmdExecErrIpclise = append(RcmdExecErrIpclise, targetip)
			toolpligin.Info.Printf("EXEC_ERR_IP:%s\n", targetip)
			// One failing IP is enough: return immediately.
			return tmpfiletimemapvalue, RcmdExecErrIpclise
		}

		if len(comslice) < 2 {
			toolpligin.Error.Printf("%s: unexpected rcmd output for %s: %q\n", targetip, minganci, remotemd5stringvalue)
			continue
		}
		// Trim so that stray whitespace does not break later integer parsing.
		tmpfiletimemapvalue[minganci] = strings.TrimSpace(comslice[1])
	}
	return tmpfiletimemapvalue, RcmdExecErrIpclise
}
// SecondGetRcmdfiletimevalue returns the timestamp map of targetip as
// produced by GetRcmdfiletimevalue.
//
// For every IP that GetRcmdfiletimevalue reported as failing, the node lookup
// is queried; when it answers "0" the IP is removed from ip_list.txt on the
// push machine ipvalue (via sed over rcmd) and a mail with the line counts
// before and after the removal is sent.
//
// NOTE(review): the sed pattern is the raw IP, so '.' matches any character
// and "10.1.1.1" also matches lines such as "10.1.1.10"; anchoring/escaping
// depends on how rcmd quotes the command and is left for a follow-up.
func SecondGetRcmdfiletimevalue(targetip, ipvalue string) map[string]string {
	tmpfiletimemapvalue, RcmdExecErrIpclise := GetRcmdfiletimevalue(targetip)
	// Log the push machine IP and the target IP.
	toolpligin.Info.Println(ipvalue, targetip)

	for _, ExecErrIp := range RcmdExecErrIpclise {
		// Pass the bare prefix: OnlineGetIpnode appends the IP itself. The
		// original passed nodeinfourl+ExecErrIp, so the IP ended up in the
		// URL twice and the lookup was made for a non-existent address.
		ipnodeidc := toolpligin.OnlineGetIpnode(nodeinfourl, ExecErrIp)
		if ipnodeidc != "0" {
			continue
		}

		countipcommand := "wc -l /data1/minisearch/check_sensitivewords_update/ip_list.txt"
		countiprcmdstring := "/data1/rcmd_v2/bin/rcmd -h " + ipvalue + " -c \"" + countipcommand + "\""
		sourcecountipvalue, err := toolpligin.RunRcmdBackstring(countiprcmdstring)
		if err != nil {
			toolpligin.Info.Printf("统计敏感词%s推送机上目标ip数量出现异常", ipvalue)
		}

		// Remove the IP from the list.
		sedcommand := "sed -i /" + ExecErrIp + "/d /data1/minisearch/check_sensitivewords_update/ip_list.txt"
		sediprcmdstring := "/data1/rcmd_v2/bin/rcmd -h " + ipvalue + " -c \"" + sedcommand + "\""
		toolpligin.Info.Println(sediprcmdstring)
		if _, err = toolpligin.RunRcmdBackstring(sediprcmdstring); err != nil {
			toolpligin.Info.Printf("%s敏感词推送机上删除%s出现异常", ipvalue, ExecErrIp)
		}

		targetcountipvalue, err := toolpligin.RunRcmdBackstring(countiprcmdstring)
		if err != nil {
			toolpligin.Info.Printf("统计敏感词%s推送机上目标ip数量出现异常", ipvalue)
		}

		toolpligin.Sendmail(ExecErrIp, ipvalue, fmt.Sprintf("%s\n删除敏感词推送机%s上的ip%s\n%s", sourcecountipvalue, ipvalue, targetip, targetcountipvalue))
	}
	return tmpfiletimemapvalue
}
func SecondCompare(targetipch chan string,ipvalue string){
tmptargetipslice := []string{}
for targetip := range targetipch {
wg.Done()
tmptrue_or_flase_slice := []bool{}
for j := 0;j <= 2;j++ { //比较三次
targetfiletimemapvalue = SecondGetRcmdfiletimevalue(targetip,ipvalue)
secondcompareresult := toolpligin.Comparefiletimevalue(remotefiletimemapvalue, targetfiletimemapvalue)
if j == 0{
//如果第一次比较就是true,则不需要继续进行另外两次比较
if secondcompareresult == true{
break
}
} else {
tmptrue_or_flase_slice = append(tmptrue_or_flase_slice,secondcompareresult)
}
time.Sleep(30 * time.Second)
}
toolpligin.Info.Printf("%s,%v\n",targetip,tmptrue_or_flase_slice)
compareres := toolpligin.InboolSlice(tmptrue_or_flase_slice,false)
if compareres == false{
toolpligin.Info.Printf("%s上的%s敏感词未更新\n",ipvalue, targetip)
tmptargetipslice = append(tmptargetipslice,targetip)
}
}
if len(tmptargetipslice) > 0{
toolpligin.RequestAlarm("敏感词未更新","op_search_sms",fmt.Sprintf("%s的敏感词没有更新",strings.Join(tmptargetipslice,",")))
}
}
// runmain performs one monitoring pass.
//
// It loads the locally stored timestamps and the list of push machines, then
// for every push machine fetches its timestamps and compares them with the
// local ones. When they differ, the local file is rewritten with the push
// machine's values, the push machine's ip_list.txt is read through rcmd, and
// a SecondCompare goroutine is started to check every listed target IP.
// wg is incremented once per dispatched target.
//
// NOTE(review): remotefiletimemapvalue is a package-level variable that is
// replaced on every loop iteration while earlier SecondCompare goroutines may
// still be running; SecondCompare's signature does not allow passing it.
func runmain() {
	// Load the locally stored timestamps.
	localfiletimemapvalue = toolpligin.Getlocalfiletimevalue()
	// Fetch the list of push machine IPs.
	ipslice := toolpligin.OnlineGetIpSlice()

	for _, ipvalue := range ipslice {
		// Fetch the push machine's files and their timestamps.
		remotefiletimemapvalue, _ = GetRcmdfiletimevalue(ipvalue)
		compareresult := toolpligin.Comparefiletimevalue(localfiletimemapvalue, remotefiletimemapvalue)
		toolpligin.Info.Printf("%t 敏感词推送机:%s 本地敏感词的map值:%v 敏感推送机的map值:%v\n", compareresult, ipvalue, localfiletimemapvalue, remotefiletimemapvalue)

		if compareresult {
			toolpligin.Info.Println("敏感词没有变动")
			continue
		}

		// Local and remote values differ: update the local file first, then
		// check the machines this push machine synchronises to.
		toolpligin.Writefiletimevalue(filetimevalue, remotefiletimemapvalue)
		toolpligin.Info.Println("本地保存敏感词map值与敏感词推送机上不一致\n")

		catcommand := "cat /data1/minisearch/check_sensitivewords_update/ip_list.txt"
		catiprcmdstring := "/data1/rcmd_v2/bin/rcmd -h " + ipvalue + " -c \"" + catcommand + "\""
		targetipstringvalue, err := toolpligin.RunRcmdBackstring(catiprcmdstring)
		if err != nil {
			// The returned string is a placeholder; slicing it would panic.
			toolpligin.Error.Println(err)
			continue
		}

		// The first element and the last two elements of the split output are
		// dropped, as in the original; that needs at least three elements.
		tmp_targetipslicevalue := strings.Split(targetipstringvalue, "\n")
		if len(tmp_targetipslicevalue) < 3 {
			toolpligin.Error.Printf("敏感词推送机%s返回的ip列表格式异常:%q", ipvalue, targetipstringvalue)
			continue
		}
		targetipslicevalue := make([]string, 0, len(tmp_targetipslicevalue))
		for _, line := range tmp_targetipslicevalue[1 : len(tmp_targetipslicevalue)-2] {
			// Skip blank lines so no command is issued for an empty host.
			if targetip := strings.TrimSpace(line); targetip != "" {
				targetipslicevalue = append(targetipslicevalue, targetip)
			}
		}
		toolpligin.Info.Printf("已经获取敏感词推送机%s上需要推送敏感词的ip列表%v", ipvalue, targetipslicevalue)

		// The channel is sized to the target list so the sends never block.
		targetipch := make(chan string, len(targetipslicevalue))
		go SecondCompare(targetipch, ipvalue)
		for _, targetip := range targetipslicevalue {
			wg.Add(1)
			targetipch <- targetip
		}
		// Closing ends SecondCompare's range loop, which lets it send its
		// summary alarm and exit instead of blocking forever.
		close(targetipch)
	}
}
// main runs the monitor forever: one runmain pass, a wait on wg, then a
// 600 second pause before the next pass.
func main() {
	const pause = 600 * time.Second

	for {
		runmain()
		wg.Wait()
		time.Sleep(pause)
	}
}
网友评论