gajim3/gajim/common/helpers.py

1443 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (C) 2003-2014 Yann Leboulanger <asterix AT lagaule.org>
# Copyright (C) 2005-2006 Dimitur Kirov <dkirov AT gmail.com>
# Nikos Kouremenos <kourem AT gmail.com>
# Copyright (C) 2006 Alex Mauer <hawke AT hawkesnest.net>
# Copyright (C) 2006-2007 Travis Shirk <travis AT pobox.com>
# Copyright (C) 2006-2008 Jean-Marie Traissard <jim AT lapin.org>
# Copyright (C) 2007 Lukas Petrovicky <lukas AT petrovicky.net>
# James Newton <redshodan AT gmail.com>
# Julien Pivotto <roidelapluie AT gmail.com>
# Copyright (C) 2007-2008 Stephan Erb <steve-e AT h3c.de>
# Copyright (C) 2008 Brendan Taylor <whateley AT gmail.com>
# Jonathan Schleifer <js-gajim AT webkeks.org>
#
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
from typing import Any # pylint: disable=unused-import
from typing import Dict # pylint: disable=unused-import
import sys
import re
import os
import subprocess
import base64
import hashlib
import shlex
import socket
import logging
import json
import copy
import collections
import platform
import functools
from collections import defaultdict
import random
import weakref
import inspect
import string
import webbrowser
from string import Template
import urllib
from urllib.parse import unquote
from encodings.punycode import punycode_encode
from functools import wraps
from pathlib import Path
from packaging.version import Version as V
from nbxmpp.namespaces import Namespace
from nbxmpp.const import Role
from nbxmpp.const import ConnectionProtocol
from nbxmpp.const import ConnectionType
from nbxmpp.structs import ProxyData
from nbxmpp.protocol import JID
from nbxmpp.protocol import InvalidJid
from OpenSSL.crypto import load_certificate
from OpenSSL.crypto import FILETYPE_PEM
from gi.repository import Gio
from gi.repository import GLib
import precis_i18n.codec # pylint: disable=unused-import
from gajim.common import app
from gajim.common import configpaths
from gajim.common.i18n import Q_
from gajim.common.i18n import _
from gajim.common.i18n import ngettext
from gajim.common.i18n import get_rfc5646_lang
from gajim.common.const import ShowConstant
from gajim.common.const import Display
from gajim.common.const import URIType
from gajim.common.const import URIAction
from gajim.common.const import GIO_TLS_ERRORS
from gajim.common.const import SHOW_LIST
from gajim.common.regex import INVALID_XML_CHARS_REGEX
from gajim.common.regex import STH_AT_STH_DOT_STH_REGEX
from gajim.common.structs import URI
# Module-level logger used by the helpers in this file
log = logging.getLogger('gajim.c.helpers')
# Translated names of the groups that this module labels as "special"
special_groups = (_('Transports'),
                  _('Not in contact list'),
                  _('Observers'),
                  _('Group chats'))
# Matches text that starts with 'www.' (not followed by a second dot) or with
# a 'scheme://' prefix, continues with characters other than whitespace,
# angle brackets and quotes, and ends on a character that additionally is
# not one of ! , . ) ]
URL_REGEX = re.compile(
    r"(www\.(?!\.)|[a-z][a-z0-9+.-]*://)[^\s<>'\"]+[^!,\.\s<>\)'\"\]]")
class InvalidFormat(Exception):
    """
    Raised by parse_jid() and parse_resource() when their input is rejected
    """
def parse_jid(jidstring):
    """
    Validate jidstring with validate_jid() and return the result as a string

    Any exception raised during validation is re-raised as InvalidFormat.
    """
    try:
        validated = validate_jid(jidstring)
        return str(validated)
    except Exception as error:
        raise InvalidFormat(error)
def idn_to_ascii(host):
    """
    Convert IDN (Internationalized Domain Names) to ACE (ASCII-compatible
    encoding)

    The host is split on the IDNA dot characters, every non-empty label is
    converted on its own and the labels are joined again with '.'.
    """
    from encodings import idna
    return '.'.join(
        idna.ToASCII(label).decode('utf-8') if label else ''
        for label in idna.dots.split(host))
def ascii_to_idn(host):
    """
    Convert ACE (ASCII-compatible encoding) to IDN (Internationalized Domain
    Names)

    Every label of the host is converted on its own and the labels are joined
    again with '.'.
    """
    from encodings import idna
    return '.'.join(map(idna.ToUnicode, idna.dots.split(host)))
def puny_encode_url(url):
    """
    Return url with its hostname replaced by the ASCII-compatible encoding

    Returns False when the URL cannot be parsed or the hostname cannot be
    converted.
    """
    # Parse a copy that is guaranteed to contain '//'; the original url is
    # what gets returned
    candidate = url if '//' in url else '//' + url
    try:
        hostname = urllib.parse.urlparse(candidate).hostname
        encoded_host = idn_to_ascii(hostname)
    except Exception:
        log.debug('urlparse failed: %s', url)
        return False
    return url.replace(hostname, encoded_host)
def parse_resource(resource):
    """
    Run resource through the 'OpaqueString' codec and return the result

    Returns None for an empty resource and raises InvalidFormat when the
    codec reports a UnicodeError.
    """
    if resource:
        try:
            prepared = resource.encode('OpaqueString')
            return prepared.decode('utf-8')
        except UnicodeError:
            raise InvalidFormat('Invalid character in resource.')
    return None
def windowsify(word):
    """
    Return word capitalized when running on Windows, unchanged otherwise
    """
    return word.capitalize() if os.name == 'nt' else word
def get_uf_show(show, use_mnemonic=False):
    """
    Return a userfriendly, translatable string for a show value such as
    dnd/xa/chat

    show is either a string or a ShowConstant member (its lowercased name is
    used). If use_mnemonic is True the labels that have a mnemonic variant
    are returned with their '_' marker, so GUI code should pass True for
    accessibility. Unknown values give the 'Has errors' label.
    """
    if isinstance(show, ShowConstant):
        show = show.name.lower()

    if show == 'dnd':
        return _('_Busy') if use_mnemonic else _('Busy')
    if show == 'xa':
        return _('_Not Available') if use_mnemonic else _('Not Available')
    if show == 'chat':
        return _('_Free for Chat') if use_mnemonic else _('Free for Chat')
    if show == 'online':
        if use_mnemonic:
            return Q_('?user status:_Available')
        return Q_('?user status:Available')
    if show == 'connecting':
        return _('Connecting')
    if show == 'away':
        return _('A_way') if use_mnemonic else _('Away')
    if show == 'offline':
        return _('_Offline') if use_mnemonic else _('Offline')
    if show == 'not in roster':
        return _('Not in contact list')
    if show == 'requested':
        return Q_('?contact has status:Unknown')
    return Q_('?contact has status:Has errors')
def get_uf_sub(sub):
    """
    Return a translated label for a subscription value

    Handles 'none', 'to', 'from' and 'both'; anything else gives 'Unknown'.
    """
    if sub == 'none':
        return Q_('?Subscription we already have:None')
    if sub == 'to':
        return _('To')
    if sub == 'from':
        return _('From')
    if sub == 'both':
        return _('Both')
    return _('Unknown')
def get_uf_ask(ask):
    """
    Return a translated label for an 'ask' value

    None and 'subscribe' are translated, any other value is returned
    unchanged.
    """
    if ask is None:
        return Q_('?Ask (for Subscription):None')
    if ask == 'subscribe':
        return _('Subscribe')
    return ask
def get_uf_role(role, plural=False):
    """
    Return a translated, user friendly name for a group chat role

    role is either the role string or an object whose .value is that string.
    plural determines if you get Moderators or Moderator.
    An unrecognised role is returned unchanged (it used to end in an
    UnboundLocalError).
    """
    if not isinstance(role, str):
        role = role.value

    if role == 'none':
        return Q_('?Group Chat Contact Role:None')
    if role == 'moderator':
        return _('Moderators') if plural else _('Moderator')
    if role == 'participant':
        return _('Participants') if plural else _('Participant')
    if role == 'visitor':
        return _('Visitors') if plural else _('Visitor')
    # Unknown value: hand back the raw string, the way get_uf_ask() does
    return role
def get_uf_affiliation(affiliation, plural=False):
    """
    Return a translated, user friendly name for a group chat affiliation

    affiliation is either the affiliation string or an object whose .value
    is that string. plural selects e.g. Owners instead of Owner.
    An unrecognised affiliation is returned unchanged (it used to end in an
    UnboundLocalError).
    """
    if not isinstance(affiliation, str):
        affiliation = affiliation.value

    if affiliation == 'none':
        return Q_('?Group Chat Contact Affiliation:None')
    if affiliation == 'owner':
        return _('Owners') if plural else _('Owner')
    if affiliation == 'admin':
        return _('Administrators') if plural else _('Administrator')
    if affiliation == 'member':
        return _('Members') if plural else _('Member')
    # Unknown value: hand back the raw string, the way get_uf_ask() does
    return affiliation
def get_sorted_keys(adict):
    """
    Return the keys of adict as a sorted list
    """
    return sorted(adict.keys())
def to_one_line(msg):
    """
    Escape msg so that it contains no newline character

    Backslashes are doubled first, then each newline is replaced by a
    backslash followed by the letter n.
    """
    # Example: 'test\ntest\\ntest' becomes 'test\\ntest\\\\ntest'
    return msg.replace('\\', '\\\\').replace('\n', '\\n')
def from_one_line(msg):
    """
    Undo to_one_line(): restore the newlines, then un-double the backslashes
    """
    # (?<!\\) is a negative lookbehind: a backslash + n pair only becomes a
    # newline when it is not itself preceded by a backslash
    restored = re.sub(r'(?<!\\)\\n', '\n', msg)
    # Example: 'test\\ntest\\\\ntest' becomes 'test\ntest\\ntest'
    return restored.replace('\\\\', '\\')
def get_uf_chatstate(chatstate):
    """
    Translate a chatstate into a user friendly message

    Returns an empty string for a chatstate that is not handled here.
    """
    if chatstate == 'active':
        message = _('is paying attention to the conversation')
    elif chatstate == 'inactive':
        message = _('is doing something else')
    elif chatstate == 'composing':
        message = _('is composing a message…')
    elif chatstate == 'paused':
        # paused means he or she was composing but has stopped for a while
        message = _('paused composing a message')
    elif chatstate == 'gone':
        message = _('has closed the chat window or tab')
    else:
        message = ''
    return message
def exec_command(command, use_shell=False, posix=True):
    """
    Execute a command

    If use_shell is True the command string, with ' &' appended, is handed to
    the shell as if it was typed in a console, so it may be dangerous if you
    are not sure about what is executed. Otherwise the command is split with
    shlex (posix is passed on to it), started directly, and waiting for it is
    handed to app.thread_interface.
    """
    if not use_shell:
        process = subprocess.Popen(shlex.split(command, posix=posix))
        app.thread_interface(process.wait)
        return
    subprocess.Popen('%s &' % command, shell=True).wait()
def build_command(executable, parameter):
    """
    Return a command line made of executable and the double-quoted parameter

    The parameter can hold a path with spaces, so it is wrapped in double
    quotes for the shell; double quotes already inside it are
    backslash-escaped first.
    """
    escaped = parameter.replace('"', '\\"')
    return '%s "%s"' % (executable, escaped)
def get_file_path_from_dnd_dropped_uri(uri):
    """
    Turn a URI received from a drag and drop into a file path

    Percent-escapes are decoded, surrounding CR, LF and NUL characters are
    removed and a leading 'file:' prefix is stripped.
    """
    path = urllib.parse.unquote(uri).strip('\r\n\x00')
    if re.match('^file:///[a-zA-Z]:/', path):  # windows
        return path[len('file:///'):]
    # Longest prefix first: 'file://' (nautilus, rox) before 'file:' (xffm)
    for prefix in ('file://', 'file:'):
        if path.startswith(prefix):
            return path[len(prefix):]
    return path
def sanitize_filename(filename):
    """
    Make sure the filename we will write does contain only acceptable and
    latin characters, and is not too long (in that case hash it)

    Names longer than 48 characters are replaced by the base64 form of their
    MD5 digest. The name is then punycode-encoded and '/' is replaced by '_';
    on Windows further reserved characters are replaced as well.
    """
    if len(filename) > 48:
        digest = hashlib.md5(filename.encode('utf-8')).digest()
        filename = base64.b64encode(digest).decode('utf-8')

    # make it latin chars only
    filename = punycode_encode(filename).decode('utf-8').replace('/', '_')

    if os.name == 'nt':
        windows_replacements = (
            ('?', '_'), (':', '_'), ('\\', '_'), ('"', "'"),
            ('|', '_'), ('*', '_'), ('<', '_'), ('>', '_'),
        )
        for forbidden, substitute in windows_replacements:
            filename = filename.replace(forbidden, substitute)
    return filename
def reduce_chars_newlines(text, max_chars=0, max_lines=0):
    """
    Cut the chars after 'max_chars' on each line and show only the first
    'max_lines'

    If any of the params is not present (None or 0) the action on it is not
    performed
    """
    def _cut_if_long(string_):
        if len(string_) > max_chars:
            # A cut line keeps max_chars - 3 characters; the clamp stops a
            # max_chars below 3 from turning into a negative slice index,
            # which would keep more than max_chars characters
            string_ = string_[:max(max_chars - 3, 0)]
        return string_

    # 'not max_lines' covers both 0 and None, as the docstring promises
    if not max_lines:
        lines = text.split('\n')
    else:
        lines = text.split('\n', max_lines)[:max_lines]

    # None must not reach the '>' comparison (TypeError in Python 3)
    if max_chars is not None and max_chars > 0:
        lines = [_cut_if_long(line) for line in lines]

    # NOTE(review): this copy of the file appended an empty string wherever
    # text was cut, which has no effect; a truncation marker was presumably
    # intended there - confirm before adding one.
    # str.split() always yields at least one element, so no empty-list case
    # has to be handled.
    return '\n'.join(lines)
def get_account_status(account):
    """
    Return account['status_line'] shortened by reduce_chars_newlines() with
    max_chars=100 and max_lines=1
    """
    return reduce_chars_newlines(account['status_line'], 100, 1)
def get_contact_dict_for_account(account):
"""
Create a dict of jid, nick -> contact with all contacts of account.
Can be used for completion lists
"""
# NOTE(review): the indentation of this block is not visible in this copy of
# the file, so the nesting of the statements below cannot be confirmed here
contacts_dict = {}
for jid in app.contacts.get_jid_list(account):
contact = app.contacts.get_contact_with_highest_priority(account, jid)
# Every contact is registered under its jid first
contacts_dict[jid] = contact
name = contact.name
if name in contacts_dict:
# The name is already a key: re-register the earlier contact and this one
# under 'name (jid)' keys so both stay distinguishable
contact1 = contacts_dict[name]
del contacts_dict[name]
contacts_dict['%s (%s)' % (name, contact1.jid)] = contact1
contacts_dict['%s (%s)' % (name, jid)] = contact
elif contact.name:
# The jid key is dropped when the name equals app.get_nick_from_jid(jid)
if contact.name == app.get_nick_from_jid(jid):
del contacts_dict[jid]
# presumably executed for every named contact, not only when the jid key
# was dropped - TODO confirm against the properly indented source
contacts_dict[name] = contact
return contacts_dict
def play_sound(event):
    """
    Play the sound file configured for event, unless the 'sounds_on' setting
    is off
    """
    if app.settings.get('sounds_on'):
        sound_settings = app.settings.get_soundevent_settings(event)
        play_sound_file(sound_settings['path'])
def check_soundfile_path(file_, dirs=None):
    """
    Check if the sound file exists

    :param file_: the file to check, absolute or relative to 'dirs' path
    :param dirs: list of known paths to fall back on if the file doesn't
                 exist (eg: ~/.gajim/sounds/, DATADIR/sounds...).
    :return the path to file or None if it doesn't exist.
    """
    if not file_:
        return None

    direct = Path(file_)
    if direct.exists():
        return direct

    if dirs is None:
        dirs = [configpaths.get('MY_DATA'),
                configpaths.get('DATA')]

    # First '<dir>/sounds/<file_>' that exists wins
    candidates = (base / 'sounds' / file_ for base in dirs)
    return next((path for path in candidates if path.exists()), None)
def strip_soundfile_path(file_, dirs=None, abs_=True):
    """
    Remove known paths from a sound file

    Filechooser returns an absolute path.
    If path is a known fallback path, we remove it.
    So config has no hardcoded path to DATA_DIR and text in textfield is
    shorter.

    param: file_: the filename to strip
    param: dirs: list of known paths from which the filename should be
                 stripped
    param: abs_: force absolute path on dirs
    """
    if not file_:
        return None

    if dirs is None:
        dirs = [configpaths.get('MY_DATA'),
                configpaths.get('DATA')]

    file_ = Path(file_)
    name = file_.name

    def _known_location(base):
        # Where a file of that name would live below a known directory
        location = base / 'sounds' / name
        return location.absolute() if abs_ else location

    if any(file_ == _known_location(base) for base in dirs):
        return name
    return file_
def play_sound_file(path_to_soundfile):
    """
    Play a sound file with the backend matching the current platform

    The path is resolved with check_soundfile_path() first; nothing is played
    when that returns None. winsound is used on win32, NSSound on darwin and
    GSound elsewhere when app.is_installed('GSOUND') is true.
    """
    sound_path = check_soundfile_path(path_to_soundfile)
    if sound_path is None:
        return
    sound_path = str(sound_path)

    if sys.platform == 'win32':
        import winsound
        try:
            winsound.PlaySound(sound_path,
                               winsound.SND_FILENAME | winsound.SND_ASYNC)
        except Exception:
            log.exception('Sound Playback Error')
        return

    if sys.platform == 'darwin':
        try:
            from AppKit import NSSound
        except ImportError:
            log.exception('Sound Playback Error')
            return
        sound = NSSound.alloc()
        sound.initWithContentsOfFile_byReference_(sound_path, True)
        sound.play()
        return

    if app.is_installed('GSOUND'):
        try:
            app.gsound_ctx.play_simple({'media.filename': sound_path})
        except GLib.Error as error:
            log.error('Could not play sound: %s', error.message)
def get_connection_status(account):
    """
    Return a status string for the connection of account

    'error' while a reconnect is scheduled, 'connecting' while the state is
    connecting or connected, 'offline' when disconnected; otherwise the
    connection's own status.
    """
    con = app.connections[account]
    state = con.state
    if state.is_reconnect_scheduled:
        return 'error'
    if state.is_connecting or state.is_connected:
        return 'connecting'
    if state.is_disconnected:
        return 'offline'
    return con.status
def get_global_show():
    """
    Return the SHOW_LIST entry with the highest index among the accounts that
    have 'sync_with_global_status' set (SHOW_LIST[0] when there is none)
    """
    indexes = [
        SHOW_LIST.index(get_connection_status(account))
        for account in app.connections
        if app.settings.get_account_setting(account,
                                            'sync_with_global_status')]
    return SHOW_LIST[max(indexes, default=0)]
def get_global_status_message():
    """
    Return the status message of the synced account whose status has the
    highest index in SHOW_LIST

    Only accounts with 'sync_with_global_status' set are looked at. An empty
    string is returned when no such account ranks above SHOW_LIST[0]
    (previously this ended in an UnboundLocalError).
    """
    maxi = 0
    # Default for "no accounts", "no synced accounts" and "all at index 0"
    status_message = ''
    for account in app.connections:
        if not app.settings.get_account_setting(account,
                                                'sync_with_global_status'):
            continue
        connection = app.connections[account]
        index = SHOW_LIST.index(connection.status)
        if index > maxi:
            maxi = index
            status_message = connection.status_message
    return status_message
def statuses_unified():
    """
    Test if all statuses are the same

    Only accounts with 'sync_with_global_status' set are compared.
    """
    reference = None
    for account in app.connections:
        synced = app.settings.get_account_setting(account,
                                                  'sync_with_global_status')
        if not synced:
            continue
        status = app.connections[account].status
        if reference is None:
            # First synced account seen so far: everything else is compared
            # against its status
            reference = status
        elif reference != status:
            return False
    return True
def get_icon_name_to_show(contact, account=None):
    """
    Get the icon name to show in online, away, requested, etc

    The event and group chat checks are only made when an account is given.
    """
    if account:
        if app.events.get_nb_roster_events(account, contact.jid):
            return 'event'
        if app.events.get_nb_roster_events(account, contact.get_full_jid()):
            return 'event'
        minimized = app.interface.minimized_controls
        if (account in minimized and
                contact.jid in minimized[account] and
                minimized[account][contact.jid].get_nb_unread_pm() > 0):
            return 'event'
        if contact.jid in app.gc_connected[account]:
            if app.gc_connected[account][contact.jid]:
                return 'muc-active'
            return 'muc-inactive'

    # if not '@' or '@' starts the jid ==> agent
    if contact.jid.find('@') <= 0:
        return contact.show
    if contact.sub in ('both', 'to'):
        return contact.show
    if contact.ask == 'subscribe':
        return 'requested'
    if app.get_transport_name_from_jid(contact.jid):
        return contact.show
    if contact.show in SHOW_LIST:
        return contact.show
    return 'notinroster'
