"""Library for polling dataplanes for statistics."""
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2017 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
from ryu.lib import hub
from faucet.valve_util import dpid_log
from faucet.valve_of import devid_present
from faucet.valve_of_old import OLD_MATCH_FIELDS
class GaugePoller(object):
    """Abstraction for a poller for statistics.

    Subclasses supply `send_req` and `no_response`. This base class holds the
    shared state (configuration, logger, pending-reply flag) together with
    helpers for naming ports and formatting port counters.
    """

    def __init__(self, conf, logname, prom_client):
        """Store the watcher configuration and create a child logger.

        Args:
            conf: watcher configuration; `conf.dp` and `conf.type` are read.
            logname: name of the parent logger; the watcher type is appended
                as a child component.
            prom_client: client object whose `dp_status` metric is updated by
                `report_dp_status`.
        """
        self.dp = conf.dp # pylint: disable=invalid-name
        self.conf = conf
        self.prom_client = prom_client
        self.reply_pending = False
        self.logger = logging.getLogger(
            logname + '.{0}'.format(self.conf.type)
        )
        # _running indicates that the watcher is receiving data
        self._running = False

    def report_dp_status(self, dp_status):
        """Report DP status.

        Args:
            dp_status: value stored in the `dp_status` metric labelled with
                this datapath's hex ID and name.
        """
        self.prom_client.dp_status.labels(
            dp_id=hex(self.dp.dp_id), dp_name=self.dp.name).set(dp_status) # pylint: disable=no-member

    def start(self, _ryudp, _active):
        """Start the poller."""
        return

    def stop(self):
        """Stop the poller."""
        return

    def running(self):
        """Return True if the poller is running."""
        return self._running

    def is_active(self):
        """Return True if the poller is controlling the request loop for its
        stat."""
        return False

    def send_req(self):
        """Send a stats request to a datapath."""
        raise NotImplementedError

    def no_response(self):
        """Called when a polling cycle passes without receiving a response."""
        raise NotImplementedError

    def update(self, rcv_time, dp_id, msg):
        """Handle the responses to requests.

        Called when a reply to a stats request sent by this object is received
        by the controller.

        It should acknowledge the receipt by setting self.reply_pending to
        false.

        Args:
            rcv_time: the time the response was received
            dp_id: DP ID
            msg: the stats reply message
        """
        # TODO: it may be worth while verifying this is the correct stats
        # response before doing this
        if not self._running:
            # Lazy %-style arguments: the message is only built when debug
            # logging is enabled.
            self.logger.debug(
                '%s update received when not running', dpid_log(dp_id))
            return
        self.reply_pending = False
        self._update()

    def _update(self):
        """Hook invoked by `update` after a reply has been acknowledged."""
        # TODO: this should be implemented by subclasses instead of having a
        # super call to update
        pass

    def _stat_port_name(self, msg, stat, dp_id):
        """Return port name as string based on port number.

        Args:
            msg: stats reply message; `msg.datapath.ofproto` supplies the
                reserved port numbers.
            stat: a single stat entry carrying `port_no`.
            dp_id: DP ID, used only for the debug log on unknown ports.

        Returns:
            'CONTROLLER' or 'LOCAL' for the reserved ports, the configured
            port name when the port is in `self.dp.ports`, otherwise the
            port number as a string.
        """
        ofproto = msg.datapath.ofproto
        port_no = stat.port_no
        if port_no == ofproto.OFPP_CONTROLLER:
            return 'CONTROLLER'
        if port_no == ofproto.OFPP_LOCAL:
            return 'LOCAL'
        if port_no in self.dp.ports:
            return self.dp.ports[port_no].name
        self.logger.debug('%s stats for unknown port %u',
                          dpid_log(dp_id), port_no)
        return str(port_no)

    @staticmethod
    def _format_port_stats(delim, stat):
        """Return the counters of `stat` as a list of (name, value) pairs.

        Args:
            delim: string used to join the two parts of each counter name,
                e.g. 'packets' + delim + 'out'.
            stat: port stat entry with tx/rx packet, byte, dropped and
                rx_errors attributes.

        Returns:
            List of (stat_name, stat_val) tuples, in a fixed order, omitting
            counters whose value is UINT64_MAX.
        """
        formatted_port_stats = []
        for stat_name_list, stat_val in (
                (('packets', 'out'), stat.tx_packets),
                (('packets', 'in'), stat.rx_packets),
                (('bytes', 'out'), stat.tx_bytes),
                (('bytes', 'in'), stat.rx_bytes),
                (('dropped', 'out'), stat.tx_dropped),
                (('dropped', 'in'), stat.rx_dropped),
                (('errors', 'in'), stat.rx_errors)):
            # For openvswitch, unsupported statistics are set to
            # all-1-bits (UINT64_MAX), skip reporting them
            if stat_val != 2**64-1:
                stat_name = delim.join(stat_name_list)
                formatted_port_stats.append((stat_name, stat_val))
        return formatted_port_stats
class GaugeThreadPoller(GaugePoller):
    """A ryu thread object for sending and receiving OpenFlow stats requests.

    The thread runs in a loop sending a request, sleeping then checking a
    response was received before sending another request.

    The methods send_req, update and no_response should be implemented by
    subclasses.
    """

    def __init__(self, conf, logname, prom_client):
        """Initialise the base poller and the (not yet started) loop state.

        Args:
            conf: watcher configuration; `conf.interval` is read in addition
                to what the base class reads.
            logname: name of the parent logger.
            prom_client: client object passed through to the base class.
        """
        super(GaugeThreadPoller, self).__init__(conf, logname, prom_client)
        self.thread = None
        self.interval = self.conf.interval
        self.ryudp = None

    def start(self, ryudp, active):
        """Start the poller against `ryudp`.

        Any request loop left over from a previous start is stopped first.

        Args:
            ryudp: datapath object that requests are sent to.
            active: when true, spawn the request loop (`__call__`) via
                `hub.spawn`; otherwise only mark the poller as running.
        """
        self.ryudp = ryudp
        self.stop()
        self._running = True
        if active:
            self.thread = hub.spawn(self)

    def stop(self):
        """Stop the poller and tear down its request loop, if any.

        Returns:
            True if a request-loop thread is still attached afterwards;
            this is False once the thread has been killed and joined.
        """
        self._running = False
        if self.is_active():
            hub.kill(self.thread)
            hub.joinall([self.thread])
            self.thread = None
        return self.thread is not None

    def is_active(self):
        """Return True if this poller holds a spawned request-loop thread.

        Without this override `stop` would consult the base implementation,
        which always returns False, and the spawned thread would never be
        killed.
        """
        return self.thread is not None

    def __call__(self):
        """Send request loop.

        Delays the initial request for a random interval to reduce load.
        Then sends a request to the datapath, waits the specified interval and
        checks that a response has been received in a loop."""
        # TODO: this should use a deterministic method instead of random
        hub.sleep(random.randint(1, self.conf.interval))
        while True:
            self.send_req()
            # Cleared again by update() when a reply arrives in time.
            self.reply_pending = True
            hub.sleep(self.conf.interval)
            if self.reply_pending:
                self.no_response()

    def send_req(self):
        """Send a stats request to a datapath."""
        raise NotImplementedError

    def no_response(self):
        """Called when a polling cycle passes without receiving a response."""
        raise NotImplementedError
class GaugePortStatsPoller(GaugeThreadPoller):
    """Periodically sends a port stats request to the datapath and parses
    and outputs the response.
    """

    def send_req(self):
        """Send a port stats request for all ports (OFPP_ANY).

        Does nothing when no datapath has been attached yet.
        """
        if not self.ryudp:
            return
        ofp = self.ryudp.ofproto
        ofp_parser = self.ryudp.ofproto_parser
        req = ofp_parser.OFPPortStatsRequest(self.ryudp, 0, ofp.OFPP_ANY)
        self.ryudp.send_msg(req)

    def no_response(self):
        """Log that the port stats request went unanswered for a cycle."""
        self.logger.info(
            'port stats request timed out for %s', self.dp.name)
class GaugeFlowTablePoller(GaugeThreadPoller):
    """Periodically dumps the current datapath flow table as a yaml object.

    Includes a timestamp and a reference ($DATAPATHNAME-flowtables). The
    flow table is dumped as an OFFlowStatsReply message (in yaml format) that
    matches all flows.
    """

    def send_req(self):
        """Send a flow stats request with an empty match for all tables.

        Does nothing when no datapath has been attached yet.
        """
        if not self.ryudp:
            return
        ofp = self.ryudp.ofproto
        ofp_parser = self.ryudp.ofproto_parser
        match = ofp_parser.OFPMatch()
        req = ofp_parser.OFPFlowStatsRequest(
            self.ryudp, 0, ofp.OFPTT_ALL, ofp.OFPP_ANY, ofp.OFPG_ANY,
            0, 0, match)
        self.ryudp.send_msg(req)

    def no_response(self):
        """Log that the flow dump request went unanswered for a cycle."""
        self.logger.info(
            'flow dump request timed out for %s', self.dp.name)

    def _parse_flow_stats(self, stats):
        """Parse flow stats reply message into tags/labels and byte/packet counts.

        Args:
            stats: mapping for one flow entry. The keys read are
                'packet_count', 'byte_count', 'instructions', 'table_id',
                'priority', 'cookie' and 'match' (the latter holding
                ['OFPMatch']['oxm_fields'], a list of {'OXMTlv': {...}}
                entries with 'field', 'value' and 'mask').

        Returns:
            A pair of (metric_name, tags, value) tuples, for
            'flow_packet_count' and 'flow_byte_count'. Both tuples refer to
            the same tags dict object.
        """
        packet_count = int(stats['packet_count'])
        byte_count = int(stats['byte_count'])
        instructions = stats['instructions']
        tags = {
            'dp_name': self.dp.name,
            'dp_id': hex(self.dp.dp_id),
            'table_id': int(stats['table_id']),
            'priority': int(stats['priority']),
            'inst_count': len(instructions),
            'cookie': int(stats['cookie']),
        }
        oxm_matches = stats['match']['OFPMatch']['oxm_fields']
        for oxm_match in oxm_matches:
            oxm_tlv = oxm_match['OXMTlv']
            mask = oxm_tlv['mask']
            val = oxm_tlv['value']
            field = oxm_tlv['field']
            if mask is not None:
                # Masked matches are tagged as "value/mask".
                val = '/'.join((str(val), str(mask)))
            if field in OLD_MATCH_FIELDS:
                # Translate the field to the name given by OLD_MATCH_FIELDS.
                field = OLD_MATCH_FIELDS[field]
            tags[field] = val
            if field == 'vlan_vid' and mask is None:
                # Only an unmasked VLAN match gets the extra 'vlan' tag.
                tags['vlan'] = devid_present(int(val))
        return (
            ('flow_packet_count', tags, packet_count),
            ('flow_byte_count', tags, byte_count))
class GaugePortStateBaseLogger(GaugePoller):
    """Abstraction for port state poller."""

    def send_req(self):
        """Send a stats request to a datapath."""
        raise NotImplementedError

    def no_response(self):
        """Called when a polling cycle passes without receiving a response."""
        raise NotImplementedError