Added hmac-sha1 signature

This commit is contained in:
Felipe 2013-07-08 11:28:17 -04:00
parent c8b078d0c1
commit 9bcae287e4
1 changed files with 98 additions and 50 deletions

View File

@ -1,15 +1,18 @@
#!/usr/bin/env python #!/usr/bin/env python
import requests
import string import string
from random import choice from random import choice
import base64
from hashlib import sha1
import hmac
import requests
class YubicoWS(object): class YubicoWS(object):
register_ws = 'https://upgrade.yubico.com/getapikey/?format=json' register_ws = 'https://upgrade.yubico.com/getapikey/?format=json'
_api_ws = 'http://%s/wsapi/2.0/'
api_ws = None api_ws = None
_protocol = 'https'
_servers = [ _servers = [
'api.yubico.com', 'api.yubico.com',
'api2.yubico.com', 'api2.yubico.com',
@ -17,25 +20,37 @@ class YubicoWS(object):
'api4.yubico.com', 'api4.yubico.com',
'api5.yubico.com', 'api5.yubico.com',
] ]
_server = None
_api_ws = '%s://%s/wsapi/2.0/'
_errors = { _errors = {
'BAD_OTP': 'The OTP is invalid format.', 'BAD_OTP': 'The OTP is invalid format.',
'REPLAYED_OTP': 'The OTP has already been seen by the service.', 'REPLAYED_OTP': 'The OTP has already been seen by the service.',
'BAD_SIGNATURE': 'The HMAC signature verification failed.', 'BAD_SIGNATURE': 'The HMAC signature verification failed.',
'MISSING_PARAMETER': 'The request lacks a parameter.', 'MISSING_PARAMETER': 'The request lacks a parameter.',
'NO_SUCH_CLIENT': 'The request id does not exist.', 'NO_SUCH_CLIENT': 'The request id does not exist.',
'OPERATION_NOT_ALLOWED': 'The request id is not allowed to verify OTPs.', 'OPERATION_NOT_ALLOWED': 'The request id is not allowed '
'BACKEND_ERROR': 'Unexpected error in our server. Please contact us if you see this error.', 'to verify OTPs.',
'NOT_ENOUGH_ANSWERS': 'Server could not get requested number of syncs during before timeout', 'BACKEND_ERROR': 'Unexpected error in our server. Please contact '
'us if you see this error.',
# 2.0
'NOT_ENOUGH_ANSWERS': 'Server could not get requested number of syncs '
'during before timeout',
'REPLAYED_REQUEST': 'Server has seen the OTP/Nonce combination before', 'REPLAYED_REQUEST': 'Server has seen the OTP/Nonce combination before',
} }
def __init__(self):
self.select_random_server()
def select_random_server(self): def __init__(self, **kwargs):
"Select random API Server" self._protocol = kwargs.get('protocol', self._protocol)
self.api_ws = self._api_ws % choice(self._servers) self._server = kwargs.get('server', None)
self.select_server()
def select_server(self):
"Select server if provided, otherwise uses one from the list"
if self._server and self._server in self._servers:
self.api_ws = self._api_ws % (self._protocol, self._server)
else:
self.api_ws = self._api_ws % (self._protocol, choice(self._servers))
def register_api_key(self, email, otp): def register_api_key(self, email, otp):
"Registers an API Key with the servers" "Registers an API Key with the servers"
@ -46,57 +61,84 @@ class YubicoWS(object):
response = requests.post(self.register_ws, data) response = requests.post(self.register_ws, data)
ws_response = response.json() ws_response = response.json()
if not ws_response['status']: if not ws_response['status']:
raise WSError(ws_response['error']) raise YubicoWSError(ws_response['error'])
return ws_response return ws_response
def verify(self, yubikey_id, otp, key=None): def verify(self, yubikey_id, otp, key=None):
"Verifies the provided OTP with the server" "Verifies the provided OTP with the server"
endpoint = 'verify' endpoint = 'verify'
url = self.api_ws + endpoint url = self.api_ws + endpoint
# Check otp format # Check otp format
if not (len(otp) > 32 and len(otp) < 48): if not (len(otp) > 32 and len(otp) < 48):
raise OTPIncorrectFormat() raise OTPIncorrectFormat()
nonce = self.generate_nonce() nonce = self.generate_nonce()
data = { data = {
'id': int(yubikey_id), 'id': str(yubikey_id),
'otp': str.lower(otp), 'otp': str.lower(otp),
'nonce': nonce 'nonce': nonce
} }
# Use API key for signing the message if key is provided # Use API key for signing the message if key is provided
if key: if key:
data = self.sign_otp(data, key) data['h'] = self.sign(data, key).replace('+', '%2B')
response = requests.get(url, params=data) response = requests.get(url, params=data)
ws_response = self.parse_ws_response(response.text) ws_response = self.parse_ws_response(response.text)
print(ws_response)
if ws_response['status'] == 'OK': if ws_response['status'] == 'OK':
# Check if response is valid # Check if response is valid
if not (ws_response['nonce'] == nonce \ if not (ws_response['nonce'] == nonce \
and ws_response['otp'] != otp \ and ws_response['otp'] != otp \
and True): and True):
raise WSInvalidResponse() raise YubicoWSInvalidResponse()
# TODO check signature
if key:
signature = self.sign(ws_response, key)
if data['h'] != signature:
raise YubicoWSResponseBadSignature(
"The signature sent by the server is invalid"
)
else: else:
raise WSError(self._errors[ws_response['status']]) raise YubicoWSError(self._errors[ws_response['status']])
return ws_response return ws_response
def sign_otp(self, data, key): def sign(self, data, key):
"Signs the OTP with the provided key" "Signs the message with the provided key"
return data if 'h' in data:
# Just in case
data.pop('h')
# Sorted k=v dict
params = []
for k in sorted(data.keys()):
key_value = "%s=%s" % (k, data[k])
params.append(key_value)
# Join as urlparams
string = '&'.join(params)
# hmac-sha1
hashed_string = hmac.new(base64.b64decode(key), string, sha1).digest()
# base64 encode
signature = base64.b64encode(hashed_string)
return signature
def parse_ws_response(self, text): def parse_ws_response(self, text):
"Parses the API key=value response into a dict" "Parses the API key=value response into a dict"
data = {} data = {}
for line in text.strip().split('\n'): for line in text.split():
key, value = line.split('=', 1) key, value = line.split('=', 1)
data[key] = value data[key.strip()] = value.strip()
return data return data
def generate_nonce(self): def generate_nonce(self):
@ -109,11 +151,11 @@ class Yubikey(object):
id = None id = None
key = None key = None
prefix = None prefix = None
_last_result = False _last_result = False
def __init__(self, yubikey_id=None, key=None): def __init__(self, yubikey_id=None, key=None, **kwargs):
self.ws = YubicoWS() self.ws = YubicoWS(**kwargs)
if yubikey_id: if yubikey_id:
self.id = yubikey_id self.id = yubikey_id
if key: if key:
@ -128,36 +170,38 @@ class Yubikey(object):
self.id = credentials['id'] self.id = credentials['id']
self.key = credentials['key'] self.key = credentials['key']
result = True result = True
return result return result
def verify(self, otp): def verify(self, otp):
"Verify an OTP to check if its valid" "Verify an OTP to check if its valid"
result = False result = False
if self.id: if self.id:
self.get_prefix(otp) self.get_prefix(otp)
result = self.ws.verify(self.id, otp, key=self.key) try:
if result == 'OK': response = self.ws.verify(self.id, otp, key=self.key)
result = True result = True
except YubicoWSResponseBadSignature, YubicoWSError:
result = False
self._last_result = result self._last_result = result
return result return result
def get_prefix(self, otp): def get_prefix(self, otp):
"Get prefix from an OTP if present" "Get prefix from an OTP if present"
if len(otp) > 32: if len(otp) > 32:
self.prefix = str.lower(otp[:-32]) self.prefix = str.lower(otp[:-32])
class WSError(Exception):
class YubicoWSError(Exception):
def __init__(self, message=None): def __init__(self, message=None):
self.msg = "Web Service responded with an error: %s" % message self.msg = "Web Service responded with an error: %s" % message
def __str__(self): def __str__(self):
return repr(self.msg) return repr(self.msg)
class WSInvalidResponse(Exception):
class YubicoWSInvalidResponse(Exception):
msg = 'Response from the server is invalid' msg = 'Response from the server is invalid'
@ -168,3 +212,7 @@ class WSResponseError(Exception):
class OTPIncorrectFormat(Exception): class OTPIncorrectFormat(Exception):
pass pass
class YubicoWSResponseBadSignature(Exception):
pass