# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- # # @file tests.py # @author Wei-Ning Huang (AZ) # # Copyright (C) 2013 - 2014 Wei-Ning Huang (AZ) # All Rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # """Originaly from https://github.com/oxij/PyTox c-toxcore-02 branch which itself was forked from https://github.com/aitjcize/PyTox/ Modified to work with toxygen_wrapper Unlike almost all of the c-toxcore ctests, these are real tests that take place over and Internet connection. These tests create the alice and bob Toxes for each testcase. We could do it once for the testsuite but we are testing a ctypes wrapper and what we think we've seen is errors in the wrapper can corrupt memory that shows as a SEGV but not nesessarily right-away: could be a little later. So for cleanliness and purity we remake the Toxes, which means we have to wait in each test to get connected on tests that require connectivity, which can be slow over tor: ~40 sec. is not unusual, but less for directly connected. The other advantage of a fresh tox each time is that the tests are more reproducible and comparable. So typically this testsuite takes ~1000 sec. direct and 1500 sec. over Tor, but Tor can have bad weeks so these Tor times could double or more. We should consider reusing a tox profile between testcases to cache the peers. """ import ctypes import faulthandler import hashlib import logging import os import random import re import sys import threading import traceback import unittest from ctypes import * from typing import Union, Callable, Union faulthandler.enable() import warnings warnings.filterwarnings('ignore') try: from io import BytesIO import certifi import pycurl except ImportError: pycurl = None # from pyannotate_runtime import collect_types try: import coloredlogs os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red' except ImportError as e: logging.log(logging.DEBUG, f"coloredlogs not available: {e}") coloredlogs = None try: import color_runner except ImportError as e: logging.log(logging.DEBUG, f"color_runner not available: {e}") color_runner = None import toxygen_wrapper import toxygen_wrapper.toxcore_enums_and_consts as enums from toxygen_wrapper.tox import Tox, UINT32_MAX, ToxError from toxygen_wrapper.toxcore_enums_and_consts import (TOX_ADDRESS_SIZE, TOX_CONNECTION, TOX_FILE_CONTROL, TOX_MESSAGE_TYPE, TOX_SECRET_KEY_SIZE, TOX_USER_STATUS) try: import support_testing as ts import support_onions as so from wrapper_mixin import WrapperMixin except ImportError: import toxygen_wrapper.tests.support_testing as ts import toxygen_wrapper.tests.support_onions as so from tox_wrapper.wrapper_mixin import WrapperMixin try: from tests.toxygen_tests import test_sound_notification bIS_NOT_TOXYGEN = False except ImportError: bIS_NOT_TOXYGEN = True # from PyQt5 import QtCore import time sleep = time.sleep global LOG LOG = logging.getLogger('TestS') if False: def LOG_ERROR(l: str) -> None: LOG.error('+ '+l) def LOG_WARN(l: str) -> None: LOG.warn('+ '+l) def LOG_INFO(l: str) -> None: LOG.info('+ '+l) def LOG_DEBUG(l: str) -> None: LOG.debug('+ '+l) def LOG_TRACE(l: str) -> None: pass # print('+ '+l) else: # just print to stdout so there is NO complications from logging. def LOG_ERROR(l: str) -> None: print('EROR+ '+l) def LOG_WARN(l: str) -> None: print('WARN+ '+l) def LOG_INFO(l: str) -> None: print('INFO+ '+l) def LOG_DEBUG(l: str) -> None: print('DEBUG+ '+l) def LOG_TRACE(l: str) -> None: pass # print('TRAC+ '+l) ADDR_SIZE = 38 * 2 CLIENT_ID_SIZE = 32 * 2 fSOCKET_TIMEOUT = 15.0 iN = 6 global oTOX_OPTIONS oTOX_OPTIONS = {} bIS_LOCAL = 'new' in sys.argv or 'local' in sys.argv or 'newlocal' in sys.argv def expectedFailure(test_item): test_item.__unittest_expecting_failure__ = True return test_item def expectedFail(reason: str): """ expectedFailure with a reason """ def decorator(test_item): test_item.__unittest_expecting_failure__ = True return test_item return decorator class ToxOptions(): def __init__(self): self.ipv6_enabled = True self.udp_enabled = True self.proxy_type = 0 self.proxy_host = '' self.proxy_port = 0 self.start_port = 0 self.end_port = 0 self.tcp_port = 0 self.savedata_type = 0 # 1=toxsave, 2=secretkey self.savedata_data = b'' self.savedata_length = 0 self.local_discovery_enabled = False self.dht_announcements_enabled = True self.hole_punching_enabled = False self.experimental_thread_safety = False class App(): def __init__(self): self.mode = 0 oAPP = App() class AliceTox(Tox): def __init__(self, opts, args, app=None): super(AliceTox, self).__init__(opts, app=app) self._address = self.self_get_address() self.name = 'alice' self._opts = opts self._app = app self._args = args class BobTox(Tox): def __init__(self, opts, args, app=None): super(BobTox, self).__init__(opts, app=app) self._address = self.self_get_address() self.name = 'bob' self._opts = opts self._app = app self._args = args class BaseThread(threading.Thread): def __init__(self, name=None, target=None): if name: super().__init__(name=name, target=target) else: super().__init__(target=target) self._stop_thread = False self.name = name def stop_thread(self, timeout: int = -1) -> None: self._stop_thread = True if timeout < 0: timeout = ts.iTHREAD_TIMEOUT i = 0 while i < ts.iTHREAD_JOINS: self.join(timeout) if not self.is_alive(): break i = i + 1 else: LOG.warning(f"{self.name} BLOCKED") class ToxIterateThread(BaseThread): def __init__(self, tox): super().__init__(name='ToxIterateThread') self._tox = tox def run(self) -> None: while not self._stop_thread: self._tox.iterate() sleep(self._tox.iteration_interval() / 1000) def prepare(self): def bobs_on_self_connection_status(iTox, connection_state, *args) -> None: status = connection_state self.bob.dht_connected = status self.bob.mycon_time = time.time() try: if status != TOX_CONNECTION['NONE']: LOG_INFO(f"bobs_on_self_connection_status TRUE {status}" \ +f" last={int(self.bob.mycon_time)}" ) self.bob.self_conn_status = True else: LOG_DEBUG(f"bobs_on_self_connection_status FALSE {status}" \ +f" last={int(self.bob.mycon_time)}" ) self.bob.self_conn_status = False except Exception as e: LOG_ERROR(f"bobs_on_self_connection_status EXCEPTION {e}") else: if self.bob.self_get_connection_status() != status: LOG_WARN(f"bobs_on_self_connection_status DISAGREE {status}") def alices_on_self_connection_status(iTox, connection_state: int, *args) -> None: #FixMe connection_num status = connection_state self.alice.dht_connected = status self.alice.mycon_time = time.time() try: if status != TOX_CONNECTION['NONE']: LOG_INFO(f"alices_on_self_connection_status TRUE {status}" \ +f" last={int(self.alice.mycon_time)}" ) self.alice.self_conn_status = True else: LOG_DEBUG(f"alices_on_self_connection_status FALSE {status}" \ +f" last={int(self.alice.mycon_time)}" ) self.alice.self_conn_status = False except Exception as e: LOG_ERROR(f"alices_on_self_connection_status EXCEPTION error={e}") opts = oTestsToxOptions(oTOX_OARGS) alice = AliceTox(opts, oTOX_OARGS, app=oAPP) alice.dht_connected = -1 alice.self_conn_status = False alice.mycon_time = 1 alice.callback_self_connection_status(alices_on_self_connection_status) # only bob logs trace_enabled if oTOX_OARGS.trace_enabled: LOG.info(f"toxcore trace_enabled") ts.vAddLoggerCallback(opts) else: LOG.debug(f"toxcore trace_enabled=False") bob = BobTox(opts, oTOX_OARGS, app=oAPP) bob.dht_connected = -1 bob.self_conn_status = False bob.mycon_time = 1 bob.callback_self_connection_status(bobs_on_self_connection_status) if not bIS_LOCAL and not ts.bAreWeConnected(): LOG.warning(f"doOnce not local and NOT CONNECTED") return [bob, alice] class ToxSuite(unittest.TestCase, WrapperMixin): failureException = AssertionError def setUp(self): if not hasattr(self, 'alice') and not hasattr(self, 'bob'): l = prepare(self) assert l self.bob, self.alice = l if False and not hasattr(self.bob, '_main_loop'): self.bob._main_loop = ToxIterateThread(self.bob) self.bob._main_loop.start() LOG.debug(f"self.bob._main_loop: ") # {threading.enumerate()} if False and not hasattr(self.alice, '_main_loop'): self.alice._main_loop = ToxIterateThread(self.alice) self.alice._main_loop.start() LOG.debug(f"self.alice._main_loop: ") # {threading.enumerate()} self.bBobNeedAlice() self.bAliceNeedAddBob() def run(self, result=None) -> None: """ Stop after first error """ if result and not result.errors: super(ToxSuite, self).run(result) @classmethod def setUpClass(cls) -> None: global oTOX_OARGS assert oTOX_OPTIONS cls.lUdp = ts.generate_nodes( oArgs=oTOX_OARGS, nodes_count=2*ts.iNODES, ipv='ipv4', udp_not_tcp=True) cls.lTcp = ts.generate_nodes( oArgs=oTOX_OARGS, nodes_count=2*ts.iNODES, ipv='ipv4', udp_not_tcp=False) def tearDown(self) -> None: """ """ if hasattr(self, 'bob') and self.bob.self_get_friend_list_size() >= 1: LOG.warn(f"tearDown BOBS STILL HAS A FRIEND LIST {self.bob.self_get_friend_list()}") for elt in self.bob.self_get_friend_list(): self.bob.friend_delete(elt) if hasattr(self, 'alice') and self.alice.self_get_friend_list_size() >= 1: LOG.warn(f"tearDown ALICE STILL HAS A FRIEND LIST {self.alice.self_get_friend_list()}") for elt in self.alice.self_get_friend_list(): self.alice.friend_delete(elt) # LOG.debug(f"tearDown threads={threading.active_count()}") if hasattr(self, 'bob'): self.bob.callback_self_connection_status(None) if hasattr(self.bob, 'main_loop'): self.bob._main_loop.stop_thread() del self.bob._main_loop # self.bob.kill() del self.bob if hasattr(self, 'alice'): self.alice.callback_self_connection_status(None) if hasattr(self.alice, 'main_loop'): self.alice._main_loop.stop_thread() del self.alice._main_loop # self.alice.kill() del self.alice @classmethod def tearDownClass(cls) -> None: if hasattr(cls, 'bob'): cls.bob._main_loop.stop_thread() cls.bob.kill() del cls.bob if hasattr(cls, 'alice'): cls.alice._main_loop.stop_thread() cls.alice.kill() del cls.alice # tests are executed in order def test_notice_log(self) -> None: # works notice = '/var/lib/tor/.SelekTOR/3xx/cache/9050/notice.log' if os.path.exists(notice): iRet = os.system(f"sudo sed -e '1,/.notice. Bootstrapped 100%/d' {notice}" + \ "| grep 'Tried for 120 seconds to get a connection to :0.'") if iRet == 0: raise SystemExit("seconds to get a connection to :0") else: LOG.debug(f"checked {notice}") def test_tests_logging(self): # works with self.assertLogs('foo', level='INFO') as cm: logging.getLogger('foo').info('first message') logging.getLogger('foo.bar').error('second message') logging.getLogger('foo.bar.baz').debug('third message') self.assertEqual(cm.output, ['INFO:foo:first message', 'ERROR:foo.bar:second message']) def test_key_size(self) -> None: # works """ t:tox_address_size t:tox_file_id_length t:tox_public_key_size t:tox_secret_key_size t:tox_group_max_topic_length t:tox_group_max_part_length t:tox_group_max_message_length t:tox_group_max_custom_lossy_packet_length t:tox_group_max_custom_lossless_packet_length t:tox_group_max_group_name_length t:tox_group_max_password_size t:tox_group_chat_id_size t:tox_group_peer_public_key_size """ otox = self.bob for elt in ( 'tox_address_size', 'tox_version_major', 'tox_version_minor', 'tox_version_patch', 'tox_file_id_length', 'tox_public_key_size', 'tox_secret_key_size', 'tox_group_max_topic_length', 'tox_group_max_part_length', 'tox_group_max_message_length', 'tox_group_max_custom_lossy_packet_length', 'tox_group_max_custom_lossless_packet_length', 'tox_group_max_group_name_length', 'tox_group_max_password_size', 'tox_group_chat_id_size', 'tox_group_peer_public_key_size', 'tox_hash_length', 'tox_max_custom_packet_size', 'tox_max_filename_length', 'tox_max_friend_request_length', 'tox_max_hostname_length', 'tox_max_message_length', 'tox_max_name_length', 'tox_max_status_message_length', 'tox_nospam_size', 'tox_pass_salt_length', ): ret = getattr(otox, elt)() LOG.info(f"test_key_size: {elt} -> {ret} ") def test_hash(self): # works otox = self.bob string = 'abcdef' name = otox.hash(bytes(string, 'utf-8')) assert name string = b'abcdef' name = otox.hash(string) assert name LOG.info(f"test_hash: {string} -> {name} ") def test_tests_start(self) -> None: # works """ t:hash t:kill t:libtoxcore t:options_default t:options_free t:options_new t:self_get_toxid """ LOG.info("test_tests_start " ) port = ts.tox_bootstrapd_port() assert len(self.bob._address) == 2*TOX_ADDRESS_SIZE, len(self.bob._address) assert len(self.alice._address) == 2*TOX_ADDRESS_SIZE, \ len(self.alice._address) assert self.bob.self_get_address() == self.bob._address assert self.alice.self_get_address() == self.alice._address def test_bootstrap_local_netstat(self) -> None: # works """ t:callback_file_chunk_request t:callback_file_recv t:callback_file_recv_chunk t:callback_file_recv_control t:callback_friend_connection_status t:callback_friend_lossless_packet t:callback_friend_lossy_packet t:callback_friend_message t:callback_friend_name t:callback_friend_read_receipt t:callback_friend_request t:callback_friend_status t:callback_friend_status_message t:callback_friend_typing t:callback_group_custom_packet t:callback_group_invite """ otox = self.bob if otox._args.network not in ['new', 'newlocal', 'local']: return port = ts.tox_bootstrapd_port() if not port: return iStatus = os.system(f"""netstat -nle4 | grep :{port}""") if iStatus == 0: LOG.info(f"bootstrap_local_netstat port {port} iStatus={iStatus}") else: LOG.warning(f"bootstrap_local_netstat NOT {port} iStatus={iStatus}") def test_bootstrap_local(self, fsocket_timeout:float = fSOCKET_TIMEOUT) -> bool: # works """ t:call_bootstrap t:add_tcp_relay t:self_get_dht_id """ # get port from /etc/tox-bootstrapd.conf 33445 self.call_bootstrap(fsocket_timeout=fsocket_timeout) # ts.bootstrap_local(self, self.lUdp) i = 0 iStatus = -1 while i < 10: i = i + 1 iStatus = self.bob.self_get_connection_status() if iStatus != TOX_CONNECTION['NONE']: break sleep(3) else: pass o1 = self.alice.self_get_dht_id() assert len(o1) == 64 o2 = self.bob.self_get_dht_id() assert len(o2) == 64 # if o1 != o2: LOG.warning(f"bootstrap_local DHT NOT same {o1} {o2} iStatus={iStatus}") iStatus = self.bob.self_get_connection_status() if iStatus != TOX_CONNECTION['NONE']: LOG.info(f"bootstrap_local connected iStatus={iStatus}") return True iStatus = self.alice.self_get_connection_status() if iStatus != TOX_CONNECTION['NONE']: LOG.info(f"bootstrap_local connected iStatus={iStatus}") return True LOG.warning(f"bootstrap_local NOT CONNECTED iStatus={iStatus}") return False @unittest.skipIf(os.geteuid() != 0, 'must be root') def test_bootstrap_iNmapInfo(self) -> None: # works # if os.environ['USER'] != 'root': # return iStatus = self.bob.self_get_connection_status() LOG.info(f"test_bootstrap_iNmapInfo connected bob iStatus={iStatus}") otox = self.bob if otox._args.network in ['new', 'newlocal', 'localnew']: lElts = self.lUdp elif otox._args.proxy_port > 0: lElts = self.lTcp else: lElts = self.lUdp lRetval = [] random.shuffle(lElts) # assert ts.bootstrap_iNmapInfo(lElts, otox._args, "tcp4", bIS_LOCAL=bIS_LOCAL, iNODES=8) def test_self_get_secret_key(self) -> None: # works """ t:self_get_secret_key """ # test_self_get_secret_key CRYPTO_SECRET_KEY_SIZE = 32 secret_key = create_string_buffer(CRYPTO_SECRET_KEY_SIZE) oRet0 = self.alice.self_get_secret_key(secret_key) assert oRet0, repr(oRet0) LOG.info('test_self_get_secret_key ' +repr(oRet0)) assert len(str(oRet0)) del secret_key def test_self_get_public_keys(self) -> None: # works """ t:self_get_secret_key t:self_get_public_key """ LOG.info('test_self_get_public_keys self.alice.self_get_secret_key') oRet0 = self.alice.self_get_secret_key() assert len(oRet0) LOG.info('test_self_get_public_keys ' +repr(oRet0)) oRet1 = self.alice.self_get_public_key() assert len(oRet1) LOG.info('test_self_get_public_keys ' +repr(oRet1)) assert oRet0 != oRet1, repr(oRet0) +' != ' +repr(oRet1) def test_self_name(self) -> None: # works """ t:self_set_name t:self_get_name t:self_get_name_size """ self.alice.self_set_name('Alice') assert self.alice.self_get_name() == 'Alice' assert self.alice.self_get_name_size() == len('Alice') self.bob.self_set_name('Bob') assert self.bob.self_get_name() == 'Bob' assert self.bob.self_get_name_size() == len('Bob') @unittest.skip('loud') @unittest.skipIf(bIS_NOT_TOXYGEN or oTOX_OARGS.mode == 0, 'not testing in toxygen') def test_sound_notification(self) -> None: # works """ Plays sound notification :param type of notification """ from tests.toxygen_tests import test_sound_notification test_sound_notification(self) def test_address(self) -> None: # works """ t:self_get_address t:self_get_nospam t:self_set_nospam t:self_get_keys """ assert len(self.alice.self_get_address()) == ADDR_SIZE assert len(self.bob.self_get_address()) == ADDR_SIZE self.alice.self_set_nospam(0x12345678) assert self.alice.self_get_nospam() == 0x12345678 self.loop(50) if hasattr(self.alice, 'self_get_keys'): pk, sk = self.alice.self_get_keys() assert pk == self.alice.self_get_address()[:CLIENT_ID_SIZE] def test_status_message(self) -> None: # works """ t:self_get_status_message t:self_get_status_message_size """ MSG = 'Happy' self.alice.self_set_status_message(MSG) self.loop(100) assert self.alice.self_get_status_message() == MSG, \ self.alice.self_get_status_message() +' is not ' +MSG assert self.alice.self_get_status_message_size() == len(MSG) def test_self_get_udp_port(self) -> None: # works """ t:self_get_udp_port """ otox = self.bob if hasattr(oTOX_OPTIONS, 'udp_port') and oTOX_OPTIONS.udp_port: o = self.alice.self_get_udp_port() LOG.info('self_get_udp_port alice ' +repr(o)) assert o > 0 o = self.bob.self_get_udp_port() LOG.info('self_get_udp_port bob ' +repr(o)) assert o > 0 def test_self_get_tcp_port(self) -> None: # works """ t:self_get_tcp_port """ if hasattr(oTOX_OPTIONS, 'tcp_port') and oTOX_OPTIONS.tcp_port: # errors if tcp_port <= 0 o = self.alice.self_get_tcp_port() LOG.info('self_get_tcp_port ' +repr(o)) o = self.bob.self_get_tcp_port() LOG.info('self_get_tcp_port ' +repr(o)) def test_get_dht_id(self) -> None: # works """ t:self_get_dht_id """ o1 = self.alice.self_get_dht_id() assert len(o1) == 64 o2 = self.bob.self_get_dht_id() assert len(o2) == 64 def test_bob_add_alice_as_friend_norequest(self) -> None: # works """ t:friend_delete t:friend_exists t:friend_add_norequest t:friend_get_public_key t:self_get_friend_list t:self_get_friend_list_size """ i = len(self.bob.self_get_friend_list()) assert self.bob_add_alice_as_friend_norequest() assert len(self.bob.self_get_friend_list()) == i + 1 #: Test last online assert self.bob.friend_get_last_online(self.baid) is not None if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) def test_alice_add_bob_as_friend_norequest(self) -> None: # works - intermittent failures """ t:friend_delete t:friend_exists t:friend_get_public_key t:self_get_friend_list t:self_get_friend_list_size """ i = len(self.alice.self_get_friend_list()) assert self.alice_add_bob_as_friend_norequest() assert len(self.alice.self_get_friend_list()) == i + 1 #: Test last online assert self.alice.friend_get_last_online(self.abid) is not None if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) def test_both_add_as_friend_norequest(self) -> None: # works """ t:friend_delete t:friend_exists t:friend_get_public_key t:self_get_friend_list t:self_get_friend_list_size """ try: self.both_add_as_friend_norequest() assert len(self.bob.self_get_friend_list()) > 0 assert len(self.alice.self_get_friend_list()) > 0 except AssertionError as e: LOG.error(f"Failed test {e}") raise except Exception as e: LOG.error(f"Failed test {e}") raise finally: if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) assert len(self.bob.self_get_friend_list()) == 0 if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) assert len(self.alice.self_get_friend_list()) == 0 def test_loop_until_connected(self) -> None: # works assert self.loop_until_connected() def test_bob_add_alice_as_friend_and_status(self) -> None: """ t:friend_delete t:friend_exists t:friend_get_public_key t:self_get_friend_list t:self_get_friend_list_size """ self.bob_add_alice_as_friend_and_status() if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) @unittest.skip('unfinished') def test_alice_add_bob_as_friend_and_status(self) -> None: assert self.alice_add_bob_as_friend_and_status() if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) def test_bob_assert_connection_status(self) -> None: # works if self.bob.self_get_connection_status() == TOX_CONNECTION['NONE']: AssertionError("ERROR: NOT CONNECTED " \ +repr(self.bob.self_get_connection_status())) def test_alice_assert_connection_status(self) -> None: # works if self.alice.self_get_connection_status() == TOX_CONNECTION['NONE']: AssertionError("ERROR: NOT CONNECTED " \ +repr(self.alice.self_get_connection_status())) def test_bob_assert_self_conn_status(self) -> None: # works if self.bob.self_conn_status == False: AssertionError("ERROR: NOT CONNECTED " \ +repr(self.bob.self_conn_status)) def test_alice_assert_self_conn_status(self) -> None: # works if self.alice.self_conn_status == False: AssertionError("ERROR: NOT CONNECTED " \ +repr(self.alice.self_conn_status)) def test_bob_add_alice_as_friend(self) -> None: # works? try: if self.bob._args.norequest: assert self.bob_add_alice_as_friend_norequest() else: assert self.bob_add_alice_as_friend() #: Test last online assert self.bob.friend_get_last_online(self.baid) is not None except AssertionError as e: LOG.error(f"Failed test {e}") raise except Exception as e: LOG.error(f"Failed test {e}") raise finally: if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) if len(self.bob.self_get_friend_list()) > 0: LOG.warn(f"WTF bob.self_get_friend_list() {bob.self_get_friend_list()}") def test_alice_add_bob_as_friend(self) -> None: # works! try: if self.bob._args.norequest: assert self.alice_add_bob_as_friend_norequest() else: assert self.alice_add_bob_as_friend() #: Test last online assert self.alice.friend_get_last_online(self.abid) is not None except AssertionError as e: #WTF? if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) LOG.error(f"Failed test {e}") raise except Exception as e: #WTF? LOG.error(f"test_alice_add_bob_as_friend EXCEPTION {e}") raise finally: if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) if len(self.alice.self_get_friend_list()) > 0: LOG.warn(f"WTF alice.self_get_friend_list() {alice.self_get_friend_list()}") def test_both_add_as_friend(self) -> None: # works try: if self.bob._args.norequest: assert self.both_add_as_friend_norequest() else: assert self.both_add_as_friend() except AssertionError as e: LOG.warn(f"Failed test {e}") raise except Exception as e: LOG.error(f"test_both_add_as_friend EXCEPTION {e}") raise finally: if hasattr(self,'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) if hasattr(self,'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) def test_groups_join(self) -> None: """ t:group_join t:group_disconnect t:group_leave t:group_self_set_name """ if not self.get_connection_status(): LOG.warning(f"test_groups_join NOT CONNECTED") self.loop_until_connected(self.bob) iGrp = self.otox_test_groups_join(self.bob) LOG.info(f"test_groups_join iGrp={iGrp}") assert iGrp >= 0, f"test_groups_join iGrp={iGrp}" try: self.bob.group_disconnect(iGrp) except Exception as e: LOG.error(f"bob.group_disconnect EXCEPTION {e}") raise try: self.bob.group_leave(iGrp, None) except Exception as e: LOG.error(f"bob.group_leave EXCEPTION {e}") raise def test_groups(self) -> None: """ t:group_new t:group_disconnect t:group_get_name t:group_get_name_size t:group_get_topic t:group_get_topic_size t:group_get_privacy_state t:group_self_set_name t:group_get_number_groups t:group_founder_set_password t:group_founder_set_peer_limit t:group_founder_set_privacy_state t:group_get_chat_id t:group_get_password t:group_get_password_size t:group_get_peer_limit t:group_invite_accept t:group_invite_friend t:group_is_connected t:group_leave t:group_mod_set_role """ iGrp = self.otox_test_groups(self.bob) LOG.info(f"test_groups iGrp={iGrp}") if iGrp >= 0: try: self.bob.group_disconnect(iGrp) except Exception as e: LOG.error(f"bob.group_disconnect EXCEPTION {e}") raise try: self.bob.group_leave(iGrp, None) except Exception as e: LOG.error(f"bob.group_leave EXCEPTION {e}") raise #! @expectedFail('fails') # assertion fails on == MSG def test_on_friend_status_message(self) -> None: # fails """ t:self_set_status_message t:self_get_status_message t:self_get_status_message_size t:friend_set_status_message t:friend_get_status_message t:friend_get_status_message_size t:on_friend_status_message """ MSG = 'Happy' sSlot = 'friend_status_message' def bob_on_friend_status_message(iTox, friend_id, new_status_message, new_status_size, *largs) -> None: LOG_INFO(f"BOB_ON_friend_status_message friend_id={friend_id} " \ +f"new_status_message={new_status_message}") try: assert new_status_size == len(MSG), new_status_size if type(new_status_message) == bytes: assert str(new_status_message[:new_status_size], 'UTF-8') == MSG, \ f"new_status_message={new_status_message}" elif type(new_status_message) == str: assert new_status_message[:new_status_size] == MSG, \ f"new_status_message={new_status_message}" else: # new_status_message= raise RuntimeError("unrecognized type type(new_status_message)") assert friend_id == self.baid except Exception as e: LOG_ERROR(f"BOB_ON_friend_status_message EXCEPTION {e}") setattr(self.bob, sSlot, True) setattr(self.bob, sSlot, None) try: if self.bob._args.norequest: assert self.bob_add_alice_as_friend_norequest() assert self.alice_add_bob_as_friend_norequest() else: # no not connected error assert self.bob_add_alice_as_friend() assert self.alice_add_bob_as_friend_norequest() # need a friend connected? if not self.get_connection_status(): LOG.warning(f"test_groups_join NOT CONNECTED") self.loop_until_connected(self.bob) self.bob.callback_friend_status_message(bob_on_friend_status_message) self.warn_if_no_cb(self.bob, sSlot) status_message = bytes(MSG, 'utf-8') self.alice.self_set_status_message(status_message) if not self.wait_otox_attrs(self.bob, [sSlot]): raise AssertionError(f"on_friend_status_message NO {sSlot}") if not self.bob.friend_get_status_message(self.baid) == MSG: LOG.warn(f"message={self.bob.friend_get_status_message(self.baid)}") if not self.bob.friend_get_status_message_size(self.baid) == len(MSG): LOG.warn(f"message_len={self.bob.friend_get_status_message_size(self.baid)}") except AssertionError as e: LOG.error(f"test_on_friend_status_message FAILED {e}") raise except Exception as e: LOG.error(f"test_on_friend_status_message EXCEPTION {e}") raise finally: self.bob.callback_friend_status(None) if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) def test_friend(self) -> None: # works! sometimes """ t:friend_get_name t:friend_get_name_size t:on_friend_name """ try: #: Test friend request if self.bob._args.norequest: assert self.bob_add_alice_as_friend_norequest() assert self.alice_add_bob_as_friend_norequest() else: # no not connected error assert self.bob_add_alice_as_friend() assert self.alice_add_bob_as_friend_norequest() a = self.alice.self_get_address()[:CLIENT_ID_SIZE] assert self.bob.friend_get_public_key(self.baid) == a, \ LOG.error(f"test_friend BAID {a}") del a #: Test friend_get_public_key b = self.bob.self_get_address()[:CLIENT_ID_SIZE] assert self.alice.friend_get_public_key(self.abid) == b, \ LOG.error(f"test_friend ABID {b}") del b except AssertionError as e: LOG.error(f"Failed test {e}") raise except Exception as e: LOG.error(f"test_friend EXCEPTION {e}") raise finally: if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) @expectedFail('fails') # assert self.bob.friend_get_status(self.baid) == TOX_USER_STATUS['BUSY'] def test_user_status(self) -> None: # fails """ t:self_get_status t:self_set_status t:friend_get_status t:friend_get_status t:on_friend_status """ sSlot = 'friend_status' setattr(self.bob, sSlot, None) def bobs_on_friend_set_status(iTox, friend_id, new_status, *largs) -> None: LOG_INFO(f"bobs_on_friend_set_status {friend_id} {new_status}") try: assert friend_id == self.baid assert new_status in [TOX_USER_STATUS['BUSY'], TOX_USER_STATUS['AWAY']] except Exception as e: LOG_WARN(f"bobs_on_friend_set_status EXCEPTION {e}") setattr(self.bob, sSlot, True) try: if self.bob._args.norequest: assert self.bob_add_alice_as_friend_norequest() else: assert self.bob_add_alice_as_friend() if not self.get_connection_status(): self.loop_until_connected(self.bob) self.bob.callback_friend_status(bobs_on_friend_set_status) self.warn_if_no_cb(self.bob, sSlot) sSTATUS = TOX_USER_STATUS['BUSY'] self.alice.self_set_status(sSTATUS) if not self.wait_otox_attrs(self.bob, [sSlot]): # malloc(): unaligned tcache chunk detected LOG_WARN raise AssertionError(f'test_user_status NO {sSlot}') assert self.bob.friend_get_status(self.baid) == TOX_USER_STATUS['BUSY'], \ f"friend_get_status {self.bob.friend_get_status(self.baid)} != {TOX_USER_STATUS['BUSY']}" except AssertionError as e: LOG.error(f"test_user_status FAILED {e}") raise except Exception as e: LOG.error(f"test_user_status EXCEPTION {e}") raise finally: self.bob.callback_friend_status(None) self.warn_if_cb(self.bob, sSlot) if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) @unittest.skip('crashes') def test_kill_remake(self) -> None: """ t:friend_get_kill_remake t:on_friend_connection_status """ sSlot = 'friend_connection_status' setattr(self.bob, sSlot, None) def bobs_on_friend_connection_status(iTox, friend_id, iStatus, *largs): LOG_INFO(f"bobs_on_friend_connection_status " +repr(iStatus)) try: assert friend_id == self.baid except Exception as e: LOG_ERROR(f"bobs_on_friend_connection_status ERROR {e}") setattr(self.bob, sSlot, True) otox = self.bob opts = oTestsToxOptions(otox._args) setattr(self.bob, sSlot, True) try: if self.bob._args.norequest: assert self.bob_add_alice_as_friend_norequest() else: assert self.bob_add_alice_as_friend() self.bob.callback_friend_connection_status(bobs_on_friend_connection_status) LOG.info("test_kill_remake killing alice") self.alice.kill() #! bang LOG.info("test_kill_remake making alice") self.alice = Tox(opts, app=oAPP) LOG.info("test_kill_remake maked alice") if not self.wait_otox_attrs(self.bob, [sSlot]): raise AssertionError(f'test_kill_remake NO {sSlot}') except AssertionError as e: LOG.error(f"test_kill_remake Failed test {e}") raise except Exception as e: LOG.error(f"bobs_on_friend_connection_status {e}") raise finally: self.bob.callback_friend_connection_status(None) if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) def test_alice_typing_status(self) -> None: # works """ t:on_friend_read_receipt t:on_friend_typing t:self_set_typing t:friend_get_typing t:friend_get_last_online """ sSlot = 'friend_typing' LOG.info("test_typing_status bob adding alice") #: Test typing status def bob_on_friend_typing(iTox, fid:int, is_typing, *largs) -> None: try: assert fid == self.baid if is_typing is True: LOG_INFO(f"BOB_ON_friend_typing is_typing={is_typing} fid={fid}") assert self.bob.friend_get_typing(fid) is True, \ 'self.bob.friend_get_typing' else: LOG_DEBUG(f"BOB_ON_friend_typing is_typing={is_typing} fid={fid}") except Exception as e: LOG_ERROR(f"BOB_ON_friend_typing {e}") setattr(self.bob, sSlot, True) setattr(self.bob, sSlot, None) try: if self.bob._args.norequest: assert self.both_add_as_friend_norequest() else: assert self.both_add_as_friend() if not self.get_connection_status(): LOG.warning(f"test_friend_typing_status NOT CONNECTED") self.loop_until_connected(self.bob) self.bob.callback_friend_typing(bob_on_friend_typing) self.warn_if_no_cb(self.bob, sSlot) self.alice.self_set_typing(self.abid, True) if not self.wait_otox_attrs(self.bob, [sSlot]): raise AssertionError(f"bobs_on_friend_typing NO {sSlot}") except AssertionError as e: LOG.error(f"Failed test {e}") raise except Exception as e: LOG.error(f"test_alice_typing_status error={e}") raise finally: self.bob.callback_friend_typing(None) if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) @expectedFail('fails') # new name is empty def test_friend_name(self) -> None: # works! """ t:self_set_name t:friend_get_name t:friend_get_name_size t:on_friend_name """ sSlot= 'friend_name' #: Test friend request #: Test friend name NEWNAME = 'Jenny' def bobs_on_friend_name(iTox, fid:int, newname, iNameSize, *largs) -> None: LOG_INFO(f"bobs_on_friend_name {sSlot} {fid}") try: assert fid == self.baid assert str(newname, 'UTF-8') == NEWNAME except Exception as e: LOG_ERROR(f"bobs_on_friend_name EXCEPTION {e}") setattr(self.bob, sSlot, True) setattr(self.bob, sSlot, None) try: LOG.info("test_friend_name") if self.bob._args.norequest: assert self.bob_add_alice_as_friend_norequest() else: assert self.bob_add_alice_as_friend() if not self.get_connection_status(): LOG.warning(f"test_friend_typing NOT CONNECTED") self.loop_until_connected(self.bob) self.bob.callback_friend_name(bobs_on_friend_name) self.warn_if_no_cb(self.bob, sSlot) self.alice.self_set_name(NEWNAME) if not self.wait_otox_attrs(self.bob, [sSlot]): raise AssertionError(f"bobs_on_friend_name NO {sSlot}") # name=None assert self.bob.friend_get_name(self.baid) == NEWNAME, \ f"{self.bob.friend_get_name(self.baid)} != {NEWNAME}" assert self.bob.friend_get_name_size(self.baid) == len(NEWNAME), \ f"{self.bob.friend_get_name_size(self.baid)} != {len(NEWNAME)}" except AssertionError as e: LOG.error(f"test_friend_name Failed test {e}") raise except Exception as e: LOG.error(f"test_friend EXCEPTION {e}") raise finally: if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) self.bob.callback_friend_name(None) self.warn_if_cb(self.bob, sSlot) # https://github.com/TokTok/c-toxcore/issues/1338 # If you node is on the internet, you can check if it works with this test page https://nodes.tox.chat/test. It sends some packets to you node and checks if it gets the correct response back. If it's in a private network, you can telnet into your node telnet 33445 to at least make sure there is something accepting TCP connections on there (I guess nmap works too). # torsocks telnet 33445 # ERROR torsocks[2573639]: Unable to resolve. Status reply: 4 (in socks5_recv_resolve_reply() at socks5.c:677) # Trying # ERROR torsocks[2573639]: General SOCKS server failure (in socks5_recv_connect_reply() at socks5.c:527) # telnet: connect to address Connection refused # # This client is currently not connected to the friend. @expectedFail('fails works! sometimes?') def test_friend_action(self) -> None: # works! sometimes? """ t:on_friend_action t:on_friend_message t:friend_send_message """ #: Test action ACTION = 'Kick' sSlot = 'friend_read_action' setattr(self.bob, sSlot, None) def UNUSEDtheir_on_friend_action(iTox, fid:int, msg_type, action, *largs): LOG_DEBUG(f"their_on_friend_action {fid} {msg_type} {sSlot} {action}") try: assert msg_type == TOX_MESSAGE_TYPE['ACTION'] assert action == ACTION except Exception as e: LOG_ERROR(f"their_on_friend_action EXCEPTION {sSlot} {e}") else: LOG_INFO(f"their_on_friend_action {sSlot} {action}") setattr(self.bob, sSlot, True) sSlot = 'friend_read_receipt' setattr(self.alice, sSlot, None) def their_on_read_reciept(iTox, fid:int, msg_id, *largs) -> None: LOG_DEBUG(f"their_on_read_reciept {fid} {msg_id}") sSlot = 'friend_read_receipt' try: # should be the receivers id if hasattr(bob, 'baid'): assert fid == bob.baid setattr(self.bob, sSlot, True) elif hasattr(alice, 'abid'): assert fid == alice.abid setattr(self.alice, sSlot, True) assert msg_id >= 0 except Exception as e: LOG_ERROR(f"their_on_read_reciept {sSlot} {e}") else: LOG_INFO(f"their_on_read_reciept {sSlot} fid={fid}") try: if self.bob._args.norequest: assert self.both_add_as_friend_norequest() else: assert self.both_add_as_friend() if not self.wait_friend_get_connection_status(self.bob, self.baid, n=2*iN): raise AssertionError(f"bob NOT CONNECTED baid={self.baid}, n={2*iN}") if not self.wait_friend_get_connection_status(self.alice, self.abid, n=2*iN): raise AssertionError(f"alice NOT CONNECTED abid={self.abid}, n={2*iN}") self.bob.callback_friend_read_receipt(their_on_read_reciept) #was their_on_friend_action self.alice.callback_friend_read_receipt(their_on_read_reciept) #was their_on_friend_action self.warn_if_no_cb(self.bob, 'friend_read_receipt') self.warn_if_no_cb(self.alice, 'friend_read_receipt') if True: iMsg = self.bob.friend_send_message(self.baid, TOX_MESSAGE_TYPE['ACTION'], bytes(ACTION, 'UTF-8')) assert iMsg >= 0 else: assert self.wait_ensure_exec(self.bob.friend_send_message, [self.baid, TOX_MESSAGE_TYPE['ACTION'], bytes(ACTION, 'UTF-8')]) if not self.wait_otox_attrs(self.alice, [sSlot]): raise AssertionError(f"alice test_friend_action NO {sSlot}") except AssertionError as e: LOG.error(f"Failed test {e}") raise except ArgumentError as e: # ArgumentError('This client is currently NOT CONNECTED to the friend.') # dunno LOG.warning(f"test_friend_action {e}") except Exception as e: LOG.error(f"test_friend_action {e}") raise finally: self.alice.callback_friend_read_receipt(None) if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) @unittest.skipIf( not os.environ.get('TOR_CONTROLLER_PASSWORD', ''), \ "Requires tor controller password") def test_onion_intros(self) -> None: # timeout intermittently # introduction points are needed for onion services if self.bob._args.proxy_type == 2: target = 'zkaan2xfbuxia2wpf7ofnkbz6r5zdbbvxbunvp5g2iebopbfc4iqmbad' controller = so.oGetStemController(log_level=10) i = self.bob._args.test_timeout l = so.lIntroductionPoints(controller, [target], itimeout=i) if l: LOG_INFO(f"test_onion_intros len={len(l)}") else: LOG_WARN(f"test_onion_intros len={len(l)}") # This client is currently not connected to the friend. @expectedFail('bob NOT CONNECTED baid') # passes clearnet fails tor = no def test_friend_message(self) -> None: # fails intermittently """ t:on_friend_action t:on_friend_message t:friend_send_message """ # @unittest.skipIf( oTOX_OARGS.proxy_type == 2, 'Fails over Tor') #: Test message MSG = 'Hi, Bob!' sSlot = 'friend_message' def alices_on_friend_message(iTox, fid:int, msg_type, message, iSize, *largs) -> None: LOG_DEBUG(f"alices_on_friend_message {fid} {message}") try: assert fid == self.alice.abid assert msg_type == TOX_MESSAGE_TYPE['NORMAL'] assert str(message, 'UTF-8') == MSG except Exception as e: LOG_ERROR(f"alices_on_friend_message EXCEPTION {e}") else: LOG_INFO(f"alices_on_friend_message {message}") setattr(self.alice, sSlot, True) setattr(self.alice, sSlot, None) self.alice.callback_friend_message(None) try: if self.bob._args.norequest: assert self.both_add_as_friend_norequest() else: assert self.both_add_as_friend() # should we loop until connection status if not self.wait_friend_get_connection_status(self.bob, self.baid, n=3*iN): raise AssertionError(f"bob NOT CONNECTED baid={self.baid}, n={3*iN}") if not self.wait_friend_get_connection_status(self.alice, self.abid, n=3*iN): raise AssertionError(f"alice NOT CONNECTED baid={self.abid}, n={3*iN}") self.alice.callback_friend_message(alices_on_friend_message) self.warn_if_no_cb(self.alice, sSlot) # dunno - both This client is currently NOT CONNECTED to the friend # ArgumentError: This client is currently not connected to the friend. iMesId = self.bob.friend_send_message(self.baid, TOX_MESSAGE_TYPE['NORMAL'], bytes(MSG, 'UTF-8')) assert iMesId >= 0, "iMesId >= 0" if not self.wait_otox_attrs(self.alice, [sSlot]): raise AssertionError(f"alices_on_friend_message NO {sSlot}") except ArgumentError as e: # ArgumentError('This client is currently NOT CONNECTED to the friend.') # dunno LOG.error(f"test_friend_message ArgumentError {e}") raise except AssertionError as e: LOG.error(f"test_friend_message AssertionError {e}") raise except Exception as e: LOG.error(f"test_friend_message EXCEPTION {e}") raise finally: self.alice.callback_friend_message(None) self.warn_if_cb(self.alice, sSlot) if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) @expectedFail('unfinished') def test_file_transfer(self) -> None: # unfinished """ t:file_send t:file_send_chunk t:file_control t:file_seek t:file_get_file_id t:on_file_recv t:on_file_recv_control t:on_file_recv_chunk t:on_file_chunk_request """ if self.bob._args.norequest: assert self.both_add_as_friend_norequest() else: assert self.both_add_as_friend() FILE_NUMBER = 1 FILE = os.urandom(1024 * 1024) FILE_NAME = b"/tmp/test.bin" if not os.path.exists(FILE_NAME): with open(FILE_NAME, 'wb') as oFd: oFd.write(FILE) FILE_SIZE = len(FILE) OFFSET = 567 # was FILE_ID = FILE_NAME FILE_ID = 32*'1' # m = hashlib.md5() m.update(FILE[OFFSET:]) FILE_DIGEST = m.hexdigest() CONTEXT = { 'FILE': bytes(), 'RECEIVED': 0, 'START': False, 'SENT': 0 } def alice_on_file_recv(iTox, fid:int, file_number:int, kind, size, filename) -> None: LOG_DEBUG(f"ALICE_ON_file_recv fid={fid} {file_number}") try: assert size == FILE_SIZE assert filename == FILE_NAME retv = self.alice.file_seek(fid, file_number, OFFSET) assert retv is True self.alice.file_control(fid, file_number, TOX_FILE_CONTROL['RESUME']) except Exception as e: LOG_ERROR(f"ALICE_ON_file_recv {e}") else: LOG_INFO(f"ALICE_ON_file_recv " + str(fid)) def alice_on_file_recv_control(iTox, fid:int, file_number, control, *largs) -> None: # TOX_FILE_CONTROL = { 'RESUME': 0, 'PAUSE': 1, 'CANCEL': 2,} LOG_DEBUG(f"ALICE_ON_file_recv_control fid={fid} {file_number} {control}") try: assert FILE_NUMBER == file_number # FixMe _FINISHED? if False and control == TOX_FILE_CONTROL['RESUME']: # assert CONTEXT['RECEIVED'] == FILE_SIZE # m = hashlib.md5() # m.update(CONTEXT['FILE']) # assert m.hexdigest() == FILE_DIGEST self.alice.completed = True except Exception as e: LOG_ERROR(f"ALICE_ON_file_recv {e}") else: LOG_INFO(f"ALICE_ON_file_recv " + str(fid)) self.alice.completed = False def alice_on_file_recv_chunk(iTox, fid:int, file_number:int, position:int, iNumBytes, *largs) -> bool: LOG_DEBUG(f"ALICE_ON_file_recv_chunk {fid} {file_number}") # FixMe - use file_number and iNumBytes to get data? data = '' LOG_INFO(f"ALICE_ON_file_recv_chunk {fid}") try: if data is None: assert CONTEXT['RECEIVED'] == (FILE_SIZE - OFFSET) m = hashlib.md5() m.update(CONTEXT['FILE']) assert m.hexdigest() == FILE_DIGEST self.alice.completed = True self.alice.file_control(fid, file_number, TOX_FILE_CONTROL['CANCEL']) return True CONTEXT['FILE'] += data CONTEXT['RECEIVED'] += len(data) # if CONTEXT['RECEIVED'] < FILE_SIZE: # assert self.file_data_remaining( # fid, file_number, 1) == FILE_SIZE - CONTEXT['RECEIVED'] except Exception as e: LOG_ERROR(f"ALICE_ON_file_recv_chunk {e}") return False return True # AliceTox.on_file_send_request = on_file_send_request # AliceTox.on_file_control = on_file_control # AliceTox.on_file_data = on_file_data try: # required? if not self.wait_friend_get_connection_status(self.bob, self.baid, n=2*iN): LOG_WARN(f"bobs wait_friend_get_connection_status {2*iN}") if not self.wait_friend_get_connection_status(self.alice, self.abid, n=2*iN): LOG_WARN(f"alices' wait_friend_get_connection_status {2*iN}") self.alice.callback_file_recv(alice_on_file_recv) self.alice.callback_file_recv_control(alice_on_file_recv_control) self.alice.callback_file_recv_chunk(alice_on_file_recv_chunk) self.bob.completed = False def bob_on_file_recv_control2(iTox, fid:int, file_number:int, control) -> None: LOG_DEBUG(f"BOB_ON_file_recv_control2 {fid} {file_number} control={control}") if control == TOX_FILE_CONTROL['RESUME']: CONTEXT['START'] = True elif control == TOX_FILE_CONTROL['CANCEL']: self.bob.completed = True pass def bob_on_file_chunk_request(iTox, fid:int, file_number:int, position:int, length, *largs) -> None: LOG_DEBUG(f"BOB_ON_file_chunk_request {fid} {file_number}") if length == 0: return data = FILE[position:(position + length)] self.bob.file_send_chunk(fid, file_number, position, data) sSlot = 'file_recv_control' self.bob.callback_file_recv_control(bob_on_file_recv_control2) self.bob.callback_file_chunk_request(bob_on_file_chunk_request) i = 0 iKind = 0 while i < 2: i += 1 try: FN = self.bob.file_send(self.baid, iKind, FILE_SIZE, FILE_ID, FILE_NAME) LOG.info(f"test_file_transfer bob.file_send {FN}") except ArgumentError as e: LOG.debug(f"test_file_transfer bob.file_send {e} {i}") # ctypes.ArgumentError: This client is currently not connected to the friend raise else: break self.loop(100) sleep(1) else: LOG.error(f"test_file_transfer bob.file_send 2") raise AssertionError(f"test_file_transfer bob.file_send {self.bob._args.test_timeout // 2}") # UINT32_MAX try: FID = self.bob.file_get_file_id(self.baid, FN) hexFID = "".join([hex(ord(c))[2:].zfill(2) for c in FILE_NAME]) assert FID.startswith(hexFID.upper()) except Exception as e: LOG.warn(f"test_file_transfer:: {FILE_NAME} {hexFID} {e}") LOG.debug('\n' + traceback.format_exc()) if not self.wait_otox_attrs(self.bob, ['completed']): LOG_WARN(f"test_file_transfer Bob NO completed") return False if not self.wait_otox_attrs(self.alice, ['completed']): LOG_WARN(f"test_file_transfer Alice NO completed") return False return True except (ArgumentError, ValueError,) as e: # ValueError: non-hexadecimal number found in fromhex() arg at position 0 LOG.error(f"test_file_transfer: {e}") raise except Exception as e: LOG.error(f"test_file_transfer:: {e}") LOG.debug('\n' + traceback.format_exc()) raise finally: self.alice.callback_file_recv(None) self.alice.callback_file_recv_control(None) self.alice.callback_file_recv_chunk(None) self.bob.callback_file_recv_control(None) self.bob.callback_file_chunk_request(None) if hasattr(self, 'baid') and self.baid >= 0: self.bob.friend_delete(self.baid) if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) LOG_INFO(f"test_file_transfer:: self.wait_objs_attr completed") @unittest.skip('crashes') def test_tox_savedata(self) -> None: # """ t:get_savedata_size t:get_savedata """ # Fatal Python error: Aborted # "/var/local/src/toxygen_wrapper/wrapper/tox.py", line 180 in kill global oTOX_OARGS assert self.alice.get_savedata_size() > 0 data = self.alice.get_savedata() assert data is not None addr = self.alice.self_get_address() # self._address try: LOG.info("test_tox_savedata alice.kill") # crashes self.alice.kill() del self.alice except: pass oArgs = self.alice._args opts = oTestsToxOptions(oArgs) opts.savedata_data = data opts.savedata_length = len(data) self.alice = Tox(tox_options=opts) if addr != self.alice.self_get_address(): LOG.warning("test_tox_savedata " + f"{addr} != {self.alice.self_get_address()}") else: LOG.info("passed test_tox_savedata") def test_kill(self) -> None: # import threading LOG.info(f"THE END {threading.active_count()}") self.tearDown() LOG.info(f"THE END {threading.enumerate()}") def vOargsToxPreamble(oArgs, Tox, ToxTest) -> None: ts.vSetupLogging(oArgs) methods = set([x for x in dir(Tox) if not x[0].isupper() and not x[0] == '_']) docs = "".join([getattr(ToxTest, x).__doc__ for x in dir(ToxTest) if getattr(ToxTest, x).__doc__ is not None]) tested = set(re.findall(r't:(.*?)\n', docs)) not_tested = methods.difference(tested) logging.info('Test Coverage: %.2f%%' % (len(tested) * 100.0 / len(methods))) if len(not_tested): logging.info('Not tested:\n %s' % "\n ".join(sorted(list(not_tested)))) ### def iMain(oArgs, failfast=True) -> int: # collect_types.init_types_collection() vOargsToxPreamble(oArgs, Tox, ToxSuite) # https://stackoverflow.com/questions/35930811/how-to-sort-unittest-testcases-properly/35930812#35930812 cases = ts.suiteFactory(*ts.caseFactory([ToxSuite])) if color_runner: runner = color_runner.runner.TextTestRunner(verbosity=2, failfast=failfast) else: runner = unittest.TextTestRunner(verbosity=2, failfast=failfast, warnings='ignore') # with collect_types.collect(): runner.run(cases) # collect_types.dump_stats('tests_wrapper.out') def oTestsToxOptions(oArgs): data = None tox_options = toxygen_wrapper.tox.Tox.options_new() if oArgs.proxy_type: tox_options.contents.proxy_type = int(oArgs.proxy_type) tox_options.contents.proxy_host = bytes(oArgs.proxy_host, 'UTF-8') tox_options.contents.proxy_port = int(oArgs.proxy_port) tox_options.contents.udp_enabled = oArgs.udp_enabled = False else: tox_options.contents.udp_enabled = oArgs.udp_enabled if not os.path.exists('/proc/sys/net/ipv6'): oArgs.ipv6_enabled = False else: tox_options.contents.ipv6_enabled = oArgs.ipv6_enabled tox_options.contents.tcp_port = int(oArgs.tcp_port) tox_options.contents.dht_announcements_enabled = oArgs.dht_announcements_enabled tox_options.contents.hole_punching_enabled = oArgs.hole_punching_enabled # overrides tox_options.contents.local_discovery_enabled = False tox_options.contents.experimental_thread_safety = False # REQUIRED!! if oArgs.ipv6_enabled and not os.path.exists('/proc/sys/net/ipv6'): LOG.warning('Disabling IPV6 because /proc/sys/net/ipv6 does not exist' + repr(oArgs.ipv6_enabled)) tox_options.contents.ipv6_enabled = False else: tox_options.contents.ipv6_enabled = bool(oArgs.ipv6_enabled) if data: # load existing profile tox_options.contents.savedata_type = enums.TOX_SAVEDATA_TYPE['TOX_SAVE'] tox_options.contents.savedata_data = c_char_p(data) tox_options.contents.savedata_length = len(data) else: # create new profile tox_options.contents.savedata_type = enums.TOX_SAVEDATA_TYPE['NONE'] tox_options.contents.savedata_data = None tox_options.contents.savedata_length = 0 return tox_options def oArgparse(lArgv): global THRESHOLD if os.environ.get('socks_proxy'): THRESHOLD = 150 # >25 else: THRESHOLD = 30 # >25 parser = ts.oMainArgparser() parser.add_argument('--norequest',type=str, default='False', choices=['True','False'], help='Use _norequest during testing') parser.add_argument('--test_timeout',type=int, default=THRESHOLD, help='Test timeout during testing') parser.add_argument('profile', type=str, nargs='?', default=None, help='Path to Tox profile') oArgs = parser.parse_args(lArgv) ts.clean_booleans(oArgs) THRESHOLD = oArgs.test_timeout if hasattr(oArgs, 'sleep'): if oArgs.sleep == 'qt': pass # broken or gevent.sleep(idle_period) elif oArgs.sleep == 'gevent': pass # broken or gevent.sleep(idle_period) else: oArgs.sleep = 'time' return oArgs def main(lArgs=None) -> int: global oTOX_OARGS global bIS_LOCAL global THRESHOLD if lArgs is None: lArgs = sys.argv[1:] oArgs = oArgparse(lArgs) bIS_LOCAL = oArgs.network in ['newlocal', 'localnew', 'local'] THRESHOLD = oArgs.test_timeout oTOX_OARGS = oArgs setattr(oTOX_OARGS, 'bIS_LOCAL', bIS_LOCAL) bIS_LOCAL = True setattr(oTOX_OARGS, 'bIS_LOCAL', bIS_LOCAL) # oTOX_OPTIONS = ToxOptions() global oTOX_OPTIONS oTOX_OPTIONS = oTestsToxOptions(oArgs) if coloredlogs: # https://pypi.org/project/coloredlogs/ coloredlogs.install(level=oArgs.loglevel, logger=LOG, # %(asctime)s,%(msecs)03d %(hostname)s [%(process)d] fmt='%(name)s %(levelname)s %(message)s' ) else: logging.basicConfig(level=oArgs.loglevel) # logging.INFO return iMain(oArgs) if __name__ == '__main__': sys.exit(main(sys.argv[1:] ))