The Algorithms logo
算法
关于我们捐赠

Lz 77

R
"""
LZ77 compression algorithm
- lossless data compression published in papers by Abraham Lempel and Jacob Ziv in 1977
- also known as LZ1 or sliding-window compression
- forms the basis for many variations including LZW, LZSS, LZMA and others

It uses a “sliding window” method. Within the sliding window we have:
  - search buffer
  - look ahead buffer
len(sliding_window) = len(search_buffer) + len(look_ahead_buffer)

LZ77 manages a dictionary that uses triples composed of:
    - Offset into search buffer: the distance, counted backwards from the current
      position, to the start of the matched phrase (0 when nothing was matched).
    - Length of the match: the number of characters that make up the phrase.
    - Indicator: the literal character that follows the match and is encoded next.

As a file is parsed, the dictionary is dynamically updated to reflect the compressed
data contents and size.

Examples:
"cabracadabrarrarrad" <-> [(0, 0, 'c'), (0, 0, 'a'), (0, 0, 'b'), (0, 0, 'r'),
                           (3, 1, 'c'), (2, 1, 'd'), (7, 4, 'r'), (3, 5, 'd')]
"ababcbababaa" <-> [(0, 0, 'a'), (0, 0, 'b'), (2, 2, 'c'), (4, 3, 'a'), (2, 2, 'a')]
"aacaacabcabaaac" <-> [(0, 0, 'a'), (1, 1, 'c'), (3, 4, 'b'), (3, 3, 'a'), (1, 2, 'c')]

Sources:
en.wikipedia.org/wiki/LZ77_and_LZ78
"""

from dataclasses import dataclass

__version__ = "0.1"
__author__ = "Lucia Harcekova"


@dataclass
class Token:
    """
    Dataclass representing a triplet called token, consisting of offset, length
    and indicator. This triplet is used during LZ77 compression.

    Attributes:
        offset: distance, counted backwards from the end of the already
            processed text, to the start of the matched phrase (0 if no match)
        length: number of characters copied from the already processed text
        indicator: the literal character that follows the copied phrase
    """

    offset: int
    length: int
    indicator: str

    def __repr__(self) -> str:
        """
        Render the token as "(offset, length, indicator)".

        >>> token = Token(1, 2, "c")
        >>> repr(token)
        '(1, 2, c)'
        >>> str(token)
        '(1, 2, c)'
        """
        return f"({self.offset}, {self.length}, {self.indicator})"


