diff --git a/.gitignore b/.gitignore index 98d45c9..9a8364e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -.c9revisions/ \ No newline at end of file +.c9revisions/ +tests.py +*.pyc diff --git a/README.md b/README.md index 474bdb2..c9666bd 100644 --- a/README.md +++ b/README.md @@ -17,18 +17,31 @@ pip install python-yubikey from yubikey import Yubikey yubi = Yubikey() -yubi.register('') +yubi.register('', '') # yubi.id and yubi.key are now set ``` ## Check valid OTP ``` +yubi = Yubikey(, ) result = yubi.verify('') # True / False +# If is provided, requests will be signed and the responses checked. +``` + +## Optionals + +``` +# Using custom API server +# Must be one of YubicoWS._servers +yubi = Yubikey(123, 'dGhpc3JlcG9yb2Nrcw==', server='api2.yubico.com') + +# Using http instead of https +yubi = Yubikey(123, 'dGhpc3JlcG9yb2Nrcw==', protocol='http') ``` # NO WARRANTY THE PROGRAM IS DISTRIBUTED IN THE HOPE THAT IT WILL BE USEFUL, BUT WITHOUT ANY WARRANTY. IT IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. -IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW THE AUTHOR WILL BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. \ No newline at end of file +IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW THE AUTHOR WILL BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. diff --git a/tests.py b/tests.py deleted file mode 100644 index d59be43..0000000 --- a/tests.py +++ /dev/null @@ -1,5 +0,0 @@ -from yubikey import Yubikey - - -yubi = Yubikey(123) -yubi.verify('asdadadasdasdadadadadasdasdadsasdadadsadadad') \ No newline at end of file diff --git a/yubikey.py b/yubikey.py index 7efa2f9..b81d590 100644 --- a/yubikey.py +++ b/yubikey.py @@ -1,15 +1,18 @@ #!/usr/bin/env python - -import requests import string from random import choice +import base64 +from hashlib import sha1 +import hmac +import requests class YubicoWS(object): register_ws = 'https://upgrade.yubico.com/getapikey/?format=json' - _api_ws = 'http://%s/wsapi/2.0/' api_ws = None - + + _protocol = 'https' + _servers = [ 'api.yubico.com', 'api2.yubico.com', @@ -17,25 +20,40 @@ class YubicoWS(object): 'api4.yubico.com', 'api5.yubico.com', ] - + _server = None + + _api_ws = '%s://%s/wsapi/2.0/' + _errors = { 'BAD_OTP': 'The OTP is invalid format.', 'REPLAYED_OTP': 'The OTP has already been seen by the service.', 'BAD_SIGNATURE': 'The HMAC signature verification failed.', 'MISSING_PARAMETER': 'The request lacks a parameter.', 'NO_SUCH_CLIENT': 'The request id does not exist.', - 'OPERATION_NOT_ALLOWED': 'The request id is not allowed to verify OTPs.', - 'BACKEND_ERROR': 'Unexpected error in our server. Please contact us if you see this error.', - 'NOT_ENOUGH_ANSWERS': 'Server could not get requested number of syncs during before timeout', + 'OPERATION_NOT_ALLOWED': 'The request id is not allowed ' + 'to verify OTPs.', + 'BACKEND_ERROR': 'Unexpected error in our server. Please contact ' + 'us if you see this error.', + # 2.0 + 'NOT_ENOUGH_ANSWERS': 'Server could not get requested number of syncs ' + 'during before timeout', 'REPLAYED_REQUEST': 'Server has seen the OTP/Nonce combination before', } - - def __init__(self): - self.select_random_server() - def select_random_server(self): - "Select random API Server" - self.api_ws = self._api_ws % choice(self._servers) + def __init__(self, **kwargs): + self._protocol = kwargs.get('protocol', self._protocol) + self._server = kwargs.get('server', None) + self.select_server() + + def select_server(self): + "Select server if provided, otherwise uses one from the list" + if self._server and self._server in self._servers: + self.api_ws = self._api_ws % (self._protocol, self._server) + else: + self.api_ws = self._api_ws % ( + self._protocol, + choice(self._servers) + ) def register_api_key(self, email, otp): "Registers an API Key with the servers" @@ -46,57 +64,85 @@ class YubicoWS(object): response = requests.post(self.register_ws, data) ws_response = response.json() if not ws_response['status']: - raise WSError(ws_response['error']) - + raise YubicoWSError(ws_response['error']) + return ws_response - + def verify(self, yubikey_id, otp, key=None): "Verifies the provided OTP with the server" endpoint = 'verify' url = self.api_ws + endpoint - + # Check otp format if not (len(otp) > 32 and len(otp) < 48): raise OTPIncorrectFormat() - + nonce = self.generate_nonce() - + data = { - 'id': int(yubikey_id), + 'id': str(yubikey_id), 'otp': str.lower(otp), 'nonce': nonce } - + # Use API key for signing the message if key is provided if key: - data = self.sign_otp(data, key) - + data['h'] = self.sign(data, key) + response = requests.get(url, params=data) - + ws_response = self.parse_ws_response(response.text) - print(ws_response) + if ws_response['status'] == 'OK': # Check if response is valid - if not (ws_response['nonce'] == nonce \ - and ws_response['otp'] != otp \ - and True): - raise WSInvalidResponse() - # TODO check signature + if not (ws_response['nonce'] == data['nonce'] + and ws_response['otp'] == otp): + raise YubicoWSInvalidResponse() + + if key: + signature = self.sign(ws_response, key) + + if ws_response['h'] != signature: + raise YubicoWSResponseBadSignature( + "The signature sent by the server is invalid" + ) + else: - raise WSError(self._errors[ws_response['status']]) - + raise YubicoWSError(self._errors[ws_response['status']]) + return ws_response - def sign_otp(self, data, key): - "Signs the OTP with the provided key" - return data + def sign(self, data, key): + "Signs the message with the provided key" + # Sort k=v dict + params = [] + + for k in sorted(data.keys()): + if k != 'h': # Just in case + key_value = "%s=%s" % (k, data[k]) + params.append(key_value) + + # Join as urlparams + parameters = '&'.join(params) + + # hmac-sha1 + hashed_string = hmac.new( + base64.b64decode(key), + parameters, + sha1 + ).digest() + + # base64 encode + signature = base64.b64encode(hashed_string) + + return signature def parse_ws_response(self, text): "Parses the API key=value response into a dict" data = {} - for line in text.strip().split('\n'): + for line in text.split(): key, value = line.split('=', 1) - data[key] = value + data[key.strip()] = value.strip() return data def generate_nonce(self): @@ -109,11 +155,11 @@ class Yubikey(object): id = None key = None prefix = None - + _last_result = False - - def __init__(self, yubikey_id=None, key=None): - self.ws = YubicoWS() + + def __init__(self, yubikey_id=None, key=None, **kwargs): + self.ws = YubicoWS(**kwargs) if yubikey_id: self.id = yubikey_id if key: @@ -128,36 +174,38 @@ class Yubikey(object): self.id = credentials['id'] self.key = credentials['key'] result = True - + return result - + def verify(self, otp): "Verify an OTP to check if its valid" result = False if self.id: self.get_prefix(otp) - result = self.ws.verify(self.id, otp, key=self.key) - if result == 'OK': + try: + self.ws.verify(self.id, otp, key=self.key) result = True - + except (YubicoWSResponseBadSignature, YubicoWSError): + result = False + self._last_result = result return result - + def get_prefix(self, otp): "Get prefix from an OTP if present" if len(otp) > 32: self.prefix = str.lower(otp[:-32]) - -class WSError(Exception): + +class YubicoWSError(Exception): def __init__(self, message=None): self.msg = "Web Service responded with an error: %s" % message - + def __str__(self): return repr(self.msg) - -class WSInvalidResponse(Exception): + +class YubicoWSInvalidResponse(Exception): msg = 'Response from the server is invalid' @@ -168,3 +216,7 @@ class WSResponseError(Exception): class OTPIncorrectFormat(Exception): pass + + +class YubicoWSResponseBadSignature(Exception): + pass