import logging
import signal
import os
from mako.lookup import TemplateLookup
from .process_thread import ProcessThread
from .periods import Period
# Logger used for this module's own task lifecycle messages.
logger = logging.getLogger('periodtask.task')

# Values accepted by the ``policy`` argument of ``Task``.
SKIP = 0
DELAY = 1
RUN = 2

# Directory of this module and the ``templates`` folder next to it, which
# is always appended to a task's template search path.
base_dir = os.path.dirname(os.path.realpath(__file__))
default_template_dir = os.path.join(base_dir, 'templates')
class Task:
"""
Represents a task to schedule.
:param str name: The name of the task; it will appear in logs and emails.
:param tuple command: See ``args`` param of the `Popen constructor
<https://docs.python.org/3/library/subprocess.html#subprocess.Popen>`_.
:param list/str periods: A cron expression (str) or a list of them. See
:doc:`cronref` for more information. By default (when set to an empty
string) this will be equivalent to ``0 */5 * * * * UTC``.
:param bool run_on_start: Indicates whether the task should run when the
scheduler starts, no matter what was given in ``periods``. Useful for
manually testing the task.
:param func/bool mail_success: If set to a truthy value, an email will be
sent after the task runs successfully. If this is a function, this
function will be used to send out the email (if **send_mail_func** does
not override it). The signature of the function is

.. code-block:: python

    def send_mail(subject, message, html_message=None)

:param func/bool mail_failure: Controls emails sent when the task fails.
Otherwise it is the same as **mail_success**.
:param func/bool mail_skipped: Controls emails sent when the task is
skipped due to the defined **policy**.
:param func/bool mail_delayed: Controls emails sent when the task is
delayed due to the defined **policy**.
:param func send_mail_func: If set, this must be a function. This function
will be used to send emails, no matter what was set in **mail_...**
params.
:param number wait_timeout: After sending **stop_signal** to the task
process, we wait this many seconds for the process to stop. If the
timeout expires, we kill the process.
:param int/tuple max_lines: STDOUT and STDERR are collected from the task
process. To avoid heavy memory usage we only store this many lines in
memory. More precisely, the STDOUT head and tail and the STDERR head and
tail are each a list of lines; this parameter controls the maximum length
of these lists. Examples:

==================== ==== ==== ==== ====
parameter            stdout    stderr
-------------------- --------- ---------
value                head tail head tail
==================== ==== ==== ==== ====
``2``                2    2    2    2
``(2, 3)``           2    2    3    3
``(10, (2,3))``      10   10   2    3
``((1, 2), (3, 4))`` 1    2    3    4
==================== ==== ==== ==== ====

:param int stop_signal: This signal will be sent to the task process when
we want to stop it gracefully.
:param int policy: Available values are ``periodtask.SKIP``,
``periodtask.DELAY`` and ``periodtask.RUN``.
**SKIP**
If a process is (still) running and the task is scheduled, this new
process will be skipped. If requested, an email will be sent.
**DELAY**
If a process is (still) running and the task is scheduled, this new
process will be delayed and will run immediately when the current
process terminates. If requested, an email will be sent.
**RUN**
Tasks will always run when scheduled.
:param list/str template_dir: Directories to look for email templates in.
:param logging.logger stdout_logger: The logger to use for the STDOUT of
the task process.
:param int stdout_level: The STDOUT of the task process will be logged to
this level.
:param logging.logger stderr_logger: The logger to use for the STDERR of
the task process.
:param int stderr_level: The STDERR of the task process will be logged to
this level.
:param str cwd: The task process will run with ``cwd`` as the working
directory. See the `Popen constructor
<https://docs.python.org/3/library/subprocess.html#subprocess.Popen>`_.
:param int/None skip_delayed_email_threshold: In outage ("havaria")
situations, instead of sending SKIP or DELAYED emails forever, only this
many consecutive emails will be sent. When the delay queue becomes empty
a NO BLOCK email will be sent. ``None`` means no threshold.
:param int/None failure_email_threshold: When a task fails more than
this many times in a row, no new FAILURE email will be sent. When the task
then runs successfully, a RECOVER email will be sent. ``None`` means no
threshold.
"""
def __init__(
    self, name, command,
    periods='',
    run_on_start=False,
    mail_success=None,
    mail_failure=None,
    mail_skipped=None,
    mail_delayed=None,
    send_mail_func=None,
    wait_timeout=10,
    max_lines=50,
    stop_signal=signal.SIGTERM,
    policy=SKIP,
    template_dir=[],
    stdout_logger=logging.getLogger('periodtask.stdout'),
    stdout_level=logging.INFO,
    stderr_logger=logging.getLogger('periodtask.stderr'),
    stderr_level=logging.INFO,
    cwd=None,
    skip_delayed_email_threshold=5,
    failure_email_threshold=5
):
    """Store the task configuration and initialise its runtime state.

    The meaning of every parameter is described in the class docstring.
    In addition to storing them, the constructor

    * wraps a single ``periods`` value in a list and turns every entry
      into a ``Period``,
    * replaces each truthy ``mail_*`` value with ``send_mail_func`` when
      the latter is given,
    * builds the Mako ``TemplateLookup`` from ``template_dir`` (a single
      directory, or a list/tuple of them) followed by the default
      template directory.
    """
    if not isinstance(periods, (list, tuple)):
        periods = [periods]
    self.periods = [Period(x) for x in periods]

    self.name = name
    self.command = command
    self.run_on_start = run_on_start

    def pick_sender(flag):
        # An enabled mail kind uses send_mail_func when one was supplied;
        # otherwise the flag itself (a callable or a falsy value) is kept.
        return send_mail_func if flag and send_mail_func else flag

    self.mail_success = pick_sender(mail_success)
    self.mail_failure = pick_sender(mail_failure)
    self.mail_skipped = pick_sender(mail_skipped)
    self.mail_delayed = pick_sender(mail_delayed)

    self.wait_timeout = wait_timeout
    self.max_lines = max_lines
    self.stop_signal = stop_signal
    self.policy = policy

    # Always build a new list: the caller's sequence (and the shared
    # ``[]`` default) is never mutated. The default directory comes last
    # in the search path.
    if isinstance(template_dir, (list, tuple)):
        template_dirs = list(template_dir)
    else:
        template_dirs = [template_dir]
    template_dirs.append(default_template_dir)
    self.template_lookup = TemplateLookup(
        directories=template_dirs, default_filters=['h']
    )

    self.stdout_logger = stdout_logger
    self.stdout_level = stdout_level
    self.stderr_logger = stderr_logger
    self.stderr_level = stderr_level
    self.cwd = cwd
    self.skip_delayed_email_threshold = skip_delayed_email_threshold
    self.failure_email_threshold = failure_email_threshold

    # Runtime state.
    self.process_threads = []   # started threads not yet reaped
    self.first_check = True     # cleared by the first check_second() call
    self.delay_queue = []       # due runs waiting to be started
    self.failure_email_sent = 0        # FAILURE mails since last reset
    self.skip_delayed_email_sent = 0   # SKIP/DELAYED mails since last reset
