add flare-on 2020

This commit is contained in:
2021-02-05 23:42:57 +07:00
parent 6e72946689
commit f375a10066
225 changed files with 348570 additions and 0 deletions

View File

@ -0,0 +1,2 @@
from .api import CryptImportKey, CryptEncrypt, CryptDecrypt, CryptExportKey, CryptCreateHash, CryptGetKeyParam, \
CryptHashData, CryptDeriveKey, CryptGetHashParam

View File

@ -0,0 +1,217 @@
from abc import abstractproperty, ABCMeta
import struct
import hashlib
import sys
import Crypto.Cipher.PKCS1_v1_5
import Crypto.Cipher.AES
import Crypto.Cipher.ARC4
from Crypto.PublicKey import RSA
from Crypto.Util.number import long_to_bytes, bytes_to_long, inverse
from wincrypto.constants import RSAPUBKEY, RSAPUBKEY_s, RSAPUBKEY_MAGIC, PUBLICKEYSTRUC_s, bType_PUBLICKEYBLOB, \
CUR_BLOB_VERSION, CALG_RSA_KEYX, PRIVATEKEYBLOB_MAGIC, PRIVATEKEYBLOB, bType_PRIVATEKEYBLOB, bType_PLAINTEXTKEYBLOB, \
bType_SIMPLEBLOB, CALG_RC4, CALG_AES_128, CALG_AES_192, CALG_AES_256, CALG_MD5, CALG_SHA1, ALG_CLASS_HASH, \
ALG_CLASS_KEY_EXCHANGE, ALG_CLASS_DATA_ENCRYPT
from wincrypto.util import add_pkcs5_padding, remove_pkcs5_padding, GET_ALG_CLASS
# python2/3 compatibility
if sys.version > '3':
long = int
class HCryptKey(object):
def __init__(self, key):
self.key = key
class RSA_KEYX(HCryptKey):
alg_id = CALG_RSA_KEYX
@classmethod
def from_pem(cls, pem_key):
imported_key = Crypto.PublicKey.RSA.importKey(pem_key)
return cls(imported_key)
@classmethod
def import_publickeyblob(cls, data):
rsapubkey = RSAPUBKEY._make(RSAPUBKEY_s.unpack_from(data))
assert rsapubkey.magic == RSAPUBKEY_MAGIC
bitlen8 = rsapubkey.bitlen // 8
modulus = bytes_to_long(data[12:12 + bitlen8][::-1])
r = RSA.construct((modulus, long(rsapubkey.pubexp)))
return cls(r)
def export_publickeyblob(self):
n = self.key.key.n
e = self.key.key.e
n_bytes = long_to_bytes(n)[::-1]
result = PUBLICKEYSTRUC_s.pack(bType_PUBLICKEYBLOB, CUR_BLOB_VERSION, CALG_RSA_KEYX)
result += RSAPUBKEY_s.pack(RSAPUBKEY_MAGIC, len(n_bytes) * 8, e)
result += n_bytes
return result
@classmethod
def import_privatekeyblob(cls, data):
rsapubkey = RSAPUBKEY._make(RSAPUBKEY_s.unpack_from(data))
assert rsapubkey.magic == PRIVATEKEYBLOB_MAGIC
bitlen8 = rsapubkey.bitlen / 8
bitlen16 = rsapubkey.bitlen / 16
privatekeyblob_s = struct.Struct(
'%ds%ds%ds%ds%ds%ds%ds' % (bitlen8, bitlen16, bitlen16, bitlen16, bitlen16, bitlen16, bitlen8))
privatekey = PRIVATEKEYBLOB._make(bytes_to_long(x[::-1]) for x in privatekeyblob_s.unpack_from(data[12:]))
r = RSA.construct((privatekey.modulus, long(rsapubkey.pubexp), privatekey.privateExponent,
privatekey.prime1, privatekey.prime2))
return cls(r)
def export_privatekeyblob(self):
key = self.key.key
n = key.n
e = key.e
d = key.d
p = key.p
q = key.q
n_bytes = long_to_bytes(n)[::-1]
key_len = len(n_bytes) * 8
result = PUBLICKEYSTRUC_s.pack(bType_PRIVATEKEYBLOB, CUR_BLOB_VERSION, CALG_RSA_KEYX)
result += RSAPUBKEY_s.pack(PRIVATEKEYBLOB_MAGIC, key_len, e)
result += n_bytes
result += long_to_bytes(p, key_len / 16)[::-1]
result += long_to_bytes(q, key_len / 16)[::-1]
result += long_to_bytes(d % (p - 1), key_len / 16)[::-1]
result += long_to_bytes(d % (q - 1), key_len / 16)[::-1]
result += long_to_bytes(inverse(q, p), key_len / 16)[::-1]
result += long_to_bytes(d, key_len / 8)[::-1]
return result
def decrypt(self, data):
data = data[::-1]
c = Crypto.Cipher.PKCS1_v1_5.new(self.key)
result = c.decrypt(data, None)
return result
def encrypt(self, data):
c = Crypto.Cipher.PKCS1_v1_5.new(self.key)
result = c.encrypt(data)
result = result[::-1]
return result
class symmetric_HCryptKey(HCryptKey):
__metaclass__ = ABCMeta
alg_id = abstractproperty()
key_len = abstractproperty()
def __init__(self, key):
if len(key) != self.key_len:
raise AssertionError('key must be {} bytes long'.format(self.key_len))
super(symmetric_HCryptKey, self).__init__(key)
@classmethod
def import_plaintextkeyblob(cls, data):
key_len = struct.unpack('<I', data[:4])[0]
key = data[4:4 + key_len]
return cls(key)
def export_plaintextkeyblob(self):
result = PUBLICKEYSTRUC_s.pack(bType_PLAINTEXTKEYBLOB, 2, self.alg_id)
result += struct.pack('<I', len(self.key))
result += self.key
return result
@classmethod
def import_simpleblob(cls, data, hPubKey):
assert struct.unpack('<I', data[:4])[0] == CALG_RSA_KEYX
assert hPubKey
key = hPubKey.decrypt(data[4:])
return cls(key)
def export_simpleblob(self, rsa_key):
result = PUBLICKEYSTRUC_s.pack(bType_SIMPLEBLOB, CUR_BLOB_VERSION, self.alg_id)
if rsa_key.alg_id != CALG_RSA_KEYX:
raise ValueError('SIMPLEBLOB export only supported under RSA key')
result += struct.pack('<I', CALG_RSA_KEYX)
result += rsa_key.encrypt(self.key)
return result
class RC4(symmetric_HCryptKey):
alg_id = CALG_RC4
key_len = 16
def encrypt(self, data):
return Crypto.Cipher.ARC4.new(self.key).encrypt(data)
def decrypt(self, data):
return Crypto.Cipher.ARC4.new(self.key).encrypt(data)
class AES_base(symmetric_HCryptKey):
alg_id = abstractproperty()
def encrypt(self, data):
data = add_pkcs5_padding(data, 16)
encrypted = Crypto.Cipher.AES.new(self.key, mode=Crypto.Cipher.AES.MODE_CBC, IV='\0' * 16).encrypt(data)
return encrypted
def decrypt(self, data):
decrypted = Crypto.Cipher.AES.new(self.key, mode=Crypto.Cipher.AES.MODE_CBC, IV=b'\0' * 16).decrypt(data)
result = remove_pkcs5_padding(decrypted)
return result
class AES128(AES_base):
alg_id = CALG_AES_128
key_len = 16
class AES192(AES_base):
alg_id = CALG_AES_192
key_len = 24
class AES256(AES_base):
alg_id = CALG_AES_256
key_len = 32
class HCryptHash():
__metaclass__ = ABCMeta
alg_id = abstractproperty()
hash_class = abstractproperty()
def __init__(self):
# noinspection PyCallingNonCallable
self.hasher = self.hash_class()
def hash_data(self, data):
self.hasher.update(data)
def get_hash_val(self):
return self.hasher.digest()
def get_hash_size(self):
return self.hasher.digest_size
class MD5(HCryptHash):
alg_id = CALG_MD5
hash_class = hashlib.md5
class SHA1(HCryptHash):
alg_id = CALG_SHA1
hash_class = hashlib.sha1
algorithm_list = [RC4, AES128, AES192, AES256, RSA_KEYX, MD5, SHA1]
symmetric_algorithms = [x for x in algorithm_list if GET_ALG_CLASS(x.alg_id) == ALG_CLASS_DATA_ENCRYPT]
asymmetric_algorithms = [x for x in algorithm_list if GET_ALG_CLASS(x.alg_id) == ALG_CLASS_KEY_EXCHANGE]
hash_algorithms = [x for x in algorithm_list if GET_ALG_CLASS(x.alg_id) == ALG_CLASS_HASH]
algorithm_registry = dict((x.alg_id, x) for x in algorithm_list)

