stem_examples/src/stem_examples/check_digests.py

133 lines
4.3 KiB
Python
Executable File

# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
__doc__ = """
Checking Descriptor Digests
Tor relay information is provided by multiple documents. Signed descriptors transitively validate others by inclusion of their digest. For example, our consensus references server descriptor digest, and server descriptors in turn cite extrainfo digests.
Stem can calculate digests from server, extrainfo, microdescriptor, and
consensus documents. For instance, to validate an extrainfo descriptor...
https://stem.torproject.org/tutorials/examples/check_digests.html """
import os
import sys
import contextlib
import logging
from stem_examples.tor_controller import set_socks_proxy
from stem_examples.stem_utils import vsetup_logging
LOG = logging.getLogger()
@contextlib.contextmanager
def ignoreStdout() -> None:
devnull = os.open(os.devnull, os.O_WRONLY)
old_stdout = os.dup(1)
sys.stdout.flush()
os.dup2(devnull, 1)
os.close(devnull)
try:
yield
finally:
os.dup2(old_stdout, 1)
os.close(old_stdout)
def download_descriptors(fingerprint):
"""
Downloads the descriptors we need to validate this relay. Downloads are
parallelized, providing the caller with a tuple of the form...
(router_status_entry, server_descriptor, extrainfo_descriptor)
"""
conensus_query = stem.descriptor.remote.get_consensus()
server_desc_query = stem.descriptor.remote.get_server_descriptors(fingerprint)
extrainfo_query = stem.descriptor.remote.get_extrainfo_descriptors(fingerprint)
router_status_entries = filter(lambda desc: desc.fingerprint == fingerprint, conensus_query.run())
if len(router_status_entries) != 1:
raise IOError("Unable to find relay '%s' in the consensus" % fingerprint)
return (
router_status_entries[0],
server_desc_query.run()[0],
extrainfo_query.run()[0],
)
def iMain(lArgs=None):
global LOG
# set_socks_proxy()
iRetval = 0
if lArgs is None:
fingerprint = input("What relay fingerprint would you like to validate?\n")
print('') # blank line
lArgs = [fingerprint]
for fingerprint in lArgs:
LOG.info(f"checking digests of fp={fp}")
if not stem.util.tor_tools.is_valid_fingerprint(fingerprint):
LOG.error("'%s' is not a valid relay fingerprint" % fingerprint)
iRetval += 1
continue
try:
router_status_entry, server_desc, extrainfo_desc = download_descriptors(fingerprint)
except Exception as exc:
LOG.exception(f"Exception in download_descriptors {exc}")
iRetval += 1
continue
if router_status_entry.digest == server_desc.digest():
LOG.info("Server descriptor digest is correct")
else:
LOG.warn("Server descriptor digest invalid, expected %s but is %s" % (
router_status_entry.digest, server_desc.digest()))
if server_desc.extra_info_digest == extrainfo_desc.digest():
LOG.info("Extrainfo descriptor digest is correct")
else:
LOG.warn("Extrainfo descriptor digest invalid, expected %s but is %s" % (
server_desc.extra_info_digest, extrainfo_desc.digest()))
return iRetval
if __name__ == '__main__':
if os.environ.get('DEBUG', ''):
log_level = 10
else:
log_level = 20
vsetup_logging(LOG, log_level)
try:
logging.getLogger('stem').setLevel(20)
import stem.descriptor.remote
import stem.util.tor_tools
# bizarre uncatchable stem error
import stem.response.protocolinfo
import stem.response.mapaddress
import stem.response.getconf
import stem.response.getinfo
import stem.response.authchallenge
# 'tuple' object has no attribute 'endswith'
if len(sys.argv) > 1:
lArgs = sys.argv[1:]
LOG.info(f"Getting some {len(lArgs)}")
else:
sKNOWN_ONION = 'facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd' # facebook
LOG.info("Getting some FPs from a sKNOWN_ONION")
from stem_examples.introduction_points import lMain
with ignoreStdout():
lArgs = lMain([sKNOWN_ONION])
LOG.info(f"Got {len(lArgs)} FPs from a sKNOWN_ONION")
i = iMain(lArgs)
except KeyboardInterrupt as e:
i = 0
except Exception as e:
LOG.exception(f"Exception in iMain {e}")
i = 1
sys.exit(i)