qute-1password/qute_1pass.py

382 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import sys
import json
import logging
import argparse
import tempfile
import subprocess
from datetime import datetime, timedelta
from urllib.parse import urlsplit
logger = logging.getLogger("qute_1pass")
CACHE_DIR = os.path.join(tempfile.gettempdir(), "qute_1pass")
os.makedirs(CACHE_DIR, exist_ok=True)
os.chmod(CACHE_DIR, 0o750)
SESSION_PATH = os.path.join(CACHE_DIR, "session")
SESSION_DURATION = timedelta(minutes=30)
LAST_ITEM_PATH = os.path.join(CACHE_DIR, "last_item")
LAST_ITEM_DURATION = timedelta(seconds=10)
CMD_PASSWORD_PROMPT = [
"rofi", "-password", "-dmenu", "-p", "Vault Password", "-l", "0", "-sidebar", "-width", "20"
]
CMD_LIST_PROMPT = ["rofi", "-dmenu"]
CMD_ITEM_SELECT = CMD_LIST_PROMPT + ["-p", "Select login"]
CMD_OP_CHECK_LOGIN = ["op", "whoami"]
CMD_OP_LOGIN = ["op", "signin", "--raw"]
CMD_OP_LIST_ITEMS = "op item list --categories Login --session {session_id} --format=json"
CMD_OP_GET_ITEM = "op item get --session {session_id} {uuid} --format=json"
CMD_OP_GET_TOTP = "op item get --otp --session {session_id} {uuid}"
QUTE_FIFO = os.environ["QUTE_FIFO"]
parser = argparse.ArgumentParser()
parser.add_argument(
"command", help="fill_credentials, fill_totp, fill_username, fill_password"
)
parser.add_argument(
"--auto-submit", help="Auto submit after filling", action="store_true"
)
parser.add_argument(
"--cache-session",
help="Cache 1password session for 30 minutes",
action="store_true",
)
parser.add_argument(
"--allow-insecure-sites",
help="Allow filling credentials on insecure sites",
action="store_true",
)
parser.add_argument(
"--cache",
help="store and use cached information",
action="store_true",
)
parser.add_argument(
"--biometric",
help="Use biometric unlock - don't ask for password",
action="store_true",
)
class Qute:
"""Logic related to qutebrowser"""
@classmethod
def _command(cls, command, *args):
with open(QUTE_FIFO, "w") as fifo:
logger.info(f"{command} {' '.join(args)}")
fifo.write(f"{command} {' '.join(args)}\n")
fifo.flush()
@classmethod
def _message(cls, message, type="error"):
cls._command(f"message-{type}", f"'qute-1password: {message}'")
@classmethod
def message_error(cls, message):
cls._message(message)
@classmethod
def message_warning(cls, message):
cls._message(message, type="warning")
@classmethod
def fake_key(cls, key):
key = key.replace(" ", "<Space>")
cls._command("fake-key", key)
@classmethod
def fill_credentials_tabmode(cls, username, password, submit=False):
cls.fake_key(username)
cls.fake_key("<TAB>")
cls.fake_key(password)
if submit:
cls.fake_key("<Return>")
@classmethod
def fill_single_field_tabmode(cls, value, submit=False):
cls.fake_key(value)
if submit:
cls.fake_key("<Return>")
@classmethod
def fill_totp(cls, totp, submit=True):
cls.fake_key(totp)
if submit:
cls.fake_key("<Return>")
class ExecuteError(Exception):
"""Used when commands executed return code is not 0"""
pass
def execute_command(command):
"""Executes a command, mainly used to launch commands for user input and the op cli"""
result = subprocess.run(command, capture_output=True, encoding="utf-8")
if result.returncode != 0:
logger.error(result.stderr)
raise ExecuteError(result.stderr)
return result.stdout.strip()
def pipe_commands(cmd1, cmd2):
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
p2 = subprocess.Popen(cmd2, stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
return p2.communicate()[0].decode("utf-8").strip()
def extract_host(url):
"""Extracts the host from a given URL"""
_, host, *_ = urlsplit(url)
return host
class OnePass:
"""Logic related to the op command and parsing results"""
@classmethod
def login(cls):
if arguments.biometric:
try:
execute_command(CMD_OP_CHECK_LOGIN)
except ExecuteError:
try:
execute_command(CMD_OP_LOGIN)
except ExecuteError:
Qute.message_error("Login error")
sys.exit(0)
return "0"
else:
try:
password = execute_command(CMD_PASSWORD_PROMPT)
except ExecuteError:
Qute.message_error("Error calling pinentry program")
sys.exit(0)
try:
session_id = pipe_commands(
["echo", "-n", password],
CMD_OP_LOGIN)
except ExecuteError:
Qute.message_error("Login error")
sys.exit(0)
if arguments.cache_session:
with open(SESSION_PATH, "w") as handler:
handler.write(session_id)
os.chmod(SESSION_PATH, 0o640)
return session_id
@classmethod
def get_session(cls):
"""
Returns a session for the op command to make calls with.
If a session is cached, we check if it's expired first to avoid any errors.
"""
if arguments.cache_session and os.path.isfile(SESSION_PATH):
# op sessions last 30 minutes, check if still valid
creation_time = datetime.fromtimestamp(os.stat(SESSION_PATH).st_ctime)
if (datetime.now() - creation_time) < SESSION_DURATION:
return open(SESSION_PATH, "r").read()
else:
# Session expired
os.unlink(SESSION_PATH)
return cls.login()
@classmethod
def list_items(cls):
session_id = cls.get_session()
result = execute_command(CMD_OP_LIST_ITEMS.format(session_id=session_id).split())
parsed = json.loads(result)
return parsed
@classmethod
def get_item(cls, uuid):
session_id = cls.get_session()
try:
result = execute_command(
CMD_OP_GET_ITEM.format(uuid=uuid, session_id=session_id).split()
)
except ExecuteError:
logger.error("Error retrieving credential", exc_info=True)
parsed = json.loads(result)
return parsed
@classmethod
def get_item_for_url(cls, url):
host = extract_host(url)
def filter_host(item):
"""Exclude items that does not match host on any configured URL"""
if "urls" in item:
return any(filter(lambda x: host in x["href"], item["urls"]))
return False
items = cls.list_items()
filtered = filter(filter_host, items)
mapping = {
f"{host}: {item['title']} ({item['id']})": item
for item in filtered
}
if not mapping:
raise cls.NoItemsFoundError(f"No items found for host {host}")
try:
credential = pipe_commands(
["echo", "\n".join(mapping.keys())], CMD_ITEM_SELECT
)
except ExecuteError:
pass
if not credential:
# Cancelled
return
return cls.get_item(mapping[credential]["id"])
@classmethod
def get_credentials(cls, item):
username = password = None
for field in item["fields"]:
if field.get("purpose") == "USERNAME":
username = field["value"]
if field.get("purpose") == "PASSWORD":
password = field["value"]
if username is None or password is None:
logger.warning(
"Present: username={username} password={password}".format(
username=username is not None, password=password is not None
)
)
Qute.message_warning("Filled incomplete credentials")
return {"username": username, "password": password}
@classmethod
def get_totp(cls, uuid):
session_id = cls.get_session()
try:
return execute_command(
CMD_OP_GET_TOTP.format(uuid=uuid, session_id=session_id).split()
)
except ExecuteError:
logger.error("Error retrieving TOTP", exc_info=True)
class NoItemsFoundError(Exception):
pass
class CLI:
def __init__(self, arguments):
self.arguments = arguments
def run(self):
command = self.arguments.command
if command != "run" and not command.startswith("_") and hasattr(self, command):
return getattr(self, command)()
def _get_item(self):
try:
item = OnePass.get_item_for_url(os.environ["QUTE_URL"])
except OnePass.NoItemsFoundError as error:
Qute.message_warning("No item found for this site")
logger.error(f"No item found for site: {os.environ['QUTE_URL']}")
logger.error(error)
sys.exit(0)
return item
def _store_last_item(self, item):
"""
Stores a reference to an item to easily get single information from it (password, TOTP)
right after filling the username or credentials.
"""
last_item = {"host": extract_host(os.environ["QUTE_URL"]), "id": item["id"]}
with open(LAST_ITEM_PATH, "w") as handler:
handler.write(json.dumps(last_item))
os.chmod(LAST_ITEM_PATH, 0o640)
def _fill_single_field(self, field):
item = self._get_item()
credentials = OnePass.get_credentials(item)
Qute.fill_single_field_tabmode(
credentials[field], submit=self.arguments.auto_submit
)
return item
def fill_username(self):
item = self._fill_single_field("username")
self._store_last_item(item)
def fill_password(self):
item = self._fill_single_field("password")
self._store_last_item(item)
def fill_credentials(self):
item = self._get_item()
credentials = OnePass.get_credentials(item)
Qute.fill_credentials_tabmode(
*credentials.values(), submit=self.arguments.auto_submit
)
self._store_last_item(item)
def fill_totp(self):
# Check last item first
# If theres a last_item file created in the last LAST_ITEM_DURATION seconds
# and the host matches the one the user is visiting, use that UUID to retrieve
# the totp
item = None
if os.path.isfile(LAST_ITEM_PATH):
creation_time = datetime.fromtimestamp(os.stat(LAST_ITEM_PATH).st_ctime)
if (datetime.now() - creation_time) < LAST_ITEM_DURATION:
last_item = json.loads(open(LAST_ITEM_PATH, "r").read())
if last_item["host"] == extract_host(os.environ["QUTE_URL"]):
item = last_item
if not item:
item = self._get_item()
totp = OnePass.get_totp(item["id"])
logger.error(totp)
Qute.fill_totp(totp)
if os.path.isfile(LAST_ITEM_PATH):
os.unlink(LAST_ITEM_PATH)
if __name__ == "__main__":
arguments = parser.parse_args()
if arguments.cache:
# add --cache to cacheable commands with
CMD_OP_LIST_ITEMS += " --cache"
CMD_OP_GET_ITEM += " --cache"
# Prevent filling credentials in non-secure sites if not explicitly allwoed
if not arguments.allow_insecure_sites:
if urlsplit(os.environ["QUTE_URL"])[0] != "https":
Qute.message_error(
"Trying to fill a non-secure site. If you want to allow it add the --allow-insecure-sites flag."
)
logger.error("Refusing to fill credentials on non-secure sites")
sys.exit(0)
cli = CLI(arguments)
sys.exit(cli.run())