"""Nextflow Configuration Management.
This module handles configuration for Nextflow workflows, including
Docker settings, resource management, and parameter validation.
Simple Usage Examples:
# Auto-configure based on environment (easiest)
config = NextflowConfig.auto_configure()
# Production settings
config = NextflowConfig.for_production()
# Testing settings
config = NextflowConfig.for_testing()
# Local Docker testing (uses local image built by 'make docker')
config = NextflowConfig.for_local_docker_testing()
"""
import math
import os
import shutil
import subprocess # nosec B404
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from sirnaforge.utils.logging_utils import get_logger
logger = get_logger(__name__)
DEFAULT_SIRNAFORGE_DOCKER_IMAGE = "ghcr.io/austin-s-h/sirnaforge:latest"
[docs]
class EnvironmentInfo(BaseModel):
    """Structured description of the execution environment and the chosen profile."""

    running_in_docker: bool = Field(description="Whether the current process is running inside a Docker container")
    docker_available: bool = Field(description="Whether Docker is available and functional for Nextflow execution")
    requested_profile: str = Field(description="The originally requested execution profile")
    recommended_profile: str = Field(description="The profile recommended based on environment detection")
    docker_image: str | None = Field(default=None, description="Docker image to use (if applicable)")
    profile_override_reason: str | None = Field(default=None, description="Reason for profile override (if any)")

    def is_profile_overridden(self) -> bool:
        """Return True when the recommended profile is not the requested one."""
        return self.requested_profile != self.recommended_profile

    def get_execution_summary(self) -> str:
        """Build a one-line, human-readable summary of the execution environment.

        The summary always starts with the recommended profile. It then adds
        the override details (when the profile was overridden), a container
        marker, and the Docker image when the Docker profile is in use.
        """
        pieces = [f"Profile: {self.recommended_profile}"]

        if self.is_profile_overridden():
            pieces.append(f" (overridden from {self.requested_profile})")
            # The reason is only reported alongside an actual override.
            if self.profile_override_reason:
                pieces.append(f" - {self.profile_override_reason}")

        if self.running_in_docker:
            pieces.append(" | Running in container")

        if self.docker_available and self.recommended_profile == "docker":
            pieces.append(f" | Using Docker image: {self.docker_image}")

        return "".join(pieces)
def _get_executable_path(tool_name: str) -> str | None:
"""Get the full path to an executable, ensuring it exists."""
path = shutil.which(tool_name)
if path is None:
logger.warning(f"Tool '{tool_name}' not found in PATH")
return path
def _validate_command_args(cmd: list[str]) -> None:
"""Validate command arguments for subprocess execution."""
if not cmd:
raise ValueError("Command list cannot be empty")
executable = cmd[0]
if not executable:
raise ValueError("Executable path cannot be empty")
# Ensure we have an absolute path to the executable
if not Path(executable).is_absolute():
raise ValueError(f"Executable must be an absolute path: {executable}")
[docs]
class NextflowConfig:
"""Configuration manager for Nextflow workflows."""
DEFAULT_SIRNAFORGE_DOCKER_IMAGE = "ghcr.io/austin-s-h/sirnaforge:latest"
MEMORY_BUFFER_GB = 0.5
MIN_MEMORY_GB = 1
_UNLIMITED_MEMORY_THRESHOLD_BYTES = 1 << 60 # Treat extremely large limits as "unbounded"
[docs]
def __init__(
self,
docker_image: str = DEFAULT_SIRNAFORGE_DOCKER_IMAGE,
profile: str = "docker",
work_dir: Path | None = None,
nxf_home: Path | None = None,
max_cpus: int = 16,
max_memory: str = "128.GB",
max_time: str = "240.h",
**kwargs: Any,
) -> None:
"""Initialize Nextflow configuration.
Args:
docker_image: Docker container image to use
profile: Nextflow profile (docker, singularity, conda, local)
work_dir: Working directory for Nextflow execution
nxf_home: Nextflow home cache
max_cpus: Maximum CPU cores
max_memory: Maximum memory allocation
max_time: Maximum execution time
**kwargs: Additional configuration parameters
"""
self.docker_image = docker_image
self.profile = profile
self.work_dir = work_dir or Path.cwd() / "nextflow_work"
self.nxf_home = nxf_home or Path.home() / ".nextflow"
self.max_cpus = max_cpus
self.max_memory = max_memory
self.max_time = max_time
self.extra_params = kwargs
@classmethod
def _autodetect_max_memory(cls) -> str | None:
    """Detect a safe Nextflow max_memory value based on system limits.

    Sources are consulted in this order, and the first usable one wins:

    1. ``SIRNAFORGE_DISABLE_MEMORY_AUTODETECT`` ("1"/"true"/"yes") disables
       detection entirely.
    2. ``SIRNAFORGE_MAX_MEMORY_GB`` supplies an explicit size in GiB. Values
       that are not finite numbers are ignored with a warning.
    3. The cgroup memory limit.
    4. ``MemTotal`` from ``/proc/meminfo``.

    Returns:
        A Nextflow memory string such as ``"7.GB"``, or None when detection
        is disabled or no source yields a value.
    """
    if os.getenv("SIRNAFORGE_DISABLE_MEMORY_AUTODETECT", "").lower() in (
        "1",
        "true",
        "yes",
    ):  # pragma: no cover - opt-out
        logger.info("Memory auto-detect disabled via SIRNAFORGE_DISABLE_MEMORY_AUTODETECT")
        return None

    env_override = os.getenv("SIRNAFORGE_MAX_MEMORY_GB")
    if env_override:
        try:
            override_gb = float(env_override)
            # float() accepts "inf" and "nan"; neither is a usable size, and
            # math.floor(inf) would raise OverflowError further down.
            if not math.isfinite(override_gb):
                raise ValueError(f"non-finite memory override: {env_override}")
            detected = cls._calculate_safe_memory_limit_from_gb(override_gb)
        except (ValueError, OverflowError):
            # A bad override must not abort detection; fall through to the
            # system-derived limits below.
            logger.warning("Invalid SIRNAFORGE_MAX_MEMORY_GB value '%s' - ignoring", env_override)
        else:
            logger.info(
                "Using SIRNAFORGE_MAX_MEMORY_GB override (requested %.2f GiB -> %s)",
                override_gb,
                detected,
            )
            return detected

    cgroup_limit = cls._read_cgroup_memory_limit_bytes()
    if cgroup_limit:
        detected = cls._calculate_safe_memory_limit(cgroup_limit)
        logger.info(
            "Detected cgroup memory limit %.2f GiB -> using %s for Nextflow max_memory",
            cgroup_limit / (1024**3),
            detected,
        )
        return detected

    meminfo_total = cls._read_meminfo_total_bytes()
    if meminfo_total:
        detected = cls._calculate_safe_memory_limit(meminfo_total)
        logger.info(
            "Detected system memory %.2f GiB -> using %s for Nextflow max_memory",
            meminfo_total / (1024**3),
            detected,
        )
        return detected

    logger.warning("Unable to auto-detect system memory - falling back to default max_memory")
    return None
@classmethod
def _read_cgroup_memory_limit_bytes(cls) -> int | None:
"""Read memory limits exposed via cgroups (v1 or v2)."""
candidate_paths = [
Path("/sys/fs/cgroup/memory.max"), # cgroup v2
Path("/sys/fs/cgroup/memory.high"),
Path("/sys/fs/cgroup/memory/memory.limit_in_bytes"), # cgroup v1
Path("/sys/fs/cgroup/memory.limit_in_bytes"),
]
for path in candidate_paths:
value = cls._read_int_from_file(path)
if value is None:
continue
if value >= cls._UNLIMITED_MEMORY_THRESHOLD_BYTES:
continue
return value
return None
@staticmethod
def _read_int_from_file(path: Path) -> int | None:
"""Try to parse an integer from the given file."""
try:
if not path.exists():
return None
content = path.read_text().strip()
if not content or content.lower() == "max":
return None
value = int(content)
if value <= 0:
return None
return value
except (OSError, ValueError):
return None
@staticmethod
def _read_meminfo_total_bytes() -> int | None:
"""Read MemTotal from /proc/meminfo."""
try:
with Path("/proc/meminfo").open() as meminfo:
for line in meminfo:
if line.startswith("MemTotal:"):
parts = line.split()
if len(parts) < 2:
continue
# Value provided in kB
total_kb = int(parts[1])
return total_kb * 1024
except (OSError, ValueError):
return None
return None
@classmethod
def _calculate_safe_memory_limit(cls, total_bytes: int) -> str:
"""Convert a byte limit into a safe Nextflow memory string."""
total_gb = total_bytes / (1024**3)
return cls._calculate_safe_memory_limit_from_gb(total_gb)
@classmethod
def _calculate_safe_memory_limit_from_gb(cls, total_gb: float) -> str:
"""Apply safety buffer and formatting to a raw GiB value."""
safe_gb = math.floor(total_gb - cls.MEMORY_BUFFER_GB)
safe_gb = max(safe_gb, cls.MIN_MEMORY_GB)
return f"{safe_gb}.GB"
[docs]
def get_nextflow_args(
self,
input_file: Path,
output_dir: Path,
genome_species: list[str],
additional_params: dict[str, Any] | None = None,
include_test_profile: bool = False,
) -> list[str]:
"""Generate Nextflow command arguments.
Args:
input_file: Input FASTA file path
output_dir: Output directory
genome_species: List of species for miRNA genome lookups (not genomic DNA)
additional_params: Additional parameters to pass
include_test_profile: Whether to include 'test' profile for integration testing
Returns:
List of command arguments for Nextflow
"""
# Ensure all paths are absolute to avoid working directory issues
abs_input_file = input_file.resolve()
abs_output_dir = output_dir.resolve()
abs_work_dir = self.work_dir.resolve()
args = [
"--input",
str(abs_input_file),
"--outdir",
str(abs_output_dir),
"--genome_species",
",".join(genome_species),
"-profile",
self.profile,
"-w",
str(abs_work_dir),
"-resume",
]
# Add test profile for local integration testing
if include_test_profile and os.getenv("SIRNAFORGE_USE_LOCAL_EXECUTION", "").lower() in ("true", "1", "yes"):
args.extend(["-profile", "test"])
# Add Docker image if using Docker profile
if self.profile == "docker":
args.extend(["-with-docker", self.docker_image])
# Add resource limits
args.extend(
[
"--max_cpus",
str(self.max_cpus),
"--max_memory",
self.max_memory,
"--max_time",
self.max_time,
]
)
# Add extra parameters from initialization
for key, value in self.extra_params.items():
if isinstance(value, bool):
if value:
args.append(f"--{key}")
else:
args.extend([f"--{key}", str(value)])
# Add additional runtime parameters
if additional_params:
for key, value in additional_params.items():
if isinstance(value, bool):
if value:
args.append(f"--{key}")
else:
args.extend([f"--{key}", str(value)])
return args
[docs]
def create_config_file(self, config_path: Path) -> Path:
"""Create a custom Nextflow configuration file.
The generated file contains a ``params`` scope with the configured resource
limits (max_cpus, max_memory, max_time), a ``process`` scope that sets
``container`` to ``self.docker_image``, and a scope named after
``self.profile`` with ``enabled = true``.
Args:
config_path: Path where to create the config file
Returns:
Path to the created configuration file
"""
# In the f-string below, doubled braces are literal braces in the output;
# single braces interpolate attributes of this configuration.
# NOTE(review): the values are inserted between single quotes without any
# escaping - presumably they never contain quotes; confirm against callers.
config_content = f"""
// Generated Nextflow configuration for siRNAforge
params {{
max_cpus = {self.max_cpus}
max_memory = '{self.max_memory}'
max_time = '{self.max_time}'
}}
process {{
container = '{self.docker_image}'
}}
{self.profile} {{
enabled = true
}}
"""
# write_text replaces any existing content of config_path.
config_path.write_text(config_content)
logger.info(f"Created Nextflow config at {config_path}")
return config_path
[docs]
def validate_docker_available(self) -> bool:
    """Check if Docker is available for Nextflow execution.

    This checks if Docker can be used by Nextflow to run containers.
    Note: This is different from running tests inside Docker containers.

    Two probes are run in order: ``docker version`` (10 s timeout) and
    ``docker run --rm hello-world`` (30 s timeout). The second one
    distinguishes "Docker is installed" from "the Docker daemon is usable".

    Returns:
        True if Docker is available and accessible for Nextflow. Any failure
        to locate, validate or run the executable yields False, never an
        exception.
    """
    # Get absolute path to docker executable
    docker_path = _get_executable_path("docker")
    if not docker_path:
        logger.warning("Docker executable not found in PATH")
        return False

    probes = (
        ([docker_path, "version"], 10),
        ([docker_path, "run", "--rm", "hello-world"], 30),
    )
    try:
        for cmd, timeout in probes:
            _validate_command_args(cmd)
            subprocess.run(cmd, capture_output=True, timeout=timeout, check=True)  # nosec B603
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError, ValueError):
        # OSError covers FileNotFoundError and PermissionError; ValueError is
        # raised by the argument validation (e.g. a non-absolute path).
        logger.warning("Docker is not available or not functional for Nextflow")
        return False

    logger.debug("Docker is available and functional for Nextflow")
    return True
