美文网首页
golang mongo

golang mongo

作者: Feng_Sir | 来源:发表于2019-04-23 16:00 被阅读0次
    package main
    
    import (
        "encoding/json"
        "gopkg.in/mgo.v2"
        "jy/mongodbutil"
        "log"
        mu "mfw/util"
        "net"
        "net/rpc"
        "path"
        qu "qfw/util"
        "strings"
    
        "gopkg.in/mgo.v2/bson"
    )
    
    // Package-level state shared by init, main, processUdpMsg and save.
    var (
        // udpclient is the UDP client object that main configures and starts.
        udpclient mu.UdpClient

        // Sysconfig holds the configuration values filled in by init.
        Sysconfig map[string]interface{}

        // MgoIP, MgoDB and MgoC are read from the "mongodb_one_ip",
        // "mongodb_one_db" and "mongodb_one_c" config keys; MgoFileFiled is read
        // from "mongodb_one_filefiled" with the default "projectinfo".
        MgoIP, MgoDB, MgoC, MgoFileFiled string

        // ChanB is a buffered channel used as a slot counter: a value is sent
        // before each save goroutine starts and received when save returns.
        ChanB chan bool
    )
    
    // init reads the configuration into Sysconfig, copies the MongoDB settings
    // into the package-level variables and creates the shared mongodbutil handle.
    //
    // ChanB is allocated before the settings are validated so that it is never
    // left nil: a send on a nil channel blocks forever, which would hang the UDP
    // handler if it ever reached the dispatch step after a failed validation.
    func init() {
        qu.ReadConfig(&Sysconfig)
        ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))

        MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
        MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
        MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
        MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")

        // Address, database and collection are all mandatory; without them the
        // mongodbutil handle is not created.
        if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
            log.Println("获取配置文件参数失败", Sysconfig)
            return
        }
        mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
    }
    
    // main logs the loaded configuration, builds the UDP client from the "udpip"
    // and "udpport" config entries (both must be strings, otherwise the type
    // assertion panics), registers processUdpMsg as the message handler and then
    // blocks forever.
    func main() {
        log.Println(Sysconfig)

        udpIP := Sysconfig["udpip"].(string)
        udpPort := Sysconfig["udpport"].(string)
        udpclient = mu.UdpClient{Local: udpIP + ":" + udpPort, BufSize: 1024}
        udpclient.Listen(processUdpMsg)
        log.Printf("Udp listening port: %s:%s\n", udpIP, udpPort)

        // Park the main goroutine; nothing ever wakes it up.
        select {}
    }
    // processUdpMsg is the handler passed to udpclient.Listen.
    //
    // For mu.OP_TYPE_DATA the payload is a JSON object whose "gtid" and "lteid"
    // fields are ObjectId hex strings bounding an inclusive _id range; every
    // document in that range that has a non-null MgoFileFiled field is scanned
    // and one save goroutine is started per attachment. For mu.OP_NOOP the
    // payload is only logged. Other act values are ignored.
    //
    // The RPC endpoint used by save comes from the "file2text" config entry,
    // for example: "file2text": "192.168.3.207:1234".
    func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
        defer qu.Catch()
        switch act {
        case mu.OP_TYPE_DATA:
            var mapInfo map[string]interface{}
            if err := json.Unmarshal(data, &mapInfo); err != nil {
                log.Println("json err :", err, string(data))
                return
            }
            log.Println(mapInfo)

            // Checked assertions: a missing or non-string id becomes "" and is
            // rejected by the hex validation below instead of panicking.
            gidRaw, _ := mapInfo["gtid"].(string)
            lidRaw, _ := mapInfo["lteid"].(string)
            gid, lid := strings.TrimSpace(gidRaw), strings.TrimSpace(lidRaw)
            if !bson.IsObjectIdHex(gid) || !bson.IsObjectIdHex(lid) {
                log.Println("开始id或结束id参数错误:", string(data))
                return
            }
            scanIDRange(gid, lid)

        case mu.OP_NOOP: // reply from the next node
            log.Println("接收成功", string(data))
        }
    }

    // scanIDRange dials MongoDB, iterates over the documents whose _id lies in
    // [gid, lid] and whose MgoFileFiled field is not null, and hands each one to
    // dispatchAttachments. gid and lid must already be valid ObjectId hex strings.
    func scanIDRange(gid, lid string) {
        session, err := mgo.Dial(MgoIP)
        if err != nil {
            log.Println("mongo err:", err)
            return
        }
        // Registered only after the error check: on a failed dial there is no
        // session to close.
        defer session.Close()

        query := bson.M{
            "_id": bson.M{
                "$gte": bson.ObjectIdHex(gid),
                "$lte": bson.ObjectIdHex(lid),
            },
            MgoFileFiled: bson.M{
                "$ne": nil,
            },
        }
        iter := session.DB(MgoDB).C(MgoC).Find(query).Select(bson.M{"_id": 1, MgoFileFiled: 1}).Iter()
        // Close reports any error that stopped the iteration, so surface it.
        defer func() {
            if err := iter.Close(); err != nil {
                log.Println("mongo iter err:", err)
            }
        }()

        for {
            // A fresh map per document: the save goroutines keep a reference to
            // it, so it must not be reused as the decode target of the next row.
            // The static type stays map[string]interface{} so that nested
            // documents can be asserted to that same type below.
            var doc map[string]interface{}
            if !iter.Next(&doc) {
                break
            }
            dispatchAttachments(doc)
        }
    }

    // dispatchAttachments starts one save goroutine for every entry of
    // doc[MgoFileFiled]["attachments"]. Documents or attachment entries that do
    // not have the expected map shape are flagged with updatefileErr; a document
    // whose "attachments" value is not a map is skipped silently.
    //
    // NOTE(review): all goroutines started for one document share doc, and save
    // writes into the nested attachment maps while another save may be
    // serialising doc[MgoFileFiled] — confirm whether this needs locking.
    func dispatchAttachments(doc map[string]interface{}) {
        mid := doc["_id"]
        info, ok := doc[MgoFileFiled].(map[string]interface{})
        if !ok {
            flagFileErr(mid)
            return
        }
        attachments, ok := info["attachments"].(map[string]interface{})
        if !ok {
            return
        }
        for _, raw := range attachments {
            fileinfo, ok := raw.(map[string]interface{})
            if !ok {
                flagFileErr(mid)
                continue
            }
            // Blocks while ChanB is full; save frees the slot when it returns.
            ChanB <- true
            go save(mid, doc, fileinfo)
        }
    }

    // flagFileErr sets updatefileErr to 1 on the document identified by mid.
    func flagFileErr(mid interface{}) {
        mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
            "$set": bson.M{
                "updatefileErr": 1,
            },
        })
    }
    // save sends one attachment to the "FileToText.FileToContext" RPC method at
    // the address in Sysconfig["file2text"], stores the returned text in
    // fileinfo["content"] and writes qmap[MgoFileFiled] back to the document
    // identified by mid with updatefileErr set to 0. On any failure the document
    // is flagged with updatefileErr set to 1 instead.
    //
    // mid is the document id, qmap the whole document and fileinfo the attachment
    // entry being processed. The caller must have sent one value into ChanB
    // before starting save; it is received again when save returns.
    func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
        defer qu.Catch()
        defer func() {
            <-ChanB
        }()
        type FileData struct {
            Fid     string
            Name    string
            Type    string // file type: png, jpg, tif, swf (OCR recognition); pdf, doc, docx, xls
            Content string // recognised content
        }
        // markFailed flags the document so that failed attachments can be found later.
        markFailed := func() {
            mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
                "$set": bson.M{
                    "updatefileErr": 1,
                },
            })
        }

        client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
        if err != nil {
            markFailed()
            log.Println(mid, "rpc err :", err)
            return
        }
        defer client.Close()

        filename := qu.ObjToString(fileinfo["filename"])
        fileData := &FileData{
            Name: filename,
            Fid:  qu.ObjToString(fileinfo["fid"]), // attachment id
            // TrimPrefix rather than [1:]: path.Ext returns "" for a name
            // without an extension, and slicing that would panic.
            Type: strings.TrimPrefix(path.Ext(filename), "."),
        }

        var reply []byte
        if err = client.Call("FileToText.FileToContext", fileData, &reply); err != nil {
            markFailed()
            log.Println(mid, "call ocr error:", err)
            return
        }
        if len(reply) == 0 {
            markFailed()
            log.Println(mid, "rpc返回数据为空:", string(reply))
            return
        }
        log.Println(mid, string(reply))

        rdata := make(map[string]interface{})
        if err := json.Unmarshal(reply, &rdata); err != nil {
            markFailed()
            log.Println(mid, "rpc返回数据解析失败:", err)
            return
        }
        // The reply signals success with an absent, "null" or empty "err" field.
        if rpcErr := rdata["err"]; rpcErr != nil && rpcErr != "null" && rpcErr != "" {
            markFailed()
            log.Println(mid, "调用rpc服务解析异常:", rpcErr)
            return
        }

        // Archives carry their text under "contextc", every other type under "context".
        switch qu.ObjToString(fileinfo["ftype"]) {
        case "rar", "zip":
            fileinfo["content"] = rdata["contextc"]
        default:
            fileinfo["content"] = rdata["context"]
        }

        // fileinfo is nested inside qmap[MgoFileFiled], so writing that field
        // back persists the content set above.
        updated := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
            "$set": bson.M{
                MgoFileFiled:    qmap[MgoFileFiled],
                "updatefileErr": 0,
            },
        })
        if !updated {
            markFailed()
            log.Println(mid, "mongo更新数据失败")
            return
        }
        log.Println(mid, "mongo更新数据成功")
    }
    
    

    相关文章

      网友评论

          本文标题:golang mongo

          本文链接:https://www.haomeiwen.com/subject/kfavgqtx.html