1. Finding continuous time spans
import pandas as pd
from sqlalchemy import create_engine

def read_sql():
    # Pull hourly record counts from Postgres, keyed by publication timestamp
    engine = create_engine("postgresql+psycopg2://glzt:123456@127.0.0.1:5432/spider", max_overflow=5, encoding='utf-8')
    sql = """select count(grid_id) as grid_id, to_char(published_at, 'YYYY-MM-DD HH24:MI:SS') as published_at
             from grid_air_quality group by published_at order by published_at;"""
    data = pd.read_sql(sql, engine)
    data['published_at'] = pd.to_datetime(data['published_at'], format='%Y-%m-%d %H:%M:%S')
    return data
def get_continus_data():
    # Choose the data source (read from the database here; could also read from a CSV)
    data = read_sql()
    # Sort by timestamp
    data.sort_values(by="published_at", inplace=True)
    # Rebuild the index after sorting
    data.reset_index(drop=True, inplace=True)
    print(data)
    list_continuous = []
    # Find runs of 21 consecutive days of hourly data (503 hours, i.e. 21 * 24 - 1)
    for index, value in enumerate(data['published_at'][:-503]):
        try:
            delta = data.iloc[index + 503]['published_at'] - data.iloc[index]['published_at']
            print(delta)
            # If 503 rows ahead is exactly 503 hours later, the span has no gaps
            if delta.days * 24 + delta.seconds // 3600 == 503:
                list_continuous.append([data.iloc[index]['published_at'], data.iloc[index + 503]['published_at']])
            else:
                print(data.iloc[index]['published_at'])
        except IndexError:
            print(index)
    list_continuous = pd.DataFrame(data=list_continuous, columns=['start', 'end'])
    list_continuous.to_csv('./train.csv', index=False)
    print(list_continuous)
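The data-source comment above notes that the same frame could come from a CSV instead of the database. A minimal sketch of that variant, assuming a hypothetical export file ./grid_air_quality.csv with the same two columns:

import pandas as pd

def read_csv_source():
    # Hypothetical CSV export with the same columns the SQL query returns:
    # grid_id (count per hour) and published_at (timestamp string)
    data = pd.read_csv('./grid_air_quality.csv')
    data['published_at'] = pd.to_datetime(data['published_at'], format='%Y-%m-%d %H:%M:%S')
    return data

Swapping read_sql() for read_csv_source() inside get_continus_data() would be the only change needed.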
2. Finding locally available fonts
from matplotlib.font_manager import FontManager
import subprocess

# Fonts that matplotlib can see
fm = FontManager()
mat_fonts = set(f.name for f in fm.ttflist)
# Chinese-capable fonts known to fontconfig (requires fc-list, i.e. Linux/macOS)
output = subprocess.check_output(
    'fc-list :lang=zh -f "%{family}\n"', shell=True)
output = output.decode('utf-8')
zh_fonts = set(f.split(',', 1)[0] for f in output.split('\n'))
# Intersection: Chinese fonts actually usable from matplotlib
available = mat_fonts & zh_fonts
print('*' * 10, 'available fonts', '*' * 10)
for f in available:
    print(f)
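Once a usable family shows up in the list, it can be wired into matplotlib's rcParams so Chinese labels render instead of falling back to empty boxes. A minimal sketch, assuming 'WenQuanYi Micro Hei' was among the printed fonts (substitute whatever your machine reports):

import matplotlib.pyplot as plt

# Assumed family name; replace with one printed by the script above
plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei']
plt.rcParams['axes.unicode_minus'] = False  # keep the minus sign displayable
plt.plot([0, 1], [0, 1])
plt.title('中文标题')
plt.show()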
3. Redis publish/subscribe messaging
import json
import time
import datetime
import redis
class DbRedis:
    # Singleton: reuse one instance (and one connection pool) per process
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, '_instance'):
            cls._instance = super(DbRedis, cls).__new__(cls)
        return cls._instance

    # Database connection settings
    def __init__(self):
        self.redis_link = {
            'host': '192.168.1.115',
            'port': 6379,
            'password': 'glztredis',
            'decode_responses': True,
            # 'db': 2
        }
        self.pool = redis.ConnectionPool(**self.redis_link)   # create the redis connection pool
        self.corsur = redis.Redis(connection_pool=self.pool)  # take a client backed by the pool

    def publish(self, message):
        self.corsur.publish('dynamic_feature_published', json.dumps(message))
        print('publish succeeded')
        return True
# corsur = redis.StrictRedis(host='127.0.0.1', port=6379, db=2)
db = DbRedis()
# corsur = db.corsur
# keys = corsur.keys()
# print(keys)

# Missing time ranges to backfill, formatted as 'start---end'
loss_time = ['2019-11-08 13:00:00---2019-11-08 13:00:00',
             '2019-11-08 18:00:00---2019-11-08 19:00:00']
for published_ats in loss_time:
    start, end = published_ats.split('---')
    start = datetime.datetime.strptime(start, '%Y-%m-%d %H:%M:%S')
    end = datetime.datetime.strptime(end, '%Y-%m-%d %H:%M:%S')
    # Whole hours between start and end, inclusive of both endpoints
    hours = (end - start).days * 24 + (end - start).seconds // 3600
    for hour in range(hours + 1):
        loss_current = start + datetime.timedelta(hours=hour)
        print(loss_current)
        db.publish({"published": str(loss_current)})
        time.sleep(1)
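The snippet above only covers the publishing side. For completeness, a minimal subscriber sketch for the other end of the channel, assuming the same connection settings; the channel name 'dynamic_feature_published' comes from DbRedis.publish() above:

import json
import redis

r = redis.Redis(host='192.168.1.115', port=6379, password='glztredis', decode_responses=True)
pubsub = r.pubsub()
pubsub.subscribe('dynamic_feature_published')
for item in pubsub.listen():
    # The first event is a 'subscribe' confirmation; only handle real messages
    if item['type'] == 'message':
        payload = json.loads(item['data'])
        print(payload['published'])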
# Replacing two nested for loops plus an if test with a pandas expression; runs many times faster
points['id'] = points.apply(
    lambda x: grids.loc[(grids['bottom_lat'] <= x['lat']) & (grids['top_lat'] >= x['lat']) &
                        (grids['bottom_lng'] <= x['lng']) & (grids['top_lng'] >= x['lng']),
                        'id'].to_numpy()[0],
    axis=1)
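For comparison, this is the shape of the nested-loop version that the expression replaces. A self-contained sketch with made-up grids/points frames (column names match the expression above; the data is hypothetical):

import pandas as pd

grids = pd.DataFrame({'id': [1, 2],
                      'bottom_lat': [30.0, 30.5], 'top_lat': [30.5, 31.0],
                      'bottom_lng': [104.0, 104.0], 'top_lng': [105.0, 105.0]})
points = pd.DataFrame({'lat': [30.2, 30.7], 'lng': [104.5, 104.5]})

# Loop version: for every point, scan the grids until a bounding box matches
ids = []
for _, p in points.iterrows():
    for _, g in grids.iterrows():
        if g['bottom_lat'] <= p['lat'] <= g['top_lat'] and g['bottom_lng'] <= p['lng'] <= g['top_lng']:
            ids.append(g['id'])
            break
points['id'] = ids
print(points)

In the apply() version the inner scan over grids becomes a single vectorized boolean mask, which is where the speedup comes from.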