Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions httpcore/_async/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(
uds: str | None = None,
network_backend: AsyncNetworkBackend | None = None,
socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
h2_ping_interval: float | None = None,
) -> None:
self._origin = origin
self._ssl_context = ssl_context
Expand All @@ -57,6 +58,7 @@ def __init__(
self._retries = retries
self._local_address = local_address
self._uds = uds
self._h2_ping_interval = h2_ping_interval

self._network_backend: AsyncNetworkBackend = (
AutoBackend() if network_backend is None else network_backend
Expand Down Expand Up @@ -89,6 +91,7 @@ async def handle_async_request(self, request: Request) -> Response:
origin=self._origin,
stream=stream,
keepalive_expiry=self._keepalive_expiry,
h2_ping_interval=self._h2_ping_interval,
)
else:
self._connection = AsyncHTTP11Connection(
Expand Down
7 changes: 7 additions & 0 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
uds: str | None = None,
network_backend: AsyncNetworkBackend | None = None,
socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
h2_ping_interval: float | None = None,
) -> None:
"""
A connection pool for making HTTP requests.
Expand Down Expand Up @@ -88,6 +89,10 @@ def __init__(
network_backend: A backend instance to use for handling network I/O.
socket_options: Socket options that have to be included
in the TCP socket when the connection was established.
h2_ping_interval: Interval in seconds between HTTP/2 PING frames
sent to keep connections alive. Set to ``None`` to disable.
Falls back to the ``HTTPCORE_H2_PING_INTERVAL`` environment
variable if not specified.
"""
self._ssl_context = ssl_context
self._proxy = proxy
Expand All @@ -114,6 +119,7 @@ def __init__(
AutoBackend() if network_backend is None else network_backend
)
self._socket_options = socket_options
self._h2_ping_interval = h2_ping_interval

# The mutable state on a connection pool is the queue of incoming requests,
# and the set of connections that are servicing those requests.
Expand Down Expand Up @@ -176,6 +182,7 @@ def create_connection(self, origin: Origin) -> AsyncConnectionInterface:
uds=self._uds,
network_backend=self._network_backend,
socket_options=self._socket_options,
h2_ping_interval=self._h2_ping_interval,
)

@property
Expand Down
58 changes: 58 additions & 0 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import enum
import logging
import os
import threading
import time
import types
import typing
Expand Down Expand Up @@ -48,6 +50,7 @@ def __init__(
origin: Origin,
stream: AsyncNetworkStream,
keepalive_expiry: float | None = None,
h2_ping_interval: float | None = None,
):
self._origin = origin
self._network_stream = stream
Expand All @@ -64,6 +67,15 @@ def __init__(
self._used_all_stream_ids = False
self._connection_error = False

if h2_ping_interval is not None:
self._h2_ping_interval: float | None = h2_ping_interval
else:
env_val = os.environ.get("HTTPCORE_H2_PING_INTERVAL")
self._h2_ping_interval = float(env_val) if env_val else None
self._ping_thread: threading.Thread | None = None
self._ping_stop = threading.Event()
self._ping_write_lock = threading.Lock()

# Mapping from stream ID to response stream events.
self._events: dict[
int,
Expand Down Expand Up @@ -217,6 +229,48 @@ async def _send_connection_init(self, request: Request) -> None:
self._h2_state.increment_flow_control_window(2**24)
await self._write_outgoing_data(request)

if self._h2_ping_interval is not None:
self._start_ping_keepalive()

def _start_ping_keepalive(self) -> None:
self._ping_stop.clear()
self._ping_thread = threading.Thread(
target=self._ping_keepalive_loop, daemon=True
)
self._ping_thread.start()
logger.debug(
"HTTP/2 PING keepalive started (interval=%.0fs)", self._h2_ping_interval
)

def _ping_keepalive_loop(self) -> None:
"""Background thread that sends periodic PING frames via the raw socket."""
assert self._h2_ping_interval is not None

raw_sock = self._network_stream.get_extra_info("socket")
if raw_sock is None:
raw_sock = self._network_stream.get_extra_info("ssl_object")
if raw_sock is None: # pragma: nocover
logger.debug("HTTP/2 PING keepalive: unable to obtain raw socket, stopping")
return

while not self._ping_stop.wait(self._h2_ping_interval):
try:
if self.is_closed(): # pragma: nocover
break
with self._ping_write_lock:
if self.is_closed(): # pragma: nocover
break
opaque = int(time.monotonic_ns() & 0xFFFFFFFFFFFFFFFF).to_bytes(
8, "big"
)
self._h2_state.ping(opaque)
data_to_send = self._h2_state.data_to_send()
if data_to_send:
raw_sock.sendall(data_to_send)
logger.debug("HTTP/2 PING sent")
except Exception: # pragma: nocover
break

# Sending the request...

async def _send_request_headers(self, request: Request, stream_id: int) -> None:
Expand Down Expand Up @@ -424,6 +478,10 @@ async def _response_closed(self, stream_id: int) -> None:
async def aclose(self) -> None:
# Note that this method unilaterally closes the connection, and does
# not have any kind of locking in place around it.
self._ping_stop.set()
if self._ping_thread is not None:
self._ping_thread.join(timeout=2)
self._ping_thread = None
self._h2_state.close_connection()
self._state = HTTPConnectionState.CLOSED
await self._network_stream.aclose()
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(
uds: str | None = None,
network_backend: NetworkBackend | None = None,
socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
h2_ping_interval: float | None = None,
) -> None:
self._origin = origin
self._ssl_context = ssl_context
Expand All @@ -57,6 +58,7 @@ def __init__(
self._retries = retries
self._local_address = local_address
self._uds = uds
self._h2_ping_interval = h2_ping_interval

self._network_backend: NetworkBackend = (
SyncBackend() if network_backend is None else network_backend
Expand Down Expand Up @@ -89,6 +91,7 @@ def handle_request(self, request: Request) -> Response:
origin=self._origin,
stream=stream,
keepalive_expiry=self._keepalive_expiry,
h2_ping_interval=self._h2_ping_interval,
)
else:
self._connection = HTTP11Connection(
Expand Down
7 changes: 7 additions & 0 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
uds: str | None = None,
network_backend: NetworkBackend | None = None,
socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
h2_ping_interval: float | None = None,
) -> None:
"""
A connection pool for making HTTP requests.
Expand Down Expand Up @@ -88,6 +89,10 @@ def __init__(
network_backend: A backend instance to use for handling network I/O.
socket_options: Socket options that have to be included
in the TCP socket when the connection was established.
h2_ping_interval: Interval in seconds between HTTP/2 PING frames
sent to keep connections alive. Set to ``None`` to disable.
Falls back to the ``HTTPCORE_H2_PING_INTERVAL`` environment
variable if not specified.
"""
self._ssl_context = ssl_context
self._proxy = proxy
Expand All @@ -114,6 +119,7 @@ def __init__(
SyncBackend() if network_backend is None else network_backend
)
self._socket_options = socket_options
self._h2_ping_interval = h2_ping_interval

# The mutable state on a connection pool is the queue of incoming requests,
# and the set of connections that are servicing those requests.
Expand Down Expand Up @@ -176,6 +182,7 @@ def create_connection(self, origin: Origin) -> ConnectionInterface:
uds=self._uds,
network_backend=self._network_backend,
socket_options=self._socket_options,
h2_ping_interval=self._h2_ping_interval,
)

@property
Expand Down
58 changes: 58 additions & 0 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import enum
import logging
import os
import threading
import time
import types
import typing
Expand Down Expand Up @@ -48,6 +50,7 @@ def __init__(
origin: Origin,
stream: NetworkStream,
keepalive_expiry: float | None = None,
h2_ping_interval: float | None = None,
):
self._origin = origin
self._network_stream = stream
Expand All @@ -64,6 +67,15 @@ def __init__(
self._used_all_stream_ids = False
self._connection_error = False

if h2_ping_interval is not None:
self._h2_ping_interval: float | None = h2_ping_interval
else:
env_val = os.environ.get("HTTPCORE_H2_PING_INTERVAL")
self._h2_ping_interval = float(env_val) if env_val else None
self._ping_thread: threading.Thread | None = None
self._ping_stop = threading.Event()
self._ping_write_lock = threading.Lock()

# Mapping from stream ID to response stream events.
self._events: dict[
int,
Expand Down Expand Up @@ -217,6 +229,48 @@ def _send_connection_init(self, request: Request) -> None:
self._h2_state.increment_flow_control_window(2**24)
self._write_outgoing_data(request)

if self._h2_ping_interval is not None:
self._start_ping_keepalive()

def _start_ping_keepalive(self) -> None:
self._ping_stop.clear()
self._ping_thread = threading.Thread(
target=self._ping_keepalive_loop, daemon=True
)
self._ping_thread.start()
logger.debug(
"HTTP/2 PING keepalive started (interval=%.0fs)", self._h2_ping_interval
)

def _ping_keepalive_loop(self) -> None:
"""Background thread that sends periodic PING frames via the raw socket."""
assert self._h2_ping_interval is not None

raw_sock = self._network_stream.get_extra_info("socket")
if raw_sock is None:
raw_sock = self._network_stream.get_extra_info("ssl_object")
if raw_sock is None: # pragma: nocover
logger.debug("HTTP/2 PING keepalive: unable to obtain raw socket, stopping")
return

while not self._ping_stop.wait(self._h2_ping_interval):
try:
if self.is_closed(): # pragma: nocover
break
with self._ping_write_lock:
if self.is_closed(): # pragma: nocover
break
opaque = int(time.monotonic_ns() & 0xFFFFFFFFFFFFFFFF).to_bytes(
8, "big"
)
self._h2_state.ping(opaque)
data_to_send = self._h2_state.data_to_send()
if data_to_send:
raw_sock.sendall(data_to_send)
logger.debug("HTTP/2 PING sent")
except Exception: # pragma: nocover
break

# Sending the request...

def _send_request_headers(self, request: Request, stream_id: int) -> None:
Expand Down Expand Up @@ -424,6 +478,10 @@ def _response_closed(self, stream_id: int) -> None:
def close(self) -> None:
# Note that this method unilaterally closes the connection, and does
# not have any kind of locking in place around it.
self._ping_stop.set()
if self._ping_thread is not None:
self._ping_thread.join(timeout=2)
self._ping_thread = None
self._h2_state.close_connection()
self._state = HTTPConnectionState.CLOSED
self._network_stream.close()
Expand Down
Loading
Loading