Inkycal/inkycal/custom/openweathermap_wrapper.py
2024-01-23 20:44:21 +01:00

289 lines
11 KiB
Python

import json
import logging
from datetime import datetime
from datetime import timedelta
from typing import Dict
from typing import List
from typing import Literal
import requests
from dateutil import tz
from inkycal.custom.functions import get_system_tz
# Accepted values for the temperature unit option.
TEMP_UNITS = Literal[
    "celsius",
    "fahrenheit",
]

# Accepted values for the wind speed unit option.
WIND_UNITS = Literal[
    "meters_sec",
    "km_hour",
    "miles_hour",
    "knots",
    "beaufort",
]

# Module-level logger; its level is set to DEBUG as soon as the module is imported.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def is_timestamp_within_range(timestamp: datetime, start_time: datetime, end_time: datetime) -> bool:
    """Tell whether ``timestamp`` lies inside the closed interval [start_time, end_time].

    :param timestamp:
        The moment to test
    :param start_time:
        Lower bound (inclusive)
    :param end_time:
        Upper bound (inclusive)
    :return:
        True if the timestamp is neither before start_time nor after end_time
    """
    # Reject anything earlier than the lower bound first ...
    if timestamp < start_time:
        return False
    # ... then only the upper bound is left to check.
    return timestamp <= end_time
class OpenWeatherMap:
    """Small client for the OpenWeatherMap 2.5 current-weather and forecast endpoints.

    Data is always requested in metric units and converted locally to the
    temperature and wind units chosen at construction time.
    """

    # Seconds to wait for the API before giving up, so a stalled connection
    # cannot block the caller forever.
    _REQUEST_TIMEOUT = 30

    def __init__(
        self,
        api_key: str,
        city_id: int,
        # Quoted so the aliases are resolved by type checkers only and are not
        # evaluated while the class body is being defined.
        temp_unit: "TEMP_UNITS" = "celsius",
        wind_unit: "WIND_UNITS" = "meters_sec",
        language: str = "en",
    ) -> None:
        """
        :param api_key:
            OpenWeatherMap API key
        :param city_id:
            OpenWeatherMap city id to query
        :param temp_unit:
            Unit for all returned temperatures ("celsius" or "fahrenheit")
        :param wind_unit:
            Unit for all returned wind speeds
        :param language:
            Language code passed to the API as ``lang``
        """
        self.api_key = api_key
        self.city_id = city_id
        self.temp_unit = temp_unit
        self.wind_unit = wind_unit
        self.language = language
        self._api_version = "2.5"
        self._base_url = f"https://api.openweathermap.org/data/{self._api_version}"
        self.tz_zone = tz.gettz(get_system_tz())

    def _fetch_json(self, endpoint: str, description: str) -> Dict:
        """Query one API endpoint for the configured city and return the decoded JSON body.

        :param endpoint:
            Path below the base url, e.g. "weather" or "forecast"
        :param description:
            Human readable name of the requested data, used in the error message
        :raises AssertionError:
            If the API answers with a non-success status code
        """
        response = requests.get(
            f"{self._base_url}/{endpoint}",
            params={
                "id": self.city_id,
                "appid": self.api_key,
                "units": "Metric",
                "lang": self.language,
            },
            timeout=self._REQUEST_TIMEOUT,
        )
        if not response.ok:
            raise AssertionError(
                f"Failure getting the {description}: code {response.status_code}. Reason: {response.text}"
            )
        return json.loads(response.text)

    def _get_converted_gust(self, wind: Dict) -> float:
        """Return the converted gust speed of a ``wind`` section of an API response.

        The ``gust`` entry may be absent from a response; in that case the
        base wind speed is used instead of failing with a KeyError.
        """
        return self.get_converted_windspeed(wind.get("gust", wind["speed"]))

    def get_current_weather(self) -> Dict:
        """
        Gets current weather status from this API: https://openweathermap.org/current
        The result is also stored in ``self.current_weather``.
        :return:
            Current weather as dictionary
        :raises AssertionError:
            If the API request was not successful
        """
        current_data = self._fetch_json("weather", "current weather")
        main_data = current_data["main"]
        current_weather = {
            "detailed_status": current_data["weather"][0]["description"],
            "weather_icon_name": current_data["weather"][0]["icon"],
            # Requested with units=Metric, so temperatures arrive in Celsius
            "temp": self.get_converted_temperature(main_data["temp"]),
            "temp_feels_like": self.get_converted_temperature(main_data["feels_like"]),
            "min_temp": self.get_converted_temperature(main_data["temp_min"]),
            "max_temp": self.get_converted_temperature(main_data["temp_max"]),
            "humidity": main_data["humidity"],  # OWM Unit: % rH
            # Requested with units=Metric, so wind speeds arrive in meter/sec
            "wind": self.get_converted_windspeed(current_data["wind"]["speed"]),
            "wind_gust": self._get_converted_gust(current_data["wind"]),
            "uvi": None,  # TODO: this is no longer supported with 2.5 API, find alternative
            "sunrise": current_data["sys"]["sunrise"],  # unix timestamp
            "sunset": current_data["sys"]["sunset"],  # unix timestamp
        }
        self.current_weather = current_weather
        return current_weather

    def get_weather_forecast(self) -> List[Dict]:
        """
        Gets weather forecasts from this API: https://openweathermap.org/forecast5
        What you get is a list of 40 forecasts for 3-hour time slices, totaling to 5 days.
        The result is also stored in ``self.hourly_forecasts``.
        :return:
            Forecasts data dictionary
        :raises AssertionError:
            If the API request was not successful
        """
        forecast_data = self._fetch_json("forecast", "weather forecast")["list"]
        hourly_forecasts = []
        for forecast in forecast_data:
            # Combined precipitation: rain and snow are added up, either may be missing (OWM Unit: mm)
            precip_mm = forecast.get("rain", {}).get("3h", 0.0) + forecast.get("snow", {}).get("3h", 0.0)
            forecast_time = datetime.fromtimestamp(forecast["dt"], tz=self.tz_zone)
            hourly_forecasts.append(
                {
                    # Requested with units=Metric, so temperatures arrive in Celsius
                    "temp": self.get_converted_temperature(forecast["main"]["temp"]),
                    "min_temp": self.get_converted_temperature(forecast["main"]["temp_min"]),
                    "max_temp": self.get_converted_temperature(forecast["main"]["temp_max"]),
                    "precip_3h_mm": precip_mm,
                    # Requested with units=Metric, so wind speeds arrive in meter/sec
                    "wind": self.get_converted_windspeed(forecast["wind"]["speed"]),
                    "wind_gust": self._get_converted_gust(forecast["wind"]),
                    "pressure": forecast["main"]["pressure"],  # OWM Unit: hPa
                    "humidity": forecast["main"]["humidity"],  # OWM Unit: % rH
                    # OWM value is unitless, directly converting to % scale
                    "precip_probability": forecast["pop"] * 100.0,
                    "icon": forecast["weather"][0]["icon"],
                    "datetime": forecast_time,
                }
            )
            logger.debug("Added rain forecast at %s: %s", forecast_time, precip_mm)
        self.hourly_forecasts = hourly_forecasts
        return self.hourly_forecasts

    def get_forecast_for_day(self, days_from_today: int) -> Dict:
        """
        Get temperature range, rain and most frequent icon code
        for the day that is days_from_today away
        :param days_from_today:
            should be int from 0-4: e.g. 2 -> 2 days from today
        :return:
            Forecast dictionary
        """
        # Make sure hourly forecasts are up to date
        self.get_weather_forecast()

        # Midnight-to-midnight window of the requested day, built directly in the
        # same timezone that the forecast timestamps use
        start_time = (datetime.now(tz=self.tz_zone) + timedelta(days=days_from_today)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        end_time = start_time + timedelta(days=1)

        # Get all the forecasts for that day's time range
        forecasts = [
            f
            for f in self.hourly_forecasts
            if is_timestamp_within_range(timestamp=f["datetime"], start_time=start_time, end_time=end_time)
        ]
        # In case the next available forecast is already for the next day,
        # use that one for the less than 3 remaining hours of today
        if not forecasts:
            forecasts.append(self.hourly_forecasts[0])

        # Get rain and temperatures for that day
        temps = [f["temp"] for f in forecasts]
        rain = sum(f["precip_3h_mm"] for f in forecasts)

        # Most frequent icon of the day, preferring daytime icons if there are any.
        # Iterating the list (not a set) makes ties resolve to the earliest icon
        # instead of depending on set ordering.
        icons = [f["icon"] for f in forecasts]
        day_icons = [icon for icon in icons if "d" in icon]
        candidates = day_icons or icons
        icon = max(candidates, key=candidates.count)

        # Return a dict with that day's data
        return {
            "datetime": start_time.timestamp(),
            "icon": icon,
            "temp_min": min(temps),
            "temp_max": max(temps),
            "precip_mm": rain,
        }

    def get_converted_temperature(self, value: float) -> float:
        """Convert a Celsius value to the configured temperature unit."""
        if self.temp_unit == "fahrenheit":
            value = self.celsius_to_fahrenheit(value)
        return value

    def get_converted_windspeed(self, value: float) -> float:
        """Convert a meters-per-second value to the configured wind unit.

        "meters_sec" and any unknown unit return the value unchanged.
        """
        converters = {
            "km_hour": self.mps_to_kph,
            "miles_hour": self.mps_to_mph,
            "knots": self.mps_to_knots,
            "beaufort": self.mps_to_beaufort,
        }
        convert = converters.get(self.wind_unit)
        return value if convert is None else convert(value)

    @staticmethod
    def mps_to_beaufort(meters_per_second: float) -> int:
        """Map meters per second to the beaufort scale.
        Args:
            meters_per_second:
                float representing meters per seconds
        Returns:
            an integer of the beaufort scale mapping the input
        """
        # Upper bound (exclusive) of beaufort 0..11; anything above is 12
        thresholds = [0.3, 1.6, 3.4, 5.5, 8.0, 10.8, 13.9, 17.2, 20.8, 24.5, 28.5, 32.7]
        return next((i for i, threshold in enumerate(thresholds) if meters_per_second < threshold), 12)

    @staticmethod
    def mps_to_mph(meters_per_second: float) -> float:
        """Map meters per second to miles per hour
        Args:
            meters_per_second:
                float representing meters per seconds.
        Returns:
            float representing the input value in miles per hour.
        """
        # 1 m/s is approximately equal to 2.23694 mph
        return meters_per_second * 2.23694

    @staticmethod
    def mps_to_kph(meters_per_second: float) -> float:
        """Map meters per second to kilometers per hour
        Args:
            meters_per_second:
                float representing meters per seconds.
        Returns:
            float representing the input value in kilometers per hour.
        """
        # 1 m/s is equal to 3.6 km/h
        return meters_per_second * 3.6

    @staticmethod
    def mps_to_knots(meters_per_second: float) -> float:
        """Map meters per second to knots (nautical miles per hour)
        Args:
            meters_per_second:
                float representing meters per seconds.
        Returns:
            float representing the input value in knots.
        """
        # 1 m/s is approximately equal to 1.94384 knots
        return meters_per_second * 1.94384

    @staticmethod
    def celsius_to_fahrenheit(celsius: float) -> float:
        """Converts the given temperature from degrees Celsius to Fahrenheit."""
        return (float(celsius) * 9.0 / 5.0) + 32.0
def main():
    """Manual smoke test: query the API for a sample city and print the results."""
    owm = OpenWeatherMap(api_key="", city_id=2643743, language="de")

    # Current conditions
    print(owm.get_current_weather())

    # Fetch the hourly forecasts once, then print the daily summary two days ahead
    owm.get_weather_forecast()
    print(owm.get_forecast_for_day(days_from_today=2))


if __name__ == "__main__":
    main()