美文网首页
Python开发实践技巧

Python开发实践技巧

作者: Guang777 | 来源:发表于2023-06-20 09:46 被阅读0次

目录

在 CentOS 7 从源代码安装 Python3.10.8

sudo yum install --downloadonly --downloaddir pkgs --security --bugfix yum-utils wget

sudo yum install yum-utils wget -y
sudo yum-builddep python3 -y
wget https://www.python.org/ftp/python/3.10.8/Python-3.10.8.tgz
tar -xzf Python-3.10.8.tgz
cd Python-3.10.8
sudo ./configure --enable-optimizations --enable-loadable-sqlite-extensions
sudo make
sudo make install
python3.10 -V

--with-openssl=/etc/pki/tls

备注: python3.10的安装路径为:/usr/local/bin/python3.10
--downloadonly --downloaddir=rpms

在 Ubuntu 20.04 从源代码安装 Python3.10.8

echo "deb-src http://mirrors.aliyun.com/ubuntu/ focal main" | sudo tee -a /etc/apt/sources.list
sudo apt-get update
sudo apt-get build-dep python3
sudo apt-get install pkg-config
sudo apt-get install build-essential gdb lcov pkg-config libbz2-dev libffi-dev libgdbm-dev libgdbm-compat-dev liblzma-dev  libncurses5-dev libreadline6-dev libsqlite3-dev libssl-dev  lzma lzma-dev tk-dev uuid-dev zlib1g-dev wget
wget https://www.python.org/ftp/python/3.10.8/Python-3.10.8.tar.xz
tar -xvf Python-3.10.8.tar.xz
cd Python-3.10.8
sudo ./configure --enable-optimizations --enable-loadable-sqlite-extensions
sudo make install
/usr/local/bin/python3.10 -V

虚拟环境及包管理

# 创建虚拟环境
python3 -m venv /path/to/new/virtual/environment

# 激活环境
source path/to/py_env/bin/activate

# 导出当前环境中已经安装的 package
pip3 freeze > requirements.txt

# 从指定文件安装 package
pip3 install -r requirements.txt

# 安装指定包
pip install xxx

# 卸载所有依赖: (除 pip, setuptools 之外, 以下方法均可)
1. pip uninstall -y -r <(pip freeze)
2. pip freeze | xargs pip uninstall -y

常用的数据结构及其特点:

常用的数据结构有: int, float, decimal, str, tupple, list, set, dict, generater.

常用操作对应的时间复杂度: 时间复杂度.

特点:

  1. 整型 int:
    1. 没有大小限制;
  2. 字符串:
    1. str.split("AA|BB|CC"): 会默认将每个字符视为分隔符, 无法表示 AA 或者 BB 或者 CC, 如果想要达到这样的效果可以使用正则: re.split("AA|BB|CC"). 其中 | 表示 或者;
    2. str.strip("城市"): 会去掉结尾的 "城市" 或"城", "市". 如果只想去掉两侧的"城市"可以使用re.sub()
  3. 列表:
    1. 获取指定位置的元素;
    2. 从最后插入数据;
    3. 从最后删除数据;
    4. 遍历;
  4. 集合:
    1. 各元素互不相同;
    2. in: 判断是否包含;
    3. 可以进行数学上的集合操作, 如 交集, 并集, 差集等;
  5. 字典:
    1. key 各不相同;
    2. 根据 key 快递获取对应的 value;
  6. 生成器:
    1. 类似于数据库中的 cursor;
    2. 不能追加, 删除其中的元素;
    3. 只能遍历一遍;
    4. 占用内存少;

编码

str 转化成 byte:

形式上转换成 byte 之后, 只是在前面加了一个 b, 如 b"hello Python".

  • str.encode(errors="replace"), 遇到错误时, 使用 ? 替代.
  • bytes("a", encoding='utf-8').

byte 转化成 str:

bytes.decode(errors="replace"), 遇到错误时, 使用 �(U+FFFD)代替.

int -> byte:

  • int.to_bytes(length, byteorder, signed=False), 如 (1024).to_bytes(2, byteorder='big')
  • bytes([1,2,3,4]) Out[27]: b'\x01\x02\x03\x04', 可通过形如 a[0] 的索引单独引用, 不可变序列, 类似于元祖.
  • bytearray([1,2,3]), bytearray(b'\x01\x02\x03'), 也可以通过索引访问, 类似于列表.

hex -> byte:

  • bytes.fromhex().

byte -> hex:

  • bytes.hex()

byte -> int:

int.from_bytes(length, byteorder, signed=True), 如 int.from_bytes(b'\x00\x10', byteorder='big')

struct.pack(format, v1, v2...): 将整数, 浮点数 或字节串按照指定的字节序及类型转换成 bytes, 可以同时转化多个. struct.pack(">QQ", 1,2)

struct.unpack(format, buffer): 将 strict.pack() 生成结果反向转化为 v1, v2.

bytes, bytearray 常用的method, 如 find(), endswith() 等基本和字符串一样.

IO

主要有以下2种IO

  • 文本IO
  • 二进制IO;

每种IO都可以通过打开文件, 如 with open(file, 'w') as f, with opne(file, 'wb') as f.

将现有内容转化成IO, 如s = "hello python", io.StringIO(s), s = b"hello", io.BytesIO(s)

大部分Unix系统使用 utf-8 编码, 但是 win 系统的默认编码并不是 utf-8.

文件编码格式与读文件性能

如果文件以 ascii 编码, open(file, 'rb'), open(file, "r") 的性能基本一致;
如果文件以其他编码格式编码(如GBK,UTF-8), open(file, 'rb')的性能比 open(file, "r")大约快3部(和编码方式有关)。
seek(), tell() 中使用的都是byte, 而非字符(utf-8编码下,中文占用 3 byte)

转义字符

Python 读取文件时, 会把里面的 \ 字符当做原始字符(不当做转义字符), 但是在程序中定义的字符串中的\会被当做转义字符处理, 可以在字符串前面加上 r 告诉程序该字符串中的\不是转义字符, 如r"\a\n".

Python 读取以下文件时:

\F9\BE\B4\D9inv\00\00

使用 print 打印出来之后是:

\\F9\\BE\\B4\\D9inv\\00\\00

而如果在 Python 程序中定义的字符串:

first = "\abc"
print(first)        # 结果是: bc  默认把 \ 视为转义字符

second = r"\abc"
print(second)       #  结果是: \\abc       把将 \abc 视为原始字符

代码规范

Python 要求符合 pep8 规范.

格式化工具:

  1. autopep8;
  2. black(推荐).

推荐格式:

  1. 脚本最开始注明该脚本的用途, 输入参数, 输出内容, 主要流程, 及用到的复杂的数据结构;

  2. 其次是需要导入的 package;

  3. 函数定义: 1. 注明该函数的用途; 2. 输入参数及其类型; 3: 返回值类型; 如下所示:

    def block_by_number(
        session: Session, block_number: int, host: str = "http://10.1.1.20:8545"
    ) -> Dict:
        """从节点服务器下载该区块内的主币交易数据"""
        ....
    

异常处理

异常处理的基本原则: 白名单.

只有在白名单内的异常才可以通过, 其他异常一定有记录到日志或输出到终端, 绝对不能用 Exception 处理一切异常.

在个别生成环境下, 如处理中心的数据, 脚本日志没法导出, 同时为了降低记录对处理速度的影响, 可以先不记录日志. 只在调试情况下, 才记录日志.

执行顺序

try:
    ...
except IndexError:
    ...
except:
    ...
finally:
    ...

代码先从 try进入:

  1. 如果没有遇到报错, 则会执行finally;
  2. 如果遇到异常, 则会在except中寻找对应的异常;
    1. 如果有对应的异常处理, 就会执行相应的代码块, 最后执行finally;
    2. 如果没有对应的异常处理, 就会执行最后的 except(类似于 else), 然后再执行 finally;
    3. 如果没有对应的异常处理, 并且没有最后的 except, 就会直接执行 finally, 然后报错;

NOTICE:

  1. 如果要逐行处理文件, 需要将 try...except...finally模块放到for line in fin之内. 这样可以在不中断程序的情况下, 将文件的所有行全部处理.

捕获多个异常

如果对于多个异常的处理方式相同, 就可以把多个异常放入一个 except中, 如:

try:
    ...
except (ValueError, ArithmeticError) as e:
    ...
except :
    print("未知异常")
print("程序继续运行")

命令行程序

参数类型:

  • 子命令;
  • 可选参数: 以 -- 开始, 缩写为 -, 可有可无, 每个参数可以设置默认值, 类型. click.option()
  • 位置参数, 使用位置表示含义, 必须有的参数; click.argument()
$ docker run --help
Usage:  docker run [OPTIONS] IMAGE [COMMAND] [ARG...]
Run a command in a new container
Options:
      --add-host list                  Add a custom host-to-IP mapping (host:ip)
  -a, --attach list                    Attach to STDIN, STDOUT or STDERR
      --blkio-weight uint16            Block IO (relative weight), between 10 and 1000, or 0 to disable
                                       (default 0)  

推荐的命令行工具:

  • click.

click

添加 -h 查看帮助信息 (默认情况是 --help).

@click.group(context_settings=dict(help_option_names=['-h', '--help']))
@click.command(context_settings=dict(help_option_names=['-h', '--help']))

命令组

优点:

  1. 一个程序作为入口, 调用其他程序, 便于管理;
  2. 可以在入口程序指定常用参数, 并且可以传递至被调用程序;
  3. 指定命令的映射关系 (通过 cli.add_command(hello, "hello"))

程序组织结构:

  • cli.py:
  • commands/:
    • first.py
    • second.py

程序入口: cli.py

from pathlib import Path
from sys import path

import click

path.insert(0, str(Path(__file__).parent))
from hello import hello


@click.group()
@click.option("--start")
@click.pass_context
def cli(ctx, start):
    ctx.ensure_object(dict)
    ctx.obj['start'] = start

cli.add_command(hello, "hello")

if __name__=="__main__":
    cli()

被调用程序: hello.py

from pathlib import Path
from sys import path

import click

path.insert(0, str(Path(__file__).parent))

@click.command()
@click.option("--end")
@click.pass_context
def hello(ctx, end):
    print("hello")
    print(end)
    print(ctx.obj['start'])

argparse

如果使用argparse解析命令行参数, 可以这样使用:

parser.add_argument("--debug", action="store_true", help="是否开启调试模式")

表示: 当出现--debug时, 是 True; 没有出现时, 是 False.

配置文件

configparser

Python 读取配置文件可以通过configparser这个自带的 package.

; 配置文件为(文件名为`example.ini`):
; 等号两侧可以有空格, 也可以没有, 会自动去掉
; 字符串不需要带双引号或单引号

[DEFAULT]
ServerAliveInterval = 45
Compression = yes
CompressionLevel = 9
ForwardX11 = yes

[bitbucket.org]
User = hg

[topsecret.server.com]
Port = 50022
ForwardX11 = no
>>> import configparser
>>> config = configparser.ConfigParser()
>>> config.sections()   
>>> []
>>> config.read('example.ini')
>>> ['example.ini']
>>> config.sections()
>>> ['bitbucket.org', 'topsecret.server.com']
>>> 'bitbucket.org' in config
>>> True
>>> 'bytebong.com' in config
>>> False
# 直接通过字典形式的访问 返回值都是字符格式
>>> config['bitbucket.org']['User']
>>> 'hg'
>>> config['DEFAULT']['Compression']
>>> 'yes'
>>> topsecret = config['topsecret.server.com']
>>> topsecret['ForwardX11']
>>> 'no'
>>> topsecret['Port']
>>> '50022'
# 还可以通过 `.get()`,`.getint(),`.getfloat()`,`.getboolean()`直接获取合适的格式
>>> config.getint('DEFAULT','ServerAliveInterval')
>>> 45  
>>> config.getboolean('DEFAULT','Compression')
>>> True
>>> config.getfloat('DEFAULT','CompressionLevel')
>>> 9.0

python-dotenv

优点:

  • 可以将配置文件里面的变量转换为环境变量, 通过os.getenv()获取.

基本用法

  1. 加载配置文件. dotenv.load_dotenv
    1. dotenv_path: 配置文件路径;
    2. verbose: 如果没有找到.env配置文件, 是否发出警告.
    3. override: 是否覆盖环境变零.

文件读取

使用上下文管理器:

with open(file_path, 'r', encoding='utf-8') as fin:
    fin.read()  # 读取全部或固定字节
    fin.readline()  # 读取第一行
    fin.readlines() # 读取所有行, 返回列表类型, 每行是一个元素
    
    # 推荐用法: 逐行读取, 返回生成器类型
    for line in fin:
        pass

NOTICE:

  • 打开文件, 最好在主函数, 不要每次读取或写入都打开/关闭一次文件. 连接数据库/网络时也类似。

  • 读取文件时, 获取的每行数据以 \n结尾.

  • 写入文件时, 每个字符串必须以 \n结尾, 否则会写到同一行.

  • 当需要打开的文件比较多时, 如超过20个, 无法再使用该种方法管理文件, 可以使用

    fin = open(file_path, 'r', encoding='utf-8')
    ...
    fin.close()
    

    因为 Python 规定同一个函数内的嵌套层数不能超多 20 层;

    这样的缺点: 脚本被意外终止时, 文件不能正确关闭.

序列化与反序列化

推荐工具:

  • json: 标准库;
  • ujson: 第三方库: 解析速度更快, 容错比较低. 速度能提升 20%;

正则表达式

常用:

  1. re.compile(): 预编译, 如果需要经常使用, 推荐预编译;
  2. re.match(): 从字符串开始进行匹配;
  3. re.search(): 从字符串任何位置开始;
  4. re.findall(): 找出所有的匹配项, 返回列表类型;
  5. re.split(): 具有正则工具的分割;
  6. re.sub(): 可以使用替换达到去掉前面/后面某些词的功能;

日志与错误记录

日志记录

如果只记录单个日志文件, 可以进行如下设置.

logging.basicConfig(
    filename='./log.log',
    filemode='a',
    level=logging.DEBUG,
    format="%(asctime)s|%(name)s|%(levelname)s|%(message)s",
)
logger = logging.getLogger(__name__)
logger.info()

其中:

  1. level: 日志等级可以分为: DEBUG, INFO, WARNING, ERROR, CRITICAL.
  2. format 的参数如下:
%(levelno)s 打印日志级别的数值
%(levelname)s 打印日志级别名称
%(pathname)s 打印当前执行程序的路径,其实就是sys.argv[0]
%(filename)s 打印当前执行程序名
%(funcName)s 打印日志的当前函数
%(name)s logger 名称
%(lineno)d 打印日志的当前行号
%(asctime)s 打印日志的时间
%(thread)d 打印线程ID
%(threadName)s 打印线程名称
%(message)s 打印日志信息

