1 | #!/usr/bin/env python3 |
2 | # |
3 | # MediaProcessor.py - Program to process media files and add them to IPFS. |
4 | # |
5 | # A SQLite3 database is used to store IPFS hashes and detailed metadata for |
6 | # each item. This lightweight SQL engine is filesystem based (no server). |
7 | # |
8 | # This script requires 2 JSON formatted config files containing schema column |
9 | # names and other info, like filter settings for video length and upload date. |
10 | # |
11 | # Youtube is known to change the metadata keys, making it difficult to rely on |
12 | # youtube metadata for the schema. After sampling 1000s of videos a common set |
13 | # of metadata fields was arrived at. The metadataFields.json file is currently |
14 | # where these fields / database columns are defined. |
15 | # |
16 | # The metadata provided by youtube-dl extractors is not totally normalized. To |
17 | # ensure a database record is saved for every file added to IPFS, an alternate |
18 | # metadata dictionary is used which always sets these 9 essential columns: |
19 | # |
20 | # sqlts, pky, g_idx, grupe, vhash, vsize, season_number, url and _filename |
21 | # |
22 | # and any others from the downloaded metadata whose field names are found in |
23 | # the metadataFields.json file. A substitute metadata dictionary is used when a |
24 | # SQL error occurs while adding a new row. If the 2nd attempt fails it's logged |
25 | # along with the IPFS hash, so a record can be added manually at a later time. |
26 | # Usually the substitution is required because there are missing fields in the |
27 | # metadata provided by some youtube-dl extractors specific to a video source. |
28 | # |
29 | from __future__ import unicode_literals |
30 | from email.message import EmailMessage |
31 | from mpServerDefinitions import * # Server specific CONSTANTS |
32 | from tinytag import TinyTag # Gets meta info from MP3 files |
33 | from yt_dlp import utils |
34 | from datetime import * |
35 | import Exceptions |
36 | import subprocess |
37 | import threading |
38 | import smtplib |
39 | import sqlite3 |
40 | import yt_dlp |
41 | import time |
42 | import json |
43 | import ssl |
44 | import sys |
45 | import os |
46 | import re |
47 | |
48 | |
49 | class MediaProcessor: |
50 | |
51 | def __init__(self): |
52 | self.SQLrows2Add = [] # Lists populated by download callback threads |
53 | self.ErrorList = [] |
54 | self.Config = {} # Loaded from config file (JSON file format) |
55 | |
56 | """ config file template, JSON format. Use double quotes only, null not None: |
57 | self.Config { |
58 | "Comment": [ "Unreferenced - for comments inside config file", |
59 | "It is difficult sometimes to find the video link for Brightcove videos.", |
60 | "This is the method that I use with Firefox. You will need 2 things:", |
61 | "a) AccountID", |
62 | "b) VideoID", |
63 | "Get these by right-clicking on the video and select Player Information.", |
64 | "Use ctrl-C to copy the info, and plug them into this template:", |
65 | "http://players.brightcove.net/<AccountID>/default_default/index.html?videoId=<VideoID>", |
66 | "Works with Firefox 68.8, 75.0 and probably others as of May 12, 2020" |
67 | ], |
68 | |
69 | "DLbase": "dir", Folder for all downloaded files organized by grupe |
70 | "DLeLog": "file", File for exceptions / errors during downloads |
71 | "DLarch": "file", This tracks downloads to skip those already done |
72 | "DLmeta": "file", The metadata definition is now split into this file |
73 | |
74 | "DLOpts": { Name / value pairs for youtube-dl options |
75 | "optName1": "value1", NOT always the same as cmd line opts |
76 | ... |
77 | }, |
78 | |
79 | "Grupes": { # Dictionary of grupes to download, with its own criteria |
80 | "gName1": { Group name containing video selection criteria |
81 | "Active": true, Enable or disable downloads for this grupe |
82 | "Duration": 0, Min size of video in seconds; for no limits use 0 |
83 | "Quota": null, Limits size of grupe's DL folder to N files |
84 | "Start": null, Earliest upload date string or (YYYYMMDD) or null |
85 | "End": null, Latest upload date or null. 1, 2 or neither OK |
86 | "Stop": null, Stop downloading from playlist after this many DLs |
87 | "urls": [ "url1", List of source URLs (channels, playlists or videos) |
88 | "url2", |
89 | ... |
90 | ] |
91 | }, |
92 | ... Additional grupes |
93 | }, |
94 | |
95 | "MetaColumns": [ Contains the list of database fields for metadata for |
96 | ... the video downloaded along with it in JSON format. |
97 | This section is loaded from a separate file which is |
98 | specified by the DLbase and DLmeta keys defined above. |
99 | ] |
100 | } |
101 | """ |
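# A minimal, illustrative config sketch (paths, URLs and values are hypothetical,
# not a working setup). The "MetaColumns" list is loaded at runtime from the file
# named by DLbase + DLmeta:
#
#   {
#     "DLbase": "/home/ipfs/dl/",
#     "DLarch": "downloaded.txt",
#     "DLeLog": "errors.log",
#     "DLmeta": "metadataFields.json",
#     "DLOpts": { "quiet": true },
#     "Grupes": {
#       "news": {
#         "Active": true,
#         "Duration": 60,
#         "Quota": "20 files",
#         "Start": null,
#         "End": null,
#         "Stop": null,
#         "urls": [ "https://www.youtube.com/@SomeChannel" ]
#       }
#     }
#   }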
102 | |
103 | def usage(self): |
104 | cmd = sys.argv[0] |
105 | str = "\nUses youtube-dl to download videos and add them to IPFS and track\n" |
106 | str += "the results in a SQLite database.\n\n" |
107 | str += "Usage: " + cmd + " [-h] | <-c config> <-d sqlite> [-g grupe]\n" |
108 | str += "-h or no args prints this help message.\n\n" |
109 | str += "-c is a JSON formatted config file that specifies the target groups,\n" |
110 | str += "their URL(s), downloader options, the base or top level folder for\n" |
111 | str += "the groups of files downloaded and the list of metadata columns.\n" |
112 | str += "-d is the SQLite filename (it is created if it doesn't exist).\n\n" |
113 | str += "-g ignores all grupes in the config except the one named after -g.\n\n" |
114 | print(str) |
115 | exit(0) |
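# Example invocation (file names are illustrative): process every active grupe,
# or add -g to restrict the run to a single grupe from the config:
#
#   ./MediaProcessor.py -c config.json -d media.sqlite
#   ./MediaProcessor.py -c config.json -d media.sqlite -g news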
116 | |
117 | # Return detailed info from the stack trace. Default is to |
118 | # return only the last item, pass True as 2nd arg for all items. |
119 | def decodeException(self, e, all=False): |
120 | trace = [] |
121 | stk = 0 |
122 | tb = e.__traceback__ |
123 | while tb is not None: |
124 | stk += 1 |
125 | trace.append({ |
126 | "stk": stk, |
127 | "filename": tb.tb_frame.f_code.co_filename, |
128 | "function": tb.tb_frame.f_code.co_name, |
129 | "lineno": tb.tb_lineno |
130 | }) |
131 | tb = tb.tb_next |
132 | if not all: |
133 | trace = trace[-1] |
134 | del trace["stk"] |
135 | return { |
136 | 'type': type(e).__name__, |
137 | 'message': str(e), |
138 | 'trace': trace |
139 | } |
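# Illustrative return value (file, function and line number are hypothetical):
# decodeException(e) for a KeyError raised while inserting a row might yield
#   {'type': 'KeyError', 'message': "'upload_date'",
#    'trace': {'filename': 'MediaProcessor.py', 'function': 'addRow2db', 'lineno': 310}}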
140 | |
141 | # Flattens a nested JSON object and returns a python dictionary |
142 | def flatten_json(self, nested_json): |
143 | out = {} |
144 | def flatten(x, name=''): |
145 | if type(x) is dict: |
146 | for a in x: |
147 | flatten(x[a], name + a + '_') |
148 | elif type(x) is list: |
149 | i = 0 |
150 | for a in x: |
151 | flatten(a, name + str(i) + '_') |
152 | i += 1 |
153 | else: |
154 | out[name[:-1]] = x |
155 | |
156 | flatten(nested_json) |
157 | return out |
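# Example: flatten_json({"fmt": {"w": 640}, "tags": ["a", "b"]}) returns
# {"fmt_w": 640, "tags_0": "a", "tags_1": "b"}: nested keys are joined with "_"
# and list items are keyed by their index.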
158 | |
159 | # Create the SQLite database file if it doesn't exist, using the |
160 | # MetaColumns from config. If it already exists, open a connection |
161 | # to it. Always returns a connection object to the dbFile. |
162 | def openSQLiteDB(self, dbFile): |
163 | newDatabase = not os.path.exists(dbFile) |
164 | columns = self.Config['MetaColumns'] |
165 | conn = sqlite3.connect(dbFile) |
166 | if newDatabase: |
167 | sql = '''create table if not exists IPFS_INFO ( |
168 | "sqlts" TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')), |
169 | "pky" INTEGER PRIMARY KEY AUTOINCREMENT, |
170 | "db_hash" TEXT, |
171 | "dl_good" INTEGER DEFAULT 0, |
172 | "dl_errs" INTEGER DEFAULT 0);''' |
173 | conn.execute(sql) |
174 | |
175 | sql = '''create table if not exists IPFS_HASH_INDEX ( |
176 | "sqlts" TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')), |
177 | "pky" INTEGER PRIMARY KEY AUTOINCREMENT, |
178 | "g_idx" TEXT, |
179 | "grupe" TEXT, |
180 | "vhash" TEXT, |
181 | "vsize" TEXT, |
182 | "mhash" TEXT''' |
183 | for c in columns: |
184 | sql += ',\n\t"' + c + '" TEXT' |
185 | sql += ')' |
186 | conn.execute(sql) |
187 | return conn |
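# Sketch of the IPFS_HASH_INDEX DDL generated above, assuming MetaColumns were
# just ["id", "title"] (the real list comes from metadataFields.json):
#
#   create table if not exists IPFS_HASH_INDEX (
#       "sqlts" TIMESTAMP NOT NULL DEFAULT (strftime(...)),
#       "pky" INTEGER PRIMARY KEY AUTOINCREMENT,
#       "g_idx" TEXT, "grupe" TEXT, "vhash" TEXT, "vsize" TEXT, "mhash" TEXT,
#       "id" TEXT,
#       "title" TEXT)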
188 | |
189 | # Pin a file referenced by hash to local IPFS. Returns True if successful |
190 | def pin2IPFS(self, hash): |
191 | cmd = ["ipfs", "pin", "add", hash] |
192 | out = subprocess.run(cmd, stderr=subprocess.DEVNULL, |
193 | stdout=subprocess.PIPE).stdout.decode('utf-8') |
194 | if out.startswith("pinned"): return True |
195 | else: return False |
196 | |
197 | # Add a file to IPFS and return the hash for it. |
198 | # Returns 46 character hash: "Qm....1234567890123456789012345678901234567890" |
199 | # or a zero length string |
200 | def add2IPFS(self, file): |
201 | cmd = ["ipfs", "add", "-Q", file] |
202 | out = subprocess.run(cmd, stderr=subprocess.DEVNULL, |
203 | stdout=subprocess.PIPE).stdout.decode('utf-8') |
204 | if out.startswith("Qm") and len(out) == 47: |
205 | hash = out[0:46] # Return the 46 character hash - newline |
206 | else: hash = "" |
207 | return hash |
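# Both helpers shell out to the IPFS CLI; the equivalent commands are roughly:
#
#   ipfs add -Q <file>    prints only the 46 character "Qm..." hash plus a newline
#   ipfs pin add <hash>   prints "pinned <hash> recursively" on success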
208 | |
209 | # Add the updated SQLite DB to IPFS under the static IPNS name associated with |
210 | # STATIC_DB_HASH. That way the most recent DB can always be obtained by wget: |
211 | # https://ipfs.io/ipns/<STATIC_DB_HASH value>. |
212 | # 2023/01/30: Despite closing the DB conn, it seems the previous DB now gets published. |
213 | # I suspect the sqlite3 module no longer flushes the file on close, though it used to. |
214 | def publishDB(self, file): |
215 | newDBhash = self.add2IPFS(file) # Add the updated SQLite database to IPFS |
216 | if len(newDBhash) == 46: |
217 | if STATIC_DB_HASH: |
218 | cmd = ["ipfs", "name", "publish", "-Q", "-key=" + STATIC_DB_HASH, newDBhash] |
219 | cp = subprocess.run(cmd, capture_output=True, text=True) |
220 | if cp.returncode != 0: |
221 | print(f"Error publishing database to IPFS: {cp.stderr}", flush=True) |
222 | return newDBhash |
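# The publish step is equivalent to running the CLI command
#
#   ipfs name publish -Q -key=<STATIC_DB_HASH> <newDBhash>
#
# after which the latest database is reachable at https://ipfs.io/ipns/<STATIC_DB_HASH>.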
223 | |
224 | # Reopen the SQL DB and update the IPFS_INFO table with info for this run. |
225 | # The info for this run will not be available in the database published to |
226 | # IPFS until the next database is published (a chicken & egg scenario) |
227 | # because the hash of the published database is in the IPFS_INFO table. |
228 | def updateRunInfo(self, sqlFile, dbHash, good, errs): |
229 | conn = self.openSQLiteDB(sqlFile) |
230 | conn.row_factory = sqlite3.Row # Results as python dictionary |
231 | sql = '''INSERT INTO IPFS_INFO ("db_hash", "dl_good", "dl_errs") |
232 | VALUES (?,?,?);''' |
233 | |
234 | if conn is not None: |
235 | conn.execute(sql, (dbHash, good, errs)) |
236 | conn.commit() |
237 | conn.execute('VACUUM;') # Flush to the currently open database file |
238 | conn.close() # This will be the very last thing done this run |
239 | |
240 | # Filter the list of URLs provided in config file to skip URLs we've already |
241 | # DL'd, based on a SQLite query. Filtering is based on the video ID. This is |
242 | # not of much value b/c most URLs are for a channel or playlist, plus yt-dlp |
243 | # already provides this function. |
244 | def filterUrls(self, conn, configUrls): |
245 | regx = re.compile(r'^.*?watch\?v=([^&]+)&*.*$', re.IGNORECASE) |
246 | cursor = conn.cursor() |
247 | filteredUrls = [] |
248 | for url in configUrls: |
249 | match = re.search(regx, url) |
250 | if match is not None: # Is this a single video url? |
251 | id = match.group(1) |
252 | sql = "SELECT COUNT(*) FROM IPFS_HASH_INDEX WHERE id = ?;" |
253 | if cursor.execute(sql, [id]).fetchone()[0] == 0: |
254 | filteredUrls.append(url) |
255 | else: filteredUrls.append(url) |
256 | return filteredUrls |
257 | |
258 | # Create a grupe index file containing a list of all video and metadata IPFS |
259 | # hashes for the grupe. Add it to IPFS & return the hash and count of rows |
260 | # updated. -> object ??? |
261 | def updateGrupeIndex(self, conn, grupe): |
262 | cursor = conn.cursor() |
263 | |
264 | sql = 'SELECT "v=" || vhash || " " || "m=" || mhash' |
265 | sql += ' FROM IPFS_HASH_INDEX' |
266 | sql += ' WHERE grupe = ?' |
267 | idxFile = f"/tmp/{grupe}_idx.txt" |
268 | with open(idxFile, "w") as idx: |
269 | for row in cursor.execute(sql, (grupe,)): # Loop through all grupe rows |
270 | if row[0]: idx.write(row[0] + '\n') |
271 | hash = self.add2IPFS(idxFile) |
272 | if len(hash) > 0: |
273 | sql = "UPDATE IPFS_HASH_INDEX set g_idx=? WHERE grupe=?" |
274 | cursor.execute(sql, (hash, grupe)) |
275 | conn.commit() |
276 | os.remove(idxFile) |
277 | return cursor.rowcount, hash |
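# The temporary /tmp/<grupe>_idx.txt file written above contains one line per
# database row, pairing each video hash with its metadata hash (values illustrative):
#
#   v=QmVideoHashAAAA... m=QmMetaHashAAAA...
#   v=QmVideoHashBBBB... m=QmMetaHashBBBB...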
278 | |
279 | # This block of code will create group index files for every group in DB, |
280 | # then add that file to IPFS. Update every row in the group with that hash, |
281 | # and do that for every grupe, so every row in IPFS_HASH_INDEX table gets |
282 | # updated. See "updateGrupeIndex" above for details. This just wraps that. |
283 | def regenerateAllGrupeIndexes(self, conn): |
284 | cursor = conn.cursor() |
285 | sql = "SELECT DISTINCT grupe FROM IPFS_HASH_INDEX" |
286 | for row in cursor.execute(sql): |
287 | (count, hash) = self.updateGrupeIndex(conn, row[0]) |
288 | print("Updated %d rows for grupe %s with grupe index hash %s" % |
289 | (count, row[0], hash), flush=True) |
290 | |
291 | # Add a row to SQLite database. Most of the column data is from the JSON |
292 | # metadata (gvmjList) downloaded with the video. Note that SQLite is not |
293 | # thread safe, so only the main thread updates the DB. |
294 | def addRow2db(self, conn, cols, gvmjList): |
295 | (grupe, vhash, vsize, mhash, jsn) = gvmjList |
296 | cursor = conn.cursor() |
297 | jsn["episode_number"] = mhash # Mark row as pinned by adding the hashes |
298 | jsn["season_number"] = vhash # to these fields |
299 | values = [grupe, vhash, vsize, mhash] # 1st 4 values not in YT metadata |
300 | sql = 'INSERT INTO IPFS_HASH_INDEX ("grupe", "vhash", "vsize", "mhash"' |
301 | |
302 | for col in cols: |
303 | sql += ',\n\t"' + col + '"' |
304 | |
305 | sql += ") VALUES (?,?,?,?" # Now add metadata |
306 | for col in cols: |
307 | sql += ",?" |
308 | values.append(jsn[col]) |
309 | sql += "); " |
310 | cursor.execute(sql, values) |
311 | conn.commit() |
312 | return cursor.lastrowid |
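# Sketch of the statement built above, assuming cols were just ["id", "title"]:
#
#   INSERT INTO IPFS_HASH_INDEX ("grupe", "vhash", "vsize", "mhash",
#       "id",
#       "title") VALUES (?,?,?,?,?,?);
#
# bound to values = [grupe, vhash, vsize, mhash, jsn["id"], jsn["title"]].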
313 | |
314 | # This wrapper method calls the addRow2db method above and upon failure makes |
315 | # a 2nd attempt to insert the row with an alternate metadata dictionary. The |
316 | # alternate dictionary is created by copying the valid metadata into the new |
317 | # alternate and adding missing keys, setting their missing values to "?". |
318 | # |
319 | # The ytdl extractors vary as to the format of the metadata they produce, |
320 | # youtube-dl doesn't totally normalize it. If a video file was downloaded and |
321 | # an IPFS hash was produced a row will be added with sqlts, pky, g_idx, grupe, |
322 | # vhash, vsize, season_number and _filename columns that have known valid data. |
323 | def addRow(self, conn, cols, gvmjList): |
324 | try: |
325 | row = self.addRow2db(conn, cols, gvmjList) # Attempt number one... |
326 | |
327 | # On failure create a new metadata dictionary for this file. For any |
328 | # missing keys, create a key whose value is "?". This is a work- |
329 | # around for JSON metadata fields the extractor doesn't provide. |
330 | except (sqlite3.OperationalError, KeyError) as e: |
331 | newDictionary = {} |
332 | (grp, vhash, vsize, mhash, jsn) = gvmjList |
333 | for col in cols: |
334 | if col in jsn.keys(): |
335 | newDictionary[col] = jsn[col] |
336 | else: newDictionary[col] = "?" # Previously "unknown-value" |
337 | |
338 | # Try again. Any exception this time will propagate upstream |
339 | row = self.addRow2db(conn, cols, (grp, vhash, vsize, mhash, newDictionary)) |
340 | return row |
341 | |
342 | # Add a row to the SQLite database for every video downloaded for this grupe, |
343 | # print the successes and failures and log the failures to the error log file. |
344 | # NOTE: Reduced printed output for Pirate Stick for leaner reporting. |
345 | def processGrupeResults(self, conn, cols, items, grupe, eLog): |
346 | downloads = len(self.SQLrows2Add) |
347 | good = 0 |
348 | |
349 | if downloads > 0: |
350 | for dat in self.SQLrows2Add: # dat = (grp, vhash, vSize, mhash, json) |
351 | try: |
352 | self.addRow(conn, cols, dat) |
353 | good += 1 # Successfully added to SQLite |
354 | |
355 | # Failed to add the row to SQLite, but it's saved in IPFS |
356 | except Exception as expn: |
357 | args = (dat[0], dat[1], dat[2], dat[3], dat[4], self.decodeException(expn)) |
358 | er = "SQL Error! Grupe=%s vHash=%s vSize=%s, mHash=%s\n\nJSON=%s\n%s" % args |
359 | er += "\nMetadata key/values used:\n" |
360 | json = dat[4] |
361 | for key in json: |
362 | er += "%32s = %s\n" % (key, json[key]) |
363 | self.ErrorList.append(er) |
364 | |
365 | self.updateGrupeIndex(conn, grupe) |
366 | |
367 | # Print and log the list of download failures |
368 | failures = len(self.ErrorList) |
369 | if len(self.ErrorList) > 0: |
370 | eLog.write("PROCESSING ERRORS FOR GRUPE=%s:\n" % grupe) |
371 | for error in self.ErrorList: |
372 | eLog.write(error + '\n') |
373 | eLog.write("END OF ERRORS FOR %s\n\n" % grupe) |
374 | |
375 | args = (items, downloads, failures) |
376 | print("Processed=%d (Succeeded=%d, Failed=%d)" % args, flush=True) |
377 | return good, failures |
378 | |
379 | # Used to determine if folder size limit has been exceeded. NOT recursive |
380 | def getSize(self, path): |
381 | totalSize = 0 |
382 | for f in os.listdir(path): |
383 | fp = os.path.join(path, f) |
384 | totalSize += os.path.getsize(fp) |
385 | return totalSize |
386 | |
387 | # Check if the download folder for this grupe is over the quota (if any) and |
388 | # remove the oldest file if it is. The quota is the maximum number of files |
389 | # or the maximum amount of space to limit the folder to. Quota is a string of |
390 | # integers followed by whitespace & unit string value. If no unit designation |
391 | # is specified the quota is the amount of space used in bytes. When the limit |
392 | # is exceeded the oldest files are removed to make room. .json and .wav files |
393 | # aren't counted in a file count quota, but they are for folder space quotas. |
394 | # Removals always remove all files of the same name regardless of extension, |
395 | # HOWEVER, wildcard replacement occurs after the 1st . on the left. Also note |
396 | # that pruning will never remove the last remaining file. |
397 | def pruneDir(self, quota, dir): |
398 | max = count = 0 |
399 | fList = [] |
400 | |
401 | if quota: # Do nothing if no quota specified |
402 | q = quota.split(' ') # Quota amount and units, if any |
403 | if q[0].isdecimal(): # Check if string is a valid number |
404 | max = int(q[0]) # This is the quota limit |
405 | if max < 2: # Invalid quota value, zero removed |
406 | err = "Invalid quota: " + dir |
407 | self.ErrorList.append(err) # Log the error |
408 | return False |
409 | |
410 | for f in os.listdir(dir): # Create a list of candidate files |
411 | if f.endswith(EXT_LIST): # Only include primary video files |
412 | fList.append(dir + '/' + f) # Prefix the file with path |
413 | count += 1 # Count how many in the list |
414 | if count < 2: return False # We're done if none or only 1 |
415 | |
416 | old = min(fList, key=os.path.getctime) # Get oldest file |
417 | |
418 | if len(q) > 1: size = 0 # Quota limits number of files |
419 | else: size = self.getSize(dir) # Quota limits space used |
420 | if count > max or size > max: # Over the quota? |
421 | rm = old.rsplit('.')[0] + ".*" # Replace extension with a wildcard |
422 | os.system("rm -rf %s" % rm) # Easy way to do all related files |
423 | return True # Oldest file removed |
424 | else: return False |
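# Quota string examples (the per-grupe "Quota" value passed in from processVideo):
#   "20 files"    keep at most 20 primary video files in the grupe folder
#   "2500000000"  limit the folder to 2,500,000,000 bytes of space used
#   null          no quota; the folder is never pruned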
425 | |
426 | # Rename filename with numeric value for media duration |
427 | def fixDuration(self, name, seconds=0): |
428 | duration = str(int(float(seconds))) |
429 | noDur = SEPARATOR + "NA" + SEPARATOR # SEPARATOR - see serverDefs import |
430 | newDur = SEPARATOR + duration + SEPARATOR |
431 | zeroDur = SEPARATOR + "0" + SEPARATOR |
432 | if noDur in name: |
433 | newName = name.replace(noDur, newDur) |
434 | elif zeroDur in name: |
435 | newName = name.replace(zeroDur, newDur) |
436 | else: |
437 | newName = name |
438 | if newName != name: os.rename(name, newName) |
439 | return newName |
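# Example (assuming SEPARATOR is the "~^~" literal used in callback below): a
# file downloaded as "abc~^~NA~^~.mp4" whose metadata shows 95.3 seconds is
# renamed to "abc~^~95~^~.mp4", embedding the duration in the filename.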
440 | |
441 | # Return info provided within the downloaded filename |
442 | def parseFilename(self, file): |
443 | dir, base = file.rsplit('/', 1) # Separate grupe folder & downloaded file |
444 | grp = dir.rsplit('/', 1)[1] # Extract grupe from full path |
445 | pb, ext = os.path.splitext(file) # Path+Base in [0], extension in [1] |
446 | mFile = pb + ".info.json" # json metadata file for this download |
447 | vFile = file # Full pathname of downloaded file |
448 | return [dir, base, grp, pb, ext, vFile, mFile] |
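# Example (illustrative path, assuming SEPARATOR is "~^~"):
#   parseFilename("/dl/news/abc~^~95~^~.mp4") returns
#   ["/dl/news", "abc~^~95~^~.mp4", "news", "/dl/news/abc~^~95~^~", ".mp4",
#    "/dl/news/abc~^~95~^~.mp4", "/dl/news/abc~^~95~^~.info.json"]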
449 | |
450 | # This method is a process thread started with each successful download. It |
451 | # is started as a daemon thread to add the video and its metadata to IPFS, |
452 | # and creates lists for errors and the files downloaded (to update SQLite). |
453 | def processVideo(self, file): |
454 | vHash = mHash = jFlat = vSize = duration = None |
455 | |
456 | print(f"\nProcessing video file: {file}", flush=True) |
457 | p = self.parseFilename(file) |
458 | dir,base,grp,pb,ext,vFile,mFile = p # Get all the values from full pathname |
459 | |
460 | # The grupe quota limits the size of the download folder. It's a string |
461 | # containing an integer with a space followed by an optional units word. |
462 | quota = self.Config["Grupes"][grp]["Quota"] # i.e. "20 files" or "2500000000" |
463 | pruned = False |
464 | while self.pruneDir(quota, dir): # Keep pruning until under quota |
465 | time.sleep(0.01) |
466 | pruned = True |
467 | if pruned: self.ErrorList.append("WARNING: Folder limit reached and pruned!") |
468 | |
469 | # Process the metadata JSON file first, as we may need information from it |
470 | try: |
471 | with open(mFile, 'r') as jsn: # Read the entire JSON metadata file |
472 | jDict = json.load(jsn) # Create a python dictionary from it |
473 | jFlat = self.flatten_json(jDict) # Flatten the dictionary |
474 | id3 = TinyTag.get(file) |
475 | if id3.year: jFlat["release_year"] = id3.year |
476 | if id3.title: jFlat["title"] = id3.title |
477 | if id3.artist: jFlat["artist"] = id3.artist |
478 | if id3.duration: duration = id3.duration |
479 | if not duration: duration = 0 # No duration from yt-dlp or id3 |
480 | jFlat["duration"] = duration # At least make sure it's numeric, and |
481 | mf = self.fixDuration(mFile, duration) # rename both video & metadata |
482 | vf = self.fixDuration(file, duration) # files too |
483 | p = self.parseFilename(vf) # Reparse for name changes |
484 | mHash = self.add2IPFS(mf) # Add the metadata file to IPFS |
485 | except Exception as e: |
486 | er = self.decodeException(e) |
487 | self.ErrorList.append(f"Metadata problem {file}:\n{er}") |
488 | |
489 | # Log all errors, but add to SQLite if we got a valid video hash from IPFS |
490 | try: |
491 | dir,base,grp,pb,ext,vFile,mFile = p # Retrieve latest values |
492 | vHash = self.add2IPFS(vFile) # Add video file to IPFS |
493 | if len(vHash) == 46 and jFlat: |
494 | vSize = os.path.getsize(vFile) # Get file size for database |
495 | jFlat['_filename'] = base # Just use the base filename |
496 | if os.path.exists(vFile): # Delete once it's in IPFS |
497 | os.remove(vFile) # Remove only the video file |
498 | else: raise Exceptions.IPFSexception # Bare IPFSexception is undefined here; assumed to live in the imported Exceptions module |
499 | |
500 | except Exception as e: # Log all errors that may occur |
501 | er = self.decodeException(e) |
502 | args = (grp, vHash, mHash, base, er) |
503 | self.ErrorList.append("Grupe=%s vHash=%s mHash=%s vFile=%s\n%s" % args) |
504 | |
505 | # If vHash is valid create a SQLite entry for it, regardless of metadata |
506 | finally: |
507 | # print(f"g={base} v={vSize} jLen={len(jFlat)} j={type(jFlat)}", flush=True) |
508 | if vHash and vHash.startswith("Qm"): # vHash may still be None if the IPFS add never ran |
509 | self.SQLrows2Add.append([grp, vHash, vSize, mHash, jFlat]) |
510 | |
511 | # Starts a daemon thread to process the downloaded file. youtube-dl provides no |
512 | # way to obtain information about the ffmpeg post processor, and adding to IPFS |
513 | # can be time consuming. Using threads to handle files allows the main thread |
514 | # to download other files concurrently with IPFS additions. See the processVideo |
515 | # function above for specifics of how downloaded files are processed. |
516 | def callback(self, d): |
517 | if d['status'] == 'finished': |
518 | path = d['filename'] # Callback provides full pathname |
519 | nam = path.rsplit('/', 1)[1].split("~^~", 1)[0] |
520 | th = threading.Thread(name=nam, target=self.processVideo, |
521 | args=([path]), daemon=True) |
522 | th.start() # Start the thread and continue |
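# yt-dlp invokes this hook with a status dictionary at least once per video; only
# the 'finished' state is handled. A minimal example of what it might receive:
#
#   {'status': 'finished', 'filename': '/dl/news/abc~^~95~^~.mp4'}
#
# The thread name is the text before the first "~^~", which assumes SEPARATOR is
# that literal string.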
523 | |
524 | ############################################################################## |
525 | # # |
526 | # Primary program loop. The youtube-dl library takes care of downloading. # |
527 | # The callback function above processes each download, adding files to IPFS # |
528 | # and creating a list of rows to add to the SQLite DB by this function. # |
529 | # # |
530 | ############################################################################## |
531 | def ytdlProcess(self, conn): |
532 | sep = SEPARATOR |
533 | cols = self.Config['MetaColumns'] |
534 | dlBase = self.Config['DLbase'] |
535 | dlArch = dlBase + self.Config['DLarch'] |
536 | dlElog = dlBase + self.Config['DLeLog'] |
537 | dlOpts = self.Config['DLOpts'] |
538 | grupeList = self.Config['Grupes'] |
539 | total = 0 |
540 | failures = 0 |
541 | # NOTE: items missing from downloaded metadata will be replaced with "?" |
542 | # Media duration is important for sorting files into schedules. |
543 | ytdlFileFormat = "/%(id)s" + sep + "%(duration)s"+ sep + ".%(ext)s" |
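# With DLbase "/dl/" and grupe "news", the outtmpl assigned below (per grupe)
# becomes "/dl/news/%(id)s~^~%(duration)s~^~.%(ext)s" (assuming SEPARATOR is
# "~^~"), so a download lands at e.g. "/dl/news/abc123~^~95~^~.mp4".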
544 | |
545 | dlOpts['ignoreerrors'] = True # Until can catch from yt-dlp |
546 | dlOpts['downloader'] = "aria2c" # Fast downloader |
547 | dlOpts['downloader-args'] = "aria2c:'-c -j 3 -x 3 -s 3 -k 1M'" |
548 | # dlOpts['verbose'] = True # Useful for debugging |
549 | |
550 | # Add crucial download options |
551 | # dlOpts['force-ipv6'] = True # May not be enabled on host |
552 | dlOpts['writeinfojson'] = True |
553 | dlOpts['progress_hooks']=[self.callback] # Called at least 1ce per video |
554 | dlOpts['download_archive'] = dlArch # Facilitates updates w/o dupes |
555 | dlOpts['restrictfilenames'] = True # Required format for DLd files |
556 | eLog = open(dlElog, mode='a+') # Error log file for all grupes |
557 | for grupe in grupeList: |
558 | if not grupeList[grupe]['Active']: continue # Skip this grupe |
559 | self.SQLrows2Add = [] # Empty the list of downloads |
560 | self.ErrorList = [] # Empty the list of errors |
561 | print("\nBEGIN " + grupe, flush=True) # Marks start of group in log |
562 | |
563 | if not os.path.isdir(dlBase+grupe): # If it doesn't exist |
564 | os.mkdir(dlBase + grupe) # create folder 4 grupe |
565 | |
566 | # Add qualifier for minimum video duration (in seconds) |
567 | dur = grupeList[grupe]['Duration'] |
568 | if dur != None and dur > 0: |
569 | dur = "duration > %d" % dur |
570 | dlOpts['match_filter'] = utils.match_filter_func(dur) |
571 | elif 'match_filter' in dlOpts.keys(): |
572 | del dlOpts['match_filter'] # No duration filter |
573 | |
574 | # Add release date range qualifier; either one or both are OK |
575 | sd = grupeList[grupe]['Start'] # null or YYYYMMDD format |
576 | ed = grupeList[grupe]['End'] # in JSON config file |
577 | if sd != None or ed != None: |
578 | dr = utils.DateRange(sd, ed) # Dates are inclusive |
579 | dlOpts['daterange'] = dr # Always set a date range |
580 | elif 'daterange' in dlOpts.keys(): |
581 | del dlOpts['daterange'] # No date filter |
582 | |
583 | # This stops downloading from playlist after this many videos |
584 | stop = grupeList[grupe]['Stop'] |
585 | if stop != None and stop > 0: dlOpts['playlistend'] = stop |
586 | elif 'playlistend' in dlOpts.keys(): |
587 | del dlOpts['playlistend'] # No playlist limit |
588 | |
589 | # This will change downloaded file folder for each grupe |
590 | dlOpts['outtmpl'] = dlBase + grupe + ytdlFileFormat |
591 | |
592 | urls = grupeList[grupe]['urls'] |
593 | # Don't even try downloading videos we already have in the DB |
594 | newUrls = urls # self.filterUrls(conn, urls) |
595 | |
596 | yt_dlp.YoutubeDL(dlOpts).download(newUrls) # BEGIN DOWNLOADING!!! |
597 | print(f"YOUTUBE-DL PROCESSING COMPLETE for {grupe}", flush=True) |
598 | |
599 | # Wait for all callback threads to finish |
600 | for th in threading.enumerate(): |
601 | if th.name != "MainThread": |
602 | while th.is_alive(): time.sleep(0.1) |
603 | |
604 | # Log errors and print results of this DL grupe |
605 | good, fails = self.processGrupeResults(conn, cols, |
606 | len(urls), grupe, eLog) |
607 | total += good |
608 | failures += fails # Accumulate totals for this run |
609 | |
610 | eLog.close() |
611 | return total, failures |
612 | |
613 | # |
614 | # Display a summary of this download session. Return them for emailing. |
615 | # |
616 | def displaySummary(self, conn): |
617 | # |
618 | # Print the total number of files in the DB since it was created |
619 | # |
620 | sql = "SELECT COUNT(*), MIN(sqlts) FROM IPFS_HASH_INDEX;" |
621 | cols = (conn.cursor().execute(sql)).fetchone() # 1 row, 2 columns |
622 | total = f"\n{cols[0]} files downloaded and indexed since {cols[1][0:10]}" |
623 | print(total, flush=True) |
624 | mail = total + '\n' |
625 | # |
626 | # Report the number of files added to each grupe in the last 30 days |
627 | # |
628 | strt = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d") |
629 | sql = "SELECT DISTINCT SUBSTR(max(sqlts), 6, 5) as tme, grupe, count(*) as cnt " |
630 | sql += "FROM IPFS_HASH_INDEX WHERE sqlts > ? " |
631 | sql += "GROUP BY grupe ORDER BY sqlts desc;" |
632 | args = " Date Videos Grupe (Videos Added in the Last 30 Days)" |
633 | print(args, flush=True) |
634 | mail += args + '\n' |
635 | for cols in conn.execute(sql, (strt,)): |
636 | args = (cols['tme'], cols['cnt'], cols['grupe']) |
637 | mail += "%5s | %6d | %s\n" % args |
638 | print("%5s | %6d | %s" % args, flush=True) |
639 | # |
640 | # Print the total number for all grupes in last 30 days |
641 | # |
642 | sql = "SELECT COUNT(*) FROM IPFS_HASH_INDEX WHERE sqlts > ?" |
643 | dbObj = conn.cursor().execute(sql, (strt,)) |
644 | total = " Total: " |
645 | total += "%5d" % dbObj.fetchone()[0] |
646 | print(total, flush=True) |
647 | mail += total |
648 | # |
649 | # Report the files downloaded today as grupes, titles & IPFS URLs |
650 | # |
651 | urls = "" |
652 | sql = "SELECT grupe, title, vhash " |
653 | sql += "FROM IPFS_HASH_INDEX " |
654 | sql += "WHERE DATE(sqlts) = DATE('now', 'localtime', 'start of day');" |
655 | rows = (conn.cursor().execute(sql)).fetchall() |
656 | if len(rows) > 0: |
657 | args = "\nIPFS URLs for files downloaded today:" |
658 | urls = args + '\n' |
659 | print(args, flush=True) |
660 | for col in rows: |
661 | args = (col['grupe'], col['title'][:48], col['vhash']) |
662 | text = "%13s | %48s | https://ipfs.io/ipfs/%s" % args |
663 | urls += text + '\n' |
664 | print(text, flush=True) |
665 | else: |
666 | args = "\nRun complete. No files downloaded today." |
667 | print(args, flush=True) |
668 | mail += args |
669 | return mail, urls |
670 | |
671 | # Send a plain text message via email to recipient(s). Retry if necessary |
672 | def emailResults(self, server, account, subject, origin, to, text): |
673 | msg = EmailMessage() # Create a text/plain container |
674 | msg.set_content(text) |
675 | msg['Subject'] = subject |
676 | msg['From'] = origin |
677 | msg['To'] = to |
678 | |
679 | # tx-server: /usr/local/lib/python3.8/site-packages/certifi/cacert.pem |
680 | # context = ssl._create_unverified_context() |
681 | # with smtplib.SMTP_SSL(server[0], server[1], context=context) as emailer: |
682 | # emailer.login(account[0], account[1]) |
683 | # emailer.send_message(msg) |
684 | def sendMail(srvr, acct, mesg): |
685 | try: |
686 | context = ssl.create_default_context() |
687 | context.check_hostname = False |
688 | context.verify_mode = ssl.CERT_NONE |
689 | with smtplib.SMTP(srvr[0], srvr[1]) as emailer: |
690 | emailer.starttls(context=context) |
691 | emailer.login(acct[0], acct[1]) |
692 | emailer.send_message(mesg) |
693 | except Exception as e: |
694 | return self.decodeException(e) |
695 | return 0 # No Exception, so mesg was sent successfully |
696 | |
697 | for cnt in range(1, EMAIL_RETRIES): |
698 | unsent = sendMail(server, account, msg) |
699 | if unsent: |
700 | print(f"Attempt {cnt}: {unsent}", flush=True) |
701 | time.sleep(EMAIL_DELAY) |
702 | else: break # We're done if message sent |
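# Illustrative call (server, account and addresses are hypothetical; the real
# values come from the mpServerDefinitions constants used in runScript):
#
#   self.emailResults(("smtp.example.com", 587), ("user@example.com", "secret"),
#                     "Download Summary", "bot@example.com", "admin@example.com", mail)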
703 | |
704 | ############################################################################## |
705 | # Get command line arguments. Returns a tuple with config and DB connection. # |
706 | # Usage: thisFile [-h] | <-c config> <-d sqlite> # |
707 | # # |
708 | # Parse command line and report config info. Prints usage and exits if args # |
709 | # are invalid or missing. mff is the metadataFields.json file to load # |
710 | ############################################################################## |
711 | def getCmdLineArgs(self): |
712 | if len(sys.argv) >= 5: |
713 | sqlDBfile = self.Config = conn = None |
714 | grupe = grupes = urls = 0 |
715 | |
716 | # Required parameter: -c config file |
717 | if sys.argv[1] == "-c" and os.path.isfile(sys.argv[2]): |
718 | with open(sys.argv[2], 'r') as jsn: |
719 | self.Config = json.load(jsn) |
720 | meta = self.Config['DLbase'] + self.Config['DLmeta'] |
721 | if len(meta) > 0: # Did config info load? |
722 | with open(meta, 'r') as jsn: # Now load meta fields |
723 | self.Config['MetaColumns'] = json.load(jsn)['MetaColumns'] |
724 | metaSize = len(self.Config['MetaColumns']) |
725 | if metaSize > 0: # All config info loaded OK? |
726 | for grupe in self.Config['Grupes']: # Count groups and urls in them |
727 | grupes += 1 |
728 | urls += len( self.Config['Grupes'][grupe]['urls'] ) |
729 | print("Database Metadata Columns=%d" % metaSize, flush=True) |
730 | print("Downloaded groups will be saved in %s" % self.Config['DLbase'], flush=True) |
731 | print("%d groups, %d urls to process" % (grupes, urls), flush=True) |
732 | else: self.usage() |
733 | else: self.usage() |
734 | |
735 | # Required parameter: -d SQLite database file |
736 | if sys.argv[3] == "-d": |
737 | sqlDBfile = sys.argv[4] |
738 | conn = self.openSQLiteDB(sqlDBfile) |
739 | conn.row_factory = sqlite3.Row # Results as python dictionary |
740 | if conn == None: self.usage() |
741 | |
742 | # Optional parameter: -g grupe from config to use |
743 | if len(sys.argv) >= 7 and sys.argv[5] == "-g": |
744 | print(f"Ignoring all grupes in config except {sys.argv[6]}", flush=True) |
745 | for grupe in self.Config['Grupes']: # Mark all inactive except 1 |
746 | if grupe == sys.argv[6]: continue |
747 | self.Config['Grupes'][grupe]['Active'] = False |
748 | |
749 | if not os.path.isdir(self.Config['DLbase']): # Create folder for results |
750 | os.mkdir(self.Config['DLbase']) # if necessary |
751 | |
752 | return self.Config, conn, sqlDBfile # Return essential information |
753 | else: self.usage() |
754 | |
755 | ############################################################################## |
756 | # Primary starting point for script # |
757 | # ############################################################################ |
758 | def runScript(self, sqlFile, conn): |
759 | hash = None |
760 | if sqlFile is None or conn is None: |
761 | self.Config, conn, sqlFile = self.getCmdLineArgs() # Open config, DB |
762 | |
763 | #regenerateAllGrupeIndexes(conn) # Fix all grupe indexes |
764 | #exit(0) |
765 | |
766 | # Command line and config file processed, time to get down to it |
767 | good, fails = self.ytdlProcess(conn) |
768 | |
769 | mail, urls = self.displaySummary(conn) |
770 | conn.execute("vacuum;") # Flush changes in this session & make backup |
771 | conn.execute("vacuum into 'latest.sqlite';") # Flush all to backup |
772 | conn.close() # DBs should be the same now, but do sha256 hashes differ? |
773 | # They shouldn't, but updateRunInfo will make them differ! |
774 | |
775 | # If any downloads were successful, update IPFS with new SQLite file. |
776 | # This hash will appear in the IPFS_INFO table published next run. |
777 | if good > 0: |
778 | hash = self.publishDB("latest.sqlite") |
779 | args = f"\nThe newest SQLite DB hash is: {hash}\n" |
780 | if STATIC_DB_HASH: |
781 | args += "It is always available at:\n" |
782 | args += f"https://ipfs.io/ipns/{STATIC_DB_HASH}" |
783 | mail += args |
784 | print(args + "\n", flush=True) |
785 | |
786 | # Update IPFS_INFO table in SQL DB with results for this run |
787 | # and flush all changes to current database file, not latest. |
788 | # Current DB will actually be latest, tho not latest published. |
789 | self.updateRunInfo(sqlFile, hash, good, fails) |
790 | |
791 | if SEND_EMAIL: |
792 | self.emailResults(EMAIL_SERVR, EMAIL_LOGIN, |
793 | EMAIL_SUB1, EMAIL_FROM, EMAIL_LIST, mail) |
794 | |
795 | if len(EMAIL_URLS) > 0 and len(urls) > 0: |
796 | self.emailResults(EMAIL_SERVR, EMAIL_LOGIN, |
797 | EMAIL_SUB2, EMAIL_FROM, EMAIL_URLS, urls) |
798 | |
799 | |
800 | ############################################################################### |
801 | # main is called only if this file is invoked as a script not an object class # |
802 | ############################################################################### |
803 | if __name__ == "__main__": |
804 | mp = MediaProcessor() |
805 | mp.runScript(None, None) # Run script using command line parameters |
806 | exit(0) |
807 |