Source code for photon.tools.ping

from multiprocessing import cpu_count as _cpu_count
from multiprocessing.dummy import Pool as _Pool
from re import findall as _findall
from re import search as _search

from photon.photon import check_m
from photon.util.structures import to_list

rxlss = '(?P<loss>[\d.]+)[%] packet loss\n'
rxmst = 'time=([\d.]*) ms\n'
rxrtt = '(?P<min>[\d.]+)/(?P<avg>[\d.]+)/(?P<max>[\d.]+)/(?P<stddev>[\d.]+) ms'


[docs]class Ping(object): ''' The Ping tool helps to send pings, returning detailed results each probe, and calculates a summary of all probes. :param six: Either use ``ping`` or ``ping6`` :param net_if: Specify network interface to send pings from :param num: How many pings to send each probe :param packetsize: Specifies the number of data bytes to be sent :param max_pool_size: Hosts passed to :func:`probe` in form of a list, will be processed in parallel. Specify the maximum size of the thread pool workers here. If skipped, the number of current CPUs is used ''' def __init__(self, m, six=False, net_if=None, num=5, packetsize=None, max_pool_size=None): super().__init__() self.m = check_m(m) self.__ping_cmd = 'ping6' if six else 'ping' self.__net_if = '-I %s' % (net_if) if net_if else '' if not num: num = 1 self.__num = '-c %d' % (num) if not packetsize: packetsize = '' if packetsize and packetsize > 1: packetsize = '-s %s' % (packetsize) self.__packetsize = packetsize if not max_pool_size: max_pool_size = _cpu_count() if max_pool_size < 1: max_pool_size = 1 self.__max_pool_size = max_pool_size self.__probe_results = dict() self.m( 'ping tool startup done', more=dict( pingc=self.__ping_cmd, net_if=self.__net_if, num=self.__num ), verbose=False ) @property def probe(self): ''' :param hosts: One or a list of hosts (URLs, IP-addresses) to send pings to * If you need to check multiple hosts, it is best \ to pass them together as a list. * This will probe all hosts in parallel, \ with ``max_pool_size`` workers. :returns: A dictionary with all hosts probed as keys specified as following: * 'up': ``True`` or ``False`` depending if ping was successful * 'loss': The packet loss as list (if 'up') * 'ms': A list of times each packet sent (if 'up') * 'rtt': A dictionary with the fields \ *avg*, *min*, *max* & *stddev* (if 'up') ''' return self.__probe_results @probe.setter def probe(self, hosts): ''' .. seealso:: :attr:`probe` ''' def __send_probe(host): ping = self.m( '', cmdd=dict( cmd=' '.join([ self.__ping_cmd, self.__num, self.__net_if, self.__packetsize, host ]) ), critical=False, verbose=False ) up = True if ping.get('returncode') == 0 else False self.__probe_results[host] = {'up': up} if up: p = ping.get('out') loss = _search(rxlss, p) ms = _findall(rxmst, p) rtt = _search(rxrtt, p) if loss: loss = loss.group('loss') self.__probe_results[host].update(dict( ms=ms, loss=loss, rtt=rtt.groupdict() )) hosts = to_list(hosts) pool_size = ( len(hosts) if len(hosts) <= self.__max_pool_size else self.__max_pool_size ) pool = _Pool(pool_size) pool.map(__send_probe, hosts) pool.close() pool.join() @property def status(self): ''' :returns: A dictionary with the following: * 'num': Total number of hosts already probed * 'up': Number of hosts up * 'down': Number of hosts down * 'ratio': Ratio between 'up'/'down' as float Ratio: * ``100%`` up == `1.0` * ``10%`` up == `0.1` * ``0%`` up == `0.0` ''' num = len(self.probe) up = len([h for h in self.probe if self.probe[h]['up']]) ratio = up/num if num != 0 else 0 # over 9000! return dict(num=num, up=up, down=num-up, ratio=ratio)