import os
import tempfile
import uuid

from c7n.policy import load
from c7n.resources import load_resources
from c7n.utils import Bag, yaml_load
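def _convert_rule(rule, group_id_key, owner_id_key=None, group_name_key=None,
                  prefix_list_key=None):
    """Convert one CloudFormation ingress/egress rule to a describe-style permission.

    Args:
        rule: one entry of SecurityGroupIngress / SecurityGroupEgress.
        group_id_key: rule key holding the peer security group id
            ('SourceSecurityGroupId' or 'DestinationSecurityGroupId').
        owner_id_key: rule key holding the peer group's account id, if any.
        group_name_key: rule key holding the peer group name, if any.
        prefix_list_key: rule key holding a prefix list id, if any.

    Returns:
        A dict shaped like one element of describe_security_groups()
        IpPermissions / IpPermissionsEgress: IpProtocol, optional
        FromPort/ToPort, IpRanges, Ipv6Ranges, UserIdGroupPairs, PrefixListIds.

    Raises:
        ValueError: the port range is half-specified, or the rule names no
            source/destination at all. Failing here is deliberate: a rule the
            converter cannot model must not reach the policy filters looking
            like an empty (harmless) permission.
    """
    from_port = rule.get('FromPort')
    to_port = rule.get('ToPort')
    perm = {}
    if from_port is None and to_port is None:
        # No port range: treat as "all traffic" (-1), as the describe output does.
        # Test with `is None`, not truthiness: port 0 is a legitimate bound.
        perm['IpProtocol'] = "-1"
    elif from_port is None or to_port is None:
        raise ValueError(
            'rule has FromPort without ToPort (or vice versa): %r' % (rule,))
    else:
        perm['IpProtocol'] = rule.get('IpProtocol')
        perm['FromPort'] = int(from_port)
        perm['ToPort'] = int(to_port)

    # describe_security_groups wraps each CIDR in a dict; a missing CidrIp
    # must yield an empty list, not [None].
    cidr = rule.get('CidrIp')
    perm['IpRanges'] = [{'CidrIp': cidr}] if cidr else []
    # IPv6 sources are carried separately so a policy checking IPv4 "0.0.0.0/0"
    # alone does not silently overlook a "::/0" rule.
    cidr6 = rule.get('CidrIpv6')
    perm['Ipv6Ranges'] = [{'CidrIpv6': cidr6}] if cidr6 else []

    # Peer-group references become one UserIdGroupPairs entry (the previous
    # list(<str>) split the group id into single characters).
    pair = {}
    group_id = rule.get(group_id_key)
    if group_id:
        pair['GroupId'] = group_id
    group_name = rule.get(group_name_key) if group_name_key else None
    if group_name:
        pair['GroupName'] = group_name
    owner = rule.get(owner_id_key) if owner_id_key else None
    if pair and owner:
        pair['UserId'] = owner
    perm['UserIdGroupPairs'] = [pair] if pair else []

    prefix = rule.get(prefix_list_key) if prefix_list_key else None
    perm['PrefixListIds'] = [{'PrefixListId': prefix}] if prefix else []

    if not (perm['IpRanges'] or perm['Ipv6Ranges']
            or perm['UserIdGroupPairs'] or perm['PrefixListIds']):
        raise ValueError('rule names no source or destination: %r' % (rule,))
    return perm


def convert_sg_to_describe(event):
    """Convert a CloudFormation AWS::EC2::SecurityGroup event to describe shape.

    Cloud Custodian's security-group filters expect the dict returned by EC2
    describe_security_groups(); this rebuilds that dict from the
    ResourceProperties of a CloudFormation event so the policies can be
    evaluated before the group exists.

    Args:
        event: dict with 'ResourceProperties' (the CloudFormation resource
            properties), 'accountId' and 'groupId'.

    Returns:
        {'SecurityGroups': [<describe-style security group dict>]}.

    Raises:
        KeyError: 'ResourceProperties', 'GroupDescription', 'accountId' or
            'groupId' is missing from the event.
        ValueError: a rule cannot be represented (see _convert_rule).
    """
    # TODO: want some kind of schema validation for the event
    sg = event['ResourceProperties']

    egress = [
        _convert_rule(rule, 'DestinationSecurityGroupId',
                      prefix_list_key='DestinationPrefixListId')
        for rule in sg.get('SecurityGroupEgress', [])
    ]
    ingress = [
        _convert_rule(rule, 'SourceSecurityGroupId',
                      owner_id_key='SourceSecurityGroupOwnerId',
                      group_name_key='SourceSecurityGroupName',
                      prefix_list_key='SourcePrefixListId')
        for rule in sg.get('SecurityGroupIngress', [])
    ]

    tags = []
    for tag in sg.get('Tags', []):
        if 'Key' in tag:
            # CloudFormation's native tag form is already {Key, Value};
            # iterating its items would emit two bogus tags per entry.
            tags.append({'Key': tag['Key'], 'Value': tag.get('Value')})
        else:
            # {name: value} mapping form, kept for callers that used it.
            for key, value in tag.items():
                tags.append({'Key': key, 'Value': value})

    return {
        'SecurityGroups': [{
            'IpPermissionsEgress': egress,
            'Description': sg['GroupDescription'],
            'Tags': tags,
            'IpPermissions': ingress,
            'GroupName': sg.get('GroupName'),
            'VpcId': sg.get('VpcId'),
            'OwnerId': event['accountId'],
            'GroupId': event['groupId'],
        }]
    }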
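def main():
    """Run every policy in sg-rules.yml against a synthetic security group.

    Builds a minimal custodian config, converts a constructed CloudFormation
    AWS::EC2::SecurityGroup event to describe_security_groups() shape and
    prints, per policy, the resources its filters would match. Nothing is
    called against AWS for the group itself; only the policy load and the
    filter evaluation run.
    """
    load_resources()
    config = Bag({
        'region': os.environ.get('AWS_DEFAULT_REGION', 'us-east-1'),
        'cache': '',
        'profile': None,
        'assume_role': None,
        'log_group': None,
        'metrics_enabled': True,
        # mkdtemp creates a fresh mode-0700 directory; a predictable
        # /tmp/<uuid> path could be pre-created or symlinked by another
        # local user before custodian writes to it.
        'output_dir': tempfile.mkdtemp(prefix='c7n-sg-'),
        'cache_period': 0,
        'dryrun': False})
    # Constructed sample event: one tcp/53 ingress rule from 10.0.0.0/8,
    # no egress rules, one Name tag.
    incoming_event = {
        "ResourceProperties": {
            "GroupName": "TestInternetSG",
            "GroupDescription": "Typical Internet-Facing Security Group",
            "VpcId": "vpc-1234abcd",
            "SecurityGroupIngress": [{
                "CidrIp": '10.0.0.0/8',
                "FromPort": 53,
                "IpProtocol": 'tcp',
                "ToPort": 53
            }],
            "SecurityGroupEgress": [],
            "Tags": [{
                "Key": "Name",
                "Value": "InternetSecurityGroup"
            }]
        },
        'accountId': '123456789012',
        'groupId': 'sg-abcd1234'
    }
    cfn_dict = convert_sg_to_describe(incoming_event)
    policies = load(config, 'sg-rules.yml')
    fake_security_groups = cfn_dict['SecurityGroups']
    for p in policies:
        # Parenthesised print works under both Python 2 and 3 for a single
        # argument; the original print statements were Python 2 only.
        print(p.name)
        print(p.get_resource_manager().filter_resources(fake_security_groups))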
if __name__ == '__main__':
    # Script entry point; main() returns None, so the exit status is 0 unless
    # it raises.
    raise SystemExit(main())