执行流程如下
那么根据流程所需要的功能,需要以下的实例进行支撑:
1.并发实例
2.查询数据实例
3.执行post请求实例
目标:编写Http执行POST请求的基本类方法
编写test03.py查询mysql相关对应字段数据
1# -*- coding: utf-8 -*-
2
3from tools.MysqlTools import MysqldbHelper
4import pymysql
5from tools.PostTools import PostHelper
6
7if __name__ == "__main__":
8
9 # 定义数据库访问参数
10 config = {
11 'host': '******#注释',
12 'port': 3361,
13 'user': 'root',
14 'passwd': '******#注释',
15 'charset': 'utf8',
16 'cursorclass': pymysql.cursors.DictCursor
17 }
18
19 # 初始化打开数据库连接
20 mydb = MysqldbHelper(config)
21
22 # 选择数据库
23 DB_NAME = '******#注释'
24 # mydb.createDataBase(DB_NAME)
25
26 # 选择数据库
27 print "========= 选择数据库%s ===========" % DB_NAME
28 mydb.selectDataBase(DB_NAME)
29
30 #选择表
31 TABLE_NAME = '******#注释'
32
33 # 数据查询
34 print "========= 数据查询 ==========="
35 select_fields = [
36 "id",
37 ******#注释
38 "1",
39 "1",
40 "null",
41 "NOW()",
42 "last_sync_time",
43 ]
44 select_order = "order by id desc limit 1"
45 select_result = mydb.select(TABLE_NAME, fields=select_fields,order=select_order)
46
47 # 拆分打印的字段
48 for row in select_result:
49 print "id=", row['id']
50 print "user_id=", row['user_id']
51 ******#注释
52 print "1=", row['1']
53 print "1=", row['1']
54 print "null=", None # 注意:查询null的需要写为None
55 print "NOW()=", row['NOW()']
56 print "last_sync_time=", row['last_sync_time']
57 print
进群:960410445 获取源文件!
编写model类,抽象查询的过程方法
models.py
我新建了一个core文件夹目录,然后新建一个models,专门用来处理查询以及调用API发送请求的业务处理。
models.py代码如下:
1# -*- coding: utf-8 -*-
2
3from tools.MysqlTools import MysqldbHelper
4
5class ModelHelper(object): # 继承object类所有方法
6
7 # 初始化数据库连接
8 def __init__(self , config):
9 self.mydb = MysqldbHelper(config) # 初始化打开数据库连接
10
11 def selectTable(self,DB_NAME,TABLE_NAME,fields,order):
12 # 选择数据库
13 self.mydb.selectDataBase(DB_NAME)
14 # 数据查询
15 print "========= 数据查询 ==========="
16 result = self.mydb.select(TABLE_NAME, fields=fields,order=order)
17 # 返回查询的数据
18 return result
编写test04.py测试文件,执行测试一下:
1# -*- coding: utf-8 -*-
2
3from tools.MysqlTools import MysqldbHelper
4import pymysql
5from tools.PostTools import PostHelper
6from core.models import ModelHelper
7
8if __name__ == "__main__":
9
10 # 定义数据库访问参数
11 config = {
12 'host': '#####注释####',
13 'port': 3361,
14 'user': 'root',
15 'passwd': '#####注释####',
16 'charset': 'utf8',
17 'cursorclass': pymysql.cursors.DictCursor
18 }
19 # 初始化数据模型
20 model = ModelHelper(config)
21 # 设置需要查询的数据库
22 DB_NAME = '#####注释####'
23 # 设置需要查询的表明
24 TABLE_NAME = '#####注释####'
25 # 数据查询
26 select_fields = [
27 "id",
28 #####注释####
29 "1",
30 "1",
31 "null",
32 "NOW()",
33 "last_sync_time",
34 ]
35 select_order = "order by id desc limit 1"
36 select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order)
37
38 # 拆分打印的字段
39 for row in select_result:
40 print "id=", row['id']
41 #####注释####
42 print "1=", row['1']
43 print "1=", row['1']
44 print "null=", None
45 print "NOW()=", row['NOW()']
46 print "last_sync_time=", row['last_sync_time']
47 print
测试结果如下:
1id= 1066511830261694464
2 #####注释####
31= 1
41= 1
5null= None
6NOW()= 2018-11-27 11:45:08
7last_sync_time= 2018-11-25 10:00:10
那么已经抽象了这部分查询在model中处理了,还有下一步请求API也要写入到model中自动处理。
那么下面来继续写写。
将返回的查询结果转化为字典类型数据
其中查询的旧表字段与新表的字段应该要用字典进行一一映射关联,方便后续调用。
1、定义字典存储 旧表字段 《==》新表字段的映射关系
2、获取旧表字段数据,进行数据查询
3、获取新表字段对应存储数据,再次使用API请求新表,灌入数据
1# 设置字段映射字典: 旧表查询字段 ==》 新表的字段
2 dict_fields = {
3 "id": "id",
4 "user_id": "user_id",
5 ## 注释部分字段
6 "1": "purpose",
7 "1": "status",
8 "null": "data_source",
9 "NOW()": "create_time",
10 "last_sync_time": "last_modify_time"
11 }
1 # 获取旧表字段数组
2 select_fields = []
3 for key, value in dict_fields.items():
4 print "key = %s , value = %s" % (key,value)
5 select_fields.append(key)
6
7 print "select_fields = "
8 print select_fields
执行结果如下:
1# 映射字典的查询数据
2key = user_code , value = user_code
3key = null , value = data_source
4###### 注释部分 #######
5key = NOW() , value = create_time
6key = 1 , value = status
7key = user_level , value = user_level
8
9# 获取生成旧表需要查询的字段
10select_fields = [
11'census_town',
12###### 注释部分 #######
13 'NOW()', '1',
14 'user_level'
15]
1、那么下面就可以根据获取的字段数据,进行mysql数据查询
2、然后生成一个body请求体字典数据,但是此时body的请求体key是旧表的字段,请求API的时候需要新表的字段,那么就需要进行字段替换
3、再写一个字段映射字典的循环,生成请求API的new_body
1# 此时已有查询字段的数组
2print "select_fields = "
3 print select_fields
4
5 select_order = "order by id desc limit 2"
6 select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order)
7
8 # 循环生成每条查询数据的请求body
9 body = {}
10 for result in select_result:
11 for field in select_fields:
12 if field == "null":
13 body[field] = None
14 else:
15 body[field] = result[field]
16
17 # 打印查看已生成的body数据
18 for field in select_fields:
19 print body[field]
20
21 print body
22
23 # 更新body的字段为新表的字段
24 new_body = {}
25 for key, value in dict_fields.items():
26 print "key = %s , value = %s" % (key,value)
27 new_body[value] = body[key]
28
29 print "new_body="
30 print new_body
执行结果如下:
那么上面的过程最好写在model中,这样可以方便使用。
编写model增加生成请求API的body数据相关方法
1# -*- coding: utf-8 -*-
2
3from tools.MysqlTools import MysqldbHelper
4from tools.PostTools import PostHelper
5
6class ModelHelper(object): # 继承object类所有方法
7
8 # 初始化数据库连接
9 def __init__(self , config):
10 self.mydb = MysqldbHelper(config) # 初始化打开数据库连接
11
12 # 根据设置的旧表字段,查询旧库的数据库数据
13 def selectTable(self,DB_NAME,TABLE_NAME,fields,order):
14 # 选择数据库
15 self.mydb.selectDataBase(DB_NAME)
16 # 数据查询
17 result = self.mydb.select(TABLE_NAME, fields=fields,order=order)
18 # 返回查询的数据
19 return result
20
21 # 根据字段映射字典获取旧表字段数组
22 def getSelectFields(self,dict_fields):
23 # 获取旧表字段数组
24 select_fields = []
25 for key, value in dict_fields.items():
26 # print "key = %s , value = %s" % (key, value)
27 select_fields.append(key)
28 return select_fields
29
30 # 根据查询的结果以及字段字典,转化为请求API的body
31 def convertApiBody(self,result,dict_fields):
32 # 循环生成每条查询数据的请求body
33 body = {}
34 for result in result:
35 for field in result:
36 if field == "null":
37 body[field] = None
38 else:
39 body[field] = result[field]
40 # 更新body的字段为新表的字段
41 new_body = {}
42 for key, value in dict_fields.items():
43 # print "key = %s , value = %s" % (key, value)
44 if key == "null":
45 new_body[value] = None
46 else:
47 new_body[value] = body[key]
48 return new_body
使用model方法,以及执行结果:
那么下一步就是再编写一个执行API的方法。
但是在请求API之前,需要将body序列化为json格式,这个存在datetime类型导致序列化失败的情况,下一个篇章继续。
网友评论