616 lines
23 KiB
Python
616 lines
23 KiB
Python
"""
|
|
Main class for inkycal Project
|
|
Copyright by aceinnolab
|
|
"""
|
|
|
|
import asyncio
|
|
import glob
|
|
import hashlib
|
|
|
|
import numpy
|
|
|
|
from inkycal import loggers # noqa
|
|
from inkycal.custom import *
|
|
from inkycal.display import Display
|
|
from inkycal.modules.inky_image import Inkyimage as Images
|
|
from inkycal.utils import JSONCache
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
settings = Settings()
|
|
|
|
CACHE_NAME = "inkycal_main"
|
|
|
|
|
|
class Inkycal:
|
|
"""Inkycal main class
|
|
|
|
Main class of Inkycal, test and run the main Inkycal program.
|
|
|
|
Args:
|
|
- settings_path = str -> the full path to your settings.json file
|
|
if no path is given, tries looking for settings file in /boot folder.
|
|
- render = bool (True/False) -> show the image on the epaper display?
|
|
|
|
Attributes:
|
|
- optimize = True/False. Reduce number of colours on the generated image
|
|
to improve rendering on E-Papers. Set this to False for 9.7" E-Paper.
|
|
"""
|
|
|
|
def __init__(self, settings_path: str or None = None, render: bool = True, use_pi_sugar: bool = False):
|
|
"""Initialise Inkycal"""
|
|
self._release = "2.0.3"
|
|
|
|
logger.info(f"Inkycal v{self._release} booting up...")
|
|
|
|
self.render = render
|
|
self.info = None
|
|
|
|
logger.info("Checking if a settings file is present...")
|
|
# load settings file - throw an error if file could not be found
|
|
if settings_path:
|
|
logger.info(f"Custom location for settings.json file specified: {settings_path}")
|
|
try:
|
|
with open(settings_path) as settings_file:
|
|
self.settings = json.load(settings_file)
|
|
|
|
except FileNotFoundError:
|
|
raise FileNotFoundError(
|
|
f"No settings.json file could be found in the specified location: {settings_path}")
|
|
|
|
else:
|
|
logger.info("Looking for settings.json file in /boot folder...")
|
|
try:
|
|
with open('/boot/settings.json', mode="r") as settings_file:
|
|
self.settings = json.load(settings_file)
|
|
|
|
except FileNotFoundError:
|
|
raise SettingsFileNotFoundError
|
|
|
|
self.disable_calibration = self.settings.get('disable_calibration', False)
|
|
if self.disable_calibration:
|
|
logger.info("Calibration disabled. Please proceed with caution to prevent ghosting.")
|
|
|
|
if not os.path.exists(settings.IMAGE_FOLDER):
|
|
os.mkdir(settings.IMAGE_FOLDER)
|
|
|
|
if not os.path.exists(settings.CACHE_PATH):
|
|
os.mkdir(settings.CACHE_PATH)
|
|
|
|
# Option to use epaper image optimisation, reduces colours
|
|
self.optimize = True
|
|
|
|
self.show_border = self.settings.get('border_around_modules', False)
|
|
|
|
self.cleanup()
|
|
|
|
# Load drivers if image should be rendered
|
|
if self.render:
|
|
# Init Display class with model in settings file
|
|
# from inkycal.display import Display
|
|
self.Display = Display(self.settings["model"])
|
|
|
|
# check if colours can be rendered
|
|
self.supports_colour = True if 'colour' in self.settings['model'] else False
|
|
|
|
# get calibration hours
|
|
self._calibration_hours = self.settings['calibration_hours']
|
|
|
|
# init calibration state
|
|
self._calibration_state = False
|
|
|
|
# Load and initialise modules specified in the settings file
|
|
self._module_number = 1
|
|
for module in self.settings['modules']:
|
|
module_name = module['name']
|
|
try:
|
|
loader = f'from inkycal.modules import {module_name}'
|
|
# print(loader)
|
|
exec(loader)
|
|
setup = f'self.module_{self._module_number} = {module_name}({module})'
|
|
# print(setup)
|
|
exec(setup)
|
|
width = module['config']['size'][0]
|
|
height = module['config']['size'][1]
|
|
logger.info(f'name : {module_name} size : {width}x{height} px')
|
|
|
|
self._module_number += 1
|
|
|
|
# If a module was not found, print an error message
|
|
except ImportError:
|
|
logger.exception(f'Could not find module: "{module}". Please try to import manually')
|
|
|
|
# If something unexpected happened, show the error message
|
|
except:
|
|
logger.exception(f"Exception: {traceback.format_exc()}.")
|
|
|
|
# Path to store images
|
|
self.image_folder = settings.IMAGE_FOLDER
|
|
|
|
# Remove old hashes
|
|
self._remove_hashes(self.image_folder)
|
|
|
|
# set up cache
|
|
if not os.path.exists(os.path.join(settings.CACHE_PATH, CACHE_NAME)):
|
|
if not os.path.exists(settings.CACHE_PATH):
|
|
os.mkdir(settings.CACHE_PATH)
|
|
self.cache = JSONCache(CACHE_NAME)
|
|
self.cache_data = self.cache.read()
|
|
|
|
self.counter = 0 if "counter" not in self.cache_data else int(self.cache_data["counter"])
|
|
|
|
self.use_pi_sugar = use_pi_sugar
|
|
self.battery_capacity = 100
|
|
|
|
if self.use_pi_sugar:
|
|
logger.info("PiSugar support enabled.")
|
|
from inkycal.utils import PiSugar
|
|
self.pisugar = PiSugar()
|
|
|
|
self.battery_capacity = self.pisugar.get_battery()
|
|
logger.info(f"PiSugar battery capacity: {self.battery_capacity}%")
|
|
|
|
if self.battery_capacity < 20:
|
|
logger.warning("Battery capacity is below 20%!")
|
|
|
|
logger.info("Setting system time to PiSugar time...")
|
|
if self.pisugar.rtc_pi2rtc():
|
|
logger.info("RTC time updates successfully")
|
|
else:
|
|
logger.warning("RTC time could not be set!")
|
|
|
|
print(
|
|
f"Using PiSigar model: {self.pisugar.get_model()}. Current PiSugar time: {self.pisugar.get_rtc_time()}")
|
|
|
|
# Give an OK message
|
|
logger.info('Inkycal initialised successfully!')
|
|
|
|
def countdown(self, interval_mins: int = None) -> int:
|
|
"""Returns the remaining time in seconds until the next display update based on the interval.
|
|
|
|
Args:
|
|
interval_mins (int): The interval in minutes for the update. If none is given, the value
|
|
from the settings file is used.
|
|
|
|
Returns:
|
|
int: The remaining time in seconds until the next update.
|
|
"""
|
|
# Default to settings if no interval is provided
|
|
if interval_mins is None:
|
|
interval_mins = self.settings["update_interval"]
|
|
|
|
# Get the current time
|
|
now = arrow.now()
|
|
|
|
# Calculate the next update time
|
|
# Finding the total minutes from the start of the day
|
|
minutes_since_midnight = now.hour * 60 + now.minute
|
|
|
|
# Finding the next interval point
|
|
minutes_to_next_interval = (
|
|
minutes_since_midnight // interval_mins + 1) * interval_mins - minutes_since_midnight
|
|
seconds_to_next_interval = minutes_to_next_interval * 60 - now.second
|
|
|
|
# Logging the remaining time in appropriate units
|
|
hours_to_next_interval = minutes_to_next_interval // 60
|
|
remaining_minutes = minutes_to_next_interval % 60
|
|
if hours_to_next_interval > 0:
|
|
print(f'{hours_to_next_interval} hours and {remaining_minutes} minutes left until next refresh')
|
|
else:
|
|
print(f'{remaining_minutes} minutes left until next refresh')
|
|
|
|
return seconds_to_next_interval
|
|
|
|
def dry_run(self):
|
|
"""Tests if Inkycal can run without issues.
|
|
|
|
Attempts to import module names from settings file. Loads the config
|
|
for each module and initializes the module. Tries to run the module and
|
|
checks if the images could be generated correctly.
|
|
|
|
Generated images can be found in the /images folder of Inkycal.
|
|
"""
|
|
logger.info(f'Selected E-paper display: {self.settings["model"]}')
|
|
|
|
# store module numbers in here
|
|
errors = []
|
|
|
|
# short info for info-section
|
|
self.info = f"{arrow.now().format('D MMM @ HH:mm')} "
|
|
|
|
for number in range(1, self._module_number):
|
|
name = eval(f"self.module_{number}.name")
|
|
success = self.process_module(number)
|
|
if success:
|
|
logger.debug(f'Image of module {name} generated successfully')
|
|
else:
|
|
logger.warning(f'Generating image of module {name} failed!')
|
|
errors.append(number)
|
|
self.info += f"module {number}: Error! "
|
|
|
|
if errors:
|
|
logger.error('Error/s in modules:', *errors)
|
|
del errors
|
|
|
|
self._assemble()
|
|
|
|
def _image_hash(self, _in):
|
|
"""Create a md5sum of a path or a bytes stream."""
|
|
if not isinstance(_in, str):
|
|
image_bytes = _in.tobytes()
|
|
else:
|
|
try:
|
|
with open(_in) as i:
|
|
return i.read()
|
|
except FileNotFoundError:
|
|
image_bytes = None
|
|
return hashlib.md5(image_bytes).hexdigest() if image_bytes else ""
|
|
|
|
def _remove_hashes(self, basepath):
|
|
for _file in glob.glob(f"{basepath}/*.hash"):
|
|
try:
|
|
os.remove(_file)
|
|
except:
|
|
pass
|
|
|
|
def _write_image_hash(self, path, _in):
|
|
"""Write hash to a file."""
|
|
with open(path, "w") as o:
|
|
o.write(self._image_hash(_in))
|
|
|
|
def _needs_image_update(self, _list):
|
|
"""Check if any image has been updated or not.
|
|
Input a list of tuples(str, image)."""
|
|
res = False
|
|
for item in _list:
|
|
_a = self._image_hash(item[0])
|
|
_b = self._image_hash(item[1])
|
|
print("{f1}:{h1} -> {h2}".format(f1=item[0], h1=_a, h2=_b))
|
|
if _a != _b:
|
|
res = True
|
|
self._write_image_hash(item[0], item[1])
|
|
print("Refresh needed: {a}".format(a=res))
|
|
return res
|
|
|
|
async def run(self, run_once=False):
|
|
"""Runs main program in nonstop mode or a single iteration based on the run_once flag.
|
|
|
|
Args:
|
|
run_once (bool): If True, runs the updating process once and stops. If False,
|
|
runs indefinitely.
|
|
|
|
Uses an infinity loop to run Inkycal nonstop or a single time based on run_once.
|
|
Inkycal generates the image from all modules, assembles them in one image,
|
|
refreshes the E-Paper and then sleeps until the next scheduled update or exits.
|
|
"""
|
|
# Get the time of initial run
|
|
runtime = arrow.now()
|
|
|
|
# Function to flip images upside down
|
|
upside_down = lambda image: image.rotate(180, expand=True)
|
|
|
|
logger.info(f'Inkycal version: v{self._release}')
|
|
logger.info(f'Selected E-paper display: {self.settings["model"]}')
|
|
|
|
while True:
|
|
logger.info("Starting new cycle...")
|
|
current_time = arrow.now(tz=get_system_tz())
|
|
logger.info(f"Timestamp: {current_time.format('HH:mm:ss DD.MM.YYYY')}")
|
|
self.cache_data["counter"] = self.counter
|
|
|
|
errors = [] # Store module numbers in here
|
|
|
|
# Short info for info-section
|
|
if not self.settings.get('image_hash', False):
|
|
self.info = f"{current_time.format('D MMM @ HH:mm')} "
|
|
else:
|
|
self.info = ""
|
|
|
|
for number in range(1, self._module_number):
|
|
success = self.process_module(number)
|
|
if not success:
|
|
errors.append(number)
|
|
self.info += f"im {number}: X "
|
|
|
|
if errors:
|
|
logger.error("Error/s in modules:", *errors)
|
|
self.counter = 0
|
|
self.cache_data["counter"] = 0
|
|
else:
|
|
self.counter += 1
|
|
self.cache_data["counter"] += 1
|
|
logger.info("All images generated successfully!")
|
|
del errors
|
|
|
|
if self.battery_capacity < 20:
|
|
self.info += "Low battery! "
|
|
|
|
# Assemble image from each module - add info section if specified
|
|
self._assemble()
|
|
|
|
# Check if image should be rendered
|
|
if self.render:
|
|
logger.info("Attempting to render image on display...")
|
|
display = self.Display
|
|
self._calibration_check()
|
|
if self._calibration_state:
|
|
# After calibration, we have to forcefully rewrite the screen
|
|
self._remove_hashes(self.image_folder)
|
|
|
|
if self.supports_colour:
|
|
im_black = Image.open(f"{self.image_folder}canvas.png")
|
|
im_colour = Image.open(f"{self.image_folder}canvas_colour.png")
|
|
|
|
# Flip the image by 180° if required
|
|
if self.settings['orientation'] == 180:
|
|
im_black = upside_down(im_black)
|
|
im_colour = upside_down(im_colour)
|
|
|
|
# Render the image on the display
|
|
if not self.settings.get('image_hash', False) or self._needs_image_update([
|
|
(f"{self.image_folder}/canvas.png.hash", im_black),
|
|
(f"{self.image_folder}/canvas_colour.png.hash", im_colour)
|
|
]):
|
|
display.render(im_black, im_colour)
|
|
|
|
# Part for black-white ePapers
|
|
else:
|
|
im_black = self._merge_bands()
|
|
|
|
# Flip the image by 180° if required
|
|
if self.settings['orientation'] == 180:
|
|
im_black = upside_down(im_black)
|
|
|
|
if not self.settings.get('image_hash', False) or self._needs_image_update([
|
|
(f"{self.image_folder}/canvas.png.hash", im_black), ]):
|
|
display.render(im_black)
|
|
|
|
logger.info(f'\nNo errors since {self.counter} display updates')
|
|
logger.info(f'program started {runtime.humanize()}')
|
|
|
|
# store the cache data
|
|
self.cache.write(self.cache_data)
|
|
|
|
# Exit the loop if run_once is True
|
|
if run_once:
|
|
break # Exit the loop after one full cycle if run_once is True
|
|
|
|
sleep_time = self.countdown()
|
|
|
|
if self.use_pi_sugar:
|
|
sleep_time_rtc = arrow.now(tz=get_system_tz()).shift(seconds=sleep_time)
|
|
result = self.pisugar.rtc_alarm_set(sleep_time_rtc, 127)
|
|
if result:
|
|
logger.info(f"Alarm set for {sleep_time_rtc.format('HH:mm:ss')}")
|
|
else:
|
|
logger.warning(f"Failed to set alarm for {sleep_time_rtc.format('HH:mm:ss')}")
|
|
|
|
await asyncio.sleep(sleep_time)
|
|
|
|
@staticmethod
|
|
def _merge_bands():
|
|
"""Merges black and coloured bands for black-white ePapers
|
|
returns the merged image
|
|
"""
|
|
|
|
im1_path = os.path.join(settings.image_folder, "canvas.png")
|
|
im2_path = os.path.join(settings.image_folder, "canvas_colour.png")
|
|
|
|
# If there is an image for black and colour, merge them
|
|
if os.path.exists(im1_path) and os.path.exists(im2_path):
|
|
|
|
im1 = Image.open(im1_path).convert('RGBA')
|
|
im2 = Image.open(im2_path).convert('RGBA')
|
|
|
|
im1 = Images.merge(im1, im2)
|
|
|
|
# If there is no image for the coloured-band, return the bw-image
|
|
elif os.path.exists(im1_path) and not os.path.exists(im2_path):
|
|
im1 = Image.open(im1_path).convert('RGBA')
|
|
|
|
else:
|
|
raise FileNotFoundError("Inkycal cannot find images to merge")
|
|
|
|
return im1
|
|
|
|
def _assemble(self):
|
|
"""Assembles all sub-images to a single image"""
|
|
|
|
# Create 2 blank images with the same resolution as the display
|
|
width, height = Display.get_display_size(self.settings["model"])
|
|
|
|
# Since Inkycal runs in vertical mode, switch the height and width
|
|
width, height = height, width
|
|
|
|
im_black = Image.new('RGB', (width, height), color='white')
|
|
im_colour = Image.new('RGB', (width, height), color='white')
|
|
|
|
# Set cursor for y-axis
|
|
im1_cursor = 0
|
|
im2_cursor = 0
|
|
|
|
for number in range(1, self._module_number):
|
|
|
|
# get the path of the current module's generated images
|
|
im1_path = f"{self.image_folder}module{number}_black.png"
|
|
im2_path = f"{self.image_folder}module{number}_colour.png"
|
|
|
|
# Check if there is an image for the black band
|
|
if os.path.exists(im1_path):
|
|
|
|
# Get actual size of image
|
|
im1 = Image.open(im1_path).convert('RGBA')
|
|
im1_size = im1.size
|
|
|
|
# Get the size of the section
|
|
section_size = [i for i in self.settings['modules'] if i['position'] == number][0]['config']['size']
|
|
|
|
# Calculate coordinates to center the image
|
|
x = int((section_size[0] - im1_size[0]) / 2)
|
|
|
|
# If this is the first module, use the y-offset
|
|
if im1_cursor == 0:
|
|
y = int((section_size[1] - im1_size[1]) / 2)
|
|
else:
|
|
y = im1_cursor + int((section_size[1] - im1_size[1]) / 2)
|
|
|
|
# center the image in the section space
|
|
im_black.paste(im1, (x, y), im1)
|
|
|
|
# Shift the y-axis cursor at the beginning of next section
|
|
im1_cursor += section_size[1]
|
|
|
|
# Check if there is an image for the coloured band
|
|
if os.path.exists(im2_path):
|
|
|
|
# Get actual size of image
|
|
im2 = Image.open(im2_path).convert('RGBA')
|
|
im2_size = im2.size
|
|
|
|
# Get the size of the section
|
|
section_size = [i for i in self.settings['modules'] if i['position'] == number][0]['config']['size']
|
|
|
|
# Calculate coordinates to center the image
|
|
x = int((section_size[0] - im2_size[0]) / 2)
|
|
|
|
# If this is the first module, use the y-offset
|
|
if im2_cursor == 0:
|
|
y = int((section_size[1] - im2_size[1]) / 2)
|
|
else:
|
|
y = im2_cursor + int((section_size[1] - im2_size[1]) / 2)
|
|
|
|
# center the image in the section space
|
|
im_colour.paste(im2, (x, y), im2)
|
|
|
|
# Shift the y-axis cursor at the beginning of next section
|
|
im2_cursor += section_size[1]
|
|
|
|
# Add info-section if specified --
|
|
|
|
# Calculate the max. fontsize for info-section
|
|
if self.settings['info_section']:
|
|
info_height = self.settings["info_section_height"]
|
|
info_width = width
|
|
font = self.font = ImageFont.truetype(
|
|
fonts['NotoSansUI-Regular'], size=14)
|
|
|
|
info_x = im_black.size[1] - info_height
|
|
write(im_black, (0, info_x), (info_width, info_height),
|
|
self.info, font=font)
|
|
|
|
# optimize the image by mapping colours to pure black and white
|
|
if self.optimize:
|
|
im_black = self._optimize_im(im_black)
|
|
im_colour = self._optimize_im(im_colour)
|
|
|
|
im_black.save(self.image_folder + 'canvas.png', 'PNG')
|
|
im_colour.save(self.image_folder + 'canvas_colour.png', 'PNG')
|
|
|
|
# Additionally, combine the two images with color
|
|
def clear_white(img):
|
|
"""Replace all white pixels from image with transparent pixels
|
|
"""
|
|
x = numpy.asarray(img.convert('RGBA')).copy()
|
|
x[:, :, 3] = (255 * (x[:, :, :3] != 255).any(axis=2)).astype(numpy.uint8)
|
|
return Image.fromarray(x)
|
|
|
|
# Additionally, combine the two images with color
|
|
def black_to_colour(img):
|
|
"""Replace all black pixels from image with red pixels
|
|
"""
|
|
buffer = numpy.array(img.convert('RGB'))
|
|
red, green = buffer[:, :, 0], buffer[:, :, 1]
|
|
|
|
threshold = 220
|
|
|
|
# non-white -> red
|
|
buffer[numpy.logical_and(red <= threshold, green <= threshold)] = [255, 0, 0]
|
|
|
|
return Image.fromarray(buffer)
|
|
|
|
# Save full-screen images as well
|
|
im_black = clear_white(im_black)
|
|
im_colour = black_to_colour(im_colour)
|
|
|
|
im_colour.paste(im_black, (0, 0), im_black)
|
|
im_colour.save(os.path.join(settings.IMAGE_FOLDER, 'full-screen.png'), 'PNG')
|
|
|
|
@staticmethod
|
|
def _optimize_im(image, threshold=220):
|
|
"""Optimize the image for rendering on ePaper displays"""
|
|
|
|
buffer = numpy.array(image.convert('RGB'))
|
|
red, green = buffer[:, :, 0], buffer[:, :, 1]
|
|
|
|
# grey->black
|
|
buffer[numpy.logical_and(red <= threshold, green <= threshold)] = [0, 0, 0]
|
|
image = Image.fromarray(buffer)
|
|
return image
|
|
|
|
def calibrate(self, cycles=3):
|
|
"""Calibrate the E-Paper display
|
|
|
|
Uses the Display class to calibrate the display with the default of 3
|
|
cycles. After a refresh cycle, a new image is generated and shown.
|
|
"""
|
|
|
|
self.Display.calibrate(cycles=cycles)
|
|
|
|
def _calibration_check(self):
|
|
"""Calibration scheduler
|
|
uses calibration hours from settings file to check if calibration is due.
|
|
If no calibration hours are set, calibration is skipped."""
|
|
|
|
# Check if calibration hours are not set or the list is empty
|
|
if not self._calibration_hours:
|
|
print("No calibration hours set. Skipping calibration.")
|
|
return
|
|
|
|
now = arrow.now()
|
|
if now.hour in self._calibration_hours and not self._calibration_state:
|
|
self.calibrate()
|
|
self._calibration_state = True
|
|
else:
|
|
self._calibration_state = False
|
|
|
|
@staticmethod
|
|
def cleanup():
|
|
# clean up old images in image_folder
|
|
if len(glob.glob(settings.IMAGE_FOLDER)) <= 1:
|
|
return
|
|
for _file in glob.glob(settings.IMAGE_FOLDER):
|
|
try:
|
|
os.remove(_file)
|
|
except:
|
|
logger.error(f"could not remove file: {_file}")
|
|
pass
|
|
|
|
def process_module(self, number) -> bool or Exception:
|
|
"""Process individual module to generate images and handle exceptions."""
|
|
module = eval(f'self.module_{number}')
|
|
try:
|
|
black, colour = module.generate_image()
|
|
if self.show_border:
|
|
draw_border_2(im=black, xy=(1, 1), size=(black.width - 2, black.height - 2), radius=5)
|
|
black.save(f"{self.image_folder}module{number}_black.png", "PNG")
|
|
colour.save(f"{self.image_folder}module{number}_colour.png", "PNG")
|
|
return True
|
|
except Exception:
|
|
logger.exception(f"Error in module {number}!")
|
|
return False
|
|
|
|
def _shutdown_system(self):
|
|
"""Shutdown the system"""
|
|
import subprocess
|
|
from time import sleep
|
|
try:
|
|
logger.info("Shutting down OS in 5 seconds...")
|
|
sleep(5)
|
|
subprocess.run(["sudo", "shutdown", "-h", "now"], check=True)
|
|
except subprocess.CalledProcessError:
|
|
logger.warning("Failed to execute shutdown command.")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print(f'running inkycal main in standalone/debug mode')
|