Support online media download

pull/2909/head
Blondel Mondésir 8 months ago
parent cfd717e7e7
commit dd99701117

@ -21,11 +21,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import smartcrop
from datetime import datetime
import json
from PIL import Image
from shutil import copyfile
import sqlite3
import subprocess
from shutil import copyfile, move
from uuid import uuid4
from markupsafe import escape # dependency of flask
from functools import wraps
@ -36,7 +36,7 @@ try:
except ImportError:
clean_html = None
from flask import Blueprint, request, flash, redirect, url_for, abort, Markup, Response
from flask import Blueprint, request, flash, redirect, url_for, abort, Markup, Response, jsonify
from flask_babel import gettext as _
from flask_babel import lazy_gettext as N_
from flask_babel import get_locale
@ -342,121 +342,140 @@ def youtube():
log.error(error_message)
return False, error_message
def process_youtube_download(youtube_url, video_quality):
# def process_youtube_download(youtube_url, video_quality):
def process_youtube_download(youtube_url):
yb_executable = get_yb_executable()
if youtube_url:
youtube_id = extract_youtube_url(youtube_url)
# youtube_id = extract_youtube_url(youtube_url)
# download_args = [
# yb_executable,
# youtube_id,
# "/output",
# video_quality,
# ]
download_args = [
yb_executable,
youtube_id,
"/output",
video_quality,
youtube_url,
]
subprocess_result = run_subprocess(download_args)
log.info("Subprocess result: {}".format(subprocess_result))
if subprocess_result[0]:
log.info("Renaming files in /output/{}/videos".format(youtube_id))
# make a list of requested files with names of directories found in /output/<youtube_id>/videos
requested_files = os.listdir("/output/{}/videos".format(youtube_id))
# remove "youtube-nsig" from the list of requested files
requested_files.remove("youtube-nsig")
# log.info("Renaming files in /output/{}/videos".format(youtube_id))
# # make a list of requested files with names of directories found in /output/<youtube_id>/videos
# requested_files = os.listdir("/output/{}/videos".format(youtube_id))
# # remove "youtube-nsig" from the list of requested files
# requested_files.remove("youtube-nsig")
# make a list of requested files with paths found in media table reading /var/tmp/download.db
# select each "path" from media table to get a list of requested files
# each "path" is an absolute path to a file in /var/tmp/Youtube/<author>/<title>/<file>
requested_files = []
download_db_path = "/var/tmp/download.db"
conn = sqlite3.connect(download_db_path)
c = conn.cursor()
c.execute("SELECT path FROM media")
for row in c.fetchall():
requested_files.append(row[0])
conn.close()
log.info("Requested files: {}".format(requested_files))
renamed_files = rename_files(requested_files, youtube_id)
if renamed_files:
for requested_file in renamed_files:
requested_file = open(requested_file, "rb")
requested_file.filename = os.path.basename(requested_file.name)
requested_file.save = lambda path: copyfile(requested_file.name, path)
log.info("Processing file: {}".format(requested_file))
try:
modify_date = False
calibre_db.update_title_sort(config)
calibre_db.session.connection().connection.connection.create_function(
"uuid4", 0, lambda: str(uuid4())
)
meta, error = file_handling_on_upload(requested_file)
if error:
return error
(
db_book,
input_authors,
title_dir,
renamed_authors,
) = create_book_on_upload(modify_date, meta)
# Comments need book id therefore only possible after flush
modify_date |= edit_book_comments(
Markup(meta.description).unescape(), db_book
)
book_id = db_book.id
title = db_book.title
error = helper.update_dir_structure(
book_id,
config.config_calibre_dir,
input_authors[0],
meta.file_path,
title_dir + meta.extension.lower(),
renamed_author=renamed_authors,
)
move_coverfile(meta, db_book)
if modify_date:
calibre_db.set_metadata_dirty(book_id)
# save data to database, reread data
calibre_db.session.commit()
if error:
flash(error, category="error")
link = '<a href="{}">{}</a>'.format(
url_for("web.show_book", book_id=book_id), escape(title)
)
upload_text = N_("File %(file)s uploaded", file=link)
WorkerThread.add(
current_user.name, TaskUpload(upload_text, escape(title))
)
helper.add_book_to_thumbnail_cache(book_id)
if len(renamed_files) < 2:
if current_user.role_edit() or current_user.role_admin():
resp = {
"location": url_for(
"edit-book.show_edit_book", book_id=book_id
)
}
return Response(json.dumps(resp), mimetype="application/json")
else:
resp = {"location": url_for("web.show_book", book_id=book_id)}
return Response(json.dumps(resp), mimetype="application/json")
except (OperationalError, IntegrityError, StaleDataError) as e:
calibre_db.session.rollback()
log.error_or_exception("Database error: {}".format(e))
flash(
_(
"Oops! Database Error: %(error)s.",
error=e.orig if hasattr(e, "orig") else e,
),
category="error",
)
# renamed_files = rename_files(requested_files, youtube_id)
# if renamed_files:
for requested_file in requested_files:
requested_file = open(requested_file, "rb")
requested_file.filename = os.path.basename(requested_file.name)
requested_file.save = lambda path: copyfile(requested_file.name, path)
log.info("Processing file: {}".format(requested_file))
try:
modify_date = False
calibre_db.update_title_sort(config)
calibre_db.session.connection().connection.connection.create_function(
"uuid4", 0, lambda: str(uuid4())
)
meta, error = file_handling_on_upload(requested_file)
if error:
return error
(
db_book,
input_authors,
title_dir,
renamed_authors,
) = create_book_on_upload(modify_date, meta)
# Comments need book id therefore only possible after flush
modify_date |= edit_book_comments(
Markup(meta.description).unescape(), db_book
)
book_id = db_book.id
title = db_book.title
error = helper.update_dir_structure(
book_id,
config.config_calibre_dir,
input_authors[0],
meta.file_path,
title_dir + meta.extension.lower(),
renamed_author=renamed_authors,
)
move_coverfile(meta, db_book)
if modify_date:
calibre_db.set_metadata_dirty(book_id)
# save data to database, reread data
calibre_db.session.commit()
if error:
flash(error, category="error")
link = '<a href="{}">{}</a>'.format(
url_for("web.show_book", book_id=book_id), escape(title)
)
upload_text = N_("File %(file)s uploaded", file=link)
WorkerThread.add(
current_user.name, TaskUpload(upload_text, escape(title))
)
helper.add_book_to_thumbnail_cache(book_id)
if len(requested_files) < 2:
if current_user.role_edit() or current_user.role_admin():
resp = {
"location": url_for(
"edit-book.show_edit_book", book_id=book_id
)
}
return Response(json.dumps(resp), mimetype="application/json")
else:
resp = {"location": url_for("web.show_book", book_id=book_id)}
return Response(json.dumps(resp), mimetype="application/json")
except (OperationalError, IntegrityError, StaleDataError) as e:
calibre_db.session.rollback()
log.error_or_exception("Database error: {}".format(e))
flash(
_(
"Oops! Database Error: %(error)s.",
error=e.orig if hasattr(e, "orig") else e,
),
category="error",
)
else:
flash("Error: 'poetry' executable not found in PATH", category="error")
return False
if request.method == "POST" and "youtubeURL" in request.form:
youtube_url = request.form["youtubeURL"]
video_quality = request.form.get("videoQuality", "720")
# video_quality = request.form.get("videoQuality", "720")
if process_youtube_download(youtube_url, video_quality):
# if process_youtube_download(youtube_url, video_quality):
if process_youtube_download(youtube_url):
response = {
"success": "Downloaded YouTube media successfully",
}
@ -467,66 +486,66 @@ def youtube():
}
return jsonify(response), 500
def extract_youtube_url(url):
try:
if "youtube.com" in url:
if "watch?v=" in url:
return url.split("watch?v=")[1]
elif "playlist?list=" in url:
return url.split("playlist?list=")[1]
elif "channel/" in url:
return url.split("channel/")[1]
elif "user/" in url:
return url.split("user/")[1]
elif "@" in url:
return extract_channel_id_from_handle(url)
elif "youtu.be" in url:
return url.split("youtu.be/")[1]
flash("Error: Invalid YouTube URL", category="error")
return None
except Exception as e:
flash("An error occurred while processing the YouTube URL: {}".format(e), category="error")
return None
def extract_channel_id_from_handle(url):
handle = url.split("@")[1]
operational_api_url = "https://yt.lemnoslife.com/channels?handle=" + handle
response = requests.get(operational_api_url)
if response.status_code == 200:
return response.json()["items"][0]["id"]
else:
flash("Error: Failed to retrieve YouTube channel ID from API", category="error")
return None
def rename_files(requested_files, youtube_id):
# cache_directory_path = "/output/{}/cache".format(youtube_id)
video_dir_path = "/output/{}/videos".format(youtube_id)
renamed_files = []
if not os.path.exists("/tmp/calibre_web"):
os.makedirs("/tmp/calibre_web")
for video_id in requested_files:
# video_json_path = os.path.join(cache_directory_path, "videos.json")
video_webm_path = os.path.join(video_dir_path, video_id, "video.webm")
thumbnail_path = os.path.join(video_dir_path, video_id, "video.webp")
try:
thumbnail_path_new = os.path.join("/tmp", "calibre_web", "{}.webp".format(video_id + youtube_id))
move(thumbnail_path, thumbnail_path_new)
video_webm_path_new = os.path.join("/tmp", "calibre_web", "{}.webm".format(video_id + youtube_id))
move(video_webm_path, video_webm_path_new)
renamed_files.append(video_webm_path_new)
if not os.listdir(video_dir_path):
os.rmdir(video_dir_path)
except Exception as e:
flash("An error occurred while renaming the YouTube video file: {}".format(e), category="error")
return None
# def extract_youtube_url(url):
# try:
# if "youtube.com" in url:
# if "watch?v=" in url:
# return url.split("watch?v=")[1]
# elif "playlist?list=" in url:
# return url.split("playlist?list=")[1]
# elif "channel/" in url:
# return url.split("channel/")[1]
# elif "user/" in url:
# return url.split("user/")[1]
# elif "@" in url:
# return extract_channel_id_from_handle(url)
# elif "youtu.be" in url:
# return url.split("youtu.be/")[1]
# flash("Error: Invalid YouTube URL", category="error")
# return None
# except Exception as e:
# flash("An error occurred while processing the YouTube URL: {}".format(e), category="error")
# return None
# def extract_channel_id_from_handle(url):
# handle = url.split("@")[1]
# operational_api_url = "https://yt.lemnoslife.com/channels?handle=" + handle
# response = requests.get(operational_api_url)
# if response.status_code == 200:
# return response.json()["items"][0]["id"]
# else:
# flash("Error: Failed to retrieve YouTube channel ID from API", category="error")
# return None
# def rename_files(requested_files, youtube_id):
# # cache_directory_path = "/output/{}/cache".format(youtube_id)
# video_dir_path = "/output/{}/videos".format(youtube_id)
# renamed_files = []
# if not os.path.exists("/tmp/calibre_web"):
# os.makedirs("/tmp/calibre_web")
# for video_id in requested_files:
# # video_json_path = os.path.join(cache_directory_path, "videos.json")
# video_webm_path = os.path.join(video_dir_path, video_id, "video.webm")
# thumbnail_path = os.path.join(video_dir_path, video_id, "video.webp")
# try:
# thumbnail_path_new = os.path.join("/tmp", "calibre_web", "{}.webp".format(video_id + youtube_id))
# move(thumbnail_path, thumbnail_path_new)
# video_webm_path_new = os.path.join("/tmp", "calibre_web", "{}.webm".format(video_id + youtube_id))
# move(video_webm_path, video_webm_path_new)
# renamed_files.append(video_webm_path_new)
# if not os.listdir(video_dir_path):
# os.rmdir(video_dir_path)
# except Exception as e:
# flash("An error occurred while renaming the YouTube video file: {}".format(e), category="error")
# return None
return renamed_files
# return renamed_files
@editbook.route("/admin/book/convert/<int:book_id>", methods=['POST'])
@login_required_if_no_ano
@ -977,15 +996,16 @@ def move_coverfile(meta, db_book):
try:
os.makedirs(new_cover_path, exist_ok=True)
image = Image.open(cover_file)
# crop image to square 150x150 using smartcrop
cropper = smartcrop.SmartCrop()
result = cropper.crop(image, 150, 150)
x, y, width, height = result['top_crop']['x'], result['top_crop']['y'], \
result['top_crop']['width'], result['top_crop']['height']
cropped_image = image.crop((x, y, x + width, y + height))
cropped_image.save(os.path.join(new_cover_path, "cover.jpg"))
# image = Image.open(cover_file)
# # crop image to square 150x150 using smartcrop
# cropper = smartcrop.SmartCrop()
# result = cropper.crop(image, 150, 150)
# x, y, width, height = result['top_crop']['x'], result['top_crop']['y'], \
# result['top_crop']['width'], result['top_crop']['height']
# cropped_image = image.crop((x, y, x + width, y + height))
# cropped_image.save(os.path.join(new_cover_path, "cover.jpg"))
move(cover_file, os.path.join(new_cover_path, "cover.jpg"))
if meta.cover:
os.unlink(meta.cover)

@ -86,8 +86,7 @@
<li>
<button id="btn-download-youtube" class="btn btn-default navbar-btn" data-toggle="modal"
data-target="#youtubeDownloadModal">
<span class="glyphicon glyphicon-download-alt"></span>
{{_(' Add YouTube Videos')}}
{{_('Download')}}
</button>
</li>
{% endif %}

@ -148,7 +148,7 @@
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal" aria-label="Close"><span
aria-hidden="true">&times;</span></button>
<h4 class="modal-title" id="youtubeDownloadModalLabel">{{_('Download a YouTube Channel or Playlist to your
<h4 class="modal-title" id="youtubeDownloadModalLabel">{{_('Download a video to your
Internet-in-a-Box')}}</h4>
</div>
<div class="modal-body">
@ -156,10 +156,10 @@
method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div class="form-group">
<label for="youtubeURL" class="col-sm-2 control-label">{{_('YouTube URL')}}</label>
<label for="youtubeURL" class="col-sm-2 control-label">{{_('URL')}}</label>
<div class="col-sm-10">
<input type="text" class="form-control" id="youtubeURL" name="youtubeURL"
placeholder="{{_('Enter YouTube URL')}}">
placeholder="{{_('Enter URL')}}">
</div>
</div>
<!-- Advanced Options Toggle Button -->

@ -16,10 +16,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
# import json
import datetime
import os
import hashlib
import shutil
import sqlite3
from subprocess import run
from tempfile import gettempdir
from flask_babel import gettext as _
@ -216,7 +218,7 @@ def pdf_meta(tmp_file_path, original_file_name, original_file_extension):
cover=pdf_preview(tmp_file_path, original_file_name),
description=subject,
tags=tags,
series="",Deldesir y
series="",
series_id="",
languages=','.join(languages),
publisher=publisher,
@ -246,115 +248,90 @@ def pdf_preview(tmp_file_path, tmp_dir):
log.warning('On Windows this error could be caused by missing ghostscript')
return None
def video_metadata(tmp_file_path, original_file_name, original_file_extension):
video_cover(tmp_file_path)
meta = BookMeta(
file_path=tmp_file_path,
extension=original_file_extension,
title=original_file_name,
author='Unknown',
cover=os.path.splitext(tmp_file_path)[0] + '.cover.jpg',
description='',
tags='',
series="",
series_id="",
languages="",
publisher="",
pubdate="",
identifiers=[])
return meta
def video_cover(tmp_file_path):
"""generate cover image from video using ffmpeg"""
ffmpeg_executable = os.getenv('FFMPEG_PATH', 'ffmpeg')
ffmpeg_output_file = os.path.splitext(tmp_file_path)[0] + '.cover.jpg'
ffmpeg_args = [
ffmpeg_executable,
'-i', tmp_file_path,
'-vframes', '1',
'-y', ffmpeg_output_file
]
try:
ffmpeg_result = run(ffmpeg_args, capture_output=True, check=True)
log.debug(f"ffmpeg output: {ffmpeg_result.stdout}")
except Exception as e:
log.warning(f"ffmpeg failed: {e}")
return None
def image_metadata(tmp_file_path, original_file_name, original_file_extension):
shutil.copyfile(tmp_file_path, os.path.splitext(tmp_file_path)[0] + '.cover.jpg')
meta = BookMeta(
file_path=tmp_file_path,
extension=original_file_extension,
title=original_file_name,
author='Unknown',
cover=os.path.splitext(tmp_file_path)[0] + '.cover.jpg',
description='',
tags='',
series="",
series_id="",
languages="",
publisher="",
pubdate="",
identifiers=[])
return meta
def video_metadata(tmp_file_path, original_file_name, original_file_extension):
video_id = os.path.splitext(original_file_name)[0][:11]
youtube_id = os.path.splitext(original_file_name)[0][11:]
json_file_path = os.path.join("/output", youtube_id, "cache", "videos.json")
coverfile_path = os.path.splitext(original_file_name)[0] + '.webp'
if os.path.isfile(coverfile_path):
coverfile_path = os.path.splitext(tmp_file_path)[0] + '.cover.webp'
os.rename(os.path.splitext(original_file_name)[0] + '.webp', coverfile_path)
os.remove(os.path.splitext(original_file_name)[0] + '.webm')
return coverfile_path
if os.path.isfile(json_file_path):
with open(json_file_path) as json_file:
data = json.load(json_file)
title = data[video_id]['snippet']['title']
author = data[video_id]['snippet']['videoOwnerChannelTitle']
description = data[video_id]['snippet']['description']
publisher = 'YouTube'
pubdate = data[video_id]['contentDetails']['videoPublishedAt'][0:10]
meta = BookMeta(
file_path=tmp_file_path,
extension=original_file_extension,
title=title,
author=author,
cover=coverfile_path,
description=description,
tags='',
series="",
series_id="",
languages="",
publisher=publisher,
pubdate=pubdate,
identifiers=[])
return meta
# video_id = os.path.splitext(original_file_name)[0][:11]
# youtube_id = os.path.splitext(original_file_name)[0][11:]
# json_file_path = os.path.join("/output", youtube_id, "cache", "videos.json")
# coverfile_path = os.path.splitext(original_file_name)[0] + '.webp'
# if os.path.isfile(coverfile_path):
# coverfile_path = os.path.splitext(tmp_file_path)[0] + '.cover.webp'
# os.rename(os.path.splitext(original_file_name)[0] + '.webp', coverfile_path)
# os.remove(os.path.splitext(original_file_name)[0] + '.webm')
# return coverfile_path
# if os.path.isfile(json_file_path):
# with open(json_file_path) as json_file:
# data = json.load(json_file)
# title = data[video_id]['snippet']['title']
# author = data[video_id]['snippet']['videoOwnerChannelTitle']
# description = data[video_id]['snippet']['description']
# publisher = 'YouTube'
# pubdate = data[video_id]['contentDetails']['videoPublishedAt'][0:10]
# original file name example: Baby_Calm_Down_FULL_HD_Selena_Gomez_Rema_Official_Music_Video_2023_600.22k_[yJ_qhwk2X2c].webm
# extract video id from original file name by extracting the string between the brackets "[...]"
# if there is a "]" before the extension:
if ']' in original_file_name:
video_id = original_file_name.split('[')[1].split(']')[0]
# with the video_id, we can get the video info from /var/tmp/download.db (sqlite3 database)
# Here are the columns from the database's media table:
# id, size, duration, time_uploaded, time_created, time_modified, time_deleted, time_downloaded, fps, view_count, path, webpath, extractor_id, title, live_status, uploader, width, height, type, video_count, audio_count, language, description, playlist_id, chapter_count, chapters
# we need the extractor_id (the video_id), title, uploader, description, time_uploaded
# we need to check if the video_id is in the database and access the columns we need to get the metadata for BookMeta
download_db_path = "/var/tmp/download.db"
if os.path.isfile(download_db_path):
conn = sqlite3.connect(download_db_path)
c = conn.cursor()
c.execute("SELECT * FROM media WHERE extractor_id=?", (video_id,))
row = c.fetchone()
if row is not None:
title = row[13]
author = row[15]
description = row[22]
publisher = 'YouTube'
# example of time_uploaded: 1696464000
pubdate = row[3] # time_uploaded
# convert time_uploaded to YYYY-MM-DD format
pubdate = datetime.datetime.fromtimestamp(pubdate).strftime('%Y-%m-%d')
# if no cover image, generate one
if not os.path.isfile(os.path.splitext(tmp_file_path)[0] + '.cover.jpg'):
generate_video_cover(tmp_file_path)
meta = BookMeta(
file_path=tmp_file_path,
extension=original_file_extension,
title=title,
author=author,
cover=os.path.splitext(tmp_file_path)[0] + '.cover.jpg',
description=description,
tags='',
series="",
series_id="",
languages="",
publisher=publisher,
pubdate=pubdate,
identifiers=[])
return meta
else:
log.warning('Cannot find download.db, using default')
else:
ffmpeg_executable = os.getenv('FFMPEG_PATH', 'ffmpeg')
ffmpeg_output_file = os.path.splitext(tmp_file_path)[0] + '.cover.jpg'
ffmpeg_args = [
ffmpeg_executable,
'-i', tmp_file_path,
'-vframes', '1',
'-y', ffmpeg_output_file
]
try:
ffmpeg_result = run(ffmpeg_args, capture_output=True, check=True)
log.debug(f"ffmpeg output: {ffmpeg_result.stdout}")
except Exception as e:
log.warning(f"ffmpeg failed: {e}")
return None
# ffmpeg_executable = os.getenv('FFMPEG_PATH', 'ffmpeg')
# ffmpeg_output_file = os.path.splitext(tmp_file_path)[0] + '.cover.jpg'
# ffmpeg_args = [
# ffmpeg_executable,
# '-i', tmp_file_path,
# '-vframes', '1',
# '-y', ffmpeg_output_file
# ]
# try:
# ffmpeg_result = run(ffmpeg_args, capture_output=True, check=True)
# log.debug(f"ffmpeg output: {ffmpeg_result.stdout}")
# except Exception as e:
# log.warning(f"ffmpeg failed: {e}")
# return None
meta = BookMeta(
file_path=tmp_file_path,
@ -372,6 +349,23 @@ def video_metadata(tmp_file_path, original_file_name, original_file_extension):
identifiers=[])
return meta
def generate_video_cover(tmp_file_path):
ffmpeg_executable = os.getenv('FFMPEG_PATH', 'ffmpeg')
ffmpeg_output_file = os.path.splitext(tmp_file_path)[0] + '.cover.jpg'
ffmpeg_args = [
ffmpeg_executable,
'-i', tmp_file_path,
'-vframes', '1',
'-y', ffmpeg_output_file
]
try:
ffmpeg_result = run(ffmpeg_args, capture_output=True, check=True)
log.debug(f"ffmpeg output: {ffmpeg_result.stdout}")
except Exception as e:
log.warning(f"ffmpeg failed: {e}")
return None
def image_metadata(tmp_file_path, original_file_name, original_file_extension):
shutil.copyfile(tmp_file_path, os.path.splitext(tmp_file_path)[0] + '.cover.jpg')

Loading…
Cancel
Save