diff --git a/ntfy/backends/multi.py b/ntfy/backends/multi.py new file mode 100644 index 0000000..6d986b3 --- /dev/null +++ b/ntfy/backends/multi.py @@ -0,0 +1,27 @@ +from importlib import import_module +try: + from ..terminal import is_focused +except ImportError: + + def is_focused(): + return True + + +from ..screensaver import is_locked + + +def notify(title, + message, + locked=None, + focused=None, + unfocused=None, + retcode=None): + for condition, options in ((is_locked, locked), (is_focused, focused), + (lambda: not is_focused(), unfocused)): + for backend_name, backend_options in options.items(): + if not condition(): + continue + backend = import_module('ntfy.backends.{}'.format( + backend_options.get('backend', backend_name))) + backend_options.pop('backend', None) + backend.notify(title, message, retcode=retcode, **backend_options) diff --git a/ntfy/cli.py b/ntfy/cli.py index 7dad0b2..6d6dea8 100644 --- a/ntfy/cli.py +++ b/ntfy/cli.py @@ -34,6 +34,9 @@ def is_focused(): return True +from .screensaver import is_locked + + def run_cmd(args): if getattr(args, 'pid', False): return watch_pid(args) @@ -64,6 +67,8 @@ def run_cmd(args): retcode = process.returncode if args.longer_than is not None and duration <= args.longer_than: return None, None + if args.locked_only and not is_locked(): + return None, None if args.unfocused_only and is_focused(): return None, None message = _result_message(args.command if not args.hide_command else None, @@ -230,6 +235,12 @@ def default_sender(args): type=int, metavar='N', help="Only notify if the command runs longer than N seconds") +done_parser.add_argument( + '--locked-only', + action='store_true', + default=False, + dest='locked_only', + help='Only notify if the screen is locked') done_parser.add_argument( '-b', '--background-only', @@ -283,6 +294,12 @@ def default_sender(args): type=int, metavar='N', help="Only notify if the command runs longer than N seconds") +shell_integration_parser.add_argument( + '--locked-only', + action='store_true', + default=False, + dest='locked_only', + help='Only notify if the screen is locked') shell_integration_parser.add_argument( '-f', '--foreground-too', diff --git a/ntfy/screensaver.py b/ntfy/screensaver.py new file mode 100644 index 0000000..8689983 --- /dev/null +++ b/ntfy/screensaver.py @@ -0,0 +1,103 @@ +from shlex import split +from subprocess import check_output, check_call, CalledProcessError, PIPE + +# some adapted from +# https://github.com/mtorromeo/xdg-utils/blob/master/scripts/xdg-screensaver.in#L540 + + +def xscreensaver_detect(): + try: + check_call(split('pgrep xscreensaver'), stdout=PIPE) + except (CalledProcessError, OSError): + return False + else: + return True + + +def xscreensaver_is_locked(): + return 'screen locked' in check_output(split('xscreensaver-command -time')) + + +def lightlocker_detect(): + try: + check_call(split('pgrep light-locker'), stdout=PIPE) + except (CalledProcessError, OSError): + return False + else: + return True + + +def lightlocker_is_active(): + return 'The screensaver is active' in check_output( + split('light-locker-command -q')) + + +def gnomescreensaver_detect(): + try: + import dbus + except ImportError: + return False + bus = dbus.SessionBus() + dbus_obj = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus') + dbus_iface = dbus.Interface( + dbus_obj, dbus_interface='org.freedesktop.DBus') + try: + dbus_iface.GetNameOwner('org.gnome.ScreenSaver') + except dbus.DBusException as e: + if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NameHasNoOwner': + return False + else: + raise e + else: + return True + + +def gnomescreensaver_is_locked(): + import dbus + bus = dbus.SessionBus() + dbus_obj = bus.get_object('org.gnome.ScreenSaver', + '/org/gnome/ScreenSaver') + dbus_iface = dbus.Interface( + dbus_obj, dbus_interface='org.gnome.ScreenSaver') + return bool(dbus_iface.GetActive()) + + +def matescreensaver_detect(): + try: + import dbus + except ImportError: + return False + bus = dbus.SessionBus() + dbus_obj = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus') + dbus_iface = dbus.Interface( + dbus_obj, dbus_interface='org.freedesktop.DBus') + try: + dbus_iface.GetNameOwner('org.mate.ScreenSaver') + except dbus.DBusException as e: + if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NameHasNoOwner': + return False + else: + raise e + else: + return True + + +def matescreensaver_is_locked(): + import dbus + bus = dbus.SessionBus() + dbus_obj = bus.get_object('org.mate.ScreenSaver', '/org/mate/ScreenSaver') + dbus_iface = dbus.Interface( + dbus_obj, dbus_interface='org.mate.ScreenSaver') + return bool(dbus_iface.GetActive()) + + +def is_locked(): + if xscreensaver_detect(): + return xscreensaver_is_locked() + if lightlocker_detect(): + return lightlocker_is_active() + if gnomescreensaver_detect(): + return gnomescreensaver_is_locked() + if matescreensaver_detect(): + return matescreensaver_is_locked() + return True diff --git a/tests/test_cli.py b/tests/test_cli.py index c1341f3..5cff1af 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -26,6 +26,7 @@ def test_default(self, mock_Popen): args.pid = None args.unfocused_only = False args.hide_command = False + args.locked_only = False self.assertEqual(('"true" succeeded in 0:00 minutes', 0), run_cmd(args)) @@ -39,6 +40,7 @@ def test_emoji(self, mock_Popen): args.no_emoji = False args.unfocused_only = False args.hide_command = False + args.locked_only = False self.assertEqual( (':white_check_mark: "true" succeeded in 0:00 minutes', 0), run_cmd(args)) @@ -70,6 +72,7 @@ def test_failure(self, mock_Popen): args.pid = None args.unfocused_only = False args.hide_command = False + args.locked_only = False self.assertEqual(('"false" failed (code 42) in 0:00 minutes', 42), run_cmd(args)) @@ -82,6 +85,7 @@ def test_stdout(self, mock_Popen): args.pid = None args.unfocused_only = False args.hide_command = False + args.locked_only = False # not actually used args.stdout = True args.stderr = False @@ -97,6 +101,7 @@ def test_stderr(self, mock_Popen): args.pid = None args.unfocused_only = False args.hide_command = False + args.locked_only = False # not actually used args.stdout = False args.stderr = True @@ -112,6 +117,7 @@ def test_stdout_and_stderr(self, mock_Popen): args.pid = None args.unfocused_only = False args.hide_command = False + args.locked_only = False # not actually used args.stdout = True args.stderr = True @@ -128,6 +134,7 @@ def test_failure_stdout_and_stderr(self, mock_Popen): args.pid = None args.unfocused_only = False args.hide_command = False + args.locked_only = False # not actually used args.stdout = True args.stderr = True @@ -144,6 +151,7 @@ def test_hide_command(self, mock_Popen): args.pid = None args.unfocused_only = False args.hide_command = True + args.locked_only = False self.assertEqual(('Your command succeeded in 0:00 minutes', 0), run_cmd(args)) @@ -155,6 +163,7 @@ def test_formatter(self): args.longer_than = -1 args.unfocused_only = False args.hide_command = False + args.locked_only = False self.assertEqual(('"true" succeeded in 1:05 minutes', 0), run_cmd(args)) @@ -166,6 +175,7 @@ def test_formatter_failure(self): args.longer_than = -1 args.unfocused_only = False args.hide_command = False + args.locked_only = False self.assertEqual(('"false" failed (code 1) in 0:10 minutes', 1), run_cmd(args)) @@ -199,6 +209,7 @@ def test_watch_pid(self, mock_process): args = MagicMock() args.pid = 1 args.unfocused_only = False + args.locked_only = False self.assertEqual('PID[1]: "cmd" finished in 0:00 minutes', run_cmd(args)[0])