Initial commit

This commit is contained in:
Neill Cox 2023-09-11 20:22:22 +10:00
commit 1b51716d1b
72 changed files with 8204 additions and 0 deletions

View file

@ -0,0 +1,308 @@
"""
This module containts all the entry points for the redfish cli tools.
It uses argparse to add subparses.
All of this is developed and tested against Dell iDRACs at the moment. At some
point in its future there will be a lot of refactoring to deal with other OEMs
I expect.
"""
import json
import logging
import sys
# PYTHON_ARGCOMPLETE_OK
import argcomplete
from redfish_cli import api
from redfish_cli.api import storage
import redfish_cli.cli.jobs
from .parsers import parse_args
from .utils import error, table, write_output
def chassis(args):
"""Entry point: get the list of chassis"""
details = api.chassis(args.server, args.username, args.password)
write_output(details, output_format=args.format)
def chassis_details(args):
"""Entry point: get the details of a chassis"""
details = api.chassis_details(
args.server, args.username, args.password, args.chassis
)
write_output(details, output_format=args.format)
def create_logical_volume(args):
"""Stub for the create logical volume command"""
try:
result = storage.create_logical_volume(args)
write_output(result)
except api.exceptions.InvalidRaidLevel as exc:
error(exc.msg)
except api.exceptions.LogicalVolumeCreationFailure as exc:
error(json.dumps(exc.json, indent=2))
def delete_logical_volume(args):
"""Delete a logical volume"""
logging.debug(args)
try:
result = api.storage.delete_logical_volume(args)
write_output(
result,
output_format=args.format,
)
# except api.exceptions.FailedToGetJobID as exc:
# details = exc.msg
# error_msg = (
# "An error has occured. The usual cause is that the disk "
# f"specified ({args.disk}) does not exist\n"
# f"{details}"
# )
# error(error_msg)
except api.exceptions.ResponseNotOK as exc:
details = exc.response.text
error_msg = (
"An error has occured. The usual cause is that the disk "
f"specified ({args.disk}) does not exist\n"
f"{details}"
)
error(error_msg)
def configure_logging(args):
"""Configure logging for the cli"""
log_format = (
"%(asctime)s - %(pathname)s:%(funcName)s - %(levelname)s - %(message)s"
)
logging.basicConfig(
filename="redfish.log",
level=getattr(logging, args.log_level.upper()),
format=log_format,
)
def get(args):
"""
Write the results of an arbitrary get on the specified url to stdout (using
print).
Currently it always dumps the results as json, but it should respect the
the output format in the args.
It will also just exit with an error message if a url requires
authentication and no username and password is supplied. This could be
handled better.
"""
url = args.url
if url is None:
url = ""
if args.expand:
url += "?$expand=.($levels=1)"
try:
result = api.utils.get(
args.server,
url,
username=args.username,
password=args.password,
)
write_output(
result,
output_format=args.format,
)
except api.exceptions.Unauthorized:
error("This url requires authentication\n")
except api.exceptions.ResponseNotOK as exc:
error(json.dumps(json.loads(exc.response.text), indent=2))
def logical_volume(args):
"""
Entry point: get the details of a logical volume.
By default the first logical drive in the first storage controller of the
first system.
"""
logging.debug(
"server: %s username: %s system: %s controller: %s drive: %s",
args.server,
args.username,
args.system,
args.controller,
args.drive,
)
try:
write_output(
storage.logical_volume_details(args),
output_format=args.format,
)
except api.exceptions.ResponseNotOK as exc:
error(json.dumps(json.loads(exc.response.text), indent=2))
def logical_volumes(args):
"""
Entry point: get the list of logical volumes attached to a storage
controller (which in turn is part of a system)
"""
try:
result = storage.list_logical_volumes(args)
write_output(
result,
output_format=args.format,
)
except api.exceptions.ResponseNotOK as exc:
error(json.dumps(json.loads(exc.response.text), indent=2))
def physical_volume(args):
"""
Entry point: get the list of physical volumes attached to a storage
controller (which in turn is part of a system)
"""
data = storage.physical_volume(args)
# if args.format == "text":
# result = "\n".join([ f"{x['system']} {x['drive']}" for x in data])
# elif args.format == "json":
# result = data
# elif args.format == "table":
# if args.verbose:
# table = [ [x['system'], x['drive'], x['url']] for x in data]
# headers=["System", "Drive", "URL"]
# else:
# table = [ [x['system'], x['drive']] for x in data]
# headers=["System", "Drive"]
# result = tabulate(table, headers=headers, tablefmt="outline")
result = data
write_output(result, output_format=args.format)
def physical_volumes(args):
"""
Entry point: get the list of physical volumes attached to a storage
controller (which in turn is part of a system)
"""
data = storage.list_physical_volumes(args)
if args.format == "text":
result = "\n".join([f"{x['system']} {x['drive']}" for x in data])
elif args.format == "json":
result = data
elif args.format == "table":
if args.verbose:
data = [[x["system"], x["drive"], x["url"]] for x in data]
headers = ["System", "Drive", "URL"]
else:
data = [[x["system"], x["drive"]] for x in data]
headers = ["System", "Drive"]
result = table(data, headers=headers)
write_output(result, output_format=args.format)
def product(args):
"""Entry point: get the product name of a server"""
details = api.product(args.server)
write_output(details, output_format=args.format)
def redfish(args=None):
"""
Entrypoint: Base command, gets args from stdin unless they are passed in as
a parameter (which probably means we are being called from a test)
"""
if args is None:
args = parse_args(sys.argv[1:])
if args.debug:
args.log_level = "DEBUG"
configure_logging(args)
if "func" in args:
args.func(args)
else:
write_output(api.utils.get(args.server, ""), output_format=args.format)
def service_tag(args):
"""Entry point: get the service tag of a Dell server"""
try:
write_output(api.service_tag(args.server), output_format=args.format)
except api.exceptions.DellOnly:
error("Service tags are only available for Dell iDRACs")
def storage_controller(args):
"""Entry point: get the details of a storage controllers in a server"""
try:
result = storage.storage_controller_details(args)
write_output(
result,
output_format=args.format,
)
except api.exceptions.ResponseNotOK as exc:
error(json.dumps(json.loads(exc.response.text), indent=2))
def storage_controllers(args):
"""Entry point: get the list of storage controllers in a server"""
result = storage.list_storage_controllers(args)
if not args.verbose:
controller_list = []
for i in result["Members"]:
controller_list.append(i["@odata.id"].split("/")[-1])
result = controller_list
if args.format == "text":
result = "\n".join(result)
elif args.format == "table":
data = []
for row in result:
data.append([row])
result = table(data, headers=["Controller"])
write_output(
result,
output_format=args.format,
)
def system_details(args):
"""Entry point: get the details of a system in a server"""
details = api.power.system_details(
args.server, args.username, args.password, args.system
)
write_output(details, output_format=args.format)
def version(args):
"""Entry point: get the verion of redfish implemented by the BMC"""
redfish_version = api.redfish_version(args.server)
write_output(redfish_version, output_format=args.format)

View file

@ -0,0 +1,75 @@
"""Commands to interact with jobs on a manager. Probably very Dell specific."""
from redfish_cli.cli.utils import write_output, table
from redfish_cli.api import jobs
def jobs_list(args):
"""Get the list of jobs"""
result = jobs.jobs_list(args)
# import bpdb;bpdb.set_trace()
if not args.all_jobs:
result["Members"] = [
member
for member in result["Members"]
if member["PercentComplete"] < 100
]
result["Members@odata.count"] = len(result["Members"])
result["Note"] = "Only showing incomplete jobs"
if args.format != "json":
result = [
[
j["Id"],
j["JobType"],
j["PercentComplete"],
j["Name"],
j["JobState"],
]
for j in result["Members"]
]
if args.format == "table":
result = table(
result,
headers=["Job ID", "Job Type", "Pcnt Complete", "Name", "State"],
)
elif args.format == "text":
result = "\n".join(
[f"{row[0]} {row[1]} {row[2]} {row[3]} {row[4]}" for row in result]
)
write_output(result, output_format=args.format)
def job_details(args):
"""Get the details of a job"""
result = jobs.job_details(args)
write_output(result, output_format=args.format)
def watch_job(args):
"""Watch the progress of a job until it finishes or the user cancels"""
try:
while True:
result = jobs.job_details(args)
output = result
if args.format == "text":
output = " ".join(
[
result["Id"],
result["Name"],
result["JobState"],
str(result["PercentComplete"]),
]
)
write_output(output, output_format=args.format)
if result["PercentComplete"] == 100:
break
except KeyboardInterrupt:
write_output("Ctrl^C detected. Exiting")

View file

@ -0,0 +1,71 @@
"""
Code to set up the parser and subparsers for the cli
"""
# pylint: disable=cyclic-import
# Note should really think about the cyclic import more, but it works for now.
import argparse
# PYTHON_ARGCOMPLETE_OK
import argcomplete
from redfish_cli import cli
from . import base
from . import chassis
from . import dell
from . import jobs
from . import power
from . import storage
from . import system
# from .utils import add_authenticated_subparser, ArgumentTuple, EnvDefault
def parse_args(args):
"""Parse thge arguments supplied"""
parser = argparse.ArgumentParser(
description="Interact with servers via RedFish. Only Dells supported at the moment"
)
parser.add_argument("-s", "--server", required=True, dest="server")
parser.add_argument(
"-f",
"--format",
choices=["json", "table", "text"],
default="json",
)
parser.add_argument(
"--log-level",
default="WARN",
dest="log_level",
choices=[
"critical",
"fatal",
"error",
"warning",
"warn",
"info",
"debug",
],
)
parser.add_argument("--debug", action="store_true")
parser.add_argument("-l", "--log-file", default="redfish.log")
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-V", "--verify", action="store_true")
subparsers = parser.add_subparsers()
base.add_get(subparsers)
base.add_version_subparser(subparsers)
base.add_product_subparser(subparsers)
chassis.add_chassis_subparser(subparsers)
jobs.add_jobs_subparser(subparsers)
power.add_power_subparser(subparsers)
storage.add_storage_subparser(subparsers)
system.add_system_subparser(subparsers)
dell.add_service_tag_subparser(subparsers)
argcomplete.autocomplete(parser)
return parser.parse_args(args)

View file

@ -0,0 +1,50 @@
"""Subparsers that don't seem to belong anywhere else"""
from redfish_cli import cli
from .utils import (
add_authenticated_subparser,
ArgumentTuple,
PositionalArgumentTuple,
)
def add_version_subparser(subparsers):
"""Utility function to add the subparser for the version command"""
version_subparser = subparsers.add_parser(
"version", help="Get the redfish version"
)
version_subparser.set_defaults(func=cli.version)
def add_get(subparsers):
"""Utility function to add the subparser for the get command."""
add_authenticated_subparser(
subparsers,
"get",
help_text="get an arbitrary url",
func=cli.get,
arguments=[
PositionalArgumentTuple(
name="url",
nargs="?",
),
ArgumentTuple(
short_flag="--E",
long_flag="--expand",
action="store_true",
required=False,
dest="expand",
),
],
)
def add_product_subparser(subparsers):
"""Utility function to add the subparser for the product command."""
product_subparser = subparsers.add_parser(
"product", help="Get the product description"
)
product_subparser.set_defaults(func=cli.product)

View file

@ -0,0 +1,44 @@
"""Subparsers related to chassis management"""
from redfish_cli import cli
from .utils import ArgumentTuple, add_authenticated_subparser, add_subparser
def add_chassis_subparser(subparsers):
"""Add the chassis subparser"""
subparser = add_authenticated_subparser(
subparsers, "chassis"
).add_subparsers()
add_list_chassis_subparser(subparser)
add_show_chassis_subparser(subparser)
def add_list_chassis_subparser(subparsers):
"""Utility function to add the subparser for the chassis command"""
add_subparser(
subparsers,
"list",
help_text="Get the list of chassis",
func=cli.chassis,
)
def add_show_chassis_subparser(subparsers):
"""Utility function to add the subparser for the chassis-details command."""
add_subparser(
subparsers,
"show",
func=cli.chassis_details,
help_text="Get the details of a chassis",
arguments=[
ArgumentTuple(
short_flag="-c",
long_flag="--chassis",
default="System.Embedded.1",
required=False,
dest="chassis",
),
],
)

View file

@ -0,0 +1,14 @@
"""Subparsers that are Dell specific"""
from redfish_cli import cli
def add_service_tag_subparser(subparsers):
"""
Utility function to add the subparser for the service-tag command. This is
a Dell specific command
"""
service_tag_subparser = subparsers.add_parser(
"service-tag", help="Get the service tag (DELL only)"
)
service_tag_subparser.set_defaults(func=cli.service_tag)

View file

@ -0,0 +1,93 @@
"""Subparsers related to job management"""
from .. import jobs
from .utils import (
ArgumentTuple,
PositionalArgumentTuple,
add_authenticated_subparser,
add_subparser,
)
def add_jobs_subparser(subparsers):
"""Add the storage subparser"""
subparser = add_authenticated_subparser(
subparsers, "jobs"
).add_subparsers()
add_job_details_subparser(subparser)
add_jobs_list_subparser(subparser)
add_watch_job_subparser(subparser)
def add_job_details_subparser(subparsers):
"""Utility function to add the subparser for the job-details command"""
add_subparser(
subparsers,
"show",
func=jobs.job_details,
help_text="Get the details of a job",
arguments=[
ArgumentTuple(
short_flag="-m",
long_flag="--manager",
default="iDRAC.Embedded.1",
required=False,
dest="manager",
),
ArgumentTuple(
short_flag="-j",
long_flag="--job-id",
required=True,
dest="job_id",
),
],
)
def add_jobs_list_subparser(subparsers):
"""Utility function to add the subparser for the list-jobs command"""
add_subparser(
subparsers,
"list",
func=jobs.jobs_list,
help_text="List the jobs on a manager. Filters will be added to this later",
arguments=[
ArgumentTuple(
short_flag="-m",
long_flag="--manager",
default="iDRAC.Embedded.1",
required=False,
dest="manager",
),
ArgumentTuple(
short_flag="-a",
long_flag="--all",
action="store_true",
dest="all_jobs",
),
],
)
def add_watch_job_subparser(subparsers):
"""Utility function to add the subparser for the watch-job command"""
add_subparser(
subparsers,
"watch",
func=jobs.watch_job,
help_text="Watch the process of a job until it completes or the user cancels",
arguments=[
ArgumentTuple(
short_flag="-m",
long_flag="--manager",
default="iDRAC.Embedded.1",
required=False,
dest="manager",
),
PositionalArgumentTuple(
name="job_id",
),
],
)

View file

@ -0,0 +1,143 @@
"""Subparsers related to power management"""
from redfish_cli.cli import power
from .utils import (
ArgumentTuple,
PositionalArgumentTuple,
add_authenticated_subparser,
add_subparser,
)
def add_power_subparser(subparsers):
"""Add the chassis subparser"""
subparser = add_authenticated_subparser(
subparsers, "power"
).add_subparsers()
add_get_power_state(subparser)
add_list_power_states(subparser)
add_set_power_state(subparser)
add_reset_system(subparser)
add_power_off(subparser)
add_power_on(subparser)
def add_get_power_state(subparsers):
"""Utility function to add the subparser for the power get command"""
add_subparser(
subparsers,
"get",
help_text="Get the current power state of a system",
func=power.power_state,
arguments=[
ArgumentTuple(
short_flag="-s",
long_flag="--system",
default="System.Embedded.1",
required=False,
dest="system",
),
],
)
def add_list_power_states(subparsers):
"""Utility function to add the subparser for the power list command"""
add_subparser(
subparsers,
"list",
help_text="Get the allowed power state of a system",
func=power.power_states,
arguments=[
ArgumentTuple(
short_flag="-s",
long_flag="--system",
default="System.Embedded.1",
required=False,
dest="system",
),
],
)
def add_set_power_state(subparsers):
"""
Utility function to add the subparser for the set-power-states command
This command is dependent on the OEM implementation of redfish
It assumes that a valid power state has been passed in, but it should probably
do some validation to make the experience nicer.
Could do this by calling api.system_power_states_allowed maybe?
"""
add_subparser(
subparsers,
"set",
help_text=(
"Set the current power state of a system. Use get-power-state to find "
"allowable values"
),
func=power.set_power_state,
arguments=[
ArgumentTuple(
short_flag="-s",
long_flag="--system",
default="System.Embedded.1",
required=False,
dest="system",
),
PositionalArgumentTuple(name="power_state"),
],
)
def add_reset_system(subparsers):
"""Parse for the reset system command"""
add_subparser(
subparsers,
"reset",
help_text=("Reset a server. Convenience method"),
func=power.reset_system,
arguments=[
PositionalArgumentTuple(
name="system",
default="System.Embedded.1",
nargs="?",
),
],
)
def add_power_off(subparsers):
"""Parse for the power off command"""
add_subparser(
subparsers,
"off",
help_text=("Power on a server. Convenience method"),
func=power.power_off,
arguments=[
PositionalArgumentTuple(
name="system",
default="System.Embedded.1",
nargs="?",
),
],
)
def add_power_on(subparsers):
"""Parse for the power on command"""
add_subparser(
subparsers,
"on",
help_text=("Power on a server. Convenience method"),
func=power.power_on,
arguments=[
PositionalArgumentTuple(
name="system",
default="System.Embedded.1",
nargs="?",
),
],
)

View file

@ -0,0 +1,332 @@
"""Subparsers related to storage management"""
from redfish_cli import cli
from .utils import ArgumentTuple, add_authenticated_subparser, add_subparser
def add_storage_subparser(subparsers):
"""Add the storage subparser"""
subparser = add_authenticated_subparser(
subparsers, "storage"
).add_subparsers()
add_logical_volumes_subparser(subparser)
add_physical_volumes_subparser(subparser)
add_controllers_subparser(subparser)
def add_create_logical_volume_subparser(subparsers):
"""Utility function to add the subparser for the chassis command"""
add_subparser(
subparsers,
"create",
help_text="Create a new logical volume",
func=cli.create_logical_volume,
arguments=[
ArgumentTuple(
long_flag="--system",
short_flag="-e",
dest="system",
default="System.Embedded.1",
),
ArgumentTuple(
long_flag="--manager",
short_flag="-m",
dest="manager",
default="iDRAC.Embedded.1",
),
ArgumentTuple(
long_flag="--controller",
short_flag="-c",
dest="controller",
required=True,
),
ArgumentTuple(
short_flag="-d",
long_flag="--disk",
required=True,
dest="disk",
nargs="?",
action="append",
),
ArgumentTuple(
short_flag="-r",
long_flag="--raid-level",
choices=["0", "1", "5", "6"],
required=True,
dest="raid_level",
),
ArgumentTuple(
short_flag="-n",
long_flag="--name",
required=False,
dest="name",
),
ArgumentTuple(
short_flag="-s",
long_flag="--size",
required=False,
dest="size",
type=int,
),
ArgumentTuple(
short_flag="-S",
long_flag="--stripe-size",
required=False,
dest="stripe_size",
type=int,
),
ArgumentTuple(
short_flag="-D",
long_flag="--disk-cache-policy",
required=False,
dest="disk_cache_policy",
choices=["Enabled", "Disabled"],
),
ArgumentTuple(
short_flag="-R",
long_flag="--read-cache-policy",
required=False,
dest="read_cache_policy",
choices=["Off", "ReadAhead", "AdaptiveReadAhead"],
),
ArgumentTuple(
short_flag="-W",
long_flag="--write-cache-policy",
required=False,
dest="write_cache_policy",
choices=[
"ProtectedWriteBack",
"UnprotectedWriteBack",
"WriteThrough",
],
),
ArgumentTuple(
short_flag="-Z",
long_flag="--secure",
required=False,
action="store_true",
dest="secure",
),
],
)
def add_delete_logical_volume_subparser(subparsers):
"""Utility function to add the subparser for the delete logical volume command"""
add_subparser(
subparsers,
"delete",
help_text="Delete a logical volume",
func=cli.delete_logical_volume,
arguments=[
ArgumentTuple(
long_flag="--system",
short_flag="-e",
dest="system",
default="System.Embedded.1",
),
ArgumentTuple(
short_flag="-d",
long_flag="--disk",
required=True,
dest="disk",
),
],
)
def add_logical_volume_subparser(subparsers):
"""Utility function to add the subparser for the logical-volume command."""
add_subparser(
subparsers,
"show",
func=cli.logical_volume,
arguments=[
ArgumentTuple(
"-S", "--system", "System.Embedded.1", False, "system"
),
ArgumentTuple(
short_flag="-c",
long_flag="--controller",
default=None,
required=False,
dest="controller",
),
ArgumentTuple(
short_flag="-d",
long_flag="--drive",
default=None,
required=False,
dest="drive",
),
ArgumentTuple(
short_flag="-i",
long_flag="--drive-index",
default=0,
required=False,
dest="drive_index",
),
],
)
def add_list_logical_volumes_subparser(subparsers):
"""Utility function to add the subparser for the logical-volumes command."""
# logical-volumes subparser
add_subparser(
subparsers,
"list",
func=cli.logical_volumes,
arguments=[
ArgumentTuple(
"-S", "--system", "System.Embedded.1", False, "system"
),
ArgumentTuple(
short_flag="-c",
long_flag="--controller",
default=None,
required=False,
dest="controller",
),
ArgumentTuple(
short_flag="-e",
long_flag="--expand",
action="store_false",
required=False,
dest="expand",
),
],
)
def add_physical_volume_subparser(subparsers):
"""Utility function to add the subparser for the physical-volumes command."""
add_subparser(
subparsers,
"show",
func=cli.physical_volume,
arguments=[
ArgumentTuple(
short_flag="-S",
long_flag="--system",
default=None,
required=False,
dest="system",
),
ArgumentTuple(
short_flag="-c",
long_flag="--controller",
default=None,
required=False,
dest="controller",
),
ArgumentTuple(
short_flag="-d",
long_flag="--drive",
default=None,
required=False,
dest="drive",
),
],
)
def add_list_physical_volumes_subparser(subparsers):
"""Utility function to add the subparser for the physical-volumes command."""
add_subparser(
subparsers,
"list",
func=cli.physical_volumes,
arguments=[
ArgumentTuple(
"-S", "--system", "System.Embedded.1", False, "system"
),
ArgumentTuple(
short_flag="-c",
long_flag="--controller",
default=None,
required=False,
dest="controller",
),
],
)
def add_show_controller_subparser(subparsers):
"""Utility function to add the subparser for the storage-controller command"""
add_subparser(
subparsers,
"show",
func=cli.storage_controller,
arguments=[
ArgumentTuple(
"-S",
"--system",
"System.Embedded.1",
False,
"system",
help_text=None,
),
ArgumentTuple(
"-c",
"--controller",
None,
False,
"controller",
help_text=None,
),
],
)
def add_list_controllers_subparser(subparsers):
"""Utility function to add the subparser for the storage-controllers command"""
add_subparser(
subparsers,
"list",
func=cli.storage_controllers,
arguments=[
ArgumentTuple(
"-S",
"--system",
"System.Embedded.1",
False,
"system",
help_text=None,
),
],
)
def add_logical_volumes_subparser(subparsers):
"""Add the storage logical volumes subparser"""
subparser = add_authenticated_subparser(
subparsers, "logical"
).add_subparsers()
add_logical_volume_subparser(subparser)
add_list_logical_volumes_subparser(subparser)
add_create_logical_volume_subparser(subparser)
add_delete_logical_volume_subparser(subparser)
def add_physical_volumes_subparser(subparsers):
"""Add the storage physical volumes subparser"""
subparser = add_authenticated_subparser(
subparsers, "physical"
).add_subparsers()
add_physical_volume_subparser(subparser)
add_list_physical_volumes_subparser(subparser)
def add_controllers_subparser(subparsers):
"""Add the storage controller ssubparser"""
subparser = add_authenticated_subparser(
subparsers, "controllers"
).add_subparsers()
add_list_controllers_subparser(subparser)
add_show_controller_subparser(subparser)

View file

@ -0,0 +1,33 @@
"""Subparsers related to system management"""
from redfish_cli import cli
from .utils import ArgumentTuple, add_authenticated_subparser, add_subparser
def add_system_subparser(subparsers):
"""Add the chassis subparser"""
subparser = add_authenticated_subparser(
subparsers, "system"
).add_subparsers()
add_show_system_subparser(subparser)
def add_show_system_subparser(subparsers):
"""Utility function to add the subparser for the system-details command"""
add_subparser(
subparsers,
"show",
func=cli.system_details,
help_text="Get the details of a system",
arguments=[
ArgumentTuple(
short_flag="-s",
long_flag="--system",
default="System.Embedded.1",
required=False,
dest="system",
),
],
)

View file

@ -0,0 +1,160 @@
"""Utilities used by many parsers"""
import argparse
from collections import namedtuple
import os
ArgumentTuple = namedtuple(
"argument_tuple",
[
"short_flag",
"long_flag",
"default",
"required",
"dest",
"action",
"envvar",
"help_text",
"nargs",
"choices",
"type",
],
defaults=[None, None, None, None, None, None, None, None, None],
)
PositionalArgumentTuple = namedtuple(
"argument_tuple",
[
"name",
"default",
"action",
"help_text",
"nargs",
"choices",
"type",
],
defaults=[None, None, None, None, None, None, None],
)
class EnvDefault(argparse.Action):
"""Thanks to https://stackoverflow.com/questions/10551117/"""
# pylint: disable=too-few-public-methods
def __init__(self, envvar, required=True, default=None, **kwargs):
if not default and envvar:
if envvar in os.environ:
default = os.environ[envvar]
if required and default:
required = False
super().__init__(default=default, required=required, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, values)
def add_authenticated_subparser(
subparsers: argparse.ArgumentParser,
name: str,
help_text: str = "",
func: any = None,
arguments: list = None,
):
"""Add a subparser for a command that requires authentication"""
if arguments is None:
arguments = []
arguments = [
ArgumentTuple(
short_flag="-p",
long_flag="--password",
required=True,
default=os.environ.get("RF_PASSWORD"),
dest="password",
action=EnvDefault,
envvar="RF_PASSWORD",
),
ArgumentTuple(
short_flag="-u",
long_flag="--username",
required=True,
dest="username",
action=EnvDefault,
default=os.environ.get("RF_USERNAME"),
),
] + arguments
subparser = add_subparser(
subparsers, name, help_text=help_text, func=func, arguments=arguments
)
return subparser
def add_subparser(
subparsers: argparse.ArgumentParser,
name: str,
help_text: str = "",
func: any = None,
arguments: list = None,
):
"""Utility function to make adding a subparser easier."""
if arguments is None:
arguments = []
subparser = subparsers.add_parser(name, help=help_text)
for argument in arguments:
if isinstance(argument, ArgumentTuple):
if argument.action == EnvDefault:
subparser.add_argument(
argument.short_flag,
argument.long_flag,
required=argument.required,
dest=argument.dest,
default=argument.default,
help=argument.help_text,
action=argument.action,
envvar=argument.envvar,
choices=argument.choices,
type=argument.type,
)
elif argument.action:
subparser.add_argument(
argument.short_flag,
argument.long_flag,
required=argument.required,
dest=argument.dest,
default=argument.default,
help=argument.help_text,
action=argument.action,
)
else:
subparser.add_argument(
argument.short_flag,
argument.long_flag,
required=argument.required,
dest=argument.dest,
default=argument.default,
help=argument.help_text,
nargs=argument.nargs,
action=argument.action,
choices=argument.choices,
type=argument.type,
)
else:
subparser.add_argument(
argument.name,
default=argument.default,
help=argument.help_text,
nargs=argument.nargs,
action=argument.action,
choices=argument.choices,
type=argument.type,
)
subparser.set_defaults(func=func)
return subparser

