Created
December 23, 2019 12:56
-
-
Save ducnguyen6431/478bf603bbc2c52ee3bfd7a7cbbfff30 to your computer and use it in GitHub Desktop.
Built (not complete) based on https://github.com/wonjohnchoi/Simple-Python-File-Server-With-Browse-Upload-and-Authentication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# change these values as you see fit
# after running `sudo ./install`, the server should run at http://host:port/base_url
# Host info
# An empty host makes the server script fall back to its --bind argument (default 127.0.0.1).
host = ''
# A port of 0 makes the server script fall back to its positional port argument (default 14230).
port = 0
# SQLite database file in which the server script stores per-path visit counts.
database = 'simple_server.db'
# Auth
# NOTE(review): the server script takes its credentials from --username/--password;
# confirm whether these two settings are read anywhere.
username = ''
password = ''
# Misc
# NOTE(review): base_url is not referenced by the server script — confirm it is still needed.
base_url = ''
# for example, with the above default values, the server should run at 127.0.0.1:14230/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# Module metadata. __description__ and __epilog__ are handed to argparse in the
# __main__ section at the end of this file.
__author__ = "dn6431"
__contributor__ = ["bones7456", "wonjohnchoi"]
__description__ = """Simple HTTP Server With Upload and Authentication and more
This module is built on HTTPServer by implementing the standard GET
and HEAD requests in a fairly straight forward manner"""
# Credits text: the author on the first line, the two contributors on the second.
__epilog__ = """Created by: {}
Contributor: {} {}""".format(__author__, __contributor__[0], __contributor__[1])
import os | |
import posixpath | |
from http.server import HTTPServer, BaseHTTPRequestHandler, DEFAULT_ERROR_MESSAGE | |
from http import HTTPStatus | |
import requests | |
import html | |
import shutil | |
import mimetypes | |
from io import BytesIO | |
import re | |
import sys | |
import base64 | |
import settings | |
from functools import partial | |
import json | |
def gen_key(username='', password=''):
    """Build the Base64 token for an HTTP Basic ``Authorization`` header.

    The credentials are joined as ``username:password``, encoded as UTF-8
    and then Base64-encoded.

    Returns:
        The Base64 token as ``bytes``.
    """
    credentials = '{}:{}'.format(username, password)
    raw = credentials.encode('utf-8')
    return base64.b64encode(raw)
class Counter:
    """Persist per-path visit counts in a SQLite table named ``counter``.

    The table has two columns: ``full_path`` (text, primary key) and
    ``count`` (integer).
    """

    def __init__(self, database=None):
        """Open the database and create the ``counter`` table if it is missing.

        Args:
            database: Path of the SQLite database (``':memory:'`` also works).
                When omitted, ``settings.database`` is used.
        """
        import sqlite3
        if database is None:
            database = settings.database
        print("Creating database and table counter")
        self.conn = sqlite3.connect(database)
        self.cursor = self.conn.cursor()
        self.cursor.execute('''create table if not exists counter(full_path text primary key, count integer)''')
        self.conn.commit()

    def read_counter(self, path):
        """Return how many times *path* was visited (0 when it was never recorded)."""
        self.cursor = self.conn.cursor()
        # Select the column by name so the result does not depend on column order.
        self.cursor.execute('select count from counter where full_path=?', (path,))
        row = self.cursor.fetchone()
        if row is None or row[0] is None:
            return 0
        return row[0]

    def increase_counter(self, path):
        """Add one visit to *path*, creating its row on first use."""
        cursor = self.conn.cursor()
        # Make sure the row exists, then increment inside SQLite so the update
        # never works from a value that was read earlier in Python.
        cursor.execute('insert or ignore into counter(full_path, count) values(?, 0)', (path,))
        cursor.execute('update counter set count = coalesce(count, 0) + 1 where full_path=?', (path,))
        updated = cursor.rowcount > 0
        self.conn.commit()
        self.cursor = cursor
        print("{} was visited {} times".format(path, self.read_counter(path)))
        if updated:
            print("Successfully updated database")
def copyfile(source, destination):
    """Stream the remaining content of *source* into *destination*.

    Both arguments are file-like objects; the copy is delegated to
    ``shutil.copyfileobj``.
    """
    shutil.copyfileobj(fsrc=source, fdst=destination)
class SimpleServer(BaseHTTPRequestHandler):
    """Request handler that serves files, lists directories and accepts uploads.

    GET/HEAD serve files below ``directory`` (or an HTML listing for a
    directory), POST stores ``multipart/form-data`` file parts in the
    directory the request path maps to.  When ``username`` is set, every
    request must carry matching HTTP Basic credentials.  Each served path is
    recorded in the shared ``counter``.
    """

    # Shared visit counter, created once when the class body is executed.
    counter = Counter()

    def __init__(self, *args, directory=None, username=None, password=None, secure=False, **kwargs):
        """Store the per-server options, then let the base class handle the request.

        Args:
            directory: Root directory to serve; defaults to the current
                working directory.
            username: Basic-auth user name; a falsy value disables authentication.
            password: Basic-auth password.
            secure: Stored on the instance; not otherwise used by this class.
        """
        # The attributes are assigned before super().__init__() because the
        # base class processes the request from inside its constructor.
        self.directory = directory if directory else os.getcwd()
        self.username = username
        self.password = password
        self.secure = secure
        super().__init__(*args, **kwargs)

    def do_HEAD(self):
        """Serve a HEAD request"""
        # HEAD must be protected like GET, otherwise response headers for any
        # path could be fetched without credentials.
        if self.username and not self.authenticate():
            return
        f = self.send_head()
        if f:
            f.close()

    def do_GET(self):
        """Serve a GET request: send the headers, then stream the body."""
        if self.username and not self.authenticate():
            return
        f = self.send_head()
        if f:
            try:
                copyfile(f, self.wfile)
            finally:
                # Close the file even when the client disconnects mid-transfer.
                f.close()

    def do_POST(self):
        """Handle an upload and report the outcome.

        A successful upload is answered with a JSON object
        ``{"saved files": [...]}``; a failure with a small HTML page
        containing the error message.  Both use status 200.
        """
        if self.username and not self.authenticate():
            return
        _result, info = self.deal_post_data()
        print('{} by {} @ {}'.format(info, self.client_address[0], self.client_address[1]))
        if isinstance(info, list):
            content_type = "text/json"
            body = json.dumps({"saved files": info}).encode('utf-8')
        else:
            content_type = "text/html"
            # The Referer header is client-controlled: escape it, and fall back
            # to the request path when the header is absent.
            back_url = html.escape(self.headers.get('Referer') or self.path, quote=True)
            page = [
                '<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">',
                '<html>\n<title>Upload Result Page</title>\n',
                '<body>\n<h2>Upload Result Page</h2>\n',
                '<hr>\n',
                '<strong>Failed:</strong>',
                html.escape(info),
                '<br><a href="{}">back</a>'.format(back_url),
                '<hr><small>Powered By: bones7456, check new version at ',
                '<a href="http://li2z.cn/?s=SimpleHTTPServerWithUpload">here</a>.</small>',
                '</body>\n</html>\n',
            ]
            body = ''.join(page).encode('utf-8')
        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        print('writing response')
        self.wfile.write(body)
        print('end response')

    def deal_post_data(self):
        """Parse the multipart request body and store every file part.

        Returns:
            ``(True, [saved file names])`` on success, or
            ``(False, error message)`` on failure.
        """
        content_type = self.headers.get('Content-Type')
        if content_type is None:
            return False, "Cannot determine content type"
        # The boundary is whatever follows the last '='; it may be quoted.
        boundary = content_type.split("=")[-1].strip().strip('"')
        content_length = self.headers.get('Content-Length')
        if content_length is None:
            return False, "Cannot determine content length"
        try:
            remain_bytes = int(content_length)
        except ValueError:
            return False, "Cannot determine content length"
        line = self.rfile.readline()
        remain_bytes -= len(line)
        # Compare as bytes so a non-UTF-8 first line cannot raise; an empty
        # boundary would match every line and is rejected as well.
        if not boundary or boundary.encode('utf-8') not in line:
            return False, "Content NOT begin with boundary"
        path = self.translate_path(self.path)
        try:
            os.makedirs(path, exist_ok=True)
        except OSError:
            return False, "Can't create file to write. Permission required"
        uploaded = []
        line = self.rfile.readline()
        remain_bytes -= len(line)
        result, filename = self.fetch_filename(line)
        while result:
            result, message, remain_bytes = self.download_file(remain_bytes, boundary, path, filename)
            if not result:
                return result, message
            uploaded.append(message)
            if remain_bytes <= 0:
                break
            # The next line is the Content-Disposition header of the next part.
            line = self.rfile.readline()
            remain_bytes -= len(line)
            result, filename = self.fetch_filename(line)
        return True, uploaded

    def download_file(self, remain_bytes, boundary, path, filename):
        """Store one multipart file part as ``path/<file name>``.

        Reading starts right after the part's Content-Disposition line and
        stops after the boundary line that terminates the part.

        Args:
            remain_bytes: Bytes of the request body not consumed yet.
            boundary: Multipart boundary string.
            path: Directory the file is written to.
            filename: File name announced by the client.

        Returns:
            ``(True, stored file name, remaining bytes)`` on success, or
            ``(False, error message, remaining bytes)`` on failure.
        """
        # The file name is client-controlled: keep only its last component so
        # it cannot point outside *path* ('../x', '/etc/x', 'C:\\dir\\x').
        safe_name = os.path.basename(filename.replace('\\', '/'))
        if safe_name in ('', os.curdir, os.pardir):
            return False, "Invalid file name", remain_bytes
        file_path = os.path.join(path, safe_name)
        # Read content type of file
        line = self.rfile.readline()
        remain_bytes -= len(line)
        # Empty line before content
        line = self.rfile.readline()
        remain_bytes -= len(line)
        try:
            out = open(file_path, 'wb')
        except IOError:
            return False, "Can't create file to write. Permission required", remain_bytes
        marker = boundary.encode('utf-8')
        with out:
            # Writing always lags one line behind reading, so the line that
            # precedes the boundary can be trimmed before it is written.
            preline = self.rfile.readline()
            remain_bytes -= len(preline)
            while remain_bytes > 0:
                line = self.rfile.readline()
                if not line:
                    # The client closed the connection before sending all the
                    # announced bytes; stop instead of looping forever.
                    break
                remain_bytes -= len(line)
                if marker in line:
                    # The line break before the boundary belongs to the
                    # multipart framing, not to the file content.
                    if preline.endswith(b'\r\n'):
                        preline = preline[:-2]
                    elif preline.endswith(b'\n'):
                        preline = preline[:-1]
                    out.write(preline)
                    return True, safe_name, remain_bytes
                out.write(preline)
                preline = line
        return False, "Unexpected end of data stream", remain_bytes

    def fetch_filename(self, content):
        """Extract the ``filename`` value from a Content-Disposition line.

        Args:
            content: The raw header line as ``bytes``.

        Returns:
            ``(True, filename)`` when found, otherwise
            ``(False, "Can't find filename")``.
        """
        # Undecodable bytes are replaced rather than raising UnicodeDecodeError.
        text = content.decode('utf-8', errors='replace')
        fn = re.findall(r'Content-Disposition.*name="(.*)"; filename="(.*)"', text)
        if not fn:
            return False, "Can't find filename"
        return True, fn[0][1]

    def do_AUTHHEAD(self):
        """Send the 401 response headers that ask the client for Basic credentials."""
        self.send_response(HTTPStatus.UNAUTHORIZED)
        self.send_header('WWW-Authenticate', 'Basic realm="Test"')
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def authenticate(self):
        """Check the request's ``Authorization`` header against the configured credentials.

        Returns:
            ``True`` when the header matches.  Otherwise a 401 response is
            sent and ``False`` is returned.
        """
        import hmac
        auth_header = self.headers.get('Authorization')
        # A missing password is treated as empty rather than the text "None".
        password = '' if self.password is None else self.password
        expected = b'Basic ' + gen_key(self.username, password)
        # The expected token is deliberately not logged, and the comparison is
        # constant-time so its duration does not reveal how much matched.
        if auth_header and hmac.compare_digest(auth_header.encode('utf-8', errors='replace'), expected):
            print("Authenticated")
            return True
        self.do_AUTHHEAD()
        print("Authenticate failed")
        if self.command != 'HEAD':
            # A response to HEAD must not carry a body.
            self.wfile.write(b'<html><body><h1>Authenticate failed</h1></body></html>')
        return False

    def send_head(self):
        """Send the response status and headers for the requested path.

        Returns:
            A binary file object holding the body (the file itself or a
            generated directory listing), or ``None`` when a redirect or an
            error response has already been sent.
        """
        # Translate to local path
        path = self.translate_path(self.path)
        if os.path.isdir(path):
            if not self.path.endswith('/'):
                # Redirect so that relative links in the listing resolve correctly.
                self.send_response(HTTPStatus.MOVED_PERMANENTLY)
                self.send_header("Location", self.path + '/')
                self.end_headers()
                return None
            for index in "index.html", "index.htm":
                index = os.path.join(path, index)
                if os.path.exists(index):
                    path = index
                    break
        # list_dir() looks counts up under normalized paths, so record the hit
        # under the normalized path too (no trailing slash for directories).
        SimpleServer.counter.increase_counter(posixpath.normpath(path))
        if os.path.isdir(path):
            return self.list_dir(path)
        file_type = self.guess_type(path)
        try:
            f = open(path, 'rb')
        except IOError:
            self.send_error(HTTPStatus.NOT_FOUND, "File not found", "We 're sorry")
            return None
        try:
            self.send_response(HTTPStatus.OK)
            file_stat = os.fstat(f.fileno())
            self.send_header("Content-Type", file_type)
            self.send_header("Content-Length", str(file_stat.st_size))
            self.send_header("Last-Modified", self.date_time_string(file_stat.st_mtime))
            self.end_headers()
        except Exception:
            # Do not leak the file handle when sending the headers fails.
            f.close()
            raise
        return f

    def translate_path(self, path):
        """Map a request path to a local path below ``self.directory``.

        Query string and fragment are dropped, percent-escapes are decoded and
        components that are not plain names (``.``, ``..``, names containing a
        directory part) are ignored.  A trailing slash is preserved.
        """
        # abandon query parameters
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        # Don't forget explicit trailing slash when normalizing. Issue17324
        trailing_slash = path.rstrip().endswith('/')
        try:
            path = requests.utils.unquote(path, errors='surrogatepass')
        except UnicodeDecodeError:
            path = requests.utils.unquote(path)
        path = posixpath.normpath(path)
        local_path = self.directory
        for word in filter(None, path.split('/')):
            if os.path.dirname(word) or word in (os.curdir, os.pardir):
                # Ignore components that are not a simple file/directory name
                continue
            local_path = os.path.join(local_path, word)
        if trailing_slash:
            local_path += '/'
        return local_path

    def list_dir(self, path):
        """Send the headers for, and return, an HTML listing of directory *path*.

        Every entry is coloured by its share of the recorded visits among the
        listed entries.  The page also contains the upload form.

        Returns:
            A ``BytesIO`` positioned at the start of the page, or ``None``
            after an error response when the directory cannot be read.
        """
        try:
            dir_list = os.listdir(path)
        except OSError:
            self.send_error(HTTPStatus.NOT_FOUND, "Directory require higher permission")
            return None
        dir_list.sort(key=lambda a: a.lower())
        # Offer '..' everywhere except at the served root, where it would only
        # lead back to the same page.
        if posixpath.normpath(path) != posixpath.normpath(self.directory):
            dir_list = ['..'] + dir_list
        # Look every count up once and reuse it for both the total and the colour.
        entries = []
        for name in dir_list:
            child_file_path = posixpath.normpath(os.path.join(path, name))
            entries.append((name, child_file_path, self.counter.read_counter(child_file_path)))
        # avoid divide by zero error
        tot_counts = sum(counts for _, _, counts in entries) or 1
        display_path = html.escape(requests.utils.unquote(self.path))
        page = [
            '<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">',
            '<html>\n<title>Directory listing for {}</title>\n'.format(display_path),
            '<body>\n<h2>Directory listing for {} (frequently used directories are more reddish)</h2>\n'.format(
                display_path),
            '<hr>\n',
            '<form ENCTYPE="multipart/form-data" method="post">',
            '<input name="file" type="file"/>',
            '<input type="submit" value="upload"/></form>\n',
            '<hr>\n<ul>\n',
        ]
        for name, child_file_path, counts in entries:
            display_name = link_name = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(child_file_path):
                display_name = name + "/"
                link_name = name + "/"
            if os.path.islink(child_file_path):
                display_name = name + "@"
                # Note: a link to a directory displays with @ and links with /
            # red portion of rgb value. with **0.2, it's overall more reddish
            rgb_r = int(255 * (float(counts) / tot_counts) ** 0.2)
            page.append('<li><a style="color:rgb({},0,0)" href="{}">{}</a>\n'.format(
                rgb_r, requests.utils.quote(link_name), html.escape(display_name)))
        page.append('</ul>\n<hr>\n</body>\n</html>\n')
        f = BytesIO(''.join(page).encode('utf-8'))
        length = len(f.getvalue())
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.send_header("Content-Length", str(length))
        self.end_headers()
        return f

    def guess_type(self, path):
        """Return the MIME type for *path* based on its lower-cased extension.

        Unknown extensions map to the ``''`` entry of ``extensions_map``.
        """
        ext = posixpath.splitext(path)[1].lower()
        return self.extensions_map.get(ext, self.extensions_map[''])

    if not mimetypes.inited:
        mimetypes.init()
    # Copy of the system MIME table with a default entry and plain-text
    # overrides for a few source-code extensions.
    extensions_map = mimetypes.types_map.copy()
    extensions_map.update({
        '': 'application/octet-stream',  # Default
        '.py': 'text/plain',
        '.c': 'text/plain',
        '.h': 'text/plain',
        '.swift': 'text/plain'
    })
