Last active: August 12, 2022, 12:38. Save kkirsche/1791bc608c2cc0156d0de018ba41046d to your computer and use it in GitHub Desktop.
rsync algorithm in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# based on | |
# https://tylercipriani.com/blog/2017/07/09/the-rsync-algorithm-in-python/ | |
from collections.abc import Generator | |
from hashlib import md5 | |
from io import BufferedReader, TextIOWrapper | |
from logging import DEBUG, INFO, basicConfig, getLogger | |
from os import PathLike | |
from pathlib import Path | |
from typing import TypeAlias, Union, overload | |
from zlib import adler32 | |
# Names exported by `from <module> import *`.
# NOTE(review): exporting the underscore-prefixed `_Openable` alias is
# unusual for a public API — confirm it is intentional.
__all__ = [
    "Chunks",
    "Signature",
    "_Openable",
    "file",
]
BLOCK_SIZE = 4_096 | |
ENABLE_DEBUG: bool = True | |
_Adler32Checksum: TypeAlias = int | |
_ChunkIndex: TypeAlias = int | |
_FileChunkBytes: TypeAlias = bytes | |
_FileChunkStr: TypeAlias = str | |
_FileChunk: TypeAlias = Union[_FileChunkBytes, _FileChunkStr] | |
_Md5Hash: TypeAlias = str | |
_Openable: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes] | int | |
basicConfig() | |
logger = getLogger(__name__) | |
logger.setLevel(DEBUG) if ENABLE_DEBUG else logger.setLevel(INFO) | |
@overload
def iter_file(f: BufferedReader) -> Generator[_FileChunkBytes, None, None]:
    ...


@overload
def iter_file(f: TextIOWrapper) -> Generator[_FileChunkStr, None, None]:
    ...


def iter_file(f: BufferedReader | TextIOWrapper) -> Generator[_FileChunk, None, None]:
    """Yield successive BLOCK_SIZE-sized chunks read from *f* until EOF.

    The chunk type (bytes vs. str) follows the mode *f* was opened in,
    as expressed by the overloads above.
    """
    # Walrus read-loop replaces the manual `while True: … break` pattern.
    while chunk := f.read(BLOCK_SIZE):
        logger.debug("reading chunk")
        yield chunk
    logger.debug("no chunk located")
def md5_chunk(chunk: _FileChunk) -> _Md5Hash:
    """Compute the MD5 hex digest of a single file chunk."""
    logger.debug("creating md5 hash of chunk")
    data = chunk.encode() if isinstance(chunk, str) else chunk
    # usedforsecurity=False: this digest only identifies chunks.
    return md5(data, usedforsecurity=False).hexdigest()
def adler32_chunk(chunk: _FileChunk) -> _Adler32Checksum:
    """Compute the Adler-32 checksum of a single file chunk."""
    logger.debug("creating adler32 checksum of chunk")
    data = chunk.encode() if isinstance(chunk, str) else chunk
    return adler32(data)
class Signature:
    """Weak (adler32) and strong (md5) checksum pair for one chunk."""

    # Strong hash — hex digest string.
    md5: _Md5Hash
    # Weak rolling checksum — integer.
    adler32: _Adler32Checksum

    def __init__(self, chunk: _FileChunk) -> None:
        """Compute both checksums of *chunk*."""
        logger.debug("creating signature for chunk")
        self.md5 = md5_chunk(chunk)
        self.adler32 = adler32_chunk(chunk)
class Chunks:
    """Registry of chunk signatures supporting weak-then-strong lookup."""

    # Signatures in append order; positions double as chunk indices.
    chunks: list[Signature]
    # Maps adler32 checksum -> {md5 hash -> index into self.chunks}.
    chunk_sigs: dict[_Adler32Checksum, dict[_Md5Hash, _ChunkIndex]]

    def __init__(self) -> None:
        """Create an empty signature registry."""
        logger.debug("creating chunk signature registry")
        self.chunks = []
        self.chunk_sigs = {}

    def append(self, sig: Signature) -> None:
        """Record *sig* and index it under its (adler32, md5) pair."""
        logger.debug("appending signature")
        self.chunks.append(sig)
        by_md5 = self.chunk_sigs.setdefault(sig.adler32, {})
        by_md5[sig.md5] = len(self.chunks) - 1

    def get_chunk(self, chunk: _FileChunk) -> _ChunkIndex | None:
        """Return the index of a previously registered chunk, or None."""
        logger.debug("retrieving chunk of file")
        try:
            weak = adler32_chunk(chunk)
            strong = md5_chunk(chunk)
            idx = self.chunk_sigs[weak][strong]
            logger.debug("chunk located")
            return idx
        except KeyError:
            # EAFP: either level of the two-stage lookup may miss.
            logger.debug("chunk missing")
            return None

    def __getitem__(self, idx: int) -> Signature:
        """Return the signature stored at position *idx*."""
        logger.debug("getting signature at idx %d", idx)
        return self.chunks[idx]

    def __len__(self) -> int:
        """Return how many signatures are registered."""
        logger.debug("retrieving signature count")
        return len(self.chunks)

    def __iter__(self) -> Generator[Signature, None, None]:
        """Iterate over signatures in registration order."""
        logger.debug("iterating over signatures")
        for sig in self.chunks:
            yield sig
def checksums_file(filename: _Openable) -> Chunks:
    """Build a Chunks registry with one signature per block of *filename*."""
    logger.debug("creating signatures for %s", filename)
    registry = Chunks()
    # NOTE(review): text mode without an explicit encoding — the platform
    # default applies; confirm the inputs are plain text.
    with open(filename, "r") as f:
        for chunk in iter_file(f):
            registry.append(Signature(chunk))
    logger.debug("signatures created")
    return registry
def _get_chunk_list(
    file_one: _Openable, file_two: _Openable
) -> list[_FileChunk | _ChunkIndex]:
    """The good stuff :)

    1. Create a rolling checksum of file_two.
    2. For each chunk in file one, determine if chunk is already in file two
       a. If so:
          i. return the index of that chunk.
          ii. move the read head by the size of a chunk.
       b. If not:
          i. return the next byte.
          ii. move the read head by 1 byte.
    3. Start over at 2 until you're out of file to read.

    Returns a list whose entries are either a chunk index into file_two
    (match) or a single literal character from file_one (no match).
    """
    logger.debug("creating signatures for file two")
    checksums = checksums_file(file_two)
    logger.debug("signatures created")
    chunks: list[_FileChunk | _ChunkIndex] = []
    # Read position in file_one for the next comparison window.
    offset = 0
    logger.debug("opening %s", file_one)
    with open(file_one, "r") as f:
        for chunk in iter_file(f):
            located_chunk = checksums.get_chunk(chunk)
            if located_chunk is None:
                logger.debug("no signature located")
                # Miss: emit one literal character, slide the window by 1.
                offset += 1
                chunks.append(chunk[0])
                # NOTE(review): seek() on a text-mode file is only defined
                # for offsets previously returned by tell(); this arithmetic
                # offset appears to assume 1 byte per character — confirm.
                f.seek(offset)
                continue
            logger.info("signature located")
            # Hit: emit the matching chunk's index and skip a whole block.
            offset += BLOCK_SIZE
            chunks.append(located_chunk)
    logger.debug("comparison completed, returning signatures")
    return chunks
def file(file_one: _Openable, file_two: _Openable) -> str:
    """Reconstruct file_one's text using blocks already present in file_two."""
    if isinstance(file_one, Path):
        file_one = str(file_one.expanduser().resolve())
    if isinstance(file_two, Path):
        file_two = str(file_two.expanduser().resolve())
    pieces: list[str] = []
    logger.debug("opening file two")
    with open(file_two, "r") as ft:
        for chunk in _get_chunk_list(file_one, file_two):
            if isinstance(chunk, int):
                # An index: copy the matching block out of file_two.
                logger.debug("hunk found")
                ft.seek(chunk * BLOCK_SIZE)
                pieces.append(ft.read(BLOCK_SIZE))
            else:
                # Literal data: decode bytes if needed, pass str through.
                pieces.append(chunk.decode() if isinstance(chunk, bytes) else chunk)
    return "".join(pieces)
if __name__ == "__main__":
    # Demo: reconstruct f1.txt using blocks it shares with f2.txt,
    # then print the reconstructed text.
    file_one = Path("./f1.txt")
    file_two = Path("./f2.txt")
    output = file(file_one, file_two)
    print(output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.