thomas-dev-patchwork / MediaProcessor



Commit b38fff921639daeb0858f973d29e73f904c3fadf

Initial Media Processor files

Thomas Freedman committed on 2/12/2023, 8:13:20 PM

Files changed

Exceptions.py (added)
MediaProcessor.py (added)
addSingles.py (added)
metadataFields.json (added)
mpServerDefinitions.py (added)
Exceptions.py
@@ -1,0 +1,7 @@
1 +#!/usr/bin/env python3
2 +
3 +# Python user-defined exceptions
4 +class IPFSexception(Exception):
5 + "Raised an error running an IPFS command"
6 + pass
7 +
MediaProcessor.py
@@ -1,0 +1,806 @@
1 +#!/usr/bin/env python3
2 +#
3 +# MediaProcessor.py - Program to process media files and add them to IPFS.
4 +#
5 +# A SQLite3 database is used to store IPFS hashes and detailed metadata for
6 +# each item. This lightweight SQL engine is filesystem based (no server).
7 +#
8 +# This script requires 2 JSON formatted config files containing schema column
9 +# names and other info, like filter settings for video length and upload date.
10 +#
11 +# Youtube is known to change the metadata keys, making it difficult to rely on
12 +# youtube metadata for the schema. After sampling 1000s of videos a common set
13 +# of metadata fields was arrived at. The metadataFields.json file is currently
14 +# where these fields / database columns are defined.
15 +#
16 +# The metadata provided by youtube-dl extractors is not totally normalized. To
17 +# ensure a database record is saved for every file added to IPFS, an alternate
18 +# metadata dictionary is used which always sets these 9 essential columns:
19 +#
20 +# sqlts, pky, g_idx, grupe, vhash, vsize, season_number, url and _filename
21 +#
22 +# and any others from the downloaded metadata whose field names are found in
23 +# the metadataFields.json file. A substitute metadata dictionary is used when a
24 +# SQL error occurs while adding a new row. If the 2nd attempt fails it's logged
25 +# along with the IPFS hash, so a record can be added manually at a later time.
26 +# Usually the substitution is required because there are missing fields in the
27 +# metadata provided by some youtube-dl extractors specific to a video source.
28 +#
29 +from __future__ import unicode_literals
30 +from email.message import EmailMessage
31 +from mpServerDefinitions import * # Server specific CONSTANTS
32 +from tinytag import TinyTag # Gets meta info from MP3 files
33 +from yt_dlp import utils
34 +from datetime import *
35 +import Exceptions
36 +import subprocess
37 +import threading
38 +import smtplib
39 +import sqlite3
40 +import yt_dlp
41 +import time
42 +import json
43 +import ssl
44 +import sys
45 +import os
46 +import re
47 +
48 +
49 +class MediaProcessor:
50 +
51 + def __init__(self):
52 + self.SQLrows2Add = [] # Lists populated by download callback threads
53 + self.ErrorList = []
54 + self.Config = {} # Loaded from config file (JSON file format)
55 +
56 + """ config file template, JSON format. Use single quotes only, null not None:
57 + self.Config {
58 + "Comment": [ "Unreferenced - for comments inside config file",
59 + "It is difficult sometimes to find the video link for Brightcove videos.",
60 + "This is the method that I use with Firefox. You will need 2 things:",
61 + "a) AccountID",
62 + "b) VideoID",
63 + "Get these by right-clicking on the video and select Player Information.",
64 + "Use ctrl-C to copy the info, and plug them into this template:",
65 + "http://players.brightcove.net/<AccountID>/default_default/index.html?videoId=<VideoID>",
66 + "Works with Firefox 68.8, 75.0 and probably others as of May 12, 2020"
67 + ],
68 +
69 + "DLbase": "dir", Folder for all downloaded files organized by grupe
70 + "DLeLoselfg":"file", File for exceptions / errors during downloads
71 + "DLarch": "file", This tracks downloads to skip those already done
72 + "DLmeta": "file", The metadata definition is now split into this file
73 +
74 + "DLOpts": { Name / value pairs for youtube-dl options
75 + "optName1": "value1", NOT always the same as cmd line opts
76 + ...
77 + },
78 +
79 + "Grupes": { # Dictionary of grupes to download, with its own criteria
80 + "gName1": { Group name containing video selection criteria
81 + "Active": true, Enable or disable downloads for this grupe
82 + "Duration": 0, Min size of video in seconds; for no limits use 0
83 + "Quota": null, Limits size of grupe's DL folder to N files
84 + "Start": null, Earliest upload date string or (YYYYMMDD) or null
85 + "End": null, Latest upload date or null. 1, 2 or neither OK
86 + "Stop": null, Stop downloading from playlist after this many DLs
87 + "url1",
88 + "url2",
89 + ...
90 + ]
91 + },
92 + ... Additional grupes
93 + },
94 +
95 + "MetaColumns": [ Contains the list of database fields for metadata for
96 + ... the video downloaded along with it in JSON format.
97 + This section is loaded from a separate file which is
98 + specified by the DLbase and DLmeta key defined above.
99 + ]
100 + }
101 + """
102 +
103 + def usage(self):
104 + cmd = sys.argv[0]
105 + str = "\nUses youtube-dl to download videos and add them to IPFS and track\n"
106 + str += "the results in a SQLite database.\n\n"
107 + str += "Usage: " + cmd + " [-h] | <-c config> <-d sqlite> [-g grupe]\n"
108 + str += "-h or no args print this help message.\n\n"
109 + str += "-c is a JSON formated config file that specifies the target groups,\n"
110 + str += "their URL(s), downloader options, the base or top level folder for\n"
111 + str += "the groups of files downloaded and the list of metadata columns.\n"
112 + str += "-d is the SQLite filename (it is created if it doesn't exist).\n\n"
113 + str += "-g ignore all but the grupes in config except the one name after -g.\n\n"
114 + print(str)
115 + exit(0)
116 +
117 + # Return detailed info from the stack trace. Default is to
118 + # return only the last item, pass True as 2nd arg for all items.
119 + def decodeException(self, e, all=False):
120 + trace = []
121 + stk = 0
122 + tb = e.__traceback__
123 + while tb is not None:
124 + stk += 1
125 + trace.append({
126 + "stk": stk,
127 + "filename": tb.tb_frame.f_code.co_filename,
128 + "function": tb.tb_frame.f_code.co_name,
129 + "lineno": tb.tb_lineno
130 + })
131 + tb = tb.tb_next
132 + if not all:
133 + trace = trace[-1]
134 + del trace["stk"]
135 + return {
136 + 'type': type(e).__name__,
137 + 'message': str(e),
138 + 'trace': trace
139 + }
140 +
141 + # Flattens a nested JSON object and returns a python dictionary
142 + def flatten_json(self, nested_json):
143 + out = {}
144 + def flatten(x, name=''):
145 + if type(x) is dict:
146 + for a in x:
147 + flatten(x[a], name + a + '_')
148 + elif type(x) is list:
149 + i = 0
150 + for a in x:
151 + flatten(a, name + str(i) + '_')
152 + i += 1
153 + else:
154 + out[name[:-1]] = x
155 +
156 + flatten(nested_json)
157 + return out
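 # For example (hypothetical input), flatten_json({"http_headers": {"Accept": "*/*"},
 # "thumbnails": [{"id": "0"}]}) returns
 # {"http_headers_Accept": "*/*", "thumbnails_0_id": "0"}, which is how flattened
 # column names like "thumbnails_0_id" in metadataFields.json come about.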
158 +
159 + # Create the SQLite database file if it doesn't exist, using the
160 + # MetaColumns from config. If it already exists, open a connection
161 + # to it. Always returns a connection object to the dbFile.
162 + def openSQLiteDB(self, dbFile):
163 + newDatabase = not os.path.exists(dbFile)
164 + columns = self.Config['MetaColumns']
165 + conn = sqlite3.connect(dbFile)
166 + if newDatabase:
167 + sql = '''create table if not exists IPFS_INFO (
168 + "sqlts" TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')),
169 + "pky" INTEGER PRIMARY KEY AUTOINCREMENT,
170 + "db_hash" TEXT,
171 + "dl_good" INTEGER DEFAULT 0,
172 + "dl_errs" INTEGER DEFAULT 0);'''
173 + conn.execute(sql)
174 +
175 + sql = '''create table if not exists IPFS_HASH_INDEX (
176 + "sqlts" TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')),
177 + "pky" INTEGER PRIMARY KEY AUTOINCREMENT,
178 + "g_idx" TEXT,
179 + "grupe" TEXT,
180 + "vhash" TEXT,
181 + "vsize" TEXT,
182 + "mhash" TEXT'''
183 + for c in columns:
184 + sql += ',\n\t"' + c + '" TEXT'
185 + sql += ')'
186 + conn.execute(sql)
187 + return conn
188 +
189 + # Pin a file referenced by hash to local IPFS. Returns True if successful
190 + def pin2IPFS(self, hash):
191 + cmd = ["ipfs", "pin", "add", hash]
192 + out = subprocess.run(cmd, stderr=subprocess.DEVNULL,
193 + stdout=subprocess.PIPE).stdout.decode('utf-8')
194 + if out.startswith("pinned"): return True
195 + else: return False
196 +
197 + # Add a file to IPFS and return the hash for it.
198 + # Returns 46 character hash: "Qm....1234567890123456789012345678901234567890"
199 + # or a zero length string
200 + def add2IPFS(self, file):
201 + cmd = ["ipfs", "add", "-Q", file]
202 + out = subprocess.run(cmd, stderr=subprocess.DEVNULL,
203 + stdout=subprocess.PIPE).stdout.decode('utf-8')
204 + if out.startswith("Qm") and len(out) == 47:
205 + hash = out[0:46] # Return the 46 character hash - newline
206 + else: hash = ""
207 + return hash
208 +
209 + # Add the updated SQLite DB to IPFS under the static IPNS name associated with
210 + # STATIC_DB_HASH. That way the most recent DB can always be obtained by wget:
211 + # https://ipfs.io/ipns/<STATIC_DB_HASH value>.
212 +# 2023/01/30: Despite closing the DB conn, it seems the previous DB gets published.
213 +# I suspect the sqlite module no longer flushes the file on close, though it used to.
214 + def publishDB(self, file):
215 + newDBhash = self.add2IPFS(file) # Add the updated SQLite database to IPFS
216 + if len(newDBhash) == 46:
217 + if STATIC_DB_HASH:
218 + cmd = ["ipfs", "name", "publish", "-Q", "-key=" + STATIC_DB_HASH, newDBhash]
219 + cp = subprocess.run(cmd, capture_output=True, text=True)
220 + if cp.returncode != 0:
221 + print(f"Error publishing database to IPFS: {cp.stderr}", flush=True)
222 + return newDBhash
223 +
224 + # Reopen the SQL DB and update the IPFS_INFO table with info for this run.
225 + # The info for this run will not be available in the database published to
226 +# IPFS until the next database is published (a chicken & egg scenario),
227 +# because the hash of the published database is in the IPFS_INFO table.
228 + def updateRunInfo(self, sqlFile, dbHash, good, errs):
229 + conn = self.openSQLiteDB(sqlFile)
230 + conn.row_factory = sqlite3.Row # Results as python dictionary
231 + sql = '''INSERT INTO IPFS_INFO ("db_hash", "dl_good", "dl_errs")
232 + VALUES (?,?,?);'''
233 +
234 + if conn is not None:
235 + conn.execute(sql, (dbHash, good, errs))
236 + conn.commit()
237 + conn.execute('VACUUM;') # Flush to the currently open database file
238 + conn.close() # This will be the very last thing done this run
239 +
240 + # Filter the list of URLs provided in config file to skip URLs we've already
241 + # DL'd, based on a SQLite query. Filtering is based on the video ID. This is
242 + # not of much value b/c most URLs are for a channel or playlist, plus yt-dlp
243 + # already provides this function.
244 + def filterUrls(self, conn, configUrls):
245 + regx = re.compile(r'^.*?watch\?v=([^&]+)&*.*$', re.IGNORECASE)
246 + cursor = conn.cursor()
247 + filteredUrls = []
248 + for url in configUrls:
249 + match = re.search(regx, url)
250 + if match is not None: # Is this a single video url?
251 + id = match.group(1)
252 + sql = "SELECT COUNT(*) FROM IPFS_HASH_INDEX WHERE id = ?;"
253 + if cursor.execute(sql, [id]).fetchone()[0] == 0:
254 + filteredUrls.append(url)
255 + else: filteredUrls.append(url)
256 + return filteredUrls
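 # For example (hypothetical URL), "https://www.youtube.com/watch?v=abc123&t=9"
 # matches the pattern above with group(1) == "abc123"; the URL is then kept only
 # if no IPFS_HASH_INDEX row already has that id.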
257 +
258 + # Create a grupe index file containing a list of all video and metadata IPFS
259 +# hashes for the grupe. Add it to IPFS and return the count of rows updated
260 +# and the index file's hash as a (count, hash) tuple.
261 + def updateGrupeIndex(self, conn, grupe):
262 + cursor = conn.cursor()
263 +
264 + sql = 'SELECT "v=" || vhash || " " || "m=" || mhash'
265 + sql += ' FROM IPFS_HASH_INDEX'
266 + sql += ' WHERE grupe = ?'
267 + idxFile = f"/tmp/{grupe}_idx.txt"
268 + with open(idxFile, "w") as idx:
269 + for row in cursor.execute(sql, (grupe,)): # Loop through all grupe rows
270 + if row[0]: idx.write(row[0] + '\n')
271 + hash = self.add2IPFS(idxFile)
272 + if len(hash) > 0:
273 + sql = "UPDATE IPFS_HASH_INDEX set g_idx=? WHERE grupe=?"
274 + cursor.execute(sql, (hash, grupe))
275 + conn.commit()
276 + os.remove(idxFile)
277 + return cursor.rowcount, hash
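 # Each line of the index file pairs a video hash with its metadata hash, e.g.
 # (hypothetical hashes) "v=QmVid... m=QmMeta...", and the index file's own IPFS
 # hash is then written into the g_idx column of every row in the grupe.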
278 +
279 + # This block of code will create group index files for every group in DB,
280 + # then add that file to IPFS. Update every row in the group with that hash,
281 + # and do that for every grupe, so every row in IPFS_HASH_INDEX table gets
282 + # updated. See "updateGrupeIndex" above for details. This just wraps that.
283 + def regenerateAllGrupeIndexes(self, conn):
284 + cursor = conn.cursor()
285 + sql = "sSELECT DISTINCT grupe FROM IPFS_HASH_INDEX"
286 + for row in cursor.execute(sql):
287 + (count, hash) = self.updateGrupeIndex(conn, row[0])
288 + print("Updated %d rows for grupe %s with grupe index hash %s" %
289 + (count, row[0], hash), flush=True)
290 +
291 + # Add a row to SQLite database. Most of the column data is from the JSON
292 + # metadata (gvmjList) downloaded with the video. Note that SQLite is not
293 + # thread safe, so only the main thread updates the DB.
294 + def addRow2db(self, conn, cols, gvmjList):
295 + (grupe, vhash, vsize, mhash, jsn) = gvmjList
296 + cursor = conn.cursor()
297 + jsn["episode_number"] = mhash # Mark row as pinned by adding the hashes
298 + jsn["season_number"] = vhash # to these fields
299 + values = [grupe, vhash, vsize, mhash] # 1st 4 values not in YT metadata
300 + sql = 'INSERT INTO IPFS_HASH_INDEX ("grupe", "vhash", "vsize", "mhash"'
301 +
302 + for col in cols:
303 + sql += ',\n\t"' + col + '"'
304 +
305 + sql += ") VALUES (?,?,?,?" # Now add metadata
306 + for col in cols:
307 + sql += ",?"
308 + values.append(jsn[col])
309 + sql += "); "
310 + cursor.execute(sql, values)
311 + conn.commit()
312 + return cursor.lastrowid
313 +
314 + # This wrapper method calls the addRow2db method above and upon failure makes
315 + # a 2nd attempt to insert the row with an alternate metadata dictionary. The
316 + # alternate dictionary is created by copying the valid metadata into the new
317 + # alternate and adding missing keys, setting their missing values to "?".
318 + #
319 + # The ytdl extractors vary as to the format of the metadata they produce,
320 + # youtube-dl doesn't totally normalize it. If a video file was downloaded and
321 + # an IPFS hash was produced a row will be added with sqlts, pky, g_idx, grupe,
322 + # vhash, vsize, season_number and _filename columns that have known valid data.
323 + def addRow(self, conn, cols, gvmjList):
324 + try:
325 + row = self.addRow2db(conn, cols, gvmjList) # Attempt number one...
326 +
327 + # On failure create a new metadata dictionary for this file. For any
328 + # missing keys, create a key whose value is "?". This is a work-
329 + # around for JSON metadata fields the extractor doesn't provide.
330 + except (sqlite3.OperationalError, KeyError) as e:
331 + newDictionary = {}
332 + (grp, vhash, vsize, mhash, jsn) = gvmjList
333 + for col in cols:
334 + if col in jsn.keys():
335 + newDictionary[col] = jsn[col]
336 + else: newDictionary[col] = "?" # Previously "unknown-value"
337 +
338 + # Try again. Any exception this time will propagate upstream
339 + row = self.addRow2db(conn, cols, (grp, vhash, vsize, mhash, newDictionary))
340 + return row
341 +
342 + # Add a row to the SQLite database for every video downloaded for this grupe,
343 + # print the successes and failures and log the failures to the error log file.
344 + # NOTE: Reduced printed output for Pirate Stick for leaner reporting.
345 + def processGrupeResults(self, conn, cols, items, grupe, eLog):
346 + downloads = len(self.SQLrows2Add)
347 + good = 0
348 +
349 + if downloads > 0:
350 + for dat in self.SQLrows2Add: # dat = (grp, vhash, vSize, mhash, json)
351 + try:
352 + self.addRow(conn, cols, dat)
353 + good += 1 # Successfully added to SQLite
354 +
355 + # Failed to add the row to SQLite, but it's saved in IPFS
356 + except Exception as expn:
357 + args = (dat[0], dat[1], dat[2], dat[3], dat[4], self.decodeException(expn))
358 + er = "SQL Error! Grupe=%s vHash=%s vSize=%s, mHash=%s\n\nJSON=%s\n%s" % args
359 + er += "\nMetadata key/values used:\n"
360 + json = dat[4]
361 + for key in json:
362 + er += "%32s = %s\n" % (key, json[key])
363 + self.ErrorList.append(er)
364 +
365 + self.updateGrupeIndex(conn, grupe)
366 +
367 + # Print and log the list of download failures
368 + failures = len(self.ErrorList)
369 + if len(self.ErrorList) > 0:
370 + eLog.write("PROCESSING ERRORS FOR GRUPE=%s:\n" % grupe)
371 + for error in self.ErrorList:
372 + eLog.write(error + '\n')
373 + eLog.write("END OF ERRORS FOR %s\n\n" % grupe)
374 +
375 + args = (items, downloads, failures)
376 + print("Processed=%d (Succeeded=%d, Failed=%d)" % args, flush=True)
377 + return good, failures
378 +
379 + # Used to determine if folder size limit has been exceeded. NOT recursive
380 + def getSize(self, path):
381 + totalSize = 0
382 + for f in os.listdir(path):
383 + fp = os.path.join(path, f)
384 + totalSize += os.path.getsize(fp)
385 + return totalSize
386 +
387 + # Check if the download folder for this grupe is over the quota (if any) and
388 + # remove the oldest file if it is. The quota is the maximum number of files
389 + # or the maximum amount of space to limit the folder to. Quota is a string of
390 + # integers followed by whitespace & unit string value. If no unit designation
391 + # is specified the quota is the amount of space used in bytes. When the limit
392 + # is exceeded the oldest files are removed to make room. .json and .wav files
393 + # aren't counted in a file count quota, but they are for folder space quotas.
394 + # Removals always remove all files of the same name regardless of extension,
395 + # HOWEVER, wildcard replacement occurs after the 1st . on the left. Also note
396 + # that pruning will never remove the last remaining file.
397 + def pruneDir(self, quota, dir):
398 + max = count = 0
399 + fList = []
400 +
401 + if quota: # Do nothing if no quota specified
402 + q = quota.split(' ') # Quota amount and units, if any
403 + if q[0].isdecimal(): # Check if string is a valid number
404 + max = int(q[0]) # This is the quota limit
405 + if max < 2: # Invalid quota value, zero removed
406 + err = "Invalid quota: " + dir
407 + self.ErrorList.append(err) # Log the error
408 + return False
409 +
410 + for f in os.listdir(dir): # Create a list of candidate files
411 + if f.endswith(EXT_LIST): # Only include primary video files
412 + fList.append(dir + '/' + f) # Prefix the file with path
413 + count += 1 # Count how many in the list
414 + if count < 2: return False # We're done if none or only 1
415 +
416 + old = min(fList, key=os.path.getctime) # Get oldest file
417 +
418 + if len(q) > 1: size = 0 # Quota limits number of files
419 + else: size = self.getSize(dir) # Quota limits space used
420 + if count > max or size > max: # Over the quota?
421 + rm = old.rsplit('.')[0] + ".*" # Replace extension with a wildcard
422 + os.system("rm -rf %s" % rm) # Easy way to do all related files
423 + return True # Oldest file removed
424 + else: return False
425 +
426 + # Rename filename with numeric value for media duration
427 + def fixDuration(self, name, seconds=0):
428 + duration = str(int(float(seconds)))
429 + noDur = SEPARATOR + "NA" + SEPARATOR # SEPARATOR defined in mpServerDefinitions
430 + newDur = SEPARATOR + duration + SEPARATOR
431 + zeroDur = SEPARATOR + "0" + SEPARATOR
432 + if noDur in name:
433 + newName = name.replace(noDur, newDur)
434 + elif zeroDur in name:
435 + newName = name.replace(zeroDur, newDur)
436 + else:
437 + newName = name
438 + if newName != name: os.rename(name, newName)
439 + return newName
440 +
441 + # Return info provided within the downloaded filename
442 + def parseFilename(self, file):
443 + dir, base = file.rsplit('/', 1) # Separate grupe folder & downloaded file
444 + grp = dir.rsplit('/', 1)[1] # Extract grupe from full path
445 + pb, ext = os.path.splitext(file) # Path+Base in [0], extension in [1]
446 + mFile = pb + ".info.json" # json metadata file for this download
447 + vFile = file # Full pathname of downloaded file
448 + return [dir, base, grp, pb, ext, vFile, mFile]
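 # For example (hypothetical id), a file saved by ytdlProcess's outtmpl as
 # /home/ipfs/ytDL/news/abc123~^~314~^~.mp4 parses to grp="news",
 # base="abc123~^~314~^~.mp4", ext=".mp4" and mFile ending in ".info.json";
 # the middle field is the duration in seconds (SEPARATOR is "~^~"), which
 # fixDuration rewrites when yt-dlp reports it as "NA" or 0.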
449 +
450 + # This method is a process thread started with each successful download. It
451 +# is started as a daemon thread to add the video and its metadata to IPFS,
452 + # and creates lists for errors and the files downloaded (to update SQLite).
453 + def processVideo(self, file):
454 + vHash = mHash = jFlat = vSize = duration = None
455 +
456 + print(f"\nProcessing video file: {file}", flush=True)
457 + p = self.parseFilename(file)
458 + dir,base,grp,pb,ext,vFile,mFile = p # Get all the values from full pathname
459 +
460 + # The grupe quota limits the size of the download folder. It's a string
461 + # containing an integer with a space followed by an optional units word.
462 + quota = self.Config["Grupes"][grp]["Quota"] # i.e. "20 files" or "2500000000"
463 + pruned = False
464 + while self.pruneDir(quota, dir): # Keep pruning until under quota
465 + time.sleep(0.01)
466 + pruned = True
467 + if pruned: self.ErrorList.append("WARNING: Folder limit reached and pruned!")
468 +
469 + # Process the metadata JSON file first, as we may need information from it
470 + try:
471 + with open(mFile, 'r') as jsn: # Read the entire JSON metadata file
472 + jDict = json.load(jsn) # Create a python dictionary from it
473 + jFlat = self.flatten_json(jDict) # Flatten the dictionary
474 + id3 = TinyTag.get(file)
475 + if id3.year: jFlat["release_year"] = id3.year
476 + if id3.title: jFlat["title"] = id3.title
477 + if id3.artist: jFlat["artist"] = id3.artist
478 + if id3.duration: duration = id3.duration
479 + if not duration: duration = 0 # No duration from yt-dlp or id3
480 + jFlat["duration"] = duration # At least make it's numeric, and
481 + mf = self.fixDuration(mFile, duration) # rename both video & metadata
482 + vf = self.fixDuration(file, duration) # files too
483 + p = self.parseFilename(vf) # Reparse for name changes
484 + mHash = self.add2IPFS(mf) # Add the metadata file to IPFS
485 + except Exception as e:
486 + er = self.decodeException(e)
487 + self.ErrorList.append(f"Metadata problem {file}:\n{er}")
488 +
489 + # Log all errors, but add to SQLite if we got a valid video hash from IPFS
490 + try:
491 + dir,base,grp,pb,ext,vFile,mFile = p # Retrieve latest values
492 + vHash = self.add2IPFS(vFile) # Add video file to IPFS
493 + if len(vHash) == 46 and jFlat:
494 + vSize = os.path.getsize(vFile) # Get file size for database
495 + jFlat['_filename'] = base # Just use the base filename
496 + if os.path.exists(vFile): # Delete once it's in IPFS
497 + os.remove(vFile) # Remove only the video file
498 + else: raise Exceptions.IPFSexception # Qualified with the Exceptions module
499 +
500 + except Exception as e: # Log all errors that may occur
501 + er = self.decodeException(e)
502 + args = (grp, vHash, mHash, base, er)
503 + self.ErrorList.append("Grupe=%s vHash=%s mHash=%s vFile=%s\n%s" % args)
504 +
505 + # If vHash is valid create a SQLite entry for it, regardless of metadata
506 + finally:
507 + # print(f"g={base} v={vSize} jLen={len(jFlat)} j={type(jFlat)}", flush=True)
508 + if vHash.startswith("Qm"):
509 + self.SQLrows2Add.append([grp, vHash, vSize, mHash, jFlat])
510 +
511 + # Starts a daemon thread to process the downloaded file. youtube-dl provides no
512 + # way to obtain information about the ffmpeg post processor, and adding to IPFS
513 + # can be time consuming. Using threads to handle files allows the main thread
514 +# to download other files concurrently with IPFS additions. See the processVideo
515 + # function above for specifics of how downloaded files are processed.
516 + def callback(self, d):
517 + if d['status'] == 'finished':
518 + path = d['filename'] # Callback provides full pathname
519 + nam = path.rsplit('/', 1)[1].split("~^~", 1)[0]
520 + th = threading.Thread(name=nam, target=self.processVideo,
521 + args=([path]), daemon=True)
522 + th.start() # Start the thread and continue
523 +
524 + ##############################################################################
525 + # #
526 + # Primary program loop. The youtube-dl library takes care of downloading. #
527 + # The callback function above processes each download, adding files to IPFS #
528 + # and creating a list of rows to add to the SQLite DB by this function. #
529 + # #
530 + ##############################################################################
531 + def ytdlProcess(self, conn):
532 + sep = SEPARATOR
533 + cols = self.Config['MetaColumns']
534 + dlBase = self.Config['DLbase']
535 + dlArch = dlBase + self.Config['DLarch']
536 + dlElog = dlBase + self.Config['DLeLog']
537 + dlOpts = self.Config['DLOpts']
538 + grupeList = self.Config['Grupes']
539 + total = 0
540 + failures = 0
541 + # NOTE: items missing from downloaded metadata will be replaced with "?"
542 + # Media duration is important for sorting files into schedules.
543 + ytdlFileFormat = "/%(id)s" + sep + "%(duration)s"+ sep + ".%(ext)s"
544 +
545 + dlOpts['ignoreerrors'] = True # Until errors from yt-dlp can be caught
546 + dlOpts['downloader'] = "aria2c" # Fast downloader
547 + dlOpts['downloader-args'] = "aria2c:'-c -j 3 -x 3 -s 3 -k 1M'"
548 +# dlOpts['verbose'] = True # Useful for debugging
549 +
550 + # Add crucial download options
551 + # dlOpts['force-ipv6'] = True # May not be enabled on host
552 + dlOpts['writeinfojson'] = True
553 + dlOpts['progress_hooks']=[self.callback] # Called at least 1ce per video
554 + dlOpts['download_archive'] = dlArch # Facilitates updates w/o dupes
555 + dlOpts['restrictfilenames'] = True # Required format for DLd files
556 + eLog = open(dlElog, mode='a+') # Error log file for all grupes
557 + for grupe in grupeList:
558 + if not grupeList[grupe]['Active']: continue # Skip this grupe
559 + self.SQLrows2Add = [] # Empty the list of downloads
560 + self.ErrorList = [] # Empty the list of errors
561 + print("\nBEGIN " + grupe, flush=True) # Marks start of group in log
562 +
563 + if not os.path.isdir(dlBase+grupe): # If it doesn't exist
564 + os.mkdir(dlBase + grupe) # create folder 4 grupe
565 +
566 + # Add qualifier for minimum video duration (in seconds)
567 + dur = grupeList[grupe]['Duration']
568 + if dur != None and dur > 0:
569 + dur = "duration > %d" % dur
570 + dlOpts['match_filter'] = utils.match_filter_func(dur)
571 + elif 'match_filter' in dlOpts.keys():
572 + del dlOpts['match_filter'] # No duration filter
573 +
574 + # Add release date range qualifier; either one or both are OK
575 + sd = grupeList[grupe]['Start'] # null or YYYYMMDD format
576 + ed = grupeList[grupe]['End'] # in JSON config file
577 + if sd != None or ed != None:
578 + dr = utils.DateRange(sd, ed) # Dates are inclusive
579 + dlOpts['daterange'] = dr # Always set a date range
580 + elif 'daterange' in dlOpts.keys():
581 + del dlOpts['daterange'] # No date filter
582 +
583 + # This stops downloading from playlist after this many videos
584 + stop = grupeList[grupe]['Stop']
585 + if stop != None and stop > 0: dlOpts['playlistend'] = stop
586 + elif 'playlistend' in dlOpts.keys():
587 + del dlOpts['playlistend'] # No playlist limit
588 +
589 + # This will change downloaded file folder for each grupe
590 + dlOpts['outtmpl'] = dlBase + grupe + ytdlFileFormat
591 +
592 + urls = grupeList[grupe]['urls']
593 + # Don't even try downloading videos we already have in the DB
594 + newUrls = urls # self.filterUrls(conn, urls)
595 +
596 + yt_dlp.YoutubeDL(dlOpts).download(newUrls) # BEGIN DOWNLOADING!!!
597 + print(f"YOUTUBE-DL PROCESSING COMPLETE for {grupe}", flush=True)
598 +
599 + # Wait for all callback threads to finish
600 + for th in threading.enumerate():
601 + if th.name != "MainThread":
602 + while th.is_alive(): time.sleep(0.1)
603 +
604 + # Log errors and print results of this DL grupe
605 + good, fails = self.processGrupeResults(conn, cols,
606 + len(urls), grupe, eLog)
607 + total += good
608 + failures += fails # Accumulate totals for this run
609 +
610 + eLog.close()
611 + return total, failures
612 +
613 + #
614 + # Display a summary of this download session. Return them for emailing.
615 + #
616 + def displaySummary(self, conn):
617 + #
618 + # Print the total number of files in the DB since it was created
619 + #
620 + sql = "SELECT COUNT(*), MIN(sqlts) FROM IPFS_HASH_INDEX;"
621 + cols = (conn.cursor().execute(sql)).fetchone() # 1 row, 2 columns
622 + total = f"\n{cols[0]} files downloaded and indexed since {cols[1][0:10]}"
623 + print(total, flush=True)
624 + mail = total + '\n'
625 + #
626 + # Report the number of files added to each grupe in the last 30 days
627 + #
628 + strt = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
629 + sql = "SELECT DISTINCT SUBSTR(sqlts, 6, 5) as tme, grupe, count(*) as cnt "
630 + sql += "FROM IPFS_HASH_INDEX WHERE sqlts > ? "
631 + sql += "GROUP BY grupe ORDER BY sqlts desc;"
632 + args = " Date Videos Grupe (Videos Added in the Last 30 Days)"
633 + print(args, flush=True)
634 + mail += args + '\n'
635 + for cols in conn.execute(sql, (strt,)):
636 + args = (cols['tme'], cols['cnt'], cols['grupe'])
637 + mail += "%5s | %6d | %s\n" % args
638 + print("%5s | %6d | %s" % args, flush=True)
639 + #
640 + # Print the total number for all grupes in last 30 days
641 + #
642 + sql = "SELECT COUNT(*) FROM IPFS_HASH_INDEX WHERE sqlts > ?"
643 + dbObj = conn.cursor().execute(sql, (strt,))
644 + total = " Total: "
645 + total += "%5d" % dbObj.fetchone()[0]
646 + print(total, flush=True)
647 + mail += total
648 + #
649 + # Report the files downloaded today as grupes, titles & IPFS URLs
650 + #
651 + urls = ""
652 + sql = "SELECT grupe, title, vhash "
653 + sql += "FROM IPFS_HASH_INDEX "
654 + sql += "WHERE DATE(sqlts) = DATE('now', 'localtime', 'start of day');"
655 + rows = (conn.cursor().execute(sql)).fetchall()
656 + if len(rows) > 0:
657 + args = "\nIPFS URLs for files downloaded today:"
658 + urls = args + '\n'
659 + print(args, flush=True)
660 + for col in rows:
661 + args = (col['grupe'], col['title'][:48], col['vhash'])
662 + text = "%13s | %48s | https://ipfs.io/ipfs/%s" % args
663 + urls += text + '\n'
664 + print(text, flush=True)
665 + else:
666 + args = "\nRun complete. No files downloaded today."
667 + print(args, flush=True)
668 + mail += args
669 + return mail, urls
670 +
671 + # Send a plain text message via email to recipient(s). Retry if necessary
672 + def emailResults(self, server, account, subject, origin, to, text):
673 + msg = EmailMessage() # Create a text/plain container
674 + msg.set_content(text)
675 + msg['Subject'] = subject
676 + msg['From'] = origin
677 + msg['To'] = to
678 +
679 + # tx-server: /usr/local/lib/python3.8/site-packages/certifi/cacert.pem
680 + # context = ssl._create_unverified_context()
681 + # with smtplib.SMTP_SSL(server[0], server[1], context=context) as emailer:
682 + # emailer.login(account[0], account[1])
683 + # emailer.send_message(msg)
684 + def sendMail(srvr, acct, mesg):
685 + try:
686 + context = ssl.create_default_context()
687 + context.check_hostname = False
688 + context.verify_mode = ssl.CERT_NONE
689 + with smtplib.SMTP(srvr[0], srvr[1]) as emailer:
690 + emailer.starttls(context=context)
691 + emailer.login(acct[0], acct[1])
692 + emailer.send_message(mesg)
693 + except Exception as e:
694 + return self.decodeException(e)
695 + return 0 # No Exception, so mesg was sent successfully
696 +
697 + for cnt in range(1, EMAIL_RETRIES + 1): # Attempt delivery EMAIL_RETRIES times
698 + unsent = sendMail(server, account, msg)
699 + if unsent:
700 + print(f"Attempt {cnt}: {unsent}", flush=True)
701 + time.sleep(EMAIL_DELAY)
702 + else: break # We're done if message sent
703 +
704 + ##############################################################################
705 + # Get command line arguments. Returns a tuple with config and DB connection. #
706 + # Usage: thisFile [-h] | <-c config> <-d sqlite> #
707 + # #
708 +# Parse command line and report config info. Prints usage and exits if args
709 +# are invalid or missing. The metadataFields.json file is also loaded here.
710 + ##############################################################################
711 + def getCmdLineArgs(self):
712 + if len(sys.argv) >= 5:
713 + sqlDBfile = self.Config = conn = None
714 + grupe = grupes = urls = 0
715 +
716 + # Required parameter: -c config file
717 + if sys.argv[1] == "-c" and os.path.isfile(sys.argv[2]):
718 + with open(sys.argv[2], 'r') as jsn:
719 + self.Config = json.load(jsn)
720 + meta = self.Config['DLbase'] + self.Config['DLmeta']
721 + if len(meta) > 0: # Did config info load?
722 + with open(meta, 'r') as jsn: # Now load meta fields
723 + self.Config['MetaColumns'] = json.load(jsn)['MetaColumns']
724 + metaSize = len(self.Config['MetaColumns'])
725 + if metaSize > 0: # All config info loaded OK?
726 + for grupe in self.Config['Grupes']: # Count groups and urls in them
727 + grupes += 1
728 + urls += len( self.Config['Grupes'][grupe]['urls'] )
729 + print("Database Metadata Columns=%d" % metaSize, flush=True)
730 + print("Downloaded groups will be saved in %s" % self.Config['DLbase'], flush=True)
731 + print("%d groups, %d urls to process" % (grupes, urls), flush=True)
732 + else: self.usage()
733 + else: self.usage()
734 +
735 + # Required parameter: -d SQLite database file
736 + if sys.argv[3] == "-d":
737 + sqlDBfile = sys.argv[4]
738 + conn = self.openSQLiteDB(sqlDBfile)
739 + conn.row_factory = sqlite3.Row # Results as python dictionary
740 + if conn == None: self.usage()
741 +
742 + # Optional parameter: -g grupe from config to use
743 + if len(sys.argv) >= 7 and sys.argv[5] == "-g":
744 + print(f"Ignoring all grupes in config except {sys.argv[6]}", flush=True)
745 + for grupe in self.Config['Grupes']: # Mark all inactive except 1
746 + if grupe == sys.argv[6]: continue
747 + self.Config['Grupes'][grupe]['Active'] = False
748 +
749 + if not os.path.isdir(self.Config['DLbase']): # Create folder for results
750 + os.mkdir(self.Config['DLbase']) # if necessary
751 +
752 + return self.Config, conn, sqlDBfile # Return essential information
753 + else: self.usage()
754 +
755 + ##############################################################################
756 + # Primary starting point for script #
757 + # ############################################################################
758 + def runScript(self, sqlFile, conn):
759 + hash = None
760 + if sqlFile is None or conn is None:
761 + self.Config, conn, sqlFile = self.getCmdLineArgs() # Open config, DB
762 +
763 + #regenerateAllGrupeIndexes(conn) # Fix all grupe indexes
764 + #exit(0)
765 +
766 + # Command line and config file processed, time to get down to it
767 + good, fails = self.ytdlProcess(conn)
768 +
769 + mail, urls = self.displaySummary(conn)
770 + conn.execute("vacuum;") # Flush changes in this session & make backup
771 + conn.execute("vacuum into 'latest.sqlite';") # Flush all to backup
772 + conn.close() # DBs should be the same now, but do sha256 hashes differ?
773 + # They shouldn't, but updateRunInfo will make them differ!
774 +
775 + # If any downloads were successful, update IPFS with new SQLite file.
776 + # This hash will appear in the IPFS_INFO table published next run.
777 + if good > 0:
778 + hash = self.publishDB("latest.sqlite")
779 + args = f"\nThe newest SQLite DB hash is: {hash}\n"
780 + if STATIC_DB_HASH:
781 + args += "It is always available at:\n"
782 + args += f"https://ipfs.io/ipns/{STATIC_DB_HASH}"
783 + mail += args
784 + print(args + "\n", flush=True)
785 +
786 + # Update IPFS_INFO table in SQL DB with results for this run
787 + # and flush all changes to current database file, not latest.
788 + # Current DB will actually be latest, tho not latest published.
789 + self.updateRunInfo(sqlFile, hash, good, fails)
790 +
791 + if SEND_EMAIL:
792 + self.emailResults(EMAIL_SERVR, EMAIL_LOGIN,
793 + EMAIL_SUB1, EMAIL_FROM, EMAIL_LIST, mail)
794 +
795 + if len(EMAIL_URLS) > 0 and len(urls) > 0:
796 + self.emailResults(EMAIL_SERVR, EMAIL_LOGIN,
797 + EMAIL_SUB2, EMAIL_FROM, EMAIL_URLS, urls)
798 +
799 +
800 +###############################################################################
801 +# main is called only if this file is invoked as a script not an object class #
802 +###############################################################################
803 +if __name__ == "__main__":
804 + mp = MediaProcessor()
805 + mp.runScript(None, None) # Run script using command line parameters
806 + exit(0)
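
As a usage sketch (hypothetical file names), a run of the script above is started from the command line as described by usage(), for example:

 ./MediaProcessor.py -c /home/ipfs/config.json -d ipfsIndex.sqlite
 ./MediaProcessor.py -c /home/ipfs/config.json -d ipfsIndex.sqlite -g news

where -g restricts the run to the single named grupe from the config.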
addSingles.py
@@ -1,0 +1,90 @@
1 +
2 +import MediaProcessor as mpClass
3 +import sqlite3
4 +import json
5 +import sys
6 +
7 +#
8 +# addSingles.py -- Command line program to process a list of media URLs and add
9 +# them to IPFS and record their hashes & metadata in a SQLite
10 +# database.
11 +#
12 +
13 +# Configuration template file for MediaGrabber, usually read from a JSON file.
14 +CFG = {
15 + "DLbase": "/home/ipfs/ytDL/", # Base folder for all files:
16 + "DLeLog": "error.log", # Log file appended to DLBase
17 + "DLarch": "youtube-dl.history", # yt_dlp download history id list
18 + "DLmeta": "metadataFields.json", # metadata schema for SQLite database
19 + "DLOpts": { # Download options for all grupes
20 + "retries": 5,
21 + "format": "best",
22 + "continuedl": True,
23 + "ignoreerrors": True,
24 + "merge-output-format": "mp4" # Ignored for non-video media
25 + },
26 +
27 + "Grupes": { # List of grupes (channels, publishers)
28 + "singles": { # Aribtrary name of grupe/channel/publisher/topic
29 + "Quota": 0, # number<space>bytes or number only (file count)
30 + "Active": True, # Process this grupe or ignore flag
31 + "Duration": 0, # Minimum duration or 0 for any length
32 + "Start": None, # Date published string "yyyymmdd"
33 + "End": None, # Range end for date published, "yyyymmdd"
34 + "Stop": 300, # Maximum number of files per playlist URL
35 + "urls": [] # List of URLs or URLs to playists
36 + }
37 + },
38 + "MetaColumns": [] # Will be read from DLmeta JSON file
39 +}
40 +
41 +# MAIN - Let us begin...
42 +mp = mpClass.MediaProcessor()
43 +try:
44 + with open(CFG['DLbase'] + CFG['DLmeta'], 'r') as jsn:
45 + CFG['MetaColumns'] = json.load(jsn)['MetaColumns']
46 +except Exception as e:
47 + if len(CFG['MetaColumns']) == 0:
48 + print(f"Couldn't load the metadata file - Bye!\n\n{e}")
49 + exit(1)
50 +mp.Config = CFG
51 +
52 +#
53 +# Get 3 inputs from user: database file, grupe name, list of URLs
54 +#
55 +db = input("SQLite database file (default is 'ipfsDallas.sqlite'): ")
56 +if db == "": db = "ipfsIndex.sqlite"
57 +try:
58 + conn = mp.openSQLiteDB(db) # Open a SQLite database or create one
59 + conn.row_factory = sqlite3.Row # Results as python dictionary
60 +except Exception as e:
61 + if conn == None:
62 + print(f"An error occurred opening {db}\n\n{e}")
63 + exit(1)
64 +
65 +grupe = input("Grupe name (default is 'singles'): ")
66 +if grupe != "": # Create a new grupe and pull values from default
67 + CFG['Grupes'][grupe] = CFG['Grupes'].pop('singles')
68 +else: grupe = "singles"
69 +
70 +urls = input("Comma separated list of URLs: ")
71 +if urls:
72 + quoted = []
73 + for url in urls.split(','):
74 + url = url.strip()
75 + if not url.lower().startswith("http"):
76 + print(f"Each URL must start with http - Bye!")
77 + exit(-1)
78 + else:
79 + quoted.append(f"{url}")
80 +else: print(f"You must provide at least 1 URL - Bye!")
81 +
82 +CFG['Grupes'][grupe]['urls'] = quoted
83 +
84 +mp.Config = CFG # Set class' download parameters
85 +
86 +yn = input(f"Using DB={db} grupe={grupe} for {len(quoted)} url(s). Proceed? (default is y) ")
87 +if yn == "" or yn == "y" or yn == "Y":
88 + mp.runScript(db, conn) # Download, add to IPFS and update the DB
89 + conn = mp.openSQLiteDB(db) # runScript closes conn, so reopen for the summary
90 + conn.row_factory = sqlite3.Row # Results as python dictionary
91 + m, u = mp.displaySummary(conn)
92 + print(f"mail={m}\n\nurls={u}\n")
metadataFields.json
@@ -1,0 +1,64 @@
1 +{
2 + "MetaColumns": [
3 + "_filename",
4 + "abr",
5 + "acodec",
6 + "age_limit",
7 + "album",
8 + "alt_title",
9 + "annotations",
10 + "artist",
11 + "average_rating",
12 + "categories_0",
13 + "channel_id",
14 + "channel_url",
15 + "creator",
16 + "description",
17 + "dislike_count",
18 + "display_id",
19 + "duration",
20 + "end_time",
21 + "episode_number",
22 + "ext",
23 + "extractor",
24 + "extractor_key",
25 + "format",
26 + "format_id",
27 + "format_note",
28 + "fulltitle",
29 + "height",
30 + "http_headers_Accept",
31 + "http_headers_Accept-Charset",
32 + "http_headers_Accept-Encoding",
33 + "http_headers_Accept-Language",
34 + "http_headers_User-Agent",
35 + "id",
36 + "is_live",
37 + "license",
38 + "like_count",
39 + "player_url",
40 + "playlist",
41 + "playlist_index",
42 + "protocol",
43 + "release_date",
44 + "release_year",
45 + "season_number",
46 + "series",
47 + "start_time",
48 + "thumbnail",
49 + "thumbnails_0_id",
50 + "thumbnails_0_url",
51 + "title",
52 + "track",
53 + "upload_date",
54 + "uploader",
55 + "uploader_id",
56 + "uploader_url",
57 + "url",
58 + "vcodec",
59 + "view_count",
60 + "webpage_url",
61 + "webpage_url_basename",
62 + "width"
63 + ]
64 +}
mpServerDefinitions.py
@@ -1,0 +1,22 @@
1 +#!/usr/bin/python3
2 +
3 +#
4 +# Defines constants for MediaProcessor.py, a class to scrape videos with yt-dlp
5 +#
6 +EMAIL_RETRIES = 3 # How many times to attempt email delivery
7 +EMAIL_DELAY = 60 # Delay between attempts, in seconds
8 +EMAIL_SERVR = ["email_server.tld", 587]
9 +EMAIL_LOGIN = ["emailUser@email_server.tld", "email_server password"]
10 +EMAIL_LIST = ["list of email addresses", "to send results", "comma separated"]
11 +EMAIL_URLS = EMAIL_LIST # A separate email list to only send URLs to
12 +STATIC_DB_HASH = "IPNS hash to publish sqlite database revisions"
13 +DOWNLOAD_IP = "111.222.333.444" # The IP address to use for downloads
14 +IP_ADR_LIST = ["111.222.333.444", "555.666.777.888", "..."]
15 +IP_ADR_INDX = 0
16 +YTDL_PROG = 'yt-dlp'
17 +EMAIL_SUB1 = 'Media Processor Results'
18 +EMAIL_SUB2 = 'URLs for content saved in IPFS'
19 +EMAIL_FROM = "emailUser@email_server.tld"
20 +SEND_EMAIL = True # Send results to email recipients
21 +SEPARATOR = "~^~" # Separates elements of the downloaded file pathname
22 +EXT_LIST = ("webm", "mp4", "mp3", "mkv", "m4v") # Download file extensions