class LZ77Compressor:
    """
    Class containing compress and decompress methods using LZ77 compression algorithm.

    Note: lookahead_buffer_size is only used to derive search_buffer_size
    (window_size - lookahead_buffer_size); the length of a match is not bounded
    by it.
    """

    def __init__(self, window_size: int = 13, lookahead_buffer_size: int = 6) -> None:
        self.window_size = window_size
        self.lookahead_buffer_size = lookahead_buffer_size
        self.search_buffer_size = self.window_size - self.lookahead_buffer_size

    def compress(self, text: str) -> list[Token]:
        """
        Compress the given string text using LZ77 compression algorithm.

        Args:
            text: string to be compressed

        Returns:
            output: the compressed text as a list of Tokens

        >>> lz77_compressor = LZ77Compressor()
        >>> str(lz77_compressor.compress("ababcbababaa"))
        '[(0, 0, a), (0, 0, b), (2, 2, c), (4, 3, a), (2, 2, a)]'
        >>> str(lz77_compressor.compress("aacaacabcabaaac"))
        '[(0, 0, a), (1, 1, c), (3, 4, b), (3, 3, a), (1, 2, c)]'

        A text that ends in a repeated phrase is handled as well:

        >>> str(lz77_compressor.compress("aaa"))
        '[(0, 0, a), (1, 1, a)]'
        >>> lz77_compressor.compress("")
        []
        """

        output = []
        search_buffer = ""

        # while there are still characters in text to compress
        while text:
            # find the next encoding phrase
            # - triplet with offset, length, indicator (the next encoding character)
            token = self._find_encoding_token(text, search_buffer)

            # every token consumes the matched phrase plus the indicator character
            consumed = token.length + 1

            # update the search buffer:
            # - add new characters from text into it
            # - check if size exceed the max search buffer size, if so, drop the
            #   oldest elements
            search_buffer += text[:consumed]
            if len(search_buffer) > self.search_buffer_size:
                search_buffer = search_buffer[-self.search_buffer_size :]

            # update the text
            text = text[consumed:]

            # append the token to output
            output.append(token)

        return output

    def decompress(self, tokens: list[Token]) -> str:
        """
        Convert the list of tokens into an output string.

        Args:
            tokens: list containing triplets (offset, length, char)

        Returns:
            output: decompressed text

        Tests:
            >>> lz77_compressor = LZ77Compressor()
            >>> lz77_compressor.decompress([Token(0, 0, 'c'), Token(0, 0, 'a'),
            ... Token(0, 0, 'b'), Token(0, 0, 'r'), Token(3, 1, 'c'),
            ... Token(2, 1, 'd'), Token(7, 4, 'r'), Token(3, 5, 'd')])
            'cabracadabrarrarrad'
            >>> lz77_compressor.decompress([Token(0, 0, 'a'), Token(0, 0, 'b'),
            ... Token(2, 2, 'c'), Token(4, 3, 'a'), Token(2, 2, 'a')])
            'ababcbababaa'
            >>> lz77_compressor.decompress([Token(0, 0, 'a'), Token(1, 1, 'c'),
            ... Token(3, 4, 'b'), Token(3, 3, 'a'), Token(1, 2, 'c')])
            'aacaacabcabaaac'
            >>> lz77_compressor.decompress([])
            ''
        """

        # Collect single characters in a list and join once at the end instead
        # of growing a string one character at a time.
        characters: list[str] = []

        for token in tokens:
            # Copy one character at a time: the copied phrase may overlap the
            # characters that are being appended (offset < length).
            for _ in range(token.length):
                characters.append(characters[-token.offset])
            # extend() keeps one list entry per character of the indicator
            characters.extend(token.indicator)

        return "".join(characters)

    def _find_encoding_token(self, text: str, search_buffer: str) -> Token:
        """Finds the encoding token for the first character in the text.

        The match is limited to len(text) - 1 characters so that there is always
        one character left to be stored as the indicator of the token.

        Args:
            text: the not yet compressed part of the input
            search_buffer: the most recently processed characters

        Returns:
            Token with the longest match found; among matches of equal length
            the one with the smallest offset is chosen.

        Raises:
            ValueError: if text is empty

        Tests:
            >>> lz77_compressor = LZ77Compressor()
            >>> lz77_compressor._find_encoding_token("abrarrarrad", "abracad").offset
            7
            >>> lz77_compressor._find_encoding_token("adabrarrarrad", "cabrac").length
            1
            >>> lz77_compressor._find_encoding_token("abc", "xyz").offset
            0
            >>> lz77_compressor._find_encoding_token("", "xyz").offset
            Traceback (most recent call last):
                ...
            ValueError: We need some text to work with.
            >>> lz77_compressor._find_encoding_token("abc", "").offset
            0
            >>> lz77_compressor._find_encoding_token("aa", "a")
            (1, 1, a)
            >>> lz77_compressor._find_encoding_token("a", "a")
            (0, 0, a)
        """

        if not text:
            raise ValueError("We need some text to work with.")

        # Initialise result parameters to default values
        length, offset = 0, 0

        # The last character of text must stay available as the indicator
        max_length = len(text) - 1

        for i, character in enumerate(search_buffer):
            if character != text[0]:
                continue
            found_length = min(
                self._match_length_from_index(text, search_buffer, 0, i), max_length
            )
            # if the found length is bigger than the current or if it's equal,
            # which means it's offset is smaller: update offset and length
            # (a match that was cut down to zero characters is not a match)
            if found_length and found_length >= length:
                offset, length = len(search_buffer) - i, found_length

        return Token(offset, length, text[length])

    def _match_length_from_index(
        self, text: str, window: str, text_index: int, window_index: int
    ) -> int:
        """Calculate the longest possible match of text and window characters from
        text_index in text and window_index in window.

        The window is treated as if every matched character of text was appended
        to it, which allows a match to run past the end of window into the text
        that is currently being matched (overlapping match).

        Args:
            text: text whose characters are matched against the window
            window: sliding window
            text_index: index of character in text
            window_index: index of character in sliding window

        Returns:
            The maximum match between text and window, from given indexes.
            0 is returned when there is no character at text_index.

        Raises:
            IndexError: if window_index lies behind the end of window

        Tests:
            >>> lz77_compressor = LZ77Compressor(13, 6)
            >>> lz77_compressor._match_length_from_index("rarrad", "adabrar", 0, 4)
            5
            >>> lz77_compressor._match_length_from_index("adabrarrarrad",
            ...     "cabrac", 0, 1)
            1
            >>> lz77_compressor._match_length_from_index("aaa", "a", 0, 0)
            3
        """
        text_length = len(text)
        if text_index >= text_length:
            # Nothing left to compare (this also covers an empty text).
            return 0

        window_length = len(window)
        if window_index >= window_length:
            raise IndexError("window_index is out of range for the given window")

        matched = 0
        while text_index + matched < text_length:
            position = window_index + matched
            if position < window_length:
                window_character = window[position]
            else:
                # Past the end of the window the match continues in the
                # characters of text that were matched so far.
                window_character = text[text_index + position - window_length]
            if text[text_index + matched] != window_character:
                break
            matched += 1

        return matched


if __name__ == "__main__":
    import doctest

    # Run the doctests embedded in the docstrings of this module first
    doctest.testmod()

    # Build a compressor with an explicit window and look ahead buffer size
    compressor = LZ77Compressor(window_size=13, lookahead_buffer_size=6)

    # Round trip demo: compress a sample, show the tokens of a second sample,
    # then make sure that decompressing gives the first sample back
    sample = "cabracadabrarrarrad"
    tokens = compressor.compress(sample)
    print(compressor.compress("ababcbababaa"))
    restored = compressor.decompress(tokens)
    assert restored == sample, "The LZ77 algorithm returned the invalid result."