Skip to content

API Reference

This section contains the automatically generated API documentation for MKV Episode Matcher.

Core Modules

mkv_episode_matcher.__main__

Functions

main

main()

Entry point for the application.

If no arguments are provided, defaults to launching the web server. This makes the executable user-friendly when double-clicked.

Source code in mkv_episode_matcher/__main__.py
def main():
    """Entry point for the application.

    If no arguments are provided, defaults to launching the web server.
    This makes the executable user-friendly when double-clicked.
    """
    # A bare invocation (argv holds only the program name) is what happens
    # when the packaged executable is double-clicked: launch the web UI.
    launched_without_args = len(sys.argv) == 1
    if launched_without_args:
        serve(port=8001, host="0.0.0.0", no_browser=False)
    else:
        cli_app()

mkv_episode_matcher.cli

Unified CLI Interface for MKV Episode Matcher V2

This module provides a single, intuitive command-line interface that handles all use cases with intelligent auto-detection and minimal configuration.

Functions

print_banner

print_banner()

Print application banner.

Source code in mkv_episode_matcher/cli.py
def print_banner():
    """Render the application banner panel to the console."""
    title = Text("MKV Episode Matcher", style="bold blue")
    panel = Panel(title, subtitle="Intelligent episode matching with zero-config setup")
    console.print(panel)

match

match(
    path=typer.Argument(
        ...,
        help="Path to MKV file, series folder, or entire library",
        exists=True,
    ),
    season=typer.Option(
        None,
        "--season",
        "-s",
        help="Override season number for all files",
    ),
    recursive=typer.Option(
        True,
        "--recursive/--no-recursive",
        "-r/-nr",
        help="Search recursively in directories",
    ),
    dry_run=typer.Option(
        False,
        "--dry-run",
        "-d",
        help="Preview changes without renaming files",
    ),
    output_dir=typer.Option(
        None,
        "--output-dir",
        "-o",
        help="Copy renamed files to this directory instead of renaming in place",
    ),
    json_output=typer.Option(
        False,
        "--json",
        help="Output results in JSON format for automation",
    ),
    confidence_threshold=typer.Option(
        None,
        "--confidence",
        "-c",
        min=0.0,
        max=1.0,
        help="Minimum confidence score for matches (0.0-1.0)",
    ),
    download_subs=typer.Option(
        True,
        "--download-subs/--no-download-subs",
        help="Automatically download subtitles if not found locally",
    ),
    tmdb_id=typer.Option(
        None,
        "--tmdb-id",
        help="Manually specify the TMDB Show ID (e.g. 549 for Law & Order)",
    ),
    log_level=typer.Option(
        "INFO",
        "--log-level",
        "-l",
        help="Set logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
        case_sensitive=False,
    ),
)

Process MKV files with intelligent episode matching.

Automatically detects whether you're processing: a single file, a series folder, or an entire library.

Examples:

# Process a single file
mkv-match episode.mkv

# Process a series season
mkv-match "/media/Breaking Bad/Season 1/"

# Process entire library
mkv-match /media/tv-shows/ --recursive

# Dry run with custom output
mkv-match episode.mkv --dry-run --output-dir ./renamed/

# Automation mode
mkv-match show/ --json --confidence 0.8
Source code in mkv_episode_matcher/cli.py
@app.command()
def match(
    path: Path = typer.Argument(
        ..., help="Path to MKV file, series folder, or entire library", exists=True
    ),
    # Core options
    season: int | None = typer.Option(
        None, "--season", "-s", help="Override season number for all files"
    ),
    recursive: bool = typer.Option(
        True,
        "--recursive/--no-recursive",
        "-r/-nr",
        help="Search recursively in directories",
    ),
    dry_run: bool = typer.Option(
        False, "--dry-run", "-d", help="Preview changes without renaming files"
    ),
    # Output options
    output_dir: Path | None = typer.Option(
        None,
        "--output-dir",
        "-o",
        help="Copy renamed files to this directory instead of renaming in place",
    ),
    json_output: bool = typer.Option(
        False, "--json", help="Output results in JSON format for automation"
    ),
    # Quality options
    confidence_threshold: float | None = typer.Option(
        None,
        "--confidence",
        "-c",
        min=0.0,
        max=1.0,
        help="Minimum confidence score for matches (0.0-1.0)",
    ),
    # Subtitle options
    download_subs: bool = typer.Option(
        True,
        "--download-subs/--no-download-subs",
        help="Automatically download subtitles if not found locally",
    ),
    # TMDB options
    tmdb_id: int | None = typer.Option(
        None,
        "--tmdb-id",
        help="Manually specify the TMDB Show ID (e.g. 549 for Law & Order)",
    ),
    # Logging options
    log_level: str = typer.Option(
        "INFO",
        "--log-level",
        "-l",
        help="Set logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
        case_sensitive=False,
    ),
):
    """
    Process MKV files with intelligent episode matching.

    Automatically detects whether you're processing:
    • A single file
    • A series folder
    • An entire library

    Exits with status 1 on invalid log level, configuration/engine failure,
    an invalid path, or a processing error; status 0 when no MKV files are
    found or processing completes.

    Examples:

        # Process a single file
        mkv-match episode.mkv

        # Process a series season
        mkv-match "/media/Breaking Bad/Season 1/"

        # Process entire library
        mkv-match /media/tv-shows/ --recursive

        # Dry run with custom output
        mkv-match episode.mkv --dry-run --output-dir ./renamed/

        # Automation mode
        mkv-match show/ --json --confidence 0.8
    """

    # --- Logging setup ----------------------------------------------------
    # Configure logging level (normalized to upper case; rejected early so a
    # typo fails fast before any work is done)
    log_level = log_level.upper()
    valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    if log_level not in valid_levels:
        console.print(f"[red]Invalid log level: {log_level}. Must be one of {', '.join(valid_levels)}[/red]")
        sys.exit(1)

    # Replace loguru's default sink so the chosen level applies everywhere.
    logger.remove()
    logger.add(sys.stderr, level=log_level)

    # Add file logging to the documented log directory
    try:
        _cm = get_config_manager()
        _cfg = _cm.load()
        # Log dir lives next to the cache dir: <cache parent>/logs
        _log_dir = _cfg.cache_dir.parent / "logs"
        _log_dir.mkdir(parents=True, exist_ok=True)
        logger.add(
            str(_log_dir / "mkv-match.log"),
            rotation="10 MB",
            retention="1 week",
            level=log_level,
            encoding="utf-8",
        )
    except Exception:
        pass  # Fall back to stderr-only if config isn't available yet

    # Banner is suppressed in --json mode to keep stdout machine-parseable.
    if not json_output:
        print_banner()

    # --- Configuration ----------------------------------------------------
    # Load configuration
    try:
        cm = get_config_manager()
        config = cm.load()

        # Override config with CLI options
        if confidence_threshold is not None:
            config.min_confidence = confidence_threshold

        # --no-download-subs forces the local-only subtitle provider.
        if not download_subs:
            config.sub_provider = "local"

    except Exception as e:
        # In --json mode errors go to stdout as JSON; otherwise rich console.
        if json_output:
            print(json.dumps({"error": f"Configuration error: {e}"}))
        else:
            console.print(f"[red]Configuration error: {e}[/red]")
        sys.exit(1)

    # --- Engine -----------------------------------------------------------
    # Initialize engine
    try:
        engine = MatchEngineV2(config)
    except Exception as e:
        if json_output:
            print(json.dumps({"error": f"Engine initialization failed: {e}"}))
        else:
            console.print(f"[red]Failed to initialize engine: {e}[/red]")
        sys.exit(1)

    # --- Mode detection ---------------------------------------------------
    # Detect processing mode (only used for user-facing messaging/JSON field)
    if path.is_file():
        mode = "single_file"
    elif path.is_dir():
        # Count MKV files to determine if it's a series or library
        mkv_count = len(list(path.rglob("*.mkv") if recursive else path.glob("*.mkv")))
        if mkv_count == 0:
            if json_output:
                print(json.dumps({"error": "No MKV files found"}))
            else:
                console.print("[yellow]No MKV files found[/yellow]")
            # Exit 0: an empty directory is not an error for automation.
            sys.exit(0)
        elif mkv_count <= 30:  # Arbitrary threshold separating season vs library
            mode = "series_folder"
        else:
            mode = "library"
    else:
        if json_output:
            print(json.dumps({"error": "Invalid path"}))
        else:
            console.print("[red]Invalid path[/red]")
        sys.exit(1)

    if not json_output:
        mode_descriptions = {
            "single_file": "Processing single file",
            "series_folder": "Processing series folder",
            "library": "Processing entire library",
        }
        console.print(f"[blue]{mode_descriptions[mode]}[/blue]: {path}")

        if dry_run:
            console.print("[yellow]DRY RUN MODE - No files will be renamed[/yellow]")

    # --- Processing & output ----------------------------------------------
    # Process files
    try:
        results, failures = engine.process_path(
            path=path,
            season_override=season,
            recursive=recursive,
            dry_run=dry_run,
            output_dir=output_dir,
            json_output=json_output,
            confidence_threshold=confidence_threshold,
            tmdb_id=tmdb_id,
        )

        # Output results
        if json_output:
            output_data = {
                "mode": mode,
                "path": str(path),
                "total_matches": len(results),
                "total_failures": len(failures),
                "dry_run": dry_run,
                # export_results returns a JSON string; round-trip it so the
                # final document is a single well-formed object.
                "results": json.loads(engine.export_results(results)),
                "failures": [
                    {
                        "original_file": str(f.original_file),
                        "reason": f.reason,
                        "confidence": f.confidence,
                    }
                    for f in failures
                ],
            }
            print(json.dumps(output_data, indent=2))
        else:
            # Rich console summary
            if results or failures:
                _display_comprehensive_summary(
                    results, failures, dry_run, output_dir, console
                )
            else:
                console.print("[yellow]No MKV files processed[/yellow]")

    except Exception as e:
        if json_output:
            print(json.dumps({"error": f"Processing failed: {e}"}))
        else:
            console.print(f"[red]Processing failed: {e}[/red]")
        sys.exit(1)

config

config(
    show_cache_dir=typer.Option(
        False,
        "--show-cache-dir",
        help="Show current cache directory location",
    ),
    reset=typer.Option(
        False,
        "--reset",
        help="Reset configuration to defaults",
    ),
)

Configure MKV Episode Matcher settings.

Most settings are auto-configured, but you can customize the cache directory location, default confidence thresholds, and ASR model preferences.

Source code in mkv_episode_matcher/cli.py
@app.command()
def config(
    show_cache_dir: bool = typer.Option(
        False, "--show-cache-dir", help="Show current cache directory location"
    ),
    reset: bool = typer.Option(
        False, "--reset", help="Reset configuration to defaults"
    ),
):
    """
    Configure MKV Episode Matcher settings.

    Most settings are auto-configured, but you can customize:
    • Cache directory location
    • Default confidence thresholds
    • ASR model preferences

    Without flags, runs an interactive prompt sequence and saves the result.
    With --show-cache-dir or --reset, performs that single action and returns.
    """

    cm = get_config_manager()

    # Flag-only fast paths: each returns without entering interactive mode.
    if show_cache_dir:
        config = cm.load()
        console.print(f"Cache directory: [blue]{config.cache_dir}[/blue]")
        return

    if reset:
        config = Config()  # Default config
        cm.save(config)
        console.print("[green]Configuration reset to defaults[/green]")
        return

    # Interactive configuration
    console.print(Panel("MKV Episode Matcher Configuration"))

    config = cm.load()

    # Cache directory (only written back if the user changed the value)
    current_cache = str(config.cache_dir)
    new_cache = typer.prompt(
        "Cache directory", default=current_cache, show_default=True
    )
    if new_cache != current_cache:
        config.cache_dir = Path(new_cache)

    # Confidence threshold (silently ignored when outside [0, 1])
    current_confidence = config.min_confidence
    new_confidence = typer.prompt(
        "Minimum confidence threshold (0.0-1.0)",
        type=float,
        default=current_confidence,
        show_default=True,
    )
    if 0.0 <= new_confidence <= 1.0:
        config.min_confidence = new_confidence

    # ASR Model Selection
    console.print("\n[bold]ASR Model Configuration:[/bold]")

    try:
        # Imported lazily so a broken registry degrades to the simple prompt
        # in the except branch below instead of failing the whole command.
        from mkv_episode_matcher.core.model_registry import (
            list_recommended_models,
            get_leaderboard_url,
            get_model_info,
            DEFAULT_MODEL,
        )

        current_model = config.asr_model_name
        models = list_recommended_models()

        # Display available models
        console.print("\n  [dim]Recommended models:[/dim]")
        model_list = list(models.keys())
        for i, model_name in enumerate(model_list, 1):
            model_info = models[model_name]
            is_default = " [DEFAULT]" if model_name == DEFAULT_MODEL else ""
            is_current = " [CURRENT]" if model_name == current_model else ""
            gpu_req = "GPU required" if model_info["requires_gpu"] else "CPU-friendly"
            console.print(
                f"    {i}. {model_name}{is_default}{is_current}"
            )
            console.print(
                f"       ({model_info['size_mb']}MB, {gpu_req}, {model_info['quality']} quality)"
            )

        console.print(f"\n  [dim]Browse more models: {get_leaderboard_url()}[/dim]")
        console.print("  [dim]Enter a number (1-{}) or a custom HuggingFace model ID[/dim]".format(len(model_list)))

        new_model = typer.prompt(
            "ASR model",
            default=current_model,
            show_default=True,
        )

        # Handle numeric selection (1-based index into the displayed list)
        if new_model.isdigit() and 1 <= int(new_model) <= len(model_list):
            new_model = model_list[int(new_model) - 1]

        if new_model.strip():
            config.asr_model_name = new_model.strip()
            # Keep provider as parakeet
            config.asr_provider = "parakeet"

    except Exception as e:
        console.print(f"[yellow]Error loading model registry: {e}[/yellow]")
        # Fallback to simple prompt
        current_model = config.asr_model_name
        new_model = typer.prompt(
            "ASR model name",
            default=current_model,
            show_default=True,
        )
        if new_model.strip():
            config.asr_model_name = new_model.strip()

    # Subtitle provider (anything other than the two known values is ignored)
    current_sub = config.sub_provider
    new_sub = typer.prompt(
        "Subtitle provider (local/opensubtitles)",
        default=current_sub,
        show_default=True,
    )
    if new_sub in ["local", "opensubtitles"]:
        config.sub_provider = new_sub

    # OpenSubtitles config (only prompted when that provider is selected)
    if config.sub_provider == "opensubtitles":
        console.print("\n[bold]OpenSubtitles Configuration:[/bold]")

        current_api = config.open_subtitles_api_key or ""
        new_api = typer.prompt("API Key", default=current_api, show_default=True)
        if new_api.strip():
            config.open_subtitles_api_key = new_api.strip()

        current_user = config.open_subtitles_username or ""
        new_user = typer.prompt("Username", default=current_user, show_default=True)
        if new_user.strip():
            config.open_subtitles_username = new_user.strip()

        current_pass = config.open_subtitles_password or ""
        new_pass = typer.prompt(
            "Password", default=current_pass, show_default=False, hide_input=True
        )
        if new_pass.strip():
            config.open_subtitles_password = new_pass.strip()

    # TMDB API key (optional)
    current_tmdb = config.tmdb_api_key or ""
    new_tmdb = typer.prompt(
        "TMDb API key (optional, for episode titles)",
        default=current_tmdb,
        show_default=False,
    )
    if new_tmdb.strip():
        config.tmdb_api_key = new_tmdb.strip()

    # Save configuration
    cm.save(config)
    console.print("[green]Configuration saved successfully[/green]")

info

info()

Show system information and available models.

Source code in mkv_episode_matcher/cli.py
@app.command()
def info():
    """
    Show system information and available models.
    """
    console.print(Panel("MKV Episode Matcher - System Information"))

    # Current configuration; keep going with config=None if loading fails so
    # the model listing below can still render.
    try:
        cm = get_config_manager()
        config = cm.load()

        console.print("\n[bold]Current Configuration:[/bold]")
        console.print(f"  Cache directory: {config.cache_dir}")
        console.print(f"  ASR model: [cyan]{config.asr_model_name}[/cyan]")
        console.print(f"  Subtitle provider: {config.sub_provider}")
        console.print(f"  Confidence threshold: {config.min_confidence}")

    except Exception as e:
        console.print(f"[red]Error loading config: {e}[/red]")
        config = None

    # Model registry listing with download/hardware status per model.
    try:
        from mkv_episode_matcher.core.model_registry import (
            list_recommended_models,
            get_leaderboard_url,
            is_model_downloaded,
            get_model_info,
        )

        console.print("\n[bold]Recommended ASR Models:[/bold]")

        for name, details in list_recommended_models().items():
            marker = " [CURRENT]" if (config and name == config.asr_model_name) else ""
            status = (
                "[green]Downloaded[/green]"
                if is_model_downloaded(name)
                else "[dim]Not downloaded[/dim]"
            )
            hardware = "[yellow]GPU[/yellow]" if details["requires_gpu"] else "[green]CPU[/green]"

            console.print(f"  • {name}{marker}")
            console.print(f"    {details['description']}")
            console.print(
                f"    Size: {details['size_mb']}MB | {hardware} | Quality: {details['quality']} | {status}"
            )

        console.print(f"\n[dim]Browse more models: {get_leaderboard_url()}[/dim]")
        console.print("[dim]Run 'mkv-match config' to change your model[/dim]")

    except Exception as e:
        console.print(f"[red]Error checking models: {e}[/red]")

version

version()

Show version information.

Source code in mkv_episode_matcher/cli.py
@app.command()
def version():
    """Show version information.

    Prints "MKV Episode Matcher v<version>", falling back to "unknown" when
    the version cannot be determined.
    """
    try:
        import mkv_episode_matcher

        version = mkv_episode_matcher.__version__
    except (ImportError, AttributeError):
        # __version__ may be absent in editable/dev installs (AttributeError),
        # and the import itself can fail in a broken environment
        # (ImportError); either way the command should still succeed.
        version = "unknown"

    console.print(f"MKV Episode Matcher v{version}")

serve

serve(
    port=typer.Option(
        8001,
        "--port",
        "-p",
        help="Port to run the server on",
    ),
    host=typer.Option(
        "0.0.0.0", "--host", help="Host to bind to"
    ),
    no_browser=typer.Option(
        False,
        "--no-browser",
        help="Don't open browser automatically",
    ),
)

Launch the Web UI server.

Starts the backend API server and opens the web interface in your browser. This is the recommended way to use MKV Episode Matcher for most users.

Examples:

# Start web UI on default port
mkv-match serve

# Start on custom port without opening browser
mkv-match serve --port 9000 --no-browser
Source code in mkv_episode_matcher/cli.py
@app.command()
def serve(
    port: int = typer.Option(8001, "--port", "-p", help="Port to run the server on"),
    host: str = typer.Option("0.0.0.0", "--host", help="Host to bind to"),
    no_browser: bool = typer.Option(False, "--no-browser", help="Don't open browser automatically"),
):
    """
    Launch the Web UI server.

    Starts the backend API server and opens the web interface in your browser.
    This is the recommended way to use MKV Episode Matcher for most users.

    Examples:

        # Start web UI on default port
        mkv-match serve

        # Start on custom port without opening browser
        mkv-match serve --port 9000 --no-browser
    """
    # Heavy server dependencies are imported lazily so the rest of the CLI
    # stays fast to start.
    import threading
    import time
    import webbrowser

    import uvicorn

    from mkv_episode_matcher.backend.main import app as fastapi_app

    print_banner()
    console.print(f"[blue]Starting Web UI server on http://{host}:{port}[/blue]")
    console.print("[dim]Press Ctrl+C to stop the server[/dim]\n")

    if not no_browser:
        # Give uvicorn a moment to bind before pointing the browser at it.
        def _open_browser_later():
            time.sleep(1.5)
            webbrowser.open(f"http://localhost:{port}")

        browser_thread = threading.Thread(target=_open_browser_later, daemon=True)
        browser_thread.start()

    # Blocks until the server is stopped (Ctrl+C).
    uvicorn.run(fastapi_app, host=host, port=port)

gui

gui(
    port=typer.Option(
        8001,
        "--port",
        "-p",
        help="Port to run the server on",
    ),
    no_browser=typer.Option(
        False,
        "--no-browser",
        help="Don't open browser automatically",
    ),
)

Launch the Web UI (alias for 'serve').

Source code in mkv_episode_matcher/cli.py
@app.command()
def gui(
    port: int = typer.Option(8001, "--port", "-p", help="Port to run the server on"),
    no_browser: bool = typer.Option(False, "--no-browser", help="Don't open browser automatically"),
):
    """Launch the Web UI (alias for 'serve')."""
    # Pure delegation; the alias always binds to all interfaces.
    serve(host="0.0.0.0", port=port, no_browser=no_browser)

mkv_episode_matcher.episode_identification

Classes

SubtitleCache

SubtitleCache()

Cache for storing parsed subtitle data to avoid repeated loading and parsing.

Source code in mkv_episode_matcher/episode_identification.py
def __init__(self):
    """Initialize empty caches for whole-file content and per-chunk text."""
    self.subtitles = {}  # {file_path: parsed_content}
    self.chunk_cache = {}  # {(file_path, chunk_idx): text}
Functions
get_subtitle_content
get_subtitle_content(srt_file)

Get the full content of a subtitle file, loading it only once.

Source code in mkv_episode_matcher/episode_identification.py
def get_subtitle_content(self, srt_file):
    """Get the full content of a subtitle file, loading it only once.

    Args:
        srt_file (str or Path): Path to the SRT file.

    Returns:
        str: The file's parsed content, served from cache after first read.
    """
    path_key = str(srt_file)
    try:
        # Fast path: already loaded.
        return self.subtitles[path_key]
    except KeyError:
        content = SubtitleReader().read_srt_file(path_key)
        self.subtitles[path_key] = content
        return content
get_chunk
get_chunk(srt_file, chunk_idx, chunk_start, chunk_end)

Get a specific time chunk from a subtitle file, with caching.

Source code in mkv_episode_matcher/episode_identification.py
def get_chunk(self, srt_file, chunk_idx, chunk_start, chunk_end):
    """Get a specific time chunk from a subtitle file, with caching.

    Args:
        srt_file (str or Path): Path to the SRT file.
        chunk_idx (int): Index of the chunk (cache key component).
        chunk_start (float): Window start in seconds.
        chunk_end (float): Window end in seconds.

    Returns:
        str: Space-joined subtitle text within the time window.
    """
    path_key = str(srt_file)
    key = (path_key, chunk_idx)

    cached = self.chunk_cache.get(key)
    if cached is None:
        full_content = self.get_subtitle_content(path_key)
        lines = SubtitleReader().extract_subtitle_chunk(
            full_content, chunk_start, chunk_end
        )
        cached = " ".join(lines)
        self.chunk_cache[key] = cached

    return self.chunk_cache[key]

EpisodeMatcher

EpisodeMatcher(
    cache_dir, show_name, min_confidence=0.6, device=None
)
Source code in mkv_episode_matcher/episode_identification.py
def __init__(self, cache_dir, show_name, min_confidence=0.6, device=None):
    """Initialize the episode matcher.

    Args:
        cache_dir (str or Path): Root cache directory holding reference data.
        show_name (str): Show name used to locate reference subtitles.
        min_confidence (float): Minimum accepted match confidence.
        device (str | None): "cuda" or "cpu"; auto-detected when None.
    """
    self.cache_dir = Path(cache_dir)
    self.min_confidence = min_confidence
    self.show_name = show_name
    # Seconds of audio compared per window.
    self.chunk_duration = 30
    # Seconds skipped at the start of each video before sampling chunks.
    self.skip_initial_duration = 300
    # Prefer CUDA when ctranslate2 reports at least one CUDA device.
    self.device = device or ("cuda" if ctranslate2.get_cuda_device_count() > 0 else "cpu")
    self.temp_dir = Path(tempfile.gettempdir()) / "whisper_chunks"
    self.temp_dir.mkdir(exist_ok=True)
    # Initialize subtitle cache
    self.subtitle_cache = SubtitleCache()
    # Cache for extracted audio chunks
    self.audio_chunks = {}
    # Store reference files to avoid repeated glob operations
    self.reference_files_cache = {}
Functions
extract_audio_chunk
extract_audio_chunk(mkv_file, start_time)

Extract a chunk of audio from MKV file with caching.

Source code in mkv_episode_matcher/episode_identification.py
def extract_audio_chunk(self, mkv_file, start_time):
    """Extract a chunk of audio from MKV file with caching.

    Args:
        mkv_file (str or Path): Source video file.
        start_time (int or float): Offset in seconds where the chunk starts.

    Returns:
        str: Path to a 16 kHz mono PCM WAV file covering
        ``self.chunk_duration`` seconds starting at ``start_time``.

    Raises:
        RuntimeError: If FFmpeg fails, times out, or produces no output file.
    """
    import hashlib

    cache_key = (str(mkv_file), start_time)

    if cache_key in self.audio_chunks:
        return self.audio_chunks[cache_key]

    # BUGFIX: the chunk filename must identify the source video. A name based
    # on start_time alone collides across different MKV files, and the
    # exists() short-circuit below would then silently reuse audio extracted
    # from the wrong video.
    source_tag = hashlib.md5(str(mkv_file).encode("utf-8")).hexdigest()[:12]
    chunk_path = self.temp_dir / f"chunk_{source_tag}_{start_time}.wav"
    if not chunk_path.exists():
        cmd = [
            "ffmpeg",
            "-ss",
            str(start_time),
            "-t",
            str(self.chunk_duration),
            "-i",
            str(mkv_file),
            "-vn",  # Disable video
            "-sn",  # Disable subtitles
            "-dn",  # Disable data streams
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",  # Overwrite output files without asking
            str(chunk_path),
        ]

        try:
            logger.debug(
                f"Extracting audio chunk from {mkv_file} at {start_time}s using FFmpeg"
            )
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)

            if result.returncode != 0:
                error_msg = f"FFmpeg failed with return code {result.returncode}"
                if result.stderr:
                    error_msg += f". Error: {result.stderr.strip()}"
                logger.error(error_msg)
                logger.debug(f"FFmpeg command: {' '.join(cmd)}")
                raise RuntimeError(error_msg)

            # Check if the output file was actually created and has content
            if not chunk_path.exists():
                error_msg = f"FFmpeg completed but output file was not created: {chunk_path}"
                logger.error(error_msg)
                raise RuntimeError(error_msg)

            # Check if the file has meaningful content (at least 1KB)
            if chunk_path.stat().st_size < 1024:
                error_msg = f"Generated audio chunk is too small ({chunk_path.stat().st_size} bytes), likely corrupted"
                logger.warning(error_msg)
                # Don't raise an error for small files, but log the warning

            logger.debug(
                f"Successfully extracted {chunk_path.stat().st_size} byte audio chunk"
            )

        except subprocess.TimeoutExpired as e:
            error_msg = f"FFmpeg timed out after 30 seconds while extracting audio from {mkv_file}"
            logger.error(error_msg)
            raise RuntimeError(error_msg) from e

        except Exception as e:
            error_msg = f"Failed to extract audio chunk from {mkv_file} at {start_time}s: {str(e)}"
            logger.error(error_msg)
            # Clean up partial file if it exists
            if chunk_path.exists():
                try:
                    chunk_path.unlink()
                except Exception as cleanup_error:
                    logger.warning(
                        f"Failed to clean up partial file {chunk_path}: {cleanup_error}"
                    )
            raise RuntimeError(error_msg) from e

    chunk_path_str = str(chunk_path)
    self.audio_chunks[cache_key] = chunk_path_str
    return chunk_path_str
load_reference_chunk
load_reference_chunk(srt_file, chunk_idx)

Load reference subtitles for a specific time chunk with caching.

PARAMETER DESCRIPTION
srt_file

Path to the SRT file

TYPE: str or Path

chunk_idx

Index of the chunk to load

TYPE: int

RETURNS DESCRIPTION
str

Combined text from the subtitle chunk

Source code in mkv_episode_matcher/episode_identification.py
def load_reference_chunk(self, srt_file, chunk_idx):
    """
    Load reference subtitles for a specific time chunk with caching.

    Args:
        srt_file (str or Path): Path to the SRT file
        chunk_idx (int): Index of the chunk to load

    Returns:
        str: Combined text from the subtitle chunk ("" on any error)
    """
    try:
        # Chunk windows are offset past the skipped intro, mirroring the
        # offsets used in _try_match_with_model.
        window_start = self.skip_initial_duration + chunk_idx * self.chunk_duration
        window_end = window_start + self.chunk_duration
        return self.subtitle_cache.get_chunk(
            srt_file, chunk_idx, window_start, window_end
        )
    except Exception as e:
        logger.error(f"Error loading reference chunk from {srt_file}: {e}")
        return ""
get_reference_files
get_reference_files(season_number)

Get reference subtitle files with caching.

Source code in mkv_episode_matcher/episode_identification.py
def get_reference_files(self, season_number):
    """Get reference subtitle files with caching.

    Args:
        season_number (int): Season to look up reference subtitles for.

    Returns:
        list[Path]: Deduplicated SRT files whose names match the season in
        any common pattern (S01E, S1E, 01x, 1x), order preserved.
    """
    cache_key = (self.show_name, season_number)
    logger.debug(f"Reference cache key: {cache_key}")

    if cache_key in self.reference_files_cache:
        logger.debug("Returning cached reference files")
        return self.reference_files_cache[cache_key]

    reference_dir = self.cache_dir / "data" / self.show_name
    patterns = [
        f"S{season_number:02d}E",
        f"S{season_number}E",
        f"{season_number:02d}x",
        f"{season_number}x",
    ]

    # Scan the directory once (hoisted out of the pattern loop; the previous
    # version re-globbed identically for every pattern). Both extensions
    # cover case-sensitive filesystems.
    srt_files = list(reference_dir.glob("*.srt")) + list(
        reference_dir.glob("*.SRT")
    )

    reference_files = []
    for pattern in patterns:
        files = [
            f
            for f in srt_files
            if re.search(f"{pattern}\\d+", f.name, re.IGNORECASE)
        ]
        reference_files.extend(files)

    # Remove duplicates while preserving order
    reference_files = list(dict.fromkeys(reference_files))
    logger.debug(
        f"Found {len(reference_files)} reference files for season {season_number}"
    )
    self.reference_files_cache[cache_key] = reference_files
    return reference_files
identify_episode
identify_episode(video_file, temp_dir, season_number)

Progressive episode identification with faster initial attempt.

Source code in mkv_episode_matcher/episode_identification.py
def identify_episode(self, video_file, temp_dir, season_number):
    """Progressive episode identification with faster initial attempt.

    Args:
        video_file (str or Path): Video to identify.
        temp_dir: Unused here; kept for interface compatibility — TODO confirm.
        season_number (int): Season whose reference subtitles to match against.

    Returns:
        dict | None: Match info (includes 'matched_at' and 'confidence')
        or None when no match could be made.
    """
    try:
        # Get reference files first with caching
        reference_files = self.get_reference_files(season_number)

        if not reference_files:
            logger.error(f"No reference files found for season {season_number}")
            return None

        # Cache video duration
        try:
            duration = get_video_duration(video_file)
        except Exception as e:
            logger.error(f"Failed to get video duration for {video_file}: {e}")
            return None

        # Try with Parakeet CTC model
        logger.info("Attempting match with Parakeet CTC model...")
        try:
            match = self._try_match_with_model(
                video_file,
                {
                    "type": "parakeet",
                    "name": "nvidia/parakeet-ctc-0.6b",
                    "device": self.device,
                },
                min(duration, 600),  # Allow up to 10 minutes
                reference_files,
            )
            if match:
                logger.info(
                    f"Successfully matched with Parakeet CTC model at {match['matched_at']}s (confidence: {match['confidence']:.2f})"
                )
                return match
        except Exception as e:
            # Model failure is non-fatal; fall through to the "no match" path.
            logger.warning(f"Parakeet CTC model failed: {e}")

        logger.info(
            "Speech recognition match failed - no models were able to process this file"
        )
        return None

    except Exception as e:
        logger.error(
            f"Unexpected error during episode identification for {video_file}: {e}"
        )
        return None

    finally:
        # Cleanup temp files - keep this limited to only files we know we created
        # (runs on every exit path, including the early returns above)
        for chunk_info in self.audio_chunks.values():
            try:
                Path(chunk_info).unlink(missing_ok=True)
            except Exception as e:
                logger.warning(f"Failed to delete temp file {chunk_info}: {e}")

SubtitleReader

Helper class for reading and parsing subtitle files.

Functions
parse_timestamp staticmethod
parse_timestamp(timestamp)

Parse SRT timestamp into seconds.

Source code in mkv_episode_matcher/episode_identification.py
@staticmethod
def parse_timestamp(timestamp):
    """Parse SRT timestamp into seconds."""
    hours, minutes, seconds = timestamp.replace(",", ".").split(":")
    return float(hours) * 3600 + float(minutes) * 60 + float(seconds)
read_srt_file staticmethod
read_srt_file(file_path)

Read an SRT file and return its contents with robust encoding handling.

PARAMETER DESCRIPTION
file_path

Path to the SRT file

TYPE: str or Path

RETURNS DESCRIPTION
str

Contents of the SRT file

Source code in mkv_episode_matcher/episode_identification.py
@staticmethod
def read_srt_file(file_path):
    """
    Return the full text of an SRT file.

    Delegates to read_file_with_fallback so files in any of the common
    subtitle encodings decode correctly.

    Args:
        file_path (str or Path): Path to the SRT file

    Returns:
        str: Contents of the SRT file
    """
    return read_file_with_fallback(file_path)
extract_subtitle_chunk staticmethod
extract_subtitle_chunk(content, start_time, end_time)

Extract subtitle text for a specific time window.

PARAMETER DESCRIPTION
content

Full SRT file content

TYPE: str

start_time

Chunk start time in seconds

TYPE: float

end_time

Chunk end time in seconds

TYPE: float

RETURNS DESCRIPTION
list

List of subtitle texts within the time window

Source code in mkv_episode_matcher/episode_identification.py
@staticmethod
def extract_subtitle_chunk(content, start_time, end_time):
    """
    Collect the subtitle texts that overlap a given time window.

    Args:
        content (str): Full SRT file content
        start_time (float): Chunk start time in seconds
        end_time (float): Chunk end time in seconds

    Returns:
        list: Subtitle texts within the time window
    """
    collected = []

    for block in content.strip().split("\n\n"):
        parts = block.split("\n")
        # A usable cue needs an index line, a timing line and text.
        if len(parts) < 3 or "-->" not in parts[1]:
            continue

        try:
            begin_raw, _, finish_raw = parts[1].partition(" --> ")
            cue_start = SubtitleReader.parse_timestamp(begin_raw.strip())
            cue_end = SubtitleReader.parse_timestamp(finish_raw.strip())
        except (IndexError, ValueError) as e:
            logger.warning(f"Error parsing subtitle block: {e}")
            continue

        # Keep any cue overlapping [start_time, end_time].
        if cue_end >= start_time and cue_start <= end_time:
            collected.append(" ".join(parts[2:]))

    return collected

Functions

get_video_duration cached

get_video_duration(video_file)

Get video duration with caching and error handling.

Source code in mkv_episode_matcher/episode_identification.py
@lru_cache(maxsize=100)
def get_video_duration(video_file):
    """
    Get video duration in whole seconds via ffprobe (cached per path).

    Args:
        video_file (str or Path): Path to the video file.

    Returns:
        int: Duration rounded up to the next whole second.

    Raises:
        RuntimeError: If ffprobe fails, times out, or returns an
            empty/unparseable/non-positive duration.
    """
    try:
        logger.debug(f"Getting duration for video file: {video_file}")
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                str(video_file),
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )

        if result.returncode != 0:
            error_msg = f"ffprobe failed with return code {result.returncode}"
            if result.stderr:
                error_msg += f". Error: {result.stderr.strip()}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        duration_str = result.stdout.strip()
        if not duration_str:
            error_msg = "ffprobe returned empty duration"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        duration = float(duration_str)
        if duration <= 0:
            error_msg = f"Invalid duration: {duration}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        result_duration = int(np.ceil(duration))
        logger.debug(f"Video duration: {result_duration} seconds")
        return result_duration

    except subprocess.TimeoutExpired as e:
        error_msg = f"ffprobe timed out while getting duration for {video_file}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e
    except ValueError as e:
        error_msg = (
            f"Failed to parse duration from ffprobe output for {video_file}: {e}"
        )
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e
    except RuntimeError:
        # Already logged with a specific message above; previously this fell
        # into the generic handler and was re-wrapped (and re-logged) as a
        # misleading "Unexpected error ..." RuntimeError.
        raise
    except Exception as e:
        error_msg = f"Unexpected error getting video duration for {video_file}: {e}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e

detect_file_encoding

detect_file_encoding(file_path)

Detect the encoding of a file using chardet.

PARAMETER DESCRIPTION
file_path

Path to the file

TYPE: str or Path

RETURNS DESCRIPTION
str

Detected encoding, defaults to 'utf-8' if detection fails

Source code in mkv_episode_matcher/episode_identification.py
def detect_file_encoding(file_path):
    """
    Guess a file's text encoding by running chardet on a leading sample.

    Args:
        file_path (str or Path): Path to the file

    Returns:
        str: Detected encoding, defaults to 'utf-8' if detection fails
    """
    try:
        # Sample at most 1MB; enough for detection without reading huge files.
        sample_size = min(1024 * 1024, Path(file_path).stat().st_size)
        with open(file_path, "rb") as handle:
            sample = handle.read(sample_size)

        detection = chardet.detect(sample)
        encoding = detection["encoding"]
        confidence = detection["confidence"]

        logger.debug(
            f"Detected encoding {encoding} with {confidence:.2%} confidence for {file_path}"
        )
        return encoding or "utf-8"
    except Exception as e:
        logger.warning(f"Error detecting encoding for {file_path}: {e}")
        return "utf-8"

read_file_with_fallback cached

read_file_with_fallback(file_path, encodings=None)

Read a file trying multiple encodings in order of preference.

PARAMETER DESCRIPTION
file_path

Path to the file

TYPE: str or Path

encodings

List of encodings to try, defaults to common subtitle encodings

TYPE: list DEFAULT: None

RETURNS DESCRIPTION
str

File contents

RAISES DESCRIPTION
ValueError

If file cannot be read with any encoding

Source code in mkv_episode_matcher/episode_identification.py
@lru_cache(maxsize=100)
def _read_file_cached(file_path, encodings):
    """Cached worker for read_file_with_fallback; takes only hashable args."""
    if encodings is None:
        # First try detected encoding, then fallback to common subtitle encodings
        detected = detect_file_encoding(file_path)
        encodings = (detected, "utf-8", "latin-1", "cp1252", "iso-8859-1")

    file_path = Path(file_path)
    errors = []

    for encoding in encodings:
        try:
            with open(file_path, encoding=encoding) as f:
                content = f.read()
            logger.debug(f"Successfully read {file_path} using {encoding} encoding")
            return content
        except UnicodeDecodeError as e:
            errors.append(f"{encoding}: {str(e)}")
            continue

    error_msg = f"Failed to read {file_path} with any encoding. Errors:\n" + "\n".join(
        errors
    )
    logger.error(error_msg)
    raise ValueError(error_msg)


def read_file_with_fallback(file_path, encodings=None):
    """
    Read a file trying multiple encodings in order of preference.

    Args:
        file_path (str or Path): Path to the file
        encodings (list): List of encodings to try, defaults to common subtitle encodings

    Returns:
        str: File contents

    Raises:
        ValueError: If file cannot be read with any encoding
    """
    # lru_cache requires hashable arguments: the previous implementation
    # raised "TypeError: unhashable type: 'list'" whenever a caller passed
    # an explicit encodings list. Convert to a tuple (and normalize the
    # path to str so str/Path callers share one cache entry) before
    # delegating to the cached worker.
    encodings_key = tuple(encodings) if encodings is not None else None
    return _read_file_cached(str(file_path), encodings_key)

mkv_episode_matcher.asr_models

ASR Model Abstraction Layer

This module provides a unified interface for different Automatic Speech Recognition models, supporting OpenAI Whisper models via faster-whisper for efficient inference.

Classes

ASRModel

ASRModel(model_name, device=None)

Bases: ABC

Abstract base class for ASR models.

Initialize ASR model.

PARAMETER DESCRIPTION
model_name

Name/identifier of the model

TYPE: str

device

Device to run on ('cpu', 'cuda', or None for auto-detect)

TYPE: str | None DEFAULT: None

Source code in mkv_episode_matcher/asr_models.py
def __init__(self, model_name: str, device: str | None = None):
    """
    Initialize ASR model.

    Args:
        model_name: Name/identifier of the model
        device: Device to run on ('cpu', 'cuda', or None for auto-detect)
    """
    self.model_name = model_name
    # Fall back to auto-detection when no (truthy) device was supplied.
    self.device = device if device else self._get_default_device()
    # The backing model is loaded lazily by load().
    self._model = None
Attributes
is_loaded property
is_loaded

Check if model is loaded.

Functions
load abstractmethod
load()

Load the model. Should be called before transcription.

Source code in mkv_episode_matcher/asr_models.py
@abc.abstractmethod
def load(self):
    """Load the model into memory; must be called before transcription."""
transcribe abstractmethod
transcribe(audio_path)

Transcribe audio file.

PARAMETER DESCRIPTION
audio_path

Path to audio file

TYPE: str | Path

RETURNS DESCRIPTION
dict

Dictionary with at least 'text' key containing transcription

Source code in mkv_episode_matcher/asr_models.py
@abc.abstractmethod
def transcribe(self, audio_path: str | Path) -> dict:
    """
    Transcribe audio file.

    Args:
        audio_path: Path to audio file

    Returns:
        Dictionary with at least 'text' key containing transcription
    """
    pass
calculate_match_score
calculate_match_score(transcription, reference)

Calculate similarity score between transcription and reference.

PARAMETER DESCRIPTION
transcription

Transcribed text

TYPE: str

reference

Reference subtitle text

TYPE: str

RETURNS DESCRIPTION
float

Float score between 0.0 and 1.0

Source code in mkv_episode_matcher/asr_models.py
def calculate_match_score(self, transcription: str, reference: str) -> float:
    """
    Score how closely a transcription matches reference subtitle text.

    Args:
        transcription: Transcribed text
        reference: Reference subtitle text

    Returns:
        Float score between 0.0 and 1.0
    """
    # Weighted blend: word-order-insensitive similarity dominates (70%)
    # and substring similarity contributes the remainder (30%).
    token_weight = 0.7
    partial_weight = 0.3

    weighted = (
        fuzz.token_sort_ratio(transcription, reference) * token_weight
        + fuzz.partial_ratio(transcription, reference) * partial_weight
    )
    return weighted / 100.0
unload
unload()

Unload model to free memory.

Source code in mkv_episode_matcher/asr_models.py
def unload(self):
    """Drop the model reference so the backing memory can be reclaimed."""
    self._model = None

FasterWhisperModel

FasterWhisperModel(model_name='small', device=None)

Bases: ASRModel

OpenAI Whisper ASR model implementation using faster-whisper.

This uses CTranslate2 for efficient inference, providing: faster inference than the original Whisper implementation, lower memory usage, straightforward CPU and GPU support, and no complex dependencies (unlike NVIDIA NeMo).

Initialize Faster Whisper model.

PARAMETER DESCRIPTION
model_name

Whisper model size (tiny, base, small, medium, large-v3)

TYPE: str DEFAULT: 'small'

device

Device to run on ('cpu', 'cuda', or None for auto-detect)

TYPE: str | None DEFAULT: None

Source code in mkv_episode_matcher/asr_models.py
def __init__(
    self, model_name: str = "small", device: str | None = None
):
    """
    Initialize Faster Whisper model.

    Args:
        model_name: Whisper model size (tiny, base, small, medium, large-v3)
        device: Device to run on ('cpu', 'cuda', or None for auto-detect)
    """
    # Accept HuggingFace-style ids ("openai/whisper-small") by stripping the
    # leading prefix. removeprefix is a no-op when the prefix is absent and,
    # unlike str.replace, cannot corrupt a name containing the substring
    # anywhere else.
    model_name = model_name.removeprefix("openai/whisper-")

    super().__init__(model_name, device)
Attributes
is_loaded property
is_loaded

Check if model is loaded.

Functions
load
load()

Load Faster Whisper model with caching.

Source code in mkv_episode_matcher/asr_models.py
def load(self):
    """Load Faster Whisper model with caching.

    Reuses a previously constructed WhisperModel from the module-level
    cache when one exists for this (model, device) pair. When targeting
    CUDA, a minimal dummy transcription is run right after construction
    so that missing/broken CUDA libraries surface here; if that check
    fails, the model is rebuilt on CPU with int8 compute.
    """
    if self.is_loaded:
        return

    # One cache entry per (model, device) combination.
    cache_key = f"faster_whisper_{self.model_name}_{self.device}"

    if cache_key in _model_cache:
        self._model = _model_cache[cache_key]
        logger.debug(
            f"Using cached Faster Whisper model: {self.model_name} on {self.device}"
        )
        return

    try:
        from faster_whisper import WhisperModel

        compute_type = self._get_compute_type()

        logger.info(
            f"Loading Faster Whisper model: {self.model_name} on {self.device} "
            f"(compute_type={compute_type})"
        )

        try:
            self._model = WhisperModel(
                self.model_name,
                device=self.device,
                compute_type=compute_type,
                download_root=None,  # Use default cache location
            )

            # Eagerly verify CUDA execution to trigger potential DLL errors immediately
            if self.device == "cuda":
                try:
                    # Transcribe one second of silence so broken or missing
                    # CUDA libraries fail here instead of on the first real
                    # file; next(..., None) just pulls the first (possibly
                    # absent) segment from the lazy generator.
                    logger.debug("Verifying CUDA availability by running dummy encoding...")
                    import numpy as np
                    dummy_audio = np.zeros(16000, dtype=np.float32)
                    next(self._model.transcribe(dummy_audio, language="en")[0], None)
                except Exception as e:
                    # Re-raise as RuntimeError so the CPU-fallback branch
                    # below (which matches on "verification failed") sees it.
                    logger.warning(f"CUDA verification failed: {e}")
                    raise RuntimeError(f"CUDA verification failed: {e}") from e

        except RuntimeError as e:
            # Fallback to CPU if CUDA libraries are missing or unusable.
            if self.device == "cuda" and ("Library" in str(e) or "verification failed" in str(e)):
                logger.warning(
                    f"Failed to load/run on CUDA due to missing libraries: {e}. "
                    "Falling back to CPU."
                )
                self.device = "cpu"
                compute_type = "int8"  # safest/most compatible CPU compute type
                self._model = WhisperModel(
                    self.model_name,
                    device=self.device,
                    compute_type=compute_type,
                    download_root=None,
                )
            else:
                raise

        _model_cache[cache_key] = self._model
        logger.info(
            f"Loaded Faster Whisper model: {self.model_name} on {self.device}"
        )

    except ImportError as e:
        raise ImportError(
            "faster-whisper not installed. Run: pip install faster-whisper"
        ) from e
    except Exception as e:
        logger.error(f"Failed to load Faster Whisper model {self.model_name}: {e}")
        raise
transcribe
transcribe(audio_path)

Transcribe audio using Faster Whisper.

PARAMETER DESCRIPTION
audio_path

Path to audio file

TYPE: str | Path

RETURNS DESCRIPTION
dict

Dictionary with 'text', 'raw_text', 'segments', and 'language'

Source code in mkv_episode_matcher/asr_models.py
def transcribe(self, audio_path: str | Path) -> dict:
    """
    Transcribe audio using Faster Whisper.

    Args:
        audio_path: Path to audio file

    Returns:
        Dictionary with 'text', 'raw_text', 'segments', and 'language'.
        On failure an empty-text result is returned instead of raising,
        so callers can fall back to other matching strategies.
    """
    if not self.is_loaded:
        self.load()

    preprocessed_audio = None
    try:
        logger.debug(f"Starting Faster Whisper transcription for {audio_path}")

        # Preprocess audio
        preprocessed_audio = self._preprocess_audio(audio_path)

        # Transcribe with faster-whisper
        segments, info = self._model.transcribe(
            preprocessed_audio,
            language="en",  # Force English for TV episode matching
            beam_size=5,
            best_of=5,
            temperature=0.0,  # Greedy decoding for consistency
            condition_on_previous_text=False,
            vad_filter=True,  # Filter out non-speech
        )

        # Collect all segment texts
        segment_list = []
        full_text_parts = []

        for segment in segments:
            segment_list.append({
                "start": segment.start,
                "end": segment.end,
                "text": segment.text,
            })
            full_text_parts.append(segment.text)

        raw_text = " ".join(full_text_parts).strip()
        cleaned_text = self._clean_transcription_text(raw_text)

        logger.debug(f"Raw transcription: '{raw_text}'")
        logger.debug(f"Cleaned transcription: '{cleaned_text}'")

        return {
            "text": cleaned_text,
            "raw_text": raw_text,
            "segments": segment_list,
            "language": getattr(info, "language", "en"),
        }

    except Exception as e:
        # Log with the full traceback through the logging framework rather
        # than printing to stderr (the previous traceback.print_exc() call
        # bypassed logging configuration entirely).
        logger.exception(
            f"Faster Whisper transcription failed for {audio_path}: {type(e).__name__}: {e}"
        )
        # Return empty result instead of raising to allow fallback
        return {"text": "", "raw_text": "", "segments": [], "language": "en"}
    finally:
        # Clean up preprocessed audio file (only if we created a new one)
        if preprocessed_audio and preprocessed_audio != str(audio_path):
            try:
                Path(preprocessed_audio).unlink(missing_ok=True)
            except Exception as e:
                logger.debug(f"Failed to clean up preprocessed audio: {e}")
calculate_match_score
calculate_match_score(transcription, reference)

Calculate similarity score between transcription and reference.

PARAMETER DESCRIPTION
transcription

Transcribed text

TYPE: str

reference

Reference subtitle text

TYPE: str

RETURNS DESCRIPTION
float

Float score between 0.0 and 1.0

Source code in mkv_episode_matcher/asr_models.py
def calculate_match_score(self, transcription: str, reference: str) -> float:
    """
    Compute a fuzzy similarity score between a transcription and a
    reference subtitle chunk.

    Args:
        transcription: Transcribed text
        reference: Reference subtitle text

    Returns:
        Float score between 0.0 and 1.0
    """
    # 70% word-order-insensitive similarity + 30% best-substring similarity,
    # rescaled from rapidfuzz's 0-100 range down to 0-1.
    token_weight = 0.7
    partial_weight = 0.3

    combined = (
        fuzz.token_sort_ratio(transcription, reference) * token_weight
        + fuzz.partial_ratio(transcription, reference) * partial_weight
    )
    return combined / 100.0
unload
unload()

Unload model to free memory.

Source code in mkv_episode_matcher/asr_models.py
def unload(self):
    """Release the held model so its memory can be garbage-collected."""
    self._model = None

Functions

create_asr_model

create_asr_model(model_config)

Factory function to create ASR models from configuration.

PARAMETER DESCRIPTION
model_config

Dictionary with 'type' and 'name' keys

TYPE: dict

RETURNS DESCRIPTION
ASRModel

Configured ASRModel instance

Example

model_config = {"type": "whisper", "name": "small"}
model = create_asr_model(model_config)

Source code in mkv_episode_matcher/asr_models.py
def create_asr_model(model_config: dict) -> ASRModel:
    """
    Factory function to create ASR models from configuration.

    Args:
        model_config: Dictionary with 'type' and 'name' keys

    Returns:
        Configured ASRModel instance

    Example:
        model_config = {"type": "whisper", "name": "small"}
        model = create_asr_model(model_config)
    """
    model_type = model_config.get("type", "").lower()
    model_name = model_config.get("name", "")
    device = model_config.get("device")

    # Handle whisper and faster-whisper types
    if model_type in ("whisper", "faster-whisper", "openai-whisper"):
        if not model_name:
            model_name = "small"

        logger.info(f"Creating Faster Whisper model: {model_name}")
        return FasterWhisperModel(model_name, device)

    # Legacy parakeet support - redirect to whisper
    elif model_type == "parakeet":
        logger.warning(
            "Parakeet models are no longer supported. Using Whisper 'small' model instead."
        )
        return FasterWhisperModel("small", device)

    else:
        raise ValueError(
            f"Unsupported model type: {model_type}. Supported types: 'whisper', 'faster-whisper'"
        )

get_cached_model

get_cached_model(model_config)

Get a cached model instance, creating it if necessary.

PARAMETER DESCRIPTION
model_config

Dictionary with model configuration

TYPE: dict

RETURNS DESCRIPTION
ASRModel

ASRModel instance (loaded and ready for use)

Source code in mkv_episode_matcher/asr_models.py
def get_cached_model(model_config: dict) -> ASRModel:
    """
    Get a cached model instance, creating it if necessary.

    Args:
        model_config: Dictionary with model configuration

    Returns:
        ASRModel instance (loaded and ready for use)
    """
    # Cache key mirrors the config triple: type, name, device.
    key_parts = (
        model_config.get("type", ""),
        model_config.get("name", ""),
        model_config.get("device", "auto"),
    )
    cache_key = "_".join(str(part) for part in key_parts)

    try:
        return _model_cache[cache_key]
    except KeyError:
        pass

    model = create_asr_model(model_config)
    model.load()  # Load eagerly so cached instances are always ready
    _model_cache[cache_key] = model
    return model

clear_model_cache

clear_model_cache()

Clear all cached models to free memory.

Source code in mkv_episode_matcher/asr_models.py
def clear_model_cache():
    """Clear all cached models to free memory.

    Unloads each cached model (when it supports unload()) and then
    empties the module-level cache dict.
    """
    # No `global` statement needed: the dict is mutated in place
    # (.clear()), never rebound to a new object.
    for model in _model_cache.values():
        if hasattr(model, "unload"):
            model.unload()
    _model_cache.clear()
    logger.info("Cleared ASR model cache")

list_available_models

list_available_models()

List available model types and their requirements.

RETURNS DESCRIPTION
dict

Dictionary with model types and their availability status

