A Summary of Accessing AWS Resources with Boto3 (1)

Author: 食梦貘的异想世界 | Published 2020-11-15 13:54

Recently I needed to query and cross-analyze some resources on AWS at work. The scenarios were all fairly simple, but this kind of semi-mechanical work is naturally best handed to Python. AWS's Python SDK is called boto3, so I created a Python project, chose a venv interpreter, and installed boto3 into it; after that the coding could begin. There were a few pitfalls along the way, which I record here for future reference.

Query AWS CloudWatch

Search CloudWatch for log records matching given conditions.

import time
from datetime import datetime, timedelta

import boto3

def query_cloudwatch_with_condition(log_group, query, start_time, end_time):
    """
    Search CloudWatch logs by some conditions.
    :param log_group: eg. '/aws/some_log_group'
    :param query: eg. f"fields @timestamp, @message \
                            | sort @timestamp desc \
                            | filter @message like /(?i)(some_filter)/ \
                            | filter @message like /Reason:\sError:/ \
                            | limit 10 \
                            | display @message"
    :param start_time: eg. int((datetime.today() - timedelta(days=5)).timestamp())
    :param end_time: eg. int(datetime.now().timestamp())
    :return: log message string.
    """
    cw_client = boto3.client('logs')
    
    start_query_response = cw_client.start_query(
        logGroupName=log_group,
        startTime=start_time,
        endTime=end_time,
        queryString=query,
    )

    query_id = start_query_response['queryId']
    response = None

    # NOTE: Must wait for the query to complete; it can also sit in the
    # 'Scheduled' state briefly before it starts running.
    while response is None or response['status'] in ('Scheduled', 'Running'):
        print('Waiting for query to complete ...')
        time.sleep(1)
        response = cw_client.get_query_results(queryId=query_id)

    issue_detail = ''
    # NOTE: In my situation we only care about the first result row, because
    # we expect all matching log messages to be identical.
    for item in response['results'][0]:
        if item['field'] == '@message':
            issue_detail = item['value']
            break

    return issue_detail
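As a usage sketch: the time window and query string can be built as below. The log group name and filter are hypothetical placeholders, and since the actual call needs AWS credentials, it is shown commented out.

```python
from datetime import datetime, timedelta

# Hypothetical values for illustration; substitute your own log group and filter.
log_group = '/aws/some_log_group'
query = (
    "fields @timestamp, @message"
    " | sort @timestamp desc"
    " | filter @message like /(?i)(some_filter)/"
    " | limit 10"
    " | display @message"
)

# CloudWatch Logs Insights expects Unix timestamps in seconds.
start_time = int((datetime.today() - timedelta(days=5)).timestamp())
end_time = int(datetime.now().timestamp())

# message = query_cloudwatch_with_condition(log_group, query, start_time, end_time)
```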

Query DynamoDB

import boto3
from boto3.dynamodb.conditions import Key

def query_dynamodb_with_condition(key_condition_exp):
    """
    Query DynamoDB with a certain KeyConditionExpression (Query, not Scan).
    :param key_condition_exp: eg. Key('id').eq(certain_id) & Key('sk').begins_with('example::')
    :return: query results list
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('some-dynamodb-name')

    response = table.query(KeyConditionExpression=key_condition_exp)
    items = response['Items']
    
    # Apply further item-level filtering here if needed.
    for item in items:
        pass

    return items

Scan DynamoDB

When scanning DynamoDB, one pitfall is that a single scan call is capped (it reads at most 1 MB of data per call), so to achieve a full scan the code needs some pagination handling.

import boto3
from boto3.dynamodb.conditions import Attr

def scan_dynamodb_with_condition(filter_condition_exp):
    """
    Full scan dynamodb with certain condition_exp
    :param filter_condition_exp: eg. Attr('sk').eq('my_sk') & Attr('name').begins_with('Jone') & Attr('isDeleted').eq(False)
    :return: scan results list
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('some-dynamo-table')
    
    response = table.scan(FilterExpression=filter_condition_exp)

    # Loop to do full scan
    results = response['Items']
    index = 1
    while 'LastEvaluatedKey' in response:
        print(f'scanning....{index}')
        index += 1
        response = table.scan(
            ExclusiveStartKey=response['LastEvaluatedKey'],
            FilterExpression=filter_condition_exp)
        
        results.extend(response['Items'])
        print(len(results))

    return results
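The LastEvaluatedKey loop above can be exercised locally without AWS by stubbing the table. FakeTable below is a hypothetical stand-in I made up for illustration; it serves two pages and omits LastEvaluatedKey on the last one, which is exactly the signal the loop relies on:

```python
class FakeTable:
    """Minimal stand-in for a boto3 DynamoDB Table that returns two pages."""
    def __init__(self):
        self.pages = [
            {'Items': [{'id': 1}, {'id': 2}], 'LastEvaluatedKey': {'id': 2}},
            {'Items': [{'id': 3}]},  # no LastEvaluatedKey: this is the final page
        ]
        self.calls = 0

    def scan(self, **kwargs):
        page = self.pages[self.calls]
        self.calls += 1
        return page


def full_scan(table, **scan_kwargs):
    """Keep calling scan with ExclusiveStartKey until LastEvaluatedKey disappears."""
    response = table.scan(**scan_kwargs)
    results = response['Items']
    while 'LastEvaluatedKey' in response:
        response = table.scan(
            ExclusiveStartKey=response['LastEvaluatedKey'], **scan_kwargs)
        results.extend(response['Items'])
    return results


items = full_scan(FakeTable())
```

The same loop structure works for `table.query`, which is paginated the same way.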

List S3 objects and read contents

Listing all objects under an S3 path has its own pitfall: a single list call returns at most 1,000 keys, so to get a full listing some special handling is needed as well.

import json

import boto3

def get_all_s3_objects(s3, **base_kwargs):
    """
    Generator that lists all objects under a path.
    :param s3: s3 client created with boto3.client('s3')
    :param base_kwargs: extra arguments for list_objects_v2 (eg. Bucket, Prefix)
    :return: yields object metadata dicts to the caller
    """
    continuation_token = None
    while True:
        list_kwargs = dict(MaxKeys=1000, **base_kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token
        
        response = s3.list_objects_v2(**list_kwargs)
        yield from response.get('Contents', [])

        if not response.get('IsTruncated'):  # At the end of the list?
            break

        continuation_token = response.get('NextContinuationToken')


def main():
    bucket_name = 'my-bucket-name'
    s3_client = boto3.client('s3')
    # use a prefix to restrict the listing to a "folder"
    prefix = 'this-is-some-path-without-prefix-and-postfix-slash'

    file_paths = []
    for file in get_all_s3_objects(s3_client, Bucket=bucket_name, Prefix=prefix):
        file_paths.append(file['Key'])
    
    print(f'length of file_paths: {len(file_paths)}')
    with open('./file_paths_results.json', 'w') as f:
        f.write(json.dumps(file_paths))
        print('finished writing file paths into json file')
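The continuation-token logic can also be checked locally by stubbing the client. FakeS3 below is a hypothetical stand-in of my own invention that serves two pages, using `IsTruncated` and `NextContinuationToken` the way `list_objects_v2` does; the generator is repeated here so the sketch is self-contained:

```python
class FakeS3:
    """Stand-in for boto3's S3 client, returning two pages of keys."""
    def __init__(self):
        self.pages = [
            {'Contents': [{'Key': 'a.json'}, {'Key': 'b.json'}],
             'IsTruncated': True, 'NextContinuationToken': 'token-1'},
            {'Contents': [{'Key': 'c.json'}], 'IsTruncated': False},
        ]
        self.calls = []

    def list_objects_v2(self, **kwargs):
        self.calls.append(kwargs)
        return self.pages[len(self.calls) - 1]


def get_all_s3_objects(s3, **base_kwargs):
    continuation_token = None
    while True:
        list_kwargs = dict(MaxKeys=1000, **base_kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token
        response = s3.list_objects_v2(**list_kwargs)
        yield from response.get('Contents', [])
        if not response.get('IsTruncated'):
            break
        continuation_token = response.get('NextContinuationToken')


fake = FakeS3()
keys = [obj['Key']
        for obj in get_all_s3_objects(fake, Bucket='my-bucket', Prefix='some/prefix')]
```

Note that boto3 also ships a built-in alternative for this: `s3_client.get_paginator('list_objects_v2')` handles the continuation tokens for you.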

Read S3 file contents

When reading the contents of S3 files, we ran into a problem where the content in the file Body (messages from AWS SQS) could not be converted to JSON correctly. Due to time constraints I did not dig into it deeply; I simply replaced a few substrings that were not valid JSON syntax and extracted the content that way. How to load this kind of file content into JSON properly is something to revisit later.

import json
import re
from pprint import pprint

import boto3
from dynamodb_json import json_util

def read_file_contents(s3client, bucket, path):
    """
    Read a file's contents by its key (file path).
    :param s3client: eg. boto3.client('s3')
    :param bucket: eg. 'some-bucket-name'
    :param path: eg. 'some-path-to-my-file-with-postfix-no-slash-prefix'
    :return: file contents as a parsed JSON object
    """
    file_obj = s3client.get_object(
        Bucket=bucket,
        Key=path)
    
    # The Body is a StreamingBody of bytes; read and decode it into a string.
    file_data = file_obj['Body'].read().decode('utf-8')

    # TODO: ugly string replacements to coerce the payload into valid JSON; fix this properly later.
    print_str = file_data.replace('\\', '').replace('""', '"').replace('"Body":"', '"Body":').replace(
        '}}}"}', '}}}}').replace('= "', '- ').replace('" Or', ' -').replace('" And', ' -')

    json_obj = json_util.loads(print_str)

    # NOTE: we use regex to match what we want.
    # match = re.findall('someKey":{"S":"(.*?)"', print_str)
    # if match:
    #     pprint(f'find key: {match[0]}')
    #     return match[0]
    # else:
    #     print(f'no key found!')
    #     return None

    return json_obj
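A gentler approach than blanket string replacement, when the file holds an SQS-style envelope whose Body field is itself JSON serialized as an escaped string, is to parse in two passes. The sample payload below is made up for illustration:

```python
import json

# Hypothetical payload resembling an SQS message whose Body field is itself
# JSON, serialized as an escaped string.
raw = '{"MessageId": "abc-123", "Body": "{\\"orderId\\": 42, \\"status\\": \\"PAID\\"}"}'

envelope = json.loads(raw)           # first pass: the outer message
body = json.loads(envelope['Body'])  # second pass: the nested JSON string
```

Whether two passes are enough depends on how the message was produced; if the content still fails to parse, inspecting the raw string around the failure offset reported by `json.JSONDecodeError` is usually more reliable than guessing replacements.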

Summary

This article records the process of investigating a production data issue. The data has been anonymized; please adapt the configuration to your own environment.


Original author: Asinta
Original link: https://www.asinta.cn/2020/11/10/Boto3%E8%AE%BF%E9%97%AEAWS%E8%B5%84%E6%BA%90%E6%93%8D%E4%BD%9C%E6%80%BB%E7%BB%93-1/
License: this article is distributed under the Creative Commons Attribution-NonCommercial 4.0 International license
