|
""" |
|
Script to get the full list of containers in a registry and scan each of them
for known-bad CVEs.
|
|
|
--- |
|
MIT License |
|
|
|
Copyright (c) 2021 Dr John A Stevenson |
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy |
|
of this software and associated documentation files (the "Software"), to deal |
|
in the Software without restriction, including without limitation the rights |
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|
copies of the Software, and to permit persons to whom the Software is |
|
furnished to do so, subject to the following conditions: |
|
|
|
The above copyright notice and this permission notice shall be included in all |
|
copies or substantial portions of the Software. |
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
|
SOFTWARE. |
|
""" |
|
from concurrent.futures import ThreadPoolExecutor |
|
import datetime as dt |
|
import json |
|
import logging |
|
import os |
|
from pathlib import Path |
|
from pprint import pformat |
|
from tempfile import NamedTemporaryFile |
|
import subprocess |
|
import sys |
|
from typing import Dict, List, Tuple, Iterator, Optional, Union |
|
|
|
# Type aliases for the scan data passed between functions. TypedDict would
# describe these better, but it isn't in the standard library on Python 3.6.
_Vulnerability = Dict[str, str]
_TargetReport = Dict[str, Union[str, List[_Vulnerability]]]
ScanResult = List[_TargetReport]
ScanResultList = List[Tuple[str, Optional[ScanResult]]]
VulnerableImages = Dict[str, List[str]]

# Registry connection details are read from the environment at import time;
# a missing variable raises KeyError.
DXF_HOST = os.environ['DXF_HOST']
DXF_USERNAME = os.environ['DXF_USERNAME']

# CVE identifiers that every scan result is checked against:
#   CVE-2021-44228 - RCE bug for Java's log4j logging library
#   CVE-2021-45046 - relates to the CVE-2021-44228 log4j vulnerability
BAD_CVES = {'CVE-2021-44228', 'CVE-2021-45046'}
|
|
|
|
|
def scan_registry():
    """Scan every image found via list_repos/list_aliases for BAD_CVES.

    Logs the start and end time of the run, the number of containers scanned
    and the findings, then returns the dictionary built by list_vulnerable.
    """
    logging.info('Starting run at %s', dt.datetime.now())

    # Collect the 'repo:alias' name of every image, one repo at a time
    image_names = []
    for repo_name in list_repos():
        image_names.extend(list_aliases(repo_name))

    scanned = scan_many(image_names)
    logging.info('%s containers scanned', len(scanned))

    # Group the scanned images by the bad CVE that they contain
    findings = list_vulnerable(scanned, BAD_CVES)
    logging.info('Vulnerable images:\n%s', pformat(findings))

    logging.info('Completing run at %s', dt.datetime.now())
    return findings
|
|
|
|
|
def list_repos() -> Iterator[str]:
    """Yield the name of each repo reported by `dxf list-repos`.

    DXF_HOST and DXF_USERNAME are only logged here; they are not passed to
    the command explicitly. Raises subprocess.CalledProcessError if dxf exits
    with a non-zero status.
    """
    logging.info("Listing repos on %s (as %s)", DXF_HOST, DXF_USERNAME)
    listing = subprocess.run(
        ['dxf', 'list-repos'],
        check=True,
        stdout=subprocess.PIPE,
    )

    # Output is whitespace-separated; decode every name before yielding any
    names = [str(raw_name, 'utf-8') for raw_name in listing.stdout.split()]
    for name in names:
        logging.debug("Repo: %s", name)
        yield name
|
|
|
|
|
def list_aliases(repo: str) -> Iterator[str]:
    """Yield 'repo:alias' for each alias that `dxf list-aliases` reports.

    A dxf exit status of 13 or 2 is logged as an error and nothing is
    yielded for that repo. Any other non-zero status re-raises the
    subprocess.CalledProcessError.
    """
    # Log message to use for each exit status that is tolerated
    tolerated = {
        13: "Unauthorized for %s",
        2: "Not found for %s",
    }
    command = ['dxf', 'list-aliases', repo]

    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        message = tolerated.get(exc.returncode)
        if message is None:
            raise
        logging.error(message, repo)
        return

    for tag in completed.stdout.decode('utf-8').split():
        yield f'{repo}:{tag}'
