"""Module to handle downloading and verification of Gentoo images To ensure accuracy, we re-download every .txt file if it's older than one day. We assume that people building a cloud configured image want what is most up to date. If you have a specific image you want built over and over regardless, create a config file and load it in using -c/--config that points GENTOO_* values to the files you want. """ import os import re import sys from datetime import date import hashlib import progressbar from urllib.request import urlretrieve import tempfile from gentooimgr import LOG import gentooimgr.config as config from gentooimgr.common import older_than_a_day hashpattern = re.compile(config.GENTOO_FILE_HASH_RE, re.MULTILINE) isopattern = re.compile(config.GENTOO_FILE_ISO_RE, re.MULTILINE) isohashpattern = re.compile(config.GENTOO_FILE_ISO_HASH_RE, re.MULTILINE) stage3pattern = re.compile(config.GENTOO_FILE_STAGE3_RE, re.MULTILINE) stage3hashpattern = re.compile(config.GENTOO_FILE_STAGE3_HASH_RE, re.MULTILINE) class DownloadProgressBar(): def __init__(self): self.progress = None def __call__(self, block_num, block_size, total_size): if not self.progress: self.progress = progressbar.ProgressBar(maxval=total_size) self.progress.start() downloaded = block_num * block_size if downloaded < total_size: self.progress.update(downloaded) else: self.progress.finish() def parse_latest_iso_text(fullpath) -> tuple: """Returns a tuple of (hash type, iso name, iso bytes)""" with open(fullpath) as f: content = f.read() m_hash = hashpattern.search(content) m_iso = isopattern.search(content) return (m_hash.group(1) if not m_hash is None else None, m_iso.group(1) if not m_iso is None else None, m_iso.group(2) if not m_iso is None else None,) def parse_latest_stage3_text(fullpath) -> tuple: """Returns a tuple of (hash type, iso name, iso bytes) """ with open(fullpath) as f: content = f.read() m_hash = hashpattern.search(content) m_stage3 = stage3pattern.search(content) return (m_hash.group(1) if not m_hash is None else None, m_stage3.group(1) if not m_stage3 is None else None, m_stage3.group(2) if not m_stage3 is None else None,) def verify(args, _type: str, baseurl: str, hashpattern, filename: str) -> bool: """Downloads hash file and run a hash check on the file :Parameters: - args: Namespace of parsed arguments - _type: str hash type - baseurl: (remote) folder where hashsum file is contained - hashpattern: - filename: str name of file to check (used to download corresponding hash file) A install-amd64-minimal-2023111iso2T170154Z.iso file will have a install-amd64-minimal-20231112T170154Z.iso.sha256 for example. :Returns: Whether iso was verified using the specified hash """ thefile = os.path.join(args.download_dir, filename) LOG.info(f"verifying hash of {thefile}") digest = hashlib.file_digest(open(thefile, 'rb'), _type.lower()) filename = filename+f".{_type.lower()}" # Update to hash file hashfile = os.path.join(baseurl, filename) fullpath = os.path.join(args.download_dir, os.path.basename(hashfile)) if not os.path.exists(fullpath) or args.redownload or older_than_a_day(fullpath): LOG.info(f"Downloading {filename}") urlretrieve(hashfile, fullpath, DownloadProgressBar()) hd = digest.hexdigest() with open(fullpath, 'r') as f: content = f.read() m_hash = hashpattern.search(content) _hash = m_hash.group(1) assert hd == _hash, f"Hash mismatch {hd} != {_hash}" def download_stage3(args, url=None) -> str: if url is None: if args.profile == "systemd": url = os.path.join(config.GENTOO_BASE_STAGE_SYSTEMD_URL, config.GENTOO_LATEST_STAGE_SYSTEMD_FILE) else: url = os.path.join(config.GENTOO_BASE_STAGE_OPENRC_URL, config.GENTOO_LATEST_STAGE_OPENRC_FILE) filename = os.path.basename(url) fullpath = os.path.join(args.download_dir, filename) if not os.path.exists(fullpath) or args.redownload or older_than_a_day(fullpath): print(f"Downloading {filename}") urlretrieve(url, fullpath, DownloadProgressBar()) hashtype, latest, size = parse_latest_stage3_text(fullpath) size = int(size) filename = latest fullpath = os.path.join(args.download_dir, filename) if not os.path.exists(fullpath) or args.redownload: LOG.info(f"Downloading {filename}") url = os.path.join( config.GENTOO_BASE_STAGE_SYSTEMD_URL if args.profile == "systemd" else \ config.GENTOO_BASE_STAGE_OPENRC_URL, filename) urlretrieve(url, fullpath, DownloadProgressBar()) # Verify byte size stage3size = os.path.getsize(fullpath) assert size == stage3size, f"Stage 3 size {size} does not match expected value {stage3size}." verify(args, hashtype, config.GENTOO_BASE_STAGE_SYSTEMD_URL if args.profile == "systemd" else \ config.GENTOO_BASE_STAGE_OPENRC_URL, stage3hashpattern, filename) return fullpath def download_portage(args, url=None) -> str: """Handle downloading of portage system for installation into cloud image We always download the latest portage package and rename it to today's date. If using today's date to grab portage, sometimes depending on timezone, the package won't be available. If always using latest, worst case scenario is you have a portage package a day late. """ if url is None: url = config.GENTOO_PORTAGE_FILE base = os.path.basename(url) # Uses 'latest' filename today = date.today() # Write latest to today's date so we don't constantly redownload, but filename = base.replace("latest", "%d%d%d" % (today.year, today.month, today.day)) fullpath = os.path.join(args.download_dir, filename) # Portage is always "latest" in this case, so definitely check if older than a day and redownload. if not os.path.exists(fullpath) or args.redownload or older_than_a_day(fullpath): LOG.info(f"Downloading {filename} ({base})") urlretrieve(url, fullpath, DownloadProgressBar()) return fullpath def download(args, url=None) -> str: """Download txt file with iso name and hash type :Parameters: - args: Namespace with parsed arguments - url: str or None. If None, will generate a url to the latest minimal install iso :Returns: Full path to the downloaded iso file Will cause program to exit if iso byte size fails to match expected value. """ if url is None: url = os.path.join(config.GENTOO_BASE_ISO_URL, config.GENTOO_LATEST_ISO_FILE) # Download the latest txt file filename = os.path.basename(url) fullpath = os.path.join(args.download_dir, filename) if not os.path.exists(fullpath) or args.redownload or older_than_a_day(fullpath): LOG.info(f"Downloading {fullpath}") urlretrieve(url, fullpath, DownloadProgressBar()) hashtype, latest, size = parse_latest_iso_text(fullpath) size = int(size) # Download the iso file filename = latest fullpath = os.path.join(args.download_dir, filename) if not os.path.exists(fullpath) or args.redownload: LOG.info(f"Downloading {filename}") url = os.path.join(config.GENTOO_BASE_ISO_URL, filename) urlretrieve(url, fullpath, DownloadProgressBar()) # Verify byte size isosize = os.path.getsize(fullpath) assert size == isosize, f"ISO size {size} does not match expected value {isosize}." verify(args, hashtype, config.GENTOO_BASE_ISO_URL, isohashpattern, filename) return fullpath