"""RyuApp shim between Ryu and Valve."""
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
# Copyright (C) 2015 Brad Cowie, Christopher Lorier and Joe Stringer.
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2017 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
import signal
import sys
import time
from ryu.base import app_manager
from ryu.controller.handler import CONFIG_DISPATCHER
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.controller import dpset
from ryu.controller import event
from ryu.controller import ofp_event
from ryu.lib import hub
from faucet.conf import InvalidConfigError
from faucet.config_parser import dp_parser, get_config_for_api
from faucet.config_parser_util import config_changed
from faucet.valve_util import dpid_log, get_logger, kill_on_exception, get_setting
from faucet.valve import valve_factory, SUPPORTED_HARDWARE
from faucet import faucet_experimental_api
from faucet import faucet_experimental_event
from faucet import faucet_bgp
from faucet import faucet_metrics
from faucet import valve_util
from faucet import valve_packet
from faucet import valve_of
class EventFaucetReconfigure(event.EventBase):
    """Event used to trigger FAUCET reconfiguration."""
    # Sent by Faucet._signal_handler (on SIGHUP) and Faucet._config_file_stat,
    # and handled by Faucet.reload_config. It has to exist before the Faucet
    # class body runs, because a set_ev_cls decorator there references it.


class EventFaucetExperimentalAPIRegistered(event.EventBase):
    """Event used to notify that the API is registered with Faucet."""
class EventFaucetResolveGateways(event.EventBase):
    """Periodic event asking Faucet to resolve (or re-resolve) gateways."""
class EventFaucetStateExpire(event.EventBase):
    """Periodic event asking Faucet to expire state held in the controller."""
class EventFaucetMetricUpdate(event.EventBase):
    """Periodic event asking Faucet to update its metrics."""
class EventFaucetAdvertise(event.EventBase):
    """Periodic event asking Faucet to send network advertisements (eg IPv6 RAs)."""
class Faucet(app_manager.RyuApp):
    """A RyuApp that implements an L2/L3 learning VLAN switch.

    Valve provides the switch implementation; this is a shim for the Ryu
    event handling framework to interface with Valve.
    """

    OFP_VERSIONS = valve_of.OFP_VERSIONS
    # Contexts passed to __init__ as keyword arguments under these names.
    _CONTEXTS = {
        'dpset': dpset.DPSet,
        'faucet_experimental_api': faucet_experimental_api.FaucetExperimentalAPI,
    }
    _EVENTS = [EventFaucetExperimentalAPIRegistered]
    logname = 'faucet'
    # Also used at class-definition time by the kill_on_exception decorators.
    exc_logname = logname + '.exception'

    def __init__(self, *args, **kwargs):
        """Read settings and create loggers, metrics, notifier and BGP helper.

        Args:
            *args: passed through to app_manager.RyuApp.
            **kwargs: passed through to app_manager.RyuApp; must contain the
                'dpset' and 'faucet_experimental_api' entries named in
                _CONTEXTS.
        """
        super(Faucet, self).__init__(*args, **kwargs)

        # There doesn't seem to be a sensible method of getting command line
        # options into Ryu apps, so each setting is looked up by name
        # (eg FAUCET_CONFIG) through get_setting() instead.
        self.config_file = get_setting('FAUCET_CONFIG')
        self.loglevel = get_setting('FAUCET_LOG_LEVEL')
        self.logfile = get_setting('FAUCET_LOG')
        self.exc_logfile = get_setting('FAUCET_EXCEPTION_LOG')
        self.stat_reload = get_setting('FAUCET_CONFIG_STAT_RELOAD')

        self.dpset = kwargs['dpset']
        self.api = kwargs['faucet_experimental_api']

        # Setup logging
        self.logger = get_logger(
            self.logname, self.logfile, self.loglevel, 0)
        # Set up separate logging for exceptions
        self.exc_logger = get_logger(
            self.exc_logname, self.exc_logfile, logging.DEBUG, 1)

        # Valve instances keyed by datapath ID.
        self.valves = {}
        # Populated by _load_configs() and _config_file_stat() respectively.
        self.config_hashes = None
        self.config_file_stats = None

        self.metrics = faucet_metrics.FaucetMetrics()
        self.notifier = faucet_experimental_event.FaucetExperimentalEventNotifier(
            get_setting('FAUCET_EVENT_SOCK'), self.metrics, self.logger)
        self._bgp = faucet_bgp.FaucetBgp(self.logger, self._send_flow_msgs)

    @kill_on_exception(exc_logname)
    def start(self):
        """Start the app: notifier, metrics, config, periodic threads, signals."""
        super(Faucet, self).start()

        # Start event notifier
        notifier_thread = self.notifier.start()
        if notifier_thread is not None:
            self.threads.append(notifier_thread)

        # Start Prometheus
        prom_port = int(get_setting('FAUCET_PROMETHEUS_PORT'))
        prom_addr = get_setting('FAUCET_PROMETHEUS_ADDR')
        self.metrics.start(prom_port, prom_addr)

        # Configure all Valves
        if self.stat_reload:
            self.logger.info('will automatically reload new config on changes')
        self._load_configs(self.config_file)

        # Start all threads
        self.threads.extend([
            hub.spawn(thread) for thread in (
                self._gateway_resolve_request, self._state_expire_request,
                self._metric_update_request, self._advertise_request,
                self._config_file_stat)])

        # Register to API
        self.api._register(self)
        self.send_event_to_observers(EventFaucetExperimentalAPIRegistered())

        # SIGHUP requests a config reload, SIGINT shuts down
        # (see _signal_handler).
        signal.signal(signal.SIGHUP, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _apply_configs_existing(self, dp_id, new_dp):
        """Apply new config to a datapath that already has a Valve.

        Args:
            dp_id (int): datapath ID.
            new_dp: new datapath config, passed to the Valve's reload_config().
        Returns:
            The existing Valve if the reload produced flowmods, otherwise None.
        """
        # Log via the Faucet logger (not the root logging module) so this
        # message lands in the same log as the rest of the reload messages.
        self.logger.info('Reconfiguring existing datapath %s', dpid_log(dp_id))
        valve = self.valves[dp_id]
        cold_start, flowmods = valve.reload_config(new_dp)
        if flowmods:
            if cold_start:
                self.metrics.faucet_config_reload_cold.labels( # pylint: disable=no-member
                    **valve.base_prom_labels).inc()
                self.logger.info('Cold starting %s', dpid_log(dp_id))
            else:
                self.metrics.faucet_config_reload_warm.labels( # pylint: disable=no-member
                    **valve.base_prom_labels).inc()
                self.logger.info('Warm starting %s', dpid_log(dp_id))
            self._send_flow_msgs(new_dp.dp_id, flowmods)
            return valve
        self.logger.info('No changes to datapath %s', dpid_log(dp_id))
        return None

    def _apply_configs_new(self, dp_id, new_dp):
        """Create a Valve for a datapath that has not been configured before.

        Args:
            dp_id (int): datapath ID.
            new_dp: new datapath config, passed to valve_factory().
        Returns:
            New Valve instance, or None if valve_factory() returned None
            (logged as unsupported hardware).
        """
        self.logger.info('Add new datapath %s', dpid_log(dp_id))
        valve_cl = valve_factory(new_dp)
        if valve_cl is not None:
            return valve_cl(new_dp, self.logname, self.notifier)
        self.logger.error(
            '%s hardware %s must be one of %s',
            new_dp.name,
            new_dp.hardware,
            sorted(SUPPORTED_HARDWARE.keys()))
        return None

    def _apply_configs(self, new_dps):
        """Actually apply configs, if there were any differences.

        Args:
            new_dps: iterable of new datapath configs (each with a dp_id).
        """
        # Valves we currently hold that no longer appear in the new config.
        deleted_valve_dpids = (
            set(self.valves) - {new_dp.dp_id for new_dp in new_dps})
        for new_dp in new_dps:
            dp_id = new_dp.dp_id
            if dp_id in self.valves:
                valve = self._apply_configs_existing(dp_id, new_dp)
            else:
                valve = self._apply_configs_new(dp_id, new_dp)
            # None means nothing changed, or the new datapath was rejected.
            if valve is not None:
                self.valves[dp_id] = valve
                self.metrics.reset_dpid(valve.base_prom_labels)
                valve.update_config_metrics(self.metrics)
        for deleted_valve_dpid in deleted_valve_dpids:
            self.logger.info(
                'Deleting de-configured %s', dpid_log(deleted_valve_dpid))
            del self.valves[deleted_valve_dpid]
            # Close the connection if the de-configured datapath is connected.
            ryu_dp = self.dpset.get(deleted_valve_dpid)
            if ryu_dp is not None:
                ryu_dp.close()
        self._bgp.reset(self.valves, self.metrics)

    @kill_on_exception(exc_logname)
    def _load_configs(self, new_config_file):
        """Parse a config file and apply it; log and keep state if invalid.

        Args:
            new_config_file (str): config file to parse with dp_parser().
        """
        try:
            new_config_hashes, new_dps = dp_parser(new_config_file, self.logname)
            self.config_file = new_config_file
            self.config_hashes = new_config_hashes
            self._apply_configs(new_dps)
        except InvalidConfigError as err:
            self.logger.error('New config bad (%s) - rejecting', err)

    @kill_on_exception(exc_logname)
    def _send_flow_msgs(self, dp_id, flow_msgs, ryu_dp=None):
        """Send OpenFlow messages to a connected datapath.

        Args:
            dp_id (int): datapath ID.
            flow_msgs (list): OpenFlow messages to send.
            ryu_dp: Override datapath from DPSet.
        """
        if ryu_dp is None:
            ryu_dp = self.dpset.get(dp_id)
        if not ryu_dp:
            self.logger.error('send_flow_msgs: %s not up', dpid_log(dp_id))
            return
        if dp_id not in self.valves:
            self.logger.error('send_flow_msgs: unknown %s', dpid_log(dp_id))
            return
        valve = self.valves[dp_id]
        reordered_flow_msgs = valve_of.valve_flowreorder(flow_msgs)
        valve.ofchannel_log(reordered_flow_msgs)
        for flow_msg in reordered_flow_msgs:
            self.metrics.of_flowmsgs_sent.labels( # pylint: disable=no-member
                **valve.base_prom_labels).inc()
            flow_msg.datapath = ryu_dp
            ryu_dp.send_msg(flow_msg)
            # Remember recently sent messages (dropping one when full) so
            # error_handler() can match an OFError to its cause by xid.
            if valve.recent_ofmsgs.full():
                valve.recent_ofmsgs.get()
            valve.recent_ofmsgs.put(flow_msg)

    def _get_valve(self, ryu_dp, handler_name, msg=None):
        """Get Valve instance to respond to an event.

        An unknown datapath is closed and an error is logged.

        Args:
            ryu_dp (ryu.controller.controller.Datapath): datapath.
            handler_name (string): handler name to log if datapath unknown.
            msg (ryu.controller.ofp_event.EventOFPMsgBase): message from datapath.
        Returns:
            Valve instance or None.
        """
        dp_id = ryu_dp.id
        if dp_id in self.valves:
            valve = self.valves[dp_id]
            if msg:
                valve.ofchannel_log([msg])
            return valve
        ryu_dp.close()
        self.logger.error(
            '%s: unknown datapath %s', handler_name, dpid_log(dp_id))
        return None

    def _signal_handler(self, sigid, _):
        """Handle any received signals.

        SIGHUP requests a reconfigure; SIGINT closes the app and exits.

        Args:
            sigid (int): signal to handle.
        """
        if sigid == signal.SIGHUP:
            self.send_event('Faucet', EventFaucetReconfigure())
        elif sigid == signal.SIGINT:
            self.close()
            sys.exit(0)

    @staticmethod
    def _thread_jitter(period, jitter=2):
        """Sleep for period plus a random whole number in [0, jitter].

        Args:
            period (int): base time to sleep.
            jitter (int): maximum extra time added at random.
        """
        hub.sleep(period + random.randint(0, jitter))

    def _thread_reschedule(self, ryu_event, period, jitter=2):
        """Trigger Ryu events periodically with a jitter. Never returns.

        Args:
            ryu_event (ryu.controller.event.EventReplyBase): event to trigger.
            period (int): how often to trigger.
            jitter (int): maximum random extra delay added to each period.
        """
        while True:
            self.send_event('Faucet', ryu_event)
            self._thread_jitter(period, jitter)

    @kill_on_exception(exc_logname)
    def _config_file_stat(self):
        """Periodically stat config files for any changes."""
        # TODO: Better to use an inotify method that doesn't conflict with eventlets.
        while True:
            if self.config_hashes:
                new_config_file_stats = valve_util.stat_config_files(
                    self.config_hashes)
                # Only compare once we have a previous set of stats.
                if self.config_file_stats:
                    if new_config_file_stats != self.config_file_stats:
                        # A reload is only requested when stat_reload is set.
                        if self.stat_reload:
                            self.send_event('Faucet', EventFaucetReconfigure())
                        self.logger.info('config file(s) changed on disk')
                self.config_file_stats = new_config_file_stats
            self._thread_jitter(3)

    def _gateway_resolve_request(self):
        """Thread body: periodically send EventFaucetResolveGateways."""
        self._thread_reschedule(EventFaucetResolveGateways(), 2)

    def _state_expire_request(self):
        """Thread body: periodically send EventFaucetStateExpire."""
        self._thread_reschedule(EventFaucetStateExpire(), 5)

    def _metric_update_request(self):
        """Thread body: periodically send EventFaucetMetricUpdate."""
        self._thread_reschedule(EventFaucetMetricUpdate(), 5)

    def _advertise_request(self):
        """Thread body: periodically send EventFaucetAdvertise."""
        self._thread_reschedule(EventFaucetAdvertise(), 5)

    @set_ev_cls(EventFaucetResolveGateways, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def resolve_gateways(self, _):
        """Handle a request to re/resolve gateways."""
        # Iterate over a copy of the valves, as in the original code.
        for dp_id, valve in list(self.valves.items()):
            flowmods = valve.resolve_gateways()
            if flowmods:
                self._send_flow_msgs(dp_id, flowmods)

    @set_ev_cls(EventFaucetStateExpire, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def state_expire(self, _):
        """Handle a request to expire host state in the controller."""
        for dp_id, valve in list(self.valves.items()):
            flowmods = valve.state_expire()
            if flowmods:
                self._send_flow_msgs(dp_id, flowmods)

    @set_ev_cls(EventFaucetMetricUpdate, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def metric_update(self, _):
        """Handle a request to update metrics in the controller."""
        self._bgp.update_metrics()
        for valve in list(self.valves.values()):
            valve.update_metrics(self.metrics)

    @set_ev_cls(EventFaucetAdvertise, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def advertise(self, _):
        """Handle a request to advertise services."""
        for dp_id, valve in list(self.valves.items()):
            flowmods = valve.advertise()
            if flowmods:
                self._send_flow_msgs(dp_id, flowmods)

    def get_config(self):
        """FAUCET experimental API: return config for all Valves."""
        return get_config_for_api(self.valves)

    def get_tables(self, dp_id):
        """FAUCET experimental API: return config tables for one Valve.

        Args:
            dp_id (int): datapath ID; raises KeyError if not configured.
        """
        return self.valves[dp_id].dp.get_tables()

    @set_ev_cls(EventFaucetReconfigure, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def reload_config(self, _):
        """Handle a request to reload configuration."""
        self.logger.info('request to reload configuration')
        new_config_file = self.config_file
        if config_changed(self.config_file, new_config_file, self.config_hashes):
            self.logger.info('configuration %s changed, analyzing differences', new_config_file)
            self._load_configs(new_config_file)
        else:
            self.logger.info('configuration is unchanged, not reloading')
        # Every request is counted, whether or not a reload happened.
        self.metrics.faucet_config_reload_requests.inc() # pylint: disable=no-member

    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) # pylint: disable=no-member
    @kill_on_exception(exc_logname)
    def packet_in_handler(self, ryu_event):
        """Handle a packet in event from the dataplane.

        Packets are dropped without action when the datapath is unknown or
        not running, the cookie does not match, the port is ignored, or the
        VLAN/port is not configured.

        Args:
            ryu_event (ryu.controller.event.EventReplyBase): packet in message.
        """
        msg = ryu_event.msg
        ryu_dp = msg.datapath
        dp_id = ryu_dp.id
        valve = self._get_valve(ryu_dp, 'packet_in_handler', msg)
        if valve is None:
            return
        if not valve.dp.running:
            return
        if valve.dp.cookie != msg.cookie:
            return
        in_port = msg.match['in_port']
        if valve_of.ignore_port(in_port):
            return

        # Truncate packet in data (OVS > 2.5 does not honor max_len)
        msg.data = msg.data[:valve_of.MAX_PACKET_IN_BYTES]

        # eth/VLAN header only
        pkt, eth_pkt, vlan_vid, eth_type = valve_packet.parse_packet_in_pkt(
            msg.data, max_len=valve_packet.ETH_VLAN_HEADER_SIZE)
        if vlan_vid is None:
            self.logger.info(
                'packet without VLAN header from %s port %s', dpid_log(dp_id), in_port)
            return
        if pkt is None:
            self.logger.info(
                'unparseable packet from %s port %s', dpid_log(dp_id), in_port)
            return
        if vlan_vid not in valve.dp.vlans:
            self.logger.info(
                'packet for unknown VLAN %u from %s', vlan_vid, dpid_log(dp_id))
            return
        if in_port not in valve.dp.ports:
            self.logger.info(
                'packet for unknown port %u from %s', in_port, dpid_log(dp_id))
            return

        pkt_meta = valve.parse_rcv_packet(
            in_port, vlan_vid, eth_type, msg.data, msg.total_len, pkt, eth_pkt)
        other_valves = [
            other_valve for other_valve in list(self.valves.values())
            if valve != other_valve]

        self.metrics.of_packet_ins.labels( # pylint: disable=no-member
            **valve.base_prom_labels).inc()
        # Time only the Valve's own processing of the packet.
        packet_in_start = time.time()
        flowmods = valve.rcv_packet(other_valves, pkt_meta)
        packet_in_stop = time.time()
        self.metrics.faucet_packet_in_secs.labels( # pylint: disable=no-member
            **valve.base_prom_labels).observe(packet_in_stop - packet_in_start)
        self._send_flow_msgs(dp_id, flowmods)
        valve.update_metrics(self.metrics)

    @set_ev_cls(ofp_event.EventOFPErrorMsg, MAIN_DISPATCHER) # pylint: disable=no-member
    @kill_on_exception(exc_logname)
    def error_handler(self, ryu_event):
        """Handle an OFPError from a datapath.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPErrorMsg): trigger
        """
        msg = ryu_event.msg
        ryu_dp = msg.datapath
        dp_id = ryu_dp.id
        valve = self._get_valve(ryu_dp, 'error_handler', msg)
        if valve is None:
            return
        self.metrics.of_errors.labels( # pylint: disable=no-member
            **valve.base_prom_labels).inc()
        error_txt = msg
        # Search recently sent messages for the one with the failing xid.
        # NOTE(review): get() presumably removes entries, so messages examined
        # here are no longer available to later errors - confirm intended.
        while not valve.recent_ofmsgs.empty():
            flow_msg = valve.recent_ofmsgs.get()
            if msg.xid == flow_msg.xid:
                error_txt = '%s caused by %s' % (msg, flow_msg)
                break
        self.logger.error('%s OFError %s', dpid_log(dp_id), error_txt)

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER) # pylint: disable=no-member
    @kill_on_exception(exc_logname)
    def features_handler(self, ryu_event):
        """Handle receiving a switch features message from a datapath.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPSwitchFeatures): trigger.
        """
        msg = ryu_event.msg
        ryu_dp = msg.datapath
        dp_id = ryu_dp.id
        valve = self._get_valve(ryu_dp, 'features_handler', msg)
        if valve is None:
            return
        flowmods = valve.switch_features(msg)
        # Pass the datapath explicitly rather than looking it up in the DPSet.
        self._send_flow_msgs(dp_id, flowmods, ryu_dp=ryu_dp)

    @kill_on_exception(exc_logname)
    def _datapath_connect(self, ryu_dp):
        """Handle any/all re/connection of a datapath.

        Args:
            ryu_dp (ryu.controller.controller.Datapath): datapath.
        """
        def port_up_valid(port):
            """Return True if port is up and in valid range."""
            # state == 0 means no port state bits (eg link down) are set.
            return port.state == 0 and not valve_of.ignore_port(port.port_no)

        dp_id = ryu_dp.id
        valve = self._get_valve(ryu_dp, '_datapath_connect')
        if valve is None:
            return
        discovered_up_port_nums = [
            port.port_no for port in list(ryu_dp.ports.values()) if port_up_valid(port)]
        flowmods = valve.datapath_connect(discovered_up_port_nums)
        self._send_flow_msgs(dp_id, flowmods)
        self.metrics.of_dp_connections.labels( # pylint: disable=no-member
            **valve.base_prom_labels).inc()
        self.metrics.dp_status.labels( # pylint: disable=no-member
            **valve.base_prom_labels).set(1)

    @kill_on_exception(exc_logname)
    def _datapath_disconnect(self, ryu_dp):
        """Handle any/all disconnection of a datapath.

        Args:
            ryu_dp (ryu.controller.controller.Datapath): datapath.
        """
        valve = self._get_valve(ryu_dp, '_datapath_disconnect')
        if valve is None:
            return
        valve.datapath_disconnect()
        self.metrics.of_dp_disconnections.labels( # pylint: disable=no-member
            **valve.base_prom_labels).inc()
        self.metrics.dp_status.labels( # pylint: disable=no-member
            **valve.base_prom_labels).set(0)

    @set_ev_cls(dpset.EventDP, dpset.DPSET_EV_DISPATCHER)
    @kill_on_exception(exc_logname)
    def connect_or_disconnect_handler(self, ryu_event):
        """Handle connection or disconnection of a datapath.

        Args:
            ryu_event (ryu.controller.dpset.EventDP): trigger.
        """
        ryu_dp = ryu_event.dp
        dp_id = ryu_dp.id
        # Log under this method's real name, like every other handler does.
        valve = self._get_valve(ryu_dp, 'connect_or_disconnect_handler')
        if valve is None:
            return
        if ryu_event.enter:
            self.logger.info('%s connected', dpid_log(dp_id))
            self._datapath_connect(ryu_dp)
        else:
            self.logger.info('%s disconnected', dpid_log(dp_id))
            self._datapath_disconnect(ryu_dp)

    @set_ev_cls(dpset.EventDPReconnected, dpset.DPSET_EV_DISPATCHER)
    @kill_on_exception(exc_logname)
    def reconnect_handler(self, ryu_event):
        """Handle reconnection of a datapath.

        Args:
            ryu_event (ryu.controller.dpset.EventDPReconnected): trigger.
        """
        ryu_dp = ryu_event.dp
        dp_id = ryu_dp.id
        valve = self._get_valve(ryu_dp, 'reconnect_handler')
        if valve is None:
            return
        self.logger.debug('%s reconnected', dpid_log(dp_id))
        self._datapath_connect(ryu_dp)

    @set_ev_cls(ofp_event.EventOFPDescStatsReply, MAIN_DISPATCHER) # pylint: disable=no-member
    @kill_on_exception(exc_logname)
    def desc_stats_reply_handler(self, ryu_event):
        """Handle OFPDescStatsReply from datapath.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPDescStatsReply): trigger.
        """
        ryu_dp = ryu_event.msg.datapath
        dp_id = ryu_dp.id
        valve = self._get_valve(ryu_dp, 'desc_stats_reply_handler')
        if valve is None:
            return
        body = ryu_event.msg.body
        # The description strings are exported as labels; the metric's value
        # is the datapath ID itself.
        self.metrics.of_dp_desc_stats.labels( # pylint: disable=no-member
            **dict(valve.base_prom_labels,
                   mfr_desc=body.mfr_desc,
                   hw_desc=body.hw_desc,
                   sw_desc=body.sw_desc,
                   serial_num=body.serial_num,
                   dp_desc=body.dp_desc)).set(dp_id)

    @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER) # pylint: disable=no-member
    @kill_on_exception(exc_logname)
    def port_status_handler(self, ryu_event):
        """Handle a port status change event.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPPortStatus): trigger.
        """
        msg = ryu_event.msg
        ryu_dp = msg.datapath
        dp_id = ryu_dp.id
        valve = self._get_valve(ryu_dp, 'port_status_handler', msg)
        if valve is None:
            return
        if not valve.dp.running:
            return
        port_no = msg.desc.port_no
        if valve_of.ignore_port(port_no):
            return
        ofp = msg.datapath.ofproto
        reason = msg.reason
        # The port counts as up unless the link-down state bit is set.
        port_down = msg.desc.state & ofp.OFPPS_LINK_DOWN
        port_status = not port_down
        flowmods = valve.port_status_handler(
            port_no, reason, port_status)
        self._send_flow_msgs(dp_id, flowmods)
        port_labels = dict(valve.base_prom_labels, port=port_no)
        self.metrics.port_status.labels( # pylint: disable=no-member
            **port_labels).set(port_status)

    @set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER) # pylint: disable=no-member
    @kill_on_exception(exc_logname)
    def flowremoved_handler(self, ryu_event):
        """Handle a flow removed event.

        Only removals caused by an idle timeout are passed to the Valve.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPFlowRemoved): trigger.
        """
        msg = ryu_event.msg
        ryu_dp = msg.datapath
        valve = self._get_valve(ryu_dp, 'flowremoved_handler', msg)
        if valve is None:
            return
        ofp = msg.datapath.ofproto
        reason = msg.reason
        if reason == ofp.OFPRR_IDLE_TIMEOUT:
            flowmods = valve.flow_timeout(msg.table_id, msg.match)
            if flowmods:
                self._send_flow_msgs(ryu_dp.id, flowmods)