git ssb

0+

thomas-dev-patchwork / MediaProcessor



Tree: 762c108c6dc008453fbff099340086a31f6f17a5

Files: 762c108c6dc008453fbff099340086a31f6f17a5 / MediaProcessor.py

39531 bytesRaw
1#!/usr/bin/env python3
2#
3# MediaProcessor.py - Program to process media files and add them to IPFS.
4#
5# A SQLite3 database is used to store IPFS hashes and detailed metadata for
6# each item. This lightweight SQL engine is filesystem based (no server).
7#
8# This script requires 2 JSON formatted config files containing schema column
9# names and other info, like filter settings for video length and upload date.
10#
11# Youtube is known to change the metadata keys, making it difficult to rely on
12# youtube metadata for the schema. After sampling 1000s of videos a common set
13# of metadata fields was arrived at. The metadataFields.json file is currently
14# where these fields / database columns are defined.
15#
16# The metadata provided by youtube-dl extractors is not totally normalized. To
17# insure a database record is saved for every file added to IPFS, an alternate
18# metadata dictionary is used which always sets these 9 essential columns:
19#
20# sqlts, pky, g_idx, grupe, vhash, vsize, season_number, url and _filename
21#
22# and any others from the downloaded metadata whose field names are found in
23# the metadataFields.json file. A substitute metadata dictionary is used when a
24# SQL error occurs while adding a new row. If the 2nd attempt fails it's logged
25# along with the IPFS hash, so a record can be added manually at a later time.
26# Usually the substitution is required because there are missing fields in the
27# metadata provided by some youtube-dl extractors specific to a video source.
28#
29from __future__ import unicode_literals
30from email.message import EmailMessage
31from mpServerDefinitions import * # Server specific CONSTANTS
32from tinytag import TinyTag # Gets meta info from MP3 files
33from yt_dlp import utils
34from datetime import *
35import Exceptions
36import subprocess
37import threading
38import smtplib
39import sqlite3
40import yt_dlp
41import time
42import json
43import ssl
44import sys
45import os
46import re
47
48
49class MediaProcessor:
50
51 def __init__(self):
52 self.SQLrows2Add = [] # Lists populated by download callback threads
53 self.ErrorList = []
54 self.Config = {} # Loaded from config file (JSON file format)
55
56 """ config file template, JSON format. Use single quotes only, null not None:
57 self.Config {
58 "Comment": [ "Unreferenced - for comments inside config file",
59 "It is difficult sometimes to find the video link for Brightcove videos.",
60 "This is the method that I use with Firefox. You will need 2 things:",
61 "a) AccountID",
62 "b) VideoID",
63 "Get these by right-clicking on the video and select Player Information.",
64 "Use ctrl-C to copy the info, and plug them into this template:",
65 "http://players.brightcove.net/<AccountID>/default_default/index.html?videoId=<VideoID>",
66 "Works with Firefox 68.8, 75.0 and probably others as of May 12, 2020"
67 ],
68
69 "DLbase": "dir", Folder for all downloaded files organized by grupe
70 "DLeLoselfg":"file", File for exceptions / errors during downloads
71 "DLarch": "file", This tracks downloads to skip those already done
72 "DLmeta": "file", The metadata definition is now split into this file
73
74 "DLOpts": { Name / value pairs for youtube-dl options
75 "optName1": "value1", NOT always the same as cmd line opts
76 ...
77 },
78
79 "Grupes": { # Dictionary of grupes to download, with its own criteria
80 "gName1": { Group name containing video selection criteria
81 "Active": true, Enable or disable downloads for this grupe
82 "Duration": 0, Min size of video in seconds; for no limits use 0
83 "Quota": null, Limits size of grupe's DL folder to N files
84 "Start": null, Earliest upload date string or (YYYYMMDD) or null
85 "End": null, Latest upload date or null. 1, 2 or neither OK
86 "Stop": null, Stop downloading from playlist after this many DLs
87 "url1",
88 "url2",
89 ...
90 ]
91 },
92 ... Additional grupes
93 },
94
95 "MetaColumns": [ Contains the list of database fields for metadata for
96 ... the video downloaded along with it in JSON format.
97 This section is loaded from a separate file which is
98 specified by the DLbase and DLmeta key defined above.
99 ]
100 }
101 """
102
103 def usage(self):
104 cmd = sys.argv[0]
105 str = "\nUses youtube-dl to download videos and add them to IPFS and track\n"
106 str += "the results in a SQLite database.\n\n"
107 str += "Usage: " + cmd + " [-h] | <-c config> <-d sqlite> [-g grupe]\n"
108 str += "-h or no args print this help message.\n\n"
109 str += "-c is a JSON formated config file that specifies the target groups,\n"
110 str += "their URL(s), downloader options, the base or top level folder for\n"
111 str += "the groups of files downloaded and the list of metadata columns.\n"
112 str += "-d is the SQLite filename (it is created if it doesn't exist).\n\n"
113 str += "-g ignore all but the grupes in config except the one name after -g.\n\n"
114 print(str)
115 exit(0)
116
117 # Return detailed info from the stack trace. Default is to
118 # return only the last item, pass True as 2nd arg for all items.
119 def decodeException(self, e, all=False):
120 trace = []
121 stk = 0
122 tb = e.__traceback__
123 while tb is not None:
124 stk += 1
125 trace.append({
126 "stk": stk,
127 "filename": tb.tb_frame.f_code.co_filename,
128 "function": tb.tb_frame.f_code.co_name,
129 "lineno": tb.tb_lineno
130 })
131 tb = tb.tb_next
132 if not all:
133 trace = trace[-1]
134 del trace["stk"]
135 return {
136 'type': type(e).__name__,
137 'message': str(e),
138 'trace': trace
139 }
140
141 # Flattens a nested JSON object and returns a python dictionary
142 def flatten_json(self, nested_json):
143 out = {}
144 def flatten(x, name=''):
145 if type(x) is dict:
146 for a in x:
147 flatten(x[a], name + a + '_')
148 elif type(x) is list:
149 i = 0
150 for a in x:
151 flatten(a, name + str(i) + '_')
152 i += 1
153 else:
154 out[name[:-1]] = x
155
156 flatten(nested_json)
157 return out
158
159 # Create the SQLite database file if it doesn't exist, using the
160 # MetaColumns from config. If it already exists, open a connection
161 # to it. Always returns a connection object to the dbFile.
162 def openSQLiteDB(self, dbFile):
163 newDatabase = not os.path.exists(dbFile)
164 columns = self.Config['MetaColumns']
165 conn = sqlite3.connect(dbFile)
166 if newDatabase:
167 sql = '''create table if not exists IPFS_INFO (
168 "sqlts" TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')),
169 "pky" INTEGER PRIMARY KEY AUTOINCREMENT,
170 "db_hash" TEXT,
171 "dl_good" INTEGER DEFAULT 0,
172 "dl_errs" INTEGER DEFAULT 0);'''
173 conn.execute(sql)
174
175 sql = '''create table if not exists IPFS_HASH_INDEX (
176 "sqlts" TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')),
177 "pky" INTEGER PRIMARY KEY AUTOINCREMENT,
178 "g_idx" TEXT,
179 "grupe" TEXT,
180 "vhash" TEXT,
181 "vsize" TEXT,
182 "mhash" TEXT'''
183 for c in columns:
184 sql += ',\n\t"' + c + '" TEXT'
185 sql += ')'
186 conn.execute(sql)
187 return conn
188
189 # Pin a file referenced by hash to local IPFS. Returns True if successfull
190 def pin2IPFS(self, hash):
191 cmd = ["ipfs", "pin", "add", hash]
192 out = subprocess.run(cmd, stderr=subprocess.DEVNULL,
193 stdout=subprocess.PIPE).stdout.decode('utf-8')
194 if out.startswith("pinned"): return True
195 else: return False
196
197 # Add a file to IPFS and return the hash for it.
198 # Returns 46 character hash: "Qm....1234567890123456789012345678901234567890"
199 # or a zero length string
200 def add2IPFS(self, file):
201 cmd = ["ipfs", "add", "-Q", file]
202 out = subprocess.run(cmd, stderr=subprocess.DEVNULL,
203 stdout=subprocess.PIPE).stdout.decode('utf-8')
204 if out.startswith("Qm") and len(out) == 47:
205 hash = out[0:46] # Return the 46 character hash - newline
206 else: hash = ""
207 return hash
208
209 # Add the updated SQLite DB to IPFS under the static IPNS name associated with
210 # STATIC_DB_HASH. That way the most recent DB can always be obtained by wget:
211 # https://ipfs.io/ipns/<STATIC_DB_HASH value>.
212 # 2023/01/30: Despite closing the DB conn, it seems now prev DB is published.
213 # I suspect the sqlite import doesn't flush the file on close now, but it did.
214 def publishDB(self, file):
215 newDBhash = self.add2IPFS(file) # Add the updated SQLite database to IPFS
216 if len(newDBhash) == 46:
217 if STATIC_DB_HASH:
218 cmd = ["ipfs", "name", "publish", "-Q", "-key=" + STATIC_DB_HASH, newDBhash]
219 cp = subprocess.run(cmd, capture_output=True, text=True)
220 if cp.returncode != 0:
221 print(f"Error publishing database to IPFS: {cp.stderr}", flush=True)
222 return newDBhash
223
224 # Reopen the SQL DB and update the IPFS_INFO table with info for this run.
225 # The info for this run will not be available in the database published to
226 # IPFS until the next database is published (a chicken & egg scenario) due
227 # because the hash of the published database is in the IPFS_INFO table.
228 def updateRunInfo(self, sqlFile, dbHash, good, errs):
229 conn = self.openSQLiteDB(sqlFile)
230 conn.row_factory = sqlite3.Row # Results as python dictionary
231 sql = '''INSERT INTO IPFS_INFO ("db_hash", "dl_good", "dl_errs")
232 VALUES (?,?,?);'''
233
234 if conn is not None:
235 conn.execute(sql, (dbHash, good, errs))
236 conn.commit()
237 conn.execute('VACUUM;') # Flush to the currently open database file
238 conn.close() # This will be the very last thing done this run
239
240 # Filter the list of URLs provided in config file to skip URLs we've already
241 # DL'd, based on a SQLite query. Filtering is based on the video ID. This is
242 # not of much value b/c most URLs are for a channel or playlist, plus yt-dlp
243 # already provides this function.
244 def filterUrls(self, conn, configUrls):
245 regx = re.compile(r'^.*?watch\?v=([^&]+)&*.*$', re.IGNORECASE)
246 cursor = conn.cursor()
247 filteredUrls = []
248 for url in configUrls:
249 match = re.search(regx, url)
250 if match is not None: # Is this a single video url?
251 id = match.group(1)
252 sql = "SELECT COUNT(*) FROM IPFS_HASH_INDEX WHERE id = ?;"
253 if cursor.execute(sql, [id]).fetchone()[0] == 0:
254 filteredUrls.append(url)
255 else: filteredUrls.append(url)
256 return filteredUrls
257
258 # Create a grupe index file containing a list of all video and metadata IPFS
259 # hashes for the grupe. Add it to IPFS & return the hash and count of rows
260 # updated. -> object ???
261 def updateGrupeIndex(self, conn, grupe):
262 cursor = conn.cursor()
263
264 sql = 'SELECT "v=" || vhash || " " || "m=" || mhash'
265 sql += ' FROM IPFS_HASH_INDEX'
266 sql += ' WHERE grupe = ?'
267 idxFile = f"/tmp/{grupe}_idx.txt"
268 with open(idxFile, "w") as idx:
269 for row in cursor.execute(sql, (grupe,)): # Loop through all grupe rows
270 if row[0]: idx.write(row[0] + '\n')
271 hash = self.add2IPFS(idxFile)
272 if len(hash) > 0:
273 sql = "UPDATE IPFS_HASH_INDEX set g_idx=? WHERE grupe=?"
274 cursor.execute(sql, (hash, grupe))
275 conn.commit()
276 os.remove(idxFile)
277 return cursor.rowcount, hash
278
279 # This block of code will create group index files for every group in DB,
280 # then add that file to IPFS. Update every row in the group with that hash,
281 # and do that for every grupe, so every row in IPFS_HASH_INDEX table gets
282 # updated. See "updateGrupeIndex" above for details. This just wraps that.
283 def regenerateAllGrupeIndexes(self, conn):
284 cursor = conn.cursor()
285 sql = "sSELECT DISTINCT grupe FROM IPFS_HASH_INDEX"
286 for row in cursor.execute(sql):
287 (count, hash) = self.updateGrupeIndex(conn, row[0])
288 print("Updated %d rows for grupe %s with grupe index hash %s" %
289 (count, row[0], hash), flush=True)
290
291 # Add a row to SQLite database. Most of the column data is from the JSON
292 # metadata (gvmjList) downloaded with the video. Note that SQLite is not
293 # thread safe, so only the main thread updates the DB.
294 def addRow2db(self, conn, cols, gvmjList):
295 (grupe, vhash, vsize, mhash, jsn) = gvmjList
296 cursor = conn.cursor()
297 jsn["episode_number"] = mhash # Mark row as pinned by adding the hashes
298 jsn["season_number"] = vhash # to these fields
299 values = [grupe, vhash, vsize, mhash] # 1st 4 values not in YT metadata
300 sql = 'INSERT INTO IPFS_HASH_INDEX ("grupe", "vhash", "vsize", "mhash"'
301
302 for col in cols:
303 sql += ',\n\t"' + col + '"'
304
305 sql += ") VALUES (?,?,?,?" # Now add metadata
306 for col in cols:
307 sql += ",?"
308 values.append(jsn[col])
309 sql += "); "
310 cursor.execute(sql, values)
311 conn.commit()
312 return cursor.lastrowid
313
314 # This wrapper method calls the addRow2db method above and upon failure makes
315 # a 2nd attempt to insert the row with an alternate metadata dictionary. The
316 # alternate dictionary is created by copying the valid metadata into the new
317 # alternate and adding missing keys, setting their missing values to "?".
318 #
319 # The ytdl extractors vary as to the format of the metadata they produce,
320 # youtube-dl doesn't totally normalize it. If a video file was downloaded and
321 # an IPFS hash was produced a row will be added with sqlts, pky, g_idx, grupe,
322 # vhash, vsize, season_number and _filename columns that have known valid data.
323 def addRow(self, conn, cols, gvmjList):
324 try:
325 row = self.addRow2db(conn, cols, gvmjList) # Attempt number one...
326
327 # On failure create a new metadata dictionary for this file. For any
328 # missing keys, create a key whose value is "?". This is a work-
329 # around for JSON metadata fields the extractor doesn't provide.
330 except (sqlite3.OperationalError, KeyError) as e:
331 newDictionary = {}
332 (grp, vhash, vsize, mhash, jsn) = gvmjList
333 for col in cols:
334 if col in jsn.keys():
335 newDictionary[col] = jsn[col]
336 else: newDictionary[col] = "?" # Previously "unknown-value"
337
338 # Try again. Any exception this time will propagate upstream
339 row = self.addRow2db(conn, cols, (grp, vhash, vsize, mhash, newDictionary))
340 return row
341
342 # Add a row to the SQLite database for every video downloaded for this grupe,
343 # print the successes and failures and log the failures to the error log file.
344 # NOTE: Reduced printed output for Pirate Stick for leaner reporting.
345 def processGrupeResults(self, conn, cols, items, grupe, eLog):
346 downloads = len(self.SQLrows2Add)
347 good = 0
348
349 if downloads > 0:
350 for dat in self.SQLrows2Add: # dat = (grp, vhash, vSize, mhash, json)
351 try:
352 self.addRow(conn, cols, dat)
353 good += 1 # Sucessfully added to SQLite
354
355 # Failed to add the row to SQLite, but it's saved in IPFS
356 except Exception as expn:
357 args = (dat[0], dat[1], dat[2], dat[3], dat[4], decodeException(expn))
358 er = "SQL Error! Grupe=%s vHash=%s vSize=%s, mHash=%s\n\nJSON=%s\n%s" % args
359 er += "\nMetadata key/values used:\n"
360 json = dat[4]
361 for key in json:
362 er += "%32s = %s\n" % (key, json[key])
363 self.ErrorList.append(er)
364
365 self.updateGrupeIndex(conn, grupe)
366
367 # Print and log the list of download failures
368 failures = len(self.ErrorList)
369 if len(self.ErrorList) > 0:
370 eLog.write("PROCESSING ERRORS FOR GRUPE=%s:\n" % grupe)
371 for error in self.ErrorList:
372 eLog.write(error + '\n')
373 eLog.write("END OF ERRORS FOR %s\n\n" % grupe)
374
375 args = (items, downloads, failures)
376 print("Processed=%d (Succeeded=%d, Failed=%d)" % args, flush=True)
377 return good, failures
378
379 # Used to determine if folder size limit has been exceeded. NOT recursive
380 def getSize(self, path):
381 totalSize = 0
382 for f in os.listdir(path):
383 fp = os.path.join(path, f)
384 totalSize += os.path.getsize(fp)
385 return totalSize
386
387 # Check if the download folder for this grupe is over the quota (if any) and
388 # remove the oldest file if it is. The quota is the maximum number of files
389 # or the maximum amount of space to limit the folder to. Quota is a string of
390 # integers followed by whitespace & unit string value. If no unit designation
391 # is specified the quota is the amount of space used in bytes. When the limit
392 # is exceeded the oldest files are removed to make room. .json and .wav files
393 # aren't counted in a file count quota, but they are for folder space quotas.
394 # Removals always remove all files of the same name regardless of extension,
395 # HOWEVER, wildcard replacement occurs after the 1st . on the left. Also note
396 # that pruning will never remove the last remaining file.
397 def pruneDir(self, quota, dir):
398 max = count = 0
399 fList = []
400
401 if quota: # Do nothing if no quota specified
402 q = quota.split(' ') # Quota amount and units, if any
403 if q[0].isdecimal(): # Check if string is a valid number
404 max = int(q[0]) # This is the quota limit
405 if max < 2: # Invalid quota value, zero removed
406 err = "Invalid quota: " + dir
407 self.ErrorList.append(err) # Log the error
408 return False
409
410 for f in os.listdir(dir): # Create a list of candidate files
411 if f.endswith(EXT_LIST): # Only include primary video files
412 fList.append(dir + '/' + f) # Prefix the file with path
413 count += 1 # Count how many in the list
414 if count < 2: return False # We're done if none or only 1
415
416 old = min(fList, key=os.path.getctime) # Get oldest file
417
418 if len(q) > 1: size = 0 # Quota limits number of files
419 else: size = self.getSize(dir) # Quota limits space used
420 if count > max or size > max: # Over the quota?
421 rm = old.rsplit('.')[0] + ".*" # Replace extension with a wildcard
422 os.system("rm -rf %s" % rm) # Easy way to do all related files
423 return True # Oldest file removed
424 else: return False
425
426 # Rename filename with numeric value for media duration
427 def fixDuration(self, name, seconds=0):
428 duration = str(int(float(seconds)))
429 noDur = SEPARATOR + "NA" + SEPARATOR # SEPARATOR - see serverDefs import
430 newDur = SEPARATOR + duration + SEPARATOR
431 zeroDur = SEPARATOR + "0" + SEPARATOR
432 if noDur in name:
433 newName = name.replace(noDur, newDur)
434 elif zeroDur in name:
435 newName = name.replace(zeroDur, newDur)
436 else:
437 newName = name
438 if newName != name: os.rename(name, newName)
439 return newName
440
441 # Return info provided within the downloaded filename
442 def parseFilename(self, file):
443 dir, base = file.rsplit('/', 1) # Separate grupe folder & downloaded file
444 grp = dir.rsplit('/', 1)[1] # Extract grupe from full path
445 pb, ext = os.path.splitext(file) # Path+Base in [0], extension in [1]
446 mFile = pb + ".info.json" # json metadata file for this download
447 vFile = file # Full pathname of downloaded file
448 return [dir, base, grp, pb, ext, vFile, mFile]
449
450 # This method is a process thread started with each successful download. It
451 # is started as a daemon thread to add the video and its' metadata to IPFS,
452 # and creates lists for errors and the files downloaded (to update SQLite).
453 def processVideo(self, file):
454 vHash = mHash = jFlat = vSize = duration = None
455
456 print(f"\nProcessing video file: {file}", flush=True)
457 p = self.parseFilename(file)
458 dir,base,grp,pb,ext,vFile,mFile = p # Get all the values from full pathname
459
460 # The grupe quota limits the size of the download folder. It's a string
461 # containing an integer with a space followed by an optional units word.
462 quota = self.Config["Grupes"][grp]["Quota"] # i.e. "20 files" or "2500000000"
463 pruned = False
464 while self.pruneDir(quota, dir): # Keep pruning until under quota
465 time.sleep(0.01)
466 pruned = True
467 if pruned: self.ErrorList.append("WARNING: Folder limit reached and pruned!")
468
469 # Process the metadata JSON file first, as we may need information from it
470 try:
471 with open(mFile, 'r') as jsn: # Read the entire JSON metadata file
472 jDict = json.load(jsn) # Create a python dictionary from it
473 jFlat = self.flatten_json(jDict) # Flatten the dictionary
474 id3 = TinyTag.get(file)
475 if id3.year: jFlat["release_year"] = id3.year
476 if id3.title: jFlat["title"] = id3.title
477 if id3.artist: jFlat["artist"] = id3.artist
478 if id3.duration: duration = id3.duration
479 if not duration: duration = 0 # No duration from yt-dlp or id3
480 jFlat["duration"] = duration # At least make it's numeric, and
481 mf = self.fixDuration(mFile, duration) # rename both video & metadata
482 vf = self.fixDuration(file, duration) # files too
483 p = self.parseFilename(vf) # Reparse for name changes
484 mHash = self.add2IPFS(mf) # Add the metadata file to IPFS
485 except Exception as e:
486 er = self.decodeException(e)
487 self.ErrorList.append(f"Metadata problem {file}:\n{er}")
488
489 # Log all errors, but add to SQLite if we got a valid video hash from IPFS
490 try:
491 dir,base,grp,pb,ext,vFile,mFile = p # Retrieve latest values
492 vHash = self.add2IPFS(vFile) # Add video file to IPFS
493 if len(vHash) == 46 and jFlat:
494 vSize = os.path.getsize(vFile) # Get file size for database
495 jFlat['_filename'] = base # Just use the base filename
496 if os.path.exists(vFile): # Delete once it's in IPFS
497 os.remove(vFile) # Remove only the video file
498 else: raise IPFSexception
499
500 except Exception as e: # Log all errors that may occur
501 er = self.decodeException(e)
502 args = (grp, vHash, mHash, base, er)
503 self.ErrorList.append("Grupe=%s vHash=%s mHash=%s vFile=%s\n%s" % args)
504
505 # If vHash is valid create a SQLite entry for it, regardless of metadata
506 finally:
507 # print(f"g={base} v={vSize} jLen={len(jFlat)} j={type(jFlat)}", flush=True)
508 if vHash.startswith("Qm"):
509 self.SQLrows2Add.append([grp, vHash, vSize, mHash, jFlat])
510
511 # Starts a daemon thread to process the downloaded file. youtube-dl provides no
512 # way to obtain information about the ffmpeg post processor, and adding to IPFS
513 # can be time consuming. Using threads to handle files allows the main thread
514 # 2 download other files concurrently with IPFS additions. See the processVideo
515 # function above for specifics of how downloaded files are processed.
516 def callback(self, d):
517 if d['status'] == 'finished':
518 path = d['filename'] # Callback provides full pathname
519 nam = path.rsplit('/', 1)[1].split("~^~", 1)[0]
520 th = threading.Thread(name=nam, target=self.processVideo,
521 args=([path]), daemon=True)
522 th.start() # Start the thread and continue
523
524 ##############################################################################
525 # #
526 # Primary program loop. The youtube-dl library takes care of downloading. #
527 # The callback function above processes each download, adding files to IPFS #
528 # and creating a list of rows to add to the SQLite DB by this function. #
529 # #
530 ##############################################################################
531 def ytdlProcess(self, conn):
532 sep = SEPARATOR
533 cols = self.Config['MetaColumns']
534 dlBase = self.Config['DLbase']
535 dlArch = dlBase + self.Config['DLarch']
536 dlElog = dlBase + self.Config['DLeLog']
537 dlOpts = self.Config['DLOpts']
538 grupeList = self.Config['Grupes']
539 total = 0
540 failures = 0
541 # NOTE: items missing from downloaded metadata will be replaced with "?"
542 # Media duration is important for sorting files into schedules.
543 ytdlFileFormat = "/%(id)s" + sep + "%(duration)s"+ sep + ".%(ext)s"
544
545 dlOpts['ignoreerrors'] = True # Until can catch from yt-dlp
546 dlOpts['downloader'] = "aria2c" # Fast downloader
547 dlOpts['downloader-args'] = "aria2c:'-c -j 3 -x 3 -s 3 -k 1M'"
548# dlOpts['verbose'] = True # Useful for debugging
549
550 # Add crucial download options
551 # dlOpts['force-ipv6'] = True # May not be enabled on host
552 dlOpts['writeinfojson'] = True
553 dlOpts['progress_hooks']=[self.callback] # Called at least 1ce per video
554 dlOpts['download_archive'] = dlArch # Facilitates updates w/o dupes
555 dlOpts['restrictfilenames'] = True # Required format for DLd files
556 eLog = open(dlElog, mode='a+') # Error log file for all grupes
557 for grupe in grupeList:
558 if not grupeList[grupe]['Active']: continue # Skip this grupe
559 self.SQLrows2Add = [] # Empty the list of downloads
560 self.ErrorList = [] # Empty the list of errors
561 print("\nBEGIN " + grupe, flush=True) # Marks start of group in log
562
563 if not os.path.isdir(dlBase+grupe): # If it doesn't exist
564 os.mkdir(dlBase + grupe) # create folder 4 grupe
565
566 # Add qualifier for minimum video duration (in seconds)
567 dur = grupeList[grupe]['Duration']
568 if dur != None and dur > 0:
569 dur = "duration > %d" % dur
570 dlOpts['match_filter'] = utils.match_filter_func(dur)
571 elif 'match_filter' in dlOpts.keys():
572 del dlOpts['match_filter'] # No duration filter
573
574 # Add release date range qualifier; either one or both are OK
575 sd = grupeList[grupe]['Start'] # null or YYYYMMDD format
576 ed = grupeList[grupe]['End'] # in JSON config file
577 if sd != None or ed != None:
578 dr = utils.DateRange(sd, ed) # Dates are inclusive
579 dlOpts['daterange'] = dr # Always set a date range
580 elif 'daterange' in dlOpts.keys():
581 del dlOpts['daterange'] # No date filter
582
583 # This stops downloading from playlist after this many videos
584 stop = grupeList[grupe]['Stop']
585 if stop != None and stop > 0: dlOpts['playlistend'] = stop
586 elif 'playlistend' in dlOpts.keys():
587 del dlOpts['playlistend'] # No playlist limit
588
589 # This will change downloaded file folder for each grupe
590 dlOpts['outtmpl'] = dlBase + grupe + ytdlFileFormat
591
592 urls = grupeList[grupe]['urls']
593 # Don't even try downloading videos we already have in the DB
594 newUrls = urls # self.filterUrls(conn, urls)
595
596 yt_dlp.YoutubeDL(dlOpts).download(newUrls) # BEGIN DOWNLOADING!!!
597 print(f"YOUTUBE-DL PROCESSING COMPLETE for {grupe}", flush=True)
598
599 # Wait for all callback threads to finish
600 for th in threading.enumerate():
601 if th.name != "MainThread":
602 while th.is_alive(): time.sleep(0.1)
603
604 # Log errors and print results of this DL grupe
605 good, fails = self.processGrupeResults(conn, cols,
606 len(urls), grupe, eLog)
607 total += good
608 failures += fails # Accumulate totals for this run
609
610 eLog.close()
611 return total, failures
612
613 #
614 # Display a summary of this download session. Return them for emailing.
615 #
616 def displaySummary(self, conn):
617 #
618 # Print the total number of files in the DB since it was created
619 #
620 sql = "SELECT COUNT(*), MIN(sqlts) FROM IPFS_HASH_INDEX;"
621 cols = (conn.cursor().execute(sql)).fetchone() # 1 row, 2 columns
622 total = f"\n{cols[0]} files downloaded and indexed since {cols[1][0:10]}"
623 print(total, flush=True)
624 mail = total + '\n'
625 #
626 # Report the number of files added to each grupe in the last 30 days
627 #
628 strt = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
629 sql = "SELECT DISTINCT SUBSTR(max(sqlts), 6, 5) as tme, grupe, count(*) as cnt "
630 sql += "FROM IPFS_HASH_INDEX WHERE sqlts > ? "
631 sql += "GROUP BY grupe ORDER BY sqlts desc;"
632 args = " Date Videos Grupe (Videos Added in the Last 30 Days)"
633 print(args, flush=True)
634 mail += args + '\n'
635 for cols in conn.execute(sql, (strt,)):
636 args = (cols['tme'], cols['cnt'], cols['grupe'])
637 mail += "%5s | %6d | %s\n" % args
638 print("%5s | %6d | %s" % args, flush=True)
639 #
640 # Print the total number for all grupes in last 30 days
641 #
642 sql = "SELECT COUNT(*) FROM IPFS_HASH_INDEX WHERE sqlts > ?"
643 dbObj = conn.cursor().execute(sql, (strt,))
644 total = " Total: "
645 total += "%5d" % dbObj.fetchone()[0]
646 print(total, flush=True)
647 mail += total
648 #
649 # Report the files downloaded today as grupes, titles & IPFS URLs
650 #
651 urls = ""
652 sql = "SELECT grupe, title, vhash "
653 sql += "FROM IPFS_HASH_INDEX "
654 sql += "WHERE DATE(sqlts) = DATE('now', 'localtime', 'start of day');"
655 rows = (conn.cursor().execute(sql)).fetchall()
656 if len(rows) > 0:
657 args = "\nIPFS URLs for files downloaded today:"
658 urls = args + '\n'
659 print(args, flush=True)
660 for col in rows:
661 args = (col['grupe'], col['title'][:48], col['vhash'])
662 text = "%13s | %48s | https://ipfs.io/ipfs/%s" % args
663 urls += text + '\n'
664 print(text, flush=True)
665 else:
666 args = "\nRun complete. No files downloaded today."
667 print(args, flush=True)
668 mail += args
669 return mail, urls
670
671 # Send a plain text message via email to recipient(s). Retry if necessary
672 def emailResults(self, server, account, subject, origin, to, text):
673 msg = EmailMessage() # Create a text/plain container
674 msg.set_content(text)
675 msg['Subject'] = subject
676 msg['From'] = origin
677 msg['To'] = to
678
679 # tx-server: /usr/local/lib/python3.8/site-packages/certifi/cacert.pem
680 # context = ssl._create_unverified_context()
681 # with smtplib.SMTP_SSL(server[0], server[1], context=context) as emailer:
682 # emailer.login(account[0], account[1])
683 # emailer.send_message(msg)
684 def sendMail(srvr, acct, mesg):
685 try:
686 context = ssl.create_default_context()
687 context.check_hostname = False
688 context.verify_mode = ssl.CERT_NONE
689 with smtplib.SMTP(srvr[0], srvr[1]) as emailer:
690 emailer.starttls(context=context)
691 emailer.login(acct[0], acct[1])
692 emailer.send_message(mesg)
693 except Exception as e:
694 return self.decodeException(e)
695 return 0 # No Exception, so mesg was sent successfully
696
697 for cnt in range(1, EMAIL_RETRIES):
698 unsent = sendMail(server, account, msg)
699 if unsent:
700 print(f"Attempt {cnt}: {unsent}", flush=True)
701 time.sleep(EMAIL_DELAY)
702 else: break # We're done if message sent
703
704 ##############################################################################
705 # Get command line arguments. Returns a tuple with config and DB connection. #
706 # Usage: thisFile [-h] | <-c config> <-d sqlite> #
707 # #
708 # Parse command line and report config info. Prints usage and exists if args #
709 # are invalid or missing. mff is the metadataFields.json file to load #
710 ##############################################################################
711 def getCmdLineArgs(self):
712 if len(sys.argv) >= 5:
713 sqlDBfile = self.Config = conn = None
714 grupe = grupes = urls = 0
715
716 # Required parameter: -c config file
717 if sys.argv[1] == "-c" and os.path.isfile(sys.argv[2]):
718 with open(sys.argv[2], 'r') as jsn:
719 self.Config = json.load(jsn)
720 meta = self.Config['DLbase'] + self.Config['DLmeta']
721 if len(meta) > 0: # Did config info load?
722 with open(meta, 'r') as jsn: # Now load meta fields
723 self.Config['MetaColumns'] = json.load(jsn)['MetaColumns']
724 metaSize = len(self.Config['MetaColumns'])
725 if metaSize > 0: # All config info loaded OK?
726 for grupe in self.Config['Grupes']: # Count groups and urls in them
727 grupes += 1
728 urls += len( self.Config['Grupes'][grupe]['urls'] )
729 print("Database Metadata Columns=%d" % metaSize, flush=True)
730 print("Downloaded groups will be saved in %s" % self.Config['DLbase'], flush=True)
731 print("%d groups, %d urls to process" % (grupes, urls), flush=True)
732 else: self.usage()
733 else: self.usage()
734
735 # Required parameter: -d SQLite database file
736 if sys.argv[3] == "-d":
737 sqlDBfile = sys.argv[4]
738 conn = self.openSQLiteDB(sqlDBfile)
739 conn.row_factory = sqlite3.Row # Results as python dictionary
740 if conn == None: self.usage()
741
742 # Optional parameter: -g grupe from config to use
743 if len(sys.argv) >= 6 and sys.argv[5] == "-g":
744 print(f"Ignoring all grupes in config except {grupe}", flush=True)
745 for grupe in self.Config['Grupes']: # Mark all inactive except 1
746 if grupe == sys.argv[6]: continue
747 self.Config['Grupes'][grupe]['Active'] = False
748
749 if not os.path.isdir(self.Config['DLbase']): # Create folder for results
750 os.mkdir(self.Config['DLbase']) # if necessary
751
752 return self.Config, conn, sqlDBfile # Return essential information
753 else: self.usage()
754
755 ##############################################################################
756 # Primary starting point for script #
757 # ############################################################################
758 def runScript(self, sqlFile, conn):
759 hash = None
760 if sqlFile is None or conn is None:
761 self.Config, conn, sqlFile = self.getCmdLineArgs() # Open config, DB
762
763 #regenerateAllGrupeIndexes(conn) # Fix all grupe indexes
764 #exit(0)
765
766 # Command line and config file processed, time to get down to it
767 good, fails = self.ytdlProcess(conn)
768
769 mail, urls = self.displaySummary(conn)
770 conn.execute("vacuum;") # Flush changes in this session & make backup
771 conn.execute("vacuum into 'latest.sqlite';") # Flush all to backup
772 conn.close() # DBs should be the same now, but do sha256 hashes differ?
773 # They shouldn't, but updateRunInfo will make them differ!
774
775 # If any downloads were successful, update IPFS with new SQLite file.
776 # This hash will appear in the IPFS_INFO table published next run.
777 if good > 0:
778 hash = self.publishDB("latest.sqlite")
779 args = f"\nThe newest SQLite DB hash is: {hash}\n"
780 if STATIC_DB_HASH:
781 args += "It is always available at:\n"
782 args += f"https://ipfs.io/ipns/{STATIC_DB_HASH}"
783 mail += args
784 print(args + "\n", flush=True)
785
786 # Update IPFS_INFO table in SQL DB with results for this run
787 # and flush all changes to current database file, not latest.
788 # Current DB will actually be latest, tho not latest published.
789 self.updateRunInfo(sqlFile, hash, good, fails)
790
791 if SEND_EMAIL:
792 self.emailResults(EMAIL_SERVR, EMAIL_LOGIN,
793 EMAIL_SUB1, EMAIL_FROM, EMAIL_LIST, mail)
794
795 if len(EMAIL_URLS) > 0 and len(urls) > 0:
796 self.emailResults(EMAIL_SERVR, EMAIL_LOGIN,
797 EMAIL_SUB2, EMAIL_FROM, EMAIL_URLS, urls)
798
799
800###############################################################################
801# main is called only if this file is invoked as a script not an object class #
802###############################################################################
803if __name__ == "__main__":
804 mp = MediaProcessor()
805 mp.runScript(None, None) # Run script using command line parameters
806 exit(0)
807

Built with git-ssb-web