Skip to content

API Reference

This section contains the automatically generated API documentation for MKV Episode Matcher.

Core Modules

mkv_episode_matcher.__main__

mkv_episode_matcher.cli

Unified CLI Interface for MKV Episode Matcher V2

This module provides a single, intuitive command-line interface that handles all use cases with intelligent auto-detection and minimal configuration.

Functions

print_banner

print_banner()

Print application banner.

Source code in mkv_episode_matcher/cli.py
def print_banner():
    """Render the application banner inside a framed panel on the console."""
    title = Text("MKV Episode Matcher", style="bold blue")
    framed = Panel(title, subtitle="Intelligent episode matching with zero-config setup")
    console.print(framed)

match

match(
    path=typer.Argument(
        ...,
        help="Path to MKV file, series folder, or entire library",
        exists=True,
    ),
    season=typer.Option(
        None,
        "--season",
        "-s",
        help="Override season number for all files",
    ),
    recursive=typer.Option(
        True,
        "--recursive/--no-recursive",
        "-r/-nr",
        help="Search recursively in directories",
    ),
    dry_run=typer.Option(
        False,
        "--dry-run",
        "-d",
        help="Preview changes without renaming files",
    ),
    output_dir=typer.Option(
        None,
        "--output-dir",
        "-o",
        help="Copy renamed files to this directory instead of renaming in place",
    ),
    json_output=typer.Option(
        False,
        "--json",
        help="Output results in JSON format for automation",
    ),
    confidence_threshold=typer.Option(
        None,
        "--confidence",
        "-c",
        min=0.0,
        max=1.0,
        help="Minimum confidence score for matches (0.0-1.0)",
    ),
    download_subs=typer.Option(
        True,
        "--download-subs/--no-download-subs",
        help="Automatically download subtitles if not found locally",
    ),
    tmdb_id=typer.Option(
        None,
        "--tmdb-id",
        help="Manually specify the TMDB Show ID (e.g. 549 for Law & Order)",
    ),
    log_level=typer.Option(
        "INFO",
        "--log-level",
        "-l",
        help="Set logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
        case_sensitive=False,
    ),
)

Process MKV files with intelligent episode matching.

Automatically detects whether you're processing:

- A single file
- A series folder
- An entire library

Examples:

# Process a single file
mkv-match episode.mkv

# Process a series season
mkv-match "/media/Breaking Bad/Season 1/"

# Process entire library
mkv-match /media/tv-shows/ --recursive

# Dry run with custom output
mkv-match episode.mkv --dry-run --output-dir ./renamed/

# Automation mode
mkv-match show/ --json --confidence 0.8
Source code in mkv_episode_matcher/cli.py
@app.command()
def match(
    path: Path = typer.Argument(
        ..., help="Path to MKV file, series folder, or entire library", exists=True
    ),
    # Core options
    season: int | None = typer.Option(
        None, "--season", "-s", help="Override season number for all files"
    ),
    recursive: bool = typer.Option(
        True,
        "--recursive/--no-recursive",
        "-r/-nr",
        help="Search recursively in directories",
    ),
    dry_run: bool = typer.Option(
        False, "--dry-run", "-d", help="Preview changes without renaming files"
    ),
    # Output options
    output_dir: Path | None = typer.Option(
        None,
        "--output-dir",
        "-o",
        help="Copy renamed files to this directory instead of renaming in place",
    ),
    json_output: bool = typer.Option(
        False, "--json", help="Output results in JSON format for automation"
    ),
    # Quality options
    confidence_threshold: float | None = typer.Option(
        None,
        "--confidence",
        "-c",
        min=0.0,
        max=1.0,
        help="Minimum confidence score for matches (0.0-1.0)",
    ),
    # Subtitle options
    download_subs: bool = typer.Option(
        True,
        "--download-subs/--no-download-subs",
        help="Automatically download subtitles if not found locally",
    ),
    # TMDB options
    tmdb_id: int | None = typer.Option(
        None,
        "--tmdb-id",
        help="Manually specify the TMDB Show ID (e.g. 549 for Law & Order)",
    ),
    # Logging options
    log_level: str = typer.Option(
        "INFO",
        "--log-level",
        "-l",
        help="Set logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
        case_sensitive=False,
    ),
):
    """
    Process MKV files with intelligent episode matching.

    Automatically detects whether you're processing:
    • A single file
    • A series folder
    • An entire library

    Examples:

        # Process a single file
        mkv-match episode.mkv

        # Process a series season
        mkv-match "/media/Breaking Bad/Season 1/"

        # Process entire library
        mkv-match /media/tv-shows/ --recursive

        # Dry run with custom output
        mkv-match episode.mkv --dry-run --output-dir ./renamed/

        # Automation mode
        mkv-match show/ --json --confidence 0.8
    """

    # Configure logging level first so every later step logs at the right level.
    log_level = log_level.upper()
    valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    if log_level not in valid_levels:
        # BUGFIX: this error path previously printed rich console markup even
        # when --json was requested; every other error path in this command
        # branches on json_output, so do the same here for automation callers.
        message = f"Invalid log level: {log_level}. Must be one of {', '.join(valid_levels)}"
        if json_output:
            print(json.dumps({"error": message}))
        else:
            console.print(f"[red]{message}[/red]")
        sys.exit(1)

    logger.remove()
    logger.add(sys.stderr, level=log_level)

    # Keep stdout clean in JSON mode (machine-readable output only).
    if not json_output:
        print_banner()

    # Load configuration; CLI options take precedence over saved settings.
    try:
        cm = get_config_manager()
        config = cm.load()

        # Override config with CLI options
        if confidence_threshold is not None:
            config.min_confidence = confidence_threshold

        if not download_subs:
            config.sub_provider = "local"

    except Exception as e:
        if json_output:
            print(json.dumps({"error": f"Configuration error: {e}"}))
        else:
            console.print(f"[red]Configuration error: {e}[/red]")
        sys.exit(1)

    # Initialize engine
    try:
        engine = MatchEngineV2(config)
    except Exception as e:
        if json_output:
            print(json.dumps({"error": f"Engine initialization failed: {e}"}))
        else:
            console.print(f"[red]Failed to initialize engine: {e}[/red]")
        sys.exit(1)

    # Detect processing mode: single file, series folder, or whole library.
    if path.is_file():
        mode = "single_file"
    elif path.is_dir():
        # Count MKV files to determine if it's a series or library
        mkv_count = len(list(path.rglob("*.mkv") if recursive else path.glob("*.mkv")))
        if mkv_count == 0:
            if json_output:
                print(json.dumps({"error": "No MKV files found"}))
            else:
                console.print("[yellow]No MKV files found[/yellow]")
            sys.exit(0)
        elif mkv_count <= 30:  # Arbitrary threshold
            mode = "series_folder"
        else:
            mode = "library"
    else:
        if json_output:
            print(json.dumps({"error": "Invalid path"}))
        else:
            console.print("[red]Invalid path[/red]")
        sys.exit(1)

    if not json_output:
        mode_descriptions = {
            "single_file": "Processing single file",
            "series_folder": "Processing series folder",
            "library": "Processing entire library",
        }
        console.print(f"[blue]{mode_descriptions[mode]}[/blue]: {path}")

        if dry_run:
            console.print("[yellow]DRY RUN MODE - No files will be renamed[/yellow]")

    # Process files
    try:
        results, failures = engine.process_path(
            path=path,
            season_override=season,
            recursive=recursive,
            dry_run=dry_run,
            output_dir=output_dir,
            json_output=json_output,
            confidence_threshold=confidence_threshold,
            tmdb_id=tmdb_id,
        )

        # Output results
        if json_output:
            output_data = {
                "mode": mode,
                "path": str(path),
                "total_matches": len(results),
                "total_failures": len(failures),
                "dry_run": dry_run,
                "results": json.loads(engine.export_results(results)),
                "failures": [
                    {
                        "original_file": str(f.original_file),
                        "reason": f.reason,
                        "confidence": f.confidence,
                    }
                    for f in failures
                ],
            }
            print(json.dumps(output_data, indent=2))
        else:
            # Rich console summary
            if results or failures:
                _display_comprehensive_summary(
                    results, failures, dry_run, output_dir, console
                )
            else:
                console.print("[yellow]No MKV files processed[/yellow]")

    except Exception as e:
        if json_output:
            print(json.dumps({"error": f"Processing failed: {e}"}))
        else:
            console.print(f"[red]Processing failed: {e}[/red]")
        sys.exit(1)

config

config(
    show_cache_dir=typer.Option(
        False,
        "--show-cache-dir",
        help="Show current cache directory location",
    ),
    reset=typer.Option(
        False,
        "--reset",
        help="Reset configuration to defaults",
    ),
)

Configure MKV Episode Matcher settings.

Most settings are auto-configured, but you can customize:

- Cache directory location
- Default confidence thresholds
- ASR model preferences

Source code in mkv_episode_matcher/cli.py
@app.command()
def config(
    show_cache_dir: bool = typer.Option(
        False, "--show-cache-dir", help="Show current cache directory location"
    ),
    reset: bool = typer.Option(
        False, "--reset", help="Reset configuration to defaults"
    ),
):
    """
    Configure MKV Episode Matcher settings.

    Most settings are auto-configured, but you can customize:
    • Cache directory location
    • Default confidence thresholds
    • ASR model preferences

    With no flags, this runs an interactive prompt session (cache dir,
    confidence threshold, ASR/subtitle providers, optional API keys) and
    persists the result via the config manager. Invalid or blank answers
    silently keep the previously stored value.
    """

    cm = get_config_manager()

    # --show-cache-dir: report the configured location and exit.
    if show_cache_dir:
        config = cm.load()
        console.print(f"Cache directory: [blue]{config.cache_dir}[/blue]")
        return

    # --reset: overwrite the saved configuration with defaults and exit.
    if reset:
        config = Config()  # Default config
        cm.save(config)
        console.print("[green]Configuration reset to defaults[/green]")
        return

    # Interactive configuration
    console.print(Panel("MKV Episode Matcher Configuration"))

    config = cm.load()

    # Cache directory: only updated when the answer differs from the current value.
    current_cache = str(config.cache_dir)
    new_cache = typer.prompt(
        "Cache directory", default=current_cache, show_default=True
    )
    if new_cache != current_cache:
        config.cache_dir = Path(new_cache)

    # Confidence threshold: out-of-range answers are ignored (old value kept).
    current_confidence = config.min_confidence
    new_confidence = typer.prompt(
        "Minimum confidence threshold (0.0-1.0)",
        type=float,
        default=current_confidence,
        show_default=True,
    )
    if 0.0 <= new_confidence <= 1.0:
        config.min_confidence = new_confidence

    # ASR provider: only "parakeet" is accepted; anything else keeps the old value.
    current_asr = config.asr_provider
    new_asr = typer.prompt(
        "ASR provider (parakeet)",
        default=current_asr,
        show_default=True,
    )
    if new_asr in ["parakeet"]:
        config.asr_provider = new_asr

    # Subtitle provider: unrecognized answers keep the old value.
    current_sub = config.sub_provider
    new_sub = typer.prompt(
        "Subtitle provider (local/opensubtitles)",
        default=current_sub,
        show_default=True,
    )
    if new_sub in ["local", "opensubtitles"]:
        config.sub_provider = new_sub

    # OpenSubtitles credentials: only prompted when that provider is selected;
    # blank answers leave the stored values untouched.
    if config.sub_provider == "opensubtitles":
        console.print("\n[bold]OpenSubtitles Configuration:[/bold]")

        current_api = config.open_subtitles_api_key or ""
        new_api = typer.prompt("API Key", default=current_api, show_default=True)
        if new_api.strip():
            config.open_subtitles_api_key = new_api.strip()

        current_user = config.open_subtitles_username or ""
        new_user = typer.prompt("Username", default=current_user, show_default=True)
        if new_user.strip():
            config.open_subtitles_username = new_user.strip()

        # Password input is hidden and never echoed as a default.
        current_pass = config.open_subtitles_password or ""
        new_pass = typer.prompt(
            "Password", default=current_pass, show_default=False, hide_input=True
        )
        if new_pass.strip():
            config.open_subtitles_password = new_pass.strip()

    # TMDB API key (optional): blank answer keeps the stored key.
    current_tmdb = config.tmdb_api_key or ""
    new_tmdb = typer.prompt(
        "TMDb API key (optional, for episode titles)",
        default=current_tmdb,
        show_default=False,
    )
    if new_tmdb.strip():
        config.tmdb_api_key = new_tmdb.strip()

    # Save configuration
    cm.save(config)
    console.print("[green]Configuration saved successfully[/green]")

info

info()

Show system information and available models.

Source code in mkv_episode_matcher/cli.py
@app.command()
def info():
    """
    Show system information and available models.

    Prints the availability of each ASR model backend and the currently
    loaded configuration. Errors in either section are reported but do not
    abort the command.
    """
    console.print(Panel("MKV Episode Matcher - System Information"))

    try:
        from mkv_episode_matcher.asr_models import list_available_models

        models = list_available_models()

        console.print("\n[bold]Available ASR Models:[/bold]")
        # NOTE: the loop variable was previously named ``info``, shadowing
        # this command function itself; renamed to ``model_info``.
        for model_type, model_info in models.items():
            if model_info.get("available"):
                status = "[green]Available[/green]"
                model_list = ", ".join(model_info.get("models", [])[:3])  # Show first 3
                console.print(f"  {model_type}: {status}")
                console.print(f"    Models: {model_list}")
            else:
                status = "[red]Not available[/red]"
                error = model_info.get("error", "Unknown error")
                console.print(f"  {model_type}: {status} ({error})")

    except Exception as e:
        console.print(f"[red]Error checking models: {e}[/red]")

    # Configuration info
    try:
        cm = get_config_manager()
        config = cm.load()

        console.print("\n[bold]Current Configuration:[/bold]")
        console.print(f"  Cache directory: {config.cache_dir}")
        console.print(f"  ASR provider: {config.asr_provider}")
        console.print(f"  Subtitle provider: {config.sub_provider}")
        console.print(f"  Confidence threshold: {config.min_confidence}")

    except Exception as e:
        console.print(f"[red]Error loading config: {e}[/red]")

version

version()

Show version information.

Source code in mkv_episode_matcher/cli.py
@app.command()
def version():
    """Print the installed MKV Episode Matcher version."""
    import mkv_episode_matcher

    # Fall back to "unknown" when the package exposes no __version__ attribute.
    pkg_version = getattr(mkv_episode_matcher, "__version__", "unknown")
    console.print(f"MKV Episode Matcher v{pkg_version}")

gui

gui()

Launch the GUI application.

Source code in mkv_episode_matcher/cli.py
@app.command()
def gui():
    """Launch the GUI application (Flet-based desktop UI)."""
    # Imported lazily so plain CLI usage does not require flet to be installed.
    import flet as ft

    from mkv_episode_matcher.ui.flet_app import main

    ft.app(target=main)

mkv_episode_matcher.episode_identification

Classes

SubtitleCache

SubtitleCache()

Cache for storing parsed subtitle data to avoid repeated loading and parsing.

Source code in mkv_episode_matcher/episode_identification.py
def __init__(self):
    """Initialize empty caches for parsed files and extracted chunks."""
    # Parsed subtitle contents, keyed by file path string.
    self.subtitles = {}
    # Joined chunk text, keyed by (file_path, chunk_idx).
    self.chunk_cache = {}
Functions
get_subtitle_content
get_subtitle_content(srt_file)

Get the full content of a subtitle file, loading it only once.

Source code in mkv_episode_matcher/episode_identification.py
def get_subtitle_content(self, srt_file):
    """Return the content of ``srt_file``, parsing the file at most once."""
    key = str(srt_file)
    # Populate the cache lazily on first access.
    if key not in self.subtitles:
        self.subtitles[key] = SubtitleReader().read_srt_file(key)
    return self.subtitles[key]
get_chunk
get_chunk(srt_file, chunk_idx, chunk_start, chunk_end)

Get a specific time chunk from a subtitle file, with caching.

Source code in mkv_episode_matcher/episode_identification.py
def get_chunk(self, srt_file, chunk_idx, chunk_start, chunk_end):
    """Return the joined subtitle text for one time window, cached per chunk."""
    key = (str(srt_file), chunk_idx)

    cached = self.chunk_cache.get(key)
    if cached is None:
        # Extract the window's cues from the (cached) full file content.
        content = self.get_subtitle_content(str(srt_file))
        lines = SubtitleReader().extract_subtitle_chunk(content, chunk_start, chunk_end)
        cached = " ".join(lines)
        self.chunk_cache[key] = cached

    return cached

EpisodeMatcher

EpisodeMatcher(
    cache_dir, show_name, min_confidence=0.6, device=None
)
Source code in mkv_episode_matcher/episode_identification.py
def __init__(self, cache_dir, show_name, min_confidence=0.6, device=None):
    """Set up matcher state, caches, and a temp workspace for audio chunks."""
    self.cache_dir = Path(cache_dir)
    self.show_name = show_name
    self.min_confidence = min_confidence
    # Audio is analysed in fixed 30 s windows, starting 300 s into the file.
    self.chunk_duration = 30
    self.skip_initial_duration = 300
    # Prefer CUDA when available unless the caller supplied a device.
    self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    self.temp_dir = Path(tempfile.gettempdir()) / "whisper_chunks"
    self.temp_dir.mkdir(exist_ok=True)
    # Per-instance caches to avoid repeated parsing, extraction, and globbing.
    self.subtitle_cache = SubtitleCache()  # parsed subtitle data
    self.audio_chunks = {}  # extracted audio chunk file paths
    self.reference_files_cache = {}  # per-season reference file lists
Functions
extract_audio_chunk
extract_audio_chunk(mkv_file, start_time)

Extract a chunk of audio from MKV file with caching.

Source code in mkv_episode_matcher/episode_identification.py
def extract_audio_chunk(self, mkv_file, start_time):
    """Extract a mono 16 kHz PCM WAV chunk from ``mkv_file`` at ``start_time``.

    The chunk is ``self.chunk_duration`` seconds long. Results are cached in
    memory per ``(file, offset)`` and on disk under ``self.temp_dir``.

    Args:
        mkv_file (str or Path): Source video file.
        start_time: Chunk start offset in seconds.

    Returns:
        str: Path to the extracted WAV file.

    Raises:
        RuntimeError: If FFmpeg fails, times out, or produces no output file.
    """
    import hashlib

    cache_key = (str(mkv_file), start_time)

    if cache_key in self.audio_chunks:
        return self.audio_chunks[cache_key]

    # BUGFIX: the on-disk name previously encoded only the offset
    # ("chunk_{start_time}.wav"), so chunks extracted from *different* MKV
    # files at the same offset collided and the stale file was silently
    # reused. Embed a digest of the source path to keep chunks distinct.
    source_tag = hashlib.md5(str(mkv_file).encode("utf-8")).hexdigest()[:12]
    chunk_path = self.temp_dir / f"chunk_{source_tag}_{start_time}.wav"
    if not chunk_path.exists():
        cmd = [
            "ffmpeg",
            "-ss",
            str(start_time),
            "-t",
            str(self.chunk_duration),
            "-i",
            str(mkv_file),
            "-vn",  # Disable video
            "-sn",  # Disable subtitles
            "-dn",  # Disable data streams
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",  # Overwrite output files without asking
            str(chunk_path),
        ]

        try:
            logger.debug(
                f"Extracting audio chunk from {mkv_file} at {start_time}s using FFmpeg"
            )
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)

            if result.returncode != 0:
                error_msg = f"FFmpeg failed with return code {result.returncode}"
                if result.stderr:
                    error_msg += f". Error: {result.stderr.strip()}"
                logger.error(error_msg)
                logger.debug(f"FFmpeg command: {' '.join(cmd)}")
                raise RuntimeError(error_msg)

            # Check if the output file was actually created and has content
            if not chunk_path.exists():
                error_msg = f"FFmpeg completed but output file was not created: {chunk_path}"
                logger.error(error_msg)
                raise RuntimeError(error_msg)

            # Check if the file has meaningful content (at least 1KB)
            if chunk_path.stat().st_size < 1024:
                error_msg = f"Generated audio chunk is too small ({chunk_path.stat().st_size} bytes), likely corrupted"
                logger.warning(error_msg)
                # Don't raise an error for small files, but log the warning

            logger.debug(
                f"Successfully extracted {chunk_path.stat().st_size} byte audio chunk"
            )

        except subprocess.TimeoutExpired as e:
            error_msg = f"FFmpeg timed out after 30 seconds while extracting audio from {mkv_file}"
            logger.error(error_msg)
            raise RuntimeError(error_msg) from e

        except Exception as e:
            error_msg = f"Failed to extract audio chunk from {mkv_file} at {start_time}s: {str(e)}"
            logger.error(error_msg)
            # Clean up partial file if it exists
            if chunk_path.exists():
                try:
                    chunk_path.unlink()
                except Exception as cleanup_error:
                    logger.warning(
                        f"Failed to clean up partial file {chunk_path}: {cleanup_error}"
                    )
            raise RuntimeError(error_msg) from e

    chunk_path_str = str(chunk_path)
    self.audio_chunks[cache_key] = chunk_path_str
    return chunk_path_str
load_reference_chunk
load_reference_chunk(srt_file, chunk_idx)

Load reference subtitles for a specific time chunk with caching.

PARAMETER DESCRIPTION
srt_file

Path to the SRT file

TYPE: str or Path

chunk_idx

Index of the chunk to load

TYPE: int

RETURNS DESCRIPTION
str

Combined text from the subtitle chunk

Source code in mkv_episode_matcher/episode_identification.py
def load_reference_chunk(self, srt_file, chunk_idx):
    """
    Return the combined subtitle text for one time chunk of ``srt_file``.

    Args:
        srt_file (str or Path): Path to the SRT file.
        chunk_idx (int): Zero-based index of the chunk to load.

    Returns:
        str: Combined text for the chunk, or "" when loading fails.
    """
    try:
        # Chunk offsets mirror _try_match_with_model: windows start after
        # the initial skip period and advance by one chunk duration each.
        window_start = self.skip_initial_duration + chunk_idx * self.chunk_duration
        window_end = window_start + self.chunk_duration
        return self.subtitle_cache.get_chunk(
            srt_file, chunk_idx, window_start, window_end
        )
    except Exception as e:
        logger.error(f"Error loading reference chunk from {srt_file}: {e}")
        return ""
get_reference_files
get_reference_files(season_number)

Get reference subtitle files with caching.

Source code in mkv_episode_matcher/episode_identification.py
def get_reference_files(self, season_number):
    """Return reference subtitle files for ``season_number``, with caching.

    Scans the show's cache directory once for ``.srt``/``.SRT`` files and
    keeps those whose names match common episode-numbering styles for the
    season (S01E.., S1E.., 01x.., 1x..), case-insensitively.

    Args:
        season_number (int): Season to look up.

    Returns:
        list[Path]: Matching subtitle files, duplicates removed, order preserved.
    """
    cache_key = (self.show_name, season_number)
    logger.debug(f"Reference cache key: {cache_key}")

    if cache_key in self.reference_files_cache:
        logger.debug("Returning cached reference files")
        return self.reference_files_cache[cache_key]

    reference_dir = self.cache_dir / "data" / self.show_name
    patterns = [
        f"S{season_number:02d}E",
        f"S{season_number}E",
        f"{season_number:02d}x",
        f"{season_number}x",
    ]

    # PERF: scan the directory once; the original re-globbed it on every
    # pattern iteration (4 identical directory scans).
    # Cover both .srt and .SRT for case-sensitive filesystems.
    srt_files = list(reference_dir.glob("*.srt")) + list(
        reference_dir.glob("*.SRT")
    )

    reference_files = []
    for pattern in patterns:
        reference_files.extend(
            f
            for f in srt_files
            if re.search(f"{pattern}\\d+", f.name, re.IGNORECASE)
        )

    # Remove duplicates while preserving order (patterns overlap, e.g. a
    # single-digit season matches both the padded and unpadded forms).
    reference_files = list(dict.fromkeys(reference_files))
    logger.debug(
        f"Found {len(reference_files)} reference files for season {season_number}"
    )
    self.reference_files_cache[cache_key] = reference_files
    return reference_files
identify_episode
identify_episode(video_file, temp_dir, season_number)

Progressive episode identification with faster initial attempt.

Source code in mkv_episode_matcher/episode_identification.py
def identify_episode(self, video_file, temp_dir, season_number):
    """Progressive episode identification with faster initial attempt.

    Args:
        video_file (str or Path): MKV file to identify.
        temp_dir: Not used by this implementation; presumably kept for
            caller compatibility — TODO confirm against call sites.
        season_number (int): Season whose reference subtitles are compared.

    Returns:
        dict or None: Match details (includes 'matched_at' seconds and
        'confidence') on success; None when no match is found or an error
        occurs. All extracted audio chunks are deleted on exit either way.
    """
    try:
        # Get reference files first with caching; without references there
        # is nothing to match against.
        reference_files = self.get_reference_files(season_number)

        if not reference_files:
            logger.error(f"No reference files found for season {season_number}")
            return None

        # Cache video duration (bounds how much audio we scan below).
        try:
            duration = get_video_duration(video_file)
        except Exception as e:
            logger.error(f"Failed to get video duration for {video_file}: {e}")
            return None

        # Try with Parakeet CTC model
        logger.info("Attempting match with Parakeet CTC model...")
        try:
            match = self._try_match_with_model(
                video_file,
                {
                    "type": "parakeet",
                    "name": "nvidia/parakeet-ctc-0.6b",
                    "device": self.device,
                },
                min(duration, 600),  # Allow up to 10 minutes
                reference_files,
            )
            if match:
                logger.info(
                    f"Successfully matched with Parakeet CTC model at {match['matched_at']}s (confidence: {match['confidence']:.2f})"
                )
                return match
        except Exception as e:
            # Model failure is non-fatal: fall through to the no-match path.
            logger.warning(f"Parakeet CTC model failed: {e}")

        logger.info(
            "Speech recognition match failed - no models were able to process this file"
        )
        return None

    except Exception as e:
        logger.error(
            f"Unexpected error during episode identification for {video_file}: {e}"
        )
        return None

    finally:
        # Cleanup temp files - keep this limited to only files we know we created
        for chunk_info in self.audio_chunks.values():
            try:
                Path(chunk_info).unlink(missing_ok=True)
            except Exception as e:
                logger.warning(f"Failed to delete temp file {chunk_info}: {e}")

SubtitleReader

Helper class for reading and parsing subtitle files.

Functions
parse_timestamp staticmethod
parse_timestamp(timestamp)

Parse SRT timestamp into seconds.

Source code in mkv_episode_matcher/episode_identification.py
@staticmethod
def parse_timestamp(timestamp):
    """Parse SRT timestamp into seconds."""
    hours, minutes, seconds = timestamp.replace(",", ".").split(":")
    return float(hours) * 3600 + float(minutes) * 60 + float(seconds)
read_srt_file staticmethod
read_srt_file(file_path)

Read an SRT file and return its contents with robust encoding handling.

PARAMETER DESCRIPTION
file_path

Path to the SRT file

TYPE: str or Path

RETURNS DESCRIPTION
str

Contents of the SRT file

Source code in mkv_episode_matcher/episode_identification.py
@staticmethod
def read_srt_file(file_path):
    """
    Read an SRT file and return its contents with robust encoding handling.

    Delegates to ``read_file_with_fallback``, which tries the detected
    encoding first and then a list of common subtitle encodings.

    Args:
        file_path (str or Path): Path to the SRT file

    Returns:
        str: Contents of the SRT file
    """
    return read_file_with_fallback(file_path)
extract_subtitle_chunk staticmethod
extract_subtitle_chunk(content, start_time, end_time)

Extract subtitle text for a specific time window.

PARAMETER DESCRIPTION
content

Full SRT file content

TYPE: str

start_time

Chunk start time in seconds

TYPE: float

end_time

Chunk end time in seconds

TYPE: float

RETURNS DESCRIPTION
list

List of subtitle texts within the time window

Source code in mkv_episode_matcher/episode_identification.py
@staticmethod
def extract_subtitle_chunk(content, start_time, end_time):
    """
    Collect subtitle texts overlapping the window [start_time, end_time].

    Args:
        content (str): Full SRT file content.
        start_time (float): Window start in seconds.
        end_time (float): Window end in seconds.

    Returns:
        list: Subtitle text strings whose cues overlap the window.
    """
    collected = []

    # SRT blocks are separated by blank lines: index, timing, then text.
    for block in content.strip().split("\n\n"):
        lines = block.split("\n")
        if len(lines) < 3 or "-->" not in lines[1]:
            continue

        try:
            parts = lines[1].split(" --> ")
            cue_start = SubtitleReader.parse_timestamp(parts[0].strip())
            cue_end = SubtitleReader.parse_timestamp(parts[1].strip())

            # Keep any cue whose interval overlaps the requested window.
            if cue_end >= start_time and cue_start <= end_time:
                collected.append(" ".join(lines[2:]))

        except (IndexError, ValueError) as e:
            # Malformed block: log and move on to the next cue.
            logger.warning(f"Error parsing subtitle block: {e}")
            continue

    return collected

Functions

get_video_duration cached

get_video_duration(video_file)

Get video duration with caching and error handling.

Source code in mkv_episode_matcher/episode_identification.py
@lru_cache(maxsize=100)
def get_video_duration(video_file):
    """Get video duration with caching and error handling.

    Args:
        video_file (str or Path): Video file to probe. Must be hashable
            because results are memoized via ``lru_cache``.

    Returns:
        int: Duration in whole seconds, rounded up.

    Raises:
        RuntimeError: If ffprobe fails, times out, or returns an
            unparseable or non-positive duration.
    """
    try:
        logger.debug(f"Getting duration for video file: {video_file}")
        # Ask ffprobe for only the container duration, printed as a bare value.
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                str(video_file),
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )

        if result.returncode != 0:
            error_msg = f"ffprobe failed with return code {result.returncode}"
            if result.stderr:
                error_msg += f". Error: {result.stderr.strip()}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        duration_str = result.stdout.strip()
        if not duration_str:
            raise RuntimeError("ffprobe returned empty duration")

        duration = float(duration_str)
        if duration <= 0:
            raise RuntimeError(f"Invalid duration: {duration}")

        # Round up so partial trailing seconds still count as a full second.
        result_duration = int(np.ceil(duration))
        logger.debug(f"Video duration: {result_duration} seconds")
        return result_duration

    except subprocess.TimeoutExpired as e:
        error_msg = f"ffprobe timed out while getting duration for {video_file}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e
    except ValueError as e:
        # float() failed on ffprobe's output.
        error_msg = (
            f"Failed to parse duration from ffprobe output for {video_file}: {e}"
        )
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e
    except Exception as e:
        # Normalize any other failure to RuntimeError for callers.
        error_msg = f"Unexpected error getting video duration for {video_file}: {e}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e

detect_file_encoding

detect_file_encoding(file_path)

Detect the encoding of a file using chardet.

PARAMETER DESCRIPTION
file_path

Path to the file

TYPE: str or Path

RETURNS DESCRIPTION
str

Detected encoding, defaults to 'utf-8' if detection fails

Source code in mkv_episode_matcher/episode_identification.py
def detect_file_encoding(file_path):
    """
    Detect the encoding of a file using chardet.

    Args:
        file_path (str or Path): Path to the file

    Returns:
        str: Detected encoding, defaults to 'utf-8' if detection fails
    """
    try:
        # Sample at most the first 1MB: plenty for chardet while bounding memory.
        sample_size = min(1024 * 1024, Path(file_path).stat().st_size)
        with open(file_path, "rb") as handle:
            sample = handle.read(sample_size)

        detection = chardet.detect(sample)
        encoding = detection["encoding"]
        confidence = detection["confidence"]

        logger.debug(
            f"Detected encoding {encoding} with {confidence:.2%} confidence for {file_path}"
        )
        # chardet may return None for undetectable content; fall back to UTF-8.
        return encoding if encoding else "utf-8"
    except Exception as e:
        # Best-effort helper: never raise, just default to UTF-8.
        logger.warning(f"Error detecting encoding for {file_path}: {e}")
        return "utf-8"

read_file_with_fallback cached

read_file_with_fallback(file_path, encodings=None)

Read a file trying multiple encodings in order of preference.

PARAMETER DESCRIPTION
file_path

Path to the file

TYPE: str or Path

encodings

List of encodings to try, defaults to common subtitle encodings

TYPE: list DEFAULT: None

RETURNS DESCRIPTION
str

File contents

RAISES DESCRIPTION
ValueError

If file cannot be read with any encoding

Source code in mkv_episode_matcher/episode_identification.py
def read_file_with_fallback(file_path, encodings=None):
    """
    Read a file trying multiple encodings in order of preference.

    Results are cached (per path + encoding list), so repeated reads of the
    same subtitle file are cheap.

    Args:
        file_path (str or Path): Path to the file
        encodings (list): List of encodings to try, defaults to common subtitle encodings

    Returns:
        str: File contents

    Raises:
        ValueError: If file cannot be read with any encoding
    """
    if encodings is None:
        # First try detected encoding, then fallback to common subtitle encodings
        detected = detect_file_encoding(file_path)
        encodings = [detected, "utf-8", "latin-1", "cp1252", "iso-8859-1"]

    # BUGFIX: the original put @lru_cache directly on this function, which
    # raised TypeError whenever a caller passed `encodings` as a (documented)
    # list, since lists are unhashable. Normalize to hashable types here and
    # cache in a private worker instead.
    return _read_file_cached(str(file_path), tuple(encodings))


@lru_cache(maxsize=100)
def _read_file_cached(file_path, encodings):
    """Cached worker for read_file_with_fallback; accepts only hashable args."""
    file_path = Path(file_path)
    errors = []

    for encoding in encodings:
        try:
            with open(file_path, encoding=encoding) as f:
                content = f.read()
            logger.debug(f"Successfully read {file_path} using {encoding} encoding")
            return content
        except UnicodeDecodeError as e:
            # Remember why this encoding failed; try the next candidate.
            errors.append(f"{encoding}: {str(e)}")
            continue

    error_msg = f"Failed to read {file_path} with any encoding. Errors:\n" + "\n".join(
        errors
    )
    logger.error(error_msg)
    raise ValueError(error_msg)

mkv_episode_matcher.asr_models

ASR Model Abstraction Layer

This module provides a unified interface for different Automatic Speech Recognition models, including OpenAI Whisper and NVIDIA Parakeet models.

Classes

ASRModel

ASRModel(model_name, device=None)

Bases: ABC

Abstract base class for ASR models.

Initialize ASR model.

PARAMETER DESCRIPTION
model_name

Name/identifier of the model

TYPE: str

device

Device to run on ('cpu', 'cuda', or None for auto-detect)

TYPE: str | None DEFAULT: None

Source code in mkv_episode_matcher/asr_models.py
def __init__(self, model_name: str, device: str | None = None):
    """
    Initialize ASR model.

    Args:
        model_name: Name/identifier of the model
        device: Device to run on ('cpu', 'cuda', or None for auto-detect)
    """
    self.model_name = model_name
    # No explicit device: fall back to subclass-provided auto-detection.
    self.device = device or self._get_default_device()
    # Populated lazily by load(); None means "not loaded yet".
    self._model = None
Attributes
is_loaded property
is_loaded

Check if model is loaded.

Functions
load abstractmethod
load()

Load the model. Should be called before transcription.

Source code in mkv_episode_matcher/asr_models.py
@abc.abstractmethod
def load(self):
    """Load the model; must be invoked before any transcription call."""
transcribe abstractmethod
transcribe(audio_path)

Transcribe audio file.

PARAMETER DESCRIPTION
audio_path

Path to audio file

TYPE: str | Path

RETURNS DESCRIPTION
dict

Dictionary with at least 'text' key containing transcription

Source code in mkv_episode_matcher/asr_models.py
@abc.abstractmethod
def transcribe(self, audio_path: str | Path) -> dict:
    """
    Transcribe an audio file.

    Args:
        audio_path: Path to the audio file to transcribe

    Returns:
        Dictionary containing at least a 'text' key with the transcription
    """
calculate_match_score
calculate_match_score(transcription, reference)

Calculate similarity score between transcription and reference.

PARAMETER DESCRIPTION
transcription

Transcribed text

TYPE: str

reference

Reference subtitle text

TYPE: str

RETURNS DESCRIPTION
float

Float score between 0.0 and 1.0

Source code in mkv_episode_matcher/asr_models.py
def calculate_match_score(self, transcription: str, reference: str) -> float:
    """
    Calculate similarity score between transcription and reference.

    Args:
        transcription: Transcribed text
        reference: Reference subtitle text

    Returns:
        Float score between 0.0 and 1.0
    """
    # Default weighting: token-sort similarity dominates (70%), with a
    # partial-match component (30%) tolerating extra surrounding text.
    token_weight = 0.7
    partial_weight = 0.3

    combined = (
        fuzz.token_sort_ratio(transcription, reference) * token_weight
        + fuzz.partial_ratio(transcription, reference) * partial_weight
    )
    # rapidfuzz-style ratios are 0-100; normalize to 0.0-1.0.
    return combined / 100.0
unload
unload()

Unload model to free memory.

Source code in mkv_episode_matcher/asr_models.py
def unload(self):
    """Unload model to free memory."""
    # Drop the reference so the underlying model object can be garbage-collected.
    self._model = None

ParakeetTDTModel

ParakeetTDTModel(
    model_name="nvidia/parakeet-tdt-0.6b-v2", device=None
)

Bases: ASRModel

NVIDIA Parakeet TDT ASR model implementation.

WARNING: This model (TDT) uses the Transducer decoder which requires significant GPU resources and may be unstable on some Windows configurations (CUDA errors).

Initialize Parakeet TDT model.

PARAMETER DESCRIPTION
model_name

Parakeet model identifier from HuggingFace

TYPE: str DEFAULT: 'nvidia/parakeet-tdt-0.6b-v2'

device

Device to run on

TYPE: str | None DEFAULT: None

Source code in mkv_episode_matcher/asr_models.py
def __init__(
    self, model_name: str = "nvidia/parakeet-tdt-0.6b-v2", device: str | None = None
):
    """
    Initialize Parakeet TDT model.

    Args:
        model_name: Parakeet model identifier from HuggingFace
        device: Device to run on
    """
    # All state setup lives in ASRModel.__init__; only the defaults differ here.
    super().__init__(model_name, device)
Attributes
is_loaded property
is_loaded

Check if model is loaded.

Functions
load
load()

Load Parakeet model with caching.

Source code in mkv_episode_matcher/asr_models.py
def load(self):
    """Load Parakeet model with caching.

    Idempotent: returns immediately if the model is already loaded or a
    matching instance exists in the module cache. Temporarily mutates
    os.environ while importing/loading NeMo and restores it afterwards.

    Raises:
        ImportError: If NVIDIA NeMo is not installed.
    """
    if self.is_loaded:
        return

    cache_key = f"parakeet_tdt_{self.model_name}_{self.device}"

    # Reuse a previously loaded model for the same checkpoint/device combo.
    if cache_key in _model_cache:
        self._model = _model_cache[cache_key]
        logger.debug(
            f"Using cached Parakeet TDT model: {self.model_name} on {self.device}"
        )
        return

    try:
        # Windows compatibility: Patch signal module before importing NeMo
        if os.name == "nt":  # Windows
            import signal

            if not hasattr(signal, "SIGKILL"):
                # Add missing signal constants for Windows compatibility
                # (NeMo references POSIX-only signals at import time).
                signal.SIGKILL = 9
                signal.SIGTERM = 15

        import nemo.collections.asr as nemo_asr

        # Store original environment variables for restoration
        original_env = {}

        # Configure environment to suppress NeMo warnings and optimize performance
        nemo_env_settings = {
            "NEMO_DISABLE_TRAINING_LOGS": "1",
            "NEMO_DISABLE_HYDRA_LOGS": "1",
            "HYDRA_FULL_ERROR": "0",
            "PYTHONWARNINGS": "ignore::UserWarning",
            "TOKENIZERS_PARALLELISM": "false",  # Avoid tokenizer warnings
        }

        # Windows compatibility: Add optimizations but avoid signal issues
        if os.name == "nt":  # Windows
            nemo_env_settings.update({
                "OMP_NUM_THREADS": "1",
                "MKL_NUM_THREADS": "1",
                "NEMO_BYPASS_SIGNALS": "1",  # Bypass NeMo signal handling on Windows
            })

        # Record prior values (None = was unset) before overwriting.
        for key, value in nemo_env_settings.items():
            original_env[key] = os.environ.get(key)
            os.environ[key] = value

        try:
            # Set device for NeMo
            if self.device == "cuda" and torch.cuda.is_available():
                # NeMo will automatically use CUDA if available
                pass
            elif self.device == "cpu":
                # Force CPU usage - NeMo respects CUDA_VISIBLE_DEVICES=""
                original_env["CUDA_VISIBLE_DEVICES"] = os.environ.get(
                    "CUDA_VISIBLE_DEVICES"
                )
                os.environ["CUDA_VISIBLE_DEVICES"] = ""

            # Load model with reduced verbosity
            self._model = nemo_asr.models.ASRModel.from_pretrained(
                model_name=self.model_name,
                strict=False,  # Allow loading with missing keys to reduce warnings
            )

            # Configure model for optimal inference
            if hasattr(self._model, "set_batch_size"):
                self._model.set_batch_size(1)  # Optimize for single file processing

            # Fix for Windows: Force num_workers to 0 to avoid multiprocessing errors/locks
            if hasattr(self._model, "cfg"):
                for ds_config in ["test_ds", "validation_ds"]:
                    if ds_config in self._model.cfg:
                        self._model.cfg[ds_config].num_workers = 0

            if hasattr(self._model, "eval"):
                self._model.eval()  # Set to evaluation mode

        finally:
            # Restore original environment variables
            # (None recorded above means the variable must be removed again).
            for key, original_value in original_env.items():
                if original_value is not None:
                    os.environ[key] = original_value
                elif key in os.environ:
                    del os.environ[key]

        _model_cache[cache_key] = self._model
        logger.info(
            f"Loaded Parakeet TDT model: {self.model_name} on {self.device}"
        )

    except ImportError as e:
        raise ImportError(
            "NVIDIA NeMo not installed. Run: pip install nemo_toolkit[asr]"
        ) from e
    except Exception as e:
        logger.error(f"Failed to load Parakeet TDT model {self.model_name}: {e}")
        raise
calculate_match_score
calculate_match_score(transcription, reference)

Calculate similarity score with Parakeet-specific weights. Parakeet produces longer, more detailed transcriptions, so we favor partial matches.

Source code in mkv_episode_matcher/asr_models.py
def calculate_match_score(self, transcription: str, reference: str) -> float:
    """
    Calculate similarity score with Parakeet-specific weights.
    Parakeet produces longer, more detailed transcriptions, so we favor partial matches.
    """
    # Base Parakeet weighting: partial matches dominate (60/40 split).
    token_weight, partial_weight = 0.4, 0.6

    # When the transcription dwarfs the reference, lean even harder on the
    # partial ratio (80/20), since token-sort is penalized by extra length.
    length_ratio = len(transcription) / max(len(reference), 1)
    if length_ratio > 2.0:  # Much longer transcription
        token_weight, partial_weight = 0.2, 0.8

    combined = (
        fuzz.token_sort_ratio(transcription, reference) * token_weight
        + fuzz.partial_ratio(transcription, reference) * partial_weight
    )
    # Ratios are 0-100; normalize to the 0.0-1.0 contract of the base class.
    return combined / 100.0
transcribe
transcribe(audio_path)

Transcribe audio using Parakeet with preprocessing and text normalization.

PARAMETER DESCRIPTION
audio_path

Path to audio file

TYPE: str | Path

RETURNS DESCRIPTION
dict

Dictionary with 'text' and 'segments' from Parakeet

Source code in mkv_episode_matcher/asr_models.py
def transcribe(self, audio_path: str | Path) -> dict:
    """
    Transcribe audio using Parakeet with preprocessing and text normalization.

    Never raises on transcription failure: errors are logged and an empty
    result dict is returned so callers can fall back to another model.

    Args:
        audio_path: Path to audio file

    Returns:
        Dictionary with 'text' and 'segments' from Parakeet
    """
    if not self.is_loaded:
        self.load()

    preprocessed_audio = None
    try:
        logger.debug(f"Starting Parakeet transcription for {audio_path}")

        # Preprocess audio for optimal Parakeet performance
        # NOTE(review): _preprocess_audio is defined elsewhere; it appears to
        # return a temp-file path (or the original path) — confirm.
        preprocessed_audio = self._preprocess_audio(audio_path)

        # Configure NeMo model settings to reduce warnings
        old_env_vars = {}
        try:
            # Set environment variables to reduce NeMo warnings
            env_settings = {
                "CUDA_LAUNCH_BLOCKING": "0",
                "NEMO_DISABLE_TRAINING_LOGS": "1",
            }

            # Record prior values (None = unset) so they can be restored below.
            for key, value in env_settings.items():
                old_env_vars[key] = os.environ.get(key)
                os.environ[key] = value

            # Parakeet expects list of file paths
            result = self._model.transcribe([preprocessed_audio])

        finally:
            # Restore original environment variables
            for key, old_value in old_env_vars.items():
                if old_value is not None:
                    os.environ[key] = old_value
                elif key in os.environ:
                    del os.environ[key]

        logger.debug(f"Parakeet raw result: {result}, type: {type(result)}")

        # Extract text from result
        # (NeMo versions differ: items may be hypothesis objects with a
        # .text attribute, or plain strings.)
        raw_text = ""
        if isinstance(result, list) and len(result) > 0:
            if hasattr(result[0], "text"):
                raw_text = result[0].text
            elif isinstance(result[0], str):
                raw_text = result[0]
            else:
                raw_text = str(result[0])
        else:
            logger.warning(f"Unexpected Parakeet result format: {result}")
            raw_text = ""

        # Clean and normalize the transcription
        cleaned_text = self._clean_transcription_text(raw_text)

        logger.debug(f"Raw transcription: '{raw_text}'")
        logger.debug(f"Cleaned transcription: '{cleaned_text}'")

        return {
            "text": cleaned_text,
            "raw_text": raw_text,
            "segments": [],
            "language": "en",
        }

    except Exception as e:
        logger.error(
            f"Parakeet transcription failed for {audio_path}: {type(e).__name__}: {e}"
        )
        import traceback

        traceback.print_exc()
        # Return empty result instead of raising to allow fallback
        return {"text": "", "raw_text": "", "segments": [], "language": "en"}
    finally:
        # Clean up preprocessed audio file
        # (only when preprocessing produced a distinct temp file).
        if preprocessed_audio and preprocessed_audio != str(audio_path):
            try:
                Path(preprocessed_audio).unlink(missing_ok=True)
            except Exception as e:
                logger.debug(f"Failed to clean up preprocessed audio: {e}")
unload
unload()

Unload model to free memory.

Source code in mkv_episode_matcher/asr_models.py
def unload(self):
    """Unload model to free memory."""
    # Drop the reference so the underlying model object can be garbage-collected.
    self._model = None

ParakeetCTCModel

ParakeetCTCModel(
    model_name="nvidia/parakeet-ctc-0.6b", device=None
)

Bases: ParakeetTDTModel

NVIDIA Parakeet CTC ASR model implementation.

This uses the CTC decoder which is more stable and robust on various hardware than the TDT version, though potentially slightly less accurate.

Initialize Parakeet CTC model.

PARAMETER DESCRIPTION
model_name

Parakeet model identifier (default: nvidia/parakeet-ctc-0.6b)

TYPE: str DEFAULT: 'nvidia/parakeet-ctc-0.6b'

device

Device to run on

TYPE: str | None DEFAULT: None

Source code in mkv_episode_matcher/asr_models.py
def __init__(
    self, model_name: str = "nvidia/parakeet-ctc-0.6b", device: str | None = None
):
    """
    Initialize Parakeet CTC model.

    Args:
        model_name: Parakeet model identifier (default: nvidia/parakeet-ctc-0.6b)
        device: Device to run on
    """
    # Only the default checkpoint differs from the TDT parent; if the caller
    # supplies a model_name we trust it, even a non-CTC one.
    super().__init__(model_name, device)
Attributes
is_loaded property
is_loaded

Check if model is loaded.

Functions
load
load()

Load Parakeet CTC model with caching.

Source code in mkv_episode_matcher/asr_models.py
def load(self):
    """Load Parakeet CTC model with caching."""
    # Delegate to ParakeetTDTModel.load: its cache key already includes
    # self.model_name, so CTC and TDT checkpoints never collide in the cache.
    # NOTE(review): the parent logs "Parakeet TDT model" even when a CTC
    # checkpoint is loaded — consider a CTC-specific log message.
    super().load()
transcribe
transcribe(audio_path)

Transcribe audio using Parakeet with preprocessing and text normalization.

PARAMETER DESCRIPTION
audio_path

Path to audio file

TYPE: str | Path

RETURNS DESCRIPTION
dict

Dictionary with 'text' and 'segments' from Parakeet

Source code in mkv_episode_matcher/asr_models.py
def transcribe(self, audio_path: str | Path) -> dict:
    """
    Transcribe audio using Parakeet with preprocessing and text normalization.

    Args:
        audio_path: Path to audio file

    Returns:
        Dictionary with 'text' and 'segments' from Parakeet
    """
    # This override previously duplicated ParakeetTDTModel.transcribe line
    # for line; delegate to the inherited implementation to keep one copy.
    return super().transcribe(audio_path)
calculate_match_score
calculate_match_score(transcription, reference)

Calculate similarity score with Parakeet-specific weights. Parakeet produces longer, more detailed transcriptions, so we favor partial matches.

Source code in mkv_episode_matcher/asr_models.py
def calculate_match_score(self, transcription: str, reference: str) -> float:
    """
    Calculate similarity score with Parakeet-specific weights.
    Parakeet produces longer, more detailed transcriptions, so we favor partial matches.
    """
    # This override previously duplicated ParakeetTDTModel.calculate_match_score
    # verbatim; delegate to the inherited implementation to keep one copy.
    return super().calculate_match_score(transcription, reference)
unload
unload()

Unload model to free memory.

Source code in mkv_episode_matcher/asr_models.py
def unload(self):
    """Unload model to free memory."""
    # Drop the reference so the underlying model object can be garbage-collected.
    self._model = None

Functions

create_asr_model

create_asr_model(model_config)

Factory function to create ASR models from configuration.

PARAMETER DESCRIPTION
model_config

Dictionary with 'type' and 'name' keys

TYPE: dict

RETURNS DESCRIPTION
ASRModel

Configured ASRModel instance

Example

model_config = {"type": "parakeet", "name": "nvidia/parakeet-ctc-0.6b"}
model = create_asr_model(model_config)

Source code in mkv_episode_matcher/asr_models.py
def create_asr_model(model_config: dict) -> ASRModel:
    """
    Factory function to create ASR models from configuration.

    Args:
        model_config: Dictionary with 'type' and 'name' keys

    Returns:
        Configured ASRModel instance

    Raises:
        ValueError: If the configured model type is not supported.

    Example:
        model_config = {"type": "parakeet", "name": "nvidia/parakeet-ctc-0.6b"}
        model = create_asr_model(model_config)
    """
    model_type = model_config.get("type", "").lower()
    model_name = model_config.get("name", "")
    device = model_config.get("device")

    # Guard clause: only Parakeet is supported at the moment.
    if model_type != "parakeet":
        raise ValueError(
            f"Unsupported model type: {model_type}. Only 'parakeet' is supported."
        )

    # Fall back to the known-good CTC checkpoint when no name is configured.
    return ParakeetCTCModel(model_name or "nvidia/parakeet-ctc-0.6b", device)

get_cached_model

get_cached_model(model_config)

Get a cached model instance, creating it if necessary.

PARAMETER DESCRIPTION
model_config

Dictionary with model configuration

TYPE: dict

RETURNS DESCRIPTION
ASRModel

ASRModel instance (loaded and ready for use)

Source code in mkv_episode_matcher/asr_models.py
def get_cached_model(model_config: dict) -> ASRModel:
    """
    Get a cached model instance, creating it if necessary.

    Args:
        model_config: Dictionary with model configuration

    Returns:
        ASRModel instance (loaded and ready for use)
    """
    # Key on type + name + device so distinct configurations never collide.
    model_type = model_config.get("type", "")
    model_name = model_config.get("name", "")
    model_device = model_config.get("device", "auto")
    cache_key = f"{model_type}_{model_name}_{model_device}"

    try:
        return _model_cache[cache_key]
    except KeyError:
        model = create_asr_model(model_config)
        model.load()  # Load immediately for caching
        _model_cache[cache_key] = model
        return model

clear_model_cache

clear_model_cache()

Clear all cached models to free memory.

Source code in mkv_episode_matcher/asr_models.py
def clear_model_cache():
    """Clear all cached models to free memory."""
    global _model_cache
    # Give each cached model a chance to release its resources before the
    # cache drops the last reference.
    for cached in list(_model_cache.values()):
        if hasattr(cached, "unload"):
            cached.unload()
    _model_cache.clear()
    logger.info("Cleared ASR model cache")

list_available_models

list_available_models()

List available model types and their requirements.

RETURNS DESCRIPTION
dict

Dictionary with model types and their availability status

Source code in mkv_episode_matcher/asr_models.py
def list_available_models() -> dict:
    """
    List available model types and their requirements.

    Returns:
        Dictionary with model types and their availability status
    """
    # Probe for NVIDIA NeMo; Parakeet support depends on it being importable.
    try:
        import nemo.collections.asr  # noqa: F401
    except ImportError:
        parakeet_status = {
            "available": False,
            "error": "NVIDIA NeMo not installed",
        }
    else:
        parakeet_status = {
            "available": True,
            "models": ["nvidia/parakeet-ctc-0.6b"],
        }

    return {"parakeet": parakeet_status}

mkv_episode_matcher.subtitle_utils

Functions

generate_subtitle_patterns

generate_subtitle_patterns(series_name, season, episode)

Generate various common subtitle filename patterns.

PARAMETER DESCRIPTION
series_name

Name of the series

TYPE: str

season

Season number

TYPE: int

episode

Episode number

TYPE: int

RETURNS DESCRIPTION
list[str]

List[str]: List of possible subtitle filenames

Source code in mkv_episode_matcher/subtitle_utils.py
def generate_subtitle_patterns(
    series_name: str, season: int, episode: int
) -> list[str]:
    """
    Generate various common subtitle filename patterns.

    Args:
        series_name (str): Name of the series
        season (int): Season number
        episode (int): Episode number

    Returns:
        List[str]: List of possible subtitle filenames
    """
    compact = series_name.replace(" ", "")
    dotted = series_name.replace(" ", ".")
    underscored = series_name.replace(" ", "_")
    se_tag = f"S{season:02d}E{episode:02d}"

    return [
        # Standard format: "Show Name - S01E02.srt"
        f"{series_name} - {se_tag}.srt",
        # Season x Episode format: "Show Name - 1x02.srt"
        f"{series_name} - {season}x{episode:02d}.srt",
        # Separate season/episode: "Show Name - Season 1 Episode 02.srt"
        f"{series_name} - Season {season} Episode {episode:02d}.srt",
        # Compact format: "ShowName.S01E02.srt"
        f"{compact}.{se_tag}.srt",
        # Numbered format: "Show Name 102.srt"
        f"{series_name} {season:01d}{episode:02d}.srt",
        # Dot format: "Show.Name.1x02.srt"
        f"{dotted}.{season}x{episode:02d}.srt",
        # Underscore format: "Show_Name_S01E02.srt"
        f"{underscored}_{se_tag}.srt",
    ]

find_existing_subtitle

find_existing_subtitle(
    series_cache_dir, series_name, season, episode
)

Check for existing subtitle files in various naming formats.

PARAMETER DESCRIPTION
series_cache_dir

Directory containing subtitle files

TYPE: str

series_name

Name of the series

TYPE: str

season

Season number

TYPE: int

episode

Episode number

TYPE: int

RETURNS DESCRIPTION
str | None

Optional[str]: Path to existing subtitle file if found, None otherwise

Source code in mkv_episode_matcher/subtitle_utils.py
def find_existing_subtitle(
    series_cache_dir: str, series_name: str, season: int, episode: int
) -> str | None:
    """
    Check for existing subtitle files in various naming formats.

    Args:
        series_cache_dir (str): Directory containing subtitle files
        series_name (str): Name of the series
        season (int): Season number
        episode (int): Episode number

    Returns:
        Optional[str]: Path to existing subtitle file if found, None otherwise
    """
    patterns = generate_subtitle_patterns(series_name, season, episode)

    for pattern in patterns:
        filepath = Path(series_cache_dir) / pattern
        if filepath.exists():
            # Return a plain string to honour the declared `str | None`
            # return type (previously a Path object leaked out here).
            return str(filepath)

    return None

sanitize_filename

sanitize_filename(filename)

Sanitize filename by removing/replacing invalid characters.

PARAMETER DESCRIPTION
filename

Original filename

TYPE: str

RETURNS DESCRIPTION
str

Sanitized filename

TYPE: str

Source code in mkv_episode_matcher/subtitle_utils.py
def sanitize_filename(filename: str) -> str:
    """
    Sanitize filename by removing/replacing invalid characters.

    Args:
        filename (str): Original filename

    Returns:
        str: Sanitized filename
    """
    # Substitute characters that commonly appear in titles with readable
    # stand-ins rather than deleting them outright.
    replacements = {":": " -", "/": "-", "\\": "-"}
    for bad, good in replacements.items():
        filename = filename.replace(bad, good)

    # Strip anything else that is illegal in filenames on common platforms.
    filename = re.sub(r'[<>:"/\\|?*]', "", filename)

    return filename.strip()

TMDB Client

mkv_episode_matcher.tmdb_client

Classes

RateLimitedRequest

RateLimitedRequest(rate_limit=30, period=1)

A class that represents a rate-limited request object.

ATTRIBUTE DESCRIPTION
rate_limit

Maximum number of requests allowed per period.

TYPE: int

period

Period in seconds.

TYPE: int

requests_made

Counter for requests made.

TYPE: int

start_time

Start time of the current period.

TYPE: float

lock

Lock for synchronization.

TYPE: Lock

Source code in mkv_episode_matcher/tmdb_client.py
def __init__(self, rate_limit=30, period=1):
    """Initialize the rate limiter.

    Args:
        rate_limit (int): Maximum number of requests allowed per period.
        period (int): Window length in seconds.
    """
    self.rate_limit = rate_limit
    self.period = period
    # Requests issued in the current window.
    self.requests_made = 0
    # Wall-clock start of the current window.
    self.start_time = time.time()
    # Guards the counters above across threads.
    self.lock = Lock()
Functions
get
get(url)

Sends a rate-limited GET request to the specified URL.

PARAMETER DESCRIPTION
url

The URL to send the request to.

TYPE: str

RETURNS DESCRIPTION
Response

The response object returned by the request.

Source code in mkv_episode_matcher/tmdb_client.py
def get(self, url):
    """
    Sends a rate-limited GET request to the specified URL.

    Args:
        url (str): The URL to send the request to.

    Returns:
        Response: The response object returned by the request.
    """
    with self.lock:
        # Quota exhausted: wait out the remainder of the window, then
        # start a fresh one.
        if self.requests_made >= self.rate_limit:
            remaining = self.period - (time.time() - self.start_time)
            if remaining > 0:
                time.sleep(remaining)
            self.requests_made = 0
            self.start_time = time.time()

        self.requests_made += 1

    # Perform the HTTP call outside the lock so slow responses do not
    # serialize other threads.
    return requests.get(url, timeout=30)

Functions

retry_network_operation

retry_network_operation(max_retries=3, base_delay=1.0)

Decorator for retrying network operations.

Source code in mkv_episode_matcher/tmdb_client.py
def retry_network_operation(
    max_retries: int = 3, base_delay: float = 1.0
) -> Callable[[F], F]:
    """Decorator for retrying network operations.

    Retries on requests/connection/timeout errors with exponential backoff
    (doubling from base_delay, capped at 30 seconds). After max_retries
    failed retries the last exception is re-raised.
    """

    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            delay = base_delay
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except (requests.RequestException, ConnectionError, TimeoutError) as e:
                    # Final attempt failed: surface the error to the caller.
                    if attempt == max_retries:
                        logger.error(
                            f"Max retries ({max_retries}) exceeded for {func.__name__}: {e}"
                        )
                        raise e

                    logger.warning(
                        f"Network retry {attempt + 1}/{max_retries + 1} for {func.__name__}: {e}"
                    )
                    time.sleep(delay)
                    delay = min(delay * 2, 30)  # Cap at 30 seconds
                    attempt += 1

        return wrapper  # type: ignore

    return decorator

fetch_show_id

fetch_show_id(show_name)

Fetch the TMDb ID for a given show name.

PARAMETER DESCRIPTION
show_name

The name of the show.

TYPE: str

RETURNS DESCRIPTION
str

The TMDb ID of the show, or None if not found.

TYPE: str | None

Source code in mkv_episode_matcher/tmdb_client.py
@retry_network_operation(max_retries=3, base_delay=1.0)
def fetch_show_id(show_name: str) -> str | None:
    """
    Fetch the TMDb ID for a given show name.

    Args:
        show_name (str): The name of the show.

    Returns:
        str: The TMDb ID of the show, or None if not found.
    """
    config = get_config_manager().load()
    tmdb_api_key = config.tmdb_api_key
    url = "https://api.themoviedb.org/3/search/tv"
    # Pass the query via `params` so requests URL-encodes show names containing
    # spaces/&/etc. (previously interpolated raw into the URL), and add a
    # timeout for consistency with the other TMDB calls in this module.
    response = requests.get(
        url,
        params={"query": show_name, "api_key": tmdb_api_key},
        timeout=30,
    )
    if response.status_code == 200:
        # Return the first search result's ID; TMDB orders by relevance.
        results = response.json().get("results", [])
        if results:
            return str(results[0]["id"])
    return None

fetch_show_details

fetch_show_details(show_id)

Fetch show details from TMDB by ID.

PARAMETER DESCRIPTION
show_id

The TMDB show ID

TYPE: int

RETURNS DESCRIPTION
dict

Show details including 'name', 'number_of_seasons', etc.

TYPE: dict | None

None

If request fails or API key not configured

TYPE: dict | None

Source code in mkv_episode_matcher/tmdb_client.py
@retry_network_operation(max_retries=3, base_delay=1.0)
def fetch_show_details(show_id: int) -> dict | None:
    """
    Fetch show details from TMDB by ID.

    Args:
        show_id: The TMDB show ID

    Returns:
        dict: Show details including 'name', 'number_of_seasons', etc.
        None: If request fails or API key not configured
    """
    config = get_config_manager().load()
    api_key = config.tmdb_api_key
    if not api_key:
        logger.warning("TMDB API key not configured")
        return None

    endpoint = f"https://api.themoviedb.org/3/tv/{show_id}?api_key={api_key}"

    try:
        # Timeout guards against a stalled connection hanging the caller.
        response = requests.get(endpoint, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch show details for ID {show_id}: {e}")
        return None

fetch_season_details

fetch_season_details(show_id, season_number)

Fetch the total number of episodes for a given show and season from the TMDb API.

PARAMETER DESCRIPTION
show_id

The ID of the show on TMDb.

TYPE: str

season_number

The season number to fetch details for.

TYPE: int

RETURNS DESCRIPTION
int

The total number of episodes in the season, or 0 if the API request failed.

TYPE: int

Source code in mkv_episode_matcher/tmdb_client.py
@retry_network_operation(max_retries=3, base_delay=1.0)
def fetch_season_details(show_id: str, season_number: int) -> int:
    """
    Fetch the total number of episodes for a given show and season from the TMDb API.

    Args:
        show_id (str): The ID of the show on TMDb.
        season_number (int): The season number to fetch details for.

    Returns:
        int: The total number of episodes in the season, or 0 if the API request failed.
    """
    logger.info(f"Fetching season details for Season {season_number}...")
    config = get_config_manager().load()
    tmdb_api_key = config.tmdb_api_key
    url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}?api_key={tmdb_api_key}"
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        season_data = response.json()
        # `.get("episodes", [])` tolerates a missing key, so no KeyError can
        # occur here — the previous `except KeyError` clause was unreachable
        # and has been removed.
        return len(season_data.get("episodes", []))
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch season details for Season {season_number}: {e}")
        return 0

