186 lines
6.7 KiB
Python
Executable File
186 lines
6.7 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import random
|
|
import os
|
|
import pathlib
|
|
from moviepy import editor
|
|
import threading
|
|
import time
|
|
import logging
|
|
import argparse
|
|
import configparser
|
|
|
|
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
|
|
from pymediainfo import MediaInfo
|
|
|
|
|
|
if not os.path.isdir(os.path.dirname(__file__) + 'log'):
|
|
os.makedirs(os.path.dirname(__file__) + 'log')
|
|
threading.current_thread().name = "main"
|
|
|
|
a = os.path.dirname(__file__) + '/log/' + str(time.strftime("%Y-%m-%d %H-%M-%S")) + '.log'
|
|
logging.basicConfig(filename=a, filemode='w', format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
|
|
level='INFO')
|
|
|
|
|
|
class Args:
|
|
parser = ArgumentParser = argparse.ArgumentParser(
|
|
description='Extracting mediainfo, sample and screens from random timecodes from video')
|
|
parser.add_argument('-s', action='store_true', dest='screens', help="create screenshots")
|
|
parser.add_argument('-S', action='store_true', dest='sample', help="extract sample")
|
|
parser.add_argument('-m', action='store_true', dest='mediainfo', help='create mediainfo')
|
|
parser.add_argument('-n', action='store', dest="count", default=10,
|
|
help="number of screenshots, default 10", type=int)
|
|
parser.add_argument('-c', action='store', dest='config', help='config file', default='config/config.ini', type=str)
|
|
parser.add_argument('-i', action='store', dest='input', help="input file", type=str, required=True)
|
|
args = parser.parse_args()
|
|
|
|
logging.info(' '.join(f'{k}={v}' for k, v in vars(args).items()))
|
|
|
|
|
|
class Config:
|
|
conf_file = os.path.dirname(__file__) + '/' + Args.args.config
|
|
|
|
@staticmethod
|
|
def write(config, conf_file):
|
|
with open(conf_file, 'w') as configfile:
|
|
config.write(configfile)
|
|
|
|
config = configparser.ConfigParser()
|
|
config.add_section('Output')
|
|
config.set('Output', 'Dir', str(pathlib.Path.home()) + '/sample/')
|
|
|
|
if os.path.isdir('config'):
|
|
logging.info("Config dir is exist")
|
|
else:
|
|
try:
|
|
os.makedirs('config')
|
|
except OSError:
|
|
logging.error("Creation of the directory config failed")
|
|
else:
|
|
logging.info("Successfully created the config directory")
|
|
time.sleep(0.5)
|
|
|
|
if os.path.isfile(conf_file):
|
|
logging.info("Config is exist")
|
|
config2 = configparser.ConfigParser()
|
|
config2.read(conf_file)
|
|
for section in config.sections():
|
|
if config2.has_section(section):
|
|
for option in config.options(section):
|
|
if not config2.has_option(section, option):
|
|
config.set(section, option, config.get(section, option))
|
|
write(config, conf_file)
|
|
else:
|
|
config.add_section(section)
|
|
write(config, conf_file)
|
|
|
|
else:
|
|
logging.info("Creating Config")
|
|
write(config, conf_file)
|
|
|
|
|
|
class Head:
|
|
"""
|
|
Creates mediainfo, screenshots and sample from file
|
|
"""
|
|
|
|
def __init__(self):
|
|
if not Args.args.screens and not Args.args.sample and not Args.args.mediainfo:
|
|
Args.args.screens = Args.args.sample = Args.args.mediainfo = 1
|
|
|
|
config = configparser.ConfigParser()
|
|
config.read(os.path.dirname(__file__) + '/' + Args.args.config)
|
|
output = config["Output"]
|
|
save_dir = output["dir"]
|
|
|
|
procs = []
|
|
self.num_screen = 0
|
|
self.num_screen: int
|
|
self.bname: str
|
|
|
|
self.bname = os.path.basename(Args.args.input)
|
|
self.fold = save_dir + '/' + os.path.splitext(self.bname)[0]
|
|
logging.info("Filename is %s" % self.bname)
|
|
|
|
if os.path.isdir(self.fold):
|
|
logging.info("Directory %s is exist" % self.fold)
|
|
else:
|
|
try:
|
|
os.makedirs(self.fold)
|
|
except OSError:
|
|
logging.error("Creation of the directory %s failed" % self.fold)
|
|
else:
|
|
logging.info("Successfully created the directory %s " % self.fold)
|
|
logging.info("Files will be located at %s" % self.fold)
|
|
|
|
self.max_time = round(MediaInfo.parse(Args.args.input).tracks[0].duration / 1000)
|
|
|
|
if Args.args.sample:
|
|
thread = threading.Thread(name="sample", target=Head.sample, args=(self,),
|
|
kwargs={'fold': self.fold,
|
|
'bname': self.bname,
|
|
'max_time': self.max_time})
|
|
procs.append(thread)
|
|
thread.start()
|
|
|
|
if Args.args.screens:
|
|
logging.info("Rounded duration %ss" % self.max_time)
|
|
logging.info("Creating %s screenshots" % Args.args.count)
|
|
|
|
for self.num_screen in range(Args.args.count):
|
|
thread = threading.Thread(name="screen_" + str(self.num_screen + 1), target=Head.screens, args=(self,),
|
|
kwargs={'fold': self.fold,
|
|
'max_time': self.max_time,
|
|
'num_screen': self.num_screen})
|
|
|
|
procs.append(thread)
|
|
thread.start()
|
|
|
|
if Args.args.mediainfo:
|
|
thread = threading.Thread(name="mediainfo", target=Head.info(self, ),
|
|
kwargs={'fold': self.fold})
|
|
procs.append(thread)
|
|
thread.start()
|
|
|
|
for proc in procs:
|
|
proc.join()
|
|
|
|
def sample(self, **kwargs):
|
|
"""
|
|
Create sample with duration 2m if file longer then 6m
|
|
"""
|
|
|
|
sam = self.fold + "/sample" + os.path.splitext(self.bname)[1]
|
|
|
|
if self.max_time <= 360:
|
|
t1 = self.max_time / 3
|
|
t2 = self.max_time * 2 / 3
|
|
else:
|
|
t1 = self.max_time / 2 - 60
|
|
t2 = self.max_time / 2 + 60
|
|
|
|
logging.info(f"Creating Sample with duration {t2 - t1}s")
|
|
ffmpeg_extract_subclip(Args.args.input, t1, t2, targetname=sam)
|
|
|
|
def info(self, **kwargs):
|
|
media_info = MediaInfo.parse(Args.args.input, output="")
|
|
logging.info(f"Creating MediaInfo at {self.fold + '/mediainfo.txt'}")
|
|
f = open(self.fold + "/mediainfo.txt", 'w')
|
|
f.write(str(media_info))
|
|
f.close()
|
|
logging.info("Created MediaInfo")
|
|
|
|
def screens(self, **kwargs):
|
|
sec = random.randint(round(self.max_time * 0.05), round(self.max_time * 0.95))
|
|
logging.info(f"{self.num_screen + 1}st screen time {sec}s creating")
|
|
|
|
scr_cr = editor.VideoFileClip(Args.args.input)
|
|
scr_cr.save_frame(self.fold + "/" + str(sec) + ".png", t=sec)
|
|
scr_cr.audio.reader.close_proc()
|
|
scr_cr.reader.close()
|
|
logging.info(f"{self.num_screen + 1}st screen created")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
Head()
|