Initial Commit

This commit is contained in:
Neill Cox 2024-08-10 14:13:07 +10:00
commit 05c2e5e81f
17 changed files with 1197 additions and 0 deletions

3
ah_tools/__init__.py Normal file
View file

@ -0,0 +1,3 @@
__version__ = "0.0.1"
from .core import check_backups, decrypt_cli, encrypt_cli

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

85
ah_tools/check_backups.py Normal file
View file

@ -0,0 +1,85 @@
import argparse
import json
import re
import requests
from ah_tools.helpers import decrypt
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument(
"-s",
"--servers",
type=argparse.FileType("r"),
required=True,
help="path containing the encrypted list of servers"
)
parser.add_argument(
"-p", "--password",
type=str,
required=True,
help="password to decrypt the servers file"
)
parser.add_argument(
"-S", "--salt",
type=str,
required=True,
help="salt for decrypting the servers file"
)
parser.add_argument(
"-d", "--date",
type=str,
required=True,
help="start date for listing backups YYYY-mm-dd"
)
parser.add_argument(
"-f", "--format",
type=str, choices=["text", "json"],
default="text",
help="Output format. Text of json"
)
args = parser.parse_args()
return args
def get_logs(server, port, date, user, password):
url = (
f"https://{server}:{port}/virtual-server/remote.cgi?program=list-backup-logs&"
f"multiline&start={date}&multiline"
)
result = requests.get(url, auth=requests.auth.HTTPBasicAuth(user, password))
backup_details = {}
backup_list = []
for line in result.text.split("\n"):
if re.match("^[0-9]+-[0-9]+-1:$", line):
if backup_details:
backup_list.append(backup_details)
backup_details = {}
else:
if ":" in line:
line = line.strip()
key, value = line.split(":", maxsplit=1)
backup_details[key] = value
for b in backup_list:
print(b["Ended"], b["Final status"])
def main():
args = parse_arguments()
data = args.servers.read()
servers = json.loads(decrypt(args.salt, args.password, data))
for server in servers:
print(server["name"])
get_logs(server["hostname"], 10000, args.date, server["user"], server["password"])
if __name__ == "__main__":
main()

53
ah_tools/core.py Normal file
View file

@ -0,0 +1,53 @@
import argparse
import sys
from .check_backups import main as check_backups
from .helpers import decrypt, encrypt
def dummy():
check_backups()
def encrypt_cli():
parser = argparse.ArgumentParser()
parser.add_argument(
"-p", "--password",
type=str,
required=True,
help="password to decrypt the servers file"
)
parser.add_argument(
"-S", "--salt",
type=str,
required=True,
help="salt for decrypting the servers file"
)
args = parser.parse_args()
data = sys.stdin.read()
data = encrypt(args.salt, args.password, data)
print(data)
def decrypt_cli():
parser = argparse.ArgumentParser()
parser.add_argument(
"-p", "--password",
type=str,
required=True,
help="password to decrypt the servers file"
)
parser.add_argument(
"-S", "--salt",
type=str,
required=True,
help="salt for decrypting the servers file"
)
args = parser.parse_args()
data = sys.stdin.read()
data = decrypt(args.salt, args.password, data)
print(data)

125
ah_tools/gcloud_dns.py Normal file
View file

@ -0,0 +1,125 @@
import argparse
import json
import subprocess
import requests
def get(uri, token):
result = requests.get(uri, headers={"Authorization": f"Bearer {token}"})
result.raise_for_status()
return result
def print_rrdata(zone, rrdata, filter={}):
rrtype = rrdata["type"]
data = rrdata["rrdatas"]
rrname = rrdata["name"]
# print(filter)
if rrtype in [ 'A', "AAAA"]:
for d in data:
if not filter or ('name' in filter and filter['name'] in rrname) or ("ip" in filter and d == filter["ip"]):
return f"{zone} {rrtype} {rrname} {d}"
elif rrtype == 'CNAME':
for d in data:
if not filter or ("name" in filter and filter["name"] in d):
return(zone, rrtype, d)
elif rrtype == 'MX':
for d in data:
if not filter or ("name" in filter and filter["name"] in d):
return(zone, rrtype, d)
elif rrtype == 'NS':
for d in data:
if not filter or ("name" in filter and filter["name"] in d):
return(zone, rrtype, d)
elif rrtype == 'SOA':
for d in data:
if not filter or ("name" in filter and filter["name"] in d):
return(zone, rrtype, d)
elif rrtype in ["SRV", "TXT"]:
for d in data:
if not filter or ('name' in filter and filter['name'] in d) or ("ip" in filter and filter["ip"] in d):
return(zone, rrtype, rrname, d)
else:
print(f"unknown type({rrtype})")
print(rrdata)
raise TypeError
def print_rrset(zone, rrset, filter={}):
for rrdata in rrset['rrdatas']:
print_rrdata(zone, rrdata, filter=filter)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--project", help="Google Cloud Project ID", default="ace-hosting-1253")
parser.add_argument("--zone", help="Google Cloud Zone ID")
parser.add_argument("--token", help="Google Cloud API Token")
parser.add_argument("--find-name", help="Look for this name")
parser.add_argument("--find-ip", help="Look for this IP address")
return parser.parse_args()
def get_token(args):
if args.token:
return args.token
return subprocess.run(
"gcloud auth print-access-token",
shell=True,
capture_output=True,
universal_newlines=True
).stdout[:-1]
def get_filter(args):
filter = {}
if args.find_name:
filter["name"] = args.find_name
if args.find_ip:
filter["ip"] = args.find_ip
return filter
def main():
args = parse_args()
project_dns_uri = f"https://dns.googleapis.com/dns/v1/projects/{args.project}"
token = get_token(args)
filter = get_filter(args)
r = get(f"{project_dns_uri}/managedZones?maxResults=1000", token)
zones = r.json()['managedZones']
found = []
for zone in zones:
print(f"Checking domain {zone['name']}...")
rrsets = get(f"{project_dns_uri}/managedZones/{zone['id']}/rrsets", token).json()['rrsets']
matches = 0
for rrset in rrsets:
result = print_rrdata(
zone["name"],
rrset,
filter=filter
)
if result:
matches += 1
found.append(result)
if matches:
print(f" found {matches} matches...")
for f in found:
print(f)
if __name__ == "__main__":
main()

22
ah_tools/helpers.py Normal file
View file

@ -0,0 +1,22 @@
import base64
import sys
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
def decrypt(salt, password, data):
kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt.encode("ascii"), iterations=480000)
key = base64.urlsafe_b64encode(kdf.derive(password.encode('ascii')))
f = Fernet(key)
return f.decrypt(data.encode("ascii")).decode("ascii")
def encrypt(salt, password, data):
kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt.encode("ascii"), iterations=480000)
key = base64.urlsafe_b64encode(kdf.derive(password.encode('ascii')))
f = Fernet(key)
return f.encrypt(data.encode("ascii")).decode("ascii")

63
ah_tools/listvs.py Normal file
View file

@ -0,0 +1,63 @@
from pprint import pprint as pp
import subprocess
import requests
# cmd = "ssh ah-platypus2 ls -l"
# result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#
# print(result)
#
# auth = requests.auth.HTTPBasicAuth('root', '}~7R%t]Ru3qW')
# url = "https://platypus-2.ace-hosting.com.au:10000/virtual-server/remote.cgi?program=list-domains&multiline&json=1"
#
# r = requests.get(url, auth=auth)
#
# print(r.text)
read_api_test_token="dop_v1_c34e7364effc04a6cc031f28722e479d64867ef68ad1925e38cdf0982dce0117"
r = requests.get('https://api.digitalocean.com/v2/droplets', headers={'Authorization': f'Bearer {read_api_test_token}'})
servers = r.json()
# pp(servers)
for s in servers['droplets']:
print(s['name'])
linode_token = "0eb5b7c46ae094dbf2a70b12024b6a5160e85ee841eb0bffc4262740b45d0aed"
url = "https://api.linode.com/v4/linode/instances?page=1&page_size=100"
headers = {"accept": "application/json", "Authorization": f"Bearer {linode_token}"}
response = requests.get(url, headers=headers)
servers = response.json()
for s in servers['data']:
print(s['label'])
bl_token ="8ozeaEDL96p44WptTPXZNknTfrpDmntYzLfh8gU1zFNYKBLRHnc537qGH4wvicDb"
api_uri = "https://api.binarylane.com.au/v2/servers"
headers = {"accept": "application/json", "Authorization": f"Bearer {bl_token}"}
r = requests.get(api_uri, headers=headers)
servers = r.json()
for s in servers['servers']:
print(s['name'])
"""
Servers
Digital Ocean
Linode
Rimu Hosting N/A
BinaryLane
S3 Buckets
AMAZON
BackBlaze
Digital Ocean
"""

87
ah_tools/main.py Normal file
View file

