# AbstractTask - Base Task Class > **Base class cho background tasks với lifecycle management, signals, và serialization** ## Overview `AbstractTask` là abstract base class cho tất cả tasks: - QRunnable integration (QThreadPool) - Qt signals (statusChanged, progressUpdated, taskFinished) - Lifecycle management (PENDING → RUNNING → COMPLETED/FAILED/CANCELLED) - Serialization/deserialization - Retry mechanism - Cancellation support - Unique Task support ## API Reference ### Constructor ```python from core.taskSystem import AbstractTask, UniqueType class MyTask(AbstractTask): def __init__(self, name, **kwargs): super().__init__( name=name, description='Task description', isPersistent=False, maxRetries=0, retryDelaySeconds=5, failSilently=False, retryDelaySeconds=5, failSilently=False, chainUuid=None, tags=None, uniqueType=UniqueType.NONE ) ) ``` ### Abstract Methods ```python def handle(self): """Main task logic - must implement""" pass def _performCancellationCleanup(self): """Cleanup on cancellation - must implement""" pass @classmethod def deserialize(cls, data: dict): """Deserialize from dict - must implement if isPersistent=True""" pass def deserialize(cls, data: dict): """Deserialize from dict - must implement if isPersistent=True""" pass ``` ### Tagging ```python task.addTag('Network') task.removeTag('Network') hasTag = task.hasTag('Network') tags = task.tags # Set[str] ``` ### Status Management ```python self.setStatus(TaskStatus.RUNNING) self.setProgress(50) # 0-100 ``` ### Cancellation ```python if self.isStopped(): return self.cancel() # Request cancellation self.fail('Reason') # Mark as failed ``` ### Serialization ```python # Auto-serialization data = task.serialize() # Custom fields class MyTask(AbstractTask): serializables = ['customField1', 'customField2'] ``` ## Unique Tasks Allows preventing duplicate tasks from being queued or running concurrently. ### Concepts **UniqueType Enum**: - `UniqueType.NONE`: No uniqueness constraint (default). - `UniqueType.JOB`: **Global Uniqueness**. Ensures only one instance exists in the system (Pending OR Running). If a duplicate is added, it is ignored (deduplicated). - `UniqueType.UNTIL_PROCESSING`: **Queue Uniqueness**. Ensures only one instance exists in the **Pending** queue. Once the task starts running, another instance can be added to the queue. **uniqueVia()**: - Defines the unique key for the task. - Default: Returns `self.__class__.__name__` (Class-based uniqueness). - Override to provide granular uniqueness (e.g., per file, per ID). ### usage ```python from core.taskSystem import AbstractTask, UniqueType class ProcessFileTask(AbstractTask): def __init__(self, filePath): super().__init__( name=f'Process {filePath}', uniqueType=UniqueType.JOB # Only one task for this file allowed globally ) self.filePath = filePath def uniqueVia(self) -> str: # Unique per file path return f"{self.__class__.__name__}_{self.filePath}" ``` ## Task Status ```python from core.taskSystem import TaskStatus TaskStatus.PENDING # Waiting to start TaskStatus.RUNNING # Currently executing TaskStatus.COMPLETED # Finished successfully TaskStatus.FAILED # Failed with error TaskStatus.CANCELLED # Cancelled by user TaskStatus.PAUSED # Paused (not fully implemented) ``` ## Signals ```python task.statusChanged.connect(lambda status: print(f'Status: {status}')) task.progressUpdated.connect(lambda progress: print(f'Progress: {progress}%')) task.taskFinished.connect(lambda: print('Task finished')) ``` ## Usage Examples ### Basic Task ```python from core.taskSystem import AbstractTask class DownloadTask(AbstractTask): def __init__(self, url, savePath): super().__init__(name=f'Download {url}') self.url = url self.savePath = savePath def handle(self): import requests response = requests.get(self.url, stream=True) total = int(response.headers.get('content-length', 0)) with open(self.savePath, 'wb') as f: downloaded = 0 for chunk in response.iter_content(chunk_size=8192): if self.isStopped(): return f.write(chunk) downloaded += len(chunk) self.setProgress(int(downloaded / total * 100)) def _performCancellationCleanup(self): # Remove partial file if os.path.exists(self.savePath): os.remove(self.savePath) ``` ### With Scoped Resources ```python class BrowserTask(AbstractTask): def __init__(self, url): super().__init__(name=f'Browse {url}') self.url = url def handle(self): from core import QtAppContext ctx = QtAppContext.globalInstance() taskId = self.uuid browser = ChromeBrowserService() ctx.registerScopedService(taskId, browser) try: browser.navigate(self.url) # Do scraping... for i in range(10): if self.isStopped(): return # Process... self.setProgress(i * 10) finally: ctx.releaseScope(taskId) def _performCancellationCleanup(self): # Browser cleanup handled by releaseScope pass ``` ### Persistent Task ```python class DataProcessTask(AbstractTask): serializables = ['dataPath', 'outputPath'] def __init__(self, dataPath, outputPath): super().__init__( name='Process Data', isPersistent=True ) self.dataPath = dataPath self.outputPath = outputPath def handle(self): # Process data... pass @classmethod def deserialize(cls, data: dict): return cls( dataPath=data['dataPath'], outputPath=data['outputPath'] ) def _performCancellationCleanup(self): pass ``` ### With Retry ```python class ApiTask(AbstractTask): def __init__(self, endpoint): super().__init__( name=f'API Call {endpoint}', maxRetries=3, retryDelaySeconds=5 ) self.endpoint = endpoint def handle(self): import requests try: response = requests.get(self.endpoint) response.raise_for_status() # Process response... except requests.RequestException as e: self.fail(f'API call failed: {e}') def _performCancellationCleanup(self): pass ``` ## Lifecycle ```python # 1. Created task = MyTask(name='My Task') # Status: PENDING # 2. Added to queue taskManager.addTask(task) # Status: PENDING # 3. Execution starts # Status: RUNNING # Calls: handle() # 4. Completion # Success: Status: COMPLETED # Error: Status: FAILED # Cancelled: Status: CANCELLED # 5. Finished # Emits: taskFinished signal ``` ## Best Practices ### ✅ DO ```python # Check isStopped() regularly def handle(self): for item in items: if self.isStopped(): return # Process... # Update progress def handle(self): total = len(items) for i, item in enumerate(items): # Process... self.setProgress(int(i / total * 100)) # Use try-finally for cleanup def handle(self): try: # Work... pass finally: # Cleanup... pass # Implement cancellation cleanup def _performCancellationCleanup(self): # Remove temp files, kill processes, etc. pass ``` ### ❌ DON'T ```python # Don't block indefinitely def handle(self): while True: # No stop check! # Work... pass # Don't forget progress updates def handle(self): # Long operation... pass # No setProgress() calls # Don't use UI thread APIs def handle(self): ctx = QtAppContext.globalInstance() network = ctx.network # Wrong! Use requests # Don't forget cleanup implementation def _performCancellationCleanup(self): pass # Empty! Should cleanup resources ``` ## Thread Safety - ✅ `setStatus()`: Thread-safe (emits signal) - ✅ `setProgress()`: Thread-safe (emits signal) - ✅ `isStopped()`: Thread-safe (threading.Event) - ✅ `cancel()`: Thread-safe (threading.Event) ## Related Documentation - [Task System Overview](12-task-system-overview.md) - Architecture - [TaskChain](14-task-chain.md) - Sequential execution - [TaskManager](15-task-manager.md) - Task management - [ServiceLocator](02-dependency-injection.md) - Scoped resources