import requests, json
from it.models import SysConf
import datetime
class DingDing:
    """Small DingTalk client that caches its access token in ``SysConf``.

    Credentials and the cached token are read from ``SysConf`` rows with
    ``type='ali'`` (keys ``APP_KEY``, ``APP_SECRET``, ``AGENT_ID`` and
    ``ACCESS_TOKEN``).
    """

    # Seconds to wait for DingTalk before giving up; without a timeout
    # ``requests`` may block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 10

    @staticmethod
    def _token_is_fresh(update_time, valid_time, now):
        """Return True when a token stamped at *update_time* is still valid at *now*.

        *valid_time* is a ``datetime.time`` used as a duration (hours, minutes
        and seconds). A missing timestamp or missing duration counts as expired
        so that the caller fetches a new token instead of crashing.
        """
        if update_time is None or valid_time is None:
            return False
        lifetime = datetime.timedelta(
            hours=valid_time.hour,
            minutes=valid_time.minute,
            seconds=valid_time.second,
        )
        return update_time + lifetime > now

    @staticmethod
    def _join_userids(userid_list):
        """Return *userid_list* as the comma-separated string DingTalk documents.

        Strings (and ``None``) are passed through unchanged; any other iterable
        is joined with commas.
        """
        if userid_list is None or isinstance(userid_list, str):
            return userid_list
        return ','.join(str(uid) for uid in userid_list)

    def get_access_token(self):
        """Return a valid DingTalk access token, refreshing the cached one if needed.

        The cached token lives in the ``SysConf`` row ``type='ali'``,
        ``key='ACCESS_TOKEN'``. When it is empty or older than the row's
        ``valid_time``, a new token is requested from DingTalk and stored
        together with a fresh ``update_time``.

        :return: the access token string
        :raises RuntimeError: if DingTalk's response carries no access token
        :raises requests.RequestException: on network failure or timeout
        """
        token_row = SysConf.objects.get(type='ali', key='ACCESS_TOKEN')
        stamp = token_row.update_time
        # Build "now" with the same tz-awareness as the stored timestamp so the
        # comparison works whether the ORM returns naive or aware datetimes.
        now = datetime.datetime.now(tz=stamp.tzinfo if stamp else None)
        if token_row.value and self._token_is_fresh(stamp, token_row.valid_time, now):
            return token_row.value

        params = {
            'appkey': SysConf.objects.get(type='ali', key='APP_KEY').value,
            'appsecret': SysConf.objects.get(type='ali', key='APP_SECRET').value,
        }
        res = requests.get(
            'https://oapi.dingtalk.com/gettoken',
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        payload = res.json()
        access_token = payload.get('access_token')
        if not access_token:
            # Never persist an empty token: it would be handed out as if valid.
            raise RuntimeError(
                f'DingTalk gettoken failed: errcode={payload.get("errcode")}, '
                f'errmsg={payload.get("errmsg")}'
            )

        token_row.value = access_token
        # Stamp the refresh explicitly: the model field uses auto_now_add, which
        # only fills the column on INSERT, so save() alone would leave the old
        # timestamp and every later call would refetch the token.
        token_row.update_time = now
        token_row.save()
        return access_token

    def send_inform(self, userid_list, title, text):
        """Send a markdown work notification to the given users.

        :param userid_list: recipient user ids, either a comma-separated string
            or an iterable of ids
        :param title: content shown in the conversation preview
        :param text: message body in markdown format, ideally under 500 characters
        :return: True if DingTalk answered with ``errcode == 0``, otherwise False
        :raises RuntimeError: if no access token could be obtained
        :raises requests.RequestException: on network failure or timeout
        """
        url = 'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2'
        params = {'access_token': self.get_access_token()}
        data = {
            'agent_id': SysConf.objects.get(type='ali', key='AGENT_ID').value,
            'msg': {"markdown": {"text": text, "title": title}, "msgtype": "markdown"},
            'userid_list': self._join_userids(userid_list),
        }
        # json= serialises the body and sets the JSON Content-Type header.
        res = requests.post(
            url,
            json=data,
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        ).json()
        return res.get('errcode') == 0
# File: SysConf.py
from django.db import models
# Create your models here.
class SysConf(models.Model):
    """Key/value configuration row, grouped by ``type``.

    The table is not managed by Django migrations (``managed = False``).
    """

    # Configuration group the entry belongs to.
    type = models.CharField('类型', max_length=32)
    # Name of the entry within its group.
    key = models.CharField('键', max_length=32)
    # Stored value of the entry.
    value = models.CharField('值', max_length=255)
    # Last-modified timestamp. auto_now refreshes it on every save();
    # auto_now_add would only stamp the row once, at creation.
    update_time = models.DateTimeField('最近更新时间', auto_now=True, null=True, blank=True)
    # Validity period of the value, stored as a time of day.
    valid_time = models.TimeField('有效时间', null=True, blank=True)

    class Meta:
        managed = False
        db_table = 'qweqweqwe'
        verbose_name = '配置表'
        verbose_name_plural = verbose_name
image.png
网友评论