美文网首页
实战项目(二)-嘉宾发布会签到系统

实战项目(二)-嘉宾发布会签到系统

作者: 巴鶴 | 来源:发表于2021-02-02 08:28 被阅读0次

    先前学习了虫师老师的接口自动化测试教程,总结一下项目实战思路

    近期分享: 接口自动化测试从零开始学习集锦: https://www.jianshu.com/nb/49125734 --欢迎探讨!!

    1、Django开发一个发布会嘉宾签到系统;
    2、学习Html前端优化展示;
    3、保证开发的功能点测试通过;
    4、开发接口视图层代码;
    5、安装Pymsql,搭建本地数据库, 设计测试数据初始化,并对数据的插入删除做了封装
    6、安装Requests库调用接口;
    7、unittest单元测试框架运行测试;
    8、HTMLTestRunner生成接口测试报告;

    步骤1: db_fixture.mysql_db.py

    Created on 2017年8月29日
    @author: Yvon_早安阳光
    
    from pymysql import  connect,cursors
    from pymysql.err import OperationalError
    import os
    import configparser as cparser
     
    #============读取db_config.ini文件设置============
    '''获取当前路径'''
    base_dir = str(os.path.dirname(os.path.dirname(__file__)))
    print(base_dir)
    '''转换路径格式'''
    base_dir = base_dir.replace('\\', '/')
    '''获取数据库配置文件的路径'''
    file_path = base_dir + '/db_config.ini'
    print(file_path)
     
    cf = cparser.ConfigParser()
    cf.read(file_path)
    host = cf.get("mysqlconf", "host")
    print(host)
    port = cf.get("mysqlconf", "port")
    user = cf.get("mysqlconf", "user")
    print(user)
    password = cf.get("mysqlconf", "password")
    db = cf.get("mysqlconf", "db_name")
     
    #============封装MySQL基本操作============ 
    class DB:
        def __init__(self):    
            try:
                #连接数据库
                self.connection = connect(host=host,
                                    port=int(port),
                                    user=user,
                                    password=password,
                                    db=db,
                                    charset='utf8mb4',
                                    cursorclass=cursors.DictCursor)
            except OperationalError as e:
                print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
                 
        #清除表数据
        def clear(self,table_name):
            real_sql = 'delete from ' + table_name + ';'
            print(real_sql)
            with self.connection.cursor() as cursor:
                cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
                cursor.execute(real_sql)
            self.connection.commit()
             
        #插入数据
        def insert(self,table_name,table_data):
            for key in table_data:
                table_data[key] = "'" + str(table_data[key]) + "'"
            key = ','.join(table_data.keys())
            value = ','.join(table_data.values())
            real_sql = 'INSERT INTO ' + table_name + " (" + key + ") VALUES (" + value + ") "
            print(real_sql)
            with self.connection.cursor() as cursor:   
                cursor.execute(real_sql)
            self.connection.commit()
     
        #关闭数据库连接
        def close(self):
            self.connection.close()
             
        # 插入数据
        def init_data(self, datas):
            for table, data in datas.items():
                self.clear(table)
                for d in data:
                    self.insert(table, d)
            self.close()
     
    if __name__ == '__main__':
        db =DB()
        table_name = "sign_event"
        data = {'id':2,'name':'土豆','`limit`':3000,'status':1,'address':'杭州博物馆','start_time':'2018-05-15 14:25:42','create_time':'2016-07-20 00:25:42'}
        table_name2 = "sign_guest"
        data2 = {'realname':'alen','phone':12312341234,'email':'alen@mail.com','sign':0,'event_id':1}
      
        db.clear(table_name)
        db.insert(table_name, data)
        db.close()
    

    步骤2: db_fixture.test_data.py

    '''
    Created on 2017年8月29日
    @author: Yvon_早安阳光
    '''
    import sys
    sys.path.append('../db_fixture')
    try:
        from mysql_db import DB
    except ImportError:
        from .mysql_db import DB
     
    # 创建测试数据
    datas = {
        #发布会表数据
        'sign_event':[
            #有效的发布会数据
            {'id':1,'name':'OPPO发布会','`limit`':2000,'status':1,'address':'杭州江干区','start_time':'2019-05-20 14:00:00','create_time':'2016-07-20 00:25:42'},
            {'id':2,'name':'联想发布会','`limit`':1500,'status':1,'address':'北京博远路','start_time':'2019-05-25 14:25:38','create_time':'2016-07-20 00:25:42'},
            #参加人数限制
            {'id':3,'name':'ViVo发布会','`limit`':0,'status':1,'address':'苏州通园路','start_time':'2019-05-20 14:00:00','create_time':'2016-07-20 00:25:42'},
            #发布会未开始,状态关闭
            {'id':5,'name':'三星发布会','`limit`':2000,'status':0,'address':'南京博物馆','start_time':'2019-07-25 14:00:00','create_time':'2016-07-20 00:25:42'},
            {'id':6,'name':'锤子发布会','`limit`':1000,'status':0,'address':'南通泗水路','start_time':'2019-05-10 14:00:00','create_time':'2016-07-20 00:25:42'},
            #发布会已结束
            {'id':7,'name':'小米发布会','`limit`':800,'status':1,'address':'北京水立方','start_time':'2016-05-20 14:00:00','create_time':'2016-07-20 00:25:42'},
            {'id':8,'name':'大米发布会','`limit`':1500,'status':1,'address':'淮安高教区','start_time':'2017-05-20 14:34:56','create_time':'2016-07-20 00:25:42'},
        ],
         #嘉宾表数据
        'sign_guest':[
            #未签到有效数据
            {'id':1,'realname':'张强','phone':18612341234,'email':'andy@mail.com','sign':0,'event_id':1,'create_time':'2016-07-20 00:25:42'},
            {'id':3,'realname':'刘强','phone':17701231234,'email':'marry@163.com','sign':0,'event_id':2,'create_time':'2016-07-20 00:25:42'},
            #已签到
            {'id':2,'realname':'李强','phone':15040013210,'email':'dugu@126.com','sign':1,'event_id':1,'create_time':'2016-07-20 00:25:42'},
            #发布会人数限制、发布会未激活,发布会已结束
            {'id':5,'realname':'郭强','phone':18108090705,'email':'xiaowu@mail.com','sign':0,'event_id':3,'create_time':'2016-07-20 00:25:42'},
            {'id':6,'realname':'金俊','phone':13064931234,'email':'jinj@163.com','sign':0,'event_id':5,'create_time':'2017-07-20 00:00:00'},
     
        ],
    }
     
    # 将测试数据插入表
    def init_data():
        DB().init_data(datas)
    

    步骤3: Views_interface.py

    思路1: 创建新的一条发布会

    访问地址:http://127.0.0.1:8000/api/add_event](http://127.0.0.1:8000/api/add_event)

    1、首先,判断eid、name、limit、address、start_time 等字段均不能为空,否则JsonResponse()返回相应的状态码和提示。JsonResponse()是一个非常有用的方法,它可以直接将字典转化成Json 格式返回到客户端。
    2、接下来,判断发布会id 是否存在,以及发布会名称(name)是否存在;如果存在将返回相应的状态码和提示信息。将数据字典作为整个返回字典中data 对应的值返回。
    3、再接下来,判断发布会状态是否为空,如果为空,将状态设置为1(True)。
    4、最后,将数据插入到Event表,在插入的过程中如果日期格式错误,将抛出ValidationError异常,接收该异常并返回相应的状态和提示,否则,插入成功,返回状态码200 和“add event success”的提示。

    思路2: 查询发布会接口

    访问地址: http://127.0.0.1:8000/api/get_event_list/?eid=4](http://127.0.0.1:8000/api/get_event_list/?eid=4
    http://127.0.0.1:8000/api/get_event_list/?name=](http://127.0.0.1:8000/api/get_event_list/?name=发布会

    1、通过GET 请求接收发布会id 和name 参数。两个参数都是可选的。首先,判断当两个参数同时为空,接口返回状态码10021,参数错误。
    2、如果发布会id 不为空,优先通过id 查询,因为id 的唯一性,所以,查询结果只会有一条,将查询结果以key:value 对的方式存放到定义的event 字典中,并将数据字典作为整个返回字典中data 对应的值返回。
    3、name 查询为模糊查询,查询数据可能会有多条,返回的数据稍显复杂;首先将查询的每一条数据放到一个字典event 中,再把每一个字典再放到数组datas 中,最后再将整个数组做为返回字典中data 对应的值返回。

    思路3: 添加嘉宾接口

    访问地址: http://127.0.0.1:8000/api/add_guest/](http://127.0.0.1:8000/api/add_guest/

    通过POST 请求接收嘉宾参数:关联的发布会id、姓名、手机号和邮箱等参数。
    1、首先,判断eid、realname、phone 等参数均不能为空。
    2、接下来,判断嘉宾关联的发布会id 是否存在,以及关联的发布会状态是否为True(即1),如果不存在或不为True,将返回相应的状态码和提示信息。
    3、再接下来的步骤是判断当前时间是否大于发布会时间,如果大于则说明发布已开始,或者早已经结束。那么该发布会就应该不能允许再添加嘉宾。
    4、最后,插入嘉宾数据,如果发布会的手机号重复则抛IntegrityError 异常,接收该异常并返回相应的状态码和提示信息。如果添加成功,则返回状态码200 和“add guest success”的提示。

    代码参考

    <meta charset="utf-8">
    from django.http.response import HttpResponse, JsonResponse
    
    '''查询数据库'''
    from sign.models import Event,Guest
    from django.core.exceptions import ValidationError, ObjectDoesNotExist
    from django.db.utils import IntegrityError
    import time
    from django.template.context_processors import request
    
    #添加发布会接口
    def add_event(request):
        eid = request.POST.get('eid','')     #发布会id
        name = request.POST.get('name','')   #发布会标题
        limit = request.POST.get('limit','')  #限制人数
        status = request.POST.get('status','')  #状态
        address = request.POST.get('address','')  #地址
        start_time = request.POST.get('start_time','')  #发布会时间
    
        '''必填项为空'''
        if eid =='' or name == '' or limit == '' or address == '' or start_time == '':
            return JsonResponse({'status':207,'message':'parameter error'})
        '''event id重复'''
        results = Event.objects.filter(id = eid)
        if results:
            return JsonResponse({'status':201,'message':'event id already exists'})
        '''event name 重复'''
        result = Event.objects.filter(name = name)
        if result:
            return JsonResponse({'status':201,'message':'event name already exists'})
    
        '''创建新的一条发布会''' 
        if status == '': 
            status = 1  
        try:
            Event.objects.create(id=eid,name=name,limit=limit,address=address,status=int(status),start_time=start_time)
        except ValidationError as e:
            error = 'start_time format error. It must be in YYYY-MM-DD HH:MM:SS format.' 
            return JsonResponse({'status':202,'message':'start_tiem format error'})
        return JsonResponse({'status':200,'message':'add event success'})
    
    #查询发布会接口
    def get_event_list(request):
        eid = request.GET.get('eid','')
        name = request.GET.get('name','')
        if eid == '' and name == '':
            return JsonResponse({'status':207,'message':'parameter error'})
        if eid !='':
            event = {}
            try:
                result = Event.objects.get(id = eid)
            except ObjectDoesNotExist:
                return JsonResponse({'status':201,'message':'query result is empty'})
            else:
                event['name'] = result.name
                event['limit'] = result.limit
                event['status'] = result.status
                event['address'] = result.address
                event['start_time'] = result.start_time
                return JsonResponse({'status':200,'message':'success','data':event})
    
        if name !='':
            datas = []
            results = Event.objects.filter(name__contains = name)  #发布会名称模糊查询
            if results:
                for r in results:
                    event = {}
                    event['name'] = r.name
                    event['limit'] = r.limit
                    event['status'] = r.status
                    event['address'] = r.address
                    event['start_time'] = r.start_time
                    datas.append(event)
                return JsonResponse({'status':200,'message':'success','data':datas})
            else:
                return JsonResponse({'status':201,'message':'query result is empty'})
    
    #添加嘉宾接口
    def add_guest(request):
        eid = request.POST.get('eid','')    #关联发布会id 
        realname = request.POST.get('realname','') #姓名
        phone = request.POST.get('phone','') #手机号
        email = request.POST.get('email','') #邮箱
    
        '''必填项为空'''
        if eid =='' or realname == '' or phone == '' :
            return JsonResponse({'status':207,'message':'parameter error'})
        '''eid不存在'''
        result = Event.objects.filter(id=eid)   
        if not result:
            return JsonResponse({'status':202,'message':'event id null'})
        '''status状态为False,不可再添加嘉宾'''
        result = Event.objects.get(id=eid).status
        if not result:
            return JsonResponse({'status':203,'message':'event status is not available'})
    
        '''限制发布会人数'''
        event_limit = Event.objects.get(id=eid).limit    #发布会限制人数
        guest_limit = Guest.objects.filter(event_id=eid) #发布会已添加的嘉宾数
        if len(guest_limit) >= event_limit:
            return JsonResponse({'status':204,'message':'event number is full'})
        '''发布会已结束'''
        event_time = Event.objects.get(id=eid).start_time #获取发布会开始时间
        print(event_time)
    #     etime = str(event_time).split(".")[0]  #截取小数点左边的时间数据,精确到秒,并转成字符串
        etime = str(event_time).split('+')[0]  # 根据实际的打印结果截取右侧的数据,打印出来的结果是"+00:00",故截取+号右侧的数据
        print(etime)
        timeArray = time.strptime(str(etime), "%Y-%m-%d %H:%M:%S") #转换成时间戳的格式
        e_time = int(time.mktime(timeArray)) #获取最终的发布会的开始时间
        print(e_time)
        now_time = str(time.time())  #获取当前时间
        print(now_time)
        ntime = now_time.split(".")[0] #截取小数点左边的时间数据
        n_time = int(ntime)
        print(n_time)
    
        #当前时间大于发布会时间
        if n_time >= e_time:
            return JsonResponse({'status':205,'message':'event has started'})
        try:
            Guest.objects.create(realname=realname,phone=int(phone),email=email,sign=0,event_id=int(eid))
        except IntegrityError:
            return JsonResponse({'status':206,'message':'the event guest phone number repeat'}) #嘉宾重复添加手机号
        return JsonResponse({'status':200,'message':'add guest success'})
    

    步骤4: interface.add_event_test.py

    <meta charset="utf-8">
    '''
    Created on 2017年8月29日
    @author: Yvon_早安阳光
    '''
    #coding:utf-8
    
    import unittest
    import requests
    import os, sys
    
    parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.insert(0, parentdir)
    from db_fixture import test_data
    
    class AddEventTest(unittest.TestCase):
        ''' 添加发布会 '''
        def setUp(self):
            self.base_url = "[http://127.0.0.1:8000/api/add_event/](http://127.0.0.1:8000/api/add_event/)"
        def tearDown(self):
            print(self.result)
        def test_add_event_all_null(self):
            '''所有参数为空'''
            payload = {'eid':'','name':'','status':'','limit':'','address':"",'start_time':''}
            r = requests.post(self.base_url,data = payload)
            self.result = r.json()
            self.assertEqual(self.result['status'], 207)
            self.assertEqual(self.result['message'], 'parameter error')
    
        def test_add_event_eid_exist(self):
            '''eid 已存在'''
            payload = {'eid':1,'name':'小金理财发布会','status':'1','limit':'2000',
                       'address':"江苏建湖",'start_time':'2020-05-20 14:00:00'}
            r = requests.post(self.base_url,data = payload)
            self.result = r.json()
            self.assertEqual(self.result['status'], 201)
            self.assertEqual(self.result['message'], 'event id already exists')
    
        def test_add_event_name_exist(self):
            '''名称已存在'''
            payload = {'eid':9,'name':'大米发布会','status':'1','limit':'1500',
                       'address':"杭州江干庆春路",'start_time':'2019-05-20 15:00:00'}
            r = requests.post(self.base_url,data = payload)
            self.result = r.json()
            self.assertEqual(self.result['status'], 201)
            self.assertEqual(self.result['message'], 'event name already exists')
    
        def test_add_event_data_type_error(self):
            '''日期格式错误'''
            payload = {'eid':9,'name':'悟空理财发布会','status':'1','limit':'1000',
                       'address':"扬州中华路",'start_time':'2019'}
    
            r = requests.post(self.base_url,data = payload)
            self.result = r.json()
            self.assertEqual(self.result['status'], 202)
            self.assertEqual(self.result['message'],'start_tiem format error')
    
        def test_add_event_success(self):
            payload = {'eid':10,'name':'金立发布会','status':'1','limit':'1500',
                       'address':"盐城中华路",'start_time':'2020-05-10 13:00:00'}
            r = requests.post(self.base_url,data = payload)
            self.result = r.json()
            self.assertEqual(self.result['status'], 200)
            self.assertEqual(self.result['message'],'add event success')
    
    if __name__ == '__main__':
        test_data.init_data() #初始化接口测试数据
        unittest.ma
    

    步骤5: Send_mail_report.py

    '''
    Created on 2017年9月4日
    @author: Yvon_早安阳光
    '''
    import time, sys, unittest, smtplib, os
    sys.path.append('./interface')
    sys.path.append('./db_fixture')
    from HTMLTestRunner import HTMLTestRunner
    from db_fixture import test_data
    from email.mime.multipart import MIMEMultipart
    from email.header import Header
    # from email.mime.application import MIMEApplication
    from email.mime.text import MIMEText
     
    #======================定义发送邮件======================
     
    def send_mail(file_new):
        '''file_new是指最新的测试报告'''
        f = open(file_new,'rb')
        mail_body = f.read()
        f.close()
         
        msg = MIMEMultipart()
        '''邮件标题'''
        msg['Subject'] = Header('发布会嘉宾签到接口测试报告' ,'utf-8')
         
        '''邮件正文'''
        zhengwen = "各位,详情测试结果请见附件文档"
        msg.attach(MIMEText(zhengwen, 'plain')) 
         
        '''附件测试报告 '''
        att= MIMEText(mail_body,'html','utf-8')
        att.add_header('Content-Disposition', 'attachment', filename='Guest Manage System Interface_TestReport.html')
        msg.attach(att) 
            #发送邮件
        smtp = smtplib.SMTP()
        smtp.connect('smtp.exmail.qq.com')
        smtp.login('fajin@bertadata.com','36#20qa20A')
        smtp.sendmail('fajin@bertadata.com','huiselanse@yeah.net',msg.as_string())
        print('email has send out !')
        smtp.quit()
         
    #======================查找测试报告的目录,找到最新生成的测试报告======================
    '''按时间获取最新的测试报告'''
    def new_report(testreport):
        lists = os.listdir(testreport)
        lists.sort(key = lambda fn: os.path.getatime(testreport + '\\' + fn))
        file_new = os.path.join(testreport,lists[-1])
        print(file_new)
        return file_new
     
    #指定测试用例为当前文件夹下的目录
    test_dir = './interface'
    discover = unittest.defaultTestLoader.discover(test_dir, pattern='*_test.py')
     
    if __name__ == '__main__':
        test_data.init_data() #初始化测试数据     
        #报告存放的路径
        result_report_dir= 'result_report/'
        '''实时当前的时间'''
        now = time.strftime("%Y-%m-%d %H_%M_%S")
        file_name = result_report_dir + '\\' + now +'result.html'
        fp = open(file_name,'wb')    
        #定义测试报告
        runner =HTMLTestRunner(stream = fp,title ='嘉宾签到接口测试报告',description='接口用例执行情况:')                          
        runner.run(discover)
        fp.close() #关闭报告文件
        new_report = new_report(result_report_dir)
        send_mail(new_report)
    
    测试报告1.png
    测试报告2.png

    相关文章

      网友评论

          本文标题:实战项目(二)-嘉宾发布会签到系统

          本文链接:https://www.haomeiwen.com/subject/ttsstltx.html