目录
- 在 CentOS 7 从源代码安装 Python3.10.8
- 在 Ubuntu 20.04 从源代码安装 Python3.10.8
- 虚拟环境及包管理
- 常用的数据结构及其特点:
- 编码
- IO
- 转义字符
- 代码规范
- 异常处理
- 命令行程序
- 配置文件
- 文件读取
- 序列化与反序列化
- 正则表达式
- 日志与错误记录
- 测试
- 处理进度
- 字符串格式化输出
- if 判断
- 多线程变量共享
- 网络请求
- 日期, 时间, 时区
- 文件读写
- difflib 对比2个文件的差异
- 制作单个可执行文件
- 常用文件类型:
- 读取加密的 Excel 文件
- 文件[夹]的移动 复制
- 路径相关
- 执行 shell 命令
- pandas
- jupyter
在 CentOS 7 从源代码安装 Python3.10.8
sudo yum install --downloadonly --downloaddir pkgs --security --bugfix yum-utils wget
sudo yum install yum-utils wget -y
sudo yum-builddep python3 -y
wget https://www.python.org/ftp/python/3.10.8/Python-3.10.8.tgz
tar -xzf Python-3.10.8.tgz
cd Python-3.10.8
sudo ./configure --enable-optimizations --enable-loadable-sqlite-extensions
sudo make
sudo make install
python3.10 -V
--with-openssl=/etc/pki/tls
备注: python3.10的安装路径为:/usr/local/bin/python3.10
--downloadonly --downloaddir=rpms
在 Ubuntu 20.04 从源代码安装 Python3.10.8
echo "deb-src http://mirrors.aliyun.com/ubuntu/ focal main" | sudo tee -a /etc/apt/sources.list
sudo apt-get update
sudo apt-get build-dep python3
sudo apt-get install pkg-config
sudo apt-get install build-essential gdb lcov pkg-config libbz2-dev libffi-dev libgdbm-dev libgdbm-compat-dev liblzma-dev libncurses5-dev libreadline6-dev libsqlite3-dev libssl-dev lzma lzma-dev tk-dev uuid-dev zlib1g-dev wget
wget https://www.python.org/ftp/python/3.10.8/Python-3.10.8.tar.xz
tar -xvf Python-3.10.8.tar.xz
cd Python-3.10.8
sudo ./configure --enable-optimizations --enable-loadable-sqlite-extensions
sudo make install
/usr/local/bin/python3.10 -V
虚拟环境及包管理
# 创建虚拟环境
python3 -m venv /path/to/new/virtual/environment
# 激活环境
source path/to/py_env/bin/activate
# 导出当前环境中已经安装的 package
pip3 freeze > requirements.txt
# 从指定文件安装 package
pip3 install -r requirements.txt
# 安装指定包
pip install xxx
# 卸载所有依赖: (除 pip, setuptools 之外, 以下方法均可)
1. pip uninstall -y -r <(pip freeze)
2. pip freeze | xargs pip uninstall -y
常用的数据结构及其特点:
常用的数据结构有: int, float, decimal, str, tupple, list, set, dict, generater.
常用操作对应的时间复杂度: 时间复杂度.
特点:
- 整型 int:
- 没有大小限制;
- 字符串:
-
str.split("AA|BB|CC")
: 会默认将每个字符视为分隔符, 无法表示 AA 或者 BB 或者 CC, 如果想要达到这样的效果可以使用正则:re.split("AA|BB|CC")
. 其中 | 表示 或者; -
str.strip("城市")
: 会去掉结尾的 "城市" 或"城", "市". 如果只想去掉两侧的"城市"可以使用re.sub()
-
- 列表:
- 获取指定位置的元素;
- 从最后插入数据;
- 从最后删除数据;
- 遍历;
- 集合:
- 各元素互不相同;
- in: 判断是否包含;
- 可以进行数学上的集合操作, 如 交集, 并集, 差集等;
- 字典:
- key 各不相同;
- 根据 key 快递获取对应的 value;
- 生成器:
- 类似于数据库中的 cursor;
- 不能追加, 删除其中的元素;
- 只能遍历一遍;
- 占用内存少;
编码
str 转化成 byte:
形式上转换成 byte 之后, 只是在前面加了一个 b, 如 b"hello Python"
.
-
str.encode(errors="replace")
, 遇到错误时, 使用 ? 替代. -
bytes("a", encoding='utf-8')
.
byte 转化成 str:
bytes.decode(errors="replace")
, 遇到错误时, 使用 �(U+FFFD)代替.
int -> byte:
-
int.to_bytes(length, byteorder, signed=False)
, 如 (1024).to_bytes(2, byteorder='big') -
bytes([1,2,3,4])
Out[27]: b'\x01\x02\x03\x04', 可通过形如 a[0] 的索引单独引用, 不可变序列, 类似于元祖. - bytearray([1,2,3]), bytearray(b'\x01\x02\x03'), 也可以通过索引访问, 类似于列表.
hex -> byte:
- bytes.fromhex().
byte -> hex:
- bytes.hex()
byte -> int:
int.from_bytes(length, byteorder, signed=True)
, 如 int.from_bytes(b'\x00\x10', byteorder='big')
struct.pack(format, v1, v2...): 将整数, 浮点数 或字节串按照指定的字节序及类型转换成 bytes, 可以同时转化多个. struct.pack(">QQ", 1,2)
struct.unpack(format, buffer): 将 strict.pack() 生成结果反向转化为 v1, v2.
bytes, bytearray 常用的method, 如 find(), endswith() 等基本和字符串一样.
IO
主要有以下2种IO
- 文本IO
- 二进制IO;
每种IO都可以通过打开文件, 如 with open(file, 'w') as f
, with opne(file, 'wb') as f
.
将现有内容转化成IO, 如s = "hello python", io.StringIO(s), s = b"hello", io.BytesIO(s)
大部分Unix系统使用 utf-8 编码, 但是 win 系统的默认编码并不是 utf-8.
文件编码格式与读文件性能
如果文件以 ascii 编码, open(file, 'rb'), open(file, "r") 的性能基本一致;
如果文件以其他编码格式编码(如GBK,UTF-8), open(file, 'rb')的性能比 open(file, "r")大约快3部(和编码方式有关)。
seek(), tell() 中使用的都是byte, 而非字符(utf-8编码下,中文占用 3 byte)
转义字符
Python 读取文件时, 会把里面的 \
字符当做原始字符(不当做转义字符), 但是在程序中定义的字符串中的\
会被当做转义字符处理, 可以在字符串前面加上 r
告诉程序该字符串中的\
不是转义字符, 如r"\a\n"
.
Python 读取以下文件时:
\F9\BE\B4\D9inv\00\00
使用 print 打印出来之后是:
\\F9\\BE\\B4\\D9inv\\00\\00
而如果在 Python 程序中定义的字符串:
first = "\abc"
print(first) # 结果是: bc 默认把 \ 视为转义字符
second = r"\abc"
print(second) # 结果是: \\abc 把将 \abc 视为原始字符
代码规范
Python 要求符合 pep8 规范.
格式化工具:
- autopep8;
- black(推荐).
推荐格式:
-
脚本最开始注明该脚本的用途, 输入参数, 输出内容, 主要流程, 及用到的复杂的数据结构;
-
其次是需要导入的 package;
-
函数定义: 1. 注明该函数的用途; 2. 输入参数及其类型; 3: 返回值类型; 如下所示:
def block_by_number( session: Session, block_number: int, host: str = "http://10.1.1.20:8545" ) -> Dict: """从节点服务器下载该区块内的主币交易数据""" ....
异常处理
异常处理的基本原则: 白名单.
只有在白名单内的异常才可以通过, 其他异常一定有记录到日志或输出到终端, 绝对不能用 Exception 处理一切异常.
在个别生成环境下, 如处理中心的数据, 脚本日志没法导出, 同时为了降低记录对处理速度的影响, 可以先不记录日志. 只在调试情况下, 才记录日志.
执行顺序
try:
...
except IndexError:
...
except:
...
finally:
...
代码先从 try
进入:
- 如果没有遇到报错, 则会执行
finally
; - 如果遇到异常, 则会在
except
中寻找对应的异常;- 如果有对应的异常处理, 就会执行相应的代码块, 最后执行
finally
; - 如果没有对应的异常处理, 就会执行最后的
except
(类似于else
), 然后再执行finally
; - 如果没有对应的异常处理, 并且没有最后的
except
, 就会直接执行finally
, 然后报错;
- 如果有对应的异常处理, 就会执行相应的代码块, 最后执行
NOTICE:
- 如果要逐行处理文件, 需要将
try...except...finally
模块放到for line in fin
之内. 这样可以在不中断程序的情况下, 将文件的所有行全部处理.
捕获多个异常
如果对于多个异常的处理方式相同, 就可以把多个异常放入一个 except
中, 如:
try:
...
except (ValueError, ArithmeticError) as e:
...
except :
print("未知异常")
print("程序继续运行")
命令行程序
参数类型:
- 子命令;
- 可选参数: 以 -- 开始, 缩写为 -, 可有可无, 每个参数可以设置默认值, 类型.
click.option()
- 位置参数, 使用位置表示含义, 必须有的参数;
click.argument()
$ docker run --help
Usage: docker run [OPTIONS] IMAGE [COMMAND] [ARG...]
Run a command in a new container
Options:
--add-host list Add a custom host-to-IP mapping (host:ip)
-a, --attach list Attach to STDIN, STDOUT or STDERR
--blkio-weight uint16 Block IO (relative weight), between 10 and 1000, or 0 to disable
(default 0)
推荐的命令行工具:
- click.
click
添加 -h
查看帮助信息 (默认情况是 --help).
@click.group(context_settings=dict(help_option_names=['-h', '--help']))
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
命令组
优点:
- 一个程序作为入口, 调用其他程序, 便于管理;
- 可以在入口程序指定常用参数, 并且可以传递至被调用程序;
- 指定命令的映射关系 (通过 cli.add_command(hello, "hello"))
程序组织结构:
- cli.py:
- commands/:
- first.py
- second.py
程序入口: cli.py
from pathlib import Path
from sys import path
import click
path.insert(0, str(Path(__file__).parent))
from hello import hello
@click.group()
@click.option("--start")
@click.pass_context
def cli(ctx, start):
ctx.ensure_object(dict)
ctx.obj['start'] = start
cli.add_command(hello, "hello")
if __name__=="__main__":
cli()
被调用程序: hello.py
from pathlib import Path
from sys import path
import click
path.insert(0, str(Path(__file__).parent))
@click.command()
@click.option("--end")
@click.pass_context
def hello(ctx, end):
print("hello")
print(end)
print(ctx.obj['start'])
argparse
如果使用argparse
解析命令行参数, 可以这样使用:
parser.add_argument("--debug", action="store_true", help="是否开启调试模式")
表示: 当出现--debug
时, 是 True; 没有出现时, 是 False.
配置文件
configparser
Python 读取配置文件可以通过configparser
这个自带的 package.
; 配置文件为(文件名为`example.ini`):
; 等号两侧可以有空格, 也可以没有, 会自动去掉
; 字符串不需要带双引号或单引号
[DEFAULT]
ServerAliveInterval = 45
Compression = yes
CompressionLevel = 9
ForwardX11 = yes
[bitbucket.org]
User = hg
[topsecret.server.com]
Port = 50022
ForwardX11 = no
>>> import configparser
>>> config = configparser.ConfigParser()
>>> config.sections()
>>> []
>>> config.read('example.ini')
>>> ['example.ini']
>>> config.sections()
>>> ['bitbucket.org', 'topsecret.server.com']
>>> 'bitbucket.org' in config
>>> True
>>> 'bytebong.com' in config
>>> False
# 直接通过字典形式的访问 返回值都是字符格式
>>> config['bitbucket.org']['User']
>>> 'hg'
>>> config['DEFAULT']['Compression']
>>> 'yes'
>>> topsecret = config['topsecret.server.com']
>>> topsecret['ForwardX11']
>>> 'no'
>>> topsecret['Port']
>>> '50022'
# 还可以通过 `.get()`,`.getint(),`.getfloat()`,`.getboolean()`直接获取合适的格式
>>> config.getint('DEFAULT','ServerAliveInterval')
>>> 45
>>> config.getboolean('DEFAULT','Compression')
>>> True
>>> config.getfloat('DEFAULT','CompressionLevel')
>>> 9.0
python-dotenv
优点:
- 可以将配置文件里面的变量转换为环境变量, 通过
os.getenv()
获取.
基本用法
- 加载配置文件.
dotenv.load_dotenv
-
dotenv_path
: 配置文件路径; -
verbose
: 如果没有找到.env
配置文件, 是否发出警告. -
override
: 是否覆盖环境变零.
-
文件读取
使用上下文管理器:
with open(file_path, 'r', encoding='utf-8') as fin:
fin.read() # 读取全部或固定字节
fin.readline() # 读取第一行
fin.readlines() # 读取所有行, 返回列表类型, 每行是一个元素
# 推荐用法: 逐行读取, 返回生成器类型
for line in fin:
pass
NOTICE:
-
打开文件, 最好在主函数, 不要每次读取或写入都打开/关闭一次文件. 连接数据库/网络时也类似。
-
读取文件时, 获取的每行数据以
\n
结尾. -
写入文件时, 每个字符串必须以
\n
结尾, 否则会写到同一行. -
当需要打开的文件比较多时, 如超过20个, 无法再使用该种方法管理文件, 可以使用
fin = open(file_path, 'r', encoding='utf-8') ... fin.close()
因为 Python 规定同一个函数内的嵌套层数不能超多 20 层;
这样的缺点: 脚本被意外终止时, 文件不能正确关闭.
序列化与反序列化
推荐工具:
- json: 标准库;
- ujson: 第三方库: 解析速度更快, 容错比较低. 速度能提升 20%;
正则表达式
常用:
-
re.compile()
: 预编译, 如果需要经常使用, 推荐预编译; -
re.match()
: 从字符串开始进行匹配; -
re.search()
: 从字符串任何位置开始; -
re.findall()
: 找出所有的匹配项, 返回列表类型; -
re.split()
: 具有正则工具的分割; -
re.sub()
: 可以使用替换达到去掉前面/后面某些词的功能;
日志与错误记录
日志记录
如果只记录单个日志文件, 可以进行如下设置.
logging.basicConfig(
filename='./log.log',
filemode='a',
level=logging.DEBUG,
format="%(asctime)s|%(name)s|%(levelname)s|%(message)s",
)
logger = logging.getLogger(__name__)
logger.info()
其中:
- level: 日志等级可以分为: DEBUG, INFO, WARNING, ERROR, CRITICAL.
- format 的参数如下:
%(levelno)s | 打印日志级别的数值 |
---|---|
%(levelname)s | 打印日志级别名称 |
%(pathname)s | 打印当前执行程序的路径,其实就是sys.argv[0] |
%(filename)s | 打印当前执行程序名 |
%(funcName)s | 打印日志的当前函数 |
%(name)s | logger 名称 |
%(lineno)d | 打印日志的当前行号 |
%(asctime)s | 打印日志的时间 |
%(thread)d | 打印线程ID |
%(threadName)s | 打印线程名称 |
%(message)s | 打印日志信息 |
常用的格式可以为:format='%(asctime)s|%(levelname)s|%(name)s|%(funcName)s|%(message)s'
记录报错信息
-
logging.exception(info)
等价于logging.error(info, exc_info=True)
.
参数exc_info=True
可用于控制输出报错时的 traceback.
@click.command()
@click.option("--debug", is_flag=True, default=False)
def main(debug):
click.echo(f"{debug}")
logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
logging.debug("debug", exc_info=True)
这样很好地切换开发环境和生产环境.
-
记录报错信息可以通过
logging.exception()
函数获得, 并记录到日志文件:import logging try: printf('hello world') except Exception as a: logging.exception('Error occured while printing') print('1') # 以下为输出 # ERROR:root:Error occured while printing # Traceback (most recent call last): File "/home/light/gitrepo/blockchain-etl/tron-etl/test.py", line 10, in <module> printf('hello world') # NameError: name 'printf' is not defined # 1
- 报错信息也可以通过
tracebase
模块, 同时结合logging
模块, 把报错信息记录到日志文件中. 常用的函数有以下几个:
-
traceback.format_exc()
: 返回异常信息的字符串, 可以和logging
模块结合使用, 把报错信息记录记到日志文件; -
traceback.print_exc()
: 直接把报错信息输出到终端; 也可以把异常信息写入到文件traceback.print_exc(file=open('traceback_INFO.txt','w+'))
.
try:
func()
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(repr(e))
测试
pytest
控制输出
使用 test_xxx 函数中使用 print 输出信息时,默认不会输出这些信息,可通过 -s 设置输出 print 的输出信息。
fixture
def add(x, y):
return x + y
@pytest.mark.parametrize("a, b, expected", [[1, 2, 3], [2, 3, 5]])
def test_add(a, b, expected):
assert add(a, b) == expected
@pytest.fixture
def args():
return [[1, 2, 3], [2, 3, 5]]
def test_args(args):
for a, b, expected in args:
assert add(a, b) == expected
# 类似于通过命令行执行 pytest -s -v test_one.py
if __name__ == '__main__':
pytest.main(["-s","-v","test_one.py"])
fixture 可以请求其他 fixture
# contents of test_append.py
import pytest
# Arrange
@pytest.fixture
def first_entry():
return "a"
# Arrange
@pytest.fixture
def order(first_entry):
return [first_entry]
def test_string(order):
# Act
order.append("b")
# Assert
assert order == ["a", "b"]
同一个 fixture 可以被使用多次
一个 fixture 每次被调用的结果都一样,这就确保了测试不会相互影响。
# contents of test_append.py
import pytest
# Arrange
@pytest.fixture
def first_entry():
return "a"
# Arrange
@pytest.fixture
def order(first_entry):
return [first_entry]
def test_string(order):
# Act
order.append("b")
# Assert
assert order == ["a", "b"]
def test_int(order):
# Act
order.append(2)
# Assert
assert order == ["a", 2]
执行 test_string 时,order 是 ["a"],执行 test_int 时,order 依然是 ["a"],而不是 ["a", "b"],
上面的测试脚本如果手动执行,是:
entry = first_entry()
the_list = order(first_entry=entry)
test_string(order=the_list)
entry = first_entry()
the_list = order(first_entry=entry)
test_int(order=the_list)
每个测试都可以多次使用 同一个 fixture (这时会缓存返回值)
# contents of test_append.py
import pytest
# Arrange
@pytest.fixture
def first_entry():
return "a"
# Arrange
@pytest.fixture
def order():
return []
# Act
@pytest.fixture
def append_first(order, first_entry):
return order.append(first_entry)
def test_string_only(append_first, order, first_entry):
# Assert
assert order == [first_entry]
def test_string(order):
assert order == []
如果一个被请求的 fixture 在测试期间每次被请求时都被执行一次,那么这个测试将会失败,因为append_first和test_string_only都会将order视为一个空列表,但由于order的返回值在第一次被调用后被缓存(以及执行它可能有的任何副作用),test和append_first都引用了同一个对象,测试中看到了append_first对该对象的影响。
在 test_string 中 order 依旧是 [].
说明在一个测试函数中,如果对同一个 fixture 多次使用,那么引用的都是同一个对象(如 test_string_only 中);但在其他测试函数中,调用该 fixture 时,却引用的另一个对象。因为默认情况下,fixture 的有效范围就是当前函数(默认值),如果将 fixture order 的范围(scope)改为 session,那么所有的 测试函数引用的都是同一个对象。如下面的代码,test_string 就无法通过测试,因为在 append_first 函数中,order 已经变为 ["a"].
# contents of test_append.py
import pytest
# Arrange
@pytest.fixture
def first_entry():
return "a"
# Arrange
@pytest.fixture(scope="session")
def order():
return []
# Act
@pytest.fixture
def append_first(order, first_entry):
return order.append(first_entry)
def test_string_only(append_first, order, first_entry):
# Assert
assert order == [first_entry]
def test_string(order):
assert order == []
autouse
如果有一个 fixture,所有的测试函数都需要用到,就可以通过设置 autouse=True
得以实现。
# contents of test_append.py
import pytest
@pytest.fixture
def first_entry():
return "a"
@pytest.fixture
def order(first_entry):
return []
@pytest.fixture(autouse=True)
def append_first(order, first_entry):
return order.append(first_entry)
def test_string_only(append_first, order, first_entry):
# Assert
assert order == [first_entry]
def test_string(order):
assert order == ["a"]
正常情况下,test_string 引用的 order 是 [], 但是通过将 append_first 设置成自动使用,在所有的测试函数执行之前,会先执行 append_first 这时 order 就变成了 ["a"],之后所有测试函数引用 order 时,order的初始值都是 ["a"].
作用域
上面的例子可以看出,在每个测试函数中,order都对应不同的实例,这是因为 fixture 的默认作用域是 "function", 当该函数结束时,该 fixture 对应的实例就会被销毁。fixture 的作用域有以下几种:
- function:默认作用域,当前测试函数结束时被销毁;
- class:在该 class 的最后一个测试函数结束时,该 fixture 实例被销毁;
- module:在当前 module (当前文件)的最后一个测试函数结束时,该 fixture 实例被销毁;
- package: 在当前 package 的最后一个测试函数结束时,该 fixture 实例被销毁;
- session:当前会话,所有的测试函数都使用同一个实例;
在进行网络、数据库操作时,创建连接往往好用大量的资源,这是就可以将作用域根据实际情况适当扩大。
拆除 fixture
在我们运行测试时,我们希望确保它们在自己完成之后进行清理,这样它们就不会扰乱其他测试(也不会留下大量的测试数据来膨胀系统)。pytest中的fixture提供了一个非常有用的拆卸系统,它允许我们为每个fixture定义必要的特定步骤,以便在它们自己之后进行清理。
yield fixtures
使用这些fixture,我们可以运行一些代码并将一个对象传回请求fixture/test,就像使用其他fixture一样。唯一的区别是:
- return被换成了yield。
- 该fixture的拆卸代码位于生成之后。
一旦pytest为fixture确定了一个线性顺序,它将运行每个fixture,直到它返回或产生,然后移动到列表中的下一个fixture来做同样的事情。
测试完成后,pytest将返回fixture列表,但顺序相反,获取每个产生的fixture,并在其中运行yield语句之后的代码。
作为一个简单的例子,考虑这个基本的电子邮件模块:
# content of emaillib.py
class MailAdminClient:
def create_user(self):
return MailUser()
def delete_user(self, user):
# do some cleanup
user.inbox.clear()
pass
class MailUser:
def __init__(self):
self.inbox = []
def send_email(self, email, other):
other.inbox.append(email)
def clear_mailbox(self):
self.inbox.clear()
class Email:
def __init__(self, subject, body):
self.subject = subject
self.body = body
# content of test_emaillib.py
import pytest
from emaillib import Email, MailAdminClient
@pytest.fixture
def mail_admin():
return MailAdminClient()
@pytest.fixture
def sending_user(mail_admin):
user = mail_admin.create_user()
yield user
mail_admin.delete_user(user)
@pytest.fixture
def receiving_user(mail_admin):
user = mail_admin.create_user()
yield user
mail_admin.delete_user(user)
def test_email_received(sending_user, receiving_user):
email = Email(subject="Hey!", body="How's it going?")
sending_user.send_email(email, receiving_user)
assert email in receiving_user.inbox
因为receiving_user是安装期间运行的最后一个fixture,所以它是拆卸期间运行的第一个fixture。
处理yield fixture的错误
如果yield fixture在yield之前引发异常,pytest将不会尝试在该yield fixture的yield语句之后运行拆卸代码。但是,对于已经为该测试成功运行的每个fixture, pytest仍然会像正常情况一样试图将它们删除。
# content of test_emaillib.py
import pytest
from emaillib import Email, MailAdminClient
@pytest.fixture
def setup():
mail_admin = MailAdminClient()
sending_user = mail_admin.create_user()
receiving_user = mail_admin.create_user()
email = Email(subject="Hey!", body="How's it going?")
sending_user.send_email(email, receiving_user)
yield receiving_user, email
receiving_user.clear_mailbox()
mail_admin.delete_user(sending_user)
mail_admin.delete_user(receiving_user)
def test_email_received(setup):
receiving_user, email = setup
assert email in receiving_user.inbox
以上 2 段的测试代码的处理逻辑一致,第 2 个版本更紧凑,但也更难阅读,没有一个非常描述性的fixture名称,而且没有一个fixture可以很容易地重用。
还有一个更严重的问题,即如果设置中的任何一个步骤引发异常,则所有的销毁代码都不会运行。
最安全、最简单的fixture结构要求限制每个fixture只做一个状态更改操作,然后将它们与拆卸代码捆绑在一起,如第 1 个例子所示。
参数化 fixture
import pytest
@pytest.fixture(params=[1, 2], ids=["first", "second"])
def args(request):
return request.param
def test_args(args):
assert args == 1
可以通过以上方式参数化 fixture,会依次执行 params 中的每个参数。
备注:
- 参数需要是 request,可以通过 request.param 返回其参数;
- 可以通过 ids 为 params 中的每个参数对应的实例赋予 ID;
使用 usefixture 在类和模块中自动使用 fixture
前面介绍了通过参数 autouse 设置 fixture 是否自动执行,但是如果该 fixture 只需要在测试某个类或模块时自动执行,在测试其他类和模块时不需要自动执行,就不能使用 autouse 设置自动执行。该种情况可以使用 usefixture 进行设置。
# content of conftest.py
import os
import tempfile
import pytest
@pytest.fixture
def cleandir():
with tempfile.TemporaryDirectory() as newpath:
old_cwd = os.getcwd()
os.chdir(newpath)
yield
os.chdir(old_cwd)
# content of test_setenv.py
import os
import pytest
@pytest.mark.usefixtures("cleandir")
class TestDirectoryInit:
def test_cwd_starts_empty(self):
assert os.listdir(os.getcwd()) == []
with open("myfile", "w") as f:
f.write("hello")
def test_cwd_again_starts_empty(self):
assert os.listdir(os.getcwd()) == []
通过在类定义前面使用 @pytest.mark.usefixtures("cleandir")
,该类下的所有测试函数执行前都会自动执行 cleandir。
如果将 usefixture 移动到 test_cwd_starts_empty 定义前,那么 fixture cleandir 只在 test_cwd_starts_empty 执行前被调用。
可以通过 @pytest.mark.usefixtures("cleandir", "anotherfixture")
指定使用多个 fixture。
重写 fixture
如果有同名 fixture,引用顺序为:
- 当前文件中通过 pytest.mark 指定的参数,如 @pytest.mark.parametrize('username', ['directly-overridden-username'])
- 当前文件中的 fixture;
- 父级文件中的 fixture;
- 非参数化 fixture 覆盖 参数化 fixture;
tests/
__init__.py
conftest.py
# content of tests/conftest.py
import pytest
@pytest.fixture(params=['one', 'two', 'three'])
def parametrized_username(request):
return request.param
@pytest.fixture
def non_parametrized_username(request):
return 'username'
test_something.py
# content of tests/test_something.py
import pytest
@pytest.fixture
def parametrized_username():
return 'overridden-username'
@pytest.fixture(params=['one', 'two', 'three'])
def non_parametrized_username(request):
return request.param
def test_username(parametrized_username):
assert parametrized_username == 'overridden-username'
def test_parametrized_username(non_parametrized_username):
assert non_parametrized_username in ['one', 'two', 'three']
test_something_else.py
# content of tests/test_something_else.py
def test_username(parametrized_username):
assert parametrized_username in ['one', 'two', 'three']
def test_username(non_parametrized_username):
assert non_parametrized_username == 'username'
conftest.py 表示该模块的 fixture,执行 pytest 测试时,会自动从该文件中获取 fixture,并且不会从其他文件中获取,比如测试 test_something_else.py 时,就无法获取 test_something.py 中定义的 fixture parametrized_username,non_parametrized_username,只能获取 conftest.py 中的 fixture parametrized_username,non_parametrized_username。因为可以在以下位置写 fixture:
- 当前文件;
- 当前 module 中的 conftest.py;
- 上一级 module 中的 conftest.py;
处理异常
def myfunc():
raise ValueError("Exception 123 raised")
def test_zero_division():
with pytest.raises(ValueError):
myfunc()
如果执行 statement, 抛出 ZeroDivisionError 的异常, 则视为通过测试; 反之则视为未通过测试.
处理进度
可使用tqdm
输出进度, 前提是明确总工作量(如总函数), 具体用法为:
from tqdm import tqdm
for i in tqdm(range(100), desc="Processing", mininterval=1):
time.sleep(0.05)
同时, tqdm
也可以与 Python 中的其他迭代函数结合使用, 如 enumerate, zip, map
字符串格式化输出
-
当数值很接近于 0 时(如 1E-18), 怎么才能让其表示为
0.0000000000001
, 而不是科学计数法.当数值很接近0时, 用科学计数法表示, 但这只是一种表示方法, 该变量的值并没有变, 可以考虑使用
format
将其转换成字符串, 这样在显示或输出的时候就可以完整显示了.format()
的更多用法可以参考官网."{:18f}".format(Decimal(10)/(10**18)).lstrip()
: 至少保留18位, 当数值的实际长度大于18位, 完整显示; 小于18位时, 左侧使用空格填充."{:.18f}".format()
: 保留18位小数, 如果小数位大于18位, 会截断; 如果小于, 右侧使用0填充;":^/</>".format()
:^, <, >
: 分别表示 居中/左/友对齐;"{:+/-f}".format()
: 将该数值的 +/- 也显示出来;"int:{0:d}, hex:{0:x}, oct:{0:o}, bin:{0:b}".format(42)
: 将 42 分别转换为十六进制, 八进制和二进制.
if 判断
空列表 ([]
), 空字符串 (""
), 空字段 ({}
) , None, 0 放在 if
条件中, 都会视为 FALSE.
多线程变量共享
Python 多线程编程时, 可通过 queue 模块在不同线程之后共享变量, queue 本身就具有线程安全.
queue 主要包括以下 3 种数据结构:
- Queue: first in, first out 模型的队列;
-
LifoQueue
: Last in, first out 模型的队列; -
PriorityQueue
: 带有优先级的队列, 每次插入数据, 都需要指定一个优先级 (priority_number, data), 取数据的时候, 优先返回优先级较低的数.
最佳实践
import threading
import queue
import time
def worker(q: queue.Queue):
"""消费者"""
while True:
item = q.get()
print(f"Working on {item}")
print(f"Finished {item}")
q.task_done()
def producer(q: queue.Queue):
"""生产者"""
# Send thirty task requests to the worker.
for item in range(30):
q.put(item)
time.sleep(0.1)
def main():
# Turn-on the worker thread.
q = queue.Queue()
# 任务完成后会自动退出
threading.Thread(target=producer, args=(q,)).start()
# 将消费者线程设置为 daemon,该线程会和主进程一起退出
threading.Thread(target=worker, args=(q,), daemon=True).start()
# 阻塞主进程,一直到队列里面所有任务都完成。
q.join()
print("All work completed")
if __name__ == "__main__":
main()
网络请求
requests
import json
import requests
url = r'http://192.168.1.20:8332'
headers = {'Content-Type': 'application/json'}
data = {"jsonrpc": "2.0", "id": "id", "method": "omni_gettransaction",
"params": ["6c0b21ed486c15fbf495d77ff75bd34e059b9fc7a964bb5d8305d153c2d4bc56"]}
# 如果需要简单的认证, 可以把账号和密码放入`auth`参数中
# 如果需要传入参数, 可以放入`data`参数, 该参数只接受字符串格式, 如果是data这样的字典, 需要转换成字符串格式;
resp = requests.post(url, auth=(username, password), headers=headers, data=json.dumps(data))
# 如果`data`本身就是字符串格式, 可以直接传入, 如:
data = '{"jsonrpc": "2.0", "id": "id", "method": "omni_gettransaction","params": ["6c0b21ed486c15fbf495d77ff75bd34e059b9fc7a964bb5d8305d153c2d4bc56"]}'
resp = requests.post(url, auth=(username, password), headers=headers, data=data)
和curl
命令的转换:
以上案例对应的curl
命令为:
curl -X POST -H "Content-Type: application/json" --user omni:omni -d '{"jsonrpc":"2.0","id":"id","method":"omni_gettransaction","params":["6c0b21ed486c15fbf495d77ff75bd34e059b9fc7a964bb5d8305d153c2d4bc56"]}' http://192.168.1.20:8332
异步请求
推荐工具: aiohttp.
- 等待多个异步任务完成.
for task in tasks:
await asyncio.wait_for(task, timeout=None)
# 可以使用 以下语句代替
await asyncio.wait(tasks, timeout=None)
日期, 时间, 时区
备注: UNIX 时间戳是指当前的UTC时间相对于 1970-01-01 00:00:00.000(UTC)的秒数(10位整型)或毫秒数(13位), 和时区无关;
将UNIX时间戳转化为 datetime
类型:
- UTC 时间:
dt.datetime.utcfromtimestamp()
; - 当前时区的时间:
time.localtime(), dt.datetime.fromtimestamp()
将 datetime
类型转化为时间戳: dt.datetime(2022,1,1).timestamp()
将字符串转化为UTC时间: dt.datetime(*(time.strptime('yyyy-mm-dd', '%Y-%m-%d')[:6]), tzinfo=dt.timezone.utc)
生成指定时区的指定时间: dt.datetime(2022, 1, 1, tzinfo=dt.timezone(dt.timedelta(hours=8)))
;
生成UTC时区的指定时间: dt.datetime(2022, 1, 1, tzinfo=dt.timezone.utc)
文件读写
open, close
open()
: 打开文件;
close()
: 关闭文件;
如果在执行 close()
之前发生异常, fw.write(), fw.writelines()
要写入的数据可能会有部分没有正确写入. 所以推荐使用with open() as f:
, 使用上下文管理器, 哪怕程序发生异常, 也会自动调用close()
正确关闭文件.
备注: 使用with open() as f
: 会使程序的嵌套关系增加一层 (Python 允许的最大嵌套层数为20层), 所以如果要同时打开很多个文件, 也可以采用open(), close()
打开和关闭文件.
buffering
使用 open
开发文件时, 有个参数buffering
表示打开文件所使用的缓冲策略, 默认是 4096 bytes(4KB), 可使用os.stas()
查看.
- 读取或写入二进制(bites)时, 默认关闭缓冲, 此时
buffering=0
; 也可以通过设置buffering
打开缓冲. -
buffering=1
: 表示只缓冲一行数据; -
buffering>1
: 表示缓冲的 byte 数, 如默认为 4096, 表示 4096 bytes.
备注: 一般情况下, buffering 越大, 读写效率越高, 但是内存占用也会越大.
write(), writelines()
write()
表示向文件中写入一个字符串类型的数据;
writelines()
表示向文件中写入一个可迭代数据, 其中的每个元素都需要是字符串. 如果每次写入的内容大小设置地合适, 效率会比 write()
稍微高一些. 写入 2 亿行数据时, 如果每次写入 1 千万行, 耗时 48 S, 使用write()
逐行写入(其他设置相同)耗时为 53 S. 代价是占用的内容比使用write()
逐行写入大很多.
结论: 写入文件时, 建议结合使用write()
和buffering
, 根据机器的内存调节buffering
.
difflib 对比2个文件的差异
对比2个文件是否一致时, 可以使用标准库 difflib, 具体用法可参考以下代码:
对比之后, 如果2个文件一样, 对比结果是空列表; 否则会返回差异部分.
import difflib
import sys
with open("node_flow/test/1.txt", "r") as f1, open("node_flow/test/2.txt", "r") as f2:
text1 = f1.readlines()
text2 = f2.readlines()
result = list(difflib.context_diff(text1, text2, fromfile="原文件", tofile="新文件"))
if result:
sys.stdout.writelines(result)
else:
print("2个文件一致")
制作单个可执行文件
pyinstaller
- 进入容器:
docker run --rm -it -v $(pwd):/work -w /work -u pilot modicn/pyinstaller:py2.7-centos7 bash
. - 安装需要的依赖:
pip install -r requirements.txt
- 将脚本及其依赖打包成一个可执行文件:
pyinstaller --onefile xxx.py
.-
--onefile
: 表示把生成的所有可执行文件都打包成一个文件, 默认会放到一个文件夹内; - 如果需要打包多个脚本, 生成的可执行文件名默认使用第一个脚本的名称. 执行时也会先执行该脚本.
-
Pyinstaller 的常用参数:
-
--distpath
: dist 目录位置, 默认 ./dist; -
--workpath
: build 目录位置, 默认 ./build; -
--clean
: 删除缓存和临时文件; -
-D, --onedir
: 将生成的可执行文件放入一个文件夹 -
-F, --onefile
: 将生成的可执行文件打包成一个可执行文件. -
-n, --name
: 生成的可执行文件/文件夹的名称, 默认和脚本名称一致, 如果有多个脚本, 和第一个脚本一致.
常用文件类型:
CSV TSV 的对比:
CSV 采用逗号作为分隔符;
TSV 采用 \t
作为分隔符;
输出 csv 文件时, 最好使用以下格式:
name age city fav_colors
John 27 ["London", "Paris", "New York"] {"youth": "red", "teenager": "blue", "now": "green"}
-
使用
\t
, 而非,
作为分隔符; -
使用双引号, 而非单引号, 双引号可以直接使用
json.loads()
, 单引号不行.2 者在使用 csv 模块读数据没有区别.
如果使用
for line in f, line.rstrip().split('\t')
读取时, 就会有区别:- 当一个字段中包含多个元素, 且每个元素之间使用
,
分割时, 会被分成多个元素. - 每一行使用
\t
分割之后, 可以针对性的进行反序列化(json.loads()
).
备注: 在读取 csv/tsc 文件时, 有以下几种方式:
-
for line in f, line.rstrip().split('\t')
: 性能最好, 但是如果某个字段内容中包含\t
就会误判; -
csv.reader()
: 即便某个字段内容中包含\t
也不会误判, 分割之后需要按索引获取相应的值; -
csv.DictReader()
: 即便某个字段内容中包含\t
也不会误判, 分割之后可以按 键(标题) 获取相应的值, 性能最差. -
性能测试: 测试数据: 1341 万行, 4.3 G;
-
测试结果:
- 直接使用
for row in f row = row.split('\t')
, 耗时: 28 S; - 使用
csv.reader()
耗时: 58 S; - 使用
csv.DictReader()
耗时: 72 S;
- 直接使用
-
测试项: 批量写入 VS 一次性写入:
- 测试数据: 1341 万行, 4.3 G;
- 一次性写入: 耗时: 117 S;
- 批量写入(每次 1 万行): 耗时: 189 S;
- 写入时的原则可能就是, 如果能一次性写入, 就一次性全部写入.
- 当一个字段中包含多个元素, 且每个元素之间使用
-
输出方法:
-
将字段的各个值序列化, 用
\t
连接各个元素.mydata = { "name": "John", "age": 27, "city": ["London", "Paris", "New York"], "fav_colors": {"youth": "red", "teenager": "blue", "now": "green"}, } with open("./test/mydata.csv", "w") as f: w = csv.DictWriter( f, mydata.keys(), delimiter="\t", ) f.write("\t".join([json.dumps(key) for key in mydata.keys()]) + "\n") f.write("\t".join([json.dumps(item) for item in mydata.values()]) + "\n")
该方法能保证获得的同一个 dict 的keys 和 values 相对应, 但是无法保证相同结构的其他 dict 的 values 也能和第一个 dict 的 keys 相对应.
-
使用 csv 模块;
mydata = { "name": "John", "age": 27, "city": ["London", "Paris", "New York"], "fav_colors": {"youth": "red", "teenager": "blue", "now": "green"}, } with open("./test/mydata.csv", "w") as f: w = csv.DictWriter( f, mydata.keys(), delimiter="\t", quotechar="'", ) w.writeheader() list_cols = ["city", "fav_colors"] for key in list_cols: mydata[key] = json.dumps(mydata[key]) w.writerow(mydata)
-
推荐使用这种方式:
- 可以保证所有的 value 都可以 表头的 key 顺序保持一致;
- 只序列化集合、列表和字典 类型的列.
- 通过 escapechar="\" 设置 escapechar;
- 应用方式:quote:
- QUOTE_NONE: 对任何字段都不使用 quotechar 应用,如 python,而非 "python";
- QUOTE_MINIMAL: 一般情况下不使用 quotechar,但是遇到 j"ack 这种字符串本身包含 quotechar 的情况,就会写成 "j""ack"
- QUOTE_NONNUMERIC: 对应数值型字段,不使用 quotechar;对应其他类型的字段使用 quotechar,如 23, "jack"
- QUOTE_ALL:对任何字段都使用 quotechar;
推荐设置:(和 pandas 的默认设置一致)
csv.DictWriter(
fw,
fieldnames=["age", "name", "gendor"],
restval="Na",
delimiter="\t",
extrasaction="ignore",
quoting=csv.QUOTE_MINIMAL,
doublequote=True,
)
读取加密的 Excel 文件
使用 msoffcryto-tool
:
from msoffcrypto import OfficeFile
with open(origin_excel_path, 'rb')as f, open(decrypted_excel_path, 'wb')as g:
excel = OfficeFile(f)
excel.load_key(excel_pwd)
excel.decrypt(g)
文件[夹]的移动 复制
文件复制 移动 主要使用 shutil
.
备注: 使用shutil
时, 最好都使用 str 类型表示路径, 因为对于Path()
类型的路径, 有些函数支持, 有些不支持.
文件复制
-
可以使用
shutil
库中的copyFile(), copy(), copy2()
: -
shutil.copyFile(src, dst)
: src, dst 必须是 文件路径; -
shutil.copy(src, dst)
: src 必须是文件路径, dst 可以是文件路径, 也可以是文件夹;- src 必须是文件, 否则报错
IsADirectoryError
; - dst 可以是文件或文件夹; 如果是文件, 就复制或替换; 如果是文件夹, 就将 src 放入该文件夹内;
- 返回值是复制之后的文件路径;
- src 必须是文件, 否则报错
-
shutil.copy2()
: 和shutil.copy()
类似, 但是会尽量复制元信息, 若创建时间, 权限等, 复制过程不会报错; -
shutil.copytree(src, dst, copy_function=copy2, dirs_exist_ok=False)
: 复制文件夹内所有文件和文件夹, copy: 复制时采用的函数, dirs_exist_ok: 当目前文件夹已经存在时的操作;- src, dst 必须是文件夹路径, 并且默认 dst 不存在, 否则报错
FileExistsError
; - 如果 dst 已经存在, 可以设置参数
dirs_exist_ok=True
. 该文件夹内的现有文件不变. - 把 src 下的文件或文件夹全部复制到 dst 下.
- src, dst 必须是文件夹路径, 并且默认 dst 不存在, 否则报错
文件[夹]移动
-
shutil.move(src, dst)
:- src, dst 既可以是文件, 也可以是文件夹;
- 如果 dst 是目录, 就将 src 移动到该目录下.
- 如果 dst 是文件, 并且已经存在, 相当于用新文件覆盖该文件.
文件[夹]删除
-
shutil.rmtree()
: 删除文件夹;- 如果文件夹不存在, 会报错
FileNotFoundError
. - 可通过设置参数
ignore_errors=True
, 去掉报错;
- 如果文件夹不存在, 会报错
-
Path(file_path).unlink()
:- 删除文件, 如果文件不存在, 会报错
FileNotFoundError
; - 可设置参数
missing_ok=True
, 即便文件不存在也不会报错;
- 删除文件, 如果文件不存在, 会报错
路径相关
主要使用 pathlib.Path()
.
Path()
是一种数据类型, 可通过Path("path/to/file")
将字符串类型的路径转换为Path()
类型的路径.
可通过str(Path())
或Path().__str__()
将 Path()
类型的路径转化为 字符串 类型的路径.
-
Path().joinpath("path1", "path2")
: 将多级目录串起来, 注意不能以/
开头, 否则会被识别为 绝对路径; 可以以/
结尾, 和不以/
结尾效果一样. -
Path().absolute()
: 该路径的绝对路径, 但不能解决如../../
的嵌套关系; -
Path().resolve()
: 返回绝对路径, 可以解决如../../
的嵌套关系; -
Path().__str__()
: 以字符串类型返回当前路径, 等同于str(Path())
; -
Path().parts()
: 列表, 每个元素表示路径的组成元素; -
Path().parents
: 列表, [0]是父级路径, [-1]是根目录; -
Path().parent()
: 父级目录; -
Path().stats()
: 返回该路径执行的文件的信息,os.stats_result
类型. -
Path().name()
: 文件名(包括后缀) -
Path().suffix()
: 文件后缀 -
Path().stem()
: 文件名(不包含后缀) -
Path().relative_to(A)
: 相对于A的相对路径; -
Path().home()
: 当前用户的根目录(/home/guang
), 如果是 root 用户/root
-
Path().cwd()
: 当前文件所在文件夹路径; -
Path().exists()
: 该路径是否存在; -
Path(src).rename(dst)
: 移动或重命名 src 至 dst;- src, dst 可以是文件或目录, 但是必须保持一致;
- 如果 src, dst 都是文件, 会自动覆盖(如果 dst 已经存在);
- 如果 src, dst 都是目录 , dst 必须不存在或为空, 否则报错
OSError: [Errno 39] Directory not empty
.
-
Path().is_dir()
: 是否是目录, 如果该路径下的文件不存在, 也会返回 False; -
Path().is_file()
: 是否是文件, 如果该路径下的文件不存在, 也会返回 False; -
Path().touch(mode, exist_ok)
: 在指定路径下创建文件. -
Path().mkdir()
: 创建文件夹;-
exist_ok=True
: 如果文件夹已经存在, 也不会报错; 否认会报错FileExistsError
. -
parents=True
: 如果父级目录不存在就创建父级目录; 否则会报错FileNotFoundError
.
-
-
Path().iterdir()
: 对该文件夹下所有的文件[夹]进行遍历; -
Path().unlink()
: 删除文件;- 删除文件, 如果文件不存在, 会报错
FileNotFoundError
; - 可设置参数
missing_ok=True
, 即便文件不存在也不会报错;
- 删除文件, 如果文件不存在, 会报错
执行 shell 命令
很多时候需要在脚本中执行一个或多个 shell 命令, 可以使用subprocess.run()
.
如果想在 Python 脚本中执行 ps aux
, 如果直接使用subprocess.run("ps aux")
, 会遇到以下报错:
>>> subprocess.run("ps aux")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.8/subprocess.py", line 493, in run
with Popen(*popenargs, **kwargs) as process:
File "/usr/lib/python3.8/subprocess.py", line 858, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "/usr/lib/python3.8/subprocess.py", line 1704, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'ps aux'
需要将命令拆分成 list, 如 subprocess.run(["ps", "aux"])
便可以正常执行.
当不知道该怎么拆分命令时, 一个简单的方法就是把命令的每个参数都分开, 如 调用 bitcoinetl
下载交易数据, 在shell
中执行的完整的命令是:
bitcoinetl export_blocks_and_transactions --start-block 500000 --end-block 500010 -p http://user:pwd@10.1.1.20:8332 --chain bitcoin --blocks-output blocks.csv --transactions-output transactions.csv
可以拆分成:
cmd = [
"bitcoinetl",
"export_blocks_and_transactions",
"--start-block",
f"{batch_start_block}",
"--end-block",
f"{batch_end_block}",
"-p",
f"http://{node_user}:{node_pwd}@10.1.1.20:8332",
"--blocks-output",
f"{block_path}",
"--transactions-output",
f"{transaction_path}",
]
备注:
- 如果不想把一个完整命令分开, 可以传入参数
shell=True
表示该命令在一个 shell 中执行.
自动拆分命令
将字符串类型的 shell 命令拆分成 subprocess.run()
可以使用的命令, 可以调用shlex.split
>>> import shlex
>>> shlex.split("ps aux")
['ps', 'aux']
>>> subprocess.run(shlex.split("ps uax"))
shlex.join()
是shlex.split()
的逆变换, 可以将['ps', 'aux']
拼接成ps aux
.
捕获执行结果
如果需要捕捉该命令的执行结果, 可以传入参数capture_output=True
. 返回结果是 subprocess.CompletedProcess
CompletedProcess(
args='bitcoinetl export_blocks_and_transactions --start-block 500000 --end-block 500001 -p http://omni:omni@10.1.1.20:8332 --chain bitcoin --blocks-output blocks.csv --transactions-output transactions.csv',
returncode=0,
stdout=b'',
stderr=b'2022-06-08 11:18:30,073 - ProgressLogger [INFO] - Started work. Items to process: 2.\n2022-06-08 11:18:56,978 - ProgressLogger [INFO] - 1 items processed. Progress is 50%.\n2022-06-08 11:18:59,760 - ProgressLogger [INFO] - 2 items processed. Progress is 100%.\n2022-06-08 11:18:59,760 - ProgressLogger [INFO] - Finished work. Total items processed: 2. Took 0:00:29.686959.\n2022-06-08 11:18:59,761 - CompositeItemExporter [INFO] - block items exported: 2\n2022-06-08 11:18:59,762 - CompositeItemExporter [INFO] - transaction items exported: 5346\n'
)
有以下属性:
- args: list 或 str. 执行的命令;
- returncode: 子进程的退出代码, 0: 表示成功执行. 其他代码表示为执行成功;
- stdout: 标准输出, 一般 print 的输出都是 stdout. 如果执行
subprocess.run()
时带上参数stderr=subprocess.STDOUT
, stderr 会和 stdout 合并到 stdout. - stderr: 标准错误输出, 一般 logging 日志属于 stderr.
- check_returncode(): 如果 returncode 不是 0, 就会抛出异常 CalledProcessError.
将 stdout, stderr 由二进制改为 utf-8 编码
观察subprocess.CompletedProcess
可以看出 stdout, stderr
是二进制数据, 如果想转换成 utf-8 编码, 可以加上参数 text=True
. 结果如下:
CompletedProcess(
args='bitcoinetl export_blocks_and_transactions --start-block 500000 --end-block 500001 -p http://omni:omni@10.1.1.20:8332 --chain bitcoin --blocks-output blocks.csv --transactions-output transactions.csv',
returncode=0,
stdout='',
stderr='2022-06-08 11:42:24,429 - ProgressLogger [INFO] - Started work. Items to process: 2.\n2022-06-08 11:42:37,948 - ProgressLogger [INFO] - 1 items processed. Progress is 50%.\n2022-06-08 11:42:37,955 - ProgressLogger [INFO] - 2 items processed. Progress is 100%.\n2022-06-08 11:42:37,956 - ProgressLogger [INFO] - Finished work. Total items processed: 2. Took 0:00:13.526463.\n2022-06-08 11:42:37,956 - CompositeItemExporter [INFO] - block items exported: 2\n2022-06-08 11:42:37,957 - CompositeItemExporter [INFO] - transaction items exported: 5346\n'
)
将 stderr, stdout
写入文件
with open('command.out', 'w') as stdout_file:
process_output = subprocess.run(['date', '+%a'], stdout=stdout_file, stderr=subprocess.PIPE, text=True)
抛出异常
如果执行的命令有错, 默认会将报错记录到 stderr
, 不会直接报错. 如果想要脚本直接抛出异常, 可以设置参数check=True
.
>>> subprocess.run(['date', '%a'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=True)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.8/subprocess.py", line 516, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['date', '%a']' returned non-zero exit status 1.
会抛出异常 subprocess.CalledProcessError
. 这时便可以更好地处理异常:
import subprocess
try:
process_output = subprocess.run(['date', '%a'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=True)
except subprocess.CalledProcessError:
print("Error detected while executing the command")
执行多条命令
在 shell 中可以使用 PIPE 连接多条命令, 那么在 Python 脚本中该怎么执行多条命令呢? 有 两种方法:
-
设置参数
shell=True
:subprocess.run("ps aux | grep python", shell=True)
-
使用
input, stdout, stderr
:import subprocess ps_cmd = subprocess.run(['ps', '-aux'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) print("wc_cmd object: {}".format(ps_cmd.__dict__)) grep_cmd = subprocess.run(['grep', 'python'], input=pc_cmd.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) print("awk_cmd object: {}".format(grep_cmd.__dict__)) print("The ouput of the command is: {}".format(grep_cmd.stdout.decode()))
pandas
连接 SQL 数据库
从 SQL 数据获取数据, 或向其中导入数据, 可配合 sqlalchemy 使用. 如果数据量比较大, 可以通过参数 chunksize 控制每次读取/写的数据量.
from sqlalchemy import create_engine
import pandas as pd
engine = create_engine(
"mysql+pymysql://root:root@10.1.1.77:3306/chainadmin", echo=False
)
sql_state = "select * from chainadmin.mining_log"
df = pd.read_sql(
sql_state, con=engine, index_col="id", parse_dates=["date"], chunksize=1000
)
df = pd.concat(df)
print(df)
df.to_sql(
"mining_log_bak", engine, schema="chainadmin", if_exists="replace", chunksize=1000
)
备注:
- echo=True: 会打印出与数据库的交互过程, 可在生产环境下设为 False, 调试环境下设为 True.
编码方式
to_csv(encoding='utf-8-sig'): 类似于 UTF8-with-BOM, 即可以使用 Excel 直接打开, 又可以使用 head, tail, less 等命令直接查看.
read_csv(): 使用默认的 utf-8 编码方式也可以正确读取 utf-8-sig 编码的文件.
对 2 列集合或列表进行合并
data = [
{"class": "first", "students": [1, 2, 3], "rank": [1, 2, 3]},
{"class": "second", "students": [4, 5, 6], "rank": [4, 5, 6]},
]
df = pd.DataFrame(data)
for student, rank in df[["students", "rank"]].to_numpy():
student.append(rank)
print(student, rank)
SettingWithCopyWarning
链式索引(chained indexing)与 loc
取数时
>>> dfmi
>>>
one two
first second first second
0 a b c d
1 e f g h
2 i j k l
3 m n o p
# 链式索引
dfmi['one']['second']
dfmi.loc[:, ('one', 'second')]
以上两种操作的效果一样, 但是第二种方式 (loc) 效率更高.
赋值时
dfmi.loc[:, ('one', 'second')] = value
# 对应的底层操作是:
dfmi.loc.__setitem__((slice(None), ('one', 'second')), value)
dfmi['one']['second'] = value
# 对应的底层操作是:
dfmi.__getitem__('one').__setitem__('second', value)
结论:
- 使用第一种方式(loc)可以确保赋值正确.
- 第二种情况的结果无法预测, 和内存有关.
案例分析
chunk = chunk.loc[chunk["type名称"] == msg_type.upper(), :]
# chunk.query("type名称 == @msg_type.upper()", inplace=True)
chunk.loc[:, "时间"] = pd.to_datetime(
chunk.loc[:, "时间"], errors="coerce", format=r"%Y-%m-%d %H:%M:%S"
)
以上代码是要从原来的 chunk 中筛选出指定类型的消息, 然后将"时间"列转化成 datetime 类型. 在实际执行时依然会有提示(warning), 说转换"时间"列的语句有SettingWithCopyWarning
. 但是从语句上来说, 该语句并没有使用 chained indexing, 那为什么还会有这样的提示呢?
根本原因在于筛序指定类型消息的语句chunk = chunk.loc[chunk["type名称"] == msg_type.upper(), :]
, 这里虽然将筛选后的结果也命名为 chunk, 但是 pandas 会认为筛选后的 chunk 是原 chunk 的 view, 因此对筛选后的 chunk 的修改(将"时间"列修改为 datetime 类型)并不会反映到原 chunk, 所以会有提醒.
修改建议: 当需要对 DataFrame 做筛选时, 可以使用query()
. 筛选出指定消息类型的语句可替换为chunk.query("type名称 == @msg_type.upper()", inplace=True)
.
备注: 当需要在query()
的查询语句中使用@, 更多用法可以参考 pandas 中 query 章节.
删除索引中的重复值
df = df[~df.index.duplicated()]
to_csv
- 可以通过参数 float_format 控制 float 类型的数据在输出至文件时的保存格式:float_format='{:.2f}'.format, 可设置为只保留2位小数。
常用操作
pd.read_csv():
- header: None: 没有标题. 如果有, 使用默认即可.
- sep: 列之间的分隔符;
- usecols: 只加载需要的列;
- dtype: 预定义每列的数据类型, pd.StringDtype()
pd.to_datetime(): 将字符串或Unix时间戳转换成 datetime 类型
- unit: 如果 Unix 时间戳(如 18998080)转换成可读的字符串, 可以是 "s" "ms"
- format: 字符串格式 "%Y-%m-%d %H:%M:%S"
- errors: 'coerce', 如果
pd.to_numeric
- errors: 'coerce'; 如果转换过程中出现异常, 就用 pd.NaN 代替.
df["源IP"] == srcip: 筛选 源IP 等于 srcip 的项
df['时间'].between(left, right): 筛选在指定范围内的数据
df['时间'].size: 判断满足条件的数据的条数.
df.sort_values("时间").iloc[0]: 对筛选结果在 "时间"列上升序排列, 并取第一个.
# 筛选出满足时间范围的数据
df = df[df['时间'].between(left, right)]
# 筛选出满足其他条件的数据
cond = (df['源IP']==src_ip) & (df['目的IP']==dst_ip) & ....
df = df[cond]
data_size = df['时间'].size
if data_size == 0:
# 没有符合条件的数据
pass
elif data_size ==1:
# 只有一条符合条件的数据
packet_time = df['时间'].iloc[0]
else:
packet_time = df.sort_values("时间")['时间'].iloc[0]
jupyter
同一个 cell 中多个输出
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
网友评论