Skip to content
Snippets Groups Projects
Commit 600164a7 authored by Bruno Boiget's avatar Bruno Boiget Committed by Matthieu Lamalle
Browse files

Zephir client creation #64

parent e20488b2
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
"""Zephir-cmd-input
"""
import sys
import getpass
from pathlib import Path
from json import loads, dumps
import requests
from urllib.request import urlopen, Request
from tiramisu_json_api import Config
from tiramisu_cmdline_parser import TiramisuCmdlineParser
# URL = 'lab14-eole.ac-dijon.fr'
URL = 'zephir2.ac-test.fr'
version = 'v1'
token_file = Path.home() / '.zephir-client.jwt.token'
def send(config):
# check authentication
# retrieve new message value
# (which is the name of message selected by the user)
message = config.option('message').value.get()
payload = {}
version = "v1"
if message:
# retrieve all arguments, version, returns, ... for selected message
message_od_name = message.replace('.', '_')
for option in config.option(message_od_name).list():
name = option.option.name()
if name.endswith('.version'):
api_version = option.value.get()
elif not option.owner.isdefault():
payload[name.rsplit('_', 1)[-1]] = option.value.get()
if not payload:
payload = {}
# get stored token if available
if Path.is_file(token_file):
token = open(token_file).read()
else:
token = ''
auth_ok = False
# loop on call if authentication failed
while not auth_ok:
url = f'https://{URL}/api/{version}/{message}'
headers = {'Authorization':f'Bearer {token}'}
try:
response = requests.post(url, data=dumps(payload), headers=headers, verify=False).json()
if 'error' in response and "bearer" in response["error"]["kwargs"]["reason"]:
# Token non valid :
# {"error": {"uri": "zephir.unknown_error", "kwargs": {"reason": "unexpected bearer format"}, "description": "unknown error"}}
token = authenticate()
else:
auth_ok = True
if 'error' in response:
print(f"Erreur : {response['error']['kwargs']['reason']}")
else:
print(response)
except ValueError:
token = authenticate()
continue
except Exception as err:
print(f"Erreur : {err}")
sys.exit(1)
def authenticate():
print("\nVous n'êtes pas authentifié, veuillez saisir vos identifiants Zéphir\n")
SSO_REALM = "zephir"
USERNAME = input("Utilisateur : ")
PASSWORD = getpass.getpass(prompt='Mot de passe : ', stream=None)
auth_datas = {
"username": USERNAME,
"password": PASSWORD,
"grant_type": "password",
"client_id": "zephir-api"
}
auth_url = f'https://{URL}/auth/realms/{SSO_REALM}/protocol/openid-connect/token'
response = requests.post(auth_url, data=auth_datas, verify=False).json()
token = response.get('access_token', '')
try:
with open(token_file, 'w') as t_file:
t_file.write(token)
except:
# cannot write token file, proceed anyway
pass
return token
def help():
print("""
Usage : zephir-client [-h] domain action
""")
sys.exit(1)
def main():
req = requests.get(f'https://{URL}/api/{version}', verify=False)
json = req.json()
config = Config(json['options'])
parser = TiramisuCmdlineParser(config, fullpath=False)
# bypass mandatory validation in parser
parser.parse_args(valid_mandatory=False)
config = parser.get_config()
for key in config.value.mandatory():
if not config.option(key).option.issymlinkoption():
text = input("* "+config.option(key).option.doc() + " : ")
if config.option(key).option.type() == 'int':
config.option(key).value.set(int(text))
else:
config.option(key).value.set(text)
send(config)
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment