Merge branch 'signature'

# By Felipe (4) and Felipe Martin (2)
* signature:
  Signature fully working, updated readme
  Fixed all PEP errors
  Added hmac-sha1 signature
  Removed tests file
  Updated README
  Ignore .pyc files
This commit is contained in:
Felipe Martin 2013-07-08 22:54:25 +02:00
commit b5004601c5
4 changed files with 123 additions and 61 deletions

4
.gitignore vendored
View File

@ -1 +1,3 @@
.c9revisions/ .c9revisions/
tests.py
*.pyc

View File

@ -17,18 +17,31 @@ pip install python-yubikey
from yubikey import Yubikey from yubikey import Yubikey
yubi = Yubikey() yubi = Yubikey()
yubi.register('<INSERT OTP HERE>') yubi.register('<EMAIL>', '<INSERT OTP HERE>')
# yubi.id and yubi.key are now set # yubi.id and yubi.key are now set
``` ```
## Check valid OTP ## Check valid OTP
``` ```
yubi = Yubikey(<ID>, <Key>)
result = yubi.verify('<INSERT ANOHTER OTP HERE>') result = yubi.verify('<INSERT ANOHTER OTP HERE>')
# True / False # True / False
# If <key> is provided, requests will be signed and the responses checked.
```
## Optionals
```
# Using custom API server
# Must be one of YubicoWS._servers
yubi = Yubikey(123, 'dGhpc3JlcG9yb2Nrcw==', server='api2.yubico.com')
# Using http instead of https
yubi = Yubikey(123, 'dGhpc3JlcG9yb2Nrcw==', protocol='http')
``` ```
# NO WARRANTY # NO WARRANTY
THE PROGRAM IS DISTRIBUTED IN THE HOPE THAT IT WILL BE USEFUL, BUT WITHOUT ANY WARRANTY. IT IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. THE PROGRAM IS DISTRIBUTED IN THE HOPE THAT IT WILL BE USEFUL, BUT WITHOUT ANY WARRANTY. IT IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW THE AUTHOR WILL BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW THE AUTHOR WILL BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.

View File

@ -1,5 +0,0 @@
from yubikey import Yubikey
yubi = Yubikey(123)
yubi.verify('asdadadasdasdadadadadasdasdadsasdadadsadadad')

View File

@ -1,15 +1,18 @@
#!/usr/bin/env python #!/usr/bin/env python
import requests
import string import string
from random import choice from random import choice
import base64
from hashlib import sha1
import hmac
import requests
class YubicoWS(object): class YubicoWS(object):
register_ws = 'https://upgrade.yubico.com/getapikey/?format=json' register_ws = 'https://upgrade.yubico.com/getapikey/?format=json'
_api_ws = 'http://%s/wsapi/2.0/'
api_ws = None api_ws = None
_protocol = 'https'
_servers = [ _servers = [
'api.yubico.com', 'api.yubico.com',
'api2.yubico.com', 'api2.yubico.com',
@ -17,25 +20,40 @@ class YubicoWS(object):
'api4.yubico.com', 'api4.yubico.com',
'api5.yubico.com', 'api5.yubico.com',
] ]
_server = None
_api_ws = '%s://%s/wsapi/2.0/'
_errors = { _errors = {
'BAD_OTP': 'The OTP is invalid format.', 'BAD_OTP': 'The OTP is invalid format.',
'REPLAYED_OTP': 'The OTP has already been seen by the service.', 'REPLAYED_OTP': 'The OTP has already been seen by the service.',
'BAD_SIGNATURE': 'The HMAC signature verification failed.', 'BAD_SIGNATURE': 'The HMAC signature verification failed.',
'MISSING_PARAMETER': 'The request lacks a parameter.', 'MISSING_PARAMETER': 'The request lacks a parameter.',
'NO_SUCH_CLIENT': 'The request id does not exist.', 'NO_SUCH_CLIENT': 'The request id does not exist.',
'OPERATION_NOT_ALLOWED': 'The request id is not allowed to verify OTPs.', 'OPERATION_NOT_ALLOWED': 'The request id is not allowed '
'BACKEND_ERROR': 'Unexpected error in our server. Please contact us if you see this error.', 'to verify OTPs.',
'NOT_ENOUGH_ANSWERS': 'Server could not get requested number of syncs during before timeout', 'BACKEND_ERROR': 'Unexpected error in our server. Please contact '
'us if you see this error.',
# 2.0
'NOT_ENOUGH_ANSWERS': 'Server could not get requested number of syncs '
'during before timeout',
'REPLAYED_REQUEST': 'Server has seen the OTP/Nonce combination before', 'REPLAYED_REQUEST': 'Server has seen the OTP/Nonce combination before',
} }
def __init__(self):
self.select_random_server()
def select_random_server(self): def __init__(self, **kwargs):
"Select random API Server" self._protocol = kwargs.get('protocol', self._protocol)
self.api_ws = self._api_ws % choice(self._servers) self._server = kwargs.get('server', None)
self.select_server()
def select_server(self):
"Select server if provided, otherwise uses one from the list"
if self._server and self._server in self._servers:
self.api_ws = self._api_ws % (self._protocol, self._server)
else:
self.api_ws = self._api_ws % (
self._protocol,
choice(self._servers)
)
def register_api_key(self, email, otp): def register_api_key(self, email, otp):
"Registers an API Key with the servers" "Registers an API Key with the servers"
@ -46,57 +64,85 @@ class YubicoWS(object):
response = requests.post(self.register_ws, data) response = requests.post(self.register_ws, data)
ws_response = response.json() ws_response = response.json()
if not ws_response['status']: if not ws_response['status']:
raise WSError(ws_response['error']) raise YubicoWSError(ws_response['error'])
return ws_response return ws_response
def verify(self, yubikey_id, otp, key=None): def verify(self, yubikey_id, otp, key=None):
"Verifies the provided OTP with the server" "Verifies the provided OTP with the server"
endpoint = 'verify' endpoint = 'verify'
url = self.api_ws + endpoint url = self.api_ws + endpoint
# Check otp format # Check otp format
if not (len(otp) > 32 and len(otp) < 48): if not (len(otp) > 32 and len(otp) < 48):
raise OTPIncorrectFormat() raise OTPIncorrectFormat()
nonce = self.generate_nonce() nonce = self.generate_nonce()
data = { data = {
'id': int(yubikey_id), 'id': str(yubikey_id),
'otp': str.lower(otp), 'otp': str.lower(otp),
'nonce': nonce 'nonce': nonce
} }
# Use API key for signing the message if key is provided # Use API key for signing the message if key is provided
if key: if key:
data = self.sign_otp(data, key) data['h'] = self.sign(data, key)
response = requests.get(url, params=data) response = requests.get(url, params=data)
ws_response = self.parse_ws_response(response.text) ws_response = self.parse_ws_response(response.text)
print(ws_response)
if ws_response['status'] == 'OK': if ws_response['status'] == 'OK':
# Check if response is valid # Check if response is valid
if not (ws_response['nonce'] == nonce \ if not (ws_response['nonce'] == data['nonce']
and ws_response['otp'] != otp \ and ws_response['otp'] == otp):
and True): raise YubicoWSInvalidResponse()
raise WSInvalidResponse()
# TODO check signature if key:
signature = self.sign(ws_response, key)
if ws_response['h'] != signature:
raise YubicoWSResponseBadSignature(
"The signature sent by the server is invalid"
)
else: else:
raise WSError(self._errors[ws_response['status']]) raise YubicoWSError(self._errors[ws_response['status']])
return ws_response return ws_response
def sign_otp(self, data, key): def sign(self, data, key):
"Signs the OTP with the provided key" "Signs the message with the provided key"
return data # Sort k=v dict
params = []
for k in sorted(data.keys()):
if k != 'h': # Just in case
key_value = "%s=%s" % (k, data[k])
params.append(key_value)
# Join as urlparams
parameters = '&'.join(params)
# hmac-sha1
hashed_string = hmac.new(
base64.b64decode(key),
parameters,
sha1
).digest()
# base64 encode
signature = base64.b64encode(hashed_string)
return signature
def parse_ws_response(self, text): def parse_ws_response(self, text):
"Parses the API key=value response into a dict" "Parses the API key=value response into a dict"
data = {} data = {}
for line in text.strip().split('\n'): for line in text.split():
key, value = line.split('=', 1) key, value = line.split('=', 1)
data[key] = value data[key.strip()] = value.strip()
return data return data
def generate_nonce(self): def generate_nonce(self):
@ -109,11 +155,11 @@ class Yubikey(object):
id = None id = None
key = None key = None
prefix = None prefix = None
_last_result = False _last_result = False
def __init__(self, yubikey_id=None, key=None): def __init__(self, yubikey_id=None, key=None, **kwargs):
self.ws = YubicoWS() self.ws = YubicoWS(**kwargs)
if yubikey_id: if yubikey_id:
self.id = yubikey_id self.id = yubikey_id
if key: if key:
@ -128,36 +174,38 @@ class Yubikey(object):
self.id = credentials['id'] self.id = credentials['id']
self.key = credentials['key'] self.key = credentials['key']
result = True result = True
return result return result
def verify(self, otp): def verify(self, otp):
"Verify an OTP to check if its valid" "Verify an OTP to check if its valid"
result = False result = False
if self.id: if self.id:
self.get_prefix(otp) self.get_prefix(otp)
result = self.ws.verify(self.id, otp, key=self.key) try:
if result == 'OK': self.ws.verify(self.id, otp, key=self.key)
result = True result = True
except (YubicoWSResponseBadSignature, YubicoWSError):
result = False
self._last_result = result self._last_result = result
return result return result
def get_prefix(self, otp): def get_prefix(self, otp):
"Get prefix from an OTP if present" "Get prefix from an OTP if present"
if len(otp) > 32: if len(otp) > 32:
self.prefix = str.lower(otp[:-32]) self.prefix = str.lower(otp[:-32])
class WSError(Exception):
class YubicoWSError(Exception):
def __init__(self, message=None): def __init__(self, message=None):
self.msg = "Web Service responded with an error: %s" % message self.msg = "Web Service responded with an error: %s" % message
def __str__(self): def __str__(self):
return repr(self.msg) return repr(self.msg)
class WSInvalidResponse(Exception):
class YubicoWSInvalidResponse(Exception):
msg = 'Response from the server is invalid' msg = 'Response from the server is invalid'
@ -168,3 +216,7 @@ class WSResponseError(Exception):
class OTPIncorrectFormat(Exception): class OTPIncorrectFormat(Exception):
pass pass
class YubicoWSResponseBadSignature(Exception):
pass