150 lines
4.7 KiB
Python
150 lines
4.7 KiB
Python
import select
|
|
import threading
|
|
import time
|
|
from collections import namedtuple
|
|
|
|
import channel
|
|
from constants import cronmessage_types
|
|
|
|
# time field uses the monotonic time returned by time.monotonic()
|
|
Event = namedtuple('Event', ['time', 'channel', 'message'])
|
|
|
|
class CronThread(threading.Thread):
|
|
def __init__(self, cron_control_channel):
|
|
self.cron_control_channel = cron_control_channel
|
|
|
|
# Sorted array of events
|
|
self.events = []
|
|
|
|
threading.Thread.__init__(self)
|
|
|
|
def get_timeout_value(self):
|
|
if len(self.events) == 0:
|
|
# No events, block until we get a message
|
|
# Timeout of -1 makes poll block indefinitely
|
|
return -1
|
|
|
|
else:
|
|
# First event in the array is always the earliest
|
|
seconds_to_wait = self.events[0].time - time.monotonic()
|
|
|
|
# time.monotonic() uses fractional second but poll uses milliseconds, convert
|
|
ms_to_wait = int(seconds_to_wait * 1000)
|
|
|
|
# In case we somehow took long enough that next one should be run by now, make it run now
|
|
if ms_to_wait < 0:
|
|
ms_to_wait = 0
|
|
|
|
return ms_to_wait
|
|
|
|
def run_events(self):
|
|
assert len(self.events) > 0
|
|
|
|
current_time = time.monotonic()
|
|
|
|
# Look for first event that should be run after current time, and split the array there
|
|
# index should point at the first to be after current time, or at end of array
|
|
# Either way, we can split the array at that location, first part being what to run and second rest
|
|
index = 0
|
|
while index < len(self.events):
|
|
if self.events[index].time > current_time:
|
|
break
|
|
index += 1
|
|
|
|
# Split array
|
|
to_run = self.events[:index]
|
|
self.events = self.events[index:]
|
|
|
|
# Run events
|
|
for event in to_run:
|
|
event.channel.send(event.message)
|
|
|
|
def add_event(self, event):
|
|
# Look for first event that should be run after event, and split the array there
|
|
# index should point at the first to be after event, or at end of array
|
|
# Either way, we can split the array at that location safely
|
|
index = 0
|
|
while index < len(self.events):
|
|
if self.events[index].time > event.time:
|
|
break
|
|
index += 1
|
|
|
|
self.events = self.events[:index] + [event] + self.events[index:]
|
|
|
|
def delete_event(self, event):
|
|
# Try to find the element with same channel and message
|
|
index = 0
|
|
while index < len(self.events):
|
|
if self.events[index].channel == event.channel and self.events[index].message == event.message:
|
|
break
|
|
index += 1
|
|
|
|
if index < len(self.events):
|
|
# The event at index is the one we need to delete
|
|
self.events = self.events[:index] + self.events[index + 1:]
|
|
|
|
def reschedule_event(self, event):
|
|
self.delete_event(event)
|
|
self.add_event(event)
|
|
|
|
def run(self):
|
|
# Create poll object and register the control channel
|
|
# The poll object is used to implement both waiting and control of the cron thread
|
|
poll = select.poll()
|
|
poll.register(self.cron_control_channel, select.POLLIN)
|
|
|
|
while True:
|
|
timeout = self.get_timeout_value()
|
|
poll_result = poll.poll(timeout)
|
|
|
|
if len(poll_result) == 0:
|
|
# No fds were ready → we timed out. Time to run some events
|
|
self.run_events()
|
|
|
|
else:
|
|
# New message was received, handle it
|
|
command_type, *arguments = self.cron_control_channel.recv()
|
|
|
|
if command_type == cronmessage_types.quit:
|
|
break
|
|
|
|
elif command_type == cronmessage_types.schedule:
|
|
event, = arguments
|
|
self.add_event(event)
|
|
|
|
elif command_type == cronmessage_types.delete:
|
|
event, = arguments
|
|
self.delete_event(event)
|
|
|
|
elif command_type == cronmessage_types.reschedule:
|
|
event, = arguments
|
|
self.reschedule_event(event)
|
|
|
|
else:
|
|
assert False #unreachable
|
|
|
|
def start():
|
|
cron_control_channel = channel.Channel()
|
|
CronThread(cron_control_channel).start()
|
|
return cron_control_channel
|
|
|
|
def quit(cron_control_channel):
|
|
"""Stop the cron instance"""
|
|
cron_control_channel.send((cronmessage_types.quit,))
|
|
|
|
def schedule(cron_control_channel, seconds, channel, message):
|
|
"""Schedules message to be send to channel"""
|
|
event = Event(time.monotonic() + seconds, channel, message)
|
|
cron_control_channel.send((cronmessage_types.schedule, event))
|
|
|
|
def delete(cron_control_channel, channel, message):
|
|
"""Remove an event. If event is not found, this is a no-op.
|
|
Matches events based on channel and message, and only applies to the earlier one found."""
|
|
event = Event(None, channel, message)
|
|
cron_control_channel.send((cronmessage_types.delete, event))
|
|
|
|
def reschedule(cron_control_channel, seconds, channel, message):
|
|
"""Reschedules message to be send to channel. If event is not found, a new one is created.
|
|
Matches events based on channel and message, and only applies to the earlier one found."""
|
|
event = Event(time.monotonic() + seconds, channel, message)
|
|
cron_control_channel.send((cronmessage_types.reschedule, event))
|