如何在&Python中写出更好的科学代码?
任何科学工作的一大部分都在于编写代码。无论是典型的机器学习建模、分析,还是对数据项目的贡献,都有很大一部分时间用于新功能的原型设计。由于是探索性的,预计程序的几个部分将被替换或修改,往往超出最初的计划。
与 "消费者软件 "不同,变化往往不是由客户的要求引起的,而是由过程中的结果引起的。由于这个原因,如果实验证据表明有不同的路径,以不需要 "完全重建 "的方式来设计它是非常有价值的。
编写科学代码时有两个(额外的)具体挑战。第一个是与数学错误有关。计算中的错误往往很难追踪,特别是当代码在语义上是正确的时候。没有bug被发现。没有异常被提出。一切看起来都很好,但(数字)结果是错误的。特别是,在实现概率模型时,结果有时可能看起来很好,这取决于一些初始条件或随机因素。
第二个问题来自前面描述的事实。总是会有实验部分,而且它们一直在变化。因此,关键是要设计好每一个部件,使大部分的工作能够留下来,并作为下一阶段发展的坚硬基础。
在这篇文章中,我们重点讨论可以使代码更健壮、更易理解、总体上更容易处理的模式。你将看到简单的改进如何导致更多的可重用代码和更少的错误。
玩具任务
为了演示,我们的任务是计算一个随机过程的结果的期望值。在数学上,它可以归结为这个公式:E[X]=∑i∈XXip(Xi)
其中p(X)是一个概率质量函数(PMC),或者对于一个连续变量:E[X]=∫Xxp(x)dx
其中p(x)是一个概率密度函数(PDF)
正如你可能想知道的那样,这里的挑战是,你希望它能与任何分布一起工作:连续或离散。或者,如果不是似是而非,你至少希望认识到问题的性质,这样你就可以顺利地调整代码。
糟糕的代码--起点
让我们从一个不那么好的代码开始。假设你想掷六面骰子。在每个结果的可能性都相同的情况下,它归结为计算取样结果的平均值。
import random
import statistics
def die(sides = 6):
return random.randint(1, sides)
def expected_value(n_samples):
samples = [die() for _ in range(n_samples)]
return statistics.mean(samples)
这段代码有什么问题?有几个问题...
首先,die函数每次返回一个样本。它需要被调用N次才能得到N个样本,这很慢。
其次,expected_value函数强烈依赖于产生样本的die函数。这是显而易见的,一旦你考虑使用不同的模具,例如12面的模具。在这种设计下,你需要 "打开 "expected_value来接受一个额外的参数边,只是把它传递给die来扩展它,用于更普遍的情况。
虽然这样做可行,但它使 expected_value 的界面变得不直观,但该解决方案仍然依赖于使用 die 作为样本的来源,因此很难考虑其他分布。
补救措施...
让我们考虑一下你有哪些选择。
想法#1
你可以使用一个外部变量来存储样本。
def expected_value(samples):
return statistics.mean(samples)
samples = [die(12) for _ in range(n_samples)]
ev = expected_value(samples)
这很明显,但你只是把问题转移到了其他地方......现在,样本变成了一个新的实体,存储数据(甚至是非常大的数据),而且它是相当匿名的。expected_value函数期望收到它,但准备它是你的事。
想法#2
另一种方法是把模子放在里面,把它作为一个对象传给 expected_value。
from functools import partial
twelve_sided_die = partial(die, 12)
def expected_values(die, n_samples):
samples = [die() for _ in range(n_samples)]
return statistics.mean(samples)
ev = expected_values(twelve_sided_die, 10000)
这个想法使用了一个准备好的Die "版本",并使expected_value使用它作为样本的来源。然而,一个新的问题出现了。expected_value只与die兼容。它不能用任何其他的 "样本发生器 "来计算结果,或者至少不能保证它能正确计算。
思路三
第三个想法是在更抽象的层次上认识问题,设计更好的接口。
在抽象的层面上,我们有两个概念。
存在一个概率分布,我们可以从中取样。(它可以是骰子、硬币、正态分布--并不重要)。
存在一个消耗和转换数据的数学运算。(例如,计算平均值、方差等)。
让我们更多地关注我们如何建立正确的抽象,以及如何控制它们的相互兼容性。
分布(数据)
在数学上,概率分布可以是函数--连续或离散的,有限或无限的,我们可以从中抽取样本。 这个函数的 "处方 "可以根据问题的不同而有很大不同。我们可以使用一个 "现有的 "公式,如高斯或泊松分布,但它也可以是一个 "自定义 "的创造,例如从直方图中导出。
考虑到这一点,让我们建立以下的抽象概念。
from abc import ABC, abstractmethod
class Distribution(ABC):
@abstractmethod
def sample(self):
...
实施
由于@abstractmethod的存在,我们的分布强制要求我们对任何从这个抽象派生出来的子类实现样本方法。对于我们的模具,这可以是。
import numpy as np
class Die(Distribution):
def __init__(self, sides = 6):
self.sides = sides
def sample(self, n_samples):
return np.random.randint(1, self.sides + 1, size=n_samples)
在这里,样本的传递是通过调用专门用于掷出公平的骰子的方法sample来进行的。Die(12).sample(10000)
此外,由于numpy的存在,我们可以通过用np.ndarray代替list comprehension来快速创建大量的数据。事实上,事情还可以进一步改进。
目前,调用Die()会返回类似这样的结果 <main.Die at 0x7f43f4448400>,这并不具有参考价值。因为Die() == Die() 的结果是假的,对于Python来说,它们是同一个类的两个不同对象实例。为了解决这个问题,我们需要再实现两个方法。
def __repr__(self):
return f"{self.__class__.__name__}(sides={self.sides})"
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.sides == other.sides
return False
repr 方法使对象的渲染变得漂亮,而 eq 只在骰子是 "等边 "的情况下返回 True。
数据类
每次实现这四个方法都会很乏味。此外,目前 Die 的实现并不能防止改变一个对象,即使是意外的,通过将属性分配给一个现有的对象,如 die.sides = 20。
因此,我们可以使用python的数据类重新定义Die类。
from dataclasses import dataclass
@dataclass(frozen=True)
class Die(Distribution):
sides: int = 6
def sample(self, n):
return np.random.randint(1, self.sides + 1, size=n)
这个例子的行为与之前的例子相同。此外,设置frozen=True,给die.sidi分配一个新的值会引发一个异常。如果我们想要一个新的die,我们应该创建一个新的对象。
现在,我们的expected_value函数可能会将die作为一个分布对象,并通过调用其sample方法来进行计算。
def expected_value(distribution, n):
return distribution.sample(n=n).mean()
类型
上面的例子很简洁。我们清楚地知道 expected_value 的作用,而且很容易测试。然而,一个n边的骰子并不是我们可能想要计算的唯一分布的期望值。例如,对于抛硬币来说,结果不是数字的(除非我们建立一个惯例并坚持下去)。自然,提供一些提示,说明哪些接口可以一起使用,以及如何使用,是有意义的。
对于像python这样的动态类型的语言,你并不被迫坚持变量的类型。然而,通过各种IDE和工具,如mypy,类型化可以帮助你发现潜在的故障点,使代码更加透明。
让我们使用类型化来重新定义我们的类,并创建两个更具体的分布。
from typing import Generic, Sequence, TypeVar
D = TypeVar("D")
class Distribution(ABC, Generic[D]):
@abstractmethod
def sample(self, n: int) -> Sequence[D]:
...
@dataclass(frozen=True)
class Die(Distribution[int]):
sides: int = 6
def sample(self, n: int) -> Sequence[int]:
return np.random.randint(1, self.sides + 1, size=n)
@dataclass(frozen=True)
class Coin(Distribution[str]):
outcomes: tuple = ("H", "T")
fairness: float = 0.5
def sample(self, n: int) -> Sequence[str]:
p = (self.fairness, 1.0 - self.fairness)
return np.random.choice(self.outcomes, size=n, p=p)
@dataclass(frozen=True)
class Gaussian(Distribution[float]):
mu: float = 0.0
sigma: float = 1.0
def sample(self, n: int) -> Sequence[float]:
np.random.normal(loc=self.mu, scale=self.sigma, size=n)
这里发生了几件事。由于D = TypeVar("D"),我们现在可以定义一个新的变量类型,通过它来参数化每个分布的类型。
你可以注意到,Distribution不仅继承了抽象基类,也继承了Generic[D],这使它也变成了一个新的(参数化的)类型。现在,它成为一种身份,构成了一种新的数据类型。
每个版本的sample被期望返回一个特定类型的序列,这对每个单独的分布的上下文是有意义的。这样,我们就有了一个统一的接口,而且是一个参数化的接口。我们可以用它来确保 expected_value 的正确行为。
def expected_value(distribution: Distribution[float | int], n: int) -> float:
return distribution.sample(n=n).mean()
虽然将die = Die()或gaussian = Gaussian()传递给expected_value可以工作(因为int和float都是数字),但将coin = Coin()传递给mypy会被标记出来,说明
> error: Argument 1 to "expected_value" has incompatible type "Coin";
> expected "Distribution[Union[float, int]]"
这可以在我们运行代码之前给我们一个早期警告。
提高纯净度
正如你所看到的,使用类型化设计接口有助于正式确定意图并及早发现错误。你甚至可以通过利用numpy的dtype将其提升到一个新的水平。这样一来,你不仅可以确保不同的元素相互匹配,而且还可以更清楚地了解数据的内存占用情况。比如说:
import numpy.typing as npt
class Distribution(ABC, Generic[D]):
@abstractmethod
def sample(self, n: int) -> np.NDArray[np.generic]:
...
class Die(Distribution[int]):
sides: int = 6
def sample(self, n: int) -> npt.NDArray[np.uint8]:
return np.random.randint(
1, self.sides + 1, size=n, dtype=np.uint8
)
这样,如果die.sample方法返回的数字与严格意义上的无符号8位整数不同,你甚至会被告知。问题是,你是否想走得那么深?这是要考虑的问题。
计算
让我们再来设计一下计算部分。到目前为止,我们已经有了为处理数字分布而准备的expected_value。自然,我们可以计算Die和Gaussian的期望值,但不能计算Coin。在目前的设计中是不行的。
要解决这个问题,有两个选择。
我们可以通过映射创建一个代理分布,例如("H","T")->(0,1),或者
我们在expected_value中加入一个映射,给出一个可能的 "适配器"。
第一种方法创造了一个人工机构,其想法依赖于惯例。它并不能阻止任何人用("H","T")->(1,0)来定义另一个代理,从而导致难以检测的错误。
相反,我们可以修改 expected_value 并给它一个使用自定义适配器的可能性。
def expected_value(
d: Distribution[D],
f: Callable[[D], Any] = lambda x: x,
n: int = 1000
) -> float:
return np.mean(np.apply_along_axis(f, axis=0, arr=d.sample(n)))
expected_value的第二个参数是一个可调用的参数(一个函数),我们可以选择用它来翻译例如Coin()分布的结果。然而,在默认情况下,它将保持结果不变。
die = Die(12)
expected_values(die, n=10000)
gaussian = Gaussian(mu=4.0, sigma=2.0)
expected_value(gaussian, n=100000)
# but
coin = Coin(fairness=0.75)
expected_value(coin, f=lambda x: np.where(x == "H", 1.0, 0.0))
在这里,我们不仅避免了创建一个代理分布,而且还设法避免将 expected_value 与任何特定的数据转换方式联系起来。expected_value函数只做它承诺要做的事情:计算预期值。如果需要任何改编或转换,它将在外部提供。
请注意,在这里我们也有一个选择:我们可以定义一个命名的函数(例如,coin_conversion),以备我们计划重复使用它,或者在单独的定义不增加价值时坚持使用lambda
组合和迭代
抽象化的数学计算被证明是有用的,特别是在设计迭代算法时。很多时候,除了主要的计算之外,我们必须关注一些附带的结果,比如收敛性、早期停止(最大迭代)、度量等等。
让我们以e常数为例。在数学上,我们可以通过以下的极限来获得它的值:e=lim
越高,近似值就越精确。
我们怎样才能充分地解决它呢?
不是最好的方法...
让我们从一个穷举的方法开始,用一个循环。
def approximate_e(
initial_guess: float,
max_iter: int = 10,
epsilon: float = 0.01
) -> float:
e_value = initial_guess
for n in range(1, max_iter + 1):
new_e_value = (1.0 + 1.0 / n) ** n
if abs(new_e_value - e_value) < epsilon:
return new_e_value
e_value = new_e_value
return new_e_value
同样,这种方法有什么问题呢?
首先,这个函数做了三件事,而不是只有一件。第8行是计算的绝对精华。然而,由于有了提前停止和收敛条件,我们留下了大量的代码开销,这些开销与实际计算紧密相连。虽然这两个条件看起来是比较通用的东西,但如果我们选择替换近似的主题(例如替换为平方根),我们就必须复制粘贴这些额外的代码,并确保它不会破坏新的算法。
其次,关于这两个条件的参数化,我们唯一的选择是要么硬编码max_iter和epsilon的值,要么允许用户将它们作为参数提供。这破坏了界面,使测试更加困难。
最后,该算法 "急切地 "生成数据。它不是专注于数学和 "在被要求时 "提供数值,而是向你抛出数据。对于大量的数据,这可能导致内存问题。
计算的抽象化
现在,让我们通过分离不同部分之间的责任来一次解决所有三个问题。我们有三样东西。
我们期望返回正确数字的东西(实际计算)。
如果迭代的次数足够多,就停止这个过程(早期停止)。
如果数值不再有明显的改善(收敛),则需要停止该过程。
from typing import Iterator
import itertools
def approximate_e() -> Iterator[float]:
n = 1
while True:
yield (1.0 + 1.0 / n) ** n
n += 1
def early_stop(
values: Iterator[float], max_iter: int = 100
) -> Iterator[float]:
return itertools.islice(values, max_iter)
def convergence(
values: Iterator[float], epsilon: float = 0.01
) -> Iterator[float]:
for a, b in itertools.pairwise(values):
yield a
if (a - b) < epsilon:
break
这种设计使用了迭代器,它实现了 "懒惰 "加载。数据项只有在被请求时才会被一个一个地返回(因此有关键字yield)。由于这一点,我们(几乎)不用担心内存的问题。
此外,这三个函数中的每一个都可以单独存在。它们有自己特定的接口,可以单独进行单元测试。
最后(也是最重要的),最终的结果可以通过将它们串联起来获得
values = approximate_e()
values = early_stop(values, max_iter=50)
values = convergence(values, epsilon=0.001)
for value in values:
print("e becomes:", value)
较老的python,对于3.10以上的Python,可以编写itertools.pairwise。
def pairwise(values: Iterator[D]) -> Iterator[Tuple[D, D]]:
a = next(values, None)
if a is None:
return
for b in values:
yield a, b
a = b
结论
科学编程带来了额外的挑战。它是一门令人兴奋的学科,但陷阱的数量似乎随着问题的复杂性至少呈四倍增长。
在这篇文章中,我们讨论了在每个科学编程任务中似乎都会反复出现的两个主要部分:数据和计算。希望通过正确的方式对它们进行抽象,你不仅可以提高代码的效率,减少错误,还可以使编码成为更好的体验。
网友评论