#!/usr/bin/env python3
"""
Tor Contact Info Parser - A tool/Python Class for parsing Tor ContactInfo Information Sharing v2 specification contacts
Written by Eran Sandler (https://twitter.com/erans) (C) 2018

Turned into a proper command-line tool with sub-commands and flags by @Someguy123 at Privex Inc. (C) 2021
(https://www.privex.io) (https://github.com/PrivexInc)

This is a parser for the Tor ContactInfo Information Sharing Specification v2
(https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/).

The parser can parse the ContactInfo field of Tor relays based on the specification.

Official Repo: https://github.com/erans/torcontactinfoparser
Privex Fork: https://github.com/Privex/torcontactinfoparser

Released under the MIT License.
"""

import argparse
import os
import re
import sys
import json
import logging
import warnings
import textwrap
from typing import Optional

try:
    import requests
except ImportError:
    # Only the 'scan' download path needs 'requests'; the parser class itself
    # is pure standard library and stays importable without it.
    requests = None


def rprint(value='', *args, **kwargs):
    """
    Log ``value`` at DEBUG level on the module logger.

    Containers (dict, list, set, tuple) are rendered as 4-space indented JSON
    first; anything else is passed to the logger unchanged. Extra positional
    and keyword arguments are forwarded to ``LOG.debug``.
    """
    if isinstance(value, set):
        # json.dumps() cannot serialise a set; log it as a JSON array instead.
        value = list(value)
    if isinstance(value, (dict, list, tuple)):
        value = json.dumps(value, indent=4)
    return LOG.debug(value, *args, **kwargs)


warnings.filterwarnings('ignore')
LOG = logging.getLogger()


class TorContactInfoParser(object):
    """
    Parser for Tor relay ContactInfo strings made of space separated
    ``name:value`` fields.

    Each known field name is looked up in ``_supported_fields_parsers`` and its
    value is validated by the parser function registered there. Values that
    fail validation are reported as ``None`` (or raise ``ValueError`` when the
    string parser is asked to).
    """

    # NOTE: the apostrophe in the local part is the plain ASCII "'".
    email_regex = r"^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*$"

    def _parse_string_value(self, value, min_length, max_length, valid_chars,
                            raise_exception=False, field_name=None, deobfuscate_email=False):
        """
        Validate a plain string field.

        :param value: raw field value (the text after ``name:``)
        :param min_length: minimum accepted length, inclusive
        :param max_length: maximum accepted length, inclusive
        :param valid_chars: regular expression the WHOLE value must match,
                            or ``"*"`` to skip the character check
        :param raise_exception: raise ``ValueError`` instead of returning ``None``
        :param field_name: field name, used only in error messages
        :param deobfuscate_email: unused here; accepted because ``parse`` passes
                                  the same keyword set to every field parser
        :return: ``value`` unchanged when valid, otherwise ``None``
        :raises ValueError: when the value is invalid and ``raise_exception`` is true
        """
        value_length = len(value)
        if value_length < min_length:
            if raise_exception:
                raise ValueError("value of field '{0}' is too short".format(field_name))
            return None

        if value_length > max_length:
            if raise_exception:
                raise ValueError("value of field '{0}' is too long".format(field_name))
            return None

        # fullmatch, not search: a single valid character somewhere in the
        # value must not be enough to accept the whole value.
        if valid_chars != "*" and not re.fullmatch(valid_chars, value):
            if raise_exception:
                raise ValueError("value of field '{0}' doesn't match valid chars restrictions".format(field_name))
            return None

        return value

    def _parse_email_value(self, value, field_name, raise_exception, deobfuscate_email):
        """
        Validate an e-mail style field (``email``, ``abuse``, ``xmpp``).

        ``[]`` in the raw value is treated as an obfuscated ``@``.

        :param value: raw field value
        :param field_name: unused; accepted for a uniform parser signature
        :param raise_exception: unused; invalid addresses always yield ``None``
        :param deobfuscate_email: when true return the address with ``@``,
                                  otherwise return it with ``[]`` in place of ``@``
        :return: the (de)obfuscated address, or ``None`` when empty or invalid
        """
        if value:
            v = value.replace("[]", "@")
            if re.search(self.email_regex, v):
                if not deobfuscate_email:
                    return v.replace("@", "[]")
                return v

        return None

    # Field name -> {"fn": parser function, "args": fixed keyword arguments}.
    # The functions are stored unbound (this is the class body), which is why
    # ``parse`` passes ``self`` explicitly. ``valid_chars`` patterns are applied
    # with re.fullmatch, so every multi-character field needs a quantifier.
    _supported_fields_parsers = {
        "email": {"fn": _parse_email_value, "args": {}},
        "url": {"fn": _parse_string_value,
                "args": {"min_length": 4, "max_length": 399, "valid_chars": "[_%/:a-zA-Z0-9.-]+"}},
        "proof": {"fn": _parse_string_value,
                  "args": {"min_length": 7, "max_length": 7, "valid_chars": "[adinrsu-]+"}},
        "ciissversion": {"fn": _parse_string_value,
                         "args": {"min_length": 1, "max_length": 1, "valid_chars": "[12]+"}},
        "pgp": {"fn": _parse_string_value,
                "args": {"min_length": 40, "max_length": 40, "valid_chars": "[a-zA-Z0-9]+"}},
        "abuse": {"fn": _parse_email_value, "args": {}},
        "keybase": {"fn": _parse_string_value,
                    "args": {"min_length": 0, "max_length": 50, "valid_chars": "[a-zA-Z0-9]+"}},
        "twitter": {"fn": _parse_string_value,
                    "args": {"min_length": 1, "max_length": 15, "valid_chars": "[a-zA-Z0-9_]+"}},
        "mastodon": {"fn": _parse_string_value,
                     "args": {"min_length": 0, "max_length": 254, "valid_chars": "*"}},
        "matrix": {"fn": _parse_string_value,
                   "args": {"min_length": 0, "max_length": 254, "valid_chars": "*"}},
        "xmpp": {"fn": _parse_email_value, "args": {}},
        "otr3": {"fn": _parse_string_value,
                 "args": {"min_length": 40, "max_length": 40, "valid_chars": "[a-z0-9]+"}},
        "hoster": {"fn": _parse_string_value,
                   "args": {"min_length": 0, "max_length": 254, "valid_chars": "[a-zA-Z0-9.-]+"}},
        "cost": {"fn": _parse_string_value,
                 "args": {"min_length": 0, "max_length": 13, "valid_chars": "[A-Z0-9.]+"}},
        "uplinkbw": {"fn": _parse_string_value,
                     "args": {"min_length": 0, "max_length": 7, "valid_chars": "[0-9]+"}},
        "trafficacct": {"fn": _parse_string_value,
                        "args": {"min_length": 0, "max_length": 9, "valid_chars": "[unmetrd0-9]+"}},
        "memory": {"fn": _parse_string_value,
                   "args": {"min_length": 0, "max_length": 10, "valid_chars": "[0-9]+"}},
        "cpu": {"fn": _parse_string_value,
                "args": {"min_length": 0, "max_length": 50, "valid_chars": "[a-zA-Z0-9_-]+"}},
        "virtualization": {"fn": _parse_string_value,
                           "args": {"min_length": 0, "max_length": 15, "valid_chars": "[a-z-]+"}},
        "donationurl": {"fn": _parse_string_value,
                        "args": {"min_length": 0, "max_length": 254, "valid_chars": "*"}},
        "btc": {"fn": _parse_string_value,
                "args": {"min_length": 26, "max_length": 99, "valid_chars": "[a-zA-Z0-9]+"}},
        "zec": {"fn": _parse_string_value,
                "args": {"min_length": 0, "max_length": 95, "valid_chars": "[a-zA-Z0-9]+"}},
        "xmr": {"fn": _parse_string_value,
                "args": {"min_length": 0, "max_length": 99, "valid_chars": "[a-zA-Z0-9]+"}},
        "offlinemasterkey": {"fn": _parse_string_value,
                             "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "signingkeylifetime": {"fn": _parse_string_value,
                               "args": {"min_length": 0, "max_length": 6, "valid_chars": "[0-9]+"}},
        "sandbox": {"fn": _parse_string_value,
                    "args": {"min_length": 1, "max_length": 2, "valid_chars": "[yn]+"}},
        "os": {"fn": _parse_string_value,
               "args": {"min_length": 0, "max_length": 20, "valid_chars": "[A-Za-z0-9/.]+"}},
        "tls": {"fn": _parse_string_value,
                "args": {"min_length": 0, "max_length": 14, "valid_chars": "[a-z]+"}},
        "aesni": {"fn": _parse_string_value,
                  "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "autoupdate": {"fn": _parse_string_value,
                       "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "confmgmt": {"fn": _parse_string_value,
                     "args": {"min_length": 1, "max_length": 15, "valid_chars": "[a-zA-Z-]+"}},
        "dnslocation": {"fn": _parse_string_value,
                        "args": {"min_length": 5, "max_length": 100, "valid_chars": "[a-z,]+"}},
        "dnsqname": {"fn": _parse_string_value,
                     "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "dnssec": {"fn": _parse_string_value,
                   "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "dnslocalrootzone": {"fn": _parse_string_value,
                             "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
    }

    def __init__(self):
        pass

    def parse(self, value: str, raise_exception_on_invalid_value=False,
              deobfuscate_email=True) -> Optional[dict]:
        """
        Parse a ContactInfo string into a ``{field name: value}`` dict.

        :param value: the full ContactInfo string
        :param raise_exception_on_invalid_value: forwarded to the field parsers
               as ``raise_exception``
        :param deobfuscate_email: forwarded to the field parsers; when true,
               e-mail style fields are returned with ``@`` instead of ``[]``
        :return: ``None`` when the string has no ``ciissversion:`` field,
                 otherwise a dict with one entry per recognised field. A field
                 whose value failed validation maps to ``None``. Unknown field
                 names and tokens without ``:`` are ignored.
        :raises ValueError: from a field parser, when
                ``raise_exception_on_invalid_value`` is true
        """
        # the ciissversion field is mandatory
        if 'ciissversion:' not in value:
            return None

        result = {}
        # Split on any whitespace so a trailing newline (piped input) or tabs
        # do not end up inside the last field's value.
        for part in value.split():
            name, sep, data = part.partition(":")
            if not sep or name not in self._supported_fields_parsers:
                continue

            field_parser = self._supported_fields_parsers[name]
            if field_parser is None:
                # No validation registered: keep the raw value.
                result[name] = data
                continue

            if callable(field_parser):
                parsed = field_parser(self, data)
            else:
                # Build the call arguments locally; writing them into the
                # class-level "args" dict would leak state between calls.
                kwargs = dict(
                    field_parser["args"],
                    field_name=name,
                    value=data,
                    raise_exception=raise_exception_on_invalid_value,
                    deobfuscate_email=deobfuscate_email,
                )
                parsed = field_parser["fn"](self, **kwargs)

            # The first truthy value for a repeated field wins.
            if not result.get(name):
                result[name] = parsed

        return result


def cmd_parse(opts: argparse.Namespace):
    """
    ArgParser function for parsing a single ContactInfo string, and printing the
    result to stdout as JSON.

    Reads the contact from ``opts.contact`` (joined with spaces), or from stdin
    when no contact argument was given or the first one is ``-``. Output is
    indented by 4 spaces when ``opts.pretty`` is true, compact otherwise.
    """
    if not opts.contact or opts.contact[0] == '-':
        contact = sys.stdin.read()
    else:
        contact = ' '.join(opts.contact).strip()

    res = TorContactInfoParser().parse(contact)
    # Always write to stdout: this is the command's result, not a log message.
    print(json.dumps(res, indent=4 if opts.pretty else None))


def cmd_scan(opts: argparse.Namespace, adata=None) -> int:
    """
    ArgParser function for scanning all ContactInfo strings from
    ``https://onionoo.torproject.org/details`` and logging each parsed result.

    :param opts: parsed arguments; ``opts.json`` and ``opts.pretty`` are read
    :param adata: optional JSON text with a top-level ``relays`` list; when
                  empty/None the data is downloaded instead
    :return: 0 on success, 1 when no relay data could be obtained
    """
    parser = TorContactInfoParser()
    surl = "https://onionoo.torproject.org/details"

    if not adata:
        if requests is None:
            LOG.error(f"The 'requests' package is needed to download {surl}")
            return 1
        LOG.info(f"Getting relays from {surl}")
        # A timeout keeps a stalled connection from hanging the scan forever.
        jdata = requests.get(surl, timeout=60)
        try:
            adata = jdata.json()
        except ValueError as e:
            # json / simplejson / requests JSONDecodeError all derive from ValueError
            LOG.exception(f"JSON error {e}")
            return 1
        elts = adata["relays"]
    else:
        elts = json.loads(adata)['relays']

    if not elts:
        LOG.warning("NO relays - are we connected?")
        return 1
    LOG.info(f"{len(elts)} relays")

    for relay in elts:
        if 'fingerprint' not in relay:
            LOG.warning(f"fingerprint not in relay for {relay}")
            continue
        fp = relay['fingerprint']
        verified_host_names = relay.get('verified_host_names', [])
        contact = relay.get("contact", None)
        if not contact:
            LOG.warning(f"No contact for {fp} {verified_host_names}")
            continue
        if 'ciissversion' not in contact:
            LOG.debug(f"No ciissversion in contact in {fp}")
            continue
        LOG.debug(f"parsing {fp}")
        result = parser.parse(contact, False)
        if not result:
            LOG.warning(f"No result for {contact} in {fp}")
            continue
        if opts.json:
            result = json.dumps(result, indent=4) if opts.pretty else json.dumps(result)
        if opts.pretty:
            rprint(result)
        else:
            LOG.debug(result)
    return 0


ETC_DIR = '/etc/tor/yaml'


def oparser():
    """
    Build the command-line parser.

    Top level options: ``--relays_output`` and ``-j/--json``; sub-commands
    ``parse`` (dispatches to ``cmd_parse``) and ``scan``. With no sub-command
    the defaults select ``cmd_scan``.
    """
    cparser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent(f"""
        Examples:

            # 'scan' is the original behaviour of this script. It iterates over the data
            # from https://onionoo.torproject.org/details , parses each contact, and prints it as Python dict-style JSON.
            {sys.argv[0]} scan

            # Same as previous. With no arguments, it's equivalent to running 'scan'.
            {sys.argv[0]}

            # If you pass '-p' after scan, it will enable pretty printing. For best pretty printing,
            # make sure you have 'rich' installed from pypi.
            {sys.argv[0]} scan -p

            # If you need real JSON with double quotes, rather than Python dict-style JSON, you can
            # use the '-j' flag to enable "real JSON" mode (you can combine with '-p' if you want pretty printed real json)
            {sys.argv[0]} scan -j

            # Using 'parse', you can parse an arbitrary ContactInfo string, and it will output the parsed result
            # with pretty printing by default.

            {sys.argv[0]} parse "contact Privex Inc. email:noc[]privex.io url:https://www.privex.io " \\
                "proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc hoster:www.privex.io " \\
                "uplinkbw:500 memory:4096 virtualization:kvm btc:bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2 " \\
                "xmr:89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps"

            {{
                'email': 'noc@privex.io',
                'url': 'https://www.privex.io',
                'proof': 'uri-rsa',
                'pgp': None,
                'keybase': 'privexinc',
                'twitter': 'PrivexInc',
                'hoster': 'www.privex.io',
                'uplinkbw': '500',
                'memory': '4096',
                'virtualization': 'kvm',
                'btc': 'bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2',
                'xmr': '89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps'
            }}

            # You can also pipe a contact string into 'parse', and it will work just the same.

            echo "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc" | {sys.argv[0]} parse
            {{'email': 'noc@privex.io', 'url': 'https://www.privex.io', 'proof': 'uri-rsa', 'pgp': None, 'keybase': 'privexinc', 'twitter': 'PrivexInc'}}

            # If you need real JSON outputted, rather than Python dict-style output, you can pass -j to either 'parse' or 'scan'

            {sys.argv[0]} parse -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
            {{
                "email": "noc@privex.io",
                "url": "https://www.privex.io",
                "proof": "uri-rsa",
                "pgp": null,
                "keybase": "privexinc",
                "twitter": "PrivexInc"
            }}

            # You can use '-np' to disable pretty printing for 'parse' - you can combine it with '-j' to get flat, plain JSON.

            {sys.argv[0]} parse -np -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
            {{"email": "noc@privex.io", "url": "https://www.privex.io", "proof": "uri-rsa", "pgp": null, "keybase": "privexinc", "twitter": "PrivexInc"}}
        """))
    # NOTE(review): despite the help text, this module only ever READS this
    # file (see iMain); nothing here writes it.
    cparser.add_argument('--relays_output', type=str,
                         dest='relays_output',
                         default=os.path.join(ETC_DIR, 'relays.json'),
                         help="Write the download relays in json to a file")
    cparser.add_argument('-j', '--json', action='store_true',
                         default=False, dest='json',
                         help="Output real JSON, not Python dict format.")
    cparser.set_defaults(func=cmd_scan, json=False, pretty=False)

    subparse = cparser.add_subparsers()
    subparse.required = False

    sp_parse = subparse.add_parser('parse',
                                   help="Parse a single contact string, either as an argument, or piped into stdin")
    sp_parse.add_argument('contact', nargs='*')
    # store_false needs default=True, otherwise pretty printing can never be on.
    sp_parse.add_argument('-np', '--no-pretty',
                          action='store_false', default=True, dest='pretty',
                          help="Disable pretty printing JSON")
    # SUPPRESS: a sub-parser default would overwrite a '-j' given before the
    # sub-command name.
    sp_parse.add_argument('-j', '--json', action='store_true',
                          default=argparse.SUPPRESS, dest='json',
                          help="Output real JSON, not Python dict format.")
    sp_parse.set_defaults(func=cmd_parse)

    sp_scan = subparse.add_parser('scan', help="Parse all contacts from https://onionoo.torproject.org/details")
    sp_scan.add_argument('-p', action='store_true', default=False, dest='pretty',
                         help="Enable pretty printing JSON")
    sp_scan.add_argument('-j', '--json', action='store_true',
                         default=argparse.SUPPRESS, dest='json',
                         help="Output real JSON, not Python dict format.")
    # 'scan' inherits func=cmd_scan from the top-level defaults.

    return cparser


def iMain(lArgs=None):
    """
    Parse ``lArgs`` (or ``sys.argv[1:]`` when None) and run the selected command.

    For ``scan`` (the default), relay data is read from ``opts.relays_output``
    when that file exists, otherwise ``cmd_scan`` downloads it.

    :return: the value returned by the command function
    """
    opts = oparser().parse_args(lArgs)

    if opts.func is not cmd_scan:
        # e.g. the 'parse' sub-command
        return opts.func(opts)

    data = None
    if opts.relays_output and os.path.exists(opts.relays_output):
        with open(opts.relays_output, 'rt', encoding='utf-8') as fh:
            data = fh.read()

    return cmd_scan(opts, data)


if __name__ == "__main__":
    from exclude_utils import vsetup_logging
    log_level = logging.DEBUG if os.environ.get('DEBUG', '') else logging.INFO
    vsetup_logging(LOG, log_level)
    try:
        i = iMain(sys.argv[1:])
    except KeyboardInterrupt:
        i = 0
    except Exception as e:
        # NOTE(review): failures are logged but still exit with status 0,
        # as before - confirm whether a non-zero status is wanted here.
        LOG.exception(f"Exception: {e}", exc_info=True)
        i = 0

    sys.exit(i)