Source code in mkv_episode_matcher/asr_models.py
def list_available_models() -> dict:
    """
    List available model types and their requirements.

    Returns:
        Dictionary with model types and their availability status
    """
    availability = {}

    # Probe for the faster-whisper backend without letting a missing
    # dependency crash the caller.
    try:
        import faster_whisper  # noqa: F401
    except ImportError:
        availability["whisper"] = {
            "available": False,
            "error": "faster-whisper not installed. Run: pip install faster-whisper",
        }
    else:
        availability["whisper"] = {
            "available": True,
            "models": list(FasterWhisperModel.MODEL_SIZES.keys()),
            "default": "small",
            "description": "OpenAI Whisper models via faster-whisper (CTranslate2)",
        }

    return availability

mkv_episode_matcher.subtitle_utils

Functions

generate_subtitle_patterns

generate_subtitle_patterns(series_name, season, episode)

Generate various common subtitle filename patterns.

PARAMETER DESCRIPTION
series_name

Name of the series

TYPE: str

season

Season number

TYPE: int

episode

Episode number

TYPE: int

RETURNS DESCRIPTION
list[str]

List[str]: List of possible subtitle filenames

Source code in mkv_episode_matcher/subtitle_utils.py
def generate_subtitle_patterns(
    series_name: str, season: int, episode: int
) -> list[str]:
    """
    Build the list of common subtitle filename variants for one episode.

    Args:
        series_name (str): Name of the series
        season (int): Season number
        episode (int): Episode number

    Returns:
        List[str]: List of possible subtitle filenames
    """
    compact = series_name.replace(" ", "")
    dotted = series_name.replace(" ", ".")
    underscored = series_name.replace(" ", "_")
    se_tag = f"S{season:02d}E{episode:02d}"
    xx_tag = f"{season}x{episode:02d}"

    return [
        # Standard format: "Show Name - S01E02.srt"
        f"{series_name} - {se_tag}.srt",
        # Season x Episode format: "Show Name - 1x02.srt"
        f"{series_name} - {xx_tag}.srt",
        # Separate season/episode: "Show Name - Season 1 Episode 02.srt"
        f"{series_name} - Season {season} Episode {episode:02d}.srt",
        # Compact format: "ShowName.S01E02.srt"
        f"{compact}.{se_tag}.srt",
        # Numbered format: "Show Name 102.srt"
        f"{series_name} {season:01d}{episode:02d}.srt",
        # Dot format: "Show.Name.1x02.srt"
        f"{dotted}.{xx_tag}.srt",
        # Underscore format: "Show_Name_S01E02.srt"
        f"{underscored}_{se_tag}.srt",
    ]

find_existing_subtitle

find_existing_subtitle(
    series_cache_dir, series_name, season, episode
)

Check for existing subtitle files in various naming formats.

PARAMETER DESCRIPTION
series_cache_dir

Directory containing subtitle files

TYPE: str

series_name

Name of the series

TYPE: str

season

Season number

TYPE: int

episode

Episode number

TYPE: int

RETURNS DESCRIPTION
str | None

Optional[str]: Path to existing subtitle file if found, None otherwise

Source code in mkv_episode_matcher/subtitle_utils.py
def find_existing_subtitle(
    series_cache_dir: str, series_name: str, season: int, episode: int
) -> str | None:
    """
    Check for existing subtitle files in various naming formats.

    Args:
        series_cache_dir (str): Directory containing subtitle files
        series_name (str): Name of the series
        season (int): Season number
        episode (int): Episode number

    Returns:
        Optional[str]: Path to existing subtitle file if found, None otherwise
    """
    base_dir = Path(series_cache_dir)

    for pattern in generate_subtitle_patterns(series_name, season, episode):
        candidate = base_dir / pattern
        if candidate.exists():
            # Return a str to honour the declared `str | None` return type;
            # the previous implementation returned a Path object despite
            # both the annotation and the docstring promising a str.
            return str(candidate)

    return None

sanitize_filename

sanitize_filename(filename)

Sanitize filename by removing/replacing invalid characters.

PARAMETER DESCRIPTION
filename

Original filename

TYPE: str

RETURNS DESCRIPTION
str

Sanitized filename

TYPE: str

Source code in mkv_episode_matcher/subtitle_utils.py
def sanitize_filename(filename: str) -> str:
    """
    Sanitize filename by removing/replacing invalid characters.

    Args:
        filename (str): Original filename

    Returns:
        str: Sanitized filename
    """
    # Substitute the most common offenders with readable replacements first.
    substitutions = {":": " -", "/": "-", "\\": "-"}
    for bad, good in substitutions.items():
        filename = filename.replace(bad, good)

    # Strip any remaining characters that are invalid in filenames.
    cleaned = re.sub(r'[<>:"/\\|?*]', "", filename)
    return cleaned.strip()

TMDB Client

mkv_episode_matcher.tmdb_client

Classes

RateLimitedRequest

RateLimitedRequest(rate_limit=30, period=1)

A class that represents a rate-limited request object.

ATTRIBUTE DESCRIPTION
rate_limit

Maximum number of requests allowed per period.

TYPE: int

period

Period in seconds.

TYPE: int

requests_made

Counter for requests made.

TYPE: int

start_time

Start time of the current period.

TYPE: float

lock

Lock for synchronization.

TYPE: Lock

Source code in mkv_episode_matcher/tmdb_client.py
def __init__(self, rate_limit=30, period=1):
    """
    Initialize the rate limiter.

    Args:
        rate_limit (int): Maximum number of requests allowed per period.
        period (int): Length of the rate-limiting window in seconds.
    """
    self.rate_limit = rate_limit
    self.period = period
    self.requests_made = 0  # requests issued during the current window
    self.start_time = time.time()  # when the current window began
    self.lock = Lock()  # serializes counter updates across threads
Functions
get
get(url)

Sends a rate-limited GET request to the specified URL.

PARAMETER DESCRIPTION
url

The URL to send the request to.

TYPE: str

RETURNS DESCRIPTION
Response

The response object returned by the request.

Source code in mkv_episode_matcher/tmdb_client.py
def get(self, url):
    """
    Sends a rate-limited GET request to the specified URL.

    Args:
        url (str): The URL to send the request to.

    Returns:
        Response: The response object returned by the request.
    """
    with self.lock:
        # Quota exhausted: sleep out the remainder of the window,
        # then start a fresh one.
        if self.requests_made >= self.rate_limit:
            remaining = self.period - (time.time() - self.start_time)
            if remaining > 0:
                time.sleep(remaining)
            self.requests_made = 0
            self.start_time = time.time()

        self.requests_made += 1

    # Issue the request outside the lock so slow responses don't
    # serialize other threads behind the limiter.
    return requests.get(url, timeout=30)

Functions

retry_network_operation

retry_network_operation(max_retries=3, base_delay=1.0)

Decorator for retrying network operations.

Source code in mkv_episode_matcher/tmdb_client.py
def retry_network_operation(
    max_retries: int = 3, base_delay: float = 1.0
) -> Callable[[F], F]:
    """Decorator factory: retry a network call with exponential backoff.

    Retries on requests.RequestException, ConnectionError and TimeoutError,
    doubling the delay after every failed attempt (capped at 30 seconds).
    The final exception is re-raised once the retry budget is spent.
    """

    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            delay = base_delay
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except (requests.RequestException, ConnectionError, TimeoutError) as e:
                    if attempt >= max_retries:
                        logger.error(
                            f"Max retries ({max_retries}) exceeded for {func.__name__}: {e}"
                        )
                        raise e
                    logger.warning(
                        f"Network retry {attempt + 1}/{max_retries + 1} for {func.__name__}: {e}"
                    )
                    time.sleep(delay)
                    delay = min(delay * 2, 30)  # Cap at 30 seconds
                    attempt += 1

        return wrapper  # type: ignore

    return decorator

fetch_show_id

fetch_show_id(show_name)

Fetch the TMDb ID for a given show name.

PARAMETER DESCRIPTION
show_name

The name of the show.

TYPE: str

RETURNS DESCRIPTION
str

The TMDb ID of the show, or None if not found.

TYPE: str | None

Source code in mkv_episode_matcher/tmdb_client.py
@retry_network_operation(max_retries=3, base_delay=1.0)
def fetch_show_id(show_name: str) -> str | None:
    """
    Fetch the TMDb ID for a given show name.

    Args:
        show_name (str): The name of the show.

    Returns:
        str: The TMDb ID of the show, or None if not found.
    """
    config = get_config_manager().load()
    tmdb_api_key = config.tmdb_api_key
    # Let requests URL-encode the query (show names may contain spaces,
    # '&', '#', etc. that would corrupt a string-interpolated URL), and
    # bound the request with a timeout like the other TMDb helpers.
    response = requests.get(
        "https://api.themoviedb.org/3/search/tv",
        params={"query": show_name, "api_key": tmdb_api_key},
        timeout=30,
    )
    if response.status_code == 200:
        results = response.json().get("results", [])
        if results:
            return str(results[0]["id"])
    return None

fetch_show_details

fetch_show_details(show_id)

Fetch show details from TMDB by ID.

PARAMETER DESCRIPTION
show_id

The TMDB show ID

TYPE: int

RETURNS DESCRIPTION
dict

Show details including 'name', 'number_of_seasons', etc.

TYPE: dict | None

None

If request fails or API key not configured

TYPE: dict | None

Source code in mkv_episode_matcher/tmdb_client.py
@retry_network_operation(max_retries=3, base_delay=1.0)
def fetch_show_details(show_id: int) -> dict | None:
    """
    Fetch show details from TMDB by ID.

    Args:
        show_id: The TMDB show ID

    Returns:
        dict: Show details including 'name', 'number_of_seasons', etc.
        None: If request fails or API key not configured
    """
    config = get_config_manager().load()
    api_key = config.tmdb_api_key
    if not api_key:
        logger.warning("TMDB API key not configured")
        return None

    endpoint = f"https://api.themoviedb.org/3/tv/{show_id}?api_key={api_key}"

    try:
        resp = requests.get(endpoint, timeout=30)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch show details for ID {show_id}: {e}")
        return None

fetch_season_details

fetch_season_details(show_id, season_number)

Fetch the total number of episodes for a given show and season from the TMDb API.

PARAMETER DESCRIPTION
show_id

The ID of the show on TMDb.

TYPE: str

season_number

The season number to fetch details for.

TYPE: int

RETURNS DESCRIPTION
int

The total number of episodes in the season, or 0 if the API request failed.

TYPE: int

Source code in mkv_episode_matcher/tmdb_client.py
@retry_network_operation(max_retries=3, base_delay=1.0)
def fetch_season_details(show_id: str, season_number: int) -> int:
    """
    Fetch the total number of episodes for a given show and season from the TMDb API.

    Args:
        show_id (str): The ID of the show on TMDb.
        season_number (int): The season number to fetch details for.

    Returns:
        int: The total number of episodes in the season, or 0 if the API request failed.
    """
    logger.info(f"Fetching season details for Season {season_number}...")
    config = get_config_manager().load()
    tmdb_api_key = config.tmdb_api_key
    url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}?api_key={tmdb_api_key}"
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        season_data = response.json()
        # .get() with a default means a missing "episodes" key yields an
        # empty list (count 0) rather than raising, so the original's
        # `except KeyError` handler was unreachable and has been removed.
        return len(season_data.get("episodes", []))
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch season details for Season {season_number}: {e}")
        return 0

get_number_of_seasons

get_number_of_seasons(show_id)

Retrieves the number of seasons for a given TV show from the TMDB API.

Parameters: - show_id (str): The ID of the TV show.

Returns: - num_seasons (int): The number of seasons for the TV show.

Raises: - requests.HTTPError: If there is an error while making the API request.

Source code in mkv_episode_matcher/tmdb_client.py
@retry_network_operation(max_retries=3, base_delay=1.0)
def get_number_of_seasons(show_id: str) -> int:
    """
    Query the TMDB API for how many seasons a TV show has.

    Parameters:
    - show_id (str): The ID of the TV show.

    Returns:
    - num_seasons (int): The number of seasons for the TV show.

    Raises:
    - requests.HTTPError: If there is an error while making the API request.
    """
    api_key = get_config_manager().load().tmdb_api_key
    endpoint = f"https://api.themoviedb.org/3/tv/{show_id}?api_key={api_key}"
    resp = requests.get(endpoint, timeout=30)
    # Unlike the other helpers, HTTP errors propagate to the caller here.
    resp.raise_for_status()
    num_seasons = resp.json().get("number_of_seasons", 0)
    logger.info(f"Found {num_seasons} seasons")
    return num_seasons

Utilities

mkv_episode_matcher.utils

Functions

normalize_path

normalize_path(path_str)

Normalize a path string to handle cross-platform path issues. Properly handles trailing slashes and backslashes in both Windows and Unix paths. Also strips surrounding quotes that might be present in command line arguments.

PARAMETER DESCRIPTION
path_str

The path string to normalize

TYPE: str

RETURNS DESCRIPTION

pathlib.Path: A normalized Path object

Source code in mkv_episode_matcher/utils.py
def normalize_path(path_str):
    """
    Normalize a path string for cross-platform use.

    Strips surrounding whitespace and quotes, drops trailing slashes and
    backslashes, and special-cases Windows drive-letter paths seen on
    non-Windows systems (where only the final path component is kept).

    Args:
        path_str (str): The path string to normalize

    Returns:
        pathlib.Path: A normalized Path object
    """
    # Accept Path objects as well as raw strings.
    raw = str(path_str) if isinstance(path_str, Path) else path_str

    # Drop whitespace plus any surrounding single/double quotes (common in
    # copy-pasted command-line arguments).
    raw = raw.strip().strip('"').strip("'")

    # Trailing separators would otherwise yield an empty final component.
    raw = raw.rstrip("/").rstrip("\\")

    # A drive-letter path (e.g. C:\x\y) on a non-Windows host: keep only
    # the last component, which is the directory/file name of interest.
    looks_like_windows = "\\" in raw and ":" in raw[:2]
    if os.name != "nt" and looks_like_windows:
        return Path(raw.split("\\")[-1])

    return Path(raw)

get_valid_seasons

get_valid_seasons(show_dir)

Get all season directories that contain MKV files.

PARAMETER DESCRIPTION
show_dir

Base directory for the TV show

TYPE: str

RETURNS DESCRIPTION
list

List of paths to valid season directories

Source code in mkv_episode_matcher/utils.py
def get_valid_seasons(show_dir):
    """
    Get all season directories that contain MKV files.

    Args:
        show_dir (str): Base directory for the TV show

    Returns:
        list: List of paths (str) to season directories containing at least
              one .mkv file.
    """
    # Normalize once and reuse; the original re-normalized for each log call.
    show_path = normalize_path(show_dir)
    season_paths = [str(show_path / d.name) for d in show_path.iterdir() if d.is_dir()]

    # Keep only seasons that actually contain MKV files. The extension is
    # matched case-insensitively, consistent with the .srt handling
    # elsewhere in this module (so ".MKV" files are no longer missed).
    valid_season_paths = []
    for season_path in season_paths:
        season_path_obj = Path(season_path)
        has_mkv = any(
            f.name.lower().endswith(".mkv") for f in season_path_obj.iterdir()
        )
        if has_mkv:
            valid_season_paths.append(season_path)

    if not valid_season_paths:
        logger.warning(
            f"No seasons with .mkv files found in show '{show_path.name}'"
        )
    else:
        logger.info(
            f"Found {len(valid_season_paths)} seasons with .mkv files in '{show_path.name}'"
        )

    return valid_season_paths

check_filename

check_filename(filename)

Check if the filename is in the correct format (S01E02).

PARAMETER DESCRIPTION
filename

The filename to check.

TYPE: str or Path

RETURNS DESCRIPTION
bool

True if the filename matches the expected pattern.

Source code in mkv_episode_matcher/utils.py
def check_filename(filename):
    """
    Check whether a filename already follows the S01E02 naming scheme.

    Args:
        filename (str or Path): The filename to check.

    Returns:
        bool: True if the filename matches the expected pattern.
    """
    name = str(filename) if isinstance(filename, Path) else filename
    # An uppercase S##E## anywhere in the name counts as already formatted.
    return re.search(r".*S\d+E\d+", name) is not None

scramble_filename

scramble_filename(original_file_path, file_number)

Scrambles the filename of the given file path by adding the series title and file number.

PARAMETER DESCRIPTION
original_file_path

The original file path.

TYPE: str

file_number

The file number to be added to the filename.

TYPE: int

RETURNS DESCRIPTION

None

Source code in mkv_episode_matcher/utils.py
def scramble_filename(original_file_path, file_number):
    """
    Rename a file to "<series title> - NNN<ext>", taking the series title
    from the grandparent directory name. Existing targets are left alone.

    Args:
        original_file_path (str): The original file path.
        file_number (int): The file number to be added to the filename.

    Returns:
        None
    """
    logger.info(f"Scrambling {original_file_path}")
    source = Path(original_file_path)
    # The show folder is two levels up (show/season/file).
    series_title = normalize_path(original_file_path).parent.parent.name
    target = source.parent / f"{series_title} - {file_number:03d}{source.suffix}"
    # Never overwrite: only rename when the target name is free.
    if not target.exists():
        logger.info(f"Renaming {source.name} -> {target.name}")
        source.rename(target)

rename_episode_file

rename_episode_file(original_file_path, new_filename)

Rename an episode file with a standardized naming convention.

PARAMETER DESCRIPTION
original_file_path

The original file path of the episode.

TYPE: str or Path

new_filename

The new filename including season/episode info.

TYPE: str or Path

RETURNS DESCRIPTION
Path

Path to the renamed file, or None if rename failed.

Source code in mkv_episode_matcher/utils.py
def rename_episode_file(original_file_path, new_filename):
    """
    Rename an episode file with a standardized naming convention.

    If the target name is already taken, a numeric suffix (_2, _3, ...) is
    appended until a free name is found.

    Args:
        original_file_path (str or Path): The original file path of the episode.
        new_filename (str or Path): The new filename including season/episode info.

    Returns:
        Path: Path to the renamed file, or None if rename failed.
    """
    original_dir = Path(original_file_path).parent
    new_file_path = original_dir / new_filename

    # Check if new filepath already exists
    if new_file_path.exists():
        logger.warning(f"File already exists: {new_filename}")

        # Add numeric suffix if file exists
        base, ext = Path(new_filename).stem, Path(new_filename).suffix
        suffix = 2
        while True:
            new_filename = f"{base}_{suffix}{ext}"
            new_file_path = original_dir / new_filename
            if not new_file_path.exists():
                break
            suffix += 1

    try:
        Path(original_file_path).rename(new_file_path)
        logger.info(f"Renamed {Path(original_file_path).name} -> {new_filename}")
        return new_file_path
    except OSError as e:
        # FileExistsError is a subclass of OSError, so this single handler
        # covers both; the original's second `except FileExistsError`
        # clause was unreachable and has been removed.
        logger.error(f"Failed to rename file: {e}")
        return None

get_subtitles

get_subtitles(show_id, seasons, config=None, max_retries=3)

Retrieves and saves subtitles for a given TV show and seasons.

PARAMETER DESCRIPTION
show_id

The ID of the TV show.

TYPE: int

seasons

A set of season numbers for which subtitles should be retrieved.

TYPE: Set[int]

config

Preloaded configuration.

TYPE: Config object DEFAULT: None

max_retries

Number of times to retry subtitle download on OpenSubtitlesException. Defaults to 3.

TYPE: int DEFAULT: 3

