-
Notifications
You must be signed in to change notification settings - Fork 0
/
openvpn-u2f-ask-password
executable file
·795 lines (681 loc) · 28.6 KB
/
openvpn-u2f-ask-password
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
#!/usr/bin/env python3
# openvpn-u2f-setup/openvpn-u2f-ask-password -- send U2F auth to OpenVPN server
# Copyright (C) 2021, Walter Doekes, OSSO B.V.
#
# This file is part of openvpn-u2f-setup. It is free software: you can
# redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, version 3
# or any later version.
#
# For documentation, sources and the full license, go to:
# https://github.com/ossobv/openvpn-u2f-setup
#
# This script needs to be installed as a daemon on your system so it
# can handle OpenVPN password requests through SystemD. Use the provided
# openvpn-u2f-ask-password.service file.
import json
import os
import signal
import subprocess
import sys
import traceback
import time
from base64 import urlsafe_b64encode
from warnings import warn
import pyinotify # python3-pyinotify
try:
# You must do this before connecting to the bus. Otherwise we get no
# asynchronous signal receptions.
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True)
# We expect python3-dbus through these dependent apt packages:
# ubuntu-server -> software-properties-common -> python3-dbus
import dbus
dbus_session = dbus.SessionBus()
dbus_system = dbus.SystemBus()
# Optional... this should autostart.
# shn = dbus_session.start_service_by_name('org.gnome.Shell.Notifications')
# if shn == (True, dbus_session.START_REPLY_SUCCESS):
# warn("Yes, started the notifier")
# elif shn == (True, dbus_session.START_REPLY_ALREADY_RUNNING):
# warn("Was already running")
# else:
# warn("Failed to start org.gnome.Shell.Notifications: {}".format(shn))
from gi.repository import GLib
except ImportError as e:
warn(
'Could not load some or all requirements needed for GUI '
'notification support: {}'.format(e))
GLib = None
dbus_session = None
dbus_system = None
# This must be the same on both sides for each new challenge. It's
# prefixed with "pam://" because the u2f-server is picky.
ORIGIN = 'pam://openvpn-server'
# This must be the same for the lifetime of the key handle
# (registration). If the appId is different, the u2host application
# refuses to sign (-6, authenticator error).
APPID = 'openvpn'
def is_in_screensaver():
"""
Return True if the user screen is locked at the moment
Use this to avoid waking up a blanked screen when away.
"""
if dbus_session is None:
return False # we don't know
screensaver_names = [
'org.gnome.ScreenSaver',
'org.cinnamon.ScreenSaver',
'org.kde.screensaver',
'org.freedesktop.ScreenSaver',
]
for obj_name in screensaver_names:
try:
obj_path = '/{0}'.format(obj_name.replace('.', '/'))
bus_object = dbus_session.get_object(obj_name, obj_path)
bus_interface = dbus.Interface(bus_object, obj_name)
# >>> get_interface.proxy_object._introspect_method_map
# {'org.freedesktop.DBus.Properties.Get': 'ss',
# 'org.freedesktop.DBus.Properties.GetAll': 's',
# 'org.freedesktop.DBus.Properties.Set': 'ssv',
# 'org.freedesktop.DBus.Introspectable.Introspect': '',
# 'org.freedesktop.DBus.Peer.Ping': '',
# 'org.freedesktop.DBus.Peer.GetMachineId': '',
# 'org.gnome.ScreenSaver.Lock': '',
# 'org.gnome.ScreenSaver.GetActive': '',
# 'org.gnome.ScreenSaver.SetActive': 'b',
# 'org.gnome.ScreenSaver.GetActiveTime': ''}
status = bool(bus_interface.GetActive())
break
except dbus.exceptions.DBusException:
pass
else:
warn('No applicable screensaver found')
return False # we don't know
return status
def listen_for_unlock(handler):
"""
Add the handler to known screensaver unlock signals.
"""
if dbus_system is None:
return
# gdbus monitor --system -d org.freedesktop.login1
# dbus-monitor --system \
# "type=signal,interface=org.freedesktop.login1.Session,member=Unlock"
def safe_handler(*args, **kwargs):
try:
handler(*args, **kwargs)
except Exception as e:
traceback.print_exc()
print('ERROR: safe_handler:', e)
ret = dbus_system.add_signal_receiver(
safe_handler,
'Unlock',
'org.freedesktop.login1.Session')
print('DEBUG: listen_for', ret)
assert ret, ret
class Notification:
"""
GNOME notification class with stdout fallback.
Used to notify GUI users that action needs to be taken.
"""
_closable = []
@staticmethod
def _get_image_path():
for image_candidate in (
# gnome-icon-theme
'/usr/share/icons/gnome/48x48/devices/network-vpn.png',
# suru-icon-theme
'/usr/share/icons/suru/status/scalable/network-secure.svg',
# ubuntu-mono
('/usr/share/icons/ubuntu-mono-dark/status/24/'
'nm-vpn-standalone-lock.svg'),
):
if os.path.isfile(image_candidate):
return image_candidate
return 'dialog-warn' # dialog-(information|error)
@classmethod
def _notify(
cls, app_name, replaces_id, app_icon, summary, body, actions=[],
hints={}, expire_timeout_ms=-1):
"""
https://specifications.freedesktop.org/notification-spec/latest/
ar01s09.html # Hints
ar01s08.html # org.freedesktop.Notifications.Notify
"""
obj_name = 'org.freedesktop.Notifications'
obj_path = '/{0}'.format(obj_name.replace('.', '/'))
bus_object = dbus_session.get_object(obj_name, obj_path)
bus_interface = dbus.Interface(bus_object, obj_name)
next_id = bus_interface.Notify(
app_name, replaces_id, app_icon, summary, body,
actions, hints, expire_timeout_ms)
return next_id
@classmethod
def _push_closable(cls, replaces_id):
cls._closable.append(replaces_id)
@classmethod
def _close(cls):
while cls._closable:
replaces_id = cls._closable.pop(0)
print('DEBUG: Notification._close({})'.format(replaces_id))
obj_name = 'org.freedesktop.Notifications'
obj_path = '/{0}'.format(obj_name.replace('.', '/'))
bus_object = dbus_session.get_object(obj_name, obj_path)
bus_interface = dbus.Interface(bus_object, obj_name)
bus_interface.CloseNotification(replaces_id)
return False # don't reschedule
def __init__(self, title, body, icon, autoclose=True):
self._title = title
self._body = body
self._icon = icon
self._autoclose = autoclose
self._notification = None
# Okay. This is all single threaded. We may have notifications
# showing, but they won't get closed, because we have control of
# the mainloop at this point. Close all earlier notifications.
self._close()
self.show()
def update(self, body, autoclose=True):
self._body = body
self._autoclose = autoclose
self.show()
def show(self):
replaces_id = self._notification if self._notification else 0
self._notification = self._notify(
sys.argv[0], replaces_id, self._icon or self._get_image_path(),
self._title, self._body)
if self._autoclose:
# expire_timeous_ms does not work for us. We have to expire
# them manually.
self._notification, replaces_id = None, self._notification
# This is single threaded. We're scheduling a close now.
# But, if someone else starts a notification, we'd be doing
# that work, and not this callback.
# WARNING: If we get a second notification, we should kill
# these earlier ones first. (By calling _close() in the
# constructor. Yes. This is a sucky workaround.)
self._push_closable(replaces_id)
GLib.timeout_add_seconds(3, (lambda: self._close()))
print('> [{}]\n> {}'.format(self._title, self._body))
class Process:
"""
Process helper; reads interesting stuff from /proc/PID.
Used to detect which (openvpn) process is requesting a password.
"""
@staticmethod
def getppid(pid):
try:
with open('/proc/{}/stat'.format(pid), 'rb') as fp:
stat = fp.read().decode('ascii', 'replace').split(' ')
except OSError: # FileNotFoundError? PermissionDenied?
return 1
# pid (comm) state ppid pgrp ... (see: man 5 proc)
for i in range(1, len(stat)):
if stat[i].endswith(')'):
break
else:
return 1
# i is now 1 except when there is a space in comm
return int(stat[i + 2])
@staticmethod
def getcwd(pid):
try:
return os.readlink('/proc/{}/cwd'.format(pid))
except OSError: # FileNotFoundError? PermissionDenied?
return ''
@classmethod
def getexe(cls, pid):
try:
return os.readlink('/proc/{}/exe'.format(pid))
except OSError: # FileNotFoundError? PermissionDenied?
try:
return cls.getcmdline(pid)[0]
except IndexError:
return ''
@staticmethod
def getcmdline(pid):
try:
with open('/proc/{}/cmdline'.format(pid), 'rb') as fp:
return fp.read().decode('ascii', 'replace').split('\0')
except OSError: # FileNotFoundError? PermissionDenied?
return []
@classmethod
def from_pid(cls, pid):
try:
os.kill(pid, 0)
except OSError as e:
if e.args[0] == 3: # ESRCH [Errno 3] No such process
print('pid {} is gone at {}'.format(pid, e))
return None
elif e.args[0] == 1: # ENOPERM
pass
return cls(pid)
def __init__(self, pid):
self.pid = pid
self.exe = self.getexe(pid)
# Is the password request made through systemd-ask-password,
# then fetch the parent process.
# > /bin/systemd-ask-password --echo --icon network-vpn "Enter ...:"
if self.exe.endswith('/systemd-ask-password'):
ppid = self.getppid(pid)
if ppid != 1:
pid = ppid
self.pid = pid
self.exe = self.getexe(pid)
@property
def cwd(self):
return self.getcwd(self.pid)
@property
def cmdline(self):
return self.getcmdline(self.pid)
def __repr__(self):
return '<Process({}, {}>'.format(self.pid, self.exe)
class AskPassword:
"""
Base AskPassword class, reusable if we wanted.
Subclasses need to implement handle().
"""
class CannotHandle(Exception):
pass
class TimeoutDuringHandling(Exception):
pass
def __init__(self, process):
self.last_activity = time.time()
self._process = process
def make_request(self, askfilename, header, values):
self._askfilename = askfilename
self._header = header
self._values = values
self._not_after = int(self._values.get('NotAfter', 0)) / 1000000.0
def is_valid(self):
# print('DEBUG: isfile {} AND {} > {}'.format(
# os.path.isfile(self._askfilename), self._not_after,
# time.clock_gettime(time.CLOCK_MONOTONIC)))
return (
os.path.isfile(self._askfilename) and
(not self._not_after
or self._not_after > time.clock_gettime(time.CLOCK_MONOTONIC)))
def respond(self, response):
subprocess.run(
['/usr/bin/pkexec',
# Work around incompatibility with policykit actions and
# Ubuntu move of /lib to /usr/lib, which causes polkit-1
# actions not to be matched if the non-real path is used.
os.path.realpath('/lib/systemd/systemd-reply-password'),
'1', self._values['Socket']],
input=response.encode('ascii'))
print('DEBUG: response to {!r}: {}'.format(
self._values.get('Message'),
(response if len(response) <= 16
else '{}...'.format(response[0:16]))))
def handle(self):
self.last_activity = time.time()
raise NotImplementedError()
class AskPasswordOpenVpnU2f(AskPassword):
"""
OpenVpnU2f specific AskPassword class.
Called for usernames and passwords. Responds with keyhandle and
(when U2F device is touched) with signed response.
"""
@staticmethod
def _get_config_filename_from_openvpn_process(process):
cmdline = process.cmdline
config = None
try:
idx = cmdline.index('--config')
config = cmdline[idx + 1]
except (IndexError, ValueError):
# > If --config file is the only option to the openvpn
# > command, the --config can be removed, and the command can
# > be given as openvpn file
if len(cmdline) == 2:
config = cmdline[1]
if config and config[0] != '/':
# Find keyhandle in /etc/openvpn/{client/,}.
for possible_path in (
process.cwd, '/etc/openvpn/client', '/etc/openvpn'):
if os.path.isfile(os.path.join(possible_path, config)):
config = os.path.join(possible_path, config)
break
assert os.path.isfile(config), config
return config
@staticmethod
def _get_u2f_keyhandles_from_openvpn_config(config):
"""
Assume config is .../<VPN>.conf and .../<VPN>/keyhandle.dat exists.
"""
possible_cfg = [os.path.expanduser('~/.u2f_keys')]
if config and config.endswith('.conf'):
possible_cfg.append(
os.path.join(config[0:-5], 'keyhandle.dat'))
for cfg in possible_cfg:
try:
with open(cfg, 'r') as f:
print('DEBUG: using keyhandles from', cfg)
return [i.split()[0] for i in f.readlines() if i.split()]
except IOError:
pass
else:
raise FileNotFoundError(
f'Cannot find U2F keyhandles: {possible_cfg!r}')
@staticmethod
def _make_challenge_request(keyhandle):
# Make it an exact int and no padding of other stuff.
# Must be 32 bytes in total, so we pad before challenging.
timestamp = str(int(time.time()))
assert len(timestamp) == 10, timestamp
assert all(i in '0123456789' for i in timestamp), timestamp
b64challenge = (
urlsafe_b64encode(
(timestamp + 'Z' + timestamp + 'Z' + timestamp)
.encode('ascii'))
.decode('ascii').replace('=', ''))
# Create request to pass to u2f-host. Pretend we got this from
# u2f-server -a authenticate.
request = (
f'{{"keyHandle": "{keyhandle}", "version": "U2F_V2", '
f'"challenge": "{b64challenge}", "appId": "{APPID}"}}')
return timestamp, request
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._password_response = None
def _is_username_request(self):
lower = self._values.get('Message').strip().lower().rstrip(':')
return lower == 'enter auth username' # 'Enter Auth Username:'
def _is_password_request(self):
lower = self._values.get('Message').strip().lower().rstrip(':')
return lower == 'enter auth password' # 'Enter Auth Password:'
def handle(self):
if is_in_screensaver():
warn('Ignoring ask-password request because in screensaver mode')
return
try:
if self._is_username_request():
if self._password_response:
print('WARNING? Stale password? {}'.format(
self._password_response))
self._password_response = None
self._handle_username_request()
elif self._is_password_request():
if not self._password_response:
print('WARNING. Skipped auth bit? Do nothing..')
return
self._handle_password_request()
else:
assert False, 'Unexpected request: {}'.format(self._values)
except (AskPassword.CannotHandle, AskPassword.TimeoutDuringHandling):
pass
else:
# Update so it doesn't get pruned just yet.
self.last_activity = time.time()
def _handle_username_request(self):
vpn_name, keyhandles = self._prepare_keypress_request()
# Call upon work from the U2F token. This may take a while.
keyhandle, timestamp, response = self._make_challenge_response(
vpn_name, keyhandles)
# Store timestamp+response for reuse when the password is
# requested.
self._password_response = '{}/{}'.format(timestamp, response)
self.respond(keyhandle)
def _handle_password_request(self):
assert self._password_response, (
self._process, self._password_response)
# TODO/FIXME: assert that this is still the same openvpn?
self.respond(self._password_response)
self._password_response = None
def _prepare_keypress_request(self):
try:
config = self._get_config_filename_from_openvpn_process(
self._process)
except AssertionError: # FIXME: should not use assertionerror
Notification(
'OpenVPN config filename not found',
('openvpn-u2f-ask-password needs the config to find '
'the path to the keyhandle. Ignoring this auth request.'),
self._values.get('Icon'))
raise AskPassword.CannotHandle()
vpn_name = os.path.basename(config).rsplit('.', 1)[0]
try:
keyhandles = self._get_u2f_keyhandles_from_openvpn_config(config)
except FileNotFoundError:
Notification(
'OpenVPN {} has no U2F key handle'.format(vpn_name),
('No keyhandle.dat found. Assuming authentication is '
'handled elsewhere.'),
self._values.get('Icon'))
raise AskPassword.CannotHandle()
except PermissionError:
Notification(
'OpenVPN {} could not read U2F key handle'.format(vpn_name),
'Did you set the correct permissions? Ignoring this request.',
self._values.get('Icon'))
raise AskPassword.CannotHandle()
return vpn_name, keyhandles
def _make_challenge_response(self, vpn_name, keyhandles):
# Do global (GUI?) notification.
note = Notification(
'OpenVPN {} U2F key request'.format(vpn_name),
('Please insert your hardware token for extra authentication. '
'When it blinks, touch the button.'),
self._values.get('Icon'), autoclose=False)
t0 = time.time()
keyhandle_idx = 0 # cycle over keyhandles
while True:
if not self.is_valid():
note.update('TIMEOUT. Too slow!')
raise AskPassword.TimeoutDuringHandling()
# Cycle over keyhandles. If the wrong key is inserted, u2f-host
# aborts quickly, so we can try the next keyhandle instead.
keyhandle = keyhandles[keyhandle_idx]
keyhandle_idx = (keyhandle_idx + 1) % len(keyhandles)
# The request is valid for a very short while, so we'll need
# a timeout on the process.
timeout = 6 # MUST be lower than openvpn-u2f-verify max $timedelta
timestamp, request = self._make_challenge_request(keyhandle)
t1 = time.time()
try:
ret = subprocess.run(
['/usr/bin/u2f-host', '-a', 'authenticate', '-o', ORIGIN],
input=request.encode('ascii'), stdout=subprocess.PIPE,
timeout=timeout) # short timeout in seconds
except subprocess.TimeoutExpired:
pass
else:
if ret.returncode == 0:
out = ret.stdout.decode('ascii')
break
# FIXME: After 300s (5mins) we abort. At this point you should
# have your key. If you really want to log in. Restart the
# OpenVPN client.
if (time.time() - t0) > 300:
note.update('TIMEOUT. Too slow!')
raise AskPassword.TimeoutDuringHandling()
# Sleep a short while if starting u2f-host failed quickly.
if (time.time() - t1) < 0.3:
time.sleep(1)
# In case the notification was already hidden, re-display it.
note.show()
# If you were to feed the 'out' JSON to u2f-server -a authenticate,
# it should be happy.
# {"signatureData": "AQAA..",
# "clientData": "eyAiY2hhb...",
# "keyHandle": "SnVVgZR18w..."}
# print('JSON:', out)
# But, because we want minimal data sent over the username and
# password fields, we'll send only our timestamp and the signature.
# We'll recreate the JSON found above on the server side.
jsdec = json.loads(out)
signature = jsdec['signatureData']
note.update('PRESSED. Thank you!')
return keyhandle, timestamp, signature
class AskPasswordHandler(pyinotify.ProcessEvent):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Keep user/pass requests that were done recently by the same process.
# This way we can match up username and password in the same instance.
self._reuse_askpassword = {}
def process_IN_CLOSE_WRITE(self, event):
self.process_IN_MOVED_TO(event)
def process_IN_MOVED_TO(self, event):
"""
event = <Event cookie=688867 dir=False mask=0x80 maskname=IN_MOVED_TO
name=ask.rCEVu1 path=/run/systemd/ask-password
pathname=/run/systemd/ask-password/ask.rCEVu1 wd=1 >
event.pathname = /run/systemd/ask-password/ask.rCEVu1
"""
self._prune_reuse_askpassword()
# print(event)
if os.path.basename(event.pathname).startswith('ask.'):
self.handle_askfile(event.pathname)
def handle_askfile(self, pathname):
try:
with open(pathname, 'r') as fp:
data = fp.read()
except FileNotFoundError:
print('DEBUG: file is now gone', pathname)
except Exception as e:
traceback.print_exc()
print('ERROR: handle_askfile:', e)
else:
try:
self._handle_askinifile(pathname, data)
except Exception as e:
traceback.print_exc()
print('ERROR: _handle_askinifile:', e)
def _handle_askinifile(self, filename, inifile):
"""
[Ask]
PID=1285316
Socket=/run/systemd/ask-password/sck.df96c35fda1a42ca
AcceptCached=0
Echo=1
NotAfter=1974186814503
Message=Enter Auth Username:
Icon=network-vpn
"""
lines = inifile.split('\n')
header = lines[0][1:-1]
vals = dict(i.split('=', 1) for i in lines[1:] if i)
process = Process.from_pid(int(vals['PID']))
if process and process.exe == '/usr/sbin/openvpn' and header == 'Ask':
assert isinstance(process.pid, int), process.pid
try:
ask = self._reuse_askpassword[process.pid]
except KeyError:
ask = self._reuse_askpassword[process.pid] = (
AskPasswordOpenVpnU2f(process))
ask.make_request(filename, header, vals)
ask.handle()
else:
# Ignore. This password request is not for us.
pass
def _prune_reuse_askpassword(self):
now = time.time()
for pid, instance in list(self._reuse_askpassword.items()):
if (now - instance.last_activity) > 5:
del self._reuse_askpassword[pid]
"""
From https://systemd.io/PASSWORD_AGENTS/ fetched on 2021-01-29.
> It is easy to write additional agents. The basic algorithm to follow looks
> like this:
>
> - Create an inotify watch on /run/systemd/ask-password, watch for
> IN_CLOSE_WRITE|IN_MOVED_TO
>
> - Ignore all events on files in that directory that do not start with "ask."
>
> - As soon as a file named "ask.xxxx" shows up, read it. It's a simple .ini
> file that may be parsed with the usual parsers. The xxxx suffix is
> randomized.
>
> - Make sure to ignore unknown .ini file keys in those files, so that we can
> easily extend the format later on.
>
> - You'll find the question to ask the user in the Message= field in the [Ask]
> section. It is a single-line string in UTF-8, which might be
> internationalized (by the party that originally asks the question, not by
> the agent).
>
> - You'll find an icon name (following the XDG icon naming spec) to show next
> to the message in the Icon= field in the [Ask] section
>
> - You'll find the PID of the client asking the question in the PID= field in
> the [Ask] section (Before asking your question use kill(PID, 0) and ignore
> the file if this returns ESRCH; there's no need to show the data of this
> field but if you want to you may)
>
> - Echo= specifies whether the input should be obscured. If this field is
> missing or is Echo=0, the input should not be shown.
>
> - The socket to send the response to is configured via Socket= in the [Ask]
> section. It is a AF_UNIX/SOCK_DGRAM socket in the file system.
>
> - Ignore files where the time specified in the NotAfter= field in the [Ask]
> section is in the past. The time is specified in usecs, and refers to the
> CLOCK_MONOTONIC clock. If NotAfter= is 0, no such check should take place.
>
> - Make sure to hide a password query dialog as soon as a) the ask.xxxx file
> is deleted, watch this with inotify. b) the NotAfter= time elapses, if it
> is set != 0.
>
> - Access to the socket is restricted to privileged users. To acquire the
> necessary privileges to send the answer back, consider using PolicyKit. In
> fact, the GNOME agent we ship does that, and you may simply piggyback on
> that, by executing "/usr/bin/pkexec /lib/systemd/systemd-reply-password 1
> /path/to/socket" or "/usr/bin/pkexec /lib/systemd/systemd-reply-password 0
> /path/to/socket" and writing the password to its standard input. Use ‘1' as
> argument if a password was entered by the user, or ‘0' if the user canceled
> the request.
>
> - If you do not want to use PK ensure to acquire the necessary privileges in
> some other way and send a single datagram to the socket consisting of the
> password string either prefixed with "+" or with "-" depending on whether
> the password entry was successful or not. You may but don't have to include
> a final NUL byte in your message.
"""
def reread_existing(*args, **kwargs):
print('DEBUG: Re-reading {}'.format(ASK_PASSWORD_DIRECTORY))
fs = os.listdir(ASK_PASSWORD_DIRECTORY)
fs = [i for i in fs if i.startswith('ask.')]
for filename in fs:
handler.handle_askfile(os.path.join(ASK_PASSWORD_DIRECTORY, filename))
def notifier_run_one(source, condition, notifier):
print('DEBUG: Got pyinotify event')
# while notifier.check_events(timeout=<ms>):
notifier.read_events()
notifier.process_events()
return True
ASK_PASSWORD_DIRECTORY = '/run/systemd/ask-password'
wm = pyinotify.WatchManager()
handler = AskPasswordHandler()
notifier = pyinotify.Notifier(wm, handler)
mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
wm.add_watch(ASK_PASSWORD_DIRECTORY, mask)
# Install handler so we re-read files on SIGHUP or SIGUSR1.
signal.signal(signal.SIGHUP, reread_existing)
signal.signal(signal.SIGUSR1, reread_existing)
listen_for_unlock(reread_existing)
# Read/handle already existing files.. just in case.
reread_existing()
# Infinite loop time.
print('Started {}, listening in /run/systemd/ask-password'.format(sys.argv[0]))
if GLib:
# Set up notifier_run_one() call into GLib MainLoop so we don't have
# to block in notifier.loop().
GLib.io_add_watch(notifier._fd, GLib.IO_IN, notifier_run_one, notifier)
# GLib mainloop.
signal.signal(signal.SIGINT, signal.SIG_DFL)
loop = GLib.MainLoop()
try:
print('DEBUG: GLib mainloop')
loop.run()
except KeyboardInterrupt:
pass
notifier.stop()
else:
# pyinotify mainloop.
print('DEBUG: pyinotify mainloop')
notifier.loop()
print('done.')
# vim: set ts=8 sw=4 sts=4 et ai: