#!/usr/local/bin/python3.sh # -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*- # https://stem.torproject.org/tutorials/examples/relay_connections.html __doc__ = """Connection Summary The following provides a summary of your relay's inbound and outbound connections. To use this you must set DisableDebuggerAttachment 0 in your torrc. Otherwise connection information will be unavailable. """ import argparse import collections import os import sys import time import stem.connection import stem.util.system import stem.util.str_tools from stem.control import Listener, Controller from stem.util.connection import get_connections, port_usage, is_valid_ipv4_address from stem_examples.tor_controller import get_controller import logging global LOG LOG = logging.getLogger() HEADER_LINE = " {version} uptime: {uptime} flags: {flags}\n" DIV = '+%s+%s+%s+' % ('-' * 30, '-' * 6, '-' * 6) COLUMN = '| %-28s | %4s | %4s |' INBOUND_ORPORT = 'Inbound to our ORPort' INBOUND_DIRPORT = 'Inbound to our DirPort' INBOUND_CONTROLPORT = 'Inbound to our ControlPort' OUTBOUND_ORPORT = 'Outbound to a relay' OUTBOUND_EXIT = 'Outbound exit traffic' OUTBOUND_UNKNOWN = 'Outbound uncategorized' def iMain(lArgs=None): if lArgs is None: lArgs = sys.argv[1:] parser = argparse.ArgumentParser() parser.add_argument("--ctrlport", default=9051, type=int, help="default: 9051") parser.add_argument("--resolver", help="default: autodetected") args = parser.parse_args(lArgs) password = os.environ.get('TOR_CONTROLLER_PASSWORD', '') control_port = int(args.ctrlport) if args.ctrlport else 'default' if False and os.path.exists('/run/tor/control'): controller = get_controller(password=password, unix='/run/tor/control') else: controller = get_controller(password=password, port=control_port) if not controller: return 1 desc = controller.get_network_status(default=None) pid = controller.get_pid() version = str(controller.get_version()).split()[0], uptime = stem.util.str_tools.short_time_label(time.time() - stem.util.system.start_time(pid)) LOG.info(HEADER_LINE.format( version=version, uptime=uptime, flags = ', '.join(desc.flags if desc else ['none']), )) policy = controller.get_exit_policy() relays = {} # address => [orports...] for desc in controller.get_network_statuses(): relays.setdefault(desc.address, []).append(desc.or_port) # categorize our connections categories = collections.OrderedDict(( (INBOUND_ORPORT, []), (INBOUND_DIRPORT, []), (INBOUND_CONTROLPORT, []), (OUTBOUND_ORPORT, []), (OUTBOUND_EXIT, []), (OUTBOUND_UNKNOWN, []), )) exit_connections = {} # port => [connections] for conn in get_connections(resolver = args.resolver, process_pid = pid): if conn.protocol == 'udp': continue if conn.local_port in controller.get_ports(Listener.OR, []): categories[INBOUND_ORPORT].append(conn) elif conn.local_port in controller.get_ports(Listener.DIR, []): categories[INBOUND_DIRPORT].append(conn) elif conn.local_port in controller.get_ports(Listener.CONTROL, []): categories[INBOUND_CONTROLPORT].append(conn) elif conn.remote_port in relays.get(conn.remote_address, []): categories[OUTBOUND_ORPORT].append(conn) elif policy.can_exit_to(conn.remote_address, conn.remote_port): categories[OUTBOUND_EXIT].append(conn) exit_connections.setdefault(conn.remote_port, []).append(conn) else: categories[OUTBOUND_UNKNOWN].append(conn) print(DIV) print(COLUMN % ('Type', 'IPv4', 'IPv6')) print(DIV) total_ipv4, total_ipv6 = 0, 0 for label, connections in categories.items(): if len(connections) == 0: continue ipv4_count = len([conn for conn in connections if is_valid_ipv4_address(conn.remote_address)]) ipv6_count = len(connections) - ipv4_count total_ipv4, total_ipv6 = total_ipv4 + ipv4_count, total_ipv6 + ipv6_count print(COLUMN % (label, ipv4_count, ipv6_count)) print(DIV) print(COLUMN % ('Total', total_ipv4, total_ipv6)) print(DIV) print('') if exit_connections: print(DIV) print(COLUMN % ('Exit Port', 'IPv4', 'IPv6')) print(DIV) total_ipv4, total_ipv6 = 0, 0 for port in sorted(exit_connections): connections = exit_connections[port] ipv4_count = len([conn for conn in connections if is_valid_ipv4_address(conn.remote_address)]) ipv6_count = len(connections) - ipv4_count total_ipv4, total_ipv6 = total_ipv4 + ipv4_count, total_ipv6 + ipv6_count usage = port_usage(port) label = '%s (%s)' % (port, usage) if usage else port print(COLUMN % (label, ipv4_count, ipv6_count)) print(DIV) print(COLUMN % ('Total', total_ipv4, total_ipv6)) print(DIV) print('') if __name__ == '__main__': from stem_examples.stem_utils import vsetup_logging LOG = logging.getLogger() if os.environ.get('DEBUG', ''): log_level = 10 else: log_level = 20 vsetup_logging(LOG, log_level) LOG.setLevel(logging.DEBUG) try: l = iMain([]) if l: print(l) i = 0 except KeyboardInterrupt as e: i = 0 except Exception as e: LOG.exception(f"Exception {e}") i = 1 sys.exit(i)