[docs]
def is_running_in_docker(self) -> bool:
    """Report whether the current process appears to run inside a container.

    Three indicators are checked in order: the ``/.dockerenv`` marker file,
    the words "docker" or "containerd" in ``/proc/1/cgroup``, and a non-empty
    ``SIRNAFORGE_IN_CONTAINER`` or ``CONTAINER`` environment variable.

    Returns:
        True if any indicator matches; False otherwise, including when the
        filesystem checks fail with an OS error.
    """
    try:
        # Marker file check
        if Path("/.dockerenv").exists():
            logger.debug("Detected container execution via /.dockerenv")
            return True

        # cgroup membership of PID 1
        init_cgroup = Path("/proc/1/cgroup")
        if init_cgroup.exists():
            cgroup_text = init_cgroup.read_text()
            if any(marker in cgroup_text for marker in ("docker", "containerd")):
                logger.debug("Detected container execution via /proc/1/cgroup")
                return True

        # Explicit environment hints
        if os.getenv("SIRNAFORGE_IN_CONTAINER") or os.getenv("CONTAINER"):
            logger.debug("Detected container execution via environment variable")
            return True
    except OSError:  # includes FileNotFoundError
        pass
    return False
def _is_singularity_available(self) -> bool:
    """Check if Singularity is available.

    Returns:
        True if a ``singularity`` executable is found on PATH and
        ``singularity --version`` exits successfully within 10 seconds.
        Any failure to locate, validate or run it yields False.
    """
    singularity_path = _get_executable_path("singularity")
    if not singularity_path:
        return False

    cmd = [singularity_path, "--version"]
    try:
        _validate_command_args(cmd)
        subprocess.run(cmd, capture_output=True, timeout=10, check=True)  # nosec B603
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError, ValueError):
        # OSError covers FileNotFoundError and PermissionError; ValueError is
        # raised by the argument validation (e.g. a non-absolute path).
        return False
    return True
def _is_conda_available(self) -> bool:
    """Check if Conda or uv is available for environment management.

    ``uv`` is probed first (preferred for this project), then ``conda``.
    A tool counts as available when it is found on PATH and
    ``<tool> --version`` exits successfully within 10 seconds.

    Returns:
        True as soon as one of the tools responds; False when neither does.
        Failures to locate, validate or run a tool never raise.
    """
    for tool in ("uv", "conda"):
        tool_path = _get_executable_path(tool)
        if not tool_path:
            continue

        cmd = [tool_path, "--version"]
        try:
            _validate_command_args(cmd)
            subprocess.run(cmd, capture_output=True, timeout=10, check=True)  # nosec B603
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError, ValueError):
            # OSError covers FileNotFoundError and PermissionError; ValueError
            # is raised by the argument validation. Try the next tool.
            continue
        return True
    return False