@ -0,0 +1,87 @@
# Script to check the details of an ah-hosted domain.
# Check NS, www adress, domain A records, MX records
import argparse
import dns.resolver
AH_SERVERS = {
"159.203.100.138":"platypus-2",
"128.199.219.144":"wp-01.ace-hosting.com.au",
"172.104.183.16:":"kultarra.ace-hosting.com.au/wp-02.ace-hosting.com.au",
"198.199.121.45":"wp-03.ace-hosting.com.au",
"112.213.34.61":"wp-04.ace-hosting.com.au",
"170.64.172.115":"wp-05.ace-hosting.com.au",
"43.229.63.127":"wp-06.ace-hosting.com.au",
"172.104.61.71": "makinmattresses.com.au",
}
def get_ah_server_name(ip):
try:
name = AH_SERVERS[ip]
except KeyError:
name = "is not an Ace Server"
return name
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--domain", type=str, required=True, help="The domain to check")
parser.add_argument("-f", "--output-format", type=str, choices=["text", "json"], default="text", help="Output format. Text of json")
args = parser.parse_args()
return args
def lookup_a_record_for_mx(mx):
answer = dns.resolver.resolve(mx, "A")[0]
return get_ah_server_name(answer.address)
def main():
# Use a breakpoint in the code line below to debug your script.
args = parse_arguments()
try:
answers = dns.resolver.resolve(args.domain, "NS")
for rdata in answers:
ip = rdata.to_text()
print(f"NS record for {args.domain} = {ip}")
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
print(f"NS record for {args.domain} Not found")
try:
answers = dns.resolver.resolve(args.domain, "A")
for rdata in answers:
ip = rdata.to_text()
print(f"A record for {args.domain} = {ip}({get_ah_server_name(ip)})")
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
print(f"A record for {args.domain} Not found")
try:
answers = dns.resolver.resolve("www." + args.domain, "A")
for rdata in answers:
for rdata in answers:
ip = rdata.to_text()
print(f"A record for {'www.' + args.domain} = {ip}({get_ah_server_name(ip)})")
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
print(f"A record for {'www.' + args.domain} not found")
try:
answers = dns.resolver.resolve(args.domain, "MX")
for rdata in answers:
host = rdata.exchange
# preference = rdata.preference
if "google" in str(host):
print ("MX: google")
elif "outlook" in str(host):
print ("MX: outlook")
else:
print(f"MX: {host}({lookup_a_record_for_mx(host)})")
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
print(f"MX record for {args.domain} not found")
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
main()

12
ah_tools/parse-vm-dns.py Normal file
View file

@ -0,0 +1,12 @@
"""
ingenious-software.net.au. A 112.213.34.61
www.ingenious-software.net.au. A 112.213.34.61
ftp.ingenious-software.net.au. A 112.213.34.61
localhost.ingenious-software.net.au. A 127.0.0.1
webmail.ingenious-software.net.au. A 112.213.34.61
admin.ingenious-software.net.au. A 112.213.34.61
mail.ingenious-software.net.au. A 112.213.34.61
ingenious-software.net.au. MX 5 mail.ingenious-software.net.au.
ingenious-software.net.au. TXT "v=spf1 a mx a:ingenious-software.net.au ip4:112.213.34.61 ip4:112.213.34.61 ?all"
202406._domainkey.ingenious-software.net.au. TXT ( "v=DKIM1; k=rsa; t=s; p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA6K+2yOktsdXKC" "hcQPEupMJuFkoarcraRlSf7MEPbC4NITgzRPNL4fY4yrTZ4JuCXmHgc12ueySwfsWbHErI8/rilCdobo" "bhjV/dObTHXXuFMt3gst+QuV2nQAsUb8BMXIpqWSBNsLXU/6MsT22OXn3mRxVZDyUqg7WTMN6EfsE7aU" "blOgomx521Bt2fLzKIcVXiLW01qE1co6+VCRi+6OGwK3fO+7DKdW/lmuSIc9jwj3qKRa9SMJt6GC8C3m" "Gh5GvSxJwVGaogS0rI53qnchkHUQRdf9mikG0XKfv5dJetMmE8pR5GTzXhUIt3Uy+dpKjt0iFQZjwfIW" "7J2P5EZ9QIDAQAB" )
"""

29
ah_tools/vm-remote-ssh.py Normal file
View file

@ -0,0 +1,29 @@
import subprocess
def find_server(server):
servers = [
{"name":"wp", "hostname":"wp-04.ace-hosting.com", "user":"root", "port":22, "identity":""},
]
return servers[server]
def virtualmin_ssh(server, command):
if server["user"] != "root":
cmd = "sudo " + cmd
cmd = f"ssh -p {port} {user}@{hostname} {cmd}"
result = subprocess.check_output(cmd, shell=True, universal_newlines=True)
return result
def get_dns(server, domain):
pass
def main():
server = find_server("wp-04")
if __name__ == "__main__":
main()