美文网首页
Python编程技巧整理

Python编程技巧整理

作者: finlu | 来源:发表于2019-04-21 11:11 被阅读0次

    过滤列表中的数据

    实际案例:

    过滤掉列表里面的负数

    案例分析:

    • filter(function or None, iterable) py2返回一个列表,py3返回一个迭代器
    • 列表解析
    • 使用timeit来测试函数的运行时间

    案例代码:

    # python3
    from random import randint
    data = [randint(-10, 10) for _ in range(10)]
    # 方法一,直接迭代
    new_data0 = []
    for x in data:
        if x >= 0:
            new_data0.append(X)
    # 方法二,使用filter函数
    new_data1 = list(filter(lambda x: x>=10, data))
    # 使用timeit测试函数运行时间
    # 00000 loops, best of 3: 1.45 µs per loop
    # 方法三,使用列表解析
    new_data2 = [x for x in data if x >= 0]
    # 使用timeit测试函数运行时间
    # 1000000 loops, best of 3: 559 ns per loop
    # new_data0, new_data1, new_data2
    # [8, 0, 1, 10, 9]
    

    结论:

    ​ 运行速度:列表解析 > filter函数 > 直接迭代

    过滤字典,集合中的数据

    • 字典解析
    # python3
    from random impory randint
    data = {x: randint(60, 100) for x in range(1, 21)}
    new_data = {key: value for key, value in data.items() if value >= 80}
    # new_data
    # {16: 80, 1: 100, 2: 96, 17: 86, 11: 81, 12: 83, 15: 87}
    
    • 集合解析
    # python3
    from random import randint
    data = [randint(-10, 10) for _ in range(10)]
    s = set(data)
    new_data = {x for x in s if x % 3 == 0}
    # new_data
    # {-6, 3, 6}
    

    为元组的每个元素命名

    元组的优势:

    • 存储空间小
    • 访问速度快

    实际案例:

    学生信息系统中数据为固定格式:

    (名字,年龄,性别,邮箱地址,...)

    学生数量很大为了减小存储的开销,对每个学生信息用元组表示:

    (name='jim', age=16, sex='male', email='jim@163.com')

    (name='jack', age=16, sex='male', email='jack@163.com')

    (name='bob', age=16, sex='male', email='bob@163.com')

    ......

    访问是,我们使用索引(index)访问,大量索引降低程序的可读性。如何解决这个问题?

    # 输出一个学生的信息
    # python3
    # 使用内置库
    from collections import namedtuple
    # 创建一个有名字的元组,元组名:'Student'
    Student = namedtuple('Student', ['name', 'age', 'sex', 'email'])
    # 位置传参
    s1 = Student实际案例:
    
    过滤掉列表里面的负数
    
    # 关键字传参
    s2 = Student(name='jim', age=16, sex='male', email='jim@163.com')
    # 可以通过类的属性来访问
    print(s1.name)   # 'jim'
    print(s1.age)    # 16
    print(s1.sex)    # male
    print(s1.email)  # 'jim@163.com'
    # s1 是元组类的一个子类
    isinstance(s1, tuple)   # True
    
    # 常规方法, 借鉴C语言
    NAME, AGE, SEX, EMAIL = range(4)
    student = ('jim', 16, 'male', 'jim@163.com')
    # name
    print(student[NAME])   # 'jim'
    # age
    print(student[AGE])    # 16
    # sex
    print(student[SEX])    # male
    # email
    print(student[EMAIL])  # 'jim@163.com'
    

    统计序列中元素出现的频度

    实际案例:

    1. 某随机序列[12, 5, 9, 8, 7, ....]中,找到出现次数最高的三个元素,它们出现的次数是多少?
    2. 对某英文文章的单词,进行词频统计,找到出现次数最高的10个单词,它们出现的次数是多少?

    使用collections.Counter对象

    • 将序列传入Counter的构造器,得到Counter对象是元素频度的字典{元素值: 频数}
    • Counter.most_common(n) 方法得到平度最高的n个元素为元素和频数的列表

    案例一:

    # python3
    from random import randint
    data = [randint(0, 20) for _ in range(30)]
    c1 = dict.fromkeys(data, 0)
    from collections import Counter
    c2 = Counter(data)
    # 找到出现频度最高的三个元素
    c2.most_common(3)
    # 常规方法
    for x in data:
        c[x] += 1
    

    案例二:

    # python3
    import re
    from collections import Counter 
    txt = open('a.txt').read()
    count = Counter(re.split('\W+', txt))
    count.most_common(3)
    

    根据字典中值的大小,对字典进行排序

    iter(iterobj) 返回可迭代对象的信息

    sorted函数:sorted(iterable[, cmp[, key[, reverse]]])

    参数说明:

    • iterable -- 可迭代对象。
    • cmp -- 比较的函数,这个具有两个参数,参数的值都是从可迭代对象中取出,此函数必须遵守的规则为,大于则返回1,小于则返回-1,等于则返回0。
    • key -- 主要是用来进行比较的元素,只有一个参数,具体的函数的参数就是取自于可迭代对象中,指定可迭代对象中的一个元素来进行排序。
    • reverse -- 排序规则,reverse = True 降序 , reverse = False 升序(默认)。

    实际案例:

    某班英语成绩以字典形式存储为:{'LiLei': 79, 'Jim': 88, 'Lucy': 92, ...}

    根据成绩高低,计算学生排名。

    解决方案:使用内置函数sorted

    • 利用zip将字典数据转化为元组
    from random import randint
    data = {x: randint(60, 100)for x in 'xyzabc'}
    # 原理:元组的比较方式:从第一个元素开始,逐个开始比较
    sorted(list(zip(data.values(), data.keys())))
    # [(62, 'y'), (76, 'c'), (88, 'a'), (92, 'x'), (96, 'b'), (99, 'z')]
    
    • 传递sorted函数的key参数
    from random import randint
    data = {x: randint(60, 100)for x in 'xyzabc'}
    sorted(data.items(), key=lambda x: x[1])
    # [('y', 62), ('c', 76), ('a', 88), ('x', 92), ('b', 96), ('z', 99)]
    

    如何快速找到多个字典中的公共键

    map函数和reduce函数:

    • map(func, *iterable)

    在Python 3里,reduce()函数已经被从全局名字空间里移除了,它现在被放置在fucntools模块里 用的话要 先引 入: from functools import reduce

    参数说明:

    • function -- 函数,有两个参数
    • iterable -- 一个或多个序列

    返回值:

    • Python 2.x 返回列表。
    • Python 3.x 返回迭代器。
    • reduce(function, iterable[, initial])

    参数说明:

    • function -- 函数,有两个参数
    • iterable -- 可迭代对象
    • initializer -- 可选,初始参数

    返回值:

    • 返回函数计算结果。

    function参数是一个有两个参数的函数,reduce依次从sequence中取一个元素,和上一次调用function的结果做参数再次调用function。

    第一次调用function时,如果提供initial参数,会以sequence中的第一个元素和initial作为参数调用function,否则会以序列sequence中的前两个元素做参数调用function

    实际案例:

    西班牙足球甲级联赛,每轮球员进球统计:

    第一轮:{'苏亚雷斯': 1, '梅西': 2, '本泽马': 1, 'C罗': 3, ...}

    第二轮:{'苏亚雷斯': 2, 'C罗': 1, '格里兹曼': 2, '贝尔': 1, ...}

    第三轮:{'苏亚雷斯': 1, '托雷斯': 2, '贝尔': 1, '内马尔': 1, ...}

    ......

    统计出前N轮,每场比赛都有进球的球员

    案例分析:

    字典中的key表示每一次进球的球员 --> 求出所有字典里面的公共key

    解决方案:利用集合的交集操作

    1. 使用字典的keys()方法,得到一个字典keys的集合
    2. 使用map函数,得到所有字典的keys集合
    3. 使用reduce函数,取所有字典的keys的集合的交集
    # python3
    from functools import reduce
    from random import randint, sample
    # 生成三个随机字典
    s_list = [{x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))} for _ in range(3)]
    reduce(lambda a, b: a & b, map(dict.keys, s_list))
    

    一般处理方法:

    from random import randint, sample
    # 生成三个随机字典
    s1 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
    s2 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
    s3 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
    res = []
    for k in s1:
        if k in s2 and k in s3:
            res.append(s2)
    

    如何让字典保持有序

    OrderedDict函数:

    • OrderedDict的Key会按照插入的顺序排列,不是Key本身排序。

    • OrderedDict可以实现一个FIFO(先进先出)的dict,当容量超出限制时,先删除最早添加的Key

    实际案例:

    某编程竞赛系统,对参数选手编程解题进行计时,选手完成题目后,把该选手解题用时记录到字典中,以便赛后按选手名查询成绩。

    (答题用时越短,成绩越优)

    {’Lilie': (2, 43), 'HanMeimei': (5,52), 'Jim': (1, 39) ...}

    比赛结束后,须按排名顺序依次打印选手成绩,如何实现?

    案例分析:

    以进入字典的为选手的排名 --> 按进入字典的顺序输出选手的比赛信息

    使用collections.OrderedDict,以OrderedDict代替内置字典Dict,一次将选手的成绩存入OrderedDict

    案例代码:

    # python3
    from collections import OrderedDict
    from random import randint
    import time
    d = OrderedDict()
    players = list('ABCDEFGH')   # 模拟选手A,B,C,D,E,F,G,H
    start = time.time()    # 记录比赛开始时间
    for i in range(8):
        input('>>>')
        p = players.pop(randint(0, 7 - i))
        end = time.time()   # 每个选手完成比赛的时间
        d[p] = (i + 1, end - start)
    

    实现用户的历史记录功能(最多n条)

    **deque函数:from collections import deque

    • 创建deque序列: d=deque()
    • deque提供了类似list的操作方法: d.append(3) ;d.append(8);d.append(1); 此时d=deque([3,8,1])
    • 两端都使用pop:
      • d.pop() --> 抛出队列中的最后一个元素
      • d.leftpop() --> 抛出队列中的第一个元素
    • 限制deque的长度:d=deque(maxlen=20)

    **pickle模块:

    • pickle.dump(obj, file, protocol=None, *, fix_imports=True)

      • 参数说明:
        • obj --> python对象
        • file --> python中的文件对象
        • protocol --> 可选的协议参数
        • 如果fix_imports为true且protocol小于3,pickle会尝试将新的Python 3名称映射到Python 2中使用的旧模块名称,以便pickle数据流可以用Python 2读取。

      说明:

      序列化对象,并将结果数据流写入到文件对象中。参数protocol是序列化模式,默认值为0,表示以文本的形式序列化。protocol的值还可以是1或2,表示以二进制的形式序列化。

    • pickle.dumps(obj, protocol=None, fix_imports=True)

      将对象的pickle d表示形式作为bytes对象返回,而不是将其写入文件。

    • pickle.load(file, *, fix_imports = True, encoding ="ASCII", errors ="strict ")

      从打开的文件对象文件中*读取一个pickle d对象表示并返回其中指定的重组对象层次结构。

    • pickle.loads(bytes_object, *, fix_imports = True, encoding ="ASCII", errors ="strict ")

      从对象中读取pickle d对象层次结构bytes并返回其中指定的重组对象层次结构。

      可选参数:fix_imports, encoding, errors ---->

      ​ 用于控制由Python 2生成的pickle stream的兼容性支持。如果fix_imports为true,pickle将尝试将旧的Python 2名称映射到Python 3中使用的新名称。编码错误告诉pickle如何解码由Python 2腌制的8位字符串实例; 这些默认值分别为'ASCII'和'strict'。该编码可以是“字节”作为字节对象读取这些8位串的实例。

    实际案例:

    很多应用程序都有浏览用户的历史记录的功能,

    例如:

    浏览器可以查看最近访问过的网页。

    视频播放器可以查看最近播放过的视频文件。

    Shell可以查看用户输入过的命令

    ......

    现在我们制作了一个简单的猜数字小游戏,添加历史记录的功能,显示用户最近猜过的数字,如何实现?

    解决方案:使用容量为n的队列存储历史记录

    • 使用标准库collections中的deque,它是一个双端循环队列。
    • 程序退出前,可以使用pickle将队列对象存入文件,再次运行程序时将其导入。
    from collections import deque
    from random import randint
    
    
    N = randint(0, 100)
    history = deque([], 5)
    def guess(num):
        if num == N:
            print("right")
            return True
        if num < N:
            print("{} is less than num".format(num))
        else:
            print("{} is greater than num".format(num))
        return False
    
    
    while True:
        line = input('Please input a number: ')
        if line.isdigit():
            k = int(line)
            history.append(k)
            if guess(k):
                break
        elif line == 'history' or line == 'h?':
            print(list(history))
    

    实现可迭代对象和迭代器对象

    实际案例:

    某软件要求,从网络抓取各个城市气温信息,并依次显示:

    北京:15~20

    天津:17~22

    长春:12~18

    ......

    如果一次抓取所有城市天气再显示,显示第一个城市气温时,有很高的延时,并且浪费存储空间,我们期望以 “用时访问” 的策略,并且能把所有城市气温封装到一个对象里, 可用for语句进行迭代,怎么解决?

    案例思路:

    1. 实现一个迭代器对象WheatherIterator,next方法每次返回一个城市的气温。
    2. 实现一个可迭代对象WeatherIterable,__iter__方法返回一个迭代器对象。

    案例代码:

    # python3
    import requests
    from collections import Iterable, Iterator
    
    
    # 实现一个天气的迭代器对象
    class WeatherIterator(Iterator):
        def __init__(self, cities):
            self.cities = cities
            self.index = 0
        def getWhether(self, city):
            response = requests.get('http://wthrcdn.etouch.cn/weather_mini?city=' + city)
            data = response.json()['data']['forecast'][0]
            return "{}: {}, {}".format(city, data['low'], data['high'])
        # python2 中直接使用next()方法
        def __next__(self):
            # 当索引越界的时候触发异常
            if self.index == len(self.cities):
                raise StopIteration
            city = self.cities[self.index]
            self.index += 1
            return self.getWhether(city)
    # 实现天气的可迭代对象
    class WeatherIterable(Iterable):
        def __init__(self, cities):
            self.cities = cities
        def __iter__(self):
            return WeatherIterator(self.cities)
    for x in WeatherIterator(['北京', '上海', '广州']):
        print(x)
    # 北京: 低温 23℃, 高温 35℃
    # 上海: 低温 23℃, 高温 29℃
    # 广州: 低温 27℃, 高温 33℃
    

    使用生成器函数实现可迭代对象

    实际案例:

    实现一个可迭代对象的类,它能迭代出给定范围内的所有素数:

    pn = PrimeNumbers(1, 30)

    for k in pn:

    ​ print(k)

    输出结果:

    2 3 5 7 11 13 17 19 23 29

    解决方案:

    将该类的__iter__方法实现生成器函数,每次yield返回一个素数.

    案例代码:

    # python3
    class PrimeNumbers:
        def __init__(self, start, end):
            self.start = start
            self.end = end
        def isPrimerNum(self, k):
            if k < 2:
                return False
            
            for i in range(2, k):
                if k % i == 0:
                    return False
            return True
        def __iter__(self):
            for k in range(self.start, self.end + 1):
                if self.isPrimerNum(k):
                    yield k
    
                    
    for x in PrimeNumbers(1, 30):
        print(x)
    

    实现序列的反向迭代操作

    内置方法:

    reversed --> 其实是调用了__reversed__()方法

    返回一个反向迭代器,

    iter --> 调用了__iter__() 方法

    返回一个正向迭代器

    实际案例:

    实现一个连续浮点数发生器FloatRange (和range类似),根据给定范围 (start, end) 和步进值 (step) 产生一系列连续浮点数,如迭代FloatRange(3.0, 4.0, 0.2) 可产生序列:

    正向:3.0 -> 3.2 -> 3.4 -> 3.6 -> 3.8 -> 4.0

    反向:4.0 -> 3.8 -> 3.6 -> 3.4 -> 3.2 -> 3.0

    案例思路:

    重写类里面的__iter__() 和 _reversred_() 方法

    案例代码:

    # python3
    class FloatRange:
        def __init__(self, start, end, step=0.1):
            self.start = start
            self.end = end
            self.step = step
        def __iter__(self):
            t = self.start
            while t <= self.end:
                yield t
                t += self.step
        def __reversed__(self):
            t = self.end
            while t >= self.start:
                yield t
                t -= self.step
    
    
    for x in iter(FloatRange(1.0, 4.0, 0.2)):
        print(x)
    for x in reversed(FloatRange(1.0, 4.0, 0.2)):
        print(x)
    

    如何对迭代器做切片操作

    itertools.islice 方法:islice(iterable, start, stop[, step]) --> islice object

    参数说明:

    • iterable --> 可迭代对象
    • start --> 迭代的起始位置
    • end --> 迭代的终止位置(end 为None的时候迭代到末尾)
    • step --> 两个元素之间的索引差值

    实际案例:

    有某个文本文件,我们想读取其中某范围的内容,如100 ~ 300 行之间的内容,python中文本文件是可迭代对象,我们是否可以使用类似列表切片的方式得到一个100 ~ 300 行文件内容的生成器?

    f = open('a.txt')

    f[100:300] # 可以么?

    解决方案:

    使用标准库中的itertools.islice,它能返回一个迭代器对象切片的生成器

    案例代码:

    # python3
    from itertools import islice
    f = open('/var/log/dpkg.log')
    # 在Python中,文件对象也是可迭代对象
    f_iter = islice(f, 100, 300)    
    for x in f_iter:
        print(x)
    

    在for语句中迭代多个可迭代对象

    zip 函数:zip(iter1 [,iter2 [...]]) --> zip object

    itertools.chain 方法:chain(*iterables) --> chain object

    实际案例:

    1. 某班学生期末考试成绩,语文,数学,英语分别存储在3个列表中,同时迭代三个列表,计算每个学生的总分。(并行)
    2. 某年级有四个班,某次考试没办英语成绩分别存储在4个列表中,一次迭代每个列表,统计全学年成绩高于90分人数。(串行)

    解决方案:

    并行:使用内置函数zip,它能将多个可迭代对象合并,每次迭代返回一个元组

    串行:使用标准库中的itertools.chain,它能将多个可迭代对象连接

    案例代码:

    from random import randint
    
    
    chinese = [randint(60, 100) for _ in range(40)]
    math = [randint(60, 100) for _ in range(40)]
    english = [randint(60, 100) for _ in range(40)]
    total_sorce = []
    # 一般方法:生成一个索引序列
    for i in range(len(chinese)):
        total_sorce.append(chinese[i] + math[i] + english[I])
    # 使用内置zip函数
    for c , m ,e in zip(chinese, math, English):
        total_sorce.append(c + m + e)
    print(total_sorce)
    
    # python3
    from random import randint
    
    
    e1 = [randint(60, 100) for _ in range(42)]
    e2 = [randint(60, 100) for _ in range(41)]
    e3 = [randint(60, 100) for _ in range(38)]
    e4 = [randint(60, 100) for _ in range(37)]
    count = 0
    for s in chain(e1, e2, e3, e4):
        if s > 90:
            count += 1
    

    拆分含有多种分隔符的字符串

    实际案例:

    我们要把某个字符串依据分隔符号拆分不同的字段,该字符串包含多种不同的分隔符,例如:

    s = 'ab;sdfsgsef|sgegssg\gfsdf\tasdavsdfup'

    其中<,>,<;>,<|>,<\t>都是分隔符号,如何处理?

    解决方案:

    1. 连续使用str.split() 方法,每次数理一种分隔符号。
    2. 使用正则表达式的re.split() 方法,一次性拆分字符串。(推荐)
    # python3
    def mySplit(s, ds):
        res = [s]
        
        for d in ds:
            t = []
            map(lambda x: t.extend(x.split(d)), res)
            res = t
        return [x for x in res if x]
    
    s = 'fasfgajhgijaseg|adfg、/vg.Adafsda'
    print(mySplit(s, '/\?;,'))
    
    import re
    s = 'fasfgajhgijaseg|adfg、/vg.Adafsda'
    res = re.split(r'[,;\t|]+', s)    # + 表示匹配多次
    

    如何判断字符串a是否以字符串b开头或结尾

    实际案例:

    某文件系统目录下有一系列文件:

    quicksort.c

    graph.py

    install.sh

    stack.cpp

    ......

    编写程序给其中所有的.sh文件和.py文件加上用户可执行权限。

    案例代码:

    # endwith 的参数只能是元组或者字符串,不能为列表
    f_li = [name for name in os.listdir('.') if name.endswith(('.sh', '.py'))]
    [os.chmod(x, os.stat(x).st_mode | stat.S_IXUSR) for x in f_li]
    

    如何调整字符串中文本的格式

    实际案例:

    某软件的log文件,其中的日期格式为 'yyyy-mm-dd':

    ......

    2018-06-20 18:32:25 status unpacked python3-pip:all 8.1.1-2ubuntu0.4
    2018-06-20 18:32:25 status half-configured python3-pip:all 8.1.1-2ubuntu0.4
    2018-06-20 18:32:25 status installed python3-pip:all 8.1.1-2ubuntu0.4
    2018-06-20 18:32:26 startup packages configure

    ......

    我们想把其中日期改为美国日期的格式 'mm/dd/yyyy'.

    '2018-06-20' => '06/20/2018',应如何处理?

    解决方案:

    使用正则表达式 re.sub()方法做字符串替换,利用正则表达式的补货组,捕获每个部分内容,在替换字符串中调整各个捕获组的顺序。

    import re
    log = open('/var/log/dpkg.log').read()
    re.sub('(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})', r'\g<month>/\g<day>/\g<year>', log)   # 给小组取别名?P<name>
    

    如何将多个小字符串拼接成一个大的字符串

    实际案例:

    在设计某网络程序时,我们自定义了一个基于UDP的网络协议。

    在按照固定次序向服务器传递一系列参数:

    hwDetect: "<0112>"

    gxDepthBits: "<32>"

    gxResolution: "<1024x768>"

    gxRefresh: "<60>"

    fullAlpha: "<1>"

    lodDist: "<100.0>"

    DistCull: "<500.0>"

    在程序中我们将各个参数按次序收集到列表中:

    [ "<0112>", "<32>", "<1024x768>", "<60>", "<1>", "<100.0>", "<500.0>"]

    最终我们要把各个参数拼成一个数据报进行发送,

    '<0112><32><1024x768><60><1><100.0><500.0>'

    解决方案:

    1. 迭代列表,连续使用 + 操作依次拼接每一个字符串。
    2. 使用str.join() 方法,更加快速的拼接列表中所有字符串。(推荐)

    案例代码:

    pl = ["<0112>", "<32>", "<1024x768>", "<60>", "<1>", "<100.0>", "<500.0>"]
    s = ''
    for p in pl:
        s += p
    
    pl = [ "<0112>",  "<32>",  "<1024x768>", "<60>",  "<1>", "<100.0>", "<500.0>"]
    s = ''.join(pl)
    

    对字符串进行左,右对齐

    str.ljust()方法:ljust(width[, fillchar]) -> str

    参数说明:

    • width --> 字符串填充后的宽度
    • fillchar --> 填充的字符

    实际案例:

    某个字典存储了一系列的属性值:

    {

    ​ "lodDist": 100.0,

    ​ "SmallCull": 0.04,

    ​ "DistCull": 500.0,

    ​ "trilinear": 40,

    ​ "farclip": 477

    }

    在程序中,我们想以以下工整的格式将其内容输出,如何处理?

    lodDist : 100.0
    farclip : 477
    trilinear : 40
    SmallCull : 0.04
    DistCull : 500.0

    解决方案:

    1. 使用字符串的str.ljust(), str.rjust(), str.center() 进行左,右,居中对齐。
    2. 使用format() 方法,传递类似'<20', '>20', '^20' 参数完成同样任务。例如:format(s, '<20') --> 左对齐

    案例代码:

    d = {"lodDist": 100.0, "SmallCull": 0.04, "DistCull": 500.0, "trilinear": 40, "farclip": 477}
    w = max(map(len, d.keys()))
    for k in d.keys():
        print(k.ljust(w), ':', d[k])
    # lodDist   : 100.0
    # farclip   : 477
    # trilinear : 40
    # SmallCull : 0.04
    # DistCull  : 500.0
    

    如何去掉字符串中不需要的字符

    实际案例:

    1. 过滤掉用户输入中前后多余的空白字符:

      ' nick2008@gmail.com '

    2. 过滤某windows下编辑文本中的'\r':

      'hello world\r\n'

    3. 去掉文本中的unicode组合符号(音调):

      u'ni hao chi fan'

    解决方案:

    1. 字符串strip(), lstrip(), rstrip() 方法去掉字符串两端的字符(不限于空格)。
    2. 删除单个固定位置的字符,可以使用切片 + 拼接的方式。
    3. 字符串的replace() 方法或者在正则表达式re.sub()删除任意位置字符。
    4. 字符串translate() 方法,可以同时删除多种不同字符。(python2)

    如何设置文件的缓冲

    实际案例:

    将文件内容写入到硬件设备时,使用系统调用,这类I/O操作的时间很长。为了减少I/O操作的次数,文件通常使用缓冲区。(有足够多的数据才进行系统调用 )文件的缓冲行为,分为全缓冲,行缓冲,无缓冲。

    如何设置python中文件对象的无缓冲行为?

    解决方案:

    全缓冲:open函数的buffering设置为大于1的证书n,n为缓冲区的大小。

    行缓冲:open函数的buffering设置为1。

    无缓冲:open函数的buffering设置为0。

    如何将文件映射到内存

    实际案例:

    1. 在访问某些二进制文件时,希望能把文件映射到内存中,可以实现随机访问。(framebuffer设备文件)
    2. 某些嵌入式设备,寄存器被编址到内存地址空间,我们可以映射/dev/mem某范围,去访问这些寄存器。
    3. 如果多个进程映射同一个文件,还能实现进程通信的目的。

    解决方法:

    使用标准库中mmp模块的mmap() 函数,他需要一个打开的文件描述符作为参数。

    如何访问文件的状态

    os模块:python与操作系统通信的模块。

    实际案例:

    在某些项目中,我们需要获得文件状态,例如:

    1. 文件的类型(普通文件,目录,符号链接,设备文件 ... ).

    2. 文件的访问权限。

    3. 文件的最后的访问/修改/节点状态更改时间。

    4. 普通文件的大小。

      ......

    解决方案:

    1. 系统调用:标准库中os模块下的三个系统调用 stat, fstat, lstat 获取文件状态。
    2. 快捷函数:标准库中os.path 下一些函数,使用起来更加简洁。

    如何使用临时文件

    实际案例:

    某项目中,我们从传感器菜鸡数据,每手机到1G数据后,做数据分析,最终只保存分析结果。这样很大的临时数据如果常驻内存,将消耗大量内存资源,我们可以使用临时文件存储这些临时数据(外部存储)。

    临时文件不用命名,且关闭后悔自动被删除

    解决方案:

    使用标准库中 tempfil撒大声地e 下的 Temporaryfile, NamedTemporaryfile

    from tempfile import Temporaryfile, NamedTemporaryfile
    f = Temporaryfile()   # 创建一个临时文件对象,只能由对象f来访问
    f.write('abcdef' * 1000)
    f.seek(0)
    f.read(100)
    
    ntf = NamedTemporaryfile()  # 创建一个有名字的文件对象,可以在文件系统中找到,当对象被重新实例化的时候,原来的文件会消失
    ntf.name    # 临时文件的名字
    ntf = NamedTemporaryfile(delete=False)   # 创建新实例的时候,原来的文件不会被删除
    

    如何读写csv数据

    实际案例:

    将一个csv文件里面的数据读取出来并存到另一个csv文件中。

    解决方案:

    使用标准库中的csv模块,可以使用其中 reader 和 writer 完成 csv 文件读写

    import csv
    
    
    rf = open('test1.csv', 'rb')
    reader = csv.reader(rf)    # 得到一个读取 csv 文件的对象的迭代器
    reader.next()    # 得到第一行的数据
    
    wf = open('test2.csv', 'wb')
    writer = csv.writer(wf)
    writer.writerow(['a', 'b'])      # 将一个一维列表写入csv文件中
    writer.writerows([['a', 'b'], ['c', 'd']])   # 将一个二维写入csv文件中
    

    如何读写json数据

    解决方案:

    使用json模块里的loads、load和dumps、dump进行处理

    json模块:

    • json.dumps(obj, sort_key=True) # 将python对象转化为json对象,对键进行排序
    • json.dump() # 将python的文件对象转化为json对象
    • json.loads() # 将json字符串转化为python对象(一般为字典)
    • json.load() # 讲json文件对象转化为Python对象

    如何解析简单的xml文档

    解决方案:

    使用标准库中的xml.etree.ElementTree,其中的parse函数可以解析xml文档。

    from xml.etree.ElementTree import parse
    
    
    f = open('demo.xml', 'r')
    et = parse(f)   # 得到一个元素树对象
    root = et.getroot()  # 得到元素树的根节点(可迭代元素对象)
    for child in root:
        child.get('name')    # 获取每个子元素的属性
    root.tag   # 查看标签名
    root.attrib   # root的属性(一个字典)
    root.text   # root的内容
    root.text.strip()  # 过滤空白字符
        
    # 支持 xpath 语法
    root.find('tag_name')   # 找到子节点下标签名为 tag_name 的元素,找到第一个就停止
    root.findall('tag_name')  # 找到子节点下所有标签名为 tag_name 的元素,返回一个列表
    root.iterfind('tag_name')   # 找到子节点下所有标签名为 tag_name 的元素,返回一个迭代器
    root.iter('rank')   # 递归地查找标签名为 rank 的子节点,无论在哪个层级下
    

    如何构建xml文档

    解决方案:

    使用标准库中的xml.etree.ElementTree,使用其中的write方法写入xml文件。

    from xml.etree.ElementTree import Element, ElementTree
    from xml.etree.ElementTree import tostring
    
    e = Element('Data')  # 创建一个 xml 元素
    e.tag   # 得到元素的标签名
    e.set('name', 'abc')  # 设置元素的属性
    tostring(e)   # 以字符串的方式查看xml元素
    e.text = '123'   # 设置xml元素的内容
    e2 = Element('Row')
    e2.text = 'abc'
    e3 = Element('Open')
    e3.text = 'def'
    e2.append(e3)   # 将e3设置为e2的子元素
    e.text = None   # 将元素e的内容删除
    e.append(e2)   # 将e2设置为e的子元素
    
    et = ElementTree(e)    # 需要传入一个根节点元素
    et.write('demo1.xml')
    
    # 美化xml文件
    def pretty(e, level=0):
        if len(e) > 0:
            e.text = '\n' + '\t' * (level + 1)
            for child in e:
                pretty(child, level + 1)
            child.tail = child.tail[:-1]
        e.tail = '\n' + '\t' * level
    

    如何读写excel文件

    实际案例:

    利用python读写excel文件,并添加新的一列

    解决方案:

    使用第三方库xlrdxlwt,这两个库分别用于excel读和写

    # 读取excel文件
    import xlrd
    
    
    book = xlrd.open_work_book('demo.xlsx')
    book.sheets()    # 返回一个sheet对象的列表
    sheet = book.sheet_by_index(0)   # 得到一个sheet对象
    sheet.nrows    # 表的行数
    sheet.ncols    # 表的列数
    # 访问单元格
    cell = sheet.cell(0, 0)   # 得到坐标为(0,0)的对象
    cell.ctype  # 内容的类型,枚举类型   xlrd.XL_CELL_XXX 来查看具体所指
    cell.value   # cell的内容
    # 获取一行或者一列
    sheet.row(1) # 返回第一行
    sheet.row_values(1)   # 得到第一行的值
    sheet.row_values(1, start_colx=1, end_colx=None)  # 从第一个开始
    # 添加一个单元格
    sheet.put_cell(rowx=2, colx=2, ctype=1, value="ok", xf_index=None)   # xf_index 设置字体、对齐等
    
    # 写入excel文件
    import xlwt
    
    wbook = xlwt.Workbook()   # 实例一个Workbook对象
    wsheet = wbook.add_sheet('sheet1')   # 创建一个名字为sheet1的表
    wsheet.write(r=0, c=0, label='ok', style=None)
    wsheet.save('demo.xlsx')
    

    如何派生内置不可变类型并修改实例化行为

    实际案例:

    我们想自定义一种新类型的元组,对于传入的可迭代对象,我们只保留作其中int类型且值大于0的元素,例如:

    IntTuple([1, -1, 'abc', 6, ['x', 'y'], 3]) => (1, 6, 3)

    要求IntTuple是内置tuple的子类,如何实现?

    解决方案:

    定义IntTuple继承内置tuple,并实现__new__,修噶爱实例化行为

    class IntTuple(tuple):
    
        # 参数与__init__方法的参数一致
        def __new__(cls, iterable):
            g = (x for x in iterable if isinstance(x, int) and x > 0)
            # 返回一个 IntTuple 的实例对象
            return super(IntTuple, cls).__new__(cls, g)
    
        def __init__(self, iterable):
            super(IntTuple, self).__init__()   # super().__init__() 也可以
    
    
    t = IntTuple([1, -1, 'abc', 6, ['x', 'y'], 3])
    print(t)
    

    如何创建大量实例节省内存

    实际案例:

    某网络游戏中,定义了玩家类Player(id, name, status, ...) 每有一个在线玩家,在服务器程序内则有一个Player的实例,当在线人数很多时,将产生大量实例。(如百万级)

    如何降低这些大量实例的内存开销?

    解决方案:

    定义类的__slots__属性, 它用来声明实例属性名字的列表。

    案例代码:

    # python3
    import sys
    
    
    class Player(object):
    
        def __init__(self, uid, name, status=0, level=1):
            self.uid = uid
            self.name = name
            self.stat = status
            self.level = level
    
    
    class Player2(object):
        __slots__ = ['uid', 'name', 'stat', 'level']   # 关闭__dict__的属性,不能动态绑定属性
    
        def __init__(self, uid, name, status=0, level=1):
            self.uid = uid
            self.name = name
            self.stat = status
            self.level = level
    
    
    p1 = Player('0001', 'Jim')
    p2 = Player('0001', 'Jim')
    dic = p1.__dict__   # 类对象属性的字典, 可以动态的增加和删除值
    sys.getsizeof(dic)    # 获取dic的内存值
    

    如何让对象支持上下文管理

    实际案例:

    我们事先了一个telent客户端的类TelentClient,调用实例的start() 方法启动客户端与服务器的交互,交互完毕后需调用cleanup() 方法,关闭已连接的socket,以及将操作历史记录写入文件并关闭。

    能否让TelentClient的实例支持上下文管理协议,从而代替手工调用cleanup() 方法。

    解决方案:

    实现上下文管理协议,需定义实例的__enter__和__exit__方法,它们分别在with开始和结束时被调用。

    # python
    from telnetlib import Telnet
    from sys import stdin, stdout
    from collections import deque
    
    
    class TelentClient(object):
        def __init__(self, addr, port=23):
            self.addr = addr
            self.port = port
            self.tn = None
            self.history = None
    
        def start(self):
            # user
            t = self.tn.read_until('login: ')
            stdout.write(t)
            user = stdin.readline()
            self.tn.write(user)
    
            # password
            t = self.tn.read_until('Password: ')
            if t.startswith(user[:-1]):
                t = t[len(user) + 1:]
            stdout.write(t)
            self.tn.write(stdin.readline())
    
            t = self.tn.read_until('$ ')
            stdout.write(t)
            while True:
                uinput = stdin.readline()
                if not uinput:
                    break
                self.history.append(uinput)
                self.tn.write(uinput)
                t = self.tn.read_until('$ ')
                stdout.write(t[len(uinput) + 1:])
    
        def cleanup(self):
            self.tn.close()
            self.tn = None
            with open(self.addr + '_history.txt', 'w') as f:
                f.write(self.history)
    
        def __enter__(self):
            self.tn = Telnet(self.addr, self.port)
            self.history = deque()
            # 返回 TelentClient 实例化后的对象,也就是自身
            return self
    
        # 当出现异常的时候,直接跳到__exit__方法并执行
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.cleanup()
            return True   # 写了 'return True' 后不会向外抛出异常
    
    
    with TelentClient('finlu.com.cn') as client:
        client.start()
    

    一般的telent客户端:

    # python3
    from telnetlib import Telnet
    from sys import stdin, stdout
    from collections import deque
    
    
    class TelentClient(object):
        def __init__(self, addr, port=23):
            self.addr = addr
            self.port = port
            self.tn = None
            self.history = None
    
        def start(self):
            self.tn = Telnet(self.addr, self.port)
            self.history = deque()
    
            # user
            t = self.tn.read_until('login: ')
            stdout.write(t)
            user = stdin.readline()
            self.tn.write(user)
    
            # password
            t = self.tn.read_until('Password: ')
            if t.startswith(user[:-1]):
                t = t[len(user) + 1:]
            stdout.write(t)
            self.tn.write(stdin.readline())
    
            t = self.tn.read_until('$ ')
            stdout.write(t)
            while True:
                uinput = stdin.readline()
                if not uinput:
                    break
                self.history.append(uinput)
                self.tn.write(uinput)
                t = self.tn.read_until('$ ')
                stdout.write(t[len(uinput) + 1:])
    
        def cleanup(self):
            self.tn.close()
            self.tn = None
            with open(self.addr + '_history.txt', 'w') as f:
                f.write(self.history)
    
    
    client = TelentClient('finlu.com.cn')
    print('\nstart...')
    client.start()
    print('\ncleanup...')
    

    如何创建可管理的对象属性

    实际案例:

    在面向对象编程中,我们把方法(函数)看作对象的接口。直接访问对象的属性可能是不安全的,或设计上不够灵活。但是使用调用方法在形式上不如访问属性简介。

    circle.get_radius()

    circle.get_radius(5.0) # 繁

    circle.radius

    circle.radius = 5.0 # 简

    是否在形式上是属性访问,但实际上调用方法?

    解决方案:

    使用property函数为类创建可管理属性,fget/fset/fdel对应属性访问。

    案例代码:

    from math import pi
    
    
    class Circle(object):
        def __init__(self, radius):
            self.radius = radius
    
        def get_radius(self):
            return round(self.radius, 2)
    
        def set_radius(self, value):
            if not isinstance(value, (int, float)):
                raise ValueError('wrong type.')
            self.radius = float(value)
    
        def get_area(self):
            return round(self.radius ** 2 * pi, 2)
    
        R = property(get_radius, set_radius)
    
    
    c = Circle(3.2)
    r1 = c.R    # r1 = 3.2
    c.R = 4.2
    r2 = c.R    # r2 = 4.2
    

    如何让类支持比较操作

    实际案例:

    有时我们希望自定义的类,势力见可以使用<, <=, >, >=, ==, !=符号进行比较,我们自定义比较的行为。例如,有一个矩阵的类,我们希望比较两个矩形的实例时,比较的是他们的面积。

    解决方案:

    比较符号运算符重载,需要实现以下方法:

    __lt__,__le__,___gt,__ge__,__eq__,__ne__

    使用标准库下的functools下的类装饰器abstractmethod可以简化此过程。

    案例代码:

    from math import pi
    from abc import abstractmethod
    
    
    class Shape(object):
    
        # 子类中一定要实现该方法
        @abstractmethod
        def area(self):
            pass
    
        def __lt__(self, other):
            print('In __lt__')
            if not isinstance(other, Shape):
                raise TypeError('Type Error')
            return self.area() < other.area()
    
        def __eq__(self, other):
            print('In __lt__')
            if not isinstance(other, Shape):
                raise TypeError('Type Error')
            return self.area() == other.area()
    
        def __gt__(self, other):
            if not isinstance(other, Shape):
                raise TypeError('Type Error')
            return self.area() > other.area()
    
    
    class Rectangle(Shape):
        def __init__(self, w, h):
            self.w = w
            self.h = h
    
        def area(self):
            return self.w * self.h
    
    
    class Circle(Shape):
        def __init__(self, r):
            self.r = r
    
        def area(self):
            return self.r ** 2 * pi
    
    
    r = Rectangle(6, 6)
    c = Circle(6)
    print(r < c)
    

    如何使用描述符对实例属性做类型检查

    描述符:__get__,__set__, __delete__

    实际案例:

    在某项目中,我们实现了一些类,并希望能像静态语言那样(C, C++, Java)对它们的实例属性做类型检查。

    p = Person()

    p.name = 'Bob' # 必须是str

    p.age = 18 # 必须是int

    p.height = 1.83 # 必须是float

    要求:

    1. 可以对实例变量名指定类型
    2. 赋予不正确时抛出异常

    解决方案:

    使用描述符来实现需要类型检查的属性:分别实现__get__,__set__, ___delete__方法,在__set__内部使用isinstance函数做类型检查。

    案例代码:

    # 检查Person类的属性类型是否符合要求
    class Attr(object):
        def __init__(self, name, type_):
            self.name = name
            self.type_ = type_
    
        def __get__(self, instance, owner):
            print('In __get__')
            return instance.__dict__[self.name]
    
        def __set__(self, instance, value):
            print('In __set__')
            if not isinstance(value, self.type_):
                raise TypeError('expected an {}'.format(self.type_))
            instance.__dict__[self.name] = value
    
        def __delete__(self, instance):
            print('In delete')
            del instance.__dict__[self.name]
    
    
    class Person(object):
        name = Attr('name', str)
        age = Attr('age', int)
        height = Attr('height', float)
    
    
    person = Person()
    person.name = 'Bob'
    person.age = 18
    person.height = 1.83
    

    如何在环状结构中管理内存

    sys.getrefcount(a): 返回a被引用的次数

    实际案例:

    在python中,垃圾回收器通过引用计数来回收垃圾对象,但某些环状数据结构(树, 图,...),存在对象间的循环引用,比如树的父节点引用子节点,子节点也同时引用父节点。此时同时del掉引用父子节点,两个对象不能被立即回收。

    如何解决此类的内存管理问题?

    解决方案:

    使用标准库weakref,它可以创建一种能访问对象但不增加引用计数的对象。

    案例代码:

    # 当对象循环引用的时候,删除其中一个对象,两个对象不能被立即回收(执行类的__del__方法)
    import weakref
    
    
    class Data(object):
        def __init__(self, value, owner):
            # self.owner = owner
            self.owner = weakref.ref(owner)
            self.value = value
    
        def __str__(self):
            # return "%s's data, value is %s" % (self.owner, self.value)
            return "%s's data, value is %s" % (self.owner(), self.value)
    
        def __del__(self):
            print('In Data.__del__')
    
    
    class Node(object):
        def __init__(self, value):
            # Node类和Data类循环引用
            self.data = Data(value, self)
    
        def __del__(self):
            print('In Node.__del__')
    
    
    node = Node(100)
    del node
    
    # In Node.__del__
    # In Data.__del__
    

    如何通过实例方法名字的字符串调用方法

    实际案例:

    某项目中,我们的代码使用了三个不同库中的图形类:

    ​ Circle, Triangle, Rectangle

    他们都有一个获取图形面积的接口(方法),但接口名字不同。我们可以实现一个统一的获取面积的函数,使用每种方法名进行尝试,调用相应类的接口。

    解决方案:

    • 使用内置函数getattr,通过名字在实例上获取方法对象,然后调用。
    • 使用标准库operator下的methodcaller函数调用。

    案例代码:

    from math import pi
    from operator import methodcaller
    
    
    class Circle(object):
        def __init__(self, r):
            self.r = r
        def area(self):
            return self.r ** 2 * pi
        
        
    class Rectangle(object):
        def __init__(self, w, h):
            self.w = w
            self.h = h
        def get_area(self):
            return self.w * self.h
        
        
    class Triangle(object):
        def __init__(self, a, b ,c):
            self.a = a
            self.b = b
            self.c = c
        def getArea(self):
            a, b, c = self.a, self.b, self.c
            p = (a + b + c) / 2
            area = (p * (p - a) * (p - b) * (p - c)) ** 0.5
            return area
        
        
    def getArea(shape):
        for name in ('area', 'getArea', 'get_area'):
            f = getattr(shape, name, None)
            if f:
                return f()
            
            
    shape1 = Circle(2)
    shape2 = Rectangle(6, 4)
    shape3 = Triangle(3, 4, 5)
    shapes = [shape1, shape2, shape3]
    print(list(map(getArea, shapes)))
    
    # methodcaller函数的基本使用案例
    s = 'abc123abc123'
    s.find('abc', 4)
    # 6
    methodcaller('find', 'abc', 4)(s)
    # 6
    

    如何使用多线程

    实际案例:

    希望提高从互联网下载资源的速度

    解决方案:

    使用标准库threading.Thread创建线程,在每一个线程下载并处理数据。

    案例代码:

    # 使用线程类
    from threading import Thread
    import requests
    
    
    def download():
        url = 'http://baidu.com'
        response = requests.get(url, timeout=5)
        if response.ok:
            print('ok')
            return response.text
    
    
    for _ in range(10):
        t = Thread(target=download, args=())
        t.start()
        t.join()
    print('__main__ END')
    

    自定义线程类:

    from threading import Thread
    import requests
    
    
    class MyThread(Thread):
        def __init__(self):
            super().__init__()
    
        def run(self):
            self.download()
    
        @classmethod
        def download(cls):
            url = 'http://baidu.com'
            response = requests.get(url, timeout=5)
            if response.ok:
                print('ok')
                return response.text
    
    
    threads = []
    for i in range(1, 11):
        t = MyThread()
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    
    print('__main__ END')
    

    如何线程间通信

    实际案例:

    由于全局解释器锁的存在,多线程进程CPU密集型操作并不能提高执行效率,我们修改程序架构:

    1. 使用多个DownloadThread线程进行下载(I/O操作)
    2. 使用一次ConvertThread线程进行转换(CPU密集型操作)
    3. 下载线程把下载数据安全地传递给转换线程

    解决方案:

    使用标准库中的queue.Queue,它是一个线程安全的队列。Download线程把下载数据放入Covert线程从队列里提取数据。

    案例代码:

    from threading import Thread
    import requests
    from queue import Queue
    
    
    class DownloadThread(Thread):
        def __init__(self, sid, queue):
            super().__init__()
            self.sid = sid
            self.queue = queue
    
        @classmethod
        def download(cls, sid):
            url = 'http://baidu.com'
            sid = sid
            response = requests.get(url, timeout=5)
            if response.ok:
                print('ok')
                return response.text
    
        def run(self):
            print('DownloadThread', self.sid)
            data = self.download(self.sid)
            self.queue.put((self.sid, data))
    
    
    class ConverThread(Thread):
        def __init__(self, queue):
            super().__init__()
            self.queue = queue
    
        @classmethod
        # 模拟CPU运算
        def go(self):
            sum = 0
            for i in range(100):
                sum += I
            return sum
    
        def run(self):
            while True:
                sid, data = self.queue.get()
                print('ConverThread', sid)
                # 当队列中的sid值为-1的时候表示没有数据了,可以退出循环了
                if sid == -1:
                    break
            if data:
                self.go()
    
    
    q = Queue()
    dthreads = [DownloadThread(i, q) for i in range(1, 11)]
    cThread = ConverThread(q)
    
    for t in dthreads:
        t.start()
        t.join()
    cThread.start()
    # 当子线程结束后往队列中put sid 的值为-1
    q.put((-1, None))
    
    print('__main__ END')
    

    如何在线程间进行事件通知

    [图片上传失败...(image-4c542a-1555816204408)]

    解决方案:

    线程间的事件通知,可以使用标准库中的Threading.Event:

    1. 等待事件一端调用wait,等待事件。
    2. 通知事件一端调用set,通知事件。

    案例代码:

    import csv
    import os
    import tarfile
    from xml.etree.ElementTree import Element, ElementTree
    import requests
    from xml pretty import pretty
    from threading import Event, Thread
    
    
    class DownloadThread(Thread):
        def __init__(self, sid, queue):
            super().__init__()
            self.sid = sid
            self.url = 'http://table.fince.yahoo.com/table.csv?s=%s.sz'
            self.url %= str(sid).rjust(6, '0')
            self.queue = queue
    
        def download(cls, url):
            response = requests.get(url, timeout=3)
            if response.ok:
                return response.content
    
        def run(self):
            print('DownloadThread', self.sid)
            data = self.download(self.sid)
            self.queue.put((self.sid, data))
            
    
    class ConverThread(Thread):
        def __init__(self, queue, cEvent, tEvent):
            super().__init__()
            self.queue = queue
            self.cEvent = cEvent
            self.tEvent = tEvent
    
        def csv_to_xml(self):
            reader = csv.reader(scsv)
            headers = reader.__next__()
            headers = map(lambda h: h.replace(' ', ''), headers)
            
            root = Element('Data')
            for row in reader:
                eRow = Element('Row')
                root.append(eRow)
                for tag, text in zip(headers, row):
                    e = Element(tag)
                    e.text = text
                    eRow.append(e)
                    
                pretty(root)
                et.ElementTree(root)
                et.write(fxml)
        def run(self):
            while True:
                count = 0
                sid, data = self.queue.get()
                print('Convert', sid)
                if sid == -1:
                    self.cEvent.set()
                    self.tEvent.wait()
                    break
                if data:
                    fname = str(sid).rjust(6, '0') + '.xml'
                    with open(fname, 'wb') as wf:
                        self.csv_to_xml(data, wf)
                    count += 1
                    if count == 5:
                        self.cEvent.set()
                        self.tEvent.wait()
                        self.tEvent.clear()
                        count = 0
                
    class TarThread(Thread):
        def __init__(self, cEvent, tEvent):
            Thread.__init__()
            self.count = 0
            self.cEvent = cEvent
            self.tEvent = tEvent
            self.setDaemon(True)   # 设置守护线程,当其他线程退出的时候,其自动退出
    
    
        def tarXML(self):
            self.count += 1
            tfname = '%d.tgz' % self.count
            tf = tarfile.open(tfname, 'w:gz')
            for fname in os.listdir('.'):
                if fname in os.listdir('.'):
                    tf.add(fname)
                    os.remove(fname)
            tf.close()
    
            if not tf.members:
                os.remove(tfname)
        def run(self):
            while True:
                self.cEvent.wait()
                self.tarXML()
                self.cEvent.clear()
                
                self.tEvent.set()
    
        
    if __name__ == '__main__':
        q = Queue()
        dthreads = [DownloadThread(i, q) for i in range(1, 11)]
        cEvent = Event()
        tEvent = Event()
        
        cThread = ConverThread(q, cEvent, tEvent)
        tThread = TarThread(cEvent, tEvent)
        tThread.start()
        
        for t in dthreads:
            t.start()
            t.join()
        cThread.start()
        # 当子线程结束后往队列中put sid 的值为-1
        q.put((-1, None))
    
        print('__main__ END')
    

    如何使用线程本地数据

    实际案例:


    1530367458929.png 1530367143844.png

    案例代码:

    1530367216441.png 1530367263773.png 1530367287835.png 1530367343740.png 1530367365545.png 1530367391610.png

    如何使用线程池

    1530367506214.png 1530367685980.png 1530367757968.png

    在上面的代码基础上重写TCPServer类。

    如何使用多进程

    1530608741884.png

    如何使用函数装饰器

    实际案例:

    1530610615652.png

    案例代码:

    # 斐波拉契数列求值函数
    def fibonacci(n):
        if n <= 1:
            return 1
        return fibonacci(n - 1) + fibonacci(n - 2)
    
    # 对斐波拉契数列求值函数的优化
    def fibonacci(n, cache=None):
        if cache is None:
            cache = {}
    
        if n in cache:
            return cache[n]
    
        if n <= 1:
            return 1
        cache[n] = fibonacci(n - 1, cache) + fibonacci(n - 2, cache)
        return cache[n]
    
    # 使用装饰器实现对斐波拉契数列的优化
    def memo(func):
        cache = {}
    
        def wrap(*args):
            if args not in cache:
                print(args)
                cache[args] = func(*args)
            return cache[args]
        return wrap
    
    
    @memo
    def fibonacci(n):
        if n <= 1:
            return 1
        return fibonacci(n - 1) + fibonacci(n - 2)
    
    # 一共有10个台阶,一次只能迈1~3个台阶,且不能后退,求有几种方法
    def climb(n, steps):
        count = 0
        if n == 0:
            count = 1
        elif n > 0:
            for step in steps:
                count += climb(n - step, steps)
        return count
    
    # 装饰器缓存避免重复运算
    def memo(func):
        cache = {}
    
        def wrap(*args):
            if args not in cache:
                print(args)
                cache[args] = func(*args)
            return cache[args]
        return wrap
    
    
    @memo
    def climb(n, steps):
        count = 0
        if n == 0:
            count = 1
        elif n > 0:
            for step in steps:
                count += climb(n - step, steps)
        return count
    

    如何为被装饰的函数保存元数据

    函数的元数据:

    • __name__ 函数的名字,在define的时候写入到函数里面的
    • __doc__ 函数的文档字符串
    • _module_ 函数所属模块名
    • _defaults_ 保存函数的默认参数(关键字参数),默认以元组的形式
    • _closure_ 返回函数的闭包

    实际案例:

    1530611691604.png

    案例代码:

    from functools import update_wrapper, wraps, WRAPPER_ASSIGNMENTS, WRAPPER_UPDATES
    
    
    def memo(func):
        cache = {}
    
        # 方法二:使用wraps装饰器
        @wraps(func)
        def wrap(*args, *kargs):
            """wraper function"""
            # 方法一:使用update_wrapper函数
            update_wrapper(wrap, func, assigned=('__name__', '__doc__'), updated=('__dict__', ))
            # 也可以使用默认参数
            # assigned=WRAPPER_ASSIGNMENTS==('__module__', '__name__', '__qualname__', '__doc__', '__annotations__')
            # updated=WRAPPER_UPDATES==('__dict__',)
            if args not in cache:
                print(args)
                cache[args] = func(*args)
            return cache[args]
        return wrap
    
    
    @memo
    def climb(n, steps):
        """climb function"""
        count = 0
        if n == 0:
            count = 1
        elif n > 0:
            for step in steps:
                count += climb(n - step, steps)
        return count
    

    如何定义带参数的装饰器

    实际案例:

    1530613494146.png

    案例代码:

    from inspect import signature
    
    
    def f(a, b, c=1):
        pass
    
    
    sig = signature(f)
    print(sig.parameters)    # 获得函数参数的一个字典
    a = sig.parameters['a']
    b = sig.parameters['b']
    c = sig.parameters['c']
    # a.kind    # 参数类型
    # a.default   # 参数的默认值
    # 创建一个如上的字典
    bargs = sig.bind(str, int, int)   # 对所有参数做类型检查
    # bargs.arguments    # OrderedDict([('a', <class 'str'>), ('b', <class 'int'>), ('c', <class 'int'>)])
    bargs2 = sig.bind_partial()   # 对指定参数做类型检查
    
    from inspect import signature
    
    
    # 定义带参数的装饰器
    def typeassert(*ty_types, **ty_kwargs):
        def decorator(func):
            # func -> a, b
            # d = {'a': int, 'b': str}
            sig = signature(func)  # 获取func函数的签名
            btypes = sig.bind_partial(*ty_types, **ty_kwargs).arguments
    
            def wrapper(*args, **kwargs):
                # arg in d, isinstance(arg, d[arg])
                for name, obj in sig.bind(*args, **kwargs).arguments.items():
                    if name in btypes:
                        if not isinstance(obj, btypes[name]):
                            raise TypeError('{} must be {}'.format(name, btypes[name]))
                return func(*args, **kwargs)
    
            return wrapper
    
        return decorator
    
    
    # 测试代码
    @typeassert(int, str, list)
    def f(a, b, c):
        print(a, b, c)
    
    
    f(1, 'abc', [1, 2, 3])
    f(1, 2, [1, 2, 3])
    
    

    如何实现属性可修改的函数装饰器

    实际案例:

    1530682194530.png

    案例代码:

    from functools import wraps
    import time
    import logging
    from random import randint
    
    
    def wran(timeout):
        def decorator(func):
            def wrapper(*args, **kwargs):
                start = time.time()
                res = func(*args, **kwargs)
                used = time.time() - start
                if used > timeout:
                    msg = '%s: %s > %s' % (func.__name__, used, timeout)
                    logging.warn(msg)
                return res
            # 动态的修改timeout的值
    
            def set_timeout(k):
                nonlocal timeout
                timeout = k
            wrapper.set_timeout = set_timeout
            return wrapper
        return decorator
    
    
    @wran(1.5)
    def test():
        print('In test')
        while randint(0, 1):
            time.sleep(0.5)
    
    
    for _ in range(30):
        test()
    # 将timeout的值设置为1
    test.set_timeout(1)
    for _ in range(30):
        test()
    

    相关文章

      网友评论

          本文标题:Python编程技巧整理

          本文链接:https://www.haomeiwen.com/subject/tppdgqtx.html