Mercurial > libervia-backend
view libervia/backend/tools/common/date_utils.py @ 4151:18026ce0819c
core (xmpp): message reception workflow refactoring:
- Call methods from a root async one instead of using Deferred callbacks chain.
- Use a queue to be sure to process messages in order.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 14:50:35 +0100 |
parents | 4325a0f13b0f |
children | c6d85c31a59f |
line wrap: on
line source
#!/usr/bin/env python3 # SAT: a jabber client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """tools to help manipulating time and dates""" from typing import Union import calendar import datetime import re import time from babel import dates from dateutil import parser, tz from dateutil.parser import ParserError from dateutil.relativedelta import relativedelta from dateutil.utils import default_tzinfo from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.i18n import _ RELATIVE_RE = re.compile( r"\s*(?P<in>\bin\b)?" r"(?P<date>[^+-][^+]+[^\s+-])?\s*(?P<direction>[-+])?\s*" r"\s*(?P<quantity>\d+)\s*" r"(?P<unit>(second|sec|s|minute|min|month|mo|m|hour|hr|h|day|d|week|w|year|yr|y))s?" r"(?P<ago>\s+ago)?\s*", re.I ) TIME_SYMBOL_MAP = { "s": "second", "sec": "second", "m": "minute", "min": "minute", "h": "hour", "hr": "hour", "d": "day", "w": "week", "mo": "month", "y": "year", "yr": "year", } YEAR_FIRST_RE = re.compile(r"\d{4}[^\d]+") TZ_UTC = tz.tzutc() TZ_LOCAL = tz.gettz() def date_parse(value, default_tz=TZ_UTC): """Parse a date and return corresponding unix timestamp @param value(unicode): date to parse, in any format supported by parser @param default_tz(datetime.tzinfo): default timezone @return (int): timestamp """ value = str(value).strip() dayfirst = False if YEAR_FIRST_RE.match(value) else True try: dt = default_tzinfo( parser.parse(value, dayfirst=dayfirst), default_tz) except ParserError as e: if value == "now": dt = datetime.datetime.now(tz.tzutc()) else: try: # the date may already be a timestamp return int(value) except ValueError: raise e return calendar.timegm(dt.utctimetuple()) def date_parse_ext(value, default_tz=TZ_UTC): """Extended date parse which accept relative date @param value(unicode): date to parse, in any format supported by parser and with the hability to specify X days/weeks/months/years in the past or future. Relative date are specified either with something like `[main_date] +1 week` or with something like `3 days ago`, and it is case insensitive. [main_date] is a date parsable by parser, or empty to specify current date/time. "now" can also be used to specify current date/time. @param default_tz(datetime.tzinfo): same as for date_parse @return (int): timestamp """ m = RELATIVE_RE.match(value) if m is None: return date_parse(value, default_tz=default_tz) if sum(1 for g in ("direction", "in", "ago") if m.group(g)) > 1: raise ValueError( _('You can use only one of direction (+ or -), "in" and "ago"')) if m.group("direction") == '-' or m.group("ago"): direction = -1 else: direction = 1 date = m.group("date") if date is not None: date = date.strip() if not date or date == "now": dt = datetime.datetime.now(tz.tzutc()) else: try: dt = default_tzinfo(parser.parse(date, dayfirst=True), default_tz) except ParserError as e: try: timestamp = int(date) except ValueError: raise e else: dt = datetime.datetime.fromtimestamp(timestamp, tz.tzutc()) quantity = int(m.group("quantity")) unit = m.group("unit").lower() try: unit = TIME_SYMBOL_MAP[unit] except KeyError: pass delta_kw = {f"{unit}s": direction * quantity} dt = dt + relativedelta(**delta_kw) return calendar.timegm(dt.utctimetuple()) def date_fmt( timestamp: Union[float, int, str], fmt: str = "short", date_only: bool = False, auto_limit: int = 7, auto_old_fmt: str = "short", auto_new_fmt: str = "relative", locale_str: str = C.DEFAULT_LOCALE, tz_info: Union[datetime.tzinfo, str] = TZ_UTC ) -> str: """Format date according to locale @param timestamp: unix time @param fmt: one of: - short: e.g. u'31/12/17' - medium: e.g. u'Apr 1, 2007' - long: e.g. u'April 1, 2007' - full: e.g. u'Sunday, April 1, 2007' - relative: format in relative time e.g.: 3 hours note that this format is not precise - iso: ISO 8601 format e.g.: u'2007-04-01T19:53:23Z' - auto: use auto_old_fmt if date is older than auto_limit else use auto_new_fmt - auto_day: shorcut to set auto format with change on day old format will be short, and new format will be time only or a free value which is passed to babel.dates.format_datetime (see http://babel.pocoo.org/en/latest/dates.html?highlight=pattern#pattern-syntax) @param date_only: if True, only display date (not datetime) @param auto_limit: limit in days before using auto_old_fmt use 0 to have a limit at last midnight (day change) @param auto_old_fmt: format to use when date is older than limit @param auto_new_fmt: format to use when date is equal to or more recent than limit @param locale_str: locale to use (as understood by babel) @param tz_info: time zone to use """ timestamp = float(timestamp) if isinstance(tz_info, str): tz_info = tz.gettz(tz_info) if fmt == "auto_day": fmt, auto_limit, auto_old_fmt, auto_new_fmt = "auto", 0, "short", "HH:mm" if fmt == "auto": if auto_limit == 0: now = datetime.datetime.now(tz_info) # we want to use given tz_info, so we don't use date() or today() today = datetime.datetime(year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo) today = calendar.timegm(today.utctimetuple()) if timestamp < today: fmt = auto_old_fmt else: fmt = auto_new_fmt else: days_delta = (time.time() - timestamp) / 3600 if days_delta > (auto_limit or 7): fmt = auto_old_fmt else: fmt = auto_new_fmt if fmt == "relative": delta = timestamp - time.time() return dates.format_timedelta( delta, granularity="minute", add_direction=True, locale=locale_str ) elif fmt in ("short", "long", "full"): if date_only: dt = datetime.datetime.fromtimestamp(timestamp, tz_info) return dates.format_date(dt, format=fmt, locale=locale_str) else: return dates.format_datetime(timestamp, format=fmt, locale=locale_str, tzinfo=tz_info) elif fmt == "iso": if date_only: fmt = "yyyy-MM-dd" else: fmt = "yyyy-MM-ddTHH:mm:ss'Z'" return dates.format_datetime(timestamp, format=fmt) else: return dates.format_datetime(timestamp, format=fmt, locale=locale_str, tzinfo=tz_info) def delta2human(start_ts: Union[float, int], end_ts: Union[float, int]) -> str: """Convert delta of 2 unix times to human readable text @param start_ts: timestamp of starting time @param end_ts: timestamp of ending time """ if end_ts < start_ts: raise exceptions.InternalError( "end timestamp must be bigger or equal to start timestamp !" ) rd = relativedelta( datetime.datetime.fromtimestamp(end_ts), datetime.datetime.fromtimestamp(start_ts) ) text_elems = [] for unit in ("years", "months", "days", "hours", "minutes"): value = getattr(rd, unit) if value == 1: # we remove final "s" when there is only 1 text_elems.append(f"1 {unit[:-1]}") elif value > 1: text_elems.append(f"{value} {unit}") return ", ".join(text_elems) def get_timezone_name(tzinfo, timestamp: Union[float, int]) -> str: """ Get the DST-aware timezone name for a given timezone and timestamp. @param tzinfo: The timezone to get the name for @param timestamp: The timestamp to use, as a Unix timestamp (number of seconds since the Unix epoch). @return: The DST-aware timezone name. """ dt = datetime.datetime.fromtimestamp(timestamp) dt_tz = dt.replace(tzinfo=tzinfo) tz_name = dt_tz.tzname() if tz_name is None: raise exceptions.InternalError("tz_name should not be None") return tz_name