From 75c0c9b6bf60408b808c0d1da60ecde922f680df Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Mon, 8 Jan 2024 11:18:19 +0100 Subject: [PATCH 1/4] Add config option to gateway worker thread count --- localstack/aws/serving/asgi.py | 3 ++- localstack/config.py | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/localstack/aws/serving/asgi.py b/localstack/aws/serving/asgi.py index 7d4d93683aaa0..42d397bd530d6 100644 --- a/localstack/aws/serving/asgi.py +++ b/localstack/aws/serving/asgi.py @@ -3,6 +3,7 @@ from asyncio import AbstractEventLoop from typing import Optional +from localstack import config from localstack.aws.gateway import Gateway from localstack.aws.serving.wsgi import WsgiGateway from localstack.http.asgi import ASGIAdapter, ASGILifespanListener @@ -39,7 +40,7 @@ def __init__( self, gateway: Gateway, event_loop: Optional[AbstractEventLoop] = None, - threads: int = 1000, + threads: int = config.GATEWAY_WORKER_THREAD_COUNT, lifespan_listener: Optional[ASGILifespanListener] = None, websocket_listener=None, ) -> None: diff --git a/localstack/config.py b/localstack/config.py index 3cc1baf6aa11f..0b159ba8fc0bf 100644 --- a/localstack/config.py +++ b/localstack/config.py @@ -655,6 +655,8 @@ def populate_edge_configuration( GATEWAY_LISTEN, ) = populate_edge_configuration(os.environ) +GATEWAY_WORKER_THREAD_COUNT = int(os.environ.get("GATEWAY_WORKER_THREAD_COUNT")) or 1000 + # IP of the docker bridge used to enable access between containers DOCKER_BRIDGE_IP = os.environ.get("DOCKER_BRIDGE_IP", "").strip() @@ -1114,6 +1116,7 @@ def use_custom_dns(): "EXTRA_CORS_ALLOWED_ORIGINS", "EXTRA_CORS_EXPOSE_HEADERS", "GATEWAY_LISTEN", + "GATEWAY_WORKER_THREAD_COUNT", "HOSTNAME", "HOSTNAME_FROM_LAMBDA", "KINESIS_ERROR_PROBABILITY", From d2f5d18ddb5aa6637152a11d3b45c8dd9629560a Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Mon, 8 Jan 2024 11:25:25 +0100 Subject: [PATCH 2/4] fix error in default value --- localstack/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/localstack/config.py b/localstack/config.py index 0b159ba8fc0bf..26751c43c5951 100644 --- a/localstack/config.py +++ b/localstack/config.py @@ -655,7 +655,7 @@ def populate_edge_configuration( GATEWAY_LISTEN, ) = populate_edge_configuration(os.environ) -GATEWAY_WORKER_THREAD_COUNT = int(os.environ.get("GATEWAY_WORKER_THREAD_COUNT")) or 1000 +GATEWAY_WORKER_THREAD_COUNT = int(os.environ.get("GATEWAY_WORKER_THREAD_COUNT") or 1000) # IP of the docker bridge used to enable access between containers DOCKER_BRIDGE_IP = os.environ.get("DOCKER_BRIDGE_IP", "").strip() From 4620fe8b1c4150c240061e6243d47d9558de45c3 Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Mon, 8 Jan 2024 12:21:03 +0100 Subject: [PATCH 3/4] move usage of newly introduced variable to the serving of the edge gateway --- localstack/aws/serving/asgi.py | 5 ++--- localstack/aws/serving/edge.py | 3 ++- localstack/http/hypercorn.py | 12 ++++++++---- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/localstack/aws/serving/asgi.py b/localstack/aws/serving/asgi.py index 42d397bd530d6..5ba09194fdb15 100644 --- a/localstack/aws/serving/asgi.py +++ b/localstack/aws/serving/asgi.py @@ -3,7 +3,6 @@ from asyncio import AbstractEventLoop from typing import Optional -from localstack import config from localstack.aws.gateway import Gateway from localstack.aws.serving.wsgi import WsgiGateway from localstack.http.asgi import ASGIAdapter, ASGILifespanListener @@ -40,14 +39,14 @@ def __init__( self, gateway: Gateway, event_loop: Optional[AbstractEventLoop] = None, - threads: int = config.GATEWAY_WORKER_THREAD_COUNT, + threads: int = None, lifespan_listener: Optional[ASGILifespanListener] = None, websocket_listener=None, ) -> None: self.gateway = gateway self.event_loop = event_loop or asyncio.get_event_loop() - self.executor = _ThreadPool(threads, thread_name_prefix="asgi_gw") + self.executor = _ThreadPool(threads or 1000, thread_name_prefix="asgi_gw") self.adapter = ASGIAdapter( WsgiGateway(gateway), event_loop=event_loop, diff --git a/localstack/aws/serving/edge.py b/localstack/aws/serving/edge.py index e4a835a82eb44..bdd9a769b08ac 100644 --- a/localstack/aws/serving/edge.py +++ b/localstack/aws/serving/edge.py @@ -1,6 +1,7 @@ import logging from typing import List +from localstack import config from localstack.config import HostAndPort from localstack.http.hypercorn import GatewayServer from localstack.runtime.shutdown import ON_AFTER_SERVICE_SHUTDOWN_HANDLERS @@ -21,7 +22,7 @@ def serve_gateway( gateway = LocalstackAwsGateway(SERVICE_PLUGINS) # start serving gateway - server = GatewayServer(gateway, listen, use_ssl) + server = GatewayServer(gateway, listen, use_ssl, config.GATEWAY_WORKER_THREAD_COUNT) server.start() # with the current way the infrastructure is started, this is the easiest way to shut down the server correctly diff --git a/localstack/http/hypercorn.py b/localstack/http/hypercorn.py index e68f6bda61682..e14f2e167c797 100644 --- a/localstack/http/hypercorn.py +++ b/localstack/http/hypercorn.py @@ -82,15 +82,19 @@ class GatewayServer(HypercornServer): """ def __init__( - self, gateway: Gateway, listen: HostAndPort | list[HostAndPort], use_ssl: bool = False + self, + gateway: Gateway, + listen: HostAndPort | list[HostAndPort], + use_ssl: bool = False, + threads: int | None = None, ): """ Creates a new GatewayServer instance. :param gateway: which will be served by this server - :param port: defining the port of this server instance - :param bind_address: to bind this server instance to. Can be a host string or a list of host strings. + :param listen: defining the address and port pairs this server binds to. Can be a list of host and port pairs. :param use_ssl: True if the LocalStack cert should be loaded and HTTP/HTTPS multiplexing should be enabled. + :param threads: Number of worker threads the gateway will use. """ # build server config config = Config() @@ -109,7 +113,7 @@ def __init__( # build gateway loop = asyncio.new_event_loop() - app = AsgiGateway(gateway, event_loop=loop) + app = AsgiGateway(gateway, event_loop=loop, threads=threads) # start serving gateway super().__init__(app, config, loop) From 24278fae889d7cb720bb1c961a3d4d2f190abb90 Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Tue, 9 Jan 2024 09:58:34 +0100 Subject: [PATCH 4/4] change GATEWAY_WORKER_THREAD_COUNT to GATEWAY_WORKER_COUNT --- localstack/aws/serving/edge.py | 2 +- localstack/config.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/localstack/aws/serving/edge.py b/localstack/aws/serving/edge.py index bdd9a769b08ac..08ff45097fd86 100644 --- a/localstack/aws/serving/edge.py +++ b/localstack/aws/serving/edge.py @@ -22,7 +22,7 @@ def serve_gateway( gateway = LocalstackAwsGateway(SERVICE_PLUGINS) # start serving gateway - server = GatewayServer(gateway, listen, use_ssl, config.GATEWAY_WORKER_THREAD_COUNT) + server = GatewayServer(gateway, listen, use_ssl, config.GATEWAY_WORKER_COUNT) server.start() # with the current way the infrastructure is started, this is the easiest way to shut down the server correctly diff --git a/localstack/config.py b/localstack/config.py index 26751c43c5951..21898370f6848 100644 --- a/localstack/config.py +++ b/localstack/config.py @@ -655,7 +655,7 @@ def populate_edge_configuration( GATEWAY_LISTEN, ) = populate_edge_configuration(os.environ) -GATEWAY_WORKER_THREAD_COUNT = int(os.environ.get("GATEWAY_WORKER_THREAD_COUNT") or 1000) +GATEWAY_WORKER_COUNT = int(os.environ.get("GATEWAY_WORKER_COUNT") or 1000) # IP of the docker bridge used to enable access between containers DOCKER_BRIDGE_IP = os.environ.get("DOCKER_BRIDGE_IP", "").strip()