1. 安装阿里云 SDK
pip install aliyun-python-sdk-core
pip install aliyun-python-sdk-bssopenapi==1.7.12
pip install pyyaml requests loguru  # 脚本还依赖这三个库
2.创建一个文件存放ak信息
cat ak.yaml  # 建议单独创建一个仅授予最小必要权限的 RAM 用户,并使用它的 AccessKey
AccessKeyId: "aaaaaaaaaaaa"
AccessKeySecret: "bbbbbbbbbbbbbb"
3.脚本
cat get_account.py
import base64
import hashlib
import hmac
from aliyunsdkcore.client import AcsClient
from aliyunsdkbssopenapi.request.v20171214.QueryAccountBalanceRequest import QueryAccountBalanceRequest
import json
import yaml
import time
import requests
from loguru import logger
def GetAk(f_obj):
    """Load the Aliyun AccessKey credentials from a YAML file.

    Args:
        f_obj: Path of the YAML file to read.

    Returns:
        The parsed YAML document. The script entry point indexes it with
        the ``AccessKeyId`` and ``AccessKeySecret`` keys.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the file is not valid YAML.
    """
    # Explicit encoding so parsing does not depend on the platform locale.
    with open(f_obj, encoding="utf-8") as f:
        # SafeLoader keeps a tampered credentials file from constructing
        # arbitrary Python objects.
        return yaml.load(f, Loader=yaml.SafeLoader)
def AvailableAmount(Akid, AkSecret):
    """Query the Aliyun account balance and return the decoded response.

    Args:
        Akid: AccessKey ID used to authenticate the request.
        AkSecret: AccessKey secret paired with ``Akid``.

    Returns:
        The JSON response of ``QueryAccountBalanceRequest`` decoded into
        Python objects. The script entry point reads
        ``['Data']['AvailableAmount']`` from it.

    Raises:
        Whatever ``AcsClient.do_action_with_exception`` raises when the
        request fails; nothing is caught here.
    """
    client = AcsClient(Akid, AkSecret, 'cn-hangzhou')
    request = QueryAccountBalanceRequest()
    request.set_accept_format('json')
    response = client.do_action_with_exception(request)
    # Use the file's logger instead of bare prints so the raw response is
    # visible when debugging without polluting stdout.
    logger.debug("QueryAccountBalance response: {}", response)
    return json.loads(response)
def gen_sign(timestamp, secret):
    """Build the signature sent alongside a Feishu webhook message.

    Args:
        timestamp: Timestamp placed in the message payload; it is formatted
            into the signed string with ``str.format`` semantics.
        secret: Signing secret configured for the webhook bot.

    Returns:
        The base64-encoded HMAC-SHA256 digest as a ``str``.
    """
    string_to_sign = f"{timestamp}\n{secret}"
    # NOTE: "<timestamp>\n<secret>" is passed as the HMAC *key* and the
    # message is left empty. This is intentional, not a swapped argument;
    # it appears to follow the Feishu custom-bot signature scheme, so do
    # not "fix" it without checking the vendor documentation.
    digest = hmac.new(
        string_to_sign.encode("utf-8"), digestmod=hashlib.sha256
    ).digest()
    return base64.b64encode(digest).decode("utf-8")
def send_feishu(*, title: str, content: str, webhook: str,
                secret: str = "改成自己的签名校验",
                timeout: float = 10) -> None:
    """Post an interactive card message to a Feishu webhook.

    Args:
        title: Plain-text title shown in the card header.
        content: Card body, sent with the ``lark_md`` tag.
        webhook: URL of the webhook to post to.
        secret: Signing secret passed to ``gen_sign``. The default is the
            placeholder the script shipped with; pass the real secret.
        timeout: Seconds to wait for the HTTP request before giving up, so
            an unattended (cron) run cannot hang indefinitely.

    Raises:
        requests.RequestException: If the HTTP request itself fails or
            times out.
    """
    headers = {"Content-Type": "application/json"}
    # The same timestamp must be used in the payload and in the signature.
    timestamp = int(time.time())
    data = {
        "timestamp": timestamp,
        "sign": gen_sign(timestamp, secret),
        "msg_type": "interactive",
        "card": {
            "elements": [{
                "tag": "div",
                "text": {
                    "content": content,
                    "tag": "lark_md"
                }
            }],
            "header": {
                "title": {
                    "content": title,
                    "tag": "plain_text"
                },
            }
        }
    }
    res = requests.post(webhook, data=json.dumps(data), headers=headers,
                        timeout=timeout)
    try:
        body = res.json()
    except ValueError:
        # Non-JSON reply (e.g. a proxy error page): log the raw text
        # rather than crash after the request has already been sent.
        body = res.text
    logger.info(f"发送飞书响应消息:{body}")
if __name__ == "__main__":
    # Script entry point: read the credentials, query the balance, and push
    # it to the Feishu webhook.
    AK = GetAk('ak.yaml')
    balance = AvailableAmount(AK['AccessKeyId'], AK['AccessKeySecret'])
    # Strip commas from the amount (presumably thousands separators) so the
    # notification shows a plain number.
    Mon = balance['Data']['AvailableAmount'].replace(',', '')
    send_feishu(title="阿里云余额通知", content=f"阿里云账户余额: {Mon}",
                webhook="改成自己的webhook")
4. 添加到 crontab 定时执行(下面的配置为每天 10:00 运行一次)
00 10 * * * cd /opt/aliyunyuegaojing && /usr/bin/python3 get_account.py >> /dev/null 2>&1

image.png
网友评论