checkpoint
This commit is contained in:
206
inkycal/main.py
206
inkycal/main.py
@@ -6,44 +6,21 @@ Copyright by aceinnolab
|
||||
import asyncio
|
||||
import glob
|
||||
import hashlib
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
import numpy
|
||||
|
||||
from inkycal import loggers # noqa
|
||||
from inkycal.custom import *
|
||||
from inkycal.display import Display
|
||||
from inkycal.modules.inky_image import Inkyimage as Images
|
||||
|
||||
# On the console, set a logger to show only important logs
|
||||
# (level ERROR or higher)
|
||||
stream_handler = logging.StreamHandler()
|
||||
stream_handler.setLevel(logging.ERROR)
|
||||
|
||||
if not os.path.exists(f'{top_level}/logs'):
|
||||
os.mkdir(f'{top_level}/logs')
|
||||
|
||||
# Save all logs to a file, which contains more detailed output
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s | %(name)s | %(levelname)s: %(message)s',
|
||||
datefmt='%d-%m-%Y %H:%M:%S',
|
||||
handlers=[
|
||||
stream_handler, # add stream handler from above
|
||||
RotatingFileHandler( # log to a file too
|
||||
f'{top_level}/logs/inkycal.log', # file to log
|
||||
maxBytes=2097152, # 2MB max filesize
|
||||
backupCount=5 # create max 5 log files
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
# Show less logging for PIL module
|
||||
logging.getLogger("PIL").setLevel(logging.WARNING)
|
||||
from inkycal.utils.json_cache import JSONCache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
settings = Settings()
|
||||
|
||||
CACHE_NAME = "inkycal_main"
|
||||
|
||||
# TODO: autostart -> supervisor?
|
||||
|
||||
class Inkycal:
|
||||
"""Inkycal main class
|
||||
@@ -62,13 +39,7 @@ class Inkycal:
|
||||
|
||||
def __init__(self, settings_path: str or None = None, render: bool = True):
|
||||
"""Initialise Inkycal"""
|
||||
|
||||
# Get the release version from setup.py
|
||||
with open(f'{top_level}/setup.py') as setup_file:
|
||||
for line in setup_file:
|
||||
if line.startswith('__version__'):
|
||||
self._release = line.split("=")[-1].replace("'", "").replace('"', "").replace(" ", "")
|
||||
break
|
||||
self._release = "2.0.3"
|
||||
|
||||
self.render = render
|
||||
self.info = None
|
||||
@@ -77,8 +48,7 @@ class Inkycal:
|
||||
if settings_path:
|
||||
try:
|
||||
with open(settings_path) as settings_file:
|
||||
settings = json.load(settings_file)
|
||||
self.settings = settings
|
||||
self.settings = json.load(settings_file)
|
||||
|
||||
except FileNotFoundError:
|
||||
raise FileNotFoundError(
|
||||
@@ -86,17 +56,19 @@ class Inkycal:
|
||||
|
||||
else:
|
||||
try:
|
||||
with open('/boot/settings.json') as settings_file:
|
||||
settings = json.load(settings_file)
|
||||
self.settings = settings
|
||||
with open('/boot/settings.json', mode="r") as settings_file:
|
||||
self.settings = json.load(settings_file)
|
||||
|
||||
except FileNotFoundError:
|
||||
raise SettingsFileNotFoundError
|
||||
|
||||
self.disable_calibration = self.settings.get('disable_calibration', False)
|
||||
|
||||
if not os.path.exists(image_folder):
|
||||
os.mkdir(image_folder)
|
||||
if not os.path.exists(settings.IMAGE_FOLDER):
|
||||
os.mkdir(settings.IMAGE_FOLDER)
|
||||
|
||||
if not os.path.exists(settings.CACHE_PATH):
|
||||
os.mkdir(settings.CACHE_PATH)
|
||||
|
||||
# Option to use epaper image optimisation, reduces colours
|
||||
self.optimize = True
|
||||
@@ -122,7 +94,7 @@ class Inkycal:
|
||||
|
||||
# Load and initialise modules specified in the settings file
|
||||
self._module_number = 1
|
||||
for module in settings['modules']:
|
||||
for module in self.settings['modules']:
|
||||
module_name = module['name']
|
||||
try:
|
||||
loader = f'from inkycal.modules import {module_name}'
|
||||
@@ -131,10 +103,9 @@ class Inkycal:
|
||||
setup = f'self.module_{self._module_number} = {module_name}({module})'
|
||||
# print(setup)
|
||||
exec(setup)
|
||||
logger.info(('name : {name} size : {width}x{height} px'.format(
|
||||
name=module_name,
|
||||
width=module['config']['size'][0],
|
||||
height=module['config']['size'][1])))
|
||||
width = module['config']['size'][0]
|
||||
height = module['config']['size'][1]
|
||||
logger.info(f'name : {module_name} size : {width}x{height} px')
|
||||
|
||||
self._module_number += 1
|
||||
|
||||
@@ -147,55 +118,56 @@ class Inkycal:
|
||||
logger.exception(f"Exception: {traceback.format_exc()}.")
|
||||
|
||||
# Path to store images
|
||||
self.image_folder = image_folder
|
||||
self.image_folder = settings.IMAGE_FOLDER
|
||||
|
||||
# Remove old hashes
|
||||
self._remove_hashes(self.image_folder)
|
||||
|
||||
# set up cache
|
||||
if not os.path.exists(os.path.join(settings.CACHE_PATH, CACHE_NAME)):
|
||||
if not os.path.exists(settings.CACHE_PATH):
|
||||
os.mkdir(settings.CACHE_PATH)
|
||||
self.cache = JSONCache(CACHE_NAME)
|
||||
self.cache_data = self.cache.read()
|
||||
|
||||
# Give an OK message
|
||||
print('loaded inkycal')
|
||||
|
||||
def countdown(self, interval_mins: int or None = None) -> int:
|
||||
"""Returns the remaining time in seconds until next display update.
|
||||
def countdown(self, interval_mins: int = None) -> int:
|
||||
"""Returns the remaining time in seconds until the next display update based on the interval.
|
||||
|
||||
Args:
|
||||
- interval_mins = int -> the interval in minutes for the update
|
||||
if no interval is given, the value from the settings file is used.
|
||||
interval_mins (int): The interval in minutes for the update. If none is given, the value
|
||||
from the settings file is used.
|
||||
|
||||
Returns:
|
||||
- int -> the remaining time in seconds until next update
|
||||
int: The remaining time in seconds until the next update.
|
||||
"""
|
||||
|
||||
# Check if empty, if empty, use value from settings file
|
||||
# Default to settings if no interval is provided
|
||||
if interval_mins is None:
|
||||
interval_mins = self.settings["update_interval"]
|
||||
|
||||
# Find out at which minutes the update should happen
|
||||
# Get the current time
|
||||
now = arrow.now()
|
||||
if interval_mins <= 60:
|
||||
update_timings = [(60 - interval_mins * updates) for updates in range(60 // interval_mins)][::-1]
|
||||
|
||||
# Calculate time in minutes until next update
|
||||
minutes = [_ for _ in update_timings if _ >= now.minute][0] - now.minute
|
||||
# Calculate the next update time
|
||||
# Finding the total minutes from the start of the day
|
||||
minutes_since_midnight = now.hour * 60 + now.minute
|
||||
|
||||
# Print the remaining time in minutes until next update
|
||||
print(f'{minutes} minutes left until next refresh')
|
||||
# Finding the next interval point
|
||||
minutes_to_next_interval = (
|
||||
minutes_since_midnight // interval_mins + 1) * interval_mins - minutes_since_midnight
|
||||
seconds_to_next_interval = minutes_to_next_interval * 60 - now.second
|
||||
|
||||
# Calculate time in seconds until next update
|
||||
remaining_time = minutes * 60 + (60 - now.second)
|
||||
|
||||
# Return seconds until next update
|
||||
return remaining_time
|
||||
# Logging the remaining time in appropriate units
|
||||
hours_to_next_interval = minutes_to_next_interval // 60
|
||||
remaining_minutes = minutes_to_next_interval % 60
|
||||
if hours_to_next_interval > 0:
|
||||
print(f'{hours_to_next_interval} hours and {remaining_minutes} minutes left until next refresh')
|
||||
else:
|
||||
# Calculate time in minutes until next update using the range of 24 hours in steps of every full hour
|
||||
update_timings = [(60 * 24 - interval_mins * updates) for updates in range(60 * 24 // interval_mins)][::-1]
|
||||
minutes = [_ for _ in update_timings if _ >= now.minute][0] - now.minute
|
||||
remaining_time = minutes * 60 + (60 - now.second)
|
||||
print(f'{remaining_minutes} minutes left until next refresh')
|
||||
|
||||
print(f'{round(minutes / 60, 1)} hours left until next refresh')
|
||||
|
||||
# Return seconds until next update
|
||||
return remaining_time
|
||||
return seconds_to_next_interval
|
||||
|
||||
def test(self):
|
||||
"""Tests if Inkycal can run without issues.
|
||||
@@ -218,20 +190,13 @@ class Inkycal:
|
||||
|
||||
for number in range(1, self._module_number):
|
||||
name = eval(f"self.module_{number}.name")
|
||||
module = eval(f'self.module_{number}')
|
||||
print(f'generating image(s) for {name}...', end="")
|
||||
try:
|
||||
black, colour = module.generate_image()
|
||||
if self.show_border:
|
||||
draw_border_2(im=black, xy=(1, 1), size=(black.width - 2, black.height - 2), radius=5)
|
||||
black.save(f"{self.image_folder}module{number}_black.png", "PNG")
|
||||
colour.save(f"{self.image_folder}module{number}_colour.png", "PNG")
|
||||
success = self.process_module(number)
|
||||
if success:
|
||||
print("OK!")
|
||||
except Exception:
|
||||
else:
|
||||
errors.append(number)
|
||||
self.info += f"module {number}: Error! "
|
||||
logger.exception("Error!")
|
||||
logger.exception(f"Exception: {traceback.format_exc()}.")
|
||||
|
||||
if errors:
|
||||
logger.error('Error/s in modules:', *errors)
|
||||
@@ -277,14 +242,17 @@ class Inkycal:
|
||||
print("Refresh needed: {a}".format(a=res))
|
||||
return res
|
||||
|
||||
async def run(self):
|
||||
"""Runs main program in nonstop mode.
|
||||
async def run(self, run_once=False):
|
||||
"""Runs main program in nonstop mode or a single iteration based on the run_once flag.
|
||||
|
||||
Uses an infinity loop to run Inkycal nonstop. Inkycal generates the image
|
||||
from all modules, assembles them in one image, refreshed the E-Paper and
|
||||
then sleeps until the next scheduled update.
|
||||
Args:
|
||||
run_once (bool): If True, runs the updating process once and stops. If False,
|
||||
runs indefinitely.
|
||||
|
||||
Uses an infinity loop to run Inkycal nonstop or a single time based on run_once.
|
||||
Inkycal generates the image from all modules, assembles them in one image,
|
||||
refreshes the E-Paper and then sleeps until the next scheduled update or exits.
|
||||
"""
|
||||
|
||||
# Get the time of initial run
|
||||
runtime = arrow.now()
|
||||
|
||||
@@ -303,31 +271,19 @@ class Inkycal:
|
||||
f"Time: {current_time.format('HH:mm')}")
|
||||
print('Generating images for all modules...', end='')
|
||||
|
||||
errors = [] # store module numbers in here
|
||||
errors = [] # Store module numbers in here
|
||||
|
||||
# short info for info-section
|
||||
# Short info for info-section
|
||||
if not self.settings.get('image_hash', False):
|
||||
self.info = f"{current_time.format('D MMM @ HH:mm')} "
|
||||
else:
|
||||
self.info = ""
|
||||
|
||||
for number in range(1, self._module_number):
|
||||
|
||||
# name = eval(f"self.module_{number}.name")
|
||||
module = eval(f'self.module_{number}')
|
||||
|
||||
try:
|
||||
black, colour = module.generate_image()
|
||||
if self.show_border:
|
||||
draw_border_2(im=black, xy=(1, 1), size=(black.width - 2, black.height - 2), radius=5)
|
||||
black.save(f"{self.image_folder}module{number}_black.png", "PNG")
|
||||
colour.save(f"{self.image_folder}module{number}_colour.png", "PNG")
|
||||
self.info += f"module {number}: OK "
|
||||
except Exception as e:
|
||||
success = self.process_module(number)
|
||||
if not success:
|
||||
errors.append(number)
|
||||
self.info += f"module {number}: Error! "
|
||||
logger.exception("Error!")
|
||||
logger.exception(f"Exception: {traceback.format_exc()}.")
|
||||
|
||||
if errors:
|
||||
logger.error("Error/s in modules:", *errors)
|
||||
@@ -343,10 +299,9 @@ class Inkycal:
|
||||
# Check if image should be rendered
|
||||
if self.render:
|
||||
display = self.Display
|
||||
|
||||
self._calibration_check()
|
||||
if self._calibration_state:
|
||||
# after calibration, we have to forcefully rewrite the screen
|
||||
# After calibration, we have to forcefully rewrite the screen
|
||||
self._remove_hashes(self.image_folder)
|
||||
|
||||
if self.supports_colour:
|
||||
@@ -358,17 +313,15 @@ class Inkycal:
|
||||
im_black = upside_down(im_black)
|
||||
im_colour = upside_down(im_colour)
|
||||
|
||||
# render the image on the display
|
||||
# Render the image on the display
|
||||
if not self.settings.get('image_hash', False) or self._needs_image_update([
|
||||
(f"{self.image_folder}/canvas.png.hash", im_black),
|
||||
(f"{self.image_folder}/canvas_colour.png.hash", im_colour)
|
||||
]):
|
||||
# render the image on the display
|
||||
display.render(im_black, im_colour)
|
||||
|
||||
# Part for black-white ePapers
|
||||
elif not self.supports_colour:
|
||||
|
||||
im_black = self._merge_bands()
|
||||
|
||||
# Flip the image by 180° if required
|
||||
@@ -376,13 +329,15 @@ class Inkycal:
|
||||
im_black = upside_down(im_black)
|
||||
|
||||
if not self.settings.get('image_hash', False) or self._needs_image_update([
|
||||
(f"{self.image_folder}/canvas.png.hash", im_black),
|
||||
]):
|
||||
(f"{self.image_folder}/canvas.png.hash", im_black),]):
|
||||
display.render(im_black)
|
||||
|
||||
print(f'\nNo errors since {counter} display updates \n'
|
||||
f'program started {runtime.humanize()}')
|
||||
|
||||
if run_once:
|
||||
break # Exit the loop after one full cycle if run_once is True
|
||||
|
||||
sleep_time = self.countdown()
|
||||
await asyncio.sleep(sleep_time)
|
||||
|
||||
@@ -392,7 +347,8 @@ class Inkycal:
|
||||
returns the merged image
|
||||
"""
|
||||
|
||||
im1_path, im2_path = image_folder + 'canvas.png', image_folder + 'canvas_colour.png'
|
||||
im1_path = os.path.join(settings.image_folder, "canvas.png")
|
||||
im2_path = os.path.join(settings.image_folder, "canvas_colour.png")
|
||||
|
||||
# If there is an image for black and colour, merge them
|
||||
if os.path.exists(im1_path) and os.path.exists(im2_path):
|
||||
@@ -531,7 +487,7 @@ class Inkycal:
|
||||
im_colour = black_to_colour(im_colour)
|
||||
|
||||
im_colour.paste(im_black, (0, 0), im_black)
|
||||
im_colour.save(image_folder + 'full-screen.png', 'PNG')
|
||||
im_colour.save(os.path.join(settings.IMAGE_FOLDER, 'full-screen.png'), 'PNG')
|
||||
|
||||
@staticmethod
|
||||
def _optimize_im(image, threshold=220):
|
||||
@@ -574,13 +530,29 @@ class Inkycal:
|
||||
@staticmethod
|
||||
def cleanup():
|
||||
# clean up old images in image_folder
|
||||
for _file in glob.glob(f"{image_folder}*.png"):
|
||||
if len(glob.glob(settings.IMAGE_FOLDER)) <= 1:
|
||||
return
|
||||
for _file in glob.glob(settings.IMAGE_FOLDER):
|
||||
try:
|
||||
os.remove(_file)
|
||||
except:
|
||||
logger.error(f"could not remove file: {_file}")
|
||||
pass
|
||||
|
||||
def process_module(self, number) -> bool or Exception:
|
||||
"""Process individual module to generate images and handle exceptions."""
|
||||
module = eval(f'self.module_{number}')
|
||||
try:
|
||||
black, colour = module.generate_image()
|
||||
if self.show_border:
|
||||
draw_border_2(im=black, xy=(1, 1), size=(black.width - 2, black.height - 2), radius=5)
|
||||
black.save(f"{self.image_folder}module{number}_black.png", "PNG")
|
||||
colour.save(f"{self.image_folder}module{number}_colour.png", "PNG")
|
||||
return True
|
||||
except Exception:
|
||||
logger.exception(f"Error in module {number}!")
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(f'running inkycal main in standalone/debug mode')
|
||||
|
||||
Reference in New Issue
Block a user