Skip to content

Instantly share code, notes, and snippets.

@dpedu
Created July 5, 2019 05:37
Show Gist options
  • Save dpedu/49353778db399e70f90cfda1407fb49d to your computer and use it in GitHub Desktop.
Save dpedu/49353778db399e70f90cfda1407fb49d to your computer and use it in GitHub Desktop.
class S3WriteProxy(object):
def __init__(self, storage, key):
"""
A file-like class that accepts writes (until the writer closes me) and accepts reads (until closed)
"""
self.storage = storage
self.key = key
self.q = Queue(maxsize=16) # 16 # number of 4 byte buffers (OS enforced ... )
self.leftovers = None
self.eof = False
self.uploaded = Queue(maxsize=1)
Thread(target=self._upload).start()
def _upload(self):
self.storage.s3.upload_fileobj(Fileobj=self,
Bucket=self.storage.bucket,
Key=self.key)
self.uploaded.put(None)
def write(self, data):
self.q.put(data)
def read(self, maxsize=0):
if self.eof:
return b''
buf = self.leftovers if self.leftovers else b''
while True:
p = self.q.get()
if not p:
self.eof = True
break
buf += p
if len(buf) >= maxsize:
break
if len(buf) > maxsize:
self.leftovers = buf[maxsize:]
buf = buf[0:maxsize]
else:
self.leftovers = None
return buf
def close(self):
self.q.put(None)
self.uploaded.get()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment