前言
Tornado(龙卷风)和Django一样是Python中比较主流的web框架,
知识铺垫:
什么是别人所说的web server /web服务器?
所有Python写的web服务器本质是就是1个符合wsgi协议标准的 socket(例如:Python的wsgiref模块就可以实现1个web服务)
web server负责 监听在某1个端口、接收客户端的连接,分割http的请求头和请求体..等底层工作
最后封装好请求头和响应头的信息,传给web框架
让我们在使用web框架开发web应用程序的时候更加快捷和方便,就可以 request.xx 、render, HttpResponse,redirect
什么是别人说的web 应用/web框架?
web框架负责web应用逻辑处理的部分,通常做了一些代码封装,可以更加人性性化得让我们使用(Django/Flashk。。。)
web server 和 web 框架如何结合?
如果想要让你的web程序运行起来,2者缺一不可,如果你使用的web框架不包含 web server就需要搭配第3方的模块,例如Django在默认情况下socket借助的是wsgiref模块
但是有的web框架包含了web server(tornado)
以Django为代表的python web 框架搭配的web server一般为gunicorn/uwsgi…这些都是基于多进程和多线程工作的,这种工作模式的缺陷是服务器每接受1个请求都会启动1个线程/进程进行处理,如果并发量过大就需要产生大量的线程/进程,服务器不能无限制得开线程和进程,所以满足不了用户量大、高并发的应用场景;
Tornado 和现在的主流 Web 服务器框架也有着明显的区别:
Tornado是服务器和web框架的结合体,自带web server,并且在框架内部也可以实现了异步、非阻塞、且对WebSocket协议天然支持;
一、Tornado框架的基本组成
Tonado由 web server(基于epoll机制的IO多路复用性IO的socket)、路由系统、视图、模板语言4大部分组成,如果习惯了使用Django你会感觉它功能单薄,但是只有这样才能足够轻量,如果用到什么功能就自己去GitHub上找现成的插件,或者自实现;以下将对这些基本组件进行逐一介绍。
Django功能概览:
socket:有
中间件:无(使用Python的wsgiref模块)
路由系统:有
视图函数:有
ORM操作:有
模板语言:有
simple_tag:有
cokies:有
session:有
csrf:有
xss:有
其他:缓存、信号、Form组件、ModelFormm、Admin
tornado功能概览:
socket:有(异步非阻塞、支持WebScoket)
路由系统:有
视图函数:有
静态文件:有
ORM操作:无
模板语言:有
simple_tag:有,uimethod,uimodule
cokies:有
session:无
csrf:有
xss:有
其他:无
Django和Tonado功能对比
二、Tornado自带功能
1、Tornado执行流程
如果不明白Tornado的IO多路复用机制,作为1个使用者来说将是一件很失败的事情;
Tornado是基于epoll实现的;
importtornado.webimporttornado.ioloopclassIndexHandler(tornado.web.RequestHandler):defget(self):
self.write('hello Martin!')if __name__ == '__main__':
app=tornado.web.Application([(r"/",IndexHandler)])
app.listen(8000) #监听但是不绑定
tornado.ioloop.IOLoop.current().start()#对epoll机制的封装,实例化1个epoll实例,将socket交给epoll进行监听管理
第一步:tornado.ioloop.IOLoop.current().start()执行,实例化1个epoll容器,将socket0交给epoll进行循环监听
第二步:客户端A访问 /index –> http://127.0.0.1:8888/index/对服务器进行连接
第三步:由于客户端A连接了Tornado开启的socket,所有epoll循环发现了有socket0可读,就把客户A连接socket0的socket添加到epoll容器进行循环监听
第四步:如果循环监听发现有客户socket有可读的操作,就响应客户端(走路由–》视图–》模板渲染….)
PS:
Tornado通过1个事件循环监听,监听到哪个socket可以操作,Tornado就操作哪个!只用了1个线程就可对多个请求进行处理;
但是Tornado的单线程有个致命缺陷,如果我们在响应客户端的过程中(路由、视图、查库、模板渲染..)出现了long IO,即使另一个客户端socket可操作,也必须排队等待…….;
于是这就为什么我们要在视图中做异步的原因….
配置文件:
setings={'template_path':'templates',#配置模板路径
'static_path':'static', #配置静态文件存放的路径
'static_url_prefix':'/zhanggen/', #在模板中引用静态文件路径时使用的别名 注意是模板引用时的别名
"xsrf_cookies": True, #使用xsrf认证
'cookie_secret' :'xsseffekrjewkhwy'#cokies加密时使用的盐
}
application=tornado.web.Application([
(r'/login/',LoginHandler) ,#参数1 路由系统
(r'/index/',IndexHandler) ,#参数1 路由系统
],**setings #参数2 配置文件
)
View Code
2、路由系统
2.1、动态路由(url传参数)
app=tornado.web.Application(
[
(r'^/index/$',MainHandler),
(r'^/index/(\d+)$',MainHandler), #url传参
]
)
View Code
2.2、域名匹配
#支持域名匹配 www.zhanggen.com:8888/index/333333
app.add_handlers('www.zhanggen.com',[
(r'^/index/$', MainHandler),
(r'^/index/(\d+)$', MainHandler),
])
View Code
2.3、反向生成url
app.add_handlers('www.zhanggen.com',[
(r'^/index/$', MainHandler,{},"name1"), #反向生成url
(r'^/index/(\d+)$', MainHandler,{},"name2"),
])
路由
classMainHandler(tornado.web.RequestHandler):def get(self,*args,**kwargs):
url1=self.application.reverse_url('name1')
url2= self.application.reverse_url('name2', 666)print(url1,url2)
self.write('hello word')
视图
3、视图
tornado的视图才有CBV模式,url匹配成功之后先 视图执行顺序为initialize 、prepare、get/post/put/delete(视图)、finish;
一定要注意这3个钩子方法:
#!/bin/env python#-*- coding: UTF-8 -*-
"""Copyright (c) 2016 SensorsData, Inc. All Rights Reserved
@author padme(jinsilan@sensorsdata.cn)
@brief
封装些基本的方法 还有logger
mysql> desc user_info;
+———-+————–+——+—–+———+——-+
| Field | Type | Null | Key | Default | Extra |
+———-+————–+——+—–+———+——-+
| name | varchar(100) | NO | PRI | NULL | |
| cname | varchar(100) | NO | | NULL | |
| mail | varchar(100) | NO | | NULL | |
| password | varchar(128) | YES | | NULL | |
| salt | varchar(20) | YES | | NULL | |
| role | varchar(20) | YES | | NULL | |
| comment | text | YES | | NULL | |
+———-+————–+——+—–+———+——-+
7 rows in set (0.00 sec)"""
importcopyimportdatetimeimporthashlibimportjsonimportloggingimportpprintimportpymysqlimportosimportrandomimporttimeimportthreadingimporttornado.webimporttornado.escapeimportsysimportldap
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))importconffrom data importgroup_cache, cache
RESERVED_USERS={"monitor": {"name": "monitor", "password": "968df05ea257081d6d7831a3fc4c4145", "role":"super", "cname": "monitor邮件组", "mail": "monitor@sensorsdata.cn"},"sale": {"name": "sale", "password": "128ea23fa279cf2d1fa26a1522cc2a53", "role":"normal", "cname": "sale", "mail": "sale@sensorsdata.cn"},"ztxadmin": {"name": "ztxadmin", "password": "6934fd6089194c9f9ec0e1b011045abf", "role":"admin", "cname": "张天晓admin", "mail": "zhangtianxiao@sensorsdata.cn"},"jenkins": {"name": "jenkins", "password": "przs7j0ubzvvgu9ofw48a55n813edxzk","role": "normal", "cname": "jenkins专用", "mail": "jinsilan@sensorsdata.cn"}
}
RESERVED_USER_TOKENS={"968df05ea257081d6d7831a3fc4c4145": {"name": "monitor", "role": "super","cname": "monitor邮件组"},"128ea23fa279cf2d1fa26a1522cc2a53": {"name": "sale", "role": "normal", "cname": "sale"},"6934fd6089194c9f9ec0e1b011045abf": {"name": "ztxadmin", "role": "admin", "cname": "张天晓admin"},"przs7j0ubzvvgu9ofw48a55n813edxzk": {"name": "jenkins", "role": "normal", "cname": "jenkins专用"},
}
counter_map={}
snapshots=[]
lock=threading.Lock()
uptime= time.time() * 1000
classDatetimeSerializer(json.JSONEncoder):"""实现 date 和 datetime 类型的 JSON 序列化,以符合 SensorsAnalytics 的要求。"""
defdefault(self, obj):ifisinstance(obj, datetime.datetime):
head_fmt= "%Y-%m-%d %H:%M:%S"
return "{main_part}.{ms_part}".format(
main_part=obj.strftime(head_fmt),
ms_part=int(obj.microsecond / 1000))elifisinstance(obj, datetime.date):
fmt= '%Y-%m-%d'
returnobj.strftime(fmt)returnjson.JSONEncoder.default(self, obj)#最多保留100个snapshot
MAX_SNAPSHOT_NUM = 100MIN_SNAPSHOT_INTERVAL= 60 * 1000
classMysqlCursorWrapper():def __init__(self, mysql_conf):
self.mysql_con= pymysql.connect(**mysql_conf)
self.cursor=self.mysql_con.cursor()def __enter__(self):returnself.cursordef __exit__(self, *exc_info):
self.cursor.close()
self.mysql_con.close()classBaseHandler(tornado.web.RequestHandler):'''1. 自带counter
2. 自带logger
3. 自带mysql cursor(短期就每个查询发起一个连接吧 反正目前qps不高 搞个连接池也总会超时的'''
def send_error(self, status_code=500, **kwargs):#对tornado 报错进行了二次封装
#self.set_status(status_code)
self.logger.warn('send error: %d[%s]' % (status_code, kwargs.get('reason', 'unknown reason')))return super().send_error(status_code, **kwargs)def get_json_body(self):#获取 json json.loads
self.logger.debug(self.request.body)
data=Falsetry:
data= json.loads(self.request.body.decode('utf8'))
self.logger.debug(data)except:
self.send_error(400, reason='Invalid json data')returndatadef redirect_login(self, error, url=None):#跳转的方法
if noturl:
url=self.request.uri
self.redirect('/login?next=%s&error=%s' %(tornado.escape.url_escape(url), tornado.escape.url_escape(error)))def check_admin(self):#检测是否是admin角色
if self.role != 'admin':
self.send_error(401)raise Exception('required login')def clear_auth(self): #清除cookie 的auth键中的 权限角色
self.clear_cookie('auth')
self.role= 'normal'self.ldapPass=False
self.ldapRole= 'nobody'
def initialize(self, url):#tornado自带的方法 初始化 把用户角色还
self.logger =logging
self.tag= url.lstrip('/').replace('([0-9]+)', 'id').replace('/', '_').replace('(.*)', 'name')print(self.tag)
self.user= '未登录'self.role= 'normal'self.ldapPass=False
self.ldapRole= 'nobody'
def get_gitlab_role(self, user): #获取用户在 ldap中的角色
returngroup_cache.get_gitlab_role(user)def prepare_ldap(self, user, pwd):#使用用户提交的 用户名、密码去ldap服务器 认证
returngroup_cache.ldap_simple_authenticate(user, pwd)def prepare(self): #tornado自带的钩子顺序 initialize 、prepare、get/post/put/delete、on_finish;
k = "%s_num" %self.tag
counter_map[k]= counter_map.get(k, 0) + 1self.start_time= time.time() * 1000.0auth_str= self.get_secure_cookie("auth")ifauth_str:
auth= json.loads(auth_str.decode('utf8'))
self.user= auth['u']
self.role= auth['r']elif self.get_argument('token', None):#从get请求的url参数中获取token
token = self.get_argument('token')if token inRESERVED_USER_TOKENS:
user_result=RESERVED_USER_TOKENS[token]ifuser_result:
self.user= user_result['name']
self.role= user_result['role']#适配销售创建接口
if self.user == 'sale':
self.user= self.get_argument('user')if notcache.global_user_infos:
cache.update_global_user_infos(self)if not self.user incache.global_user_infos:
self.user= "未登录"
if not self.request.path.startswith('/login'):
logging.getLogger('auth').info('%s %s %s %s\n%s'\%(self.user, self.role, self.request.method, self.request.uri,
self.request.body))defon_finish(self):
k= "%s_succeed_num" %self.tag
counter_map[k]= counter_map.get(k, 0) + 1k= "%s_interval" %self.tag
interval= time.time() * 1000.0 -self.start_time
counter_map[k]= counter_map.get(k, 0) +intervalif random.randint(0, 10) !=0:returnself.add_snapshot()defadd_snapshot(self):
with lock:
s=copy.deepcopy(counter_map)
s['time'] = time.time() * 1000.0snapshots.append(s)while len(snapshots) >MAX_SNAPSHOT_NUM:
snapshots.pop(0)def get_mysql_cursor(self, mysql_conf=conf.mysql_conf):returnMysqlCursorWrapper(mysql_conf)def query_args(self, sql, args, mysql_conf=conf.mysql_conf):'''返回a list of dict'''self.logger.debug('query mysql: %s;args: %s' %(sql, args))
ret=[]
with self.get_mysql_cursor(mysql_conf) as cursor:
cursor.execute(sql, args)
columns= [x[0] for x incursor.description]for row incursor.fetchall():
d=dict(zip(columns, row))
ret.append(d)
cursor.execute('commit')
self.logger.debug('ret %d lines. top 1:\n%s' % (len(ret), pprint.pformat(ret[:1], width=200)))returnretdef update_args(self, sql, args, mysql_conf=conf.mysql_conf):'''返回id'''self.logger.debug('update mysql: %s;args: %s' %(sql, args))
with self.get_mysql_cursor(mysql_conf) as cursor:
cursor.execute(sql, args)
lastrow=cursor.lastrowid
cursor.execute('commit')returnlastrowdef update(self, sql, mysql_conf=conf.mysql_conf):returnself.update_args(sql, None, mysql_conf)def query(self, sql, mysql_conf=conf.mysql_conf):returnself.query_args(sql, None, mysql_conf)defto_json(self, d):return json.dumps(d, cls=DatetimeSerializer)def render(self, name, **args):
args['current_url'] =self.request.path
args['current_user'] =self.user
args['current_role'] =self.rolereturn super().render(name, **args)def check_auth(self, customer_id, customer_info=None):'''检查权限:super有所有权限;其他只能看自己的客户(通过user_id销售和customer_success来标记)'''
if notcustomer_info:
sql= 'select * from customer_info where visible = true AND customer_id = "%s"' %customer_id
customer_info=self.query(sql)[0]#members = [x for x in customer_info['members'].split(',') if x]
if self.role == 'super' orself.check_customer_member(customer_info):returnTrue
self.logger.warn('bad auth: %s[%s] cannot see %s[%s/%s]'\% (self.user, self.role, customer_id, customer_info['user_id'],
customer_info['customer_success']))returnFalsedefcheck_customer_member(self, customer_info):if not customer_info['members']:
members=[]elif type(customer_info['members']) ==str:
members= [x for x in customer_info['members'].split(',') ifx]else:
members= customer_info['members']return customer_info['user_id'] == self.user orcustomer_info['customer_success'] == self.user or self.user inmembersdefredirect_if_not_login(func):'''跳转到登录页面'''
def _decorator(self, *args, **kwargs):if self.user == '未登录':if notself.ldapPass:
self.logger.error('not login!')return self.redirect_login('请先登录才可以看相关内容')return func(self, *args, **kwargs)return_decoratordeferror_if_not_login(func):'''检查是否登录 如果没有则返回401'''
def _decorator(self, *args, **kwargs):if self.user == '未登录':if notself.ldapPass:return self.send_error(401)return func(self, *args, **kwargs)return_decoratordeferror_if_not_admin(func):'''检查是否admin 如果没有则返回401'''
def _decorator(self, *args, **kwargs):if self.role != 'admin' and self.role != 'super':if self.ldapRole != 'admin' and self.ldapRole != 'super':return self.send_error(401)return func(self, *args, **kwargs)return_decoratordeferror_if_not_super(func):'''检查是否super 如果没有则返回401'''
def _decorator(self, *args, **kwargs):if self.role != 'super':if self.ldapRole != 'super':return self.send_error(401)return func(self, *args, **kwargs)return_decoratorclassStatusHandler(BaseHandler):defget(self):
self.add_snapshot()
first={}
with lock:if notsnapshots:
second={}else:
second= snapshots[-1]for x in reversed(snapshots[:-1]):if second['time'] – x['time'] >MIN_SNAPSHOT_INTERVAL:
first=x
self.logger.debug('first=%s second=%s' %(first, second))if first andsecond:
interval= (second['time'] – first['time']) / 1000.0
else:
interval=0if 'time' infirst:
first_date= datetime.datetime.fromtimestamp(first['time'] / 1000.0).strftime('%Y-%m-%d %H:%M:%S')else:
first_date= 'unknown'
if 'time' insecond:
second_date= datetime.datetime.fromtimestamp(second['time'] / 1000.0).strftime('%Y-%m-%d %H:%M:%S')else:
second_date= 'unknown'tags= [x[:-4] for x in second if x.endswith('_num') and not x.endswith('_succeed_num')]
tags.remove(self.tag)
self.logger.debug('tags=%s' %tags)
ret= {'from': first_date, 'to': second_date}for t intags:
args={}for (prefix, v) in [('first', first), ('second', second)]:for (suffix, alias) in [('num', 'n'), ('succeed_num', 's'), ('interval', 'i')]:
args['%s_%s' % (prefix, alias)] =v.get(suffix, 0)if args['first_n']:
ret[t]={'query': args['second_n'] – args['first_n'],'success': args['second_s'] – args['first_s'],'query_per_minutes': (args['second_n'] – args['first_n']) * 60 /interval,'success_rate': (args['second_s'] – args['first_s']) / (args['second_n'] – args['first_n']) ifargs['second_n'] > args['first_n'] else '-','avg_interval': (args['second_i'] – args['first_i']) / (args['second_s'] – args['first_s']) ifargs['second_s'] > args['first_s'] else '-',
}else:
ret[t]={'query': args['second_n'],'success': args['second_s'],'success_rate': args['second_s'] / args['second_n'] if args['second_n'] != 0 else '-','query_per_minutes': '-','avg_interval': args['second_i'] / args['second_s'] if args['second_s'] != 0 else '-',
}
self.write(self.to_json(ret))class HomeHandler(BaseHandler): #首页视图
defget(self):
self.render('home.html')class LoginHandler(BaseHandler): #登录页面的视图
defget(self):
self.clear_auth()
param= {'error': self.get_argument('error', None)}
self.render('login.html', **param)defpost(self):
username= self.get_argument("username", "")
password= self.get_argument("password", "")
remember= self.get_argument('remember', '')
user_result=Noneif username inRESERVED_USERS:
user_result=RESERVED_USERS[username]#保留用户
ifuser_result:
token= self.get_argument('token', None)if token == user_result['password']:
auth= {'u': username, 'r': user_result['role'], 'd': datetime.datetime.now().strftime('%Y-%m-%d')}#cookie中的认证信息
ifremember:
self.set_secure_cookie('auth', json.dumps(auth), expires_days=conf.cookie_expire_day)else:
self.set_secure_cookie('auth', json.dumps(auth))
self.redirect(self.get_argument('next', '/'))return
else:
self.logger.warn('invalid password, given %s result %s' % (token, user_result['password']))
self.redirect_login('密码错误', self.get_argument('next', '/'))return
#查ldap
self.ldapPass, self.ldapRole =self.prepare_ldap(username, password)if not self.ldapPass: #没查到
if self.ldapRole != "locked":
self.logger.warn('user %s wrong password', username)
self.redirect_login('用户密码输入错误', self.get_argument('next', '/'))else:
self.logger.warn('user %s has been locked', username)
self.redirect_login('ldap用户被锁定,请联系管理员解锁', self.get_argument('next', '/'))returnrole= self.ldapRole #查到了ldap的角色
if role != "nobody": #如果 不是匿名角色 #开始写cokie了
auth = {'u': username, 'r': role, 'd': datetime.datetime.now().strftime('%Y-%m-%d')}ifremember:
self.set_secure_cookie('auth', json.dumps(auth), expires_days=conf.cookie_expire_day)else:
self.set_secure_cookie('auth', json.dumps(auth))
self.redirect(self.get_argument('next', '/'))returnself.logger.warn('user %s not in ldap or not in group', username)
self.redirect_login('暂不支持你所在的邮件组', self.get_argument('next', '/'))
base_handler.py
importtornado.ioloopimporttornado.webclassMainHandler(tornado.web.RequestHandler):def initialize(self): #1
print()defprepare(self):pass
def get(self,*args,**kwargs):
self.write('hello word')def post(self, *args, **kwargs):pass
def finish(self, chunk=None):passsuper(self,MainHandler).finish()
View Code
3.1、请求相关
self.get_body_argument('user') :获取POST请求携带的参数
self.get_body_arguments('user_list') :获取POST请求参数列表(如chebox标签和select多选)
self.request.body.decode('utf-8'):获取json数据
self.get_query_argument('user') :获取GET请求携带的参数
self.get_query_arguments('user_list') :获取GET请求参数列表(如chebox标签和select多选)
self.get_argument('user') :获取GET和POST请求携带的参数
self.get_arguments('user_list'):获取GET和POST请求参数列表(如chebox标签和select多选)
注:以上取值方式如果取不到值就会报错,可以设置取不到值就取None;(例如 self.get_argument('user',None))
3.2、响应相关
self.write() :响应字符串
self.render():响应页面
self.redirect():页面跳转
4、模板语言
tornado的模板语言和Python语法一致
View Code
4.1、登录页面
#准备安装Tornado: pip install tornado
importtornado.ioloopimporttornado.webclass LoginHandler(tornado.web.RequestHandler): #注意继承RequestHandler 而不是redirectHandler
defget(self):
self.render('login.html')
setings={'template_path':'templates',#配置模板路径
'static_path':'static', #配置静态文件存放的路径
'static_url_prefix':'/zhanggen/' #在模板中引用静态文件路径时使用的别名 注意是模板引用时的别名
}
application=tornado.web.Application([
(r'/login/',LoginHandler) #参数1 路由系统
],**setings #参数2 配置文件
)if __name__ == '__main__':
application.listen(8888) #创建1个socket对象
tornado.ioloop.IOLoop.instance().start() #conn,addr=socket.accept()进入监听状态
View Code
Title
用户名
密码
提交
模板语言
4.2、引入静态文件
通过别名引入静态文件
static_url()方式引入静态文件
通过static_url()方法引入静态文件的好处:
1、使用static_url()可以不用考虑静态文件修改之后造成引用失效的情况;
2、还会生成静态文件url会有一个v=…的参数,这是tornado根据静态文件MD5之后的值,如果后台的静态文件修改,这个值就会变化,前端就会重新向后台请求静态文件,保证页面实时更新,不引用浏览器缓存;
4.3、上下文对象
如果模板语言中声明了变量,上下文对象必须对应传值,如果没有就设置为空,否则会报错;
self.render('login.html',**{'erro_msg':'' }) #模板中声明了变量,视图必须传值,如果没有就设置为空;
View Code
5、xsrf_tocken认证
setings={'template_path':'templates',#配置模板路径
'static_path':'static', #配置静态文件存放的路径
'static_url_prefix':'/zhanggen/', #在模板中引用静态文件路径时使用的别名 注意是模板引用时的别名
"xsrf_cookies": True, #使用xsrf认证
}
配置文件setings={"xsrf_cookies": True, }
Title
{%raw xsrf_form_html() %}