Source code in mkv_episode_matcher/utils.py
def get_subtitles(show_id, seasons: set[int], config=None, max_retries=3):
    """
    Retrieves and saves subtitles for a given TV show and seasons.

    For each episode of each requested season this looks up the episode's
    TMDB id, searches OpenSubtitles for English subtitles, and saves the
    first download whose (cleaned) filename mentions the episode number into
    the per-series cache directory. Episodes that already have a cached
    subtitle are skipped.

    Args:
        show_id (int): The ID of the TV show.
        seasons (Set[int]): A set of season numbers for which subtitles should be retrieved.
        config (Config object, optional): Preloaded configuration.
        max_retries (int, optional): Number of times to retry subtitle download on OpenSubtitlesException. Defaults to 3.
    """
    if config is None:
        config = get_config_manager().load()
    show_dir = config.show_dir
    series_name = sanitize_filename(normalize_path(show_dir).name)
    tmdb_api_key = config.tmdb_api_key
    open_subtitles_api_key = config.open_subtitles_api_key
    open_subtitles_user_agent = config.open_subtitles_user_agent
    open_subtitles_username = config.open_subtitles_username
    open_subtitles_password = config.open_subtitles_password

    # All credentials/settings are required; bail out early if any is missing.
    if not all([
        show_dir,
        tmdb_api_key,
        open_subtitles_api_key,
        open_subtitles_user_agent,
        open_subtitles_username,
        open_subtitles_password,
    ]):
        logger.error("Missing configuration settings. Please run the setup script.")
        return

    try:
        subtitles = OpenSubtitles(open_subtitles_user_agent, open_subtitles_api_key)
        subtitles.login(open_subtitles_username, open_subtitles_password)
    except Exception as e:
        logger.error(f"Failed to log in to OpenSubtitles: {e}")
        return

    for season in seasons:
        # fetch_season_details returns the episode count (0 on failure,
        # which makes the inner range empty and skips the season).
        episodes = fetch_season_details(show_id, season)
        logger.info(f"Found {episodes} episodes in Season {season}")

        for episode in range(1, episodes + 1):
            logger.info(f"Processing Season {season}, Episode {episode}...")

            series_cache_dir = config.cache_dir / "data" / series_name
            os.makedirs(series_cache_dir, exist_ok=True)

            # Check for existing subtitle in any supported format
            existing_subtitle = find_existing_subtitle(
                series_cache_dir, series_name, season, episode
            )

            if existing_subtitle:
                logger.info(f"Subtitle already exists: {Path(existing_subtitle).name}")
                continue

            # Default to standard format for new downloads
            srt_filepath = str(
                series_cache_dir / f"{series_name} - S{season:02d}E{episode:02d}.srt"
            )

            # get the episode info from TMDB
            # NOTE(review): no timeout is set here, unlike the other TMDB
            # calls in this module which pass timeout=30 — confirm.
            url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season}/episode/{episode}?api_key={tmdb_api_key}"
            response = requests.get(url)
            response.raise_for_status()
            episode_data = response.json()
            episode_id = episode_data["id"]

            # search for the subtitle
            response = subtitles.search(tmdb_id=episode_id, languages="en")
            if len(response.data) == 0:
                logger.warning(
                    f"No subtitles found for {series_name} - S{season:02d}E{episode:02d}"
                )
                continue

            for subtitle in response.data:
                subtitle_dict = subtitle.to_dict()
                # Remove special characters and convert to uppercase
                # NOTE(review): r"\\W+" matches a literal backslash followed
                # by "W", not runs of non-word characters (that would be
                # r"\W+") — confirm which was intended.
                filename_clean = re.sub(
                    r"\\W+", " ", subtitle_dict["file_name"]
                ).upper()
                if f"E{episode:02d}" in filename_clean:
                    logger.info(f"Original filename: {subtitle_dict['file_name']}")
                    retry_count = 0
                    # Retry the download up to max_retries times on
                    # OpenSubtitlesException (e.g. quota/server errors).
                    while retry_count < max_retries:
                        try:
                            srt_file = subtitles.download_and_save(subtitle)
                            shutil.move(srt_file, srt_filepath)
                            logger.info(f"Subtitle saved to {srt_filepath}")
                            break
                        except OpenSubtitlesException as e:
                            retry_count += 1
                            logger.error(
                                f"OpenSubtitlesException (attempt {retry_count}): {e}"
                            )
                            console.print(
                                f"[red]OpenSubtitlesException (attempt {retry_count}): {e}[/red]"
                            )
                            if retry_count >= max_retries:
                                # Retries exhausted: ask the user whether to
                                # keep going with the remaining episodes.
                                user_input = input(
                                    "Would you like to continue matching? (y/n): "
                                )
                                if user_input.strip().lower() != "y":
                                    logger.info(
                                        "User chose to stop matching due to the error."
                                    )
                                    return
                                else:
                                    logger.info(
                                        "User chose to continue matching despite the error."
                                    )
                                    break
                        except Exception as e:
                            # Any non-OpenSubtitles failure: ask the user
                            # immediately instead of retrying.
                            logger.error(f"Failed to download and save subtitle: {e}")
                            console.print(
                                f"[red]Failed to download and save subtitle: {e}[/red]"
                            )
                            user_input = input(
                                "Would you like to continue matching despite the error? (y/n): "
                            )
                            if user_input.strip().lower() != "y":
                                logger.info(
                                    "User chose to stop matching due to the error."
                                )
                                return
                            else:
                                logger.info(
                                    "User chose to continue matching despite the error."
                                )
                                break
                    else:
                        # while-else: the retry loop ran out without a break,
                        # so try the next candidate subtitle for this episode.
                        continue
                    # A break above means success (or the user chose to move
                    # on); stop scanning further candidates for this episode.
                    break

process_reference_srt_files

process_reference_srt_files(series_name)

Process reference SRT files for a given series.

PARAMETER DESCRIPTION
series_name

The name of the series.

TYPE: str

RETURNS DESCRIPTION
dict

A dictionary containing the reference files where the keys are the MKV filenames and the values are the corresponding SRT texts.

Source code in mkv_episode_matcher/utils.py
@logger.catch
def process_reference_srt_files(series_name):
    """
    Process reference SRT files for a given series.

    Args:
        series_name (str): The name of the series.

    Returns:
        dict: A dictionary containing the reference files where the keys are the MKV filenames
              and the values are the corresponding SRT texts.
    """
    config = get_config_manager().load()

    reference_files = {}
    reference_dir = config.cache_dir / "data" / series_name

    for dirpath, _, filenames in os.walk(reference_dir):
        for filename in filenames:
            if not filename.lower().endswith(".srt"):
                continue
            srt_file = Path(dirpath) / filename
            logger.info(f"Processing {srt_file}")
            season, episode = extract_season_episode(filename)
            if season is None or episode is None:
                # Guard: extract_season_episode returns (None, None) for
                # unparseable names, and formatting None with :02 below
                # would raise a TypeError.
                logger.warning(f"Could not parse season/episode from {filename}")
                continue
            srt_text = extract_srt_text(srt_file)
            mkv_filename = f"{series_name} - S{season:02}E{episode:02}.mkv"
            reference_files[mkv_filename] = srt_text

    return reference_files

extract_srt_text

extract_srt_text(filepath)

Extracts text content from an SRT file.

PARAMETER DESCRIPTION
filepath

Path to the SRT file.

TYPE: str

RETURNS DESCRIPTION
list

List of text lines from the SRT file.

Source code in mkv_episode_matcher/utils.py
def extract_srt_text(filepath):
    """
    Extracts text content from an SRT file.

    Args:
        filepath (str): Path to the SRT file.

    Returns:
        list: List of text lines from the SRT file.
    """
    # Read explicitly as UTF-8 instead of the platform default encoding;
    # undecodable bytes are replaced so one bad subtitle file can't abort
    # a whole matching run.
    with open(filepath, encoding="utf-8", errors="replace") as f:
        content = f.read()

    # Split into blank-line-separated subtitle blocks
    blocks = content.strip().split("\n\n")

    text_lines = []
    for block in blocks:
        lines = block.split("\n")
        # A valid block has an index, a timestamp, then >= 1 text line.
        if len(lines) < 3:
            continue

        # Skip index and timestamp, get all remaining lines as text
        text = " ".join(lines[2:])
        # Remove stage directions ([...]) and markup tags (<...>)
        text = re.sub(r"\[.*?\]|\<.*?\>", "", text)
        if text:
            text_lines.append(text)

    return text_lines

extract_season_episode

extract_season_episode(filename)

Extract season and episode numbers from filename with support for multiple formats.

PARAMETER DESCRIPTION
filename

Filename to parse

TYPE: str

RETURNS DESCRIPTION
tuple

(season_number, episode_number)

Source code in mkv_episode_matcher/utils.py
def extract_season_episode(filename):
    """
    Extract season and episode numbers from filename with support for multiple formats.

    Args:
        filename (str): Filename to parse

    Returns:
        tuple: (season_number, episode_number), or (None, None) when no
               known pattern matches.
    """
    # Recognized naming schemes, tried in order of specificity.
    candidate_patterns = (
        r"S(\d+)E(\d+)",  # S01E01
        r"(\d+)x(\d+)",  # 1x01 or 01x01
        r"Season\s*(\d+).*?(\d+)",  # Season 1 - 01
    )

    for candidate in candidate_patterns:
        found = re.search(candidate, filename, re.IGNORECASE)
        if found:
            season, episode = found.groups()
            return int(season), int(episode)

    return None, None

process_srt_files

process_srt_files(show_dir)

Process all SRT files in the given directory and its subdirectories.

PARAMETER DESCRIPTION
show_dir

The directory path where the SRT files are located.

TYPE: str

RETURNS DESCRIPTION
dict

A dictionary containing the SRT file paths as keys and their corresponding text content as values.

Source code in mkv_episode_matcher/utils.py
def process_srt_files(show_dir):
    """
    Process all SRT files in the given directory and its subdirectories.

    Args:
        show_dir (str): The directory path where the SRT files are located.

    Returns:
        dict: A dictionary containing the SRT file paths as keys and their corresponding text content as values.
    """
    collected = {}
    for dirpath, _, filenames in os.walk(show_dir):
        for name in filenames:
            # Only subtitle files are of interest; extension check is
            # case-insensitive.
            if not name.lower().endswith(".srt"):
                continue
            srt_file = Path(dirpath) / name
            logger.info(f"Processing {srt_file}")
            collected[srt_file] = extract_srt_text(srt_file)
    return collected

compare_and_rename_files

compare_and_rename_files(
    srt_files, reference_files, dry_run=False
)

Compare the srt files with the reference files and rename the matching mkv files.

PARAMETER DESCRIPTION
srt_files

A dictionary containing the srt files as keys and their contents as values.

TYPE: dict

reference_files

A dictionary containing the reference files as keys and their contents as values.

TYPE: dict

dry_run

If True, the function will only log the renaming actions without actually renaming the files. Defaults to False.

TYPE: bool DEFAULT: False

Source code in mkv_episode_matcher/utils.py
def compare_and_rename_files(srt_files, reference_files, dry_run=False):
    """
    Compare the srt files with the reference files and rename the matching mkv files.

    A candidate MKV matches a reference episode when the shared-content
    count from compare_text reaches 10% of the reference's length.

    Args:
        srt_files (dict): A dictionary containing the srt files as keys and their contents as values.
        reference_files (dict): A dictionary containing the reference files as keys and their contents as values.
        dry_run (bool, optional): If True, the function will only log the renaming actions without actually renaming the files. Defaults to False.
    """
    logger.info(
        f"Comparing {len(srt_files)} srt files with {len(reference_files)} reference files"
    )
    for srt_text in srt_files.keys():
        parent_dir = Path(srt_text).parent.parent
        # The candidate MKV path depends only on the SRT file, so compute it
        # once per SRT instead of once per reference (loop-invariant hoist).
        mkv_file = str(parent_dir / Path(srt_text).name.replace(".srt", ".mkv"))
        for reference in reference_files.keys():
            # (Removed an unused extract_season_episode(reference) call
            # whose results were never read.)
            matching_lines = compare_text(
                reference_files[reference], srt_files[srt_text]
            )
            if matching_lines >= int(len(reference_files[reference]) * 0.1):
                logger.info(f"Matching lines: {matching_lines}")
                logger.info(f"Found matching file: {mkv_file} ->{reference}")
                new_filename = parent_dir / reference
                if not dry_run:
                    logger.info(f"Renaming {mkv_file} to {str(new_filename)}")
                    rename_episode_file(mkv_file, reference)

compare_text

compare_text(text1, text2)

Compare two lists of text lines and return the number of matching lines.

PARAMETER DESCRIPTION
text1

List of text lines from the first source.

TYPE: list

text2

List of text lines from the second source.

TYPE: list

RETURNS DESCRIPTION
int

Number of matching lines between the two sources.

Source code in mkv_episode_matcher/utils.py
def compare_text(text1, text2):
    """
    Count items shared between two one-level-flattened text sources.

    Every element of every entry in each argument is collected into a set
    and the size of the sets' intersection is returned. NOTE(review): when
    the entries are plain strings, this flattening iterates them
    character-wise, so the result is a count of shared characters — confirm
    this is the intended comparison granularity.

    Args:
        text1 (list): List of text lines from the first source.
        text2 (list): List of text lines from the second source.

    Returns:
        int: Number of matching items between the two sources.
    """
    pool1 = {item for entry in text1 for item in entry}
    pool2 = {item for entry in text2 for item in entry}
    return len(pool1 & pool2)