Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions ntfy/backends/multi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from importlib import import_module
try:
from ..terminal import is_focused
except ImportError:

def is_focused():
return True


from ..screensaver import is_locked


def notify(title,
message,
locked=None,
focused=None,
unfocused=None,
retcode=None):
for condition, options in ((is_locked, locked), (is_focused, focused),
(lambda: not is_focused(), unfocused)):
for backend_name, backend_options in options.items():
if not condition():
continue
backend = import_module('ntfy.backends.{}'.format(
backend_options.get('backend', backend_name)))
backend_options.pop('backend', None)
backend.notify(title, message, retcode=retcode, **backend_options)
17 changes: 17 additions & 0 deletions ntfy/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def is_focused():
return True


from .screensaver import is_locked


def run_cmd(args):
if getattr(args, 'pid', False):
return watch_pid(args)
Expand Down Expand Up @@ -64,6 +67,8 @@ def run_cmd(args):
retcode = process.returncode
if args.longer_than is not None and duration <= args.longer_than:
return None, None
if args.locked_only and not is_locked():
return None, None
if args.unfocused_only and is_focused():
return None, None
message = _result_message(args.command if not args.hide_command else None,
Expand Down Expand Up @@ -230,6 +235,12 @@ def default_sender(args):
type=int,
metavar='N',
help="Only notify if the command runs longer than N seconds")
done_parser.add_argument(
'--locked-only',
action='store_true',
default=False,
dest='locked_only',
help='Only notify if the screen is locked')
done_parser.add_argument(
'-b',
'--background-only',
Expand Down Expand Up @@ -283,6 +294,12 @@ def default_sender(args):
type=int,
metavar='N',
help="Only notify if the command runs longer than N seconds")
shell_integration_parser.add_argument(
'--locked-only',
action='store_true',
default=False,
dest='locked_only',
help='Only notify if the screen is locked')
shell_integration_parser.add_argument(
'-f',
'--foreground-too',
Expand Down
103 changes: 103 additions & 0 deletions ntfy/screensaver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from shlex import split
from subprocess import check_output, check_call, CalledProcessError, PIPE

# some adapted from
# https://github.com/mtorromeo/xdg-utils/blob/master/scripts/xdg-screensaver.in#L540


def xscreensaver_detect():
try:
check_call(split('pgrep xscreensaver'), stdout=PIPE)
except (CalledProcessError, OSError):
return False
else:
return True


def xscreensaver_is_locked():
return 'screen locked' in check_output(split('xscreensaver-command -time'))


def lightlocker_detect():
try:
check_call(split('pgrep light-locker'), stdout=PIPE)
except (CalledProcessError, OSError):
return False
else:
return True


def lightlocker_is_active():
return 'The screensaver is active' in check_output(
split('light-locker-command -q'))


def gnomescreensaver_detect():
try:
import dbus
except ImportError:
return False
bus = dbus.SessionBus()
dbus_obj = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
dbus_iface = dbus.Interface(
dbus_obj, dbus_interface='org.freedesktop.DBus')
try:
dbus_iface.GetNameOwner('org.gnome.ScreenSaver')
except dbus.DBusException as e:
if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NameHasNoOwner':
return False
else:
raise e
else:
return True


def gnomescreensaver_is_locked():
import dbus
bus = dbus.SessionBus()
dbus_obj = bus.get_object('org.gnome.ScreenSaver',
'/org/gnome/ScreenSaver')
dbus_iface = dbus.Interface(
dbus_obj, dbus_interface='org.gnome.ScreenSaver')
return bool(dbus_iface.GetActive())


def matescreensaver_detect():
try:
import dbus
except ImportError:
return False
bus = dbus.SessionBus()
dbus_obj = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
dbus_iface = dbus.Interface(
dbus_obj, dbus_interface='org.freedesktop.DBus')
try:
dbus_iface.GetNameOwner('org.mate.ScreenSaver')
except dbus.DBusException as e:
if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NameHasNoOwner':
return False
else:
raise e
else:
return True


def matescreensaver_is_locked():
import dbus
bus = dbus.SessionBus()
dbus_obj = bus.get_object('org.mate.ScreenSaver', '/org/mate/ScreenSaver')
dbus_iface = dbus.Interface(
dbus_obj, dbus_interface='org.mate.ScreenSaver')
return bool(dbus_iface.GetActive())


def is_locked():
if xscreensaver_detect():
return xscreensaver_is_locked()
if lightlocker_detect():
return lightlocker_is_active()
if gnomescreensaver_detect():
return gnomescreensaver_is_locked()
if matescreensaver_detect():
return matescreensaver_is_locked()
return True
11 changes: 11 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def test_default(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(('"true" succeeded in 0:00 minutes', 0),
run_cmd(args))

Expand All @@ -39,6 +40,7 @@ def test_emoji(self, mock_Popen):
args.no_emoji = False
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(
(':white_check_mark: "true" succeeded in 0:00 minutes', 0),
run_cmd(args))
Expand Down Expand Up @@ -70,6 +72,7 @@ def test_failure(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(('"false" failed (code 42) in 0:00 minutes', 42),
run_cmd(args))

Expand All @@ -82,6 +85,7 @@ def test_stdout(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
# not actually used
args.stdout = True
args.stderr = False
Expand All @@ -97,6 +101,7 @@ def test_stderr(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
# not actually used
args.stdout = False
args.stderr = True
Expand All @@ -112,6 +117,7 @@ def test_stdout_and_stderr(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
# not actually used
args.stdout = True
args.stderr = True
Expand All @@ -128,6 +134,7 @@ def test_failure_stdout_and_stderr(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
# not actually used
args.stdout = True
args.stderr = True
Expand All @@ -144,6 +151,7 @@ def test_hide_command(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = True
args.locked_only = False
self.assertEqual(('Your command succeeded in 0:00 minutes', 0),
run_cmd(args))

Expand All @@ -155,6 +163,7 @@ def test_formatter(self):
args.longer_than = -1
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(('"true" succeeded in 1:05 minutes', 0),
run_cmd(args))

Expand All @@ -166,6 +175,7 @@ def test_formatter_failure(self):
args.longer_than = -1
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(('"false" failed (code 1) in 0:10 minutes', 1),
run_cmd(args))

Expand Down Expand Up @@ -199,6 +209,7 @@ def test_watch_pid(self, mock_process):
args = MagicMock()
args.pid = 1
args.unfocused_only = False
args.locked_only = False
self.assertEqual('PID[1]: "cmd" finished in 0:00 minutes',
run_cmd(args)[0])

Expand Down