Quick Start
Installation
pip (recommended)
pip install dataforge-sdk==4.2.1
conda
conda install -c dataforge dataforge-sdk==4.2.1
Basic Usage
from dataforge import Client, Pipeline
from dataforge.transforms import Filter, Aggregate

# Initialize client with your API key
client = Client(api_key="YOUR_API_KEY")

# Create a pipeline
pipeline = Pipeline(name="user-behavior-analysis")

# Add transformation steps
pipeline.add_step(
    Filter(condition="event_type IN ('click', 'view', 'purchase')")
)
pipeline.add_step(
    Aggregate(
        group_by=["user_id", "product_category"],
        metrics=[
            {"name": "total_spend", "expression": "SUM(purchase_amount)"},
            {"name": "engagement_score", "expression": "COUNT(*) * 0.5"},
        ],
        window="1h",
    )
)

# Deploy the pipeline
deployment = client.deploy(pipeline)
print(f"Pipeline deployed with ID: {deployment.id}")
Authentication
The DataForge SDK supports three authentication methods. Choose the one that matches your security requirements:
API Key (Standard)
from dataforge import Client

client = Client(api_key="YOUR_API_KEY")

OAuth 2.0
from dataforge import Client
from dataforge.auth import OAuthCredentials

credentials = OAuthCredentials(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
    auth_url="https://auth.yourdomain.com/oauth/token"
)
client = Client(oauth=credentials)

Service Account
from dataforge import Client
from dataforge.auth import ServiceAccount

service_account = ServiceAccount.from_json_file("path/to/service-account.json")
client = Client(service_account=service_account)
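
The placeholders above are fine for a first run, but keys usually belong outside source code. A minimal sketch of one way to do that, assuming the key is stored in an environment variable named DATAFORGE_API_KEY. Both that name and the load_api_key helper are hypothetical; this guide does not say the SDK reads any environment variable on its own.

```python
import os


def load_api_key(env_var: str = "DATAFORGE_API_KEY") -> str:
    """Return the API key from the environment, or fail with a clear message.

    The default variable name is an assumption made for this example.
    """
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(
            f"Set the {env_var} environment variable before creating a client"
        )
    return key


# Usage with the client shown above (commented out so the sketch runs
# without the SDK installed):
# client = Client(api_key=load_api_key())
```

Failing early with a named variable tends to be easier to debug than an authentication error returned later by the service.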
Error Handling
The SDK raises a distinct exception for each class of failure, so you can handle authentication problems, rate limiting, missing resources, invalid requests, and outages separately:
import time

from dataforge import Client
from dataforge.exceptions import (
    AuthenticationError,
    RateLimitExceeded,
    ResourceNotFound,
    ValidationError,
    ServiceUnavailable,
)

try:
    client = Client(api_key="INVALID_KEY")
    result = client.query("SELECT * FROM events")
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
    # Attempt to refresh credentials or notify administrators
except RateLimitExceeded as e:
    print(f"Rate limit hit: {e}")
    # Wait for the interval suggested by the server before retrying
    time.sleep(e.retry_after)
except ResourceNotFound as e:
    print(f"Resource not found: {e}")
    # Create the resource or check configuration
except ValidationError as e:
    print(f"Invalid request: {e}")
    # Log validation details for debugging
    for error in e.validation_errors:
        print(f"- {error['field']}: {error['message']}")
except ServiceUnavailable as e:
    print(f"Service unavailable: {e}")
    # Switch to fallback service or local processing mode
except Exception as e:
    print(f"Unexpected error: {e}")
    # Generic fallback handling
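
The handler above sleeps once for the rate-limit interval. For repeated failures, exponential backoff is a common pattern. The helper below is a generic sketch, not an SDK feature: call_with_backoff and its parameters are hypothetical names, and it only assumes that an exception may carry a retry_after attribute, as RateLimitExceeded does in the example above.

```python
import random
import time


def call_with_backoff(func, retryable=(Exception,), max_attempts=5,
                      base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Call func(), retrying on the given exceptions with exponential backoff.

    The wait doubles after each failed attempt (base_delay, 2x, 4x, ...)
    and is capped at max_delay. If the exception has a retry_after
    attribute, the wait is never shorter than that value.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except retryable as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Prefer the server's hint when the exception carries one.
            retry_after = getattr(exc, "retry_after", None)
            if retry_after is not None:
                delay = max(delay, float(retry_after))
            # Add up to 10% jitter so many clients do not retry in lockstep.
            sleep(delay + random.uniform(0, delay * 0.1))
```

With the SDK installed, a call might look like call_with_backoff(lambda: client.query("SELECT * FROM events"), retryable=(RateLimitExceeded, ServiceUnavailable)). The sleep parameter exists so the waiting can be replaced in tests.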
Version Compatibility Matrix
DataForge SDK Version | Python Version | Key Features | Embedded System Compatibility |
---|---|---|---|
4.2.1 (Current) | 3.7+ | Real-time analytics, Rust extensions | Raspberry Pi 3+, NVIDIA Jetson, x86_64 Linux |
4.1.0 | 3.7+ | Schema validation, Query optimizer | Raspberry Pi 3+, NVIDIA Jetson, x86_64 Linux |
4.0.0 | 3.7+ | Core functionality | Raspberry Pi 3+, NVIDIA Jetson, x86_64 Linux |
3.5.2 (Legacy) | 3.6+ | Batch processing only | Raspberry Pi 2+, BeagleBone, x86 Linux |
3.0.0 (Legacy) | 3.5+ | Limited functionality | Basic x86 systems only |
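
The Python column of the matrix can be checked at start-up before importing the SDK. A minimal sketch, with the minimum versions copied from the table above. The MIN_PYTHON mapping and python_is_supported function are illustrative names, not part of the SDK.

```python
import sys

# Minimum Python version per SDK release, copied from the matrix above.
MIN_PYTHON = {
    "4.2.1": (3, 7),
    "4.1.0": (3, 7),
    "4.0.0": (3, 7),
    "3.5.2": (3, 6),
    "3.0.0": (3, 5),
}


def python_is_supported(sdk_version, python_version=None):
    """Return True if python_version meets the minimum for sdk_version.

    python_version defaults to the running interpreter's (major, minor).
    """
    if python_version is None:
        python_version = sys.version_info[:2]
    try:
        required = MIN_PYTHON[sdk_version]
    except KeyError:
        raise ValueError(f"Unknown SDK version: {sdk_version}") from None
    # Tuples compare element by element, so (3, 12) >= (3, 7) holds.
    return tuple(python_version[:2]) >= required
```

Comparing version tuples rather than strings avoids the classic mistake where "3.10" sorts before "3.7".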
Memory Optimization for Embedded Systems
When deploying on resource-constrained devices, cap memory, cache, and thread usage when you create the client:
from dataforge import Client, ResourceConfiguration

# Configure resource limits
resource_config = ResourceConfiguration(
    max_memory_mb=128,
    max_cache_size_mb=32,
    threading_mode="limited",
    max_threads=2,
    batch_size=50
)

# Initialize with resource constraints
client = Client(
    api_key="YOUR_API_KEY",
    resources=resource_config,
    offline_mode=True,  # Enable offline processing capabilities
    compression=True    # Reduce network bandwidth
)
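
A batch_size of 50 suggests that records are handled in small groups rather than all at once. How the SDK batches internally is not described in this guide, so the sketch below only illustrates the general idea with a hypothetical helper that slices any iterable into fixed-size chunks without loading it fully into memory.

```python
from itertools import islice


def batched(records, batch_size=50):
    """Yield lists of at most batch_size items from any iterable.

    Only one chunk is held in memory at a time, which is the point of
    batching on a memory-constrained device.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk
```

Feeding 120 records through batched(..., batch_size=50) produces chunks of 50, 50, and 20 items.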
Common Authentication Issues
Error Code | Description | Resolution |
---|---|---|
AUTH_001 | Invalid API key | Verify the API key in your dashboard |
AUTH_002 | Expired API key | Generate a new API key in the console |
AUTH_003 | Insufficient permissions | Update API key permissions in IAM settings |
AUTH_004 | IP restriction | Add your current IP to the allowlist |
AUTH_005 | Rate limit exceeded | Implement request throttling or upgrade plan |
AUTH_006 | Account suspended | Contact support at support@dataforge.io |
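
To surface these resolutions in logs or support tooling, the table can be kept as a lookup. This is a sketch: the resolution_for helper is hypothetical, and how the SDK exposes the error code on an exception is not specified in this guide, so the function simply takes the code as a string.

```python
# Descriptions and resolutions copied from the table above.
AUTH_RESOLUTIONS = {
    "AUTH_001": ("Invalid API key", "Verify the API key in your dashboard"),
    "AUTH_002": ("Expired API key", "Generate a new API key in the console"),
    "AUTH_003": ("Insufficient permissions", "Update API key permissions in IAM settings"),
    "AUTH_004": ("IP restriction", "Add your current IP to the allowlist"),
    "AUTH_005": ("Rate limit exceeded", "Implement request throttling or upgrade plan"),
    "AUTH_006": ("Account suspended", "Contact support at support@dataforge.io"),
}


def resolution_for(code):
    """Return 'description: resolution' for a code, or a note if it is unknown."""
    entry = AUTH_RESOLUTIONS.get(code.strip().upper())
    if entry is None:
        return f"{code}: no documented resolution"
    description, resolution = entry
    return f"{description}: {resolution}"
```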
Performance Benchmarks
The following benchmarks were run on common embedded and low-resource platforms:
Platform | Events/Second | Memory Usage | CPU Usage |
---|---|---|---|
Raspberry Pi 4 (4GB) | 12,500 | 86MB | 42% |
NVIDIA Jetson Nano | 18,400 | 112MB | 38% |
Intel NUC (i3) | 47,800 | 145MB | 22% |
AWS t2.micro | 15,300 | 124MB | 68% |
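
Raw throughput alone hides how hard each device is working. One way to compare the rows is to divide events per second by CPU usage and by memory. The sketch below does that arithmetic on the figures from the table; the two efficiency ratios are an illustration and not part of the published benchmark.

```python
# (events/second, memory in MB, CPU usage in percent), copied from the table above.
BENCHMARKS = {
    "Raspberry Pi 4 (4GB)": (12_500, 86, 42),
    "NVIDIA Jetson Nano": (18_400, 112, 38),
    "Intel NUC (i3)": (47_800, 145, 22),
    "AWS t2.micro": (15_300, 124, 68),
}


def efficiency(platform):
    """Return (events/sec per CPU percent, events/sec per MB) for a platform."""
    events, memory_mb, cpu_pct = BENCHMARKS[platform]
    return events / cpu_pct, events / memory_mb


for name in BENCHMARKS:
    per_cpu, per_mb = efficiency(name)
    print(f"{name:<22} {per_cpu:8.1f} ev/s per CPU%  {per_mb:7.1f} ev/s per MB")
```

By this measure the Intel NUC is far ahead (about 2,173 events per second per CPU percent), while the t2.micro is lowest at 225, even though its raw throughput is higher than the Raspberry Pi's.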
Advanced Usage: Custom Transformations
For specialized data processing needs, you can implement custom transformations:
from dataforge import Client, Pipeline
from dataforge.transforms import CustomTransform


class SentimentAnalysis(CustomTransform):
    def __init__(self, text_field, score_field):
        self.text_field = text_field
        self.score_field = score_field

    def transform(self, data_frame):
        # Import here to minimize memory footprint
        import nltk
        from nltk.sentiment import SentimentIntensityAnalyzer

        # Download the VADER lexicon only if it is not already installed
        try:
            nltk.data.find('sentiment/vader_lexicon.zip')
        except LookupError:
            nltk.download('vader_lexicon', quiet=True)

        # Create analyzer
        sia = SentimentIntensityAnalyzer()

        # Score one text; empty or missing values get a neutral 0.0
        def get_sentiment(text):
            if not text:
                return 0.0
            return sia.polarity_scores(text)['compound']

        # Add sentiment score column
        data_frame[self.score_field] = data_frame[self.text_field].apply(get_sentiment)
        return data_frame


# Use the custom transform
client = Client(api_key="YOUR_API_KEY")
pipeline = Pipeline(name="sentiment-pipeline")
pipeline.add_step(SentimentAnalysis(text_field="user_comment", score_field="sentiment_score"))
client.deploy(pipeline)