diff options
Diffstat (limited to 'bin/vcs_query.py')
-rwxr-xr-x | bin/vcs_query.py | 358 |
1 files changed, 358 insertions, 0 deletions
diff --git a/bin/vcs_query.py b/bin/vcs_query.py new file mode 100755 index 0000000..7862047 --- /dev/null +++ b/bin/vcs_query.py @@ -0,0 +1,358 @@ +#!/usr/bin/env python3 +# -*- coding: utf8 -*- + +# This file is part of vcs_query - https://github.com/mageta/vcs_query +# SPDX-License-Identifier: MIT +# See file LICENSE for more information. + +# TODO: modules documentation + +import collections +import email.utils +import argparse +import hashlib +import logging +import pickle +import sys +import os +import re + +# vCard Standards: +# 3.0 https://tools.ietf.org/html/rfc2426 +# 4.0 https://tools.ietf.org/html/rfc6350 +from vobject import readComponents as VObjectRead +from vobject.base import VObjectError + +LOGGER = logging.getLogger(__name__) + +Version = collections.namedtuple("Version", ["major", "minor", "patch"]) +VERSION = Version( + major=0, + minor=4, + patch=0, +) + +def main(argv): + optparser = argparse.ArgumentParser(prog=argv[0], + description="Query vCard Files for " + "EMail Addresses") + optparser.add_argument("pattern", metavar="PATTERN", + nargs='?', default=None, + help="only those lines that contain PATTERN will be" + "displayed") + optparser.add_argument("--version", + action="version", + version="%(prog)s version " + "{v.major:d}.{v.minor:d}.{v.patch:d}".format( + v=VERSION)) + optparser.add_argument("-d", "--vcard-dir", + required=True, action='append', + help="specify directory containing vCards (can be " + "given multiple times)") + optparser.add_argument("-a", "--all-addresses", + required=False, action="store_true", + help="display all addresses stored for a contact") + optparser.add_argument("-n", "--sort-names", + required=False, action="store_true", + help="sort the result according to the contact name " + "(the default is to sort according to mail-" + "address first)") + optparser.add_argument("-r", "--regex", + required=False, action="store_true", + help="interpret PATTERN as regular expression " + "(syntax: https://docs.python.org/3/library/" + "re.html#regular-expression-syntax)") + optparser.add_argument("-m", "--mode", + required=False, type=str, + choices=OutputFormat.available, + default=OutputFormat.available[0], + help="select output-mode (default: " + "{})".format(OutputFormat.available[0])) + args = optparser.parse_args(argv[1:]) + + for vcdir in args.vcard_dir: + if not os.path.isdir(vcdir): + optparser.error("'{}' is not a directory".format(vcdir)) + + try: + output = OutputFormat(args.mode) + except LookupError as error: + optparser.error(error) + + try: + pattern = Pattern(args.pattern, args.regex) + except re.error as error: + optparser.error("Given PATTERN is not a valid regular " + "expression: {!s}".format(error)) + + print("vcs_query.py, see https://github.com/mageta/vcs_query") + + # Load all contacts from the given vCard-Directories; duplicates are + # automatically handled by using a set + contacts_uniq = set() + for vcdir in args.vcard_dir: + try: + for vcard in VcardCache(vcdir).vcards: + if vcard: + if args.all_addresses: + contacts_uniq.update(vcard) + else: + contacts_uniq.add(vcard[0]) + except OSError as error: + LOGGER.error("Error while reading vCard Dir: %s: %s", vcdir, error) + + # sort the found contacts according to the given command-line options + if not args.sort_names: + contacts = sorted(contacts_uniq, + key=(lambda x: (x.mail.lower(), x.name.lower(), + x.description.lower()))) + else: + contacts = sorted(contacts_uniq, + key=(lambda x: (x.name.lower(), x.mail.lower(), + x.description.lower()))) + + for contact in contacts: + if pattern.search(output.format(contact)): + print(output.format_escape(contact)) + +class OutputFormat(object): + available = ("mutt", "vim") + + def __init__(self, mode): + if mode not in OutputFormat.available: + raise LookupError("'{}' is not a supported " + "output-mode".format(mode)) + + self.mode = mode + + def format(self, contact): + if self.mode == "mutt": + return "{}\t{}\t{}".format(contact.mail, contact.name, + contact.description) + elif self.mode == "vim": + return "{} <{}>".format(contact.name, contact.mail) + + def format_escape(self, contact): + if self.mode == "mutt": + return self.format(contact) + elif self.mode == "vim": + return email.utils.formataddr((contact.name, contact.mail)) + +class Pattern(object): + def __init__(self, pattern, is_regex): + self.match_all = False if pattern else True + self.is_regex = is_regex + + if not self.match_all: + if self.is_regex: + self.pattern = re.compile(pattern, re.IGNORECASE) + else: + self.pattern = pattern.lower() + + def search(self, string): + if self.match_all: + return True + + if self.is_regex and self.pattern.search(string): + return True + elif not self.is_regex and self.pattern in string.lower(): + return True + + return False + +class VcardCache(object): + def __init__(self, vcard_dir): + self.cache_dir = os.path.expanduser("~/.cache/") + self.vcard_dir = os.path.normcase(os.path.normpath(vcard_dir)) + + dhsh = hashlib.sha256() + dhsh.update(self.vcard_dir.encode()) + self.pickle_path = os.path.join(self.cache_dir, + "{}.vcs_query".format(dhsh.hexdigest())) + + self.last_vcard_dir_timestamp = 0 + self.vcard_files = {} + + self._state = self._load() + self._update() + self._serialize() + + _cache_version = 1 + + @property + def _default_state(self): + return (VcardCache._cache_version, 0, {}) + + @property + def _state(self): + return (VcardCache._cache_version, + self.last_vcard_dir_timestamp, self.vcard_files) + + @_state.setter + def _state(self, value): + self.last_vcard_dir_timestamp = value[1] + self.vcard_files = value[2] + + def _load(self): + try: + with open(self.pickle_path, "rb") as cache: + obj = pickle.load(cache) + + # prune invalid or outdated cache-files + if not isinstance(obj, tuple) or len(obj) < 3: + raise RuntimeError("Invalid type") + elif obj[0] != VcardCache._cache_version: + raise RuntimeError("Invalid Version ({})".format(obj[0])) + + return obj + except (OSError, RuntimeError, AttributeError, EOFError, ImportError, + IndexError, pickle.UnpicklingError) as error: + if not isinstance(error, OSError) or error.errno != 2: + LOGGER.warning("Cache file (%s) could not be read: %s", + self.pickle_path, error) + return self._default_state + + def _update(self): + vcard_dir_timestamp = get_timestamp(self.vcard_dir) + if vcard_dir_timestamp > self.last_vcard_dir_timestamp: + self.last_vcard_dir_timestamp = vcard_dir_timestamp + + paths = set() + # let erros in os.scandir() bubble up.. the whole thing failed + with os.scandir(self.vcard_dir) as directory: + for node in directory: + try: + path = os.path.abspath(node.path) + if node.is_file(): + paths.add(path) + except OSError as err: + LOGGER.error("Error reading vCard: %s: %s", node, err) + + # prune vCards that don't exist anymore + removed = list() + for path in self.vcard_files.keys(): + if path not in paths: + # we can not delete items from self.vcard_files while we + # iterate over it, so remember them instead + removed += [path] + + for path in removed: + del self.vcard_files[path] + + # add or update vCards + for path in paths: + vcard = self.vcard_files.get(path) + if not vcard or vcard.needs_update(): + try: + vcard = VcardFile(path) + self.vcard_files[path] = vcard + except OSError as err: + LOGGER.error("Error reading vCard: %s: %s", path, err) + try: + del self.vcard_files[path] + except KeyError: + pass + + def _serialize(self): + try: + if not os.path.isdir(self.cache_dir): + os.mkdir(self.cache_dir) + with open(self.pickle_path, "wb") as cache: + pickle.dump(self._state, cache) + except OSError: + LOGGER.warning("Cannot write to cache file: %s", self.pickle_path) + + @property + def vcards(self): + for vcard_file in self.vcard_files.values(): + for vcard in vcard_file.vcards: + yield vcard + +class Vcard(object): + Contact = collections.namedtuple("Contact", ["mail", "name", "description"]) + + def __init__(self, component): + # Property FN + # https://tools.ietf.org/html/rfc6350#section-6.2.1 + self.name = "" + if "fn" in component.contents: + self.name = component.fn.value + + # Property EMAIL + # https://tools.ietf.org/html/rfc6350#section-6.4.2 + self.mails = [] + if "email" in component.contents: + self.mails = [mail.value for mail in component.contents["email"]] + + # Property NOTE + # https://tools.ietf.org/html/rfc6350#section-6.7.2 + self.description = "" + if "note" in component.contents: + self.description = "; ".join([ + line for line in component.note.value.splitlines() if line + ]) + + def _get_mail_contact(self, mail): + return Vcard.Contact(str(mail), str(self.name), str(self.description)) + + def __getitem__(self, i): + return self._get_mail_contact(self.mails[i]) + + def __iter__(self): + for mail in self.mails: + yield self._get_mail_contact(mail) + + def __len__(self): + return len(self.mails) + +class VcardFile(object): + vobject_logger = logging.getLogger("vobject.base") + + def __init__(self, path): + self.path = path + self.timestamp = get_timestamp(path) + self.vcards = [] + self._read_components(path) + + def _read_components(self, path): + # As per https://tools.ietf.org/html/rfc6350#section-3.1 + # the charset for a vCard MUST be UTF-8 + try: + # let errors from FILE-I/O bubble up, this whole vCard is failed + with open(path, encoding="utf-8", errors="strict") as vcfile: + for component in VObjectRead(vcfile, ignoreUnreadable=True): + if component.name.lower() == "vcard": + # Normal Case: vCard is the top property: + # https://tools.ietf.org/html/rfc6350#section-6.1.1 + self.vcards += [Vcard(component)] + elif "vcard" in component.contents: + # Special case from RFC2426; in that version it was + # possible to nest vCards: + # https://tools.ietf.org/html/rfc2426#section-2.4.2 + # This has since been removed: + # https://tools.ietf.org/html/rfc6350#appendix-A.2 + # But we keep the code as it is rather simple and it + # provides backwards-compatibility + self.vcards += [Vcard(component.vcard)] + else: + LOGGER.warning("No vCard in a component in: %s", path) + except VObjectError as error: + LOGGER.error("Parser Error in file: %s: %s", path, error) + except ValueError as error: + LOGGER.error("Bad Encoding in file: %s: %s", path, error) + + def needs_update(self): + return get_timestamp(self.path) > self.timestamp + +# vobject regularly complains about unparsable streams and such, but as we +# don't really know which files should be vcards and which not, in the +# directory we are given, this is a bit much, and will only concern users, so +# we just ignore most warnings (there are exception, like when we found +# something that looks like a vCard but is not parsable after all). +VcardFile.vobject_logger.setLevel(logging.ERROR + 1) + +def get_timestamp(path): + return os.stat(path).st_mtime + +if __name__ == "__main__": + main(sys.argv) |