Skip to content

API Reference

This section contains the automatically generated API documentation for MKV Episode Matcher.

Core Modules

mkv_episode_matcher.episode_matcher

Classes

Functions

process_show

process_show(season=None, dry_run=False, get_subs=False)

Process the show using streaming speech recognition with OCR fallback.

Source code in mkv_episode_matcher/episode_matcher.py
def process_show(season=None, dry_run=False, get_subs=False):
    """Process the show using streaming speech recognition with OCR fallback.

    Each season directory that still contains unprocessed ``.mkv`` files is
    handled in turn: every file is first run through the speech-recognition
    matcher, and files that cannot be matched that way are passed to the
    subtitle extraction / OCR comparison step.

    Args:
        season (int, optional): Restrict processing to the ``Season <season>``
            directory inside the configured show directory. ``None`` processes
            every season directory returned by ``get_valid_seasons``.
        dry_run (bool): When True, speech-matched files are only logged (not
            renamed), the flag is forwarded to the OCR comparison step, and the
            temporary/OCR directories are not removed afterwards.
        get_subs (bool): When True, the show ID is looked up and reference
            subtitles are requested for the season before matching.

    Returns:
        None
    """
    config = get_config(CONFIG_FILE)
    show_dir = config.get("show_dir")
    show_name = clean_text(os.path.basename(show_dir))
    matcher = EpisodeMatcher(CACHE_DIR, show_name)

    season_paths = get_valid_seasons(show_dir)
    if not season_paths:
        logger.warning("No seasons with .mkv files found")
        return

    if season is not None:
        season_path = os.path.join(show_dir, f"Season {season}")
        if season_path not in season_paths:
            logger.warning(f"Season {season} has no .mkv files to process")
            return
        season_paths = [season_path]

    for season_path in season_paths:
        mkv_files = [f for f in glob.glob(os.path.join(season_path, "*.mkv"))
                    if not check_filename(f)]

        if not mkv_files:
            logger.info(f"No new files to process in {season_path}")
            continue

        # get_valid_seasons returns every sub-directory holding .mkv files, so
        # a directory that is not named "Season <n>" can show up here. Skip it
        # instead of crashing on a failed regex match.
        season_match = re.search(r'Season (\d+)', season_path)
        if season_match is None:
            logger.warning(
                f"Could not determine season number from {season_path}, skipping"
            )
            continue
        season_num = int(season_match.group(1))

        temp_dir = Path(season_path) / "temp"
        ocr_dir = Path(season_path) / "ocr"
        temp_dir.mkdir(exist_ok=True)
        ocr_dir.mkdir(exist_ok=True)

        try:
            if get_subs:
                show_id = fetch_show_id(matcher.show_name)
                if show_id:
                    get_subtitles(show_id, seasons={season_num})

            unmatched_files = []
            for mkv_file in mkv_files:
                logger.info(f"Attempting speech recognition match for {mkv_file}")
                match = matcher.identify_episode(mkv_file, temp_dir, season_num)

                if match:
                    new_name = f"{matcher.show_name} - S{match['season']:02d}E{match['episode']:02d}.mkv"

                    logger.info(f"Speech matched {os.path.basename(mkv_file)} to {new_name} "
                              f"(confidence: {match['confidence']:.2f})")

                    if not dry_run:
                        logger.info(f"Renaming {mkv_file} to {new_name}")
                        rename_episode_file(mkv_file, new_name)
                else:
                    logger.info(f"Speech recognition match failed for {mkv_file}, trying OCR")
                    unmatched_files.append(mkv_file)

            # OCR fallback for unmatched files
            if unmatched_files:
                logger.info(f"Attempting OCR matching for {len(unmatched_files)} unmatched files")
                convert_mkv_to_srt(season_path, unmatched_files)

                reference_text_dict = process_reference_srt_files(matcher.show_name)
                srt_text_dict = process_srt_files(str(ocr_dir))

                compare_and_rename_files(
                    srt_text_dict,
                    reference_text_dict,
                    dry_run=dry_run,
                )

        finally:
            # Scratch directories are only removed on real runs.
            if not dry_run:
                shutil.rmtree(temp_dir)
                cleanup_ocr_files(show_dir)

TMDB Client

mkv_episode_matcher.tmdb_client

Classes

RateLimitedRequest

RateLimitedRequest(rate_limit=30, period=1)

A class that represents a rate-limited request object.

ATTRIBUTE DESCRIPTION
rate_limit

Maximum number of requests allowed per period.

TYPE: int

period

Period in seconds.

TYPE: int

requests_made

Counter for requests made.

TYPE: int

start_time

Start time of the current period.

TYPE: float

lock

Lock for synchronization.

TYPE: Lock

Source code in mkv_episode_matcher/tmdb_client.py
def __init__(self, rate_limit=30, period=1):
    """
    Set up the limiter state.

    Args:
        rate_limit (int): Maximum number of requests allowed per period.
        period (int): Length of one period in seconds.
    """
    # Configured limits.
    self.period = period
    self.rate_limit = rate_limit

    # State of the current window: when it began and how many requests
    # have been counted in it so far.
    self.start_time = time.time()
    self.requests_made = 0

    # Guards the window state above.
    self.lock = Lock()
Functions
get
get(url)

Sends a rate-limited GET request to the specified URL.

PARAMETER DESCRIPTION
url

The URL to send the request to.

TYPE: str

RETURNS DESCRIPTION
Response

The response object returned by the request.

Source code in mkv_episode_matcher/tmdb_client.py
def get(self, url):
    """
    Send a GET request to ``url`` after applying the rate limit.

    The request counter is checked and updated under ``self.lock``. Once the
    counter has reached ``rate_limit``, the call sleeps (still holding the
    lock) for whatever is left of the current period, then starts a new
    window. The HTTP request itself is issued after the lock is released.

    Args:
        url (str): The URL to send the request to.

    Returns:
        Response: The response object returned by the request.
    """
    with self.lock:
        limit_reached = self.requests_made >= self.rate_limit
        if limit_reached:
            elapsed = time.time() - self.start_time
            remaining = self.period - elapsed
            if remaining > 0:
                time.sleep(remaining)
            # Begin a fresh window.
            self.start_time = time.time()
            self.requests_made = 0

        self.requests_made += 1

    return requests.get(url)

Functions

fetch_show_id

fetch_show_id(show_name)

Fetch the TMDb ID for a given show name.

PARAMETER DESCRIPTION
show_name

The name of the show.

TYPE: str

RETURNS DESCRIPTION
str

The TMDb ID of the show, or None if not found.

Source code in mkv_episode_matcher/tmdb_client.py
def fetch_show_id(show_name):
    """
    Fetch the TMDb ID for a given show name.

    Args:
        show_name (str): The name of the show.

    Returns:
        str: The TMDb ID of the first search result, or None if the request
        did not return HTTP 200 or produced no results.
    """
    config = get_config(CONFIG_FILE)
    tmdb_api_key = config.get("tmdb_api_key")

    # Let requests build the query string so that show names containing
    # reserved characters ("&", "#", "+", ...) are percent-encoded instead of
    # being interpreted as query-string syntax.
    response = requests.get(
        "https://api.themoviedb.org/3/search/tv",
        params={"query": show_name, "api_key": tmdb_api_key},
    )
    if response.status_code != 200:
        return None

    results = response.json().get("results", [])
    if results:
        return str(results[0]["id"])
    return None

fetch_season_details

fetch_season_details(show_id, season_number)

Fetch the total number of episodes for a given show and season from the TMDb API.

PARAMETER DESCRIPTION
show_id

The ID of the show on TMDb.

TYPE: str

season_number

The season number to fetch details for.

TYPE: int

RETURNS DESCRIPTION
int

The total number of episodes in the season, or 0 if the API request failed.

Source code in mkv_episode_matcher/tmdb_client.py
def fetch_season_details(show_id, season_number):
    """
    Fetch the total number of episodes for a given show and season from the TMDb API.

    Args:
        show_id (str): The ID of the show on TMDb.
        season_number (int): The season number to fetch details for.

    Returns:
        int: The total number of episodes in the season, or 0 if the request
        failed, the body was not valid JSON, or the response carried no
        'episodes' entry.
    """
    logger.info(f"Fetching season details for Season {season_number}...")
    config = get_config(CONFIG_FILE)
    tmdb_api_key = config.get("tmdb_api_key")
    url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}?api_key={tmdb_api_key}"
    try:
        response = requests.get(url)
        response.raise_for_status()
        season_data = response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch season details for Season {season_number}: {e}")
        return 0
    except ValueError as e:
        # Body was not valid JSON (raised as a plain ValueError subclass by
        # requests versions whose JSON error is not a RequestException).
        logger.error(f"Failed to fetch season details for Season {season_number}: {e}")
        return 0

    # A dict lookup with a default never raises KeyError, so the missing-key
    # case has to be detected explicitly to be reported at all.
    episodes = season_data.get("episodes")
    if episodes is None:
        logger.error(
            f"Missing 'episodes' key in response JSON data for Season {season_number}"
        )
        return 0
    return len(episodes)

get_number_of_seasons

get_number_of_seasons(show_id)

Retrieves the number of seasons for a given TV show from the TMDB API.

Parameters: - show_id (int): The ID of the TV show.

Returns: - num_seasons (int): The number of seasons for the TV show.

Raises: - requests.HTTPError: If there is an error while making the API request.

Source code in mkv_episode_matcher/tmdb_client.py
def get_number_of_seasons(show_id):
    """
    Look up how many seasons a TV show has according to the TMDB API.

    Args:
        show_id (int): The ID of the TV show.

    Returns:
        int: The value of ``number_of_seasons`` in the API response, or 0 when
        the response does not contain that field.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    api_key = get_config(CONFIG_FILE).get("tmdb_api_key")
    response = requests.get(
        f"https://api.themoviedb.org/3/tv/{show_id}?api_key={api_key}"
    )
    response.raise_for_status()

    num_seasons = response.json().get("number_of_seasons", 0)
    logger.info(f"Found {num_seasons} seasons")
    return num_seasons

MKV Converter

mkv_episode_matcher.mkv_to_srt

Functions

check_if_processed

check_if_processed(filename)

Check if the file has already been processed (has SxxExx format)

PARAMETER DESCRIPTION
filename

Filename to check

TYPE: str

RETURNS DESCRIPTION
bool

True if file is already processed

TYPE: bool

Source code in mkv_episode_matcher/mkv_to_srt.py
def check_if_processed(filename: str) -> bool:
    """
    Tell whether a filename already carries an SxxExx episode tag.

    Args:
        filename (str): Filename to check

    Returns:
        bool: True if "S<digits>E<digits>" occurs anywhere in the filename
    """
    import re

    return re.search(r"S\d+E\d+", filename) is not None

convert_mkv_to_sup

convert_mkv_to_sup(mkv_file, output_dir)

Extract the first subtitle stream of an .mkv file into a .sup file using FFmpeg.

PARAMETER DESCRIPTION
mkv_file

Path to the .mkv file.

TYPE: str

output_dir

Path to the directory where the .sup file will be saved.

TYPE: str

RETURNS DESCRIPTION
str

Path to the converted .sup file.

Source code in mkv_episode_matcher/mkv_to_srt.py
def convert_mkv_to_sup(mkv_file, output_dir):
    """
    Copy the first subtitle stream of an .mkv file into a .sup file with FFmpeg.

    Nothing is run when the target .sup file already exists.

    Args:
        mkv_file (str): Path to the .mkv file.
        output_dir (str): Path to the directory where the .sup file will be saved.

    Returns:
        str: Path of the target .sup file. The path is returned even when the
        FFmpeg call fails (the failure is only logged).
    """
    stem, _ = os.path.splitext(os.path.basename(mkv_file))
    sup_file = os.path.join(output_dir, f"{stem}.sup")

    if os.path.exists(sup_file):
        logger.info(f"File {sup_file} already exists, skipping")
        return sup_file

    logger.info(f"Processing {mkv_file} to {sup_file}")
    try:
        # "-map 0:s:0" selects the first subtitle stream; "-c copy" avoids re-encoding.
        subprocess.run(
            ["ffmpeg", "-i", mkv_file, "-map", "0:s:0", "-c", "copy", sup_file],
            check=True,
        )
    except subprocess.CalledProcessError as e:
        logger.error(f"Error converting {mkv_file}: {e}")
    else:
        logger.info(f"Converted {mkv_file} to {sup_file}")
    return sup_file

perform_ocr

perform_ocr(sup_file_path)

Perform OCR on a .sup file and save the extracted text to a .srt file. Returns the path to the created SRT file.

Source code in mkv_episode_matcher/mkv_to_srt.py
@logger.catch
def perform_ocr(sup_file_path: str) -> Optional[str]:
    """
    Perform OCR on a .sup file and save the extracted text to a .srt file.

    The SRT file is written next to the .sup file, using the same base name.
    If it already exists, OCR is skipped and the existing path is returned.

    Args:
        sup_file_path (str): Path to the PGS/SUP subtitle file.

    Returns:
        Optional[str]: Path to the SRT file (pre-existing or newly written).
        NOTE(review): the function is wrapped in ``logger.catch``; presumably
        an exception raised in here is logged and turned into a ``None``
        return value - confirm against the logger in use.
    """
    # Imported locally so the block does not depend on the module-level
    # datetime import list.
    from datetime import timezone

    # Get the base name of the .sup file without the extension
    base_name = os.path.splitext(os.path.basename(sup_file_path))[0]
    output_dir = os.path.dirname(sup_file_path)
    logger.info(f"Performing OCR on {sup_file_path}")

    # Construct the output .srt file path
    srt_file = os.path.join(output_dir, f"{base_name}.srt")

    if os.path.exists(srt_file):
        logger.info(f"SRT file {srt_file} already exists, skipping OCR")
        return srt_file

    # Load a PGS/SUP file.
    pgs = PGSReader(sup_file_path)

    # Running index of the subtitles written so far
    si = 0

    tesseract_lang = "eng"
    tesseract_config = "-c tessedit_char_blacklist=[] --psm 6 --oem 1"

    config = get_config(CONFIG_FILE)
    tesseract_path = config.get("tesseract_path")
    logger.debug(f"Setting Teesseract Path to {tesseract_path}")
    pytesseract.pytesseract.tesseract_cmd = str(tesseract_path)

    # SubRip output, collected per subtitle and joined once at the end
    entries = []

    # State of the subtitle currently being assembled. Initialised up front
    # so that a stream starting with an image-less display set does not hit
    # unbound names in the check below.
    start = end = text = None

    # Iterate the pgs generator
    for ds in pgs.iter_displaysets():
        # If set has image, parse the image
        if ds.has_image:
            # Get Palette Display Segment
            pds = ds.pds[0]
            # Get Object Display Segment
            ods = ds.ods[0]

            if pds and ods:
                # Create the bitmap image and convert it to RGBA
                src = make_image(ods, pds).convert("RGBA")

                # Create grayscale image with black background
                img = Image.new("L", src.size, "BLACK")
                # Paste the subtitle bitmap
                img.paste(src, (0, 0), src)
                # Invert images so the text is readable by Tesseract
                img = ImageOps.invert(img)

                # Parse the image with tesseract
                text = pytesseract.image_to_string(
                    img, lang=tesseract_lang, config=tesseract_config
                ).strip()

                # Replace "|" with "I"
                # Works better than blacklisting "|" in Tesseract,
                # which results in I becoming "!" "i" and "1"
                text = re.sub(r"[|/\\]", "I", text)
                text = re.sub(r"[_]", "L", text)

                # The timestamp is an offset (treated as milliseconds) from
                # the start of the stream. Converting it in UTC yields
                # 00:00:00-based clock values on every machine, instead of
                # relying on the local timezone being exactly UTC+1.
                start = datetime.fromtimestamp(
                    ods.presentation_timestamp / 1000, tz=timezone.utc
                )

        else:
            # Get Presentation Composition Segment
            pcs = ds.pcs[0]

            if pcs:
                end = datetime.fromtimestamp(
                    pcs.presentation_timestamp / 1000, tz=timezone.utc
                )

                if (
                    isinstance(start, datetime)
                    and isinstance(end, datetime)
                    and text
                ):
                    si = si + 1
                    # "%f" is microseconds; slicing to 12 chars keeps HH:MM:SS,mmm
                    sub_output = str(si) + "\n"
                    sub_output += (
                        start.strftime("%H:%M:%S,%f")[0:12]
                        + " --> "
                        + end.strftime("%H:%M:%S,%f")[0:12]
                        + "\n"
                    )
                    sub_output += text + "\n\n"

                    entries.append(sub_output)
                    start = end = text = None

    with open(srt_file, "w") as f:
        f.write("".join(entries))
    logger.info(f"Saved to: {srt_file}")

    # Hand the path back so callers can tell that an SRT file was produced.
    return srt_file

extract_subtitles

extract_subtitles(mkv_file, output_dir)

Extract subtitles from MKV file based on detected subtitle type.

Source code in mkv_episode_matcher/mkv_to_srt.py
def extract_subtitles(mkv_file: str, output_dir: str) -> Optional[str]:
    """
    Extract the detected subtitle stream of an MKV file into ``output_dir``.

    'subrip' streams are written as ``<stem>.srt``; every other detected type
    is stream-copied into ``<stem>.sup``. An already existing output file is
    reused without running FFmpeg.

    Args:
        mkv_file (str): Path to the MKV file.
        output_dir (str): Directory that receives the extracted file.

    Returns:
        Optional[str]: Path of the subtitle file, or None when no supported
        stream was detected or FFmpeg reported an error.
    """
    subtitle_type, stream_index = detect_subtitle_type(mkv_file)
    if not subtitle_type:
        logger.error(f"No supported subtitle streams found in {mkv_file}")
        return None

    if subtitle_type == 'subrip':
        # Text subtitles: FFmpeg writes the .srt directly.
        extension, codec_args = ".srt", []
    else:
        # DVD or PGS subtitles: copy the raw stream into a .sup container first.
        extension, codec_args = ".sup", ["-c", "copy"]

    output_file = os.path.join(output_dir, f"{Path(mkv_file).stem}{extension}")

    if os.path.exists(output_file):
        logger.info(f"Subtitle file {output_file} already exists, skipping extraction")
        return output_file

    cmd = [
        "ffmpeg", "-i", mkv_file,
        "-map", f"0:{stream_index}",
        *codec_args,
        output_file,
    ]
    try:
        subprocess.run(cmd, check=True)
        logger.info(f"Extracted subtitles from {mkv_file} to {output_file}")
        return output_file
    except subprocess.CalledProcessError as e:
        logger.error(f"Error extracting subtitles: {e}")
        return None

convert_mkv_to_srt

convert_mkv_to_srt(season_path, mkv_files)

Convert subtitles from MKV files to SRT format.

Source code in mkv_episode_matcher/mkv_to_srt.py
def convert_mkv_to_srt(season_path: str, mkv_files: list[str]) -> None:
    """
    Produce SRT subtitles for the given MKV files inside ``<season_path>/ocr``.

    Files whose name already carries an SxxExx tag are skipped. For the rest,
    the subtitle stream is extracted; extracted files that are not already
    ``.srt`` are run through OCR.

    Args:
        season_path (str): Season directory; the ``ocr`` sub-directory is
            created inside it.
        mkv_files (list[str]): Paths of the MKV files to handle.
    """
    logger.info(f"Converting {len(mkv_files)} files to SRT")

    # Keep only files that have not been renamed to the SxxExx scheme yet
    pending = []
    for candidate in mkv_files:
        if not check_if_processed(os.path.basename(candidate)):
            pending.append(candidate)
        else:
            logger.info(f"Skipping {candidate} - already processed")

    if not pending:
        logger.info("No new files to process")
        return

    # All extracted/OCR'd subtitles land in the season's "ocr" directory
    ocr_output = os.path.join(season_path, "ocr")
    os.makedirs(ocr_output, exist_ok=True)

    for candidate in pending:
        extracted = extract_subtitles(candidate, ocr_output)
        if not extracted:
            continue

        if not extracted.endswith('.srt'):
            # Image-based subtitles (.sup) need OCR to become text
            ocr_result = perform_ocr(extracted)
            if ocr_result:
                logger.info(f"Created SRT from OCR: {ocr_result}")
        else:
            # Text subtitles were extracted as SRT directly
            logger.info(f"Extracted SRT subtitle to {extracted}")

detect_subtitle_type

detect_subtitle_type(mkv_file)

Detect the type and index of subtitle streams in an MKV file.

Source code in mkv_episode_matcher/mkv_to_srt.py
def detect_subtitle_type(mkv_file: str) -> tuple[Optional[str], Optional[int]]:
    """
    Detect the type and index of subtitle streams in an MKV file.

    Runs ``ffmpeg -i <file>`` and parses the stream listing it prints on
    stderr. When several supported subtitle streams exist, the first stream
    of the highest-priority format is chosen (SRT > DVD > PGS).

    Args:
        mkv_file (str): Path to the MKV file to inspect.

    Returns:
        tuple[Optional[str], Optional[int]]: ``(codec_name, stream_index)``
        where codec_name is one of 'subrip', 'dvd_subtitle' or
        'hdmv_pgs_subtitle', or ``(None, None)`` when no supported stream is
        found or ffmpeg could not be run.
    """
    import re

    # Highest priority first: SRT > DVD > PGS
    format_priority = ('subrip', 'dvd_subtitle', 'hdmv_pgs_subtitle')

    # Matches e.g. "Stream #0:2(eng): Subtitle: subrip" as well as stream
    # lines without a language tag ("Stream #0:2: Subtitle: ..."). Lines that
    # merely mention "Subtitle" (such as metadata titles) do not match.
    stream_pattern = re.compile(r"Stream #0:(\d+).*?Subtitle:\s*(\w+)")

    try:
        # The exit status is ignored; only the listing on stderr is parsed.
        result = subprocess.run(
            ["ffmpeg", "-i", mkv_file], capture_output=True, text=True
        )
    except (OSError, subprocess.SubprocessError) as e:
        # E.g. the ffmpeg executable is missing or cannot be started.
        logger.error(f"Error detecting subtitle type: {e}")
        return None, None

    # Remember the first stream index seen for each supported codec.
    first_index_by_format = {}
    for line in result.stderr.splitlines():
        found = stream_pattern.search(line)
        if found is None:
            continue
        codec = found.group(2)
        if codec in format_priority:
            first_index_by_format.setdefault(codec, int(found.group(1)))

    for format_type in format_priority:
        if format_type in first_index_by_format:
            return format_type, first_index_by_format[format_type]

    return None, None

Utilities

mkv_episode_matcher.utils

Functions

get_valid_seasons

get_valid_seasons(show_dir)

Get all season directories that contain MKV files.

PARAMETER DESCRIPTION
show_dir

Base directory for the TV show

TYPE: str

RETURNS DESCRIPTION
list

List of paths to valid season directories

Source code in mkv_episode_matcher/utils.py
def get_valid_seasons(show_dir):
    """
    Collect the sub-directories of a show that hold at least one .mkv file.

    Args:
        show_dir (str): Base directory for the TV show

    Returns:
        list: Paths of the direct sub-directories of ``show_dir`` containing
        a file whose name ends in ".mkv" (empty when there are none)
    """
    valid_season_paths = []
    for entry in os.listdir(show_dir):
        candidate = os.path.join(show_dir, entry)
        # Only directories can be seasons
        if not os.path.isdir(candidate):
            continue
        if any(name.endswith(".mkv") for name in os.listdir(candidate)):
            valid_season_paths.append(candidate)

    show_label = os.path.basename(show_dir)
    if valid_season_paths:
        logger.info(
            f"Found {len(valid_season_paths)} seasons with .mkv files in '{show_label}'"
        )
    else:
        logger.warning(f"No seasons with .mkv files found in show '{show_label}'")

    return valid_season_paths

check_filename

check_filename(filename)

Check if the filename is in the correct format (S01E02).

PARAMETER DESCRIPTION
filename

The filename to check.

TYPE: str

RETURNS DESCRIPTION
bool

True if the filename matches the expected pattern.

Source code in mkv_episode_matcher/utils.py
def check_filename(filename):
    """
    Report whether a filename contains an episode tag such as S01E02.

    Args:
        filename (str): The filename to check.

    Returns:
        bool: True if "S<digits>E<digits>" is found in the filename.
    """
    return re.search(r'.*S\d+E\d+', filename) is not None

scramble_filename

scramble_filename(original_file_path, file_number)

Scrambles the filename of the given file path by adding the series title and file number.

PARAMETER DESCRIPTION
original_file_path

The original file path.

TYPE: str

file_number

The file number to be added to the filename.

TYPE: int

RETURNS DESCRIPTION

None

Source code in mkv_episode_matcher/utils.py
def scramble_filename(original_file_path, file_number):
    """
    Rename a file to "<series title> - <NNN><ext>" within its own directory.

    The series title is the name of the directory two levels above the file.
    Nothing is renamed when the target name already exists.

    Args:
        original_file_path (str): The original file path.
        file_number (int): The file number to be added to the filename
            (zero-padded to three digits).

    Returns:
        None
    """
    logger.info(f"Scrambling {original_file_path}")

    containing_dir = os.path.dirname(original_file_path)
    series_title = os.path.basename(os.path.dirname(containing_dir))
    _, extension = os.path.splitext(original_file_path)

    new_file_name = f"{series_title} - {file_number:03d}{extension}"
    new_file_path = os.path.join(containing_dir, new_file_name)

    # Leave the file alone when the target name is already taken
    if os.path.exists(new_file_path):
        return

    original_file_name = os.path.basename(original_file_path)
    logger.info(f"Renaming {original_file_name} -> {new_file_name}")
    os.rename(original_file_path, new_file_path)

rename_episode_file

rename_episode_file(original_file_path, new_filename)

Rename an episode file with a standardized naming convention.

PARAMETER DESCRIPTION
original_file_path

The original file path of the episode.

TYPE: str

new_filename

The new filename including season/episode info.

TYPE: str

RETURNS DESCRIPTION
str

Path to the renamed file, or None if rename failed.

Source code in mkv_episode_matcher/utils.py
def rename_episode_file(original_file_path, new_filename):
    """
    Rename an episode file with a standardized naming convention.

    The file stays in its directory. If ``new_filename`` is already taken, a
    numeric suffix ("_2", "_3", ...) is inserted before the extension until a
    free name is found.

    Args:
        original_file_path (str): The original file path of the episode.
        new_filename (str): The new filename including season/episode info.

    Returns:
        str: Path to the renamed file, or None if rename failed.
    """
    original_dir = os.path.dirname(original_file_path)
    new_file_path = os.path.join(original_dir, new_filename)

    # Check if new filepath already exists
    if os.path.exists(new_file_path):
        logger.warning(f"File already exists: {new_filename}")

        # Add numeric suffix if file exists
        base, ext = os.path.splitext(new_filename)
        suffix = 2
        while True:
            new_filename = f"{base}_{suffix}{ext}"
            new_file_path = os.path.join(original_dir, new_filename)
            if not os.path.exists(new_file_path):
                break
            suffix += 1

    try:
        os.rename(original_file_path, new_file_path)
    except OSError as e:
        # FileExistsError is a subclass of OSError, so this single handler
        # also covers the "target already exists" case.
        logger.error(f"Failed to rename file: {e}")
        return None

    logger.info(f"Renamed {os.path.basename(original_file_path)} -> {new_filename}")
    return new_file_path

get_subtitles

get_subtitles(show_id, seasons)

Retrieves and saves subtitles for a given TV show and seasons.

PARAMETER DESCRIPTION
show_id

The ID of the TV show.

TYPE: int

seasons

A set of season numbers for which subtitles should be retrieved.

TYPE: Set[int]

Source code in mkv_episode_matcher/utils.py
def get_subtitles(show_id, seasons: set[int]):
    """
    Retrieves and saves subtitles for a given TV show and seasons.

    For every episode of each requested season, an English subtitle is
    searched on OpenSubtitles and stored in the series cache directory as
    "<series> - SxxEyy.srt", unless a subtitle for that episode is already
    cached.

    Args:
        show_id (int): The ID of the TV show.
        seasons (Set[int]): A set of season numbers for which subtitles should be retrieved.

    Returns:
        None. Returns early (after logging an error) when configuration
        values are missing or the OpenSubtitles login fails.

    Raises:
        requests.HTTPError: If the TMDB episode lookup answers with an error
            status.
    """
    logger.info(f"Getting subtitles for show ID {show_id}")
    config = get_config(CONFIG_FILE)
    show_dir = config.get("show_dir")
    tmdb_api_key = config.get("tmdb_api_key")
    open_subtitles_api_key = config.get("open_subtitles_api_key")
    open_subtitles_user_agent = config.get("open_subtitles_user_agent")
    open_subtitles_username = config.get("open_subtitles_username")
    open_subtitles_password = config.get("open_subtitles_password")

    if not all([
        show_dir,
        tmdb_api_key,
        open_subtitles_api_key,
        open_subtitles_user_agent,
        open_subtitles_username,
        open_subtitles_password,
    ]):
        logger.error("Missing configuration settings. Please run the setup script.")
        return

    # Derive the series name only after show_dir has been validated:
    # os.path.basename(None) would raise before the check above could report
    # the missing setting.
    series_name = sanitize_filename(os.path.basename(show_dir))

    try:
        subtitles = OpenSubtitles(open_subtitles_user_agent, open_subtitles_api_key)
        subtitles.login(open_subtitles_username, open_subtitles_password)
    except Exception as e:
        logger.error(f"Failed to log in to OpenSubtitles: {e}")
        return

    for season in seasons:
        episodes = fetch_season_details(show_id, season)
        logger.info(f"Found {episodes} episodes in Season {season}")

        for episode in range(1, episodes + 1):
            logger.info(f"Processing Season {season}, Episode {episode}...")

            series_cache_dir = os.path.join(CACHE_DIR, "data", series_name)
            os.makedirs(series_cache_dir, exist_ok=True)

            # Check for existing subtitle in any supported format
            existing_subtitle = find_existing_subtitle(
                series_cache_dir, series_name, season, episode
            )

            if existing_subtitle:
                logger.info(f"Subtitle already exists: {os.path.basename(existing_subtitle)}")
                continue

            # Default to standard format for new downloads
            srt_filepath = os.path.join(
                series_cache_dir,
                f"{series_name} - S{season:02d}E{episode:02d}.srt",
            )

            # get the episode info from TMDB
            url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season}/episode/{episode}?api_key={tmdb_api_key}"
            response = requests.get(url)
            response.raise_for_status()
            episode_data = response.json()
            episode_id = episode_data["id"]

            # search for the subtitle
            response = subtitles.search(tmdb_id=episode_id, languages="en")
            if len(response.data) == 0:
                logger.warning(
                    f"No subtitles found for {series_name} - S{season:02d}E{episode:02d}"
                )
                continue

            # Take the first candidate whose filename mentions this episode
            for subtitle in response.data:
                subtitle_dict = subtitle.to_dict()
                # Remove special characters and convert to uppercase
                filename_clean = re.sub(r"\W+", " ", subtitle_dict["file_name"]).upper()
                if f"E{episode:02d}" in filename_clean:
                    logger.info(f"Original filename: {subtitle_dict['file_name']}")
                    srt_file = subtitles.download_and_save(subtitle)
                    shutil.move(srt_file, srt_filepath)
                    logger.info(f"Subtitle saved to {srt_filepath}")
                    break

cleanup_ocr_files

cleanup_ocr_files(show_dir)

Clean up OCR files generated during the episode matching process.

PARAMETER DESCRIPTION
show_dir

The directory containing the show files.

TYPE: str

RETURNS DESCRIPTION

None

This function cleans up the OCR files generated during the episode matching process. It deletes the 'ocr' directory and all its contents in each season directory of the show.

Source code in mkv_episode_matcher/utils.py
def cleanup_ocr_files(show_dir):
    """
    Remove the 'ocr' working directories left behind by episode matching.

    Every entry directly inside ``show_dir`` is checked for an ``ocr`` child;
    where one exists it is deleted together with its contents.

    Args:
        show_dir (str): The directory containing the show files.

    Returns:
        None
    """
    for entry in os.listdir(show_dir):
        ocr_dir_path = os.path.join(show_dir, entry, "ocr")
        if not os.path.exists(ocr_dir_path):
            continue
        logger.info(f"Cleaning up OCR files in {ocr_dir_path}")
        shutil.rmtree(ocr_dir_path)

process_reference_srt_files

process_reference_srt_files(series_name)

Process reference SRT files for a given series.

PARAMETER DESCRIPTION
series_name

The name of the series.

TYPE: str

RETURNS DESCRIPTION
dict

A dictionary containing the reference files where the keys are the MKV filenames and the values are the corresponding SRT texts.

Source code in mkv_episode_matcher/utils.py
@logger.catch
def process_reference_srt_files(series_name):
    """
    Process reference SRT files for a given series.

    Walks ``<CACHE_DIR>/data/<series_name>`` and reads every ``.srt`` file
    whose name contains an SxxExx tag. Files without such a tag are skipped
    with a warning.

    Args:
        series_name (str): The name of the series.

    Returns:
        dict: A dictionary containing the reference files where the keys are the MKV filenames
              ("<series_name> - SxxEyy.mkv") and the values are the corresponding SRT texts.
    """
    from mkv_episode_matcher.__main__ import CACHE_DIR
    import os

    reference_files = {}
    reference_dir = os.path.join(CACHE_DIR, "data", series_name)

    for dirpath, _, filenames in os.walk(reference_dir):
        for filename in filenames:
            if not filename.lower().endswith(".srt"):
                continue

            srt_file = os.path.join(dirpath, filename)

            # A name without an SxxExx tag yields (None, None); formatting
            # None with ":02" would raise and abort the whole scan, so such
            # files are skipped instead.
            season, episode = extract_season_episode(filename)
            if season is None or episode is None:
                logger.warning(
                    f"Skipping {srt_file}: no season/episode found in filename"
                )
                continue

            logger.info(f"Processing {srt_file}")
            srt_text = extract_srt_text(srt_file)
            mkv_filename = f"{series_name} - S{season:02}E{episode:02}.mkv"
            reference_files[mkv_filename] = srt_text

    return reference_files

extract_srt_text

extract_srt_text(filepath)

Extracts text content from an SRT file.

PARAMETER DESCRIPTION
filepath

Path to the SRT file.

TYPE: str

RETURNS DESCRIPTION
list

List of text lines from the SRT file.

Source code in mkv_episode_matcher/utils.py
def extract_srt_text(filepath):
    """
    Extracts text content from an SRT file.

    The file is decoded as UTF-8 (a leading BOM is tolerated); if that fails
    it is re-read as cp1252 with undecodable bytes replaced, so the result no
    longer depends on the platform's locale encoding.

    Each subtitle block (index line, timestamp line, one or more text lines)
    contributes one entry: its text lines joined with a space, with
    ``[...]`` stage directions and ``<...>`` tags removed and surrounding
    whitespace stripped. Blocks with fewer than three lines, or with no text
    left after cleaning, are skipped.

    Args:
        filepath (str): Path to the SRT file.

    Returns:
        list: List of text lines from the SRT file.
    """
    # Read the file content with an explicit encoding rather than the locale default
    try:
        with open(filepath, encoding="utf-8-sig") as f:
            content = f.read()
    except UnicodeDecodeError:
        with open(filepath, encoding="cp1252", errors="replace") as f:
            content = f.read()

    # Split into subtitle blocks; tolerate several (or whitespace-only) blank
    # lines between blocks so the index/timestamp lines stay at positions 0/1.
    blocks = re.split(r'\n\s*\n', content.strip())

    text_lines = []
    for block in blocks:
        lines = block.split('\n')
        if len(lines) < 3:
            continue

        # Skip index and timestamp, get all remaining lines as text
        text = ' '.join(lines[2:])
        # Remove stage directions and tags
        text = re.sub(r'\[.*?\]|\<.*?\>', '', text).strip()
        # After stripping, a block that held only tags/directions is empty
        if text:
            text_lines.append(text)

    return text_lines

extract_season_episode

extract_season_episode(filename)

Extract season and episode numbers from filename.

PARAMETER DESCRIPTION
filename

Filename to parse

TYPE: str

RETURNS DESCRIPTION
tuple

(season_number, episode_number)

Source code in mkv_episode_matcher/utils.py
def extract_season_episode(filename):
    """
    Pull the season and episode numbers out of a filename.

    The first ``S<digits>E<digits>`` marker found anywhere in the name is
    used; the match is case-sensitive.

    Args:
        filename (str): Filename to parse

    Returns:
        tuple: (season_number, episode_number) as integers, or (None, None)
               when the filename contains no such marker.
    """
    found = re.search(r'S(\d+)E(\d+)', filename)
    if found is None:
        return None, None
    season_digits, episode_digits = found.groups()
    return int(season_digits), int(episode_digits)

process_srt_files

process_srt_files(show_dir)

Process all SRT files in the given directory and its subdirectories.

PARAMETER DESCRIPTION
show_dir

The directory path where the SRT files are located.

TYPE: str

RETURNS DESCRIPTION
dict

A dictionary containing the SRT file paths as keys and their corresponding text content as values.

Source code in mkv_episode_matcher/utils.py
def process_srt_files(show_dir):
    """
    Collect the text of every SRT file under a directory tree.

    Args:
        show_dir (str): Root directory to walk; all subdirectories are visited.

    Returns:
        dict: Maps each SRT file path (files whose name ends in ".srt",
              case-insensitively) to the value returned by
              ``extract_srt_text`` for that path.
    """
    collected = {}
    for root, _subdirs, names in os.walk(show_dir):
        srt_paths = (
            os.path.join(root, name)
            for name in names
            if name.lower().endswith(".srt")
        )
        for path in srt_paths:
            logger.info(f"Processing {path}")
            collected[path] = extract_srt_text(path)
    return collected

compare_and_rename_files

compare_and_rename_files(srt_files, reference_files, dry_run=False)

Compare the srt files with the reference files and rename the matching mkv files.

PARAMETER DESCRIPTION
srt_files

A dictionary containing the srt files as keys and their contents as values.

TYPE: dict

reference_files

A dictionary containing the reference files as keys and their contents as values.

TYPE: dict

dry_run

If True, the function will only log the renaming actions without actually renaming the files. Defaults to False.

TYPE: bool DEFAULT: False

Source code in mkv_episode_matcher/utils.py
def compare_and_rename_files(srt_files, reference_files, dry_run=False):
    """
    Compare the srt files with the reference files and rename the matching mkv files.

    For every SRT path the corresponding MKV is assumed to live two directory
    levels up from the SRT file, with the same base name and a ``.mkv``
    extension. An SRT matches a reference when the number of matching lines
    reported by ``compare_text`` reaches 10% of the reference's length
    (and is at least 1, so short or empty references cannot match everything).

    Args:
        srt_files (dict): A dictionary containing the srt files as keys and their contents as values.
        reference_files (dict): A dictionary containing the reference files as keys and their contents as values.
        dry_run (bool, optional): If True, the function will only log the renaming actions without actually renaming the files. Defaults to False.
    """
    logger.info(
        f"Comparing {len(srt_files)} srt files with {len(reference_files)} reference files"
    )
    for srt_path, srt_lines in srt_files.items():
        parent_dir = os.path.dirname(os.path.dirname(srt_path))
        # Swap only the real extension: str.replace(".srt", ".mkv") missed
        # ".SRT" and rewrote ".srt" occurring in the middle of a name.
        stem, _ = os.path.splitext(os.path.basename(srt_path))
        mkv_file = os.path.join(parent_dir, stem + ".mkv")

        for reference, reference_lines in reference_files.items():
            # int(len * 0.1) is 0 for references shorter than 10 lines, which
            # made every SRT "match"; require at least one matching line.
            threshold = max(1, int(len(reference_lines) * 0.1))
            matching_lines = compare_text(reference_lines, srt_lines)
            if matching_lines < threshold:
                continue

            logger.info(f"Matching lines: {matching_lines}")
            logger.info(f"Found matching file: {mkv_file} ->{reference}")
            new_filename = os.path.join(parent_dir, reference)
            if not dry_run:
                logger.info(f"Renaming {mkv_file} to {new_filename}")
                rename_episode_file(mkv_file, new_filename)
                # The source file has been renamed; further matches for this
                # SRT would try to rename a path that no longer exists.
                break

compare_text

compare_text(text1, text2)

Compare two lists of text lines and return the number of matching lines.

PARAMETER DESCRIPTION
text1

List of text lines from the first source.

TYPE: list

text2

List of text lines from the second source.

TYPE: list

RETURNS DESCRIPTION
int

Number of matching lines between the two sources.

Source code in mkv_episode_matcher/utils.py
def _collect_lines(text):
    """Return the set of distinct lines in *text* (flat list of str, or list of lists of str)."""
    lines = set()
    for item in text:
        if isinstance(item, str):
            # A string IS a line; iterating it would split it into characters.
            lines.add(item)
        else:
            lines.update(item)
    return lines


def compare_text(text1, text2):
    """
    Compare two lists of text lines and return the number of matching lines.

    Each argument may be a flat list of strings or a list of lists of
    strings (one level of nesting is flattened). Lines are compared by exact
    equality and duplicates count once.

    Args:
        text1 (list): List of text lines from the first source.
        text2 (list): List of text lines from the second source.

    Returns:
        int: Number of distinct lines present in both sources.
    """
    return len(_collect_lines(text1) & _collect_lines(text2))

Configuration

mkv_episode_matcher.config

Functions

set_config

set_config(tmdb_api_key, open_subtitles_api_key, open_subtitles_user_agent, open_subtitles_username, open_subtitles_password, show_dir, file, tesseract_path=None)

Sets the configuration values and writes them to a file.

PARAMETER DESCRIPTION
tmdb_api_key

The API key for TMDB (The Movie Database).

TYPE: str

open_subtitles_api_key

The API key for OpenSubtitles.

TYPE: str

open_subtitles_user_agent

The user agent for OpenSubtitles.

TYPE: str

open_subtitles_username

The username for OpenSubtitles.

TYPE: str

open_subtitles_password

The password for OpenSubtitles.

TYPE: str

show_dir

The directory where the TV show episodes are located.

TYPE: str

file

The path to the configuration file.

TYPE: str

tesseract_path

The path to the Tesseract OCR executable.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION

None

Source code in mkv_episode_matcher/config.py
def set_config(
    tmdb_api_key,
    open_subtitles_api_key,
    open_subtitles_user_agent,
    open_subtitles_username,
    open_subtitles_password,
    show_dir,
    file,
    tesseract_path=None,
):
    """
    Sets the configuration values and writes them to a file.

    All values are stored in a single ``[Config]`` section. Literal ``%``
    characters are written as ``%%`` so that values such as passwords or
    paths containing ``%`` are accepted by ``ConfigParser``'s default
    interpolation and read back unchanged.

    Args:
        tmdb_api_key (str): The API key for TMDB (The Movie Database).
        open_subtitles_api_key (str): The API key for OpenSubtitles.
        open_subtitles_user_agent (str): The user agent for OpenSubtitles.
        open_subtitles_username (str): The username for OpenSubtitles.
        open_subtitles_password (str): The password for OpenSubtitles.
        show_dir (str): The directory where the TV show episodes are located.
        file (str): The path to the configuration file.
        tesseract_path (str, optional): The path to the Tesseract OCR executable.
            When None, the literal string "None" is stored.

    Returns:
        None
    """

    def _escape(value):
        # ConfigParser treats "%" as interpolation syntax and rejects a bare
        # one with ValueError; "%%" is the escaped form. None is passed
        # through untouched so it is handled exactly as before.
        if value is None:
            return None
        return str(value).replace("%", "%%")

    config = configparser.ConfigParser()
    config["Config"] = {
        "tmdb_api_key": _escape(str(tmdb_api_key)),
        "show_dir": _escape(show_dir),
        "max_threads": _escape(int(MAX_THREADS)),
        "open_subtitles_api_key": _escape(str(open_subtitles_api_key)),
        "open_subtitles_user_agent": _escape(str(open_subtitles_user_agent)),
        "open_subtitles_username": _escape(str(open_subtitles_username)),
        "open_subtitles_password": _escape(str(open_subtitles_password)),
        "tesseract_path": _escape(str(tesseract_path)),
    }
    # NOTE(review): this message writes the TMDB API key to the log in clear text.
    logger.info(
        f"Setting config with API:{tmdb_api_key}, show_dir: {show_dir}, and max_threads: {MAX_THREADS}"
    )
    with open(file, "w") as configfile:
        config.write(configfile)

get_config

get_config(file)

Read and return the configuration from the specified file.

PARAMETER DESCRIPTION
file

The path to the configuration file.

TYPE: str

RETURNS DESCRIPTION
dict

The configuration settings as a dictionary.

Source code in mkv_episode_matcher/config.py
def get_config(file):
    """
    Load the ``[Config]`` section from a configuration file.

    Args:
        file (str): The path to the configuration file.

    Returns:
        The ``[Config]`` section of the parsed file when it exists; None when
        the file exists but has no ``[Config]`` section; an empty dict when
        the file does not exist.

    """
    logger.info(f"Loading config from {file}")
    if not os.path.exists(file):
        return {}

    parser = configparser.ConfigParser()
    parser.read(file)
    if "Config" not in parser:
        return None
    return parser["Config"]