Last active
May 30, 2022 22:34
-
-
Save xynova/93c7d1e6af2a5d1ad9287ed6bccb44d8 to your computer and use it in GitHub Desktop.
Terraform Registry module reference rewrite server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from git import Repo # pip install gitpython | |
import requests # pip install requests | |
from http.server import BaseHTTPRequestHandler, HTTPServer | |
import ssl | |
import time | |
import logging | |
import urllib.parse | |
import os | |
import shutil | |
import fnmatch | |
import re | |
import hashlib | |
import uuid | |
import getpass | |
import sys, errno | |
import logging | |
logger = logging.getLogger("mylogger") | |
logging.basicConfig(stream=sys.stdout, level=logging.INFO) | |
# These get set when deploying as a container | |
serverHost = os.getenv('SERVER_HOST', "tfstacks.ourdomain.com.au") | |
serverPort = os.getenv('SERVER_PORT', "443") | |
serverProtocol = os.getenv('SERVER_PROTOCOL', "https") | |
ifce_address = "0.0.0.0" | |
ifce_port = os.getenv('IFCE_PORT', serverPort) | |
ifce_protocol = os.getenv('IFCE_PROTOCOL', serverProtocol) | |
ifce_tls_keyfile = os.getenv('IFCE_TLS_KEYFILE', "/tmp/ssl/tls.key") | |
ifce_tls_certfile = os.getenv('IFCE_TLS_CERTFILE', "/tmp/ssl/tls.crt") | |
gitServer = os.getenv('GIT_SERVER', 'https://gitlab.ourdomain.com.au') | |
gitRepo = os.getenv('GIT_REPO', '/plat/grunt/terraform-stacks.git') | |
gitTokenPath = os.getenv('GIT_TOKEN_PATH', '/usr/src/app/secrets/gitToken') | |
gitlabProjectId = os.getenv('GITLAB_PROJECT_ID', '117') | |
def getGitToken(): | |
try: | |
file = open(gitTokenPath, 'r') | |
logger.info('GitLab token read from ' + gitTokenPath + ' and configured.') | |
return file.readline().strip() | |
except: | |
logger.info('No GitLab token at ' + gitTokenPath + ' prompting for token instead.') | |
return getpass.getpass("Enter gitlab auth token (press ENTER if none): ") | |
gitToken = getGitToken() | |
class MyServer(BaseHTTPRequestHandler): | |
def write_response(self,bytes): | |
try: | |
self.wfile.write(bytes) | |
except BrokenPipeError: # ignore broken pipe as socket might have been closed | |
pass | |
# OVERRIDE DEFAULT LOG HANDLER | |
def log_message(self, format, *args): | |
# Don't log health checks | |
for arg in args: | |
if arg == "GET /healthz HTTP/1.1" or arg == "GET /ready HTTP/1.1": | |
return | |
logger.info("%s - - [%s] %s\n" % | |
(self.address_string(), | |
self.log_date_time_string(), | |
format%args)) | |
def do_GET(self): | |
o = urllib.parse.urlparse(self.path) | |
qs = urllib.parse.parse_qs(o.query) | |
mrId=(qs.get("mr") or [""])[0] | |
try: | |
if o.path == "/healthz" or o.path == "/ready": | |
self.send_response(200) | |
self.send_header('Contenty-type', 'application.json') | |
self.end_headers() | |
self.write_response(b'{ "status": "ok" }\n') | |
return | |
elif qs.get('terraform-get', None): | |
logger.info("PATH -> with-tf-get") | |
self.send_response_only(200) | |
reply = ''.join([serverProtocol,'://', serverHost, ':', serverPort, o.path, 'download.zip?mr=' , mrId]) | |
self.send_header('x-terraform-get', reply ) | |
self.end_headers() | |
return | |
elif o.path.endswith(".zip"): | |
logger.info("PATH -> zip file") | |
modulePath = os.path.dirname(o.path) | |
zipContents = createZip(mrId, modulePath) | |
self.send_response(200) | |
self.send_header('Content-type', 'application/zip') | |
self.end_headers() | |
self.write_response(zipContents) | |
return | |
else: | |
if not o.path.endswith("/"): | |
raise TypeError(f"Path {o.path} must end with '/'") | |
modulePath = os.path.dirname(o.path) | |
zipContents = createZip(mrId, modulePath) | |
self.send_response(200) | |
self.send_header('Content-type', 'text/html') | |
self.end_headers() | |
self.write_response(b'The request is valid and has backing files. \n') | |
return | |
except BaseException as err: | |
msg=f"ERROR: {err}\n" | |
logger.error(msg) | |
self.send_response(500) | |
self.send_header('Contenty-type', 'text/html') | |
self.end_headers() | |
self.write_response(msg.encode('utf-8')) | |
pass | |
return | |
def do_HEAD(self): | |
o = urllib.parse.urlparse(self.path) | |
qs = urllib.parse.parse_qs(o.query) | |
logger.info(o) | |
logger.info("PATH -> method head") | |
self.send_response_only(200) | |
self.end_headers() | |
return | |
def createZip(mrId,modulePath): | |
logger.info(f'modulePath is {modulePath}') | |
gitDir = ensureGitDir(mrId) | |
tfPath = gitDir + modulePath | |
tmpName = str(uuid.uuid4()) | |
tmpDir = gitDir + '/' + tmpName | |
logger.info('Copy ' + tfPath +' into ' + tmpDir) | |
shutil.copytree(tfPath, tmpDir) | |
findExp = r'^(\s*)\bsource\s*=\s*\"/workdir((/[\w-]+)+)/?\"' # this should be configurable | |
replaceExp = r''.join(['\\1','source="'+ serverProtocol +'://', serverHost, ':', str(serverPort), '\\2/download.zip','?', 'mr=' + mrId , '"']) | |
findReplace(tmpDir, findExp, replaceExp, "*.tf") | |
zipPath = shutil.make_archive(tmpDir, 'zip', tmpDir) | |
logger.info('Created modified module @ ' + zipPath) | |
f = open(zipPath,'rb') | |
zipContents=f.read() | |
f.close() | |
# cleanup | |
if os.path.exists(tmpDir): | |
shutil.rmtree(tmpDir) | |
if os.path.isfile(zipPath): | |
os.remove(zipPath) | |
return zipContents | |
def ensureGitDir(mrId): | |
gitDir = getGitDir(mrId) | |
repo = None | |
if not os.path.isdir(gitDir): | |
logger.info("Cloning into " + gitDir) | |
gitUrl = gitServer.replace('://', '://oauth2:'+ gitToken +'@') if gitToken else gitServer | |
Repo.clone_from(gitUrl + gitRepo, gitDir) | |
repo = Repo(gitDir) | |
logger.info("Fetch latest into " + gitDir) | |
repo.git.fetch() | |
if gitServer.startswith("git://") and mrId: | |
logger.info(gitServer) | |
logger.info("Checkout tag "+ mrId +" into " + gitDir) | |
repo.git.checkout(mrId) | |
elif mrId: | |
my_headers = {'PRIVATE-TOKEN' : gitToken} | |
response = requests.get(gitServer +'/api/v4/projects/'+ gitlabProjectId +'/merge_requests/' + mrId, headers=my_headers) | |
sha = response.json()['sha'] | |
logger.info("Fetch "+ sha +" into " + gitDir) | |
repo.remotes.origin.fetch(sha) | |
repo.git.checkout(sha) | |
logger.info('MR: ' + mrId + ' is at ' + sha ) | |
return gitDir | |
def getGitDir(mrId): | |
return getWorkDir() + '/' + (mrId or 'master') | |
def getWorkDir(): | |
return '/tmp/tf-versioning-wd' | |
def findReplace(directory, find, replace, filePattern): | |
for pth, dirs, files in os.walk(os.path.abspath(directory)): | |
for filename in fnmatch.filter(files, filePattern): | |
filepath = os.path.join(pth, filename) | |
with open(filepath) as f: | |
s = f.read() | |
s = re.sub(find,replace, s, flags=re.M) | |
with open(filepath, "w") as f: | |
f.write(s) | |
if __name__ == "__main__": | |
webServer = HTTPServer((ifce_address, int(ifce_port)), MyServer) | |
if ifce_protocol == "https": | |
webServer.socket = ssl.wrap_socket (webServer.socket, keyfile=ifce_tls_keyfile, certfile=ifce_tls_certfile, server_side=True) | |
logger.info(f"Server started {ifce_protocol}://%s:%s" % webServer.server_address) | |
try: | |
webServer.serve_forever() | |
except KeyboardInterrupt: | |
pass | |
webServer.server_close() | |
logger.info("Server stopped.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment