美文网首页Shell
Linux Shell脚本经典案例(二)

Linux Shell脚本经典案例(二)

作者: apple524 | 来源:发表于2022-03-03 16:33 被阅读0次

    11.统计 /proc 目类下 Linux 进程相关数量信息,输出总进程数,running 进程数,stoped 进程数,sleeing 进程数,zombie 进程数。
    【输出所有 zombie 的进程到 zombie.txt 杀死所有 zombie 进程】

    #!/bin/bash
    ALL_PROCESS=$(ls /proc/ | egrep '[0-9]+')
    running_count=0
    stoped_count=0
    sleeping_count=0
    zombie_count=0
    for pid in ${ALL_PROCESS[*]}
    do
        test -f /proc/$pid/status && state=$(egrep "State" /proc/$pid/status | awk '{print $2}')
        case "$state" in
    R)
    running_count=$((running_count+1))
    ;;
    T)
    stoped_count=$((stoped_count+1))
    ;;
    S)
    sleeping_count=$((sleeping_count+1))
    ;;
    Z)
    zombie_count=$((zombie_count+1))
    echo "$pid" >>zombie.txt
    kill -9 "$pid"
    ;;
    esac
    done
    echo -e "total:
    $((running_count+stoped_count+sleeping_count+zombie_count))\nrunning:
    $running_count\nstoped: $stoped_count\nsleeping: $sleeping_count\nzombie:
    $zombie_count"
    

    12.12.把当前目录(包含子目录)下所有后缀为 ".sh" 的文件后缀变更为 ".shell",之后删除每个文件的第二行。

    #!/bin/bash
    ALL_SH_FILE=$(find .-type f -name "*.sh")
    for file in ${ALL_SH_FILE[*]}
    do
    filename=$(echo $file | awk -F'.sh' '{print $1}')
    new_filename="${filename}.shell"
    mv "$file" "$new_filename"
    sed -i '2d' "$new_filename"
    done
    

    13.判断目录 /tmp/jstack 是否存在,不存在则新建一个目
    录,若存在则删除目录下所有内容。
    【每隔 1 小时打印 inceptor server 的 jstack 信息,并以 jstack_${当前时间} 命名文件,每当目录下超过10 个文件后,删除最旧的文件】

    #!/bin/bash
    DIRPATH='/tmp/jstack'
    CURRENT_TIME=$(date +'%F'-'%H:%M:%S')
    if [ ! -d "$DIRPATH" ];then
          mkdir "$DIRPATH"
    else
          rm -rf "$DIRPATH"/*
    fi
          cd "$DIRPATH"
    while true
    do
        sleep 3600
        # 这里需要将inceptor改后自己的java进程名称
        pid=$(ps -ef | grep 'inceptor' | grep -v grep | awk '{print $2}')
        jstack $pid >> "jstack_${CURRENT_TIME}"
        dir_count=$(ls | wc -l)
        if [ "$dir_count"-gt 10 ];then
            rm -f $(ls -tr | head -1)
        fi
    done
    

    14.从 test.log 中截取当天的所有 gc 信息日志,并统计 gc
    时间的平均值和时长最长的时间。

    #!/bin/bash
    awk '{print $2}' hive-server2.log | tr -d ':' | awk '{sum+=$1} END {print "avg:
    ", sum/NR}' >>capture_hive_log.log
    awk '{print $2}' hive-server2.log | tr -d ':' | awk '{max = 0} {if ($1+0 > max+0)
    max=$1} END {print "Max: ", max}'>>capture_hive_log.log
    

    15.查找 80 端口请求数最高的前 20 个 IP 地址,判断中间最小的请求数是否大于 500,如大于 500,则输出系统活动情况报告到 alert.txt,如果没有,则在 600s 后重试,直到有输出为止。

    #!/bin/bash
    state="true"
    while $state
    do
        SMALL_REQUESTS=$(netstat -ant | awk -F'[ :]+' '/:22/{count[$4]++} END {for(ip in count) print count[ip]}' | sort -n | head -20 | head -1)
        if [ "$SMALL_REQUESTS" -gt 500 ];then
              sar -A > alert.txt
              state="false"
          else
              sleep 6
              continue
    fi
    

    16.将当前目录下大于 10K 的文件转移到 /tmp 目录,再按
    照文件大小顺序,从大到小输出文件名

    #!/bin/bash
    # 目标目录
    DIRPATH='/tmp'
    # 查看目录
    FILEPATH='.'
    find "$FILEPATH"  -size +10k -type f | xargs -i mv {} "$DIRPATH"
    ls -lS "$DIRPATH" | awk '{if(NR>1) print $NF}'
    

    17.企业微信告警
    【此脚本通过企业微信应用,进行微信告警,可用于 Zabbix 监控。

    # -*- coding: utf-8 -*-
    import requests
    import json
    class DLF:
        def__init__(self, corpid, corpsecret):
            self.url ="https://qyapi.weixin.qq.com/cgi-bin"
            self.corpid = corpid
            self.corpsecret = corpsecret
            self._token = self._get_token()
    def_get_token(self):
            '''
            获取企业微信API接口的access_token
            :return:
            '''
            token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid,
    self.corpsecret)
    try:
            res = requests.get(token_url).json()
            token = res['access_token']
            return token
    except Exception as e:
            return str(e)
        def_get_media_id(self, file_obj):
            get_media_url = self.url + "/media/upload?access_token={}&type=file"
    .format(self._token)
         data = {"media": file_obj}
        try:
            res = requests.post(url=get_media_url, files=data)
            media_id = res.json()['media_id']
            return media_id
        except Exception as e:
            return str(e)
      def send_text(self, agentid, content, touser=None, toparty=None):
            send_msg_url = self.url + "/message/send?access_token=%s" %
    (self._token)
    send_data = {
        "touser": touser,
        "toparty": toparty,
        "msgtype": "text",
        "agentid": agentid,
    "text": {
            "content": content
        }
    }
    try:
          res = requests.post(send_msg_url, data=json.dumps(send_data))
        except Exception as e:
          return str(e)
    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        media_id = self._get_media_id(file_obj)
        send_msg_url = self.url + "/message/send?access_token=%s" %
    (self._token)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                  "media_id": media_id
              }
          }
    try:
          res = requests.post(send_msg_url, data=json.dumps(send_data))
    except Exception as e:
        return str(e)
    

    18.FTP 客户端
    通过 ftplib 模块操作 ftp 服务器,进行上传下载等操作。

    # -*- coding: utf-8 -*-
    from ftplib import FTP
    from os import path
    import copy
    class FTPClient:
          def__init__(self, host, user, passwd, port=21):
              self.host = host
              self.user = user
              self.passwd = passwd
              self.port = port
              self.res = {'status': True, 'msg': None}
              self._ftp = None
              self._login()
    def_login(self):
    '''
    登录FTP服务器
    :return: 连接或登录出现异常时返回错误信息
    '''
    try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
    except Exception as e:
            return e
    def upload(self, localpath, remotepath=None):
    '''
    上传ftp文件
    :param localpath: local file path
    :param remotepath: remote file path
    :return:
    '''
    if not localpath: return 'Please select a local file. '
    # 读取本地文件
    # fp = open(localpath, 'rb')
    # 如果未传递远程文件路径,则上传到当前目录,文件名称同本地文件
    if not remotepath:
          remotepath = path.basename(localpath)
    # 上传文件
          self._ftp.storbinary('STOR ' + remotepath, localpath)
    # fp.close()
    def download(self, remotepath, localpath=None):
    '''
    localpath
    :param localpath: local file path
    :param remotepath: remote file path
    :return:
    '''
    if not remotepath: return 'Please select a remote file. '
    # 如果未传递本地文件路径,则下载到当前目录,文件名称同远程文件
    if not localpath:
            localpath = path.basename(remotepath)
    # 如果localpath是目录的话就和remotepath的basename拼接
    if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))
    # 写入本地文件
    fp = open(localpath, 'wb')
    # 下载文件
    self._ftp.retrbinary('RETR ' + remotepath, fp.write)
    fp.close()
    def nlst(self, dir='/'):
    '''
    查看目录下的内容
    :return: 以列表形式返回目录下的所有内容
    '''
    files_list = self._ftp.nlst(dir)
    return files_list
    def rmd(self, dir=None):
    '''
    删除目录
    :param dir: 目录名称
    :return: 执行结果
    '''
    if not dir: return 'Please input dirname'
    res = copy.deepcopy(self.res)
    try:
          del_d = self._ftp.rmd(dir)
           res['msg'] = del_d
    except Exception as e:
          res['status'] = False
           res['msg'] = str(e)
          return res
    def mkd(self, dir=None):
    '''
    创建目录
    :param dir: 目录名称
    :return: 执行结果
    '''
    if not dir: return 'Please input dirname'
    res = copy.deepcopy(self.res)
    try:
            mkd_d = self._ftp.mkd(dir)
            res['msg'] = mkd_d
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
    return res
    def del_file(self, filename=None):
    '''
    删除文件
    :param filename: 文件名称
    :return: 执行结果
    '''
    if not filename: return 'Please input filename'
    res = copy.deepcopy(self.res)
    try:
          del_f = self._ftp.delete(filename)
          res['msg'] = del_f
    except Exception as e:
          res['status'] = False
          res['msg'] = str(e)
    return res
    def get_
    file_size(self, filenames=[]):
    '''
    获取文件大小,单位是字节
    判断文件类型
    :param filename: 文件名称
    :return: 执行结果
    '''
    if not filenames: return {'msg': 'This is an empty directory'}
    res_l = []
    for file in filenames:
        res_d = {}
        # 如果是目录或者文件不存在就会报错
    try:
        size = self._ftp.size(file)
        type ='f'
    except:
        # 如果是路径的话size显示 -, file末尾加/ (/dir/)
      size ='-'
       type ='d'
        file = file + '/'
    res_d['filename'] = file
    res_d['size'] = size
    res_d['type'] = type
    res_l.append(res_d)
    return res_l
    def rename(self, old_name=None, new_name=None):
    :param new
    _
    name: 新的文件或者目录名称
    :return: 执行结果
    '''
    if not old_name or not new_name: return 'Please input old_name and new_name'
    res = copy.deepcopy(self.res)
    try:
    rename_f = self._ftp.rename(old_name, new_name)
    res['msg'] = rename_f
    except Exception as e:
    res['status'] = False
    res['msg'] = str(e)
    return res
    def close(self):
    '''
    退出ftp连接
    :return:
    '''
    try:
    # 向服务器发送quit命令
    self.
    _
    ftp.quit()
    except Exception:
    return 'No response from server'
    finally:
    # 客户端单方面关闭连接
    self._ftp.close()
    '''
    
    19.SSH 客户端
    此脚本仅用于通过 key 连接,如需要密码连接,简单修改下即可。
    
    

    -- coding: utf-8 --

    import paramiko
    class SSHClient:
    def__init__(self, host, port, user, pkey):
    self.ssh_host = host
    self.ssh_port = port
    self.ssh_user = user
    self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
    self.ssh = None
    self._connect()
    def_connect(self):
    self.ssh = paramiko.SSHClient()
    self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
    self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port,
    username=self.ssh_user, pkey=self.private_key, timeout=10)
    except:
    return 'ssh connect fail'
    def execute_command(self, command):
    stdin, stdout, stderr = self.ssh.exec_command(command)
    out = stdout.read()
    err = stderr.read()
    return out, err
    def close(self):
    self.ssh.close()

    20.Saltstack 客户端
    
    通过 api 对 Saltstack 服务端进行操作,执行命令。
    

    !/usr/bin/env python

    -- coding:utf-8 --

    import requests
    import json
    import copy
    class SaltApi:
    """
    定义salt api接口的类
    初始化获得token
    """
    def__init__(self):
    self.url ="http://172.85.10.21:8000/"
    self.username = "saltapi"
    self.password = "saltapi"
    self.headers = {"Content-type": "application/json"}
    self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
    self.login_url = self.url + "login"
    self.login_params = {'username': self.username, 'password': self.password,
    'eauth': 'pam'}
    self.token = self.get_data(self.login_url, self.login_params)['token']
    self.headers['X-Auth-Token'] = self.token
    def get_data(self, url, params):
    '''
    请求url获取数据
    :param url: 请求的url地址
    :param params: 传递给url的参数
    :return: 请求的结果
    '''
    send_data = json.dumps(params)
    request = requests.post(url, data=send_data, headers=self.headers)
    response = request.json()
    result = dict(response)
    return result['return'][0]
    def get_auth_keys(self):
    '''
    获取所有已经认证的key
    :return:
    '''
    data = copy.deepcopy(self.params)
    data['client'] = 'wheel'
    data['fun'] ='key.list_all'
    result = self.get_data(self.url, data)
    try:
    return result['data']['return']['minions']
    except Exception as e:
    return str(e)
    def get_grains(self, tgt, arg='id'):
    """
    获取系统基础信息
    :tgt: 目标主机
    :return:
    """
    data = copy.deepcopy(self.params)
    if tgt:
    data['tgt'] = tgt
    else:
    data['tgt'] = '*'
    data['fun'] ='grains.item'
    data['arg'] = arg
    result = self.get_data(self.url, data)
    return result

    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list',salt_async=False):
    """
    执行saltstack 模块命令,类似于salt '' cmd.run 'command'
    :param tgt: 目标主机
    :param fun: 模块方法 可为空
    :param arg: 传递参数 可为空
    :return: 执行结果
    """
    data = copy.deepcopy(self.params)
    if not tgt: return {'status': False, 'msg': 'target host not exist'}
    if not arg:
    data.pop('arg')
    else:
    data['arg'] = arg
    if tgt != '
    ':
    data['tgt_type'] = tgt_type
    if salt_async: data['client'] = 'local_async'
    data['fun'] = fun
    data['tgt'] = tgt
    result = self.get_data(self.url, data)
    return result

    def jobs(self, fun='detail', jid=None):
    """
    任务
    :param fun: active, detail
    :param jod: Job ID
    :return: 任务执行结果
    """
    data = {'client': 'runner'}
    data['fun'] = fun
    if fun == detail':
    if not jid: return {'success': False, 'msg': 'job id is none'}
    data['fun'] = 'jobs.lookup_jid' data['jid'] = jid
    else:
    return {'success': False, 'msg': 'fun is active or detail'}
    result = self.get_data(self.url, data)

    return result

    相关文章

      网友评论

        本文标题:Linux Shell脚本经典案例(二)

        本文链接:https://www.haomeiwen.com/subject/poqurrtx.html