Python API Reference¶
PyForge CLI v1.0.9 provides a comprehensive Python API for programmatic access to all conversion functionality, including specialized support for Databricks environments.
Core Classes¶
BaseConverter¶
The foundation class for all format converters.
from pyforge_cli.converters.base import BaseConverter
from pathlib import Path
from typing import Any, Dict, Optional
class BaseConverter(ABC):
    """Abstract base class for all format converters.

    Concrete converters implement the four hooks below.
    NOTE(review): this snippet references ``ABC`` without importing it —
    add ``from abc import ABC, abstractmethod`` for it to run as shown.
    """
    def __init__(self):
        # Format extensions this converter can read / write (exact element
        # format, e.g. ".xlsx" vs "xlsx", is not shown here — confirm).
        self.supported_inputs: set = set()
        self.supported_outputs: set = set()
    def convert(self, input_path: Path, output_path: Path, **options: Any) -> bool:
        """Convert input file to output format.

        Args:
            input_path: Source file to convert.
            output_path: Destination file to create.
            **options: Converter-specific keyword options.

        Returns:
            True if the conversion succeeded, False otherwise.
        """
        pass
    def validate_input(self, input_path: Path) -> bool:
        """Validate if input file can be processed.

        Returns:
            True if this converter can handle ``input_path``.
        """
        pass
    def get_output_extension(self, output_format: str) -> str:
        """Get appropriate file extension for output format.

        Args:
            output_format: Name of the target format (e.g. "parquet").

        Returns:
            File extension for the format — presumably including the
            leading dot; confirm against a concrete converter.
        """
        pass
    def get_metadata(self, input_path: Path) -> Optional[Dict[str, Any]]:
        """Extract metadata from input file.

        Returns:
            Mapping of metadata fields, or None if none can be extracted.
        """
        pass
ConverterRegistry¶
Manages and provides access to all registered converters.
from pyforge_cli.plugins.registry import ConverterRegistry
from pathlib import Path
from typing import Optional
class ConverterRegistry:
    """Registry for managing format converters.

    NOTE(review): ``Type`` and ``Dict`` are used in annotations below but
    are not imported in this snippet — add them to the ``typing`` import.
    """
    def register(self, name: str, converter_class: Type[BaseConverter]) -> None:
        """Register a converter class.

        Args:
            name: Key under which the converter is registered.
            converter_class: A ``BaseConverter`` subclass (the class
                itself, not an instance).
        """
        pass
    def get_converter(self, input_path: Path) -> Optional[BaseConverter]:
        """Get appropriate converter for input file.

        Returns:
            A converter instance for ``input_path``'s format, or None if
            no registered converter handles it.
        """
        pass
    def list_converters(self) -> Dict[str, Type[BaseConverter]]:
        """List all registered converters.

        Returns:
            Mapping of registered name to converter class.
        """
        pass
Database Backend API¶
DatabaseBackend¶
Abstract base class for database connection backends.
from pyforge_cli.backends.base import DatabaseBackend
from typing import List
import pandas as pd
class DatabaseBackend(ABC):
    """Abstract base class for database connection backends.

    NOTE(review): ``ABC`` is referenced but not imported in this snippet —
    add ``from abc import ABC``.
    """
    def is_available(self) -> bool:
        """Check if backend dependencies are available.

        Returns:
            True if this backend's runtime dependencies are present.
        """
        pass
    def connect(self, db_path: str, password: str = None) -> bool:
        """Connect to database.

        Args:
            db_path: Path to the database file.
            password: Optional database password (the annotation would be
                clearer as ``Optional[str]``).

        Returns:
            True on successful connection, False otherwise.
        """
        pass
    def list_tables(self) -> List[str]:
        """List all readable tables in the database.

        Returns:
            Names of the tables that can be read.
        """
        pass
    def read_table(self, table_name: str) -> pd.DataFrame:
        """Read table data as DataFrame.

        Args:
            table_name: Name of the table to read.

        Returns:
            pandas DataFrame with the table's rows.
        """
        pass
    def close(self):
        """Close database connection and cleanup resources."""
        pass
UCanAccessSubprocessBackend¶
New in v1.0.9: Subprocess-based backend for Databricks Serverless compatibility.
from pyforge_cli.backends.ucanaccess_subprocess_backend import UCanAccessSubprocessBackend
from pathlib import Path
from typing import List
import pandas as pd
class UCanAccessSubprocessBackend(DatabaseBackend):
    """UCanAccess backend using subprocess for Databricks Serverless compatibility.

    NOTE(review): ``logging`` and ``UCanAccessJARManager`` are used below
    but not imported in this snippet.
    """
    def __init__(self):
        """Initialize subprocess backend."""
        self.logger = logging.getLogger(__name__)
        # Manages the UCanAccess JAR used by the Java subprocess.
        self.jar_manager = UCanAccessJARManager()
        # Path of the connected database; None until connect() succeeds.
        self.db_path = None
        # Presumably a local staging copy of a Unity Catalog volume file
        # (Java cannot read /Volumes/ paths directly) — confirm.
        self._temp_file_path = None
        # Presumably caches list_tables() to avoid repeated subprocess
        # round-trips — confirm.
        self._tables_cache = None
    def is_available(self) -> bool:
        """Check if subprocess UCanAccess backend is available.
        Returns:
            True if Java is available via subprocess, False otherwise
        """
        pass
    def connect(self, db_path: str, password: str = None) -> bool:
        """Connect to Access database (prepare for subprocess operations).
        Args:
            db_path: Path to Access database file (supports Unity Catalog volumes)
            password: Optional password (not supported in subprocess mode)
        Returns:
            True if file is accessible, False otherwise
        """
        pass
    def list_tables(self) -> List[str]:
        """List all user tables using Java subprocess.
        Returns:
            List of table names, or empty list on error
        """
        pass
    def read_table(self, table_name: str) -> pd.DataFrame:
        """Read table data using Java subprocess to export to CSV.
        Args:
            table_name: Name of table to read
        Returns:
            DataFrame containing table data
        """
        pass
    def get_connection_info(self) -> dict:
        """Get information about the current connection.
        Returns:
            Dictionary with connection information
        """
        pass
Databricks Environment API¶
DatabricksEnvironment¶
New in v1.0.9: Comprehensive Databricks environment detection and management.
from pyforge_cli.databricks.environment import DatabricksEnvironment, detect_databricks_environment
from typing import Any, Dict, Optional
class DatabricksEnvironment:
    """Class representing a Databricks environment.

    Wraps a snapshot of environment variables plus detection flags and
    exposes convenience accessors for runtime metadata.
    NOTE(review): ``is_serverless()`` is a plain method while the sibling
    accessors are properties — consider ``@property`` for consistency
    (that would change the public API, so callers must be updated too).
    """
    def __init__(self, env_vars: Dict[str, str], is_databricks: bool = False, version: str = None):
        """Initialize Databricks environment.
        Args:
            env_vars: Environment variables
            is_databricks: Whether running in Databricks
            version: Databricks runtime version
        """
        pass
    @property
    def is_databricks(self) -> bool:
        """Check if running in Databricks environment."""
        pass
    @property
    def version(self) -> Optional[str]:
        """Get Databricks runtime version, or None when unknown."""
        pass
    def is_serverless(self) -> bool:
        """Check if running in Databricks serverless environment."""
        pass
    @property
    def cluster_id(self) -> Optional[str]:
        """Get Databricks cluster ID, or None when unavailable."""
        pass
    @property
    def workspace_id(self) -> Optional[str]:
        """Get Databricks workspace ID, or None when unavailable."""
        pass
    def get_environment_info(self) -> Dict[str, Any]:
        """Get comprehensive environment information as a dictionary."""
        pass
# Utility functions
def detect_databricks_environment() -> DatabricksEnvironment:
    """Detect if running in Databricks environment.

    Returns:
        A DatabricksEnvironment describing the current runtime; returned
        even outside Databricks — callers check ``is_databricks``.
    """
    pass
def is_running_in_databricks() -> bool:
    """Simple check if running in Databricks environment.

    Returns:
        True when executing inside a Databricks runtime.
    """
    pass
def is_running_in_serverless() -> bool:
    """Check if running in Databricks serverless environment.

    Returns:
        True when the Databricks runtime is serverless.
    """
    pass
Unity Catalog Volume Path Utilities¶
New in v1.0.9: Built-in support for Unity Catalog volume paths.
# Unity Catalog volume path detection
def is_unity_catalog_path(path: "str | os.PathLike[str]") -> bool:
    """Check if path is a Unity Catalog volume path.

    Args:
        path: File path to check; accepts a str or any path-like object
            (e.g. ``pathlib.Path``) — a backward-compatible widening of
            the original str-only signature.

    Returns:
        True if path starts with /Volumes/
    """
    import os  # local import keeps this doc snippet self-contained
    # os.fspath is a no-op for str and converts PathLike to str.
    return os.fspath(path).startswith("/Volumes/")
# Example usage in UCanAccessSubprocessBackend
if db_path.startswith("/Volumes/"):
    # Unity Catalog volume paths are not directly readable by the Java
    # subprocess, so stage the file in local temporary storage first.
    import os
    import subprocess
    import tempfile
    temp_dir = tempfile.gettempdir()
    file_name = os.path.basename(db_path)
    # Include the PID so concurrent processes do not collide on the name.
    local_path = os.path.join(temp_dir, f"pyforge_{os.getpid()}_{file_name}")
    # Copy from volume to local storage.
    # Pass an argument list with shell=False: interpolating db_path into a
    # shell string breaks on spaces/quotes and permits shell injection.
    result = subprocess.run(
        ["cp", db_path, local_path],
        capture_output=True, text=True, timeout=60,
    )
Dependency Checker API¶
Utility for checking system dependencies.
from pyforge_cli.utils.dependency_checker import DependencyChecker
from typing import Tuple, Optional
class DependencyChecker:
    """Check and validate dependencies for database backends."""
    def check_java(self) -> Tuple[bool, Optional[str]]:
        """Check if Java runtime is available.

        Returns:
            Tuple of (is_available, version_string); per the annotation
            the version string may be None, presumably when Java is not
            found — confirm against the implementation.
        """
        pass
    def check_jaydebeapi(self) -> Tuple[bool, Optional[str]]:
        """Check if JayDeBeApi is available.

        Returns:
            Tuple of (is_available, version_string); the version string
            may be None, presumably when the package is missing.
        """
        pass
Usage Examples¶
Basic Conversion¶
from pyforge_cli.plugins.registry import ConverterRegistry
from pyforge_cli.plugins.loader import load_plugins
from pathlib import Path

# Discover and register every available converter plugin.
load_plugins()
registry = ConverterRegistry()

# Pick the converter matching the source file's format and run it.
source = Path("data.xlsx")
target = Path("data.parquet")
converter = registry.get_converter(source)
if converter:
    success = converter.convert(source, target)
    print(f"Conversion {'successful' if success else 'failed'}")
Databricks Environment Detection¶
from pyforge_cli.databricks.environment import detect_databricks_environment

# Inspect the runtime we are executing in.
environment = detect_databricks_environment()
if not environment.is_databricks:
    print("Not running in Databricks")
else:
    print(f"Running in Databricks {environment.version}")
    if environment.is_serverless():
        print("Serverless environment detected")
        # Serverless needs the subprocess-based backend for MDB files.
        from pyforge_cli.backends.ucanaccess_subprocess_backend import UCanAccessSubprocessBackend
        backend = UCanAccessSubprocessBackend()
    else:
        print("Classic cluster environment")
Unity Catalog Volume File Processing¶
from pyforge_cli.backends.ucanaccess_subprocess_backend import UCanAccessSubprocessBackend
from pathlib import Path

# Read an Access database directly from a Unity Catalog volume.
volume_path = "/Volumes/catalog/schema/volume/database.mdb"
backend = UCanAccessSubprocessBackend()
if backend.is_available():
    # connect() stages the volume file for the Java subprocess itself.
    if backend.connect(volume_path):
        tables = backend.list_tables()
        print(f"Found tables: {tables}")
        # Sample the first table, if any exist.
        if tables:
            first = tables[0]
            df = backend.read_table(first)
            print(f"Read {len(df)} rows from {first}")
        backend.close()
