The core engine for Salesforce AI agents. A comprehensive, production-ready Python library for Salesforce integration.
To democratize Salesforce integration for the AI era. We believe that connecting AI agents to business data should be standard, open, and accessible to every developer, not just large enterprises. kinetic-core is the open-source foundation of this vision.
Part of the KineticMCP ecosystem - AI-powered Salesforce integration tools by Antonio Trento
> [!IMPORTANT]
> **Legal Disclaimer:** This project is an independent open-source library and is not affiliated with, sponsored by, or endorsed by Salesforce, Inc. "Salesforce" is a trademark of Salesforce, Inc.

> [!WARNING]
> **Deprecation Notice:** The package `salesforce-toolkit` is deprecated. Please use `kinetic-core` instead.
**Multiple Authentication Methods**
- JWT Bearer Flow (recommended for production)
- OAuth 2.0 Password Flow
- Environment-based configuration
**Complete CRUD Operations**
- Works with any Salesforce object (standard or custom)
- Create, Read, Update, Delete, Upsert
- Bulk operations via Composite API
- Query with automatic pagination
**Bulk API v2 Support** (NEW in v2.0.0)
- Process millions of records efficiently
- Full support for all Bulk API v2 operations
- Smart exponential backoff polling
- Progress tracking with callbacks
- Comprehensive per-record error reporting
- Async job processing
**Metadata API Support** (NEW in v2.1.0)
- Configuration as Code: Manage Objects and Fields via Python
- Retrieve: Backup or analyze existing org metadata
- Deploy: Create custom objects and fields programmatically
- Compare: Diff local schema against the org
- Templates: Standard business object templates included
**Flexible Field Mapping**
- Simple field renaming
- Value transformations with custom functions
- Default values
- Nested field access (dot notation)
- Conditional mapping
**ETL Pipeline Framework**
- Configuration-driven sync pipelines
- Multiple sync modes (INSERT, UPDATE, UPSERT, DELETE)
- Batch processing
- Progress tracking with callbacks
- Comprehensive error handling
**Production-Ready Logging**
- File and console output
- Automatic log rotation
- Colored console output
- Contextual logging
- Configurable log levels
**Command-Line Interface**
- Query, create, update, delete from terminal
- Run sync pipelines from YAML config
- Describe Salesforce objects
- Test authentication
```bash
pip install kinetic-core
```

Or install from source:

```bash
git clone https://github.com/yourusername/kinetic-core.git
cd kinetic-core
pip install -e .
```

Optional extras:

```bash
# Database support
pip install kinetic-core[database]

# Data manipulation (pandas, numpy)
pip install kinetic-core[data]

# Development tools
pip install kinetic-core[dev]
```

Create a `.env` file in your project root:
```
# JWT Authentication (Recommended)
SF_CLIENT_ID=3MVG9...
SF_USERNAME=user@example.com.sandbox
SF_PRIVATE_KEY_PATH=/path/to/server.key
SF_LOGIN_URL=https://test.salesforce.com

# Logging
LOG_DIR=./logs
LOG_LEVEL=INFO

# Optional: Salesforce API Version
SF_API_VERSION=v62.0  # Defaults to v62.0 if not set
```

```python
from kinetic_core import JWTAuthenticator, SalesforceClient

# Authenticate
auth = JWTAuthenticator.from_env()
session = auth.authenticate()

# Create client
client = SalesforceClient(session)

# Create a record
account_id = client.create("Account", {
    "Name": "ACME Corporation",
    "Industry": "Technology"
})

# Query records
accounts = client.query("SELECT Id, Name FROM Account LIMIT 10")

# Update a record
client.update("Account", account_id, {"Phone": "555-1234"})

# Delete a record
client.delete("Account", account_id)
```

```python
from kinetic_core import (
    JWTAuthenticator,
    SalesforceClient,
    FieldMapper,
    SyncPipeline,
    SyncMode
)

# Authenticate
auth = JWTAuthenticator.from_env()
session = auth.authenticate()
client = SalesforceClient(session)

# Define field mapping
mapper = FieldMapper({
    "customer_name": "Name",
    "customer_email": "Email",
    "industry_code": ("Industry", lambda x: x.title())  # Transform
})

# Create pipeline
pipeline = SyncPipeline(
    client=client,
    sobject="Account",
    mapper=mapper,
    mode=SyncMode.INSERT,
    batch_size=200
)

# Sync data
source_data = [
    {"customer_name": "ACME", "customer_email": "info@acme.com"},
    {"customer_name": "Globex", "customer_email": "contact@globex.com"}
]
result = pipeline.sync(source_data)
print(f"Synced {result.success_count}/{result.total_records} records")
```

✨ **NEW in v2.0.0** - Process millions of records efficiently:
```python
from kinetic_core import JWTAuthenticator, SalesforceClient

# Authenticate
auth = JWTAuthenticator.from_env()
session = auth.authenticate()
client = SalesforceClient(session)

# Bulk insert 10,000 records
records = [
    {"Name": f"Account {i}", "Industry": "Technology"}
    for i in range(10000)
]
result = client.bulk.insert("Account", records)
print(f"✅ Success: {result.success_count}")
print(f"❌ Failed: {result.failed_count}")

# Bulk query for large exports
query = "SELECT Id, Name FROM Account WHERE CreatedDate = THIS_YEAR"
result = client.bulk.query(query)
print(f"Retrieved {result.record_count} records")

# Bulk upsert with external ID
updates = [
    {"External_Key__c": "EXT001", "Name": "Updated Name 1"},
    {"External_Key__c": "EXT002", "Name": "Updated Name 2"}
]
result = client.bulk.upsert("Account", updates, "External_Key__c")
```

Key features:

- Up to 150 million records per job
- Async processing with progress tracking
- Detailed per-record error reporting
- Smart retry and exponential backoff

Complete Bulk API Documentation →
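The "smart retry and exponential backoff" behavior above can be pictured with a small, generic sketch. This is an illustration only, not kinetic-core's internal code: the function name, parameters, and injectable `sleep` hook are invented for the example; the terminal states match Bulk API v2 job states.

```python
import time

def poll_with_backoff(check_status, base_delay=1.0, factor=2.0,
                      max_delay=30.0, timeout=120.0, sleep=time.sleep):
    """Poll check_status() until it returns a terminal Bulk API job state,
    doubling the wait between attempts up to max_delay."""
    delay = base_delay
    waited = 0.0
    while waited < timeout:
        state = check_status()
        if state in ("JobComplete", "Failed", "Aborted"):
            return state
        sleep(delay)
        waited += delay
        delay = min(delay * factor, max_delay)
    raise TimeoutError("job did not finish within timeout")
```

Injecting `sleep` keeps the sketch testable; in real use the default `time.sleep` applies and each retry waits 1s, 2s, 4s, and so on, capped at `max_delay`.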
✨ **NEW in v2.1.0** - Manage your Salesforce data model:

```python
from kinetic_core.metadata import CustomField, CustomObject

# 1. Create a new field
field = CustomField(
    sobject="Account",
    name="Loyalty_Tier__c",
    label="Loyalty Tier",
    type="Picklist",
    options=[
        {"fullName": "Silver", "default": True},
        {"fullName": "Gold"},
        {"fullName": "Platinum"}
    ]
)
client.metadata.deploy_field(field)

# 2. Retrieve org metadata
client.metadata.retrieve(
    component_types=["CustomObject"],
    output_dir="./schema_backup"
)
```

Complete Metadata API Documentation →
```bash
# Test authentication
sf-toolkit auth --method jwt

# Query Salesforce
sf-toolkit query "SELECT Id, Name FROM Account LIMIT 10"

# Create a record
sf-toolkit create Account --data '{"Name": "ACME Corp"}'

# Run a sync pipeline
sf-toolkit sync --config sync_config.yaml

# Describe an object
sf-toolkit describe Account --fields
```

- Complete API Reference - Full API documentation
- Bulk Quick Start - Get started in 5 minutes
- Practical Examples - Real-world use cases
- User Guide - Complete usage documentation
- Installation - Installation guide
- Quick Start - Getting started guide
- Salesforce Setup - Configure Salesforce
- Testing Guide - Test your integration
- Docker Guide - Containerized deployment
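The "query with automatic pagination" mentioned in the feature list follows Salesforce's REST query pattern: each response page may carry a `nextRecordsUrl` until `done` is true. The sketch below illustrates that loop generically with an injected `fetch` function; it is not kinetic-core's internal code, and `fetch` here stands in for any callable returning a parsed JSON response:

```python
def query_all(fetch, initial_url):
    """Collect records across pages, following nextRecordsUrl
    until the response reports done=True."""
    records = []
    url = initial_url
    while url:
        page = fetch(url)  # returns a parsed JSON dict for that page
        records.extend(page.get("records", []))
        url = None if page.get("done", True) else page.get("nextRecordsUrl")
    return records
```

In a real client, `fetch` would be an authenticated HTTP GET against the instance URL; injecting it keeps the pagination logic separate and easy to test.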
```python
from kinetic_core import JWTAuthenticator

# From environment variables
auth = JWTAuthenticator.from_env()

# Or manual configuration
auth = JWTAuthenticator(
    client_id="3MVG9...",
    username="user@example.com",
    private_key_path="/path/to/server.key",
    login_url="https://test.salesforce.com"
)

session = auth.authenticate()
```

```python
from kinetic_core import OAuthAuthenticator

# From environment variables
auth = OAuthAuthenticator.from_env()

# Or manual configuration
auth = OAuthAuthenticator(
    client_id="3MVG9...",
    client_secret="1234567890ABCDEF",
    username="user@example.com",
    password="your_password",
    security_token="ABC123",
    login_url="https://login.salesforce.com"
)

session = auth.authenticate()
```

```python
# Single record
account_id = client.create("Account", {
    "Name": "ACME Corp",
    "Industry": "Technology"
})

# Batch create (up to 200 records)
results = client.create_batch("Contact", [
    {"FirstName": "John", "LastName": "Doe"},
    {"FirstName": "Jane", "LastName": "Smith"}
])
```

```python
# SOQL query with automatic pagination
accounts = client.query(
    "SELECT Id, Name, Industry FROM Account WHERE Industry = 'Technology'"
)

# Query first result
account = client.query_one(
    "SELECT Id, Name FROM Account WHERE Name = 'ACME Corp'"
)

# Get by ID
account = client.get("Account", "001XXXXXXXXXXXX")

# Count records
total = client.count("Account")
tech_count = client.count("Account", "Industry = 'Technology'")
```

```python
# Update by ID
client.update("Account", "001XXXXXXXXXXXX", {
    "Phone": "555-9999",
    "Industry": "Manufacturing"
})

# Upsert (requires External ID field)
account_id = client.upsert(
    "Account",
    "External_Key__c",
    "EXT-12345",
    {"Name": "ACME Corp", "Industry": "Tech"}
)
```

```python
client.delete("Account", "001XXXXXXXXXXXX")
```

```python
from kinetic_core import FieldMapper

mapper = FieldMapper({
    "first_name": "FirstName",
    "last_name": "LastName",
    "email": "Email"
})

source = {"first_name": "John", "last_name": "Doe", "email": "john@example.com"}
target = mapper.transform(source)
# Result: {"FirstName": "John", "LastName": "Doe", "Email": "john@example.com"}
```

```python
from datetime import datetime

mapper = FieldMapper({
    # Simple rename
    "customer_name": "Name",

    # With transformation
    "email": ("Email", lambda x: x.lower()),

    # With default value
    "status": ("Status__c", None, "Active"),

    # With both transformation and default
    "created_at": (
        "CreatedDate",
        lambda x: x.strftime("%Y-%m-%d") if x else None,
        datetime.now().strftime("%Y-%m-%d")
    ),

    # Nested field access
    "address.city": "BillingCity",
    "address.state": "BillingState"
})
```

```python
# Available via YAML configuration
transforms = [
    "lowercase",     # Convert to lowercase
    "uppercase",     # Convert to uppercase
    "strip",         # Strip whitespace
    "int",           # Convert to integer
    "float",         # Convert to float
    "bool",          # Convert to boolean
    "date_iso",      # Format date as YYYY-MM-DD
    "datetime_iso"   # Format datetime as ISO 8601
]
```

```python
from kinetic_core import SyncPipeline, SyncMode

pipeline = SyncPipeline(
    client=client,
    sobject="Account",
    mapper=mapper,
    mode=SyncMode.INSERT,
    batch_size=200,
    stop_on_error=False
)

result = pipeline.sync(source_data)
```

```python
def on_record_success(record, salesforce_id):
    print(f"✅ Synced: {record['name']} -> {salesforce_id}")

def on_record_error(record, error):
    print(f"❌ Failed: {record['name']} - {error}")

def on_batch_complete(batch_num, total_batches, result):
    print(f"Batch {batch_num}/{total_batches} done")

pipeline = SyncPipeline(
    client=client,
    sobject="Account",
    mapper=mapper,
    mode=SyncMode.INSERT,
    callbacks={
        "on_record_success": on_record_success,
        "on_record_error": on_record_error,
        "on_batch_complete": on_batch_complete
    }
)
```

```yaml
# sync_config.yaml
source:
  type: json
  path: data/accounts.json

pipeline:
  sobject: Account
  mode: upsert
  external_id_field: External_Key__c
  batch_size: 200
  mapping:
    customer_name: Name
    customer_email: Email
    industry_code:
      target: Industry
      transform: uppercase
```

```python
import yaml
from kinetic_core import SyncPipeline

with open("sync_config.yaml") as f:
    config = yaml.safe_load(f)

pipeline = SyncPipeline.from_config(config["pipeline"], client)
result = pipeline.sync(source_data)
```

```python
import logging

from kinetic_core.logging import setup_logger

logger = setup_logger(
    name="my_app",
    log_dir="./logs",
    log_level=logging.INFO,
    console_colors=True
)

logger.info("Application started")
logger.error("An error occurred", exc_info=True)
```

```python
from kinetic_core.logging import ContextLogger, setup_logger

base_logger = setup_logger("my_app")
context_logger = ContextLogger(base_logger, context={
    "transaction_id": "TX-12345",
    "user_id": "user@example.com"
})

context_logger.info("Processing record")
# Logs: "Processing record [transaction_id=TX-12345, user_id=user@example.com]"
```

```python
from datetime import datetime

from kinetic_core.utils import (
    sanitize_soql,
    build_soql_query,
    validate_salesforce_id,
    format_datetime_for_sf,
    generate_external_id,
    batch_records
)

# Sanitize SOQL
safe_name = sanitize_soql("O'Brien & Associates")

# Build SOQL query
query = build_soql_query(
    sobject="Account",
    fields=["Id", "Name", "Industry"],
    where="Industry = 'Technology'",
    limit=100
)

# Validate Salesforce ID
if validate_salesforce_id("001XXXXXXXXXXXXXXX"):
    print("Valid ID")

# Format datetime
sf_datetime = format_datetime_for_sf(datetime.now())

# Generate external ID
ext_id = generate_external_id("CUST", timestamp=True)
# Returns: "CUST-20251205-103000-abc123"

# Batch records
batches = batch_records(records, batch_size=200)
```

The `examples/` directory contains comprehensive examples:
- `01_basic_authentication.py` - Authentication methods
- `02_crud_operations.py` - CRUD operations
- `03_data_sync_pipeline.py` - Data synchronization

Run an example:

```bash
cd examples
python 01_basic_authentication.py
```

Copy `config/.env.example` to `.env` and configure:

```
# Salesforce
SF_CLIENT_ID=your_consumer_key
SF_USERNAME=user@example.com
SF_PRIVATE_KEY_PATH=/path/to/server.key
SF_LOGIN_URL=https://test.salesforce.com

# Logging
LOG_DIR=./logs
LOG_LEVEL=INFO
```

See `config/sync_config_example.yaml` for pipeline configuration.
```bash
# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=kinetic_core --cov-report=html

# Run linter
flake8 kinetic_core/

# Run type checker
mypy kinetic_core/

# Format code
black kinetic_core/
```

Core classes:

- `SalesforceSession` - Authenticated session object
- `SalesforceClient` - Main API client for CRUD operations
- `JWTAuthenticator` - JWT Bearer Flow authentication
- `OAuthAuthenticator` - OAuth Password Flow authentication
- `FieldMapper` - Field mapping and transformation engine
- `SyncPipeline` - ETL pipeline for data synchronization

Modules:

- `kinetic_core.auth` - Authentication providers
- `kinetic_core.core` - Core client and session management
- `kinetic_core.mapping` - Field mapping engine
- `kinetic_core.pipeline` - Sync pipeline framework
- `kinetic_core.logging` - Logging system
- `kinetic_core.utils` - Utility functions
- Support for Bulk API 2.0 (async bulk operations) - shipped in v2.0.0
- Metadata API support (deploy/retrieve) - shipped in v2.1.0
- Streaming API (PushTopic, Generic Streaming)
- Built-in retry mechanism with exponential backoff
- Dry-run mode for pipelines
- Performance monitoring and metrics
- Integration with popular ORMs (SQLAlchemy, Django)
Contributions are welcome! Please follow these steps:

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

```bash
git clone https://github.com/yourusername/kinetic-core.git
cd kinetic-core
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -e ".[dev]"
```

Kinetic Core is the foundational library powering KineticMCP, an AI-powered Salesforce integration platform.
KineticMCP combines the capabilities of Kinetic Core with the Model Context Protocol (MCP) to enable intelligent automation, natural language interactions, and advanced data management for Salesforce.
- Website: kineticmcp.com
- AI Integration: Natural language Salesforce operations
- MCP Protocol: Seamless integration with AI assistants
- Advanced Analytics: AI-driven insights and reporting
Built and maintained by Antonio Trento - connecting Salesforce with the future of AI.
This project is licensed under the MIT License - see the LICENSE file for details.
Copyright (c) 2025 Antonio Trento. All rights reserved.
See COPYRIGHT for detailed attribution and trademark information.
Antonio Trento
- GitHub: @antoniotrento
- LinkedIn: Antonio Trento
- Portfolio: Salesforce Toolkit Case Study
- Inspired by Simple Salesforce
- Built with Requests
- Powered by PyJWT
Made with ❤️ by Antonio Trento