diff --git a/wrapper/tox.py b/wrapper/tox.py index c7138dc..738eaed 100644 --- a/wrapper/tox.py +++ b/wrapper/tox.py @@ -1,4 +1,11 @@ # -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- + +# ctypes wrapping of libtoxcore + +# WIP: - all functions are being changed to accept strings or byres for variables +# the library will use as bytes, and return sstrings not bytes for things +# you will use as strings. YMMV. + from ctypes import * from datetime import datetime from typing import Union, Callable, Union @@ -7,12 +14,12 @@ try: from wrapper.libtox import LibToxCore from wrapper.toxav import ToxAV from wrapper.toxcore_enums_and_consts import * - import wrapper.toxcore_enums_and_consts as enums + import wrapper.toxcore_enums_and_consts as enum except: from libtox import LibToxCore from toxav import ToxAV from toxcore_enums_and_consts import * - import toxcore_enums_and_consts as enums + import toxcore_enums_and_consts as enum # callbacks can be called in any thread so were being careful # tox.py can be called by callbacks @@ -667,7 +674,7 @@ class Tox: ' returned from tox_friend_add_norequest.') if tox_err_friend_add == TOX_ERR_FRIEND_ADD['OWN_KEY']: raise ArgumentError('The friend address belongs to the sending client.') - elif tox_err_friend_add == TOX_ERR_FRIEND_ADD['ALREADY_SENT']: + if tox_err_friend_add == TOX_ERR_FRIEND_ADD['ALREADY_SENT']: raise ArgumentError('A friend request has already been sent, or the address belongs to a friend that is' ' already on the friend list.') if tox_err_friend_add == TOX_ERR_FRIEND_ADD['BAD_CHECKSUM']: @@ -805,7 +812,7 @@ class Tox: Tox.libtoxcore.tox_self_get_friend_list(self._tox_pointer, friend_list) return friend_list[0:friend_list_size] - def friend_get_public_key(self, friend_number: int, public_key: str=None) -> str: + def friend_get_public_key(self, friend_number: int, public_key: Union[bytes,None]=None) -> str: """ Copies the Public Key associated with a given friend number to a byte array. @@ -826,6 +833,7 @@ class Tox: return bin_to_string(public_key, TOX_PUBLIC_KEY_SIZE) elif tox_err_friend_get_public_key == TOX_ERR_FRIEND_GET_PUBLIC_KEY['FRIEND_NOT_FOUND']: raise ArgumentError('No friend with the given number exists on the friend list.') + raise ToxError('The function did not return OK') def friend_get_last_online(self, friend_number: int) -> int: """ @@ -938,12 +946,13 @@ class Tox: tox_err_friend_query = tox_err_friend_query.value if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']: return int(result) - elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: + if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike' ' the `_self_` variants of these functions, which have no effect when a parameter is' ' NULL, these functions return an error in that case.') - elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: + if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number did not designate a valid friend.') + raise ToxError('The function did not return OK') def friend_get_status_message(self, friend_number: int, status_message=None) -> str: """ @@ -1020,11 +1029,11 @@ class Tox: tox_err_friend_query = tox_err_friend_query.value if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']: return int(result) - elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: + if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike' ' the `_self_` variants of these functions, which have no effect when a parameter is' ' NULL, these functions return an error in that case.') - elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: + if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number did not designate a valid friend.') raise ToxError('The function did not return OK.') @@ -1069,11 +1078,11 @@ class Tox: tox_err_friend_query = tox_err_friend_query.value if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']: return int(result) - elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: + if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike' ' the `_self_` variants of these functions, which have no effect when a parameter is' ' NULL, these functions return an error in that case.') - elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: + if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number did not designate a valid friend.') raise ToxError('The function did not return OK for friend get connection status.') @@ -1200,23 +1209,25 @@ class Tox: message = bytes(message, 'utf-8') tox_err_friend_send_message = c_int() LOG_DEBUG(f"tox.friend_send_message") - result = Tox.libtoxcore.tox_friend_send_message(self._tox_pointer, c_uint32(friend_number), - c_int(message_type), c_char_p(message), c_size_t(len(message)), + result = Tox.libtoxcore.tox_friend_send_message(self._tox_pointer, + c_uint32(friend_number), + c_int(message_type), + c_char_p(message), c_size_t(len(message)), byref(tox_err_friend_send_message)) tox_err_friend_send_message = tox_err_friend_send_message.value if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['OK']: return int(result) - elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['NULL']: + if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['NULL']: raise ArgumentError('One of the arguments to the function was NULL when it was not expected.') - elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['FRIEND_NOT_FOUND']: + if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['FRIEND_NOT_FOUND']: raise ArgumentError('The friend number did not designate a valid friend.') - elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['FRIEND_NOT_CONNECTED']: + if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['FRIEND_NOT_CONNECTED']: raise ArgumentError('This client is currently not connected to the friend.') - elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['SENDQ']: + if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['SENDQ']: raise MemoryError('An allocation error occurred while increasing the send queue size.') - elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['TOO_LONG']: + if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['TOO_LONG']: raise ArgumentError('Message length exceeded TOX_MAX_MESSAGE_LENGTH.') - elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['EMPTY']: + if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['EMPTY']: raise ArgumentError('Attempted to send a zero-length message.') raise ToxError('The function did not return OK for friend send message.') @@ -1444,8 +1455,9 @@ class Tox: if error.value == TOX_ERR_FILE_GET['OK']: return bin_to_string(file_id, TOX_FILE_ID_LENGTH) s = sGetError(error.value, TOX_ERR_FILE_GET) - LOG_ERROR(f"group_new {error.value} {s}") - raise ArgumentError(f"group_new {error.value} {s}") + LOG_ERROR(f"group_new err={error.value} {s}") + # have seen ArgumentError: group_new 3 NOT_FOUND + raise ArgumentError(f"group_new err={error.value} {s}") # File transmission: sending @@ -1702,20 +1714,20 @@ class Tox: tox_err_friend_custom_packet = tox_err_friend_custom_packet.value if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['OK']: return bool(result) - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['NULL']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['NULL']: raise ArgumentError('One of the arguments to the function was NULL when it was not expected.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_FOUND']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_FOUND']: raise ArgumentError('The friend number did not designate a valid friend.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_CONNECTED']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_CONNECTED']: raise ArgumentError('This client is currently not connected to the friend.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['INVALID']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['INVALID']: raise ArgumentError('The first byte of data was not in the specified range for the packet type.' 'This range is 200-254 for lossy, and 160-191 for lossless packets.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['EMPTY']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['EMPTY']: raise ArgumentError('Attempted to send an empty packet.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['TOO_LONG']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['TOO_LONG']: raise ArgumentError('Packet data length exceeded TOX_MAX_CUSTOM_PACKET_SIZE.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['SENDQ']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['SENDQ']: raise ToxError('Packet queue is full.') raise ToxError('The function did not return OK') @@ -1740,21 +1752,22 @@ class Tox: tox_err_friend_custom_packet = tox_err_friend_custom_packet.value if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['OK']: return bool(result) - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['NULL']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['NULL']: raise ArgumentError('One of the arguments to the function was NULL when it was not expected.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_FOUND']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_FOUND']: raise ArgumentError('The friend number did not designate a valid friend.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_CONNECTED']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_CONNECTED']: raise ArgumentError('This client is currently not connected to the friend.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['INVALID']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['INVALID']: raise ArgumentError('The first byte of data was not in the specified range for the packet type.' 'This range is 200-254 for lossy, and 160-191 for lossless packets.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['EMPTY']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['EMPTY']: raise ArgumentError('Attempted to send an empty packet.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['TOO_LONG']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['TOO_LONG']: raise ArgumentError('Packet data length exceeded TOX_MAX_CUSTOM_PACKET_SIZE.') - elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['SENDQ']: + if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['SENDQ']: raise ToxError('Packet queue is full.') + raise ToxError('The function did not return OK') def callback_friend_lossy_packet(self, callback: Callable) -> None: """ @@ -1890,7 +1903,7 @@ class Tox: if error.value: s = sGetError(error.value, TOX_ERR_GROUP_NEW) - LOG_ERROR(f"group_new {error.value} {s}") + LOG_ERROR(f"group_new err={error.value} {s}") raise ToxError(f"group_new {s} err={error.value}") # TypeError: '<' not supported between instances of 'c_uint' and 'int' @@ -1971,7 +1984,7 @@ class Tox: byref(error)) if error.value: s = sGetError(error.value, TOX_ERR_GROUP_RECONNECT) - LOG_ERROR(f"group_new {error.value} {s}") + LOG_ERROR(f"group_new err={error.value} {s}") raise ToxError(f"group_new {s} err={error.value}") return bool(result) @@ -1997,7 +2010,7 @@ class Tox: result = Tox.libtoxcore.tox_group_disconnect(self._tox_pointer, c_uint32(group_number), byref(error)) if error.value: s = sGetError(error.value, TOX_ERR_GROUP_DISCONNECT) - LOG_ERROR(f"group_disconnect {error.value} {s}") + LOG_ERROR(f"group_disconnect err={error.value} {s}") raise ToxError(f"group_disconnect {s} err={error.value}") return bool(result) @@ -2051,9 +2064,12 @@ class Tox: error = c_int() if type(name) != bytes: - topic = bytes(name, 'utf-8') + name = bytes(name, 'utf-8') LOG_DEBUG(f"tox.group_self_set_name") - result = Tox.libtoxcore.tox_group_self_set_name(self._tox_pointer, c_uint32(group_number), name, c_size_t(len(name)), byref(error)) + result = Tox.libtoxcore.tox_group_self_set_name(self._tox_pointer, + c_uint32(group_number), + c_char_p(name), c_size_t(len(name)), + byref(error)) if error.value: LOG_ERROR(f"group_self_set_name err={error.value}") raise ToxError("group_self_set_name err={error.value}") @@ -2346,7 +2362,7 @@ class Tox: Tox.libtoxcore.tox_callback_group_peer_name(self._tox_pointer, self.group_peer_name_cb) except Exception as e: # AttributeError LOG_ERROR(f"tox.callback_conference_peer_name") - return None + return def callback_group_peer_status(self, callback: Callable, user_data) -> int: """ @@ -2369,7 +2385,7 @@ class Tox: Tox.libtoxcore.tox_callback_group_peer_status(self._tox_pointer, self.group_peer_status_cb) except Exception as e: LOG_WARN(f"callback_group_peer_status Exception {e}") - return None + return # Group chat state queries and events. @@ -2712,7 +2728,7 @@ class Tox: # Group message sending - def group_send_custom_packet(self, group_number: int, lossless: bool, data) -> bool: + def group_send_custom_packet(self, group_number: int, lossless: bool, data: bytes) -> bool: """Send a custom packet to the group. If lossless is true the packet will be lossless. Lossless @@ -2786,8 +2802,8 @@ class Tox: byref(error)) if error.value: s = sGetError(error.value, TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE) - LOG_ERROR(f"group_send_private_message {error.value} {s}") - raise ToxError(f"group_send_private_message {error.value} {s}") + LOG_ERROR(f"group_send_private_message err={error.value} {s}") + raise ToxError(f"group_send_private_message err={error.value} {s}") return bool(result) @@ -2829,8 +2845,8 @@ class Tox: if error.value: s = sGetError(error.value, TOX_ERR_GROUP_SEND_MESSAGE) - LOG_ERROR(f"group_send_message {error.value} {s}") - raise ToxError(f"group_send_message {error.value} {s}") + LOG_ERROR(f"group_send_message err={error.value} {s}") + raise ToxError(f"group_send_message err={error.value} {s}") return bool(result) @@ -2915,8 +2931,8 @@ class Tox: result = Tox.libtoxcore.tox_group_invite_friend(self._tox_pointer, c_uint(group_number), c_uint32(friend_number), byref(error)) if error.value: s = sGetError(error.value, TOX_ERR_GROUP_INVITE_FRIEND) - LOG_ERROR(f"group_invite_friend {error.value} {s}") - raise ToxError(f"group_invite_friend {error.value} {s}") + LOG_ERROR(f"group_invite_friend err={error.value} {s}") + raise ToxError(f"group_invite_friend err={error.value} {s}") return bool(result) # API change - this no longer exists @@ -2976,7 +2992,7 @@ class Tox: raise ToxError(f"group_invite_accept ERROR {e}") if error.value: s = sGetError(error.value, TOX_ERR_GROUP_INVITE_ACCEPT) - LOG_ERROR(f"group_invite_friend {error.value} {s}") + LOG_ERROR(f"group_invite_friend err={error.value} {s}") raise ToxError(f"group_invite_accept {s} err={error.value}") return result @@ -3136,7 +3152,7 @@ class Tox: byref(error)) if error.value: s = sGetError(error.value, TOX_ERR_GROUP_FOUNDER_SET_PASSWORD) - LOG_ERROR(f"group_founder_set_password {error.value} {s}") + LOG_ERROR(f"group_founder_set_password err={error.value} {s}") raise ToxError(f"group_founder_set_password {s} err={error.value}") return bool(result) diff --git a/wrapper/toxav.py b/wrapper/toxav.py index 8474acb..84f26f9 100644 --- a/wrapper/toxav.py +++ b/wrapper/toxav.py @@ -42,10 +42,10 @@ class ToxAV: toxav_err_new = toxav_err_new.value if toxav_err_new == enum.TOXAV_ERR_NEW['NULL']: raise ArgumentError('One of the arguments to the function was NULL when it was not expected.') - elif toxav_err_new == enum.TOXAV_ERR_NEW['MALLOC']: + if toxav_err_new == enum.TOXAV_ERR_NEW['MALLOC']: raise MemoryError('Memory allocation failure while trying to allocate structures required for the A/V ' 'session.') - elif toxav_err_new == enum.TOXAV_ERR_NEW['MULTIPLE']: + if toxav_err_new == enum.TOXAV_ERR_NEW['MULTIPLE']: raise RuntimeError('Attempted to create a second session for the same Tox instance.') self.call_state_cb = None @@ -124,6 +124,7 @@ class ToxAV: raise ArgumentError('Attempted to call a friend while already in an audio or video call with them.') elif toxav_err_call == enum.TOXAV_ERR_CALL['INVALID_BIT_RATE']: raise ArgumentError('Audio or video bit rate is invalid.') + raise ArgumentError('The function did not return OK') def callback_call(self, callback: Callable, user_data) -> None: """ @@ -169,18 +170,19 @@ class ToxAV: toxav_err_answer = toxav_err_answer.value if toxav_err_answer == enum.TOXAV_ERR_ANSWER['OK']: return bool(result) - elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['SYNC']: + if toxav_err_answer == enum.TOXAV_ERR_ANSWER['SYNC']: raise RuntimeError('Synchronization error occurred.') - elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['CODEC_INITIALIZATION']: + if toxav_err_answer == enum.TOXAV_ERR_ANSWER['CODEC_INITIALIZATION']: raise RuntimeError('Failed to initialize codecs for call session. Note that codec initiation will fail if ' 'there is no receive callback registered for either audio or video.') - elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_FOUND']: + if toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_FOUND']: raise ArgumentError('The friend number did not designate a valid friend.') - elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_CALLING']: + if toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_CALLING']: raise ArgumentError('The friend was valid, but they are not currently trying to initiate a call. This is ' 'also returned if this client is already in a call with the friend.') - elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['INVALID_BIT_RATE']: + if toxav_err_answer == enum.TOXAV_ERR_ANSWER['INVALID_BIT_RATE']: raise ArgumentError('Audio or video bit rate is invalid.') + raise ToxError('The function did not return OK') # Call state graph @@ -224,16 +226,17 @@ class ToxAV: toxav_err_call_control = toxav_err_call_control.value if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['OK']: return bool(result) - elif toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['SYNC']: + if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['SYNC']: raise RuntimeError('Synchronization error occurred.') - elif toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_FOUND']: + if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number passed did not designate a valid friend.') - elif toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_IN_CALL']: + if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_IN_CALL']: raise RuntimeError('This client is currently not in a call with the friend. Before the call is answered, ' 'only CANCEL is a valid control.') - elif toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['INVALID_TRANSITION']: + if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['INVALID_TRANSITION']: raise RuntimeError('Happens if user tried to pause an already paused call or if trying to resume a call ' 'that is not paused.') + raise ToxError('The function did not return OK.') # TODO Controlling bit rates @@ -267,22 +270,23 @@ class ToxAV: toxav_err_send_frame = toxav_err_send_frame.value if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['OK']: return bool(result) - elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['NULL']: + if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['NULL']: raise ArgumentError('The samples data pointer was NULL.') - elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_FOUND']: + if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number passed did not designate a valid friend.') - elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_IN_CALL']: + if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_IN_CALL']: raise RuntimeError('This client is currently not in a call with the friend.') - elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['SYNC']: + if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['SYNC']: raise RuntimeError('Synchronization error occurred.') - elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['INVALID']: + if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['INVALID']: raise ArgumentError('One of the frame parameters was invalid. E.g. the resolution may be too small or too ' 'large, or the audio sampling rate may be unsupported.') - elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['PAYLOAD_TYPE_DISABLED']: + if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['PAYLOAD_TYPE_DISABLED']: raise RuntimeError('Either friend turned off audio or video receiving or we turned off sending for the said' 'payload.') - elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['RTP_FAILED']: + if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['RTP_FAILED']: RuntimeError('Failed to push frame through rtp interface.') + raise ToxError('The function did not return OK.') def video_send_frame(self, friend_number: int, width: int, height: int, y, u, v) -> None: """ diff --git a/wrapper/toxencryptsave.py b/wrapper/toxencryptsave.py index a19aab3..407eebd 100644 --- a/wrapper/toxencryptsave.py +++ b/wrapper/toxencryptsave.py @@ -46,13 +46,14 @@ class ToxEncryptSave: tox_err_encryption = tox_err_encryption.value if tox_err_encryption == enum.TOX_ERR_ENCRYPTION['OK']: return out[:] - elif tox_err_encryption == enum.TOX_ERR_ENCRYPTION['NULL']: + if tox_err_encryption == enum.TOX_ERR_ENCRYPTION['NULL']: raise ArgumentError('Some input data, or maybe the output pointer, was null.') - elif tox_err_encryption == enum.TOX_ERR_ENCRYPTION['KEY_DERIVATION_FAILED']: + if tox_err_encryption == enum.TOX_ERR_ENCRYPTION['KEY_DERIVATION_FAILED']: raise RuntimeError('The crypto lib was unable to derive a key from the given passphrase, which is usually a' ' lack of memory issue. The functions accepting keys do not produce this error.') - elif tox_err_encryption == enum.TOX_ERR_ENCRYPTION['FAILED']: + if tox_err_encryption == enum.TOX_ERR_ENCRYPTION['FAILED']: raise RuntimeError('The encryption itself failed.') + raise ToxError('The function did not return OK.') def pass_decrypt(self, data: bytes, password: str) -> bytes: """ diff --git a/wrapper/toxygen_echo.py b/wrapper/toxygen_echo.py index 2b446d1..0f2f136 100644 --- a/wrapper/toxygen_echo.py +++ b/wrapper/toxygen_echo.py @@ -16,7 +16,6 @@ import traceback import threading import random from ctypes import * -import argparse import time # LOG=util.log @@ -32,7 +31,7 @@ def LOG_trace(a): pass # print('TRAC_ '+a) import wrapper import wrapper.toxcore_enums_and_consts as enums -from wrapper.tox import Tox, UINT32_MAX, ToxError +from wrapper.tox import Tox, UINT32_MAX from wrapper.toxcore_enums_and_consts import TOX_CONNECTION, TOX_USER_STATUS, \ TOX_MESSAGE_TYPE, TOX_PUBLIC_KEY_SIZE, TOX_FILE_CONTROL, TOX_FILE_KIND @@ -54,7 +53,6 @@ except ImportError as e: # logging.log(logging.DEBUG, f"coloredlogs not available: {e}") coloredlogs = None -import wrapper_tests.support_testing as ts if 'USER' in os.environ: sDATA_FILE = '/tmp/logging_toxygen_' +os.environ['USER'] +'.tox' elif 'USERNAME' in os.environ: @@ -276,9 +274,9 @@ class EchoBot(): LOG.info('on_friend_request Accepted.') save_to_file(self._tox, sDATA_FILE) - def on_friend_message(self, friendId, type, message) -> None: + def on_friend_message(self, friendId, message_type , message) -> None: name = self._tox.friend_get_name(friendId) - LOG.debug('%s: %s' % (name, message)) + LOG.debug(f"{name}, {message}, {message_type}") yMessage = bytes(message, 'UTF-8') self._tox.friend_send_message(friendId, TOX_MESSAGE_TYPE['NORMAL'], yMessage) LOG.info('EchoBot sent: %s' % message) @@ -311,7 +309,7 @@ class EchobotTox(Tox): def __init__(self, opts, app=None): - super(EchobotTox, self).__init__(opts, app=app) + super().__init__(opts, app=app) self._address = self.self_get_address() self.name = 'pyechobot' self._opts = opts diff --git a/wrapper_tests/socks.py b/wrapper_tests/socks.py index 748fa8e..fa1b25e 100644 --- a/wrapper_tests/socks.py +++ b/wrapper_tests/socks.py @@ -90,7 +90,7 @@ _socks4errors = ("request granted", "request rejected because the client program and identd report different user-ids", "unknown error") -def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=None, password=None): +def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=None, password=None) -> None: """setdefaultproxy(proxytype, addr[, port[, rdns[, username[, password]]]]) Sets a default proxy which all further socksocket objects will use, unless explicitly changed. @@ -98,7 +98,7 @@ def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=No global _defaultproxy _defaultproxy = (proxytype, addr, port, rdns, username, password) -def wrapmodule(module): +def wrapmodule(module) -> None: """wrapmodule(module) Attempts to replace a module's socket library with a SOCKS socket. Must set a default proxy using setdefaultproxy(...) first. @@ -272,7 +272,7 @@ class socksocket(socket.socket): """ return self.__proxypeername - def __negotiatesocks4(self,destaddr,destport): + def __negotiatesocks4(self,destaddr,destport) -> None: """__negotiatesocks4(self,destaddr,destport) Negotiates a connection through a SOCKS4 server. """ @@ -320,7 +320,7 @@ class socksocket(socket.socket): else: self.__proxypeername = (destaddr, destport) - def __negotiatehttp(self, destaddr, destport): + def __negotiatehttp(self, destaddr, destport) -> None: """__negotiatehttp(self,destaddr,destport) Negotiates a connection through an HTTP server. """ @@ -354,7 +354,7 @@ class socksocket(socket.socket): self.__proxysockname = ("0.0.0.0", 0) self.__proxypeername = (addr, destport) - def connect(self, destpair): + def connect(self, destpair) -> None: """connect(self, despair) Connects to the specified destination through a proxy. destpar - A tuple of the IP/DNS address and the port number. diff --git a/wrapper_tests/support_http.py b/wrapper_tests/support_http.py index d60ccbf..7323bc6 100644 --- a/wrapper_tests/support_http.py +++ b/wrapper_tests/support_http.py @@ -22,7 +22,7 @@ except ImportError: lNO_PROXY = ['localhost', '127.0.0.1'] CONNECT_TIMEOUT = 20.0 -def bAreWeConnected(): +def bAreWeConnected() -> bool: # FixMe: Linux only sFile = f"/proc/{os.getpid()}/net/route" if not os.path.isfile(sFile): return None @@ -33,7 +33,7 @@ def bAreWeConnected(): i += 1 return i > 0 -def pick_up_proxy_from_environ(): +def pick_up_proxy_from_environ() -> dict: retval = dict() if os.environ.get('socks_proxy', ''): # socks_proxy takes precedence over https/http @@ -68,7 +68,7 @@ def pick_up_proxy_from_environ(): retval['udp_enabled'] = True return retval -def download_url(url, settings=None): +def download_url(url:str, settings:str = None) -> None: if not bAreWeConnected(): return '' if settings is None: diff --git a/wrapper_tests/support_onions.py b/wrapper_tests/support_onions.py index ba1d182..ce501e9 100644 --- a/wrapper_tests/support_onions.py +++ b/wrapper_tests/support_onions.py @@ -8,6 +8,7 @@ import shutil import socket import sys import time +from typing import Union, Callable, Union if False: import cepa as stem @@ -224,7 +225,7 @@ yKNOWN_NODNS = """ # - w.digidow.eu # - w.cccs.de -def oMakeController(sSock='', port=9051): +def oMakeController(sSock:str = '', port:int = 9051): import getpass if sSock and os.path.exists(sSock): controller = Controller.from_socket_file(path=sSock) @@ -236,7 +237,7 @@ def oMakeController(sSock='', port=9051): return controller oSTEM_CONTROLER = None -def oGetStemController(log_level=10, sock_or_pair='/run/tor/control'): +def oGetStemController(log_level:int = 10, sock_or_pair:str = '/run/tor/control'): global oSTEM_CONTROLER if oSTEM_CONTROLER: return oSTEM_CONTROLER @@ -268,7 +269,7 @@ def oGetStemController(log_level=10, sock_or_pair='/run/tor/control'): LOG.debug(f"{controller}") return oSTEM_CONTROLER -def bAreWeConnected(): +def bAreWeConnected() -> bool: # FixMe: Linux only sFile = f"/proc/{os.getpid()}/net/route" if not os.path.isfile(sFile): return None @@ -279,7 +280,7 @@ def bAreWeConnected(): i += 1 return i > 0 -def sMapaddressResolv(target, iPort=9051, log_level=10): +def sMapaddressResolv(target:str, iPort:int = 9051, log_leve:int = 10) -> str: if not stem: LOG.warn('please install the stem Python package') return '' @@ -295,7 +296,7 @@ def sMapaddressResolv(target, iPort=9051, log_level=10): LOG.exception(e) return '' -def vwait_for_controller(controller, wait_boot=10): +def vwait_for_controller(controller, wait_boot:int = 10) -> None: if bAreWeConnected() is False: raise SystemExit("we are not connected") percent = i = 0 @@ -308,12 +309,12 @@ def vwait_for_controller(controller, wait_boot=10): time.sleep(5) i += 5 -def bin_to_hex(raw_id, length=None): +def bin_to_hex(raw_id:int, length: Union[int, None] = None) -> str: if length is None: length = len(raw_id) res = ''.join('{:02x}'.format(raw_id[i]) for i in range(length)) return res.upper() -def lIntroductionPoints(controller=None, lOnions=[], itimeout=120, log_level=10): +def lIntroductionPoints(controller=None, lOnions:list = [], itimeout:int = 120, log_level:int = 10): """now working !!! stem 1.8.x timeout must be huge >120 'Provides the descriptor for a hidden service. The **address** is the '.onion' address of the hidden service ' @@ -379,7 +380,7 @@ def lIntroductionPoints(controller=None, lOnions=[], itimeout=120, log_level=10) LOG.exception(e) return l -def zResolveDomain(domain): +def zResolveDomain(domain:str) -> int: try: ip = sTorResolve(domain) except Exception as e: # noqa @@ -396,13 +397,13 @@ def zResolveDomain(domain): ip = lpair[0] return ip -def sTorResolve(target, - verbose=False, - sHost='127.0.0.1', - iPort=9050, - SOCK_TIMEOUT_SECONDS=10.0, - SOCK_TIMEOUT_TRIES=3, - ): +def sTorResolve(target:str, + verbose:bool = False, + sHost:str = '127.0.0.1', + iPort:int = 9050, + SOCK_TIMEOUT_SECONDS:float = 10.0, + SOCK_TIMEOUT_TRIES:int = 3, + ) -> str: MAX_INFO_RESPONSE_PACKET_LENGTH = 8 if '@' in target: LOG.warn(f"sTorResolve failed invalid hostname {target}") @@ -469,7 +470,7 @@ def sTorResolve(target, return '' -def getaddrinfo(sHost, sPort): +def getaddrinfo(sHost:str, sPort:str) -> list: # do this the explicit way = Ive seen the compact connect fail # >>> sHost, sPort = 'l27.0.0.1', 33446 # >>> sock.connect((sHost, sPort)) @@ -486,7 +487,7 @@ def getaddrinfo(sHost, sPort): return None return lPair -def icheck_torrc(sFile, oArgs): +def icheck_torrc(sFile:str, oArgs) -> int: l = open(sFile, 'rt').readlines() a = {} for elt in l: @@ -528,7 +529,7 @@ def icheck_torrc(sFile, oArgs): print('VirtualAddrNetworkIPv4 172.16.0.0/12') return 0 -def lExitExcluder(oArgs, iPort=9051, log_level=10): +def lExitExcluder(oArgs, iPort:int = 9051, log_level:int = 10) -> list: """ https://raw.githubusercontent.com/nusenu/noContactInfo_Exit_Excluder/main/exclude_noContactInfo_Exits.py """ diff --git a/wrapper_tests/support_testing.py b/wrapper_tests/support_testing.py index a1e8b27..5d430bd 100644 --- a/wrapper_tests/support_testing.py +++ b/wrapper_tests/support_testing.py @@ -17,6 +17,7 @@ import unittest from ctypes import * from random import Random import functools +from typing import Union, Callable, Union random = Random() @@ -54,11 +55,11 @@ except ImportError: global LOG LOG = logging.getLogger() -def LOG_ERROR(l): print('ERRORc: '+l) -def LOG_WARN(l): print('WARNc: ' +l) -def LOG_INFO(l): print('INFOc: ' +l) -def LOG_DEBUG(l): print('DEBUGc: '+l) -def LOG_TRACE(l): pass # print('TRACE+ '+l) +def LOG_ERROR(l:str) -> None: print('ERRORc: '+l) +def LOG_WARN(l:str) -> None: print('WARNc: ' +l) +def LOG_INFO(l:str) -> None: print('INFOc: ' +l) +def LOG_DEBUG(l:str) -> None: print('DEBUGc: '+l) +def LOG_TRACE(l:str) -> None: pass # print('TRACE+ '+l) try: from trepan.api import debug @@ -71,7 +72,7 @@ else: def trepan_handler(num=None, f=None): connection_opts={'IO': 'TCP', 'PORT': 6666} intf = Mserver.ServerInterface(connection_opts=connection_opts) - dbg_opts = { 'interface': intf } + dbg_opts = {'interface': intf } print(f'Starting TCP server listening on port 6666.') debug(dbg_opts=dbg_opts) return @@ -132,8 +133,7 @@ lDEAD_BS = [ 'tox.abilinski.com', '172.103.164.250', '172.103.164.250.tpia.cipherkey.com', ] - -def assert_main_thread(): +def assert_main_thread() -> None: from PyQt5 import QtCore, QtWidgets from qtpy.QtWidgets import QApplication @@ -144,7 +144,7 @@ def assert_main_thread(): raise RuntimeError('attempt to call MainWindow.append_message from non-app thread') @contextlib.contextmanager -def ignoreStdout(): +def ignoreStdout() -> None: devnull = os.open(os.devnull, os.O_WRONLY) old_stdout = os.dup(1) sys.stdout.flush() @@ -157,7 +157,7 @@ def ignoreStdout(): os.close(old_stdout) @contextlib.contextmanager -def ignoreStderr(): +def ignoreStderr() -> None: devnull = os.open(os.devnull, os.O_WRONLY) old_stderr = os.dup(2) sys.stderr.flush() @@ -169,7 +169,7 @@ def ignoreStderr(): os.dup2(old_stderr, 2) os.close(old_stderr) -def clean_booleans(oArgs): +def clean_booleans(oArgs) -> None: for key in lBOOLEANS: if not hasattr(oArgs, key): continue val = getattr(oArgs, key) @@ -179,11 +179,11 @@ def clean_booleans(oArgs): else: setattr(oArgs, key, True) -def on_log(iTox, level, filename, line, func, message, *data): +def on_log(iTox, level, filename, line, func, message, *data) -> None: # LOG.debug(repr((level, filename, line, func, message,))) tox_log_cb(level, filename, line, func, message) -def tox_log_cb(level, filename, line, func, message, *args): +def tox_log_cb(level, filename, line, func, message, *args) -> None: """ * @param level The severity of the log message. * @param filename The source file from which the message originated. @@ -245,7 +245,7 @@ def tox_log_cb(level, filename, line, func, message, *args): else: LOG_TRACE(f"{level}: {message}") -def vAddLoggerCallback(tox_options, callback=None): +def vAddLoggerCallback(tox_options, callback=None) -> None: if callback is None: wrapper.tox.Tox.libtoxcore.tox_options_set_log_callback( tox_options._options_pointer, @@ -259,7 +259,7 @@ def vAddLoggerCallback(tox_options, callback=None): tox_options._options_pointer, tox_options.self_logger_cb) -def get_video_indexes(): +def get_video_indexes() -> list: # Linux return [str(l[5:]) for l in os.listdir('/dev/') if l.startswith('video')] @@ -394,7 +394,7 @@ def oMainArgparser(_=None, iMode=0): # help='En/Disable saving history') return parser -def vSetupLogging(oArgs): +def vSetupLogging(oArgs) -> None: global LOG logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S') logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S' @@ -430,7 +430,7 @@ def vSetupLogging(oArgs): LOG.info(f"Setting loglevel to {oArgs.loglevel!s}") -def setup_logging(oArgs): +def setup_logging(oArgs) -> None: global LOG if coloredlogs: aKw = dict(level=oArgs.loglevel, @@ -459,7 +459,7 @@ def setup_logging(oArgs): # LOG.trace = lambda l: LOG.log(0, repr(l)) LOG.info(f"Setting loglevel to {oArgs.loglevel!s}") -def signal_handler(num, f): +def signal_handler(num, f) -> None: from trepan.api import debug from trepan.interfaces import server as Mserver connection_opts={'IO': 'TCP', 'PORT': 6666} @@ -469,7 +469,7 @@ def signal_handler(num, f): debug(dbg_opts=dbg_opts) return -def merge_args_into_settings(args, settings): +def merge_args_into_settings(args:list, settings:dict) -> None: if args: if not hasattr(args, 'audio'): LOG.warn('No audio ' +repr(args)) @@ -493,7 +493,7 @@ def merge_args_into_settings(args, settings): clean_settings(settings) return -def clean_settings(self): +def clean_settings(self:dict) -> None: # failsafe to ensure C tox is bytes and Py settings is str # overrides @@ -528,7 +528,7 @@ def clean_settings(self): LOG.debug("Cleaned settings") -def lSdSamplerates(iDev): +def lSdSamplerates(iDev:int) -> list: try: import sounddevice as sd except ImportError: @@ -546,7 +546,7 @@ def lSdSamplerates(iDev): supported_samplerates.append(fs) return supported_samplerates -def _get_nodes_path(oArgs): +def _get_nodes_path(oArgs:str): if oArgs and hasattr(oArgs, 'nodes_json') and \ oArgs.nodes_json and os.path.isfile(oArgs.nodes_json): default = oArgs.nodes_json @@ -565,9 +565,9 @@ aNODES = {} # @functools.lru_cache(maxsize=12) TypeError: unhashable type: 'Namespace' def generate_nodes(oArgs=None, - nodes_count=DEFAULT_NODES_COUNT, - ipv='ipv4', - udp_not_tcp=True): + nodes_count:int = DEFAULT_NODES_COUNT, + ipv:str = 'ipv4', + udp_not_tcp=True) -> dict: global aNODES sKey = ipv sKey += ',0' if udp_not_tcp else ',1' @@ -584,11 +584,11 @@ def generate_nodes(oArgs=None, return aNODES[sKey] aNODES_CACHE = {} -def generate_nodes_from_file(sFile, - nodes_count=DEFAULT_NODES_COUNT, - ipv='ipv4', - udp_not_tcp=True, - ): +def generate_nodes_from_file(sFile:str, + nodes_count:int = DEFAULT_NODES_COUNT, + ipv:str = 'ipv4', + udp_not_tcp:bool = True, + ) -> dict: """https://github.com/TokTok/c-toxcore/issues/469 I had a conversation with @irungentoo on IRC about whether we really need to call tox_bootstrap() when having UDP disabled and why. The answer is yes, because in addition to TCP relays (tox_add_tcp_relay()), toxcore also needs to know addresses of UDP onion nodes in order to work correctly. The DHT, however, is not used when UDP is disabled. tox_bootstrap() function resolves the address passed to it as argument and calls onion_add_bs_node_path() and DHT_bootstrap() functions. Although calling DHT_bootstrap() is not really necessary as DHT is not used, we still need to resolve the address of the DHT node in order to populate the onion routes with onion_add_bs_node_path() call. """ @@ -637,7 +637,7 @@ I had a conversation with @irungentoo on IRC about whether we really need to cal LOG.debug(f"generate_nodes_from_file {sFile} len={len(sorted_nodes)}") return sorted_nodes -def tox_bootstrapd_port(): +def tox_bootstrapd_port() -> int: port = 33446 sFile = '/etc/tox-bootstrapd.conf' if os.path.exists(sFile): @@ -647,7 +647,7 @@ def tox_bootstrapd_port(): port = int(line[7:]) return port -def bootstrap_local(elts, lToxes, oArgs=None): +def bootstrap_local(elts:list, lToxes:list, oArgs=None): if os.path.exists('/run/tox-bootstrapd/tox-bootstrapd.pid'): LOG.debug('/run/tox-bootstrapd/tox-bootstrapd.pid') iRet = True @@ -658,12 +658,12 @@ def bootstrap_local(elts, lToxes, oArgs=None): LOG.info(f'bootstraping local') return bootstrap_udp(elts, lToxes, oArgs) -def lDNSClean(l): +def lDNSClean(l:list) -> list: global lDEAD_BS # list(set(l).difference(set(lDEAD_BS))) return [elt for elt in l if elt not in lDEAD_BS] -def lExitExcluder(oArgs, iPort=9051): +def lExitExcluder(oArgs, iPort:int =9051) -> list: """ https://raw.githubusercontent.com/nusenu/noContactInfo_Exit_Excluder/main/exclude_noContactInfo_Exits.py """ @@ -703,7 +703,7 @@ def lExitExcluder(oArgs, iPort=9051): aHOSTS = {} @functools.lru_cache(maxsize=20) -def sDNSLookup(host): +def sDNSLookup(host:str) -> str: global aHOSTS ipv = 0 if host in lDEAD_BS: @@ -784,7 +784,7 @@ def sDNSLookup(host): aHOSTS[host] = ip return ip -def bootstrap_udp(lelts, lToxes, oArgs=None): +def bootstrap_udp(lelts:list, lToxes:list, oArgs=None) -> None: lelts = lDNSClean(lelts) socket.setdefaulttimeout(15.0) for oTox in lToxes: @@ -828,7 +828,7 @@ def bootstrap_udp(lelts, lToxes, oArgs=None): # LOG.debug(f'bootstrap_udp to {host} not connected') pass -def bootstrap_tcp(lelts, lToxes, oArgs=None): +def bootstrap_tcp(lelts:list, lToxes:list, oArgs=None) -> None: lelts = lDNSClean(lelts) for oTox in lToxes: if hasattr(oTox, 'oArgs'): oArgs = oTox.oArgs @@ -874,7 +874,7 @@ def bootstrap_tcp(lelts, lToxes, oArgs=None): # +f" last={int(oTox.mycon_time)}" ) pass -def iNmapInfoNmap(sProt, sHost, sPort, key=None, environ=None, cmd=''): +def iNmapInfoNmap(sProt:str, sHost:str, sPort:str, key=None, environ=None, cmd:str = '') -> int: if sHost in ['-', 'NONE']: return 0 if not nmap: return 0 nmps = nmap.PortScanner @@ -893,7 +893,7 @@ def iNmapInfoNmap(sProt, sHost, sPort, key=None, environ=None, cmd=''): LOG.info(f"iNmapInfoNmap: to {sHost} {state}") return 0 -def iNmapInfo(sProt, sHost, sPort, key=None, environ=None, cmd='nmap'): +def iNmapInfo(sProt:str, sHost:str, sPort:str, key=None, environ=None, cmd:str = 'nmap'): if sHost in ['-', 'NONE']: return 0 sFile = os.path.join("/tmp", f"{sHost}.{os.getpid()}.nmap") if sProt in ['socks', 'socks5', 'tcp4']: @@ -917,7 +917,7 @@ def iNmapInfo(sProt, sHost, sPort, key=None, environ=None, cmd='nmap'): # bootstrap_iNmapInfo(lElts, self._args, sProt) -def bootstrap_iNmapInfo(lElts, oArgs, protocol="tcp4", bIS_LOCAL=False, iNODES=iNODES, cmd='nmap'): +def bootstrap_iNmapInfo(lElts:list, oArgs, protocol:str = "tcp4", bIS_LOCAL:bool = False, iNODES:int = iNODES, cmd:str = 'nmap') -> bool: if not bIS_LOCAL and not bAreWeConnected(): LOG.warn(f"bootstrap_iNmapInfo not local and NOT CONNECTED") return True @@ -953,7 +953,7 @@ def bootstrap_iNmapInfo(lElts, oArgs, protocol="tcp4", bIS_LOCAL=False, iNODES=i lRetval += [False] return any(lRetval) -def caseFactory(cases): +def caseFactory(cases:list) -> list: """We want the tests run in order.""" if len(cases) > 1: ordered_cases = sorted(cases, key=lambda f: inspect.findsource(f)[1]) diff --git a/wrapper_tests/tests_wrapper.py b/wrapper_tests/tests_wrapper.py index 27c8e7c..080ae42 100644 --- a/wrapper_tests/tests_wrapper.py +++ b/wrapper_tests/tests_wrapper.py @@ -39,6 +39,7 @@ import threading import traceback import unittest from ctypes import * +from typing import Union, Callable, Union faulthandler.enable() @@ -185,7 +186,7 @@ class BaseThread(threading.Thread): self._stop_thread = False self.name = name - def stop_thread(self, timeout=-1) -> None: + def stop_thread(self, timeout: int = -1) -> None: self._stop_thread = True if timeout < 0: timeout = ts.iTHREAD_TIMEOUT @@ -213,7 +214,7 @@ bob = alice = None def prepare(self): global bob, alice - def bobs_on_self_connection_status(iTox, connection_state, *args): + def bobs_on_self_connection_status(iTox, connection_state, *args) -> None: status = connection_state self.bob.dht_connected = status self.bob.mycon_time = time.time() @@ -232,7 +233,7 @@ def prepare(self): if self.bob.self_get_connection_status() != status: LOG_WARN(f"bobs_on_self_connection_status DISAGREE {status}") - def alices_on_self_connection_status(iTox, connection_state, *args): + def alices_on_self_connection_status(iTox, connection_state: int, *args) -> None: #FixMe connection_num status = connection_state self.alice.dht_connected = status @@ -371,12 +372,12 @@ class ToxSuite(unittest.TestCase): self.bBobNeedAlice() self.bAliceNeedAddBob() - def run(self, result=None): + def run(self, result=None) -> None: """ Stop after first error """ if not result.errors: super(ToxSuite, self).run(result) - def get_connection_status(self): + def get_connection_status(self) -> None: if self.bob.mycon_time <= 1 or self.alice.mycon_time <= 1: pass # drop through @@ -392,7 +393,7 @@ class ToxSuite(unittest.TestCase): return False return True - def loop(self, n): + def loop(self, n) -> None: """ t:iterate t:iteration_interval @@ -403,7 +404,7 @@ class ToxSuite(unittest.TestCase): self.bob.iterate() sleep(interval / 1000.0) - def call_bootstrap(self, num=None, lToxes=None, i=0): + def call_bootstrap(self, num: Union[int, None] = None, lToxes:list[int] =None, i:int =0) -> None: if num == None: num=ts.iNODES if lToxes is None: lToxes = [self.alice, self.bob] @@ -425,7 +426,7 @@ class ToxSuite(unittest.TestCase): LOG.debug(f"call_bootstrap ts.bootstrap_tcp {len(lElts)}") ts.bootstrap_tcp(lElts, lToxes) - def group_until_connected(self, otox, group_number, num=None, iMax=THRESHOLD): + def group_until_connected(self, otox, group_number:int, num: Union[int, None] = None, iMax:int = THRESHOLD) -> None: """ """ i = 0 @@ -463,7 +464,7 @@ class ToxSuite(unittest.TestCase): +f" last={int(otox.mycon_time)}" ) return False - def loop_until_connected(self, num=None): + def loop_until_connected(self, num: Union[int, None] = None) -> None: """ t:on_self_connection_status t:self_get_connection_status @@ -519,7 +520,7 @@ class ToxSuite(unittest.TestCase): +f" last={int(self.bob.mycon_time)}" ) return False - def wait_objs_attr(self, objs, attr): + def wait_objs_attr(self, objs: list, attr: str) -> bool: global THRESHOLD i = 0 while i <= THRESHOLD: @@ -539,7 +540,7 @@ class ToxSuite(unittest.TestCase): return all([getattr(obj, attr) is not None for obj in objs]) - def wait_otox_attrs(self, obj, attrs): + def wait_otox_attrs(self, obj: list, attrs: list[str]) -> bool: assert all(attrs), f"wait_otox_attrs {attrs}" i = 0 while i <= THRESHOLD: @@ -562,7 +563,7 @@ class ToxSuite(unittest.TestCase): return all([getattr(obj, attr) for attr in attrs]) - def wait_ensure_exec(self, method, args): + def wait_ensure_exec(self, method, args:list) -> bool: i = 0 oRet = None while i <= THRESHOLD: @@ -593,7 +594,7 @@ class ToxSuite(unittest.TestCase): return oRet - def bob_add_alice_as_friend_norequest(self): + def bob_add_alice_as_friend_norequest(self) -> bool: if not self.bBobNeedAlice(): return True MSG = 'Hi, this is Bob.' @@ -608,7 +609,7 @@ class ToxSuite(unittest.TestCase): assert self.bob.self_get_friend_list_size() >= 1 return True - def alice_add_bob_as_friend_norequest(self): + def alice_add_bob_as_friend_norequest(self) -> bool: if not self.bAliceNeedAddBob(): return True iRet = self.alice.friend_add_norequest(self.bob._address) @@ -622,7 +623,7 @@ class ToxSuite(unittest.TestCase): assert self.alice.self_get_friend_list_size() >= 1 return True - def both_add_as_friend(self): + def both_add_as_friend(self) -> bool: if bUSE_NOREQUEST: assert self.bob_add_alice_as_friend() assert self.alice_add_bob_as_friend_norequest() @@ -635,7 +636,7 @@ class ToxSuite(unittest.TestCase): LOG.warn("both_add_as_friend no alice, abid") return True - def both_add_as_friend_norequest(self): + def both_add_as_friend_norequest(self) -> bool: if self.bBobNeedAlice(): assert self.bob_add_alice_as_friend_norequest() if self.bAliceNeedAddBob(): @@ -650,7 +651,7 @@ class ToxSuite(unittest.TestCase): #? assert self.bob.friend_get_last_online(self.baid) is not None return True - def bob_add_alice_as_friend(self): + def bob_add_alice_as_friend(self) -> bool: """ t:friend_add t:on_friend_request @@ -664,7 +665,7 @@ class ToxSuite(unittest.TestCase): public_key, message_data, message_data_size, - *largs): + *largs) -> None: LOG_DEBUG(f"alices_on_friend_request: " +repr(message_data)) try: assert str(message_data, 'UTF-8') == MSG @@ -697,7 +698,7 @@ class ToxSuite(unittest.TestCase): return True - def alice_add_bob_as_friend(self): + def alice_add_bob_as_friend(self) -> bool: """ t:friend_add t:on_friend_request @@ -711,7 +712,7 @@ class ToxSuite(unittest.TestCase): public_key, message_data, message_data_size, - *largs): + *largs) -> None: LOG_DEBUG(f"bobs_on_friend_request: " +repr(message_data)) try: assert str(message_data, 'UTF-8') == MSG @@ -743,7 +744,7 @@ class ToxSuite(unittest.TestCase): self.bob.callback_friend_message(None) return True - def bob_add_alice_as_friend_and_status(self): + def bob_add_alice_as_friend_and_status(self) -> bool: if bUSE_NOREQUEST: assert self.bob_add_alice_as_friend_norequest() else: @@ -752,31 +753,28 @@ class ToxSuite(unittest.TestCase): #: Wait until both are online sSlot = 'friend_conn_status' setattr(self.bob, sSlot, False) - def bobs_on_friend_connection_status(iTox, friend_id, iStatus, *largs): + def bobs_on_friend_connection_status(iTox, friend_id, iStatus, *largs) -> None: LOG_INFO(f"bobs_on_friend_connection_status {friend_id} ?>=0" +repr(iStatus)) - if iStatus > 0: - self.bob.friend_conn_status = True + setattr(self.bob, sSlot, False) - self.bob.friend_status = None - def bobs_on_friend_status(iTox, friend_id, iStatus, *largs): + sSlot = 'friend_status' + setattr(self.bob, sSlot, None) + def bobs_on_friend_status(iTox, friend_id, iStatus, *largs) -> None: LOG_INFO(f"bobs_on_friend_status {friend_id} ?>=0" +repr(iStatus)) - if iStatus > 0: - self.bob.friend_status = True + setattr(self.bob, sSlot, False) - self.alice.friend_conn_status = None - def alices_on_friend_connection_status(iTox, friend_id, iStatus, *largs): + sSlot = 'friend_conn_status' + setattr(self.alice, sSlot, None) + def alices_on_friend_connection_status(iTox, friend_id, iStatus, *largs) -> None: LOG_INFO(f"alices_on_friend_connection_status {friend_id} ?>=0 " +repr(iStatus)) - if iStatus > 0: - self.alice.friend_conn_status = True + setattr(self.alice, sSlot, False) - self.alice.friend_status = False - def alices_on_friend_status(iTox, friend_id, iStatus, *largs): + sSlot = 'friend_status' + setattr(self.alice, sSlot, None) + def alices_on_friend_status(iTox, friend_id, iStatus, *largs) -> None: LOG_INFO(f"alices_on_friend_status {friend_id} ?>=0 " +repr(iStatus)) - if iStatus > 0: - self.alice.friend_status = True + setattr(self.alice, sSlot, False) - self.alice.callback_friend_connection_status(alices_on_friend_connection_status) - self.alice.callback_friend_status(alices_on_friend_status) try: LOG.info("bob_add_alice_as_friend_and_status waiting for alice connections") if not self.wait_otox_attrs(self.alice, @@ -786,6 +784,8 @@ class ToxSuite(unittest.TestCase): self.bob.callback_friend_connection_status(bobs_on_friend_connection_status) self.bob.callback_friend_status(bobs_on_friend_status) + self.alice.callback_friend_connection_status(alices_on_friend_connection_status) + self.alice.callback_friend_status(alices_on_friend_status) LOG.info("bob_add_alice_as_friend_and_status waiting for bob connections") if not self.wait_otox_attrs(self.bob, @@ -803,7 +803,7 @@ class ToxSuite(unittest.TestCase): self.bob.callback_friend_status(None) return True - def bob_to_alice_connected(self): + def bob_to_alice_connected(self) -> bool: assert hasattr(self, 'baid') iRet = self.bob.friend_get_connection_status(self.baid) if iRet == TOX_CONNECTION['NONE']: @@ -811,7 +811,7 @@ class ToxSuite(unittest.TestCase): return False return True - def alice_to_bob_connected(self): + def alice_to_bob_connected(self) -> bool: assert hasattr(self, 'abid') iRet = self.alice.friend_get_connection_status(self.abid) if iRet == TOX_CONNECTION['NONE']: @@ -824,7 +824,7 @@ class ToxSuite(unittest.TestCase): group_name='test_group', nick='test_nick', topic='Test Topic', # str - ): + ) -> int: privacy_state = enums.TOX_GROUP_PRIVACY_STATE['PUBLIC'] iGrp = otox.group_new(privacy_state, group_name, nick) @@ -850,33 +850,36 @@ class ToxSuite(unittest.TestCase): LOG.info(f"group pK={sPk} iGrp={iGrp} n={otox.group_get_number_groups()}") return iGrp - def otox_verify_group(self, otox, iGrp): + def otox_verify_group(self, otox, iGrp) -> int: """ group_self_get_name group_self_get_peer_id group_self_get_public_key group_self_get_role group_self_get_status + group_self_set_name """ group_number = iGrp - assert type(iGrp) == int, "otox_test_groups_join iGrp not an int" - assert iGrp < UINT32_MAX, "otox_test_groups_join iGrp failure UINT32_MAX" - assert iGrp >= 0, f"otox_test_groups_join iGrp={iGrp} < 0" - sGrp = otox.group_get_chat_id(iGrp) - assert len(sGrp) == enums.TOX_GROUP_CHAT_ID_SIZE * 2, \ - f"group sGrp={sGrp} {len(sGrp)} != {enums.TOX_GROUP_CHAT_ID_SIZE * 2}" - sPk = otox.group_self_get_public_key(iGrp) - LOG.info(f"otox_verify_group sPk={sPk} iGrp={iGrp} n={otox.group_get_number_groups()}") - - sName = otox.group_self_get_name(iGrp) - iStat = otox.group_self_get_status(iGrp) - iId = otox.group_self_get_peer_id(iGrp) - iRole = otox.group_self_get_role(iGrp) - iStat = otox.group_self_get_status(iGrp) - LOG.info(f"otox_verify_group sName={sName} iStat={iStat} iId={iId} iRole={iRole} iStat={iStat}") - try: + assert type(iGrp) == int, "otox_test_groups_join iGrp not an int" + assert iGrp < UINT32_MAX, "otox_test_groups_join iGrp failure UINT32_MAX" + assert iGrp >= 0, f"otox_test_groups_join iGrp={iGrp} < 0" + sGrp = otox.group_get_chat_id(iGrp) + assert len(sGrp) == enums.TOX_GROUP_CHAT_ID_SIZE * 2, \ + f"group sGrp={sGrp} {len(sGrp)} != {enums.TOX_GROUP_CHAT_ID_SIZE * 2}" + sPk = otox.group_self_get_public_key(iGrp) + LOG.info(f"otox_verify_group sPk={sPk} iGrp={iGrp} n={otox.group_get_number_groups()}") + + sName = otox.group_self_get_name(iGrp) + iStat = otox.group_self_get_status(iGrp) + iId = otox.group_self_get_peer_id(iGrp) + iRole = otox.group_self_get_role(iGrp) + iStat = otox.group_self_get_status(iGrp) + LOG.info(f"otox_verify_group sName={sName} iStat={iStat} iId={iId} iRole={iRole} iStat={iStat}") + + assert otox.group_self_set_name(iGrp, "NewName") + bRet = otox.group_is_connected(iGrp) except Exception as e: LOG.warn(f"group_is_connected EXCEPTION {e}") @@ -931,7 +934,7 @@ class ToxSuite(unittest.TestCase): group_name='test_group', nick='test_nick', topic='Test Topic', # str - ): + ) -> int: try: iGrp = self.otox_test_groups_create(otox, group_name, nick, topic) @@ -949,7 +952,7 @@ class ToxSuite(unittest.TestCase): # tox.callback_group_peer_join return iGrp - def wait_friend_get_connection_status(self, otox, fid, n=iN): + def wait_friend_get_connection_status(self, otox, fid:int, n:int = iN) -> int: i = 0 while i < n: iRet = otox.friend_get_connection_status(fid) @@ -964,18 +967,18 @@ class ToxSuite(unittest.TestCase): LOG.error(f"wait_friend_get_connection_status n={n}") return False - def warn_if_no_cb(self, alice, sSlot): + def warn_if_no_cb(self, alice, sSlot:str) -> None: if not hasattr(alice, sSlot+'_cb') or \ not getattr(alice, sSlot+'_cb'): LOG.warning(f"self.bob.{sSlot}_cb NOT EXIST") - def warn_if_cb(self, alice, sSlot): + def warn_if_cb(self, alice, sSlot:str) -> None: if hasattr(self.bob, sSlot+'_cb') and \ getattr(self.bob, sSlot+'_cb'): LOG.warning(f"self.bob.{sSlot}_cb EXIST") # tests are executed in order - def test_notice_log(self): # works + def test_notice_log(self) -> None: # works notice = '/var/lib/tor/.SelekTOR/3xx/cache/9050/notice.log' if os.path.exists(notice): iRet = os.system(f"sudo sed -e '1,/.notice. Bootstrapped 100%/d' {notice}" + \ @@ -993,7 +996,7 @@ class ToxSuite(unittest.TestCase): self.assertEqual(cm.output, ['INFO:foo:first message', 'ERROR:foo.bar:second message']) - def test_tests_start(self): # works + def test_tests_start(self) -> None: # works """ t:hash t:kill @@ -1012,7 +1015,7 @@ class ToxSuite(unittest.TestCase): assert self.bob.self_get_address() == self.bob._address - def test_bootstrap_local_netstat(self): # works + def test_bootstrap_local_netstat(self) -> None: # works """ t:callback_file_chunk_request t:callback_file_recv @@ -1043,7 +1046,7 @@ class ToxSuite(unittest.TestCase): else: LOG.warning(f"bootstrap_local_netstat NOT {port} iStatus={iStatus}") - def test_bootstrap_local(self): # works + def test_bootstrap_local(self) -> None: # works """ t:call_bootstrap t:add_tcp_relay @@ -1081,7 +1084,7 @@ class ToxSuite(unittest.TestCase): LOG.warning(f"bootstrap_local NOT CONNECTED iStatus={iStatus}") return False - def test_bootstrap_iNmapInfo(self): # works + def test_bootstrap_iNmapInfo(self) -> None: # works # if os.environ['USER'] != 'root': # return iStatus = self.bob.self_get_connection_status() @@ -1097,7 +1100,7 @@ class ToxSuite(unittest.TestCase): # assert ts.bootstrap_iNmapInfo(lElts, oTOX_OARGS, bIS_LOCAL, iNODES=8) - def test_self_get_secret_key(self): # works + def test_self_get_secret_key(self) -> None: # works """ t:self_get_secret_key """ @@ -1110,7 +1113,7 @@ class ToxSuite(unittest.TestCase): assert len(str(oRet0)) del secret_key - def test_self_get_public_keys(self): # works + def test_self_get_public_keys(self) -> None: # works """ t:self_get_secret_key t:self_get_public_key @@ -1125,7 +1128,7 @@ class ToxSuite(unittest.TestCase): LOG.info('test_self_get_public_keys ' +repr(oRet1)) assert oRet0 != oRet1, repr(oRet0) +' != ' +repr(oRet1) - def test_self_name(self): # works + def test_self_name(self) -> None: # works """ t:self_set_name t:self_get_name @@ -1140,7 +1143,7 @@ class ToxSuite(unittest.TestCase): @unittest.skip('loud') @unittest.skipIf(bIS_NOT_TOXYGEN or oTOX_OARGS.mode == 0, 'not testing in toxygen') - def test_sound_notification(self): # works + def test_sound_notification(self) -> None: # works """ Plays sound notification :param type of notification @@ -1148,7 +1151,7 @@ class ToxSuite(unittest.TestCase): from tests.toxygen_tests import test_sound_notification test_sound_notification(self) - def test_address(self): # works + def test_address(self) -> None: # works """ t:self_get_address t:self_get_nospam @@ -1166,7 +1169,7 @@ class ToxSuite(unittest.TestCase): pk, sk = self.alice.self_get_keys() assert pk == self.alice.self_get_address()[:CLIENT_ID_SIZE] - def test_status_message(self): # works + def test_status_message(self) -> None: # works """ t:self_get_status_message t:self_get_status_message_size @@ -1178,7 +1181,7 @@ class ToxSuite(unittest.TestCase): self.alice.self_get_status_message() +' is not ' +MSG assert self.alice.self_get_status_message_size() == len(MSG) - def test_self_get_udp_port(self): # works + def test_self_get_udp_port(self) -> None: # works """ t:self_get_udp_port """ @@ -1190,7 +1193,7 @@ class ToxSuite(unittest.TestCase): LOG.info('self_get_udp_port bob ' +repr(o)) assert o > 0 - def test_self_get_tcp_port(self): # works + def test_self_get_tcp_port(self) -> None: # works """ t:self_get_tcp_port """ @@ -1201,7 +1204,7 @@ class ToxSuite(unittest.TestCase): o = self.bob.self_get_tcp_port() LOG.info('self_get_tcp_port ' +repr(o)) - def test_get_dht_id(self): # works + def test_get_dht_id(self) -> None: # works """ t:self_get_dht_id """ @@ -1210,7 +1213,7 @@ class ToxSuite(unittest.TestCase): o2 = self.bob.self_get_dht_id() assert len(o2) == 64 - def test_bob_add_alice_as_friend_norequest(self): # works + def test_bob_add_alice_as_friend_norequest(self) -> None: # works """ t:friend_delete t:friend_exists @@ -1228,7 +1231,7 @@ class ToxSuite(unittest.TestCase): self.bob.friend_delete(self.baid) - def test_alice_add_bob_as_friend_norequest(self): # works - intermittent failures + def test_alice_add_bob_as_friend_norequest(self) -> None: # works - intermittent failures """ t:friend_delete t:friend_exists @@ -1244,7 +1247,7 @@ class ToxSuite(unittest.TestCase): if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) - def test_both_add_as_friend_norequest(self): # works + def test_both_add_as_friend_norequest(self) -> None: # works """ t:friend_delete t:friend_exists @@ -1270,7 +1273,7 @@ class ToxSuite(unittest.TestCase): self.alice.friend_delete(self.abid) assert len(self.alice.self_get_friend_list()) == 0 - def test_bob_add_alice_as_friend_and_status(self): + def test_bob_add_alice_as_friend_and_status(self) -> None: """ t:friend_delete t:friend_exists @@ -1283,35 +1286,35 @@ class ToxSuite(unittest.TestCase): self.bob.friend_delete(self.baid) @unittest.skip('unfinished') - def test_alice_add_bob_as_friend_and_status(self): + def test_alice_add_bob_as_friend_and_status(self) -> None: assert self.alice_add_bob_as_friend_and_status() if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) - def test_loop_until_connected(self): # works + def test_loop_until_connected(self) -> None: # works assert self.loop_until_connected() - def test_bob_assert_connection_status(self): # works + def test_bob_assert_connection_status(self) -> None: # works if self.bob.self_get_connection_status() == TOX_CONNECTION['NONE']: AssertionError("ERROR: NOT CONNECTED " \ +repr(self.bob.self_get_connection_status())) - def test_alice_assert_connection_status(self): # works + def test_alice_assert_connection_status(self) -> None: # works if self.alice.self_get_connection_status() == TOX_CONNECTION['NONE']: AssertionError("ERROR: NOT CONNECTED " \ +repr(self.alice.self_get_connection_status())) - def test_bob_assert_mycon_status(self): # works + def test_bob_assert_mycon_status(self) -> None: # works if self.bob.mycon_status == False: AssertionError("ERROR: NOT CONNECTED " \ +repr(self.bob.mycon_status)) - def test_alice_assert_mycon_status(self): # works + def test_alice_assert_mycon_status(self) -> None: # works if self.alice.mycon_status == False: AssertionError("ERROR: NOT CONNECTED " \ +repr(self.alice.mycon_status)) - def test_bob_add_alice_as_friend(self): # works? + def test_bob_add_alice_as_friend(self) -> None: # works? try: if bUSE_NOREQUEST: assert self.bob_add_alice_as_friend_norequest() @@ -1331,7 +1334,7 @@ class ToxSuite(unittest.TestCase): if len(self.bob.self_get_friend_list()) > 0: LOG.warn(f"WTF bob.self_get_friend_list() {bob.self_get_friend_list()}") - def test_alice_add_bob_as_friend(self): # works! + def test_alice_add_bob_as_friend(self) -> None: # works! try: if bUSE_NOREQUEST: assert self.alice_add_bob_as_friend_norequest() @@ -1357,7 +1360,7 @@ class ToxSuite(unittest.TestCase): if len(self.alice.self_get_friend_list()) > 0: LOG.warn(f"WTF alice.self_get_friend_list() {alice.self_get_friend_list()}") - def test_both_add_as_friend(self): # works + def test_both_add_as_friend(self) -> None: # works try: if bUSE_NOREQUEST: assert self.both_add_as_friend_norequest() @@ -1375,11 +1378,12 @@ class ToxSuite(unittest.TestCase): if hasattr(self,'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) - def test_groups_join(self): + def test_groups_join(self) -> None: """ t:group_join t:group_disconnect t:group_leave + t:group_self_set_name """ if not self.get_connection_status(): LOG.warning(f"test_groups_join NOT CONNECTED") @@ -1399,7 +1403,7 @@ class ToxSuite(unittest.TestCase): LOG.error(f"bob.group_leave EXCEPTION {e}") raise - def test_groups(self): + def test_groups(self) -> None: """ t:group_new t:group_disconnect @@ -1408,12 +1412,13 @@ class ToxSuite(unittest.TestCase): t:group_get_topic t:group_get_topic_size t:group_get_privacy_state + t:group_self_set_name + t:group_get_number_groups t:group_founder_set_password t:group_founder_set_peer_limit t:group_founder_set_privacy_state t:group_get_chat_id - t:group_get_number_groups t:group_get_password t:group_get_password_size t:group_get_peer_limit @@ -1439,7 +1444,7 @@ class ToxSuite(unittest.TestCase): #? @unittest.skip("double free or corruption (fasttop)") @expectedFail('fails') # assertion fails on == MSG - def test_on_friend_status_message(self): # fails + def test_on_friend_status_message(self) -> None: # fails """ t:self_set_status_message t:self_get_status_message @@ -1452,7 +1457,7 @@ class ToxSuite(unittest.TestCase): MSG = 'Happy' sSlot = 'friend_status_message' - def bob_on_friend_status_message(iTox, friend_id, new_status_message, new_status_size, *largs): + def bob_on_friend_status_message(iTox, friend_id, new_status_message, new_status_size, *largs) -> None: LOG_INFO(f"BOB_ON_friend_status_message friend_id={friend_id} " \ +f"new_status_message={new_status_message}") try: @@ -1497,7 +1502,7 @@ class ToxSuite(unittest.TestCase): if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) - def test_friend(self): # works! sometimes + def test_friend(self) -> None: # works! sometimes """ t:friend_get_name t:friend_get_name_size @@ -1537,7 +1542,7 @@ class ToxSuite(unittest.TestCase): self.alice.friend_delete(self.abid) @expectedFail('fails') # assert self.bob.friend_get_status(self.baid) == TOX_USER_STATUS['BUSY'] - def test_user_status(self): # fails + def test_user_status(self) -> None: # fails """ t:self_get_status t:self_set_status @@ -1548,7 +1553,7 @@ class ToxSuite(unittest.TestCase): sSlot = 'friend_status' setattr(self.bob, sSlot, None) - def bobs_on_friend_set_status(iTox, friend_id, new_status, *largs): + def bobs_on_friend_set_status(iTox, friend_id, new_status, *largs) -> None: LOG_INFO(f"bobs_on_friend_set_status {friend_id} {new_status}") try: assert friend_id == self.baid @@ -1591,7 +1596,7 @@ class ToxSuite(unittest.TestCase): self.bob.friend_delete(self.baid) @unittest.skip('crashes') - def test_kill_remake(self): + def test_kill_remake(self) -> None: """ t:friend_get_kill_remake t:on_friend_connection_status @@ -1636,7 +1641,7 @@ class ToxSuite(unittest.TestCase): self.bob.friend_delete(self.baid) @expectedFail('fails') # new name is empty - def test_friend_name(self): # works! + def test_friend_name(self) -> None: # works! """ t:self_set_name t:friend_get_name @@ -1650,7 +1655,7 @@ class ToxSuite(unittest.TestCase): #: Test friend name NEWNAME = 'Jenny' - def bobs_on_friend_name(iTox, fid, newname, iNameSize, *largs): + def bobs_on_friend_name(iTox, fid:int, newname, iNameSize, *largs) -> None: LOG_INFO(f"bobs_on_friend_name {sSlot} {fid}") try: assert fid == self.baid @@ -1693,7 +1698,7 @@ class ToxSuite(unittest.TestCase): @expectedFail('fails') # This client is currently not connected to the friend. - def test_friend_message(self): # fails + def test_friend_message(self) -> None: # fails """ t:on_friend_action t:on_friend_message @@ -1704,7 +1709,7 @@ class ToxSuite(unittest.TestCase): MSG = 'Hi, Bob!' sSlot = 'friend_message' - def alices_on_friend_message(iTox, fid, msg_type, message, iSize, *largs): + def alices_on_friend_message(iTox, fid:int, msg_type, message, iSize, *largs) -> None: LOG_DEBUG(f"alices_on_friend_message {fid} {message}") try: assert fid == self.alice.abid @@ -1761,7 +1766,7 @@ class ToxSuite(unittest.TestCase): self.alice.friend_delete(self.abid) # This client is currently not connected to the friend. - def test_friend_action(self): # works! sometimes? + def test_friend_action(self) -> None: # works! sometimes? """ t:on_friend_action t:on_friend_message @@ -1772,7 +1777,7 @@ class ToxSuite(unittest.TestCase): ACTION = 'Kick' sSlot = 'friend_read_action' setattr(self.bob, sSlot, None) - def UNUSEDtheir_on_friend_action(iTox, fid, msg_type, action, *largs): + def UNUSEDtheir_on_friend_action(iTox, fid:int, msg_type, action, *largs): LOG_DEBUG(f"their_on_friend_action {fid} {msg_type} {action}") try: assert msg_type == TOX_MESSAGE_TYPE['ACTION'] @@ -1785,7 +1790,7 @@ class ToxSuite(unittest.TestCase): sSlot = 'friend_read_receipt' setattr(self.alice, sSlot, None) - def their_on_read_reciept(iTox, fid, msg_id, *largs): + def their_on_read_reciept(iTox, fid:int, msg_id, *largs) -> None: LOG_DEBUG(f"their_on_read_reciept {fid} {msg_id}") sSlot = 'friend_read_receipt' try: @@ -1842,7 +1847,7 @@ class ToxSuite(unittest.TestCase): if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) - def test_alice_typing_status(self): # works + def test_alice_typing_status(self) -> None: # works """ t:on_friend_read_receipt t:on_friend_typing @@ -1854,7 +1859,7 @@ class ToxSuite(unittest.TestCase): sSlot = 'friend_typing' LOG.info("test_typing_status bob adding alice") #: Test typing status - def bob_on_friend_typing(iTox, fid, is_typing, *largs): + def bob_on_friend_typing(iTox, fid:int, is_typing, *largs) -> None: LOG_INFO(f"BOB_ON_friend_typing is_typing={is_typing} fid={fid}") try: assert fid == self.baid @@ -1896,8 +1901,8 @@ class ToxSuite(unittest.TestCase): if hasattr(self, 'abid') and self.abid >= 0: self.alice.friend_delete(self.abid) - @unittest.skip('unfinished') - def test_file_transfer(self): # unfinished + @expectedFail('fails') # @unittest.skip('unfinished') + def test_file_transfer(self) -> None: # unfinished """ t:file_send t:file_send_chunk @@ -1931,7 +1936,7 @@ class ToxSuite(unittest.TestCase): CONTEXT = { 'FILE': bytes(), 'RECEIVED': 0, 'START': False, 'SENT': 0 } - def alice_on_file_recv(iTox, fid, file_number, kind, size, filename): + def alice_on_file_recv(iTox, fid:int, file_number:int, kind, size, filename) -> None: LOG_DEBUG(f"ALICE_ON_file_recv fid={fid} {file_number}") try: assert size == FILE_SIZE @@ -1944,7 +1949,7 @@ class ToxSuite(unittest.TestCase): else: LOG_INFO(f"ALICE_ON_file_recv " + str(fid)) - def alice_on_file_recv_control(iTox, fid, file_number, control, *largs): + def alice_on_file_recv_control(iTox, fid:int, file_number, control, *largs) -> None: # TOX_FILE_CONTROL = { 'RESUME': 0, 'PAUSE': 1, 'CANCEL': 2,} LOG_DEBUG(f"ALICE_ON_file_recv_control fid={fid} {file_number} {control}") try: @@ -1962,7 +1967,7 @@ class ToxSuite(unittest.TestCase): LOG_INFO(f"ALICE_ON_file_recv " + str(fid)) self.alice.completed = False - def alice_on_file_recv_chunk(iTox, fid, file_number, position, iNumBytes, *largs): + def alice_on_file_recv_chunk(iTox, fid:int, file_number:int, position:int, iNumBytes, *largs) -> None: LOG_DEBUG(f"ALICE_ON_file_recv_chunk {fid} {file_number}") # FixMe - use file_number and iNumBytes to get data? data = '' @@ -1995,7 +2000,7 @@ class ToxSuite(unittest.TestCase): self.alice.callback_file_recv_chunk(alice_on_file_recv_chunk) self.bob.completed = False - def bob_on_file_recv_control2(iTox, fid, file_number, control): + def bob_on_file_recv_control2(iTox, fid:int, file_number:int, control) -> None: LOG_DEBUG(f"BOB_ON_file_recv_control2 {fid} {file_number} control={control}") if control == TOX_FILE_CONTROL['RESUME']: CONTEXT['START'] = True @@ -2003,7 +2008,7 @@ class ToxSuite(unittest.TestCase): self.bob.completed = True pass - def bob_on_file_chunk_request(iTox, fid, file_number, position, length, *largs): + def bob_on_file_chunk_request(iTox, fid:int, file_number:int, position:int, length, *largs) -> None: LOG_DEBUG(f"BOB_ON_file_chunk_request {fid} {file_number}") if length == 0: return @@ -2081,14 +2086,13 @@ class ToxSuite(unittest.TestCase): LOG_INFO(f"test_file_transfer:: self.wait_objs_attr completed") #? @unittest.skip('crashes') - def test_tox_savedata(self): # works sorta + def test_tox_savedata(self) -> None: # works sorta """ t:get_savedata_size t:get_savedata """ # Fatal Python error: Aborted # "/var/local/src/toxygen_wrapper/wrapper/tox.py", line 180 in kill - return assert self.alice.get_savedata_size() > 0 data = self.alice.get_savedata() @@ -2116,14 +2120,14 @@ class ToxSuite(unittest.TestCase): else: LOG.info("passed test_tox_savedata") - def test_kill(self): # + def test_kill(self) -> None: # import threading LOG.info(f"THE END {threading.active_count()}") self.tearDown() LOG.info(f"THE END {threading.enumerate()}") -def vOargsToxPreamble(oArgs, Tox, ToxTest): +def vOargsToxPreamble(oArgs, Tox, ToxTest) -> None: ts.vSetupLogging(oArgs) @@ -2142,7 +2146,7 @@ def vOargsToxPreamble(oArgs, Tox, ToxTest): ### -def iMain(oArgs, failfast=True): +def iMain(oArgs, failfast=True) -> int: # collect_types.init_types_collection() @@ -2229,7 +2233,7 @@ def oArgparse(lArgv): return oArgs -def main(lArgs=None): +def main(lArgs=None) -> int: global oTOX_OARGS if lArgs is None: lArgs = [] oArgs = oArgparse(lArgs)