Created
September 12, 2013 14:57
-
-
Save cluther/6538887 to your computer and use it in GitHub Desktop.
Python module for reading krb5 credential cache files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
############################################################################## | |
# | |
# Copyright (C) Zenoss, Inc. 2013, all rights reserved. | |
# | |
# This content is made available according to terms specified in | |
# License.zenoss under the directory where your Zenoss product is installed. | |
# | |
############################################################################## | |
''' | |
Module for reading krb5 credential cache files. See the following link | |
for documentation on the file format. | |
http://repo.or.cz/w/krb5dissect.git/blob_plain/HEAD:/ccache.txt | |
''' | |
from base64 import b64encode | |
from datetime import datetime | |
from struct import unpack | |
class CCache(object):
    '''
    Parsed contents of a krb5 credential cache file.

    The constructor takes the complete file contents as a byte string and
    decodes, in order: the file format version, the header length, the
    headers, the primary principal and every credential up to the end of
    the data.

    All integers are read big-endian and a header-length field is always
    expected directly after the version, so only cache files using that
    layout are decoded correctly.

    Raises struct.error if the data is truncated or a header's declared
    length does not fit inside the header region.
    '''

    file_format_version = None  # uint16 (H,2)
    headerlen = None  # uint16 (H,2)
    headers = None  # Header[]
    primary_principal = None  # Principal
    credentials = None  # Credential[]
    size = None  # total number of bytes in the parsed data

    def __init__(self, data):
        self.size = len(data)

        # Track to which byte we've read in data.
        idx = 0

        self.file_format_version, self.headerlen = unpack(
            '!HH', data[idx:idx+4])

        idx += 4

        # The headers occupy exactly headerlen bytes starting here.
        headers_end = idx + self.headerlen

        # Add headers. Each Header only ever sees bytes that belong to the
        # header region, so a bogus tag length fails loudly instead of
        # silently consuming the primary principal that follows.
        self.headers = []
        while idx < headers_end:
            header = Header(data[idx:headers_end])
            idx += header.size
            self.headers.append(header)

        # Add primary_principal.
        self.primary_principal = Principal(data[idx:])
        idx += self.primary_principal.size

        # Add credentials. Everything after the primary principal is a
        # back-to-back sequence of credentials.
        self.credentials = []
        while idx < self.size:
            credential = Credential(data[idx:])
            idx += credential.size
            self.credentials.append(credential)

    def __str__(self):
        '''Return a multi-line, human-readable dump of the whole cache.'''
        return (
            "File format version: {0}\n"
            "Header length: {1}\n"
            "Headers:\n{2}\n"
            "Primary Principal:\n{3}\n"
            "Credentials:\n{4}\n".format(
                self.file_format_version,
                self.headerlen,
                '\n'.join(map(str, self.headers)),
                str(self.primary_principal),
                '\n'.join(map(str, self.credentials))))
class Header(object):
    '''
    One tagged header field of a credential cache.

    A header is a 16-bit tag, a 16-bit payload length and the payload
    itself. A payload with tag 1 is decoded as a DeltaTime; any other
    payload is kept as a tuple of single bytes.
    '''

    tag = None  # uint16 (H,2)
    taglen = None  # uint16 (H,2)
    tagdata = None  # uint8[] | DeltaTime
    size = None  # bytes consumed: 4-byte prefix plus payload

    def __init__(self, data):
        self.size = 0

        prefix = data[:4]
        self.tag, self.taglen = unpack('!HH', prefix)

        payload = data[4:4 + self.taglen]
        if self.tag != 1:
            # Unknown tag: keep the raw payload, one element per byte.
            self.tagdata = unpack('!%dc' % self.taglen, payload)
        else:
            self.tagdata = DeltaTime(payload)

        self.size = self.taglen + 4

    def __str__(self):
        '''Return an indented three-line description of the header.'''
        template = (
            " Tag: {0}\n"
            " Tag length: {1}\n"
            " Tag data: {2}\n")
        return template.format(self.tag, self.taglen, str(self.tagdata))
class DeltaTime(object):
    '''
    Payload of a tag-1 header: two big-endian 32-bit unsigned offsets.

    The data passed in must be exactly 8 bytes long, otherwise
    struct.error is raised.
    '''

    time_offset = None  # uint32 (I,4)
    usec_offset = None  # uint32 (I,4)
    size = None  # number of bytes in the payload

    def __init__(self, data):
        seconds, microseconds = unpack('!II', data)
        self.time_offset = seconds
        self.usec_offset = microseconds
        self.size = len(data)

    def __str__(self):
        '''Return a one-line description of both offsets.'''
        template = "DeltaTime (time_offset:{0}, usec_offset:{1})"
        return template.format(self.time_offset, self.usec_offset)