def get_full_jid_from_iq(iq_obj):
    """
    Return the full jid (with resource) from an iq

    Returns None when the iq has no sender.
    """
    sender = iq_obj.getFrom()
    return None if sender is None else parse_jid(str(sender))
def get_jid_from_iq(iq_obj):
    """
    Return the jid (without resource) from an iq
    """
    return app.get_jid_without_resource(get_full_jid_from_iq(iq_obj))
def get_auth_sha(sid, initiator, target):
    """
    Return the SHA-1 hex digest of sid + initiator + target used for proxy
    auth
    """
    payload = '%s%s%s' % (sid, initiator, target)
    return hashlib.sha1(payload.encode('utf-8')).hexdigest()
def remove_invalid_xml_chars(string_):
    """
    Return string_ without the characters matched by
    INVALID_XML_CHARS_REGEX; empty values are returned unchanged
    """
    if not string_:
        return string_
    return re.sub(INVALID_XML_CHARS_REGEX, '', string_)
def get_random_string(count=16):
    """
    Create a random string of count uppercase letters and digits

    WARNING: Don't use this for security purposes
    """
    alphabet = string.ascii_uppercase + string.digits
    picks = [random.choice(alphabet) for _index in range(count)]
    return ''.join(picks)
@functools.lru_cache(maxsize=1)
def get_os_info():
    """
    Return a short description of the operating system (cached)

    System name and release on win32/darwin; on linux the pretty name from
    the 'distro' package, or the plain system name when that package is not
    importable; 'N/A' on any other platform.
    """
    if sys.platform in ('win32', 'darwin'):
        return f'{platform.system()} {platform.release()}'
    if sys.platform == 'linux':
        try:
            import distro
            return distro.name(pretty=True)
        except ImportError:
            return platform.system()
    return 'N/A'
def allow_showing_notification(account):
    """
    Is it allowed to show a notification for account?

    Requires the 'show_notifications' setting, and then either the
    'autopopupaway' setting or an available account.
    """
    if not app.settings.get('show_notifications'):
        return False
    return bool(app.settings.get('autopopupaway') or
                app.account_is_available(account))
def allow_popup_window(account):
    """
    Is it allowed to popup windows?

    Requires the 'autopopup' setting, and then either the 'autopopupaway'
    setting or an account whose status is 'online' or 'chat'.
    """
    autopopup = app.settings.get('autopopup')
    autopopupaway = app.settings.get('autopopupaway')
    if not autopopup:
        return False
    if autopopupaway:
        return True
    return app.connections[account].status in ('online', 'chat')
def allow_sound_notification(account, sound_event):
    """
    Is it allowed to play the sound of sound_event for account?

    The sound event must be enabled, and either the 'sounddnd' setting is on
    or the account's status is not 'dnd'.

    The previous condition 'sounddnd or not_dnd and enabled' was evaluated
    as 'sounddnd or (not_dnd and enabled)', because 'and' binds tighter than
    'or'; with 'sounddnd' on, disabled sound events were allowed too.
    """
    dnd_permits = bool(app.settings.get('sounddnd') or
                       app.connections[account].status != 'dnd')
    if not dnd_permits:
        return False
    return bool(app.settings.get_soundevent_settings(sound_event)['enabled'])
def get_notification_icon_tooltip_dict():
    """
    Return the account list of get_accounts_info() with 'event_lines' added

    Every account dict gains an 'event_lines' key holding the list of text
    lines to show in the tooltip: pending messages first, then the other
    pending events (those whose type contains 'file').
    """
    # How many events before we show summarized, not per-user
    max_ungrouped_events = 10

    accounts = get_accounts_info()

    # Gather events. (With accounts, when there are more.)
    for account in accounts:
        account_name = account['name']
        event_lines = []
        account['event_lines'] = event_lines

        # Gather events per-account
        pending_events = app.events.get_events(account=account_name)
        messages, non_messages = {}, {}
        total_messages, total_non_messages = 0, 0
        for jid in pending_events:
            for event in pending_events[jid]:
                if event.type_.count('file') > 0:
                    # This is a non-message event; it has to be counted in
                    # non_messages (it used to be written into messages)
                    non_messages[jid] = non_messages.get(jid, 0) + 1
                    total_non_messages += 1
                else:
                    # This is a message.
                    messages[jid] = messages.get(jid, 0) + 1
                    total_messages += 1

        # Display unread messages numbers, if any
        if total_messages > 0:
            if total_messages > max_ungrouped_events:
                text = ngettext('%d message pending',
                                '%d messages pending',
                                total_messages,
                                total_messages,
                                total_messages)
                event_lines.append(text)
            else:
                for jid in messages:
                    text = ngettext('%d message pending',
                                    '%d messages pending',
                                    messages[jid],
                                    messages[jid],
                                    messages[jid])
                    contact = app.contacts.get_first_contact_from_jid(
                        account_name, jid)
                    text += ' '
                    if jid in app.gc_connected[account_name]:
                        text += _('from group chat %s') % (jid)
                    elif contact:
                        name = contact.get_shown_name()
                        text += _('from user %s') % (name)
                    else:
                        text += _('from %s') % (jid)
                    event_lines.append(text)

        # Display unseen events numbers, if any
        if total_non_messages > 0:
            if total_non_messages > max_ungrouped_events:
                text = ngettext('%d event pending',
                                '%d events pending',
                                total_non_messages,
                                total_non_messages,
                                total_non_messages)
                event_lines.append(text)
            else:
                for jid in non_messages:
                    text = ngettext('%d event pending',
                                    '%d events pending',
                                    non_messages[jid],
                                    non_messages[jid],
                                    non_messages[jid])
                    text += ' ' + _('from user %s') % (jid)
                    # Was account[account][...], which indexes the dict with
                    # itself and raises TypeError
                    event_lines.append(text)

    return accounts
def get_accounts_info():
    """
    Helper for notification icon tooltip

    Returns one dict per account (sorted by account) with the keys 'name',
    'account_label', 'status_line', 'show' and 'message'.
    """
    accounts = []
    for account in sorted(app.contacts.get_accounts()):
        status = get_connection_status(account)
        message = app.connections[account].status_message
        single_line = get_uf_show(status)
        message = '' if message is None else message.strip()
        if message != '':
            single_line += ': ' + message
        accounts.append({
            'name': account,
            'account_label': app.get_account_label(account),
            'status_line': single_line,
            'show': status,
            'message': message,
        })
    return accounts
def get_current_show(account):
    """
    Return the status of account's connection, or 'offline' when the account
    has no connection
    """
    if account in app.connections:
        return app.connections[account].status
    return 'offline'
# Build the list of optional feature strings for account; plugins get the
# chance to extend the list before it is returned.
# NOTE(review): the indentation of this block is not visible in this copy of
# the file, so which appends belong to which 'if' cannot be confirmed here.
def get_optional_features(account):
features = []
# '+notify' features guarded by the 'request_user_data' account setting
if app.settings.get_account_setting(account, 'request_user_data'):
features.append(Namespace.MOOD + '+notify')
features.append(Namespace.ACTIVITY + '+notify')
features.append(Namespace.TUNE + '+notify')
features.append(Namespace.LOCATION + '+notify')
# presumably added regardless of 'request_user_data' - TODO confirm against
# the properly indented source
features.append(Namespace.NICK + '+notify')
# At most one bookmark namespace is announced, depending on which flag the
# Bookmarks module reports
if app.connections[account].get_module('Bookmarks').nativ_bookmarks_used:
features.append(Namespace.BOOKMARKS_1 + '+notify')
elif app.connections[account].get_module('Bookmarks').pep_bookmarks_used:
features.append(Namespace.BOOKMARKS + '+notify')
# Jingle namespaces when app.is_installed('AV') is true
if app.is_installed('AV'):
features.append(Namespace.JINGLE_RTP)
features.append(Namespace.JINGLE_RTP_AUDIO)
features.append(Namespace.JINGLE_RTP_VIDEO)
features.append(Namespace.JINGLE_ICE_UDP)
# Give plugins the possibility to add their features
app.plugin_manager.extension_point('update_caps', account, features)
return features
def jid_is_blocked(account, jid):
    """
    Return True if jid is in the blocked list of account's Blocking module
    """
    return jid in app.connections[account].get_module('Blocking').blocked
def get_subscription_request_msg(account=None):
    """
    Return the text to send along with a subscription request

    The account's 'subscription_request_msg' setting is used when it is set;
    otherwise a default text is built, prefixed with a greeting carrying the
    account's nick when an account is given.
    """
    custom = app.settings.get_account_setting(account,
                                              'subscription_request_msg')
    if custom:
        return custom

    message = _('I would like to add you to my contact list.')
    if not account:
        return message
    message = _('Hello, I am $name.') + ' ' + message
    return Template(message).safe_substitute({'name': app.nicks[account]})
def get_user_proxy(account):
    """
    Return the proxy named in account's 'proxy' setting, or None when the
    setting is empty
    """
    proxy_name = app.settings.get_account_setting(account, 'proxy')
    return get_proxy(proxy_name) if proxy_name else None
def get_proxy(proxy_name):
    """
    Build a ProxyData from the stored settings of proxy_name

    Returns None when looking up the settings raises ValueError. Username and
    password are only filled in when the 'useauth' setting is on.
    """
    try:
        settings = app.settings.get_proxy_settings(proxy_name)
    except ValueError:
        return None

    if settings['useauth']:
        username, password = settings['user'], settings['pass']
    else:
        username = password = None

    host = '%s:%s' % (settings['host'], settings['port'])
    return ProxyData(type=settings['type'],
                     host=host,
                     username=username,
                     password=password)
def version_condition(current_version, required_version):
    """
    Return False if current_version is lower than required_version, True
    otherwise
    """
    return not V(current_version) < V(required_version)
def get_available_emoticon_themes():
    """
    Return the sorted names of the available emoticon themes

    A theme is the stem of a '.png' file found in a sub-folder of the
    'EMOTICONS' path or directly in the 'MY_EMOTS' path; 'font' is always
    part of the list.
    """
    files = []
    for folder in configpaths.get('EMOTICONS').iterdir():
        if folder.is_dir():
            files.extend(
                theme for theme in folder.iterdir() if theme.is_file())

    my_emots = configpaths.get('MY_EMOTS')
    if my_emots.is_dir():
        files.extend(my_emots.iterdir())

    themes = ['font']
    themes.extend(file.stem for file in files if file.suffix == '.png')
    return sorted(themes)
def call_counter(func):
    """
    Decorator that counts the calls of a method in
    self._connect_machine_calls

    The wrapped method is called with self only. Passing restart=True resets
    the counter to 0 before this call is counted.
    """
    # wraps() keeps the name and docstring of the decorated method, like the
    # other decorators in this module do
    @wraps(func)
    def helper(self, restart=False):
        if restart:
            self._connect_machine_calls = 0
        self._connect_machine_calls += 1
        return func(self)
    return helper
def load_json(path, key=None, default=None):
    """
    Load the JSON document stored at path

    Returns the whole document when key is None, otherwise the value stored
    under key (default when the key is missing). Any error while opening or
    parsing is logged and default is returned.
    """
    try:
        with path.open('r') as file:
            json_dict = json.load(file)
    except Exception:
        log.exception('Parsing error')
        return default
    return json_dict if key is None else json_dict.get(key, default)
def ignore_contact(account, jid):
    """
    Return True if jid is unknown to account and the account's
    'ignore_unknown_contacts' setting is on
    """
    jid = str(jid)
    known_contact = app.contacts.get_contacts(account, jid)
    ignore = app.settings.get_account_setting(account,
                                              'ignore_unknown_contacts')
    if not ignore or known_contact:
        return False
    log.info('Ignore unknown contact %s', jid)
    return True
class AdditionalDataDict(collections.UserDict):
    """
    Dictionary whose values are addressed by a ':'-separated path of nested
    dictionaries plus a key inside the innermost one
    """
    def __init__(self, initialdata=None):
        super().__init__(initialdata)

    @staticmethod
    def _get_path_childs(full_path):
        # 'a:b' gives ['a', 'b']; a path without ':' gives one element
        return full_path.split(':')

    def set_value(self, full_path, key, value):
        """
        Store value under key at full_path, creating missing levels
        """
        node = self.data
        for child in self._get_path_childs(full_path):
            if child not in node:
                node[child] = {}
            node = node[child]
        node[key] = value

    def get_value(self, full_path, key, default=None):
        """
        Return the value under key at full_path, or default if any part of
        the path or the key is missing
        """
        node = self.data
        try:
            for child in self._get_path_childs(full_path):
                node = node[child]
            return node[key]
        except KeyError:
            return default

    def remove_value(self, full_path, key):
        """
        Delete key at full_path; a missing path or key is ignored
        """
        node = self.data
        try:
            for child in self._get_path_childs(full_path):
                node = node[child]
            del node[key]
        except KeyError:
            return

    def copy(self):
        """
        Return a deep copy of this dictionary
        """
        return copy.deepcopy(self)
def save_roster_position(window):
    """
    Store the position of window in the roster position settings

    Nothing is stored when 'save-roster-position' is off or when the display
    is Wayland.
    """
    if (not app.settings.get('save-roster-position') or
            app.is_display(Display.WAYLAND)):
        return

    x_pos, y_pos = window.get_position()
    log.debug('Save roster position: %s %s', x_pos, y_pos)
    for setting, value in (('roster_x-position', x_pos),
                           ('roster_y-position', y_pos)):
        app.settings.set(setting, value)
class Singleton(type):
    """
    Metaclass that creates an instance of each class only once

    The first call builds the instance with the given arguments; every later
    call returns that instance and ignores its arguments.
    """
    _instances = {}  # type: Dict[Any, Any]

    def __call__(cls, *args, **kwargs):
        instances = cls._instances
        if cls not in instances:
            instances[cls] = super().__call__(*args, **kwargs)
        return instances[cls]
def delay_execution(milliseconds):
    """
    Decorator factory: delay the first call for `milliseconds` and ignore all
    other calls while the delay is active

    The pending GLib timeout id is kept in the wrapper's 'source_id'
    attribute, which is removed again once the delayed call has run.
    """
    def delay_execution_decorator(func):
        @wraps(func)
        def func_wrapper(*args, **kwargs):
            if hasattr(func_wrapper, 'source_id'):
                # A delayed call is already pending, drop this one
                return

            def timeout_wrapper():
                func(*args, **kwargs)
                del func_wrapper.source_id

            func_wrapper.source_id = GLib.timeout_add(
                milliseconds, timeout_wrapper)
        return func_wrapper
    return delay_execution_decorator
def event_filter(filter_):
    """
    Decorator factory: only run the handler when the event matches self

    Each entry of filter_ is either 'attr' (event.attr is compared with
    self.attr) or 'event_attr=self_attr'. When the comparison raises
    AttributeError, the attribute '_<self_attr>' of self is used instead.
    The handler is skipped (None is returned) on the first mismatch.
    """
    def event_filter_decorator(func):
        @wraps(func)
        def func_wrapper(self, event, *args, **kwargs):
            for attr in filter_:
                if '=' in attr:
                    event_attr, self_attr = attr.split('=')
                else:
                    event_attr = self_attr = attr

                try:
                    differs = (getattr(event, event_attr) !=
                               getattr(self, self_attr))
                except AttributeError:
                    differs = (getattr(event, event_attr) !=
                               getattr(self, '_%s' % self_attr))
                if differs:
                    return None
            return func(self, event, *args, **kwargs)
        return func_wrapper
    return event_filter_decorator
def catch_exceptions(func):
    """
    Decorator that logs any Exception raised by func and returns None
    instead of letting it propagate
    """
    @wraps(func)
    def func_wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as error:
            log.exception(error)
            return None
    return func_wrapper
def parse_uri_actions(uri):
    """
    Split an 'xmpp:' URI into its action and a dict with its data

    Without a '?' part the action is 'message' and the data holds only the
    jid. Otherwise the action is the text after '?', and the ';'-separated
    'subject=', 'body=' (both percent-decoded) and 'thread=' keys are added
    to the data.
    """
    target = uri[5:]  # 5 is len('xmpp:')
    jid, separator, query = target.partition('?')
    if not separator:
        return 'message', {'jid': target}

    data = {'jid': jid}
    action, *keys = query.split(';')
    for key in keys:
        if key.startswith('subject='):
            data['subject'] = unquote(key[8:])
        elif key.startswith('body='):
            data['body'] = unquote(key[5:])
        elif key.startswith('thread='):
            data['thread'] = key[7:]
    return action, data
def parse_uri(uri):
    """
    Classify uri and return it as a URI struct

    The checks run in this order: 'xmpp:', 'mailto:', 'tel:', the
    something@something.something pattern, 'geo:', 'file://'; everything
    else is treated as a web address.
    """
    if uri.startswith('xmpp:'):
        action, data = parse_uri_actions(uri)
        try:
            validate_jid(data['jid'])
            return URI(type=URIType.XMPP,
                       action=URIAction(action),
                       data=data)
        except ValueError:
            # Unknown action
            return URI(type=URIType.UNKNOWN)

    if uri.startswith('mailto:'):
        return URI(type=URIType.MAIL, data=uri[7:])

    if uri.startswith('tel:'):
        return URI(type=URIType.TEL, data=uri[4:])

    if STH_AT_STH_DOT_STH_REGEX.match(uri):
        return URI(type=URIType.AT, data=uri)

    if uri.startswith('geo:'):
        lat, _separator, lon = uri[4:].partition(',')
        if not lon:
            return URI(type=URIType.UNKNOWN, data=uri)

        if Gio.AppInfo.get_default_for_uri_scheme('geo'):
            return URI(type=URIType.GEO, data=uri)

        # No default handler for 'geo': fall back to a map web address
        return URI(type=URIType.GEO,
                   data=geo_provider_from_location(lat, lon))

    if uri.startswith('file://'):
        return URI(type=URIType.FILE, data=uri)

    return URI(type=URIType.WEB, data=uri)
@catch_exceptions
def open_uri(uri, account=None):
    """
    Open uri with the handler that matches its type

    uri may be a URI struct or a string, which is parsed with parse_uri()
    first. XMPP uris need an account. Types and XMPP actions that are not
    handled here are logged with a warning.
    """
    def _launch(target):
        # webbrowser on Windows, the Gio default handler elsewhere
        if sys.platform == 'win32':
            webbrowser.open(target)
        else:
            Gio.AppInfo.launch_default_for_uri(target)

    if not isinstance(uri, URI):
        uri = parse_uri(uri)

    if uri.type == URIType.FILE:
        open_file(uri.data)

    elif uri.type == URIType.TEL:
        _launch(f'tel:{uri.data}')

    elif uri.type == URIType.MAIL:
        _launch(f'mailto:{uri.data}')

    elif uri.type in (URIType.WEB, URIType.GEO):
        _launch(uri.data)

    elif uri.type == URIType.AT:
        app.interface.new_chat_from_jid(account, uri.data)

    elif uri.type == URIType.XMPP:
        if account is None:
            log.warning('Account must be specified to open XMPP uri')
            return

        if uri.action == URIAction.JOIN:
            app.app.activate_action(
                'groupchat-join',
                GLib.Variant('as', [account, uri.data['jid']]))
        elif uri.action == URIAction.MESSAGE:
            app.interface.new_chat_from_jid(account, uri.data['jid'],
                                            message=uri.data.get('body'))
        else:
            log.warning('Cant open URI: %s', uri)

    else:
        log.warning('Cant open URI: %s', uri)
@catch_exceptions
def open_file(path):
    """
    Open path with the default application

    os.startfile() is used on Windows; elsewhere the path is turned into a
    'file://' uri and handed to the Gio default handler.
    """
    if os.name == 'nt':
        os.startfile(path)
        return

    # Call str() to make it work with pathlib.Path
    uri = str(path)
    if not uri.startswith('file://'):
        uri = 'file://' + uri
    Gio.AppInfo.launch_default_for_uri(uri)
def geo_provider_from_location(lat, lon):
    """
    Return an openstreetmap.org address with a marker at lat/lon, zoom 16
    """
    query = 'mlat=%s&mlon=%s&zoom=16' % (lat, lon)
    return 'https://www.openstreetmap.org/?' + query
def get_resource(account):
    """
    Return the resource of account with its placeholders filled in

    '$hostname' and '$rand' in the 'resource' setting are substituted and the
    result is written back to the setting. Returns None when the setting is
    empty.
    """
    template = app.settings.get_account_setting(account, 'resource')
    if not template:
        return None

    substitutions = {'hostname': socket.gethostname(),
                     'rand': get_random_string()}
    resource = Template(template).safe_substitute(substitutions)
    app.settings.set_account_setting(account, 'resource', resource)
    return resource
def get_default_muc_config():
    """
    Return a new dict with the default group chat configuration values
    """
    # XEP-0045 options
    config = {
        'muc#roomconfig_allowinvites': True,
        'muc#roomconfig_publicroom': False,
        'muc#roomconfig_membersonly': True,
        'muc#roomconfig_persistentroom': True,
        'muc#roomconfig_whois': 'anyone',
        'muc#roomconfig_moderatedroom': False,
    }
    # Ejabberd options
    config.update({
        'allow_voice_requests': False,
        'public_list': False,
    })
    # Prosody options
    config.update({
        '{http://prosody.im/protocol/muc}roomconfig_allowmemberinvites': True,
        'muc#roomconfig_enablearchiving': True,
    })
    return config
def validate_jid(jid, type_=None):
    """
    Parse jid and return it as a JID object

    type_ may be 'bare', 'full' or 'domain' to additionally require that kind
    of JID. Raises ValueError when parsing fails or when the JID is not of
    the requested kind.
    """
    try:
        jid = JID.from_string(str(jid))
    except InvalidJid as error:
        raise ValueError(error)

    if type_ is None:
        return jid

    matches = ((type_ == 'bare' and jid.is_bare) or
               (type_ == 'full' and jid.is_full) or
               (type_ == 'domain' and jid.is_domain))
    if matches:
        return jid
    raise ValueError('Not a %s JID' % type_)
def to_user_string(error):
    """
    Return a string describing error for the user

    The error's text for the current language is preferred; without one the
    condition is returned, followed by the application condition in
    parentheses when there is one.
    """
    text = error.get_text(get_rfc5646_lang())
    if text:
        return text

    if error.app_condition is None:
        return error.condition
    return '%s (%s)' % (error.condition, error.app_condition)
def get_groupchat_name(con, jid):
    """
    Return a display name for the group chat jid

    The bookmark name is preferred, then the muc_name of the last cached
    disco info, then the part of the jid before '@'.
    """
    name = con.get_module('Bookmarks').get_name_from_bookmark(jid)
    if name:
        return name

    disco_info = app.storage.cache.get_last_disco_info(jid)
    if disco_info is not None and disco_info.muc_name:
        return disco_info.muc_name

    return jid.split('@')[0]
def is_affiliation_change_allowed(self_contact, contact, target_aff):
    """
    Return True if self_contact may change contact's affiliation to
    target_aff

    Owners may make any change. Admins may not touch the admin/owner lists
    and may only change contacts whose affiliation is lower than their own.
    Everybody else may change nothing.
    """
    own_affiliation = self_contact.affiliation

    if contact.affiliation.value == target_aff:
        # Contact has already the target affiliation
        return False

    if own_affiliation.is_owner:
        return True

    # Non-admins can change nothing; admins cant edit the admin/owner list
    if not own_affiliation.is_admin or target_aff in ('admin', 'owner'):
        return False

    return own_affiliation > contact.affiliation
def is_role_change_allowed(self_contact, contact):
    """
    Tell whether ``self_contact`` may change the role of ``contact``

    The acting participant needs at least the moderator role and an
    affiliation that is not lower than the one of ``contact``.
    """
    has_moderator_role = not (self_contact.role < Role.MODERATOR)
    if has_moderator_role:
        return self_contact.affiliation >= contact.affiliation
    return False
def get_tls_error_phrase(tls_error):
    """
    Return the phrase registered in ``GIO_TLS_ERRORS`` for ``tls_error``

    When no phrase is registered for ``tls_error`` the phrase of the
    generic TLS error is returned instead.
    """
    phrase = GIO_TLS_ERRORS.get(tls_error)
    if phrase is not None:
        return phrase
    return GIO_TLS_ERRORS.get(Gio.TlsCertificateFlags.GENERIC_ERROR)
class Observable:
    """
    Small signal dispatcher which holds only weak references to handlers

    Handlers are stored per signal name. Because the references are weak,
    a handler whose function (or, for bound methods, whose owning object)
    no longer exists is dropped the next time it is encountered.
    """

    def __init__(self, log_=None):
        """
        :param log_: optional logger; when given, the name of every
            notified signal is logged with ``info()``
        """
        self._log = log_
        # signal name -> list of weak references to the connected handlers
        self._callbacks = defaultdict(list)

    def disconnect_signals(self):
        """
        Remove all handlers of all signals
        """
        self._callbacks = defaultdict(list)

    def disconnect(self, object_):
        """
        Remove every bound-method handler that belongs to ``object_``

        Handlers whose target no longer exists are removed as well.
        Handlers that are plain functions are left untouched.
        """
        for handlers in self._callbacks.values():
            # Iterate over a copy because the list is modified in the loop
            for handler in list(handlers):
                func = handler()
                if func is None:
                    handlers.remove(handler)
                    continue
                # Only bound methods have an owner; plain functions have no
                # ``__self__`` attribute and must not be inspected for it
                if inspect.ismethod(func) and func.__self__ == object_:
                    handlers.remove(handler)

    def connect(self, signal_name, func):
        """
        Register ``func`` as handler for ``signal_name``

        Only a weak reference is stored, the caller has to keep ``func``
        (or the object a bound method belongs to) alive.

        :param signal_name: name of the signal to listen to
        :param func: bound method or Python function; it is called as
            ``func(observable, signal_name, *args, **kwargs)``
        :raises TypeError: if ``func`` is neither a bound method nor a
            Python function
        """
        if inspect.ismethod(func):
            weak_func = weakref.WeakMethod(func)
        elif inspect.isfunction(func):
            weak_func = weakref.ref(func)
        else:
            raise TypeError(
                'Handler must be a bound method or a function, got %r' % (
                    func,))

        self._callbacks[signal_name].append(weak_func)

    def notify(self, signal_name, *args, **kwargs):
        """
        Call all handlers connected to ``signal_name``

        Each handler receives this object and ``signal_name`` followed by
        ``args`` and ``kwargs``. References to handlers that no longer
        exist are removed.
        """
        if self._log is not None:
            self._log.info('Signal: %s', signal_name)

        callbacks = self._callbacks.get(signal_name, [])
        for weak_func in list(callbacks):
            # Resolve the weak reference once and keep the strong reference
            # for the call, so the target cannot vanish between the
            # liveness check and the invocation
            func = weak_func()
            if func is None:
                self._callbacks[signal_name].remove(weak_func)
                continue
            func(self, signal_name, *args, **kwargs)
def write_file_async(path, data, callback, user_data=None):
    """
    Create the file at ``path`` and write ``data`` into it asynchronously

    :param path: location of the file, converted with ``str()``
    :param data: the content to write
    :param callback: called as ``callback(successful, error, user_data)``
        once the operation finished or failed
    :param user_data: passed through unchanged to ``callback``
    """
    context = (callback, data, user_data)
    gio_file = Gio.File.new_for_path(str(path))
    gio_file.create_async(Gio.FileCreateFlags.PRIVATE,
                          GLib.PRIORITY_DEFAULT,
                          None,
                          _on_file_created,
                          context)
def _on_file_created(file, result, user_data):
    """
    Continue ``write_file_async()`` after the file creation finished

    On failure the callback is invoked with ``False`` and the error,
    otherwise the data is written to the newly opened output stream.
    """
    callback, data, caller_data = user_data
    try:
        outputstream = file.create_finish(result)
    except GLib.Error as error:
        callback(False, error, caller_data)
    else:
        # The data is handed on as part of the user data because
        # write_all_async() takes no reference to it, and the Python gc
        # would collect it before it has been written
        outputstream.write_all_async(data,
                                     GLib.PRIORITY_DEFAULT,
                                     None,
                                     _on_write_finished,
                                     (callback, data, caller_data))
def _on_write_finished(outputstream, result, user_data):
    """
    Report the outcome of the asynchronous write to the callback

    The callback receives ``(successful, None, user_data)`` when the write
    finished and ``(False, error, user_data)`` when it raised an error.
    """
    callback, _data, caller_data = user_data
    try:
        successful, _bytes_written = outputstream.write_all_finish(result)
    except GLib.Error as error:
        callback(False, error, caller_data)
        return

    callback(successful, None, caller_data)
def load_file_async(path, callback, user_data=None):
    """
    Load the content of the file at ``path`` asynchronously

    :param path: location of the file, converted with ``str()``
    :param callback: called as ``callback(contents, error, user_data)``
        once loading finished or failed
    :param user_data: passed through unchanged to ``callback``
    """
    context = (callback, user_data)
    gio_file = Gio.File.new_for_path(str(path))
    gio_file.load_contents_async(None, _on_load_finished, context)
def _on_load_finished(file, result, user_data):
    """
    Report the outcome of ``load_file_async()`` to the callback

    The callback receives ``(contents, None, user_data)`` when loading
    finished and ``(None, error, user_data)`` when it raised an error.
    """
    callback, caller_data = user_data
    try:
        _successful, contents, _etag = file.load_contents_finish(result)
    except GLib.Error as error:
        callback(None, error, caller_data)
        return

    callback(contents, None, caller_data)
def convert_gio_to_openssl_cert(cert):
    """
    Convert a Gio certificate by loading its PEM representation with
    ``load_certificate()``

    :param cert: object exposing the PEM text as
        ``props.certificate_pem``
    :returns: the object returned by ``load_certificate()``
    """
    pem_data = cert.props.certificate_pem.encode()
    return load_certificate(FILETYPE_PEM, pem_data)
def get_custom_host(account):
    """
    Return the custom connection endpoint configured for ``account``

    :returns: ``None`` if the account does not use a custom host, else a
        tuple ``(host, protocol, connection_type)``. For websocket URLs
        the host is used as configured, otherwise the configured port is
        appended as ``host:port``.
    """
    settings = app.settings
    if not settings.get_account_setting(account, 'use_custom_host'):
        return None

    host = settings.get_account_setting(account, 'custom_host')
    port = settings.get_account_setting(account, 'custom_port')
    type_ = settings.get_account_setting(account, 'custom_type')

    if host.startswith(('ws://', 'wss://')):
        return (host, ConnectionProtocol.WEBSOCKET, ConnectionType(type_))

    return (f'{host}:{port}', ConnectionProtocol.TCP, ConnectionType(type_))
def warn_about_plain_connection(account, connection_types):
    """
    Tell whether the user has to confirm an unencrypted connection

    This is the case when the account setting
    ``confirm_unencrypted_connection`` is enabled and at least one of
    ``connection_types`` is a plain connection.
    """
    warn = app.settings.get_account_setting(
        account, 'confirm_unencrypted_connection')
    return any(type_.is_plain and warn for type_ in connection_types)
def get_idle_status_message(state, status_message):
    """
    Build the status message to use when the idle state ``state`` starts

    The template is read from the setting ``auto{state}_message``. Inside
    the template ``$S`` is replaced by ``status_message`` and ``$T`` by
    the value of the setting ``auto{state}time``.

    :param state: name of the idle state, used to build the setting names
    :param status_message: the current status message
    :returns: ``status_message`` unchanged if no template is configured,
        otherwise the filled in template
    """
    message = app.settings.get(f'auto{state}_message')
    if not message:
        return status_message

    # Escape literal percent signs before the placeholders are inserted.
    # Otherwise user text like "50% busy" is interpreted as a conversion
    # specifier by the %-formatting below and raises an exception.
    message = message.replace('%', '%%')
    message = message.replace('$S', '%(status)s')
    message = message.replace('$T', '%(time)s')
    return message % {
        'status': status_message,
        'time': app.settings.get(f'auto{state}time')
    }
def should_log(account, jid):
    """
    Tell whether conversations between the local ``account`` and the
    remote ``jid`` should be logged

    Logging is disabled when either of them is listed in the whitespace
    separated account setting ``no_log_for``.
    """
    configured = app.settings.get_account_setting(account, 'no_log_for')
    excluded = (configured or '').split()
    return not (account in excluded or jid in excluded)
def ask_for_status_message(status, signin=False):
    """
    Tell whether the user should be asked for a status message

    :param status: the status that is about to be set; ``None`` means
        only the message is changed, in which case the answer is always
        ``True``
    :param signin: whether the status is set while signing in
    :returns: ``True`` for ``status=None``, otherwise the value of the
        setting responsible for the situation
    """
    if status is None:
        # Only the message itself is about to change
        return True

    if signin:
        setting_name = 'ask_online_status'
    elif status == 'offline':
        setting_name = 'ask_offline_status'
    else:
        setting_name = 'always_ask_for_status_message'
    return app.settings.get(setting_name)
def get_group_chat_nick(account, room_jid):
    """
    Return the nickname to use in the group chat ``room_jid``

    The nickname stored in the bookmark of the room is preferred, without
    one the entry of ``app.nicks`` for ``account`` is returned.
    """
    default_nick = app.nicks[account]

    bookmarks = app.get_client(account).get_module('Bookmarks')
    bookmark = bookmarks.get_bookmark(room_jid)
    if bookmark is not None and bookmark.nick is not None:
        return bookmark.nick
    return default_nick
def get_muc_context(jid):
    """
    Classify the group chat ``jid`` based on its last cached disco info

    :returns: ``None`` if no disco info is cached, ``'private'`` if the
        room is members-only and non-anonymous, otherwise ``'public'``
    """
    disco_info = app.storage.cache.get_last_disco_info(jid)
    if disco_info is None:
        return None

    is_private = (disco_info.muc_is_members_only and
                  disco_info.muc_is_nonanonymous)
    return 'private' if is_private else 'public'