improvements

This commit is contained in:
emdee@spm.plastiras.org 2024-01-16 14:35:29 +00:00
parent dbe62ffbd7
commit 42cf62f623
15 changed files with 175 additions and 118 deletions

3
.gitignore vendored
View File

@ -4,6 +4,9 @@ __pycache__/
*.py[cod]
*$py.class
*~
*.dst
# C extensions
*.so

View File

@ -17,6 +17,17 @@ requires a password.
### check_digests
Checking Descriptor Digests
Tor relay information is provided by multiple documents. Signed
descriptors transitively validate others by inclusion of their
digest. For example, our consensus references server descriptor
digest, and server descriptors in turn cite extrainfo digests.
Stem can calculate digests from server, extrainfo, microdescriptor, and consensus documents. For instance, to validate an extrainfo descriptor...
https://stem.torproject.org/tutorials/examples/check_digests.html
### compare_flags Comparing Directory Authority Flags
Compares the votes of two directory authorities, in this case moria1

View File

@ -1,8 +1,7 @@
[project]
name = "stem_examples"
version = "2023.12"
description = "examples of using stem"
authors = [{ name = "emdee", email = "Ingvar@gitgub.com" } ]
authors = [{ name = "emdee", email = "emdee@spm.plastiras.org" } ]
requires-python = ">=3.6"
dependencies = [
'stem',
@ -21,8 +20,20 @@ classifiers = [
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: Implementation :: CPython",
]
#
dynamic = ["version", "readme", ] # cannot be dynamic ['license']
scripts = { exclude_badExits = "stem_examples.exclude_badExits:iMain" }
[project.scripts]
check_digests = "stem_examples.check_digests:iMain"
compare_flags = "stem_examples.compare_flags:iMain"
exit_used = "stem_examples.exit_used:iMain"
introduction_points = "stem_examples.introduction_points:iMain"
list_circuits = "stem_examples.list_circuits:iMain"
mappaddress = "stem_examples.mappaddress:iMain"
outdated_relays = "stem_examples.outdated_relays:iMain"
relay_connections = "stem_examples.relay_connections:iMain"
tor_bootstrap_check = "stem_examples.tor_bootstrap_check:iMain"
torcontactinfo = "stem_examples.torcontactinfo:iMain"
#[project.license]
#file = "LICENSE.md"
@ -36,7 +47,7 @@ build-backend = "setuptools.build_meta"
[tool.setuptools.dynamic]
version = {attr = "stem_examples.__version__"}
readme = {file = ["README.md", "exclude_badExits.md"]}
readme = {file = ["README.md", "stem_examples.txt"]}
[tool.setuptools]
packages = ["stem_examples"]

View File

@ -0,0 +1 @@
__version__ = "1.0.0"

View File

@ -1,12 +1,37 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# http://vt5hknv6sblkgf22.onion/tutorials/examples/check_digests.html
import sys
__doc__ = """
Checking Descriptor Digests
Tor relay information is provided by multiple documents. Signed descriptors transitively validate others by inclusion of their digest. For example, our consensus references server descriptor digest, and server descriptors in turn cite extrainfo digests.
Stem can calculate digests from server, extrainfo, microdescriptor, and
consensus documents. For instance, to validate an extrainfo descriptor...
https://stem.torproject.org/tutorials/examples/check_digests.html """
import os
import sys
import contextlib
import logging
import stem.descriptor.remote
import stem.util.tor_tools
from tor_controller import set_socks_proxy
LOG = logging.getLogger()
@contextlib.contextmanager
def ignoreStdout() -> None:
devnull = os.open(os.devnull, os.O_WRONLY)
old_stdout = os.dup(1)
sys.stdout.flush()
os.dup2(devnull, 1)
os.close(devnull)
try:
yield
finally:
os.dup2(old_stdout, 1)
os.close(old_stdout)
def download_descriptors(fingerprint):
"""
Downloads the descriptors we need to validate this relay. Downloads are
@ -30,27 +55,73 @@ def download_descriptors(fingerprint):
extrainfo_query.run()[0],
)
if __name__ == '__main__':
set_socks_proxy()
fingerprint = input("What relay fingerprint would you like to validate?\n")
print('') # blank line
def iMain(lArgs=None):
# set_socks_proxy()
iRetval = 0
if lArgs is None:
fingerprint = input("What relay fingerprint would you like to validate?\n")
print('') # blank line
lArgs = [fingerprint]
if not stem.util.tor_tools.is_valid_fingerprint(fingerprint):
print("'%s' is not a valid relay fingerprint" % fingerprint)
sys.exit(1)
for fingerprint in lArgs:
log.INFO(f"checking digests of fp={fp}")
if not stem.util.tor_tools.is_valid_fingerprint(fingerprint):
LOG.error("'%s' is not a valid relay fingerprint" % fingerprint)
iRetval += 1
continue
try:
router_status_entry, server_desc, extrainfo_desc = download_descriptors(fingerprint)
except Exception as exc:
LOG.exception(f"Exception in download_descriptors {exc}")
iRetval += 1
continue
if router_status_entry.digest == server_desc.digest():
LOG.info("Server descriptor digest is correct")
else:
LOG.warn("Server descriptor digest invalid, expected %s but is %s" % (
router_status_entry.digest, server_desc.digest()))
if server_desc.extra_info_digest == extrainfo_desc.digest():
LOG.info("Extrainfo descriptor digest is correct")
else:
LOG.warn("Extrainfo descriptor digest invalid, expected %s but is %s" % (
server_desc.extra_info_digest, extrainfo_desc.digest()))
return iRetval
if __name__ == '__main__':
LOG.setLevel(logging.DEBUG)
try:
router_status_entry, server_desc, extrainfo_desc = download_descriptors(fingerprint)
except Exception as exc:
print(exc)
sys.exit(1)
import stem.descriptor.remote
import stem.util.tor_tools
logging.getLogger('stem').setLevel(20)
# bizarre uncatchable stem error
import stem.response.protocolinfo
import stem.response.mapaddress
import stem.response.getconf
import stem.response.getinfo
import stem.response.authchallenge
# 'tuple' object has no attribute 'endswith'
if len(sys.argv) > 1:
lArgs = sys.argv[1:]
LOG.info(f"Getting some {len(lArgs)}")
else:
sKNOWN_ONION = 'facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd', # facebook
LOG.info("Getting some FPs from a sKNOWN_ONION")
from stem_examples.introduction_points import lMain
# with ignoreStdout():
lArgs = lMain([sKNOWN_ONION])
LOG.debug(f"Got {len(lArgs)} FPs from a sKNOWN_ONION")
i = iMain(lArgs)
except KeyboardInterrupt as e:
i = 0
except Exception as e:
LOG.exception(f"Exception in iMain {e}")
i = 1
sys.exit(i)
if router_status_entry.digest == server_desc.digest():
print("Server descriptor digest is correct")
else:
print("Server descriptor digest invalid, expected %s but is %s" % (router_status_entry.digest, server_desc.digest()))
if server_desc.extra_info_digest == extrainfo_desc.digest():
print("Extrainfo descriptor digest is correct")
else:
print("Extrainfo descriptor digest invalid, expected %s but is %s" % (server_desc.extra_info_digest, extrainfo_desc.digest()))

View File

@ -1,5 +1,11 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
#
__doc__ = """
Compares the votes of two directory authorities, in this case moria1
and maatuska, with a special interest in the 'Running' flag.
https://stem.torproject.org/tutorials/examples/compare_flags.html
"""
import sys
import collections

View File

@ -14,7 +14,7 @@ import os
from stem import StreamStatus
from stem.control import EventType, Controller
from tor_controller import get_controller
from stem_examples.tor_controller import get_controller
def stream_event(controller, event):
if event.status == StreamStatus.SUCCEEDED and event.circ_id:
@ -30,7 +30,6 @@ def stream_event(controller, event):
print(" locale: %s" % controller.get_info("ip-to-country/%s" % exit_relay.address, 'unknown'))
print("")
from tor_controller import get_controller
def iMain():
print("Please wait for requests for tor exits. Press 'enter' to end.")

View File

@ -1,38 +0,0 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
#http://vt5hknv6sblkgf22.onion/tutorials/examples/exit_used.html
import functools
from stem import StreamStatus
from stem.control import EventType, Controller
def main():
print("Tracking requests for tor exits. Press 'enter' to end.")
print("")
with Controller.from_port() as controller:
controller.authenticate()
stream_listener = functools.partial(stream_event, controller)
controller.add_event_listener(stream_listener, EventType.STREAM)
raw_input() # wait for user to press enter
def stream_event(controller, event):
if event.status == StreamStatus.SUCCEEDED and event.circ_id:
circ = controller.get_circuit(event.circ_id)
exit_fingerprint = circ.path[-1][0]
exit_relay = controller.get_network_status(exit_fingerprint)
print("Exit relay for our connection to %s" % (event.target))
print(" address: %s:%i" % (exit_relay.address, exit_relay.or_port))
print(" fingerprint: %s" % exit_relay.fingerprint)
print(" nickname: %s" % exit_relay.nickname)
print(" locale: %s" % controller.get_info("ip-to-country/%s" % exit_relay.address, 'unknown'))
print("")
if __name__ == '__main__':
main()

View File

@ -13,27 +13,37 @@ Tor2web provides a quick and easy way of seeing if your hidden service is workin
This script tests if you can reach a hidden service, passed as an onion address
as an argument. If no argument is given, 3 common onion sites are tested:
Facebook, DuckDuckGo, and .
Facebook, DuckDuckGo.
"""
import sys
import os
import getpass
import logging
from stem.control import Controller
from tor_controller import get_controller
from stem_examples.tor_controller import get_controller
LOG = logging.getLogger()
lKNOWN_ONIONS = [
'facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd', # facebook
'duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad', # ddg
'zkaan2xfbuxia2wpf7ofnkbz6r5zdbbvxbunvp5g2iebopbfc4iqmbad', # hks
]
def iMain(lArgs=None):
lRetval = lMain(lArgs)
if lRetval is None:
return -1
return 0
def lMain(lArgs=None):
lRetval = []
if lArgs is None:
lArgs = sys.argv[1:]
lArgs = lKNOWN_ONIONS
try:
if os.path.exists('/run/tor/control'):
controller = get_controller(unix='/run/tor/control')
@ -53,6 +63,7 @@ def iMain(lArgs=None):
print(f"{elt} introduction points are...\n")
for introduction_point in l:
lRetval += [introduction_point]
print(' %s:%s => %s' % (introduction_point.address,
introduction_point.port,
introduction_point.identifier))
@ -61,12 +72,15 @@ def iMain(lArgs=None):
print(e)
finally:
del controller
return 0
return lRetval
if __name__ == '__main__':
if len(sys.argv) <= 1:
lArgs = lKNOWN_ONIONS
else:
lArgs = sys.argv[1:]
sys.exit(iMain())
LOG.setLevel(logging.INFO)
try:
i = iMain(sys.argv[1:])
except KeyboardInterrupt as e:
i = 0
except Exception as e:
i = 1
sys.exit(i)

View File

@ -1,11 +0,0 @@
from stem.control import Controller
with Controller.from_port(port = 9051) as controller:
controller.authenticate()
desc = controller.get_hidden_service_descriptor('3g2upl4pq6kufc4m')
print("DuckDuckGo's introduction points are...\n")
for introduction_point in desc.introduction_points():
print(' %s:%s => %s' % (introduction_point.address, introduction_point.port, introduction_point.identifier))
# http://vt5hknv6sblkgf22.onion/tutorials/over_the_river.html

View File

@ -1,25 +0,0 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# http://vt5hknv6sblkgf22.onion/tutorials/examples/list_circuits.html
from stem import CircStatus
from stem.control import Controller
# port(port = 9051)
with Controller.from_socket_file(path='/var/run/tor/control') as controller:
controller.authenticate()
for circ in sorted(controller.get_circuits()):
if circ.status != CircStatus.BUILT:
continue
print("")
print("Circuit %s (%s)" % (circ.id, circ.purpose))
for i, entry in enumerate(circ.path):
div = '+' if (i == len(circ.path) - 1) else '|'
fingerprint, nickname = entry
desc = controller.get_network_status(fingerprint, None)
address = desc.address if desc else 'unknown'
print(" %s- %s (%s, %s)" % (div, fingerprint, nickname, address))

View File

@ -9,7 +9,7 @@ import os
from stem import StreamStatus
from stem.control import EventType, Controller
from tor_controller import set_socks_proxy
from tor_controller import get_controller
from stem_examples.tor_controller import get_controller
global LOG
import logging

View File

@ -1,12 +1,27 @@
#!/usr/local/bin/python3.sh
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
__doc__ = """List Outdated Relays
Time marches on. Tor makes new releases, and at some point needs to drop
support for old ones. Below is the script we used on ticket 9476 to reach out
to relay operators that needed to upgrade.
https://stem.torproject.org/tutorials/examples/outdated_relays.html
"""
import sys
import logging
from stem.descriptor.remote import DescriptorDownloader
from stem.version import Version
from tor_controller import set_socks_proxy
LOG = logging.getLogger()
def iMain():
set_socks_proxy()
downloader = DescriptorDownloader()
downloader = DescriptorDownloader(use_mirrors=True)
count, with_contact = 0, 0
elts = downloader.get_server_descriptors()
print(f"Checking for outdated relays len server_descriptors={len(list(elts))}...")

View File

@ -24,7 +24,7 @@ from stem.control import Listener
from stem.control import Controller
from stem.util.connection import get_connections, port_usage, is_valid_ipv4_address
from tor_controller import get_controller
from stem_examples.tor_controller import get_controller
global LOG
import logging

View File

@ -14,7 +14,7 @@ import os
import re
from stem.connection import connect
from tor_controller import get_controller
from stem_examples.tor_controller import get_controller
def iMain(lArgs=None):
password = os.environ.get('TOR_CONTROLLER_PASSWORD')