Python Cookbook

作者: plutoese | 来源:发表于2017-06-08 09:38 被阅读13次

    1. 数据结构和算法

    1.1 分解记录

    record = ('Dave', 'dave@example.com', '773-555-1212', '847-555-1212')
    name, email, *phone_numbers = user_record
    

    1.2 找到最大或最小的N个元素

    import heapq
    
    nums = [1, 8, 2, 23, 7, -4, 18, 23, 42, 37, 2]
    print(heapq.nlargest(3, nums)) # Prints [42, 37, 23]
    print(heapq.nsmallest(3, nums)) # Prints [-4, 1, 2]
    

    这里可以接受一个参数key。

    portfolio = [
    {'name': 'IBM', 'shares': 100, 'price': 91.1},
    {'name': 'AAPL', 'shares': 50, 'price': 543.22},
    {'name': 'FB', 'shares': 200, 'price': 21.09},
    {'name': 'HPQ', 'shares': 35, 'price': 31.75},
    {'name': 'YHOO', 'shares': 45, 'price': 16.35},
    {'name': 'ACME', 'shares': 75, 'price': 115.65}
    ]
    
    cheap = heapq.nsmallest(3, portfolio, key=lambda s: s['price'])
    expensive = heapq.nlargest(3, portfolio, key=lambda s: s['price'])
    

    1.3 实现优先级队列

    这里的重点是heapq模块的使用。

    import heapq
    
    class PriorityQueue:
      def __init__(self):
        self._queue = []
        self._index = 0
    
      def push(self, item, priority):
        heapq.heappush(self._queue, (-priority, self._index, item))
        self._index += 1
    
      def pop(self):
        return heapq.heappop(self._queue)[-1]
    

    使用这个类

    >>> class Item:
    ...   def __init__(self, name):
    ...     self.name = name
    ...   def __repr__(self):
    ...     return 'Item({!r})'.format(self.name)
    ...
    >>> q = PriorityQueue()
    >>> q.push(Item('foo'), 1)
    >>> q.push(Item('bar'), 5)
    >>> q.push(Item('spam'), 4)
    >>> q.push(Item('grok'), 1)
    >>> q.pop()
    Item('bar')
    >>> q.pop()
    Item('spam')
    >>> q.pop()
    Item('foo')
    >>> q.pop()
    Item('grok')
    

    1.4 在字典中将键映射到多个值上

    from collections import defaultdict
    
    d = defaultdict(list)
    d['a'].append(1)
    d['a'].append(2)
    d['b'].append(4)
    
    d = defaultdict(set)
    d['a'].add(1)
    d['a'].add(2)
    d['b'].add(4)
    

    1.5 与字典有关的计算问题

    prices = {
    'ACME': 45.23,
    'AAPL': 612.78,
    'IBM': 205.55,
    'HPQ': 37.20,
    'FB': 10.75
    }
    
    min_price = min(zip(prices.values(), prices.keys()))
    # min_price is (10.75, 'FB')
    
    max_price = max(zip(prices.values(), prices.keys()))
    # max_price is (612.78, 'AAPL')
    
    prices_sorted = sorted(zip(prices.values(), prices.keys()))
    # prices_sorted is [(10.75, 'FB'), (37.2, 'HPQ'),
    # (45.23, 'ACME'), (205.55, 'IBM'),
    # (612.78, 'AAPL')]
    

    1.6 在两个字典中寻找相同点

    a = {
    'x' : 1,
    'y' : 2,
    'z' : 3
    }
    b = {
    'w' : 10,
    'x' : 11,
    'y' : 2
    }
    
    # Find keys in common
    a.keys() & b.keys() # { 'x', 'y' }
    # Find keys in a that are not in b
    a.keys() - b.keys() # { 'z' }
    # Find (key,value) pairs in common
    a.items() & b.items() # { ('y', 2) }
    

    1.7 从序列中移除重复项且保持元素间顺序不变

    def dedupe(items, key=None):
        seen = set()
        for item in items:
            val = item if key is None else key(item)
            if val not in seen:
                yield item
                seen.add(val)
    
    >>> a = [ {'x':1, 'y':2}, {'x':1, 'y':3}, {'x':1, 'y':2}, {'x':2, 'y':4}]
    >>> list(dedupe(a, key=lambda d: (d['x'],d['y'])))
    [{'x': 1, 'y': 2}, {'x': 1, 'y': 3}, {'x': 2, 'y': 4}]
    >>> list(dedupe(a, key=lambda d: d['x']))
    [{'x': 1, 'y': 2}, {'x': 2, 'y': 4}]
    

    1.8 找出序列中出现次数最多的元素

    words = [
    'look', 'into', 'my', 'eyes', 'look', 'into', 'my', 'eyes',
    'the', 'eyes', 'the', 'eyes', 'the', 'eyes', 'not', 'around', 'the',
    'eyes', "don't", 'look', 'around', 'the', 'eyes', 'look', 'into',
    'my', 'eyes', "you're", 'under'
    ]
    from collections import Counter
    word_counts = Counter(words)
    top_three = word_counts.most_common(3)
    print(top_three)
    # Outputs [('eyes', 8), ('the', 5), ('look', 4)]
    

    1.9 通过公共键对字典列表排序

    rows = [
    {'fname': 'Brian', 'lname': 'Jones', 'uid': 1003},
    {'fname': 'David', 'lname': 'Beazley', 'uid': 1002},
    {'fname': 'John', 'lname': 'Cleese', 'uid': 1001},
    {'fname': 'Big', 'lname': 'Jones', 'uid': 1004}
    ]
    
    from operator import itemgetter
    rows_by_fname = sorted(rows, key=itemgetter('fname'))
    rows_by_uid = sorted(rows, key=itemgetter('uid'))
    

    1.10 排序不支持原生比较的对象

    内置的sorted() 函数有一个关键字参数key ,可以传入一个callable 对象给它,这个callable 对象对每个传入的对象返回一个值,这个值会被sorted 用来排序这些对象。

    class User:
        def __init__(self, user_id):
            self.user_id = user_id
    
        def __repr__(self):
            return 'User({})'.format(self.user_id)
    
    def sort_notcompare():
        users = [User(23), User(3), User(99)]
        print(users)
        print(sorted(users, key=lambda u: u.user_id))
    
        from operator import attrgetter
        print(sorted(users, key=attrgetter('user_id')))
    
        # print(sorted(users, key=attrgetter('last_name', 'first_name')))
    
        print(min(users, key=attrgetter('user_id')))
        print(max(users, key=attrgetter('user_id')))
    
    sort_notcompare()
    #[User(23), User(3), User(99)]
    #[User(3), User(23), User(99)]
    #[User(3), User(23), User(99)]
    #User(3)
    #User(99)
    

    1.11 根据字段将记录分组

    rows = [
    {'address': '5412 N CLARK', 'date': '07/01/2012'},
    {'address': '5148 N CLARK', 'date': '07/04/2012'},
    {'address': '5800 E 58TH', 'date': '07/02/2012'},
    {'address': '2122 N CLARK', 'date': '07/03/2012'},
    {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'},
    {'address': '1060 W ADDISON', 'date': '07/02/2012'},
    {'address': '4801 N BROADWAY', 'date': '07/01/2012'},
    {'address': '1039 W GRANVILLE', 'date': '07/04/2012'},
    ]
    
    from operator import itemgetter
    from itertools import groupby
    
    # Sort by the desired field first
    rows.sort(key=itemgetter('date'))
    
    # Iterate in groups
    for date, items in groupby(rows, key=itemgetter('date')):
        print(date)
        for i in items:
            print(' ', i)
    

    1.12 将多个映射合并为单个映射

    a = {'x': 1, 'z': 3 }
    b = {'y': 2, 'z': 4 }
    
    from collections import ChainMap
    c = ChainMap(a,b)
    print(c['x']) # Outputs 1 (from a)
    print(c['y']) # Outputs 2 (from b)
    print(c['z']) # Outputs 3 (from a)
    

    2. 字符串和文本

    2.1 针对任意多的分隔符拆分字符串

    当你使用re.split() 函数时候,需要特别注意的是正则表达式中是否包含一个括号捕获分组。如果使用了捕获分组,那么被匹配的文本也将出现在结果列表中。

    如果你不想保留分割字符串到结果列表中去,但仍然需要使用到括号来分组正则表达式的话,确保你的分组是非捕获分组,形如(?:...) 。

    >>> line = 'asdf fjdk; afed, fjek,asdf, foo'
    >>> import re
    >>> re.split(r'[;,\s]\s*', line)
    ['asdf', 'fjdk', 'afed', 'fjek', 'asdf', 'foo']
    
    >>> fields = re.split(r'(;|,|\s)\s*', line)
    >>> fields
    ['asdf', ' ', 'fjdk', ';', 'afed', ',', 'fjek', ',', 'asdf', ',', 'foo']
    >>> re.split(r'(?:,|;|\s)\s*', line)
    ['asdf', 'fjdk', 'afed', 'fjek', 'asdf', 'foo']
    

    2.2 文本模式的匹配和查找

    >>> datepat = re.compile(r'\d+/\d+/\d+')
    >>> if datepat.match(text1):
              print('yes')
            else:
              print('no')
    
    >>> text = 'Today is 11/27/2012. PyCon starts 3/13/2013.'
    >>> datepat.findall(text)
    ['11/27/2012', '3/13/2013']
    

    3. 迭代器和生成器

    3.1 用生成器创建新的迭代模式

    def frange(start, stop, increment):
        x = start
        while x < stop:
            yield x
            x += increment
    
    >>> list(frange(0, 1, 0.125))
    [0, 0.125, 0.25, 0.375, 0.5, 0.625, 0.75, 0.875]
    

    一个函数中需要有一个yield 语句即可将其转换为一个生成器。

    3.2 实现迭代协议

    我们用Node类来表示树结构。

    class Node:
        def __init__(self, value):
            self._value = value
            self._children = []
    
        def __repr__(self):
            return 'Node({!r})'.format(self._value)
    
        def add_child(self, node):
            self._children.append(node)
    
        def __iter__(self):
            return iter(self._children)
    
        def depth_first(self):
            yield self
            for c in self:
                yield from c.depth_first()
    
    if __name__ == '__main__':
        root = Node(0)
        child1 = Node(1)
        child2 = Node(2)
        root.add_child(child1)
        root.add_child(child2)
        child1.add_child(Node(3))
        child1.add_child(Node(4))
        child2.add_child(Node(5))
        
        for ch in root.depth_first():
            print(ch)
        # Outputs Node(0), Node(1), Node(3), Node(4), Node(2), Node(5)
    

    3.3 跳过可迭代对象中的前一部分元素

    itertools 模块中有一些函数可以完成这个任务。首先介绍的是itertools.dropwhile() 函数。使用时,你给它传递一个函数对象和一个可迭代对象。它会返回一个迭代器对象,丢弃原有序列中直到函数返回True 之前的所有元素,然后返回后面所有元素。

    from itertools import dropwhile
    
    with open('/etc/passwd') as f:
        #跳过开始部分的注释行
        for line in dropwhile(lambda line: line.startswith('#'), f):
            print(line, end='')
    

    如果你已经明确知道了要跳过的元素的个数的话,那么可以使用itertools.islice() 来代替。

    from itertools import islice
    
    items = ['a', 'b', 'c', 1, 4, 10, 15]
    for x in islice(items, 3, None):
        print(x)
    

    3.4 迭代所有可能的组合和排列

    >>> items = ['a', 'b', 'c']
    >>> from itertools import permutations
    >>> for p in permutations(items):
    ... print(p)
    ...
    ('a', 'b', 'c')
    ('a', 'c', 'b')
    ('b', 'a', 'c')
    ('b', 'c', 'a')
    ('c', 'a', 'b')
    ('c', 'b', 'a')
    

    3.5 以索引-值对的形式迭代序列

    >>> my_list = ['a', 'b', 'c']
    >>> for idx, val in enumerate(my_list):
    ... print(idx, val)
    ...
    0 a
    1 b
    2 c
    

    3.6 在不同的容器中进行迭代

    itertools.chain() 方法可以用来简化这个任务。它接受一个可迭代对象列表作为输入,并返回一个迭代器

    >>> from itertools import chain
    >>> a = [1, 2, 3, 4]
    >>> b = ['x', 'y', 'z']
    >>> for x in chain(a, b):
    ... print(x)
    ...
    1
    2
    3
    4
    x
    y
    z
    

    3.7 创建处理数据的管道

    假如我们有海量的数据需要处理,但是没法完全将数据加载到内存中。生成器函数是一个实现管道机制的好办法。

    import os
    import fnmatch
    import gzip
    import bz2
    import re
    
    def gen_find(filepat, top):
        '''
        Find all filenames in a directory tree that match a shell wildcard pattern
        '''
        for path, dirlist, filelist in os.walk(top):
            for name in fnmatch.filter(filelist, filepat):
                yield os.path.join(path,name)
    
    def gen_opener(filenames):
        '''
        Open a sequence of filenames one at a time producing a file object.
        The file is closed immediately when proceeding to the next iteration.
        '''
        for filename in filenames:
            if filename.endswith('.gz'):
                f = gzip.open(filename, 'rt')
            elif filename.endswith('.bz2'):
                f = bz2.open(filename, 'rt')
            else:
                f = open(filename, 'rt')
            yield f
            f.close()
    
    def gen_concatenate(iterators):
        '''
        Chain a sequence of iterators together into a single sequence.
        '''
        for it in iterators:
            yield from it
    
    def gen_grep(pattern, lines):
        '''
        Look for a regex pattern in a sequence of lines
        '''
        pat = re.compile(pattern)
        for line in lines:
            if pat.search(line):
                yield line
    

    找出所有包含关键字python的日志行。

    lognames = gen_find('access-log*', 'www')
    files = gen_opener(lognames)
    lines = gen_concatenate(files)
    pylines = gen_grep('(?i)python', lines)
    for line in pylines:
    print(line)
    

    找出传送的字节数并统计出总字节量。

    lognames = gen_find('access-log*', 'www')
    files = gen_opener(lognames)
    lines = gen_concatenate(files)
    pylines = gen_grep('(?i)python', lines)
    bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
    bytes = (int(x) for x in bytecolumn if x != '-')
    print('Total', sum(bytes))
    

    3.8 扁平化处理嵌套型的序列

    一个包含yield from 语句的递归生成器来轻松解决这个问题。

    from collections import Iterable
    
    def flatten(items, ignore_types=(str, bytes)):
        for x in items:
            if isinstance(x, Iterable) and not isinstance(x, ignore_types):
            yield from flatten(x)
        else:
            yield x
    
    items = [1, 2, [3, 4, [5, 6], 7], 8]
    # Produces 1 2 3 4 5 6 7 8
    for x in flatten(items):
        print(x)
    

    3.9 合并多个有序序列,再对整个有序序列进行迭代

    >>> import heapq
    >>> a = [1, 4, 7, 10]
    >>> b = [2, 5, 6, 11]
    >>> for c in heapq.merge(a, b):
    ... print(c)
    ...
    1
    2
    4
    5
    6
    7
    10
    11
    

    4. 函数

    4.1 在匿名函数中绑定变量的值

    你用lambda 定义了一个匿名函数,并想在定义时捕获到某些变量的值。

    x = 10
    a = lambda y: x + y
    x = 20
    b = lambda y: x + y
    print(a(10))
    #30
    print(b(10))
    #30
    

    这其中的奥妙在于lambda 表达式中的x 是一个自由变量,在运行时绑定值,而不是定义时就绑定,这跟函数的默认值参数定义是不同的。因此,在调用这个lambda 表达式的时候,x 的值是执行时的值。

    如果你想让某个匿名函数在定义时就捕获到值,可以将那个参数值定义成默认参数即可。

    x = 10
    a = lambda y, x=x: x + y
    x = 20
    b = lambda y, x=x: x + y
    print(a(10))
    #20
    print(b(10))
    #30
    
    funcs = [lambda x: x+n for n in range(5)]
    print([f(0) for f in funcs])
    #[4, 4, 4, 4, 4]
    funcs = [lambda x, n=n: x+n for n in range(5)]
    print([f(0) for f in funcs])
    #[0, 1, 2, 3, 4]
    

    4.2 在回调函数中携带额外的状态

    def apply_async(func, args, *, callback):
        # Compute the result
        result = func(*args)
    
        # Invoke the callback with the result
        callback(result)
    
    class ResultHandler:
        def __init__(self):
            self.sequence = 0
    
        def handler(self, result):
            self.sequence += 1
            print('[{}] Got: {}'.format(self.sequence, result))
    
    >>> r = ResultHandler()
    >>> apply_async(add, (2, 3), callback=r.handler)
    [1] Got: 5
    >>> apply_async(add, ('hello', 'world'), callback=r.handler)
    [2] Got: helloworld
    

    这里也可以使用闭包

    def make_handler():
        sequence = 0
        def handler(result):
            nonlocal sequence
            sequence += 1
            print('[{}] Got: {}'.format(sequence, result))
        return handler
    
    >>> handler = make_handler()
    >>> apply_async(add, (2, 3), callback=handler)
    [1] Got: 5
    >>> apply_async(add, ('hello', 'world'), callback=handler)
    [2] Got: helloworld
    

    有时候可以利用协程来完成同样的任务

    def make_handler():
        sequence = 0
        while True:
            result = yield
            sequence += 1
            print('[{}] Got: {}'.format(sequence, result))
    
    >>> handler = make_handler()
    >>> next(handler) # Advance to the yield
    >>> apply_async(add, (2, 3), callback=handler.send)
    [1] Got: 5
    >>> apply_async(add, ('hello', 'world'), callback=handler.send)
    [2] Got: helloworld
    

    5. 类与对象

    5.1 修改实例的字符串表示

    可以通过定义 __str__()和__repr__()方法来实现。

    class Pair:
        def __init__(self, x, y):
            self.x = x
            self.y = y
    
        def __repr__(self):
            return 'Pair({0.x!r}, {0.y!r})'.format(self)
            
        def __str__(self):
            return '({0.x!s}, {0.y!s})'.format(self)
    

    5.2 当创建大量实例时如何节省内存

    class Date:
        __slots__ = ['year', 'month', 'day']
        def __init__(self, year, month, day):
            self.year = year
            self.month = month
            self.day = day
    

    5.3 创建可管理的属性

    class Person:
        def __init__(self, first_name):
            self.first_name = first_name
        
        # Getter function
        @property
        def first_name(self):
            return self._first_name
    
        # Setter function
        @first_name.setter
        def first_name(self, value):
            if not isinstance(value, str):
                raise TypeError('Expected a string')
            self._first_name = value
    
        # Deleter function (optional)
        @first_name.deleter
        def first_name(self):
            raise AttributeError("Can't delete attribute")
    
    >>> a = Person('Guido')
    >>> a.first_name # Calls the getter
    'Guido'
    >>> a.first_name = 42 # Calls the setter
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "prop.py", line 14, in first_name
    raise TypeError('Expected a string')
    TypeError: Expected a string
    >>> del a.first_name
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    AttributeError: can't delete attribute
    

    5.4 调用父类中的方法

    class A:
        def __init__(self):
            self.x = 0
    
    class B(A):
        def __init__(self):
            super().__init__()
            self.y = 1
    

    5.5 创建一种新形式的类属性或实例属性

    # Descriptor attribute for an integer type-checked attribute
    class Integer:
        def __init__(self, name):
            self.name = name
        
        def __get__(self, instance, cls):
            if instance is None:
                return self
            else:
                return instance.__dict__[self.name]
    
        def __set__(self, instance, value):
            if not isinstance(value, int):
                raise TypeError('Expected an int')
            instance.__dict__[self.name] = value
    
        def __delete__(self, instance):
            del instance.__dict__[self.name]
    
    class Point:
        x = Integer('x')
        y = Integer('y')
        
        def __init__(self, x, y):
            self.x = x
            self.y = y
    
    >>> p = Point(2, 3)
    >>> p.x # Calls Point.x.__get__(p,Point)
    2
    >>> p.y = 5 # Calls Point.y.__set__(p, 5)
    >>> p.x = 2.3 # Calls Point.x.__set__(p, 2.3)
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "descrip.py", line 12, in __set__
    raise TypeError('Expected an int')
    TypeError: Expected an int
    

    5.6 让属性具有惰性求值的能力

    我们想将一个只读的属性定义为property属性方法,只有在访问它时才参与计算。但是,一旦访问了该属性,我们希望把计算出的值缓存起来,不要每次访问它时都重新计算。

    class lazyproperty:
        def __init__(self, func):
            self.func = func
        
        def __get__(self, instance, cls):
            if instance is None:
                return self
            else:
                value = self.func(instance)
                setattr(instance, self.func.__name__, value)
                return value
    
    import math
    
    class Circle:
        def __init__(self, radius):
            self.radius = radius
        
        @lazyproperty
        def area(self):
            print('Computing area')
            return math.pi * self.radius ** 2
        
        @lazyproperty
        def perimeter(self):
            print('Computing perimeter')
            return 2 * math.pi * self.radius
    
    >>> c = Circle(4.0)
    >>> c.radius
    4.0
    >>> c.area
    Computing area
    50.26548245743669
    >>> c.area
    50.26548245743669
    >>> c.perimeter
    Computing perimeter
    25.132741228718345
    >>> c.perimeter
    25.132741228718345
    

    5.7 简化数据结构的初始化过程

    我们可以将初始化数据结构的步骤归纳到一个单独的__init__()函数中,并将其定义在一个公共的基类中。

    class Structure:
        _fields= []
        
        def __init__(self, *args, **kwargs):
            if len(args) > len(self._fields):
                raise TypeError('Expected {} arguments'.format(len(self._fields)))
        
            # Set all of the positional arguments
            for name, value in zip(self._fields, args):
                setattr(self, name, value)
            
            # Set the remaining keyword arguments
            for name in self._fields[len(args):]:
                setattr(self, name, kwargs.pop(name))
            
            # Check for any remaining unknown arguments
            if kwargs:
                raise TypeError('Invalid argument(s): {}'.format(','.join(kwargs)))
    
    # Example use
    if __name__ == '__main__':
        class Stock(Structure):
            _fields = ['name', 'shares', 'price']
    
        s1 = Stock('ACME', 50, 91.1)
        s2 = Stock('ACME', 50, price=91.1)
        s3 = Stock('ACME', shares=50, price=91.1)
    

    5.8 定义一个接口或抽象基类

    要定义一个抽象基类,可以使用abc模块。

    from abc import ABCMeta, abstractmethod
    
    class IStream(metaclass=ABCMeta):
        @abstractmethod
        def read(self, maxbytes=-1):
            pass
            
        @abstractmethod
        def write(self, data):
            pass
    

    抽象基类的主要用途是强制规定所需的编程接口。例如,一种看待IStream基类的方式就是在高层次上指定一个接口规范,使其允许读取和写入数据。

    def serialize(obj, stream):
        if not isinstance(stream, IStream):
            raise TypeError('Expected an IStream')
        ...
    

    5.9 实现自定义的容器

    collections库中定义了各种各样的抽象基类,当实现自定义的容器类它们会非常有用。

    import collections
    
    class A(collections.Iterable):
        pass
    
    >>> a = A()
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    TypeError: Can't instantiate abstract class A with abstract methods __iter__
    

    5.10 在类中定义多个构造函数

    import time
    
    class Date:
        # Primary constructor
        def __init__(self, year, month, day):
            self.year = year
            self.month = month
            self.day = day
        
        # Alternate constructor
        @classmethod
        def today(cls):
            t = time.localtime()
            return cls(t.tm_year, t.tm_mon, t.tm_mday)
    
    a = Date(2012, 12, 21) # Primary
    b = Date.today() # Alternate
    

    5.11 实现带有状态的对象或状态机

    我们想实现一个状态机,或者让对象可以在不同的状态中进行操作。但是我们并不希望代码里因此出现大量的条件判断。

    一个相对优雅的方式是将每种操作状态以一个单独的类来定义,然后在Connection类中使用这些状态类。

    class Connection:
        def __init__(self):
            self.new_state(ClosedConnection)
    
        def new_state(self, state):
            self.__class__ = state
    
        def read(self):
            raise NotImplementedError()
    
        def write(self, data):
            raise NotImplementedError()
    
        def open(self):
            raise NotImplementedError()
    
        def close(self):
            raise NotImplementedError()
    
    class ClosedConnection(Connection):
        def read(self):
            raise RuntimeError('Not open')
    
        def write(self, data):
            raise RuntimeError('Not open')
    
        def open(self):
            self.new_state(OpenConnection)
    
        def close(self):
            raise RuntimeError('Already closed')
    
    class OpenConnection(Connection):
        def read(self):
            print('reading')
    
        def write(self, data):
            print('writing')
    
        def open(self):
            raise RuntimeError('Already open')
    
        def close(self):
            self.new_state(ClosedConnection)
    
    # Example
    if __name__ == '__main__':
        c = Connection()
        print(c)
        try:
            c.read()
        except RuntimeError as e:
            print(e)
    
        c.open()
        print(c)
        c.read()
        c.close()
        print(c)
    

    5.12 调用对象上的方法,方法名以字符串形式给出

    对于简单的情况,可能会使用getattr()。

    import math
    class Point:
        def __init__(self, x, y):
            self.x = x
            self.y = y
    
        def __repr__(self):
            return 'Point({!r:},{!r:})'.format(self.x, self.y)
    
        def distance(self, x, y):
            return math.hypot(self.x - x, self.y - y)
    
    p = Point(2,3)
    
    # Method 1 : Use getattr
    d = getattr(p, 'distance')(0, 0)     # Calls p.distance(0, 0)
    

    另一种方法是使用operator.mathodcaller()。

    import operator
    d = operator.methodcaller('distance', 0, 0)(p)
    
    >>> p = Point(3, 4)
    >>> d = operator.methodcaller('distance', 0, 0)
    >>> d(p)
    5.0
    

    5.13 实现访问者模式

    我们需要编写代码来处理或遍历一个由许多不同类型的对象组成的复杂数据结构,每种类型的对象处理方式都不相同。

    # Example of the visitor pattern
    
    # --- The following classes represent nodes in an expression tree
    class Node:
        pass
    
    class UnaryOperator(Node):
        def __init__(self, operand):
            self.operand = operand
    
    class BinaryOperator(Node):
        def __init__(self, left, right):
            self.left = left
            self.right = right
    
    class Add(BinaryOperator):
        pass
    
    class Sub(BinaryOperator):
        pass
    
    class Mul(BinaryOperator):
        pass
    
    class Div(BinaryOperator):
        pass
    
    class Negate(UnaryOperator):
        pass
    
    class Number(Node):
        def __init__(self, value):
            self.value = value
    
    # --- The visitor base class
    
    class NodeVisitor:
        def visit(self, node):
            methname = 'visit_' + type(node).__name__
            meth = getattr(self, methname, None)
            if meth is None:
                meth = self.generic_visit
            return meth(node)
        
        def generic_visit(self, node):
            raise RuntimeError('No {} method'.format('visit_' + type(node).__name__))
    
    # --- Example 1:  An expression evaluator
    
    class Evaluator(NodeVisitor):
        def visit_Number(self, node):
            return node.value
    
        def visit_Add(self, node):
            return self.visit(node.left) + self.visit(node.right)
    
        def visit_Sub(self, node):
            return self.visit(node.left) - self.visit(node.right)
    
        def visit_Mul(self, node):
            return self.visit(node.left) * self.visit(node.right)
    
        def visit_Div(self, node):
            return self.visit(node.left) / self.visit(node.right)
    
        def visit_Negate(self, node):
            return -node.operand
    
    # --- Example 2: Generate stack instructions
    
    class StackCode(NodeVisitor):
        def generate_code(self, node):
            self.instructions = []
            self.visit(node)
            return self.instructions
    
        def visit_Number(self, node):
            self.instructions.append(('PUSH', node.value))
    
        def binop(self, node, instruction):
            self.visit(node.left)
            self.visit(node.right)
            self.instructions.append((instruction,))
    
        def visit_Add(self, node):
            self.binop(node, 'ADD')
    
        def visit_Sub(self, node):
            self.binop(node, 'SUB')
     
        def visit_Mul(self, node):
            self.binop(node, 'MUL')
           
        def visit_Div(self, node):
            self.binop(node, 'DIV')
    
        def unaryop(self, node, instruction):
            self.visit(node.operand)
            self.instructions.append((instruction,))
      
        def visit_Negate(self, node):
            self.unaryop(node, 'NEG')
    
    # --- Example of the above classes in action
    
    # Representation of 1 + 2 * (3 - 4) / 5
    t1 = Sub(Number(3), Number(4))
    t2 = Mul(Number(2), t1)
    t3 = Div(t2, Number(5))
    t4 = Add(Number(1), t3)
    
    e = Evaluator()
    print('Should get 0.6 :', e.visit(t4))
    
    s = StackCode()
    code = s.generate_code(t4)
    for c in code:
        print(c)
    

    5.14 让类支持比较操作

    Python中的类可以支持比较操作,但是如果要实现每种可能的比较操作,那么实现这么多特殊方法则很快会变得繁琐。functools.total_ordering装饰器可以简化这个过程。要使用它,可以用它来装饰一个类,然后定义__eq__()以及另一个比较方法。那么装饰器就会自动为我们实现其他的比较方法。

    from functools import total_ordering
    class Room:
        def __init__(self, name, length, width):
            self.name = name
            self.length = length
            self.width = width
            self.square_feet = self.length * self.width
    
    @total_ordering
    class House:
        def __init__(self, name, style):
            self.name = name
            self.style = style
            self.rooms = list()
    
        @property
        def living_space_footage(self):
            return sum(r.square_feet for r in self.rooms)
    
        def add_room(self, room):
            self.rooms.append(room)
    
        def __str__(self):
            return '{}: {} square foot {}'.format(self.name, 
                                                  self.living_space_footage, 
                                                  self.style)
    
        def __eq__(self, other):
            return self.living_space_footage == other.living_space_footage
    
        def __lt__(self, other):
            return self.living_space_footage < other.living_space_footage 
    
    
    
    # Build a few houses, and add rooms to them.
    h1 = House('h1', 'Cape')
    h1.add_room(Room('Master Bedroom', 14, 21))
    h1.add_room(Room('Living Room', 18, 20))
    h1.add_room(Room('Kitchen', 12, 16))
    h1.add_room(Room('Office', 12, 12))
    
    h2 = House('h2', 'Ranch')
    h2.add_room(Room('Master Bedroom', 14, 21))
    h2.add_room(Room('Living Room', 18, 20))
    h2.add_room(Room('Kitchen', 12, 16))
    
    h3 = House('h3', 'Split')
    h3.add_room(Room('Master Bedroom', 14, 21))
    h3.add_room(Room('Living Room', 18, 20))
    h3.add_room(Room('Office', 12, 16))
    h3.add_room(Room('Kitchen', 15, 17))
    houses = [h1, h2, h3]
    
    print("Is h1 bigger than h2?", h1 > h2) # prints True
    print("Is h2 smaller than h3?", h2 < h3) # prints True
    print("Is h2 greater than or equal to h1?", h2 >= h1) # prints False
    print("Which one is biggest?", max(houses)) # prints 'h3: 1101 square foot Split'
    print("Which is smallest?", min(houses)) # prints 'h2: 846 square foot Ranch'
    

    5.15 创建缓存实例

    当创建类实例时我们想返回一个缓存引用,让其指向上一个用同样参数创建出的类实例。

    # Simple example
    
    class Spam:
        def __init__(self, name):
            self.name = name
    
    # Caching support
    import weakref
    _spam_cache = weakref.WeakValueDictionary()
    
    def get_spam(name):
        if name not in _spam_cache:
            s = Spam(name)
            _spam_cache[name] = s
        else:
            s = _spam_cache[name]
        return s
    
    if __name__ == '__main__':
        a = get_spam('foo')
        b = get_spam('bar')
        print('a is b:', a is b)
        c = get_spam('foo')
        print('a is c:', a is c)
    

    6. 元编程

    6.1 编写装饰器时如何保存函数的元数据

    import time
    from functools import wraps
    
    def timethis(func):
        '''
        Decorator that reports the execution time.
        '''
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            end = time.time()
            print(func.__name__, end-start)
            return result
        return wrapper
    
    if __name__ == '__main__':
        @timethis
        def countdown(n:int):
            '''
            Counts down
            '''
            while n > 0:
                n -= 1
    
        countdown(100000)
        print('Name:', countdown.__name__)
        print('Docstring:', repr(countdown.__doc__))
        print('Annotations:', countdown.__annotations__)
    

    6.2 定义一个可接受参数的装饰器

    from functools import wraps
    import logging
    
    def logged(level, name=None, message=None):
        '''
        Add logging to a function.  level is the logging
        level, name is the logger name, and message is the
        log message.  If name and message aren't specified,
        they default to the function's module and name.
        '''
        def decorate(func):
            logname = name if name else func.__module__
            log = logging.getLogger(logname)
            logmsg = message if message else func.__name__
    
            @wraps(func)
            def wrapper(*args, **kwargs):
                log.log(level, logmsg)
                return func(*args, **kwargs)
            return wrapper
        return decorate
    
    # Example use
    @logged(logging.DEBUG)
    def add(x, y):
        return x + y
    
    @logged(logging.CRITICAL, 'example')
    def spam():
        print('Spam!')
    
    if __name__ == '__main__':
        import logging
        logging.basicConfig(level=logging.DEBUG)
        print(add(2,3))
        spam()
    

    6.3 定义一个能接受可选参数的装饰器

    from functools import wraps, partial
    import logging
    
    def logged(func=None, *, level=logging.DEBUG, name=None, message=None):
        if func is None:
            return partial(logged, level=level, name=name, message=message)
    
        logname = name if name else func.__module__
        log = logging.getLogger(logname)
        logmsg = message if message else func.__name__
        @wraps(func)
        def wrapper(*args, **kwargs):
            log.log(level, logmsg)
            return func(*args, **kwargs)
        return wrapper
    
    # Example use
    @logged
    def add(x, y):
        return x + y
    
    @logged()
    def sub(x, y):
        return x - y
    
    @logged(level=logging.CRITICAL, name='example')
    def spam():
        print('Spam!')
    
    if __name__ == '__main__':
        import logging
        logging.basicConfig(level=logging.DEBUG)
        add(2,3)
        sub(2,3)
        spam()
    

    6.4 把装饰器定义成类

    要把装饰器定义成类实例,需要确保在类中实现__call__()和__get__()方法。

    # Example of defining a decorator as a class
    import types
    from functools import wraps
           
    class Profiled:
        def __init__(self, func):
            wraps(func)(self)
            self.ncalls = 0
    
        def __call__(self, *args, **kwargs):
            self.ncalls += 1
            return self.__wrapped__(*args, **kwargs)
    
        def __get__(self, instance, cls):
            if instance is None:
                return self
            else:
                return types.MethodType(self, instance)
    
    # Example
    
    @Profiled
    def add(x, y):
        return x + y
    
    class Spam:
        @Profiled
        def bar(self, x):
            print(self, x)
    
    if __name__ == '__main__':
        print(add(2,3))
        print(add(4,5))
        print('ncalls:', add.ncalls)
    
        s = Spam()
        s.bar(1)
        s.bar(2)
        s.bar(3)
        print('ncalls:', Spam.bar.ncalls)
    
    # Reformulation using closures and function attributes
    # This example tests the composability of decorators
    
    import time
    from functools import wraps
    
    def timethis(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            r = func(*args, **kwargs)
            end = time.time()
            print(func.__name__, end - start)
            return r
        return wrapper
    
    def profiled(func):
        ncalls = 0
        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal ncalls
            ncalls += 1
            return func(*args, **kwargs)
        wrapper.ncalls = lambda: ncalls
        return wrapper
    
    # Example
    
    @profiled
    def add(x, y):
        return x + y
    
    class Spam:
        @profiled
        def bar(self, x):
            print(self, x)
    
    @timethis
    @profiled
    def countdown(n):
        while n > 0:
            n -= 1
    
    if __name__ == '__main__':
        print(add(2,3))
        print(add(4,5))
        print('ncalls:', add.ncalls())
    
        s = Spam()
        s.bar(1)
        s.bar(2)
        s.bar(3)
        print('ncalls:', Spam.bar.ncalls())
    
        countdown(100000)
        countdown(10000000)
        print(countdown.ncalls())
    

    6.5 利用元类来控制实例的创建

    # example2.py
    #
    # Singleton
    
    class Singleton(type):
        def __init__(self, *args, **kwargs):
            self.__instance = None
            super().__init__(*args, **kwargs)
    
        def __call__(self, *args, **kwargs):
            if self.__instance is None:
                self.__instance = super().__call__(*args, **kwargs)
                return self.__instance
            else:
                return self.__instance
    
    class Spam(metaclass=Singleton):
        def __init__(self):
            print('Creating Spam')
    
    if __name__ == '__main__':
        a = Spam()
        b = Spam()
        print(a is b)
    

    7. 并发

    7.1 启动和停止线程

    相关文章

      网友评论

        本文标题:Python Cookbook

        本文链接:https://www.haomeiwen.com/subject/xxoifxtx.html