"""Kismet REST interface module.
(c) 2018 Mike Kershaw / Dragorn
Licensed under GPL2 or above.
"""
from .base_interface import BaseInterface
"""
The field simplification and pathing options are best described in the
developer docs for Kismet under docs/dev/webui_rest.md ; basically, they
allow for selecting specific fields from the tree and returning ONLY those
fields, instead of the entire object.
This will increase the speed of searches of large sets of data, and decrease
the time it takes for Kismet to return them.
Whenever possible this API will use the 'ekjson' format for multiple returned
objects - this places a JSON object for each element in an array/vector
response as a complete JSON record followed by a newline; this allows for
parsing the JSON response without allocating the entire vector object in memory
first, and enables stream-based parsing of very large responses.
Field Simplification Specification:
Several endpoints in Kismet take a field filtering object. These
use a common specification:
[
field1,
...
fieldN
]
where a field may be a single-element string, consisting of a
field name -or- a field path, such as:
'kismet.device.base.channel'
'kismet.device.base.signal/kismet.common.signal.last_signal_dbm'
OR a field may be a two-value array, consisting of a field name or
path, and a target name the field will be aliased to:
['kismet.device.base.channel', 'base.channel']
['kismet.device.base.signal/kismet.common.signal.last_signal_dbm',
'last.signal']
The fields in the returned device will be inserted as their final
name - that is, from the first above example, the device will contain
'kismet.device.base.channel' and 'kismet.common.signal.last_signal_dbm'
and from the second example:
'base.channel' and 'last.signal'
Filter Specification:
Several endpoints in Kismet take a regex object. These use a common
specification:
[
[ multifield, regex ],
...
[ multifield, regex ]
]
Multifield is a field path specification which will automatically expand
value-maps and vectors found in the path. For example, the multifield
path:
'dot11.device/dot11.device.advertised_ssid_map/dot11.advertisedssid.ssid'
would apply to all 'dot11.advertisedssid.ssid' fields in the ssid_map
automatically.
Regex is a basic string containing a regular expression, compatible with
PCRE.
To match on SSIDs:
regex = [
['dot11.device/dot11.device.advertised_ssid_map/dot11.advertisedssid.ssid',
'^SomePrefix.*' ]
]
A device is included in the results if it matches any of the regular
expressions.
"""
class KismetConnector(BaseInterface):
    """Legacy (deprecated) Kismet REST API connector.

    Every public method builds a URL and an optional command payload and
    hands them to ``self.interact`` or ``self.interact_yield``. Neither of
    those is defined in this class; presumably both are inherited from
    ``BaseInterface`` -- confirm there for the exact request semantics.

    Most methods are superseded by the object-specific classes of the
    ``kismet_rest`` package; see the notes in each docstring.
    """

    def _post_and_collect(self, url, cmd, callback=None, cbargs=None):
        """POST ``cmd`` to ``url`` and collect the yielded results.

        Shared implementation for the list-returning endpoints which use
        ``self.interact_yield``.

        Args:
            url (str): Endpoint path.
            cmd (dict): Command payload.
            callback (obj): Optional callback, forwarded only when truthy.
            cbargs (list): Arguments for the callback.

        Returns:
            list: Everything yielded by ``self.interact_yield``.
        """
        kwargs = {"payload": cmd, "stream": True}
        # The callback keywords are only forwarded when a callback was
        # supplied, so the no-callback request is issued without them.
        if callback:
            kwargs["callback"] = callback
            kwargs["callback_args"] = cbargs
        return list(self.interact_yield("POST", url, **kwargs))

    def system_status(self):
        """Return system status.

        Note: This is superseded by :py:meth:`kismet_rest.System.get_status`
        """
        return self.interact("GET", "system/status.json")

    def device_summary(self, callback=None, cbargs=None):
        """Return a summary of all devices.

        Note: This is superseded by :py:meth:`kismet_rest.Devices.all`

        Deprecated API - now referenced as device_list(..)
        """
        return self.device_list(callback, cbargs)

    def device_list(self, callback=None, cbargs=None):
        """Return all fields of all devices.

        Note: This is superseded by :py:meth:`kismet_rest.Devices.all`

        This may be extremely memory and CPU intensive and should be avoided.
        Memory use can be reduced by providing a callback, which will be
        invoked for each device.

        In general THIS API SHOULD BE AVOIDED. There are several potentially
        serious repercussions in querying all fields of all devices in a very
        high device count environment.

        It is strongly recommended that you use smart_device_list(...)

        Args:
            callback (obj): Optional callback, forwarded only when truthy.
            cbargs (list): Arguments for the callback.
        """
        kwargs = {}
        # NOTE(review): this is the only endpoint path in the class with a
        # leading slash; left untouched because URL joining happens outside
        # this class -- confirm whether it matters.
        url = "/devices/all_devices.ekjson"
        if callback:
            kwargs = {"callback": callback,
                      "callback_args": cbargs}
        # Third positional argument is presumably the stream flag (other
        # methods here pass ``stream=`` by keyword) -- TODO confirm.
        return self.interact("GET", url, True, **kwargs)

    def device_summary_since(self, ts=0, fields=None, callback=None,
                             cbargs=None):
        """Return a summary of devices added or changed since ``ts``.

        Note: This is superseded by :py:meth:`kismet_rest.Devices.all`

        Deprecated API - now referenced as smart_device_list(...)

        Return object containing summary of devices added or changed since ts
        and ts info
        """
        return self.smart_device_list(ts=ts, fields=fields, callback=callback,
                                      cbargs=cbargs)

    def smart_summary_since(self, ts=0, fields=None, regex=None,
                            callback=None, cbargs=None):
        """Return a device summary list.

        Note: This is superseded by :py:meth:`kismet_rest.Devices.all`

        Deprecated API - now referenced as smart_device_list(...)
        """
        return self.smart_device_list(ts=ts, fields=fields, regex=regex,
                                      callback=callback, cbargs=cbargs)

    def smart_device_list(self, ts=0, fields=None, regex=None, callback=None,
                          cbargs=None):
        """Return a list of devices.

        Note: This is superseded by :py:meth:`kismet_rest.Devices.all`

        Perform a 'smart' device list. The device list can be manipulated in
        several ways:

            1. Devices active since last timestamp. By setting the 'ts'
               parameter, only devices which have been active since that
               timestamp will be returned.
            2. Devices which match a regex, as defined by the regex spec above
            3. Devices can be simplified to reduce the amount of work being
               done and number of fields being returned.

        If a callback is given, it will be called for each device in the
        result. If no callback is provided, the results will be returned as a
        vector.

        Args:
            ts (int): Unix epoch timestamp.
            fields (list): Field simplification list (see the field
                simplification specification in the module documentation).
            regex (list): List of ``[multifield, regex]`` pairs (see the
                filter specification in the module documentation).
            callback (obj): Callback for processing search results.
            cbargs (list): List of arguments for callback.

        Returns:
            list: List of dictionary-type objects, which describe devices
                observed by Kismet. Dictionary keys are:
                ``dot11.device``,
                ``kismet.device.base.basic_crypt_set``,
                ``kismet.device.base.basic_type_set``,
                ``kismet.device.base.channel``,
                ``kismet.device.base.commonname``,
                ``kismet.device.base.crypt``,
                ``kismet.device.base.datasize``,
                ``kismet.device.base.datasize.rrd``,
                ``kismet.device.base.first_time``,
                ``kismet.device.base.freq_khz_map``,
                ``kismet.device.base.frequency``,
                ``kismet.device.base.key``,
                ``kismet.device.base.last_time``,
                ``kismet.device.base.macaddr``,
                ``kismet.device.base.manuf``,
                ``kismet.device.base.mod_time``,
                ``kismet.device.base.name``,
                ``kismet.device.base.num_alerts``,
                ``kismet.device.base.packet.bin.1000``,
                ``kismet.device.base.packet.bin.250``,
                ``kismet.device.base.packet.bin.500``,
                ``kismet.device.base.packets.crypt``,
                ``kismet.device.base.packets.data``,
                ``kismet.device.base.packets.error``,
                ``kismet.device.base.packets.filtered``,
                ``kismet.device.base.packets.llc``,
                ``kismet.device.base.packets.rrd``,
                ``kismet.device.base.packets.rx``,
                ``kismet.device.base.packets.total``,
                ``kismet.device.base.packets.tx``,
                ``kismet.device.base.phyname``,
                ``kismet.device.base.seenby``,
                ``kismet.device.base.server_uuid``,
                ``kismet.device.base.signal``,
                ``kismet.device.base.tags``,
                ``kismet.device.base.type``.
        """
        cmd = {}
        # Empty/None filters are left out of the payload entirely.
        if fields:
            cmd["fields"] = fields
        if regex:
            cmd["regex"] = regex
        url = "devices/last-time/{}/devices.ekjson".format(ts)
        kwargs = {"payload": cmd}
        if callback:
            kwargs["callback"] = callback
            kwargs["callback_args"] = cbargs
        # Third positional argument is presumably the stream flag (other
        # methods here pass ``stream=`` by keyword) -- TODO confirm.
        return self.interact("POST", url, True, **kwargs)

    def device_list_by_mac(self, maclist, fields=None, callback=None,
                           cbargs=None):
        """List devices matching MAC addresses in maclist.

        Note: This method is deprecated.
        Use :py:meth:`kismet_rest.Devices.yield_by_mac` instead.

        MAC addresses may be complete MACs or masked MAC groups
        ("AA:BB:CC:00:00:00/FF:FF:FF:00:00:00").

        Returned devices can be summarized/simplified by the fields list.

        If a callback is given, it will be called for each device in the
        result. If no callback is provided, the results will be returned as a
        vector.
        """
        cmd = {}
        url = "devices/multimac/devices.ekjson"
        if fields is not None:
            cmd["fields"] = fields
        cmd["devices"] = maclist
        return self._post_and_collect(url, cmd, callback, cbargs)

    def dot11_clients_of(self, apkey, fields=None, callback=None,
                         cbargs=None):
        """List clients of 802.11 AP.

        Note: This is superseded by
        :py:meth:`kismet_rest.Devices.dot11_clients_of`

        List devices which are clients of a given 802.11 access point, using
        the /phy/phy80211/clients-of endpoint.

        Returned devices can be summarized/simplified by the fields list.

        If a callback is given, it will be called for each device in the
        result. If no callback is provided, the results will be returned as a
        vector.
        """
        cmd = {}
        if fields is not None:
            cmd["fields"] = fields
        url = "phy/phy80211/clients-of/{}/clients.ekjson".format(apkey)
        return self._post_and_collect(url, cmd, callback, cbargs)

    def dot11_access_points(self, tstamp=None, regex=None, fields=None,
                            callback=None, cbargs=None):
        """Return a list of dot11 access points.

        Note: This is superseded by
        :py:meth:`kismet_rest.Devices.dot11_access_points`

        List devices which are considered to be 802.11 access points, using the
        /devices/views/phydot11_accesspoints/ view

        Returned devices can be summarized/simplified by the fields list.

        If a timestamp is given, only devices modified more recently than the
        timestamp (and matching any other conditions) will be returned.

        If a regex is given, only devices matching the regex (and any other
        conditions) will be returned.

        If a callback is given, it will be called for each device in the
        result. If no callback is provided, the results will be returned as a
        vector.

        Args:
            tstamp (int): Unix epoch timestamp
            regex (list): List of ``[multifield, regex]`` pairs for filtering
                results (see the filter specification in the module
                documentation).
            fields (list): Fields for filtering.
            callback (obj): Callback for processing individual results.
            cbargs (list): List of arguments for callback.

        Return:
            list: List of dictionary-type objects which describe access points.
                Keys describing access points:
                ``dot11.device``,
                ``kismet.device.base.basic_crypt_set``,
                ``kismet.device.base.basic_type_set``,
                ``kismet.device.base.channel``,
                ``kismet.device.base.commonname``,
                ``kismet.device.base.crypt``,
                ``kismet.device.base.datasize``,
                ``kismet.device.base.datasize.rrd``,
                ``kismet.device.base.first_time``,
                ``kismet.device.base.freq_khz_map``,
                ``kismet.device.base.frequency``,
                ``kismet.device.base.key``,
                ``kismet.device.base.last_time``,
                ``kismet.device.base.macaddr``,
                ``kismet.device.base.manuf``,
                ``kismet.device.base.mod_time``,
                ``kismet.device.base.name``,
                ``kismet.device.base.num_alerts``,
                ``kismet.device.base.packet.bin.250``,
                ``kismet.device.base.packet.bin.500``,
                ``kismet.device.base.packets.crypt``,
                ``kismet.device.base.packets.data``,
                ``kismet.device.base.packets.error``,
                ``kismet.device.base.packets.filtered``,
                ``kismet.device.base.packets.llc``,
                ``kismet.device.base.packets.rrd``,
                ``kismet.device.base.packets.rx``,
                ``kismet.device.base.packets.total``,
                ``kismet.device.base.packets.tx``,
                ``kismet.device.base.phyname``,
                ``kismet.device.base.seenby``,
                ``kismet.device.base.server_uuid``,
                ``kismet.device.base.signal``,
                ``kismet.device.base.tags``,
                ``kismet.device.base.type``.
        """
        cmd = {}
        if tstamp is not None:
            cmd["last_time"] = tstamp
        if regex is not None:
            cmd["regex"] = regex
        if fields is not None:
            cmd["fields"] = fields
        url = "devices/views/phydot11_accesspoints/devices.ekjson"
        return self._post_and_collect(url, cmd, callback, cbargs)

    def device(self, key, field=None, fields=None):
        """Wrap device_by_key.

        Deprecated.
        """
        return self.device_by_key(key, field, fields)

    def device_field(self, key, field):
        """Wrap device_by_key.

        Deprecated, prefer device_by_key with field.
        """
        return self.device_by_key(key, field=field)

    def device_by_key(self, key, field=None, fields=None):
        """Return a dictionary representing one device, identified by ``key``.

        Note: This is superseded by
        :py:meth:`kismet_rest.Devices.get_by_key`

        Fetch a complete device record by the Kismet key (unique key per Kismet
        session) or fetch a specific sub-field by path.

        If a field simplification set is passed in 'fields', perform a
        simplification on the result. In that case ``field`` is ignored.

        Args:
            key (str): Kismet device key.
            field (str): Optional sub-field path, appended to the URL.
            fields (list): Optional field simplification list.
        """
        if fields is None:
            # Plain GET; an optional sub-field path is appended to the URL.
            suffix = "" if field is None else "/" + field
            url = "devices/by-key/{}/device.json{}".format(key, suffix)
            return self.interact("GET", url)
        # Field simplification requires a POST carrying the fields payload.
        url = "devices/by-key/{}/device.json".format(key)
        return self.interact("POST", url, payload={"fields": fields})

    def device_by_mac(self, mac, fields=None):
        """Return a list of all devices matching ``mac``.

        Deprecated.
        Use :py:meth:`kismet_rest.Devices.yield_by_mac` instead.

        Return a vector of all devices in all phy types matching the supplied
        MAC address; typically this will return a vector of a single device,
        but MAC addresses could overlap between phy types.

        If a field simplification set is passed in 'fields', perform a
        simplification on the result
        """
        url = "devices/by-mac/{}/devices.json".format(mac)
        if fields:
            return self.interact("POST", url, payload={"fields": fields},
                                 stream=False)
        # No (or empty) field list: POST without any payload keyword.
        return self.interact("POST", url, stream=False)

    def datasources(self):
        """Return a list of data sources.

        Deprecated.
        Use :py:meth:`kismet_rest.Datasources.all` instead.

        Return:
            list: List of dictionary-type objects, which describe data sources.
                Dictionary keys are:
                ``kismet.datasource.capture_interface``,
                ``kismet.datasource.channel``,
                ``kismet.datasource.channels``,
                ``kismet.datasource.definition``,
                ``kismet.datasource.dlt``,
                ``kismet.datasource.error``,
                ``kismet.datasource.error_reason``,
                ``kismet.datasource.hardware``,
                ``kismet.datasource.hop_channels``,
                ``kismet.datasource.hop_offset``,
                ``kismet.datasource.hopping``,
                ``kismet.datasource.hop_rate``,
                ``kismet.datasource.hop_shuffle``,
                ``kismet.datasource.hop_shuffle_skip``,
                ``kismet.datasource.hop_split``,
                ``kismet.datasource.info.amp_gain``,
                ``kismet.datasource.info.amp_type``,
                ``kismet.datasource.info.antenna_beamwidth``,
                ``kismet.datasource.info.antenna_gain``,
                ``kismet.datasource.info.antenna_orientation``,
                ``kismet.datasource.info.antenna_type``,
                ``kismet.datasource.interface``,
                ``kismet.datasource.ipc_binary``,
                ``kismet.datasource.ipc_pid``,
                ``kismet.datasource.linktype_override``,
                ``kismet.datasource.name``,
                ``kismet.datasource.num_error_packets``,
                ``kismet.datasource.num_packets``,
                ``kismet.datasource.packets_rrd``,
                ``kismet.datasource.passive``,
                ``kismet.datasource.paused``,
                ``kismet.datasource.remote``,
                ``kismet.datasource.retry``,
                ``kismet.datasource.retry_attempts``,
                ``kismet.datasource.running``,
                ``kismet.datasource.source_key``,
                ``kismet.datasource.source_number``,
                ``kismet.datasource.total_retry_attempts``,
                ``kismet.datasource.type_driver``,
                ``kismet.datasource.uuid``,
                ``kismet.datasource.warning``.
        """
        return self.interact("GET", "datasource/all_sources.json")

    def datasource_list_interfaces(self):
        """Return a list of all available interfaces.

        Deprecated.
        Use :py:meth:`kismet_rest.Datasources.yield_interfaces` instead.
        """
        return self.interact("GET", "datasource/list_interfaces.json")

    def config_datasource_set_channel(self, uuid, channel):
        """Return ``True`` if operation was successful, ``False`` otherwise.

        Deprecated.
        Use :py:meth:`kismet_rest.Datasources.set_channel` instead.

        Locks a data source to an 802.11 channel or frequency. Channel
        may be complex channel such as "6HT40+".

        Requires valid login.
        """
        cmd = {"channel": channel}
        url = "datasource/by-uuid/{}/set_channel.cmd".format(uuid)
        return self.interact("POST", url, payload=cmd, only_status=True)

    def config_datasource_set_hop_rate(self, uuid, rate):
        """Set the hop rate of a specific data source by UUID.

        Deprecated.
        Use :py:meth:`kismet_rest.Datasources.set_hop_rate` instead.

        Configures the hopping rate of a data source, while not changing the
        channels used for hopping.

        Requires valid login
        """
        cmd = {"rate": rate}
        # The rate is posted to the same set_channel.cmd endpoint that the
        # channel-locking call uses; only the payload differs.
        url = "datasource/by-uuid/{}/set_channel.cmd".format(uuid)
        return self.interact("POST", url, payload=cmd, only_status=True)

    def config_datasource_set_hop_channels(self, uuid, rate, channels):
        """Set datasource hopping rate and channel list by UUID.

        Deprecated.
        Use :py:meth:`kismet_rest.Datasources.set_hop_channels` instead.

        Configures a data source for hopping at 'rate' over a vector of
        channels.

        Requires valid login
        """
        cmd = {"rate": rate,
               "channels": channels}
        url = "datasource/by-uuid/{}/set_channel.cmd".format(uuid)
        return self.interact("POST", url, payload=cmd, only_status=True)

    def config_datasource_set_hop(self, uuid):
        """Configure a source for hopping.

        Deprecated.
        Use :py:meth:`kismet_rest.Datasources.set_hop` instead.

        Uses existing source hop / channel list / etc attributes.

        Requires valid login
        """
        cmd = {"hop": True}
        url = "datasource/by-uuid/{}/set_hop.cmd".format(uuid)
        return self.interact("POST", url, payload=cmd, only_status=True)

    def add_datasource(self, source):
        """Add a new source to Kismet.

        Deprecated.
        Use :py:meth:`kismet_rest.Datasources.add` instead.

        source is a standard source definition.

        Requires valid login.

        Return:
            bool: Success
        """
        cmd = {"definition": source}
        return self.interact("POST", "datasource/add_source.cmd",
                             only_status=True, payload=cmd)

    def define_alert(self, name, description, rate="10/min", burst="1/sec",
                     phyname=None):
        """Define a new alert.

        Deprecated.
        Use :py:meth:`kismet_rest.Alerts.define` instead.

        LOGIN REQUIRED

        Define a new alert. This alert can then be triggered on external
        conditions via raise_alert(...)

        Phyname is optional, and links the alert to a specific PHY type.

        Rate and Burst are optional rate and burst limits.

        Args:
            name (str): Name of alert.
            description (str): Description of alert.
            rate (str): Rate limit; sent as the ``throttle`` payload key.
            burst (str): Burst limit.
            phyname (str): Optional PHY type to link the alert to.

        Return:
            bool: Success
        """
        cmd = {"name": name,
               "description": description,
               "throttle": rate,
               "burst": burst}
        if phyname is not None:
            cmd["phyname"] = phyname
        url = "alerts/definitions/define_alert.cmd"
        return self.interact("POST", url, payload=cmd, only_status=True)

    def raise_alert(self, name, text, bssid=None, source=None, dest=None,
                    other=None, channel=None):
        """Raise an alert in Kismet.

        Deprecated.
        Use :py:meth:`kismet_rest.Alerts.raise` instead.

        LOGIN REQUIRED

        Trigger an alert; the alert can be one defined via define_alert(...) or
        an alert built into the system.

        The alert name and content of the alert are required, all other fields
        are optional.

        Args:
            name (str): Name of alert.
            text (str): Descriptive text for alert.
            bssid (str): BSSID to filter for.
            source (str): ...
            dest (str): ...
            other (str): ...
            channel (str): Channel to filter for.
        """
        cmd = {"name": name,
               "text": text}
        # Optional attributes are only sent when explicitly provided.
        optional = (("bssid", bssid),
                    ("source", source),
                    ("dest", dest),
                    ("other", other),
                    ("channel", channel))
        for option_name, option_value in optional:
            if option_value is not None:
                cmd[option_name] = option_value
        return self.interact("POST", "alerts/raise_alert.cmd", payload=cmd,
                             only_status=True)

    def alerts(self, ts_sec=0, ts_usec=0):
        """Return alert object.

        Deprecated.
        Use :py:meth:`kismet_rest.Alerts.all` instead.

        Fetch alert object, containing metadata and list of alerts, optionally
        filtered to alerts since a given timestamp

        Args:
            ts_sec (int): Timestamp seconds (Unix epoch)
            ts_usec (int): Timestamp microseconds

        Return:
            dict: Dictionary containing metadata and a list of alerts. Keys
                represented in output: ``kismet.alert.timestamp``,
                ``kismet.alert.list``.
        """
        url = "alerts/last-time/{}.{}/alerts.json".format(ts_sec, ts_usec)
        return self.interact("GET", url)

    def messages(self, ts_sec=0, ts_usec=0):
        """Return message object.

        Deprecated.
        Use :py:meth:`kismet_rest.Messages.all` instead.

        Fetch message object, containing metadata and list of messages,
        optionally filtered to messages since a given timestamp

        Args:
            ts_sec (int): Timestamp seconds (Unix epoch)
            ts_usec (int): Timestamp microseconds

        Return:
            dict: Dictionary containing metadata and a list of messages.
                Top-level keys: ``kismet.messagebus.timestamp``,
                ``kismet.messagebus.list``
        """
        url = "messagebus/last-time/{}.{}/messages.json".format(ts_sec,
                                                                ts_usec)
        return self.interact("GET", url)

    def location(self):
        """Return the gps location.

        Deprecated.
        Use :py:meth:`kismet_rest.GPS.current_location` instead.

        Return:
            dict: Dictionary object describing current location of Kismet
                server. Keys represented in output:
                ``kismet.common.location.lat``,
                ``kismet.common.location.lon``,
                ``kismet.common.location.alt``,
                ``kismet.common.location.heading``,
                ``kismet.common.location.speed``,
                ``kismet.common.location.time_sec``,
                ``kismet.common.location.time_usec``,
                ``kismet.common.location.fix``,
                ``kismet.common.location.valid``
        """
        return self.interact("GET", "gps/location.json")
# Ad-hoc smoke check: when executed as a script, print the system status
# fetched through a default-constructed connector. Because of the relative
# import at the top of this file, this only works when the module is run as
# part of its package (``python -m <package>.<module>``).
if __name__ == "__main__":
    print(KismetConnector().system_status())