# Observer Pattern - Publisher/Subscriber Event System
> **Decoupled event-driven communication với smart parameter injection**
## Overview
Observer pattern trong core sử dụng Publisher/Subscriber model để:
- Decouple components (không cần biết nhau)
- Event-driven communication
- Smart parameter injection (type-hint aware)
- Thread-safe event dispatching
- Qt signal integration
## API Reference
### Publisher
**Singleton access:**
```python
from core import Publisher
publisher = Publisher.instance()
# Or via QtAppContext
from core import QtAppContext
publisher = QtAppContext.globalInstance().publisher
```
**Notify event:**
```python
publisher.notify('event_name', arg1, arg2, key1=value1)
```
**Subscribe:**
```python
publisher.subscribe(subscriber, event='specific_event')
publisher.subscribe(subscriber) # Global subscriber (all events)
```
**Unsubscribe:**
```python
publisher.unsubscribe(subscriber, event='specific_event')
publisher.unsubscribe(subscriber) # Unsubscribe from all
```
**Connect Qt signal:**
```python
publisher.connect(widget, 'clicked', 'button_clicked', extraArg='value')
```
### Subscriber
**Base class:**
```python
from core import Subscriber
class MyHandler(Subscriber):
def __init__(self):
super().__init__(events=['event1', 'event2'])
def onEvent1(self, arg):
# Handle event1
pass
def onEvent2(self, arg1, arg2):
# Handle event2
pass
```
**Event handler naming:**
- Method name: `on` + `PascalCase(event_name)`
- Example: `user_login` → `onUserLogin`
## Usage Examples
### Basic Event Publishing
```python
from core import Publisher
publisher = Publisher.instance()
# Publish event
publisher.notify('user.login', userId=123, username='john')
```
### Basic Subscriber
```python
from core import Subscriber
class UserHandler(Subscriber):
def __init__(self):
super().__init__(events=['user.login', 'user.logout'])
def onUserLogin(self, userId: int, username: str):
print(f'User {username} (ID: {userId}) logged in')
def onUserLogout(self, userId: int):
print(f'User {userId} logged out')
# Create handler (auto-subscribes)
handler = UserHandler()
# Publish events
publisher = Publisher.instance()
publisher.notify('user.login', userId=123, username='john')
publisher.notify('user.logout', userId=123)
```
### Smart Parameter Injection
```python
from core import Subscriber
class SmartHandler(Subscriber):
def __init__(self):
super().__init__(events=['data.received'])
# Type hints guide parameter matching
def onDataReceived(self, userId: int, data: dict, timestamp: float):
print(f'User {userId} sent data at {timestamp}')
print(f'Data: {data}')
# Publish with mixed args/kwargs
publisher.notify('data.received',
123, # Matched to userId by type (int)
{'key': 'value'}, # Matched to data by type (dict)
timestamp=1234567890.0 # Matched by name
)
```
### Global Subscriber
```python
from core import Subscriber
class GlobalHandler(Subscriber):
def __init__(self):
# No events specified = global subscriber
super().__init__(events=[], isGlobalSubscriber=True)
def update(self, event: str, *args, **kwargs):
"""Called for ALL events"""
print(f'Event: {event}, Args: {args}, Kwargs: {kwargs}')
# Receives all events
handler = GlobalHandler()
```
### Qt Signal Integration
```python
from core import Publisher
from PySide6.QtWidgets import QPushButton
button = QPushButton('Click Me')
publisher = Publisher.instance()
# Connect Qt signal to event
publisher.connect(button, 'clicked', 'button.clicked', buttonId='submit')
# Handler
class ButtonHandler(Subscriber):
def __init__(self):
super().__init__(events=['button.clicked'])
def onButtonClicked(self, buttonId: str):
print(f'Button {buttonId} clicked')
```
### Controller Integration
```python
from core import BaseController, BaseCtlHandler
from PySide6.QtWidgets import QWidget
class MyController(BaseController, QWidget):
slot_map = {
'saveClicked': ['saveButton', 'clicked'],
'cancelClicked': ['cancelButton', 'clicked']
}
def setupUi(self, widget):
# UI setup
pass
class MyControllerHandler(BaseCtlHandler):
def onSaveClicked(self):
data = self.widgetManager.get('dataInput').text()
# Process data...
# Publish event for other components
publisher = Publisher.instance()
publisher.notify('data.saved', data=data)
def onCancelClicked(self):
publisher = Publisher.instance()
publisher.notify('operation.cancelled')
```
## Smart Parameter Injection
### Matching Priority
1. **By name** (kwargs): `userId=123` matches `userId` parameter
2. **By type hint**: `arg: int` matches first unused int argument
3. **By position**: First unused arg matches first unmatched parameter
### Example
```python
class Handler(Subscriber):
def __init__(self):
super().__init__(events=['complex.event'])
def onComplexEvent(self, userId: int, username: str, data: dict, optional: str = 'default'):
pass
# All these work:
publisher.notify('complex.event', 123, 'john', {'key': 'value'})
publisher.notify('complex.event', userId=123, username='john', data={'key': 'value'})
publisher.notify('complex.event', 123, username='john', data={'key': 'value'})
publisher.notify('complex.event', {'key': 'value'}, 123, 'john') # Type-based matching
```
### Fallback Mechanism
If smart injection fails:
1. Try calling with no args
2. Try calling with first arg only
3. Try calling with first two args
4. Try calling with all args
5. Raise TypeError if all fail
## Architecture
```mermaid
graph TB
Publisher --> GlobalSubs["Global Subscribers
List"]
Publisher --> EventSubs["Event-Specific Subscribers
Dict[event, List]"]
Component1[Component A] -->|notify| Publisher
Component2[Component B] -->|notify| Publisher
Publisher -->|update| Handler1[Handler 1]
Publisher -->|update| Handler2[Handler 2]
Publisher -->|update| Handler3[Handler 3]
QtSignal[Qt Signal] -->|connect| Publisher
Handler1 -.->|no coupling| Component1
Handler2 -.->|no coupling| Component2
style Publisher fill:#e8f5e9
style GlobalSubs fill:#fff4e1
style EventSubs fill:#e1f5ff
```
## Best Practices
### ✅ DO
```python
# Use descriptive event names with namespaces
publisher.notify('user.login', userId=123)
publisher.notify('data.saved', recordId=456)
publisher.notify('task.completed', taskId='abc')
# Use type hints for smart injection
def onUserLogin(self, userId: int, username: str):
pass
# Subscribe to specific events only
class MyHandler(Subscriber):
def __init__(self):
super().__init__(events=['user.login', 'user.logout'])
# Unsubscribe when done
handler = MyHandler()
# ... use handler ...
Publisher.instance().unsubscribe(handler)
# Use kwargs for clarity
publisher.notify('user.login', userId=123, username='john')
```
### ❌ DON'T
```python
# Don't use generic event names
publisher.notify('event', data) # Bad
publisher.notify('user.login', userId=123) # Good
# Don't forget type hints
def onUserLogin(self, userId, username): # Harder to match
pass
# Don't subscribe to all events unless needed
class MyHandler(Subscriber):
def __init__(self):
super().__init__(events=[]) # Receives nothing
# Should specify events
# Don't call handler methods directly
handler.onUserLogin(123, 'john') # Wrong! Use publisher.notify
# Don't create circular dependencies
class HandlerA(Subscriber):
def onEventA(self):
publisher.notify('event.b') # OK
class HandlerB(Subscriber):
def onEventB(self):
publisher.notify('event.a') # Circular! Avoid
```
## Thread Safety
- ✅ `notify()`: Thread-safe (QMutex)
- ✅ `subscribe()`/`unsubscribe()`: Thread-safe (QMutex)
- ✅ Event handlers called in publisher's thread
- ⚠️ Handler exceptions logged but don't stop other handlers
## Common Patterns
### Application Events
```python
# Define application-wide events
class AppEvents:
APP_READY = 'app.ready'
APP_SHUTDOWN = 'app.shutdown'
USER_LOGIN = 'user.login'
USER_LOGOUT = 'user.logout'
DATA_CHANGED = 'data.changed'
# Publish
publisher.notify(AppEvents.USER_LOGIN, userId=123)
# Subscribe
class AppHandler(Subscriber):
def __init__(self):
super().__init__(events=[
AppEvents.APP_READY,
AppEvents.USER_LOGIN
])
```
### Cross-Component Communication
```python
# Component A
class DataService:
def saveData(self, data):
# Save to database
publisher = Publisher.instance()
publisher.notify('data.saved', data=data)
# Component B (no dependency on Component A)
class UIHandler(Subscriber):
def __init__(self):
super().__init__(events=['data.saved'])
def onDataSaved(self, data: dict):
# Update UI
pass
```
### Task Progress Events
```python
# Task
class MyTask(AbstractTask):
def handle(self):
publisher = Publisher.instance()
for i in range(100):
# Do work...
publisher.notify('task.progress', taskId=self.uuid, progress=i)
publisher.notify('task.completed', taskId=self.uuid)
# UI Handler
class TaskProgressHandler(Subscriber):
def __init__(self):
super().__init__(events=['task.progress', 'task.completed'])
def onTaskProgress(self, taskId: str, progress: int):
# Update progress bar
pass
def onTaskCompleted(self, taskId: str):
# Show completion message
pass
```
## Related Documentation
- [BaseController](04-controller-architecture.md) - Controller integration
- [QtAppContext](01-application-context.md) - Publisher access
- [Common Use Cases](20-common-use-cases.md) - Practical examples
## Troubleshooting
**Q: Handler not called**
```python
# Check event name matches
class MyHandler(Subscriber):
def __init__(self):
super().__init__(events=['user.login']) # Must match exactly
def onUserLogin(self): # Method name must be on + PascalCase(event)
pass
publisher.notify('user.login') # Must match event in __init__
```
**Q: Parameter injection fails**
```python
# Add type hints
def onUserLogin(self, userId: int, username: str): # With type hints
pass
# Or use kwargs
publisher.notify('user.login', userId=123, username='john')
```
**Q: Handler called multiple times**
```python
# Check for duplicate subscriptions
handler = MyHandler() # Subscribes once
handler = MyHandler() # Creates new instance, subscribes again
# Unsubscribe old handlers
Publisher.instance().unsubscribe(oldHandler)
```