287 lines
11 KiB
Python
287 lines
11 KiB
Python
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from datetime import timedelta
|
|
from typing import Dict
|
|
from typing import List
|
|
from typing import Literal
|
|
|
|
import requests
|
|
from dateutil import tz
|
|
|
|
from inkycal.custom.functions import get_system_tz
|
|
|
|
TEMP_UNITS = Literal["celsius", "fahrenheit"]
|
|
WIND_UNITS = Literal["meters_sec", "km_hour", "miles_hour", "knots", "beaufort"]
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel("DEBUG")
|
|
|
|
|
|
def is_timestamp_within_range(timestamp: datetime, start_time: datetime, end_time: datetime) -> bool:
|
|
# Check if the timestamp is within the range
|
|
return start_time <= timestamp <= end_time
|
|
|
|
|
|
class OpenWeatherMap:
|
|
def __init__(
|
|
self,
|
|
api_key: str,
|
|
city_id: int,
|
|
temp_unit: TEMP_UNITS = "celsius",
|
|
wind_unit: WIND_UNITS = "meters_sec",
|
|
language: str = "en",
|
|
) -> None:
|
|
self.api_key = api_key
|
|
self.city_id = city_id
|
|
self.temp_unit = temp_unit
|
|
self.wind_unit = wind_unit
|
|
self.language = language
|
|
self._api_version = "2.5"
|
|
self._base_url = f"https://api.openweathermap.org/data/{self._api_version}"
|
|
self.tz_zone = tz.gettz(get_system_tz())
|
|
|
|
def get_current_weather(self) -> Dict:
|
|
"""
|
|
Gets current weather status from this API: https://openweathermap.org/current
|
|
:return:
|
|
Current weather as dictionary
|
|
"""
|
|
# Gets weather forecast from this API:
|
|
current_weather_url = (
|
|
f"{self._base_url}/weather?id={self.city_id}&appid={self.api_key}&units=Metric&lang={self.language}"
|
|
)
|
|
response = requests.get(current_weather_url)
|
|
if not response.ok:
|
|
raise AssertionError(
|
|
f"Failure getting the current weather: code {response.status_code}. Reason: {response.text}"
|
|
)
|
|
current_data = json.loads(response.text)
|
|
|
|
current_weather = {}
|
|
current_weather["detailed_status"] = current_data["weather"][0]["description"]
|
|
current_weather["weather_icon_name"] = current_data["weather"][0]["icon"]
|
|
current_weather["temp"] = self.get_converted_temperature(
|
|
current_data["main"]["temp"]
|
|
) # OWM Unit Default: Kelvin, Metric: Celsius, Imperial: Fahrenheit
|
|
current_weather["temp_feels_like"] = self.get_converted_temperature(current_data["main"]["feels_like"])
|
|
current_weather["min_temp"] = self.get_converted_temperature(current_data["main"]["temp_min"])
|
|
current_weather["max_temp"] = self.get_converted_temperature(current_data["main"]["temp_max"])
|
|
current_weather["humidity"] = current_data["main"]["humidity"] # OWM Unit: % rH
|
|
current_weather["wind"] = self.get_converted_windspeed(
|
|
current_data["wind"]["speed"]
|
|
) # OWM Unit Default: meter/sec, Metric: meter/sec
|
|
current_weather["wind_gust"] = self.get_converted_windspeed(current_data["wind"]["gust"])
|
|
current_weather["uvi"] = None # TODO: this is no longer supported with 2.5 API, find alternative
|
|
|
|
self.current_weather = current_weather
|
|
|
|
return current_weather
|
|
|
|
def get_weather_forecast(self) -> List[Dict]:
|
|
"""
|
|
Gets weather forecasts from this API: https://openweathermap.org/forecast5
|
|
What you get is a list of 40 forecasts for 3-hour time slices, totaling to 5 days.
|
|
:return:
|
|
Forecasts data dictionary
|
|
"""
|
|
#
|
|
forecast_url = (
|
|
f"{self._base_url}/forecast?id={self.city_id}&appid={self.api_key}&units=Metric&lang={self.language}"
|
|
)
|
|
response = requests.get(forecast_url)
|
|
if not response.ok:
|
|
raise AssertionError(
|
|
f"Failure getting the current weather: code {response.status_code}. Reason: {response.text}"
|
|
)
|
|
forecast_data = json.loads(response.text)["list"]
|
|
|
|
# Add forecast data to hourly_data_dict list of dictionaries
|
|
hourly_forecasts = []
|
|
for forecast in forecast_data:
|
|
# calculate combined precipitation (snow + rain)
|
|
precip_mm = 0.0
|
|
if "rain" in forecast.keys():
|
|
precip_mm = +forecast["rain"]["3h"] # OWM Unit: mm
|
|
if "snow" in forecast.keys():
|
|
precip_mm = +forecast["snow"]["3h"] # OWM Unit: mm
|
|
hourly_forecasts.append(
|
|
{
|
|
"temp": self.get_converted_temperature(
|
|
forecast["main"]["temp"]
|
|
), # OWM Unit Default: Kelvin, Metric: Celsius, Imperial: Fahrenheit
|
|
"min_temp": self.get_converted_temperature(forecast["main"]["temp_min"]),
|
|
"max_temp": self.get_converted_temperature(forecast["main"]["temp_max"]),
|
|
"precip_3h_mm": precip_mm,
|
|
"wind": self.get_converted_windspeed(
|
|
forecast["wind"]["speed"]
|
|
), # OWM Unit Default: meter/sec, Metric: meter/sec, Imperial: miles/hour
|
|
"wind_gust": self.get_converted_windspeed(forecast["wind"]["gust"]),
|
|
"pressure": forecast["main"]["pressure"], # OWM Unit: hPa
|
|
"humidity": forecast["main"]["humidity"], # OWM Unit: % rH
|
|
"precip_probability": forecast["pop"]
|
|
* 100.0, # OWM value is unitless, directly converting to % scale
|
|
"icon": forecast["weather"][0]["icon"],
|
|
"datetime": datetime.fromtimestamp(forecast["dt"], tz=self.tz_zone)
|
|
}
|
|
)
|
|
logger.debug(f"Added rain forecast at {datetime.fromtimestamp(forecast['dt'], tz=self.tz_zone)}: {precip_mm}")
|
|
|
|
self.hourly_forecasts = hourly_forecasts
|
|
|
|
return self.hourly_forecasts
|
|
|
|
def get_forecast_for_day(self, days_from_today: int) -> Dict:
|
|
"""
|
|
Get temperature range, rain and most frequent icon code
|
|
for the day that is days_from_today away
|
|
:param days_from_today:
|
|
should be int from 0-4: e.g. 2 -> 2 days from today
|
|
:return:
|
|
Forecast dictionary
|
|
"""
|
|
# Make sure hourly forecasts are up to date
|
|
_ = self.get_weather_forecast()
|
|
|
|
# Calculate the start and end times for the specified number of days from now
|
|
current_time = datetime.now()
|
|
start_time = (
|
|
(current_time + timedelta(days=days_from_today))
|
|
.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
.astimezone(tz=self.tz_zone)
|
|
)
|
|
end_time = (start_time + timedelta(days=1)).astimezone(tz=self.tz_zone)
|
|
|
|
# Get all the forecasts for that day's time range
|
|
forecasts = [
|
|
f
|
|
for f in self.hourly_forecasts
|
|
if is_timestamp_within_range(timestamp=f["datetime"], start_time=start_time, end_time=end_time)
|
|
]
|
|
|
|
# In case the next available forecast is already for the next day, use that one for the less than 3 remaining hours of today
|
|
if forecasts == []:
|
|
forecasts.append(self.hourly_forecasts[0])
|
|
|
|
# Get rain and temperatures for that day
|
|
temps = [f["temp"] for f in forecasts]
|
|
rain = sum([f["precip_3h_mm"] for f in forecasts])
|
|
|
|
# Get all weather icon codes for this day
|
|
icons = [f["icon"] for f in forecasts]
|
|
day_icons = [icon for icon in icons if "d" in icon]
|
|
|
|
# Use the day icons if possible
|
|
icon = max(set(day_icons), key=icons.count) if len(day_icons) > 0 else max(set(icons), key=icons.count)
|
|
|
|
# Return a dict with that day's data
|
|
day_data = {
|
|
"datetime": start_time.timestamp(),
|
|
"icon": icon,
|
|
"temp_min": min(temps),
|
|
"temp_max": max(temps),
|
|
"precip_mm": rain,
|
|
}
|
|
|
|
return day_data
|
|
|
|
def get_converted_temperature(self, value: float) -> float:
|
|
if self.temp_unit == "fahrenheit":
|
|
value = self.celsius_to_fahrenheit(value)
|
|
return value
|
|
|
|
def get_converted_windspeed(self, value: float) -> float:
|
|
Literal["meters_sec", "km_hour", "miles_hour", "knots", "beaufort"]
|
|
if self.wind_unit == "km_hour":
|
|
value = self.celsius_to_fahrenheit(value)
|
|
elif self.wind_unit == "km_hour":
|
|
value = self.mps_to_kph(value)
|
|
elif self.wind_unit == "miles_hour":
|
|
value = self.mps_to_mph(value)
|
|
elif self.wind_unit == "knots":
|
|
value = self.mps_to_knots(value)
|
|
elif self.wind_unit == "beaufort":
|
|
value = self.mps_to_beaufort(value)
|
|
return value
|
|
|
|
@staticmethod
|
|
def mps_to_beaufort(meters_per_second: float) -> int:
|
|
"""Map meters per second to the beaufort scale.
|
|
|
|
Args:
|
|
meters_per_second:
|
|
float representing meters per seconds
|
|
|
|
Returns:
|
|
an integer of the beaufort scale mapping the input
|
|
"""
|
|
thresholds = [0.3, 1.6, 3.4, 5.5, 8.0, 10.8, 13.9, 17.2, 20.8, 24.5, 28.5, 32.7]
|
|
return next((i for i, threshold in enumerate(thresholds) if meters_per_second < threshold), 12)
|
|
|
|
@staticmethod
|
|
def mps_to_mph(meters_per_second: float) -> float:
|
|
"""Map meters per second to miles per hour
|
|
|
|
Args:
|
|
meters_per_second:
|
|
float representing meters per seconds.
|
|
|
|
Returns:
|
|
float representing the input value in miles per hour.
|
|
"""
|
|
# 1 m/s is approximately equal to 2.23694 mph
|
|
miles_per_hour = meters_per_second * 2.23694
|
|
return miles_per_hour
|
|
|
|
@staticmethod
|
|
def mps_to_kph(meters_per_second: float) -> float:
|
|
"""Map meters per second to kilometers per hour
|
|
|
|
Args:
|
|
meters_per_second:
|
|
float representing meters per seconds.
|
|
|
|
Returns:
|
|
float representing the input value in kilometers per hour.
|
|
"""
|
|
# 1 m/s is equal to 3.6 km/h
|
|
kph = meters_per_second * 3.6
|
|
return kph
|
|
|
|
@staticmethod
|
|
def mps_to_knots(meters_per_second: float) -> float:
|
|
"""Map meters per second to knots (nautical miles per hour)
|
|
|
|
Args:
|
|
meters_per_second:
|
|
float representing meters per seconds.
|
|
|
|
Returns:
|
|
float representing the input value in knots.
|
|
"""
|
|
# 1 m/s is equal to 1.94384 knots
|
|
knots = meters_per_second * 1.94384
|
|
return knots
|
|
|
|
@staticmethod
|
|
def celsius_to_fahrenheit(celsius: int or float) -> float:
|
|
"""Converts the given temperate from degrees Celsius to Fahrenheit."""
|
|
fahrenheit = (float(celsius) * 9.0 / 5.0) + 32.0
|
|
return fahrenheit
|
|
|
|
|
|
def main():
|
|
"""Main function, only used for testing purposes"""
|
|
key = ""
|
|
city = 2643743
|
|
lang = "de"
|
|
owm = OpenWeatherMap(api_key=key, city_id=city, language=lang)
|
|
|
|
current_weather = owm.get_current_weather()
|
|
print(current_weather)
|
|
hourly_forecasts = owm.get_weather_forecast()
|
|
print(owm.get_forecast_for_day(days_from_today=2))
|
|
|
|
if __name__ == "__main__":
|
|
main()
|