def start_server(handlerClass=BaseHTTPRequestHandler,
                 serverClass=HTTPServer,
                 protocol='HTTP/1.0', bind=settings.host, port=settings.port):
    """Run an HTTP server until it is interrupted with Ctrl-C.

    Args:
        handlerClass: Request handler class, or a ``functools.partial``
            wrapping one.
        serverClass: Server class instantiated with ``(address, handler)``.
        protocol: Value assigned to the handler's ``protocol_version``.
        bind: Address to listen on.
        port: Port to listen on.

    The process exits with status 0 on ``KeyboardInterrupt``.
    """
    server_address = (bind, port)
    # An attribute assigned to a functools.partial stays on the wrapper and is
    # never seen by handler instances, so unwrap it and set the protocol on
    # the real handler class.
    target = handlerClass
    while isinstance(target, partial):
        target = target.func
    target.protocol_version = protocol
    with serverClass(server_address, handlerClass) as httpd:
        sa = httpd.socket.getsockname()
        serve_message = "Serving HTTP on {host} port {port}\nhttp://{host}:{port}/"
        print(serve_message.format(host=sa[0], port=sa[1]))
        print('Use <Ctrl-C> to stop')
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nKeyboard interrupt received, exiting.")
            sys.exit(0)
if __name__ == '__main__':
    import argparse

    # The module description belongs in `description`; passing it as `usage`
    # would replace argparse's generated usage line with prose.
    parser = argparse.ArgumentParser(
        description="{}. You can also load args via settings.py".format(__description__),
        epilog=__epilog__)
    parser.add_argument('--bind', '-b', default='127.0.0.1', metavar='ADDRESS',
                        help='Specify alternate bind address '
                             '[default: 127.0.0.1]')
    parser.add_argument('--directory', '-d', default=os.getcwd(),
                        help='Specify root directory '
                             '[default: current directory]')
    parser.add_argument('--username', '-u', metavar='USERNAME',
                        help='Username to secure server '
                             '[default: authentication disabled]')
    parser.add_argument('--password', '-p', metavar='PASSWORD',
                        help='Password to secure server')
    parser.add_argument('port', action='store',
                        default=14230, type=int,
                        nargs='?',
                        help='Specify server port [default: 14230]')
    arguments = parser.parse_args()

    # Bind the per-server options now; the server later calls the handler with
    # only the request arguments.
    handler_class = partial(SimpleServer,
                            directory=arguments.directory,
                            username=arguments.username,
                            password=arguments.password)

    # A non-empty settings.host / non-zero settings.port takes precedence over
    # the corresponding command-line value.
    configured_host = settings.host.strip()
    start_server(handlerClass=handler_class,
                 bind=configured_host if configured_host else arguments.bind,
                 port=arguments.port if settings.port == 0 else settings.port)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment