Added YAML output

This commit is contained in:
emdee 2022-09-30 20:09:14 +00:00
parent cc5f2b888e
commit f834a7df50
2 changed files with 270 additions and 188 deletions

View File

@ -3,10 +3,10 @@
Read and manipulate tox profile files. It started as a simple script from
<https://stackoverflow.com/questions/30901873/what-format-are-tox-files-stored-in>
For the moment logging_tox_savefile.py just reads a Tox profile
and prints to stdout various things that it finds. Later it can
be extended to print out JSON or YAML, and then extended to
accept JSON or YAML to write a profile.
For the moment logging_tox_savefile.py just reads a Tox profile and
prints to stdout various things that it finds. Then it writes what it
found in YAML to stderr. Later it can be extended to print out JSON
or YAML, and then extended to accept JSON or YAML to write a profile.
## Requirements
@ -23,6 +23,9 @@ There is an updated and bugfixed version in:
If you want to read the GROUPS section, you need Python msgpack:
<https://pypi.org/project/msgpack/>
If you want to write in YAML, you need Python yaml:
<https://pypi.org/project/PyYAML/>
If you have coloredlogs installed it will make use of it:
<https://pypi.org/project/coloredlogs/>

View File

@ -1,5 +1,10 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
"""
Reads a tox profile and prints out information on what's in there to stdout.
Then it write what it found in YAML to stderr.
"""
# originally from:
# https://stackoverflow.com/questions/30901873/what-format-are-tox-files-stored-in
@ -15,6 +20,11 @@ try:
except ImportError as e:
msgpack = None
try:
import yaml
except ImportError as e:
yaml = None
try:
# https://github.com/toxygen-project/toxygen
from wrapper.toxencryptsave import ToxEncryptSave
@ -36,6 +46,7 @@ except ImportError as e:
coloredlogs = False
LOG = logging.getLogger()
bUSE_NMAP = True
#messenger.c
MESSENGER_STATE_TYPE_NOSPAMKEYS = 1
@ -91,7 +102,235 @@ def bin_to_hex(raw_id, length=None):
res = ''.join('{:02x}'.format(raw_id[i]) for i in range(length))
return res.upper()
def lProcessFriends(state, index, length):
"""Friend:
The integers in this structure are stored in Big Endian format.
Length Contents
1 uint8_t Status
32 Long term public key
1024 Friend request message as a byte string
1 PADDING
2 uint16_t Size of the friend request message
128 Name as a byte string
2 uint16_t Size of the name
1007 Status message as a byte string
1 PADDING
2 uint16_t Size of the status message
1 uint8_t User status (see also: USERSTATUS)
3 PADDING
4 uint32_t Nospam (only used for sending a friend request)
8 uint64_t Last seen time
"""
dStatus = { # Status Meaning
0: 'Not a friend',
1: 'Friend added',
2: 'Friend request sent',
3: 'Confirmed friend',
4: 'Friend online'
}
result = state[index + 8:index + 8 + length]
slen = 1+32+1024+1+2+128+2+1007+1+2+1+3+4+8 # 2216
LOG.debug(f"TODO process_chunk {length // slen} FRIENDS {length} {length % 2216}")
assert length % slen == 0
lIN = []
for i in range(length // slen):
delta = i*slen
status = struct.unpack_from(">b", result, delta)[0]
o = delta+1; l = 32
pk = bin_to_hex(result[o:o+l], l)
o = delta+1+32+1024+1+2+128; l = 2
nsize = struct.unpack_from(">H", result, o)[0]
o = delta+1+32+1024+1+2; l = 128
name = str(result[o:o+nsize], 'utf-8')
o = delta+1+32+1024+1+2+128+2+1007; l = 2
msize = struct.unpack_from(">H", result, o)[0]
o = delta+1+32+1024+1+2+128+2; l = 1007
mame = str(result[o:o+msize], 'utf-8')
LOG.info(f"Friend #{i} {dStatus[status]} {name} {pk}")
lIN += [{"Status": dStatus[status],
"Name": name,
"Pk": pk}]
return lIN
def lProcessGroups(state, index, length):
result = state[index + 8:index + 8 + length]
lIN = []
i = 0
if msgpack:
try:
groups = msgpack.loads(result, raw=True)
LOG.debug(f"TODO process_chunk Groups len={len(groups)}")
for group in groups:
assert len(group) == 7, group
i += 1
state_values, \
state_bin, \
topic_info, \
mod_list, \
keys, \
self_info, \
saved_peers, = group
assert len(state_values) == 8, state_values
assert len(state_bin) == 5, state_bin
assert len(topic_info) == 6, topic_info
topic_info_topic = str(topic_info[3], 'utf-8')
LOG.info(f"lProcessGroups #{i} topic_info_topic={topic_info_topic}")
assert len(mod_list) == 2, mod_list
num_moderators = mod_list[0]
LOG.debug(f"lProcessGroups #{i} num moderators={mod_list[0]}")
#define CRYPTO_SIGN_PUBLIC_KEY_SIZE 32
mods = mod_list[1]
assert len(mods) % 32 == 0, len(mods)
assert len(mods) == num_moderators * 32, len(mods)
lMODS = []
for j in range(num_moderators):
mod = mods[j*32:j*32 + 32]
LOG.info(f"lProcessGroups group#{i} mod#{j} sig_pk={bin_to_hex(mod)}")
lMODS += [{"Sig_pk": bin_to_hex(mod)}]
if lMODS: lIN += [{"Moderators": lMODS}]
assert len(keys) == 4, keys
LOG.debug(f"lProcessGroups #{i} {repr(list(map(len, keys)))}")
chat_public_key, chat_secret_key, self_public_key, self_secret_key = keys
LOG.info(f"lProcessGroups #{i} chat_public_key={bin_to_hex(chat_public_key)}")
lIN += [{"Chat_public_key": bin_to_hex(chat_public_key)}]
if int(bin_to_hex(chat_secret_key), 16) != 0:
LOG.info(f"lProcessGroups #{i} chat_secret_key={bin_to_hex(chat_secret_key)}")
lIN += [{"Chat_secret_key": bin_to_hex(chat_secret_key)}]
LOG.info(f"lProcessGroups #{i} self_public_key={bin_to_hex(self_public_key)}")
lIN += [{"Self_public_key": bin_to_hex(self_public_key)}]
LOG.info(f"lProcessGroups #{i} self_secret_key={bin_to_hex(self_secret_key)}")
lIN += [{"Self_secret_key": bin_to_hex(self_secret_key)}]
assert len(self_info) == 4, self_info
self_nick_len, self_role, self_status, self_nick = self_info
self_nick = str(self_nick, 'utf-8')
LOG.info(f"lProcessGroups #{i} self_nick={repr(self_nick)}")
lIN += [{"Self_nick": self_nick}]
assert len(saved_peers) == 2, saved_peers
except Exception as e:
LOG.warn(f"process_chunk Groups #{i} error={e}")
else:
LOG.debug(f"TODO process_chunk Groups = no msgpack bytes={length}")
return lIN
def lProcessTcpRelay(state, index, length):
"""Node Info (packed node format)
The Node Info data structure contains a Transport Protocol, a Socket
Address, and a Public Key. This is sufficient information to start
communicating with that node. The binary representation of a Node Info is
called the packed node format.
Length Type Contents
1 bit Transport Protocol UDP = 0, TCP = 1
7 bit Address Family 2 = IPv4, 10 = IPv6
4 | 16 IP address 4 bytes for IPv4, 16 bytes for IPv6
2 Port Number Port number
32 Public Key Node ID
"""
delta = 0
relay = 0
result = state[index + 8:index + 8 + length]
lIN = []
while length > 0:
status = struct.unpack_from(">B", result, delta)[0]
if status >= 128:
ipv = 'TCP'
af = status - 128
else:
ipv = 'UDP'
af = status
if af == 2:
af = 'IPv4'
alen = 4
ipaddr = inet_ntop(AF_INET, result[delta+1:delta+1+alen])
else:
af = 'IPv6'
alen = 16
ipaddr = inet_ntop(AF_INET6, result[delta+1:delta+1+alen])
total = 1 + alen + 2 + 32
port = int(struct.unpack_from(">H", result, delta+1+alen)[0])
pk = bin_to_hex(result[delta+1+alen+2:delta+1+alen+2+32], 32)
LOG.info(f"DHTnode #{relay} bytes={length} status={status} ip={ipv} af={af} ip={ipaddr} port={port} pk={pk}")
lIN += [{"Bytes": length,
"Status": status,
"Ip": ipv,
"Af": af,
"Ip": ipaddr,
"Port": port,
"Pk": pk}]
if bUSE_NMAP:
cmd = f"nmap -Pn -n -sT -p T:{port} {ipaddr}"
delta += total
length -= total
relay += 1
return lIN
def lProcessDHTnodes(state, index, length):
relay = 0
result = state[index + 8:index + 8 + length]
status = struct.unpack_from("<L", result, 0)[0]
# 4 uint32_t (0x159000D)
assert status == 0x159000D
length -= 4
delta = 4
lIN = []
while length > 0:
slen = struct.unpack_from("<L", result, delta)[0]
stype = struct.unpack_from("<H", result, delta+4)[0]
smark = struct.unpack_from("<H", result, delta+6)[0]
assert smark == 0x11CE
total = slen + 4 + 2 + 2
subtotal = 0
offset = delta
while offset < slen: #loop over nodes
status = struct.unpack_from(">B", result, offset+8)[0]
assert status < 12
ipv = 'UDP'
if status == 2:
af = 'IPv4'
alen = 4
ipaddr = inet_ntop(AF_INET, result[offset+8+1:offset+8+1+alen])
else:
af = 'IPv6'
alen = 16
ipaddr = inet_ntop(AF_INET6, result[offset+8+1:offset+8+1+alen])
subtotal = 1 + alen + 2 + 32
port = int(struct.unpack_from(">H", result, offset+8+1+alen)[0])
pk = bin_to_hex(result[offset+8+1+alen+2:offset+8+1+alen+2+32], 32)
LOG.info(f"DHTnode #{relay} status={status} ipaddr={ipaddr} port={port} {pk}")
lIN += [{f"status": f"{status}",
f"ipaddr": f"{ipaddr}",
f"port": f"{port}",
f"pk": f"{pk}"}]
if bUSE_NMAP:
cmd = f"nmap -Pn -n -sU -p U:{port} {ipaddr}"
offset += subtotal
delta += total
length -= total
relay += 1
return lIN
lOUT = []
def process_chunk(index, state):
global lOUT
if index + 8 >= len(state):
return
length = struct.unpack_from("<H", state, index)[0]
@ -104,210 +343,46 @@ def process_chunk(index, state):
public_key = bin_to_hex(result[4:36])
private_key = bin_to_hex(result[36:68])
LOG.info(f"nospam = {nospam}")
lOUT += [{f"Nospam": f"{nospam}"}]
LOG.info(f"public_key = {public_key}")
lOUT += [{f"Public_key": f"{public_key}"}]
LOG.info(f"private_key = {private_key}")
lOUT += [{f"Private_key": f"{private_key}"}]
elif data_type == MESSENGER_STATE_TYPE_DHT:
relay = 0
result = state[index + 8:index + 8 + length]
status = struct.unpack_from("<L", result, 0)[0]
# 4 uint32_t (0x159000D)
assert status == 0x159000D
LOG.debug(f"process_chunk {dSTATE_TYPE[data_type]} length={length}")
length -= 4
delta = 4
while length > 0:
slen = struct.unpack_from("<L", result, delta)[0]
stype = struct.unpack_from("<H", result, delta+4)[0]
smark = struct.unpack_from("<H", result, delta+6)[0]
assert smark == 0x11CE
total = slen + 4 + 2 + 2
subtotal = 0
offset = delta
while offset < slen: #loop over nodes
status = struct.unpack_from(">B", result, offset+8)[0]
assert status < 12
ipv = 'UDP'
if status == 2:
af = 'IPv4'
alen = 4
ipaddr = inet_ntop(AF_INET, result[offset+8+1:offset+8+1+alen])
else:
af = 'IPv6'
alen = 16
ipaddr = inet_ntop(AF_INET6, result[offset+8+1:offset+8+1+alen])
subtotal = 1 + alen + 2 + 32
port = struct.unpack_from(">H", result, offset+8+1+alen)[0]
pk = bin_to_hex(result[offset+8+1+alen+2:offset+8+1+alen+2+32], 32)
LOG.info(f"{dSTATE_TYPE[data_type]} #{relay} status={status} ipaddr={ipaddr} port={port} {pk}")
offset += subtotal
delta += total
length -= total
relay += 1
lIN = lProcessDHTnodes(state, index, length)
if lIN: lOUT += [{"DHT_nodes": lIN}]
elif data_type == MESSENGER_STATE_TYPE_FRIENDS:
"""Friend:
The integers in this structure are stored in Big Endian format.
Length Contents
1 uint8_t Status
32 Long term public key
1024 Friend request message as a byte string
1 PADDING
2 uint16_t Size of the friend request message
128 Name as a byte string
2 uint16_t Size of the name
1007 Status message as a byte string
1 PADDING
2 uint16_t Size of the status message
1 uint8_t User status (see also: USERSTATUS)
3 PADDING
4 uint32_t Nospam (only used for sending a friend request)
8 uint64_t Last seen time
"""
dStatus = { # Status Meaning
0: 'Not a friend ',
1: 'Friend added ',
2: 'Friend request sent ',
3: 'Confirmed friend ',
4: 'Friend online '
}
result = state[index + 8:index + 8 + length]
slen = 1+32+1024+1+2+128+2+1007+1+2+1+3+4+8 # 2216
LOG.debug(f"TODO process_chunk {length // slen} FRIENDS {length} {length % 2216}")
assert length % slen == 0
for i in range(length // slen):
delta = i*slen
status = struct.unpack_from(">b", result, delta)[0]
o = delta+1; l = 32
pk = bin_to_hex(result[o:o+l], l)
o = delta+1+32+1024+1+2+128; l = 2
nsize = struct.unpack_from(">H", result, o)[0]
o = delta+1+32+1024+1+2; l = 128
name = str(result[o:o+nsize], 'utf-8')
o = delta+1+32+1024+1+2+128+2+1007; l = 2
msize = struct.unpack_from(">H", result, o)[0]
o = delta+1+32+1024+1+2+128+2; l = 1007
mame = str(result[o:o+msize], 'utf-8')
LOG.info(f"Friend #{i} {dStatus[status]} {name} {pk}")
lIN = lProcessFriends(state, index, length)
if lIN: lOUT += [{"Friends": lIN}]
elif data_type == MESSENGER_STATE_TYPE_NAME:
LOG.info("User name = " +str(state[index + 8:index + 8 + length], 'utf-8'))
name = str(state[index + 8:index + 8 + length], 'utf-8')
LOG.info("Nick name = " +name)
lOUT += [{"Nick_name": name}]
elif data_type == MESSENGER_STATE_TYPE_STATUSMESSAGE:
LOG.info(f"StatusMessage = {str(state[index + 8:index + 8 + length], 'utf-8')}")
mess = str(state[index + 8:index + 8 + length], 'utf-8')
LOG.info(f"StatusMessage = " +mess)
lOUT += [{"Status_message": mess}]
elif data_type == MESSENGER_STATE_TYPE_STATUS:
# 1 uint8_t status (0 = online, 1 = away, 2 = busy)
dStatus = {0: 'online', 1: 'away', 2: 'busy'}
status = struct.unpack_from(">b", state, index)[0]
LOG.info(f"{dSTATE_TYPE[data_type]} = {dStatus[status]}")
status = dStatus[status]
LOG.info(f"{dSTATE_TYPE[data_type]} = " +status)
lOUT += [{f"Online_status": status}]
elif data_type == MESSENGER_STATE_TYPE_GROUPS:
result = state[index + 8:index + 8 + length]
if msgpack:
try:
groups = msgpack.loads(result, raw=True)
LOG.debug(f"TODO process_chunk {dSTATE_TYPE[data_type]} len={len(groups)}")
i = 0
for group in groups:
assert len(group) == 7, group
i += 1
state_values, \
state_bin, \
topic_info, \
mod_list, \
keys, \
self_info, \
saved_peers, = group
lIN = lProcessGroups(state, index, length)
if lIN: lOUT += [{"Groups": lIN}]
assert len(state_values) == 8, state_values
assert len(state_bin) == 5, state_bin
assert len(topic_info) == 6, topic_info
topic_info_topic = str(topic_info[3], 'utf-8')
LOG.info(f"{dSTATE_TYPE[data_type]} #{i} topic_info_topic={topic_info_topic}")
assert len(mod_list) == 2, mod_list
num_moderators = mod_list[0]
LOG.debug(f"{dSTATE_TYPE[data_type]} #{i} num moderators={mod_list[0]}")
#define CRYPTO_SIGN_PUBLIC_KEY_SIZE 32
mods = mod_list[1]
assert len(mods) % 32 == 0, len(mods)
assert len(mods) == num_moderators * 32, len(mods)
for j in range(num_moderators):
mod = mods[j*32:j*32 + 32]
LOG.info(f"{dSTATE_TYPE[data_type]} group#{i} mod#{j} sig_pk={bin_to_hex(mod)}")
assert len(keys) == 4, keys
LOG.debug(f"{dSTATE_TYPE[data_type]} #{i} {repr(list(map(len, keys)))}")
chat_public_key, chat_secret_key, self_public_key, self_secret_key = keys
LOG.info(f"{dSTATE_TYPE[data_type]} #{i} chat_public_key={bin_to_hex(chat_public_key)}")
if int(bin_to_hex(chat_secret_key), 16) != 0:
LOG.info(f"{dSTATE_TYPE[data_type]} #{i} chat_secret_key={bin_to_hex(chat_secret_key)}")
LOG.info(f"{dSTATE_TYPE[data_type]} #{i} self_public_key={bin_to_hex(self_public_key)}")
LOG.info(f"{dSTATE_TYPE[data_type]} #{i} self_secret_key={bin_to_hex(self_secret_key)}")
assert len(self_info) == 4, self_info
self_nick_len, self_role, self_status, self_nick = self_info
self_nick = str(self_nick, 'utf-8')
LOG.info(f"{dSTATE_TYPE[data_type]} #{i} self_nick={repr(self_nick)}")
assert len(saved_peers) == 2, saved_peers
except Exception as e:
LOG.warn(f"process_chunk {dSTATE_TYPE[data_type]} #{i} error={e}")
else:
LOG.debug(f"TODO process_chunk {dSTATE_TYPE[data_type]} #{i} bytes={length}")
elif data_type == MESSENGER_STATE_TYPE_TCP_RELAY:
"""Node Info (packed node format)
The Node Info data structure contains a Transport Protocol, a Socket
Address, and a Public Key. This is sufficient information to start
communicating with that node. The binary representation of a Node Info is
called the packed node format.
Length Type Contents
1 bit Transport Protocol UDP = 0, TCP = 1
7 bit Address Family 2 = IPv4, 10 = IPv6
4 | 16 IP address 4 bytes for IPv4, 16 bytes for IPv6
2 Port Number Port number
32 Public Key Node ID
"""
delta = 0
relay = 0
result = state[index + 8:index + 8 + length]
while length > 0:
status = struct.unpack_from(">B", result, delta)[0]
if status >= 128:
ipv = 'TCP'
af = status - 128
else:
ipv = 'UDP'
af = status
if af == 2:
af = 'IPv4'
alen = 4
ipaddr = inet_ntop(AF_INET, result[delta+1:delta+1+alen])
else:
af = 'IPv6'
alen = 16
ipaddr = inet_ntop(AF_INET6, result[delta+1:delta+1+alen])
total = 1 + alen + 2 + 32
port = struct.unpack_from(">H", result, delta+1+alen)[0]
pk = bin_to_hex(result[delta+1+alen+2:delta+1+alen+2+32], 32)
LOG.info(f"{dSTATE_TYPE[data_type]} #{relay} bytes={length} {status} {ipv} {af} {ipaddr} {port} {pk}")
delta += total
length -= total
relay += 1
lIN = lProcessTcpRelay(state, index, length)
if lIN: lOUT += [{"Tcp_relays": lIN}]
elif data_type == MESSENGER_STATE_TYPE_PATH_NODE:
LOG.debug(f"TODO process_chunk {dSTATE_TYPE[data_type]} bytes={length}")
@ -316,6 +391,8 @@ called the “packed node format”.
LOG.debug(f"TODO process_chunk {dSTATE_TYPE[data_type]} bytes={length}")
else:
LOG.info(f"NO {dSTATE_TYPE[data_type]}")
lOUT += [{"Conferences": []}]
elif data_type == MESSENGER_STATE_TYPE_END:
LOG.info("That's all folks...")
@ -359,3 +436,5 @@ if __name__ == '__main__':
assert oSave
assert oSave[:8] == b'\x00\x00\x00\x00\x1f\x1b\xed\x15'
process_chunk(8, oSave)
if lOUT and yaml:
sys.stderr.write(yaml.dump(lOUT))