|
|
|
|
|
def scan_many(images: List[str]) -> ScanResultList:
    """Scan all the images concurrently using a pool of 8 worker threads.

    Args:
        images: Names of the images to pass to scan_image.

    Returns:
        A list of (image, result) tuples in the same order as `images`,
        where result is whatever scan_image returned for that image.

    Raises:
        Any exception raised by scan_image for one of the images.
    """
    # Materialise first so the same sequence feeds both the pool and zip()
    image_list = list(images)

    with ThreadPoolExecutor(max_workers=8) as executor:
        # No timeout is passed to map(): it counts from the map() call and so
        # would bound the whole batch rather than each individual scan.
        results = list(executor.map(scan_image, image_list))

    return list(zip(image_list, results))
|
|
|
|
|
def scan_image(image: str) -> Optional[ScanResult]:
    """Run a Trivy scan on a remote image and return the parsed JSON report.

    Trivy writes its report to a temporary file, which is read back and then
    deleted whether or not the scan succeeded.

    Args:
        image: Image name; it is scanned as '<DXF_HOST>/<image>'.

    Returns:
        The data loaded from Trivy's JSON report, or None if Trivy exited
        with a non-zero status or its report could not be parsed as JSON.
    """
    logging.info('Scanning %s', image)

    # Reserve a unique path for the report and close our own handle at once,
    # so that no file descriptor stays open for the duration of the scan
    with NamedTemporaryFile(delete=False) as report:
        report_path = report.name

    try:
        # Trivy scan. --light uses smaller DB without descriptions
        subprocess.run(
            ['trivy', 'image', '--light', '-f', 'json', '-o', report_path,
             f'{DXF_HOST}/{image}'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)

        # Read JSON data from temporary file
        with open(report_path, encoding='utf-8') as results_file:
            scan_result = json.load(results_file)

    except subprocess.CalledProcessError as exc:
        # Null result if scan failed
        scan_result = None
        logging.error("Failed to scan %s (returncode %s)\n%s",
                      image, exc.returncode, exc)
    except json.JSONDecodeError as exc:
        # An unreadable report counts as a failed scan for this image only,
        # rather than aborting the scans of every other image
        scan_result = None
        logging.error("Failed to parse scan report for %s\n%s", image, exc)
    finally:
        os.remove(report_path)

    return scan_result
|
|
|
|
|
def list_vulnerable(results: ScanResultList,
                    bad_cves: List[str]) -> VulnerableImages:
    """List the images vulnerable to each of the bad_cves.

    Args:
        results: (image, scan result) pairs.
        bad_cves: CVE identifiers to look for. It is only iterated, so any
            iterable (such as a set) works; repeated identifiers are ignored.

    Returns:
        A dictionary with one key per CVE, holding the images whose scan
        result contains that CVE, plus a 'Scan failed' key holding the images
        whose scan result is None or empty. An image is added to
        'Scan failed' once per entry in `results`, however many CVEs are
        being checked.
    """
    # De-duplicate while keeping order, so no image is appended twice per CVE
    cves = list(dict.fromkeys(bad_cves))

    vulnerable_images: VulnerableImages = {'Scan failed': []}
    for cve in cves:
        vulnerable_images[cve] = []

    # Walk the results once: a failed scan is independent of the CVE list,
    # so it is recorded outside the per-CVE loop
    for alias, scan_result in results:
        if not scan_result:
            vulnerable_images['Scan failed'].append(alias)
            continue

        for cve in cves:
            if has_cve(cve, scan_result):
                vulnerable_images[cve].append(alias)

    return vulnerable_images
|
|
|
|
|
def has_cve(cve: str, scan_result: ScanResult) -> bool:
    """Return True if `cve` is one of the vulnerability IDs in a scan result.

    Each item of `scan_result` is read with .get('Vulnerabilities'); items
    where that entry is missing or empty contribute no IDs.
    """
    # Gather the 'VulnerabilityID' of every vulnerability across all targets
    reported_ids = [
        finding['VulnerabilityID']
        for section in scan_result
        for finding in (section.get('Vulnerabilities') or [])
    ]
    return cve in reported_ids
|
|
|
|
|
if __name__ == "__main__":
    # Send log messages of INFO level and above to stdout
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    # Scan the registry, then save the findings as JSON in the current
    # working directory
    report = scan_registry()
    report_path = Path.cwd() / 'vulnerable_images.json'
    report_path.write_text(json.dumps(report))