pyproject.toml

This commit is contained in:
emdee 2024-01-14 00:08:22 +00:00
parent 4dc394213c
commit 3b623d4fdf
8 changed files with 43 additions and 0 deletions

View file

@ -0,0 +1,3 @@
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 2; coding: utf-8 -*-
# Version string of the phantompy package.
__version__ = "0.1.0"

20
src/phantompy/__main__.py Normal file
View file

@ -0,0 +1,20 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*
from __future__ import absolute_import

import os
import sys

from .qasync_phantompy import iMain

# Logging setup is best-effort: neither a missing support module nor a
# failing logging configuration may prevent the program from starting.
try:
    from .support_phantompy import vsetup_logging
except ImportError:
    vsetup_logging = None

if vsetup_logging is not None:
    try:
        # DEBUG=<n> with n > 0 in the environment selects debug logging.
        debug = int(os.environ.get('DEBUG', 0))
    except ValueError:
        debug = 0
    # 10 = debug, 20 = info
    log_level = 10 if debug > 0 else 20
    try:
        vsetup_logging(log_level, logfile='', stream=sys.stderr)
    except Exception as exc:
        # Report instead of silently swallowing, but keep going.
        sys.stderr.write(f"phantompy: logging setup failed: {exc}\n")

if __name__ == '__main__':
    iMain(sys.argv[1:])

View file

@ -0,0 +1,84 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*
"""
Looks for urls https://dns.google/resolve?
https://dns.google/resolve?name=domain.name&type=TXT&cd=true&do=true
and parses them to extract a magic field.
A good example of how you can parse json embedded in HTML with phantomjs.
"""
import sys
import os
from phantompy import Render
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
class LookFor(Render):
    """Render subclass that inspects a dns.google JSON answer for a TXT marker.

    After the page has loaded, its HTML is fetched, the JSON document
    embedded in the ``<pre>`` element is parsed, and the TXT records
    (type 16) of its ``Answer`` list are searched for the value
    ``we-run-this-tor-relay``.  The outcome is stored in
    ``we_run_this_tor_relay`` (``True``, ``False``, or ``None`` when no
    verdict was reached).  On exit, the first label of the ``name=`` query
    parameter of the URL is appended to ``app.lfps``.
    """

    def __init__(self, app, do_print=True, do_save=False):
        # Create the shared result list only once, so that a new instance
        # does not discard results already collected on the same app.
        if not hasattr(app, 'lfps'):
            app.lfps = []
        self._app = app
        self.do_print = do_print
        self.do_save = do_save
        self.progress = 0
        self.we_run_this_tor_relay = None
        Render.__init__(self, app, do_print, do_save)

    def _exit(self, val):
        """Finish as Render does, then record the looked-up name in app.lfps."""
        Render._exit(self, val)
        self.percent = 100
        LOG.debug(f"phantom.py: Exiting with val {val}")
        uri = self.uri or ''
        _, found, rest = uri.partition('name=')
        if not found:
            LOG.warning(f"no name= parameter in {uri}")
        # Value of the name= parameter, then its first dot-separated label.
        name = rest.split('&', 1)[0]
        fp = name.split('.', 1)[0]
        # threadsafe?
        self._app.lfps.append(fp)

    def _html_callback(self, *args):
        """Receive the page HTML from toHtml(), analyse it and continue.

        The result code of ilookfor() is passed on as the level of the
        ``__PHANTOM_PY_SAVED__`` message.
        """
        html = args[0] if args else None
        if not isinstance(html, str):
            return
        # Only write the HTML when an output file has been configured.
        if self.htmlfile:
            self._save(html)
        i = self.ilookfor(html)
        self._onConsoleMessage(i, "__PHANTOM_PY_SAVED__", 0, '')

    def ilookfor(self, html):
        """Search the JSON embedded in *html* for the relay marker.

        Returns:
            -1 if the ``<pre>`` marker is not present in the page,
             1 if the JSON is malformed or has no ``Answer`` list,
             0 if a TXT record with data ``we-run-this-tor-relay`` exists,
             2 if there is an ``Answer`` list but no such record.
        """
        import json
        marker = '<pre style="word-wrap: break-word; white-space: pre-wrap;">'
        start = html.find(marker)
        if start < 0:
            return -1
        start += len(marker)
        end = html.find('</pre', start)
        payload = html[start:end] if end >= 0 else html[start:]
        # The page content is remote data: validate it explicitly rather
        # than with assert, which is stripped under -O and would otherwise
        # raise inside the Qt callback.
        if not (payload.startswith('{') and payload.endswith('}')):
            LOG.warning(f"FAIL {self.uri}")
            return 1
        LOG.debug(f"Found {len(payload)} json")
        try:
            o = json.loads(payload)
        except ValueError:
            LOG.warning(f"FAIL {self.uri}")
            return 1
        answers = o.get("Answer") if isinstance(o, dict) else None
        if not isinstance(answers, list):
            LOG.warning(f"FAIL {self.uri}")
            return 1
        for elt in answers:
            # 16 is the DNS record type of TXT records.
            if not isinstance(elt, dict) or elt.get('type') != 16:
                continue
            if elt.get('data') == 'we-run-this-tor-relay':
                LOG.info(f"OK {self.uri}")
                self.we_run_this_tor_relay = True
                return 0
        self.we_run_this_tor_relay = False
        LOG.warning(f"BAD {self.uri}")
        return 2

    def _loadFinished(self, result):
        """Fetch the HTML as soon as the page has loaded (no JavaScript is run)."""
        LOG.debug(f"phantom.py: Loading finished {self.uri}")
        self.toHtml(self._html_callback)

