#coding:utf-8
import pymysql
import celery
import re
import pymongo
import json
import re
import time
from celery.schedules import crontab
import sys
# Python 2 only: site.py removes sys.setdefaultencoding at startup, so the
# module is reloaded to get it back, then the interpreter-wide default
# encoding is switched to UTF-8 (affects implicit str <-> unicode conversion).
reload(sys)
sys.setdefaultencoding('utf-8')
# Celery application named 'weather'; the local Redis instance on port 6379 is
# used both as the message broker and as the result backend.
app = celery.Celery('weather',broker='redis://127.0.0.1:6379',backend='redis://127.0.0.1:6379')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register the periodic schedule once the Celery app is configured.

    Adds ``update_knowledge`` to the beat schedule of ``sender`` with a
    ``crontab(minute='*/1')`` schedule, i.e. once every minute.

    Args:
        sender: The object the ``on_after_configure`` signal is sent from;
            only its ``add_periodic_task`` method is used.
        **kwargs: Extra signal arguments; ignored.
    """
    every_minute = crontab(minute='*/1')
    sender.add_periodic_task(every_minute, update_knowledge)
def _to_utf8(value):
    """Return *value* encoded as UTF-8 when it is a text string, else unchanged."""
    if isinstance(value, type(u'')):
        return value.encode('utf-8')
    return value


def _parse_weather_record(record):
    """Convert one decoded weather JSON object into a ``kb_weather`` row.

    Args:
        record: dict produced by ``json.loads`` on a crawler result. The keys
            read are 城市名称, 日期, 小时, 天气, 温度, 空气质量(aqi), 湿度,
            降水量, 风力 and 风向.

    Returns:
        A 13-tuple ordered as (city, date, hour, minute, weather, temperature,
        aqi, aqi_remark, humidity, precipitation, wind_force, wind_direct,
        updatetime).

    Raises:
        KeyError: if an expected key is missing.
        ValueError: if the date or a numeric field cannot be parsed.
    """
    raw_date = record[u'日期']
    matched = re.match(u'(\\d+)月(\\d+)日', raw_date)
    if matched is None:
        raise ValueError('unexpected date format: %r' % (raw_date,))
    # NOTE(review): the year is hard-coded to 2018, as in the original code;
    # confirm whether the crawled data can ever belong to another year.
    date = '2018-' + matched.group(1) + '-' + matched.group(2)

    hour_text, minute_text = record[u'小时'].split(':')[:2]

    # An empty AQI field means "no reading": store SQL NULL (None) rather than
    # the literal string 'NULL'.
    # NOTE(review): assumes the aqi column is nullable -- confirm the schema.
    aqi_text = record[u'空气质量(aqi)']
    aqi = int(aqi_text) if aqi_text else None

    # Divide by a float: on Python 2, ``int / 100`` is integer division and
    # would truncate every humidity below 100% to 0.
    humidity = int(record[u'湿度'].replace('%', '')) / 100.0

    # Text fields are encoded to UTF-8 explicitly so the bytes sent to MySQL
    # are the same as the original string-formatted statement produced.
    return (
        _to_utf8(record[u'城市名称']),
        date,
        int(hour_text),
        int(minute_text),
        _to_utf8(record[u'天气']),
        _to_utf8(record[u'温度']),
        aqi,
        '',  # aqi_remark is always stored empty
        humidity,
        float(record[u'降水量']),
        float(record[u'风力'].replace(u'级', '')),
        _to_utf8(record[u'风向']),
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
    )


@app.task
def update_knowledge():
    """Copy the next not-yet-imported weather record from MongoDB to MySQL.

    The current row count of ``kb_weather`` is used as the offset into the
    ``weather`` collection of the ``qi_resultdb`` Mongo database, so each run
    inserts at most one new row.

    NOTE(review): the Mongo query has no explicit sort, so the offset relies
    on the collection's natural order staying stable -- confirm this holds.
    NOTE(review): hosts and credentials are hard-coded here; consider moving
    them to configuration.

    Raises:
        KeyError, ValueError: if the Mongo document does not have the expected
            shape (see ``_parse_weather_record``).
        pymysql.MySQLError: on database failures.
    """
    insert_sql = (
        "insert into kb_weather (`city`,`date`,`hour`,`minute`,`weather`,"
        "`temperature`,`aqi`,`aqi_remark`,`humidity`,`precipitation`,"
        "`wind_force`,`wind_direct`,`updatetime`) "
        "values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    )

    con = pymysql.connect(host='172.22.64.50', port=3306, user='xialong',
                          passwd='adiiadsjkh', db='knowledgebase')
    cur = None
    mongo_client = None
    num = 0
    try:
        cur = con.cursor()
        # Only the row count is needed, so let MySQL count instead of
        # transferring the whole table on every run.
        cur.execute('select count(*) from kb_weather')
        nums = cur.fetchone()[0]
        print('mysql_already_exit%d' % nums)

        mongo_client = pymongo.MongoClient('172.22.32.8', 27017)
        result = mongo_client['qi_resultdb']['weather'].find()
        for doc in result[nums:nums + 1]:
            num += 1
            row = _parse_weather_record(json.loads(doc['result']))
            # Parameterized query: the driver escapes every value, so quotes
            # in the crawled text can neither break nor inject into the SQL.
            print(cur.mogrify(insert_sql, row))
            cur.execute(insert_sql, row)
        con.commit()
    finally:
        # Release every resource even when parsing or the insert fails.
        if mongo_client is not None:
            mongo_client.close()
        if cur is not None:
            cur.close()
        con.close()
    print('success')
    print(num)
# 网友评论  (web-page residue: a "user comments" heading, not part of the script)