- Configuration Module
- Data Loading Module
- Preprocessing Module
- Feature Extraction Module
- Classification Module
- Evaluation Module
- Augmentation Module
- API Loader Module
- Experiment Scripts
File: config.py
Manages all pipeline configuration through environment variables, supporting 50+ parameters across 8 categories. Implements singleton pattern for global config access.
Configuration container with 50+ fields organized by category.
Key Attributes:
- DATA_SOURCE (str): Data source type, one of "mock", "kaggle", or "huggingface" (default: "mock")
- NUM_SAMPLES (Optional[int]): Maximum samples to use (default: None = all)
- TEST_MODE (bool): Quick test mode with reduced samples (default: False)
- CROP (bool): Apply circular cropping (default: True)
- RESIZE_SIZE (int): Target image dimension (default: 256)
- CLAHE (bool): Apply CLAHE enhancement (default: True)
- CHANNEL (str): Channel selection, "green" or "rgb" (default: "green")
- EXTRACT_COLOR (bool): Enable color features (default: True)
- EXTRACT_TEXTURE_GLCM (bool): Enable GLCM texture (default: True)
- EXTRACT_TEXTURE_LBP (bool): Enable LBP histogram (default: True)
- USE_AUGMENTATION (bool): Enable data augmentation (default: False)
- RF_N_ESTIMATORS (int): RF tree count (default: 200)
- RF_MAX_DEPTH (Optional[int]): RF max depth (default: None)
- VERBOSE (bool): Verbose output (default: True)
@classmethod
def from_env(cls) -> "Config":
    """Load configuration from environment variables."""
Purpose: Reads every DR_*-prefixed environment variable and creates a Config instance.
Logic:
- Iterate through all fields
- Check for corresponding DR_* environment variable
- Parse type: bool (true/false), int, float, string
- Use default if env var not set
- Return Config instance with all values
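The steps above can be sketched roughly as follows. This is a minimal illustration, not the module's actual code: it assumes Config is a dataclass, shows only four of the documented fields, and the `_parse` helper is a hypothetical name introduced here.

```python
import os
from dataclasses import dataclass, fields
from typing import Optional


def _parse(raw: str, target_type):
    """Hypothetical helper: convert an env-var string to the field's annotated type."""
    if target_type is bool:
        return raw.strip().lower() == "true"
    if target_type in (int, Optional[int]):
        return int(raw)
    if target_type is float:
        return float(raw)
    return raw  # plain strings pass through unchanged


@dataclass
class Config:
    # A few of the documented fields; the real class is described as having 50+.
    DATA_SOURCE: str = "mock"
    NUM_SAMPLES: Optional[int] = None
    CROP: bool = True
    RESIZE_SIZE: int = 256

    @classmethod
    def from_env(cls) -> "Config":
        overrides = {}
        for f in fields(cls):
            raw = os.environ.get(f"DR_{f.name}")
            if raw is not None:  # unset variables keep the dataclass default
                overrides[f.name] = _parse(raw, f.type)
        return cls(**overrides)
```

Driving the type conversion from the field annotations keeps the parsing rules in one place, so adding a field does not require touching `from_env()`.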
Example:
os.environ['DR_DATA_SOURCE'] = 'kaggle'
os.environ['DR_NUM_SAMPLES'] = '500'
cfg = Config.from_env()
# cfg.DATA_SOURCE = 'kaggle'
# cfg.NUM_SAMPLES = 500
def to_dict(self) -> dict:
    """Convert config to dictionary."""
Purpose: Serialize Config to a dict for JSON export or programmatic access.
Returns: Dictionary with all configuration key-value pairs.
def print_config(self):
    """Print configuration."""
Purpose: Pretty-print all configuration values to the console.
Output:
======================================================================
DR CLASSIFICATION PIPELINE CONFIGURATION
======================================================================
DATA_SOURCE = mock
NUM_SAMPLES = (None)
CROP = True
...
======================================================================
def get_config() -> Config:
    """Get global config instance."""
Purpose: Lazy-load singleton Config from environment variables.
Behavior:
- First call: Creates Config instance via Config.from_env()
- Subsequent calls: Returns cached instance
- Ensures consistent config throughout pipeline execution
Returns: Global Config instance
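A lazy module-level singleton of this kind is commonly written as below. This is a sketch under assumptions: the `_config` variable name is hypothetical and the `Config` class here is a stand-in, so only the caching behavior is illustrated.

```python
from typing import Optional


class Config:
    """Stand-in for the real Config; only from_env() matters for this sketch."""

    @classmethod
    def from_env(cls) -> "Config":
        return cls()


_config: Optional[Config] = None  # module-level cache (hypothetical name)


def get_config() -> Config:
    """Create the Config on first use, then keep returning the same object."""
    global _config
    if _config is None:
        _config = Config.from_env()
    return _config


def reset_config() -> None:
    """Drop the cached instance so the next get_config() re-reads the environment."""
    global _config
    _config = None
```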
def reset_config():
    """Reset global config (useful for testing)."""
Purpose: Clear the cached Config instance so tests can load a different configuration.
Usage:
reset_config()
get_config() # Reloads from env variables
File: data/loader.py
Handles loading images and labels from local filesystem, splitting data, and managing dataset information.
def load_data(images_dir: str, csv_path: str) -> Tuple[Dict[str, np.ndarray], pd.DataFrame]:
    """Load images and labels from disk."""
Parameters:
- images_dir (str): Directory containing PNG images
- csv_path (str): Path to CSV with columns: id_code, diagnosis
Returns:
- images (dict): {id_code: numpy_array} mapping
- df (DataFrame): Labels with id_code and diagnosis columns
Logic:
- Read CSV file
- For each row:
  - Construct image path: {images_dir}/{id_code}.png
  - Read image with cv2.imread() (BGR format)
  - Store in dictionary
- Return both images dict and DataFrame
Example:
images, df = load_data('train_images', 'train.csv')
# images['0a0c']: numpy array (512, 512, 3)
# df.shape: (3662, 2) with columns [id_code, diagnosis]
def create_train_val_split(df: pd.DataFrame, val_size: float = 0.15, seed: int = 42) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Create stratified train/validation split."""
Parameters:
- df (DataFrame): Input dataframe with diagnosis column
- val_size (float): Validation set ratio (0.15 = 15%)
- seed (int): Random seed for reproducibility
Returns:
- train_df (DataFrame): Training set (~85% of data)
- val_df (DataFrame): Validation set (~15% of data)
Logic:
- Use sklearn's train_test_split() with stratification
- Stratify on diagnosis column to preserve class distribution
- Set random_state for reproducibility
- Return split dataframes
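To show what stratification does, here is a hand-rolled NumPy sketch. It is not the sklearn call the module uses; the function name `stratified_indices` is hypothetical, and per-class rounding may differ slightly from train_test_split().

```python
import numpy as np


def stratified_indices(labels, val_size=0.15, seed=42):
    """Split sample indices so each class sends ~val_size of its members to validation."""
    labels = np.asarray(labels)
    rng = np.random.default_rng(seed)
    train_idx, val_idx = [], []
    for cls in np.unique(labels):
        # Shuffle the indices of this class only, then cut off its validation share.
        idx = rng.permutation(np.flatnonzero(labels == cls))
        n_val = int(round(len(idx) * val_size))
        val_idx.extend(idx[:n_val].tolist())
        train_idx.extend(idx[n_val:].tolist())
    return np.array(sorted(train_idx), dtype=int), np.array(sorted(val_idx), dtype=int)


# Imbalanced toy labels: 60 / 20 / 20 samples across three classes.
labels = np.array([0] * 60 + [1] * 20 + [2] * 20)
train_idx, val_idx = stratified_indices(labels, val_size=0.2)
val_counts = np.bincount(labels[val_idx])  # per-class counts in the validation set
```

Because the cut is made inside each class, the 3:1:1 class ratio of the toy labels is the same in both subsets.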
Example:
train_df, val_df = create_train_val_split(df, val_size=0.15)
# train_df: 3112 samples (85%)
# val_df: 550 samples (15%)
# Class distribution preserved in both sets
def get_class_distribution(df: pd.DataFrame) -> Dict[int, int]:
    """Get class distribution from dataframe."""
Parameters:
- df (DataFrame): Input dataframe with diagnosis column
Returns: Dictionary {class_label: count}
Example:
dist = get_class_distribution(df)
# {0: 1485, 1: 590, 2: 690, 3: 430, 4: 370}
def print_dataset_info(images: dict, df: pd.DataFrame):
    """Print dataset statistics."""
Purpose: Print dataset overview including:
- Total samples
- Image dimensions
- Class distribution
- Class percentages
File: pipeline/preprocessing.py
Transforms raw retinal images into normalized, contrast-enhanced 256×256 images (a single green channel by default) using circular cropping, channel selection, CLAHE contrast enhancement, and resizing.
Configuration for preprocessing pipeline.
Attributes:
- crop (bool): Apply circular cropping (default: True)
- resize_size (int): Target size for resizing (default: 256)
- clahe (bool): Apply CLAHE (default: True)
- channel (str): 'green' or 'rgb' (default: 'green')
- clahe_clip_limit (float): CLAHE clip limit (default: 2.0)
- clahe_grid_size (int): CLAHE grid size (default: 8)
Main preprocessing engine.
Methods:
def __init__(self, config: PreprocessingConfig):
    """Initialize preprocessing pipeline."""
Parameters: config - PreprocessingConfig instance
Logic: Stores config for use in process() method
def process(self, image: np.ndarray) -> np.ndarray:
    """Apply preprocessing to image."""
Parameters:
- image (np.ndarray): Input BGR image from cv2.imread()
Returns: Processed single-channel image (256×256) or RGB if configured
Processing Pipeline:
- Circular Crop (if config.crop=True):
  - Find largest inscribed circle in image
  - Mask everything outside circle to black
  - Purpose: Remove scanner borders
- Channel Selection:
  - If 'green': Extract the green channel (index 1 in OpenCV's BGR order)
  - If 'rgb': Keep all three channels
  - Purpose: Green channel has best vessel/lesion contrast
- CLAHE Enhancement (if config.clahe=True):
  - Apply Contrast Limited Adaptive Histogram Equalization
  - Parameters: clip_limit, grid_size
  - Purpose: Enhance local contrast while preventing artifacts
- Resizing:
  - Resize to config.resize_size × config.resize_size
  - Use cv2.INTER_LINEAR interpolation
  - Purpose: Normalize input dimensions for feature extraction
Output Shape:
- If green: (256, 256) - 2D array
- If rgb: (256, 256, 3) - 3D array
Example:
config = PreprocessingConfig(crop=True, channel='green')
preprocessor = PreprocessingPipeline(config)
img = cv2.imread('image.png') # (512, 512, 3)
processed = preprocessor.process(img) # (256, 256)
def _circular_crop(self, image: np.ndarray) -> np.ndarray:
    """Apply circular crop to image."""
Purpose: Remove scanner borders by masking to inscribed circle.
Logic:
- Find image center: (H/2, W/2)
- Calculate max radius: min(H, W) / 2
- Create circular mask using cv2.circle()
- Apply mask: result = image * mask
- Return masked image
Why: Retinal images are circular; scanner borders add noise
Example Result:
Input: 512×512 BGR with black borders
Output: 512×512 with circular mask applied
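The masking step can be illustrated without OpenCV. The sketch below builds the circle as a boolean NumPy mask instead of drawing it with cv2.circle(), so it is a stand-in for the method described above rather than its implementation; the function name `circular_crop` is hypothetical.

```python
import numpy as np


def circular_crop(image: np.ndarray) -> np.ndarray:
    """Zero out every pixel outside the circle inscribed in the image."""
    h, w = image.shape[:2]
    cy, cx = h / 2.0, w / 2.0
    radius = min(h, w) / 2.0
    yy, xx = np.ogrid[:h, :w]
    # Compare pixel centres (index + 0.5) against the circle equation.
    mask = (yy + 0.5 - cy) ** 2 + (xx + 0.5 - cx) ** 2 <= radius ** 2
    if image.ndim == 3:
        mask = mask[..., None]  # broadcast the 2D mask over the colour channels
    return image * mask  # bool mask keeps the input dtype and shape


img = np.full((8, 8, 3), 200, dtype=np.uint8)
cropped = circular_crop(img)
```

The output keeps the input shape; corners fall outside the circle and become black, while the centre is untouched.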
def _apply_clahe(self, image: np.ndarray) -> np.ndarray:
    """Apply CLAHE enhancement."""
Purpose: Enhance local contrast to improve vessel/lesion visibility.
Logic:
- Convert to uint8 if needed
- Create CLAHE object with clip_limit and grid_size
- Apply: enhanced = clahe.apply(image)
- Return enhanced image
Parameters Used:
- clip_limit (default 2.0): Max slope allowed in histogram
  - Higher = more aggressive enhancement
  - Too high creates artifacts
- grid_size (default 8): Local region size
  - Larger = smoothing effect
  - Smaller = more detail enhancement
Why: Medical images need contrast enhancement for subtle features
File: pipeline/feature_extraction.py
Extracts 72 hand-crafted features across 6 categories (color, GLCM texture, LBP texture, vessels, lesions, statistics) from preprocessed images for Random Forest classification.
Configuration for feature extraction.
Attributes:
- color (bool): Extract color features (default: True)
- texture_glcm (bool): Extract GLCM texture (default: True)
- texture_lbp (bool): Extract LBP histogram (default: True)
- vessels (bool): Extract vessel features (default: True)
- lesions (bool): Extract lesion features (default: True)
- statistics (bool): Extract statistics (default: True)
- glcm_distances (list): GLCM distances (default: [1, 2])
- glcm_angles (list): GLCM angles in radians (default: [0, π/4, π/2, 3π/4])
- lbp_radius (int): LBP radius (default: 1)
- lbp_n_points (int): LBP neighbor points (default: 8)
Main feature extraction engine.
Methods:
def __init__(self, config: FeatureExtractorConfig):
    """Initialize feature extractor."""
Logic: Stores config and initializes feature name lists
def extract(self, image: np.ndarray) -> np.ndarray:
    """Extract all features from image."""
Parameters:
- image (np.ndarray): Preprocessed image (256×256 or 256×256×3)
Returns: 1D array of features (length ~72)
Logic:
features = []
if config.color:
    features += color_features()
if config.texture_glcm:
    features += glcm_features()
if config.texture_lbp:
    features += lbp_features()
if config.vessels:
    features += vessel_features()
if config.lesions:
    features += lesion_features()
if config.statistics:
    features += statistical_features()
return concatenate(features)
Example:
config = FeatureExtractorConfig(color=True, texture_glcm=True)
extractor = FeatureExtractor(config)
img = preprocessed_image # (256, 256)
features = extractor.extract(img) # (72,) array
def _extract_color_features(self, image: np.ndarray) -> np.ndarray:
    """Extract color features."""
Returns: Array with 6 features
Features Extracted:
- Mean of R channel
- Mean of G channel
- Mean of B channel
- Std dev of R channel
- Std dev of G channel
- Std dev of B channel
Logic:
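# Channel order is assumed to be RGB here; an array straight from cv2.imread() is BGR,
# in which case index 0 is blue and index 2 is red
return np.array([
    image[:,:,0].mean(), # R mean
    image[:,:,1].mean(), # G mean
    image[:,:,2].mean(), # B mean
    image[:,:,0].std(), # R std
    image[:,:,1].std(), # G std
    image[:,:,2].std() # B std
])
Why: Color statistics capture overall image brightness and color balance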
def _extract_texture_glcm(self, image: np.ndarray) -> np.ndarray:
    """Extract GLCM texture features."""
Returns: Array with 4 × len(distances) × len(angles) features (32 with the default 2 distances and 4 angles)
GLCM (Gray-Level Co-occurrence Matrix):
- Measures spatial relationships of pixel values
- Computed for 4 directions: 0°, 45°, 90°, 135°
Features per angle:
- Contrast: Measure of local variations
  - Formula: Σ(i-j)² × P(i,j)
  - High = many sharp transitions
- Correlation: Pixel dependency
  - Formula: Σ(i-m_x)(j-m_y)P(i,j) / (σ_x σ_y)
  - High = strong dependencies
- Energy: Texture uniformity
  - Formula: √(Σ P(i,j)²) as computed by scikit-image's graycoprops (Σ P(i,j)² on its own is the 'ASM' property)
  - High = uniform texture
- Homogeneity: Local homogeneity
  - Formula: Σ P(i,j) / (1 + (i-j)²)
  - High = homogeneous texture
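As a worked example of these formulas, the sketch below builds a co-occurrence matrix for a single horizontal offset in plain NumPy and evaluates three of the properties. It stands in for scikit-image's graycomatrix/graycoprops only for illustration; the helper names `glcm_horizontal` and `glcm_props` are hypothetical.

```python
import numpy as np


def glcm_horizontal(image: np.ndarray, levels: int) -> np.ndarray:
    """Normalized, symmetric co-occurrence matrix for distance 1, angle 0."""
    counts = np.zeros((levels, levels), dtype=float)
    left, right = image[:, :-1].ravel(), image[:, 1:].ravel()
    np.add.at(counts, (left, right), 1)  # count each (left, right) neighbour pair
    counts += counts.T                   # make it symmetric: pairs count in both directions
    return counts / counts.sum()


def glcm_props(p: np.ndarray) -> dict:
    i, j = np.indices(p.shape)
    asm = float((p ** 2).sum())
    return {
        "contrast": float(((i - j) ** 2 * p).sum()),
        "homogeneity": float((p / (1.0 + (i - j) ** 2)).sum()),
        "asm": asm,
        "energy": asm ** 0.5,  # scikit-image convention: energy = sqrt(ASM)
    }


image = np.array([[0, 0, 1, 1],
                  [0, 0, 1, 1],
                  [0, 2, 2, 2],
                  [2, 2, 3, 3]])
props = glcm_props(glcm_horizontal(image, levels=4))
```

For this 4×4 image, 24 symmetric pairs are counted; contrast works out to 14/24 ≈ 0.583 and homogeneity to 19.4/24 ≈ 0.808.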
Logic:
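# A preprocessed green-channel image is already 2D; only convert 3-channel (BGR) input
gray = image if image.ndim == 2 else cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
distances = [1, 2]
angles = [0, np.pi / 4, np.pi / 2, 3 * np.pi / 4]
glcm = graycomatrix(gray, distances=distances, angles=angles)
features = []
for prop in ['contrast', 'correlation', 'energy', 'homogeneity']:
    values = graycoprops(glcm, prop)  # shape: (len(distances), len(angles))
    for d_idx in range(len(distances)):  # index by position, not by distance/angle value
        for a_idx in range(len(angles)):
            features.append(values[d_idx, a_idx])
return np.array(features)
Why: GLCM captures texture quality critical for DR classification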
def _extract_texture_lbp(self, image: np.ndarray) -> np.ndarray:
    """Extract LBP histogram features."""
Returns: Array with 59 features (uniform LBP-8,1 histogram bins)
LBP (Local Binary Pattern):
- Encodes local texture by comparing pixel to neighbors
- Radius=1, n_points=8 (LBP-8,1)
- Produces 59 uniform patterns
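The figure of 59 can be checked by enumeration: an 8-bit pattern is "uniform" when it has at most two 0/1 transitions read circularly, and all remaining patterns share a single extra bin. A small pure-Python check (the `transitions` helper is a name introduced here for illustration):

```python
def transitions(pattern: int, bits: int = 8) -> int:
    """Number of 0/1 changes when the bit pattern is read as a circle."""
    # Rotate right by one bit; XOR marks every position that differs from its neighbour.
    rotated = (pattern >> 1) | ((pattern & 1) << (bits - 1))
    return bin(pattern ^ rotated).count("1")


uniform_patterns = [p for p in range(256) if transitions(p) <= 2]
n_bins = len(uniform_patterns) + 1  # one shared bin for all non-uniform patterns
```

This gives 58 uniform patterns (P × (P − 1) + 2 for P = 8) plus the shared bin, hence a 59-bin histogram.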
Logic:
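gray = image if image.ndim == 2 else cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# scikit-image names these arguments P and R; 'nri_uniform' yields the 59 codes (0-58),
# whereas 'uniform' is the rotation-invariant variant and yields only P + 2 = 10 codes
lbp = local_binary_pattern(gray, P=8, R=1, method='nri_uniform')
hist, _ = np.histogram(lbp.ravel(), bins=59, range=(0, 59))
return hist.astype(float) # Normalize if needed
Why: LBP is an efficient, grayscale-invariant texture descriptor; use method='uniform' (10 bins) if rotation invariance is required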
def _extract_vessel_features(self, image: np.ndarray) -> np.ndarray:
    """Extract vessel segmentation features."""
Returns: Array with 5 features
Features:
- Vessel density: Mean vesselness value
- Vessel mean thickness: Statistical measure
- Vessel branching: Local maxima count
- Features 4-5: Supplementary vessel metrics
Vessel Detection Method:
- Frangi vesselness filter (multiscale)
- Detects ridge-like structures (vessels)
- Outputs probability map (0-1)
Logic:
gray = image if image.ndim == 2 else cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
vesselness = frangi_vesselness_filter(gray, sigma_range=[1, 10])
density = vesselness.mean()
thickness = np.percentile(vesselness, 75) - np.percentile(vesselness, 25)  # interquartile range of vesselness
branching = count_local_maxima(vesselness)
return np.array([density, thickness, branching, ...])
Why: Blood vessel patterns are important for DR staging
Challenge: Vessels hard to detect reliably in low-quality images
def _extract_lesion_features(self, image: np.ndarray) -> np.ndarray:
    """Extract lesion detection features."""
Returns: Array with 7 features
Lesion Types:
Bright Lesions (Exudates):
- Yellow/white deposits
- Hard exudates indicate advanced DR
- Detected: green channel > 95th percentile
Dark Lesions (Hemorrhages):
- Red/dark spots
- Indicate vascular damage
- Detected: green channel < 5th percentile
Features:
- Bright lesion count
- Bright lesion total area
- Bright lesion concentration (central vs peripheral)
- Dark lesion count
- Dark lesion total area
- Dark lesion concentration
- Lesion severity estimate
Logic:
green = image[:,:,1] if len(image.shape)==3 else image
bright_threshold = np.percentile(green, 95)
dark_threshold = np.percentile(green, 5)
bright_mask = green > bright_threshold
dark_mask = green < dark_threshold
# countNonZero returns the number of flagged pixels (an area), not the number of separate lesions
bright_count = cv2.countNonZero(bright_mask.astype(np.uint8))
dark_count = cv2.countNonZero(dark_mask.astype(np.uint8))
# ... more metrics
Why: Lesion presence is crucial for DR diagnosis
Challenge: Threshold-dependent, noise-sensitive
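The threshold dependence is easy to see in a small NumPy sketch: because the cut-offs are percentiles of the image itself, roughly the brightest and darkest 5% of pixels are flagged in any non-constant image, whether or not lesions are present. The function name `lesion_masks` and the random test image are assumptions for illustration.

```python
import numpy as np


def lesion_masks(green: np.ndarray, bright_pct: float = 95.0, dark_pct: float = 5.0):
    """Boolean masks of candidate bright and dark lesion pixels."""
    bright = green > np.percentile(green, bright_pct)
    dark = green < np.percentile(green, dark_pct)
    return bright, dark


rng = np.random.default_rng(0)
noise = rng.integers(0, 256, size=(64, 64)).astype(np.uint8)  # no lesions at all
bright, dark = lesion_masks(noise)
bright_fraction = float(bright.mean())  # fraction of pixels flagged as "bright lesion"
dark_fraction = float(dark.mean())

flat = np.full((8, 8), 100, dtype=np.uint8)  # constant image: nothing exceeds its own percentile
flat_bright, flat_dark = lesion_masks(flat)
```

Pure noise still produces close to 5% "lesion" pixels on each side, which is why the raw counts need to be read together with the other features.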
def _extract_statistical_features(self, image: np.ndarray) -> np.ndarray:
    """Extract statistical features."""
Returns: Array with 3-5 features
Features:
- Entropy: Image randomness/information content
  - Formula: -Σ P(x) * log2(P(x))
  - High = noisy image; Low = uniform
- Skewness: Histogram asymmetry
  - Positive = bright tail; Negative = dark tail
- Kurtosis: Distribution peakedness
  - High = sharp peak (concentrated values)
  - Low = flat distribution
Logic:
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) if len(image.shape)==3 else image
pixels = gray.ravel().astype(float)
p = np.histogram(pixels, bins=256)[0] / len(pixels)  # per-bin probabilities
entropy = -np.sum(p * np.log2(p + 1e-10))  # vectorized; the epsilon guards against log2(0)
skewness_val = calculate_skewness(pixels)
kurtosis_val = calculate_kurtosis(pixels)
return np.array([entropy, skewness_val, kurtosis_val])
Why: Statistical properties capture image quality and distribution
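A self-contained NumPy version of the three statistics might look like the sketch below. It assumes 8-bit intensities, population (biased) moments, and excess kurtosis (normal distribution = 0); the source does not say which conventions calculate_skewness() and calculate_kurtosis() follow, so treat these as assumptions.

```python
import numpy as np


def statistical_features(gray: np.ndarray) -> np.ndarray:
    """Return [entropy, skewness, excess kurtosis] of an 8-bit grayscale image."""
    pixels = gray.ravel().astype(float)

    # Shannon entropy of the intensity histogram, in bits.
    hist, _ = np.histogram(pixels, bins=256, range=(0, 256))
    p = hist / hist.sum()
    p = p[p > 0]  # empty bins contribute nothing (0 * log 0 := 0)
    entropy = float(-(p * np.log2(p)).sum())

    std = pixels.std()
    if std == 0:  # constant image: the moments are undefined, report 0
        return np.array([entropy, 0.0, 0.0])
    z = (pixels - pixels.mean()) / std
    skewness = float((z ** 3).mean())
    kurtosis = float((z ** 4).mean() - 3.0)
    return np.array([entropy, skewness, kurtosis])


# Half black, half white: two equally likely intensities.
two_tone = np.array([[0, 255], [0, 255]], dtype=np.uint8)
features = statistical_features(two_tone)
```

For the two-tone image the entropy is exactly 1 bit, the histogram is symmetric (skewness 0), and the excess kurtosis is −2, the minimum possible value.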
def get_feature_names(self) -> List[str]:
    """Get list of feature names."""
Returns: List of human-readable feature names
Example:
names = extractor.get_feature_names()
# ['color_r_mean', 'color_g_mean', 'color_b_mean',
# 'glcm_contrast_0', 'glcm_contrast_45', ...,
# 'lbp_hist_0', 'lbp_hist_1', ...,
# 'vessel_density', 'lesion_bright_count', ...]
Purpose: Map feature indices to names for interpretability
File: pipeline/classification.py
Trains and evaluates Random Forest classifier for DR severity prediction with hyperparameter tuning support.
Random Forest classification engine.
Methods:
def __init__(self, model_type: str = 'rf', random_state: int = 42):
    """Initialize classifier."""
Parameters:
- model_type (str): 'rf' for Random Forest (currently the only option)
- random_state (int): Seed for reproducibility
Logic: Initialize RandomForestClassifier with default parameters
def train(self, X_train: np.ndarray, y_train: np.ndarray,
          X_val: np.ndarray = None, y_val: np.ndarray = None,
          tune_hyperparameters: bool = False, **kwargs):
    """Train classifier."""
Parameters:
- X_train (ndarray): Training features (n_samples, n_features)
- y_train (ndarray): Training labels (n_samples,)
- X_val (ndarray): Validation features (optional)
- y_val (ndarray): Validation labels (optional)
- tune_hyperparameters (bool): Use GridSearchCV
- **kwargs: Additional RF parameters (n_estimators, max_depth, class_weight, etc.)
Logic without tuning:
rf = RandomForestClassifier(
    n_estimators=kwargs.get('n_estimators', 200),
    max_depth=kwargs.get('max_depth', None),
    class_weight='balanced',
    random_state=42
)
rf.fit(X_train, y_train)
self.model = rf
Logic with tuning (GridSearchCV):
param_grid = {
    'n_estimators': [100, 200, 300, 500],
    'max_depth': [10, 20, 30, None],
    'class_weight': ['balanced', 'balanced_subsample']
}
cv_splits = StratifiedKFold(n_splits=5)
grid = GridSearchCV(rf, param_grid, cv=cv_splits, scoring='f1_weighted')
grid.fit(X_train, y_train)
self.model = grid.best_estimator_
Default RF Parameters:
- n_estimators: 200 (number of trees)
- max_depth: None (grow fully)
- class_weight: 'balanced' (handle imbalance)
- n_jobs: -1 (use all cores)
- random_state: 42
Why GridSearchCV: Automatically finds best hyperparameters using cross-validation
def predict(self, X: np.ndarray) -> np.ndarray:
    """Make predictions."""
Parameters:
- X (ndarray): Features (n_samples, n_features)
Returns: Predicted class labels (n_samples,)
Logic:
predictions = self.model.predict(X)
return predictions.astype(int)
def predict_proba(self, X: np.ndarray) -> np.ndarray:
    """Get prediction probabilities."""
Parameters:
- X (ndarray): Features
Returns: Probability matrix (n_samples, n_classes)
Example:
probs = clf.predict_proba(X_test)
# probs[0]: [0.1, 0.2, 0.3, 0.25, 0.15] (probabilities for each class)
def get_feature_importance(self) -> np.ndarray:
    """Get feature importance scores."""
Returns: Feature importance array (n_features,)
Logic:
return self.model.feature_importances_
Importance Calculation:
- From all RF trees
- Normalized: sum to 1.0
- Higher = more important for prediction
Example:
importance = clf.get_feature_importance()
# importance[0]: 0.043 (feature 0 accounts for 4.3% of total importance)
def save_model(self, filepath: str):
    """Save trained model to disk."""
Parameters:
- filepath (str): Path to save model pickle
Logic:
import joblib
joblib.dump(self.model, filepath)
def load_model(self, filepath: str):
    """Load trained model from disk."""
Parameters:
- filepath (str): Path to model pickle
Logic:
self.model = joblib.load(filepath)
File: pipeline/evaluation.py
Computes comprehensive classification metrics and generates visualizations for model evaluation.
Static utility class for evaluation.
Static Methods:
@staticmethod
def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Dict:
    """Compute classification metrics."""
Parameters:
- y_true (ndarray): True labels
- y_pred (ndarray): Predicted labels
Returns: Dictionary with 6+ metrics
Metrics Computed:
- Accuracy: Proportion of correct predictions
  accuracy = np.mean(y_true == y_pred)
- F1 (Macro): Unweighted average of per-class F1 scores
  f1_macro = f1_score(y_true, y_pred, average='macro')
  - Better for imbalanced data
  - Treats all classes equally
- F1 (Weighted): Weighted by class frequency
  f1_weighted = f1_score(y_true, y_pred, average='weighted')
- Quadratic Weighted Kappa (QWK): Ordinal agreement metric
  qwk = cohen_kappa_score(y_true, y_pred, weights='quadratic')
  - Range: [-1, 1]
  - 1.0 = perfect agreement
  - 0.0 = agreement no better than chance
  - Penalizes off-by-one errors less than off-by-many
  - Primary metric for DR classification
- Confusion Matrix: Detailed per-class breakdown
  cm = confusion_matrix(y_true, y_pred)
- Per-class Metrics: Precision, recall, F1 for each class
  precision, recall, f1, support = precision_recall_fscore_support(y_true, y_pred)
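To make the QWK definition concrete, here is a from-scratch NumPy sketch of the standard formula (one minus the ratio of weighted observed disagreement to weighted chance disagreement). The function name `quadratic_weighted_kappa` is hypothetical; the module itself is described as calling cohen_kappa_score.

```python
import numpy as np


def quadratic_weighted_kappa(y_true, y_pred, n_classes: int) -> float:
    """QWK = 1 - sum(W * O) / sum(W * E) with weights W[i, j] = (i - j)^2."""
    observed = np.zeros((n_classes, n_classes))
    for t, p in zip(y_true, y_pred):
        observed[t, p] += 1  # confusion matrix: rows = true, columns = predicted
    # Expected counts if true and predicted labels were independent.
    expected = np.outer(observed.sum(axis=1), observed.sum(axis=0)) / observed.sum()
    i, j = np.indices(observed.shape)
    weights = (i - j) ** 2  # off-by-two costs four times as much as off-by-one
    return float(1.0 - (weights * observed).sum() / (weights * expected).sum())


y_true = [0, 1, 2, 1, 0]
y_pred = [0, 1, 2, 2, 0]
qwk = quadratic_weighted_kappa(y_true, y_pred, n_classes=3)
```

With one off-by-one error in five predictions, the weighted observed disagreement is 1 and the chance disagreement is 7, so QWK = 1 − 1/7 ≈ 0.857.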
Returns:
{
'accuracy': 0.75,
'f1_macro': 0.45,
'f1_weighted': 0.62,
'qwk': 0.58,
'confusion_matrix': array(...),
'per_class_precision': array([...]),
'per_class_recall': array([...]),
'per_class_f1': array([...]),
'per_class_support': array([...])
}
Example:
y_true = np.array([0, 1, 2, 1, 0])
y_pred = np.array([0, 1, 2, 2, 0])
metrics = Evaluator.compute_metrics(y_true, y_pred)
print(f"Accuracy: {metrics['accuracy']:.3f}") # 0.800
print(f"QWK: {metrics['qwk']:.3f}") # 0.857
@staticmethod
def print_metrics(metrics: Dict, set_name: str = "Test Set"):
    """Print metrics in readable format."""
Parameters:
- metrics (dict): Output from compute_metrics()
- set_name (str): Name of dataset (e.g., "Training", "Validation")
Output:
===========================================
Test Set Metrics
===========================================
Accuracy: 0.7500
F1 (macro): 0.4500
F1 (weighted): 0.6200
QWK: 0.5800
Per-class Performance:
Class 0: Precision=0.80, Recall=0.85, F1=0.82
Class 1: Precision=0.50, Recall=0.45, F1=0.47
Class 2: Precision=0.40, Recall=0.30, F1=0.35
...
===========================================
@staticmethod
def plot_confusion_matrix(y_true: np.ndarray, y_pred: np.ndarray, classes=None):
    """Plot confusion matrix heatmap."""
Parameters:
- y_true (ndarray): True labels
- y_pred (ndarray): Predicted labels
- classes (list): Class name labels
Output: Matplotlib figure with heatmap
Visualization:
- Rows: True classes
- Columns: Predicted classes
- Color intensity: Count of predictions
- Diagonal: Correct predictions
Purpose: Visualize where model makes mistakes
File: pipeline/augmentation.py
Implements image augmentation techniques (rotation, brightness, contrast, flips) to expand training data and improve model robustness.
Configuration for augmentation parameters.
Attributes:
- rotation_range_min (int): Min rotation degrees (default: -15)
- rotation_range_max (int): Max rotation degrees (default: 15)
- brightness_min (float): Min brightness multiplier (default: 0.8)
- brightness_max (float): Max brightness multiplier (default: 1.2)
- contrast_min (float): Min contrast multiplier (default: 0.8)
- contrast_max (float): Max contrast multiplier (default: 1.2)
- horizontal_flip (bool): Enable horizontal flipping (default: True)
- vertical_flip (bool): Enable vertical flipping (default: True)
Main augmentation engine.
Methods:
def __init__(self, config: AugmentationConfig, seed: int = 42):
    """Initialize augmenter."""
Parameters:
- config: AugmentationConfig instance
- seed: Random seed for reproducibility
def augment_single_image(self, image: np.ndarray) -> np.ndarray:
    """Apply random augmentation to single image."""
Parameters:
- image (ndarray): Input image (256×256 or 256×256×3)
Returns: Augmented image (same shape)
Augmentations Applied Randomly:
- Rotation (if config.rotation_range_min != 0):
  angle = np.random.uniform(rotation_range_min, rotation_range_max)
  rotated = cv2.warpAffine(image, cv2.getRotationMatrix2D(center, angle, 1.0), size)
  - Default: -15 to +15 degrees
  - Why: Vessels can be at any angle
- Brightness (if brightness_min != 1.0):
  factor = np.random.uniform(brightness_min, brightness_max)
  brightened = np.clip(image.astype(float) * factor, 0, 255).astype(np.uint8)
  - Default: 0.8x to 1.2x
  - Why: Imager lighting variation
- Contrast (if contrast_min != 1.0):
  factor = np.random.uniform(contrast_min, contrast_max)
  contrasted = np.clip((image.astype(float) - 128) * factor + 128, 0, 255).astype(np.uint8)  # cast first: uint8 subtraction wraps around
  - Default: 0.8x to 1.2x
  - Why: Equipment sensitivity variation
- Horizontal Flip (if config.horizontal_flip):
  if np.random.rand() > 0.5: flipped = cv2.flip(image, 1) # Flip around the vertical axis
  - Why: Left-right eye symmetry
- Vertical Flip (if config.vertical_flip):
  if np.random.rand() > 0.5: flipped = cv2.flip(image, 0) # Flip around the horizontal axis
  - Why: Superior-inferior symmetry
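The brightness and contrast steps are sensitive to integer overflow, so a small NumPy sketch may help. The helper names `adjust_brightness` and `adjust_contrast` and the fixed pivot of 128 are assumptions taken from the formulas above, not confirmed implementation details.

```python
import numpy as np


def adjust_brightness(image: np.ndarray, factor: float) -> np.ndarray:
    """Scale intensities, then clip back into the valid 8-bit range."""
    return np.clip(image.astype(float) * factor, 0, 255).astype(np.uint8)


def adjust_contrast(image: np.ndarray, factor: float, pivot: float = 128.0) -> np.ndarray:
    """Stretch intensities away from (factor > 1) or toward (factor < 1) the pivot."""
    # Cast to float BEFORE subtracting: uint8 arithmetic would wrap around below zero.
    stretched = (image.astype(float) - pivot) * factor + pivot
    return np.clip(stretched, 0, 255).astype(np.uint8)


img = np.array([[0, 100, 250]], dtype=np.uint8)
brighter = adjust_brightness(img, 1.2)
punchier = adjust_contrast(img, 1.2)
```

Both helpers return uint8 arrays of the input shape; values pushed outside [0, 255] are clipped rather than wrapped.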
Logic:
result = image.copy()
# Apply random augmentations
if chance: result = rotate(result)
if chance: result = change_brightness(result)
if chance: result = change_contrast(result)
if chance: result = flip_h(result)
if chance: result = flip_v(result)
return result
Why Random: Each augmentation is applied probabilistically, ensuring diversity
def augment_batch(self, images: List[np.ndarray], num_variants: int = 2) -> List[np.ndarray]:
    """Create augmented variants for batch of images."""
Parameters:
- images (list): Original images
- num_variants (int): Variants per image, including the original (e.g., 2 = 2x dataset)
Returns: Original + augmented images combined
Logic:
augmented = list(images) # Start with originals
for img in images:
    for _ in range(num_variants - 1): # num_variants includes original
        augmented_img = self.augment_single_image(img)
        augmented.append(augmented_img)
return augmented
Example:
augmenter = ImageAugmenter(config)
original_images = [img1, img2, img3] # 3 images
augmented = augmenter.augment_batch(original_images, num_variants=3)
# Result: 9 images (3 original + 6 augmented)
Impact on Training:
- 2x augmentation (num_variants=2): +100% training data (dataset doubles)
- 3x augmentation (num_variants=3): +200% training data (dataset triples)
- Improves generalization, especially on small, imbalanced datasets
Dataset wrapper with on-the-fly augmentation.
Methods:
def __init__(self, images: List[np.ndarray], labels: np.ndarray,
             augmenter: ImageAugmenter, num_variants: int = 2):
    """Initialize augmented dataset."""
Parameters:
- images: Original images
- labels: Original labels
- augmenter: ImageAugmenter instance
- num_variants: Augmentations per image
def __len__(self) -> int:
    """Return total dataset size including augmentations."""
Returns: len(images) * num_variants
def __getitem__(self, idx: int) -> Tuple[np.ndarray, int]:
    """Get augmented image-label pair."""
Returns: (augmented_image, label)
Purpose: Enables efficient batched augmentation during training
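One way such a wrapper can map a flat index onto (original image, variant) is sketched below. The index layout, the choice to return the untouched original for variant 0, and the use of a plain callable in place of ImageAugmenter are all assumptions made for illustration; the source only specifies the length and the return type.

```python
from typing import Any, Callable, Sequence, Tuple


class AugmentedDataset:
    """Virtual dataset of len(images) * num_variants items, augmented on access."""

    def __init__(self, images: Sequence[Any], labels: Sequence[int],
                 augment: Callable[[Any], Any], num_variants: int = 2):
        self.images = images
        self.labels = labels
        self.augment = augment          # stand-in for ImageAugmenter.augment_single_image
        self.num_variants = num_variants

    def __len__(self) -> int:
        return len(self.images) * self.num_variants

    def __getitem__(self, idx: int) -> Tuple[Any, int]:
        if not 0 <= idx < len(self):
            raise IndexError(idx)
        # Consecutive indices cycle through the variants of one source image.
        base, variant = divmod(idx, self.num_variants)
        image = self.images[base]
        if variant > 0:                 # assumed: variant 0 is the original image
            image = self.augment(image)
        return image, self.labels[base]


# Toy data: lists instead of arrays, and "augmentation" is just a reversal.
dataset = AugmentedDataset([[1, 2], [3, 4]], [0, 1],
                           augment=lambda im: im[::-1], num_variants=3)
```

Nothing is augmented until an item is requested, so memory use stays at the size of the original images regardless of num_variants.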
File: data/api_loader.py
Provides multiple data source APIs (Mock, Kaggle, Hugging Face) for loading DR images without pre-downloading entire dataset.
Interface for data sources.
Abstract Methods:
@abstractmethod
def get_image(self, image_id: str) -> np.ndarray:
    """Get image by ID. Returns numpy array (BGR format)."""
@abstractmethod
def get_labels(self) -> pd.DataFrame:
    """Get labels DataFrame with columns: id_code, diagnosis."""
@abstractmethod
def get_available_ids(self) -> List[str]:
    """Get list of available image IDs."""
Generates synthetic retinal images on-the-fly for testing.
Methods:
def __init__(self, num_samples: int = 100):
    """Initialize mock data source."""
Parameters:
- num_samples (int): Number of synthetic images to generate
Logic:
- Creates mock labels with stochastic class distribution
- Generates deterministic samples (same seed = same images)
def get_image(self, image_id: str) -> np.ndarray:
    """Generate synthetic retinal image."""
Returns: 512×512×3 BGR array with synthetic features:
- Reddish background (simulates retina)
- Bright circular optic disc
- Dark lines (blood vessels)
- Gaussian noise
Why: Fast testing without downloads, deterministic
Example:
api = MockDataAPI(num_samples=100)
img = api.get_image("mock_00001")
# img.shape: (512, 512, 3)
# Each ID yields its own synthetic image; the same seed reproduces the same images
def get_labels(self) -> pd.DataFrame:
    """Get mock labels."""
Returns: DataFrame with columns [id_code, diagnosis]
- id_code: "mock_00000" through the last generated ID (e.g. "mock_00099" for num_samples=100)
- diagnosis: 0-4 randomly sampled
def get_available_ids(self) -> List[str]:
    """Get available image IDs."""
Returns: List of mock_* IDs
Downloads real APTOS 2019 dataset from Kaggle.
Setup Required:
pip install kaggle
# Get credentials from https://www.kaggle.com/settings/account
mkdir -p ~/.kaggle
cp kaggle.json ~/.kaggle/
chmod 600 ~/.kaggle/kaggle.json
Methods:
def __init__(self, dataset_name: str = "aravind-krishnan/aptos-2019-blindness-detection"):
    """Initialize Kaggle data source."""
Logic:
- Loads Kaggle API credentials from ~/.kaggle/kaggle.json
- Authenticates with Kaggle
- Sets cache directory to ~/.cache/kaggle_aptos/
Error Handling: Graceful fallback if Kaggle not installed or credentials missing
def get_labels(self) -> pd.DataFrame:
    """Download and cache labels."""
Logic:
- Check if train.csv exists in cache
- If not, download full dataset from Kaggle
- Cache locally for future runs
- Return DataFrame
First Run: ~30 minutes (downloads 8GB)
Subsequent Runs: <1 second (uses cache)
def get_image(self, image_id: str) -> np.ndarray:
    """Get image from cache or download from Kaggle."""
Logic:
- Check cache: ~/.cache/kaggle_aptos/train_images/{image_id}.png
- If not in cache, trigger full download
- Read image with cv2.imread()
- Return BGR array
Streams APTOS 2019 from Hugging Face Hub with minimal storage.
Setup Required:
pip install datasets
Methods:
def __init__(self, dataset_name: str = ...):
    """Initialize Hugging Face data source."""
Logic: Lazy loads dataset on first access
def _ensure_loaded(self):
    """Load dataset if not already loaded."""
Purpose: Cache dataset in memory after first access
def get_labels(self) -> pd.DataFrame:
    """Get labels from Hugging Face dataset."""
Logic:
- Load dataset
- Iterate through samples
- Extract id_code and diagnosis
- Return DataFrame
def get_image(self, image_id: str) -> np.ndarray:
    """Get image from Hugging Face dataset."""
Logic:
- Load dataset
- Search for image_id in samples
- Convert PIL image to numpy array
- Convert RGB → BGR (for cv2 consistency)
- Return
Storage: ~100MB cache vs 8GB for Kaggle
Unified interface for all data sources.
Methods:
def __init__(self, api: DataSourceAPI):
    """Initialize loader."""
Parameters: Any DataSourceAPI implementation
def load_labels(self) -> pd.DataFrame:
    """Load labels with caching."""
Purpose: Cache-aware label loading
def load_image(self, image_id: str) -> np.ndarray:
    """Load single image."""
def load_images_batch(self, image_ids: List[str]) -> np.ndarray:
    """Load multiple images."""
Returns: Array (n_images, H, W, 3) with built-in error handling
Logic:
images = []
for img_id in image_ids:
    try:
        img = self.load_image(img_id)
        images.append(img)
    except Exception as e:
        print(f"Failed to load {img_id}: {e}")
        # Continue with remaining images
return np.array(images)
Purpose: Robust batch loading that skips problematic images
def get_class_distribution(self) -> Dict:
    """Get class distribution."""
Returns: {class_label: count}
def create_data_loader(source: str = "mock", **kwargs) -> APIDataLoader:
    """Factory function to create data loader."""
Parameters:
- source (str): "mock" | "kaggle" | "huggingface"
- **kwargs: Arguments for specific API
Returns: APIDataLoader instance
Logic:
if source == "mock":
    api = MockDataAPI(**kwargs)
elif source == "kaggle":
    api = KaggleAPISource(**kwargs)
elif source == "huggingface":
    api = HuggingFaceDatasetAPI(**kwargs)
else:
    raise ValueError(f"Unknown source: {source}")
return APIDataLoader(api)
Example:
# Mock (instant)
loader = create_data_loader("mock", num_samples=300)
# Real data (downloads on first run)
loader = create_data_loader("kaggle")
# Hugging Face
loader = create_data_loader("huggingface")
File: experiments/api_pipeline.py
Purpose: End-to-end DR classification pipeline using API-based data loading.
Main Function: run_api_based_pipeline(source="mock", test_mode=False, num_samples=None)
Steps:
- Create data loader (mock/kaggle/huggingface)
- Load labels and create train/val split
- Preprocess images (crop, CLAHE, resize)
- Extract 72 features
- Train Random Forest
- Evaluate on validation set
- Save results (metrics CSV, feature importance)
Configuration:
- Reads all DR_* environment variables
- Command-line args override env vars
- Env vars override built-in defaults
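The override order can be sketched with argparse. This assumes a precedence of command line, then DR_* environment variable, then built-in default, and uses a hypothetical `resolve_source` helper; the script's real argument handling may differ.

```python
import argparse
from typing import Mapping, Optional, Sequence


def resolve_source(argv: Sequence[str], env: Mapping[str, str],
                   default: str = "mock") -> str:
    """Pick the data source: CLI flag first, then DR_DATA_SOURCE, then the default."""
    parser = argparse.ArgumentParser()
    # default=None lets us tell "flag not given" apart from an explicit value.
    parser.add_argument("--source", default=None)
    args = parser.parse_args(list(argv))
    cli_value: Optional[str] = args.source
    if cli_value is not None:
        return cli_value
    return env.get("DR_DATA_SOURCE", default)


from_default = resolve_source([], {})
from_env = resolve_source([], {"DR_DATA_SOURCE": "kaggle"})
from_cli = resolve_source(["--source", "huggingface"], {"DR_DATA_SOURCE": "kaggle"})
```

Passing the environment in as a mapping (rather than reading os.environ directly) keeps the precedence logic easy to test.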
Output Files:
- api_metrics.csv: Accuracy, F1, QWK
- api_feature_importance.csv: Feature rankings
Usage:
python experiments/api_pipeline.py --source mock --test
python experiments/api_pipeline.py --source kaggle
DR_USE_AUGMENTATION=true python experiments/api_pipeline.py
File: experiments/ablation_study.py
Purpose: Systematic ablation study with 15+ experiments.
Experiment Types:
- Feature removal (5 experiments)
  - No color, no GLCM, no LBP, no vessels, no lesions
- Preprocessing variants (3 experiments)
  - With/without CLAHE, with/without cropping, RGB vs green
- Feature combinations (4+ experiments)
  - Texture only, color+texture, all features
Output: Ablation report CSV with columns:
- Experiment name
- Features used
- Accuracy
- F1 (macro)
- QWK
- Change vs baseline (%)
Key Function: AblationStudy class orchestrates full study
File: experiments/optimize_with_augmentation.py
Purpose: Compare baseline vs augmented performance.
Steps:
- Train model without augmentation
- Train model with 3x augmentation
- Compare metrics
- Generate visualizations
- Calculate improvement percentages
Output:
- Metrics CSV comparing both models
- 4 visualization plots
- Text summary of improvements
File: experiments/visualize_results.py
Purpose: Generate comprehensive visualization plots.
Plots Generated (11 types):
- Feature importance (all features)
- Feature importance (top 20)
- Feature importance by category
- Ablation study QWK ranking
- Ablation study impact analysis
- Baseline vs augmented metrics
- Confusion matrices (train, val, test)
- Per-class performance bars
- Feature correlation heatmap
- Model decision boundaries (if applicable)
- Training curves (if available)
Output: Saved as PNG files in results directory
| Module | File | Functions | Purpose |
|---|---|---|---|
| Config | config.py | from_env(), to_dict(), print_config(), get_config(), reset_config() | Environment variable configuration |
| Data Loader | data/loader.py | load_data(), create_train_val_split(), get_class_distribution() | Load images and labels |
| Preprocessing | pipeline/preprocessing.py | PreprocessingPipeline.process(), _circular_crop(), _apply_clahe() | Image normalization and enhancement |
| Features | pipeline/feature_extraction.py | FeatureExtractor.extract(), _extract_color/texture/vessel/lesion() | Extract 72 hand-crafted features |
| Classification | pipeline/classification.py | DRClassifier.train(), predict(), get_feature_importance() | Random Forest training and prediction |
| Evaluation | pipeline/evaluation.py | Evaluator.compute_metrics(), print_metrics(), plot_confusion_matrix() | Evaluation metrics and visualization |
| Augmentation | pipeline/augmentation.py | ImageAugmenter.augment_single_image(), augment_batch() | Data augmentation |
| API Loader | data/api_loader.py | MockDataAPI, KaggleAPISource, HuggingFaceDatasetAPI, APIDataLoader | Multiple data source support |
| Scripts | experiments/*.py | run_api_based_pipeline(), AblationStudy, results visualization | Pipeline orchestration |
raw_image.png
↓
[Preprocessing] → crops, resizes, applies CLAHE
↓
256×256 normalized image
↓
[Feature Extraction] → extracts 72 features
↓
Feature vector (72,)
↓
[Classification] → Random Forest prediction
↓
Predicted class (0-4): dr_severity
↓
[Evaluation] → Compute metrics vs ground truth
↓
QWK, Accuracy, F1, confusion matrix
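The final stage reports quadratic weighted kappa (QWK). For readers unfamiliar with the metric, here is a minimal pure-Python sketch of the standard definition. It is not the pipeline's `Evaluator` code, and the function name and the handling of the degenerate single-class case are assumptions.

```python
def quadratic_weighted_kappa(y_true, y_pred, num_classes=5):
    """Quadratic weighted kappa for integer labels in range(num_classes)."""
    n = len(y_true)
    # Observed confusion matrix: rows are true classes, columns are predictions
    observed = [[0] * num_classes for _ in range(num_classes)]
    for t, p in zip(y_true, y_pred):
        observed[t][p] += 1

    true_hist = [sum(row) for row in observed]
    pred_hist = [sum(observed[i][j] for i in range(num_classes))
                 for j in range(num_classes)]

    weighted_observed = 0.0
    weighted_expected = 0.0
    for i in range(num_classes):
        for j in range(num_classes):
            # Penalty grows with the squared distance between grades
            weight = (i - j) ** 2 / (num_classes - 1) ** 2
            # Count expected in this cell if labels and predictions were independent
            expected = true_hist[i] * pred_hist[j] / n
            weighted_observed += weight * observed[i][j]
            weighted_expected += weight * expected

    if weighted_expected == 0:
        # Degenerate case: a single class everywhere (assumed to count as agreement)
        return 1.0
    return 1.0 - weighted_observed / weighted_expected
```

Because the weight is quadratic in the grade distance, predicting grade 1 for a true grade 0 costs far less than predicting grade 4.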
Typical Execution Times:
- Single image preprocessing: 50ms
- Single image feature extraction: 150ms
- Batch processing 100 images: ~30 seconds
- Training RF on 2500 samples: 2-5 seconds
- Full pipeline (mock data, 500 samples): 3-5 minutes
- Full pipeline (Kaggle first run): 30+ minutes (includes download)
- Full pipeline (Kaggle cached): 10-15 minutes
Memory Usage:
- Single image: ~10 MB (raw) → ~1 MB (processed)
- 500 images in memory: ~500 MB
- 3600 images in memory: ~3.5 GB
- RF model (72 features, 200 trees): ~50 MB
Optimization Techniques Used:
- On-demand image loading (not all-in-memory)
- Caching for API-based sources (Kaggle, HF)
- Vectorized feature extraction (numpy)
- Parallel RF training (n_jobs=-1)
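On-demand loading can be illustrated with a generator that loads one batch at a time. This is a sketch only: `iter_batches` and its `load_fn` callback are hypothetical names, and the pipeline's actual loader may be structured differently.

```python
def iter_batches(paths, load_fn, batch_size=32):
    """Yield lists of loaded images, one batch at a time.

    Nothing is loaded until a batch is requested, so only the current
    batch needs to be held in memory. Hypothetical helper for illustration.
    """
    for start in range(0, len(paths), batch_size):
        yield [load_fn(path) for path in paths[start:start + batch_size]]
```

A caller would iterate with `for batch in iter_batches(paths, load_fn):` and extract features from each batch before the next one is loaded.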
Key Design Decisions:
- Classical ML over Deep Learning: Interpretability + efficiency for ~3600 samples
- Hand-crafted Features: Domain knowledge embedded; easy to ablate
- Modular Architecture: Each component (preprocess, features, classify) independent
- Configuration-driven: All parameters via environment variables
- Multiple Data Sources: Support mock/Kaggle/HF for flexibility
- Ordinal Metric (QWK): Respects severity ordering (off-by-one less bad than off-by-four)
- Stratified Splitting: Preserve class distribution in train/val/test
Possible Future Enhancements:
- Deep Learning: CNN backbone for automatic feature learning
- Ensemble Methods: Combine RF with SVM, Gradient Boosting
- Active Learning: Query most uncertain samples for labeling
- Class Imbalance: SMOTE, cost-sensitive learning
- Uncertainty Quantification: Bayesian RF, Monte Carlo dropout
- Model Interpretability: LIME, SHAP for local explanations
- Production Deployment: REST API, Docker containerization, monitoring
Documentation Complete!
This covers every function, class, and module in the DR Classification Pipeline with detailed explanations, examples, and design rationale.