Created
June 8, 2022 15:11
#!/usr/bin/python3 | |
import requests | |
from requests.packages.urllib3.util.retry import Retry | |
from requests.adapters import HTTPAdapter | |
import sys | |
''' | |
Lookup plugin to get the oauth token from stratio dcos | |
============================================================ | |
Parameters: | |
- username | |
- password | |
- gosecSSOUrl | |
- proxyAccesPointUrl | |
Use example: | |
- name: Get the dcos auth token | |
set_fact: | |
dcos_auth_token: "{{ lookup('dcos_auth', 'gosecSSOUrl=https://master-1.node.paas.labs.stratio.com:9005/sso proxyAccesPointUrl=https://master-1.node.paas.labs.stratio.com password=1234 username=admin11')}}" | |
- name: Echo token | |
shell: "echo {{ dcos_auth_token }}" | |
- name: List marathon apps | |
uri: | |
url: "https://sso.paas.labs.stratio.com/service/marathon/v2/groups" | |
headers: | |
Cookie: "DCOSID=s1; dcos-acs-auth-cookie={{ dcos_auth_token }}" | |
''' | |
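class DCOSTokenFetcher:
    """Log in through the GoSec SSO form and return the DC/OS auth cookie value.

    The flow is: GET ``<proxy_access_url>/login`` (following redirects), read
    the hidden ``lt`` and ``execution`` form fields from the returned page,
    then POST the credentials to ``<gosec_sso_url>/login`` on the same
    session. After a successful login the session holds the
    ``dcos-acs-auth-cookie`` cookie, whose value is the token.

    Args:
        gosec_sso_url: Base URL of the SSO service (no trailing ``/login``).
        proxy_access_url: Base URL of the access point that starts the login.
        username: Account name posted to the login form.
        password: Account password posted to the login form.
        tenant: Tenant value posted to the login form.
        verify: Passed to ``requests`` as ``verify``. Defaults to ``False`` to
            keep the historical behaviour, which means server certificates
            are NOT validated and the credentials and the returned token can
            be read by anyone able to intercept the connection. Pass ``True``
            or the path of a CA bundle wherever the cluster certificate can
            be validated.
        timeout: Per-request timeout in seconds passed to ``requests`` so a
            stalled endpoint cannot block the caller forever.
    """

    # Session cookie that carries the token once the login has succeeded.
    _AUTH_COOKIE = 'dcos-acs-auth-cookie'

    def __init__(self, gosec_sso_url, proxy_access_url, username, password, tenant,
                 verify=False, timeout=30):
        self.gosec_sso_url = gosec_sso_url
        self.proxy_access_url = proxy_access_url
        self.username = username
        self.password = password
        self.tenant = tenant
        self.verify = verify
        self.timeout = timeout

        if not verify:
            # With verification off urllib3 warns on every request. This call
            # silences those warnings for the whole process, so only do it
            # when verification really is disabled.
            requests.packages.urllib3.disable_warnings()

        # One session for the whole flow: the login cookies set by the GET
        # must be sent back with the POST.
        self.session = requests.Session()
        retries = Retry(total=10,
                        backoff_factor=0.1,
                        status_forcelist=[500, 502, 503, 504])
        # NOTE(security): an adapter is mounted for plain http:// as well and
        # nothing here rejects such a URL; callers should pass https:// URLs
        # so the password is not sent in clear text.
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

    def get_oauth2_token(self):
        """Run the login flow and return the auth cookie value.

        Raises:
            ValueError: The login page does not contain the expected fields.
            Exception: The auth cookie is absent after the POST. The message
                says "Invalid credentials", but any login failure that leaves
                the cookie unset ends up here.
        """
        login_page = self.session.get(self.proxy_access_url + "/login",
                                      verify=self.verify,
                                      allow_redirects=True,
                                      timeout=self.timeout)
        execution, lt = self._get_login_info(login_page)

        self.session.post(self.gosec_sso_url + "/login", {
            "service": self.gosec_sso_url + "/oauth2.0/callbackAuthorize",
            "lt": lt,
            "_eventId": "submit",
            "execution": execution,
            "submit": "LOGIN",
            "username": self.username,
            "password": self.password,
            "tenant": self.tenant
        }, verify=self.verify, allow_redirects=True, timeout=self.timeout)

        if self._AUTH_COOKIE not in self.session.cookies:
            raise Exception("Invalid credentials")
        return self.session.cookies[self._AUTH_COOKIE]

    def _get_login_info(self, r):
        """Return ``(execution, lt)`` read from the login page in response *r*.

        Raises:
            ValueError: A field or its closing delimiter is not in the page.
        """
        body = r.content.decode("UTF-8")
        # "lt" ends at the tag close and is stripped; "execution" ends at the
        # next double quote.
        lt = self._value_between(body, "name=\"lt\" value=\"", "\" />", "lt").strip()
        execution = self._value_between(body, "name=\"execution\" value=\"", "\"",
                                        "execution")
        return execution, lt

    @staticmethod
    def _value_between(body, left, right, field):
        """Return the text of *body* between the markers *left* and *right*."""
        start = body.find(left)
        if start < 0:
            # The page body is deliberately left out of the message: it can
            # hold session material that should not reach logs.
            raise ValueError("login page has no '%s' form field" % field)
        start += len(left)
        end = body.find(right, start)
        if end < 0:
            raise ValueError("login page has an unterminated '%s' form field" % field)
        return body[start:end]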
def main(gosec_sso_url, proxy_access_url, username, password, tenant):
    """Fetch the auth token for the given endpoints and account and print it."""
    token = DCOSTokenFetcher(
        gosec_sso_url, proxy_access_url, username, password, tenant
    ).get_oauth2_token()
    # The printed value is a bearer credential: whatever captures this
    # process's stdout (CI logs, Ansible output) ends up holding it.
    print(token)
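if __name__ == '__main__':
    # Invocation: <script> GOSEC_SSO_URL PROXY_ACCESS_URL USERNAME PASSWORD TENANT
    # NOTE(security): the password is a positional argument, so it shows up in
    # the process list and may be kept in shell history; run this from
    # automation that keeps the value out of interactive shells.
    if len(sys.argv) < 6:
        # Fail with a usage line instead of an IndexError traceback.
        sys.exit("usage: %s GOSEC_SSO_URL PROXY_ACCESS_URL USERNAME PASSWORD TENANT"
                 % sys.argv[0])
    gosec_sso_url, proxy_access_url, username, password, tenant = sys.argv[1:6]
    main(gosec_sso_url, proxy_access_url, username, password, tenant)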