Last active
January 27, 2023 08:35
-
-
Save prschmid/80a19c22012e42d4d6e791c1e4eb8515 to your computer and use it in GitHub Desktop.
Python open_sftp() context manager for reading and writing files over SFTP, with behavior similar to open()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time
import warnings
from contextlib import contextmanager

try:
    from urlparse import urlparse  # Python 2
except ImportError:
    from urllib.parse import urlparse  # Python 3

import pysftp
from paramiko import AuthenticationException
@contextmanager
def open_sftp(url, mode='r', n_retries=5, retry_sleep_time=5):
    """Context manager to read/write a file via SFTP.

    The SFTP connection is opened on entry. On exit, both the remote file
    and the connection are closed, even if the ``with`` body raises.

    Note: We are using ``urlparse`` to parse the :attr:`url`. There is a
    corner case where this will not work if there is a ':' in the username.

    Note: Every non-empty directory component of the URL path is entered with
    its own ``chdir`` call (empty components, e.g. from a leading '/' or a
    '//', are skipped), so the path is resolved relative to the directory the
    SFTP session starts in.

    :param url: Path to file with the format
        sftp://username:password@host[:port]/path/to/file.txt
    :param mode: The mode to open the file with. Either 'r' or 'w'
    :param n_retries: The number of times to try connecting to the SFTP
        server. Falsy values fall back to 5; values below 1 are raised to 1.
    :param retry_sleep_time: The number of seconds to sleep between retries.
        Falsy values fall back to 5; values below 1 are raised to 1.
    :returns: File object for the specified file
    :raises:
        :ValueError: If there is an error with the input
        :AuthenticationException: If could not connect
        :IOError: If file cannot be found/opened
    """
    # Make sure that we have proper minimum retry settings
    n_retries = max(n_retries or 5, 1)
    retry_sleep_time = max(retry_sleep_time or 5, 1)

    # Parse url into username, password, host, port, path
    # using the proper ftp URL syntax of
    # sftp://username:password@host:port/path/to/file.txt
    split_url = urlparse(url)

    if split_url.scheme != 'sftp':
        raise ValueError(
            "Invalid URL: Invalid scheme '{}'".format(split_url.scheme))
    if not split_url.hostname:
        raise ValueError("Invalid URL: No hostname")
    if not split_url.path:
        raise ValueError("Invalid URL: No file path")

    # Everything before the last '/' is the directory chain; the remainder
    # is the file name (empty when the path ends in '/').
    file_path = split_url.path.split('/')
    file_name = file_path.pop()
    if not file_name:
        raise ValueError("Invalid URL: No file name")
    remote_dir = '/'.join(file_path)

    cinfo = {
        'host': split_url.hostname,
        'username': split_url.username,
        'password': split_url.password,
    }

    # Accessing ``.port`` is what may raise for a malformed port, so keep
    # only that access inside the ``try``.
    try:
        port = split_url.port
    except ValueError:
        raise ValueError("Invalid URL: Invalid port")
    if port:
        cinfo['port'] = port

    # Connect to host. Sometimes it takes a couple tries... =(
    sftp = None
    for attempt in range(n_retries):
        try:
            sftp = pysftp.Connection(**cinfo)
            break
        except AuthenticationException:
            # Out of attempts: let the caller see the authentication error.
            if attempt == n_retries - 1:
                raise
            warnings.warn("Could not authenticate... Will re-attempt")
            time.sleep(retry_sleep_time)

    file_obj = None
    try:
        for directory in file_path:
            # Skip blanks (e.g. caused by '//')
            if not directory:
                continue
            try:
                sftp.chdir(directory)
            except IOError as exc:
                raise IOError(
                    "Could not navigate to path '{}': {}".format(
                        remote_dir, exc))

        try:
            file_obj = sftp.open(file_name, mode=mode)
        except IOError as exc:
            raise IOError("Could not open file '{}' in path '{}': {}".format(
                file_name, remote_dir, exc))

        yield file_obj
    finally:
        # Release the remote file handle first, but never let a failure
        # there prevent the connection itself from being closed.
        try:
            if file_obj is not None:
                file_obj.close()
        finally:
            sftp.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment