第 2 章 使用多路复用套接字I/O提升性能

本章攻略:

  • 在套接字服务器程序中使用ForkingMixIn
  • 在套接字服务器程序中使用ThreadingMixIn
  • 使用select.select编写一个聊天室服务器
  • 使用select.epoll多路复用Web服务器
  • 使用并发库Diesel多路复用回显服务器

2.1 简介

本章专注于使用一些有用的技术提升套接字服务器的性能。和前一章不同,本章考虑多个客户端连接服务器的情况,而且可以异步通信。服务器不需要在阻塞模式中处理客户端发出的请求,而是单独处理每个请求。如果某个客户端接收或处理数据时花了很长时间,服务器无需等待处理完成,可以使用另外的线程或进程和其他客户端通信。

本章还要介绍select模块。这个模块建立在底层操作系统内核的select系统调用基础之上,提供了平台专用的I/O监控功能。Linux用户可访问http://man7.org/linux/man-pages/man2/select.2.html查看手册,手册中介绍了select系统调用的可用功能。我们的套接字服务器要和多个客户端交互,所以select可以帮助我们监控非阻塞式套接字。有些第三方Python库也能帮助我们同时处理多个客户端,本章包含一个使用Diesel并发库的示例攻略。

为简单起见,我们只会使用少数几个客户端,但读者可以自行扩展本章的攻略,让它们处理几十甚至几百个客户端。

2.2 在套接字服务器程序中使用ForkingMixIn

你已经决定要编写一个异步Python套接字服务器程序。服务器处理客户端发出的请求时不能阻塞,因此要找到一种机制来单独处理每个客户端。

Python 2.7版中的SocketServer模块提供了两个实用类:ForkingMixInThreadingMixInForkingMixIn会为每个客户端请求派生一个新进程。本节介绍ForkingMixIn类,ThreadingMixIn类将在下一节中介绍。有关SocketServer模块的详情,请参阅Python文档:http://docs.python.org/2/library/socketserver.html

2.2.1 实战演练

我们要利用SocketServer模块提供的类,重写第1章中的回显服务器。SocketServer模块提供了可以直接使用的TCP、UDP及其他协议服务器。我们可以创建ForkingServer类,继承TCPServerForkingMixIn类。前一个父类让ForkingServer类实现了之前手动完成的所有服务器操作,例如创建套接字、绑定地址和监听进入的连接。我们的服务器还要继承ForkingMixIn类,异步处理客户端。

ForkingServer类还要创建一个请求处理程序,说明如何处理客户端请求。在这个攻略中,我们的服务器会回显客户端发送的文本字符串。请求处理类ForkingServerRequestHandler继承自SocketServer库提供的BaseRequestHandler类。

回显服务器的客户端ForkingClient可以使用面向对象的方式编写。在Python中,类的构造方法叫作__init__()。按照惯例,要把self作为参数传入__init__()方法,以便指定具体实例的属性。ForkingClient连接的回显服务器要在__init__()方法中初始化,然后在run()方法中向服务器发送消息。

如果你根本不知道“面向对象编程”(Object-oriented Programming,简称OOP),学习这个攻略之前最好熟悉一下OOP的基本概念。

若想测试ForkingServer类,可以启动多个回显客户端,看看服务器如何响应客户端。

代码清单2-1展示了如何在套接字服务器程序中使用ForkingMixIn类,如下所示:

  1. #!usrbin/env python
  2. # Python Network Programming Cookbook -- Chapter - 2
  3. # This program is optimized for Python 2.7.
  4. # It may run on any other version with/without modifications.
  5. # See more: http://docs.python.org/2/library/socketserver.html
  6. import os
  7. import socket
  8. import threading
  9. import SocketServer
  10. SERVER_HOST = 'localhost'
  11. SERVER_PORT = 0 # tells the kernel to pick up a port dynamically
  12. BUF_SIZE = 1024
  13. ECHO_MSG = 'Hello echo server!'
  14. class ForkingClient():
  15. """ A client to test forking server"""
  16. def __init__(self, ip, port):
  17. # Create a socket
  18. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  19. # Connect to the server
  20. self.sock.connect((ip, port))
  21. def run(self):
  22. """ Client playing with the server"""
  23. # Send the data to server
  24. current_process_id = os.getpid()
  25. print 'PID %s Sending echo message to the server : "%s"' % (current_process_id, ECHO_MSG)
  26. sent_data_length = self.sock.send(ECHO_MSG)
  27. print "Sent: %d characters, so far..." %sent_data_length
  28. # Display server response
  29. response = self.sock.recv(BUF_SIZE)
  30. print "PID %s received: %s" % (current_process_id, response[5:])
  31. def shutdown(self):
  32. """ Cleanup the client socket """
  33. self.sock.close()
  34. class ForkingServerRequestHandler(SocketServer.BaseRequestHandler):
  35. def handle(self):
  36. # Send the echo back to the client
  37. data = self.request.recv(BUF_SIZE)
  38. current_process_id = os.getpid()
  39. response = '%s: %s' % (current_process_id, data)
  40. print "Server sending response [current_process_id: data] = [%s]" %response
  41. self.request.send(response)
  42. return
  43. class ForkingServer(SocketServer.ForkingMixIn,
  44. SocketServer.TCPServer,
  45. ):
  46. """Nothing to add here, inherited everything necessary from parents"""
  47. pass
  48. def main():
  49. # Launch the server
  50. server = ForkingServer((SERVER_HOST, SERVER_PORT), ForkingServerRequestHandler)
  51. ip, port = server.server_address # Retrieve the port number
  52. server_thread = threading.Thread(target=server.serve_forever)
  53. server_thread.setDaemon(True) # don't hang on exit
  54. server_thread.start()
  55. print 'Server loop running PID: %s' %os.getpid()
  56. # Launch the client(s)
  57. client1 = ForkingClient(ip, port)
  58. client1.run()
  59. client2 = ForkingClient(ip, port)
  60. client2.run()
  61. # Clean them up
  62. server.shutdown()
  63. client1.shutdown()
  64. client2.shutdown()
  65. server.socket.close()
  66. if __name__ == '__main__':
  67. main()

2.2.2 原理分析

主线程中创建了一个ForkingServer实例,作为守护进程在后台运行。然后再创建两个客户端和服务器交互。

运行这个脚本后,会看到如下输出:

  1. $ python 2_1_forking_mixin_socket_server.py
  2. Server loop running PID: 12608
  3. PID 12608 Sending echo message to the server : "Hello echo server!"
  4. Sent: 18 characters, so far...
  5. Server sending response [current_process_id: data] = [12610: Hello echo server!]
  6. PID 12608 received: : Hello echo server!
  7. PID 12608 Sending echo message to the server : "Hello echo server!"
  8. Sent: 18 characters, so far...
  9. Server sending response [current_process_id: data] = [12611: Hello echo server!]
  10. PID 12608 received: : Hello echo server!

在你的设备中可能会使用不同的服务器端口号,因为端口号由操作系统内核动态选择。

2.3 在套接字服务器程序中使用ThreadingMixIn

或许基于某些原因你不想编写基于进程的应用程序,而更愿意编写多线程应用程序。可能的原因有:在线程之间共享应用的状态,避免进程间通信的复杂操作,等等。遇到这种需求,如果想使用SocketServer库编写异步网络服务器,就得使用ThreadingMixIn类。

2.3.1 准备工作

对前一个攻略做几处小改动就能使用ThreadingMixIn编写一个可用的套接字服务器。

下载示例代码

如果你是通过http://www.packtpub.com的注册账户购买的图书,可以从该账户中下载相应Packt图书的示例代码。如果你是从其他地方购买的本书,可以访问http://www.packtpub.com/support,注册账户后,我们将会为你发送一封附有示例代码文件的电子邮件。

2.3.2 实战演练

和前一节中基于ForkingMixIn的套接字服务器一样,使用ThreadingMixIn编写的套接字服务器要遵循相同的回显服务器编程模式,不过仍有几点不同。首先,ThreadedTCPServer继承自TCPServerTheadingMixIn。客户端连接这个多线程版服务器时,会创建一个新线程。详情参见http://docs.python.org/2/library/socketserver.html

套接字服务器的请求处理类ForkingServerRequestHandler在一个新线程中把消息回显给客户端。在这个类中可以获取线程的信息。简单起见,我们把客户端的代码放在一个函数中,而不是一个类中。客户端代码创建客户端套接字,然后向服务器发送消息。

代码清单2-2展示了如何在回显套接字服务器中使用ThreadingMixIn类,如下所示:

  1. #!usrbin/env python
  2. # Python Network Programming Cookbook -- Chapter - 2
  3. # This program is optimized for Python 2.7.
  4. # It may run on any other version with/without modifications.
  5. import os
  6. import socket
  7. import threading
  8. import SocketServer
  9. SERVER_HOST = 'localhost'
  10. SERVER_PORT = 0 # tells the kernel to pickup a port dynamically
  11. BUF_SIZE = 1024
  12. def client(ip, port, message):
  13. """ A client to test threading mixin server"""
  14. # Connect to the server
  15. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  16. sock.connect((ip, port))
  17. try:
  18. sock.sendall(message)
  19. response = sock.recv(BUF_SIZE)
  20. print "Client received: %s" %response
  21. finally:
  22. sock.close()
  23. class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
  24. """ An example of threaded TCP request handler """
  25. def handle(self):
  26. data = self.request.recv(1024)
  27. cur_thread = threading.current_thread()
  28. response = "%s: %s" %(cur_thread.name, data)
  29. self.request.sendall(response)
  30. class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
  31. """Nothing to add here, inherited everything necessary from parents"""
  32. pass
  33. if __name__ == "__main__":
  34. # Run server
  35. server = ThreadedTCPServer((SERVER_HOST, SERVER_PORT), ThreadedTCPRequestHandler)
  36. ip, port = server.server_address # retrieve ip address
  37. # Start a thread with the server -- one thread per request
  38. server_thread = threading.Thread(target=server.serve_forever)
  39. # Exit the server thread when the main thread exits
  40. server_thread.daemon = True
  41. server_thread.start()
  42. print "Server loop running on thread: %s" %server_thread.name
  43. # Run clients
  44. client(ip, port, "Hello from client 1")
  45. client(ip, port, "Hello from client 2")
  46. client(ip, port, "Hello from client 3")
  47. # Server cleanup
  48. server.shutdown()

2.3.3 原理分析

这个攻略首先创建一个服务器线程,并在后台启动。然后启动三个测试客户端,向服务器发送消息。作为响应,服务器把消息回显给客户端。在服务器请求处理类的handle()方法中,我们取回了当前线程的信息并将其打印出来,这些信息在每次客户端连接中都不同。

在客户端和服务器的通信中用到了sendall()方法,以保证发送的数据无任何丢失。

  1. $ python 2_2_threading_mixin_socket_server.py
  2. Server loop running on thread: Thread-1
  3. Client received: Thread-2: Hello from client 1
  4. Client received: Thread-3: Hello from client 2
  5. Client received: Thread-4: Hello from client 3

2.4 使用select.select编写一个聊天室服务器

在大型网络服务器应用程序中可能有几百或几千个客户端同时连接服务器,此时为每个客户端创建单独的线程或进程可能不切实际。由于内存可用量受限,且主机的CPU能力有限,我们需要一种更好的技术来处理大量的客户端。幸好,Python提供的select模块能解决这一问题。

2.4.1 实战演练

我们将编写一个高效的聊天室服务器,处理几百或更多数量的客户端连接。我们要使用select模块提供的select()方法,让聊天室服务器和客户端所做的操作始终不会阻塞消息的发送和接收。

这个攻略使用一个脚本就能启动客户端和服务器,执行脚本时要指定--name参数。只有在命令行中传入了--name=server,脚本才启动聊天室服务器。如果为--name参数指定了其他值,例如client1client2,则脚本会启动聊天室客户端。聊天室服务器绑定的端口在命令行参数--port中指定。对大型应用程序而言,最好在不同的模块中编写服务器和客户端。

代码清单2-3展示了一个使用select.select编写的聊天室应用示例,如下所示:

  1. #!usrbin/env python
  2. # Python Network Programming Cookbook -- Chapter - 2
  3. # This program is optimized for Python 2.7.
  4. # It may run on any other version with/without modifications.
  5. import select
  6. import socket
  7. import sys
  8. import signal
  9. import cPickle
  10. import struct
  11. import argparse
  12. SERVER_HOST = 'localhost'
  13. CHAT_SERVER_NAME = 'server'
  14. # Some utilities
  15. def send(channel, *args):
  16. buffer = cPickle.dumps(args)
  17. value = socket.htonl(len(buffer))
  18. size = struct.pack("L",value)
  19. channel.send(size)
  20. channel.send(buffer)
  21. def receive(channel):
  22. size = struct.calcsize("L")
  23. size = channel.recv(size)
  24. try:
  25. size = socket.ntohl(struct.unpack("L", size)[0])
  26. except struct.error, e:
  27. return ''
  28. buf = ""
  29. while len(buf) < size:
  30. buf = channel.recv(size - len(buf))
  31. return cPickle.loads(buf)[0]

send()函数接收一个具名参数channel和一个定位参数*args,使用cPickle模块中的dumps()方法序列化数据,使用struct模块计算数据的大小。同样,receive()函数也接收一个具名参数channel

然后定义ChatServer类,如下所示:

  1. class ChatServer(object):
  2. """ An example chat server using select """
  3. def __init__(self, port, backlog=5):
  4. self.clients = 0
  5. self.clientmap = {}
  6. self.outputs = [] # list output sockets
  7. self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  8. self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  9. self.server.bind((SERVER_HOST, port))
  10. print 'Server listening to port: %s ...' %port
  11. self.server.listen(backlog)
  12. # Catch keyboard interrupts
  13. signal.signal(signal.SIGINT, self.sighandler)
  14. def sighandler(self, signum, frame):
  15. """ Clean up client outputs"""
  16. # Close the server
  17. print 'Shutting down server...'
  18. # Close existing client sockets
  19. for output in self.outputs:
  20. output.close()
  21. self.server.close()
  22. def get_client_name(self, client):
  23. """ Return the name of the client """
  24. info = self.clientmap[client]
  25. host, name = info[0][0], info[1]
  26. return '@'.join((name, host))

ChatServer类的主要执行方法如下所示:

  1. def run(self):
  2. inputs = [self.server, sys.stdin]
  3. self.outputs = []
  4. running = True
  5. while running:
  6. try:
  7. readable, writeable, exceptional = select.select(inputs, self.outputs, [])
  8. except select.error, e:
  9. break
  10. for sock in readable:
  11. if sock == self.server:
  12. # handle the server socket
  13. client, address = self.server.accept()
  14. print "Chat server: got connection %d from %s" % (client.fileno(), address)
  15. # Read the login name
  16. cname = receive(client).split('NAME: ')[1]
  17. # Compute client name and send back
  18. self.clients += 1
  19. send(client, 'CLIENT: ' + str(address[0]))
  20. inputs.append(client)
  21. self.clientmap[client] = (address, cname)
  22. # Send joining information to other clients
  23. msg = "\n(Connected: New client (%d) from %s)" % (self.clients, self.get_client_name(client))
  24. for output in self.outputs:
  25. send(output, msg)
  26. self.outputs.append(client)
  27. elif sock == sys.stdin:
  28. # handle standard input
  29. junk = sys.stdin.readline()
  30. running = False
  31. else:
  32. # handle all other sockets
  33. try:
  34. data = receive(sock)
  35. if data:
  36. # Send as new client's message...
  37. msg = '\n#[' + self.get_client_name(sock) + ']>>' + data
  38. # Send data to all except ourself
  39. for output in self.outputs:
  40. if output != sock:
  41. send(output, msg)
  42. else:
  43. print "Chat server: %d hung up" % sock.fileno()
  44. self.clients -= 1
  45. sock.close()
  46. inputs.remove(sock)
  47. self.outputs.remove(sock)
  48. # Sending client leaving information to others
  49. msg = "\n(Now hung up: Client from %s)" % self.get_client_name(sock)
  50. for output in self.outputs:
  51. send(output, msg)
  52. except socket.error, e:
  53. # Remove
  54. inputs.remove(sock)
  55. self.outputs.remove(sock)
  56. self.server.close()

初始化聊天室服务器时创建了一些属性:客户端数量、客户端映射和输出的套接字。和之前创建服务器套接字一样,初始化时也设定了重用地址的选项,这么做可以使用同一个端口重启服务器。聊天室服务器类的构造方法还有一个可选参数backlog,用于设定服务器监听的连接队列的最大数量。

这个聊天室服务器有个值得介绍的地方,它可以使用signal模块捕获用户的中断操作。中断操作一般通过键盘输入。ChatServer类为中断信号(SIGINT)注册了一个信号处理方法sighandler。信号处理方法捕获从键盘输入的中断信号后,关闭所有输出套接字,其中一些套接字可能还有数据等待发送。

聊天室服务器的主要执行方法是run(),在while循环中执行操作。run()方法注册了一个select接口,输入参数是聊天室服务器套接字stdin,输出参数由服务器的输出套接字列表指定。调用select.select()方法后得到三个列表:可读套接字、可写套接字和异常套接字。聊天室服务器只关心可读套接字,其中保存了准备被读取的数据。如果可读套接字是服务器本身,表示有一个新客户端连到服务器上了,服务器会读取客户端的名字,将其广播给其他客户端。如果输入参数中有内容,聊天室服务器会退出。类似地,这个聊天室服务器也能处理其他客户端套接字的输入,转播客户端直接传送的数据,还能共享客户端进入和离开聊天室的信息。

聊天室客户端应该包含以下代码:

  1. class ChatClient(object):
  2. """ A command line chat client using select """
  3. def __init__(self, name, port, host=SERVER_HOST):
  4. self.name = name
  5. self.connected = False
  6. self.host = host
  7. self.port = port
  8. # Initial prompt
  9. self.prompt='[' + '@'.join((name, socket.gethostname().split('.')[0])) + ']> '
  10. # Connect to server at port
  11. try:
  12. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  13. self.sock.connect((host, self.port))
  14. print "Now connected to chat server@ port %d" % self.port
  15. self.connected = True
  16. # Send my name...
  17. send(self.sock,'NAME: ' + self.name)
  18. data = receive(self.sock)
  19. # Contains client address, set it
  20. addr = data.split('CLIENT: ')[1]
  21. self.prompt = '[' + '@'.join((self.name, addr)) + ']> '
  22. except socket.error, e:
  23. print "Failed to connect to chat server @ port %d" % self.port
  24. sys.exit(1)
  25. def run(self):
  26. """ Chat client main loop """
  27. while self.connected:
  28. try:
  29. sys.stdout.write(self.prompt)
  30. sys.stdout.flush()
  31. # Wait for input from stdin and socket
  32. readable, writeable,exceptional = select.select([0, self.sock], [],[])
  33. for sock in readable:
  34. if sock == 0:
  35. data = sys.stdin.readline().strip()
  36. if data: send(self.sock, data)
  37. elif sock == self.sock:
  38. data = receive(self.sock)
  39. if not data:
  40. print 'Client shutting down.'
  41. self.connected = False
  42. break
  43. else:
  44. sys.stdout.write(data + '\n')
  45. sys.stdout.flush()
  46. except KeyboardInterrupt:
  47. print " Client interrupted. """
  48. self.sock.close()
  49. break

初始化聊天室客户端时指定了name参数,连接到聊天室服务器之后,这个名字会发送给服务器。初始化时还设置了一个自定义的提示符[name@host]>。客户端的执行方法run()在连接到服务器的过程中一直运行着。和聊天室服务器类似,聊天室客户端也使用select()方法注册。只要可读套接字做好了准备,客户端就开始接收数据。如果sock的值为0,而且有可用的数据,客户端就可以发送数据。发送的数据还会显示在stdout或者本例中的命令行终端里。主方法应该接收命令行参数,调用服务器或者客户端,如下所示:

  1. if __name__ == "__main__":
  2. parser = argparse.ArgumentParser(description='Socket Server Example with Select')
  3. parser.add_argument('--name', action="store", dest="name", required=True)
  4. parser.add_argument('--port', action="store", dest="port", type=int, required=True)
  5. given_args = parser.parse_args()
  6. port = given_args.port
  7. name = given_args.name
  8. if name == CHAT_SERVER_NAME:
  9. server = ChatServer(port)
  10. server.run()
  11. else:
  12. client = ChatClient(name=name, port=port)
  13. client.run()

这个脚本要运行三次:一次用于启动聊天室服务器,两次用于启动两个聊天室客户端。启动服务器时,在命令行中传入参数–-name=server--port=8800。启动client1时,把名字参数改成--name=client1;启动client2时改为--name=client2。然后在client1中发送消息"Hello from client 1",这个消息会显示在client2的终端里。同样,在client2中发送消息"hello from client 2",也会在client1的终端里显示。

服务器的输出如下:

  1. $ python 2_3_chat_server_with_select.py --name=server --port=8800
  2. Server listening to port: 8800 ...
  3. Chat server: got connection 4 from ('127.0.0.1', 56565)
  4. Chat server: got connection 5 from ('127.0.0.1', 56566)

client1的输出如下:

  1. $ python 2_3_chat_server_with_select.py --name=client1 --port=8800
  2. Now connected to chat server@ port 8800
  3. [client1@127.0.0.1]>
  4. (Connected: New client (2) from client2@127.0.0.1)
  5. [client1@127.0.0.1]> Hello from client 1
  6. [client1@127.0.0.1]>
  7. #[client2@127.0.0.1]>>hello from client 2

client2的输出如下:

  1. $ python 2_3_chat_server_with_select.py --name=client2 --port=8800
  2. Now connected to chat server@ port 8800
  3. [client2@127.0.0.1]>
  4. #[client1@127.0.0.1]>>Hello from client 1
  5. [client2@127.0.0.1]> hello from client 2
  6. [client2@127.0.0.1]

整个交互过程如下面的截图所示:

图2-1

2.4.2 原理分析

在这个模块的顶端定义了两个实用函数:send()receive()

在聊天室服务器和客户端中用到了这两个函数,如前面的代码所示。聊天室服务器和客户端中定义的方法前面也介绍过了。

2.5 使用select.epoll多路复用Web服务器

Python的select模块中有很多针对特定平台的网络事件管理函数。在Linux设备中可以使用epoll。这个函数利用操作系统内核轮询网络事件,让脚本知道有事件发生了。这听起来比前面介绍的select.select方案更高效。

2.5.1 实战演练

我们来编写一个简单的Web服务器,向每一个连接服务器的网页浏览器返回一行文本。

这个脚本的核心在Web服务器的初始化过程中,我们要调用方法select.epoll(),注册服务器的文件描述符,以达到事件通知的目的。在Web服务器执行的代码中,套接字事件由下述代码监控:

代码清单2-4 展示了如何使用select.epoll实现简单的Web服务器,如下所示:

  1. #!usrbin/env python
  2. # Python Network Programming Cookbook -- Chapter - 2
  3. # This program is optimized for Python 2.7.
  4. # It may run on any other version with/without modifications.
  5. import socket
  6. import select
  7. import argparse
  8. SERVER_HOST = 'localhost'
  9. EOL1 = b'\n\n'
  10. EOL2 = b'\n\r\n'
  11. SERVER_RESPONSE = b"""HTTP/1.1 200 OK\r\nDate: Mon, 1 Apr 2013 01:01:01 GMT\r\nContent-Type: text/plain\r\nContent-Length: 25\r\n\r\n
  12. Hello from Epoll Server!"""
  13. class EpollServer(object):
  14. """ A socket server using Epoll"""
  15. def __init__(self, host=SERVER_HOST, port=0):
  16. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  17. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  18. self.sock.bind((host, port))
  19. self.sock.listen(1)
  20. self.sock.setblocking(0)
  21. self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  22. print "Started Epoll Server"
  23. self.epoll = select.epoll()
  24. self.epoll.register(self.sock.fileno(), select.EPOLLIN)
  25. def run(self):
  26. """Executes epoll server operation"""
  27. try:
  28. connections = {}; requests = {}; responses = {}
  29. while True:
  30. events = self.epoll.poll(1)
  31. for fileno, event in events:
  32. if fileno == self.sock.fileno():
  33. connection, address = self.sock.accept()
  34. connection.setblocking(0)
  35. self.epoll.register(connection.fileno(), select.EPOLLIN)
  36. connections[connection.fileno()] = connection
  37. requests[connection.fileno()] = b''
  38. responses[connection.fileno()] = SERVER_RESPONSE
  39. elif event & select.EPOLLIN:
  40. requests[fileno] += connections[fileno].recv(1024)
  41. if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
  42. self.epoll.modify(fileno, select.EPOLLOUT)
  43. print('-'*40 + '\n' + requests[fileno].decode()[:-2])
  44. elif event & select.EPOLLOUT:
  45. byteswritten = connections[fileno].send(responses[fileno])
  46. responses[fileno] = responses[fileno][byteswritten:]
  47. if len(responses[fileno]) == 0:
  48. self.epoll.modify(fileno, 0)
  49. connections[fileno].shutdown(socket.SHUT_RDWR)
  50. elif event & select.EPOLLHUP:
  51. self.epoll.unregister(fileno)
  52. connections[fileno].close()
  53. del connections[fileno]
  54. finally:
  55. self.epoll.unregister(self.sock.fileno())
  56. self.epoll.close()
  57. self.sock.close()
  58. if __name__ == '__main__':
  59. parser = argparse.ArgumentParser(description='Socket Server Example with Epoll')
  60. parser.add_argument('--port', action="store", dest="port", type=int, required=True)
  61. given_args = parser.parse_args()
  62. port = given_args.port
  63. server = EpollServer(host=SERVER_HOST, port=port)
  64. server.run()

运行这个脚本,在网页浏览器(例如Firefox或IE)中输入http://localhost:8800/访问服务器,在终端会看到如下输出:

  1. $ python 2_4_simple_web_server_with_epoll.py --port=8800
  2. Started Epoll Server
  3. ----------------------------------------
  4. GET HTTP1.1
  5. Host: localhost:8800
  6. Connection: keep-alive
  7. Accept: text/html,application/xhtml+xml,application/xml;q=0.9,/;q=0.8
  8. User-Agent: Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.31 (KHTML, like Gecko) Chrome/26.0.1410.43 Safari/537.31
  9. DNT: 1
  10. Accept-Encoding: gzip,deflate,sdch
  11. Accept-Language: en-GB,en-US;q=0.8,en;q=0.6
  12. Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3
  13. Cookie: MoodleSession=69149dqnvhett7br3qebsrcmh1;MOODLEID1_=%257F%25BA%2B%2540V
  14. ----------------------------------------
  15. GET favicon.ico HTTP1.1
  16. Host: localhost:8800
  17. Connection: keep-alive
  18. Accept: /
  19. DNT: 1
  20. User-Agent: Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.31 (KHTML, like Gecko) Chrome/26.0.1410.43 Safari/537.31
  21. Accept-Encoding: gzip,deflate,sdch
  22. Accept-Language: en-GB,en-US;q=0.8,en;q=0.6
  23. Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3

在浏览器中还会看到以下这行文本:

  1. Hello from Epoll Server!

这一过程的截图如下所示:

图2-2

2.5.2 原理分析

在Web服务器EpollServer的构造方法中创建了一个套接字服务器,绑定到本地主机的指定端口上。服务器的套接字被设定为非阻塞模式(setblocking(0)),并设定了TCP_NODELAY选项,让服务器无需缓冲便可直接交换数据(比如在SSH连接中)。然后创建了一个select.epoll()实例,再把套接字的文件描述符传给这个实例,以便监控。

在这个Web服务器的run()方法中开始监听套接字事件。事件由下述常量表示:

  • EPOLLIN:套接字读事件
  • EPOLLOUT:套接字写事件

这个套接字服务器把响应设为SERVER_RESPONSE。如果连接套接字服务器的客户端想写数据,可以在EPOLLOUT事件中完成。发生内部错误时,EPOLLHUP事件会把一个异常关闭信号发给套接字服务器。

2.6 使用并发库Diesel多路复用回显服务器

有时你需要编写一个大型自定义网络应用程序,但不想重复输入初始化服务器的代码,比如说创建套接字、绑定地址、监听以及处理基本的错误等。有很多Python网络库都可以帮助你把样板代码删除。这里我们要使用一个提供这种功能的库,它叫作Diesel。

2.6.1 准备工作

Diesel使用非阻塞和协程技术提升编写网络服务器的效率。Diesel的网站上有这么一句话:“Diesel的核心是一个紧密的事件轮询,使用epoll提供几近平稳的性能,即便有10 000个或更多的连接也无妨。”这一节我们通过一个简单的回显服务器介绍Diesel的用法。你需要安装Diesel 3.0或者更新的版本,使用pip命令即可完成:$ pip install diesel >= 3.0

2.6.2 实战演练

在Python的Diesel框架中,应用程序使用Application()类的实例初始化,事件处理函数注册在这个实例上。我们来看一下使用Diesel编写回显服务器是多么简单。

代码清单2-5展示了如何使用Diesel编写回显服务器,如下所示:

  1. #!usrbin/env python
  2. # Python Network Programming Cookbook -- Chapter - 2
  3. # This program is optimized for Python 2.7.
  4. # It may run on any other version with/without modifications.
  5. # You alos need diesel library 3.0 or any later version
  6. import diesel
  7. import argparse
  8. class EchoServer(object):
  9. """ An echo server using diesel"""
  10. def handler(self, remote_addr):
  11. """Runs the echo server"""
  12. host, port = remote_addr[0], remote_addr[1]
  13. print "Echo client connected from: %s:%d" %(host, port)
  14. while True:
  15. try:
  16. message = diesel.until_eol()
  17. your_message = ': '.join(['You said', message])
  18. diesel.send(your_message)
  19. except Exception, e:
  20. print "Exception:",e
  21. def main(server_port):
  22. app = diesel.Application()
  23. server = EchoServer()
  24. app.add_service(diesel.Service(server.handler, server_port))
  25. app.run()
  26. if __name__ == '__main__':
  27. parser = argparse.ArgumentParser(description='Echo server example with Diesel')
  28. parser.add_argument('--port', action="store", dest="port", type=int, required=True)
  29. given_args = parser.parse_args()
  30. port = given_args.port
  31. main(port)

运行这个脚本后,服务器会显示如下输出:

  1. $ python 2_5_echo_server_with_diesel.py --port=8800
  2. [2013/04/08 11:48:32] {diesel} WARNING:Starting diesel <hand-rolled select.epoll>
  3. Echo client connected from: 127.0.0.1:56603

在另一个终端窗口中可以使用Telnet客户端连接回显服务器,测试消息回显,如下所示:

  1. $ telnet localhost 8800
  2. Trying 127.0.0.1...
  3. Connected to localhost.
  4. Escape character is '^]'.
  5. Hello Diesel server?
  6. You said: Hello Diesel server?

下面这个截图显示了和Diesel聊天室服务器交互的过程:

图2-3

2.6.3 原理分析

这个脚本从命令行参数--port中获取端口号,将其传给main()函数。Diesel应用程序在main()函数中初始化并运行。

在Diesel中有“服务”的概念,应用程序可以提供多种服务。EchoServer类中定义了handler()方法,让服务器能够处理单独的客户端连接。运行服务时要把handler()方法和端口号作为参数传给Service()方法。

handler()方法决定服务器的行为,在这个脚本中,服务器直接返回消息文本。

如果把这个脚本和第1章中的“编写一个简单的回显客户端/服务器应用”攻略(代码清单1-13a)对比,很明显能看出,我们不需要编写样板代码,因此很容易把精力集中在高层应用逻辑上。