From 9c85c73f22e8d80b463813ebbf71322fcc2f0f51 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Thu, 26 Dec 2024 16:59:10 -0800 Subject: [PATCH 1/3] Fix missing worker_id from interceptor --- sdks/python/apache_beam/runners/worker/sdk_worker.py | 8 ++++---- sdks/python/apache_beam/runners/worker/worker_status.py | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index b091220a06b5..0bcf43756f61 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -201,7 +201,7 @@ def __init__( self._data_channel_factory = data_plane.GrpcClientDataChannelFactory( credentials, self._worker_id, data_buffer_time_limit_ms) self._state_handler_factory = GrpcStateHandlerFactory( - self._state_cache, credentials) + self._state_cache, self._worker_id, credentials) self._profiler_factory = profiler_factory self.data_sampler = data_sampler self.runner_capabilities = runner_capabilities @@ -893,8 +893,8 @@ class GrpcStateHandlerFactory(StateHandlerFactory): Caches the created channels by ``state descriptor url``. """ - def __init__(self, state_cache, credentials=None): - # type: (StateCache, Optional[grpc.ChannelCredentials]) -> None + def __init__(self, state_cache, worker_id, credentials=None): + # type: (StateCache, Optional[str], Optional[grpc.ChannelCredentials]) -> None self._state_handler_cache = {} # type: Dict[str, CachingStateHandler] self._lock = threading.Lock() self._throwing_state_handler = ThrowingStateHandler() @@ -926,7 +926,7 @@ def create_state_handler(self, api_service_descriptor): _LOGGER.info('State channel established.') # Add workerId to the grpc channel grpc_channel = grpc.intercept_channel( - grpc_channel, WorkerIdInterceptor()) + grpc_channel, WorkerIdInterceptor(self._worker_id)) self._state_handler_cache[url] = GlobalCachingStateHandler( self._state_cache, GrpcStateHandler( diff --git a/sdks/python/apache_beam/runners/worker/worker_status.py b/sdks/python/apache_beam/runners/worker/worker_status.py index 2271b4495d79..ecd4dc4e02c0 100644 --- a/sdks/python/apache_beam/runners/worker/worker_status.py +++ b/sdks/python/apache_beam/runners/worker/worker_status.py @@ -151,6 +151,7 @@ def __init__( bundle_process_cache=None, state_cache=None, enable_heap_dump=False, + worker_id=None, log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS): """Initialize FnApiWorkerStatusHandler. @@ -164,7 +165,8 @@ def __init__( self._state_cache = state_cache ch = GRPCChannelFactory.insecure_channel(status_address) grpc.channel_ready_future(ch).result(timeout=60) - self._status_channel = grpc.intercept_channel(ch, WorkerIdInterceptor()) + self._status_channel = grpc.intercept_channel( + ch, WorkerIdInterceptor(worker_id)) self._status_stub = beam_fn_api_pb2_grpc.BeamFnWorkerStatusStub( self._status_channel) self._responses = queue.Queue() From ed45658a650ec8bcf215df6a3b70ca2e4a218c5d Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Fri, 27 Dec 2024 11:54:24 -0800 Subject: [PATCH 2/3] Add worker_id attribute --- sdks/python/apache_beam/runners/worker/sdk_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 0bcf43756f61..b4a2a04997f3 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -900,6 +900,7 @@ def __init__(self, state_cache, worker_id, credentials=None): self._throwing_state_handler = ThrowingStateHandler() self._credentials = credentials self._state_cache = state_cache + self._worker_id = worker_id def create_state_handler(self, api_service_descriptor): # type: (endpoints_pb2.ApiServiceDescriptor) -> CachingStateHandler From 1fe5190fb940d6e3e330eaee587c7be1585bafb6 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Mon, 30 Dec 2024 10:57:49 -0800 Subject: [PATCH 3/3] reorder and default parameters for GrpcStateHandlerFactory --- sdks/python/apache_beam/runners/worker/sdk_worker.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index b4a2a04997f3..3cb1a26b77f1 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -201,7 +201,9 @@ def __init__( self._data_channel_factory = data_plane.GrpcClientDataChannelFactory( credentials, self._worker_id, data_buffer_time_limit_ms) self._state_handler_factory = GrpcStateHandlerFactory( - self._state_cache, self._worker_id, credentials) + state_cache=self._state_cache, + credentials=credentials, + worker_id=self._worker_id) self._profiler_factory = profiler_factory self.data_sampler = data_sampler self.runner_capabilities = runner_capabilities @@ -893,8 +895,8 @@ class GrpcStateHandlerFactory(StateHandlerFactory): Caches the created channels by ``state descriptor url``. """ - def __init__(self, state_cache, worker_id, credentials=None): - # type: (StateCache, Optional[str], Optional[grpc.ChannelCredentials]) -> None + def __init__(self, state_cache, credentials=None, worker_id=None): + # type: (StateCache, Optional[grpc.ChannelCredentials], Optional[str]) -> None self._state_handler_cache = {} # type: Dict[str, CachingStateHandler] self._lock = threading.Lock() self._throwing_state_handler = ThrowingStateHandler()