常用的格式可以为:format='%(asctime)s|%(levelname)s|%(name)s|%(funcName)s|%(message)s'

记录报错信息

  1. logging.exception(info) 等价于 logging.error(info, exc_info=True).

参数exc_info=True可用于控制输出报错时的 traceback.

@click.command()
@click.option("--debug", is_flag=True, default=False)
def main(debug):
    click.echo(f"{debug}")

    logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)

    logging.debug("debug", exc_info=True)

这样很好地切换开发环境和生产环境.

  1. 记录报错信息可以通过logging.exception()函数获得, 并记录到日志文件:

    import logging
    try:
        printf('hello world')
    except Exception as a:
        logging.exception('Error occured while printing')
    
    print('1')
    # 以下为输出
    # ERROR:root:Error occured while printing
    # Traceback (most recent call last):
      File "/home/light/gitrepo/blockchain-etl/tron-etl/test.py", line 10, in <module>
        printf('hello world')
    # NameError: name 'printf' is not defined
    # 1
    
  1. 报错信息也可以通过tracebase模块, 同时结合logging模块, 把报错信息记录到日志文件中. 常用的函数有以下几个:
  • traceback.format_exc(): 返回异常信息的字符串, 可以和logging模块结合使用, 把报错信息记录记到日志文件;
  • traceback.print_exc(): 直接把报错信息输出到终端; 也可以把异常信息写入到文件 traceback.print_exc(file=open('traceback_INFO.txt','w+')).
try:
    func()
except Exception as e:
    logging.debug(traceback.format_exc())
    logging.debug(repr(e))

测试

pytest

控制输出

使用 test_xxx 函数中使用 print 输出信息时,默认不会输出这些信息,可通过 -s 设置输出 print 的输出信息。

fixture

def add(x, y):
    return x + y

@pytest.mark.parametrize("a, b, expected", [[1, 2, 3], [2, 3, 5]])
def test_add(a, b, expected):
    assert add(a, b) == expected

@pytest.fixture
def args():
    return [[1, 2, 3], [2, 3, 5]]


def test_args(args):
    for a, b, expected in args:
        assert add(a, b) == expected

# 类似于通过命令行执行 pytest -s -v test_one.py
if __name__ == '__main__':
   pytest.main(["-s","-v","test_one.py"])

fixture 可以请求其他 fixture

# contents of test_append.py
import pytest


# Arrange
@pytest.fixture
def first_entry():
    return "a"


# Arrange
@pytest.fixture
def order(first_entry):
    return [first_entry]


def test_string(order):
    # Act
    order.append("b")

    # Assert
    assert order == ["a", "b"]

同一个 fixture 可以被使用多次

一个 fixture 每次被调用的结果都一样,这就确保了测试不会相互影响。

# contents of test_append.py
import pytest


# Arrange
@pytest.fixture
def first_entry():
    return "a"


# Arrange
@pytest.fixture
def order(first_entry):
    return [first_entry]


def test_string(order):
    # Act
    order.append("b")

    # Assert
    assert order == ["a", "b"]


def test_int(order):
    # Act
    order.append(2)

    # Assert
    assert order == ["a", 2]

执行 test_string 时,order 是 ["a"],执行 test_int 时,order 依然是 ["a"],而不是 ["a", "b"],

上面的测试脚本如果手动执行,是:

entry = first_entry()
the_list = order(first_entry=entry)
test_string(order=the_list)

entry = first_entry()
the_list = order(first_entry=entry)
test_int(order=the_list)

每个测试都可以多次使用 同一个 fixture (这时会缓存返回值)

# contents of test_append.py
import pytest


# Arrange
@pytest.fixture
def first_entry():
    return "a"


# Arrange
@pytest.fixture
def order():
    return []


# Act
@pytest.fixture
def append_first(order, first_entry):
    return order.append(first_entry)


def test_string_only(append_first, order, first_entry):
    # Assert
    assert order == [first_entry]


def test_string(order):
    assert order == []

如果一个被请求的 fixture 在测试期间每次被请求时都被执行一次,那么这个测试将会失败,因为append_first和test_string_only都会将order视为一个空列表,但由于order的返回值在第一次被调用后被缓存(以及执行它可能有的任何副作用),test和append_first都引用了同一个对象,测试中看到了append_first对该对象的影响。

在 test_string 中 order 依旧是 [].

说明在一个测试函数中,如果对同一个 fixture 多次使用,那么引用的都是同一个对象(如 test_string_only 中);但在其他测试函数中,调用该 fixture 时,却引用的另一个对象。因为默认情况下,fixture 的有效范围就是当前函数(默认值),如果将 fixture order 的范围(scope)改为 session,那么所有的 测试函数引用的都是同一个对象。如下面的代码,test_string 就无法通过测试,因为在 append_first 函数中,order 已经变为 ["a"].

# contents of test_append.py
import pytest


# Arrange
@pytest.fixture
def first_entry():
    return "a"


# Arrange
@pytest.fixture(scope="session")
def order():
    return []


# Act
@pytest.fixture
def append_first(order, first_entry):
    return order.append(first_entry)


def test_string_only(append_first, order, first_entry):
    # Assert
    assert order == [first_entry]


def test_string(order):
    assert order == []

autouse

如果有一个 fixture,所有的测试函数都需要用到,就可以通过设置 autouse=True 得以实现。

# contents of test_append.py
import pytest


@pytest.fixture
def first_entry():
    return "a"


@pytest.fixture
def order(first_entry):
    return []


@pytest.fixture(autouse=True)
def append_first(order, first_entry):
    return order.append(first_entry)


def test_string_only(append_first, order, first_entry):
    # Assert
    assert order == [first_entry]


def test_string(order):
    assert order == ["a"]

正常情况下,test_string 引用的 order 是 [], 但是通过将 append_first 设置成自动使用,在所有的测试函数执行之前,会先执行 append_first 这时 order 就变成了 ["a"],之后所有测试函数引用 order 时,order的初始值都是 ["a"].

作用域

上面的例子可以看出,在每个测试函数中,order都对应不同的实例,这是因为 fixture 的默认作用域是 "function", 当该函数结束时,该 fixture 对应的实例就会被销毁。fixture 的作用域有以下几种:

  • function:默认作用域,当前测试函数结束时被销毁;
  • class:在该 class 的最后一个测试函数结束时,该 fixture 实例被销毁;
  • module:在当前 module (当前文件)的最后一个测试函数结束时,该 fixture 实例被销毁;
  • package: 在当前 package 的最后一个测试函数结束时,该 fixture 实例被销毁;
  • session:当前会话,所有的测试函数都使用同一个实例;

在进行网络、数据库操作时,创建连接往往好用大量的资源,这是就可以将作用域根据实际情况适当扩大。

拆除 fixture

在我们运行测试时,我们希望确保它们在自己完成之后进行清理,这样它们就不会扰乱其他测试(也不会留下大量的测试数据来膨胀系统)。pytest中的fixture提供了一个非常有用的拆卸系统,它允许我们为每个fixture定义必要的特定步骤,以便在它们自己之后进行清理。

yield fixtures

使用这些fixture,我们可以运行一些代码并将一个对象传回请求fixture/test,就像使用其他fixture一样。唯一的区别是:

  • return被换成了yield。
  • 该fixture的拆卸代码位于生成之后。

一旦pytest为fixture确定了一个线性顺序,它将运行每个fixture,直到它返回或产生,然后移动到列表中的下一个fixture来做同样的事情。

测试完成后,pytest将返回fixture列表,但顺序相反,获取每个产生的fixture,并在其中运行yield语句之后的代码。

作为一个简单的例子,考虑这个基本的电子邮件模块:

# content of emaillib.py
class MailAdminClient:
    def create_user(self):
        return MailUser()

    def delete_user(self, user):
        # do some cleanup
        user.inbox.clear()
        pass


class MailUser:
    def __init__(self):
        self.inbox = []

    def send_email(self, email, other):
        other.inbox.append(email)

    def clear_mailbox(self):
        self.inbox.clear()


class Email:
    def __init__(self, subject, body):
        self.subject = subject
        self.body = body
# content of test_emaillib.py
import pytest

from emaillib import Email, MailAdminClient


@pytest.fixture
def mail_admin():
    return MailAdminClient()


@pytest.fixture
def sending_user(mail_admin):
    user = mail_admin.create_user()
    yield user
    mail_admin.delete_user(user)


@pytest.fixture
def receiving_user(mail_admin):
    user = mail_admin.create_user()
    yield user
    mail_admin.delete_user(user)


def test_email_received(sending_user, receiving_user):
    email = Email(subject="Hey!", body="How's it going?")
    sending_user.send_email(email, receiving_user)
    assert email in receiving_user.inbox

因为receiving_user是安装期间运行的最后一个fixture,所以它是拆卸期间运行的第一个fixture。

处理yield fixture的错误

如果yield fixture在yield之前引发异常,pytest将不会尝试在该yield fixture的yield语句之后运行拆卸代码。但是,对于已经为该测试成功运行的每个fixture, pytest仍然会像正常情况一样试图将它们删除。

# content of test_emaillib.py
import pytest

from emaillib import Email, MailAdminClient


@pytest.fixture
def setup():
    mail_admin = MailAdminClient()
    sending_user = mail_admin.create_user()
    receiving_user = mail_admin.create_user()
    email = Email(subject="Hey!", body="How's it going?")
    sending_user.send_email(email, receiving_user)
    yield receiving_user, email
    receiving_user.clear_mailbox()
    mail_admin.delete_user(sending_user)
    mail_admin.delete_user(receiving_user)


def test_email_received(setup):
    receiving_user, email = setup
    assert email in receiving_user.inbox

以上 2 段的测试代码的处理逻辑一致,第 2 个版本更紧凑,但也更难阅读,没有一个非常描述性的fixture名称,而且没有一个fixture可以很容易地重用。

还有一个更严重的问题,即如果设置中的任何一个步骤引发异常,则所有的销毁代码都不会运行。

最安全、最简单的fixture结构要求限制每个fixture只做一个状态更改操作,然后将它们与拆卸代码捆绑在一起,如第 1 个例子所示。

参数化 fixture

import pytest

@pytest.fixture(params=[1, 2], ids=["first", "second"])
def args(request):
    return request.param

def test_args(args):
    assert args == 1

可以通过以上方式参数化 fixture,会依次执行 params 中的每个参数。

备注:

  • 参数需要是 request,可以通过 request.param 返回其参数;
  • 可以通过 ids 为 params 中的每个参数对应的实例赋予 ID;

使用 usefixture 在类和模块中自动使用 fixture

前面介绍了通过参数 autouse 设置 fixture 是否自动执行,但是如果该 fixture 只需要在测试某个类或模块时自动执行,在测试其他类和模块时不需要自动执行,就不能使用 autouse 设置自动执行。该种情况可以使用 usefixture 进行设置。

# content of conftest.py
import os
import tempfile

import pytest

@pytest.fixture
def cleandir():
    with tempfile.TemporaryDirectory() as newpath:
        old_cwd = os.getcwd()
        os.chdir(newpath)
        yield
        os.chdir(old_cwd)
# content of test_setenv.py
import os
import pytest

@pytest.mark.usefixtures("cleandir")
class TestDirectoryInit:
    def test_cwd_starts_empty(self):
        assert os.listdir(os.getcwd()) == []
        with open("myfile", "w") as f:
            f.write("hello")

    def test_cwd_again_starts_empty(self):
        assert os.listdir(os.getcwd()) == []

通过在类定义前面使用 @pytest.mark.usefixtures("cleandir"),该类下的所有测试函数执行前都会自动执行 cleandir。

如果将 usefixture 移动到 test_cwd_starts_empty 定义前,那么 fixture cleandir 只在 test_cwd_starts_empty 执行前被调用。

可以通过 @pytest.mark.usefixtures("cleandir", "anotherfixture") 指定使用多个 fixture。

重写 fixture

如果有同名 fixture,引用顺序为:

  1. 当前文件中通过 pytest.mark 指定的参数,如 @pytest.mark.parametrize('username', ['directly-overridden-username'])
  2. 当前文件中的 fixture;
  3. 父级文件中的 fixture;
  4. 非参数化 fixture 覆盖 参数化 fixture;
tests/
    __init__.py

    conftest.py
        # content of tests/conftest.py
        import pytest

        @pytest.fixture(params=['one', 'two', 'three'])
        def parametrized_username(request):
            return request.param

        @pytest.fixture
        def non_parametrized_username(request):
            return 'username'

    test_something.py
        # content of tests/test_something.py
        import pytest

        @pytest.fixture
        def parametrized_username():
            return 'overridden-username'

        @pytest.fixture(params=['one', 'two', 'three'])
        def non_parametrized_username(request):
            return request.param

        def test_username(parametrized_username):
            assert parametrized_username == 'overridden-username'

        def test_parametrized_username(non_parametrized_username):
            assert non_parametrized_username in ['one', 'two', 'three']

    test_something_else.py
        # content of tests/test_something_else.py
        def test_username(parametrized_username):
            assert parametrized_username in ['one', 'two', 'three']

        def test_username(non_parametrized_username):
            assert non_parametrized_username == 'username'

conftest.py 表示该模块的 fixture,执行 pytest 测试时,会自动从该文件中获取 fixture,并且不会从其他文件中获取,比如测试 test_something_else.py 时,就无法获取 test_something.py 中定义的 fixture parametrized_username,non_parametrized_username,只能获取 conftest.py 中的 fixture parametrized_username,non_parametrized_username。因为可以在以下位置写 fixture:

  1. 当前文件;
  2. 当前 module 中的 conftest.py;
  3. 上一级 module 中的 conftest.py;

处理异常

def myfunc():
    raise ValueError("Exception 123 raised")

def test_zero_division():
    with pytest.raises(ValueError):
        myfunc()

如果执行 statement, 抛出 ZeroDivisionError 的异常, 则视为通过测试; 反之则视为未通过测试.

处理进度

可使用tqdm输出进度, 前提是明确总工作量(如总函数), 具体用法为:

from tqdm import tqdm

for i in tqdm(range(100), desc="Processing", mininterval=1):
    time.sleep(0.05)

同时, tqdm也可以与 Python 中的其他迭代函数结合使用, 如 enumerate, zip, map

字符串格式化输出

  • 当数值很接近于 0 时(如 1E-18), 怎么才能让其表示为 0.0000000000001, 而不是科学计数法.

    当数值很接近0时, 用科学计数法表示, 但这只是一种表示方法, 该变量的值并没有变, 可以考虑使用 format将其转换成字符串, 这样在显示或输出的时候就可以完整显示了.

    format()的更多用法可以参考官网.

    "{:18f}".format(Decimal(10)/(10**18)).lstrip(): 至少保留18位, 当数值的实际长度大于18位, 完整显示; 小于18位时, 左侧使用空格填充.

    "{:.18f}".format(): 保留18位小数, 如果小数位大于18位, 会截断; 如果小于, 右侧使用0填充;

    ":^/</>".format(): ^, <, >: 分别表示 居中/左/友对齐;

    "{:+/-f}".format(): 将该数值的 +/- 也显示出来;

    "int:{0:d}, hex:{0:x}, oct:{0:o}, bin:{0:b}".format(42): 将 42 分别转换为十六进制, 八进制和二进制.

if 判断

空列表 ([]), 空字符串 (""), 空字段 ({}) , None, 0 放在 if条件中, 都会视为 FALSE.

多线程变量共享

Python 多线程编程时, 可通过 queue 模块在不同线程之后共享变量, queue 本身就具有线程安全.

queue 主要包括以下 3 种数据结构:

  • Queue: first in, first out 模型的队列;
  • LifoQueue: Last in, first out 模型的队列;
  • PriorityQueue: 带有优先级的队列, 每次插入数据, 都需要指定一个优先级 (priority_number, data), 取数据的时候, 优先返回优先级较低的数.

最佳实践

import threading
import queue
import time


def worker(q: queue.Queue):
    """消费者"""
    while True:
        item = q.get()
        print(f"Working on {item}")
        print(f"Finished {item}")
        q.task_done()


def producer(q: queue.Queue):
    """生产者"""
    # Send thirty task requests to the worker.
    for item in range(30):
        q.put(item)
        time.sleep(0.1)


def main():
    # Turn-on the worker thread.
    q = queue.Queue()
    # 任务完成后会自动退出
    threading.Thread(target=producer, args=(q,)).start()
    # 将消费者线程设置为 daemon,该线程会和主进程一起退出
    threading.Thread(target=worker, args=(q,), daemon=True).start()

    # 阻塞主进程,一直到队列里面所有任务都完成。
    q.join()
    print("All work completed")


if __name__ == "__main__":
    main()

网络请求

requests

import json
import requests

url = r'http://192.168.1.20:8332'
headers = {'Content-Type': 'application/json'}
data = {"jsonrpc": "2.0", "id": "id", "method": "omni_gettransaction",
        "params": ["6c0b21ed486c15fbf495d77ff75bd34e059b9fc7a964bb5d8305d153c2d4bc56"]}
# 如果需要简单的认证, 可以把账号和密码放入`auth`参数中
# 如果需要传入参数, 可以放入`data`参数, 该参数只接受字符串格式, 如果是data这样的字典, 需要转换成字符串格式;
resp = requests.post(url, auth=(username, password), headers=headers, data=json.dumps(data))

# 如果`data`本身就是字符串格式, 可以直接传入, 如:
data = '{"jsonrpc": "2.0", "id": "id", "method": "omni_gettransaction","params":                ["6c0b21ed486c15fbf495d77ff75bd34e059b9fc7a964bb5d8305d153c2d4bc56"]}'
resp = requests.post(url, auth=(username, password), headers=headers, data=data)

curl命令的转换:

以上案例对应的curl命令为:

curl -X POST -H "Content-Type: application/json" --user omni:omni -d '{"jsonrpc":"2.0","id":"id","method":"omni_gettransaction","params":["6c0b21ed486c15fbf495d77ff75bd34e059b9fc7a964bb5d8305d153c2d4bc56"]}' http://192.168.1.20:8332

异步请求

推荐工具: aiohttp.

  1. 等待多个异步任务完成.
for task in tasks:
    await asyncio.wait_for(task, timeout=None)

# 可以使用 以下语句代替
await asyncio.wait(tasks, timeout=None)

日期, 时间, 时区

备注: UNIX 时间戳是指当前的UTC时间相对于 1970-01-01 00:00:00.000(UTC)的秒数(10位整型)或毫秒数(13位), 和时区无关;

将UNIX时间戳转化为 datetime 类型:

  • UTC 时间: dt.datetime.utcfromtimestamp();
  • 当前时区的时间: time.localtime(), dt.datetime.fromtimestamp()

datetime 类型转化为时间戳: dt.datetime(2022,1,1).timestamp()

将字符串转化为UTC时间: dt.datetime(*(time.strptime('yyyy-mm-dd', '%Y-%m-%d')[:6]), tzinfo=dt.timezone.utc)

生成指定时区的指定时间: dt.datetime(2022, 1, 1, tzinfo=dt.timezone(dt.timedelta(hours=8)));

生成UTC时区的指定时间: dt.datetime(2022, 1, 1, tzinfo=dt.timezone.utc)

文件读写

open, close

open(): 打开文件;

close(): 关闭文件;

如果在执行 close()之前发生异常, fw.write(), fw.writelines()要写入的数据可能会有部分没有正确写入. 所以推荐使用with open() as f:, 使用上下文管理器, 哪怕程序发生异常, 也会自动调用close()正确关闭文件.

备注: 使用with open() as f: 会使程序的嵌套关系增加一层 (Python 允许的最大嵌套层数为20层), 所以如果要同时打开很多个文件, 也可以采用open(), close()打开和关闭文件.

buffering

使用 open开发文件时, 有个参数buffering表示打开文件所使用的缓冲策略, 默认是 4096 bytes(4KB), 可使用os.stas()查看.

  1. 读取或写入二进制(bites)时, 默认关闭缓冲, 此时 buffering=0; 也可以通过设置buffering打开缓冲.
  2. buffering=1: 表示只缓冲一行数据;
  3. buffering>1: 表示缓冲的 byte 数, 如默认为 4096, 表示 4096 bytes.

备注: 一般情况下, buffering 越大, 读写效率越高, 但是内存占用也会越大.

write(), writelines()

write() 表示向文件中写入一个字符串类型的数据;

writelines() 表示向文件中写入一个可迭代数据, 其中的每个元素都需要是字符串. 如果每次写入的内容大小设置地合适, 效率会比 write() 稍微高一些. 写入 2 亿行数据时, 如果每次写入 1 千万行, 耗时 48 S, 使用write()逐行写入(其他设置相同)耗时为 53 S. 代价是占用的内容比使用write()逐行写入大很多.

结论: 写入文件时, 建议结合使用write()buffering, 根据机器的内存调节buffering.

difflib 对比2个文件的差异

对比2个文件是否一致时, 可以使用标准库 difflib, 具体用法可参考以下代码:

对比之后, 如果2个文件一样, 对比结果是空列表; 否则会返回差异部分.

import difflib
import sys

with open("node_flow/test/1.txt", "r") as f1, open("node_flow/test/2.txt", "r") as f2:
    text1 = f1.readlines()
    text2 = f2.readlines()

result = list(difflib.context_diff(text1, text2, fromfile="原文件", tofile="新文件"))
if result:
    sys.stdout.writelines(result)
else:
    print("2个文件一致")

制作单个可执行文件

pyinstaller

  1. 进入容器: docker run --rm -it -v $(pwd):/work -w /work -u pilot modicn/pyinstaller:py2.7-centos7 bash.
  2. 安装需要的依赖: pip install -r requirements.txt
  3. 将脚本及其依赖打包成一个可执行文件: pyinstaller --onefile xxx.py.
    1. --onefile: 表示把生成的所有可执行文件都打包成一个文件, 默认会放到一个文件夹内;
    2. 如果需要打包多个脚本, 生成的可执行文件名默认使用第一个脚本的名称. 执行时也会先执行该脚本.

Pyinstaller 的常用参数:

  1. --distpath: dist 目录位置, 默认 ./dist;
  2. --workpath: build 目录位置, 默认 ./build;
  3. --clean: 删除缓存和临时文件;
  4. -D, --onedir: 将生成的可执行文件放入一个文件夹
  5. -F, --onefile: 将生成的可执行文件打包成一个可执行文件.
  6. -n, --name: 生成的可执行文件/文件夹的名称, 默认和脚本名称一致, 如果有多个脚本, 和第一个脚本一致.

常用文件类型:

CSV TSV 的对比:

CSV 采用逗号作为分隔符;

TSV 采用 \t 作为分隔符;

输出 csv 文件时, 最好使用以下格式:

name    age city    fav_colors
John    27  ["London", "Paris", "New York"] {"youth": "red", "teenager": "blue", "now": "green"}
  • 使用 \t, 而非 , 作为分隔符;

  • 使用双引号, 而非单引号, 双引号可以直接使用 json.loads(), 单引号不行.

    2 者在使用 csv 模块读数据没有区别.

    如果使用 for line in f, line.rstrip().split('\t')读取时, 就会有区别:

    • 当一个字段中包含多个元素, 且每个元素之间使用 ,分割时, 会被分成多个元素.
    • 每一行使用\t分割之后, 可以针对性的进行反序列化(json.loads()).

    备注: 在读取 csv/tsc 文件时, 有以下几种方式:

    • for line in f, line.rstrip().split('\t'): 性能最好, 但是如果某个字段内容中包含 \t就会误判;

    • csv.reader(): 即便某个字段内容中包含\t也不会误判, 分割之后需要按索引获取相应的值;

    • csv.DictReader(): 即便某个字段内容中包含\t也不会误判, 分割之后可以按 键(标题) 获取相应的值, 性能最差.

    • 性能测试: 测试数据: 1341 万行, 4.3 G;

    • 测试结果:

      • 直接使用 for row in f row = row.split('\t'), 耗时: 28 S;
      • 使用 csv.reader() 耗时: 58 S;
      • 使用 csv.DictReader()耗时: 72 S;
    • 测试项: 批量写入 VS 一次性写入:

      • 测试数据: 1341 万行, 4.3 G;
      • 一次性写入: 耗时: 117 S;
      • 批量写入(每次 1 万行): 耗时: 189 S;
      • 写入时的原则可能就是, 如果能一次性写入, 就一次性全部写入.
  • 输出方法:

    1. 将字段的各个值序列化, 用 \t连接各个元素.

      mydata = {
          "name": "John",
          "age": 27,
          "city": ["London", "Paris", "New York"],
          "fav_colors": {"youth": "red", "teenager": "blue", "now": "green"},
      }
      with open("./test/mydata.csv", "w") as f:
          w = csv.DictWriter(
              f,
              mydata.keys(),
              delimiter="\t",
          )
          f.write("\t".join([json.dumps(key) for key in mydata.keys()]) + "\n")
          f.write("\t".join([json.dumps(item) for item in mydata.values()]) + "\n")
      

      该方法能保证获得的同一个 dict 的keys 和 values 相对应, 但是无法保证相同结构的其他 dict 的 values 也能和第一个 dict 的 keys 相对应.

    2. 使用 csv 模块;

      mydata = {
          "name": "John",
          "age": 27,
          "city": ["London", "Paris", "New York"],
          "fav_colors": {"youth": "red", "teenager": "blue", "now": "green"},
      }
      with open("./test/mydata.csv", "w") as f:
          w = csv.DictWriter(
              f,
              mydata.keys(),
              delimiter="\t",
              quotechar="'",
          )
          w.writeheader()
          list_cols = ["city", "fav_colors"]
          for key in list_cols:
              mydata[key] = json.dumps(mydata[key])
          w.writerow(mydata)
      

推荐使用这种方式:

  1. 可以保证所有的 value 都可以 表头的 key 顺序保持一致;
  2. 只序列化集合、列表和字典 类型的列.
  3. 通过 escapechar="\" 设置 escapechar;
  4. 应用方式:quote:
    1. QUOTE_NONE: 对任何字段都不使用 quotechar 应用,如 python,而非 "python";
    2. QUOTE_MINIMAL: 一般情况下不使用 quotechar,但是遇到 j"ack 这种字符串本身包含 quotechar 的情况,就会写成 "j""ack"
    3. QUOTE_NONNUMERIC: 对应数值型字段,不使用 quotechar;对应其他类型的字段使用 quotechar,如 23, "jack"
    4. QUOTE_ALL:对任何字段都使用 quotechar;

推荐设置:(和 pandas 的默认设置一致)

csv.DictWriter(
    fw,
    fieldnames=["age", "name", "gendor"],
    restval="Na",
    delimiter="\t",
    extrasaction="ignore",
    quoting=csv.QUOTE_MINIMAL,
    doublequote=True,
)

读取加密的 Excel 文件

使用 msoffcryto-tool:

from msoffcrypto import OfficeFile

with open(origin_excel_path, 'rb')as f, open(decrypted_excel_path, 'wb')as g:
        excel = OfficeFile(f)
        excel.load_key(excel_pwd)
        excel.decrypt(g)

文件[夹]的移动 复制

文件复制 移动 主要使用 shutil.

备注: 使用shutil时, 最好都使用 str 类型表示路径, 因为对于Path()类型的路径, 有些函数支持, 有些不支持.

文件复制

  • 可以使用 shutil库中的copyFile(), copy(), copy2():

  • shutil.copyFile(src, dst): src, dst 必须是 文件路径;

  • shutil.copy(src, dst): src 必须是文件路径, dst 可以是文件路径, 也可以是文件夹;

    • src 必须是文件, 否则报错IsADirectoryError;
    • dst 可以是文件或文件夹; 如果是文件, 就复制或替换; 如果是文件夹, 就将 src 放入该文件夹内;
    • 返回值是复制之后的文件路径;
  • shutil.copy2(): 和shutil.copy()类似, 但是会尽量复制元信息, 若创建时间, 权限等, 复制过程不会报错;

  • shutil.copytree(src, dst, copy_function=copy2, dirs_exist_ok=False): 复制文件夹内所有文件和文件夹, copy: 复制时采用的函数, dirs_exist_ok: 当目前文件夹已经存在时的操作;

    • src, dst 必须是文件夹路径, 并且默认 dst 不存在, 否则报错FileExistsError;
    • 如果 dst 已经存在, 可以设置参数dirs_exist_ok=True. 该文件夹内的现有文件不变.
    • 把 src 下的文件或文件夹全部复制到 dst 下.

文件[夹]移动

  • shutil.move(src, dst):

    • src, dst 既可以是文件, 也可以是文件夹;
    • 如果 dst 是目录, 就将 src 移动到该目录下.
    • 如果 dst 是文件, 并且已经存在, 相当于用新文件覆盖该文件.

文件[夹]删除

  1. shutil.rmtree(): 删除文件夹;
    1. 如果文件夹不存在, 会报错FileNotFoundError.
    2. 可通过设置参数ignore_errors=True, 去掉报错;
  2. Path(file_path).unlink():
    1. 删除文件, 如果文件不存在, 会报错FileNotFoundError;
    2. 可设置参数missing_ok=True, 即便文件不存在也不会报错;

路径相关

主要使用 pathlib.Path().

Path() 是一种数据类型, 可通过Path("path/to/file")将字符串类型的路径转换为Path()类型的路径.

可通过str(Path())Path().__str__()Path()类型的路径转化为 字符串 类型的路径.

  • Path().joinpath("path1", "path2"): 将多级目录串起来, 注意不能以/开头, 否则会被识别为 绝对路径; 可以以 /结尾, 和不以/结尾效果一样.

  • Path().absolute(): 该路径的绝对路径, 但不能解决如../../的嵌套关系;

  • Path().resolve(): 返回绝对路径, 可以解决如../../的嵌套关系;

  • Path().__str__(): 以字符串类型返回当前路径, 等同于 str(Path());

  • Path().parts(): 列表, 每个元素表示路径的组成元素;

  • Path().parents: 列表, [0]是父级路径, [-1]是根目录;

  • Path().parent(): 父级目录;

  • Path().stats(): 返回该路径执行的文件的信息, os.stats_result类型.

  • Path().name(): 文件名(包括后缀)

  • Path().suffix(): 文件后缀

  • Path().stem(): 文件名(不包含后缀)

  • Path().relative_to(A): 相对于A的相对路径;

  • Path().home(): 当前用户的根目录(/home/guang), 如果是 root 用户 /root

  • Path().cwd(): 当前文件所在文件夹路径;

  • Path().exists(): 该路径是否存在;

  • Path(src).rename(dst): 移动或重命名 src 至 dst;

    • src, dst 可以是文件或目录, 但是必须保持一致;
    • 如果 src, dst 都是文件, 会自动覆盖(如果 dst 已经存在);
    • 如果 src, dst 都是目录 , dst 必须不存在或为空, 否则报错 OSError: [Errno 39] Directory not empty.
  • Path().is_dir(): 是否是目录, 如果该路径下的文件不存在, 也会返回 False;

  • Path().is_file(): 是否是文件, 如果该路径下的文件不存在, 也会返回 False;

  • Path().touch(mode, exist_ok): 在指定路径下创建文件.

  • Path().mkdir(): 创建文件夹;

    • exist_ok=True: 如果文件夹已经存在, 也不会报错; 否认会报错 FileExistsError.
    • parents=True: 如果父级目录不存在就创建父级目录; 否则会报错 FileNotFoundError.
  • Path().iterdir(): 对该文件夹下所有的文件[夹]进行遍历;

  • Path().unlink(): 删除文件;

    • 删除文件, 如果文件不存在, 会报错FileNotFoundError;
    • 可设置参数missing_ok=True, 即便文件不存在也不会报错;

执行 shell 命令

很多时候需要在脚本中执行一个或多个 shell 命令, 可以使用subprocess.run().

如果想在 Python 脚本中执行 ps aux, 如果直接使用subprocess.run("ps aux"), 会遇到以下报错:

>>> subprocess.run("ps aux")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.8/subprocess.py", line 493, in run
    with Popen(*popenargs, **kwargs) as process:
  File "/usr/lib/python3.8/subprocess.py", line 858, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/usr/lib/python3.8/subprocess.py", line 1704, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'ps aux'

需要将命令拆分成 list, 如 subprocess.run(["ps", "aux"])便可以正常执行.

当不知道该怎么拆分命令时, 一个简单的方法就是把命令的每个参数都分开, 如 调用 bitcoinetl 下载交易数据, 在shell 中执行的完整的命令是:

bitcoinetl export_blocks_and_transactions --start-block 500000 --end-block 500010 -p http://user:pwd@10.1.1.20:8332 --chain bitcoin --blocks-output blocks.csv --transactions-output transactions.csv

可以拆分成:

cmd = [
    "bitcoinetl",
    "export_blocks_and_transactions",
    "--start-block",
    f"{batch_start_block}",
    "--end-block",
    f"{batch_end_block}",
    "-p",
    f"http://{node_user}:{node_pwd}@10.1.1.20:8332",
    "--blocks-output",
    f"{block_path}",
    "--transactions-output",
    f"{transaction_path}",
]

备注:

  1. 如果不想把一个完整命令分开, 可以传入参数shell=True表示该命令在一个 shell 中执行.

自动拆分命令

将字符串类型的 shell 命令拆分成 subprocess.run()可以使用的命令, 可以调用shlex.split

>>> import shlex
>>> shlex.split("ps aux")
['ps', 'aux']
>>> subprocess.run(shlex.split("ps uax"))

shlex.join()shlex.split()的逆变换, 可以将['ps', 'aux']拼接成ps aux.

捕获执行结果

如果需要捕捉该命令的执行结果, 可以传入参数capture_output=True. 返回结果是 subprocess.CompletedProcess

CompletedProcess(
    args='bitcoinetl export_blocks_and_transactions --start-block 500000 --end-block 500001 -p http://omni:omni@10.1.1.20:8332 --chain bitcoin --blocks-output blocks.csv --transactions-output transactions.csv', 
    returncode=0, 
    stdout=b'', 
    stderr=b'2022-06-08 11:18:30,073 - ProgressLogger [INFO] - Started work. Items to process: 2.\n2022-06-08 11:18:56,978 - ProgressLogger [INFO] - 1 items processed. Progress is 50%.\n2022-06-08 11:18:59,760 - ProgressLogger [INFO] - 2 items processed. Progress is 100%.\n2022-06-08 11:18:59,760 - ProgressLogger [INFO] - Finished work. Total items processed: 2. Took 0:00:29.686959.\n2022-06-08 11:18:59,761 - CompositeItemExporter [INFO] - block items exported: 2\n2022-06-08 11:18:59,762 - CompositeItemExporter [INFO] - transaction items exported: 5346\n'
)

有以下属性:

  1. args: list 或 str. 执行的命令;
  2. returncode: 子进程的退出代码, 0: 表示成功执行. 其他代码表示为执行成功;
  3. stdout: 标准输出, 一般 print 的输出都是 stdout. 如果执行 subprocess.run()时带上参数stderr=subprocess.STDOUT, stderr 会和 stdout 合并到 stdout.
  4. stderr: 标准错误输出, 一般 logging 日志属于 stderr.
  5. check_returncode(): 如果 returncode 不是 0, 就会抛出异常 CalledProcessError.

将 stdout, stderr 由二进制改为 utf-8 编码

观察subprocess.CompletedProcess可以看出 stdout, stderr 是二进制数据, 如果想转换成 utf-8 编码, 可以加上参数 text=True. 结果如下:

CompletedProcess(
    args='bitcoinetl export_blocks_and_transactions --start-block 500000 --end-block 500001 -p http://omni:omni@10.1.1.20:8332 --chain bitcoin --blocks-output blocks.csv --transactions-output transactions.csv', 
    returncode=0, 
    stdout='', 
    stderr='2022-06-08 11:42:24,429 - ProgressLogger [INFO] - Started work. Items to process: 2.\n2022-06-08 11:42:37,948 - ProgressLogger [INFO] - 1 items processed. Progress is 50%.\n2022-06-08 11:42:37,955 - ProgressLogger [INFO] - 2 items processed. Progress is 100%.\n2022-06-08 11:42:37,956 - ProgressLogger [INFO] - Finished work. Total items processed: 2. Took 0:00:13.526463.\n2022-06-08 11:42:37,956 - CompositeItemExporter [INFO] - block items exported: 2\n2022-06-08 11:42:37,957 - CompositeItemExporter [INFO] - transaction items exported: 5346\n'
)

