#!/usr/bin/env python
"""Extract MediaInfo text, a short sample clip and random screenshots from a video.

The command line is parsed and the configuration file is created/updated at
import time (inside the ``Args`` and ``Config`` class bodies); ``Head()`` then
performs the requested extractions, one worker thread per task.
"""
import argparse
import configparser
import logging
import os
import pathlib
import random
import threading
import time

from moviepy import editor
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from pymediainfo import MediaInfo

# Absolute directory of this script.  ``abspath`` matters: with a relative
# ``__file__`` the bare dirname can be '' and '' + '/log/' would point at the
# filesystem root.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# The log directory must be the same one the log file is opened in below.
_LOG_DIR = os.path.join(_SCRIPT_DIR, 'log')
os.makedirs(_LOG_DIR, exist_ok=True)

threading.current_thread().name = "main"
# One log file per run, named after the start time.
a = os.path.join(_LOG_DIR, str(time.strftime("%Y-%m-%d %H-%M-%S")) + '.log')
logging.basicConfig(filename=a, filemode='w',
                    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
                    level='INFO')


def _write_config(config, conf_file):
    """Serialize the ``ConfigParser`` *config* to the path *conf_file*."""
    with open(conf_file, 'w') as configfile:
        config.write(configfile)


def _ensure_config_dir(conf_file):
    """Create the directory that will contain *conf_file* if it is missing."""
    conf_dir = os.path.dirname(conf_file)
    if os.path.isdir(conf_dir):
        logging.info("Config dir is exist")
        return
    try:
        os.makedirs(conf_dir)
    except OSError:
        # Not fatal here; writing the config file afterwards will raise.
        logging.error("Creation of the directory config failed")
    else:
        logging.info("Successfully created the config directory")


def _load_or_create_config(conf_file, defaults):
    """Return the effective configuration stored at *conf_file*.

    If the file does not exist it is created from *defaults*.  If it exists,
    sections/options present in *defaults* but missing from the file are
    added to it; values already set by the user are never overwritten.
    """
    if not os.path.isfile(conf_file):
        logging.info("Creating Config")
        _write_config(defaults, conf_file)
        return defaults

    logging.info("Config is exist")
    current = configparser.ConfigParser()
    current.read(conf_file)
    changed = False
    for section in defaults.sections():
        if not current.has_section(section):
            current.add_section(section)
            changed = True
        for option in defaults.options(section):
            if not current.has_option(section, option):
                # raw=True: copy the stored text without interpolation.
                current.set(section, option, defaults.get(section, option, raw=True))
                changed = True
    if changed:
        _write_config(current, conf_file)
    return current


class Args:
    """Command-line interface; parsed once when the class body executes.

    Attributes:
        parser: the configured ``argparse.ArgumentParser``.
        ArgumentParser: alias of ``parser`` (kept for backward compatibility).
        args: the parsed ``argparse.Namespace``.
    """
    parser = ArgumentParser = argparse.ArgumentParser(
        description='Extracting mediainfo, sample and screens from random timecodes from video')
    parser.add_argument('-s', action='store_true', dest='screens', help="create screenshots")
    parser.add_argument('-S', action='store_true', dest='sample', help="extract sample")
    parser.add_argument('-m', action='store_true', dest='mediainfo', help='create mediainfo')
    parser.add_argument('-n', action='store', dest="count", default=10,
                        help="number of screenshots, default 10", type=int)
    parser.add_argument('-c', action='store', dest='config', help='config file',
                        default='config/config.ini', type=str)
    parser.add_argument('-i', action='store', dest='input', help="input file", type=str,
                        required=True)
    args = parser.parse_args()
    logging.info(' '.join(f'{k}={v}' for k, v in vars(args).items()))


class Config:
    """Configuration file handling; runs once when the class body executes.

    Attributes:
        conf_file: path of the config file; a relative ``-c`` value is
            resolved against the script directory.
        config: the effective ``ConfigParser`` (file contents completed with
            any missing default options).
    """
    conf_file = os.path.join(_SCRIPT_DIR, Args.args.config)

    @staticmethod
    def write(config, conf_file):
        """Write the ``ConfigParser`` *config* to *conf_file*."""
        _write_config(config, conf_file)

    # Built-in defaults: output goes to ~/sample/.
    config = configparser.ConfigParser()
    config.add_section('Output')
    config.set('Output', 'Dir', str(pathlib.Path.home()) + '/sample/')

    _ensure_config_dir(conf_file)
    config = _load_or_create_config(conf_file, config)


class Head:
    """Creates mediainfo, screenshots and sample from the input file.

    Instantiating the class does all the work: it prepares the output
    directory ``<Output.dir>/<input name without extension>``, starts one
    thread per requested task and waits for all of them to finish.  When
    none of ``-s``, ``-S``, ``-m`` is given, all three tasks run.

    Attributes:
        bname: base name of the input file.
        fold: output directory for this input file.
        max_time: duration of the input, rounded to whole seconds.
        num_screen: fallback screenshot index used by ``screens`` when no
            ``num_screen`` keyword is supplied.
    """

    def __init__(self):
        if not (Args.args.screens or Args.args.sample or Args.args.mediainfo):
            Args.args.screens = Args.args.sample = Args.args.mediainfo = True

        config = configparser.ConfigParser()
        config.read(Config.conf_file)
        save_dir = config["Output"]["dir"]

        self.num_screen = 0
        self.bname = os.path.basename(Args.args.input)
        self.fold = os.path.join(save_dir, os.path.splitext(self.bname)[0])
        logging.info("Filename is %s", self.bname)

        if os.path.isdir(self.fold):
            logging.info("Directory %s is exist", self.fold)
        else:
            try:
                os.makedirs(self.fold)
            except OSError:
                logging.error("Creation of the directory %s failed", self.fold)
            else:
                logging.info("Successfully created the directory %s ", self.fold)
        logging.info("Files will be located at %s", self.fold)

        # tracks[0].duration is divided by 1000 to get seconds.
        duration_ms = MediaInfo.parse(Args.args.input).tracks[0].duration
        if duration_ms is None:
            raise ValueError("Cannot determine duration of %s" % Args.args.input)
        self.max_time = round(float(duration_ms) / 1000)

        procs = []
        if Args.args.sample:
            procs.append(threading.Thread(
                name="sample", target=self.sample,
                kwargs={'fold': self.fold, 'bname': self.bname, 'max_time': self.max_time}))
        if Args.args.screens:
            logging.info("Rounded duration %ss", self.max_time)
            logging.info("Creating %s screenshots", Args.args.count)
            # The index travels via kwargs: worker threads must not read a
            # shared attribute that this loop keeps changing.
            for index in range(Args.args.count):
                procs.append(threading.Thread(
                    name="screen_" + str(index + 1), target=self.screens,
                    kwargs={'fold': self.fold, 'max_time': self.max_time,
                            'num_screen': index}))
        if Args.args.mediainfo:
            # Pass the method itself as target; calling it here would run it
            # synchronously and hand Thread a None target.
            procs.append(threading.Thread(
                name="mediainfo", target=self.info, kwargs={'fold': self.fold}))

        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()

    def sample(self, **kwargs):
        """Cut a sample clip from the middle of the input into ``self.fold``.

        Inputs of at most 360 s yield their middle third; longer inputs
        yield the 120 s centred on the midpoint.  The sample keeps the
        input's file extension.  ``kwargs`` are accepted but not used.
        """
        sam = os.path.join(self.fold, "sample" + os.path.splitext(self.bname)[1])
        if self.max_time <= 360:
            t1 = self.max_time / 3
            t2 = self.max_time * 2 / 3
        else:
            t1 = self.max_time / 2 - 60
            t2 = self.max_time / 2 + 60
        logging.info("Creating Sample with duration %ss", t2 - t1)
        ffmpeg_extract_subclip(Args.args.input, t1, t2, targetname=sam)

    def info(self, **kwargs):
        """Write the MediaInfo report of the input to ``mediainfo.txt``.

        ``kwargs`` are accepted but not used.
        """
        media_info = MediaInfo.parse(Args.args.input, output="")
        target = os.path.join(self.fold, "mediainfo.txt")
        logging.info("Creating MediaInfo at %s", target)
        with open(target, 'w', encoding='utf-8') as f:
            f.write(str(media_info))
        logging.info("Created MediaInfo")

    def screens(self, **kwargs):
        """Save one frame from a random second as ``<second>.png``.

        The second is drawn uniformly from 5 %..95 % of the duration.

        Keyword Args:
            num_screen: zero-based index of this screenshot, used only in
                log messages; defaults to ``self.num_screen``.
        """
        num_screen = kwargs.get('num_screen', self.num_screen)
        sec = random.randint(round(self.max_time * 0.05), round(self.max_time * 0.95))
        logging.info("%sst screen time %ss creating", num_screen + 1, sec)
        scr_cr = editor.VideoFileClip(Args.args.input)
        try:
            scr_cr.save_frame(os.path.join(self.fold, str(sec) + ".png"), t=sec)
        finally:
            # Release the readers even if saving fails; audio may be None
            # (guarded so a clip without audio does not raise here).
            if scr_cr.audio is not None:
                scr_cr.audio.reader.close_proc()
            scr_cr.reader.close()
        logging.info("%sst screen created", num_screen + 1)


if __name__ == '__main__':
    Head()