Allow or revoke SSH access to AWS security group
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import ipaddress
import sys

import boto3
import requests
from botocore.exceptions import ClientError
# Security group to manage, keyed by AWS region name. The region names and
# the group id are placeholders to be replaced with real values.
sg = dict([
    ('aws-region-1', 'sg-group-id'),
    ('aws-region-2', 'sg-group-id'),
])
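def exit(err):
    """Report *err* and terminate the script with exit status 1.

    Note: this shadows the builtin exit(); the name is kept because every
    error path in the script calls it. The message goes to stderr so it is
    not mixed into the 'Allowed/Revoked ... in region' lines on stdout.
    Never returns.
    """
    # sys.stderr.write rather than print(file=...) keeps this valid on a
    # Python 2 interpreter as well, which the shebang/coding header allows.
    sys.stderr.write('Error: {}\n'.format(err))
    sys.exit(1)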
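def get_ip():
    """Return this host's public IPv4 address as a '/32' CIDR string.

    The address is looked up from checkip.amazonaws.com over HTTPS rather
    than plain HTTP: the value becomes the CidrIp of an SSH ingress rule,
    so an on-path party able to rewrite a cleartext response could get an
    address of its choosing authorized. The body is parsed with
    ipaddress.IPv4Address before the CIDR is built, so an unexpected or
    non-IPv4 response fails here instead of inside an EC2 API call.

    Calls exit() (never returns) on a request/HTTP error or on a body that
    is not a single IPv4 address.
    """
    try:
        # A bounded timeout keeps a hung lookup from blocking the script.
        r = requests.get('https://checkip.amazonaws.com', timeout=10)
        r.raise_for_status()
        addr = ipaddress.IPv4Address(r.text.strip())
        return '{}/32'.format(addr)
    # AddressValueError is a ValueError subclass.
    except (requests.exceptions.RequestException, ValueError) as err:
        exit(err)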
def test_sg(**kwargs): | |
try: | |
resp = kwargs['client'].describe_security_groups( | |
GroupIds=[kwargs['sg']], | |
Filters=[{'Name':'ip-permission.cidr', 'Values':[kwargs['ip_addr']]}] | |
) | |
if resp['SecurityGroups']: | |
return (resp['SecurityGroups'][0]['GroupId'] == kwargs['sg']) | |
except ClientError as err: | |
exit(err) | |
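def authorize(**kwargs):
    """Add a tcp/22 ingress rule for ``ip_addr`` to security group ``sg``.

    Keyword args:
        client: boto3 EC2 client.
        sg: security group id.
        ip_addr: CIDR string; get_ip() supplies a single-host /32, which
            keeps the opened SSH port scoped to this machine only.

    Returns the string 'Allowed'. Calls exit() on a ClientError.
    """
    try:
        # The API response is not needed; the call either succeeds or raises.
        kwargs['client'].authorize_security_group_ingress(
            GroupId=kwargs['sg'],
            IpProtocol='tcp',
            FromPort=22,
            ToPort=22,
            CidrIp=kwargs['ip_addr'],
        )
    except ClientError as err:
        exit(err)
    return 'Allowed'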
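def revoke(**kwargs):
    """Remove the tcp/22 ingress rule for ``ip_addr`` from security group ``sg``.

    Keyword args:
        client: boto3 EC2 client.
        sg: security group id.
        ip_addr: CIDR string; must match the rule exactly as authorize()
            created it (same protocol, ports and CIDR).

    Returns the string 'Revoked'. Calls exit() on a ClientError.
    """
    try:
        # The API response is not needed; the call either succeeds or raises.
        kwargs['client'].revoke_security_group_ingress(
            GroupId=kwargs['sg'],
            IpProtocol='tcp',
            FromPort=22,
            ToPort=22,
            CidrIp=kwargs['ip_addr'],
        )
    except ClientError as err:
        exit(err)
    return 'Revoked'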
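def main(argv):
    """Toggle the tcp/22 rule for this host's public IP in each named region.

    Args:
        argv: sys.argv-style list; argv[1:] are AWS region names and each
            must be a key of the module-level ``sg`` map.

    For every region the rule is revoked when test_sg() finds it and
    authorized otherwise, and one 'Allowed/Revoked <ip> in <region>' line
    is printed per region. Exits via exit() with a usage message when no
    region is given, and before any network call when a region has no
    entry in ``sg`` (the original raised a bare KeyError mid-run).
    """
    # len(argv) < 2 also covers an empty argv, which == 1 did not.
    if len(argv) < 2:
        exit('{} aws-region'.format(argv[0]))
    unknown = [r for r in argv[1:] if r not in sg]
    if unknown:
        exit('no security group configured for region(s): {}'.format(
            ', '.join(unknown)))
    my_ip = get_ip()
    for region in argv[1:]:
        ec2 = boto3.client('ec2', region_name=region)
        kwargs = {'client': ec2, 'sg': sg[region], 'ip_addr': my_ip}
        op = revoke(**kwargs) if test_sg(**kwargs) else authorize(**kwargs)
        print('{} {} in {}'.format(op, my_ip, region))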
# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main(sys.argv)