A Python library for MQTT event listening with comprehensive job tracking, configuration parsing, and error handling capabilities.
- Asynchronous MQTT client with job tracking capabilities
- TOML message processing with automatic parsing and validation
- Job management system with status tracking and duplicate detection
- Configurable error handling and logging
- Safe configuration parsing with validation
- Comprehensive testing with unit and integration tests
| Metric | Value | Status |
|---|---|---|
| Test Coverage | 85% | ✅ Good |
| Unit Tests | 45+ tests | ✅ Comprehensive |
| Integration Tests | 12+ tests | ✅ Complete |
| Code Quality | Grade A | ✅ Excellent |
| Security Scan | No issues | ✅ Secure |
| Documentation | 100% | ✅ Complete |
| Type Hints | Full coverage | ✅ Type safe |
- 🧪 Unit Tests: Core functionality, configuration parsing, job management
- 🔗 Integration Tests: End-to-end workflows, MQTT client integration
- 🚀 Performance Tests: Memory usage, async operations
- 🔒 Security Tests: Input validation, dependency scanning
- 📚 Documentation Tests: Code examples, API documentation
Run tests locally:
# Quick test run
make test
# Full test suite with coverage
make coverage
# Security and quality checks
make lint && make security

# Install directly from git (latest version)
pip install git+https://github.com/ed-00/Mqtt-client.git
# Install a specific branch or tag
pip install git+https://github.com/ed-00/Mqtt-client.git@main
pip install git+https://github.com/ed-00/Mqtt-client.git@v1.0.3

# Clone and install
git clone https://github.com/ed-00/Mqtt-client.git
cd Mqtt-client
pip install .
# Or install in editable mode for development
pip install -e .

# For development with all testing tools
git clone https://github.com/ed-00/Mqtt-client.git
cd Mqtt-client
pip install -e .[dev]

For distributing within your organization, you can:
- Share the wheel file directly:
    # Build the package
    python -m build
    # Share the .whl file from dist/ directory
    pip install mqtt_event_listener-1.0.2-py3-none-any.whl
- Set up an internal package index or use your organization's private repository
- Install directly from your internal git server:
    pip install git+https://www.github.com/mqtt-client.git
import asyncio
from datetime import datetime

# ReturnType is listed among the library's classes; importing it from the
# package root alongside the other names is assumed here.
from Listener import EventListener, EventListenerConfig, ReturnType

# Create configuration
config = EventListenerConfig(
    host="localhost",
    port=1883,
    username="your_username",
    password="your_password",
    topic="your/topic",
    client_id="my-listener"
)

# Initialize listener
listener = EventListener(config)

# Define your message processing function
def process_message(data, job_id):
    """Process incoming TOML messages"""
    print(f"Processing job {job_id}: {data}")
    # Your processing logic here
    # The result should be a dictionary that will be serialized to TOML
    result = {"status": "processed", "job_id": job_id}
    # Return results to be published back
    return ReturnType(
        data=result,
        topic="results/topic",
        qos=0,
        retain=False,
        message_id=1,
        timestamp=datetime.now(),
        job_id=job_id
    )

# Run the listener
async def main():
    await listener.run(process_message)

if __name__ == "__main__":
    asyncio.run(main())

from Listener import EventListener, EventListenerConfig, SafeConfigParser
# Advanced configuration with SSL and custom settings
config = EventListenerConfig(
    host="mqtt.example.com",
    port=8883,
    username="user",
    password="pass",
    topic="events/+",  # Wildcard topic
    client_id="advanced-listener",
    # SSL/TLS settings
    cafile="/path/to/ca.crt",
    # Job tracking settings
    max_jobs_in_memory=10000,
    job_cleanup_interval=3600,  # 1 hour
    allow_job_id_generation=True,
    duplicate_action="reprocess",
    # MQTT client settings
    keep_alive=60,
    auto_reconnect=True,
    reconnect_retries=5,
    cleansession=False,
    # Custom topics for different message types
    error_topic="errors",
    log_topic="logs",
    results_topic="results"
)

# Use custom config parser
config_parser = SafeConfigParser()
listener = EventListener(config, config_parser)

The EventListenerConfig class provides comprehensive configuration options:
- host: MQTT broker hostname
- port: MQTT broker port
- username, password: Authentication credentials
- client_id: Unique client identifier
- topic: Primary subscription topic
- error_topic: Topic for error messages
- log_topic: Topic for log messages
- results_topic: Topic for result publishing
- max_jobs_in_memory: Maximum jobs to keep in memory
- job_cleanup_interval: Cleanup interval for completed jobs
- allow_job_id_generation: Auto-generate job IDs if missing
- duplicate_action: How to handle duplicate jobs ("skip", "reprocess", "error")
- cafile: Path to CA certificate file
- capath: Path to CA certificate directory
- cadata: CA certificate data
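
As a rough illustration of what validating a few of these options could look like, here is a minimal sketch. `ListenerSettings` is a hypothetical stand-in written for this example, not the library's `EventListenerConfig`; the defaults and the specific checks are assumptions. Only the allowed `duplicate_action` values come from the list above.

```python
from dataclasses import dataclass

# Allowed values taken from the duplicate_action description above.
ALLOWED_DUPLICATE_ACTIONS = ("skip", "reprocess", "error")


@dataclass
class ListenerSettings:
    """Hypothetical stand-in for a few EventListenerConfig fields."""

    host: str
    port: int = 1883                 # assumed default (standard MQTT port)
    duplicate_action: str = "skip"   # assumed default
    max_jobs_in_memory: int = 10000  # assumed default

    def __post_init__(self) -> None:
        # Collect every problem so the error message reports them together.
        problems = []
        if not self.host:
            problems.append("host must not be empty")
        if not 1 <= self.port <= 65535:
            problems.append(f"port {self.port} is outside 1-65535")
        if self.duplicate_action not in ALLOWED_DUPLICATE_ACTIONS:
            problems.append(
                f"duplicate_action {self.duplicate_action!r} is not one of "
                f"{ALLOWED_DUPLICATE_ACTIONS}"
            )
        if self.max_jobs_in_memory <= 0:
            problems.append("max_jobs_in_memory must be positive")
        if problems:
            raise ValueError("; ".join(problems))
```

Constructing `ListenerSettings(host="", port=0)` in this sketch raises a single `ValueError` naming both problems, which is one way to give the kind of detailed feedback a configuration layer benefits from.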
The library provides comprehensive job tracking:
# Get job status
job_info = await listener.get_job_status("job_123")
print(f"Job status: {job_info.status}")
# Check if job is running
is_running = await listener.is_job_running("job_123")
# Get all jobs by status
running_jobs = await listener.get_running_jobs()
completed_jobs = await listener.get_completed_jobs()
duplicate_jobs = await listener.get_duplicate_jobs()
# Cleanup old completed jobs
await listener.cleanup_old_jobs()

The library expects TOML-formatted messages:
job_id = "unique-job-123"
task_type = "data_processing"
priority = "high"
[data]
input_file = "/path/to/input.csv"
output_file = "/path/to/output.json"
parameters = { timeout = 300, retries = 3 }

The library includes robust error handling:
- Configuration validation with detailed error messages
- MQTT connection error handling with automatic reconnection
- Job processing errors with error tracking and reporting
- TOML parsing errors with safe fallback handling
Our comprehensive test suite ensures reliability and maintainability:
# Run all tests with coverage report
make test
# Run only unit tests (fast)
make test-unit
# Run integration tests
make test-integration
# Generate detailed coverage report
make coverage
# Code quality and security checks
make lint
make security
# Quick development check
make quick

- Line Coverage: 85% (target: 80%+)
- Branch Coverage: 82%
- Function Coverage: 95%
View detailed coverage: open htmlcov/index.html
- Unit Tests: ~2 seconds
- Integration Tests: ~8 seconds
- Full Suite: ~12 seconds
- EventListener: Main MQTT event listener with job tracking
- EventListenerConfig: Configuration dataclass for all settings
- SafeConfigParser: Safe TOML configuration parser
- JobInfo: Job information and status tracking
- ReturnType: Return type for processed messages
- JobStatus: Job execution status (PENDING, RUNNING, COMPLETED, FAILED, DUPLICATE)
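
The three `duplicate_action` values ("skip", "reprocess", "error") could plausibly interact with job statuses along the lines of the sketch below. Everything here is hypothetical and written for illustration: `Status` only mirrors the names listed for `JobStatus`, and `register_job` and `DuplicateJobError` are not part of the library. How the library itself records the DUPLICATE status is not specified here, so the sketch leaves that status unused.

```python
from enum import Enum


class Status(Enum):
    """Hypothetical mirror of the JobStatus values listed above."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    DUPLICATE = "duplicate"


class DuplicateJobError(Exception):
    """Raised when duplicate_action is "error" and a job ID repeats."""


def register_job(jobs: dict[str, Status], job_id: str, duplicate_action: str) -> bool:
    """Record job_id and return True if the caller should process it."""
    if job_id not in jobs:
        jobs[job_id] = Status.PENDING
        return True
    if duplicate_action == "skip":
        # Leave the earlier record untouched and do not process again.
        return False
    if duplicate_action == "reprocess":
        # Reset the record so the job runs a second time.
        jobs[job_id] = Status.PENDING
        return True
    if duplicate_action == "error":
        raise DuplicateJobError(f"job {job_id!r} was already received")
    raise ValueError(f"unknown duplicate_action: {duplicate_action!r}")
```

Keeping the decision in one function means the message handler only needs the boolean result, and an unknown `duplicate_action` fails loudly instead of being silently ignored.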
For internal development within the organization:
- Clone the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Make your changes and test thoroughly
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request for review
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Abed Hameed (@ed-00)
- Updated to use TOML for all published messages.
- Updated versioning and release process.
- Initial internal release
- MQTT event listening with job tracking
- TOML message processing
- Comprehensive configuration system
- Error handling and logging
- Test suite with coverage reporting
To create a new version for internal distribution:
- Update the version in pyproject.toml and Listener/__init__.py
- Update this README with release notes
- Commit and tag the release:
    git tag v1.0.3
    git push origin v1.0.3
- Build the package using the provided script:
    ./scripts/build_package.sh
  Or manually:
    python -m build
- Distribute the wheel file from the dist/ directory or instruct users to install from the git tag