class Credential(object):
    '''
    A single credential entry of a credential cache.

    The fields are decoded from the start of the given data in this
    order: client principal, server principal, key block, times, the
    is_skey flag, ticket flags, the address list, the auth data list,
    the ticket and the second ticket. ``size`` is the total number of
    bytes the entry occupies, so the next entry starts at that offset.
    '''

    client = None  # Principal
    server = None  # Principal
    key = None  # KeyBlock
    time = None  # Times
    is_skey = None  # uint8 (B,1)
    tktflags = None  # uint32 (I,4)
    num_address = None  # uint32 (I,4)
    addrs = None  # Address[]
    num_authdata = None  # uint32 (I,4)
    authdata = None  # AuthData[]
    ticket = None  # CountedOctetString
    second_ticket = None  # CountedOctetString
    size = None

    def __init__(self, data):
        # Offset of the next unread byte within data.
        offset = 0

        self.client = Principal(data[offset:])
        offset += self.client.size

        self.server = Principal(data[offset:])
        offset += self.server.size

        self.key = KeyBlock(data[offset:])
        offset += self.key.size

        self.time = Times(data[offset:])
        offset += self.time.size

        # One flag byte followed by two 32-bit integers: 9 bytes in total.
        fixed_part = data[offset:offset + 9]
        self.is_skey, self.tktflags, self.num_address = unpack(
            '!BII', fixed_part)
        offset += 9

        # Variable-length list of addresses.
        self.addrs = []
        pending = self.num_address
        while pending > 0:
            entry = Address(data[offset:])
            self.addrs.append(entry)
            offset += entry.size
            pending -= 1

        count_field = data[offset:offset + 4]
        self.num_authdata, = unpack('!I', count_field)
        offset += 4

        # Variable-length list of authorization data entries.
        self.authdata = []
        pending = self.num_authdata
        while pending > 0:
            entry = AuthData(data[offset:])
            self.authdata.append(entry)
            offset += entry.size
            pending -= 1

        self.ticket = Ticket(data[offset:])
        offset += self.ticket.size

        self.second_ticket = Ticket(data[offset:])
        offset += self.second_ticket.size

        self.size = offset

    def __str__(self):
        '''Return an indented multi-line description of the credential.'''
        template = (
            " Client: {0}\n"
            " Server: {1}\n"
            " Key: {2}\n"
            " Times:\n{3}\n"
            " SKey: {4}\n"
            " Ticket flags: {5}\n"
            " Addresses: {6}\n"
            " Auth data: {7}\n"
            " Ticket: {8}\n\n"
            " Second ticket: {9}\n\n")

        if self.is_skey:
            skey_text = 'Yes'
        else:
            skey_text = 'No'

        fields = (
            str(self.client),
            str(self.server),
            str(self.key),
            str(self.time),
            skey_text,
            self.tktflags,
            str(self.addrs),
            str(self.authdata),
            str(self.ticket),
            str(self.second_ticket),
        )
        return template.format(*fields)
class KeyBlock(object):
    '''
    Session key stored inside a credential.

    Decoded as three big-endian 16-bit integers (keytype, etype, keylen)
    followed by keylen bytes of key material. ``size`` is the number of
    bytes consumed from the start of the given data.

    Raises struct.error if fewer than 6 + keylen bytes are available.
    '''

    keytype = None  # uint16 (H,2)
    etype = None  # uint16 (H,2)
    keylen = None  # uint16 (H,2)
    keyvalue = None  # uint8[]
    size = None

    def __init__(self, data):
        self.keytype, self.etype, self.keylen = unpack('!HHH', data[:6])
        # Key material is kept as a tuple with one element per byte.
        self.keyvalue = unpack('!%dc' % self.keylen, data[6:6+self.keylen])
        self.size = 6 + self.keylen

    def __str__(self):
        '''Return the key material as base64 text.'''
        # Join with a bytes separator: the tuple elements are byte strings
        # on both Python 2 and Python 3.
        encoded = b64encode(b''.join(self.keyvalue))
        # b64encode returns bytes on Python 3, but __str__ must return str.
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')
        return encoded
class Times(object):
    '''
    The four timestamps of a credential.

    Decoded from the first 16 bytes of the given data as four big-endian
    32-bit unsigned integers: authtime, starttime, endtime, renew_till.
    '''

    authtime = None  # uint32 (I,4)
    starttime = None  # uint32 (I,4)
    endtime = None  # uint32 (I,4)
    renew_till = None  # uint32 (I,4)
    size = None  # always 16

    def __init__(self, data):
        self.size = 16
        values = unpack('!IIII', data[:self.size])
        self.authtime, self.starttime, self.endtime, self.renew_till = values

    def __str__(self):
        '''Return the timestamps as ISO 8601 local times, one per line.'''
        template = (
            " Auth: {0}\n"
            " Start: {1}\n"
            " End: {2}\n"
            " Renew till: {3}\n")
        stamps = (
            self.authtime, self.starttime, self.endtime, self.renew_till)
        rendered = [datetime.fromtimestamp(s).isoformat() for s in stamps]
        return template.format(*rendered)