Databricks Serverless Integration¶
from pyforge_cli.converters.enhanced_mdb_converter import EnhancedMDBConverter
from pyforge_cli.databricks.environment import is_running_in_serverless
from pathlib import Path
# Enhanced MDB conversion in Databricks Serverless
if is_running_in_serverless():
    # Install required packages with proper index URL
    import subprocess
    import sys
    # Invoke pip via the current interpreter: a bare "pip" on PATH may
    # belong to a different Python environment than the one running here.
    cmd = [
        sys.executable, "-m", "pip", "install",
        "/Volumes/catalog/schema/volume/pyforge-cli-1.0.9-py3-none-any.whl",
        "--no-cache-dir", "--quiet",
        "--index-url", "https://pypi.org/simple/",
        "--trusted-host", "pypi.org",
    ]
    subprocess.run(cmd, check=True)
    # Use enhanced converter with subprocess backend
    converter = EnhancedMDBConverter()
    # Convert Unity Catalog volume file
    volume_mdb = Path("/Volumes/catalog/schema/volume/database.mdb")
    output_dir = Path("/Volumes/catalog/schema/volume/output/")
    success = converter.convert(volume_mdb, output_dir)
    print(f"Conversion {'successful' if success else 'failed'}")
Error Handling and Logging¶
import logging
import sys
from pyforge_cli.backends.ucanaccess_subprocess_backend import UCanAccessSubprocessBackend
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Use backend with error handling
backend = UCanAccessSubprocessBackend()
try:
    if not backend.is_available():
        logger.error("Backend not available")
        # sys.exit is the documented way to exit; the bare exit() builtin
        # is injected by the site module and is not guaranteed in scripts.
        sys.exit(1)
    if not backend.connect("/path/to/database.mdb"):
        logger.error("Failed to connect to database")
        sys.exit(1)
    # Get connection info
    info = backend.get_connection_info()
    logger.info(f"Connected using: {info['method']}")
    # Process tables; a failure on one table should not abort the rest.
    tables = backend.list_tables()
    for table in tables:
        try:
            df = backend.read_table(table)
            logger.info(f"Read {len(df)} rows from {table}")
        except Exception as e:
            logger.error(f"Failed to read table {table}: {e}")
except Exception as e:
    logger.error(f"Error: {e}")
finally:
    # NOTE(review): close() runs even if connect() never succeeded —
    # assumes close() tolerates that; confirm against the backend.
    backend.close()
Class Hierarchy¶
BaseConverter
├── CSVConverter
├── ExcelConverter
├── PDFConverter
├── XMLConverter
├── DBFConverter
└── MDBConverter
    └── EnhancedMDBConverter
DatabaseBackend
├── PyODBCBackend
├── UCanAccessBackend
└── UCanAccessSubprocessBackend (v1.0.9)
DatabricksEnvironment (v1.0.9)
├── Environment detection
├── Serverless identification
└── Runtime version parsing
Version 1.0.9 New Features¶
- Subprocess Backend: `UCanAccessSubprocessBackend` for Databricks Serverless compatibility
- Environment Detection: `DatabricksEnvironment` class with comprehensive detection
- Unity Catalog Support: Built-in volume path handling for `/Volumes/` paths
- Serverless Optimization: Automatic environment detection and backend selection
- Enhanced Error Handling: Improved logging and error reporting
- JAR Management: Automatic JAR file discovery and management
 
Error Handling¶
All API methods use proper exception handling:
try:
    # Fail fast when no converter claims the input format.
    converter = registry.get_converter(input_path)
    if not converter:
        raise ValueError(f"No converter found for {input_path}")
    success = converter.convert(input_path, output_path)
except Exception as e:
    logger.error(f"Conversion failed: {e}")
    raise
Next Steps¶
- CLI Reference - Complete command documentation
 - Converters - Format-specific conversion guides
 - Tutorials - Real-world examples
 - Databricks Integration - Databricks-specific features