Last active
April 10, 2017 14:45
-
-
Save l04m33/1aa059b1a85c73bc7222 to your computer and use it in GitHub Desktop.
Async file wrapper for asyncio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AsyncFileWrapper(object):
    """Wrap a binary file object so it can be read/written from asyncio code.

    The wrapped file descriptor is switched to ``O_NONBLOCK`` and all I/O is
    driven by callbacks scheduled on the event loop with ``call_soon``; a
    blocked operation is simply re-scheduled until it can make progress.

    Exactly one of ``filename`` / ``fileobj`` must be given.

    Args:
        loop: Event loop used for scheduling; defaults to
            ``asyncio.get_event_loop()``.
        filename: Path to open with ``mode``. The wrapper owns the file.
        fileobj: An already opened binary file object (must expose ``mode``
            and be accepted by ``fcntl.fcntl``).
        mode: Mode used when opening ``filename``; must contain ``'b'``.

    Raises:
        RuntimeError: If both or neither of ``filename``/``fileobj`` are
            given, or if the file is not in binary mode.
        OSError: If the file cannot be opened or switched to non-blocking.

    Note:
        A single receive buffer is shared by all reads, so only one
        ``read()`` may be in flight at a time.
    """

    DEFAULT_BLOCK_SIZE = 8192

    def __init__(self, loop=None, filename=None,
                 fileobj=None, mode='rb'):
        # Both-given and neither-given are equally invalid.
        if (filename is None) == (fileobj is None):
            raise RuntimeError('Confilicting arguments')

        if filename is not None:
            if 'b' not in mode:
                raise RuntimeError('Only binary mode is supported')
            fileobj = open(filename, mode=mode)
        elif 'b' not in fileobj.mode:
            raise RuntimeError('Only binary mode is supported')

        # fcntl.fcntl() reports failure by raising OSError (it does not
        # return a non-zero status), so clean up in an exception handler and
        # only close the file when this constructor opened it.
        try:
            flags = fcntl.fcntl(fileobj, fcntl.F_GETFL)
            fcntl.fcntl(fileobj, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        except Exception:
            if filename is not None:
                fileobj.close()
            raise

        self.fileobj = fileobj
        if loop is None:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.rbuffer = bytearray()
        # Handles of the most recently scheduled callbacks (see close()).
        self.read_handle = None
        self.write_handle = None

    def seek(self, offset, whence=None):
        """Seek the underlying file; returns whatever ``fileobj.seek`` does."""
        if whence is None:
            return self.fileobj.seek(offset)
        return self.fileobj.seek(offset, whence)

    def read_ready(self, future, n, total):
        """Event-loop callback performing one step of a pending read.

        Args:
            future: Future to resolve with the accumulated bytes.
            n: Number of bytes to request from the file in this step.
            total: Total bytes requested by the caller; negative means
                "read until EOF".
        """
        if future.done():  # e.g. the waiter was cancelled; nothing to do
            return

        try:
            chunk = self.fileobj.read(n)
        except Exception as exc:
            future.set_exception(exc)
            return

        if chunk is None:  # Blocked: retry the same step later
            self.read_handle = self.loop.call_soon(
                self.read_ready, future, n, total)
            return

        if not chunk:  # EOF: hand back whatever was collected so far
            future.set_result(bytes(self.rbuffer))
            return

        self.rbuffer.extend(chunk)

        if total <= 0:  # read-until-EOF: keep pulling full blocks
            self.read_handle = self.loop.call_soon(
                self.read_ready, future, self.DEFAULT_BLOCK_SIZE, total)
            return

        remaining = total - len(self.rbuffer)
        if remaining > 0:
            self.read_handle = self.loop.call_soon(
                self.read_ready, future,
                min(self.DEFAULT_BLOCK_SIZE, remaining), total)
            return

        # Enough data. Slice by ``total`` (the caller's request), not ``n``
        # (the size of the last chunk), otherwise multi-block reads would
        # return only their final chunk.
        result = bytes(self.rbuffer[:total])
        del self.rbuffer[:total]
        future.set_result(result)

    async def read(self, n=-1):
        """Read up to ``n`` bytes; read until EOF when ``n`` is negative.

        Returns:
            bytes: The data read. Shorter than ``n`` (possibly empty) only
            when EOF is reached first.
        """
        if n == 0:
            return b''

        future = asyncio.Future(loop=self.loop)
        self.rbuffer.clear()
        if n < 0:
            first_block = self.DEFAULT_BLOCK_SIZE
        else:
            first_block = min(self.DEFAULT_BLOCK_SIZE, n)
        self.read_handle = self.loop.call_soon(
            self.read_ready, future, first_block, n)
        return await future

    def write_ready(self, future, data, written):
        """Event-loop callback performing one step of a pending write.

        Args:
            future: Future to resolve with the total number of bytes written.
            data: Bytes still to be written.
            written: Number of bytes written by previous steps.
        """
        if future.done():  # e.g. the waiter was cancelled; nothing to do
            return

        try:
            count = self.fileobj.write(data)
        except BlockingIOError as exc:
            # Part of ``data`` may already have been accepted before the
            # stream blocked; skip it so it is not written twice.
            count = getattr(exc, 'characters_written', 0)
        except Exception as exc:
            future.set_exception(exc)
            return

        if count is None:  # raw non-blocking stream: nothing written yet
            count = 0

        written += count
        if count < len(data):
            self.write_handle = self.loop.call_soon(
                self.write_ready, future, data[count:], written)
        else:
            future.set_result(written)

    async def write(self, data):
        """Write all of ``data`` and return the number of bytes written."""
        if len(data) == 0:
            return 0

        future = asyncio.Future(loop=self.loop)
        self.write_handle = self.loop.call_soon(
            self.write_ready, future, data, 0)
        return await future

    async def copy_to(self, dest, copy_len=-1):
        """Copy data from this file to ``dest`` block by block.

        Args:
            dest: Object with a ``write(data)`` method. If the call returns
                a future or coroutine it is awaited.
            copy_len: Maximum number of bytes to copy; negative copies until
                EOF.

        Returns:
            int: Number of bytes read from this file and handed to ``dest``.
        """
        copied_size = 0
        while copy_len != 0:
            if copy_len >= 0:
                read_size = min(copy_len, self.DEFAULT_BLOCK_SIZE)
            else:
                read_size = self.DEFAULT_BLOCK_SIZE

            rcontent = await self.read(read_size)
            rlen = len(rcontent)
            if rlen <= 0:  # EOF
                break

            write_res = dest.write(rcontent)
            if isinstance(write_res, asyncio.Future) \
                    or asyncio.iscoroutine(write_res):
                await write_res

            copied_size += rlen
            if copy_len > 0:
                copy_len -= rlen

        return copied_size

    def close(self):
        """Close the underlying file and cancel any scheduled callbacks."""
        self.fileobj.close()
        for attr in ('read_handle', 'write_handle'):
            handle = getattr(self, attr, None)
            if handle is not None:
                handle.cancel()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi, 今晚翻邮件, 看到一年前因为这代码向你发过邮件.
很抱歉没有在你回复我的时候就私自使用修改了这个gist的代码(个人项目)
如果不被允许, 请联系我吧: )
谢谢.