Gevent Example Server
This example is a basic HTTP/2 server written using gevent, a powerful coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev or libuv event loop.
This example is inspired by the curio example. It also demonstrates the correct use of HTTP/2 flow control with h2 and shows how simple gevent is to use.
1# -*- coding: utf-8 -*-
2"""
3gevent-server.py
4================
5
6A simple HTTP/2 server written for gevent serving static files from a directory specified as input.
7If no directory is provided, the current directory will be used.
8"""
9import mimetypes
10import sys
11from functools import partial
12from pathlib import Path
13from typing import Tuple, Dict, Optional
14
15from gevent import socket, ssl
16from gevent.event import Event
17from gevent.server import StreamServer
18from h2 import events
19from h2.config import H2Configuration
20from h2.connection import H2Connection
21
22
def get_http2_tls_context() -> ssl.SSLContext:
    """
    Build the server-side TLS context used to negotiate HTTP/2.

    The context disables SSLv2, SSLv3, TLS 1.0, TLS 1.1 and TLS compression, restricts the cipher list to
    ECDHE/DHE suites with AES-GCM or ChaCha20, loads ``localhost.crt`` / ``localhost.key`` from the current
    working directory and advertises ``h2`` through ALPN (and through NPN where the ssl module supports it).

    :return: the configured context, ready to be handed to the server.
    """
    context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)

    # Every protocol version and feature that must be switched off, applied in a single mask.
    disabled_features = (
        ssl.OP_NO_SSLv2
        | ssl.OP_NO_SSLv3
        | ssl.OP_NO_TLSv1
        | ssl.OP_NO_TLSv1_1
        | ssl.OP_NO_COMPRESSION
    )
    context.options |= disabled_features

    context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')
    context.load_cert_chain(certfile='localhost.crt', keyfile='localhost.key')
    context.set_alpn_protocols(['h2'])

    # NPN is optional: builds without NPN support raise NotImplementedError, which is ignored on purpose.
    try:
        context.set_npn_protocols(['h2'])
    except NotImplementedError:
        pass

    return context