View File

@ -0,0 +1,103 @@
from wincrypto import constants
from wincrypto.algorithms import algorithm_registry
from wincrypto.constants import PUBLICKEYSTRUC, PUBLICKEYSTRUC_s, CUR_BLOB_VERSION, bType_PUBLICKEYBLOB, \
bType_PRIVATEKEYBLOB, bType_PLAINTEXTKEYBLOB, bType_SIMPLEBLOB, KP_KEYLEN, KP_ALGID, CALG_AES_192, CALG_AES_256, \
CALG_AES_128, ALG_CLASS_KEY_EXCHANGE, ALG_CLASS_DATA_ENCRYPT
from wincrypto.util import derive_key_3des_aes, GET_ALG_CLASS
def CryptImportKey(data, pub_key=None):
publickeystruc = PUBLICKEYSTRUC._make(PUBLICKEYSTRUC_s.unpack_from(data))
if publickeystruc.bVersion != CUR_BLOB_VERSION:
raise NotImplementedError('PUBLICKEYSTRUC.bVersion={} not implemented'.format(publickeystruc.bVersion))
if publickeystruc.aiKeyAlg not in algorithm_registry:
raise NotImplementedError('ALG_ID {:x} not implemented'.format(publickeystruc.aiKeyAlg))
if publickeystruc.bType == bType_PUBLICKEYBLOB:
if GET_ALG_CLASS(publickeystruc.aiKeyAlg) != ALG_CLASS_KEY_EXCHANGE:
raise ValueError('Invalid ALG_ID {:x} for PUBLICKEYBLOB'.format(publickeystruc.aiKeyAlg))
return algorithm_registry[publickeystruc.aiKeyAlg].import_publickeyblob(data[8:])
elif publickeystruc.bType == bType_PRIVATEKEYBLOB:
if GET_ALG_CLASS(publickeystruc.aiKeyAlg) != ALG_CLASS_KEY_EXCHANGE:
raise ValueError('Invalid ALG_ID {:x} for PRIVATEKEYBLOB'.format(publickeystruc.aiKeyAlg))
return algorithm_registry[publickeystruc.aiKeyAlg].import_privatekeyblob(data[8:])
elif publickeystruc.bType == bType_PLAINTEXTKEYBLOB:
if GET_ALG_CLASS(publickeystruc.aiKeyAlg) != ALG_CLASS_DATA_ENCRYPT:
raise ValueError('Invalid ALG_ID {:x} for PLAINTEXTKEYBLOB'.format(publickeystruc.aiKeyAlg))
return algorithm_registry[publickeystruc.aiKeyAlg].import_plaintextkeyblob(data[8:])
elif publickeystruc.bType == bType_SIMPLEBLOB:
if GET_ALG_CLASS(publickeystruc.aiKeyAlg) != ALG_CLASS_DATA_ENCRYPT:
raise ValueError('Invalid ALG_ID {:x} for SIMPLEBLOB'.format(publickeystruc.aiKeyAlg))
return algorithm_registry[publickeystruc.aiKeyAlg].import_simpleblob(data[8:], pub_key)
else:
raise NotImplementedError('PUBLICKEYSTRUC.bType={} not implemented'.format(publickeystruc.bType))
def CryptDecrypt(crypt_key, encrypted_data):
return crypt_key.decrypt(encrypted_data)
def CryptEncrypt(crypt_key, plain_data):
return crypt_key.encrypt(plain_data)
def CryptExportKey(crypt_key, exp_key, blob_type):
if blob_type == bType_SIMPLEBLOB:
return crypt_key.export_simpleblob(exp_key)
elif blob_type == bType_PLAINTEXTKEYBLOB:
return crypt_key.export_plaintextkeyblob()
elif blob_type == bType_PUBLICKEYBLOB:
return crypt_key.export_publickeyblob()
elif blob_type == bType_PRIVATEKEYBLOB:
return crypt_key.export_privatekeyblob()
else:
raise NotImplementedError('blob_type={} not supported'.format(blob_type))
def CryptGetKeyParam(crypt_key, dwParam):
if dwParam == KP_KEYLEN:
return crypt_key.key_len * 8
elif dwParam == KP_ALGID:
return crypt_key.alg_id
else:
raise NotImplementedError('key param {} not implemented'.format(dwParam))
def CryptCreateHash(algid):
if algid not in algorithm_registry:
raise NotImplementedError('ALG_ID {:x} not implemented'.format(algid))
alg_class = algorithm_registry[algid]
return alg_class()
def CryptHashData(hash_alg, data):
hash_alg.hash_data(data)
def CryptGetHashParam(hash_alg, dwParam):
if dwParam == constants.HP_ALGID:
return hash_alg.alg_id
elif dwParam == constants.HP_HASHVAL:
return hash_alg.get_hash_val()
elif dwParam == constants.HP_HASHSIZE:
return hash_alg.get_hash_size()
else:
return NotImplementedError('hash param {} not implemented'.format(dwParam))
def CryptDeriveKey(hash_alg, algid):
if algid not in algorithm_registry:
raise NotImplementedError('ALG_ID {:x} not implemented'.format(algid))
hash_val = hash_alg.get_hash_val()
alg_class = algorithm_registry[algid]
# key derivation for AES and 3DES for non SHA2 family algorithms
if algid in [CALG_AES_128, CALG_AES_192, CALG_AES_256]:
key = derive_key_3des_aes(hash_alg)
else:
key = hash_val
key = key[:alg_class.key_len]
return alg_class(key)

View File

@ -0,0 +1,48 @@
from collections import namedtuple
import struct
PUBLICKEYSTRUC = namedtuple('PUBLICKEYSTRUC', 'bType bVersion aiKeyAlg') # reserved is skipped when unpacking
PUBLICKEYSTRUC_s = struct.Struct('<bb2xI')
PRIVATEKEYBLOB = namedtuple('PRIVATEKEYBLOB', 'modulus prime1 prime2 exponent1 exponent2 coefficient privateExponent')
RSAPUBKEY = namedtuple('RSAPUBKEY', 'magic bitlen pubexp')
RSAPUBKEY_s = struct.Struct('<4sII')
RSAPUBKEY_MAGIC = b'RSA1'
PRIVATEKEYBLOB_MAGIC = b'RSA2'
# bType
bType_SIMPLEBLOB = 1
bType_PUBLICKEYBLOB = 6
bType_PRIVATEKEYBLOB = 7
bType_PLAINTEXTKEYBLOB = 8
# CALG
CALG_AES_128 = 0x660e
CALG_AES_192 = 0x660f
CALG_AES_256 = 0x6610
CALG_RC4 = 0x6801
CALG_MD5 = 0x8003
CALG_SHA = 0x8004
CALG_SHA1 = 0x8004
CALG_RSA_KEYX = 0xa400
# Hash params
HP_ALGID = 0x0001
HP_HASHVAL = 0x0002
HP_HASHSIZE = 0x0004
# key params
KP_ALGID = 7
KP_BLOCKLEN = 8
KP_KEYLEN = 9
CRYPT_EXPORTABLE = 1
CUR_BLOB_VERSION = 2
ALG_CLASS_DATA_ENCRYPT = 3 << 13
ALG_CLASS_HASH = 4 << 13
ALG_CLASS_KEY_EXCHANGE = 5 << 13

View File

@ -0,0 +1,132 @@
from ctypes import FormatError
from ctypes import windll, c_void_p, byref, create_string_buffer, c_int
import struct
from wincrypto.constants import HP_ALGID, HP_HASHSIZE, KP_KEYLEN, KP_ALGID, CRYPT_EXPORTABLE
PROV_RSA_FULL = 1
PROV_RSA_AES = 24
def assert_success(success):
if not success:
raise AssertionError(FormatError())
def CryptAcquireContext():
hprov = c_void_p()
success = windll.advapi32.CryptAcquireContextA(byref(hprov), 0, 0, PROV_RSA_AES, 0)
assert_success(success)
return hprov
def CryptReleaseContext(hprov):
success = windll.advapi32.CryptReleaseContext(hprov, 0)
assert_success(success)
def CryptImportKey(hprov, keyblob, hPubKey=0):
hkey = c_void_p()
success = windll.advapi32.CryptImportKey(hprov, keyblob, len(keyblob), hPubKey, 0, byref(hkey))
assert_success(success)
return hkey
def CryptExportKey(hkey, hexpkey, blobType):
# determine output buffer length
bdatalen = c_int(0)
success = windll.advapi32.CryptExportKey(hkey, hexpkey, blobType, 0, 0, byref(bdatalen))
assert_success(success)
# export key
bdata = create_string_buffer(b'', bdatalen.value)
success = windll.advapi32.CryptExportKey(hkey, hexpkey, blobType, 0, bdata, byref(bdatalen))
assert_success(success)
return bdata.raw[:bdatalen.value]
def CryptDestroyKey(hkey):
success = windll.advapi32.CryptDestroyKey(hkey)
assert_success(success)
def CryptDecrypt(hkey, encrypted_data):
bdata = create_string_buffer(encrypted_data)
bdatalen = c_int(len(encrypted_data))
success = windll.advapi32.CryptDecrypt(hkey, 0, 1, 0, bdata, byref(bdatalen))
assert_success(success)
return bdata.raw[:bdatalen.value]
def CryptEncrypt(hkey, plain_data):
# determine output buffer length
bdatalen_test = c_int(len(plain_data))
success = windll.advapi32.CryptEncrypt(hkey, 0, 1, 0, 0, byref(bdatalen_test), len(plain_data))
assert_success(success)
out_buf_len = bdatalen_test.value
# encrypt data
bdata = create_string_buffer(plain_data, out_buf_len)
bdatalen = c_int(len(plain_data))
success = windll.advapi32.CryptEncrypt(hkey, 0, 1, 0, bdata, byref(bdatalen), out_buf_len)
assert_success(success)
return bdata.raw[:bdatalen.value]
def CryptGetKeyParam(hkey, dwparam):
# determine output buffer length
bdatalen = c_int(0)
success = windll.advapi32.CryptGetKeyParam(hkey, dwparam, 0, byref(bdatalen), 0)
assert_success(success)
# get hash param
bdata = create_string_buffer(b'', bdatalen.value)
success = windll.advapi32.CryptGetKeyParam(hkey, dwparam, bdata, byref(bdatalen), 0)
assert_success(success)
result = bdata.raw[:bdatalen.value]
if dwparam in [KP_KEYLEN, KP_ALGID]:
result = struct.unpack('I', result)[0]
return result
def CryptCreateHash(hProv, Algid):
hCryptHash = c_void_p()
success = windll.advapi32.CryptCreateHash(hProv, Algid, 0, 0, byref(hCryptHash))
assert_success(success)
return hCryptHash
def CryptHashData(hHash, data):
bdata = create_string_buffer(data)
dwdatalen = c_int(len(data))
success = windll.advapi32.CryptHashData(hHash, bdata, dwdatalen, 0)
assert_success(success)
def CryptGetHashParam(hHash, dwParam):
# determine output buffer length
bdatalen = c_int(0)
success = windll.advapi32.CryptGetHashParam(hHash, dwParam, 0, byref(bdatalen), 0)
assert_success(success)
# get hash param
bdata = create_string_buffer(b'', bdatalen.value)
success = windll.advapi32.CryptGetHashParam(hHash, dwParam, bdata, byref(bdatalen), 0)
assert_success(success)
result = bdata.raw[:bdatalen.value]
if dwParam in [HP_ALGID, HP_HASHSIZE]:
result = struct.unpack('I', result)[0]
return result
def CryptDestroyHash(hCryptHash):
success = windll.advapi32.CryptDestroyHash(hCryptHash)
assert_success(success)
def CryptDeriveKey(hProv, Algid, hBaseData):
hkey = c_void_p()
success = windll.advapi32.CryptDeriveKey(hProv, Algid, hBaseData, CRYPT_EXPORTABLE, byref(hkey))
assert_success(success)
return hkey

