First working version

This commit is contained in:
Neill Cox 2023-12-24 18:07:32 +11:00
parent f863879351
commit adca188afa
8 changed files with 960 additions and 3 deletions

5
.gitignore vendored
View file

@ -160,3 +160,8 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
download.sh
credentials.sh
*sql3
dl.log

View file

@ -1,3 +0,0 @@
# patreon-dl
A patreon downloader

19
credentials.example Normal file
View file

@ -0,0 +1,19 @@
#
# The value for these environment variables can be found by logging into your
# patreon account and using developer tools to find the cookie being sent.
#
# Source a copy of this file with actual credentials and then:
# patreon-dl --log-level=INFO -d $HOME/Downloads/PDL
#
# Adjusting the log level and destination as desired.
#
# TODO: More detailed explanation
export PDL_CREATOR="czepeku"
export PDL_DEVICE_ID="<REDACTED>"
export PDL_COUNTRY_CODE="AU"
export PDL_LOCALE="en-US"
export PDL_CURRENCY="USD"
export PDL_SESSION_ID="<REDACTED>"
export PDL_ANALYTICS_SESSION_ID="<REDACTED>"
export PDL_CF_BM="<REDACTED>"

17
download.sh.example Normal file
View file

@ -0,0 +1,17 @@
#
# The value for these environment variables can be found by logging into your
# patreon account and using developer tools to find the cookie being sent.
#
# TODO: More detailed explanation
export PDL_CREATOR="czepeku"
export PDL_DEVICE_ID="<REDACTED>"
export PDL_COUNTRY_CODE="AU"
export PDL_LOCALE="en-US"
export PDL_CURRENCY="USD"
export PDL_SESSION_ID="<REDACTED>"
export PDL_ANALYTICS_SESSION_ID="<REDACTED>"
export PDL_CF_BM="<REDACTED>"
patreon-dl --log-level=INFO \
-d $HOME/Downloads/PDL

33
pyproject.toml Normal file
View file

@ -0,0 +1,33 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "patreon-dl"
version = "0.0.1"
authors = [
{ name="Neill Cox", email="neill@ingenious.com.au" },
]
description = "A tool for downloading content from patreon"
readme = "README.md"
requires-python = ">=3.12"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
    "requests",
    "tabulate",
    "click",
    "bs4",
    "selenium",
]
[project.urls]
"Homepage" = "https://gitlab.com/neillc/patreon-dl"
"Bug Tracker" = "https://gitlab.com/neillc/patreon-dl/issues"
[project.scripts]
patreon-dl = "patreon_dl.main:main"

7
src/explore.py Normal file
View file

@ -0,0 +1,7 @@
import os
import zipfile
for root, dirs, files in os.walk("/home/ncox/OMV_Shared/download/PDL/tomcartos/"):
for file in files:
zfile = zipfile.ZipFile(root + file)
import bpdb;bpdb.set_trace()

View file

879
src/patreon_dl/main.py Normal file
View file

@ -0,0 +1,879 @@
"""
A tool to download all the available content from a patreon campaign.
The tool keeps track of state using a sqlite3 database. It will not revisit
posts it has already seen (I plan to add a parameter to set a maximum age for
overriding this).
Using sqlite3 means you can stop and restart the download without having to
start from the beginning.
It will not download a file if a file with the same name and size already
exists locally unless you specify --force, but that will download everything.
It will crawl every post available from a page (I plan at some point to add a
parameter to specify a single page)
It uses Firefox as its web browser (I plan to add a parameter to allow Chrome
and maybe Safari at some point)
At some point I will add a parameter to allow using headless browsers, but for
now as I test I find it reassuring to see progress.
This was heavily influenced by:
- C#
- Node
I'm not particularly fluent in either of those languages so I wrote my own in
python.
Future ideas:
- It would be good to keep track of what has been previously downloaded.
- It would be nice to not show the firefox window
"""
import datetime
import logging
import os
import re
import sqlite3
import sys
import click
import requests
# import beautifulsoup4
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.common.exceptions import (
JavascriptException,
WebDriverException,
)
file_errors = []
def add_file(conn, file, creator):
"""Add a file to the list to download."""
if not in_files(conn, file):
conn.execute(
"insert into files (href, creator, added_at) values(:href, :creator, :now)",
{
"href": file,
"creator": creator,
"now": datetime.datetime.now().timestamp(),
},
)
conn.commit()
def add_to_unvisited(conn, link, creator):
"""Add the post to the list of unvisted pages."""
conn.execute(
"insert into unvisited_links (href, creator, added_at) values(:href, :creator, :added_at)",
{
"href": link,
"creator": creator,
"added_at": datetime.datetime.now().timestamp(),
},
)
conn.commit()
def add_to_visited(conn, link, creator, error=None):
"""Ad the post to the list of visited posts"""
conn.execute(
(
"insert into visited_links (href, visited_at, error, creator) "
"values(:href, :now, :error, :creator)"),
{
"href": link,
"now": datetime.datetime.now().timestamp(),
"error": error,
"creator": creator,
},
)
conn.execute(
"delete from unvisited_links where href = :link", {"link": link}
)
conn.commit()
def already_seen(conn, post):
"""Has this post already been seen?"""
return unvisited(conn, post) or visited(conn, post)
def cookie_dict(config):
"""Create a dict of cookies from the parameters passed on the command line"""
cookies = {
"patreon_device_id": config["device_id"],
"patreon_location_country_code": config["country_code"],
"patreon_locale_code": config["locale"],
"patreon_currency_pref": config["currency"],
"session_id": config["session_id"],
"analytics_session_id": config["analytics_session_id"],
"__cf_bm": config["cf_bm"],
}
return cookies
def count_files(conn, creator):
"""Count all the files we have found for the creator"""
return conn.execute(
"select count(*) from files where creator = :creator",
{"creator": creator},
).fetchone()[0]
def count_files_downloaded(conn, creator):
"""Get the count of the number of files already downloaded for this creator"""
return conn.execute(
"select count(*) from files where creator = :creator and downloaded_at is not null",
{"creator": creator},
).fetchone()[0] ## or downloaded_at < max_age")
def count_posts_to_visit(conn, creator):
"""Get the count of unvisited posts"""
return conn.execute(
"select count(*) from unvisited_links where creator = :creator", {"creator":creator}
).fetchone()[0]
def download_creator(config: dict):
"""Use selenium to download."""
# cookies = cookie_dict(config)
conn = initialise_database(config["db_path"])
config["conn"] = conn
creator = config["creator"]
remove_creator_home(conn, creator)
web_browser = webdriver.Firefox
options = webdriver.FirefoxOptions()
options.add_argument("-headless")
with web_browser(options=options) as driver:
config["driver"] = driver
login_to_patreon(config, driver)
logging.info(
"Downloading files for campiagn (%s - %s)",
config["campaign_id"],
config["campaign_name"],
)
#if config["resume"]:
#url = get_next_unvisited(config)
#else:
#url = f"https://www.patreon.com/{creator}/posts"
url = f"https://www.patreon.com/{creator}/posts"
find_posts(config, url)
# Make sure no files have been missed.
download_files(config)
def download_file(link, cookies, download_destination, force_download):
"""Download a file using the requests library"""
logging.debug("downloading %s", link)
# return
with requests.get(
link, stream=True, cookies=cookies, timeout=120
) as result:
if not result.ok:
if result.status_code == 404:
logging.warning("File %s not found", link)
file_error(result.status_code, link)
return link
if result.status_code == 403:
logging.warning(
"Permission denied when downloading %s", link
)
file_error(result.status_code, link)
return link
result.raise_for_status()
local_filename = get_filename_from_header(result.headers)
stream = True
try:
size = int(result.headers["Content-Length"])
except KeyError:
# Nt content-length, we can't stream this object. Just download it directly.
size = len(result.content)
stream = False
logging.debug(
"No content-lentg set. Will download directly"
)
filename = download_destination + "/" + local_filename
if not force_download:
try:
stats = os.stat(filename)
if stats.st_size == size:
logging.debug(
f"A file with the same name ({local_filename}) and "
f"size ({size}) has already been downloaded. Skipping"
)
return local_filename
except FileNotFoundError:
pass
if stream:
with open(
download_destination + "/" + local_filename,
"wb",
) as f:
for chunk in result.iter_content(chunk_size=8192):
# If you have chunk encoded response uncomment if
# and set chunk_size parameter to None.
# if chunk:
f.write(chunk)
else:
with open(
download_destination + "/" + local_filename,
"wb",
) as f:
f.write(result.content)
logging.info("file %s downloaded", local_filename)
return local_filename
def download_files(config):
"""Download all the files that have not yet been downloaded"""
conn = config["conn"]
creator = config["creator"]
cookies = cookie_dict(config)
download_destination = config["download_destination"]
force = config["force"]
result = conn.execute(
"select * from files where downloaded_at is null and creator = :creator",
{"creator": creator},
)
row = result.fetchone()
while row:
link = row[0]
try:
file_name = download_file(link, cookies, download_destination, force)
if not file_name:
file_name = link
downloaded(conn, link)
logging.info(
"Downloaded %s %d to go",
file_name,
count_files_to_download(conn, creator),
)
except (
requests.exceptions.ConnectionError,
requests.exceptions.ChunkedEncodingError,
ValueError
) as e:
logging.error("Could not download %s", link)
#logging.error(e.msg)
row = result.fetchone()
def downloaded(conn, file):
"""Updae the database to show the file has been downloaded"""
cursor = conn.cursor()
cursor.execute(
"update files set downloaded_at = :now where href = :href",
{"now": datetime.datetime.now().timestamp(), "href": file},
)
conn.commit()
def file_error(status_code, link):
"""Dead code?"""
global file_errors
file_errors.append([status_code, link])
def find_posts(config, start_link):
"""The non-recursive way to walk the tree. Soon to be the only way used"""
conn = config["conn"]
creator = config["creator"]
#import bpdb;bpdb.set_trace()
if not_visited(conn, start_link) and not in_unvisited(
conn, start_link
):
add_to_unvisited(conn, start_link, creator)
while unvisited_count(conn):
link = get_next_unvisited(conn)
visit_link(link, config)
logging.info(
"Visited %d links, %d files to download",
visited_links(conn, creator),
count_files_to_download(conn, creator),
)
def count_files_to_download(conn, creator):
"""Get the count of files yet to be downloaded"""
return conn.execute(
"select count(*) from files where creator = :creator and downloaded_at is null",
{"creator": creator},
).fetchone()[0] ## or downloaded_at < max_age")
def get_campaign_details(driver):
"""Get the name and id of the campaign"""
campaign_data = driver.execute_script(
"return window.patreon.bootstrap.campaign.data"
)
campaign_id = campaign_data["id"]
try:
campaign_name = campaign_data["attributes"]["name"]
except KeyError:
campaign_name = None
return campaign_id, campaign_name
def get_filename_from_header(headers):
"""
Get the filename from the headers.
This is quite a nasty hack, but stackoverflow didn't have anything better
"""
content_disposition = headers["Content-Disposition"]
# e.g.: attachment; filename="Czepeku Map Contest #1.zip";
# filename*=utf-8\'\'Czepeku%20Map%20Contest%20%231.zip'
# Split on semicolons
filename = None
fields = content_disposition.split(";")
for field in fields:
field = field.strip()
# Find the right field
if field.startswith("filename="):
filename = field.split("=")[1]
# Strip surrounding quotes
filename = filename[1:-1]
break
if filename is None:
raise ValueError("Could not find filename")
return filename
def get_next_unvisited(conn):
"""Get the next unvisited post"""
return conn.execute(
"select href from unvisited_links limit 1"
).fetchone()[0]
def get_page(driver, url):
"""
Use selenium to get the page.
Unfortunately patreon uses javascript to populate the page so we have to
do a little dance to make sure all the content has been loaded.
This has some significant performance implications, so it would be nice
to figure out a better way than repeatedly polling until the page stops
changing in size.
"""
delta = 0
last_size = 0
same_count = 0
n = 0
driver.get(url)
while 1:
text = driver.page_source
size = len(text)
delta = size - last_size
last_size = size
n += 1
if delta == 0:
same_count += 1
else:
same_count = 0
if same_count >= 100:
return text
# Change to be time based
if n > 1000:
raise RuntimeError("timeout")
logging.debug(
"Waiting for page to stabilise delta: %d same_count: %d n: %d",
delta,
same_count,
n,
)
return text
def in_files(conn, href):
"""Check to see of the specified file is already in the table."""
return (
conn.execute(
"select count(*) from files where href = :href",
{"href": href},
).fetchone()[0]
> 0
)
def in_unvisited(conn, link):
"""
Check to see if the specified link is in the list of unvisited links
already
"""
return (
conn.execute(
"select count(*) from unvisited_links where href = :link",
{"link": link},
).fetchone()[0]
!= 0
)
def initialise_database(db_path):
"""Initialise the database tables if needed"""
conn = sqlite3.connect(db_path, autocommit=False)
conn.execute(
"""create table if not exists visited_links(
href char(400) not null primary key,
creator text not null,
visited_at datetime not null,
error text
)
"""
)
conn.execute(
"""create table if not exists unvisited_links(
href char(400) not null primary key,
creator char(100) not null ,
added_at datetime not null)
"""
)
conn.execute(
"""create table if not exists files(
href char(400) primary key not null,
creator char not null,
file_name text,
file_size integer,
added_at datetime not null,
downloaded_at datetime,
file_type text)
"""
)
return conn
def login_to_patreon(config, driver):
"""Set the cookies so we are logged in. Need to load a page first."""
cookies = cookie_dict(config)
url = f"https://www.patreon.com/{config['creator']}/posts"
logging.debug("loading login page")
driver.get(url)
campaign_data = driver.execute_script(
"return window.patreon.bootstrap.campaign.data"
)
config["campaign_id"] = campaign_data["id"]
config["campaign_name"] = campaign_data["attributes"]["name"]
for key, value in cookies.items():
driver.add_cookie({"name": key, "value": value})
logging.debug("Logged in")
def not_visited(conn, link):
"""Check to see if a link has not been visited"""
return (
conn.execute(
"select count(*) from visited_links where href = :link",
{"link": link},
).fetchone()[0]
== 0
)
def remove_creator_home(conn, creator):
creator = f"https://www.patreon.com/{creator}/"
pages = ["posts", "collections"]
for page in pages:
link = creator + page
conn.execute("delete from visited_links where href = :link", {"link":link})
conn.commit()
def seen_posts(conn, creator):
"""Count the number of seen posts, both visited and unvisited"""
count_visited = conn.execute(
"select count(*) from visited_links where creator = :creator", {"creator":creator}
).fetchone()[0]
count_unvisited = conn.execute(
"select count(*) from unvisited_links where creator = :creator", {"creator":creator}
).fetchone()[0]
return count_visited + count_unvisited
def unvisited(conn, link):
"""Check whether the specified link is unvisited"""
return (
conn.execute(
"select count(*) from unvisited_links where href = :link",
{"link": link},
).fetchone()[0]
!= 0
)
def unvisited_count(conn):
"""Count the number of unvisited links"""
result = conn.execute("select count(*) from unvisited_links")
return result.fetchone()[0]
def visit_link(href, config):
""" Visit a link"""
# pylint:disable=too-many-locals
logging.info("Processing %s", href)
driver = config["driver"]
campaign_id = config["campaign_id"]
creator = config["creator"]
conn = config["conn"]
if (
config["max_files"]
and count_files_to_download(conn, creator) > config["max_files"]
):
return
try:
text = get_page(driver, href)
except WebDriverException:
error = (
f"WebDriverException occurres while trying to load {href}"
)
logging.error(error)
add_to_visited(conn, href, creator, error=error)
return
try:
page_campaign_id, _ = get_campaign_details(driver)
except JavascriptException:
error = (
"Javascript exception when trying to get page details - "
"error page?"
)
logging.warning(error)
add_to_visited(conn, href, creator, error=error)
return
if page_campaign_id != campaign_id:
error = f"This url {href} is not a post belonging to the specified creator"
logging.info(error)
add_to_visited(conn, href, page_campaign_id, error=error)
return
bs = BeautifulSoup(text, "html.parser")
links = bs.find_all("a")
_posts = [a for a in links if "href" in a.attrs and "/posts" in a.attrs["href"]]
_collections = [a for a in links if "href" in a.attrs and "/collection" in a.attrs["href"]]
_posts += _collections
_files = [
a
for a in links
if "href" in a.attrs and a.attrs["href"].startswith("https://www.patreon.com/file")
]
for file in _files:
logging.debug("adding file %s", file.attrs["href"])
add_file(conn, file.attrs["href"], creator)
logging.info(
"Found %d files so far, %d downloaded",
count_files(conn, creator),
count_files_downloaded(conn, creator),
)
for post in _posts:
link = post.attrs["href"]
if not link.startswith("https://"):
link = "https://www.patreon.com" + link
if not link.startswith("https://www.patreon.com") or link.startswith("https:///patreon.com"):
logging.info("%s isn't a patreon link 0 skipping", link)
# Lets not foillow links to other creators
match = re.match("https://www.patreon.com/(.+)/posts[/?]", link)
if match:
if match.group(1) != config["creator"]:
logging.info(
"This url (%s) looks like a link to a different creator "
"so skipping",
link,
)
continue
if already_seen(conn, link):
logging.debug("Already seen %s - skipping", link)
continue
add_to_unvisited(conn, link, creator)
download_files(config)
add_to_visited(conn, href, creator)
logging.info(
"Seen %d posts %d still to visit",
seen_posts(conn, creator),
count_posts_to_visit(conn, creator),
)
def visited(conn, link):
"""Check whether a particular link has been visited"""
return (
conn.execute(
"select count(*) from visited_links where href = :link",
{"link": link},
).fetchone()[0]
!= 0
)
def visited_links(conn, creator):
""" Get the counf of visited links"""
return conn.execute(
"select count(*) from visited_links where creator = :creator", {"creator": creator}
).fetchone()[0]
def walk_tree(
driver, config, campaign_id, href, cookies, seen_posts, files, depth
):
"""Walk the tree of all posts for the campaign"""
# pylint:disable=too-many-arguments,too-many-locals
logging.info("Processing %s", href)
logging.info("Depth %d", depth)
depth += 1
files_found = len(list(files.keys()))
if config["max_files"] and files_found > config["max_files"]:
return
logging.info("Found %d files so far...", files_found)
try:
text = get_page(driver, href)
except WebDriverException:
logging.error(
"WebDriverException occurred while trying to load %s", href
)
return
try:
page_campaign_id, _ = get_campaign_details(driver)
except JavascriptException:
logging.warning(
"Javascript exception when trying to get page details - "
"error page?"
)
return
if page_campaign_id != campaign_id:
logging.info(
"This url (%s) is not a post belonging to the specified creator",
href,
)
return
bs = BeautifulSoup(text, "html.parser")
links = bs.find_all("a")
del bs
_posts = [a for a in links if "/posts" in a.attrs["href"]]
_files = [
a
for a in links
if a.attrs["href"].startswith("https://www.patreon.com/file")
]
for file in _files:
logging.debug("adding file %s", file.attrs["href"])
files[file.attrs["href"]] = file
logging.info("Found %d files so far", len(files))
for post in _posts:
href = post.attrs["href"]
if not href.startswith("https://www.patreon.com"):
href = "https://www.patreon.com" + href
# Lets not foillow links to other creators
match = re.match("https://www.patreon.com/(.+)/posts[/?]", href)
if match:
if match.group(1) != config["creator"]:
logging.info(
"This url (%s) looks like a link to a different creator "
"so skipping",
href,
)
continue
if href in seen_posts:
logging.debug("Already seen %s - skipping", href)
continue
seen_posts[href] = post
logging.info("Seen %d posts", len(seen_posts))
keys = seen_posts.keys()
logging.debug(keys)
walk_tree(
driver,
config,
campaign_id,
href,
cookies,
seen_posts,
files,
depth,
)
@click.command()
@click.option(
"--creator",
required=True,
help="Text id of creator.",
envvar="PDL_CREATOR",
)
@click.option(
"--device-id",
default="491a6a67-1893-4474-9aad-a0dc82977cf3",
envvar="PDL_DEVICE_ID",
)
@click.option(
"--country-code", required=True, envvar="PDL_COUNTRY_CODE"
)
@click.option("--locale", required=True, envvar="PDL_LOCALE")
@click.option("--currency", required=True, envvar="PDL_CURRENCY")
@click.option("--session-id", required=True, envvar="PDL_SESSION_ID")
@click.option(
"--analytics-session-id",
required=True,
envvar="PDL_ANALYTICS_SESSION_ID",
)
@click.option("--cf-bm", envvar="PDL_CF_BM", required=True)
@click.option("--log-level", default="WARNING")
@click.option("-d", "--download-destination", required=True)
@click.option("--dry-run", is_flag=True)
@click.option("--force", is_flag=True)
@click.option("--max-files", default="0")
@click.option("--file")
@click.option("--db-path", default="./pdl_downloader.sql3")
def main(
creator,
device_id,
country_code,
locale,
currency,
session_id,
analytics_session_id,
cf_bm,
log_level,
dry_run,
download_destination,
force,
max_files,
file,
db_path,
):
"""Main function"""
# pylint:disable=too-many-arguments,too-many-locals
start_time = datetime.datetime.now()
if not os.path.isdir(download_destination):
raise ValueError("Download path does not exist")
if not os.access(download_destination, os.W_OK):
raise ValueError("Cannot write to download path")
download_destination = download_destination + "/" + creator
if not os.path.exists(download_destination):
os.makedirs(download_destination)
config = {
"creator": creator,
"device_id": device_id,
"country_code": country_code,
"locale": locale,
"currency": currency,
"session_id": session_id,
"analytics_session_id": analytics_session_id,
"cf_bm": cf_bm,
"download_destination": download_destination,
"dry_run": dry_run,
"force": force,
"max_files": int(max_files),
"db_path": db_path,
}
logging.basicConfig(
level=log_level,
format="%(levelname)s:%(asctime)s:%(funcName)s:%(message)s",
)
logging.info("Download started at %s", start_time)
if file:
download_file(
file, cookie_dict(config), download_destination, force
)
else:
download_creator(config)
end_time = datetime.datetime.now()
elapsed = end_time - start_time
logging.info(
"Download finished at %s, elapsed time was %s",
end_time,
elapsed,
)