This commit is contained in:
emdee@spm.plastiras.org 2024-01-14 11:55:48 +00:00
parent 5c55533c32
commit 79b2296285
13 changed files with 1206 additions and 1 deletions

View File

@ -1,3 +1,6 @@
# stem_examples
stem examples
stem examples
https://stem.torproject.org/tutorials/examples/exit_used.html
http://vt5hknv6sblkgf22.onion/tutorials/examples/list_circuits.html
http://vt5hknv6sblkgf22.onion/tutorials/examples/relay_connections.html

View File

View File

@ -0,0 +1,56 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# http://vt5hknv6sblkgf22.onion/tutorials/examples/check_digests.html
import sys
import stem.descriptor.remote
import stem.util.tor_tools
from tor_controller import set_socks_proxy
def download_descriptors(fingerprint):
"""
Downloads the descriptors we need to validate this relay. Downloads are
parallelized, providing the caller with a tuple of the form...
(router_status_entry, server_descriptor, extrainfo_descriptor)
"""
conensus_query = stem.descriptor.remote.get_consensus()
server_desc_query = stem.descriptor.remote.get_server_descriptors(fingerprint)
extrainfo_query = stem.descriptor.remote.get_extrainfo_descriptors(fingerprint)
router_status_entries = filter(lambda desc: desc.fingerprint == fingerprint, conensus_query.run())
if len(router_status_entries) != 1:
raise IOError("Unable to find relay '%s' in the consensus" % fingerprint)
return (
router_status_entries[0],
server_desc_query.run()[0],
extrainfo_query.run()[0],
)
if __name__ == '__main__':
set_socks_proxy()
fingerprint = input("What relay fingerprint would you like to validate?\n")
print('') # blank line
if not stem.util.tor_tools.is_valid_fingerprint(fingerprint):
print("'%s' is not a valid relay fingerprint" % fingerprint)
sys.exit(1)
try:
router_status_entry, server_desc, extrainfo_desc = download_descriptors(fingerprint)
except Exception as exc:
print(exc)
sys.exit(1)
if router_status_entry.digest == server_desc.digest():
print("Server descriptor digest is correct")
else:
print("Server descriptor digest invalid, expected %s but is %s" % (router_status_entry.digest, server_desc.digest()))
if server_desc.extra_info_digest == extrainfo_desc.digest():
print("Extrainfo descriptor digest is correct")
else:
print("Extrainfo descriptor digest invalid, expected %s but is %s" % (server_desc.extra_info_digest, extrainfo_desc.digest()))

View File

@ -0,0 +1,62 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
import sys
import collections
import stem.descriptor
import stem.descriptor.remote
import stem.directory
from tor_controller import set_socks_proxy
def iMain():
# Query all authority votes asynchronously.
downloader = stem.descriptor.remote.DescriptorDownloader(
document_handler = stem.descriptor.DocumentHandler.DOCUMENT,
)
# An ordered dictionary ensures queries are finished in the order they were
# added.
queries = collections.OrderedDict()
for name, authority in stem.directory.Authority.from_cache().items():
if authority.v3ident is None:
continue # authority doesn't vote if it lacks a v3ident
queries[name] = downloader.get_vote(authority)
# Wait for the votes to finish being downloaded, this produces a dictionary of
# authority nicknames to their vote.
votes = dict((name, query.run()[0]) for (name, query) in queries.items())
# Get a superset of all the fingerprints in all the votes.
all_fingerprints = set()
for vote in votes.values():
all_fingerprints.update(vote.routers.keys())
# Finally, compare moria1's votes to maatuska's votes.
for fingerprint in all_fingerprints:
moria1_vote = votes['moria1'].routers.get(fingerprint)
maatuska_vote = votes['maatuska'].routers.get(fingerprint)
if not moria1_vote and not maatuska_vote:
print("both moria1 and maatuska haven't voted about %s" % fingerprint)
elif not moria1_vote:
print("moria1 hasn't voted about %s" % fingerprint)
elif not maatuska_vote:
print("maatuska hasn't voted about %s" % fingerprint)
elif 'Running' in moria1_vote.flags and 'Running' not in maatuska_vote.flags:
print("moria1 has the Running flag but maatuska doesn't: %s" % fingerprint)
elif 'Running' in maatuska_vote.flags and 'Running' not in moria1_vote.flags:
print("maatuska has the Running flag but moria1 doesn't: %s" % fingerprint)
return 0
if __name__ == '__main__':
set_socks_proxy()
sys.exit(iMain())

