python-yubikey/yubikey.py

233 lines
6.5 KiB
Python
Raw Permalink Normal View History

2013-07-04 14:48:37 +00:00
#!/usr/bin/env python
import string
from random import choice
2013-07-08 15:28:17 +00:00
import base64
from hashlib import sha1
import hmac
import requests
2013-07-04 14:48:37 +00:00
class YubicoWS(object):
"""
Yubico Web Service class that interacts with the Yubico API
"""
2013-07-04 14:48:37 +00:00
register_ws = 'https://upgrade.yubico.com/getapikey/?format=json'
api_ws = None
2013-07-08 15:28:17 +00:00
_protocol = 'https'
2013-07-04 14:48:37 +00:00
_servers = [
'api.yubico.com',
'api2.yubico.com',
'api3.yubico.com',
'api4.yubico.com',
'api5.yubico.com',
]
2013-07-08 15:28:17 +00:00
_server = None
_api_ws = '%s://%s/wsapi/2.0/'
2013-07-04 14:48:37 +00:00
_errors = {
'BAD_OTP': 'The OTP is invalid format.',
'REPLAYED_OTP': 'The OTP has already been seen by the service.',
'BAD_SIGNATURE': 'The HMAC signature verification failed.',
'MISSING_PARAMETER': 'The request lacks a parameter.',
'NO_SUCH_CLIENT': 'The request id does not exist.',
2013-07-08 15:28:17 +00:00
'OPERATION_NOT_ALLOWED': 'The request id is not allowed '
'to verify OTPs.',
'BACKEND_ERROR': 'Unexpected error in our server. Please contact '
'us if you see this error.',
# 2.0
'NOT_ENOUGH_ANSWERS': 'Server could not get requested number of syncs '
'during before timeout',
2013-07-04 14:48:37 +00:00
'REPLAYED_REQUEST': 'Server has seen the OTP/Nonce combination before',
}
2013-07-08 15:28:17 +00:00
def __init__(self, **kwargs):
self._protocol = kwargs.get('protocol', self._protocol)
self._server = kwargs.get('server', None)
self.select_server()
def select_server(self):
"Select server if provided, otherwise uses one from the list"
if self._server and self._server in self._servers:
self.api_ws = self._api_ws % (self._protocol, self._server)
else:
2013-07-08 20:34:33 +00:00
self.api_ws = self._api_ws % (
self._protocol,
choice(self._servers)
)
2013-07-04 14:48:37 +00:00
def register_api_key(self, email, otp):
2013-07-05 10:54:15 +00:00
"Registers an API Key with the servers"
2013-07-04 14:48:37 +00:00
data = {
'email': email,
'otp': str.lower(otp)
}
response = requests.post(self.register_ws, data)
ws_response = response.json()
if not ws_response['status']:
2013-07-08 15:28:17 +00:00
raise YubicoWSError(ws_response['error'])
2013-07-04 14:48:37 +00:00
return ws_response
2013-07-08 15:28:17 +00:00
def verify(self, yubikey_id, otp, key=None):
2013-07-05 10:54:15 +00:00
"Verifies the provided OTP with the server"
2013-07-04 14:48:37 +00:00
endpoint = 'verify'
url = self.api_ws + endpoint
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
# Check otp format
if not (len(otp) > 32 and len(otp) < 48):
raise OTPIncorrectFormat()
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
nonce = self.generate_nonce()
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
data = {
2013-07-08 15:28:17 +00:00
'id': str(yubikey_id),
2013-07-04 14:48:37 +00:00
'otp': str.lower(otp),
'nonce': nonce
}
2013-07-08 15:28:17 +00:00
# Use API key for signing the message if key is provided
if key:
data['h'] = self.sign(data, key)
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
response = requests.get(url, params=data)
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
ws_response = self.parse_ws_response(response.text)
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
if ws_response['status'] == 'OK':
# Check if response is valid
if not (ws_response['nonce'] == data['nonce']
and ws_response['otp'] == otp):
2013-07-08 15:28:17 +00:00
raise YubicoWSInvalidResponse()
if key:
signature = self.sign(ws_response, key)
if ws_response['h'] != signature:
2013-07-08 15:28:17 +00:00
raise YubicoWSResponseBadSignature(
"The signature sent by the server is invalid"
)
2013-07-04 14:48:37 +00:00
else:
2013-07-08 15:28:17 +00:00
raise YubicoWSError(self._errors[ws_response['status']])
2013-07-04 14:48:37 +00:00
return ws_response
2013-07-08 15:28:17 +00:00
def sign(self, data, key):
"Signs the message with the provided key"
# Sort k=v dict
2013-07-08 15:28:17 +00:00
params = []
2013-07-08 15:28:17 +00:00
for k in sorted(data.keys()):
if k != 'h': # Just in case
key_value = "%s=%s" % (k, data[k])
params.append(key_value)
2013-07-08 15:28:17 +00:00
# Join as urlparams
2013-07-08 20:34:33 +00:00
parameters = '&'.join(params)
2013-07-08 15:28:17 +00:00
# hmac-sha1
2013-07-08 20:34:33 +00:00
hashed_string = hmac.new(
base64.b64decode(key),
parameters,
sha1
).digest()
2013-07-08 15:28:17 +00:00
# base64 encode
signature = base64.b64encode(hashed_string)
return signature
2013-07-04 14:48:37 +00:00
def parse_ws_response(self, text):
2013-07-05 10:54:15 +00:00
"Parses the API key=value response into a dict"
2013-07-04 14:48:37 +00:00
data = {}
2013-07-08 15:28:17 +00:00
for line in text.split():
2013-07-04 14:48:37 +00:00
key, value = line.split('=', 1)
2013-07-08 15:28:17 +00:00
data[key.strip()] = value.strip()
2013-07-04 14:48:37 +00:00
return data
def generate_nonce(self):
2013-07-05 10:54:15 +00:00
"Generates a random string"
2013-07-04 14:48:37 +00:00
chars = string.ascii_lowercase + string.digits
return ''.join(choice(chars) for x in range(40))
class Yubikey(object):
"""
Yubikey object wrapper
"""
2013-07-04 14:48:37 +00:00
id = None
key = None
prefix = None
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
_last_result = False
2013-07-08 15:28:17 +00:00
def __init__(self, yubikey_id=None, key=None, **kwargs):
self.ws = YubicoWS(**kwargs)
2013-07-04 14:48:37 +00:00
if yubikey_id:
self.id = yubikey_id
if key:
self.key = key
2013-07-04 14:48:37 +00:00
def register(self, email, otp):
2013-07-05 10:54:15 +00:00
"Registers this yubikey"
2013-07-04 14:48:37 +00:00
result = False
if not self.id:
credentials = self.ws.register_api_key(email, otp)
if credentials['status']:
self.id = credentials['id']
self.key = credentials['key']
result = True
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
return result
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
def verify(self, otp):
2013-07-05 10:54:15 +00:00
"Verify an OTP to check if its valid"
2013-07-04 14:48:37 +00:00
result = False
if self.id:
self.get_prefix(otp)
2013-07-08 15:28:17 +00:00
try:
2013-07-08 20:34:33 +00:00
self.ws.verify(self.id, otp, key=self.key)
2013-07-04 14:48:37 +00:00
result = True
2013-07-08 20:34:33 +00:00
except (YubicoWSResponseBadSignature, YubicoWSError):
2013-07-08 15:28:17 +00:00
result = False
2013-07-04 14:48:37 +00:00
self._last_result = result
return result
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
def get_prefix(self, otp):
2013-07-05 10:54:15 +00:00
"Get prefix from an OTP if present"
2013-07-04 14:48:37 +00:00
if len(otp) > 32:
self.prefix = str.lower(otp[:-32])
2013-07-08 15:28:17 +00:00
###
# Custom exceptions
###
2013-07-08 15:28:17 +00:00
class YubicoWSError(Exception):
"Web service error. Defined by yubico documentation."
2013-07-04 14:48:37 +00:00
def __init__(self, message=None):
self.msg = "Web Service responded with an error: %s" % message
2013-07-08 15:28:17 +00:00
2013-07-04 14:48:37 +00:00
def __str__(self):
return repr(self.msg)
2013-07-08 15:28:17 +00:00
class YubicoWSInvalidResponse(Exception):
"Exception if the web service answers without same otp/nonce parameters"
2013-07-04 14:48:37 +00:00
msg = 'Response from the server is invalid'
class YubicoWSResponseBadSignature(Exception):
"Exception if the web service answers with a invalid signature"
2013-07-04 14:48:37 +00:00
pass
2013-07-08 15:28:17 +00:00
class OTPIncorrectFormat(Exception):
"Exception raised if the OTP provided is incorrect"
2013-07-08 15:28:17 +00:00
pass