Skip to content

Commit db79dd5

Browse files
committed
fix error with portal bridge response polling
Signed-off-by: Lance-Drane <Lance-Drane@users.noreply.github.com>
1 parent dfd05f1 commit db79dd5

2 files changed

Lines changed: 86 additions & 57 deletions

File tree

ipsframework/bridges/portal_bridge.py

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,19 @@
1111
from multiprocessing.synchronize import Event as EventType
1212
from typing import Any, Callable, Literal, Union
1313

14-
import urllib3
14+
from urllib3 import PoolManager
15+
from urllib3.exceptions import MaxRetryError
16+
from urllib3.util import Retry as Urllib3Retry
1517

1618
from ipsframework import Component
1719

20+
MAX_RETRIES = 10
21+
1822

1923
def send_post(conn: Connection, stop: EventType, url: str):
2024
fail_count = 0
2125

22-
http = urllib3.PoolManager(retries=urllib3.util.Retry(3, backoff_factor=0.25), headers={'Content-Type': 'application/json'})
26+
http = PoolManager(retries=Urllib3Retry(total=MAX_RETRIES, backoff_factor=1, respect_retry_after_header=True), headers={'Content-Type': 'application/json'})
2327

2428
while True:
2529
if conn.poll(0.1):
@@ -28,14 +32,14 @@ def send_post(conn: Connection, stop: EventType, url: str):
2832
msgs.append(conn.recv())
2933
try:
3034
resp = http.request('POST', url, body=json.dumps(msgs).encode())
31-
except urllib3.exceptions.MaxRetryError as e:
35+
except MaxRetryError as e:
3236
fail_count += 1
33-
conn.send((999, str(e)))
37+
conn.send((999, f'Max retry error: {e}'))
3438
else:
3539
conn.send((resp.status, resp.data.decode()))
3640
fail_count = 0
3741

38-
if fail_count >= 3:
42+
if fail_count >= MAX_RETRIES:
3943
conn.send((-1, 'Too many consecutive failed connections'))
4044
break
4145
elif stop.is_set():
@@ -45,7 +49,7 @@ def send_post(conn: Connection, stop: EventType, url: str):
4549
def send_jupyter_notebook(conn: Connection, stop: EventType, url: str, api_key: str, username: str):
4650
fail_count = 0
4751

48-
http = urllib3.PoolManager(retries=urllib3.util.Retry(3, backoff_factor=0.25))
52+
http = PoolManager(retries=Urllib3Retry(total=MAX_RETRIES, backoff_factor=1, respect_retry_after_header=True))
4953

5054
while True:
5155
if conn.poll(0.1):
@@ -64,14 +68,14 @@ def send_jupyter_notebook(conn: Connection, stop: EventType, url: str, api_key:
6468
'X-Ips-Filename': next_val['filename'],
6569
},
6670
)
67-
except urllib3.exceptions.MaxRetryError as e:
71+
except MaxRetryError as e:
6872
fail_count += 1
69-
conn.send((999, str(e)))
73+
conn.send((999, f'Max retry error: {e}'))
7074
else:
7175
conn.send((resp.status, resp.data.decode()))
7276
fail_count = 0
7377

74-
if fail_count >= 3:
78+
if fail_count >= MAX_RETRIES:
7579
conn.send((-1, 'Too many consecutive failed connections'))
7680
break
7781
elif stop.is_set():
@@ -81,7 +85,7 @@ def send_jupyter_notebook(conn: Connection, stop: EventType, url: str, api_key:
8185
def send_jupyter_notebook_data(conn: Connection, stop: EventType, url: str, api_key: str, username: str):
8286
fail_count = 0
8387

84-
http = urllib3.PoolManager(retries=urllib3.util.Retry(3, backoff_factor=0.25))
88+
http = PoolManager(retries=Urllib3Retry(total=MAX_RETRIES, backoff_factor=1, respect_retry_after_header=True))
8589

8690
while True:
8791
if conn.poll(0.1):
@@ -110,14 +114,14 @@ def send_jupyter_notebook_data(conn: Connection, stop: EventType, url: str, api_
110114
body=body,
111115
headers=headers,
112116
)
113-
except (urllib3.exceptions.MaxRetryError, OSError) as e:
117+
except (MaxRetryError, OSError) as e:
114118
fail_count += 1
115-
conn.send((999, str(e)))
119+
conn.send((999, f'Max retry error: {e}'))
116120
else:
117121
conn.send((resp.status, resp.data.decode()))
118122
fail_count = 0
119123

120-
if fail_count >= 3:
124+
if fail_count >= MAX_RETRIES:
121125
conn.send((-1, 'Too many consecutive failed connections'))
122126
break
123127
elif stop.is_set():
@@ -127,7 +131,7 @@ def send_jupyter_notebook_data(conn: Connection, stop: EventType, url: str, api_
127131
def send_ensemble_variables(conn: Connection, stop: EventType, url: str, api_key: str, username: str):
128132
fail_count = 0
129133

130-
http = urllib3.PoolManager(retries=urllib3.util.Retry(3, backoff_factor=0.25))
134+
http = PoolManager(retries=Urllib3Retry(total=MAX_RETRIES, backoff_factor=1, respect_retry_after_header=True))
131135

132136
while True:
133137
if conn.poll(0.1):
@@ -151,14 +155,14 @@ def send_ensemble_variables(conn: Connection, stop: EventType, url: str, api_key
151155
body=body,
152156
headers=headers,
153157
)
154-
except (urllib3.exceptions.MaxRetryError, OSError) as e:
158+
except (MaxRetryError, OSError) as e:
155159
fail_count += 1
156-
conn.send((999, str(e)))
160+
conn.send((999, f'Max retry error: {e}'))
157161
else:
158162
conn.send((resp.status, resp.data.decode()))
159163
fail_count = 0
160164

161-
if fail_count >= 3:
165+
if fail_count >= MAX_RETRIES:
162166
conn.send((-1, 'Too many consecutive failed connections'))
163167
break
164168
elif stop.is_set():
@@ -339,22 +343,15 @@ def process_event(self, topicName, theEvent):
339343
time.sleep(1)
340344

341345
def check_send_post_responses(self):
342-
while self.parent_conn.poll():
346+
if self.parent_conn is None:
347+
self.services.warning('Giving up on polling for portal responses')
348+
return
349+
while self.parent_conn.poll(timeout=2.0):
343350
try:
344351
code, msg = self.parent_conn.recv()
345352
except (EOFError, OSError):
346353
break
347354

348-
try:
349-
data = json.loads(msg)
350-
if 'runid' in data and 'simname' in data:
351-
# Indicates IPS-START event return
352-
self.services.info('Run Portal URL = %s/%s', self.portal_url, data.get('runid'))
353-
self.services.set_config_param('_IPS_PORTAL_RUNID', str(data.get('runid')), target_sim_name=data.get('simname'))
354-
355-
msg = json.dumps(data)
356-
except (TypeError, json.decoder.JSONDecodeError):
357-
pass
358355
if code >= 400:
359356
self.services.error('Portal Error: %d %s', code, msg)
360357
elif code == -1:
@@ -363,6 +360,14 @@ def check_send_post_responses(self):
363360
self.services.error('Disabling portal because: %s', msg)
364361
else:
365362
self.services.debug('Portal Response: %d %s', code, msg)
363+
try:
364+
data = json.loads(msg)
365+
if 'runid' in data and 'simname' in data:
366+
# Indicates IPS-START event return
367+
self.services.info('Run Portal URL = %s/%s', self.portal_url, data.get('runid'))
368+
self.services.set_config_param('_IPS_PORTAL_RUNID', str(data.get('runid')), target_sim_name=data.get('simname'))
369+
except (TypeError, json.decoder.JSONDecodeError):
370+
pass
366371

367372
def http_req_and_response(self, manager: UrlRequestProcessManager, event_data):
368373
try:

ipsframework/services.py

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ def __init__(self, fwk, fwk_in_q: Queue, svc_response_q: Queue, sim_conf: dict[s
344344
"""
345345
This is meant to be a unique identifier fallback in the event we can't get the portal runid.
346346
"""
347+
self._portal_runid_event = threading.Event()
348+
"""Thread-safe means to detect if self._portal_runid has been set"""
347349

348350
def __initialize__(self, component_ref):
349351
"""
@@ -1940,45 +1942,52 @@ def _should_use_portal(self) -> bool:
19401942
self.warning('Unusual value for USE_PORTAL: %s', use_portal_config)
19411943
return False
19421944

1943-
def _get_portal_runid(self) -> int:
1945+
def _establish_portal_runid(self) -> None:
19441946
"""Get the runid Jupyter and the Portal will associate with this run.
19451947
Generally this will be the runid that the portal emits, but we will try to allow for fallbacks in certain cases.
19461948
1947-
If value is < 0, we were unable to get the portal runid.
1948-
"""
1949+
If value >= 0, we have the portal runid
1950+
If value == -1, we have not yet set the portal runid
1951+
If value == -2, we have tried and failed to get the portal runid, and will not try again
19491952
1950-
# already successful
1951-
if self._portal_runid >= 0:
1952-
return self._portal_runid
1953+
You should explicitly check the value of self._portal_runid after this function, as this function is concerned with setting the value in a thread-safe context.
1954+
"""
19531955

1954-
# already failed
1955-
if self._portal_runid == -2:
1956-
return -2
1956+
# first, check if portal_runid was already set
1957+
if self._portal_runid_event.is_set():
1958+
return
19571959

19581960
# first, check to see if we even want to use the portal
19591961
if not self._should_use_portal():
19601962
self.warning('web portal disabled')
19611963
self._portal_runid = -2
1962-
return -2
1964+
self._portal_runid_event.set()
1965+
return
19631966

19641967
# next, check to see if the portal URL was even initialized, fall back if not
19651968
if not self.get_config_param('PORTAL_URL', silent=True):
19661969
self.warning('_get_jupyter_runid: PORTAL_URL was not defined, disabling Jupyter workflow')
19671970
self._portal_runid = -2
1968-
return -2
1971+
self._portal_runid_event.set()
1972+
return
19691973

19701974
# next, check to see if the user remembered to define an API key (adding data requires a runid)
19711975
if not self.get_config_param('_IPS_PORTAL_API_KEY', silent=True):
19721976
self.warning('_get_jupyter_runid: PORTAL_API_KEY was not defined, disabling Jupyter workflow')
19731977
self._portal_runid = -2
1974-
return -2
1978+
self._portal_runid_event.set()
1979+
return
19751980

19761981
# Here, we will periodically check to see if we have our config param set
19771982
# The IPS Portal Bridge component will set this after it gets a response back from the IPS_START event
19781983
# Inside this IPS_START event is the runid as maintained by the IPS Portal itself
1984+
19791985
attempts = 0
1980-
max_attempts = 30
1981-
while True:
1986+
max_attempts = 10
1987+
base_time = time.time()
1988+
1989+
# if the event flag is set from another call, self._portal_runid should also be set to a non-default value, so break early
1990+
while not self._portal_runid_event.is_set():
19821991
try:
19831992
# We expect to fail this call a few times, so do not log the failed attempts
19841993
value = self.get_config_param('_IPS_PORTAL_RUNID', log=False)
@@ -1987,16 +1996,21 @@ def _get_portal_runid(self) -> int:
19871996
except Exception:
19881997
self.warning('got back invalid value for runid from portal: %s', value)
19891998
self._portal_runid = -2
1990-
return -2
1999+
self._portal_runid_event.set()
2000+
return
19912001
self._portal_runid = value
1992-
return value
2002+
self._portal_runid_event.set()
2003+
self.info('took this long to obtain PORTAL_RUNID: %d', time.time() - base_time)
2004+
return
19932005
except Exception:
19942006
attempts += 1
19952007
if attempts >= max_attempts:
19962008
self.warning('_get_jupyter_runid: Unable to get RUNID directly from remote portal, disabling Jupyter workflow')
19972009
self._portal_runid = -2
1998-
return -2
1999-
time.sleep(1.0)
2010+
self._portal_runid_event.set()
2011+
self.warning('took this amount of time to reach max attempts: %d', time.time() - base_time)
2012+
return
2013+
self._portal_runid_event.wait(1.0)
20002014

20012015
def initialize_jupyter_notebook(
20022016
self,
@@ -2012,8 +2026,12 @@ def initialize_jupyter_notebook(
20122026
:param source_notebook_path: location you want to load the source notebook from. This can be either an absolute path, or an IPS-appropriate relative path.
20132027
:param dest_notebook_name: (optional, default None) filename of the notebook to use when saving it to the IPS Portal. If not provided, this will default to the filename of the source notebook.
20142028
"""
2015-
portal_runid = self._get_portal_runid()
2016-
if portal_runid < 0:
2029+
# have we initialized a runid yet? if not, block this function call until we can establish or not establish one
2030+
if not self._portal_runid_event.is_set():
2031+
self._establish_portal_runid()
2032+
2033+
# now check to see if we have a valid runid
2034+
if self._portal_runid < 0:
20172035
return
20182036

20192037
if not os.path.exists(source_notebook_path):
@@ -2035,7 +2053,7 @@ def initialize_jupyter_notebook(
20352053
portal_data['data_source'] = os.path.join(os.getcwd(), source_notebook_path) if not os.path.isabs(source_notebook_path) else source_notebook_path
20362054
portal_data['username'] = self.get_config_param('USER')
20372055
portal_data['filename'] = dest_notebook_name
2038-
portal_data['portal_runid'] = portal_runid
2056+
portal_data['portal_runid'] = self._portal_runid
20392057
event_data['portal_data'] = portal_data
20402058
self.publish('_IPS_MONITOR', 'PORTAL_REGISTER_NOTEBOOK', event_data)
20412059
self._send_monitor_event('IPS_PORTAL_REGISTER_NOTEBOOK', f'FILENAME = {dest_notebook_name}')
@@ -2050,8 +2068,12 @@ def add_analysis_data_files(self, current_data_file_paths: list[str], timestamp:
20502068
:param replace: If True, replace the last data file added with the new data file. If False, simply append the new data file. (default: False)
20512069
Note that if replace is not True but you attempt to overwrite it, a ValueError will be thrown.
20522070
"""
2053-
portal_runid = self._get_portal_runid()
2054-
if portal_runid < 0:
2071+
# have we initialized a runid yet? if not, block this function call until we can establish or not establish one
2072+
if not self._portal_runid_event.is_set():
2073+
self._establish_portal_runid()
2074+
2075+
# now check to see if we have a valid runid
2076+
if self._portal_runid < 0:
20552077
return
20562078

20572079
for source in current_data_file_paths:
@@ -2071,7 +2093,7 @@ def add_analysis_data_files(self, current_data_file_paths: list[str], timestamp:
20712093
portal_data['filename'] = filename
20722094
portal_data['tag'] = timestamp
20732095
portal_data['replace'] = replace
2074-
portal_data['portal_runid'] = portal_runid
2096+
portal_data['portal_runid'] = self._portal_runid
20752097
event_data['portal_data'] = portal_data
20762098
self.publish('_IPS_MONITOR', 'PORTAL_ADD_JUPYTER_DATA', event_data)
20772099
self._send_monitor_event('IPS_PORTAL_ADD_JUPYTER_DATA', f'SOURCE = {source} TIMESTAMP = {timestamp} REPLACE = {replace}')
@@ -2576,11 +2598,13 @@ def send_ensemble_instance_to_portal(ensemble_name: str, data_path: Path) -> Non
25762598
if not use_portal:
25772599
return
25782600

2579-
# ensure that the portal is initialized
2580-
portal_runid = self._get_portal_runid()
2581-
if portal_runid < 0:
2601+
# have we initialized a runid yet? if not, block this function call until we can establish or not establish one
2602+
if not self._portal_runid_event.is_set():
2603+
self._establish_portal_runid()
2604+
2605+
# now check to see if we have a valid runid
2606+
if self._portal_runid < 0:
25822607
return
2583-
25842608
event_data = {}
25852609
event_data['sim_name'] = self.sim_conf['__PORTAL_SIM_NAME']
25862610
event_data['real_sim_name'] = self.sim_name
@@ -2592,7 +2616,7 @@ def send_ensemble_instance_to_portal(ensemble_name: str, data_path: Path) -> Non
25922616
portal_data['ensemble_id'] = portal_ensemble_id
25932617
portal_data['ensemble_data_path'] = data_path
25942618
portal_data['username'] = self.get_config_param('USER')
2595-
portal_data['portal_runid'] = portal_runid
2619+
portal_data['portal_runid'] = self._portal_runid
25962620
event_data['portal_data'] = portal_data
25972621
self.publish('_IPS_MONITOR', 'PORTAL_UPLOAD_ENSEMBLE_PARAMS', event_data)
25982622
self._send_monitor_event('IPS_PORTAL_UPLOAD_ENSEMBLE_PARAMS', f'NAME = {name}')

0 commit comments

Comments
 (0)