一、app.run() 在做什麼?
執行 app.run() 便啓動了 Flask 服務,這個服務為什麼能夠監聽 http 請求並做出響應?讓我們進入 run 函數內部一探究竟。
def run(self, host='localhost', port=5000, **options):
from werkzeug import run_simple
if 'debug' in options:
self.debug = options.pop('debug')
options.setdefault('use_reloader', self.debug)
options.setdefault('use_debugger', self.debug)
return run_simple(host, port, self, **options)
可以看到,run 函數3-6行做了些參數默認值設置,最後將參數傳入 run_simple 並調用返回,注意,第3個參數是 Flask 對象(留意 Flask 對象的傳遞)。run_simple 是從 werkzeug 導入的。
def run_simple(hostname, port, application, use_reloader=False,
use_debugger=False, use_evalex=True,
extra_files=None, reloader_interval=1, threaded=False,
processes=1, request_handler=None, static_files=None,
passthrough_errors=False, ssl_context=None):
...
def inner():
make_server(hostname, port, application, threaded,
processes, request_handler,
passthrough_errors, ssl_context).serve_forever()
...
if use_reloader:
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
test_socket.bind((hostname, port))
test_socket.close()
run_with_reloader(inner, extra_files, reloader_interval)
else:
inner()
run_simple() 最終調用 inner(),這個函數做了兩件事:
- 構造一個服務:
make_server() - 啓動服務:
.serve_forever()
def make_server(host, port, app=None, threaded=False, processes=1,
request_handler=None, passthrough_errors=False,
ssl_context=None):
if threaded and processes > 1:
raise ValueError("cannot have a multithreaded and "
"multi process server.")
elif threaded:
return ThreadedWSGIServer(host, port, app, request_handler,
passthrough_errors, ssl_context)
elif processes > 1:
return ForkingWSGIServer(host, port, app, processes, request_handler,
passthrough_errors, ssl_context)
else:
return BaseWSGIServer(host, port, app, request_handler,
passthrough_errors, ssl_context)
make_server() 返回一個 BaseWSGIServer 對象。
class BaseWSGIServer(HTTPServer, object):
...
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
...
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
...
BaseWSGIServer 繼承 HTTPServer,HTTPServer 繼承來自 Python 標準庫 SocketServer.TCPServer 類,其最終繼承自 SocketServer.BaseServer。注意 Flask 對象被賦值給 self.app。
BaseServer 類有一個 serve_forever 方法:
def serve_forever(self, poll_interval=0.5):
self.__is_shut_down.clear()
try:
while not self.__shutdown_request:
r, w, e = _eintr_retry(select.select, [self], [], [],
poll_interval)
if self.__shutdown_request:
break
if self in r:
self._handle_request_noblock()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
其中有一個 while 循環,在不斷執行:
if ready:
self._handle_request_noblock()
def _handle_request_noblock(self):
...
self.process_request(request, client_address)
...
_handle_request_noblock() 調用 process_request(),這是處理請求的函數,這個函數實例化 self.RequestHandlerClass 類來處理請求,這個類是 BaseServer 初始化時傳入的參數。
到此我們知道,app.run() 實際上實例化了一個 socketserver.BaseServer 類,並調用該類的 server_forever 方法,這個方法主體是一個 while 循環,最終在不斷實例化 self.RequestHandlerClass 來處理請求,在不中斷 while 的情況下,程序會一直運行,不斷接收請求,處理請求。
二、Flask 如何收到請求?
while 程序在不斷監聽請求,當接收到請求時,實例化 self.RequestHandlerClass 來處理請求。這個變量在 werkzeug.BaseWSGIServer 的 __init__ 方法中被賦值(以下第10行):
class BaseWSGIServer(HTTPServer, object):
multithread = False
multiprocess = False
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
HTTPServer.__init__(self, (host, int(port)), handler)
默認值為 werkzeug.WSGIRequestHandler,這個類最終繼承自 SocketServer.BaseRequestHandler,也就是説isinstance(self.RequestHandlerClass, SocketServer.BaseRequestHandler)。
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
SocketServer.BaseRequestHandler 實例化時調用 self.handle 方法。注意,此時 Flask 對象存在於 self.server.app 中。werkzeug.WSGIRequestHandler 將這個方法覆寫:
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
def handle(self):
try:
return BaseHTTPRequestHandler.handle(self)
except (socket.error, socket.timeout), e:
self.connection_dropped(e)
except:
if self.server.ssl_context is None or not is_ssl_error():
raise
def handle_one_request(self):
self.raw_requestline = self.rfile.readline()
if not self.raw_requestline:
self.close_connection = 1
elif self.parse_request():
return self.run_wsgi()
class BaseHTTPRequestHandler(object):
def handle(self):
self.close_connection = 1
self.handle_one_request()
while not self.close_connection:
self.handle_one_request()
覆寫的方法調用 BaseHTTPRequestHandler.handle(self),內部調用了 self.handle_one_request 方法,最終調用了 WSGIRequestHandler.run_wsgi 方法:
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
def run_wsgi(self):
app = self.server.app
environ = self.make_environ()
...
def execute(app):
application_iter = app(environ, start_response)
try:
for data in application_iter:
write(data)
# make sure the headers are sent
if not headers_sent:
write('')
finally:
if hasattr(application_iter, 'close'):
application_iter.close()
application_iter = None
...
try:
execute(app)
except (socket.error, socket.timeout), e:
...
run_wsgi 方法第一行即取出 Flask 對象,然後將其傳入 execute 函數並調用,execute 第一行 application_iter = app(environ, start_response),這是在調用 Flask.__call__ 方法。
class Flask(object):
def wsgi_app(self, environ, start_response):
with self.request_context(environ):
rv = self.preprocess_request()
if rv is None:
rv = self.dispatch_request()
response = self.make_response(rv)
response = self.process_response(response)
return response(environ, start_response)
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
至此,請求被傳到了 Flask 對象中處理,或者説 Flask 收到了請求。
三、Flask 如何處理請求?
Web 服務器把 environ, start_response 兩個參數傳入 Flask.__call__ 處理,正常處理完後將 Flask.__call__ 返回的數據寫入響應體中。Flask 處理請求其實是接收這兩個參數並返回數據。
class Flask(object):
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
def wsgi_app(self, environ, start_response):
with self.request_context(environ):
rv = self.preprocess_request()
if rv is None:
rv = self.dispatch_request()
response = self.make_response(rv)
response = self.process_response(response)
return response(environ, start_response)
Flask.__call__ 調用 Flask.wsgi_app 並返回,不將處理邏輯直接實現在 Flask.__call__ 裏而封裝在 Flask.wsgi_app裏,是為了能應用中間件,例如:app.wsgi_app = MyMiddleware(app.wsgi_app)。
Flask.wsgi_app 詳解:
- 第8行(
self.preprocess_request()):執行處理請求前的某些處理。即執行所有被before_request裝飾的函數,如果返回值非空,則不進行真正的請求處理。 - 第10行(
self.dispatch_request()):分發處理請求。將 URL 與對應的處理函數匹配,執行處理。 - 第11行(
self.make_response()):將返回值轉換為響應對象。 - 第12行(
self.process_response()):執行處理請求後的某些處理。即執行所有被after_request裝飾的函數。 - 第13行(
response(environ, start_response)):返回一個可迭代對象。
FLask 先調用 Flask.preprocess_request 處理請求,再調用與 URL 對應的函數處理(注意,如果 Flask.preprocess_request 返回值非空,則跳過與 URL 對應的處理函數),然後把返回值轉換為響應對象,最後調用 Flask.process_response 處理。
3.1 請求前置處理
class Flask(object):
def before_request(self, f):
self.before_request_funcs.append(f)
return f
def preprocess_request(self):
for func in self.before_request_funcs:
rv = func()
if rv is not None:
return rv
Flask.before_request 是一個裝飾器,會把被裝飾的函數添加到列表 self.before_request_funcs 中, self.preprocess_request 遍歷 self.before_request_funcs,執行所有函數,如果執行的函數返回值非空,則返回。
3.2 請求處理
class Flask(object):
def match_request(self):
rv = _request_ctx_stack.top.url_adapter.match()
request.endpoint, request.view_args = rv
return rv
def dispatch_request(self):
try:
endpoint, values = self.match_request()
return self.view_functions[endpoint](**values)
except HTTPException, e:
handler = self.error_handlers.get(e.code)
if handler is None:
return e
return handler(e)
except Exception, e:
handler = self.error_handlers.get(500)
if self.debug or handler is None:
raise
return handler(e)
match_request 中調用的 _request_ctx_stack.top.url_adapter.match(),是 _RequestContext.url_adapter.match()。
class _RequestContext(object):
def __init__(self, app, environ):
self.app = app
self.url_adapter = app.url_map.bind_to_environ(environ)
_RequestContext.url_adapter 是 Flask 對象中 url_map.bind_to_environ(environ) 返回的值,Map.bin_to_environ 返回 MapAdapter 對象。
MapAdapter.match() 會根據 URL 與 請求方法(GET、POST 等)(URL、請求方法等信息會從 environ 中獲取)返回當前請求的 endpoint 與 參數。
# eg
>>> urls.match("/downloads/42")
('downloads/show', {'id': 42})
在執行 URL 與 endpoint 解析前,需要先添加匹配規則。Flask 如何做的呢?
from werkzeug.routing import Map, Rule
class Flask(object):
def __init__(self, package_name):
self.url_map = Map()
Map 存儲 URL 規則和配置參數,可以通過 Map.add 添加 URL 匹配規則。
class Flask(object):
def add_url_rule(self, rule, endpoint, **options):
options['endpoint'] = endpoint
options.setdefault('methods', ('GET',))
self.url_map.add(Rule(rule, **options))
def route(self, rule, **options):
def decorator(f):
self.add_url_rule(rule, f.__name__, **options)
self.view_functions[f.__name__] = f
return f
return decorator
方法 Flask.add_url_rule 與裝飾器 Flask.route 都能添加匹配規則。裝飾器 route 會將被裝飾的函數名作為 endpoint,並將函數名作為 key,函數作為 value,存在 Flask 對象的 self.view_functions 屬性中。
通過 endpoint, values = self.match_request() 解析得到 endpoint 與 請求參數,再從 self.view_functions 中取出對應的函數處理(return self.view_functions[endpoint](**values))。
endpoint 作用與意義可參考:https://stackoverflow.com/que...
如果處理過程中拋出異常,會根據異常碼(e.code)取出對應的函數,執行異常處理函數。
異常處理函數通過裝飾器 Flask.errorhandler 添加,將錯誤碼作為 key,被裝飾的函數作為 value,存入 Flask 的屬性 self.error_handlers 中:
class Flask(object):
def errorhandler(self, code):
def decorator(f):
self.error_handlers[code] = f
return f
return decorator
3.3 生成響應對象
為什麼需要這一步,直接返回 Python 內置數據類型不行嗎?
不行,因為 WSGI 規定 application 端必須返回一個可迭代對象[1]。
class Flask(object):
def make_response(self, rv):
if isinstance(rv, self.response_class):
return rv
if isinstance(rv, basestring):
return self.response_class(rv)
if isinstance(rv, tuple):
return self.response_class(*rv)
return self.response_class.force_type(rv, request.environ)
Flask.make_response(rv) 將參數 rv 轉換為 self.response_class 對象。
3.4 請求後置處理
迭代 self.after_request_funcs 中保存的函數,逐個執行,返回最後執行完的數據。
添加函數的方式為 Flask.after_request 裝飾器。
3.5 返回可迭代對象
Flask 默認的 self.response_class 類繼承自 werkzeug.Response,這個類的 __call__ 方法接收 environ, start_response 兩個參數,並返回一個迭代器。
class BaseResponse(object):
def __call__(self, environ, start_response):
app_iter, status, headers = self.get_wsgi_response(environ)
start_response(status, headers)
return app_iter
當調用 response(environ, start_response) 時返回一個迭代器。
四、Flask 如何處理併發請求?
Flask.wsgi_app 中的 with self.request_context(environ) 有什麼用?
self.request_context 返回 _RequestContext 的實例化對象。
class _RequestContext(object):
def __init__(self, app, environ):
self.app = app
self.url_adapter = app.url_map.bind_to_environ(environ)
self.request = app.request_class(environ)
self.session = app.open_session(self.request)
self.g = _RequestGlobals()
self.flashes = None
def __enter__(self):
_request_ctx_stack.push(self)
def __exit__(self, exc_type, exc_value, tb):
if tb is None or not self.app.debug:
_request_ctx_stack.pop()
進入 with 語句體時,會執行 __enter__ 方法,離開 with 語句體時會執行 __exit__ 方法。也就是説,在 Flask 開始處理請求前,會執行 _request_ctx_stack.puth(self),處理完請求後會執行 _request_ctx_stack.pop(self)。
_request_ctx_stack 是一個全局變量,是 LocalStack 類的對象。
class LocalStack(object):
def __init__(self):
self._local = Local()
self._lock = allocate_lock()
def __release_local__(self):
self._local.__release_local__()
def push(self, obj):
self._lock.acquire()
try:
rv = getattr(self._local, 'stack', None)
if rv is None:
self._local.stack = rv = []
rv.append(obj)
return rv
finally:
self._lock.release()
def pop(self):
self._lock.acquire()
try:
stack = getattr(self._local, 'stack', None)
if stack is None:
return None
elif len(stack) == 1:
release_local(self._local)
return stack[-1]
else:
return stack.pop()
finally:
self._lock.release()
@property
def top(self):
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None
LocalStack 在初始化時會創建一個線程鎖(self._lock = allocate_lock()),LocalStack.push(obj) 首先請求加鎖,獲取到鎖後將 obj 添加到列表 self._local.stack,如果 self._local 沒有屬性 stack 則將 stack 初始化為空列表,最後釋放鎖(self._lock.release())。self._local.stack 是什麼?
class Local(object):
__slots__ = ('__storage__', '__lock__')
def __init__(self):
object.__setattr__(self, '__storage__', {})
object.__setattr__(self, '__lock__', allocate_lock())
def __getattr__(self, name):
self.__lock__.acquire()
try:
try:
return self.__storage__[get_ident()][name]
except KeyError:
raise AttributeError(name)
finally:
self.__lock__.release()
def __setattr__(self, name, value):
self.__lock__.acquire()
try:
ident = get_ident()
storage = self.__storage__
if ident in storage:
storage[ident][name] = value
else:
storage[ident] = {name: value}
finally:
self.__lock__.release()
在初始化 self._local.stack 屬性時,self._local.stack = rv = [] 等同於 Local.__setattr__(self._local, 'stack', []),這裏首先加鎖,然後獲取線程id(ident = get_ident()),將線程id作為字典 self.__storage__ 的鍵,將 {'stack': []} 作為值,即:
self.__storage__ = {
"thread1": {
"stack": []
},
"thread2": {
"stack": []
},
...
}
LocalStack.push(obj) 是將 obj 加入了對應線程的 stack 列表。LocalStack.pop() 是將 obj 從對應的線程的 stack 列表移除。另外還有 LocalStack.top,返回對應線程存在 stack 中的對象。
為什麼要如此大費周章的做這件事呢?來看一個例子:
假設 Web 服務器單進程啓動,啓動2個線程,同一時刻有2個請求進來,每個請求都有對應的 environ 數據,如果直接賦值給同一個變量,後一個請求會覆蓋前一個請求的數據,因為線程數據是共享的。要如何保存這兩個請求的數據?並在需要的時候能正確取出?Flask 的做法是設置一個全局變量 _request_ctx_stack,存儲數據最終用一個字典 self.__storage__,將不同線程的請求數據用線程id作為 key,請求數據存在線程id對應的 stack 列表中。
self.__storage__ = {
"thread1": {
"stack": [obj1]
},
"thread2": {
"stack": [obj2]
},
...
}
Flask 提供了非常便捷的方式來獲取當前請求中的 Flask 對象、請求、session、全局變量等數據:current_app, request, session, g。
_request_ctx_stack = LocalStack()
current_app = LocalProxy(lambda: _request_ctx_stack.top.app)
request = LocalProxy(lambda: _request_ctx_stack.top.request)
session = LocalProxy(lambda: _request_ctx_stack.top.session)
g = LocalProxy(lambda: _request_ctx_stack.top.g)
其本質是返回存在於 _request_ctx_stack.top 中對應的屬性,返回的數據是與當前線程id相對應的,實現了線程數據隔離。
LocalProxy 是封裝 local 的代理對象,能夠保護 local 對象,防止其被意外修改,對應設計模式中的代理模式。
五、總結
回過頭看看 Flask 的常見用法:
from flask import Flask, request, session
app = Flask(__name__)
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
if request.form['username'] != USERNAME:
error = 'Invalid username'
elif request.form['password'] != PASSWORD:
error = 'Invalid password'
else:
session['logged_in'] = True
flash('You were logged in')
return redirect(url_for('show_entries'))
return render_template('login.html', error=error)
if __name__ == '__main__':
app.run()
首先實例化一個 Flask 對象 app,使用裝飾器 route 添加 URL 與處理函數,執行 app.run(),啓動服務。
訪問 localhost:5000/login,服務接收到請求後會將 request, current_app 等數據保存至線程id對應的 stack 中,然後進行前置處理(self.preprocess_request),解析 URL,獲取 endpoint 與參數,通過 endpoint 從 self.view_functions 中獲取對應的處理函數,處理完後將返回值轉換為響應對象,進行後置處理(self.process_response),最後返回可迭代對象(response(environ, start_response)),之後便是服務器處理的部分了。
版本:
python 2.7, werkzeug==0.6.1, Flask==0.1參考文獻:
[1] Eby, P.J. (2010). PEP 3333 – Python Web Server Gateway Interface v1.0.1. Retrieved February 18, 2023, from https://peps.python.org/pep-3...
[2] What is an 'endpoint' in Flask?.(n.d). Retrieved February 18, 2023, from https://stackoverflow.com/que...
本文源碼:https://github.com/yyywang/fl...