#!/usr/bin/env python3
#
# MediaProcessor.py - Program to process media files and add them to IPFS.
#
# A SQLite3 database is used to store IPFS hashes and detailed metadata for
# each item. This lightweight SQL engine is filesystem based (no server).
#
# This script requires 2 JSON formatted config files containing schema column
# names and other info, like filter settings for video length and upload date.
#
# Youtube is known to change the metadata keys, making it difficult to rely on
# youtube metadata for the schema. After sampling 1000s of videos a common set
# of metadata fields was arrived at. The metadataFields.json file is currently
# where these fields / database columns are defined.
#
# The metadata provided by youtube-dl extractors is not totally normalized. To
# ensure a database record is saved for every file added to IPFS, an alternate
# metadata dictionary is used which always sets these 9 essential columns:
#
#   sqlts, pky, g_idx, grupe, vhash, vsize, season_number, url and _filename
#
# and any others from the downloaded metadata whose field names are found in
# the metadataFields.json file. A substitute metadata dictionary is used when a
# SQL error occurs while adding a new row. If the 2nd attempt fails it's logged
# along with the IPFS hash, so a record can be added manually at a later time.
# Usually the substitution is required because there are missing fields in the
# metadata provided by some youtube-dl extractors specific to a video source.
#
from __future__ import unicode_literals
from email.message import EmailMessage
from mpServerDefinitions import *       # Server specific CONSTANTS
from tinytag import TinyTag             # Gets meta info from MP3 files
from yt_dlp import utils
from datetime import *
import Exceptions
import subprocess
import threading
import smtplib
import sqlite3
import yt_dlp
import time
import json
import ssl
import sys
import os
import re


class MediaProcessor:

    def __init__(self):
        self.SQLrows2Add = []   # Lists populated by download callback threads
        self.ErrorList = []
        self.Config = {}        # Loaded from config file (JSON file format)

    """ Config file template, JSON format. Use double quotes only, null not None:
    self.Config {
        "Comment": [ "Unreferenced - for comments inside config file",
            "It is difficult sometimes to find the video link for Brightcove videos.",
            "This is the method that I use with Firefox. You will need 2 things:",
            "a) AccountID",
            "b) VideoID",
            "Get these by right-clicking on the video and select Player Information.",
            "Use ctrl-C to copy the info, and plug them into this template:",
            "http://players.brightcove.net//default_default/index.html?videoId=",
            "Works with Firefox 68.8, 75.0 and probably others as of May 12, 2020"
        ],

        "DLbase": "dir",    Folder for all downloaded files organized by grupe
        "DLeLog": "file",   File for exceptions / errors during downloads
        "DLarch": "file",   This tracks downloads to skip those already done
        "DLmeta": "file",   The metadata definition is now split into this file

        "DLOpts": {         Name / value pairs for youtube-dl options
            "optName1": "value1",   NOT always the same as cmd line opts
            ...
        },

        "Grupes": {         Dictionary of grupes to download, each with its own criteria
            "gName1": {     Group name containing video selection criteria
                "Active": true,     Enable or disable downloads for this grupe
                "Duration": 0,      Min size of video in seconds; for no limits use 0
                "Quota": null,      Limits size of grupe's DL folder to N files
                "Start": null,      Earliest upload date string (YYYYMMDD) or null
                "End": null,        Latest upload date or null. 1, 2 or neither OK
                "Stop": null,       Stop downloading from playlist after this many DLs
                "urls": [
                    "url1", "url2", ...
                ]
            },
            ...             Additional grupes
        },

        "MetaColumns": [    Contains the list of database fields for metadata for
            ...             the video downloaded along with it in JSON format.
                            This section is loaded from a separate file which is
                            specified by the DLbase and DLmeta keys defined above.
        ]
    }
    """

    def usage(self):
        cmd = sys.argv[0]
        str = "\nUses youtube-dl to download videos and add them to IPFS and track\n"
        str += "the results in a SQLite database.\n\n"
        str += "Usage: " + cmd + " [-h] | <-c config> <-d sqlite> [-g grupe]\n"
        str += "-h or no args print this help message.\n\n"
        str += "-c is a JSON formatted config file that specifies the target groups,\n"
        str += "their URL(s), downloader options, the base or top level folder for\n"
        str += "the groups of files downloaded and the list of metadata columns.\n"
        str += "-d is the SQLite filename (it is created if it doesn't exist).\n\n"
        str += "-g ignores all grupes in the config except the one named after -g.\n\n"
        print(str)
        exit(0)

    # Return detailed info from the stack trace. Default is to
    # return only the last item, pass True as 2nd arg for all items.
    def decodeException(self, e, all=False):
        trace = []
        stk = 0
        tb = e.__traceback__
        while tb is not None:
            stk += 1
            trace.append({
                "stk": stk,
                "filename": tb.tb_frame.f_code.co_filename,
                "function": tb.tb_frame.f_code.co_name,
                "lineno": tb.tb_lineno
            })
            tb = tb.tb_next
        if not all:
            trace = trace[-1]
            del trace["stk"]
        return {
            'type': type(e).__name__,
            'message': str(e),
            'trace': trace
        }

    # Flattens a nested JSON object and returns a python dictionary
    def flatten_json(self, nested_json):
        out = {}

        def flatten(x, name=''):
            if type(x) is dict:
                for a in x:
                    flatten(x[a], name + a + '_')
            elif type(x) is list:
                i = 0
                for a in x:
                    flatten(a, name + str(i) + '_')
                    i += 1
            else:
                out[name[:-1]] = x

        flatten(nested_json)
        return out
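    # A quick illustration of the flattening above (hypothetical input, not part
    # of the program flow): nested keys are joined with '_' and list items get
    # their index appended, so
    #
    #   self.flatten_json({"fmt": {"w": 640}, "tags": ["a", "b"]})
    #
    # would yield {"fmt_w": 640, "tags_0": "a", "tags_1": "b"}.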
    # Create the SQLite database file if it doesn't exist, using the
    # MetaColumns from config. If it already exists, open a connection
    # to it. Always returns a connection object to the dbFile.
    def openSQLiteDB(self, dbFile):
        newDatabase = not os.path.exists(dbFile)
        columns = self.Config['MetaColumns']
        conn = sqlite3.connect(dbFile)
        if newDatabase:
            sql = '''create table if not exists IPFS_INFO (
                "sqlts" TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')),
                "pky" INTEGER PRIMARY KEY AUTOINCREMENT,
                "db_hash" TEXT,
                "dl_good" INTEGER DEFAULT 0,
                "dl_errs" INTEGER DEFAULT 0);'''
            conn.execute(sql)

            sql = '''create table if not exists IPFS_HASH_INDEX (
                "sqlts" TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')),
                "pky" INTEGER PRIMARY KEY AUTOINCREMENT,
                "g_idx" TEXT,
                "grupe" TEXT,
                "vhash" TEXT,
                "vsize" TEXT,
                "mhash" TEXT'''
            for c in columns:
                sql += ',\n\t"' + c + '" TEXT'
            sql += ')'
            conn.execute(sql)
        return conn

    # Pin a file referenced by hash to local IPFS. Returns True if successful
    def pin2IPFS(self, hash):
        cmd = ["ipfs", "pin", "add", hash]
        out = subprocess.run(cmd,
                             stderr=subprocess.DEVNULL,
                             stdout=subprocess.PIPE).stdout.decode('utf-8')
        if out.startswith("pinned"):
            return True
        else:
            return False

    # Add a file to IPFS and return the hash for it.
    # Returns a 46 character hash: "Qm....1234567890123456789012345678901234567890"
    # or a zero length string on failure.
    def add2IPFS(self, file):
        cmd = ["ipfs", "add", "-Q", file]
        out = subprocess.run(cmd,
                             stderr=subprocess.DEVNULL,
                             stdout=subprocess.PIPE).stdout.decode('utf-8')
        if out.startswith("Qm") and len(out) == 47:
            hash = out[0:46]        # Return the 46 character hash - newline
        else:
            hash = ""
        return hash

    # Add the updated SQLite DB to IPFS under the static IPNS name associated with
    # STATIC_DB_HASH. That way the most recent DB can always be obtained by wget:
    # https://ipfs.io/ipns/<static IPNS name>.
    #
    # 2023/01/30: Despite closing the DB conn, it seems the previous DB is now
    # published. I suspect the sqlite import no longer flushes the file on close,
    # though it used to.
    def publishDB(self, file):
        newDBhash = self.add2IPFS(file)     # Add the updated SQLite database to IPFS
        if len(newDBhash) == 46:
            if STATIC_DB_HASH:
                cmd = ["ipfs", "name", "publish", "-Q", "-key=" + STATIC_DB_HASH, newDBhash]
                cp = subprocess.run(cmd, capture_output=True, text=True)
                if cp.returncode != 0:
                    print(f"Error publishing database to IPFS: {cp.stderr}", flush=True)
        return newDBhash

    # Reopen the SQL DB and update the IPFS_INFO table with info for this run.
    # The info for this run will not be available in the database published to
    # IPFS until the next database is published (a chicken & egg scenario),
    # because the hash of the published database is in the IPFS_INFO table.
    def updateRunInfo(self, sqlFile, dbHash, good, errs):
        conn = self.openSQLiteDB(sqlFile)
        conn.row_factory = sqlite3.Row          # Results as python dictionary
        sql = '''INSERT INTO IPFS_INFO ("db_hash", "dl_good", "dl_errs")
                 VALUES (?,?,?);'''

        if conn is not None:
            conn.execute(sql, (dbHash, good, errs))
            conn.commit()
            conn.execute('VACUUM;')     # Flush to the currently open database file
            conn.close()                # This will be the very last thing done this run

    # Filter the list of URLs provided in config file to skip URLs we've already
    # DL'd, based on a SQLite query. Filtering is based on the video ID. This is
    # not of much value b/c most URLs are for a channel or playlist, plus yt-dlp
    # already provides this function.
    def filterUrls(self, conn, configUrls):
        regx = re.compile(r'^.*?watch\?v=([^&]+)&*.*$', re.IGNORECASE)
        cursor = conn.cursor()
        filteredUrls = []
        for url in configUrls:
            match = re.search(regx, url)
            if match is not None:       # Is this a single video url?
                id = match.group(1)
                sql = "SELECT COUNT(*) FROM IPFS_HASH_INDEX WHERE id = ?;"
                if cursor.execute(sql, [id]).fetchone()[0] == 0:
                    filteredUrls.append(url)
            else:
                filteredUrls.append(url)
        return filteredUrls

    # Create a grupe index file containing a list of all video and metadata IPFS
    # hashes for the grupe. Add it to IPFS & return the hash and count of rows
    # updated.
    def updateGrupeIndex(self, conn, grupe):
        cursor = conn.cursor()

        sql = 'SELECT "v=" || vhash || " " || "m=" || mhash'
        sql += ' FROM IPFS_HASH_INDEX'
        sql += ' WHERE grupe = ?'
        idxFile = f"/tmp/{grupe}_idx.txt"
        with open(idxFile, "w") as idx:
            for row in cursor.execute(sql, (grupe,)):   # Loop through all grupe rows
                if row[0]:
                    idx.write(row[0] + '\n')
        hash = self.add2IPFS(idxFile)
        if len(hash) > 0:
            sql = "UPDATE IPFS_HASH_INDEX set g_idx=? WHERE grupe=?"
            cursor.execute(sql, (hash, grupe))
            conn.commit()
            os.remove(idxFile)
        return cursor.rowcount, hash
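    # For illustration, each line of the grupe index file written above has the
    # form "v=<video hash> m=<metadata hash>", e.g. (hypothetical hashes):
    #
    #   v=QmVideoHashAaaa... m=QmMetaHashBbbb...
    #
    # so a grupe's entire contents can be retrieved from IPFS given only its
    # g_idx hash.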
    # This block of code will create a group index file for every group in the DB,
    # then add that file to IPFS. Update every row in the group with that hash,
    # and do that for every grupe, so every row in the IPFS_HASH_INDEX table gets
    # updated. See "updateGrupeIndex" above for details; this just wraps that.
    def regenerateAllGrupeIndexes(self, conn):
        cursor = conn.cursor()
        sql = "SELECT DISTINCT grupe FROM IPFS_HASH_INDEX"
        for row in cursor.execute(sql):
            (count, hash) = self.updateGrupeIndex(conn, row[0])
            print("Updated %d rows for grupe %s with grupe index hash %s" %
                  (count, row[0], hash), flush=True)

    # Add a row to SQLite database. Most of the column data is from the JSON
    # metadata (gvmjList) downloaded with the video. Note that SQLite is not
    # thread safe, so only the main thread updates the DB.
    def addRow2db(self, conn, cols, gvmjList):
        (grupe, vhash, vsize, mhash, jsn) = gvmjList
        cursor = conn.cursor()
        jsn["episode_number"] = mhash           # Mark row as pinned by adding the hashes
        jsn["season_number"] = vhash            # to these fields
        values = [grupe, vhash, vsize, mhash]   # 1st 4 values not in YT metadata
        sql = 'INSERT INTO IPFS_HASH_INDEX ("grupe", "vhash", "vsize", "mhash"'

        for col in cols:
            sql += ',\n\t"' + col + '"'
        sql += ") VALUES (?,?,?,?"              # Now add metadata

        for col in cols:
            sql += ",?"
            values.append(jsn[col])
        sql += "); "

        cursor.execute(sql, values)
        conn.commit()
        return cursor.lastrowid

    # This wrapper method calls the addRow2db method above and upon failure makes
    # a 2nd attempt to insert the row with an alternate metadata dictionary. The
    # alternate dictionary is created by copying the valid metadata into the new
    # alternate and adding missing keys, setting their missing values to "?".
    #
    # The ytdl extractors vary as to the format of the metadata they produce;
    # youtube-dl doesn't totally normalize it. If a video file was downloaded and
    # an IPFS hash was produced, a row will be added with sqlts, pky, g_idx, grupe,
    # vhash, vsize, season_number and _filename columns that have known valid data.
    def addRow(self, conn, cols, gvmjList):
        try:
            row = self.addRow2db(conn, cols, gvmjList)      # Attempt number one...

        # On failure create a new metadata dictionary for this file. For any
        # missing keys, create a key whose value is "?". This is a work-
        # around for JSON metadata fields the extractor doesn't provide.
        except (sqlite3.OperationalError, KeyError) as e:
            newDictionary = {}
            (grp, vhash, vsize, mhash, jsn) = gvmjList
            for col in cols:
                if col in jsn.keys():
                    newDictionary[col] = jsn[col]
                else:
                    newDictionary[col] = "?"    # Previously "unknown-value"

            # Try again. Any exception this time will propagate upstream
            row = self.addRow2db(conn, cols, (grp, vhash, vsize, mhash, newDictionary))

        return row

    # Add a row to the SQLite database for every video downloaded for this grupe,
    # print the successes and failures and log the failures to the error log file.
    # NOTE: Reduced printed output for Pirate Stick for leaner reporting.
    def processGrupeResults(self, conn, cols, items, grupe, eLog):
        downloads = len(self.SQLrows2Add)
        good = 0

        if downloads > 0:
            for dat in self.SQLrows2Add:    # dat = (grp, vhash, vSize, mhash, json)
                try:
                    self.addRow(conn, cols, dat)
                    good += 1               # Successfully added to SQLite

                # Failed to add the row to SQLite, but it's saved in IPFS
                except Exception as expn:
                    args = (dat[0], dat[1], dat[2], dat[3], dat[4],
                            self.decodeException(expn))
                    er = "SQL Error! Grupe=%s vHash=%s vSize=%s, mHash=%s\n\nJSON=%s\n%s" % args
                    er += "\nMetadata key/values used:\n"
                    json = dat[4]
                    for key in json:
                        er += "%32s = %s\n" % (key, json[key])
                    self.ErrorList.append(er)

            self.updateGrupeIndex(conn, grupe)

        # Print and log the list of download failures
        failures = len(self.ErrorList)
        if len(self.ErrorList) > 0:
            eLog.write("PROCESSING ERRORS FOR GRUPE=%s:\n" % grupe)
            for error in self.ErrorList:
                eLog.write(error + '\n')
            eLog.write("END OF ERRORS FOR %s\n\n" % grupe)

        args = (items, downloads, failures)
        print("Processed=%d (Succeeded=%d, Failed=%d)" % args, flush=True)
        return good, failures

    # Used to determine if folder size limit has been exceeded. NOT recursive
    def getSize(self, path):
        totalSize = 0
        for f in os.listdir(path):
            fp = os.path.join(path, f)
            totalSize += os.path.getsize(fp)
        return totalSize

    # Check if the download folder for this grupe is over the quota (if any) and
    # remove the oldest file if it is. The quota is the maximum number of files
    # or the maximum amount of space to limit the folder to. Quota is a string of
    # integers followed by whitespace & a unit string value. If no unit designation
    # is specified the quota is the amount of space used in bytes. When the limit
    # is exceeded the oldest files are removed to make room. .json and .wav files
    # aren't counted in a file count quota, but they are for folder space quotas.
    # Removals always remove all files of the same name regardless of extension;
    # HOWEVER, wildcard replacement occurs after the 1st . on the left. Also note
    # that pruning will never remove the last remaining file.
    def pruneDir(self, quota, dir):
        max = count = 0
        fList = []

        if quota:                                   # Do nothing if no quota specified
            q = quota.split(' ')                    # Quota amount and units, if any
            if q[0].isdecimal():                    # Check if string is a valid number
                max = int(q[0])                     # This is the quota limit
            if max < 2:                             # Invalid quota value, zero removed
                err = "Invalid quota: " + dir
                self.ErrorList.append(err)          # Log the error
                return False

            for f in os.listdir(dir):               # Create a list of candidate files
                if f.endswith(EXT_LIST):            # Only include primary video files
                    fList.append(dir + '/' + f)     # Prefix the file with path
                    count += 1                      # Count how many in the list
            if count < 2:
                return False                        # We're done if none or only 1
            old = min(fList, key=os.path.getctime)  # Get oldest file

            if len(q) > 1:
                size = 0                            # Quota limits number of files
            else:
                size = self.getSize(dir)            # Quota limits space used

            if count > max or size > max:           # Over the quota?
                rm = old.rsplit('.')[0] + ".*"      # Replace extension with a wildcard
                os.system("rm -rf %s" % rm)         # Easy way to do all related files
                return True                         # Oldest file removed
            else:
                return False

    # Rename filename with numeric value for media duration
    def fixDuration(self, name, seconds=0):
        duration = str(int(float(seconds)))
        noDur = SEPARATOR + "NA" + SEPARATOR        # SEPARATOR - see serverDefs import
        newDur = SEPARATOR + duration + SEPARATOR
        zeroDur = SEPARATOR + "0" + SEPARATOR
        if noDur in name:
            newName = name.replace(noDur, newDur)
        elif zeroDur in name:
            newName = name.replace(zeroDur, newDur)
        else:
            newName = name
        if newName != name:
            os.rename(name, newName)
        return newName

    # Return info provided within the downloaded filename
    def parseFilename(self, file):
        dir, base = file.rsplit('/', 1)     # Separate grupe folder & downloaded file
        grp = dir.rsplit('/', 1)[1]         # Extract grupe from full path
        pb, ext = os.path.splitext(file)    # Path+Base in [0], extension in [1]
        mFile = pb + ".info.json"           # json metadata file for this download
        vFile = file                        # Full pathname of downloaded file
        return [dir, base, grp, pb, ext, vFile, mFile]

    # This method is a process thread started with each successful download. It
    # is started as a daemon thread to add the video and its metadata to IPFS,
    # and creates lists for errors and the files downloaded (to update SQLite).
    def processVideo(self, file):
        vHash = mHash = jFlat = vSize = duration = None
        print(f"\nProcessing video file: {file}", flush=True)
        p = self.parseFilename(file)
        dir, base, grp, pb, ext, vFile, mFile = p   # Get all the values from full pathname

        # The grupe quota limits the size of the download folder. It's a string
        # containing an integer with a space followed by an optional units word.
        quota = self.Config["Grupes"][grp]["Quota"]     # i.e. "20 files" or "2500000000"
        pruned = False
        while self.pruneDir(quota, dir):            # Keep pruning until under quota
            time.sleep(0.01)
            pruned = True
        if pruned:
            self.ErrorList.append("WARNING: Folder limit reached and pruned!")

        # Process the metadata JSON file first, as we may need information from it
        try:
            with open(mFile, 'r') as jsn:           # Read the entire JSON metadata file
                jDict = json.load(jsn)              # Create a python dictionary from it
            jFlat = self.flatten_json(jDict)        # Flatten the dictionary
            id3 = TinyTag.get(file)
            if id3.year:
                jFlat["release_year"] = id3.year
            if id3.title:
                jFlat["title"] = id3.title
            if id3.artist:
                jFlat["artist"] = id3.artist
            if id3.duration:
                duration = id3.duration
            if not duration:
                duration = 0                        # No duration from yt-dlp or id3
            jFlat["duration"] = duration            # At least make sure it's numeric, and
            mf = self.fixDuration(mFile, duration)  # rename both video & metadata
            vf = self.fixDuration(file, duration)   # files too
            p = self.parseFilename(vf)              # Reparse for name changes
            mHash = self.add2IPFS(mf)               # Add the metadata file to IPFS

        except Exception as e:
            er = self.decodeException(e)
            self.ErrorList.append(f"Metadata problem {file}:\n{er}")

        # Log all errors, but add to SQLite if we got a valid video hash from IPFS
        try:
            dir, base, grp, pb, ext, vFile, mFile = p   # Retrieve latest values
            vHash = self.add2IPFS(vFile)                # Add video file to IPFS
            if len(vHash) == 46 and jFlat:
                vSize = os.path.getsize(vFile)          # Get file size for database
                jFlat['_filename'] = base               # Just use the base filename
                if os.path.exists(vFile):               # Delete once it's in IPFS
                    os.remove(vFile)                    # Remove only the video file
            else:
                raise IPFSexception

        except Exception as e:                          # Log all errors that may occur
            er = self.decodeException(e)
            args = (grp, vHash, mHash, base, er)
            self.ErrorList.append("Grupe=%s vHash=%s mHash=%s vFile=%s\n%s" % args)
        # If vHash is valid create a SQLite entry for it, regardless of metadata
        finally:
            # print(f"g={base} v={vSize} jLen={len(jFlat)} j={type(jFlat)}", flush=True)
            if vHash and vHash.startswith("Qm"):
                self.SQLrows2Add.append([grp, vHash, vSize, mHash, jFlat])

    # Starts a daemon thread to process the downloaded file. youtube-dl provides no
    # way to obtain information about the ffmpeg post processor, and adding to IPFS
    # can be time consuming. Using threads to handle files allows the main thread
    # to download other files concurrently with IPFS additions. See the processVideo
    # function above for specifics of how downloaded files are processed.
    def callback(self, d):
        if d['status'] == 'finished':
            path = d['filename']        # Callback provides full pathname
            nam = path.rsplit('/', 1)[1].split("~^~", 1)[0]
            th = threading.Thread(name=nam, target=self.processVideo,
                                  args=([path]), daemon=True)
            th.start()                  # Start the thread and continue

    ##############################################################################
    #                                                                            #
    # Primary program loop. The youtube-dl library takes care of downloading.    #
    # The callback function above processes each download, adding files to IPFS  #
    # and creating a list of rows to add to the SQLite DB by this function.      #
    #                                                                            #
    ##############################################################################
    def ytdlProcess(self, conn):
        sep = SEPARATOR
        cols = self.Config['MetaColumns']
        dlBase = self.Config['DLbase']
        dlArch = dlBase + self.Config['DLarch']
        dlElog = dlBase + self.Config['DLeLog']
        dlOpts = self.Config['DLOpts']
        grupeList = self.Config['Grupes']
        total = 0
        failures = 0
        # NOTE: items missing from downloaded metadata will be replaced with "?"
        # Media duration is important for sorting files into schedules.
        ytdlFileFormat = "/%(id)s" + sep + "%(duration)s" + sep + ".%(ext)s"

        dlOpts['ignoreerrors'] = True               # Until can catch from yt-dlp
        dlOpts['downloader'] = "aria2c"             # Fast downloader
        dlOpts['downloader-args'] = "aria2c:'-c -j 3 -x 3 -s 3 -k 1M'"
        # dlOpts['verbose'] = True                  # Useful for debugging

        # Add crucial download options
        # dlOpts['force-ipv6'] = True               # May not be enabled on host
        dlOpts['writeinfojson'] = True
        dlOpts['progress_hooks'] = [self.callback]  # Called at least 1ce per video
        dlOpts['download_archive'] = dlArch         # Facilitates updates w/o dupes
        dlOpts['restrictfilenames'] = True          # Required format for DLd files
        eLog = open(dlElog, mode='a+')              # Error log file for all grupes

        for grupe in grupeList:
            if not grupeList[grupe]['Active']:
                continue                            # Skip this grupe

            self.SQLrows2Add = []                   # Empty the list of downloads
            self.ErrorList = []                     # Empty the list of errors
            print("\nBEGIN " + grupe, flush=True)   # Marks start of group in log

            if not os.path.isdir(dlBase + grupe):   # If it doesn't exist
                os.mkdir(dlBase + grupe)            # create folder 4 grupe

            # Add qualifier for minimum video duration (in seconds)
            dur = grupeList[grupe]['Duration']
            if dur != None and dur > 0:
                dur = "duration > %d" % dur
                dlOpts['match_filter'] = utils.match_filter_func(dur)
            elif 'match_filter' in dlOpts.keys():
                del dlOpts['match_filter']          # No duration filter

            # Add release date range qualifier; either one or both are OK
            sd = grupeList[grupe]['Start']          # null or YYYYMMDD format
            ed = grupeList[grupe]['End']            # in JSON config file
            if sd != None or ed != None:
                dr = utils.DateRange(sd, ed)        # Dates are inclusive
                dlOpts['daterange'] = dr            # Always set a date range
            elif 'daterange' in dlOpts.keys():
                del dlOpts['daterange']             # No date filter

            # This stops downloading from playlist after this many videos
            stop = grupeList[grupe]['Stop']
            if stop != None and stop > 0:
                dlOpts['playlistend'] = stop
            elif 'playlistend' in dlOpts.keys():
                del dlOpts['playlistend']           # No playlist limit

            # This will change downloaded file folder for each grupe
            dlOpts['outtmpl'] = dlBase + grupe + ytdlFileFormat
            urls = grupeList[grupe]['urls']

            # Don't even try downloading videos we already have in the DB
            newUrls = urls      # self.filterUrls(conn, urls)

            yt_dlp.YoutubeDL(dlOpts).download(newUrls)      # BEGIN DOWNLOADING!!!
            print(f"YOUTUBE-DL PROCESSING COMPLETE for {grupe}", flush=True)

            # Wait for all callback threads to finish
            for th in threading.enumerate():
                if th.name != "MainThread":
                    while th.is_alive():
                        time.sleep(0.1)

            # Log errors and print results of this DL grupe
            good, fails = self.processGrupeResults(conn, cols,
                                                   len(urls), grupe, eLog)
            total += good
            failures += fails                       # Accumulate totals for this run

        eLog.close()
        return total, failures

    #
    # Display a summary of this download session. Return them for emailing.
    #
    def displaySummary(self, conn):
        #
        # Print the total number of files in the DB since it was created
        #
        sql = "SELECT COUNT(*), MIN(sqlts) FROM IPFS_HASH_INDEX;"
        cols = (conn.cursor().execute(sql)).fetchone()      # 1 row, 2 columns
        total = f"\n{cols[0]} files downloaded and indexed since {cols[1][0:10]}"
        print(total, flush=True)
        mail = total + '\n'

        #
        # Report the number of files added to each grupe in the last 30 days
        #
        strt = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
        sql = "SELECT DISTINCT SUBSTR(sqlts, 6, 5) as tme, grupe, count(*) as cnt "
        sql += "FROM IPFS_HASH_INDEX WHERE sqlts > ? "
        sql += "GROUP BY grupe ORDER BY sqlts desc;"
        args = " Date Videos Grupe (Videos Added in the Last 30 Days)"
        print(args, flush=True)
        mail += args + '\n'
        for cols in conn.execute(sql, (strt,)):
            args = (cols['tme'], cols['cnt'], cols['grupe'])
            mail += "%5s | %6d | %s\n" % args
            print("%5s | %6d | %s" % args, flush=True)

        #
        # Print the total number for all grupes in last 30 days
        #
        sql = "SELECT COUNT(*) FROM IPFS_HASH_INDEX WHERE sqlts > ?"
        dbObj = conn.cursor().execute(sql, (strt,))
        total = " Total: "
        total += "%5d" % dbObj.fetchone()[0]
        print(total, flush=True)
        mail += total

        #
        # Report the files downloaded today as grupes, titles & IPFS URLs
        #
        urls = ""
        sql = "SELECT grupe, title, vhash "
        sql += "FROM IPFS_HASH_INDEX "
        sql += "WHERE DATE(sqlts) = DATE('now', 'localtime', 'start of day');"
        rows = (conn.cursor().execute(sql)).fetchall()
        if len(rows) > 0:
            args = "\nIPFS URLs for files downloaded today:"
            urls = args + '\n'
            print(args, flush=True)
            for col in rows:
                args = (col['grupe'], col['title'][:48], col['vhash'])
                text = "%13s | %48s | https://ipfs.io/ipfs/%s" % args
                urls += text + '\n'
                print(text, flush=True)
        else:
            args = "\nRun complete. No files downloaded today."
            print(args, flush=True)
            mail += args

        return mail, urls
    # Send a plain text message via email to recipient(s). Retry if necessary.
    def emailResults(self, server, account, subject, origin, to, text):
        msg = EmailMessage()                    # Create a text/plain container
        msg.set_content(text)
        msg['Subject'] = subject
        msg['From'] = origin
        msg['To'] = to

        # tx-server: /usr/local/lib/python3.8/site-packages/certifi/cacert.pem
        # context = ssl._create_unverified_context()
        # with smtplib.SMTP_SSL(server[0], server[1], context=context) as emailer:
        #     emailer.login(account[0], account[1])
        #     emailer.send_message(msg)

        def sendMail(srvr, acct, mesg):
            try:
                context = ssl.create_default_context()
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                with smtplib.SMTP(srvr[0], srvr[1]) as emailer:
                    emailer.starttls(context=context)
                    emailer.login(acct[0], acct[1])
                    emailer.send_message(mesg)
            except Exception as e:
                return self.decodeException(e)
            return 0        # No Exception, so mesg was sent successfully

        for cnt in range(1, EMAIL_RETRIES):
            unsent = sendMail(server, account, msg)
            if unsent:
                print(f"Attempt {cnt}: {unsent}", flush=True)
                time.sleep(EMAIL_DELAY)
            else:
                break       # We're done if message sent

    ##############################################################################
    # Get command line arguments. Returns a tuple with config and DB connection. #
    #                                                                            #
    # Usage: thisFile [-h] | <-c config> <-d sqlite>                             #
    #                                                                            #
    # Parse command line and report config info. Prints usage and exits if args  #
    # are invalid or missing. The metadata columns file (DLmeta) is loaded here. #
    ##############################################################################
    def getCmdLineArgs(self):
        if len(sys.argv) >= 5:
            sqlDBfile = self.Config = conn = None
            grupe = grupes = urls = 0

            # Required parameter: -c config file
            if sys.argv[1] == "-c" and os.path.isfile(sys.argv[2]):
                with open(sys.argv[2], 'r') as jsn:
                    self.Config = json.load(jsn)
                meta = self.Config['DLbase'] + self.Config['DLmeta']

                if len(meta) > 0:                   # Did config info load?
                    with open(meta, 'r') as jsn:    # Now load meta fields
                        self.Config['MetaColumns'] = json.load(jsn)['MetaColumns']
                    metaSize = len(self.Config['MetaColumns'])
                    if metaSize > 0:                # All config info loaded OK?
                        for grupe in self.Config['Grupes']:     # Count groups and urls in them
                            grupes += 1
                            urls += len(self.Config['Grupes'][grupe]['urls'])

                        print("Database Metadata Columns=%d" % metaSize, flush=True)
                        print("Downloaded groups will be saved in %s" % self.Config['DLbase'], flush=True)
                        print("%d groups, %d urls to process" % (grupes, urls), flush=True)
                    else:
                        self.usage()
            else:
                self.usage()

            # Required parameter: -d SQLite database file
            if sys.argv[3] == "-d":
                sqlDBfile = sys.argv[4]
                conn = self.openSQLiteDB(sqlDBfile)
                conn.row_factory = sqlite3.Row      # Results as python dictionary

            if conn == None:
                self.usage()

            # Optional parameter: -g grupe from config to use
            if len(sys.argv) >= 7 and sys.argv[5] == "-g":
                print(f"Ignoring all grupes in config except {sys.argv[6]}", flush=True)
                for grupe in self.Config['Grupes']:     # Mark all inactive except 1
                    if grupe == sys.argv[6]:
                        continue
                    self.Config['Grupes'][grupe]['Active'] = False

            if not os.path.isdir(self.Config['DLbase']):    # Create folder for results
                os.mkdir(self.Config['DLbase'])             # if necessary

            return self.Config, conn, sqlDBfile     # Return essential information
        else:
            self.usage()

    ##############################################################################
    # Primary starting point for script                                          #
    ##############################################################################
    def runScript(self, sqlFile, conn):
        hash = None
        if sqlFile is None or conn is None:
            self.Config, conn, sqlFile = self.getCmdLineArgs()      # Open config, DB

        # regenerateAllGrupeIndexes(conn)       # Fix all grupe indexes
        # exit(0)

        # Command line and config file processed, time to get down to it
        good, fails = self.ytdlProcess(conn)

        mail, urls = self.displaySummary(conn)
        conn.execute("vacuum;")                         # Flush changes in this session
        conn.execute("vacuum into 'latest.sqlite';")    # Flush all to backup
        conn.close()    # DBs should be the same now, but do sha256 hashes differ?
                        # They shouldn't, but updateRunInfo will make them differ!

        # If any downloads were successful, update IPFS with new SQLite file.
        # This hash will appear in the IPFS_INFO table published next run.
        if good > 0:
            hash = self.publishDB("latest.sqlite")
            args = f"\nThe newest SQLite DB hash is: {hash}\n"
            if STATIC_DB_HASH:
                args += "It is always available at:\n"
                args += f"https://ipfs.io/ipns/{STATIC_DB_HASH}"
            mail += args
            print(args + "\n", flush=True)

        # Update IPFS_INFO table in SQL DB with results for this run
        # and flush all changes to current database file, not latest.
        # Current DB will actually be latest, tho not latest published.
        self.updateRunInfo(sqlFile, hash, good, fails)

        if SEND_EMAIL:
            self.emailResults(EMAIL_SERVR, EMAIL_LOGIN, EMAIL_SUB1,
                              EMAIL_FROM, EMAIL_LIST, mail)

            if len(EMAIL_URLS) > 0 and len(urls) > 0:
                self.emailResults(EMAIL_SERVR, EMAIL_LOGIN, EMAIL_SUB2,
                                  EMAIL_FROM, EMAIL_URLS, urls)


###############################################################################
# main is called only if this file is invoked as a script not an object class #
###############################################################################
if __name__ == "__main__":
    mp = MediaProcessor()
    mp.runScript(None, None)    # Run script using command line parameters
    exit(0)
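# Example invocations (hypothetical file names; the config and database paths
# are whatever the deployment uses, per the usage() text above):
#
#   ./MediaProcessor.py -c mpConfig.json -d ipfsIndex.sqlite
#   ./MediaProcessor.py -c mpConfig.json -d ipfsIndex.sqlite -g gName1
#
# The second form limits the run to the single grupe "gName1" defined in the
# config file; every other grupe is marked inactive for that run.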