[docs]
def get_execution_profile(self) -> str:
"""Get the appropriate execution profile based on available tools and environment.
This method considers:
1. Environment variables (SIRNAFORGE_USE_LOCAL_EXECUTION)
2. Whether we're running inside a Docker container (for testing)
3. Whether Docker is available for Nextflow execution
4. Availability of Singularity or Conda as fallbacks
5. The requested profile
Returns:
Recommended execution profile
Raises:
ValueError: If the final fallback is reached and the requested profile is
not one of: local, docker, singularity, conda, test.
"""
# None means "not decided yet"; the steps below only act while it is still None.
recommended_profile: str | None = None
# Check for explicit environment variable to force local execution
# (only the values "true", "1" and "yes", case-insensitive, count as set)
if os.getenv("SIRNAFORGE_USE_LOCAL_EXECUTION", "").lower() in ("true", "1", "yes"):
logger.info("SIRNAFORGE_USE_LOCAL_EXECUTION set, using local execution profile")
recommended_profile = "local"
# If we're running inside a Docker container (e.g., for testing),
# we might not have access to Docker daemon, so use local execution
if recommended_profile is None and self.is_running_in_docker():
logger.info("Running inside Docker container, using local execution profile")
recommended_profile = "local"
# If Docker profile is requested, check if Docker is available
if recommended_profile is None and self.profile == "docker":
if self.validate_docker_available():
logger.debug("Using Docker profile for Nextflow execution")
recommended_profile = "docker"
else:
logger.warning("Docker requested but not available, falling back to alternatives")
# Check for alternative container runtimes
# NOTE(review): indentation was lost in this copy of the file, so it cannot be
# determined here whether this probe only runs after a failed Docker check or
# for every still-undecided profile - confirm against the canonical source.
if recommended_profile is None:
if self._is_singularity_available():
logger.info("Using Singularity profile as Docker alternative")
recommended_profile = "singularity"
elif self._is_conda_available():
logger.info("Using Conda profile as container alternative")
recommended_profile = "conda"
# Fallback to local if no container runtime available and a container profile was requested
if recommended_profile is None:
if self.profile in ["docker", "singularity", "conda"]:
logger.warning(f"Requested profile '{self.profile}' not available, falling back to local")
recommended_profile = "local"
else:
# Non-container profiles are validated against the supported list and then used as requested.
supported_profiles = ["local", "docker", "singularity", "conda", "test"]
if self.profile not in supported_profiles:
raise ValueError(
f"Requested profile '{self.profile}' is not supported. Supported: {supported_profiles}"
)
logger.debug(f"Using requested profile: {self.profile}")
recommended_profile = self.profile
return recommended_profile
[docs]
def get_environment_info(self) -> EnvironmentInfo:
    """Get information about the current execution environment.

    This provides structured information about Docker availability,
    profile selection, and environment detection.

    Returns:
        EnvironmentInfo model with environment details. ``docker_image`` is
        only populated when the recommended profile is "docker", and
        ``profile_override_reason`` is only populated when the recommended
        profile differs from the requested one and a known cause applies.
    """
    running_in_docker = self.is_running_in_docker()
    docker_available = self.validate_docker_available()
    recommended_profile = self.get_execution_profile()

    # Use the same interpretation of the variable as the profile selection:
    # only "true", "1" and "yes" force local execution, so a value such as
    # "false" or "0" must not be reported as the override cause.
    local_forced = os.getenv("SIRNAFORGE_USE_LOCAL_EXECUTION", "").lower() in ("true", "1", "yes")

    # Determine override reason
    override_reason = None
    if self.profile != recommended_profile:
        if running_in_docker:
            override_reason = "Running inside container"
        elif local_forced:
            override_reason = "Environment variable SIRNAFORGE_USE_LOCAL_EXECUTION set"
        elif self.profile == "docker" and not docker_available:
            override_reason = "Docker not available"

    return EnvironmentInfo(
        running_in_docker=running_in_docker,
        docker_available=docker_available,
        requested_profile=self.profile,
        recommended_profile=recommended_profile,
        docker_image=self.docker_image if recommended_profile == "docker" else None,
        profile_override_reason=override_reason,
    )
[docs]
@classmethod
def for_testing(cls) -> "NextflowConfig":
    """Build a small-footprint configuration intended for test runs.

    The instance starts with the "test" profile and reduced resource limits.
    The detected execution profile replaces it only when it differs and
    either SIRNAFORGE_USE_LOCAL_EXECUTION is set ("true"/"1"/"yes") or the
    process is running inside a container.

    Returns:
        NextflowConfig instance with test-friendly settings
    """
    config = cls(
        profile="test",  # Use test profile which runs locally with uv/conda
        max_cpus=2,
        max_memory="6.GB",
        max_time="6.h",
        max_hits=100,
    )

    detected = config.get_execution_profile()
    if detected == config.profile:
        return config

    # Only adopt the detected profile when explicitly asked to or when containerised.
    local_forced = os.getenv("SIRNAFORGE_USE_LOCAL_EXECUTION", "").lower() in ("true", "1", "yes")
    if local_forced or config.is_running_in_docker():
        config.profile = detected
    return config
[docs]
@classmethod
def for_production(cls, **kwargs: Any) -> "NextflowConfig":
"""Create a configuration optimized for production use.
This uses Docker by default for reproducible execution with full resources.
Args:
**kwargs: Additional configuration parameters to override defaults
Returns:
NextflowConfig instance with production settings
"""
return cls(
docker_image="ghcr.io/austin-s-h/sirnaforge:latest",
profile="docker",
max_cpus=16,
max_memory="128.GB",
max_time="240.h",
**kwargs,
)