get_number_of_seasons

get_number_of_seasons(show_id)

Retrieves the number of seasons for a given TV show from the TMDB API.

Parameters: - show_id (int): The ID of the TV show.

Returns: - num_seasons (int): The number of seasons for the TV show.

Raises: - requests.HTTPError: If there is an error while making the API request.

Source code in mkv_episode_matcher/tmdb_client.py
@retry_network_operation(max_retries=3, base_delay=1.0)
def get_number_of_seasons(show_id: str) -> int:
    """
    Retrieves the number of seasons for a given TV show from the TMDB API.

    Parameters:
    - show_id (int): The ID of the TV show.

    Returns:
    - num_seasons (int): The number of seasons for the TV show.

    Raises:
    - requests.HTTPError: If there is an error while making the API request.
    """
    config = get_config_manager().load()
    api_key = config.tmdb_api_key
    # raise_for_status surfaces HTTP errors to the retry decorator / caller.
    response = requests.get(
        f"https://api.themoviedb.org/3/tv/{show_id}?api_key={api_key}", timeout=30
    )
    response.raise_for_status()
    num_seasons = response.json().get("number_of_seasons", 0)
    logger.info(f"Found {num_seasons} seasons")
    return num_seasons

Utilities

mkv_episode_matcher.utils

Functions

normalize_path

normalize_path(path_str)

Normalize a path string to handle cross-platform path issues. Properly handles trailing slashes and backslashes in both Windows and Unix paths. Also strips surrounding quotes that might be present in command line arguments.

PARAMETER DESCRIPTION
path_str

The path string to normalize

TYPE: str

RETURNS DESCRIPTION

pathlib.Path: A normalized Path object

Source code in mkv_episode_matcher/utils.py
def normalize_path(path_str):
    """
    Normalize a path string into a ``pathlib.Path``.

    Handles trailing slashes and backslashes for both Windows and Unix style
    paths, and strips surrounding quotes that may be present in command-line
    arguments.

    Args:
        path_str (str): The path string to normalize

    Returns:
        pathlib.Path: A normalized Path object
    """
    # Accept Path objects as well as plain strings.
    text = str(path_str) if isinstance(path_str, Path) else path_str

    # Drop surrounding whitespace, then any surrounding double/single quotes.
    text = text.strip().strip('"').strip("'")

    # Trim trailing separators of either flavor.
    text = text.rstrip("/").rstrip("\\")

    # A drive-letter Windows path seen on a non-Windows platform cannot
    # resolve here, so keep only its final component.
    if os.name != "nt" and "\\" in text and ":" in text[:2]:
        return Path(text.split("\\")[-1])

    return Path(text)

get_valid_seasons

get_valid_seasons(show_dir)

Get all season directories that contain MKV files.

PARAMETER DESCRIPTION
show_dir

Base directory for the TV show

TYPE: str

RETURNS DESCRIPTION
list

List of paths to valid season directories

Source code in mkv_episode_matcher/utils.py
def get_valid_seasons(show_dir):
    """
    Get all season directories that contain MKV files.

    Args:
        show_dir (str): Base directory for the TV show

    Returns:
        list: List of paths (str) to valid season directories
    """
    show_path = normalize_path(show_dir)

    # Keep only immediate subdirectories containing at least one .mkv file.
    # The extension check is case-insensitive (e.g. ".MKV"), consistent with
    # how .srt files are detected elsewhere in this module.
    valid_season_paths = []
    for entry in show_path.iterdir():
        if not entry.is_dir():
            continue
        if any(f.name.lower().endswith(".mkv") for f in entry.iterdir()):
            valid_season_paths.append(str(show_path / entry.name))

    if not valid_season_paths:
        logger.warning(
            f"No seasons with .mkv files found in show '{show_path.name}'"
        )
    else:
        logger.info(
            f"Found {len(valid_season_paths)} seasons with .mkv files in '{show_path.name}'"
        )

    return valid_season_paths

check_filename

check_filename(filename)

Check if the filename is in the correct format (S01E02).

PARAMETER DESCRIPTION
filename

The filename to check.

TYPE: str or Path

RETURNS DESCRIPTION
bool

True if the filename matches the expected pattern.

Source code in mkv_episode_matcher/utils.py
def check_filename(filename):
    """
    Check if the filename is in the correct format (S01E02).

    Args:
        filename (str or Path): The filename to check.

    Returns:
        bool: True if the filename matches the expected pattern.
    """
    # Path objects are matched against their string form.
    name = str(filename) if isinstance(filename, Path) else filename
    # An uppercase SxxEyy token anywhere in the name counts as a match.
    return re.search(r".*S\d+E\d+", name) is not None

scramble_filename

scramble_filename(original_file_path, file_number)

Scrambles the filename of the given file path by adding the series title and file number.

PARAMETER DESCRIPTION
original_file_path

The original file path.

TYPE: str

file_number

The file number to be added to the filename.

TYPE: int

RETURNS DESCRIPTION

None

Source code in mkv_episode_matcher/utils.py
def scramble_filename(original_file_path, file_number):
    """
    Scrambles the filename of the given file path by adding the series title and file number.

    Args:
        original_file_path (str): The original file path.
        file_number (int): The file number to be added to the filename.

    Returns:
        None
    """
    logger.info(f"Scrambling {original_file_path}")
    source = Path(original_file_path)
    # The series title is taken from the grandparent directory name
    # (show dir / season dir / file layout).
    series_title = normalize_path(original_file_path).parent.parent.name
    new_file_name = f"{series_title} - {file_number:03d}{source.suffix}"
    target = source.parent / new_file_name
    # Never overwrite an existing file; silently skip in that case.
    if not target.exists():
        logger.info(f"Renaming {source.name} -> {new_file_name}")
        source.rename(target)

rename_episode_file

rename_episode_file(original_file_path, new_filename)

Rename an episode file with a standardized naming convention.

PARAMETER DESCRIPTION
original_file_path

The original file path of the episode.

TYPE: str or Path

new_filename

The new filename including season/episode info.

TYPE: str or Path

RETURNS DESCRIPTION
Path

Path to the renamed file, or None if rename failed.

Source code in mkv_episode_matcher/utils.py
def rename_episode_file(original_file_path, new_filename):
    """
    Rename an episode file with a standardized naming convention.

    Args:
        original_file_path (str or Path): The original file path of the episode.
        new_filename (str or Path): The new filename including season/episode info.

    Returns:
        Path: Path to the renamed file, or None if rename failed.
    """
    original_dir = Path(original_file_path).parent
    new_file_path = original_dir / new_filename

    # If the target name is taken, append an incrementing numeric suffix
    # (e.g. "name_2.mkv") until a free name is found.
    if new_file_path.exists():
        logger.warning(f"File already exists: {new_filename}")

        base, ext = Path(new_filename).stem, Path(new_filename).suffix
        suffix = 2
        while True:
            new_filename = f"{base}_{suffix}{ext}"
            new_file_path = original_dir / new_filename
            if not new_file_path.exists():
                break
            suffix += 1

    try:
        Path(original_file_path).rename(new_file_path)
        logger.info(f"Renamed {Path(original_file_path).name} -> {new_filename}")
        return new_file_path
    except OSError as e:
        # FileExistsError is a subclass of OSError, so this single handler
        # covers both; the previous separate `except FileExistsError` clause
        # was unreachable and has been removed.
        logger.error(f"Failed to rename file: {e}")
        return None

get_subtitles

get_subtitles(show_id, seasons, config=None, max_retries=3)

Retrieves and saves subtitles for a given TV show and seasons.

PARAMETER DESCRIPTION
show_id

The ID of the TV show.

TYPE: int

seasons

A set of season numbers for which subtitles should be retrieved.

TYPE: Set[int]

config

Preloaded configuration.

TYPE: Config object DEFAULT: None

max_retries

Number of times to retry subtitle download on OpenSubtitlesException. Defaults to 3.

TYPE: int DEFAULT: 3

Source code in mkv_episode_matcher/utils.py
def get_subtitles(show_id, seasons: set[int], config=None, max_retries=3):
    """
    Retrieves and saves subtitles for a given TV show and seasons.

    Args:
        show_id (int): The ID of the TV show.
        seasons (Set[int]): A set of season numbers for which subtitles should be retrieved.
        config (Config object, optional): Preloaded configuration.
        max_retries (int, optional): Number of times to retry subtitle download on OpenSubtitlesException. Defaults to 3.
    """
    if config is None:
        config = get_config_manager().load()
    show_dir = config.show_dir
    series_name = sanitize_filename(normalize_path(show_dir).name)
    tmdb_api_key = config.tmdb_api_key
    open_subtitles_api_key = config.open_subtitles_api_key
    open_subtitles_user_agent = config.open_subtitles_user_agent
    open_subtitles_username = config.open_subtitles_username
    open_subtitles_password = config.open_subtitles_password

    # Every credential/setting is required before any network work starts.
    if not all([
        show_dir,
        tmdb_api_key,
        open_subtitles_api_key,
        open_subtitles_user_agent,
        open_subtitles_username,
        open_subtitles_password,
    ]):
        logger.error("Missing configuration settings. Please run the setup script.")
        return

    try:
        subtitles = OpenSubtitles(open_subtitles_user_agent, open_subtitles_api_key)
        subtitles.login(open_subtitles_username, open_subtitles_password)
    except Exception as e:
        logger.error(f"Failed to log in to OpenSubtitles: {e}")
        return

    for season in seasons:
        episodes = fetch_season_details(show_id, season)
        logger.info(f"Found {episodes} episodes in Season {season}")

        for episode in range(1, episodes + 1):
            logger.info(f"Processing Season {season}, Episode {episode}...")

            series_cache_dir = config.cache_dir / "data" / series_name
            os.makedirs(series_cache_dir, exist_ok=True)

            # Check for existing subtitle in any supported format
            existing_subtitle = find_existing_subtitle(
                series_cache_dir, series_name, season, episode
            )

            if existing_subtitle:
                logger.info(f"Subtitle already exists: {Path(existing_subtitle).name}")
                continue

            # Default to standard format for new downloads
            srt_filepath = str(
                series_cache_dir / f"{series_name} - S{season:02d}E{episode:02d}.srt"
            )

            # Get the episode info from TMDB. A timeout keeps a stalled
            # connection from hanging the entire run.
            url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season}/episode/{episode}?api_key={tmdb_api_key}"
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            episode_data = response.json()
            episode_id = episode_data["id"]

            # search for the subtitle
            response = subtitles.search(tmdb_id=episode_id, languages="en")
            if len(response.data) == 0:
                logger.warning(
                    f"No subtitles found for {series_name} - S{season:02d}E{episode:02d}"
                )
                continue

            for subtitle in response.data:
                subtitle_dict = subtitle.to_dict()
                # Collapse runs of non-word characters to spaces and uppercase
                # the result so the episode tag can be matched reliably.
                # (Previously the pattern was r"\\W+", which matched a literal
                # backslash followed by "W" rather than non-word characters.)
                filename_clean = re.sub(
                    r"\W+", " ", subtitle_dict["file_name"]
                ).upper()
                if f"E{episode:02d}" in filename_clean:
                    logger.info(f"Original filename: {subtitle_dict['file_name']}")
                    retry_count = 0
                    while retry_count < max_retries:
                        try:
                            srt_file = subtitles.download_and_save(subtitle)
                            shutil.move(srt_file, srt_filepath)
                            logger.info(f"Subtitle saved to {srt_filepath}")
                            break
                        except OpenSubtitlesException as e:
                            retry_count += 1
                            logger.error(
                                f"OpenSubtitlesException (attempt {retry_count}): {e}"
                            )
                            console.print(
                                f"[red]OpenSubtitlesException (attempt {retry_count}): {e}[/red]"
                            )
                            if retry_count >= max_retries:
                                # Out of retries: let the user decide whether
                                # to abort the whole run or move on.
                                user_input = input(
                                    "Would you like to continue matching? (y/n): "
                                )
                                if user_input.strip().lower() != "y":
                                    logger.info(
                                        "User chose to stop matching due to the error."
                                    )
                                    return
                                else:
                                    logger.info(
                                        "User chose to continue matching despite the error."
                                    )
                                    break
                        except Exception as e:
                            logger.error(f"Failed to download and save subtitle: {e}")
                            console.print(
                                f"[red]Failed to download and save subtitle: {e}[/red]"
                            )
                            user_input = input(
                                "Would you like to continue matching despite the error? (y/n): "
                            )
                            if user_input.strip().lower() != "y":
                                logger.info(
                                    "User chose to stop matching due to the error."
                                )
                                return
                            else:
                                logger.info(
                                    "User chose to continue matching despite the error."
                                )
                                break
                    # while/else: retries exhausted without success or a
                    # user-approved break -> try the next candidate subtitle.
                    else:
                        continue
                    # A break above means we're done with this episode.
                    break

process_reference_srt_files

process_reference_srt_files(series_name)

Process reference SRT files for a given series.

PARAMETER DESCRIPTION
series_name

The name of the series.

TYPE: str

RETURNS DESCRIPTION
dict

A dictionary containing the reference files where the keys are the MKV filenames and the values are the corresponding SRT texts.

Source code in mkv_episode_matcher/utils.py
@logger.catch
def process_reference_srt_files(series_name):
    """
    Process reference SRT files for a given series.

    Args:
        series_name (str): The name of the series.

    Returns:
        dict: A dictionary containing the reference files where the keys are the MKV filenames
              and the values are the corresponding SRT texts.
    """
    config = get_config_manager().load()

    reference_files = {}
    reference_dir = config.cache_dir / "data" / series_name

    for dirpath, _, filenames in os.walk(reference_dir):
        for filename in filenames:
            if not filename.lower().endswith(".srt"):
                continue
            srt_file = Path(dirpath) / filename
            logger.info(f"Processing {srt_file}")
            season, episode = extract_season_episode(filename)
            # Skip files whose names don't encode a season/episode number:
            # formatting None with ":02" below would raise a TypeError.
            if season is None or episode is None:
                logger.warning(f"Skipping {srt_file}: cannot parse season/episode")
                continue
            srt_text = extract_srt_text(srt_file)
            mkv_filename = f"{series_name} - S{season:02}E{episode:02}.mkv"
            reference_files[mkv_filename] = srt_text

    return reference_files

extract_srt_text

extract_srt_text(filepath)

Extracts text content from an SRT file.

PARAMETER DESCRIPTION
filepath

Path to the SRT file.

TYPE: str

RETURNS DESCRIPTION
list

List of text lines from the SRT file.

Source code in mkv_episode_matcher/utils.py
def extract_srt_text(filepath):
    """
    Extracts text content from an SRT file.

    Args:
        filepath (str): Path to the SRT file.

    Returns:
        list: List of text lines from the SRT file.
    """
    # SRT files are overwhelmingly UTF-8; pin the encoding so behavior does
    # not depend on the platform default (e.g. cp1252 on Windows), and
    # replace undecodable bytes instead of failing on the occasional
    # legacy-encoded file.
    with open(filepath, encoding="utf-8", errors="replace") as f:
        content = f.read()

    # Subtitle blocks are separated by blank lines.
    blocks = content.strip().split("\n\n")

    text_lines = []
    for block in blocks:
        lines = block.split("\n")
        # A well-formed block is: index, timestamp, then one or more text lines.
        if len(lines) < 3:
            continue

        # Skip index and timestamp, get all remaining lines as text
        text = " ".join(lines[2:])
        # Remove stage directions ([...]) and markup tags (<...>)
        text = re.sub(r"\[.*?\]|\<.*?\>", "", text)
        if text:
            text_lines.append(text)

    return text_lines

extract_season_episode

extract_season_episode(filename)

Extract season and episode numbers from filename with support for multiple formats.

PARAMETER DESCRIPTION
filename

Filename to parse

TYPE: str

RETURNS DESCRIPTION
tuple

(season_number, episode_number)

Source code in mkv_episode_matcher/utils.py
def extract_season_episode(filename):
    """
    Extract season and episode numbers from filename with support for multiple formats.

    Args:
        filename (str): Filename to parse

    Returns:
        tuple: (season_number, episode_number), or (None, None) if no
        recognized pattern is found.
    """
    # Recognized naming schemes, tried in order of specificity.
    recognized = (
        r"S(\d+)E(\d+)",  # S01E01
        r"(\d+)x(\d+)",  # 1x01 or 01x01
        r"Season\s*(\d+).*?(\d+)",  # Season 1 - 01
    )

    for pattern in recognized:
        found = re.search(pattern, filename, re.IGNORECASE)
        if found:
            season, episode = found.groups()
            return int(season), int(episode)

    return None, None

process_srt_files

process_srt_files(show_dir)

Process all SRT files in the given directory and its subdirectories.

PARAMETER DESCRIPTION
show_dir

The directory path where the SRT files are located.

TYPE: str

RETURNS DESCRIPTION
dict

A dictionary containing the SRT file paths as keys and their corresponding text content as values.

Source code in mkv_episode_matcher/utils.py
def process_srt_files(show_dir):
    """
    Process all SRT files in the given directory and its subdirectories.

    Args:
        show_dir (str): The directory path where the SRT files are located.

    Returns:
        dict: A dictionary containing the SRT file paths as keys and their corresponding text content as values.
    """
    srt_files = {}
    # Walk the whole tree; subtitle files may live in nested subdirectories.
    for dirpath, _, filenames in os.walk(show_dir):
        for name in filenames:
            if not name.lower().endswith(".srt"):
                continue
            srt_path = Path(dirpath) / name
            logger.info(f"Processing {srt_path}")
            srt_files[srt_path] = extract_srt_text(srt_path)
    return srt_files

compare_and_rename_files

compare_and_rename_files(
    srt_files, reference_files, dry_run=False
)

Compare the srt files with the reference files and rename the matching mkv files.

PARAMETER DESCRIPTION
srt_files

A dictionary containing the srt files as keys and their contents as values.

TYPE: dict

reference_files

A dictionary containing the reference files as keys and their contents as values.

TYPE: dict

dry_run

If True, the function will only log the renaming actions without actually renaming the files. Defaults to False.

TYPE: bool DEFAULT: False

Source code in mkv_episode_matcher/utils.py
def compare_and_rename_files(srt_files, reference_files, dry_run=False):
    """
    Compare the srt files with the reference files and rename the matching mkv files.

    Args:
        srt_files (dict): A dictionary containing the srt files as keys and their contents as values.
        reference_files (dict): A dictionary containing the reference files as keys and their contents as values.
        dry_run (bool, optional): If True, the function will only log the renaming actions without actually renaming the files. Defaults to False.
    """
    logger.info(
        f"Comparing {len(srt_files)} srt files with {len(reference_files)} reference files"
    )
    # NOTE(review): each srt file is compared against EVERY reference and the
    # loop does not stop at the first hit, so a file could be renamed more
    # than once if several references clear the threshold — confirm intended.
    for srt_text in srt_files.keys():
        # The target .mkv is expected two levels up from the .srt file
        # (presumably season dir / subs dir / file.srt — verify layout).
        parent_dir = Path(srt_text).parent.parent
        for reference in reference_files.keys():
            _season, _episode = extract_season_episode(reference)  # currently unused
            mkv_file = str(parent_dir / Path(srt_text).name.replace(".srt", ".mkv"))
            matching_lines = compare_text(
                reference_files[reference], srt_files[srt_text]
            )
            # Declare a match when the overlap reaches 10% of the reference's
            # line count.
            if matching_lines >= int(len(reference_files[reference]) * 0.1):
                logger.info(f"Matching lines: {matching_lines}")
                logger.info(f"Found matching file: {mkv_file} ->{reference}")
                new_filename = parent_dir / reference
                if not dry_run:
                    logger.info(f"Renaming {mkv_file} to {str(new_filename)}")
                    rename_episode_file(mkv_file, reference)

compare_text

compare_text(text1, text2)

Compare two lists of text lines and return the number of matching lines.

PARAMETER DESCRIPTION
text1

List of text lines from the first source.

TYPE: list

text2

List of text lines from the second source.

TYPE: list

RETURNS DESCRIPTION
int

Number of matching lines between the two sources.

Source code in mkv_episode_matcher/utils.py
def compare_text(text1, text2):
    """
    Compare two lists of text lines and return the number of matching lines.

    Args:
        text1 (list): List of text lines from the first source.
        text2 (list): List of text lines from the second source.

    Returns:
        int: Number of distinct lines present in both sources.
    """
    # Each argument is already a flat list of subtitle lines (as produced by
    # extract_srt_text). The previous implementation flattened one level too
    # deep — iterating the *characters* of each line — so it counted shared
    # characters rather than shared lines, which made almost any two
    # subtitle files appear to match.
    return len(set(text1) & set(text2))