附录 E 习题解答

E.1 第1章“Python初探”

(1) 如果你还没有安装 Python 3,现在就立刻动手。具体方法请阅读附录 D。

(2) 启动 Python 3 交互式解释器。再说一次,具体方法请阅读附录 D。它会打印出几行信息和一行 >>>,这是你输入 Python 命令的提示符。

下面是在我的 MacBook Pro 上显示的内容:

  1. $ python
  2. Python 3.3.0 (v3.3.0:bd8afb90ebf2, Sep 29 2012, 01:25:11)
  3. [GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
  4. Type "help", "copyright", "credits" or "license" for more information.
  5. >>>

(3) 随便玩玩解释器。可以用它来计算 8 * 9。按下回车来查看结果,Python 应该会打印出 72

  1. >>> 8 * 9
  2. 72

(4) 输入数字 47 并按下回车,解释器有没有在下一行打印出 47

  1. >>> 47
  2. 47

(5) 现在输入 print(47) 并按下回车,解释器有没有在下一行打印出 47

  1. >>> print(47)
  2. 47

E.2 第2章“Python基本元素:数字、字符串和变量”

(1) 一个小时有多少秒?这里,请把交互式解释器当作计算器使用,将每分钟的秒数(60)乘以每小时的分钟数(60)得到结果。

  1. >>> 60 * 60
  2. 3600

(2) 将上一个练习得到的结果(每小时的秒数)赋值给名为 seconds_per_hour 的变量。

  1. >>> seconds_per_hour = 60 * 60
  2. >>> seconds_per_hour
  3. 3600

(3) 一天有多少秒?用你的 seconds_per_hour 变量进行计算。

  1. >>> seconds_per_hour * 24
  2. 86400

(4) 再次计算每天的秒数,但这一次将结果存储在名为 seconds_per_day 的变量中。

  1. >>> seconds_per_day = seconds_per_hour * 24
  2. >>> seconds_per_day
  3. 86400

(5) 用 seconds_per_day 除以 seconds_per_hour,使用浮点除法(/)。

  1. >>> seconds_per_day / seconds_per_hour
  2. 24.0

(6) 用 seconds_per_day 除以 seconds_per_hour,使用整数除法(//)。除了末尾的 .0,本练习所得结果是否与前一个练习用浮点数除法得到的结果一致?

  1. >>> seconds_per_day // seconds_per_hour
  2. 24

E.3 第3章“Python容器:列表、元组、字典与集合”

(1) 创建一个叫作 years_list 的列表,存储从你出生的那一年到你五岁那一年的年份。例如,如果你是 1980 年出生的,那么你的列表应该是 years_list = [1980, 1981, 1982, 1983, 1984, 1985]

假设你出生在 1980 年,输入如下所示:

  1. >>> years_list = [1980, 1981, 1982, 1983, 1984, 1985]

(2) 在 years_list 中,哪一年是你三岁生日那年?别忘了,你出生的第一年算 0 岁。

你需要的偏移量为 3,如果你出生在 1980 年,那么:

  1. >>> years_list[3]
  2. 1983

(3) 在 years_list 中,哪一年你的年纪最大?

你需要得到列表中的最后一项,因此使用偏移量 -1,或者你也可以使用偏移量 5,因为你已经提前知道列表中有 6 项,但是 -1 返回任何大小列表的最后一项。对于一个 1980 年代的人:

  1. >>> years_list[-1]
  2. 1985

(4) 创建一个名为 things 的列表,包含以下三个元素:"mozzarella""cinderella""salmonella"

  1. >>> things = ["mozzarella", "cinderella", "salmonella"]
  2. >>> things
  3. ['mozzarella', 'cinderella', 'salmonella']

(5) 将 things 中代表人名的字符串变成首字母大写形式,并打印整个列表。看看列表中的元素改变了么?

下面的方法实现了单词首字母大写,但是没有在列表中改变它:

  1. >>> things[1].capitalize()
  2. 'Cinderella'
  3. >>> things
  4. ['mozzarella', 'cinderella', 'salmonella']

如果你想在列表中改变它,应该将它重新赋值回列表:

  1. >>> things[1] = things[1].capitalize()
  2. >>> things
  3. ['mozzarella', 'Cinderella', 'salmonella']

(6) 将 things 中代表奶酪的元素全部改成大写,并打印整个列表。

  1. >>> things[0] = things[0].upper()
  2. >>> things
  3. ['MOZZARELLA', 'Cinderella', 'salmonella']

(7) 将代表疾病的元素从 things 中删除,收好你因此得到的诺贝尔奖,并打印列表。

按照值删掉它:

  1. >>> things.remove("salmonella")
  2. >>> things
  3. ['MOZZARELLA', 'Cinderella']

因为它是列表中的最后一项,所以下面的方法也是可行的:

  1. >>> del things[-1]

或者从列表开始处按照偏移量删掉它:

  1. >>> del things[2]

(8) 创建一个名为 surprise 的列表,包含以下三个元素:"Groucho""Chico""Harpo"

  1. >>> surprise = ['Groucho', 'Chico', 'Harpo']
  2. >>> surprise
  3. ['Groucho', 'Chico', 'Harpo']

(9) 将 surprise 列表的最后一个元素变成小写,翻转过来,再将首字母变成大写。

  1. >>> surprise[-1] = surprise[-1].lower()
  2. >>> surprise[-1] = surprise[-1][::-1]
  3. >>> surprise[-1].capitalize()
  4. 'Oprah'

(10) 创建一个名为 e2f 的英法字典并打印出来。这里提供一些单词对:dogchiencatchat 以及 walrusmorse

  1. >>> e2f = {'dog': 'chien', 'cat': 'chat', 'walrus': 'morse'}
  2. >>> e2f
  3. {'cat': 'chat', 'walrus': 'morse', 'dog': 'chien'}

(11) 使用你的仅包含三个词的字典 e2f 查询并打印出 walrus 对应的法语词。

  1. >>> e2f['walrus']
  2. 'morse'

(12) 利用 e2f 创建一个名为 f2e 的法英字典。注意要使用 items 方法。

  1. >>> f2e = {}
  2. >>> for english, french in e2f.items():
  3. f2e[french] = english
  4. >>> f2e
  5. {'morse': 'walrus', 'chien': 'dog', 'chat': 'cat'}

(13) 使用 f2e,查询并打印法语词 chien 对应的英文词。

  1. >>> f2e['chien']
  2. 'dog'

(14) 创建并打印由 e2f 的键组成的英语单词集合。

  1. >>> set(e2f.keys())
  2. {'cat', 'walrus', 'dog'}

(15) 建立一个名为 life 的多级字典,将下面这些字符串作为顶级键:'animals''plants' 以及 'others'。令 'animals' 键指向另一个字典,这个字典包含键 'cats''octopi' 以及 'emus'。令 'cats' 键指向一个字符串列表,这个列表包括 'Henri''Grumpy''Lucy'。让其余的键都指向空字典。

这是一道比较难的题,如果第一眼看到,不要觉得不舒服:

  1. >>> life = {
  2. ... 'animals': {
  3. ... 'cats': [
  4. ... 'Henri', 'Grumpy', 'Lucy'
  5. ... ],
  6. ... 'octopi': {},
  7. ... 'emus': {}
  8. ... },
  9. ... 'plants': {},
  10. ... 'other': {}
  11. ... }
  12. >>>

(16) 打印 life 的顶级键。

  1. >> print(life.keys())
  2. dict_keys(['animals', 'other', 'plants'])

Python 3 包含 dict_keys 的项,把它作为普通的列表打印输出:

  1. >>> print(list(life.keys()))
  2. ['animals', 'other', 'plants']

顺便提一句,在代码中多加空格可以提高可读性:

  1. >>> print ( list ( life.keys() ) )
  2. ['animals', 'other', 'plants']

(17)打印 life['animals'] 的全部键。

  1. >>> print(life['animals'].keys())
  2. dict_keys(['cats', 'octopi', 'emus'])

(18)打印 life['animals']['cats'] 的值。

  1. >>> print(life['animals']['cats'])
  2. ['Henri', 'Grumpy', 'Lucy']

E.4 第4章“Python外壳:代码结构”

(1) 将 7 赋值给变量 guess_me,然后写一段条件判断(ifelseelif)的代码:如果 guess_me 小于 7 输出 'too low',大于 7 则输出 'too high',等于 7 则输出 'just right'

  1. guess_me = 7
  2. if guess_me < 7:
  3. print('too low')
  4. elif guess_me > 7:
  5. print('too high')
  6. else:
  7. print('just right')

执行这段代码得到如下结果:

  1. just right

(2) 将 7 赋值给变量 guess_me,再将 1 赋值给变量 start。写一段 while 循环代码比较 startguess_me:如果 start 小于 guess_me 则输出 'too low',如果等于则输出 'found it',如果大于则输出 'oops',然后终止循环。在每次循环结束时自增 start

  1. guess_me = 7
  2. start = 1
  3. while True:
  4. if start < guess_me:
  5. print('too low')
  6. elif start == guess_me:
  7. print('found it!')
  8. break
  9. elif start > guess_me:
  10. print('oops')
  11. break
  12. start += 1

如果代码正确执行,结果如下所示:

  1. too low
  2. too low
  3. too low
  4. too low
  5. too low
  6. too low
  7. found it!

注意 elif start > guess_me:这一行可以只用简单的 else:,因为 start 不小于等于 guess_me 即大于,至少在这是对的。

(3) 使用 for 循环输出列表 [3, 2, 1, 0] 的值。

  1. >>> for value in [3, 2, 1, 0]:
  2. ... print(value)
  3. ...
  4. 3
  5. 2
  6. 1
  7. 0

(4) 使用列表推导生成 10 以内(range(10))偶数的列表。

  1. >>> even = [number for number in range(10) if number % 2 == 0]
  2. >>> even
  3. [0, 2, 4, 6, 8]

(5) 使用字典推导创建字典 squares。把 0~9 内的整数作为键,每个键的平方作为对应的值。

  1. >>> squares = {key: key*key for key in range(10)}
  2. >>> squares
  3. {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}

(6) 使用集合推导创建集合 odd,包含 0~9 内(range(10))的奇数。

  1. >>> odd = {number for number in range(10) if number % 2 == 1}
  2. >>> odd
  3. {1, 3, 9, 5, 7}

(7) 使用生成器推导返回字符串 'Got ' 和 0~9 内的一个整数,使用 for 循环进行迭代。

  1. >> for thing in ('Got %s' % number for number in range(10)):
  2. ... print(thing)
  3. ...
  4. Got 0
  5. Got 1
  6. Got 2
  7. Got 3
  8. Got 4
  9. Got 5
  10. Got 6
  11. Got 7
  12. Got 8
  13. Got 9

(8) 定义函数 good():返回列表 ['Harry', 'Ron', 'Hermione']

  1. >>> def good():
  2. ... return ['Harry', 'Ron', 'Hermione']
  3. ...
  4. >>> good()
  5. ['Harry', 'Ron', 'Hermione']

(9) 定义一个生成器函数 get_odds():返回 0~9 内的奇数。使用 for 循环查找并输出返回的第三个值。

  1. >>> def get_odds():
  2. ... for number in range(1, 10, 2):
  3. ... yield number
  4. ...
  5. >>> for count, number in enumerate(get_odds(), 1):
  6. ... if count == 3:
  7. ... print("The third odd number is", number)
  8. ... break
  9. ...
  10. The third odd number is 5

(10) 定义一个装饰器 test:当一个函数被调用时输出 'start',当函数结束时输出 'end'

  1. >>> def test(func):
  2. ... def new_func(*args, **kwargs):
  3. ... print('start')
  4. ... result = func(*args, **kwargs)
  5. ... print('end')
  6. ... return result
  7. ... return new_func
  8. ...
  9. >>>
  10. >>> @test
  11. ... def greeting():
  12. ... print("Greetings, Earthling")
  13. ...
  14. >>> greeting()
  15. start
  16. Greetings, Earthling
  17. end

(11) 定义一个异常 OopsException:编写代码捕捉该异常,并输出 'Caught an oops'

  1. >>> class OopsException(Exception):
  2. ... pass
  3. ...
  4. >>> raise OopsException()
  5. Traceback (most recent call last):
  6. File "<stdin>", line 1, in <module>
  7. __main__.OopsException
  8. >>>
  9. >>> try:
  10. ... raise OopsException
  11. ... except OopsException:
  12. ... print('Caught an oops')
  13. ...
  14. Caught an oops

(12) 使用函数 zip() 创建字典 movies:匹配两个列表 titles = ['Creature of Habit', 'Crewel Fate']plots = ['A nun turns into a monster', 'A haunted yarn shop']

  1. >>> titles = ['Creature of Habit', 'Crewel Fate']
  2. >>> plots = ['A nun turns into a monster', 'A haunted yarn shop']
  3. >>> movies = dict(zip(titles, plots))
  4. >>> movies
  5. {'Crewel Fate': 'A haunted yarn shop', 'Creature of Habit': 'A nun turns
  6. into a monster'}

E.5 第5章“Python盒子:模块、包和程序”

(1) 创建文件 zoo.py。在文件中定义函数 hours:输出字符串 'Open 9-5 daily'。然后使用交互式解释器导入模块 zoo,调用函数 hours。下面是文件 zoo.py:

  1. def hours():
  2. print('Open 9-5 daily')

现在,在解释器中导入它:

  1. >>> import zoo
  2. >>> zoo.hours()
  3. Open 9-5 daily

(2) 在交互式解释器,把模块 zoo 作为 menagerie 导入,然后调用函数 hours()

  1. >>> import zoo as menagerie
  2. >>> menagerie.hours()
  3. Open 9-5 daily

(3) 继续在解释器中,直接从模块 zoo 导入函数 hours(),然后调用。

  1. >>> from zoo import hours
  2. >>> hours()
  3. Open 9-5 daily

(4) 把函数 hours() 作为 info 导入,然后调用它。

  1. >>> from zoo import hours as info
  2. >>> info()
  3. Open 9-5 daily

(5) 创建字典 plain:包含键值对 'a':1'b':2'c':3,然后输出它。

  1. >>> plain = {'a': 1, 'b': 2, 'c': 3}
  2. >>> plain
  3. {'a': 1, 'c': 3, 'b': 2}

(6) 创建有序字典 fancy:键值对和 (5) 相同,然后输出它。输出顺序和 plain 相同吗?

  1. >>> from collections import OrderedDict
  2. >>> fancy = OrderedDict([('a', 1), ('b', 2), ('c', 3)])
  3. >>> fancy
  4. OrderedDict([('a', 1), ('b', 2), ('c', 3)])

(7) 创建默认字典 dict_of_lists,传入参数 list:给 dict_of_lists['a'] 赋值 'something for a',输出 dict_of_lists['a'] 的值。

  1. >>> from collections import defaultdict
  2. >>> dict_of_lists = defaultdict(list)
  3. >>> dict_of_lists['a'].append('something for a')
  4. >>> dict_of_lists['a']
  5. ['something for a']

E.6 第6章“对象和类”

(1) 创建一个名为 Thing 的空类并将它打印出来。接着创建一个属于该类的对象 example,同样将它打印出来。看看这两次打印结果是一样的还是不同的?

  1. >>> class Thing:
  2. ... pass
  3. ...
  4. >>> print(Thing)
  5. <class '__main__.Thing'>
  6. >>> example = Thing()
  7. >>> print(example)
  8. <__main__.Thing object at 0x1006f3fd0>

(2) 创建一个新类 Thing2,将 'abc' 赋值给类特性 letters,打印 letters

  1. >>> class Thing2:
  2. ... letters = 'abc'
  3. ...
  4. >>> print(Thing2.letters)
  5. abc

(3) 再创建一个新类 Thing3。这次将 'xyz' 赋值给实例(对象)特性 letters,并打印 letters。看看你是不是必须先创建一个对象才可以进行打印操作?

  1. >>> class Thing3:
  2. ... def __init__(self):
  3. ... self.letters = 'xyz'
  4. ...

变量 letters 属于类 Thing3 的任何对象,而不是 Thing3 类本身:

  1. >>> print(Thing3.letters)
  2. Traceback (most recent call last):
  3. File "<stdin>", line 1, in <module>
  4. AttributeError: type object 'Thing3' has no attribute 'letters'
  5. >>> something = Thing3()
  6. >>> print(something.letters)
  7. xyz

(4) 创建一个名为 Element 的类,它包含实例属性 namesymbolnumber。使用 'Hydrogen''H'1 实例化一个对象 hydrogen

  1. >>> class Element:
  2. ... def __init__(self, name, symbol, number):
  3. ... self.name = name
  4. ... self.symbol = symbol
  5. ... self.number = number
  6. ...
  7. >>> hydrogen = Element('Hydrogen', 'H', 1)

(5) 创建一个字典,包含这些键值对:'name': 'Hydrogen''symbol': 'H''number': 1。然后用这个字典实例化 Element 类的对象 hydrogen

首先创建该字典:

  1. >>> el_dict = {'name': 'Hydrogen', 'symbol': 'H', 'number': 1}

虽然会编写较多代码,但这是可行的:

  1. >>> hydrogen = Element(el_dict['name'], el_dict['symbol'], el_dict['number'])

检查一下实例化的结果:

  1. >>> hydrogen.name
  2. 'Hydrogen'

然而,你可以直接从字典初始化对象,因为它的键名称是和 __init__ 参数相匹配的(参考第 4 章关于关键字参数的讨论):

  1. >>> hydrogen = Element(**el_dict)
  2. >>> hydrogen.name
  3. 'Hydrogen'

(6) 为 Element 类定义个 dump() 方法,用于打印对象的属性(namesymbolnumber)。使用这个新类创建一个对象 hydrogen 并用 dump() 打印。

  1. >>> class Element:
  2. ... def __init__(self, name, symbol, number):
  3. ... self.name = name
  4. ... self.symbol = symbol
  5. ... self.number = number
  6. ... def dump(self):
  7. ... print('name=%s, symbol=%s, number=%s' %
  8. ... (self.name, self.symbol, self.number))
  9. ...
  10. >>> hydrogen = Element(**el_dict)
  11. >>> hydrogen.dump()
  12. name=Hydrogen, symbol=H, number=1

(7) 调用 print(hydrogen),然后修改 Element 的定义,将 dump 方法的名字改为 __str__。再次创建一个 hydrogen 对象并调用 print(hydrogen),观察输出结果。

  1. >>> print(hydrogen)
  2. <__main__.Element object at 0x1006f5310>
  3. >>> class Element:
  4. ... def __init__(self, name, symbol, number):
  5. ... self.name = name
  6. ... self.symbol = symbol
  7. ... self.number = number
  8. ... def __str__(self):
  9. ... return ('name=%s, symbol=%s, number=%s' %
  10. ... (self.name, self.symbol, self.number))
  11. ...
  12. >>> hydrogen = Element(**el_dict)
  13. >>> print(hydrogen)
  14. name=Hydrogen, symbol=H, number=1

__str__() 是 Python 的一个魔术方法,print 函数调用一个对象的 __str__() 方法获取它的字符串表示。如果类中没有定义 __str__() 方法,它会采用父类的默认方法,返回类似于 <__main__.Element object at 0x1006f5310> 的一个字符串。

(8) 修改 Element 使得 namesymbolnumber 特性都变成私有的。为它们各定义一个 getter 属性(property)来返回各自的值。

  1. >>> class Element:
  2. ... def __init__(self, name, symbol, number):
  3. ... self.__name = name
  4. ... self.__symbol = symbol
  5. ... self.__number = number
  6. ... @property
  7. ... def name(self):
  8. ... return self.__name
  9. ... @property
  10. ... def symbol(self):
  11. ... return self.__symbol
  12. ... @property
  13. ... def number(self):
  14. ... return self.__number
  15. ...
  16. >>> hydrogen = Element('Hydrogen', 'H', 1)
  17. >>> hydrogen.name
  18. 'Hydrogen'
  19. >>> hydrogen.symbol
  20. 'H'
  21. >>> hydrogen.number
  22. 1

(9) 定义三个类:BearRabbitOctothorpe。对每个类都只定义一个方法 eats(),分别返回 'berries'(Bear)'clover'(Rabbit)'campers'(Octothorpo)。为每个类创建一个对象并输出它们各自吃的食物(调用 eats())。

  1. >> class Bear:
  2. ... def eats(self):
  3. ... return 'berries'
  4. ...
  5. >>> class Rabbit:
  6. ... def eats(self):
  7. ... return 'clover'
  8. ...
  9. >>> class Octothorpe:
  10. ... def eats(self):
  11. ... return 'campers'
  12. ...
  13. >>> b = Bear()
  14. >>> r = Rabbit()
  15. >>> o = Octothorpe()
  16. >>> print(b.eats())
  17. berries
  18. >>> print(r.eats())
  19. clover
  20. >>> print(o.eats())
  21. campers

(10) 定义三个类:LaserClaw 以及 SmartPhone。每个类都仅有一个方法 does(),分别返回 'disintegrate'(Laser)'cursh'(Claw) 以及 'ring'(SmartPhone)。接着定义 Robot 类,包含上述三个类的实例(对象)各一个。给 Robot 定义 does() 方法,用于输出它各部分的功能。

  1. >>> class Laser:
  2. ... def does(self):
  3. ... return 'disintegrate'
  4. ...
  5. >>> class Claw:
  6. ... def does(self):
  7. ... return 'crush'
  8. ...
  9. >>> class SmartPhone:
  10. ... def does(self):
  11. ... return 'ring'
  12. ...
  13. >>> class Robot:
  14. ... def __init__(self):
  15. ... self.laser = Laser()
  16. ... self.claw = Claw()
  17. ... self.smartphone = SmartPhone()
  18. ... def does(self):
  19. ... return '''I have many attachments:
  20. ... My laser, to %s.
  21. ... My claw, to %s.
  22. ... My smartphone, to %s.''' % (
  23. ... self.laser.does(),
  24. ... self.claw.does(),
  25. ... self.smartphone.does() )
  26. ...
  27. >>> robbie = Robot()
  28. >>> print( robbie.does() )
  29. I have many attachments:
  30. My laser, to disintegrate.
  31. My claw, to crush.
  32. My smartphone, to ring.

E.7 第7章“像高手一样玩转数据”

(1) 创建一个 Unicode 字符串 mystery 并将它的值设为 '\U0001f4a9'。打印 mystery,并查看 mystery 的 Unicode 名称。

  1. >>> import unicodedata
  2. >>> mystery = '\U0001f4a9'
  3. >>> mystery
  4. ' '
  5. >>> unicodedata.name(mystery)
  6. 'PILE OF POO'

它们还会变成什么呢?

(2) 使用 UTF-8 对 mystery 进行编码,存入字节型变量 pop_bytes,并将它打印出来。

  1. >>> pop_bytes = mystery.encode('utf-8')
  2. >>> pop_bytes
  3. b'\xf0\x9f\x92\xa9'

(3) 使用 UTF-8 对 pop_bytes 进行解码,存入字符串型变量 pop_string,并将它打印出来,看看它与 mystery 是否一致?

  1. >>> pop_string = pop_bytes.decode('utf-8')
  2. >>> pop_string
  3. ' '
  4. >>> pop_string == mystery
  5. True

(4) 使用旧式格式化方法生成下面的诗句,把 'roast beef''ham''head' 以及 'clam' 依次插入字符串:

  1. My kitty cat likes %s,
  2. My kitty cat likes %s,
  3. My kitty cat fell on his %s
  4. And now thinks he's a %s.
  5. >>> poem = '''
  6. ... My kitty cat likes %s,
  7. ... My kitty cat likes %s,
  8. ... My kitty cat fell on his %s
  9. ... And now thinks he's a %s.
  10. ... '''
  11. >>> args = ('roast beef', 'ham', 'head', 'clam')
  12. >>> print(poem % args)
  13. My kitty cat likes roast beef,
  14. My kitty cat likes ham,
  15. My kitty cat fell on his head
  16. And now thinks he's a clam.

(5) 使用新式格式化方法生成下面的套用信函,将下面的字符串存储为 letter(后面的练习中会用到):

  1. Dear {salutation} {name},
  2. Thank you for your letter. We are sorry that our {product} {verbed} in your {room}. Please note that it should never be used in a {room}, especially near any {animals}.
  3. Send us your receipt and {amount} for shipping and handling. We will send you another {product} that, in our tests, is {percent}% less likely to have {verbed}.
  4. Thank you for your support.
  5. Sincerely,
  6. {spokesman}
  7. {job_title}
  8. >>> letter = '''
  9. ... Dear {salutation} {name},
  10. ...
  11. ... Thank you for your letter. We are sorry that our {product} {verb} in your
  12. ... {room}. Please note that it should never be used in a {room}, especially
  13. ... near any {animals}.
  14. ...
  15. ... Send us your receipt and {amount} for shipping and handling. We will send
  16. ... you another {product} that, in our tests, is {percent}% less likely to
  17. ... have {verbed}.
  18. ...
  19. ... Thank you for your support.
  20. ...
  21. ... Sincerely,
  22. ... {spokesman}
  23. ... {job_title}
  24. ... '''

(6) 创建一个字典 response,包含以下键:'salutaion''name''product'verved(动词过去式)、'room''animals''percent''spokesman' 以及 'job_title'。设定这些键对应的值,并打印由 response 的值填充的 letter

  1. >>> response = {
  2. ... 'salutation': 'Colonel',
  3. ... 'name': 'Hackenbush',
  4. ... 'product': 'duck blind',
  5. ... 'verbed': 'imploded',
  6. ... 'room': 'conservatory',
  7. ... 'animals': 'emus',
  8. ... 'amount': '$1.38',
  9. ... 'percent': '1',
  10. ... 'spokesman': 'Edgar Schmeltz',
  11. ... 'job_title': 'Licensed Podiatrist'
  12. ... }
  13. ...
  14. >>> print( letter.format(**response) )
  15. Dear Colonel Hackenbush,
  16. Thank you for your letter. We are sorry that our duck blind imploded in your conservatory. Please note that it should never be used in a conservatory, especially near any emus.
  17. Send us your receipt and $1.38 for shipping and handling. We will send you another duck blind that, in our tests, is 1% less likely to have imploded.
  18. Thank you for your support.
  19. Sincerely,
  20. Edgar Schmeltz
  21. Licensed Podiatrist

(7) 正则表达式在处理文本上非常方便。在这个练习中,我们会对示例文本尝试做各种各样的操作。我们示例文本是一首名为 Ode on the Mammoth Cheese 的诗,它的作者是 James McIntyre,写于 1866 年,出于对当时安大略湖手工制造的 7000 磅的巨型奶酪的敬意,它当时甚至在全球巡回展出。如果你不愿意自己一字一句敲出来,直接百度一下粘贴到你的 Python 代码里即可。你也可以从 Project Gutenberg(http://www.gutenberg.org/ebooks/36068?msg=welcome_stranger)找到。我们将这个字符串命名为 mammoth

  1. >>> mammoth = '''
  2. ... We have seen thee, queen of cheese,
  3. ... Lying quietly at your ease,
  4. ... Gently fanned by evening breeze,
  5. ... Thy fair form no flies dare seize.
  6. ...
  7. ... All gaily dressed soon you'll go
  8. ... To the great Provincial show,
  9. ... To be admired by many a beau
  10. ... In the city of Toronto.
  11. ...
  12. ... Cows numerous as a swarm of bees,
  13. ... Or as the leaves upon the trees,
  14. ... It did require to make thee please,
  15. ... And stand unrivalled, queen of cheese.
  16. ...
  17. ... May you not receive a scar as
  18. ... We have heard that Mr. Harris
  19. ... Intends to send you off as far as
  20. ... The great world's show at Paris.
  21. ...
  22. ... Of the youth beware of these,
  23. ... For some of them might rudely squeeze
  24. ... And bite your cheek, then songs or glees
  25. ... We could not sing, oh! queen of cheese.
  26. ...
  27. ... We'rt thou suspended from balloon,
  28. ... You'd cast a shade even at noon,
  29. ... Folks would think it was the moon
  30. ... About to fall and crush them soon.
  31. ... '''

(8) 引入 re 模块以便使用正则表达式相关函数。使用 re.findall() 打印出所有以 'c' 开头的单词。

首先对所要匹配的模式定义变量 pat,然后在 mammoth 中查找:

  1. >>> import re
  2. >>> re = r'\bc\w*'
  3. >>> re.findall(pat, mammoth)
  4. ['cheese', 'city', 'cheese', 'cheek', 'could', 'cheese', 'cast', 'crush']

\b 代表以单词之间的分隔符作为开始,使用它一般用于指定单词的开始或者结束,字母 c 是我们要查找单词的首字母。\w 代表任意单词字符(包括字母、数字和下划线)。* 表示一个或者多个字符。综合起来,它用来查找以字母 c 开头的单词,包括 'c' 本身。如果你不使用原始字符串(在第一个引号前加 r),Python 会把 \b 解释为退格字符,查找会神奇地挂掉:

  1. >>> pat = '\bc\w*'
  2. >>> re.findall(pat, mammoth)
  3. []

(9) 找到所有以 c 开头的 4-字母单词。

  1. >>> pat = r'\bc\w{3}\b'
  2. >>> re.findall(pat, mammoth)
  3. ['city', 'cast']

你需要最后的 \b 来指明单词的结束。否则,你会得到所有以 c 开头并且至少有四个字母的单词的前四个字母:

  1. >>> pat = r'\bc\w{3}'
  2. >>> re.findall(pat, mammoth)
  3. ['chee', 'city', 'chee', 'chee', 'coul', 'chee', 'cast', 'crus']

(10) 找到所有以 r 结尾的单词。

下面代码使用要小心,对于以 r 结尾的单词会得到完美的结果:

  1. >>> pat = r'\b\w*r\b'
  2. >>> re.findall(pat,mammoth)
  3. ['your', 'fair', 'Or', 'scar', 'Mr', 'far', 'For', 'your', 'or']

然而,用在以 l 结尾的单词结果就不好:

  1. >>> pat = r'\b\w*l\b'
  2. >>> re.findall(pat,mammoth)
  3. ['All', 'll', 'Provincial', 'fall']

但是,ll 为什么出现在那儿? \w 仅仅匹配到字母、数字和下划线,不会匹配到 ASCII 中的撇号 (')。所以,它会从 you'll 抽取到最后的 ll。解决该问题可以把撇号加到要匹配的字符集合。第一次这样做失败了:

  1. >>> >>> pat = r'\b[\w']*l\b'
  2. File "<stdin>", line 1
  3. pat = r'\b[\w']*l\b'

Python 指到了错误附近的位置,但仍然需要花费一段时间发现模式串被两个引号(撇号)同时包括,一个解决方法是加转移字符:

  1. >>> pat = r'\b[\w\']*l\b'
  2. >>> re.findall(pat, mammoth)
  3. ['All', "you'll", 'Provincial', 'fall']

另一种方法是给模式串加双引号:

  1. >>> pat = r"\b[\w']*l\b"
  2. >>> re.findall(pat, mammoth)
  3. ['All', "you'll", 'Provincial', 'fall']

(11) 找到所有包含且仅包含连续 3 个元音的单词。

开始匹配时是一个单词边界符,然后任意数目的字母、三个连续的元音,接下来是任意数目的非元音字符直到单词结束:

  1. >>> pat = r'\b\w*[aeiou]{3}[^aeiou]\w*\b'
  2. >>> re.findall(pat, mammoth)
  3. ['queen', 'quietly', 'beau\nIn', 'queen', 'squeeze', 'queen']

上面的匹配看起来是对的,除了字符串 'beau\nIn'。把 mammoth 作为多行的字符串进行搜索,[^aeiou] 匹配任何非元音字符包括换行符。所以要把一些间隔字符加入到忽略集合,例如 \n\s 匹配到间隔字符):

  1. >>> pat = r'\b\w*[aeiou]{3}[^aeiou\s]\w*\b'
  2. >>> re.findall(pat, mammoth)
  3. ['queen', 'quietly', 'queen', 'squeeze', 'queen']

但这一次没有搜索到 beau,所以还要对模式串进行变动,匹配到三个连续元音之后还要匹配任意数目的非元音。之前的模式串只匹配了一个非元音:

  1. >>> pat = r'\b\w*[aeiou]{3}[^aeiou\s]*\w*\b'
  2. >>> re.findall(pat, mammoth)
  3. ['queen', 'quietly', 'beau', 'queen', 'squeeze', 'queen']

上面所有的内容表明了什么?其中一点是:正则表达式可以完成很多事情,但正确使用它还要多加小心。

(12) 使用 unhexlify() 将下面的十六进制串(出于排版原因将它们拆成两行字符串)转换为 bytes 型变量,命名为 gif

  1. '47494638396101000100800000000000ffffff21f9' +
  2. '0401000000002c000000000100010000020144003b'
  3. >>> import binascii
  4. >>> hex_str = '47494638396101000100800000000000ffffff21f9' + \
  5. ... '0401000000002c000000000100010000020144003b'
  6. >>> gif = binascii.unhexlify(hex_str)
  7. >>> len(gif)
  8. 42

(13) gif 定义了一个 1 像素的透明 GIF 文件(最常见的图片格式之一)。合法的 GIF 文件开头由 GIF89a 组成,检测一下上面的 gif 是否为合法的 GIF 文件?

  1. >>> gif[:6] == b'GIF89a'
  2. True

注意,我们使用 b 来定义一个字节串而不是 Unicode 字符串,你可以在字节之间做比较,但是不能用字符串和字节比较:

  1. >>> gif[:6] == 'GIF89a'
  2. False
  3. >>> type(gif)
  4. <class 'bytes'>
  5. >>> type('GIF89a')
  6. <class 'str'>
  7. >>> type(b'GIF89a')
  8. <class 'bytes'>

(14) GIF 文件的像素宽度是一个 16 比特的以大端方案存储的整数,偏移量为 6 字节,高度数据的大小与之相同,偏移量为 8。从 gif 中抽取这些信息并打印出来,看看它们是否与预期的一样都为 1

  1. >>> import struct
  2. >>> width, height = struct.unpack('<HH', gif[6:10])
  3. >>> width, height
  4. (1, 1)

E.8 第8章“数据的归宿”

(1) 将字符串 'This is a test of the emergency text system' 赋给变量 test1,然后把它写到文件 test.txt。

  1. >>> test1 = 'This is a test of the emergency text system'
  2. >>> len(test1)
  3. 43

下面是如何使用 openwriteclose 函数实现题目要求:

  1. >>> outfile = open('test.txt', 'wt')
  2. >>> outfile.write(test1)
  3. 43
  4. >>> outfile.close()

或者直接使用 with,避免调用 close(Python 帮你实现):

  1. >>> with open('test.txt', 'wt') as outfile:
  2. ... outfile.write(test1)
  3. ...
  4. 43

(2) 打开文件 test.txt,读文件内容到字符串 test2test1test2 是一样的吗?

  1. >>> with open('test.txt', 'rt') as infile:
  2. ... test2 = infile.read()
  3. ...
  4. >>> len(test2)
  5. 43
  6. >>> test1 == test2
  7. True

(3) 保存这些文本到 test.csv 文件。注意,字段间是通过逗号隔开的,如果字段中含有逗号需要在整个字段加引号。

  1. author,book
  2. J R R Tolkien,The Hobbit
  3. Lynne Truss,"Eats, Shoots & Leaves"
  4. >>> text = '''author,book
  5. ... J R R Tolkien,The Hobbit
  6. ... Lynne Truss,"Eats, Shoots & Leaves"
  7. ... '''
  8. >>> with open('test.csv', 'wt') as outfile:
  9. ... outfile.write(text)
  10. ...
  11. 73

(4) 使用 csv 模块和它的 DictReader() 方法读取文件 test.csv 到变量 books。输出变量 books 的值。DictReader() 可以处理第二本书题目中的引号和逗号吗?

  1. >>> with open('test.csv', 'rt') as infile:
  2. ... books = csv.DictReader(infile)
  3. ... for book in books:
  4. ... print(book)
  5. ...
  6. {'book': 'The Hobbit', 'author': 'J R R Tolkien'}
  7. {'book': 'Eats, Shoots & Leaves', 'author': 'Lynne Truss'}

(5) 创建包含下面这些行的 CSV 文件 books.csv:

  1. title,author,year
  2. The Weirdstone of Brisingamen,Alan Garner,1960
  3. Perdido Street Station,China Miéville,2000
  4. Thud!,Terry Pratchett,2005
  5. The Spellman Files,Lisa Lutz,2007
  6. Small Gods,Terry Pratchett,1992
  7. >>> text = '''title,author,year
  8. ... The Weirdstone of Brisingamen,Alan Garner,1960
  9. ... Perdido Street Station,China Miéville,2000
  10. ... Thud!,Terry Pratchett,2005
  11. ... The Spellman Files,Lisa Lutz,2007
  12. ... Small Gods,Terry Pratchett,1992
  13. ... '''
  14. >>> with open('books.csv', 'wt') as outfile:
  15. ... outfile.write(text)
  16. ...
  17. 201

(6) 使用 sqlite3 模块创建一个 SQLite 数据库 books.db 以及包含字段 title(text)、author(text)以及 year(integer)的表单 book

  1. >>> import sqlite3
  2. >>> db = sqlite3.connect('books.db')
  3. >>> curs = db.cursor()
  4. >>> curs.execute('''create table book (title text, author text, year int)''')
  5. <sqlite3.Cursor object at 0x1006e3b90>
  6. >>> db.commit()

(7) 读取文件 books.csv,把数据插入到表单 book

  1. >>> import csv
  2. >>> import sqlite3
  3. >>> ins_str = 'insert into book values(?, ?, ?)'
  4. >>> with open('books.csv', 'rt') as infile:
  5. ... books = csv.DictReader(infile)
  6. ... for book in books:
  7. ... curs.execute(ins_str, (book['title'], book['author'], book['year']))
  8. ...
  9. <sqlite3.Cursor object at 0x1007b21f0>
  10. <sqlite3.Cursor object at 0x1007b21f0>
  11. <sqlite3.Cursor object at 0x1007b21f0>
  12. <sqlite3.Cursor object at 0x1007b21f0>
  13. <sqlite3.Cursor object at 0x1007b21f0>
  14. >>> db.commit()

(8) 选择表单 book 中的 title 列,并按照字母表顺序输出。

  1. >>> sql = 'select title from book order by title asc'
  2. >>> for row in db.execute(sql):
  3. ... print(row)
  4. ...
  5. ('Perdido Street Station',)
  6. ('Small Gods',)
  7. ('The Spellman Files',)
  8. ('The Weirdstone of Brisingamen',)
  9. ('Thud!',)

如果你只想输出 title 的值,不包含引号和逗号,试下这个方法:

  1. >>> for row in db.execute(sql):
  2. ... print(row[0])
  3. ...
  4. Perdido Street Station
  5. Small Gods
  6. The Spellman Files
  7. The Weirdstone of Brisingamen
  8. Thud!

如果你想排序时忽略掉题目开头的 'The',还需要加额外的 SQL 魔法(语句):

  1. >>> sql = '''select title from book order by
  2. ... case when (title like "The %") then substr(title, 5) else title end'''
  3. >>> for row in db.execute(sql):
  4. ... print(row[0])
  5. ...
  6. Perdido Street Station
  7. Small Gods
  8. The Spellman Files
  9. Thud!
  10. The Weirdstone of Brisingamen

(9) 选择表单 book 中所有的列,并按照出版顺序输出。

  1. >>> for row in db.execute('select * from book order by year'):
  2. ... print(row)
  3. ...
  4. ('The Weirdstone of Brisingamen', 'Alan Garner', 1960)
  5. ('Small Gods', 'Terry Pratchett', 1992)
  6. ('Perdido Street Station', 'China Miéville', 2000)
  7. ('Thud!', 'Terry Pratchett', 2005)
  8. ('The Spellman Files', 'Lisa Lutz', 2007)

为了打印输出表单 book 的每一行所有的字段,用逗号和空格把它们隔开:

  1. >>> for row in db.execute('select * from book order by year'):
  2. ... print(*row, sep=', ')
  3. ...
  4. The Weirdstone of Brisingamen, Alan Garner, 1960
  5. Small Gods, Terry Pratchett, 1992
  6. Perdido Street Station, China Miéville, 2000
  7. Thud!, Terry Pratchett, 2005
  8. The Spellman Files, Lisa Lutz, 2007

(10) 使用 sqlalchemy 模块连接到 sqlite3 数据库 books.db,按照 (8) 一样,选择表单 book 中的 title 列,并按照字母表顺序输出。

  1. >>> import sqlalchemy
  2. >>> conn = sqlalchemy.create_engine('sqlite:///books.db')
  3. >>> sql = 'select title from book order by title asc'
  4. >>> rows = conn.execute(sql)
  5. >>> for row in rows:
  6. ... print(row)
  7. ...
  8. ('Perdido Street Station',)
  9. ('Small Gods',)
  10. ('The Spellman Files',)
  11. ('The Weirdstone of Brisingamen',)
  12. ('Thud!',)

(11) 在你的计算机安装 Redis 服务器(参见附录 D)和 Python 的 redis 库(pip install redis)。创建一个 Redis 的哈希表 test,包含字段 count(1)name('Fester Bestertester'),输出 test 的所有字段。

  1. >>> import redis
  2. >>> conn = redis.Redis()
  3. >>> conn.delete('test')
  4. 1
  5. >>> conn.hmset('test', {'count': 1, 'name': 'Fester Bestertester'})
  6. True
  7. >>> conn.hgetall('test')
  8. {b'name': b'Fester Bestertester', b'count': b'1'}

(12) 自增 testcount 字段并输出它。

  1. >>> conn.hincrby('test', 'count', 3)
  2. 4
  3. >>> conn.hget('test', 'count')
  4. b'4'

E.9 第9章“剖析Web”

(1) 如果你还没有安装 flask,现在安装它。这样会自动安装 werkzeugjinja2 和其他包。

(2) 搭建一个网站框架,使用 Flask 的调试 / 代码重载来开发 Web 服务器。使用主机名 localhost 和默认端口 5000 来启动服务器。如果你电脑的 5000 端口已经被占用,使用其他端口。

下面是文件 flask1.py:

  1. from flask import Flask
  2. app = Flask(__name__)
  3. app.run(port=5000, debug=True)

开启 Web 服务器引擎:

  1. $ python flask1.py
  2. Running on http://127.0.0.1:5000/
  3. Restarting with reloader

(3) 添加一个 home() 函数来处理对于主页的请求,让它返回字符串 It's alive!

我们该如何调用 flask2.py 呢?

  1. from flask import Flask
  2. app = Flask(__name__)
  3. @app.route('/')
  4. def home():
  5. return "It's alive!"
  6. app.run(debug=True)

开启服务器:

  1. $ python flask2.py
  2. Running on http://127.0.0.1:5000/
  3. Restarting with reloader

最后通过浏览器或者命令行 HTTP 程序(例如 curlwget 甚至 telnet)进入主页 :

  1. $ curl http://localhost:5000/
  2. It's alive!

(4) 创建一个名为 home.html 的 Jinja2 模板文件,内容如下所示:

  1. I'm of course referring to {{thing}}, which is {{height}} feet tall and {{color}}.

创建名为 templates 的目录,在该目录下创建包含以上内容的文件 home.html。如果你的 Flask 服务器仍在运行中,它会检测到新的内容并自动重启。

(5) 修改 home() 函数,让它使用 home.html 模板。给模板传入三个 GET 参数:thingheightcolor

下面是文件 flask3.py:

  1. from flask import Flask, request, render_template
  2. app = Flask(__name__)
  3. @app.route('/')
  4. def home():
  5. thing = request.values.get('thing')
  6. height = request.values.get('height')
  7. color = request.values.get('color')
  8. return render_template('home.html',
  9. thing=thing, height=height, color=color)
  10. app.run(debug=True)

在 Web 客户端前往地址 http://localhost:5000/?thing=Octothorpe&height=7&color=green,你应该可以看到如下内容:

  1. I'm of course referring to Octothorpe, which is 7 feet tall and green.

E.10 第10章“系统”

(1) 把当前日期以字符串形式写入文本文件 today.txt。

  1. >>> from datetime import date
  2. >>> now = date.today()
  3. >>> now_str = now.isoformat()
  4. >>> with open('today', 'wt') as output:
  5. ... print(now_str, file=output)
  6. >>>

除了 print,你可以使用 output.write(now_str) 作为输出。使用 print 会在文件末尾增加一行空行。

(2) 从 today.txt 中读取字符串到 today_string 中。

  1. >>> with open('today', 'rt') as input:
  2. ... today_string = input.read()
  3. ...
  4. >>> today_string
  5. '2014-02-04\n'

(3) 从 today_string 中解析日期。

  1. >>> fmt = '%Y-%m-%d\n'
  2. >>> datetime.strptime(today_string, fmt)
  3. datetime.datetime(2014, 2, 4, 0, 0)

如果你在文件末尾写入空行,需要在格式字符串(format string)中匹配它。

(4) 列出当前目录下的文件。

如果你的当前目录为 ohmy,包含三个以动物命名的文件,结果可能是这样的:

  1. >>> import os
  2. >>> os.listdir('.')
  3. ['bears', 'lions', 'tigers']

(5) 列出父目录下的文件。

如果父目录包含两个文件和当前的 ohmy 目录,结果可能是这样的:

  1. >>> import os
  2. >>> os.listdir('..')
  3. ['ohmy', 'paws', 'whiskers']

(6) 使用 multiprocessing 创建三个独立的进程,每一个进程在 0 和 1 之间等待随机的时间,输出当前时间,然后终止进程。

保存下面代码到文件 multi_times.py:

  1. import multiprocessing
  2. def now(seconds):
  3. from datetime import datetime
  4. from time import sleep
  5. sleep(seconds)
  6. print('wait', seconds, 'seconds, time is', datetime.utcnow())
  7. if __name__ == '__main__':
  8. import random
  9. for n in range(3):
  10. seconds = random.random()
  11. proc = multiprocessing.Process(target=now, args=(seconds,))
  12. proc.start()
  13. $ python multi_times.py
  14. wait 0.4670532005508353 seconds, time is 2014-06-03 05:14:22.930541
  15. wait 0.5908421960431798 seconds, time is 2014-06-03 05:14:23.054925
  16. wait 0.8127669040699719 seconds, time is 2014-06-03 05:14:23.275767

(7) 创建一个你出生日的日期对象。

假设你出生在 1982 年 8 月 14 日:

  1. >>> my_day = date(1982, 8, 14)
  2. >>> my_day
  3. datetime.date(1982, 8, 14)

(8) 你的生日是星期几?

  1. >>> my_day.weekday()
  2. 5
  3. >>> my_day.isoweekday()
  4. 6

使用函数 weekday(),周一返回 0,周日返回 6。而使用函数 isoweekday(),周一返回 1,周日返回 7。因此,这一天是周六。

(9) 你出生 10 000 天的日期是什么时候?

  1. >>> from datetime import timedelta
  2. >>> party_day = my_day + timedelta(days=10000)
  3. >>> party_day
  4. datetime.date(2009, 12, 30)

如果你的生日真的是那天,你可能失去了一个参加聚会的理由(Party 已经过时了)。

E.11 第11章“并发和网络”

(1) 使用原始的 socket 来实现一个获取当前时间的服务。当客户端向服务器发送字符串 time 时,服务器会返回当前日期和时间的 ISO 格式字符串。

下面是实现服务器端的一种方式,udp_time_server.py:

  1. from datetime import datetime
  2. import socket
  3. address = ('localhost', 6789)
  4. max_size = 4096
  5. print('Starting the server at', datetime.now())
  6. print('Waiting for a client to call.')
  7. server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  8. server.bind(address)
  9. while True:
  10. data, client_addr = server.recvfrom(max_size)
  11. if data == b'time':
  12. now = str(datetime.utcnow())
  13. data = now.encode('utf-8')
  14. server.sendto(data, client_addr)
  15. print('Server sent', data)
  16. server.close()

下面是客户端 udp_time_client.py:

  1. import socket
  2. from datetime import datetime
  3. from time import sleep
  4. address = ('localhost', 6789)
  5. max_size = 4096
  6. print('Starting the client at', datetime.now())
  7. client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  8. while True:
  9. sleep(5)
  10. client.sendto(b'time', address)
  11. data, server_addr = client.recvfrom(max_size)
  12. print('Client read', data)
  13. client.close()

在客户端的循环中加入 sleep(5) 以避免数据交换过于迅速。在一个窗口开启服务器进程:

  1. $ python udp_time_server.py
  2. Starting the server at 2014-06-02 20:28:47.415176
  3. Waiting for a client to call.

在另一个窗口执行客户端:

  1. $ python udp_time_client.py
  2. Starting the client at 2014-06-02 20:28:51.454805

5 秒钟后,你开始得到两者的输出。下面是来自服务器的前三行:

  1. Server sent b'2014-06-03 01:28:56.462565'
  2. Server sent b'2014-06-03 01:29:01.463906'
  3. Server sent b'2014-06-03 01:29:06.465802'

以下是来自客户端的前三行输出 :

  1. Client read b'2014-06-03 01:28:56.462565'
  2. Client read b'2014-06-03 01:29:01.463906'
  3. Client read b'2014-06-03 01:29:06.465802'

这两个程序都会一直运行,需要人工进行终止。

(2) 使用 ZeroMQ 的 REQ 和 REP 套接字实现同样的功能。

这是服务器端程序 zmq_time_server.py:

  1. import zmq
  2. from datetime import datetime
  3. host = '127.0.0.1'
  4. port = 6789
  5. context = zmq.Context()
  6. server = context.socket(zmq.REP)
  7. server.bind("tcp://%s:%s" % (host, port))
  8. print('Server started at', datetime.utcnow())
  9. while True:
  10. # 等待客户端的下一个请求
  11. message = server.recv()
  12. if message == b'time':
  13. now = datetime.utcnow()
  14. reply = str(now)
  15. server.send(bytes(reply, 'utf-8'))
  16. print('Server sent', reply)

以下是客户端程序 zmq_time_client.py:

  1. import zmq
  2. from datetime import datetime
  3. from time import sleep
  4. host = '127.0.0.1'
  5. port = 6789
  6. context = zmq.Context()
  7. client = context.socket(zmq.REQ)
  8. client.connect("tcp://%s:%s" % (host, port))
  9. print('Client started at', datetime.utcnow())
  10. while True:
  11. sleep(5)
  12. request = b'time'
  13. client.send(request)
  14. reply = client.recv()
  15. print("Client received %s" % reply)

对于原始的 socket,你需要首先启动服务器;而使用 ZeroMQ,先启动服务器或者客户端都是可行的。

  1. $ python zmq_time_server.py
  2. Server started at 2014-06-03 01:39:36.933532
  3. $ python zmq_time_client.py
  4. Client started at 2014-06-03 01:39:42.538245

大约 15 秒后,服务器会返回一些行 :

  1. Server sent 2014-06-03 01:39:47.539878
  2. Server sent 2014-06-03 01:39:52.540659
  3. Server sent 2014-06-03 01:39:57.541403

可以在客户端看到:

  1. Client received b'2014-06-03 01:39:47.539878'
  2. Client received b'2014-06-03 01:39:52.540659'
  3. Client received b'2014-06-03 01:39:57.541403'

(3) 使用 XMLRPC 实现同样的功能。

服务器端,xmlrpc_time_server.py:

  1. from xmlrpc.server import SimpleXMLRPCServer
  2. def now():
  3. from datetime import datetime
  4. data = str(datetime.utcnow())
  5. print('Server sent', data)
  6. return data
  7. server = SimpleXMLRPCServer(("localhost", 6789))
  8. server.register_function(now, "now")
  9. server.serve_forever()

客户端,xmlrpc_time_client.py:

  1. import xmlrpc.client
  2. from time import sleep
  3. proxy = xmlrpc.client.ServerProxy("http://localhost:6789/")
  4. while True:
  5. sleep(5)
  6. data = proxy.now()
  7. print('Client received', data)

启动服务器进程:

  1. $ python xmlrpc_time_server.py

启动客户端进程:

  1. $ python xmlrpc_time_client.py

大约 15 秒后,这是服务器端输出的前三行:

  1. Server sent 2014-06-03 02:14:52.299122
  2. 127.0.0.1 - - [02/Jun/2014 21:14:52] "POST HTTP1.1" 200 -
  3. Server sent 2014-06-03 02:14:57.304741
  4. 127.0.0.1 - - [02/Jun/2014 21:14:57] "POST HTTP1.1" 200 -
  5. Server sent 2014-06-03 02:15:02.310377
  6. 127.0.0.1 - - [02/Jun/2014 21:15:02] "POST HTTP1.1" 200 -

下面是客户端输出的前三行:

  1. Client received 2014-06-03 02:14:52.299122
  2. Client received 2014-06-03 02:14:57.304741
  3. Client received 2014-06-03 02:15:02.310377

(4) 你可能看过那部很老的《我爱露西》(I Love Lucy)电视节目。露西和埃塞尔在一个巧克力工厂里工作(这是传统)。他们落在了运输甜点的传送带后面,所以必须用更快的速度进行处理。写一个程序来模拟这个过程,程序会把不同类型的巧克力添加到一个 Redis 列表中,露西是一个客户端,对列表执行阻塞的弹出操作。她需要 0.5 秒来处理一块巧克力。打印出时间和露西处理的每块巧克力类型以及剩余巧克力的数量。

redis_choc_supply.py 提供初始的工作:

  1. import redis
  2. import random
  3. from time import sleep
  4. conn = redis.Redis()
  5. varieties = ['truffle', 'cherry', 'caramel', 'nougat']
  6. conveyor = 'chocolates'
  7. while True:
  8. seconds = random.random()
  9. sleep(seconds)
  10. piece = random.choice(varieties)
  11. conn.rpush(conveyor, piece)

露西的过程更像是 redis_lucy.py:

  1. import redis
  2. from datetime import datetime
  3. from time import sleep
  4. conn = redis.Redis()
  5. timeout = 10
  6. conveyor = 'chocolates'
  7. while True:
  8. sleep(0.5)
  9. msg = conn.blpop(conveyor, timeout)
  10. remaining = conn.llen(conveyor)
  11. if msg:
  12. piece = msg[1]
  13. print('Lucy got a', piece, 'at', datetime.utcnow(),
  14. ', only', remaining, 'left')

任意的顺序打开服务器或者客户端进程,因为露西需要半秒钟处理每一个巧克力,而且平均每隔半秒钟会生产一块巧克力,这是一场追赶着的比赛。开始放入传送带上的巧克力越多,露西的工作难度也越大。

  1. $ python redis_choc_supply.py&
  2. $ python redis_lucy.py
  3. Lucy got a b'nougat' at 2014-06-03 03:15:08.721169 , only 4 left
  4. Lucy got a b'cherry' at 2014-06-03 03:15:09.222816 , only 3 left
  5. Lucy got a b'truffle' at 2014-06-03 03:15:09.723691 , only 5 left
  6. Lucy got a b'truffle' at 2014-06-03 03:15:10.225008 , only 4 left
  7. Lucy got a b'cherry' at 2014-06-03 03:15:10.727107 , only 4 left
  8. Lucy got a b'cherry' at 2014-06-03 03:15:11.228226 , only 5 left
  9. Lucy got a b'cherry' at 2014-06-03 03:15:11.729735 , only 4 left
  10. Lucy got a b'truffle' at 2014-06-03 03:15:12.230894 , only 6 left
  11. Lucy got a b'caramel' at 2014-06-03 03:15:12.732777 , only 7 left
  12. Lucy got a b'cherry' at 2014-06-03 03:15:13.234785 , only 6 left
  13. Lucy got a b'cherry' at 2014-06-03 03:15:13.736103 , only 7 left
  14. Lucy got a b'caramel' at 2014-06-03 03:15:14.238152 , only 9 left
  15. Lucy got a b'cherry' at 2014-06-03 03:15:14.739561 , only 8 left

可怜的露西!!

(5) 使用 ZeroMQ 发布第 7 章练习 (7) 中的诗,每次发布一个单词。写一个 ZeroMQ 客户端来打印出每个以元音开头的单词,再写另一个客户端来打印出所有长度为 5 的单词。忽略标点符号。

下面是服务器 poem_pub.py,把每个单词从诗中拆分出来。如果单词首字母为元音,就发布到主题 vowels 上;如果单词有五个字母,就发布到主题 five 上。一些词可能同时包括在两个主题内,也有一些都没有。

  1. import string
  2. import zmq
  3. host = '127.0.0.1'
  4. port = 6789
  5. ctx = zmq.Context()
  6. pub = ctx.socket(zmq.PUB)
  7. pub.bind('tcp://%s:%s' % (host, port))
  8. with open('mammoth.txt', 'rt') as poem:
  9. words = poem.read()
  10. for word in words.split():
  11. word = word.strip(string.punctuation)
  12. data = word.encode('utf-8')
  13. if word.startswith(('a','e','i','o','u','A','e','i','o','u')):
  14. pub.send_multipart([b'vowels', data])
  15. if len(word) == 5:
  16. pub.send_multipart([b'five', data])

客户端程序 poem_sub.py,订阅主题 vowelsfive,并打印输出主题和单词:

  1. import string
  2. import zmq
  3. host = '127.0.0.1'
  4. port = 6789
  5. ctx = zmq.Context()
  6. sub = ctx.socket(zmq.SUB)
  7. sub.connect('tcp://%s:%s' % (host, port))
  8. sub.setsockopt(zmq.SUBSCRIBE, b'vowels')
  9. sub.setsockopt(zmq.SUBSCRIBE, b'five')
  10. while True:
  11. topic, word = sub.recv_multipart()
  12. print(topic, word)

如果你开启这些服务并执行代码,它们几乎是不工作的。代码看起来是对的,但是没有做任何事情。首先需要阅读 ZeroMQ 文档(http://zguide.zeromq.org/page:all)了解慢连接(slow joiner)问题:即使是在服务器端之前开启客户端,服务器会立刻发布数据,客户端只有片刻时间连接到服务器。如果你发布持续的数据流,当订阅的客户错过一些是没有关系的。但在本例中,数据流很小导致订阅客户端“眨眼”间流过,就像快球迅速掠过击球手。

最简单的解决办法是在发布者(服务器端)调用 bind() 函数之后和开始发送消息之前休眠一秒。使用这个版本的程序 poem_pub_sleep.py:

  1. import string
  2. import zmq
  3. from time import sleep
  4. host = '127.0.0.1'
  5. port = 6789
  6. ctx = zmq.Context()
  7. pub = ctx.socket(zmq.PUB)
  8. pub.bind('tcp://%s:%s' % (host, port))
  9. sleep(1)
  10. with open('mammoth.txt', 'rt') as poem:
  11. words = poem.read()
  12. for word in words.split():
  13. word = word.strip(string.punctuation)
  14. data = word.encode('utf-8')
  15. if word.startswith(('a','e','i','o','u','A','e','i','o','u')):
  16. print('vowels', data)
  17. pub.send_multipart([b'vowels', data])
  18. if len(word) == 5:
  19. print('five', data)
  20. pub.send_multipart([b'five', data])

开启订阅者进程,然后打开休眠版的发布者进程:

  1. $ python poem_sub.py
  2. $ python poem_pub_sleep.py

现在,订阅者有时间来捕捉这两个主题的消息。以下是它输出的前几行:

  1. b'five' b'queen'
  2. b'vowels' b'of'
  3. b'five' b'Lying'
  4. b'vowels' b'at'
  5. b'vowels' b'ease'
  6. b'vowels' b'evening'
  7. b'five' b'flies'
  8. b'five' b'seize'
  9. b'vowels' b'All'
  10. b'five' b'gaily'
  11. b'five' b'great'
  12. b'vowels' b'admired'

如果你不能在发布者程序中加入 sleep() 函数,可以使用 REQREPsockets 同步进行发布者和订阅者。在 GitHub(https://github.com/zeromq/pyzmq/tree/master/examples/pubsub)查看实例代码 publisher.py 和 subscriber.py。