def check_second(self, sec):
    """Tell whether the task is due at ``sec``.

    The very first call clears ``first_check`` and, when ``run_on_start``
    is set, reports the task as due with the label ``'START'``.

    :param sec: The moment to test; it is passed unchanged to
        ``Period._check`` of every configured period.
    :return: ``'START'``, or the first truthy value returned by a
        period's ``_check``, or ``False`` when no period matches.
    """
    if self.first_check:
        self.first_check = False
        if self.run_on_start:
            return 'START'
    for period in self.periods:
        label = period._check(sec)
        if label:
            return label
    return False
def start_process_thread(self, formatted_sec):
    """Start a new ``ProcessThread`` for this task.

    The thread is created from the task's stored configuration, appended
    to ``process_threads`` and started.

    :param formatted_sec: Label of the scheduled run; it is logged and
        handed to the ``ProcessThread``.
    """
    logger.info(
        'task %s starts process for %s', self.name, formatted_sec
    )
    thrd = ProcessThread(
        self.name,
        self.command,
        self.stop_signal,
        self.wait_timeout,
        formatted_sec,
        self.max_lines,
        self.stdout_logger,
        self.stdout_level,
        self.stderr_logger,
        self.stderr_level,
        self.cwd,
    )
    self.process_threads.append(thrd)
    thrd.start()
def send_mail_template(
    self, send_func,
    subject_template, text_template, html_template, **kwargs
):
    """Render the three mail templates and hand them to ``send_func``.

    :param send_func: Callable invoked as
        ``send_func(subject, text, html_message=html)``.
    :param subject_template: Template name of the subject.
    :param text_template: Template name of the plain-text body.
    :param html_template: Template name of the HTML body.
    :param kwargs: Context passed to ``render`` of every template.
    """
    get_template = self.template_lookup.get_template

    subject = get_template(subject_template).render(**kwargs)
    # Collapse the rendered subject onto a single line.
    subject = ''.join(subject.splitlines())

    text = get_template(text_template).render(**kwargs)
    html = get_template(html_template).render(**kwargs)
    send_func(subject, text, html_message=html)
def check_subprocesses(self):
    """Reap finished process threads and send the resulting emails.

    Threads that are still alive stay in ``process_threads``. For every
    finished thread the return code is logged and then:

    * return code ``0``: a SUCCESS mail is sent when ``mail_success`` is
      set, and a RECOVER mail is sent when ``mail_failure`` is set and
      the FAILURE mail threshold had been reached;
    * any other return code: a FAILURE mail is sent when ``mail_failure``
      is set and the threshold (if any) has not been reached yet.
    """
    if not self.process_threads:
        return

    still_running = []
    for subproc in self.process_threads:
        if subproc.is_alive():
            still_running.append(subproc)
            continue

        retcode = subproc.returncode
        logger.info(
            'task %s started for %s terminated with code %s',
            self.name, subproc.formatted_sec, retcode
        )

        threshold = self.failure_email_threshold
        if retcode == 0:
            if self.mail_success:
                self.send_mail_template(
                    self.mail_success,
                    'success_subject.txt',
                    'success.txt',
                    'success.html',
                    subproc=subproc
                )
            if (
                self.mail_failure and
                threshold is not None and
                self.failure_email_sent >= threshold
            ):
                self.send_mail_template(
                    self.mail_failure,
                    'recover_subject.txt',
                    'recover.txt',
                    'recover.html',
                    task=self,
                    subproc=subproc
                )
            # NOTE(review): every success resets the counter so that it
            # counts failures "in a row"; the nesting of this line was
            # reconstructed -- confirm against the upstream source.
            self.failure_email_sent = 0
        elif self.mail_failure and (
            threshold is None or self.failure_email_sent < threshold
        ):
            self.failure_email_sent += 1
            self.send_mail_template(
                self.mail_failure,
                'failure_subject.txt',
                'failure.txt',
                'failure.html',
                subproc=subproc
            )

    self.process_threads = still_running
def skipped_or_delayed(self, formatted_sec, typ='skipped'):
    """Send the SKIPPED or DELAYED mail if that mail kind is enabled.

    Nothing happens when the ``mail_<typ>`` attribute is falsy.

    :param formatted_sec: Label of the run that was skipped or delayed;
        passed to the templates as ``current_sec``.
    :param str typ: ``'skipped'`` or ``'delayed'``; selects both the
        ``mail_<typ>`` sender and the ``<typ>*`` templates.
    """
    send_func = getattr(self, 'mail_%s' % typ)
    if not send_func:
        return
    self.send_mail_template(
        send_func,
        '%s_subject.txt' % typ,
        '%s.txt' % typ,
        '%s.html' % typ,
        running=self.process_threads,
        current_sec=formatted_sec,
        task_name=self.name,
        delay_queue=self.delay_queue
    )
def check_for_second(self, sec):
    """Apply the overlap policy for ``sec`` and start a run if possible.

    A due run (see ``check_second``) is appended to ``delay_queue``.
    While a process of this task is still running:

    * ``SKIP`` drops the newly queued run again,
    * ``DELAY`` leaves it queued,

    and in both cases a warning is logged and, unless the mail threshold
    has been reached, the SKIPPED/DELAYED mail is requested. With any
    other policy, or when nothing is running, the oldest queued run is
    started.

    :param sec: The moment to test, passed to ``check_second``.
    :return: ``True`` when a process was started, otherwise ``None``.
    """
    formatted_sec = self.check_second(sec)
    if formatted_sec:
        self.delay_queue.append(formatted_sec)

    threshold = self.skip_delayed_email_threshold

    if self.process_threads and self.policy in (SKIP, DELAY):
        if not formatted_sec:
            return None
        if self.policy == SKIP:
            self.delay_queue.pop()  # drop the run that was just queued
            typ = 'skipped'
        else:
            typ = 'delayed'
        logger.warning(
            'task %s %s for %s', self.name, typ, formatted_sec
        )
        # Only mail while below the threshold (no threshold: always).
        if threshold is None or self.skip_delayed_email_sent < threshold:
            self.skipped_or_delayed(formatted_sec, typ=typ)
            self.skip_delayed_email_sent += 1
        return None

    if not self.delay_queue:
        return None

    self.start_process_thread(self.delay_queue.pop(0))
    if (
        not self.delay_queue and
        threshold is not None and
        self.skip_delayed_email_sent >= threshold
    ):
        # The queue has drained after mails had been suppressed: send
        # the NO BLOCK mail through the sender of the active policy.
        if self.policy == SKIP:
            func = self.mail_skipped
        elif self.policy == DELAY:
            func = self.mail_delayed
        else:
            func = None
        if func:
            self.send_mail_template(
                func,
                'noblock_subject.txt',
                'noblock.txt',
                'noblock.html',
                task=self
            )
        # NOTE(review): the counter is reset here so that it survives
        # until the queue is empty; the nesting of this line was
        # reconstructed -- confirm against the upstream source.
        self.skip_delayed_email_sent = 0
    return True
def stop(self, check_subprocesses=True):
    """Stop every running process thread of this task and wait for it.

    Each thread is asked to stop and then joined before the next one is
    handled.

    :param bool check_subprocesses: When true, ``check_subprocesses()``
        is called afterwards so the finished threads are reaped and
        their result mails are sent.
    """
    for proc in self.process_threads:
        proc.stop()
        proc.join()
    if check_subprocesses:
        self.check_subprocesses()
    logger.info('task stopped: %s', self.name)