View file

@ -0,0 +1,62 @@
"""Commands to do with power state"""
from redfish_cli.cli.utils import error, write_output
from redfish_cli.api import power
from redfish_cli.api.exceptions import FailedToSetPowerState
def reset_system(args):
"""Reboot a server - only tested against Dells"""
try:
result = power.system_reset(args)
write_output(result, output_format=args.format)
except FailedToSetPowerState as exc:
error(exc.msg)
def power_state(args):
"""
Entry point: get the powerstate of a server.
Looks like a duplicate of powerstate above
"""
details = power.system_power_state(args)
write_output(details, output_format=args.format)
def power_states(args):
"""Entry point: get the list of powerstates a server supports. OEM dependant"""
details = power.system_power_states_allowed(args)
write_output(details, output_format=args.format)
def set_power_state(args):
"""
Entry point: set the powerstate of a server
Valid powerstates are OEM dependant.
TODO: Better doco
"""
try:
details = power.system_set_power_state(args)
write_output(details, output_format=args.format)
except FailedToSetPowerState:
error(f"Failed to set powerstate to {args.power_state}")
def power_off(args):
"""Power off a system. Stub"""
try:
details = power.system_power_off(args)
write_output(details, output_format=args.format)
except FailedToSetPowerState as exc:
error(exc.msg)
def power_on(args):
"""Power on a system. Stub"""
try:
details = power.system_power_on(args)
write_output(details, output_format=args.format)
except FailedToSetPowerState as exc:
error(exc.msg)

View file

@ -0,0 +1,39 @@
"""Utility functions for the cli"""
import json
import logging
import sys
import tabulate
def error(message, output_format="text", exit_code=1):
"""
Write an error message to stderr, and by default exit with a return code
By default it will exit with rc 1. If rc is not truthy then sys.exit will
not be called and exectution will continue.
"""
if output_format == "json":
sys.stderr.write(json.dumps(message, indent=2))
else:
sys.stderr.write(message)
sys.stderr.write("\n")
if exit_code:
sys.exit(exit_code)
def write_output(output: str, output_format="text"):
"""Write output in the specified format - either text (default) or json"""
logging.debug("format: %s data: %s", output_format, output)
if output_format == "json":
print(json.dumps(output, indent=2))
else:
print(output)
def table(data, headers=None, tablefmt="outline"):
"""Wrap tabulate"""
return tabulate.tabulate(data, headers=headers, tablefmt=tablefmt)