一般web框架,同步模式下存在的问题
1、计算等待
sleep模拟,A用户需耗费计算量的情况下占用线程时,B用户处于等待状态


import tornado.ioloop import tornado.webclass MainHandler(tornado.web.RequestHandler):def get(self):import timetime.sleep(10)self.write("Hello, world")class IndexHandler(tornado.web.RequestHandler):def get(self):self.write("Index") application = tornado.web.Application([(r"/main", MainHandler),(r"/index", IndexHandler), ])if __name__ == "__main__":application.listen(8888)tornado.ioloop.IOLoop.instance().start()
2、IO等待
写文件 / 网络返回等请求,A用户占用线程,B用户等待


import tornado.ioloop import tornado.webclass MainHandler(tornado.web.RequestHandler):def get(self):import requestsrequests.get('http://www.google.com')self.write('xxxxx')class IndexHandler(tornado.web.RequestHandler):def get(self):self.write("Index") application = tornado.web.Application([(r"/main", MainHandler),(r"/index", IndexHandler), ])if __name__ == "__main__":application.listen(8888)tornado.ioloop.IOLoop.instance().start()
Tornado的解决方式---异步非阻塞
Future对象 + gen.coroutine装饰器 + yield生成器
1、计算等待


import tornado.ioloop import tornado.web from tornado import gen from tornado.concurrent import Future import timeclass MainHandler(tornado.web.RequestHandler):@gen.coroutinedef get(self):future = Future()# 特殊的形式等待5stornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.done)yield futuredef done(self, *args, **kwargs):self.write('Main')self.finish()class IndexHandler(tornado.web.RequestHandler):def get(self):self.write("Index") application = tornado.web.Application([(r"/main", MainHandler),(r"/index", IndexHandler), ])if __name__ == "__main__":application.listen(8888)tornado.ioloop.IOLoop.instance().start()
2、IO中的网络请求


import tornado.ioloop import tornado.web from tornado import genclass MainHandler(tornado.web.RequestHandler):@gen.coroutinedef get(self):from tornado import httpclienthttp = httpclient.AsyncHTTPClient()yield http.fetch("http://www.google.com", self.done)def done(self, *args, **kwargs):self.write('Main')self.finish()class IndexHandler(tornado.web.RequestHandler):def get(self):self.write("Index") application = tornado.web.Application([(r"/main", MainHandler),(r"/index", IndexHandler), ])if __name__ == "__main__":application.listen(8888)tornado.ioloop.IOLoop.instance().start()
Tornado的请求挂起
1、用yield结合Future对象来挂起请求


import tornado.ioloop import tornado.web from tornado import gen from tornado.concurrent import Futurefuture = None class MainHandler(tornado.web.RequestHandler):@gen.coroutinedef get(self):global futurefuture = Future()future.add_done_callback(self.done)yield futuredef done(self, *args, **kwargs):self.write('Main')self.finish()class IndexHandler(tornado.web.RequestHandler):def get(self):global futurefuture.set_result(None)self.write("Index")application = tornado.web.Application([(r"/main", MainHandler),(r"/index", IndexHandler), ])if __name__ == "__main__":application.listen(8888)tornado.ioloop.IOLoop.instance().start()
自己实现一个异步非阻塞框架
1、select和socket先实现同步阻塞式web框架


import socket import selectclass HttpRequest(object):"""用户封装用户请求信息"""def __init__(self, content):""":param content:用户发送的请求数据:请求头和请求体"""self.content = contentself.header_bytes = bytes()self.body_bytes = bytes()self.header_dict = {}self.method = ""self.url = ""self.protocol = ""self.initialize()self.initialize_headers()def initialize(self):temp = self.content.split(b'\r\n\r\n', 1)if len(temp) == 1:self.header_bytes += tempelse:h, b = tempself.header_bytes += hself.body_bytes += b@propertydef header_str(self):return str(self.header_bytes, encoding='utf-8')def initialize_headers(self):headers = self.header_str.split('\r\n')first_line = headers[0].split(' ')if len(first_line) == 3:self.method, self.url, self.protocol = headers[0].split(' ')for line in headers:kv = line.split(':')if len(kv) == 2:k, v = kvself.header_dict[k] = v# class Future(object): # def __init__(self): # self.result = Nonedef main(request):return "main"def index(request):return "indexasdfasdfasdf"routers = [('/main/',main),('/index/',index), ]def run():sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)sock.bind(("127.0.0.1", 9999,))sock.setblocking(False)sock.listen(128)inputs = []inputs.append(sock)while True:rlist,wlist,elist = select.select(inputs,[],[],0.05)for r in rlist:if r == sock:"""新请求到来"""conn,addr = sock.accept()conn.setblocking(False)inputs.append(conn)else:"""客户端发来数据"""data = b""while True:try:chunk = r.recv(1024)data = data + chunkexcept Exception as e:chunk = Noneif not chunk:break# data进行处理:请求头和请求体request = HttpRequest(data)# 1. 请求头中获取url# 2. 去路由中匹配,获取指定的函数# 3. 执行函数,获取返回值# 4. 将返回值 r.sendall(b'alskdjalksdjf;asfd')import reflag = Falsefunc = Nonefor route in routers:if re.match(route[0],request.url):flag = Truefunc = route[1]breakif flag:result = func(request)r.sendall(bytes(result,encoding='utf-8'))else:r.sendall(b"404")inputs.remove(r)r.close()if __name__ == '__main__':run()
2、自定义一个对象来挂起请求


import socket import selectclass HttpRequest(object):"""用户封装用户请求信息"""def __init__(self, content):""":param content:用户发送的请求数据:请求头和请求体"""self.content = contentself.header_bytes = bytes()self.body_bytes = bytes()self.header_dict = {}self.method = ""self.url = ""self.protocol = ""self.initialize()self.initialize_headers()def initialize(self):temp = self.content.split(b'\r\n\r\n', 1)if len(temp) == 1:self.header_bytes += tempelse:h, b = tempself.header_bytes += hself.body_bytes += b@propertydef header_str(self):return str(self.header_bytes, encoding='utf-8')def initialize_headers(self):headers = self.header_str.split('\r\n')first_line = headers[0].split(' ')if len(first_line) == 3:self.method, self.url, self.protocol = headers[0].split(' ')for line in headers:kv = line.split(':')if len(kv) == 2:k, v = kvself.header_dict[k] = vclass Future(object):def __init__(self):self.result = None F = None def main(request):global FF = Future()return Fdef stop(request):global FF.result = b"xxxxxxxxxxxxx"return "stop"def index(request):return "indexasdfasdfasdf"routers = [('/main/',main),('/index/',index),('/stop/',stop), ]def run():sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)sock.bind(("127.0.0.1", 9999,))sock.setblocking(False)sock.listen(128)inputs = []inputs.append(sock)async_request_dict = {# 'socket': futrue }while True:rlist,wlist,elist = select.select(inputs,[],[],0.05)for r in rlist:if r == sock:"""新请求到来"""conn,addr = sock.accept()conn.setblocking(False)inputs.append(conn)else:"""客户端发来数据"""data = b""while True:try:chunk = r.recv(1024)data = data + chunkexcept Exception as e:chunk = Noneif not chunk:break# data进行处理:请求头和请求体request = HttpRequest(data)# 1. 请求头中获取url# 2. 去路由中匹配,获取指定的函数# 3. 执行函数,获取返回值# 4. 将返回值 r.sendall(b'alskdjalksdjf;asfd')import reflag = Falsefunc = Nonefor route in routers:if re.match(route[0],request.url):flag = Truefunc = route[1]breakif flag:result = func(request)if isinstance(result,Future):async_request_dict[r] = resultelse:r.sendall(bytes(result,encoding='utf-8'))inputs.remove(r)r.close()else:r.sendall(b"404")inputs.remove(r)r.close()for conn in async_request_dict.keys():future = async_request_dict[conn]if future.result:conn.sendall(future.result)conn.close()del async_request_dict[conn]inputs.remove(conn)if __name__ == '__main__':run()
3、触发定时器回来处理刚才挂起的请求


import socket import select import timeclass HttpRequest(object):"""用户封装用户请求信息"""def __init__(self, content):""":param content:用户发送的请求数据:请求头和请求体"""self.content = contentself.header_bytes = bytes()self.body_bytes = bytes()self.header_dict = {}self.method = ""self.url = ""self.protocol = ""self.initialize()self.initialize_headers()def initialize(self):temp = self.content.split(b'\r\n\r\n', 1)if len(temp) == 1:self.header_bytes += tempelse:h, b = tempself.header_bytes += hself.body_bytes += b@propertydef header_str(self):return str(self.header_bytes, encoding='utf-8')def initialize_headers(self):headers = self.header_str.split('\r\n')first_line = headers[0].split(' ')if len(first_line) == 3:self.method, self.url, self.protocol = headers[0].split(' ')for line in headers:kv = line.split(':')if len(kv) == 2:k, v = kvself.header_dict[k] = vclass Future(object):def __init__(self,timeout=0):self.result = Noneself.timeout = timeoutself.start = time.time() def main(request):f = Future(5)return fdef index(request):return "indexasdfasdfasdf"routers = [('/main/',main),('/index/',index), ]def run():sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)sock.bind(("127.0.0.1", 9999,))sock.setblocking(False)sock.listen(128)inputs = []inputs.append(sock)async_request_dict = {# 'socket': futrue }while True:rlist,wlist,elist = select.select(inputs,[],[],0.05)for r in rlist:if r == sock:"""新请求到来"""conn,addr = sock.accept()conn.setblocking(False)inputs.append(conn)else:"""客户端发来数据"""data = b""while True:try:chunk = r.recv(1024)data = data + chunkexcept Exception as e:chunk = Noneif not chunk:break# data进行处理:请求头和请求体request = HttpRequest(data)# 1. 请求头中获取url# 2. 去路由中匹配,获取指定的函数# 3. 执行函数,获取返回值# 4. 将返回值 r.sendall(b'alskdjalksdjf;asfd')import reflag = Falsefunc = Nonefor route in routers:if re.match(route[0],request.url):flag = Truefunc = route[1]breakif flag:result = func(request)if isinstance(result,Future):async_request_dict[r] = resultelse:r.sendall(bytes(result,encoding='utf-8'))inputs.remove(r)r.close()else:r.sendall(b"404")inputs.remove(r)r.close()for conn in async_request_dict.keys():future = async_request_dict[conn]start = future.starttimeout = future.timeoutctime = time.time()if (start + timeout) <= ctime :future.result = b"timeout"if future.result:conn.sendall(future.result)conn.close()del async_request_dict[conn]inputs.remove(conn)if __name__ == '__main__':run()