Last active
September 7, 2015 12:10
-
-
Save quininer/ada9da0b5f2c38b55069 to your computer and use it in GitHub Desktop.
Tar Stream.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from tarfile import TarInfo | |
from os import path | |
from io import BytesIO | |
class TarIO:
    """Read-only, seekable byte stream that yields a tar archive on the fly.

    The archive is never materialised in memory: it is described by
    ``self.tarlist``, an ordered list of ``(size, fileobj)`` segments
    (member header, member data, NUL padding, ... , end-of-archive marker),
    and ``read`` pulls bytes from whichever segments cover the requested
    range.  Every input file stays open until ``close`` is called (or the
    ``with`` block ends).

    Members are stored under the base name of each location.

    >>> from tempfile import TemporaryDirectory
    >>> from tarfile import open as taropen
    >>> from io import BytesIO
    >>> from os import path
    >>> with TemporaryDirectory() as tmpdir:
    ...     names = [path.join(tmpdir, n) for n in ("test1.txt", "test2.txt")]
    ...     for name, text in zip(names, ("one", "two")):
    ...         with open(name, "w") as f:
    ...             _ = f.write(text * 1024)
    ...     with TarIO(names) as tio:
    ...         tardata = b""
    ...         tmpdata = tio.read(2048)
    ...         while tmpdata:
    ...             tardata += tmpdata
    ...             tmpdata = tio.read(2048)
    >>> members = taropen(fileobj=BytesIO(tardata)).getmembers()
    >>> [m.name for m in members]
    ['test1.txt', 'test2.txt']
    >>> members[0].size
    3072
    """

    # Tar archives are organised in fixed 512-byte blocks.
    _BLOCK = 512

    def __init__(self, locations):
        """Open every file in *locations* and lay out the archive segments.

        Args:
            locations: iterable of paths to regular files.

        Raises:
            OSError: if a file cannot be opened or inspected.  Files that
                were already opened are closed before the error propagates.
        """
        self.tarlist = []
        self.pos = 0
        try:
            for location in locations:
                self.tarlist.extend(self._member_segments(location))
        except BaseException:
            # Do not leak the handles of the members opened so far.
            self.close()
            raise
        # Two all-zero blocks mark the end of a tar archive.
        marker = 2 * self._BLOCK
        self.tarlist.append((marker, BytesIO(b"\0" * marker)))

    @classmethod
    def _member_segments(cls, location):
        """Return the ``(size, fileobj)`` segments for one archive member."""
        fobj = open(location, 'rb')
        try:
            # Measure the size on the handle that will later be read from.
            fsize = fobj.seek(0, 2)
            fobj.seek(0)
            tinfo = TarInfo(path.basename(location))
            tinfo.size = fsize
            # A float mtime makes tobuf() emit an extra pax extended header
            # under the default PAX format; whole seconds keep it compact.
            tinfo.mtime = int(path.getmtime(location))
            header = tinfo.tobuf()
        except BaseException:
            fobj.close()
            raise
        # The header may still span several blocks (e.g. long or non-ASCII
        # names), so use its real length rather than assuming one block.
        segments = [(len(header), BytesIO(header)), (fsize, fobj)]
        # Pad the data up to the next block boundary; a file that already
        # ends on a boundary needs no padding (a spurious zero block would
        # be read as end-of-archive and hide all following members).
        padding = -fsize % cls._BLOCK
        if padding:
            segments.append((padding, BytesIO(b"\0" * padding)))
        return segments

    def _total(self):
        """Return the total length of the archive stream in bytes."""
        return sum(size for size, _ in self.tarlist)

    def close(self):
        """Close every underlying file object.  Safe to call repeatedly."""
        for _, fobj in self.tarlist:
            fobj.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def seek(self, pos, whence=0):
        """Move the stream position and return the new absolute position.

        Args:
            pos: offset in bytes.
            whence: 0 (or None) = from start, 1 = from current position,
                2 = from end of stream (``pos`` is normally <= 0), matching
                the standard file-object protocol.

        Raises:
            ValueError: for an unknown *whence* or a negative target.
        """
        if whence == 0 or whence is None:
            target = pos
        elif whence == 1:
            target = self.pos + pos
        elif whence == 2:
            target = self._total() + pos
        else:
            raise ValueError("invalid whence ({!r}, should be 0, 1 or 2)".format(whence))
        if target < 0:
            raise ValueError("negative seek position {!r}".format(target))
        self.pos = target
        return self.pos

    def tell(self):
        """Return the current absolute position in the archive stream."""
        return self.pos

    def read(self, size=None):
        """Read up to *size* bytes starting at the current position.

        Args:
            size: maximum number of bytes; ``None`` or a negative value
                reads everything up to the end of the archive.

        Returns:
            The bytes read; ``b""`` once the end of the stream is reached.
        """
        remaining = max(0, self._total() - self.pos)
        if size is not None and size >= 0:
            remaining = min(remaining, size)

        chunks = []
        seg_start = 0
        for seg_size, fobj in self.tarlist:
            if remaining <= 0:
                break
            seg_end = seg_start + seg_size
            if self.pos < seg_end:
                offset = self.pos - seg_start
                want = min(seg_size - offset, remaining)
                fobj.seek(offset)
                chunk = fobj.read(want)
                if len(chunk) < want:
                    # The file shrank after its size was written into the
                    # header; pad so later members stay block-aligned.
                    chunk += b"\0" * (want - len(chunk))
                chunks.append(chunk)
                self.pos += want
                remaining -= want
            seg_start = seg_end
        return b"".join(chunks)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment