问题背景
我们经常会获取tapd/coding的工单或缺陷信息,通过webhook企业微信机器人同步消息。以做到新工单提醒,加快客户问题闭环。
但是会遇到一个问题,定时任务去轮训是否有新工单,如果下一轮任务执行时该工单状态还是NEW的话,会导致重复发送。
解决方案
运用sqllite新建临时表,存储工单id和发送状态(默认为未发送);消息发送成功后,发送状态更新为已发送。下一次发送前判断发送状态,若已发送,则跳出。
一、sqllite是什么?
是一个开源的嵌入式关系数据库,实现了自给自足的、无服务器的、配置无需的、事务性的 SQL 数据库引擎。
一种非常轻量级的数据库解决方案,非常适合小型项目、嵌入式数据库或者测试环境中。
SQLite 的一些主要特性包括:
1、无服务器的:SQLite 不是一个单独的服务进程,而是直接嵌入到应用程序中。它直接读取和写入磁盘文件。
2、事务性的:SQLite 支持 ACID(原子性、一致性、隔离性、持久性)属性,能够确保所有事务都是安全、一致的,即使在系统崩溃或者电力中断的情况下。
3、零配置的:SQLite 不需要任何配置或者管理,这使得它非常容易安装和使用。
4、自包含的:SQLite 是一个自包含系统,这意味着它几乎不依赖其他任何外部系统或者库,这使得 SQLite 的跨平台移植非常方便。
5、小型的:SQLite 非常小巧轻量,全功能的 SQLite 数据库引擎的大小只有几百KB。
6、广泛应用:SQLite 被广泛应用在各种各样的产品和系统中,包括手机、平板电脑、嵌入式系统、物联网设备等。它也被广泛用于网站开发、科学研究、数据分析等领域
二、sqllite常用用法?
1、建表
# 创建链接
conn = sqlite3.connect(self.safe_orders_db)
print("数据库打开成功")
# 创建游标
c = conn.cursor()
# 创建表
c.execute("""create TABLE safe_orders
(id int primary key not null,
sended int default 0 not null);""")
print("数据表创建成功")
# 提交
conn.commit()
2、插入记录
c.execute("insert into safe_orders (id,sended) values (9880, 0)")
print("记录插入成功")
# 提交
conn.commit()
3、删除记录
c.execute("delete from safe_orders where id=9880")
print("记录删除成功")
# 提交
conn.commit()
4、查询记录
# 执行查询
result = c.execute("select sended from safe_orders where id=9880")
# 获取单条查询结果
result.fetchone()
# 获取全部查询结果
result.fetchall()
三、具体案例代码
# -*- coding: utf-8 -*-
# @Time : 2023/12/1 18:26
# @Author : xxx
# @File : xx工单提醒
# @Software: PyCharm
import json
import sqlite3
import requests
from jsonpath import jsonpath
class SafeOrder:
def __init__(self, robot_url):
"""初始化"""
self.robot_url = robot_url # 机器人url
self.conn = sqlite3.connect('safe_orders.db') # 链接数据库
self.c = self.conn.cursor() # 创建游标
def get_safe_order_info(self):
# 请求地址
url = 'https://xx.coding.net/api/project/xx/issues/MISSION/list'
# 请求头
headers = {xx}
# coding缺陷查询请求体
json_body = {
"page": 1,
"pageSize": 50,
"content": {
"sorts": [{
"key": "PRIORITY",
"value": "DESC",
"customFieldIds": None,
"componentType": None,
"id": "PRIORITY"
}],
"conditions": [{
"key": "ISSUE_TYPE_ID",
"fixed": False,
"value": [xxx],
"validInfo": [],
"userMap": {
"xxx": {
"value": xxx
}
},
"selectedItems": {
"xxx": {
"value": xxx
}
}
}, {
"key": "STATUS",
"fixed": False,
"value": [xxx],
"validInfo": [],
"userMap": {
"xxx": {
"value": xxx
}
},
"selectedItems": {
"xxx": {
"value": xxx
}
}
}],
"showSubIssues": True
}
}
params_body = {"filterId": xxx}
form_body = {}
# 发起筛选
res = requests.post(url=url, headers=headers, json=json_body, params=params_body, data=form_body)
# 处理返回,默认返回json
try:
res = res.json()
except Exception as e:
res = res.text
logging.warning(f'当前返回异常,请检查,返回:{res}')
logging.warning(e)
# 抛出异常后不执行
return
# 提取返回信息
# 提取code
code_list = jsonpath(res, '$.data.list[*].code')
# 提取name
name_list = jsonpath(res, '$.data.list[*].name')
# 提取issueStatusName
status_list = jsonpath(res, '$.data.list[*].issueStatusName')
# 提取创建时间
create_at_list = jsonpath(res, '$.data.list[*].createdAt')
# 处理为标准时间
create_at_list = [datetime.datetime.utcfromtimestamp(x / 1000).strftime('%Y-%m-%d %H:%M:%S') for x in
create_at_list]
# 利用枚举函数组装需要的记录信息
records = []
for i, v in enumerate(code_list):
record = [v, name_list[i], status_list[i], create_at_list[i]]
records.append(record)
print(f"工单信息:{records}")
# 消息头
contents = f"""**您有<font color ='warning'>{len(records)}</font> 条新的安全工单,尽快处理\n<@xxx>**\n\n\n"""
for r in records:
try:
# 插入记录,默认为0
self.c.execute(f"insert into safe_orders (id,issend) values ({r[0]}, 0)")
self.conn.commit()
print("数据插入成功")
except Exception as e:
print(f'{r[0]} 已存在请勿重复插入 {e}')
# 组装消息体
order_url = f'https://xxx.coding.net/p/x x x/assignments/issues/{r[0]}/detail'
content = f"""标题:<font color ='comment'>{r[1]}</font>\n链接:<font color ='comment'>[{order_url}]({order_url})</font>\n状态:<font color ='comment'>{r[2]}</font>\n创建时间:<font color ='comment'>{r[3]}</font>\n\n\n"""
contents += content
end = '[👉工单汇总链接](https://xxxx.net/p/xxx/assignments/issues?filter=xxx)'
# print(contents)
# 拼接汇总链接
contents = contents + end
return contents, records
def send_msg(self):
"""发送消息"""
# 获取记录信息和待发送的请求体信息
contents, records = self.get_safe_order_info()
# 定义信息头
headers = {'Content-Type': 'application/json'}
# 定义请求体
body = {
"msgtype": "markdown",
"markdown": {
"content": contents
}
}
# 判断是否发送过
is_sends = []
for r in records:
is_send = self.c.execute(f"select issend from safe_orders where id={r[0]}").fetchall()[0][0]
self.c.execute(f"select issend from safe_orders where id={r[0]}").fetchone()
is_sends.append(is_send)
print(f'发送结果列表为: {is_sends}')
if 1 in is_sends:
print('请勿重复发送')
return False
else:
# 发起请求
response = requests.post(url=self.robot_url, headers=headers, data=json.dumps(body))
# 提取返回信息
code = response.json()["errcode"]
msg = response.json()["errmsg"]
# 断言消息是否发送成功
if code == 0 and msg == "ok":
print(f"发送成功,发送结果为:{response.text}")
# 更新发送状态 0-未发送 1-已发送
for r in records:
# 更新
self.c.execute(f"update safe_orders set issend=1 where id={r[0]}")
# 提交
self.conn.commit()
# 关闭
self.conn.close()
print("发送状态更新成功")
else:
print(f"发送失败,发送结果为:{response.text}\n")
def create_table(self):
"""利用sqllite建表"""
conn = sqlite3.connect(self.safe_orders_db)
print("数据库打开成功")
c = conn.cursor()
c.execute("""create table safe_orders
(id nit primary key not null,
issend int default 0 not null);""")
print("数据表创建成功")
conn.commit()
conn.close()
if __name__ == '__main__':
robot_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx'
s = SafeOrder(robot_url)
# s.create_table()
s.send_msg()
执行结果:
![](https://img.haomeiwen.com/i27710895/1369440e5b986830.png)
![](https://img.haomeiwen.com/i27710895/695fdaaecd926964.png)
![](https://img.haomeiwen.com/i27710895/9e12c1ec70bd020a.png)
网友评论