美文网首页
Python脚本查询订单失败的数据并回推给第三方服务

Python脚本查询订单失败的数据并回推给第三方服务

作者: ambeer | 来源:发表于2023-04-25 16:23 被阅读0次

    步骤详情:
    1 定时任务 每天下午9点执行
    简易功能代码如下:
    schedule.every().day.at("16:00").do(job)
    2 查询订单失败数据
    3 截取有效信息后查询订单明细数据,并加上新的规则,协商好的code、msg
    4 推送失败时,发送邮件

    其他细节:
    关闭命令行python脚本也会定时执行(生成日志文件到 ItemList_yu_gbk_0214.log),命令如下:
    nohup python3 ItemList_yu_gbk_0214.py > ItemList_yu_gbk_0214.log

    收到邮件效果


    WeChat Screenshot_20230214175616.png

    服务器上文件


    ME1676368494413.png

    总的功能代码如下

    #!/usr/bin/python
    #  -*-coding:utf8 -*-
    # nohup python3 ItemList_yu_gbk_0214.py > ItemList_yu_gbk_0214.log
    from io import DEFAULT_BUFFER_SIZE
    from time import sleep
    import traceback
    import pymysql
    import smtplib
    import zipfile
    from email import encoders
    from email.header import Header
    from email.mime.text import MIMEText
    from email.mime.application import MIMEApplication
    from email.mime.multipart import MIMEMultipart    # 使用MIMEMultipart来标示这个邮件是多个部分组成的
    from email.mime.base import MIMEBase
    from email.mime.text import MIMEText   # 定义邮件内容
    from email.utils import formataddr
    import os,sys,multitesting
    import time  
    import shutil  
    import datetime
    import calendar
    import schedule
    import mimetypes
    import pandas as pd
    from pathlib import Path
    
    
    
    item_list_dict = {}
    item_dict = {}
    PAUSE_TIME_LIST = [['0:10', '7:00'], ['12:00', '13:30']]
    #---init Time---
    begintime =  time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
    
    
    def isPause():
        _now_time = datetime.datetime.now()
    
        for _time in PAUSE_TIME_LIST:
            begin_time_str = _time[0]
            end_time_str = _time[1]
            begin_time = datetime.datetime.strptime(begin_time_str, '%H:%M')
            end_time = datetime.datetime.strptime(end_time_str, '%H:%M')
    
            _now_time_str = _now_time.strftime('%H:%M')
            now_time = datetime.datetime.strptime(_now_time_str, '%H:%M')
    
            if (now_time >= begin_time) and (now_time <= end_time):
                return True
    
        return False
    def getAlltestID(_yhs, _begin_time, _end_time):
        sql = "SELECT * FROM dertest_test_info where yhstx_nsrtest = '{}' and create_time > '{}'  and create_time <= '{}' and dertest_status ='0'".format(
            _yhs, _begin_time, _end_time)
        global mycat_db
        cursor = mycat_db.cursor()
        cursor.execute(sql)
        rows = cursor.fetchall()
    
        if rows is None:
            return ''
        if len(rows) == 0:
            return ''
        # print("test:" + str(len(rows)))
        return rows
    
    # '?????,(0:?????;1:?????;2:??????;3:??????;)'
    def getdertestIDBytestID(_PID):
        sql = "SELECT * FROM dertest_intext_info WHERE dertest_protext_info_id = '" + str(
            _PID) + "'"
        # print(sql)
        global mycat_db
        cursor = mycat_db.cursor()
        cursor.execute(sql)
        rows = cursor.fetchall()
        if len(rows) == 0:
            return ''
        else:
            print("getdertestIDBytestID:" + str(len(rows)))
    
        return rows
    
    def getdertestInfoBytestID(_PID):
        sql = "SELECT ifnull(ghf_yh,''),ifnull(ghf_zh,''),ifnull(ghf_dz,''),ifnull(ghf_dh,''),ifnull(nsrmc,''),ifnull(nsrtest,''),ifnull(bz,''),id FROM dertest_info WHERE  test_id = '" + str(_PID) + "'"
        # print(sql)
        global mycat_db
        cursor = mycat_db.cursor()
        cursor.execute(sql)
        rows = cursor.fetchall()
        if len(rows) == 0:
            return ''
        else:
            print("getGhfBankBytestID:" + str(len(rows)))
        return rows
    
    def getSQDHBytestID(_obr_id):
        sql = "SELECT sqdh FROM dertest_batest_request WHERE  id = '" + str(_obr_id) + "'"
        # print(sql)
        global mycat_db
        cursor = mycat_db.cursor()
        cursor.execute(sql)
        row = cursor.fetchone()
        if row is None:
            return ""
    
        return str(row[0])
    
    def getIsMail(_dertest_id):
       sql = "SELECT * FROM dertest_intest_info_ext WHERE dertest_id ='"+_dertest_id+"'"
       global mycat_db
       cursor = mycat_db.cursor()
       cursor.execute(sql)
       row = cursor.fetchone()
       if row is None:
           return "非邮寄"
       else:
           print("getIsMail:" + str(len(row)))
       return "邮寄"
    
    def getItemsBydertestID(_OID,_sqdh,_ddh,_fpdm, _fphm, _yhs_mc, _yhs, _ghf_mc, _ghf_nsrtest, _ghf_bank, _ghf_address,_kprq,_kpr,_kplx,_fpbeizhu,_kpzt,_qingdanbzhi,_zuofeibz,_dingdanlx,_youjiFangshi):
        sql = "SELECT * FROM dertest_item_info WHERE dertest_info_id = '" + str(_OID) + "'"
        global mycat_db
    
        global item_list_dict
        cursor = mycat_db.cursor()
        cursor.execute(sql)
        rows = cursor.fetchall()
        # print("item:" + str(len(rows)))
        for _item in rows:
            if isBadItem(_item):
                continue
            itemInfo = _sqdh + ','
            itemInfo +=str(_item[0]).split("*")[2]+','
            itemInfo +=str(_item[0]).split("*")[1]+','
            itemInfo += _youjiFangshi
    
            writeItemInfo(_yhs,itemInfo)
    
    def makeTitle(_yhs):
        itemInfo = "申号,据号,银行账号,地址电话合计金额(含税),合计金额(不含税),合计税额,税率,订单类型,商品名称,简称,邮式"
        writeItemInfo(_yhs, itemInfo)
    # 'xmmc,ggxh,xmdw,xmdj,xmsl,xmje'
    def isBadItem(_item):
        #    if _item[3] == '1090505010000000000':
        #        return False
        #    if _item[3] == '1090414010000000000':
        #        return False
        #    if _item[3] == '1030204020000000000':
        #        return False
    
        return False
    
    def makeALLItemsToList(_yhs, my_pid_list):
        # print("----------------------------????????????????-----------------------")
        makeTitle(_yhs)
        for _pid in my_pid_list:
            dertestid_list = getdertestIDBytestID(_pid[0])
            if dertestid_list == '':
                continue
            ghf_info = getdertestInfoBytestID(_pid[0])
            dertest_id = str(ghf_info[0][7])
            youjifangshi = getIsMail(dertest_id)
            qingdanbz = str(dertestid_list[0][-2])
            OID = dertestid_list[0][0]
            obr_id = str(_pid[3])
            if qingdanbz == "1":
                qingdanbz = "有"
            if qingdanbz == "0":
                qingdanbz = "无"
    
            if (dertestid_list == ''):
                continue
            for _dertestid in dertestid_list:
                while (isPause() == True):
                    sleep(60)
                    print('Sleeping........')
                    continue
    
                getItemsBydertestID(OID,rere,ddrh,saf,sdf,nsr_mc,_yhs, dgf,gd,yui,gd,vgh,
                                  iiy,a,gd,kpzt,gd,zfbz,h,fgh)
    
    def writeItemInfo(yhs_mc,infos):
        temp_s = infos
        # path = yhs_mc+".csv"
        path ='/home/tom/data/yu/'+yhs_mc+".csv"
        f = open(path, "a+", encoding="gbk")
        f.write("%s" % (temp_s) + "\n")
        f.close()
    
    def getyhsList():
        file = open("yhs_list.txt")
        # file = open("C:\\Users\\test\\Desktop\\111\\yhs_list.txt")
        yhs_list = file.readlines()
        return yhs_list
    
    def countAllItems(_yhs_list, _begin_time, _end_time):
        global item_dict
        global item_list_dict
        yhs_index = 0
    
        for _yhs in _yhs_list:
            yhs_index += 1
            if os.path.exists(_yhs.strip('\n')+".csv"):
                os.remove(_yhs.strip('\n')+".csv")
            test_list = getAlltestID(_yhs.strip('\n'), _begin_time, _end_time)
            makeALLItemsToList(_yhs.strip('\n'), test_list)
    
    def makeDBTOConnection():
        MYCAT_HOST = "101.72.237.88"
        MYCAT_PORT = 8066
        MYCAT_USER = "txds"
        MYCAT_PASSWORD = "2erOxFSAyrIOeLKJODSA3g6k"
        MYCAT_DATABASE = "txds_sales_dertest"
        toDBcon = pymysql.connect(
    
            host=MYCAT_HOST,  # IP??MySQL??????????IP???
            port=MYCAT_PORT,  # ???????3306???????????
            user=MYCAT_USER,  # ??????????
            password=MYCAT_PASSWORD,  # ???????????
            database=MYCAT_DATABASE,  # ???????????
            charset='utf8'  # ????????????'utf-8'
        )
        return toDBcon
    
    def makeDBTestConnection():
        MYCAT_HOST = "102.14.234.89"
        MYCAT_PORT = 3306
        MYCAT_USER = "txds"
        MYCAT_PASSWORD = "txds@123"
        MYCAT_DATABASE = "txds_sales_dertest_hbq"
        toDBcon = pymysql.connect(
    
            host=MYCAT_HOST,  # IP??MySQL??????????IP???
            port=MYCAT_PORT,  # ???????3306???????????
            user=MYCAT_USER,  # ??????????
            password=MYCAT_PASSWORD,  # ???????????
            database=MYCAT_DATABASE,  # ???????????
            charset='utf8'  # ????????????'utf-8'
        )
        return toDBcon
    
    def sendtxtmail():
        filepath = '/home/tom/data/yuData/腾讯读书每日发数明细汇总'+datetime.datetime.now().strftime("%Y-%m-%d")+".zip" #压缩后的文件名
    
        smtp_server = "smtp.mxhichina.com"  # 发送邮箱服务器
        username = 'yang@163.com'
        password = 'IPMS*5873957'
        sender = 'yang@163.com'  # 发送者的邮箱 
        receivers = ["gao@163.com","guo@163.com",
                     "guo@.163.cn","wang@.163.cn"]
        EMAIL_FROM_NAME = '税组'   # 自定义发件人名称
    
        # time = datetime.datetime.today().strftime("%m-%d %H:%M")
        msg = MIMEMultipart()
        # 邮件正文
        msg.attach(MIMEText(" \r\n  腾讯读书每日发数明细汇总,结果请查看附件。",'plain','utf-8'))   # 文本内容换行\r\n
        msg['From'] = formataddr(pair=(EMAIL_FROM_NAME, sender))     # 自定义发件人的名称
        msg['To'] = ";".join(receivers)  # 发送给多个好友
        # subject = "{}腾讯读书每日发数明细汇总".format(time)
        subject = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")+"腾讯读书发数明细汇总"
        msg['Subject'] = subject
    
        data = open(filepath, 'rb')
        ctype, encoding = mimetypes.guess_type(filepath)
        if ctype is None or encoding is not None:
            ctype = 'application/octet-stream'
        maintype, subtype = ctype.split('/', 1)
        file_msg = MIMEBase(maintype, subtype)
        file_msg.set_payload(data.read())
        data.close()
        encoders.encode_base64(file_msg)  # 把附件编码
        file_msg.add_header('Content-Disposition', 'attachment', filename="腾讯读书每日发数明细汇总"+datetime.datetime.now().strftime("%Y-%m-%d")+".zip")  # 修改邮件头
        msg.attach(file_msg)
        try:
            server = smtplib.SMTP(smtp_server)
            server.login(username,password)
            server.sendmail(sender,receivers,msg.as_string())
            server.quit()
            print("=================================== 邮件发送成功 =================================== %s" % begintime)
            print("删除压缩包文件路径:", filepath)
            # 发送成功后删除压缩包文件
            if os.path.exists(filepath):
                os.remove(filepath)
                print("=================================== 压缩包删除成功 =================================== %s" % begintime)
            shutil.rmtree('/home/tom/data/yu')  
            os.mkdir('/home/tom/data/yu')  
            print("=================================== 压缩包删除成功 =================================== %s" % begintime)
        except Exception as err:
            print("=================================== 邮件发送失败 =================================== %s" % begintime)
            print(err)
    
    def batest_zip(start_dir,zip_file):
        # start_dir要压缩的文件路径
        # zip_file 输出zip文件的路径
        zip_file = zip_file + '.zip'
        z = zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED)
        print(z)
        for path, dirname, file_name in os.walk(start_dir):
    #         print("文件夹根路径:", path)
            fpath = path.replace(start_dir, '') # 去除根路径名称
    #         print("--去除根路径:", fpath)
            fpath = fpath and fpath + os.sep   # 在原fpath加上\
    #         print("****去除根路径+\ :", fpath)
    
            for filename in file_name: # 逐个循环读取文档名称
    #             print('--', fpath+filename)
    #             fpath + filename完整构成每个文档的去根绝对路径
    #             s = os.path.join(path, filename)   # 补齐全部的绝对路径
    #             print('*-*',s)
    
                z.write(os.path.join(path, filename), fpath + filename) # 实现在输出路径的Zip压缩操作
        z.close()
        return zip_file
    
    # 定时任务调用的函数
    def job():
        print("=================================== 汇总数据开始 =================================== %s" % begintime)
        global mycat_db    
        mycat_db = makeDBTOConnection()
        yhs_list = getyhsList()
        # 1汇总数据
        countAllItems(yhs_list, datetime.datetime.now().strftime("%Y-%m")+"-01 00:00:00", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        print("=================================== 汇总数据结束 =================================== %s" % begintime)
    
        # 2压缩文件
        print("=================================== 压缩文件开始 =================================== %s" % begintime)
        # start_dir要压缩的文件路径
        # zip_file 输出zip文件的路径
        batest_zip("/home/tom/data/yu","/home/tom/data/yuData/腾讯读书每日发数明细汇总"+datetime.datetime.now().strftime("%Y-%m-%d"))
        
        print("=================================== 压缩文件结束 =================================== %s" % begintime)
        time.sleep(1)
        # 3发送邮件并删除文件
        print("=================================== 邮件开始发送 =================================== %s" % begintime)
        sendtxtmail()
        return 'main func over'
    
    # 设置定时任务启动的时间 ,每天16:00 启动
    schedule.every().day.at("16:00").do(job)
    while True:
        schedule.run_pending()
    

    相关文章

      网友评论

          本文标题:Python脚本查询订单失败的数据并回推给第三方服务

          本文链接:https://www.haomeiwen.com/subject/kvhmjdtx.html