forked from kytos-ng/mef_eline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.py
154 lines (123 loc) · 5.07 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""Module responsible for handling schedulers."""
from uuid import uuid4
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from pytz import utc
from kytos.core import log
class CircuitSchedule:
"""Schedule events."""
def __init__(self, **kwargs):
"""Create a CircuitSchedule object."""
self._id = kwargs.get("id", uuid4().hex)
self.date = kwargs.get("date", None)
# The minimum number of seconds to wait between retries
self.interval = kwargs.get("interval", None)
# Frequency uses Cron format. Ex: "* * * * * *"
self.frequency = kwargs.get("frequency", None)
self.action = kwargs.get("action", "create")
@property
def id(self): # pylint: disable=invalid-name
"""Return this EVC's ID."""
return self._id
@id.setter
def id(self, value): # pylint: disable=invalid-name
"""Set this EVC's ID."""
self._id = value
def as_dict(self):
"""Return a dictionary representing an circuit schedule object."""
circuit_scheduler_dict = {"id": self.id, "action": self.action}
if self.date:
circuit_scheduler_dict["date"] = self.date
if self.frequency:
circuit_scheduler_dict["frequency"] = self.frequency
if self.interval:
circuit_scheduler_dict["interval"] = self.interval
return circuit_scheduler_dict
@classmethod
def from_dict(cls, data):
"""Return a CircuitSchedule object from dict."""
return cls(**data)
class Scheduler:
"""Class to schedule the circuits rules.
It is responsible to create/remove schedule jobs based on
Circuit Schedules.
"""
def __init__(self):
"""Create a new schedule structure."""
self.scheduler = BackgroundScheduler(timezone=utc)
self.scheduler.start()
def shutdown(self):
"""Shutdown the scheduler."""
self.scheduler.shutdown(wait=False)
def add(self, circuit):
"""
Add all circuit_scheduler from specific circuit.
Args:
circuit (napps.kytos.mef_eline.models.EVCBase): EVC circuit
"""
for circuit_scheduler in circuit.circuit_scheduler:
self.add_circuit_job(circuit, circuit_scheduler)
def remove(self, circuit):
"""Remove all scheduler from a circuit."""
for job in circuit.circuit_scheduler:
self.cancel_job(job.id)
def add_circuit_job(self, circuit, circuit_scheduler):
"""
Prepare the Circuit data to be added to the Scheduler.
:param circuit(napps.kytos.mef_eline.models.EVCBase): EVC circuit
:param circuit_scheduler (CircuitSchedule): Circuit schedule data
:return:
"""
if circuit_scheduler.action == "create":
job_call = circuit.deploy
elif circuit_scheduler.action == "remove":
job_call = circuit.remove
else:
raise ValueError("Scheduler action must be 'create' or 'remove'")
data = {"id": circuit_scheduler.id}
if circuit_scheduler.date:
data.update({"run_date": circuit_scheduler.date})
else:
data.update(
{
"start_date": circuit.start_date,
"end_date": circuit.end_date,
}
)
if circuit_scheduler.interval:
data.update(circuit_scheduler.interval)
self.add_job(circuit_scheduler, job_call, data)
def add_job(self, circuit_scheduler, job_call, data):
"""
Add a specific cron job to the scheduler.
Args:
circuit_scheduler: CircuitSchedule object
job_call: function to be called by the job
data: Dict to pass to the job_call as parameter
if job_call is a date, the template is like:
{'id': <ID>, 'run_date': date } or
{'id': <ID>, 'start_date': date, 'end_date': date }
if job_call is an interval, the template is like:
{ 'id': <ID>,
'hours': 2,
'minutes': 3
}
if job_call is frequency, the template is the cron format.
"""
if circuit_scheduler.date:
self.scheduler.add_job(job_call, "date", **data)
elif circuit_scheduler.interval:
self.scheduler.add_job(job_call, "interval", **data)
elif circuit_scheduler.frequency:
cron = CronTrigger.from_crontab(
circuit_scheduler.frequency, timezone=utc
)
self.scheduler.add_job(job_call, cron, **data)
def cancel_job(self, circuit_scheduler_id):
"""Cancel a specific job from scheduler."""
try:
self.scheduler.remove_job(circuit_scheduler_id)
except JobLookupError as job_error:
# Job was not found... Maybe someone already remove it.
log.error(f"Scheduler error cancelling job. {0}".format(job_error))