🚧 Too much stuff for one commit. Do better Neill

This commit is contained in:
Neill Cox 2023-11-15 10:04:47 +11:00
parent dc510177bb
commit 0681468b83
7 changed files with 291 additions and 212 deletions

View file

@ -23,12 +23,8 @@ def parse_args():
parser.add_argument("--local-hostname", required=True)
parser.add_argument("--user-data", default=template_path / "user-data.tpl")
parser.add_argument("--meta-data", default=template_path / "meta-data.tpl")
parser.add_argument(
"--network-data", default=template_path / "network-config.tpl"
)
parser.add_argument(
"--instance-id", required=True, help="Hostname for the new VM"
)
parser.add_argument("--network-data", default=template_path / "network-config.tpl")
parser.add_argument("--instance-id", required=True, help="Hostname for the new VM")
parser.add_argument("--output-image", required=True)
parser.add_argument("--image-size", default="800G")
parser.add_argument("--input-image", required=True)
@ -148,10 +144,7 @@ def create_image(args):
print(result)
print("Resizing image")
cmd = (
f"virt-resize --expand /dev/sda3 {args.input_image} "
f"{args.output_image}"
)
cmd = f"virt-resize --expand /dev/sda3 {args.input_image} " f"{args.output_image}"
result = subprocess.check_output(cmd, shell=True, universal_newlines=True)
@ -202,17 +195,12 @@ def delete_user_data():
def install_packages(args):
"""Install the packages needed for virt-install to work"""
cmd = (
"sudo dnf install -y virt-install virt-viewer qemu-img "
"libguestfs.x86_64"
)
cmd = "sudo dnf install -y virt-install virt-viewer qemu-img " "libguestfs.x86_64"
print("installing needed packages...")
if args.verbose:
print(
subprocess.check_output(cmd, shell=True, universal_newlines=True)
)
print(subprocess.check_output(cmd, shell=True, universal_newlines=True))
def main():

View file

@ -77,9 +77,7 @@ def parse_args():
)
if not args.public_net_end:
args.public_net_end = get_from_env(
"--public-net-end", "AIO_PUBLIC_NET_END"
)
args.public_net_end = get_from_env("--public-net-end", "AIO_PUBLIC_NET_END")
if not args.dns_server:
args.dns_server = get_from_env("--dns-server", "AIO_DNS_SERVER")
@ -133,9 +131,7 @@ def create_user(args):
cmd = "user list -f json"
user_exists = [
x
for x in json.loads(openstack_cmd(cmd, args))
if x["Name"] == args.username
x for x in json.loads(openstack_cmd(cmd, args)) if x["Name"] == args.username
]
if user_exists:
@ -234,16 +230,14 @@ def create_private_network(args):
args.private_network_id = network_exists[0]["ID"]
else:
cmd = (
f"network create -f json --internal private "
f"--project {args.project_id}"
f"network create -f json --internal private " f"--project {args.project_id}"
)
args.private_network_id = json.loads(openstack_cmd(cmd, args))["id"]
print("Private network created.")
subnet_exists = json.loads(
openstack_cmd(
f"subnet list -f json --project {args.project_id} "
f"--name private-net",
f"subnet list -f json --project {args.project_id} " f"--name private-net",
args,
)
)
@ -393,8 +387,7 @@ def create_instance(args, name, flavor, image, security_group, boot_size):
"floating ip create -f json public", args, as_json=True
)
_ = test_user_openstack_cmd(
f"server add floating ip {server['id']} "
f"{fip['floating_ip_address']}",
f"server add floating ip {server['id']} " f"{fip['floating_ip_address']}",
args,
)
@ -441,9 +434,7 @@ def create_keypair(args):
key_exists = [
kp
for kp in json.loads(
test_user_openstack_cmd("keypair list -f json ", args)
)
for kp in json.loads(test_user_openstack_cmd("keypair list -f json ", args))
if kp["Name"] == "test_keypair"
]
@ -458,8 +449,7 @@ def create_keypair(args):
args.keypair_name = json.loads(
test_user_openstack_cmd(
f"keypair create -f json "
f"--public-key {fname} test_keypair",
f"keypair create -f json " f"--public-key {fname} test_keypair",
args,
)
)[
@ -478,9 +468,7 @@ def create_router(args):
try:
router_id = [
router
for router in openstack_cmd(
"router list -f json", args, as_json=True
)
for router in openstack_cmd("router list -f json", args, as_json=True)
if router["Name"] == "test-router"
][0]["ID"]
except IndexError:
@ -492,16 +480,13 @@ def create_router(args):
)["id"]
print("router created")
router_info = openstack_cmd(
f"router show -f json {router_id}", args, as_json=True
)
router_info = openstack_cmd(f"router show -f json {router_id}", args, as_json=True)
if router_info["external_gateway_info"]:
print("router gateway already set")
else:
openstack_cmd(
f"router set {router_id} "
f"--external-gateway {args.public_network_id}",
f"router set {router_id} " f"--external-gateway {args.public_network_id}",
args,
)
print("router gateway added")
@ -536,9 +521,7 @@ def create_security_group(args):
args.sg_id = sg_exists[0]["ID"]
else:
sec_grp = json.loads(
test_user_openstack_cmd(
"security group create test-sg -f json", args
)
test_user_openstack_cmd("security group create test-sg -f json", args)
)
args.sg_id = sec_grp["id"]

View file

@ -17,11 +17,12 @@ def parse_args():
parser.add_argument("-n", "--project-name", default="test-project")
parser.add_argument("-u", "--username", default="test-user")
parser.add_argument("-p", "--password", default="secrete123")
parser.add_argument("-c", "--cloud", default="standalone")
parser.add_argument("-c", "--cloud", default="dst_admin")
parser.add_argument("--delete-all", action="store_true")
parser.add_argument("--delete-images", action="store_true")
parser.add_argument("--delete-flavors", action="store_true")
parser.add_argument("--delete-networks", action="store_true")
parser.add_argument("--delete-private-network", action="store_true")
parser.add_argument("--delete-public-network", action="store_true")
parser.add_argument("--delete-instances", action="store_true")
parser.add_argument("--delete-user", action="store_true")
parser.add_argument("--delete-project", action="store_true")
@ -61,9 +62,7 @@ def destroy_user(args):
"""Delete the user if it exists"""
cmd = "user list -f json"
user_exists = [
x
for x in json.loads(openstack_cmd(cmd, args))
if x["Name"] == args.username
x for x in json.loads(openstack_cmd(cmd, args)) if x["Name"] == args.username
]
if user_exists:
@ -81,16 +80,12 @@ def destroy_public_network(args):
public_network_exists = [
network
for network in openstack_cmd(
"network list -f json", args, as_json=True
)
for network in openstack_cmd("network list -f json", args, as_json=True)
if network["Name"] == "public"
]
if public_network_exists:
routers = test_user_openstack_cmd(
"router list -f json", args, as_json=True
)
routers = test_user_openstack_cmd("router list -f json", args, as_json=True)
for router in routers:
router_details = test_user_openstack_cmd(
@ -99,8 +94,7 @@ def destroy_public_network(args):
for interface in router_details["interfaces_info"]:
test_user_openstack_cmd(
f"router remove port {router['ID']} "
f"{interface['port_id']}",
f"router remove port {router['ID']} " f"{interface['port_id']}",
args,
)
@ -113,22 +107,19 @@ def destroy_private_network(args):
"""Delete the private network"""
private_network_exists = [
network
for network in openstack_cmd(
"network list -f json", args, as_json=True
)
for network in test_user_openstack_cmd("network list -f json", args, as_json=True)
if network["Name"] == "private"
]
if private_network_exists:
print("deleting private network")
test_user_openstack_cmd("network delete private", args)
test_user_openstack_cmd(f"network delete private", args)
def delete_flavor(args, name):
flavor_exists = [
flavor
for flavor in openstack_cmd(
"flavor list -f json --all", args, as_json=True
)
for flavor in openstack_cmd("flavor list -f json --all", args, as_json=True)
if flavor["Name"] == name
]
if flavor_exists:
@ -138,20 +129,21 @@ def delete_flavor(args,name):
else:
print(f"{name} flavour not found")
def destroy_cirros_flavor(args):
"""Delete the cirros flavor"""
delete_flavor(args, "cirros")
def destroy_rhel_flavor(args):
"""Delete the rhel flavor"""
delete_flavor(args, "rhel")
def delete_image(args, name):
image_exists = [
image
for image in openstack_cmd(
"image list -f json", args, as_json=True
)
for image in openstack_cmd("image list -f json", args, as_json=True)
if image["Name"] == name
]
@ -161,14 +153,17 @@ def delete_image(args, name):
else:
print(f"{name} image not found")
def destroy_cirros_image(args):
"""Delete the cirros image"""
delete_image(args, "cirros_image")
def destroy_rhel_image(args):
"""Delete the rhel image"""
delete_image(args, "rhel_image")
def destroy_cirros_instance(args):
"""Delete the cirros instance"""
try:
@ -206,12 +201,11 @@ def get_project_id(args):
result = openstack_cmd("project list -f json", args, as_json=True)
try:
args.project_id = [r for r in result if r["Name"] == args.project_name][0][
"ID"
]
args.project_id = [r for r in result if r["Name"] == args.project_name][0]["ID"]
except IndexError:
args.project_id = None
def main():
"""main function"""
args = parse_args()
@ -233,9 +227,10 @@ def main():
destroy_cirros_flavor(args)
destroy_rhel_flavor(args)
if args.delete_networks or args.delete_all:
if args.delete_public_network:
destroy_public_network(args)
if args.delete_private_network or args.delete_all:
if args.project_id:
destroy_private_network(args)
else:

View file

@ -1,43 +1,42 @@
"""
Quick and dirty script to validate that the necessary project, user,roles,
flavors, networks, images have been created
flavors, networks, images have not been created
"""
import argparse
import json
from rich import print
from .utils import openstack_cmd
def failed(msg, args):
args.failed = True
print(f"[red]{msg} :x:[/red]")
def passed(msg):
print(f"[green]{msg} :white_heavy_check_mark:[/green]")
def parse_args():
"""Parse the command line arguments"""
# home = os.environ.get('HOME')
parser = argparse.ArgumentParser()
# parser.add_argument("-d", "--project-domain", default="default")
parser.add_argument("-n", "--project-name", default="test-project")
# parser.add_argument(
# "-D", "--project-description", default="Test project"
# )
# parser.add_argument("-u", "--username", default="test-user")
# parser.add_argument("-p", "--password", default="secrete123")
# parser.add_argument("-c", "--cloud", default="standalone")
# parser.add_argument("-g", "--gateway", default="10.76.23.254")
# parser.add_argument(
# "-C", "--public-network-cider", default="10.76.23.0/24"
# )
# parser.add_argument("--private-network-cidr", default="192.168.100.0/24")
# parser.add_argument("--publice-net-start", default="10.76.23.50")
# parser.add_argument("--publice-net-end", default="10.76.23.52")
# parser.add_argument("--dns-server", default="10.76.23.245")
parser.add_argument("-u", "--username", default="test-user")
parser.add_argument("-c", "--cloud", default="dst_admin")
parser.add_argument("--private-network-name", default="private")
parser.add_argument("--cirros-flavor-name", default="cirros")
parser.add_argument("--rhel-flavor-name", default="rhel")
parser.add_argument("--cirros-instance-name", default="cirros-test-instance")
parser.add_argument("--rhel-instance-name", default="rhel-test-instance")
parser.add_argument("--router-name", default="test-router")
parser.add_argument("--group-name", default="os-migrate")
# parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--ssh", help="Connection string to run commands on a remote host."
)
# export OS_CLOUD=standalone
# export STANDALONE_HOST=10.76.23.39
# parser.add_argument(
# "--ssh", help="Connection string to run commands on a remote host."
# )
args = parser.parse_args()
@ -54,107 +53,234 @@ def validate_project(args):
]
if project_exists:
print(project_exists)
passed(f"project {args.project_name} exists with id {project_exists[0]['ID']}")
else:
print("Project {args.project_name} not found.")
failed("Project {args.project_name} not found.", args)
def validate_user(args):
"""Validate that the user exists"""
cmd = "user list -f json"
user_exists = [
x
for x in json.loads(openstack_cmd(cmd, args))
if x["Name"] == args.username
x for x in json.loads(openstack_cmd(cmd, args)) if x["Name"] == args.username
]
if user_exists:
print(user_exists)
passed(f"user {args.username} exists with ID: {user_exists[0]['ID']}")
else:
print("User not found")
failed("User not found", args)
def validate_member_role(args):
"""
Validate that the member role has been assigned to the user.
"""
print(args)
# cmd = (
# f"role add --user {args.username} "
# f"--project {args.project_id} member"
# )
cmd = f"role assignment list --user {args.username} --names --project {args.project_name} --role member -f json"
# result = openstack_cmd(cmd, args)
if len(openstack_cmd(cmd, args, as_json=True)) > 0:
passed(f"User has member role in {args.project_name}")
else:
failed(f"User does not have member role in {args.project_name}")
# cmd = f"role assignment list --user {args.user_id} --role member -f json"
# result = json.loads(openstack_cmd(cmd, args))
# if result:
# print("User has member role")
def validate_group(args):
check(
args,
"group list -f json",
"group",
f"group {args.group_name} exists",
"Name",
"os-migrate",
fail_if="not found"
)
def validate_groups_has_member_role(args):
cmd = f"role assignment list --group {args.group_name} --names --project {args.project_name} --role member -f json"
if len(openstack_cmd(cmd, args, as_json=True)) > 0:
passed(f"Group {args.group_name} has member role in {args.project_name}")
else:
failed(f"Group {args.group_name} does not have member role in {args.project_name}")
def validate_admin_in_group(args):
cmd = f"group contains user {args.group_name} admin"
result = openstack_cmd(cmd, args, as_json=False)
if "admin in group" in result:
passed(result[:-1])
else:
failed(result[:-1], args)
def validate_public_network(args):
"""Coming soon - validate the public network"""
# pylint: disable=unused-argument,unused-variable
print("Validate public network - NYI")
cmd = (
"network create --external --provider-physical-network datacentre "
"--provider-network-type flat public"
cmd = "network list -f json"
try:
network_exists = [
n for n in openstack_cmd(cmd, args, as_json=True) if n["Name"] == "public"
][0]
passed(f"public network exists with id: {network_exists['ID']}")
except IndexError:
failed("public network not found", args)
cmd = f"subnet list -f json"
try:
subnet = [
s
for s in openstack_cmd(cmd, args, as_json=True)
if s["Name"] == "public-net"
][0]
subnet_details = openstack_cmd(
f"subnet show -f json {subnet['ID']}", args, as_json=True
)
cmd = (
f"subnet create public-net --subnet-range {args.publice_network_cidr} "
f"--no-dhcp --gateway {args.gateway} --allocation-pool "
f"start={args.public_net_start},end={args.public_net_end} "
"--network public"
passed(
f"public subnet found allocation pool: {subnet_details['allocation_pools'][0]}"
)
print(cmd)
except IndexError:
failed("public subnet not found", args)
def validate_private_network(args):
"""Coming soon - validate the private network"""
# pylint: disable=unused-argument,unused-variable
_ = "openstack network create --internal private"
_ = (
"openstack subnet create private-net "
f"--subnet-range {args.private_network_cidr} --network private"
cmd = "network list -f json"
try:
network_exists = [
n
for n in openstack_cmd(cmd, args, as_json=True)
if n["Name"] == args.private_network_name
][0]
failed(
f"private ({args.private_network_name}) network exists with id: {network_exists['ID']}",
args,
)
print("validate private network - NYI")
except IndexError:
passed("private network not found")
def validate_cirros_flavor(args):
"""Coming soon - create the cirros flavor"""
# pylint: disable=unused-argument
print("validate cirros flavor - NYI")
"""Validate that the cirros flavor doesn't exist"""
cmd = "flavor list -f json --all"
try:
flavor = [
f
for f in openstack_cmd(cmd, args, as_json=True)
if f["Name"] == args.cirros_flavor_name
][0]
failed(f"cirros flavor found (ID:{flavor['ID']})", args)
except IndexError:
passed("Cirros flavor not found.")
def validate_rhel_flavor(args):
"""Coming soon - validate the rhel flavor"""
# pylint: disable=unused-argument
print("validate rhel flavor - NYI")
"""Validate that the rhel flavor doesn't exist"""
cmd = "flavor list -f json --all"
try:
flavor = [
f
for f in openstack_cmd(cmd, args, as_json=True)
if f["Name"] == args.rhel_flavor_name
][0]
failed(f"rhel flavor found (ID:{flavor['ID']})", args)
except IndexError:
passed("RHEL flavor not found.")
def validate_cirros_image(args):
"""Coming soon - validate the cirros image"""
# pylint: disable=unused-argument
print("validate cirros image - NYI")
"""Validate that the cirros image does not exist"""
cmd = "image list -f json --all"
try:
image = [
f
for f in openstack_cmd(cmd, args, as_json=True)
if f["Name"] == args.cirros_image_name
][0]
failed(f"Cirros image found (ID:{image['ID']})", args)
except IndexError:
passed("Cirros image not found.")
def validate_rhel_image(args):
"""Coming soon - validate the rhel image"""
"""Validate that the rhel image does not exist"""
# pylint: disable=unused-argument
print("validate rhel image - NYI")
cmd = "image list -f json --all"
try:
image = [
f
for f in openstack_cmd(cmd, args, as_json=True)
if f["Name"] == args.rhel_image_name
][0]
failed(f"RHEL image found (ID:{image['ID']})", args)
except IndexError:
passed("RHEL image not found.")
def validate_cirros_instance(args):
"""Coming soon - validate the cirros instance"""
# pylint: disable=unused-argument
print("validate cirros instance - NYI")
"""Validate that the cirros instance does not exist"""
cmd = "server list -f json --all"
try:
instance = [
f
for f in openstack_cmd(cmd, args, as_json=True)
if f["Name"] == args.cirros_instance_name
][0]
failed(f"Cirros instance found (ID:{instance['ID']})", args)
except IndexError:
passed("Cirros instance not found.")
def validate_rhel_instance(args):
"""Coming soon - create the rhel instance"""
# pylint: disable=unused-argument
print("validate rhel instance - NYI")
"""Validate that the RHEL instance does not exist"""
cmd = "server list -f json --all"
try:
instance = [
f
for f in openstack_cmd(cmd, args, as_json=True)
if f["Name"] == args.rhel_instance_name
][0]
failed(f"RHEL instance found (ID:{instance['ID']})", args)
except IndexError:
passed("RHEL instance not found.")
def check(args, cmd, thing, msg, key, value, id_key="ID", fail_if="found"):
if fail_if not in ["found", "not found"]:
raise ValueError("fail_if must be eitther 'found' or 'not found'")
try:
found = [i for i in openstack_cmd(cmd, args, as_json=True) if i[key] == value][0]
if fail_if == "found":
failed(msg.format(key=id_key, value=found[id_key]), args)
else:
passed(f"{thing} not found")
except IndexError:
if fail_if == "found":
passed(f"{thing} not found")
else:
failed(msg.format(key=id_key, value=found[id_key]), args)
def validate_router(args):
check(
args,
f"router list --project {args.project_name} -f json",
"router",
"router found ",
"Name",
args.router_name,
)
def main():
@ -164,6 +290,9 @@ def main():
validate_user(args)
validate_project(args)
validate_member_role(args)
validate_group(args)
validate_groups_has_member_role(args)
validate_admin_in_group(args)
validate_public_network(args)
validate_private_network(args)
validate_cirros_flavor(args)
@ -172,6 +301,7 @@ def main():
validate_rhel_image(args)
validate_cirros_instance(args)
validate_rhel_instance(args)
validate_router(args)
if __name__ == "__main__":

View file

@ -5,39 +5,19 @@ flavors, networks, images have been created
import argparse
import json
from .utils import openstack_cmd
from .utils import openstack_cmd, passed, failed
def parse_args():
"""Parse the command line arguments"""
# home = os.environ.get('HOME')
parser = argparse.ArgumentParser()
# parser.add_argument("-d", "--project-domain", default="default")
parser.add_argument("-n", "--project-name", default="test-project")
# parser.add_argument(
# "-D", "--project-description", default="Test project"
# )
# parser.add_argument("-u", "--username", default="test-user")
# parser.add_argument("-p", "--password", default="secrete123")
# parser.add_argument("-c", "--cloud", default="standalone")
# parser.add_argument("-g", "--gateway", default="10.76.23.254")
# parser.add_argument(
# "-C", "--public-network-cider", default="10.76.23.0/24"
# )
# parser.add_argument("--private-network-cidr", default="192.168.100.0/24")
# parser.add_argument("--publice-net-start", default="10.76.23.50")
# parser.add_argument("--publice-net-end", default="10.76.23.52")
# parser.add_argument("--dns-server", default="10.76.23.245")
# parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--ssh", help="Connection string to run commands on a remote host."
)
# export OS_CLOUD=standalone
# export STANDALONE_HOST=10.76.23.39
parser.add_argument("-u", "--username", default="test-user")
parser.add_argument("-c", "--cloud", default="src_admin")
parser.add_argument("--private-network-name", default="private")
parser.add_argument("--cirros-flavor-name", default="cirros")
parser.add_argument("--rhel-flavor-name", default="rhel")
args = parser.parse_args()
@ -54,33 +34,34 @@ def validate_project(args):
]
if project_exists:
print(project_exists)
passed(project_exists)
else:
print("Project {args.project_name} not found.")
failed("Project {args.project_name} not found.", args)
def validate_user(args):
"""Validate that the user exists"""
cmd = "user list -f json"
user_exists = [
x
for x in json.loads(openstack_cmd(cmd, args))
if x["Name"] == args.username
x for x in json.loads(openstack_cmd(cmd, args)) if x["Name"] == args.username
]
if user_exists:
print(user_exists)
passed(user_exists)
else:
print("User not found")
failed("User not found", args)
def validate_group_exists(args):
failed("NYI", args)
def validate_group_exists(args)
print("NYI")
def validate_admin_in_group(args):
print("NYI")
failed("NYI", args)
def validate_group_has_member_role(args):
print("NYI")
failed("NYI", args)
def validate_member_role(args):
@ -105,7 +86,7 @@ def validate_member_role(args):
def validate_public_network(args):
"""Coming soon - validate the public network"""
# pylint: disable=unused-argument,unused-variable
print("Validate public network - NYI")
failed("Validate public network - NYI", args)
cmd = (
"network create --external --provider-physical-network datacentre "
"--provider-network-type flat public"
@ -127,43 +108,43 @@ def validate_private_network(args):
"openstack subnet create private-net "
f"--subnet-range {args.private_network_cidr} --network private"
)
print("validate private network - NYI")
failed("validate private network - NYI", args)
def validate_cirros_flavor(args):
"""Coming soon - create the cirros flavor"""
# pylint: disable=unused-argument
print("validate cirros flavor - NYI")
failed("validate cirros flavor - NYI", args)
def validate_rhel_flavor(args):
"""Coming soon - validate the rhel flavor"""
# pylint: disable=unused-argument
print("validate rhel flavor - NYI")
failed("validate rhel flavor - NYI", args)
def validate_cirros_image(args):
"""Coming soon - validate the cirros image"""
# pylint: disable=unused-argument
print("validate cirros image - NYI")
failed("validate cirros image - NYI", args)
def validate_rhel_image(args):
"""Coming soon - validate the rhel image"""
# pylint: disable=unused-argument
print("validate rhel image - NYI")
failed("validate rhel image - NYI", args)
def validate_cirros_instance(args):
"""Coming soon - validate the cirros instance"""
# pylint: disable=unused-argument
print("validate cirros instance - NYI")
failed("validate cirros instance - NYI", args)
def validate_rhel_instance(args):
"""Coming soon - create the rhel instance"""
# pylint: disable=unused-argument
print("validate rhel instance - NYI")
failed("validate rhel instance - NYI", args)
def main():
@ -171,9 +152,9 @@ def main():
args = parse_args()
validate_user(args)
validate_group(args)
validate_group_exists(args)
validate_admin_in_group(args)
validate_admin_has_member_role(args)
validate_group_has_member_role(args)
validate_public_network(args)
validate_private_network(args)
validate_cirros_flavor(args)

View file

@ -19,9 +19,7 @@ def parse_args():
parser.add_argument("-a", "--address", required=True)
parser.add_argument("-i", "--interface", required=True)
parser.add_argument("-m", "--netmask", default=24)
parser.add_argument(
"-d", "--dns", nargs="+", action="append", required=True
)
parser.add_argument("-d", "--dns", nargs="+", action="append", required=True)
parser.add_argument("-D", "--domain", default="aio")
parser.add_argument("--using-multiple-nics", action="store_true")
parser.add_argument("-U", "--deployment-user")
@ -41,8 +39,7 @@ def parse_args():
parser.add_argument(
"--cinder-pool-size",
default="500280",
help="Size of the loopback device allocated to cinder. "
"Defaults to 500280",
help="Size of the loopback device allocated to cinder. " "Defaults to 500280",
)
args = parser.parse_args()
@ -141,16 +138,12 @@ def main():
args = parse_args()
containers_yaml = build_containers_yaml(args)
with open(
args.containers_yaml_out, "w", encoding="utf-8"
) as containers_out:
with open(args.containers_yaml_out, "w", encoding="utf-8") as containers_out:
containers_out.write(yaml.dump(containers_yaml))
print(f"containers yaml written to {args.containers_yaml_out}")
standalone_parameters = get_standalone_parameters(args)
with open(
args.standalone_yaml_out, "w", encoding="utf-8"
) as parameters_out:
with open(args.standalone_yaml_out, "w", encoding="utf-8") as parameters_out:
parameters_out.write(yaml.dump(standalone_parameters))
print(f"standalone parameters yaml written to {args.standalone_yaml_out}")

View file

@ -4,6 +4,8 @@ import os
import subprocess
import sys
from rich import print
def get_from_env(name, envvar, required=True):
"""Get the value for a parameter from an environment variable"""
@ -27,18 +29,16 @@ def openstack_cmd(cmd, args, as_json=False):
cmd = cmd.replace("\n", " ")
cmd = f"OS_CLOUD={args.cloud} openstack " + cmd
if args.ssh:
if "ssh" in args and args.ssh:
cmd = f'ssh {args.ssh} "{cmd}"'
if args.dry_run:
if "dry_run" in args and args.dry_run:
print("dry-run specified. Not executing cmd. cmd was:")
print(cmd)
return None
try:
result = subprocess.check_output(
cmd, shell=True, universal_newlines=True
)
result = subprocess.check_output(cmd, shell=True, universal_newlines=True)
if as_json:
result = json.loads(result)
@ -64,3 +64,12 @@ def test_user_openstack_cmd(cmd, args, as_json=False):
print(cmd)
return openstack_cmd(cmd, args, as_json=as_json)
def failed(msg, args):
args.failed = True
print(f"[red]{msg} :x:[/red]")
def passed(msg):
print(f"[green]{msg} :white_heavy_check_mark:[/green]")