import threading
import datetime
import random
import base64
import io
import json
import os

from diffusers import (
    StableDiffusionPipeline,
    StableDiffusionXLPipeline,
    DPMSolverMultistepScheduler,
)
import torch

from models import GenerationOptions, ImageParams, ImageMetadata, ImageResult

# --- Model Loaders ---
# To add a new model type, create a loader function and register it in MODEL_LOADERS


def load_sd15(model_path, device, is_single_file):
    """Load Stable Diffusion 1.5 model."""
    if is_single_file:
        pipe = StableDiffusionPipeline.from_single_file(
            model_path,
            torch_dtype=torch.float16,
        )
        pipe.safety_checker = None
        pipe.requires_safety_checker = False
    else:
        pipe = StableDiffusionPipeline.from_pretrained(
            model_path,
            torch_dtype=torch.float16,
            safety_checker=None,
        )
    return pipe


def load_sdxl(model_path, device, is_single_file):
    """Load Stable Diffusion XL model."""
    if is_single_file:
        pipe = StableDiffusionXLPipeline.from_single_file(
            model_path,
            torch_dtype=torch.float16,
        )
    else:
        pipe = StableDiffusionXLPipeline.from_pretrained(
            model_path,
            torch_dtype=torch.float16,
        )
    return pipe


MODEL_LOADERS = {
    "sd15": load_sd15,
    "sdxl": load_sdxl,
}

# --- Pipeline Manager ---


class SDPipeline:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self._initialized = True
        self._generation_lock = threading.Lock()
        self._stop_requested = False
        self.device = "cuda"
        self.pipe = None
        self.model_path = os.environ.get("SD_MODEL_PATH", "./models/realistic-vision-v51")
        self.model_type = os.environ.get("SD_MODEL_TYPE", "sd15")
        self.low_vram = os.environ.get("SD_LOW_VRAM", "").lower() in ("1", "true", "yes")
        self.lora_stack = self._parse_lora_stack(os.environ.get("SD_LORA_STACK", ""))
        self.quality_keywords = "hyper detail, cinematic lighting, realistic, unedited"
        # Canon50, f/1.4, ISO 200, 1/160s, 8K, RAW

    def _parse_lora_stack(self, lora_env: str) -> list[tuple[str, float]]:
        """Parse SD_LORA_STACK env var into list of (path, weight) tuples.

        Format: path/to/lora.safetensors:0.8,path/to/other.safetensors:0.5
        """
        if not lora_env.strip():
            return []
        result = []
        for entry in lora_env.split(","):
            entry = entry.strip()
            if not entry:
                continue
            if ":" in entry:
                path, weight_str = entry.rsplit(":", 1)
                weight = float(weight_str)
            else:
                path = entry
                weight = 1.0
            result.append((path, weight))
        return result

    def load(self):
        """Load the model into GPU memory."""
        if self.pipe is not None:
            return
        if not os.path.exists(self.model_path):
            raise FileNotFoundError(f"Model not found: {self.model_path}")
        if self.model_type not in MODEL_LOADERS:
            available = ", ".join(MODEL_LOADERS.keys())
            raise ValueError(f"Unknown model type '{self.model_type}'. Available: {available}")
        print(f"Loading model ({self.model_type}) from {self.model_path}...")
        is_single_file = self.model_path.endswith((".safetensors", ".ckpt"))
        loader = MODEL_LOADERS[self.model_type]
        self.pipe = loader(self.model_path, self.device, is_single_file)
        self.pipe.scheduler = DPMSolverMultistepScheduler.from_config(
            self.pipe.scheduler.config,
            use_karras_sigmas=True,
        )
        if self.low_vram:
            self.pipe.enable_sequential_cpu_offload()
            self.pipe.vae.enable_slicing()
            self.pipe.vae.enable_tiling()
            print("Low VRAM mode: enabled sequential CPU offload and VAE slicing/tiling")
        else:
            self.pipe = self.pipe.to(self.device)
            self.pipe.enable_attention_slicing()
        self._load_loras()
        print("Model loaded successfully!")

    def _load_loras(self):
        """Load LoRA weights from SD_LORA_STACK configuration."""
        if not self.lora_stack:
            return
        adapter_names = []
        adapter_weights = []
        for i, (path, weight) in enumerate(self.lora_stack):
            if not os.path.exists(path):
                print(f"Warning: LoRA not found, skipping: {path}")
                continue
            adapter_name = f"lora_{i}"
            print(f"Loading LoRA: {path} (weight={weight})")
            self.pipe.load_lora_weights(path, adapter_name=adapter_name)
            adapter_names.append(adapter_name)
            adapter_weights.append(weight)
        if adapter_names:
            self.pipe.set_adapters(adapter_names, adapter_weights=adapter_weights)
            print(f"Loaded {len(adapter_names)} LoRA(s)")

    def stop(self):
        """Signal the generation loop to stop."""
        self._stop_requested = True

    def generate_stream(self, options: GenerationOptions):
        """Generate images and yield results one by one."""
        if self.pipe is None:
            self.load()
        self._stop_requested = False
        with self._generation_lock:
            for i in range(options.count):
                if self._stop_requested:
                    self._stop_requested = False
                    return
                params = self._compute_params(options, i)
                full_prompt = (
                    f"{options.prompt}, {self.quality_keywords}"
                    if options.add_quality_keywords
                    else options.prompt
                )
                image = self._generate_image(
                    full_prompt, options.negative_prompt, params, options.width, options.height
                )
                if self._stop_requested:
                    self._stop_requested = False
                    return
                result = self._save_and_encode(image, options, params, full_prompt, i)
                yield result

    def _compute_params(self, options: GenerationOptions, index: int) -> ImageParams:
        """Compute generation parameters for a single image."""
        if options.seed is not None:
            # Fixed seed: optionally offset by the image index so each image in the batch differs.
            current_seed = options.seed + index if options.increment_seed else options.seed
        else:
            current_seed = self._random_seed()
        if options.vary_guidance and options.count > 1:
            t = index / (options.count - 1)
            current_guidance = options.guidance_low + t * (options.guidance_high - options.guidance_low)
        else:
            current_guidance = options.guidance_scale
        if options.vary_steps and options.count > 1:
            t = index / (options.count - 1)
            current_steps = int(options.steps_low + t * (options.steps_high - options.steps_low))
        else:
            current_steps = options.steps
        return ImageParams(
            seed=current_seed,
            steps=current_steps,
            guidance_scale=current_guidance,
        )

    def _generate_image(
        self,
        prompt: str,
        negative_prompt: str,
        params: ImageParams,
        width: int | None,
        height: int | None,
    ):
        """Run the diffusion pipeline to generate a single image."""
        if self.low_vram:
            torch.cuda.empty_cache()
        gen_device = "cpu" if self.low_vram else self.device
        generator = torch.Generator(device=gen_device)
        generator.manual_seed(params.seed)
        kwargs = {
            "prompt": prompt,
            "num_inference_steps": params.steps,
            "guidance_scale": params.guidance_scale,
            "generator": generator,
        }
        if negative_prompt:
            kwargs["negative_prompt"] = negative_prompt
        if width:
            kwargs["width"] = width
        if height:
            kwargs["height"] = height
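        # Run the denoising loop without tracking gradients; this is pure
        # inference, so skipping autograd bookkeeping saves GPU memory.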
        with torch.no_grad():
            result = self.pipe(**kwargs)
        return result.images[0]

    def _save_and_encode(
        self,
        image,
        options: GenerationOptions,
        params: ImageParams,
        full_prompt: str,
        index: int,
    ) -> ImageResult:
        """Save image to disk and encode as base64."""
        # Ensure the output directory exists before writing the image and metadata files.
        os.makedirs("out", exist_ok=True)
        dt = datetime.datetime.now().strftime("%y-%m-%d_%H-%M-%S")
        base_file = f"out/{dt}_{params.seed}"
        image.save(f"{base_file}.jpg")
        width = options.width or image.width
        height = options.height or image.height
        metadata = ImageMetadata(
            prompt=options.prompt,
            negative_prompt=options.negative_prompt,
            seed=params.seed,
            steps=params.steps,
            guidance_scale=params.guidance_scale,
            width=width,
            height=height,
            add_quality_keywords=options.add_quality_keywords,
            full_prompt=full_prompt,
        )
        with open(f"{base_file}.json", "w") as f:
            json.dump(metadata.to_dict(), f, indent=2)
        buffer = io.BytesIO()
        image.save(buffer, format="JPEG")
        b64_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
        return ImageResult(
            index=index + 1,
            total=options.count,
            filename=f"{dt}_{params.seed}.jpg",
            url=f"/out/{dt}_{params.seed}.jpg",
            base64=f"data:image/jpeg;base64,{b64_image}",
            metadata=metadata,
        )

    def _random_seed(self, length=9):
        """Generate a random seed with the specified number of digits."""
        random.seed()
        min_val = 10 ** (length - 1)
        max_val = 10 ** length - 1
        return random.randint(min_val, max_val)


pipeline = SDPipeline()
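
# --- Usage sketch ---
# A minimal sketch of how this module is intended to be driven, assuming that
# GenerationOptions (defined in models.py) accepts the field names used above
# as keyword arguments; its actual constructor may differ. Only runs when the
# module is executed directly, never on import.
if __name__ == "__main__":
    options = GenerationOptions(
        prompt="a lighthouse on a cliff at dusk",
        negative_prompt="blurry, low quality",
        count=2,                 # generate two images in one streamed run
        seed=123456789,          # fixed seed; with increment_seed, image i uses seed + i
        increment_seed=True,
        steps=25,
        guidance_scale=7.0,
        vary_guidance=False,
        guidance_low=5.0,
        guidance_high=9.0,
        vary_steps=False,
        steps_low=20,
        steps_high=40,
        width=512,
        height=512,
        add_quality_keywords=True,
    )
    # generate_stream yields one ImageResult per image as it finishes.
    for result in pipeline.generate_stream(options):
        print(f"[{result.index}/{result.total}] saved {result.filename}")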