stderr, stdout 写入文件

with open('command.out', 'w') as stdout_file:
    process_output = subprocess.run(['date', '+%a'], stdout=stdout_file, stderr=subprocess.PIPE, text=True)

抛出异常

如果执行的命令有错, 默认会将报错记录到 stderr, 不会直接报错. 如果想要脚本直接抛出异常, 可以设置参数check=True.

>>> subprocess.run(['date', '%a'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.8/subprocess.py", line 516, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['date', '%a']' returned non-zero exit status 1.

会抛出异常 subprocess.CalledProcessError. 这时便可以更好地处理异常:

import subprocess

try:
    process_output = subprocess.run(['date', '%a'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=True)
except subprocess.CalledProcessError:
    print("Error detected while executing the command")

执行多条命令

在 shell 中可以使用 PIPE 连接多条命令, 那么在 Python 脚本中该怎么执行多条命令呢? 有 两种方法:

  1. 设置参数 shell=True:

    subprocess.run("ps aux | grep python", shell=True)
    
  2. 使用 input, stdout, stderr:

    import subprocess
    
    ps_cmd = subprocess.run(['ps', '-aux'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print("wc_cmd object: {}".format(ps_cmd.__dict__))
    
    grep_cmd = subprocess.run(['grep', 'python'], input=pc_cmd.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print("awk_cmd object: {}".format(grep_cmd.__dict__))
    
    print("The ouput of the command is: {}".format(grep_cmd.stdout.decode()))
    

pandas

连接 SQL 数据库

从 SQL 数据获取数据, 或向其中导入数据, 可配合 sqlalchemy 使用. 如果数据量比较大, 可以通过参数 chunksize 控制每次读取/写的数据量.

from sqlalchemy import create_engine
import pandas as pd

engine = create_engine(
    "mysql+pymysql://root:root@10.1.1.77:3306/chainadmin", echo=False
)
sql_state = "select * from chainadmin.mining_log"
df = pd.read_sql(
    sql_state, con=engine, index_col="id", parse_dates=["date"], chunksize=1000
)
df = pd.concat(df)
print(df)

df.to_sql(
    "mining_log_bak", engine, schema="chainadmin", if_exists="replace", chunksize=1000
)

备注:

  • echo=True: 会打印出与数据库的交互过程, 可在生产环境下设为 False, 调试环境下设为 True.

编码方式

to_csv(encoding='utf-8-sig'): 类似于 UTF8-with-BOM, 即可以使用 Excel 直接打开, 又可以使用 head, tail, less 等命令直接查看.

read_csv(): 使用默认的 utf-8 编码方式也可以正确读取 utf-8-sig 编码的文件.

对 2 列集合或列表进行合并

data = [
    {"class": "first", "students": [1, 2, 3], "rank": [1, 2, 3]},
    {"class": "second", "students": [4, 5, 6], "rank": [4, 5, 6]},
]
df = pd.DataFrame(data)

for student, rank in df[["students", "rank"]].to_numpy():
    student.append(rank)
    print(student, rank)

SettingWithCopyWarning

链式索引(chained indexing)与 loc

取数时

>>> dfmi
>>>
    one          two       
  first second first second
0     a      b     c      d
1     e      f     g      h
2     i      j     k      l
3     m      n     o      p

# 链式索引
dfmi['one']['second']

dfmi.loc[:, ('one', 'second')]

以上两种操作的效果一样, 但是第二种方式 (loc) 效率更高.

赋值时

dfmi.loc[:, ('one', 'second')] = value
# 对应的底层操作是:
dfmi.loc.__setitem__((slice(None), ('one', 'second')), value)

dfmi['one']['second'] = value
# 对应的底层操作是:
dfmi.__getitem__('one').__setitem__('second', value)

结论:

  • 使用第一种方式(loc)可以确保赋值正确.
  • 第二种情况的结果无法预测, 和内存有关.

案例分析

chunk = chunk.loc[chunk["type名称"] == msg_type.upper(), :]
# chunk.query("type名称 == @msg_type.upper()", inplace=True)

chunk.loc[:, "时间"] = pd.to_datetime(
    chunk.loc[:, "时间"], errors="coerce", format=r"%Y-%m-%d %H:%M:%S"
)

以上代码是要从原来的 chunk 中筛选出指定类型的消息, 然后将"时间"列转化成 datetime 类型. 在实际执行时依然会有提示(warning), 说转换"时间"列的语句有SettingWithCopyWarning. 但是从语句上来说, 该语句并没有使用 chained indexing, 那为什么还会有这样的提示呢?

根本原因在于筛序指定类型消息的语句chunk = chunk.loc[chunk["type名称"] == msg_type.upper(), :], 这里虽然将筛选后的结果也命名为 chunk, 但是 pandas 会认为筛选后的 chunk 是原 chunk 的 view, 因此对筛选后的 chunk 的修改(将"时间"列修改为 datetime 类型)并不会反映到原 chunk, 所以会有提醒.

修改建议: 当需要对 DataFrame 做筛选时, 可以使用query(). 筛选出指定消息类型的语句可替换为chunk.query("type名称 == @msg_type.upper()", inplace=True).

备注: 当需要在query()的查询语句中使用@, 更多用法可以参考 pandas 中 query 章节.

删除索引中的重复值

df = df[~df.index.duplicated()]

to_csv

  1. 可以通过参数 float_format 控制 float 类型的数据在输出至文件时的保存格式:float_format='{:.2f}'.format, 可设置为只保留2位小数。

常用操作

pd.read_csv():

  • header: None: 没有标题. 如果有, 使用默认即可.
  • sep: 列之间的分隔符;
  • usecols: 只加载需要的列;
  • dtype: 预定义每列的数据类型, pd.StringDtype()

pd.to_datetime(): 将字符串或Unix时间戳转换成 datetime 类型

  • unit: 如果 Unix 时间戳(如 18998080)转换成可读的字符串, 可以是 "s" "ms"
  • format: 字符串格式 "%Y-%m-%d %H:%M:%S"
  • errors: 'coerce', 如果

pd.to_numeric

  • errors: 'coerce'; 如果转换过程中出现异常, 就用 pd.NaN 代替.

df["源IP"] == srcip: 筛选 源IP 等于 srcip 的项

df['时间'].between(left, right): 筛选在指定范围内的数据

df['时间'].size: 判断满足条件的数据的条数.

df.sort_values("时间").iloc[0]: 对筛选结果在 "时间"列上升序排列, 并取第一个.

# 筛选出满足时间范围的数据
df = df[df['时间'].between(left, right)]

# 筛选出满足其他条件的数据
cond = (df['源IP']==src_ip) & (df['目的IP']==dst_ip) & ....
df = df[cond]

data_size = df['时间'].size
if data_size == 0:
    # 没有符合条件的数据
    pass
elif data_size ==1:
    # 只有一条符合条件的数据
    packet_time = df['时间'].iloc[0]    
else:
    packet_time = df.sort_values("时间")['时间'].iloc[0]

jupyter

同一个 cell 中多个输出

from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"

相关文章

网友评论

      本文标题:Python开发实践技巧

      本文链接:https://www.haomeiwen.com/subject/rseyydtx.html