39
40
class H2Worker:
    """
    Serve static files from a directory over a single HTTP/2 connection.

    Constructing an instance runs the whole connection: ``__init__`` only returns once the peer has closed
    the socket (or an error ended the read loop). Requests are validated by the reader loop; every file
    response is then streamed from its own greenlet, so the reader keeps processing WINDOW_UPDATE frames
    while a response is blocked on flow control.
    """

    def __init__(self, sock: socket.socket, address: Tuple[str, str], source_dir: Optional[str] = None):
        """
        :param sock: the connected socket to speak HTTP/2 on.
        :param address: the peer address handed over by the server; stored but not otherwise used.
        :param source_dir: directory to serve files from; the current working directory when ``None``.
        :raises NotADirectoryError: if ``source_dir`` is not an existing directory.
        """
        # Function-scope imports keep this class independent of the module's import block.
        from gevent.lock import BoundedSemaphore
        from gevent.pool import Group

        self._sock = sock
        self._address = address
        # Streams whose sender is currently blocked on flow control, keyed by stream id.
        self._flow_control_events: Dict[int, Event] = {}
        self._server_name = 'gevent-h2'
        self._connection: Optional[H2Connection] = None
        self._read_chunk_size = 8192  # The maximum amount of a file we'll send in a single DATA frame

        # Streams with a file response in flight; the value turns True once the stream has been reset.
        self._stream_cancelled: Dict[int, bool] = {}
        # Serialises "collect pending bytes + write them" so concurrent greenlets cannot interleave frames.
        self._send_lock = BoundedSemaphore(1)
        # Greenlets streaming file responses; all of them are killed when the connection ends.
        self._senders = Group()

        if source_dir is None:
            # A missing directory previously crashed in Path(None); fall back to the working directory.
            source_dir = str(Path.cwd())
        self._check_sources_dir(source_dir)
        self._sources_dir = source_dir

        self._run()

    def _initiate_connection(self) -> None:
        """
        Create the server-side h2 state machine and send the connection preface.
        """
        config = H2Configuration(client_side=False, header_encoding='utf-8')
        self._connection = H2Connection(config=config)
        self._connection.initiate_connection()
        self._flush()

    def _flush(self) -> None:
        """
        Write everything h2 has queued to the socket.

        The pending bytes are collected and written while holding the send lock, so frames produced by
        different greenlets reach the socket in the order they were taken from h2.
        """
        with self._send_lock:
            pending = self._connection.data_to_send()
            if pending:
                self._sock.sendall(pending)

    @staticmethod
    def _check_sources_dir(sources_dir: str) -> None:
        """
        Ensure ``sources_dir`` names an existing directory.

        :raises NotADirectoryError: if the path does not exist or is not a directory.
        """
        p = Path(sources_dir)
        if not p.is_dir():
            raise NotADirectoryError(f'{sources_dir} does not exists')

    def _send_error_response(self, status_code: str, event: events.RequestReceived) -> None:
        """
        Queue a body-less response with the given status and end the stream.

        The frames are only queued here; the reader loop writes them once it has handled the current batch
        of events, which keeps event handling free of blocking socket writes.
        """
        self._connection.send_headers(
            stream_id=event.stream_id,
            headers=[
                (':status', status_code),
                ('content-length', '0'),
                ('server', self._server_name),
            ],
            end_stream=True
        )

    def _resolve_request_path(self, raw_path: str) -> Optional[Path]:
        """
        Map a request ``:path`` to a regular file inside the served directory.

        The query string is dropped and percent-escapes are decoded before the path is resolved. ``None`` is
        returned when the result is not a regular file or lies outside the served directory (for example
        because of ``..`` segments or a symlink pointing elsewhere).
        """
        from urllib.parse import unquote

        relative = unquote(raw_path.partition('?')[0]).lstrip('/')
        root = Path(self._sources_dir).resolve()
        try:
            candidate = (root / relative).resolve()
            # relative_to() raises ValueError when the candidate is not located under the root.
            candidate.relative_to(root)
        except (ValueError, OSError, RuntimeError):
            # Embedded NUL bytes, unreadable path components or symlink loops: treat as "not found".
            return None
        return candidate if candidate.is_file() else None

    def _handle_request(self, event: events.RequestReceived) -> None:
        """
        Validate a request and start streaming the requested file.

        Anything but GET is answered with 405 and a path that does not resolve to a file inside the served
        directory with 404. A valid request is handed to a new greenlet so this (reader) greenlet never
        blocks on flow control.
        """
        headers = dict(event.headers)
        if headers[':method'] != 'GET':
            self._send_error_response('405', event)
            return

        file_path = self._resolve_request_path(headers[':path'])
        if file_path is None:
            self._send_error_response('404', event)
            return

        # Register the stream before the greenlet starts, so a reset that is handled first is not lost.
        self._stream_cancelled[event.stream_id] = False
        self._senders.spawn(self._serve_file, file_path, event.stream_id)

    def _serve_file(self, file_path: Path, stream_id: int) -> None:
        """
        Greenlet entry point: send one file, then forget the stream's bookkeeping.
        """
        try:
            if not self._stream_cancelled.get(stream_id):
                self._send_file(file_path, stream_id)
        finally:
            self._stream_cancelled.pop(stream_id, None)
            self._flow_control_events.pop(stream_id, None)

    def _send_file(self, file_path: Path, stream_id: int) -> None:
        """
        Send a file, obeying the rules of HTTP/2 flow control.
        """
        file_size = file_path.stat().st_size
        content_type, content_encoding = mimetypes.guess_type(str(file_path))
        response_headers = [
            (':status', '200'),
            ('content-length', str(file_size)),
            ('server', self._server_name)
        ]
        if content_type:
            response_headers.append(('content-type', content_type))
        if content_encoding:
            response_headers.append(('content-encoding', content_encoding))

        self._connection.send_headers(stream_id, response_headers)
        self._flush()

        with file_path.open(mode='rb', buffering=0) as f:
            self._send_file_data(f, stream_id)

    def _send_file_data(self, file_obj, stream_id: int) -> None:
        """
        Send the data portion of a file. Handles flow control rules.

        Each DATA frame carries at most ``_read_chunk_size`` bytes and never more than the flow control
        window allows. A read shorter than requested marks the end of the file and closes the stream.
        Sending stops early if the stream has been flagged as reset in the meantime.
        """
        while True:
            # Checked before touching h2 again: the stream may have been reset while we were blocked.
            if self._stream_cancelled.get(stream_id):
                return

            window = self._connection.local_flow_control_window(stream_id)
            if window < 1:
                self._wait_for_flow_control(stream_id)
                continue

            chunk_size = min(window, self._read_chunk_size)
            data = file_obj.read(chunk_size)
            keep_reading = (len(data) == chunk_size)

            self._connection.send_data(stream_id, data, not keep_reading)
            self._flush()

            if not keep_reading:
                break

    def _wait_for_flow_control(self, stream_id: int) -> None:
        """
        Blocks until the flow control window for a given stream is opened (or the stream is cancelled).
        """
        event = Event()
        self._flow_control_events[stream_id] = event
        event.wait()

    def _handle_window_update(self, event: events.WindowUpdated) -> None:
        """
        Unblock streams waiting on flow control, if needed.

        An update for a specific stream wakes that stream's sender only; an update for stream 0 (the
        connection window) wakes every blocked sender.
        """
        stream_id = event.stream_id

        if stream_id:
            waiter = self._flow_control_events.pop(stream_id, None)
            if waiter is not None:
                waiter.set()
            return

        # Snapshot the waiters first so only the senders blocked at this moment are woken.
        waiters = list(self._flow_control_events.values())
        self._flow_control_events.clear()
        for waiter in waiters:
            waiter.set()

    def _cancel_stream(self, stream_id: int) -> None:
        """
        Flag a stream with a response in flight as reset and wake its sender so it can stop.
        """
        if stream_id not in self._stream_cancelled:
            return

        self._stream_cancelled[stream_id] = True
        waiter = self._flow_control_events.pop(stream_id, None)
        if waiter is not None:
            waiter.set()

    def _run(self) -> None:
        """
        Read from the socket and dispatch h2 events until the peer closes the connection.

        Event handlers only queue frames and never block, so a whole batch of events is handled before any
        sender greenlet gets to run; the queued frames are written after each batch.
        """
        self._initiate_connection()

        try:
            while True:
                data = self._sock.recv(65535)
                if not data:
                    break

                h2_events = self._connection.receive_data(data)
                for event in h2_events:
                    if isinstance(event, events.RequestReceived):
                        self._handle_request(event)
                    elif isinstance(event, events.DataReceived):
                        # Request bodies are not supported: reset the stream and stop any response on it.
                        self._connection.reset_stream(event.stream_id)
                        self._cancel_stream(event.stream_id)
                    elif isinstance(event, events.WindowUpdated):
                        self._handle_window_update(event)
                    elif isinstance(event, events.StreamReset):
                        self._cancel_stream(event.stream_id)

                self._flush()
        finally:
            # The connection is over: stop senders that are still streaming or blocked on flow control.
            self._senders.kill()
177
178
if __name__ == '__main__':
    # Serve the directory named by the first command-line argument, or the working directory without one.
    cli_args = sys.argv[1:]
    if cli_args:
        files_dir = cli_args[0]
    else:
        files_dir = str(Path.cwd())

    # One H2Worker is built per accepted connection, bound to the chosen directory.
    connection_handler = partial(H2Worker, source_dir=files_dir)
    server = StreamServer(
        ('127.0.0.1', 8080),
        connection_handler,
        ssl_context=get_http2_tls_context(),
    )

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C: stop accepting connections and exit quietly.
        server.close()