背景
由于信息安全的管理需要,很多生产环境的服务器都采用了非常严格的远程访问方式,一些例行的服务器维护工作就变的比较繁琐,需要多层登陆,并且执行一大堆的命令才能够完成,那么,作为一个画家和摄影师,写那么复杂的shell脚本怕是会要了我半条命,因此,我想到了一个曲线救国的方案——Jupyter。
思路
具体思路是这样的:
为了安全起见,我没有直接暴露堡垒机到互联网,而是在堡垒机上安装了Docker环境,这样我的Jupyter环境就可以只在我需要的时候打开,万一Jupyter被别有用心的人发现,也只是针对docker环境,多一层缓冲。由于我的堡垒机本身是一个计算集群中的一台,所以我使用Rancher作为容器的管理工具,顺便给大家安利一下,非常好用!比某帽的openshfit商业版还好用一百倍!
我需要做的主要工作是每个月把生产服务器上的物联网数据压缩打包传输到堡垒机的计算集群并且倒入到InfluxDB时序数据库中。由于物联网数据量很大,并且压缩比也非常大,所以如果直接传输非常浪费带宽资源和时间,因此,我的大致步骤如下:
#%% ——————————Step0. 建立远程服务器连接——————————
import os, time
# 我自己封装的工具包,里面有python通过ssh协议远程登录服务器以及数据清洗、写入influxDB等功能
from utilities import koalaTools as k
# 因为数据量很大,所以需要关注一下每次任务的执行时间
s_time = time.process_time()
k.logger.info("start.........")
# 一些全局参数
local_work_dir ='/data/'
# 年月标签,用于后面的文件命名
ym ='202003'
# zz是我的远程服务器登陆信息,这些信息实际上存在我的一个数据库中,通过Rest API调用获得,这里为了方便,写了一个本地的dict代替。
zz = {
"host_name":"",
"ssh_port":22,
"username":"root",
"password":"",
"remote_root":"/shared/",
"work_dir":"data",
}
# ssh client
sc = k.get_ssh_connection(hostname=zz['host_name'], username=zz['username'], port=zz['ssh_port'], password=zz['password'])
# sftp client
sftp = sc.open_sftp()
sftp.chdir(zz['remote_root'] + zz['work_dir'])
sftp.listdir()
网上很多文章用paramiko的sftpClient 去创建sftp连接,我用了一下感觉很多问题,并且我还要用bash命令去执行tar打包等操作,因此,sftp可以从ssh connection来创建。另外,查看目标目录的文件列表我更喜欢用sftp的listdir方法,返回一个list,比执行command返回stdout,然后readline读出来感觉省事儿很多。
这里说一下jupyter的好处,就是我可以把我的执行步骤拆解成几部分,执行完一个jupyther的cell可以看到一些结果,确认无误后再执行下一个环节,个人感觉也比写shell方便很多,不然你的shell需要设计一大堆用于反馈的东西。
那么,上一步最后的sftp.listdir()输出结果如果是我预期的,那么我就可以执行下一步操作了。
#%% ——————————Step1. 远程服务器文件打包——————————
# 远程文件打包 tar -zcvf 2002020326.tar.gz 202002*,我生产服务器上的数据文件都是每天一个csv这样dump出来的,所以可以利用这个规律进行按月的打包。
# 压缩包文件名
gz_name = "".join([ym, 'tar.gz'])
tar_cmd = "".join(['tar -zcvf ', gz_name, ym + '*'])
k.exec_cmd(sc, tar_cmd)
k.exec_cmd(sc, 'ls -l')
同样,执行完后看一下打包结果,确认无误后再进行下一步骤。
接下来是把文件通过sftp get到本地服务器,操作非常简单,只是文件太大,传输万一出现网络不稳定情况终端就比较麻烦,我毕竟是个艺术家,写代码不擅长,这里一直想加入一个进度监控,但是还没有想到合适的方式,请高人看到后指点一下,谢谢!
#%% ——————————Step2. 获取文件到本地——————————
remote_file_path = zz['remote_root']+zz['work_dir']+'/' + gz_name
local_file_path = local_work_dir + gz_name
sftp.get(remote_file_path, local_file_path)
os.listdir(local_work_dir)
这里os.listdir看一下文件是否get过来了,没问题的话执行下一步。
这里注意,不要忘了关闭ssh连接哦!
#%% ——————————Step3. 本地服务解压——————————
sc.close()
tmp_dir = '_tmp'
file_list = k.un_tar(gz_name, tmp_dir)
# 归档压缩包
os.rename(local_work_dir + gz_name, "./tarbak/" + gz_name)
os.listdir(local_work_dir)
这里有一个感觉很麻烦的事情,就是老要用绝对路径,也不知道确实是需要这样还是因为我是菜鸟。
whatever,能用就行了吧,绝对路径也有好处,程序拷到任何目录执行不受影响。
最后依然是老规矩,检查一下这一步的操作成果是否正确,然后才执行下一步。
最后一步就是将数据导入时序数据库InfluxDB
#%% ——————————Step3. 导入至InfluxDB——————————
client = k.connect_to_influxdb()
k.logger.info('File list : %s' % os.listdir(tmp_dir))
for file_name in file_list:
if file_name.endswith('.csv') and os.path.getsize(file_name) > 0:
k.logger.info('Start saving File: %s' % file_name)
cf = k.pd.DataFrame()
cf = k.clear_me2(file_name)
k.save_to_influxdb(file_name, cf, client)
k.logger.info('Delete file: %s' % file_name)
os.remove(local_work_dir+file_name)
client.close()
e_time = time.process_time()
k.logger.info("The total running time is: %s" % (e_time - s_time))
下面是我的koalaTools.py的部分代码,至于里面的数据清洗的内容就不便公开了,涉及商业秘密啦,哈哈
import paramiko
import tarfile
# 加入日志
# 获取logger实例
logger = logging.getLogger("baseSpider")
# 指定输出格式
formatter = logging.Formatter('%(asctime)s\
%(levelname)-8s:%(message)s')
# 文件日志
file_handler = logging.FileHandler("operation_theServer.log")
file_handler.setFormatter(formatter)
# 控制台日志
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 为logge添加具体的日志处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
def get_ssh_connection(hostname, username, password=None, key_dir=None, port=22):
try:
# 创建SSH对象
sf = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
sf.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
# 是否证书登陆
if key_dir:
sf.connect(hostname=hostname, username=username, key_filename=key_dir,
password=password, port=port)
else:
sf.connect(hostname=hostname, port=port, username=username,
password=password)
logger.info("SSHConnection-->" + hostname + "<--succes!")
return sf
except Exception as e:
logger.error("SSHConnection-->" + hostname + "<--failed!")
logger.error(e)
return False
def exec_cmd(ssh_client, command):
stdin, stdout, stderr = ssh_client.exec_command(command)
for i in stdout.readlines():
logger.info(i)
def un_tar(tgz_name, tmp_dir="_tmp"):
"""
untar zip file
:param tgz_name: Tar gz File name
:return:
"""
tar = tarfile.open(tgz_name)
names = tar.getnames()
file_list = []
if os.path.isdir(tmp_dir):
pass
else:
os.mkdir(tmp_dir)
# 因为解压后是很多文件,预先建立同名目录
for name in names:
tar.extract(name, tmp+"/")
logger.info('Extract', name, 'From', tgz_name)
file_list.append(tmp_dir + '/' + name)
tar.close()
return file_list
之前我还用django写了一个小的管理工具,试图实现这些功能,但是本人是个强迫症,因为做这个小工具,什么技术都想用最新的,所以从django开始,又研究了rest_framwork、VUE、React、flask等等,兜了一大圈子,毕竟我不是靠写代码吃饭的,作为一个艺术家,这样的投入产出太不划算了,而且非常不灵活,所以只用来作为我管理一些数据的小工具作罢。
下一篇文章,我打算分享一下给初级摄影爱好者的照片管理方案,本人的特点就是务实,绝对不会因为要显示自己的牛逼故意讲很多很复杂很专业的工具(虽然我也很专业,但是没必要拿出来显摆啦,真正强大的人不需要证明自己,哈哈哈哈)
网友评论