# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
# from https://github.com/nusenu/trustor-poc
# with minor refactoring to make the code more Pythonic.

import datetime
import ipaddress
import logging
import os
import re
import sys
import warnings

import urllib3.util
from urllib3.util import parse_url as urlparse
from stem.control import Controller

# from stem.util.tor_tools import *

try:
    # unbound is not on pypi
    from unbound import RR_CLASS_IN, RR_TYPE_TXT, ub_ctx
except ImportError:
    ub_ctx = RR_TYPE_TXT = RR_CLASS_IN = None

warnings.filterwarnings('ignore')
LOG = logging.getLogger()
logging.getLogger("urllib3").setLevel(logging.INFO)

# import urllib3.contrib.pyopenssl
# urllib3.contrib.pyopenssl.inject_into_urllib3()

# download this python library from
# https://github.com/erans/torcontactinfoparser
# sys.path.append('/home/....')
try:
    from torcontactinfo import TorContactInfoParser
except ImportError:
    TorContactInfoParser = None

# ContactInfo versions and proof types we accept
# https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/
accepted_ciissversions = [2, '2']  # accept int or str form from the parser
accepted_proof_types = ['uri-rsa', 'dns-rsa']


class TrustorError(Exception):
    pass


# https://stackoverflow.com/questions/2532053/validate-a-hostname-string
# FIXME this check allows non-fqdn names
def is_valid_hostname(hostname):
    if len(hostname) > 255:
        return False
    if hostname[-1] == ".":
        # strip exactly one dot from the right, if present
        hostname = hostname[:-1]
    allowed = re.compile(r"(?!-)[A-Z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
    return all(allowed.match(x) for x in hostname.split("."))


def read_local_trust_config(trust_config):
    '''
    reads a local configuration file containing trusted domains
    (one per line, '#' lines are comments) and returns them as a list
    '''
    result = []
    if os.path.isfile(trust_config):
        with open(trust_config) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                result.append(line)
        return result
    LOG.error('Trust config file "%s" is missing, aborting!' % trust_config)
    sys.exit(1)


def read_local_validation_cache(validation_cache_file, trusted_domains=None):
    '''
    reads the local validation cache and returns all fingerprints
    previously validated for the given trusted domains

    cache line format (as written by validate_proofs()):
    domain:fingerprint:prooftype:date
    '''
    result = []
    if not trusted_domains:
        return result
    if os.path.isfile(validation_cache_file):
        with open(validation_cache_file) as f:
            for line in f:
                domain, fingerprint, prooftype, dt = line.strip().split(':')
                if domain in trusted_domains:
                    result.append(fingerprint)
    else:
        LOG.info('Validation cache file not present; it will be created.')
    return result


def get_controller(address='127.0.0.1', port=9051, password=''):
    '''
    connects to a local tor client via the tor ControlPort
    and returns a stem controller
    '''
    try:
        controller = Controller.from_port(address=address, port=port)
        controller.authenticate(password=password)
    except Exception as e:
        LOG.error('Failed to connect to the tor process: %s' % e)
        sys.exit(1)
    return controller


def find_validation_candidates(controller,
                               trusted_domains=None,
                               validation_cache=None,
                               CAfile='/etc/ssl/certs/ca-certificates.crt',
                               accept_all=False):
    '''
    connect to a tor client via ControlPort and return a dict of all
    not yet validated fingerprints per trusted operator domain
    format:
    {trusted_domain: {prooftype: [fingerprint, fingerprint, ...]}}
    example:
    {'emeraldonion.org': {'uri-rsa': ['044600FD968728A6F220D5347AD897F421B757C0']}}
    '''
    trusted_domains = trusted_domains or []
    validation_cache = validation_cache or []
    result = {}
    if TorContactInfoParser is None:
        LOG.error('the torcontactinfo library is required to parse ContactInfo')
        return result
    ci = TorContactInfoParser()
    relays = controller.get_server_descriptors()
    for relay in relays:
        if relay.contact is None:
            continue
        fingerprint = relay.fingerprint
        # skip fingerprints we have already validated
        if fingerprint in validation_cache:
            continue
        parsed_ci = ci.parse(str(relay.contact, 'utf-8'))
        if len(parsed_ci) > 0:
            if 'ciissversion' in parsed_ci and 'proof' in parsed_ci and 'url' in parsed_ci:
                prooftype = parsed_ci['proof']
                ciurl = parsed_ci['url']
                if parsed_ci['ciissversion'] in accepted_ciissversions and prooftype in accepted_proof_types:
                    if ciurl.startswith('http://') or ciurl.startswith('https://'):
                        try:
                            domain = urlparse(ciurl).netloc
                        except Exception:
                            LOG.warning('failed to parse domain %s' % ciurl)
                            continue
                    else:
                        domain = ciurl
                    if not is_valid_hostname(domain):
                        continue
                    # we can ignore relays that do not claim to be operated
                    # by a trusted operator, if we do not accept all
                    if domain not in trusted_domains and not accept_all:
                        continue
                    if domain in result.keys():
                        if prooftype in result[domain].keys():
                            result[domain][prooftype].append(fingerprint)
                        else:
                            result[domain][prooftype] = [fingerprint]
                            # mixed proof types are not allowed as per spec,
                            # but we are not strict here
                            LOG.warning('%s is using mixed prooftypes %s' % (domain, prooftype))
                    else:
                        result[domain] = {prooftype: [fingerprint]}
    return result
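
# Illustrative sketch (not part of the original repo): the ContactInfo fields
# this code relies on look roughly like
#   'email:demo[]example.com ciissversion:2 proof:uri-rsa url:example.com'
# The real parsing above is delegated to TorContactInfoParser; the naive
# space-separated key:value split below is a simplified stand-in for it.
def _parse_contactinfo_sketch(contact):
    # 'proof:uri-rsa url:example.com' -> {'proof': 'uri-rsa', 'url': 'example.com'}
    return dict(m.groups() for m in re.finditer(r'(\w+):(\S+)', contact))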


def oDownloadUrlRequests(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050,
                         content_type='text/plain', session=None):
    import requests

    # socks proxy used for outbound web requests (for validation of proofs)
    proxy = {'https': f"socks5h://{host}:{port}"}
    # we use this UA string when connecting to webservers to fetch
    # rsa-fingerprint.txt proof files
    # https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}

    LOG.debug("fetching %s...." % uri)
    try:
        # grr. fix urllib3
        # urllib3.connection WARNING Certificate did not match expected hostname:
        head = requests.head(uri, timeout=timeout, proxies=proxy, headers=headers)
    except Exception as e:
        LOG.exception(f"{e}")
        raise TrustorError(f"HTTP HEAD request failed for {uri} {e}")

    if head.status_code >= 300:
        raise TrustorError(f"HTTP Errorcode {head.status_code}")
    if not head.headers['Content-Type'].startswith(content_type):
        raise TrustorError(f"HTTP Content-Type != {content_type}")
    if not os.path.exists(sCAfile):
        raise TrustorError(f"File not found CAfile {sCAfile}")

    if session is None:
        session = requests.sessions.Session()
    try:
        oReqResp = session.request(method="get",
                                   url=uri,
                                   proxies=proxy,
                                   timeout=timeout,
                                   headers=headers,
                                   allow_redirects=False,
                                   verify=sCAfile)
    except Exception:
        LOG.warning("HTTP GET request failed for %s" % uri)
        raise
    if oReqResp.status_code != 200:
        raise TrustorError(f"HTTP Errorcode {oReqResp.status_code}")
    if not oReqResp.headers['Content-Type'].startswith(content_type):
        raise TrustorError(f"HTTP Content-Type != {content_type}")

    # check for redirects (not allowed as per spec)
    if oReqResp.url != uri:
        LOG.error('Redirect detected %s vs %s (final)' % (uri, oReqResp.url))
        raise TrustorError('Redirect detected %s vs %s (final)' % (uri, oReqResp.url))
    return oReqResp
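
# Note (an assumption, not from the original code): the proxies mapping above
# only routes 'https' URLs through tor; a plain 'http' URI would bypass the
# proxy. A sketch covering both schemes would be:
#   proxy = {'http': f"socks5h://{host}:{port}",
#            'https': f"socks5h://{host}:{port}"}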


# There's no point in using asyncio because of duplicate urls in the tasks
async def oDownloadUrlHttpx(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050,
                            content_type='text/plain'):
    import httpx

    # socks proxy used for outbound web requests (for validation of proofs)
    if host and port:
        proxy = f"socks5://{host}:{port}"
    else:
        proxy = None
    # we use this UA string when connecting to webservers to fetch
    # rsa-fingerprint.txt proof files
    # https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}

    if not os.path.exists(sCAfile):
        raise TrustorError(f"File not found CAfile {sCAfile}")

    LOG.debug("fetching %s...." % uri)
    # verify= and max_redirects= are client-level options in httpx,
    # not per-request arguments
    async with httpx.AsyncClient(proxies=proxy, verify=sCAfile, max_redirects=0) as client:
        try:
            # https://www.python-httpx.org/advanced/
            head = await client.head(uri, timeout=timeout, headers=headers)
        except Exception as e:
            LOG.exception(f"{e}")
            raise TrustorError(f"HTTP HEAD request failed for {uri} {e}")
        if head.status_code >= 300:
            raise TrustorError(f"HTTP Errorcode {head.status_code}")
        if content_type and not head.headers['Content-Type'].startswith(content_type):
            raise TrustorError(f"HTTP Content-Type != {content_type}")

        try:
            oReqResp = await client.get(url=uri, timeout=timeout, headers=headers)
        except Exception as e:
            LOG.warning("HTTP GET request failed for %s %s" % (uri, e))
            raise
        if oReqResp.status_code != 200:
            LOG.warning(f"HTTP Errorcode {oReqResp.status_code}")
            raise TrustorError(f"HTTP Errorcode {oReqResp.status_code}")
        if not oReqResp.headers['Content-Type'].startswith(content_type):
            LOG.warning(f"HTTP Content-Type != {content_type}")
            raise TrustorError(f"HTTP Content-Type != {content_type}")

        # check for redirects (not allowed as per spec)
        if str(oReqResp.url) != uri:
            LOG.error('Redirect detected %s vs %s (final)' % (uri, oReqResp.url))
            raise TrustorError('Redirect detected %s vs %s (final)' % (uri, oReqResp.url))
    return oReqResp


def ballow_subdomain_matching(hostname, dnsnames):
    for elt in dnsnames:
        # treat hostname as matching if it is a subdomain of a listed parent;
        # require the dot so 'badexample.com' does not match 'example.com'
        if len(hostname.split('.')) > len(elt.split('.')) and hostname.endswith('.' + elt):
            return True
    return False
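
# Illustrative behaviour of ballow_subdomain_matching (hypothetical names):
#   ballow_subdomain_matching('relay.example.com', ['example.com'])  -> True
#   ballow_subdomain_matching('badexample.com', ['example.com'])     -> False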

from urllib3.util.ssl_match_hostname import (CertificateError,
                                             _dnsname_match,
                                             _ipaddress_match)


def my_match_hostname(cert, hostname):
    """Verify that *cert* (in decoded format as returned by
    SSLSocket.getpeercert()) matches the *hostname*.  RFC 2818 and RFC 6125
    rules are followed, but IP addresses are not accepted for *hostname*.

    CertificateError is raised on failure. On success, the function
    returns nothing.
    """
    if not cert:
        raise ValueError(
            "empty or no certificate, match_hostname needs a "
            "SSL socket or SSL context with either "
            "CERT_OPTIONAL or CERT_REQUIRED"
        )
    try:
        # Divergence from upstream: ipaddress can't handle byte str
        host_ip = ipaddress.ip_address(hostname)
    except (UnicodeError, ValueError):
        # ValueError: Not an IP address (common case)
        # UnicodeError: Divergence from upstream: Have to deal with ipaddress
        # not taking byte strings.  addresses should be all ascii, so we
        # consider it not an ipaddress in this case
        host_ip = None
    except AttributeError:
        # Divergence from upstream: Make ipaddress library optional
        if ipaddress is None:
            host_ip = None
        else:  # Defensive
            raise
    dnsnames = []
    san = cert.get("subjectAltName", ())
    for key, value in san:
        if key == "DNS":
            if host_ip is None and _dnsname_match(value, hostname):
                return
            dnsnames.append(value)
        elif key == "IP Address":
            if host_ip is not None and _ipaddress_match(value, host_ip):
                return
            dnsnames.append(value)
    if not dnsnames:
        # The subject is only checked when there is no dNSName entry
        # in subjectAltName
        for sub in cert.get("subject", ()):
            for key, value in sub:
                # XXX according to RFC 2818, the most specific Common Name
                # must be used.
                if key == "commonName":
                    if _dnsname_match(value, hostname):
                        return
                    dnsnames.append(value)
    if len(dnsnames) > 1:
        # soften this to allow subdomain matching
        if ballow_subdomain_matching(hostname, dnsnames):
            LOG.warning(f"Allowing {hostname} in {dnsnames}")
            return
        raise CertificateError(
            "hostname %r doesn't match any of %s"
            % (hostname, ", ".join(map(repr, dnsnames)))
        )
    elif len(dnsnames) == 1:
        raise CertificateError("hostname %r doesn't match %r" % (hostname, dnsnames[0]))
    else:
        raise CertificateError("no appropriate commonName or subjectAltName fields were found")


urllib3.util.ssl_match_hostname.match_hostname = my_match_hostname
from urllib3.util.ssl_ import is_ipaddress


def _my_match_hostname(cert, asserted_hostname):
    # Our upstream implementation of ssl.match_hostname()
    # only applies this normalization to IP addresses so it doesn't
    # match DNS SANs so we do the same thing!
    stripped_hostname = asserted_hostname.strip("u[]")
    if is_ipaddress(stripped_hostname):
        asserted_hostname = stripped_hostname
    try:
        my_match_hostname(cert, asserted_hostname)
    except CertificateError as e:
        LOG.warning(
            "Certificate did not match hostname: %s. Certificate: %s",
            asserted_hostname,
            cert,
        )
        # Add cert to exception and reraise so client code can inspect
        # the cert when catching the exception, if they want to
        e._peer_cert = cert
        raise


urllib3.connection._match_hostname = _my_match_hostname

from urllib3.contrib.socks import SOCKSProxyManager

# from urllib3 import Retry


def oDownloadUrlUrllib3Socks(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050,
                             session=None, content_type='text/plain'):
    """There's no need to use requests here, and it adds too many layers
    on top of the SSL machinery to be able to get at things
    """
    domain = urlparse(uri).netloc
    # socks proxy used for outbound web requests (for validation of proofs)
    proxy = SOCKSProxyManager(f'socks5h://{host}:{port}/',
                              num_pools=1,
                              timeout=timeout,
                              cert_reqs='CERT_REQUIRED',
                              assert_hostname=domain,
                              ca_certs=sCAfile)
    # we use this UA string when connecting to webservers to fetch
    # rsa-fingerprint.txt proof files
    # https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}

    LOG.debug("fetching %s...." % uri)
    try:
        # grr. fix urllib3
        # Errors will be wrapped in :class:`~urllib3.exceptions.MaxRetryError`
        # unless retries are disabled, in which case the causing exception
        # will be raised.
        head = proxy.request('HEAD', uri, headers=headers, redirect=False, retries=False)
    except Exception as e:
        LOG.error(f"HTTP HEAD request failed for {uri} {e}")
        raise

    if head.status >= 300:
        raise TrustorError(f"HTTP Errorcode {head.status}")
    if content_type and not head.headers['Content-Type'].startswith(content_type):
        raise TrustorError(f"HTTP Content-Type != {content_type}")
    if not os.path.exists(sCAfile):
        raise TrustorError(f"File not found CAfile {sCAfile}")

    try:
        oReqResp = proxy.request("GET", uri, headers=headers, redirect=False)
    except Exception as e:
        LOG.warning(f"HTTP GET request failed for {uri} {e}")
        raise
    if oReqResp.status != 200:
        raise TrustorError(f"HTTP Errorcode {oReqResp.status}")
    if content_type and not oReqResp.headers['Content-Type'].startswith(content_type):
        raise TrustorError(f"HTTP Content-Type != {content_type}")

    # check for redirects (not allowed as per spec)
    if oReqResp.geturl() != uri:
        LOG.error('Redirect detected %s vs %s (final)' % (uri, oReqResp.geturl()))
        raise TrustorError('Redirect detected %s vs %s (final)' % (uri, oReqResp.geturl()))
    oReqResp.decode_content = True
    return oReqResp
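
# Hedged usage sketch for oDownloadUrlUrllib3Socks; the domain and the CA
# bundle path are placeholders, and a tor SOCKS proxy on 127.0.0.1:9050 is
# assumed. urllib3 responses expose the body as bytes on .data.
def _demo_fetch_proof_socks():
    uri = "https://example.com/.well-known/tor-relay/rsa-fingerprint.txt"
    resp = oDownloadUrlUrllib3Socks(uri, '/etc/ssl/certs/ca-certificates.crt',
                                    timeout=30, host='127.0.0.1', port=9050)
    return resp.data.decode('utf-8').upper().strip().split('\n')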

import urllib3.connectionpool
from urllib3.connection import HTTPSConnection

urllib3.connectionpool.VerifiedHTTPSConnection = HTTPSConnection


def lDownloadUrlFps(domain, sCAfile, timeout=30, host='127.0.0.1', port=9050):
    uri = f"https://{domain}/.well-known/tor-relay/rsa-fingerprint.txt"
    o = oDownloadUrlRequests(uri, sCAfile, timeout=timeout, host=host, port=port)
    well_known_content = o.text.upper().strip().split('\n')
    # keep only lines that look like 40-character relay fingerprints
    well_known_content = [i for i in well_known_content if i and len(i) == 40]
    return well_known_content


def validate_proofs(candidates, validation_cache_file, timeout=20,
                    host='127.0.0.1', port=9050,
                    CAfile='/etc/ssl/certs/ca-certificates.crt'):
    '''
    This function takes the return value of find_validation_candidates()
    and validates the candidates according to their proof type (uri-rsa,
    dns-rsa), then writes properly validated relay fingerprints to the
    local validation cache
    '''
    dt_utc = datetime.datetime.now(datetime.timezone.utc).date()
    count = 0
    with open(validation_cache_file, mode='a') as f:
        for domain in candidates.keys():
            for prooftype in candidates[domain].keys():
                if prooftype == 'uri-rsa':
                    well_known_content = lDownloadUrlFps(domain, CAfile,
                                                         timeout=timeout,
                                                         host=host, port=port)
                    for fingerprint in candidates[domain][prooftype]:
                        if fingerprint in well_known_content:
                            # write cache entry
                            count += 1
                            f.write('%s:%s:%s:%s\n' % (domain, fingerprint, prooftype, dt_utc))
                        else:
                            LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype))
                elif prooftype == 'dns-rsa' and ub_ctx:
                    for fingerprint in candidates[domain][prooftype]:
                        fp_domain = fingerprint + '.' + domain
                        if idns_validate(fp_domain,
                                         libunbound_resolv_file='resolv.conf',
                                         dnssec_DS_file='dnssec-root-trust',
                                         ) == 0:
                            count += 1
                            f.write('%s:%s:%s:%s\n' % (domain, fingerprint, prooftype, dt_utc))
                        else:
                            LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype))
    LOG.info('successfully validated %s new (not yet validated before) relays' % count)
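
# Illustrative dns-rsa proof record (hypothetical fingerprint/domain), per
# https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#dns-rsa
# which idns_validate() below checks via a DNSSEC-validated TXT lookup:
#   <FINGERPRINT>.example.com. IN TXT "we-run-this-tor-relay"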

def idns_validate(domain,
                  libunbound_resolv_file='resolv.conf',
                  dnssec_DS_file='dnssec-root-trust',
                  ):
    '''
    performs a DNS TXT lookup and verifies that the reply
    - is DNSSEC valid and
    - contains only a single TXT record and
    - the DNS record contains a hardcoded string as per specification
    https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#dns-rsa
    '''
    if not ub_ctx:
        return -1
    # this is not the system wide /etc/resolv.conf
    # use dnscrypt-proxy to encrypt your DNS and route it via tor's SOCKSPort
    ctx = ub_ctx()
    if os.path.isfile(libunbound_resolv_file):
        ctx.resolvconf(libunbound_resolv_file)
    else:
        LOG.error('libunbound resolv config file: "%s" is missing, aborting!' % libunbound_resolv_file)
        return 5
    if os.path.isfile(dnssec_DS_file):
        ctx.add_ta_file(dnssec_DS_file)
    else:
        LOG.error('DNSSEC trust anchor file "%s" is missing, aborting!' % dnssec_DS_file)
        return 6

    status, result = ctx.resolve(domain, RR_TYPE_TXT, RR_CLASS_IN)
    if status == 0 and result.havedata:
        if len(result.rawdata) == 1 and result.secure:
            # ignore the first byte, it is the TXT length
            if result.data.as_raw_data()[0][1:] == b'we-run-this-tor-relay':
                return 0
    return 1


def configure_tor(controller, trusted_fingerprints, exitonly=True):
    '''
    takes the list of trusted fingerprints and configures a tor client
    to only use trusted relays in a certain position
    for now we only set exits.
    we refuse to set the configuration if there are fewer than 41 trusted relays
    '''
    relay_count = len(trusted_fingerprints)
    if relay_count < 41:
        LOG.error('Too few trusted relays (%s), aborting!' % relay_count)
        sys.exit(15)

    try:
        controller.set_conf('ExitNodes', trusted_fingerprints)
        LOG.info('limited exits to %s relays' % relay_count)
    except Exception:  # noqa
        LOG.exception('Failed to set ExitNodes tor config to trusted relays')
        sys.exit(20)


if __name__ == '__main__':
    CAfile = '/etc/ssl/certs/ca-certificates.crt'
    trust_config = 'trust_config'
    assert os.path.exists(trust_config)
    trusted_domains = read_local_trust_config(trust_config)

    validation_cache_file = 'validation_cache'
    trusted_fingerprints = read_local_validation_cache(validation_cache_file,
                                                       trusted_domains=trusted_domains)
    # tor ControlPort password
    controller_password = ''
    # tor ControlPort IP
    controller_address = '127.0.0.1'
    timeout = 20
    port = 9050
    controller = get_controller(address=controller_address, password=controller_password)

    r = find_validation_candidates(controller,
                                   validation_cache=trusted_fingerprints,
                                   trusted_domains=trusted_domains,
                                   CAfile=CAfile)
    validate_proofs(r, validation_cache_file,
                    timeout=timeout,
                    host=controller_address,
                    port=port,
                    CAfile=CAfile)

    # refresh list with newly validated fingerprints
    trusted_fingerprints = read_local_validation_cache(validation_cache_file,
                                                       trusted_domains=trusted_domains)
    configure_tor(controller, trusted_fingerprints)
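
# Hedged follow-up sketch (an assumption, not part of the original flow): to
# make tor fail closed instead of falling back to untrusted exits when none of
# the trusted ExitNodes are reachable, StrictNodes can be enabled as well:
#   controller.set_conf('StrictNodes', '1')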