Source code for kismet_rest.alerts

"""Alerts abstraction."""

from .base_interface import BaseInterface


[docs]class Alerts(BaseInterface): """Alerts abstraction.""" kwargs_defaults = {"ts_sec": 0, "ts_usec": 0} url_template = "alerts/last-time/{ts_sec}.{ts_usec}/alerts.ekjson"
[docs] def all(self, callback=None, callback_args=None, **kwargs): """Yield all alerts, one at a time. If callback is set, nothing will be returned. Args: callback: Callback function. callback_args: Arguments for callback. Keyword args: ts_sec (int): Starting timestamp in seconds since Epoch. ts_usec (int): Microseconds for starting timestamp. Yield: dict: Alert json, or None if callback is set. """ callback_settings = {} if callback: callback_settings["callback"] = callback if callback_args: callback_settings["callback_args"] = callback_args query_args = self.kwargs_defaults.copy() query_args.update(kwargs) url = self.url_template.format(**query_args) for result in self.interact_yield("GET", url, **callback_settings): yield result
[docs] def define(self, name, description, rate="10/min", burst="1/sec", phyname=None): """Define an alert. LOGIN REQUIRED Define a new alert. This alert can then be triggered on external conditions via raise_alert(...) Phyname is optional, and links the alert to a specific PHY type. Rate and Burst are optional rate and burst limits. Args: name (str): Name of alert. description (str): Description of alert. rate (str): Rate limit. Defaults to ``10/min``. burst (str): Burst limit. Defaults to ``1/sec``. phyname (str): Name of PHY. Defaults to None. Return: bool: True for success, False for failed request. """ cmd = {"name": name, "description": description, "throttle": rate, "burst": burst} if phyname is not None: cmd["phyname"] = phyname url = "alerts/definitions/define_alert.cmd" return self.interact("POST", url, payload=cmd, only_status=True)
[docs] def raise_alert(self, name, text, bssid=None, source=None, dest=None, other=None, channel=None): """Raise an alert in Kismet. Trigger an alert; the alert can be one defined via define_alert(...) or an alert built into the system. The alert name and content of the alert are required, all other fields are optional. Args: name (str): Name of alert. text (str): Descriptive text for alert. bssid (str): BSSID to filter for. source (str): ... dest (str): ... other (str): ... channel (str): Channel to filter for. """ cmd = {"name": name, "text": text} if bssid is not None: cmd["bssid"] = bssid if source is not None: cmd["source"] = source if dest is not None: cmd["dest"] = dest if other is not None: cmd["other"] = other if channel is not None: cmd["channel"] = channel return self.interact("POST", "alerts/raise_alert.cmd", payload=cmd, only_status=True)