273
src/phantompy/phantompy.py Normal file
View file

@ -0,0 +1,273 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 2; coding: utf-8 -*-
# https://gist.github.com/michaelfranzl/91f0cc13c56120391b949f885643e974/raw/a0601515e7a575bc4c7d4d2a20973b29b6c6f2df/phantom.py
"""
# phantom.py
Simple but fully scriptable headless QtWebKit browser using PyQt5 in Python3,
specialized in executing external JavaScript and generating PDF files. A lean
replacement for other bulky headless browser frameworks.
## Usage
If you have a display attached:
./phantom.py [--pdf_output <pdf-file>] [--js_input <javascript-file>] <url-or-html-file>
If you don't have a display attached (i.e. on a remote server), you can use
xvfb-run, or don't add --show_gui - it should work without a display.
Arguments:
[--pdf_output <pdf-file>] (optional) Path and name of PDF file to generate
[--html_output <html-file>] (optional) Path and name of HTML file to generate
[--js_input <javascript-file>] (optional) Path and name of a JavaScript file to execute
--log_level 10=debug 20=info 30=warn 40=error
<url> Can be a http(s) URL or a path to a local file
## Features
* Generate a PDF screenshot of the web page after it is completely loaded.
* Optionally execute a local JavaScript file specified by the argument
<javascript-file> after the web page is completely loaded, and before
the PDF is generated.
* console.log's will be printed to stdout.
* Easily add new features by changing the source code of this script, without
compiling C++ code. For more advanced applications, consider attaching
PyQt objects/methods to WebKit's JavaScript space by using
`QWebFrame::addToJavaScriptWindowObject()`.
If you execute an external <javascript-file>, phantom.py has no way of knowing
when that script has finished doing its work. For this reason, the external
script should execute `console.log("__PHANTOM_PY_DONE__");` when done. This will
trigger the PDF generation, after which phantom.py will exit. If no
`__PHANTOM_PY_DONE__` string is seen on the console for 10 seconds, phantom.py
will exit without doing anything. This behavior could be implemented more
elegantly without console.log's but it is the simplest solution.
It is important to remember that since you're just running WebKit, you can use
everything that WebKit supports, including the usual JS client libraries, CSS,
CSS @media types, etc.
## Dependencies
* Python3
* PyQt5
* [qasync](https://github.com/CabbageDevelopment/qasync) for the
standalone program ```qasync_phantompy.py```
* xvfb (optional for display-less machines)
Installation of dependencies in Debian Stretch is easy:
apt-get install xvfb python3-pyqt5 python3-pyqt5.qtwebkit
Finding the equivalent for other OSes is an exercise that I leave to you.
## Examples
Given the following file /tmp/test.html
<html>
<body>
<p>foo <span id="id1">foo</span> <span id="id2">foo</span></p>
</body>
<script>
document.getElementById('id1').innerHTML = "bar";
</script>
</html>
... and the following file /tmp/test.js:
document.getElementById('id2').innerHTML = "baz";
console.log("__PHANTOM_PY_DONE__");
... and running this script (without attached display) ...
xvfb-run python3 phantom.py --pdf_output /tmp/out.pdf --js_input /tmp/test.js /tmp/test.html
... you will get a PDF file /tmp/out.pdf with the contents "foo bar baz".
Note that the second occurrence of "foo" has been replaced by the web page's own
script, and the third occurrence of "foo" by the external JS file.
## License
Copyright 2017 Michael Karl Franzl
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import importlib
import logging
import os
import sys  # noqa
import warnings

from qasync import QtModuleName

# qasync has already selected a Qt binding; QtModuleName is its package name.
# QUrl, QPrinter and QWebEnginePage are classes, not modules, so import the
# module that contains each one and read the class off it.
QUrl = importlib.import_module(QtModuleName + ".QtCore").QUrl
QPrinter = importlib.import_module(QtModuleName + ".QtPrintSupport").QPrinter
QWebEnginePage = importlib.import_module(QtModuleName + ".QtWebEngineWidgets").QWebEnginePage

warnings.filterwarnings('ignore')
LOG = logging.getLogger()
def prepare(sdir='/tmp'):
    """Create the sample files ``test.js`` and ``test.html`` in *sdir*.

    A file that already exists is left untouched.
    """
    samples = (
        ('test.js', """
document.getElementById('id2').innerHTML = "baz";
console.log("__PHANTOM_PY_DONE__");
"""),
        ('test.html', """
<html>
<body>
<p>foo <span id="id1">foo</span> <span id="id2">foo</span></p>
</body>
<script>
document.getElementById('id1').innerHTML = "bar";
</script>
</html>
"""),
    )
    for name, text in samples:
        target = os.path.join(sdir, name)
        if os.path.exists(target):
            continue
        with open(target, 'wt') as out:
            out.write(text)
        LOG.debug(f"wrote {target} ")
class Render(QWebEnginePage):
    """Load one page, run JavaScript on it, then save it and/or print it.

    The work is a chain of asynchronous steps driven by marker strings that
    are passed to _onConsoleMessage():

        __PHANTOM_PY_DONE__    -> save the HTML (if do_save)
        __PHANTOM_PY_SAVED__   -> print the PDF (if do_print)
        __PHANTOM_PY_PRINTED__ -> _exit()

    ``percent`` is advanced at every step.  run() records the instance in
    ``app.lstart``; _exit() appends the page URL to ``app.ldone``.
    """

    def __init__(self, app, do_print=False, do_save=True):
        QWebEnginePage.__init__(self)
        # The bookkeeping lists live on the application object.  Create them
        # only when missing, so that a further Render instance on the same
        # app does not wipe results that were already recorded.
        for name in ('lstart', 'ldone'):
            if not hasattr(app, name):
                setattr(app, name, [])
        self._app = app
        self.do_print = do_print
        self.do_save = do_save
        self.percent = 0
        self.uri = None
        self.jsfile = None
        self.htmlfile = None
        self.pdffile = None
        # Holds the QPrinter of a print job that is in progress.
        self._printer = None

    def run(self, url, pdffile, htmlfile, jsfile):
        """Start loading *url*; this returns before the page has loaded.

        Args:
            url: http(s) URL or path of a local file.
            pdffile: path of the PDF to write (used when do_print is set).
            htmlfile: path of the HTML to write (used when do_save is set).
            jsfile: optional path of a JavaScript file to run on the page.
        """
        self._app.lstart.append(id(self))
        self.percent = 10
        self.uri = url
        self.jsfile = jsfile
        self.htmlfile = htmlfile
        self.pdffile = pdffile
        self.outfile = pdffile or htmlfile
        LOG.debug(f"phantom.py: URL={url} htmlfile={htmlfile} pdffile={pdffile} JSFILE={jsfile}")
        qurl = QUrl.fromUserInput(url)

        # The PDF generation only happens when the special string
        # __PHANTOM_PY_DONE__ is sent to console.log(). The following JS
        # string is executed by default, when no external JavaScript file is
        # specified or when the file cannot be read.
        self.js_contents = "setTimeout(function() { console.log('__PHANTOM_PY_DONE__') }, 5000);"
        if jsfile:
            try:
                with open(jsfile, 'rt') as f:
                    self.js_contents = f.read()
            except (OSError, UnicodeError):
                LOG.exception(f"error reading jsfile {self.jsfile}")

        # Route console messages to our handler before the load is started.
        self.javaScriptConsoleMessage = self._onConsoleMessage
        self.loadFinished.connect(self._loadFinished)
        self.percent = 20
        self.load(qurl)
        LOG.debug("phantom.py: loading 10")

    def _onConsoleMessage(self, *args):
        """Handle a console message and advance the done/saved/printed chain.

        Accepts either (level, txt, lineno, filename) or
        (txt, lineno, filename); in the latter case level is taken as 1.
        """
        if len(args) > 3:
            level, txt, lineno, filename = args
        else:
            level = 1
            txt, lineno, filename = args
        LOG.debug(f"CONSOLE {lineno} {txt} {filename}")
        if "__PHANTOM_PY_DONE__" in txt:
            self.percent = 40
            # If we get this magic string, it means that the external JS is done
            if self.do_save:
                # _html_callback() re-enters with __PHANTOM_PY_SAVED__.
                self.toHtml(self._html_callback)
                return
            # nothing to save: drop through to the next step
            txt = "__PHANTOM_PY_SAVED__"
        if "__PHANTOM_PY_SAVED__" in txt:
            self.percent = 50
            if self.do_print:
                # _printer_callback() re-enters with __PHANTOM_PY_PRINTED__.
                self._print()
                return
            txt = "__PHANTOM_PY_PRINTED__"
        if "__PHANTOM_PY_PRINTED__" in txt:
            self.percent = 60
            self._exit(level)

    def _loadFinished(self, result):
        """Run the JavaScript once the page load has finished."""
        # RenderProcessTerminationStatus ?
        self.percent = 30
        if not result:
            LOG.warning(f"phantom.py: loading failed for {self.uri}")
        LOG.info(f"phantom.py: _loadFinished {result} {self.percent}")
        LOG.debug(f"phantom.py: Evaluating JS from {self.jsfile}")
        self.runJavaScript("document.documentElement.contentEditable=true")
        self.runJavaScript(self.js_contents)

    def _html_callback(self, *args):
        """Receive the page HTML from toHtml(), save it and signal SAVED."""
        if args and isinstance(args[0], str):
            self._save(args[0])
            self._onConsoleMessage(0, "__PHANTOM_PY_SAVED__", 0, '')

    def _save(self, html):
        """Write *html* to self.htmlfile; failures are logged, not raised."""
        sfile = self.htmlfile
        if not sfile:
            LOG.warning("phantom.py: no HTML output file given, not saving")
            return
        # CompleteHtmlSaveFormat SingleHtmlSaveFormat MimeHtmlSaveFormat
        try:
            # Fixed encoding: page text need not be representable in the
            # locale's default encoding.
            with open(sfile, 'wt', encoding='utf-8') as ofd:
                ofd.write(html)
        except OSError:
            LOG.exception(f"error writing {sfile}")
            return
        LOG.debug(f"Saved {sfile}")

    def _printer_callback(self, *args):
        """Receive the print result and signal PRINTED (level 1 on failure)."""
        i = 1 if (args and args[0] is False) else 0
        self._onConsoleMessage(i, "__PHANTOM_PY_PRINTED__", 0, '')

    def _print(self):
        """Start printing the page to self.pdffile as an A4 PDF."""
        sfile = self.pdffile
        if not sfile:
            LOG.warning("phantom.py: no PDF output file given, not printing")
            self._printer_callback(False)
            return
        printer = QPrinter()
        printer.setPageMargins(10, 10, 10, 10, QPrinter.Millimeter)
        printer.setPaperSize(QPrinter.A4)
        printer.setCreator("phantom.py by Michael Karl Franzl")
        printer.setOutputFormat(QPrinter.PdfFormat)
        printer.setOutputFileName(sfile)
        # Printing is asynchronous: keep a reference so that the printer is
        # not garbage-collected before the callback has been invoked.
        self._printer = printer
        self.print(printer, self._printer_callback)
        LOG.debug("phantom.py: Printed")

    def _exit(self, val):
        """Mark this page as finished by appending its URL to app.ldone."""
        self.percent = 100
        LOG.debug(f"phantom.py: Exiting with val {val}")
        # threadsafe?
        self._app.ldone.append(self.uri)

View file

@ -0,0 +1,128 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*
import asyncio
import os
import sys
# let qasync figure out what Qt we are using - we dont care
from qasync import QApplication, QEventLoop, QtWidgets
from phantompy import Render
# if you want an example of looking for things in downloaded HTML:
# from lookupdns import LookFor as Render
from support_phantompy import omain_argparser, vsetup_logging
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
try:
import shtab
except:
shtab = None
class Widget(QtWidgets.QWidget):
    """Small status window: a task-count label next to a progress bar."""

    def __init__(self):
        super().__init__()
        layout = QtWidgets.QHBoxLayout()
        self.setLayout(layout)

        self._label = QtWidgets.QLabel()
        layout.addWidget(self._label)

        self.progress = QtWidgets.QProgressBar()
        self.progress.setRange(0, 99)
        layout.addWidget(self.progress)

    def update(self, text):
        """Show the number of asyncio tasks and set the bar to int(text)."""
        task_count = len(asyncio.all_tasks())
        self._label.setText(str(task_count))
        self.progress.setValue(int(text))
class ContextManager:
    """Async context manager that counts one-second ticks."""

    def __init__(self) -> None:
        # Number of ticks completed so far.
        self._seconds = 0

    async def __aenter__(self):
        """Log entry and hand back the manager itself."""
        LOG.debug("ContextManager enter")
        return self

    async def __aexit__(self, *exc_info):
        """Log exit; exceptions are not suppressed."""
        LOG.debug("ContextManager exit")

    async def tick(self):
        """Sleep for one second and return the updated tick count."""
        await asyncio.sleep(1)
        elapsed = self._seconds + 1
        self._seconds = elapsed
        return elapsed
async def main(widget, app, ilen):
    """Poll once per second until *ilen* pages are done, then stop the app.

    Args:
        widget: optional progress widget; its update() is called every tick.
        app: the application object; ``app.ldone`` lists the finished URLs.
        ilen: number of finished pages to wait for.

    The finished URLs are printed to stdout on success.  If the pages are
    not done after 119 ticks the application is stopped with exit code 1,
    so that the event loop does not run forever.
    """
    LOG.debug("Task started")
    try:
        async with ContextManager() as ctx:
            for i in range(1, 120):
                seconds = await ctx.tick()
                if widget:
                    widget.update(str(i))
                if len(app.ldone) >= ilen:
                    LOG.info(f"Finished with {app.ldone}")
                    print('\n'.join(str(uri) for uri in app.ldone))
                    app.exit()
                    # raise asyncio.CancelledError
                    return
                LOG.debug(f"{app.ldone} {seconds}")
            # Timed out: without this the Qt event loop would never stop.
            LOG.warning(f"Timed out after {seconds} seconds; done: {app.ldone}")
            app.exit(1)
    except asyncio.CancelledError:
        LOG.debug("Task cancelled")
def iMain(largs):
    """Parse *largs*, render the requested page and run the Qt event loop.

    Args:
        largs: command-line arguments without the program name.

    Setting DEBUG to a positive integer in the environment forces the log
    level to 10 (debug).
    """
    parser = omain_argparser()
    if shtab:
        shtab.add_argument_to(parser, ["-s", "--print-completion"])  # magic!
    oargs = parser.parse_args(largs)
    bgui = oargs.show_gui

    try:
        d = int(os.environ.get('DEBUG', 0))
    except ValueError:
        d = 0
    if d > 0:
        oargs.log_level = 10
    # Logging is best-effort, but a failure is reported rather than hidden.
    try:
        vsetup_logging(oargs.log_level, logfile='', stream=sys.stderr)
    except Exception as exc:
        sys.stderr.write(f"phantompy: logging setup failed: {exc}\n")

    if not oargs.html_url:
        parser.error("an html file or url is required")

    app = QApplication([])
    app.lstart = []
    if bgui:
        widget = Widget()
        widget._app = app
        widget.show()
    else:
        widget = None

    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    url = oargs.html_url
    htmlfile = oargs.html_output
    pdffile = oargs.pdf_output
    jsfile = oargs.js_input
    # run only starts the url loading
    r = Render(app,
               do_print=bool(pdffile),
               do_save=bool(htmlfile))
    uri = url.strip()
    r.run(uri, pdffile, htmlfile, jsfile)
    LOG.debug(f"{r.percent} {app.lstart}")

    LOG.info(f"queued {len(app.lstart)} urls")

    task = loop.create_task(main(widget, app, 1))
    loop.run_forever()

    # cancel remaining tasks and wait for them to complete
    task.cancel()
    # No loop is running at this point, so the loop must be named explicitly.
    tasks = asyncio.all_tasks(loop)
    loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
if __name__ == '__main__':
iMain(sys.argv[1:])

View file

@ -0,0 +1,117 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*
import argparse
import os
import sys
try:
if 'COLOREDLOGS_LEVEL_STYLES' not in os.environ:
os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red'
# https://pypi.org/project/coloredlogs/
import coloredlogs
except ImportError:
coloredlogs = False
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
def vsetup_logging(log_level, logfile='', stream=sys.stdout):
    """Configure the root logger.

    Args:
        log_level: numeric level (10=debug 20=info 30=warn 40=error).
        logfile: optional log file.  A leading '+' logs to the file and to
            *stream*; a leading '-' or no prefix logs to the file only (when
            coloredlogs is not installed).
        stream: stream for console output.

    When coloredlogs is available it is installed on the root logger;
    otherwise logging.basicConfig() is used.  The WARNING level is shown as
    'WARN'.
    """
    global LOG
    add = True

    # stem's logging is noisy: cap it at WARNING
    # from stem.util import log
    logging.getLogger('stem').setLevel(30)

    # NOTE: this sets private attributes of the logging module to get a
    # short timestamp format.
    logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S')
    logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S'
    logging._defaultFormatter.default_msec_format = ''

    kwargs = dict(level=log_level,
                  force=True,
                  format='%(levelname)s %(message)s')

    if logfile:
        add = logfile.startswith('+')
        sub = logfile.startswith('-')
        if add or sub:
            logfile = logfile[1:]
    if logfile:
        kwargs['filename'] = logfile
    else:
        # basicConfig() rejects stream together with filename, so the
        # requested stream is passed only when there is no log file.
        kwargs['stream'] = stream

    if coloredlogs:
        # https://pypi.org/project/coloredlogs/
        aKw = dict(level=log_level,
                   logger=LOG,
                   stream=stream,
                   fmt='%(levelname)s %(message)s')
        coloredlogs.install(**aKw)
        if logfile:
            oHandler = logging.FileHandler(logfile)
            LOG.addHandler(oHandler)
        LOG.info(f"CSetting log_level to {log_level} {stream}")
    else:
        logging.basicConfig(**kwargs)
        if add and logfile:
            oHandler = logging.StreamHandler(stream)
            LOG.addHandler(oHandler)
        LOG.info(f"SSetting log_level to {log_level!s}")

    # Show WARNING as 'WARN' through the public API; this keeps level names
    # that other libraries have registered.
    logging.addLevelName(logging.WARNING, 'WARN')
def omain_argparser(_=None):
    """Build the command-line parser for phantompy.

    Args:
        _: ignored.

    Returns:
        An argparse.ArgumentParser with the options --https_cafile,
        --log_level, --js_input, --html_output, --pdf_output, --show_gui
        and the required positional argument html_url.
    """
    # Best effort: pyOpenSSL is optional and the attribute is private.
    try:
        from OpenSSL import SSL
        lCAfs = SSL._CERTIFICATE_FILE_LOCATIONS
    except Exception:
        lCAfs = []

    # First existing CA file is the default; '' when there is none.
    CAfs = [elt for elt in lCAfs if os.path.exists(elt)] or ['']

    parser = argparse.ArgumentParser(add_help=True,
                                     epilog=__doc__)
    parser.add_argument('--https_cafile', type=str,
                        help="Certificate Authority file (in PEM) (unused)",
                        default=CAfs[0])
    parser.add_argument('--log_level', type=int, default=20,
                        help="10=debug 20=info 30=warn 40=error")
    parser.add_argument('--js_input', type=str, default='',
                        help="Operate on the HTML file with javascript")
    parser.add_argument('--html_output', type=str, default='',
                        help="Write loaded and javascripted result to a HTML file")
    parser.add_argument('--pdf_output', type=str, default='',
                        help="Write loaded and javascripted result to a PDF file")
    # A flag: present means True, absent means False.
    parser.add_argument('--show_gui', action='store_true', default=False,
                        help="show a progress meter that doesn't work")
    # A positional without nargs is required; argparse does not accept
    # required= for positionals.
    parser.add_argument('html_url', type=str,
                        help='html file or url')
    return parser