class Address(object):
    '''
    One address entry of a credential.

    Decoded as a big-endian 16-bit address type followed by a counted
    octet string holding the address data. ``size`` is the number of
    bytes consumed from the start of the given data.
    '''

    addrtype = None  # uint16 (H,2)
    addrdata = None  # CountedOctetString
    size = None

    def __init__(self, data):
        self.addrtype, = unpack('!H', data[:2])
        self.addrdata = CountedOctetString(data[2:])
        self.size = 2 + self.addrdata.size

    def __repr__(self):
        '''Return a readable form for use when a list of entries is printed.'''
        # Without this, printing a list of addresses shows only the default
        # "<... object at 0x...>" text for each entry.
        return 'Address(addrtype={0}, addrdata={1!r})'.format(
            self.addrtype, self.addrdata)
class AuthData(object):
    '''
    One authorization data entry of a credential.

    Decoded as a big-endian 16-bit type followed by a counted octet
    string holding the entry's data. ``size`` is the number of bytes
    consumed from the start of the given data.
    '''

    authtype = None  # uint16 (H,2)
    authdata = None  # CountedOctetString
    size = None

    def __init__(self, data):
        self.authtype, = unpack('!H', data[:2])
        self.authdata = CountedOctetString(data[2:])
        self.size = 2 + self.authdata.size

    def __repr__(self):
        '''Return a readable form for use when a list of entries is printed.'''
        # Without this, printing a list of entries shows only the default
        # "<... object at 0x...>" text for each one.
        return 'AuthData(authtype={0}, authdata={1!r})'.format(
            self.authtype, self.authdata)
class Principal(object):
    '''
    A Kerberos principal name.

    Decoded as two big-endian 32-bit integers (name type and number of
    components), then the realm and each component as counted octet
    strings. ``size`` is the number of bytes consumed from the start of
    the given data.
    '''

    name_type = None  # uint32 (I,4)
    num_components = None  # uint32 (I,4)
    realm = None  # CountedOctetString
    components = None  # CountedOctetString[]
    size = None

    def __init__(self, data):
        self.name_type, self.num_components = unpack('!II', data[:8])
        idx = 8

        self.realm = CountedOctetString(data[idx:])
        idx += self.realm.size

        self.components = []
        while len(self.components) < self.num_components:
            component = CountedOctetString(data[idx:])
            self.components.append(component)
            idx += component.size

        self.size = idx

    def __str__(self):
        '''
        Return the principal in the usual ``comp1/comp2@REALM`` form.

        All components belong to one name, so they are joined with '/'
        and the realm is appended once, rather than being listed as
        separate ``component@realm`` entries.
        '''
        return '{0}@{1}'.format(
            '/'.join(str(x) for x in self.components),
            str(self.realm))
class CountedOctetString(object):
    '''
    A length-prefixed byte string.

    Decoded as a big-endian 32-bit length followed by that many bytes.
    ``data`` holds the bytes as a tuple with one element per byte and
    ``size`` is the total number of bytes consumed (prefix included).

    Raises struct.error if fewer than 4 + length bytes are available.
    '''

    length = None  # uint32 (I, 4)
    data = None  # uint8[]
    size = None

    def __init__(self, data):
        self.length, = unpack('!I', data[:4])
        self.data = unpack('!%dc' % self.length, data[4:4+self.length])
        self.size = 4 + self.length

    def __str__(self):
        '''Return the content as a native string.'''
        # The tuple elements are byte strings, so join with a bytes
        # separator; joining with a text '' fails on Python 3.
        raw = b''.join(self.data)
        if isinstance(raw, str):
            # Python 2: bytes and str are the same type.
            return raw
        # Python 3: __str__ must return text. Undecodable bytes are
        # replaced rather than raising, since content may be binary.
        return raw.decode('utf-8', 'replace')

    def __repr__(self):
        '''Return the quoted form of the string content.'''
        return repr(str(self))
class Ticket(CountedOctetString):
    '''A counted octet string whose content is displayed as base64.'''

    def __str__(self):
        '''Return the ticket bytes as base64 text.'''
        # Encode the raw bytes directly: b64encode needs bytes on
        # Python 3, where the text form of the content is not accepted.
        encoded = b64encode(b''.join(self.data))
        # b64encode returns bytes on Python 3, but __str__ must return str.
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')
        return encoded
def main():
    '''
    Command-line entry point: parse a credential cache file and print it.

    The file to read is the first positional argument. When none is
    given, ``krb5cc_<uid>`` in the system temporary directory is used.
    With -d/--debug, pdb is opened after the dump so the parsed
    ``ccache`` object can be inspected.
    '''
    import optparse
    import os
    import tempfile

    parser = optparse.OptionParser()
    parser.add_option(
        '-d', '--debug',
        default=False, action='store_true',
        help='Open pdb once ccache has been loaded')

    options, args = parser.parse_args()

    if args:
        ccache_filename = args[0]
    else:
        ccache_filename = os.path.join(
            tempfile.gettempdir(),
            'krb5cc_{0}'.format(os.getuid()))

    with open(ccache_filename, 'rb') as f:
        ccache = CCache(f.read())

    # Parenthesised single-argument print produces the same output as the
    # Python 2 print statement and is also valid Python 3 syntax.
    print(ccache)

    if options.debug:
        print("Opening pdb. See the 'ccache' object.")
        import pdb
        pdb.set_trace()
        # Gives the debugger a statement to stop on inside this function,
        # where 'ccache' is still in scope.
        pass
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment