All in One
vim app_in_one.py
# coding:utf-8
import socket
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
body = 'Hello World!'
response_params = [
'HTTP/1.0 200 OK',
'Date: Sat, 10 jun 2017 01:01:01 GMT',
'Content-Type: text/plain; charset=utf-8',
'Content-Length: {}\r\n'.format(len(body)),
body,
]
response = '\r\n'.join(response_params)
def handle_connection(conn, addr):
request = b""
while EOL1 not in request and EOL2 not in request:
request += conn.recv(1024)
print(request)
conn.send(response.encode())
conn.close()
def main():
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('127.0.0.1', 8080))
serversocket.listen(1)
try:
while True:
conn, address = serversocket.accept()
handle_connection(conn, address)
finally:
serversocket.close()
if __name__ == '__main__':
main()
python all_in_one.py
curl localhost:8080
# Hello World!
# b'GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.64.1\r\nAccept: */*\r\n\r\n'
WSGI
- WSGI = Python Web Server Gateway Interface
Nginx <===> [Gunicorn](https://gunicorn.org/) <===> Flask App
Web Server <===> WSGI <===> Web App
-
The goal of WSGI is to facilitate easy interconnection of existing servers and applications or frameworks
WSGI gunicorn
vim web_app.py
# coding:utf-8
def web_app(environ, start_response):
print(environ, start_response)
print('\n\r')
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [b'Hello World!\n']
gunicorn web_app:web_app
curl localhost:8000
# Hello World!
WSGI wsgiref
vim wsgi_wsgiref.py
# coding:utf-8
from wsgiref.simple_server import make_server
from web_app import web_app
if __name__ == "__main__":
make_server('', 8000, web_app).serve_forever()
python wsgi_wsgiref.py
curl localhost:8000
# Hello World!
WSGI implement
vim wsgi_implement.py
# coding:utf-8
import os
import sys
from web_app import web_app
def run_with_wsgi(application):
headers_set = []
def write(data):
status, response_headers = headers_set
sys.stdout.write('Status: %s\r\n' % status)
for header in response_headers:
sys.stdout.write('%s: %s\r\n' % header)
sys.stdout.write('\r\n')
sys.stdout.write(data.decode())
sys.stdout.flush()
def start_response(status, response_headers):
headers_set[:] = [status, response_headers]
environ = dict(os.environ.items())
environ['wsgi.input'] = sys.stdin
environ['wsgi.errors'] = sys.stderr
environ['wsgi.version'] = (1, 0)
environ['wsgi.multithread'] = False
environ['wsgi.multiprocess'] = True
environ['wsgi.run_once'] = True
environ['wsgi.url_scheme'] = 'http'
result = application(environ, start_response)
try:
for data in result:
if data:
write(data)
finally:
if hasattr(result, 'close'):
result.close()
if __name__ == '__main__':
run_with_wsgi(web_app)
python wsgi_implement.py
# Hello World!
网友评论