最近项目需要实现 随机的未来时间自动填写文件信息功能
其实方法有很多
方法1.开启一个进程专门检查在指定时间段内执行任务操作
方法2.来一个任务就开一个线程在指定的时间段内执行任务操作
方法3.来一个任务就开启一个定时任务
我目前想到的就是这些,方法比较粗俗
现在我就来讲一下如何实现方法1
实现写一个自动填写文件的函数
def automatic_response():
# 自动应助
# 当前时间
now_dt = datetime.datetime.now()
# 5秒浮动时间
before_dt = datetime.datetime(year=now_dt.year,
month=now_dt.month,
day=now_dt.day,
hour=now_dt.hour,
minute=(0 if now_dt.minute-1 < 0 else now_dt.minute) if now_dt.second -5 < 0 else now_dt.minute,
second=0 if now_dt.second-5 < 0 else now_dt.second - 5)
last_dt = datetime.datetime(year=now_dt.year,
month=now_dt.month,
day=now_dt.day,
hour=now_dt.hour,
minute=now_dt.minute,
second=59 if now_dt.second + 5 > 59 else now_dt.second + 5)
# 查找在应助中的应助时间=浮动时间 __gte 大于等于 __lte 小于等于
helps = Helps.objects.filter(state=0).filter(response_time__gte=before_dt, response_time__lte=last_dt).all()
if not helps:
return
else:
print('自动应助在%s开始执行...' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M"))
for h in helps:
# 数据库数据
# 创建对象
myclient = pymongo.MongoClient(MONGO_IP, 27017)
# 数据库
mydb = myclient[MONGO_DB]
# 数据表
mycol = mydb[MONGO_TABLE]
# 在服务器中查有doi的对象
file = mycol.find_one({'doi': h.doi})
field_info = file['file']
Helps.objects.filter(id=h.id).update(state=6,
end_response_time=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
field_info=field_info,
button_code=True)
print('自动应助在%s执行完成...' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M"))
然后在开启一个进程用来跑这个函数
from multiprocessing import Process
import time
from web.views import automatic_response, update_code
def check():
# 检查过期
# update_code()
# 自动应助
automatic_response()
class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
self.name = str(time.time())
def run(self):
print("%s is running " % self.name)
while True:
time.sleep(1)
check()
print('%s is done' % self.name)
if __name__ == '__main__':
p = MyProcess()
p.start() # 就是调用run()方法
然后在manange.py中调用这个进程
#!/usr/bin/env python
import os
import sys
import django
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "wxhz.settings")
django.setup()
try:
from django.core.management import execute_from_command_line
from utils.PorcessClass import MyProcess
MyProcess().start()
except ImportError as exc:
# The above import may fail for some other reason. Ensure that the
# issue is really that Django is missing to avoid masking other
# exceptions on Python 2.
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
然后运行就可以啦
当然应产品需求 改成线程来跑
现在那就讲讲方法2
首先写一个线程
def thread_response(id):
# 线程
hread_artificial = threading.Thread(target=auto_response_task, args=(id,))
hread_artificial.start()
在写线程调用的自动操作数据库的函数
# =============自动应助=================
def auto_response_task(id):
"""
这儿是对应一个求助来写的应助 是一对一的
:param help_pk:
:return:
"""
# 通过help的唯一标示 找到help对象
help = Helps.objects.filter(id=id).first()
response_time = datetime.datetime.strptime(help.response_time, '%Y-%m-%d %H:%M:%S')
# 使用 while 死循环 然线程一直运行 通过修改 结果循环
bl = True
while bl:
now_dt = datetime.datetime.now()
# 5秒浮动时间
before_dt = datetime.datetime(year=now_dt.year,
month=now_dt.month,
day=now_dt.day,
hour=now_dt.hour,
minute=(
0 if now_dt.minute - 1 < 0 else now_dt.minute) if now_dt.second - 5 < 0 else now_dt.minute,
second=0 if now_dt.second - 5 < 0 else now_dt.second - 5)
last_dt = datetime.datetime(year=now_dt.year,
month=now_dt.month,
day=now_dt.day,
hour=now_dt.hour,
minute=now_dt.minute,
second=59 if now_dt.second + 5 > 59 else now_dt.second + 5)
# 判断help的响应时间是否在时间正常浮动范围内
if response_time <= last_dt and response_time >= before_dt:
print('自动应助在%s开始执行...' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M"))
# True 作出响应 应助 修改字段值
help.state=6
help.end_response_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
#创建对象
myclient = pymongo.MongoClient(MONGO_IP, 27017)
# 数据库
mydb = myclient[MONGO_DB]
# 数据表
mycol = mydb[MONGO_TABLE]
# 在服务器中查有doi的对象
file = mycol.find_one({'doi': help.doi})
# 在数据库中写入
field_info = file['file']
help.field_info = field_info
help.button_code = True
help.end_response_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
help.save()
# zhe er success update
bl = False
print('自动应助在%s执行完成...' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M"))
time.sleep(2)
接下来就在需要用到线程的地方调用
# 开启一个线程 进行自动应助
thread_response(h.id)
OK啦 至于定时任务嘛 就自己研究一下吧 我这边暂时没有用到
网友评论