View File

@ -0,0 +1,43 @@
import sys
# python2/3 compatibility
if sys.version > '3':
byte2int = lambda x: x
else:
byte2int = ord
def remove_pkcs5_padding(data):
padding_length = byte2int(data[-1])
return data[:-padding_length]
def add_pkcs5_padding(data, blocksize):
last_block_len = len(data) % blocksize
padding_length = blocksize - last_block_len
if padding_length == 0:
padding_length = blocksize
return data + bytes(bytearray([padding_length] * padding_length))
# algorithm taken from msdn documentation for CryptDeriveKey
def derive_key_3des_aes(hash_alg):
from wincrypto.api import CryptCreateHash
buf1 = bytearray(b'\x36' * 64)
hash_val = bytearray(hash_alg.get_hash_val())
for x in range(len(hash_val)):
buf1[x] ^= hash_val[x]
buf2 = bytearray(b'\x5c' * 64)
for x in range(len(hash_val)):
buf2[x] ^= hash_val[x]
hash1 = CryptCreateHash(hash_alg.alg_id)
hash1.hash_data(bytes(buf1))
hash2 = CryptCreateHash(hash_alg.alg_id)
hash2.hash_data(bytes(buf2))
derived_key = hash1.get_hash_val() + hash2.get_hash_val()
return derived_key
def GET_ALG_CLASS(x):
return x & (7 << 13)