Last active
October 5, 2016 14:18
-
-
Save sebschrader/c3a95b2bd19af525cdedd0387657e694 to your computer and use it in GitHub Desktop.
GitHub Release Hook to run Jekyll
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SECRET must be a bytes object | |
SECRET = b"correcthorsebatterystaple" | |
TARGET_DIR = "/var/www/htdocs/my-site" | |
REPO_DIR = "/var/lib/my-repo.git" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 -u | |
import argparse | |
import collections | |
import functools | |
import hashlib | |
import hmac | |
import io | |
import json | |
import os | |
import subprocess | |
import sys | |
import time | |
import urllib.parse | |
import wsgiref.simple_server | |
class HTTPError(Exception): | |
def __init__(self, code, message=None): | |
super().__init__(code, message) | |
self.code = code | |
self.message = message | |
class HTTPBadRequestError(HTTPError): | |
def __init__(self, message=None): | |
super().__init__(400, message) | |
class HTTPUnauthorizedError(HTTPError): | |
def __init__(self, message=None): | |
super().__init__(401, message) | |
class HTTPForbiddenError(HTTPError): | |
def __init__(self, message=None): | |
super().__init__(403, message) | |
class HTTPNotFoundError(HTTPError): | |
def __init__(self, message=None): | |
super().__init__(404, message) | |
class HTTPMethodNotAllowedError(HTTPError): | |
def __init__(self, message=None): | |
super().__init__(405, message) | |
class HTTPInternalServerError(HTTPError): | |
def __init__(self, message=None): | |
super().__init__(500, message) | |
class HTTPNotImplementedError(HTTPError): | |
def __init__(self, message=None): | |
super().__init__(501, message) | |
class Headers(collections.Mapping): | |
def __init__(self, environ): | |
super().__init__() | |
self.environ = environ | |
@staticmethod | |
def _cgi_name(key): | |
if not isinstance(key, str): | |
return key | |
cgi_name = key.upper().replace('-', '_') | |
if cgi_name not in {'CONTENT_LENGTH', 'CONTENT_TYPE'}: | |
return 'HTTP_' + cgi_name | |
return cgi_name | |
def __getitem__(self, key): | |
cgi_name = self._cgi_name(key) | |
try: | |
return self.environ[cgi_name] | |
except KeyError: | |
raise HTTPBadRequestError("Missing header {}".format(key)) | |
def __contains__(self, key): | |
cgi_name = self._cgi_name(key) | |
return cgi_name in self.environ | |
def __iter__(self): | |
for key, value in self.environ.items(): | |
if key.startswith('HTTP_') and key not in {'HTTP_CONTENT_LENGTH', | |
'HTTP_CONTENT_TYPE'}: | |
yield key[5:].title().replace('_', '-') | |
elif key in {'CONTENT_LENGTH', 'CONTENT_TYPE'}: | |
yield key.title().replace('_', '-') | |
def __len__(self): | |
return len(iter(self)) | |
class BaseRequestHandler: | |
""" | |
Webhook that receives and verifies any GitHub event. | |
To handle an event, subclass this event and implement | |
handle_{event}_event methods for all events you want to handle. | |
The handle methods should either return a (code, dict) tuple that will be | |
used to return a JSON response or raise an AbortError. | |
""" | |
default_encoder = functools.partial(json.dumps, indent=2) | |
reason_phrases = { | |
100: "Continue", | |
200: "OK", | |
400: "Bad Request", | |
401: "", | |
402: "", | |
403: "Forbidden", | |
404: "Not Found", | |
500: "Internal Error", | |
} | |
def __init__(self, environ, start_response, *, secret): | |
self.headers = Headers(environ) | |
self.environ = environ | |
self.start_response = start_response | |
self.input = environ['wsgi.input'] | |
self.errors = environ['wsgi.errors'] | |
self.request_method = environ['REQUEST_METHOD'] | |
self.remote_addr = environ['REMOTE_ADDR'] | |
self.remote_port = environ['REMOTE_PORT'] | |
self.server_name = environ['SERVER_NAME'] | |
self.server_port = environ['SERVER_PORT'] | |
self.server_protocol = environ['SERVER_PROTOCOL'] | |
self.script_name = environ['SCRIPT_NAME'] | |
self.path_info = environ['PATH_INFO'] | |
self.query_string = environ['QUERY_STRING'] | |
self.secret = secret | |
def send_error(self, code, message=None, exc_info=None): | |
""" | |
Send an error response. | |
:param code: HTTP status code | |
:param message: An error message string | |
:param exc_info: Exception info | |
""" | |
response = {'message': message} | |
yield from self.send_json(code, response, exc_info) | |
def send_json(self, code, obj, exc_info=None, encoder=default_encoder): | |
""" | |
Send a JSON response | |
:param code: HTTP status code | |
:param obj: Python object to serialize | |
:param encoder: Callable that takes a Python object and returns the | |
encoded JSON as str | |
""" | |
json = encoder(obj) | |
content = json.encode('utf-8', 'replace') | |
try: | |
reason_phrase = self.reason_phrases[code] | |
except KeyError: | |
status = str(code) | |
else: | |
status = "{:d} {}".format(code, reason_phrase) | |
self.start_response(status, [ | |
('Content-Type', 'application/json'), | |
('Content-Length', str(len(content))), | |
], exc_info) | |
if self.request_method != 'HEAD' and code >= 200 and code not in (204, 304): | |
yield content | |
else: | |
yield b'' | |
KNOWN_METHODS = {'OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT'} | |
def handle_request(self): | |
try: | |
try: | |
method_handler = getattr(self, 'do_{}'.format(self.request_method)) | |
except AttributeError as e: | |
if self.request_method not in self.KNOWN_METHODS: | |
raise HTTPNotImplementedError("Unknown HTTP method {}" | |
.format(self.method)) | |
else: | |
raise HTTPMethodNotAllowedError("HTTP method {} is not " | |
"supported" | |
.format(self.method)) | |
code, response = method_handler() | |
except HTTPError as e: | |
return self.send_error(e.code, e.message) | |
except Exception as e: | |
import traceback | |
print('-' * 78, file=self.errors) | |
print('Exception happened during processing of request from ' | |
'{}:{}:' | |
.format(self.remote_addr, self.remote_port), | |
file=self.errors) | |
traceback.print_exc(file=self.errors) | |
print('-' * 78, file=self.errors) | |
self.errors.flush() | |
return self.send_error(500, "An unexpected error occurred.", sys.exc_info()) | |
else: | |
return self.send_json(code, response) | |
def do_GET(self): | |
return 200, {'supported_events': [ | |
attr[7:-6] | |
for attr in dir(self) | |
if attr.startswith('handle_') and attr.endswith('_event') | |
]} | |
do_HEAD = do_GET | |
@staticmethod | |
def verify_signature(secret, signature, content): | |
algorithm, sep, digest = signature.partition('=') | |
if sep == '': | |
raise HTTPBadRequestError("Invalid X-Hub-Signature format") | |
if algorithm not in hashlib.algorithms_available: | |
raise HTTPNotImplementedError("Unsupported digest {}".format(algorithm)) | |
digestmod = getattr(hashlib, algorithm) | |
mac = hmac.new(secret, content, digestmod) | |
if not hmac.compare_digest(mac.hexdigest(), digest): | |
raise HTTPUnauthorizedError("Signature verification failed") | |
@staticmethod | |
def decode_urlencoded_content(content): | |
try: | |
decoded_content = content.decode('ascii') | |
except UnicodeDecodeError as e: | |
raise HTTPBadRequestError("Could not decode content as ASCII") from e | |
try: | |
form = dict(urllib.parse.parse_qsl(decoded_content, | |
strict_parsing=True, | |
errors='strict')) | |
except ValueError as e: | |
raise HTTPBadRequestError("Content is not valid " | |
"application/x-www-form-urlencoded form " | |
"data") from e | |
try: | |
return form['payload'] | |
except KeyError as e: | |
raise HTTPBadRequestError("Missing form element 'payload'") from e | |
@staticmethod | |
def decode_json_content(content): | |
try: | |
return content.decode('utf-8') | |
except UnicodeDecodeError as e: | |
raise HTTPBadRequestError("Could not decode content as UTF-8") from e | |
def do_POST(self): | |
if 'Content-Length' not in self.headers: | |
content_length = -1 | |
else: | |
try: | |
content_length = int(self.headers['Content-Length']) | |
except ValueError as e: | |
raise HTTPBadRequestError("Invalid Content-Length") from e | |
if content_length < 0: | |
raise HTTPBadRequestError("Invalid Content-Length") | |
content = self.input.read(content_length) | |
if len(content) < content_length: | |
raise HTTPBadRequestError("Content was shorter than Content-Length {} " | |
"< {}".format(len(content), content_length)) | |
self.verify_signature(self.secret, self.headers['X-Hub-Signature'], | |
content) | |
content_type = self.headers['Content-Type'] | |
if content_type == 'application/json': | |
payload = self.decode_json_content(content) | |
elif content_type == 'application/x-www-form-urlencoded': | |
payload = self.decode_urlencoded_content(content) | |
else: | |
raise HTTPBadRequestError("Content-Type must be either application/" | |
"json or application/x-www-form-urlencoded") | |
try: | |
message = json.loads(payload) | |
except ValueError as e: | |
raise HTTPBadRequestError("Payload is not a valid JSON document") from e | |
event = self.headers['X-GitHub-Event'] | |
delivery = self.headers['X-GitHub-Delivery'] | |
try: | |
event_handler = getattr(self, "handle_{}_event".format(event)) | |
except AttributeError as e: | |
raise HTTPBadRequestError("Unsupported event {}".format(event)) from e | |
return event_handler(delivery, message) | |
class PingHandler(BaseRequestHandler): | |
def handle_ping_event(self, delivery, message): | |
return 200, {'message': 'Ping OK'} | |
class WSGIApp: | |
def __init__(self, handler_class=PingHandler, handler_kwargs=None): | |
self.handler_class = handler_class | |
if handler_kwargs is None: | |
handler_kwargs = {} | |
self.handler_kwargs = handler_kwargs | |
def __call__(self, environ, start_response): | |
request_handler = self.handler_class(environ, start_response, | |
**self.handler_kwargs) | |
return request_handler.handle_request() | |
class JekyllBuildHandler(PingHandler): | |
def __init__(self, environ, start_response, *, secret, repo_dir, target_dir): | |
super().__init__(environ, start_response, secret=secret) | |
self.repo_dir = repo_dir | |
self.target_dir = target_dir | |
def handle_release_event(self, delivery, message): | |
try: | |
tag_name = message['release']['tag_name'] | |
except KeyError as e: | |
raise HTTPNotFoundError("Missing key 'release'->'tag_name'") from e | |
commands = [ | |
self.run(["git", "fetch", "--all"]), | |
self.run(["git", "submodule", "update", "--init", "--checkout", | |
"--force", "--recursive"]), | |
self.run(["git", "checkout", "tags/{}".format(tag_name)]), | |
self.run(["bundle", "exec", "jekyll", "build", "-d", self.target_dir]), | |
] | |
return 200, {'message': 'Build successful', 'commands': commands} | |
def run(self, args): | |
try: | |
with subprocess.Popen(args, stdin=subprocess.DEVNULL, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, | |
close_fds=True, shell=False, cwd=self.repo_dir, | |
universal_newlines=True, | |
restore_signals=True) as process: | |
try: | |
stdout, stderr = process.communicate(timeout=60) | |
except subprocess.TimeoutExpired as e: | |
process.kill() | |
stdout, stderr = process.communicate() | |
raise HTTPInternalServerError( | |
"Command '{}' took more than {}s to complete:\n{}" | |
.format(process.args, e.timeout, stdout)) | |
if process.returncode != 0: | |
raise HTTPInternalServerError( | |
"Command '{}' returned non-zero exit status {}:\n{}" | |
.format(process.args, process.returncode, stdout)) | |
return {'cmd': process.args, 'output': stdout} | |
except (OSError, subprocess.SubprocessError) as e: | |
raise HTTPInternalServerError(500, str(e)) from e | |
def main(): | |
argparser = argparse.ArgumentParser(description='Github hook handler') | |
argparser.add_argument('-b', '--bind', help='Address to bind to', default="") | |
argparser.add_argument('-p', '--port', type=int, help='Port to listen on', default=8080) | |
args = argparser.parse_args() | |
import config | |
app = WSGIApp(JekyllBuildHandler, handler_kwargs={ | |
'secret': config.SECRET, | |
'repo_dir': config.REPO_DIR, | |
'target_dir': config.TARGET_DIR}) | |
server = wsgiref.simple_server.make_server(args.bind, args.port, app) | |
try: | |
server.serve_forever() | |
except KeyboardInterrupt: | |
pass | |
server.server_close() | |
return os.EX_OK | |
if __name__ == '__main__': | |
sys.exit(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hook | |
import config | |
application = hook.WSGIApp(hook.JekyllBuildHandler, handler_kwargs={ | |
'secret': config.SECRET, | |
'repo_dir': config.REPO_DIR, | |
'target_dir': config.TARGET_DIR}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pytest | |
requests |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures | |
import hashlib | |
import hmac | |
import io | |
import json | |
import os | |
import random | |
import socket | |
import string | |
import subprocess | |
import sys | |
import tempfile | |
import time | |
import unittest.mock | |
import urllib.parse | |
import uuid | |
import wsgiref.util | |
import wsgiref.validate | |
import pytest | |
import hook | |
@pytest.yield_fixture | |
def server(): | |
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) | |
env = { | |
'SECRET': SECRET, | |
'REPO_DIR': '/tmp', | |
'TARGET_DIR': '/tmp', | |
} | |
argv = ["./hook.py", "--bind", HOST, "--port", str(PORT)] | |
patch = unittest.mock.patch | |
class TestingHandler(hook.JekyllBuildHandler): | |
secret = SECRET | |
repo_dir = '/tmp' | |
target_dir = '/tmp' | |
server = http.server.HTTPServer((HOST, PORT), TestingHandler) | |
with executor: | |
future = executor.submit(server.serve_forever) | |
yield future, TestingHandler | |
if not future.done(): | |
server.shutdown() | |
future.result() | |
@pytest.fixture | |
def repo_dir(): | |
return '/tmp' | |
@pytest.fixture | |
def target_dir(): | |
return '/tmp' | |
@pytest.fixture | |
def secret(): | |
gen = random.SystemRandom() | |
return ''.join(gen.choice(string.ascii_letters) | |
for i in range(20)).encode('ascii') | |
@pytest.fixture | |
def app(secret, repo_dir, target_dir): | |
app = hook.WSGIApp(hook.JekyllBuildHandler, handler_kwargs={ | |
'secret': secret, | |
'repo_dir': repo_dir, | |
'target_dir': target_dir, | |
}) | |
return wsgiref.validate.validator(app) | |
def assert_response_headers(headers): | |
headers = dict(headers) | |
assert 'Content-Length' in headers | |
assert 'Content-Type' in headers | |
assert headers['Content-Type'] == 'application/json' | |
def assert_json_response(expected_code, response): | |
code, reason, headers, content = response | |
assert_response_headers(headers) | |
assert isinstance(json.loads(content.decode('utf-8')), dict) | |
assert expected_code == code | |
def create_message(secret, event, message, form_encoded=False): | |
payload = json.dumps(message) | |
if form_encoded: | |
content_type = 'application/x-www-form-urlencoded' | |
content = urllib.parse.urlencode({'payload': payload}, | |
errors='strict').encode('utf-8') | |
else: | |
content_type = 'application/json' | |
content = payload.encode('utf-8') | |
mac = hmac.new(secret, content, digestmod=hashlib.sha1) | |
headers = { | |
'X-GitHub-Delivery': str(uuid.uuid4()), | |
'X-GitHub-Event': event, | |
'X-Hub-Signature': "sha1=" + mac.hexdigest(), | |
'Content-Type': content_type, | |
} | |
return headers, content | |
def make_request(app, method, headers, content): | |
environ = { | |
('' if name in {'Content-Length', 'Content-Type'} else 'HTTP_') + | |
name.upper().replace('-', '_'): value | |
for name, value in headers.items() | |
} | |
errors = io.StringIO() | |
output = io.BytesIO() | |
input = io.BytesIO(content) | |
environ.update({ | |
'REMOTE_ADDR': '127.0.0.1', | |
'REMOTE_PORT': '65534', | |
'SERVER_ADDR': '127.0.0.1', | |
'SERVER_NAME': 'localhost', | |
'SERVER_PORT': '8080', | |
'SERVER_SOFTWARE': 'test_hook.py', | |
'REQUEST_METHOD': method, | |
'QUERY_STRING': '', | |
'wsgi.input': input, | |
'wsgi.errors': errors, | |
}) | |
wsgiref.util.setup_testing_defaults(environ) | |
saved_status_code = None | |
saved_reason_phrase = None | |
saved_response_headers = None | |
saved_exc_info = None | |
def start_response(status, response_headers, exc_info=None): | |
nonlocal saved_status_code, saved_reason_phrase, \ | |
saved_response_headers, saved_exc_info | |
code, sep, saved_reason_phrase = status.partition(' ') | |
saved_status_code = int(code) | |
saved_response_headers = response_headers | |
m = output.getbuffer() | |
size = m.nbytes | |
m.release() | |
if size > 0: | |
raise exc_info[1].with_traceback(exc_info[2]) | |
saved_exc_info = exc_info | |
return output.write | |
try: | |
chunks = app(environ, start_response) | |
for chunk in chunks: | |
output.write(chunk) | |
chunks.close() | |
except: | |
raise | |
finally: | |
sys.stderr.write(errors.getvalue()) | |
sys.stderr.flush() | |
response_content = output.getvalue() | |
if saved_exc_info: | |
raise saved_exc_info[1].with_traceback(saved_exc_info[2]) | |
return saved_status_code, saved_reason_phrase, saved_response_headers, \ | |
response_content | |
def test_get(app): | |
assert_json_response(200, make_request(app, "GET", {}, b'')) | |
def test_head(app): | |
code, reason, headers, content = make_request(app, "HEAD", {}, b'') | |
assert_response_headers(headers) | |
assert code == 200 | |
assert content == b'' | |
@pytest.mark.parametrize('form_encoded', [ | |
True, | |
False, | |
]) | |
def test_valid_ping(app, secret, form_encoded): | |
headers, content = create_message(secret, 'ping', {}, form_encoded) | |
assert_json_response(200, make_request(app, "POST", headers, content)) | |
def test_invalid_event(app, secret): | |
headers, content = create_message(secret, 'invalid', {}) | |
assert_json_response(400, make_request(app, "POST", headers, content)) | |
@pytest.mark.parametrize('header', [ | |
'X-GitHub-Delivery', | |
'X-GitHub-Event', | |
'X-Hub-Signature', | |
'Content-Type', | |
]) | |
def test_missing_header(app, secret, header): | |
headers, content = create_message(secret, 'ping', {}) | |
headers.pop(header) | |
assert_json_response(400, make_request(app, "POST", headers, content)) | |
def test_invalid_signature(app, secret): | |
headers, content = create_message(secret, 'ping', {}) | |
content = content + b'123' | |
assert_json_response(401, make_request(app, "POST", headers, content)) | |
def test_valid_release(app, secret): | |
tag_name = 'foobar' | |
headers, content = create_message(secret, 'release', {'release': {'tag_name': tag_name}}) | |
def Popen(args, **kwargs): | |
mock = unittest.mock.MagicMock(spec=subprocess.Popen, autospec=True) | |
attrs = { | |
'args': args, | |
'__enter__.return_value': mock, | |
'communicate.return_value': ('Mocked stdout output\n', None), | |
'returncode': 0, | |
} | |
mock.configure_mock(**attrs) | |
return mock | |
patched_Popen = unittest.mock.patch.object( | |
subprocess, 'Popen', autospec=True, | |
side_effect=Popen) | |
with patched_Popen: | |
response = make_request(app, "POST", headers, content) | |
assert_json_response(200, response) | |
kwargs = { | |
'cwd':'/tmp', | |
'stdin': subprocess.DEVNULL, | |
'stdout': subprocess.PIPE, | |
'stderr': subprocess.STDOUT, | |
'close_fds': True, | |
'restore_signals': True, | |
'shell': False, | |
'universal_newlines': True, | |
} | |
subprocess.Popen.assert_has_calls([ | |
unittest.mock.call( | |
["git", "fetch", "--all"], | |
**kwargs), | |
unittest.mock.call( | |
["git", "submodule", "update", "--init", "--checkout", | |
"--force", "--recursive"], | |
**kwargs), | |
unittest.mock.call( | |
["git", "checkout", "tags/{}".format(tag_name)], | |
**kwargs), | |
unittest.mock.call( | |
["bundle", "exec", "jekyll", "build", "-d", "/tmp"], | |
**kwargs), | |
]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment