#!/usr/bin/env python3 import os import sys import json import logging import argparse import tempfile import subprocess from datetime import datetime, timedelta from urllib.parse import urlsplit logger = logging.getLogger("qute_1pass") CACHE_DIR = os.path.join(tempfile.gettempdir(), "qute_1pass") os.makedirs(CACHE_DIR, exist_ok=True) os.chmod(CACHE_DIR, 0o750) SESSION_PATH = os.path.join(CACHE_DIR, "session") SESSION_DURATION = timedelta(minutes=30) LAST_ITEM_PATH = os.path.join(CACHE_DIR, "last_item") LAST_ITEM_DURATION = timedelta(seconds=10) CMD_PASSWORD_PROMPT = [ "rofi", "-password", "-dmenu", "-p", "Vault Password", "-l", "0", "-sidebar", "-width", "20" ] CMD_LIST_PROMPT = ["rofi", "-dmenu"] CMD_ITEM_SELECT = CMD_LIST_PROMPT + ["-p", "Select login"] CMD_OP_CHECK_LOGIN = ["op", "whoami"] CMD_OP_LOGIN = ["op", "signin", "--raw"] CMD_OP_LIST_ITEMS = "op item list --categories Login --session {session_id} --format=json" CMD_OP_GET_ITEM = "op item get --session {session_id} {uuid} --format=json" CMD_OP_GET_TOTP = "op item get --otp --session {session_id} {uuid}" QUTE_FIFO = os.environ["QUTE_FIFO"] parser = argparse.ArgumentParser() parser.add_argument( "command", help="fill_credentials, fill_totp, fill_username, fill_password" ) parser.add_argument( "--auto-submit", help="Auto submit after filling", action="store_true" ) parser.add_argument( "--cache-session", help="Cache 1password session for 30 minutes", action="store_true", ) parser.add_argument( "--allow-insecure-sites", help="Allow filling credentials on insecure sites", action="store_true", ) parser.add_argument( "--cache", help="store and use cached information", action="store_true", ) parser.add_argument( "--biometric", help="Use biometric unlock - don't ask for password", action="store_true", ) class Qute: """Logic related to qutebrowser""" @classmethod def _command(cls, command, *args): with open(QUTE_FIFO, "w") as fifo: logger.info(f"{command} {' '.join(args)}") fifo.write(f"{command} {' '.join(args)}\n") fifo.flush() @classmethod def _message(cls, message, type="error"): cls._command(f"message-{type}", f"'qute-1password: {message}'") @classmethod def message_error(cls, message): cls._message(message) @classmethod def message_warning(cls, message): cls._message(message, type="warning") @classmethod def fake_key(cls, key): key = key.replace(" ", "") cls._command("fake-key", key) @classmethod def fill_credentials_tabmode(cls, username, password, submit=False): cls.fake_key(username) cls.fake_key("") cls.fake_key(password) if submit: cls.fake_key("") @classmethod def fill_single_field_tabmode(cls, value, submit=False): cls.fake_key(value) if submit: cls.fake_key("") @classmethod def fill_totp(cls, totp, submit=True): cls.fake_key(totp) if submit: cls.fake_key("") class ExecuteError(Exception): """Used when commands executed return code is not 0""" pass def execute_command(command): """Executes a command, mainly used to launch commands for user input and the op cli""" result = subprocess.run(command, capture_output=True, encoding="utf-8") if result.returncode != 0: logger.error(result.stderr) raise ExecuteError(result.stderr) return result.stdout.strip() def pipe_commands(cmd1, cmd2): p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE) p2 = subprocess.Popen(cmd2, stdin=p1.stdout, stdout=subprocess.PIPE) p1.stdout.close() return p2.communicate()[0].decode("utf-8").strip() def extract_host(url): """Extracts the host from a given URL""" _, host, *_ = urlsplit(url) return host class OnePass: """Logic related to the op command and parsing results""" @classmethod def login(cls): if arguments.biometric: try: execute_command(CMD_OP_CHECK_LOGIN) except ExecuteError: try: execute_command(CMD_OP_LOGIN) except ExecuteError: Qute.message_error("Login error") sys.exit(0) return "0" else: try: password = execute_command(CMD_PASSWORD_PROMPT) except ExecuteError: Qute.message_error("Error calling pinentry program") sys.exit(0) try: session_id = pipe_commands( ["echo", "-n", password], CMD_OP_LOGIN) except ExecuteError: Qute.message_error("Login error") sys.exit(0) if arguments.cache_session: with open(SESSION_PATH, "w") as handler: handler.write(session_id) os.chmod(SESSION_PATH, 0o640) return session_id @classmethod def get_session(cls): """ Returns a session for the op command to make calls with. If a session is cached, we check if it's expired first to avoid any errors. """ if arguments.cache_session and os.path.isfile(SESSION_PATH): # op sessions last 30 minutes, check if still valid creation_time = datetime.fromtimestamp(os.stat(SESSION_PATH).st_ctime) if (datetime.now() - creation_time) < SESSION_DURATION: return open(SESSION_PATH, "r").read() else: # Session expired os.unlink(SESSION_PATH) return cls.login() @classmethod def list_items(cls): session_id = cls.get_session() result = execute_command(CMD_OP_LIST_ITEMS.format(session_id=session_id).split()) parsed = json.loads(result) return parsed @classmethod def get_item(cls, uuid): session_id = cls.get_session() try: result = execute_command( CMD_OP_GET_ITEM.format(uuid=uuid, session_id=session_id).split() ) except ExecuteError: logger.error("Error retrieving credential", exc_info=True) parsed = json.loads(result) return parsed @classmethod def get_item_for_url(cls, url): host = extract_host(url) def filter_host(item): """Exclude items that does not match host on any configured URL""" if "urls" in item: return any(filter(lambda x: host in x["href"], item["urls"])) return False items = cls.list_items() filtered = filter(filter_host, items) mapping = { f"{host}: {item['title']} ({item['id']})": item for item in filtered } if not mapping: raise cls.NoItemsFoundError(f"No items found for host {host}") try: credential = pipe_commands( ["echo", "\n".join(mapping.keys())], CMD_ITEM_SELECT ) except ExecuteError: pass if not credential: # Cancelled return return cls.get_item(mapping[credential]["id"]) @classmethod def get_credentials(cls, item): username = password = None for field in item["fields"]: if field.get("purpose") == "USERNAME": username = field["value"] if field.get("purpose") == "PASSWORD": password = field["value"] if username is None or password is None: logger.warning( "Present: username={username} password={password}".format( username=username is not None, password=password is not None ) ) Qute.message_warning("Filled incomplete credentials") return {"username": username, "password": password} @classmethod def get_totp(cls, uuid): session_id = cls.get_session() try: return execute_command( CMD_OP_GET_TOTP.format(uuid=uuid, session_id=session_id).split() ) except ExecuteError: logger.error("Error retrieving TOTP", exc_info=True) class NoItemsFoundError(Exception): pass class CLI: def __init__(self, arguments): self.arguments = arguments def run(self): command = self.arguments.command if command != "run" and not command.startswith("_") and hasattr(self, command): return getattr(self, command)() def _get_item(self): try: item = OnePass.get_item_for_url(os.environ["QUTE_URL"]) except OnePass.NoItemsFoundError as error: Qute.message_warning("No item found for this site") logger.error(f"No item found for site: {os.environ['QUTE_URL']}") logger.error(error) sys.exit(0) return item def _store_last_item(self, item): """ Stores a reference to an item to easily get single information from it (password, TOTP) right after filling the username or credentials. """ last_item = {"host": extract_host(os.environ["QUTE_URL"]), "id": item["id"]} with open(LAST_ITEM_PATH, "w") as handler: handler.write(json.dumps(last_item)) os.chmod(LAST_ITEM_PATH, 0o640) def _fill_single_field(self, field): item = self._get_item() credentials = OnePass.get_credentials(item) Qute.fill_single_field_tabmode( credentials[field], submit=self.arguments.auto_submit ) return item def fill_username(self): item = self._fill_single_field("username") self._store_last_item(item) def fill_password(self): item = self._fill_single_field("password") self._store_last_item(item) def fill_credentials(self): item = self._get_item() credentials = OnePass.get_credentials(item) Qute.fill_credentials_tabmode( *credentials.values(), submit=self.arguments.auto_submit ) self._store_last_item(item) def fill_totp(self): # Check last item first # If theres a last_item file created in the last LAST_ITEM_DURATION seconds # and the host matches the one the user is visiting, use that UUID to retrieve # the totp item = None if os.path.isfile(LAST_ITEM_PATH): creation_time = datetime.fromtimestamp(os.stat(LAST_ITEM_PATH).st_ctime) if (datetime.now() - creation_time) < LAST_ITEM_DURATION: last_item = json.loads(open(LAST_ITEM_PATH, "r").read()) if last_item["host"] == extract_host(os.environ["QUTE_URL"]): item = last_item if not item: item = self._get_item() totp = OnePass.get_totp(item["id"]) logger.error(totp) Qute.fill_totp(totp) if os.path.isfile(LAST_ITEM_PATH): os.unlink(LAST_ITEM_PATH) if __name__ == "__main__": arguments = parser.parse_args() if arguments.cache: # add --cache to cacheable commands with CMD_OP_LIST_ITEMS += " --cache" CMD_OP_GET_ITEM += " --cache" # Prevent filling credentials in non-secure sites if not explicitly allwoed if not arguments.allow_insecure_sites: if urlsplit(os.environ["QUTE_URL"])[0] != "https": Qute.message_error( "Trying to fill a non-secure site. If you want to allow it add the --allow-insecure-sites flag." ) logger.error("Refusing to fill credentials on non-secure sites") sys.exit(0) cli = CLI(arguments) sys.exit(cli.run())