async def process_requests_by_type(self, requests, request_types, request_ids):
    """Start one task per request and wait until all of them have finished.

    The three arguments are walked in lockstep (``zip`` stops at the
    shortest one). A request whose type is ``'embed'`` is sent to
    ``self.model.embed`` with its ``sentences``; every other type is
    handled as ``'rerank'`` and sent to ``self.model.rerank`` with its
    ``sentence_pairs``. Each call goes through ``self.run_with_semaphore``
    together with the matching request id.

    Nothing is returned; an exception escaping any task propagates out of
    the ``asyncio.gather`` call.
    """
    def _spawn(payload, kind, rid):
        # Pick the model entry point and the payload field it consumes.
        if kind == 'embed':
            handler, batch = self.model.embed, payload.sentences
        else:  # 'rerank'
            handler, batch = self.model.rerank, payload.sentence_pairs
        return asyncio.create_task(self.run_with_semaphore(handler, batch, rid))

    pending = [
        _spawn(payload, kind, rid)
        for payload, kind, rid in zip(requests, request_types, request_ids)
    ]
    await asyncio.gather(*pending)
async def run_with_semaphore(self, func, data, request_id):
    """Run ``func(data)`` on the executor under the GPU lock and publish the outcome.

    The call is submitted to ``self.executor`` while ``self.gpu_lock`` is
    held and awaited for at most ``gpu_time_out`` (a module-level name not
    defined in this method). The outcome is then delivered to
    ``self.response_futures[request_id]``:

    * the return value of ``func`` on success,
    * ``TimeoutError("GPU processing timeout")`` when the wait times out,
    * the raised exception when ``func`` (or the wait itself) fails.

    If the response future is already done (for example because the waiting
    caller cancelled it), the outcome is dropped instead of raising
    ``asyncio.InvalidStateError``, so one abandoned request cannot fail the
    task that is processing the rest of the batch.

    Raises:
        KeyError: if no response future is registered under ``request_id``.
    """
    result = None
    error = None
    async with self.gpu_lock:  # only one submitted call is awaited at a time
        future = self.executor.submit(func, data)
        try:
            # NOTE: a timeout stops the wait, but a call that is already
            # executing on the executor thread cannot be interrupted and may
            # still be running after the lock is released.
            result = await asyncio.wait_for(
                asyncio.wrap_future(future), timeout=gpu_time_out
            )
        except asyncio.TimeoutError:
            error = TimeoutError("GPU processing timeout")
        except Exception as exc:
            error = exc

    response = self.response_futures[request_id]
    if response.done():
        # Already resolved or cancelled elsewhere; setting it again would raise.
        return
    if error is not None:
        response.set_exception(error)
    else:
        response.set_result(result)