46
src/stem_examples/exit_used.py Executable file
View File

@ -0,0 +1,46 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# https://stem.torproject.org/tutorials/examples/exit_used.html
import functools
import sys
import os
from stem import StreamStatus
from stem.control import EventType, Controller
from tor_controller import get_controller
def stream_event(controller, event):
if event.status == StreamStatus.SUCCEEDED and event.circ_id:
circ = controller.get_circuit(event.circ_id)
exit_fingerprint = circ.path[-1][0]
exit_relay = controller.get_network_status(exit_fingerprint)
print("Exit relay for our connection to %s" % (event.target))
print(" address: %s:%i" % (exit_relay.address, exit_relay.or_port))
print(" fingerprint: %s" % exit_relay.fingerprint)
print(" nickname: %s" % exit_relay.nickname)
print(" locale: %s" % controller.get_info("ip-to-country/%s" % exit_relay.address, 'unknown'))
print("")
from tor_controller import get_controller
def main():
print("Please wait for requests for tor exits. Press 'enter' to end.")
print("")
if os.path.exists('/run/tor/control'):
controller = get_controller(unix='/run/tor/control')
else:
controller = get_controller(port=9051)
password = os.environ.get('TOR_CONTROLLER_PASSWORD')
controller.authenticate(password)
stream_listener = functools.partial(stream_event, controller)
controller.add_event_listener(stream_listener, EventType.STREAM)
print('Press Enter')
input() # wait for user to press enter
if __name__ == '__main__':
main()

View File

@ -0,0 +1,55 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
# http://vt5hknv6sblkgf22.onion/tutorials/over_the_river.html
import sys
import os
import getpass
from stem.control import Controller
from tor_controller import get_controller
def iMain(lArgs=None):
if lArgs is None:
lArgs = sys.argv[1:]
try:
if os.path.exists('/run/tor/control'):
controller = get_controller(unix='/run/tor/control')
else:
controller = get_controller(port=9051)
password = os.environ.get('TOR_CONTROLLER_PASSWORD')
controller.authenticate(password)
for elt in lArgs:
desc = controller.get_hidden_service_descriptor(elt, await_result=True, timeout=None)
print(f"{desc} get_hidden_service_descriptor\n")
l = desc.introduction_points()
if l:
print(f"{elt} NO introduction points\n")
continue
print(f"{elt} introduction points are...\n")
for introduction_point in l:
print(' %s:%s => %s' % (introduction_point.address,
introduction_point.port,
introduction_point.identifier))
except Exception as e:
print(e)
finally:
del controller
return 0
lKNOWN_ONIONS = [
'facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd', # facebook
'duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad', # ddg
'zkaan2xfbuxia2wpf7ofnkbz6r5zdbbvxbunvp5g2iebopbfc4iqmbad', # hks
]
if __name__ == '__main__':
if len(sys.argv) <= 1:
lArgs = lKNOWN_ONIONS
else:
lArgs = sys.argv[1:]
sys.exit(iMain())

View File

@ -0,0 +1,44 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# http://vt5hknv6sblkgf22.onion/tutorials/examples/list_circuits.html
import sys
import os
import getpass
from stem import CircStatus
from tor_controller import get_controller
def iMain():
if os.path.exists('/run/tor/control'):
controller = get_controller(unix='/run/tor/control')
else:
controller = get_controller(port=9051)
password = os.environ.get('TOR_CONTROLLER_PASSWORD')
try:
controller.authenticate(password)
for circ in sorted(controller.get_circuits()):
if circ.status != CircStatus.BUILT:
continue
print("")
print("Circuit %s (%s)" % (circ.id, circ.purpose))
for i, entry in enumerate(circ.path):
div = '+' if (i == len(circ.path) - 1) else '|'
fingerprint, nickname = entry
desc = controller.get_network_status(fingerprint, None)
address = desc.address if desc else 'unknown'
print(" %s- %s (%s, %s)" % (div, fingerprint, nickname, address))
except Exception as e:
print(e)
finally:
del controller
return 0
if __name__ == '__main__':
sys.exit( iMain() )

View File

@ -0,0 +1,41 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# https://stem.torproject.org/tutorials/examples/exit_used.html
import functools
import sys
import getpass
import os
from stem import StreamStatus
from stem.control import EventType, Controller
from tor_controller import set_socks_proxy
from tor_controller import get_controller
global LOG
import logging
LOG = logging.getLogger('map_address')
def sMapaddressResolv(target, iPort=9051):
try:
if os.path.exists('/run/tor/control'):
controller = get_controller(unix='/run/tor/control')
else:
controller = get_controller(port=9051)
password = os.environ.get('TOR_CONTROLLER_PASSWORD')
controller.authenticate(password)
map_dict = {"0.0.0.0": target}
map_ret = controller.map_address(map_dict)
return map_ret
except Exception as e:
LOG.exception(e)
if __name__ == '__main__':
if len(sys.argv) < 2:
target = "l2ct3xnuaiwwtoybtn46qp2av4ndxcguwupzyv6xrsmnwi647vvmwtqd"
else:
target = sys.argv[1]
print(sMapaddressResolv(target))

View File

@ -0,0 +1,28 @@
from stem.descriptor.remote import DescriptorDownloader
from stem.version import Version
from tor_controller import set_socks_proxy
def iMain():
set_socks_proxy()
downloader = DescriptorDownloader()
count, with_contact = 0, 0
elts = downloader.get_server_descriptors()
print(f"Checking for outdated relays len server_descriptors={len(list(elts))}...")
print("")
for desc in elts:
if desc.tor_version < Version('0.2.3.0'):
count += 1
if desc.contact:
print(' %-15s %s' % (desc.tor_version, desc.contact.decode("utf-8", "replace")))
with_contact += 1
print("")
print("%i outdated relays found, %i had contact information" % (count, with_contact))
# http://vt5hknv6sblkgf22.onion/tutorials/examples/outdated_relays.htmlhttp://vt5hknv6sblkgf22.onion/tutorials/examples/outdated_relays.html
return 0
if __name__ == '__main__':
sys.exit( iMain())

View File

@ -0,0 +1,154 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# http://vt5hknv6sblkgf22.onion/tutorials/examples/relay_connections.html
import argparse
import collections
import os
import sys
import time
import stem.connection
import stem.util.system
import stem.util.str_tools
from stem.control import Listener
from stem.control import Controller
from stem.util.connection import get_connections, port_usage, is_valid_ipv4_address
from tor_controller import get_controller
global LOG
import logging
LOG = logging.getLogger('relay_cons')
HEADER_LINE = " {version} uptime: {uptime} flags: {flags}\n"
DIV = '+%s+%s+%s+' % ('-' * 30, '-' * 6, '-' * 6)
COLUMN = '| %-28s | %4s | %4s |'
INBOUND_ORPORT = 'Inbound to our ORPort'
INBOUND_DIRPORT = 'Inbound to our DirPort'
INBOUND_CONTROLPORT = 'Inbound to our ControlPort'
OUTBOUND_ORPORT = 'Outbound to a relay'
OUTBOUND_EXIT = 'Outbound exit traffic'
OUTBOUND_UNKNOWN = 'Outbound uncategorized'
def iMain(lArgs=None):
if lArgs is None:
lArgs = sys.argv[1:]
parser = argparse.ArgumentParser()
parser.add_argument("--ctrlport", default=9051, type=int, help="default: 9051")
parser.add_argument("--resolver", help="default: autodetected")
args = parser.parse_args(lArgs)
control_port = int(args.ctrlport) if args.ctrlport else 'default'
if os.path.exists('/run/tor/control'):
controller = get_controller(unix='/run/tor/control')
else:
controller = get_controller(port=control_port)
# controller = stem.connection.connect(control_port = ('127.0.0.1', control_port))
password = os.environ.get('TOR_CONTROLLER_PASSWORD')
controller.authenticate(password)
if not controller:
return 1
desc = controller.get_network_status(default=None)
pid = controller.get_pid()
version = str(controller.get_version()).split()[0],
uptime = stem.util.str_tools.short_time_label(time.time() - stem.util.system.start_time(pid))
print(HEADER_LINE.format(
version=version,
uptime=uptime,
flags = ', '.join(desc.flags if desc else ['none']),
))
policy = controller.get_exit_policy()
relays = {} # address => [orports...]
for desc in controller.get_network_statuses():
relays.setdefault(desc.address, []).append(desc.or_port)
# categorize our connections
categories = collections.OrderedDict((
(INBOUND_ORPORT, []),
(INBOUND_DIRPORT, []),
(INBOUND_CONTROLPORT, []),
(OUTBOUND_ORPORT, []),
(OUTBOUND_EXIT, []),
(OUTBOUND_UNKNOWN, []),
))
exit_connections = {} # port => [connections]
for conn in get_connections(resolver = args.resolver, process_pid = pid):
if conn.protocol == 'udp':
continue
if conn.local_port in controller.get_ports(Listener.OR, []):
categories[INBOUND_ORPORT].append(conn)
elif conn.local_port in controller.get_ports(Listener.DIR, []):
categories[INBOUND_DIRPORT].append(conn)
elif conn.local_port in controller.get_ports(Listener.CONTROL, []):
categories[INBOUND_CONTROLPORT].append(conn)
elif conn.remote_port in relays.get(conn.remote_address, []):
categories[OUTBOUND_ORPORT].append(conn)
elif policy.can_exit_to(conn.remote_address, conn.remote_port):
categories[OUTBOUND_EXIT].append(conn)
exit_connections.setdefault(conn.remote_port, []).append(conn)
else:
categories[OUTBOUND_UNKNOWN].append(conn)
print(DIV)
print(COLUMN % ('Type', 'IPv4', 'IPv6'))
print(DIV)
total_ipv4, total_ipv6 = 0, 0
for label, connections in categories.items():
if len(connections) == 0:
continue
ipv4_count = len([conn for conn in connections if is_valid_ipv4_address(conn.remote_address)])
ipv6_count = len(connections) - ipv4_count
total_ipv4, total_ipv6 = total_ipv4 + ipv4_count, total_ipv6 + ipv6_count
print(COLUMN % (label, ipv4_count, ipv6_count))
print(DIV)
print(COLUMN % ('Total', total_ipv4, total_ipv6))
print(DIV)
print('')
if exit_connections:
print(DIV)
print(COLUMN % ('Exit Port', 'IPv4', 'IPv6'))
print(DIV)
total_ipv4, total_ipv6 = 0, 0
for port in sorted(exit_connections):
connections = exit_connections[port]
ipv4_count = len([conn for conn in connections if is_valid_ipv4_address(conn.remote_address)])
ipv6_count = len(connections) - ipv4_count
total_ipv4, total_ipv6 = total_ipv4 + ipv4_count, total_ipv6 + ipv6_count
usage = port_usage(port)
label = '%s (%s)' % (port, usage) if usage else port
print(COLUMN % (label, ipv4_count, ipv6_count))
print(DIV)
print(COLUMN % ('Total', total_ipv4, total_ipv6))
print(DIV)
print('')
if __name__ == '__main__':
iMain()

View File

@ -0,0 +1,46 @@
#!/usr/bin/python3 -u
## Copyright (C) 2012 - 2020 ENCRYPTED SUPPORT LP <adrelanos@riseup.net>
## See the file COPYING for copying conditions.
import sys
import os
import re
from stem.connection import connect
from tor_controller import get_controller
def iMain(lArgs=None):
password = os.environ.get('TOR_CONTROLLER_PASSWORD')
if os.path.exists('/run/tor/control'):
controller = get_controller(password=password, unix='/run/tor/control')
else:
controller = get_controller(password=password, port=9051)
# controller.connect()
bootstrap_status = controller.get_info("status/bootstrap-phase")
## Possible answer, if network cable has been removed:
## 250-status/bootstrap-phase=WARN BOOTSTRAP PROGRESS=80 TAG=conn_or SUMMARY="Connecting to the Tor network" WARNING="No route to host" REASON=NOROUTE COUNT=26 RECOMMENDATION=warn
## Possible answer:
## 250-status/bootstrap-phase=NOTICE BOOTSTRAP PROGRESS=85 TAG=handshake_or SUMMARY="Finishing handshake with first hop"
## Possible answer, when done:
## 250-status/bootstrap-phase=NOTICE BOOTSTRAP PROGRESS=100 TAG=done SUMMARY="Done"
## TODO: parse the messages above.
print(format(bootstrap_status))
progress_percent = re.match('.* PROGRESS=([0-9]+).*', bootstrap_status)
exit_code = int(progress_percent.group(1))
controller.close()
return exit_code
if __name__ == '__main__':
sys.exit( iMain())

View File

@ -0,0 +1,57 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
import os
import sys
import getpass
import socket
from stem.control import EventType, Controller
from stem.connection import MissingPassword
def set_socks_proxy(SOCKS5_PROXY_HOST='127.0.0.1', SOCKS5_PROXY_PORT=9050):
try:
import socks # you need to install pysocks (see above)
# Remove this if you don't plan to "deactivate" the proxy later
default_socket = socket.socket
# Set up a proxy
socks.set_default_proxy(socks.SOCKS5, SOCKS5_PROXY_HOST, SOCKS5_PROXY_PORT)
socket.socket = socks.socksocket
except ImportError:
return False
return True
def get_controller(password=None, address='127.0.0.1', port=9051, unix='/run/tor/control'):
if unix and os.path.exists(unix):
# print(unix)
controller = Controller.from_socket_file(path=unix)
else:
# print(port)
controller = Controller.from_port(address=address, port=port)
if password is None:
print("DBUG: trying TOR_CONTROLLER_PASSWORD")
password = os.environ.get('TOR_CONTROLLER_PASSWORD')
else:
# print(f"DBUG: using a password {len(password)}")
pass
if not password:
# print("DBUG: trying without a password")
try:
controller.authenticate()
return controller
except MissingPassword as e:
if not password:
sys.stdout.flush()
password = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr)
except Exception as e:
print(f"WARN: {e}")
try:
# print(f"DBUG: using a password {password}")
controller.authenticate(password)
except Exception as e:
print(f"ERROR: {e}")
return controller

View File

@ -0,0 +1,613 @@
#!/usr/bin/env python3
"""
Tor Contact Info Parser - A tool/Python Class for parsing Tor ContactInfo Information Sharing v2 specification contacts
Written by Eran Sandler (https://twitter.com/erans) (C) 2018
Turned into a proper command-line tool with sub-commands and flags by @Someguy123 at Privex Inc. (C) 2021
(https://www.privex.io) (https://github.com/PrivexInc)
This is a parser for the Tor ContactInfo Information Sharing Specification v2 (https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/).
The parser can parse the ContactInfo field of Tor relays based on the specification.
Official Repo: https://github.com/erans/torcontactinfoparser
Privex Fork: https://github.com/Privex/torcontactinfoparser
Released under the MIT License.
"""
import argparse
import os
import re
import sys
import json
import requests
import textwrap
try:
from rich import print as rprint
HAS_RICH = True
except ImportError:
def rprint(value='', *args, **kwargs):
if value not in [None, False, True] and isinstance(value, (dict, list, set, tuple)):
value = json.dumps(value, indent=4)
return print(value, *args, **kwargs)
# rprint = print
HAS_RICH = False
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
class TorContactInfoParser(object):
email_regex = "^[a-zA-Z0-9.!#$%&*+/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\\.[a-zA-Z0-9-]+)*$"
def _parse_string_value(self, value, min_length, max_length, valid_chars, raise_exception=False, field_name=None, deobfuscate_email=False):
value_length = len(value)
if value_length < min_length:
if raise_exception:
raise ValueError("value of field '{0}' is too short".format(field_name))
return None
if value_length > max_length:
if raise_exception:
raise ValueError("value of field '{0}' is too long".format(field_name))
return None
if valid_chars != "*":
m = re.search(valid_chars, value)
if not m:
if raise_exception:
raise ValueError("value of field '{0}' doesn't match valid chars restrictions".format(field_name))
else:
return None
return value
def _parse_email_value(self, value, field_name, raise_exception, deobfuscate_email):
if value:
v = value.replace("[]", "@")
if re.search(self.email_regex, v):
if not deobfuscate_email:
return v.replace("@", "[]")
return v
return None
_supported_fields_parsers = {
"email" : {
"fn": _parse_email_value,
"args": {}
},
"url" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 4,
"max_length" : 399,
"valid_chars" : "[_%/:a-zA-Z0-9.-]+"
}
},
"proof" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 7,
"max_length" : 7,
"valid_chars" : "[adinrsu-]+"
}
},
"ciissversion" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[12]+"
}
},
"pgp" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 40,
"max_length" : 40,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"abuse" : {
"fn": _parse_email_value,
"args": {}
},
"keybase" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 50,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"twitter" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 15,
"valid_chars" : "[a-zA-Z0-9_]+"
}
},
"mastodon" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 254,
"valid_chars" : "*"
}
},
"matrix" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 254,
"valid_chars" : "*"
}
},
"xmpp" : {
"fn": _parse_email_value,
"args": {}
},
"otr3" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 40,
"max_length" : 40,
"valid_chars" : "[a-z0-9]+"
}
},
"hoster" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 254,
"valid_chars" : "[a-zA-Z0-9.-]+"
}
},
"cost" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 13,
"valid_chars" : "[A-Z0-9.]+"
}
},
"uplinkbw" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 7,
"valid_chars" : "[0-9]+"
}
},
"trafficacct" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 9,
"valid_chars" : "[unmetrd0-9]+"
}
},
"memory" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 10,
"valid_chars" : "[0-9]+"
}
},
"cpu" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 50,
"valid_chars" : "[a-zA-Z0-9_-]+"
}
},
"virtualization" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 15,
"valid_chars" : "[a-z-]+"
}
},
"donationurl" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 254,
"valid_chars" : "*"
}
},
"btc" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 26,
"max_length" : 99,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"zec" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 95,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"xmr" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 99,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"offlinemasterkey" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"signingkeylifetime" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 6,
"valid_chars" : "[0-9]+"
}
},
"sandbox" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 2,
"valid_chars" : "[yn]"
}
},
"os" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 20,
"valid_chars" : "[A-Za-z0-9/.]+"
}
},
"tls" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 14,
"valid_chars" : "[a-z]+"
}
},
"aesni" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"autoupdate" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"confmgmt" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 15,
"valid_chars" : "[a-zA-Z-]"
}
},
"dnslocation" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 5,
"max_length" : 100,
"valid_chars" : "[a-z,]"
}
},
"dnsqname" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"dnssec" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"dnslocalrootzone" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
}
}
def __init__(self):
pass
def parse(self, value: str, raise_exception_on_invalid_value=False, deobfuscate_email=True) -> dict:
# the ciissversion field is mandatory
if not 'ciissversion:' in value:
return None
result = {}
parts = value.split(" ")
for p in parts:
field_parts = p.split(":", 1)
if len(field_parts) <= 1:
continue
name, data = field_parts
if name in self._supported_fields_parsers:
field_parser = self._supported_fields_parsers[name]
if field_parser is None:
result[name] = data
continue
if callable(field_parser):
value = field_parser(self, data)
else:
field_parser["args"]["field_name"] = name
field_parser["args"]["value"] = data
field_parser["args"]["raise_exception"] = raise_exception_on_invalid_value
field_parser["args"]["deobfuscate_email"] = deobfuscate_email
value = field_parser["fn"](self, **field_parser["args"])
if not result.get(name, None):
result[name] = value
return result
def cmd_parse(opts: argparse.Namespace):
"""
ArgParser function for parsing a single ContactInfo string, and outputting it as JSON (or python-style dict's)
"""
if opts.contact is None or len(opts.contact) == 0 or opts.contact[0] == '-':
contact = sys.stdin.read()
else:
contact = ' '.join(opts.contact).strip()
tparser = TorContactInfoParser()
res = tparser.parse(contact)
if not opts.pretty:
return print(json.dumps(res))
if opts.json:
res = json.dumps(res, indent=4) if opts.pretty else json.dumps(res)
# if not HAS_RICH: res = json.dumps(res, indent=4)
rprint(res)
def vsetup_logging(log_level, logfile='', stream=sys.stderr):
global LOG
add = True
try:
if 'COLOREDLOGS_LEVEL_STYLES' not in os.environ:
os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red'
# https://pypi.org/project/coloredlogs/
import coloredlogs
except ImportError:
coloredlogs = False
logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S')
logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S'
logging._defaultFormatter.default_msec_format = ''
kwargs = dict(level=log_level,
force=True,
format='%(levelname)s %(message)s')
if logfile:
add = logfile.startswith('+')
sub = logfile.startswith('-')
if add or sub:
logfile = logfile[1:]
kwargs['filename'] = logfile
if coloredlogs:
# https://pypi.org/project/coloredlogs/
aKw = dict(level=log_level,
logger=LOG,
stream=stream,
fmt='%(levelname)s %(message)s'
)
coloredlogs.install(**aKw)
if logfile:
oHandler = logging.FileHandler(logfile)
LOG.addHandler(oHandler)
LOG.info(f"CSetting log_level to {log_level} {stream}")
else:
logging.basicConfig(**kwargs)
if add and logfile:
oHandler = logging.StreamHandler(stream)
LOG.addHandler(oHandler)
LOG.info(f"SSetting log_level to {log_level!s}")
def cmd_scan(opts: argparse.Namespace, adata=None):
"""
ArgParser function for scanning all ContactInfo strings from ``https://onionoo.torproject.org/details`` ,
and outputting each one as a Python-style Dict, or JSON.
"""
parser = TorContactInfoParser()
surl = "https://onionoo.torproject.org/details"
if not adata:
LOG.info(f"Getting relays from {surl}")
jdata = requests.get(surl)
try:
adata = jdata.json()
except Exception as e:
# simplejson.errors.JSONDecodeError
LOG.exception(f"JSON error {e}")
return
elts = adata["relays"]
else:
elts = json.loads(adata)['relays']
if not elts:
LOG.warn(f"NO relays - are we connected?")
return
LOG.info(f"{len(elts)} relays")
for relay in elts:
if 'fingerprint' not in relay.keys():
LOG.warn(f"fingerprint not in relay for {relay}")
continue
fp = relay['fingerprint']
verified_host_names = relay.get('verified_host_names', [])
contact = relay.get("contact", None)
if not contact:
LOG.warn(f"No contact for {fp} {verified_host_names}")
continue
if 'ciissversion' not in contact:
LOG.debug(f"No ciissversion in contact in {fp}")
continue
LOG.debug(f"parsing {fp}")
result = parser.parse(contact, False)
if not result:
LOG.warn(f"No result for {contact} in {fp}")
continue
if len(result) > 0:
if opts.json: result = json.dumps(result, indent=4) if opts.pretty else json.dumps(result)
if opts.pretty:
rprint(result)
else:
print(result)
ETC_DIR = '/etc/tor/yaml'
def oparser():
cparser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent(f"""
Examples:
# 'scan' is the original behaviour of this script. It iterates over the data
# from https://onionoo.torproject.org/details , parses each contact, and prints it as Python dict-style JSON.
{sys.argv[0]} scan
# Same as previous. With no arguments, it's equivalent to running 'scan'.
{sys.argv[0]}
# If you pass '-p' after scan, it will enable pretty printing. For best pretty printing,
# make sure you have 'rich' installed from pypi.
{sys.argv[0]} scan -p
# If you need real JSON with double quotes, rather than Python dict-style JSON, you can
# use the '-j' flag to enable "real JSON" mode (you can combine with '-p' if you want pretty printed real json)
{sys.argv[0]} scan -j
# Using 'parse', you can parse an arbitrary ContactInfo string, and it will output the parsed result
# with pretty printing by default.
{sys.argv[0]} parse "contact Privex Inc. email:noc[]privex.io url:https://www.privex.io " \\
"proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc hoster:www.privex.io " \\
"uplinkbw:500 memory:4096 virtualization:kvm btc:bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2 " \\
"xmr:89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps"
{{
'email': 'noc@privex.io',
'url': 'https://www.privex.io',
'proof': 'uri-rsa',
'pgp': None,
'keybase': 'privexinc',
'twitter': 'PrivexInc',
'hoster': 'www.privex.io',
'uplinkbw': '500',
'memory': '4096',
'virtualization': 'kvm',
'btc': 'bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2',
'xmr': '89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps'
}}
# You can also pipe a contact string into 'parse', and it will work just the same.
echo "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc" | {sys.argv[0]} parse
{{'email': 'noc@privex.io', 'url': 'https://www.privex.io', 'proof': 'uri-rsa', 'pgp': None, 'keybase': 'privexinc', 'twitter': 'PrivexInc\n'}}
# If you need real JSON outputted, rather than Python dict-style output, you can pass -j to either 'parse' or 'scan'
{sys.argv[0]} parse -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
{{
"email": "noc@privex.io",
"url": "https://www.privex.io",
"proof": "uri-rsa",
"pgp": null,
"keybase": "privexinc",
"twitter": "PrivexInc"
}}
# You can use '-np' to disable pretty printing for 'parse' - you can combine it with '-j' to get flat, plain JSON.
{sys.argv[0]} parse -np -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
{{"email": "noc@privex.io", "url": "https://www.privex.io", "proof": "uri-rsa", "pgp": null, "keybase": "privexinc", "twitter": "PrivexInc"}}
"""))
cparser.set_defaults(func=cmd_scan, json=False, pretty=False)
subparse = cparser.add_subparsers()
subparse.required = False
sp_parse = subparse.add_parser('parse',
help="Parse a single contact string, either as an argument, or piped into stdin")
sp_parse.add_argument('contact', nargs='*')
sp_parse.add_argument('-np', '--no-pretty',
action='store_false', default=False, dest='pretty',
help="Disable pretty printing JSON")
sp_parse.add_argument('--relays_output', type=str,
dest='relays_output',
default=os.path.join(ETC_DIR, 'relays.json'),
help="Write the download relays in json to a file")
sp_parse.add_argument('-j', '--json', action='store_true',
default=False, dest='json',
help="Output real JSON, not Python dict format.")
sp_parse.set_defaults(func=cmd_parse)
sp_scan = subparse.add_parser('scan', help="Parse all contacts from https://onionoo.torproject.org/details")
sp_scan.add_argument('-p', action='store_true', default=False, dest='pretty', help="Enable pretty printing JSON")
sp_scan.add_argument('-j', '--json', action='store_true', default=False, dest='json', help="Output real JSON, not Python dict format.")
# sp_scan.set_defaults(func=cmd_scan)
return cparser
if __name__ == "__main__":
if os.environ.get('DEBUG', ''):
log_level = 10
else:
log_level = 20
vsetup_logging(log_level)
try:
cparser = oparser()
opts = cparser.parse_args(sys.argv[1:])
data = None
if opts.relays_output and os.path.exists(opts.relays_output):
data = open(opts.relays_output, 'rt').read()
cmd_scan(opts, data)
except (requests.exceptions.ProxyError, Exception,) as e:
LOG.error(f"{e}")
i = 0
# else:
# args = cparser.parse_args(sys.argv[1:])
# i = args.func(args)
sys.exit(i)