Source code for freeride.curves

"""Implement demand, supply and related curves."""

import warnings

import numpy as np
import matplotlib.pyplot as plt
from bokeh.plotting import figure
from bokeh.models import HoverTool, ColumnDataSource

from freeride.plotting import update_axes_limits
from freeride.formula import _formula
from freeride.affine import (
    AffineElement,
    Affine,
    intersection,
    blind_sum,
    horizontal_sum,
)
from freeride.revenue import Revenue, MarginalRevenue
from freeride.exceptions import PPFError

__all__ = [
    "ppf_sum",
    "BaseAffine",
    "Demand",
    "Supply",
    "Constraint",
    "PPF",
    "AffineElement",
    "Affine",
    "intersection",
    "blind_sum",
    "horizontal_sum",
    "Revenue",
    "MarginalRevenue",
]


[docs] def ppf_sum(*curves, comparative_advantage=True): """Combine production possibilities frontiers. Parameters ---------- *curves : sequence of :class:`AffineElement` PPF curves to aggregate. comparative_advantage : bool, optional When ``True`` curves are ordered by slope from steepest to flattest before summation, highlighting comparative advantage. Returns ------- list of :class:`AffineElement` The shifted and vertically stacked PPF segments forming the aggregate frontier. """ curves = sorted( curves, key=lambda s: s.slope, reverse=comparative_advantage, ) x_intercepts = [c.q_intercept for c in curves] y_intercepts = [c.intercept for c in curves] for key, ppf in enumerate(curves): previous_x = sum(x_intercepts[0:key]) below_y = sum(y_intercepts[key + 1:]) new = ppf.vertical_shift(below_y, inplace=False) new.horizontal_shift(previous_x) curves[key] = new new._domain = previous_x + ppf.q_intercept, previous_x return curves
[docs] class BaseAffine:
[docs] def __init__( self, intercept=None, slope=None, elements=None, inverse=True, sum_elements=True, ): """ Initialize the BaseAffine object. Parameters ---------- intercept : float or list of floats, optional The intercept(s) of the affine transformation. Default is None. slope : float or list of floats, optional The slope(s) of the affine transformation. Default is None. elements : list of AffineElement, optional List of AffineElement objects. If provided, it will override `intercept` and `slope`. Default is None. inverse : bool, optional Indicates if the transformation should be inverted. Default is True. sum_elements : bool, optional Whether to sum elements together (True) or keep them as separate pieces (False). Default is True. Raises ------ ValueError If the lengths of `slope` and `intercept` do not match. """ if elements is None: if isinstance(slope, (int, float)): slope = [slope] if isinstance(intercept, (int, float)): intercept = [intercept] if len(slope) != len(intercept): raise ValueError("Slope and intercept lengths do not match.") zipped = zip(slope, intercept) elements = [ AffineElement(slope=m, intercept=b, inverse=inverse) for m, b in zipped ] self.elements = elements self.sum_elements = sum_elements if intercept is None: intercept = [c.intercept for c in elements] if slope is None: slope = [c.slope for c in elements] self.intercept = intercept self.slope = slope
def __bool__(self): return bool(np.any([bool(el) for el in self.elements])) def _has_perfectly_elastic_segment(self): """Return ``True`` if any element is perfectly elastic (zero slope).""" return any(el.slope == 0 for el in self.elements) def _has_perfectly_inelastic_segment(self): """Return ``True`` if any element is perfectly inelastic (infinite slope).""" return any(np.isinf(el.slope) for el in self.elements) @property def has_perfectly_elastic_segment(self): """bool: Whether the curve contains a perfectly elastic segment.""" return self._has_perfectly_elastic_segment() @property def has_perfectly_inelastic_segment(self): """bool: Whether the curve contains a perfectly inelastic segment.""" return self._has_perfectly_inelastic_segment() @property def has_perfect_segment(self): """bool: Whether the curve contains a perfectly elastic or inelastic segment.""" return ( self.has_perfectly_elastic_segment or self.has_perfectly_inelastic_segment )
[docs] @classmethod def from_two_points(cls, x1, y1, x2, y2): """ Creates an Affine object from two points. """ A = np.array([[x1, 1], [x2, 1]]) b = np.array([y1, y2]) slope, intercept = np.linalg.solve(A, b) return cls(slope=slope, intercept=intercept)
[docs] @classmethod def from_points(cls, xy_points): """ Creates an Affine object from two points. In the future, this might be extended to allow for three or more points. """ A_array = [[qp[0], 1] for qp in xy_points] p_vals = [qp[1] for qp in xy_points] A = np.array(A_array) b = np.array(p_vals) slope, intercept = np.linalg.solve(A, b) return cls(slope=slope, intercept=intercept)
[docs] @classmethod def from_formula(cls, equation: str): intercept, slope = _formula(equation) return cls(slope=slope, intercept=intercept)
[docs] def horizontal_shift(self, delta, inplace=True): new_elements = [ e.horizontal_shift(delta, inplace=False) for e in self.elements ] if inplace: self.__init__(elements=new_elements) else: return type(self)(elements=new_elements)
[docs] def vertical_shift(self, delta, inplace=True): new_elements = [ e.vertical_shift(delta, inplace=False) for e in self.elements ] if inplace: self.__init__(elements=new_elements) else: return type(self)(elements=new_elements)
@property def q_intercept(self): """Return the quantity intercept(s) for the affine elements.""" q_int = [-b / m for b, m in zip(self.intercept, self.slope)] return q_int[0] if len(q_int) == 1 else q_int
[docs] class Demand(Affine):
[docs] def __init__( self, intercept=None, slope=None, elements=None, inverse=True ): """ Initializes a Demand curve object. """ super().__init__(intercept, slope, elements, inverse) self._check_slope() # Warn about perfectly elastic segments if self.has_perfectly_elastic_segment: warnings.warn( "Created perfectly elastic demand curve. " "Note: Due to current implementation limitations, this curve " "cannot be used with Equilibrium or combined with other " "curves. The economics are valid, but the software support is " "incomplete.", UserWarning, )
def _check_slope(self): for slope in self.slope: if slope > 0: raise ValueError("Upward-sloping demand curve.") # Check for economically reasonable demand curves for intercept in self.intercept: if intercept <= 0: raise ValueError( "Demand curve intercept must be positive " f"(got {intercept}). " "A demand curve represents willingness to pay, so " "the price intercept should be positive." ) if not self.has_perfectly_elastic_segment: if self.q(0) < 0: raise ValueError("Negative demand.")
[docs] def q(self, p): """ Calculate quantity demanded at price p, handling perfectly elastic segments. For horizontal demand at price P*: - q(P*) = 0 (with warning about indeterminacy) - q(P > P*) = 0 - q(P < P*) = ∞ """ # Check if we have perfectly elastic segments if self.has_perfectly_elastic_segment: for element in self.elements: if element.slope == 0: # Horizontal segment p_star = element.intercept if np.isclose(p, p_star): warnings.warn( f"Quantity demanded is indeterminate at P={p_star}" " for perfectly elastic demand. " "Returning np.inf as a placeholder " "(actual quantity is indeterminate).", UserWarning, ) return np.inf elif p > p_star: return 0 else: # p < p_star return np.inf # Default behavior for non-horizontal curves return super().q(p)
[docs] def consumer_surplus(self, p, q=None): return self.surplus(p, q)
[docs] def total_revenue(self): return Revenue.from_demand(self)
def __repr__(self): """Text representation for terminal/console.""" if len(self.elements) == 1: elem = self.elements[0] if elem.slope == 0: return f"Demand: P = {elem.intercept:g}" elif np.isinf(elem.slope): return f"Demand: Q = {elem.q_intercept:g}" else: # Format using :+g for automatic sign handling # Note: demand slopes are negative, so we negate to show positive in P = a - bQ form if elem.slope == -1: slope_part = "-Q" elif elem.slope == 1: slope_part = "+Q" else: slope_part = f"{elem.slope:+g}Q" if elem.intercept == 0: # Remove leading + for cleaner display when no intercept return f"Demand: P = {slope_part.lstrip('+')}" else: return f"Demand: P = {elem.intercept:g}{slope_part}" else: return f"Demand: {len(self.elements)}-piece piecewise function"
[docs] def marginal_revenue(self): return MarginalRevenue.from_demand(self)
def __and__(self, other): """Create a Market from intersection with Supply using & operator.""" from .equilibrium import Market if isinstance(other, Supply): return Market(demand=self, supply=other) else: return NotImplemented
[docs] class Supply(Affine):
[docs] def __init__( self, intercept=None, slope=None, elements=None, inverse=True ): """ Initializes a Supply curve object. """ super().__init__(intercept, slope, elements, inverse) self._check_slope() # Warn about perfectly elastic segments if self.has_perfectly_elastic_segment: warnings.warn( "Created perfectly elastic supply curve. " "Note: Due to current implementation limitations, this curve " "cannot be used with Equilibrium or combined with other " "curves. The economics are valid, but the software support is " "incomplete.", UserWarning, )
def _check_slope(self): for slope in self.slope: if slope < 0: raise ValueError("Downward-sloping supply curve.") if not self.has_perfectly_elastic_segment: if self.q(0) < 0: raise ValueError("Negative supply.")
[docs] def q(self, p): """ Calculate quantity supplied at price p, handling perfectly elastic segments. For horizontal supply at price P*: - q(P*) = 0 (with warning about indeterminacy) - q(P > P*) = ∞ - q(P < P*) = 0 """ # Check if we have perfectly elastic segments if self.has_perfectly_elastic_segment: for element in self.elements: if element.slope == 0: # Horizontal segment p_star = element.intercept if np.isclose(p, p_star): warnings.warn( f"Quantity supplied is indeterminate at P={p_star}" " for perfectly elastic supply. " "Returning np.inf as a placeholder (actual " "quantity is indeterminate).", UserWarning, ) return np.inf elif p > p_star: return np.inf else: # p < p_star return 0 # Default behavior for non-horizontal curves return super().q(p)
[docs] def producer_surplus(self, p, q=None): return -self.surplus(p, q)
def __repr__(self): """Text representation for terminal/console.""" if len(self.elements) == 1: elem = self.elements[0] if elem.slope == 0: return f"Supply: P = {elem.intercept:g}" elif np.isinf(elem.slope): return f"Supply: Q = {elem.q_intercept:g}" else: # Format using :+g to handle signs automatically if elem.slope == 1: slope_part = "+Q" elif elem.slope == -1: slope_part = "-Q" else: slope_part = f"{elem.slope:+g}Q" if elem.intercept == 0: # Remove leading + for cleaner display when no intercept return f"Supply: P = {slope_part.lstrip('+')}" else: return f"Supply: P = {elem.intercept:g}{slope_part}" else: return f"Supply: {len(self.elements)}-piece piecewise function" def __and__(self, other): """Create a Market from intersection with Demand using & operator.""" from .equilibrium import Market if isinstance(other, Demand): return Market(demand=other, supply=self) else: return NotImplemented
[docs] class Constraint(BaseAffine):
[docs] def __init__( self, p1, p2, endowment=1, name1=None, name2=None, elements=None, inverse=True, ): """ Incomplete. """ if elements is None: slope = -p1 / p2 intercept = endowment / p2 super().__init__(intercept, slope, elements, inverse) else: super().__init__(None, None, elements, inverse)
[docs] class PPF(BaseAffine): """ Production possibilities frontier. """
[docs] def __init__( self, intercept=None, slope=None, elements=None, inverse=True ): """ Initializes a PPF object with given slope and intercept or elements. Parameters ---------- intercept : float or list of float, optional The y-intercept(s) of the elements. slope : float or list of float, optional The slope(s) of the elements. elements : list of AffineElement, optional A list of AffineElements whose horizontal sum defines the PPF. inverse : bool, optional When inverse is True, it is assumed that equations are in the form P(Q). Raises ------ ValueError If the lengths of `slope` and `intercept` do not match. """ super().__init__(intercept, slope, elements, inverse) self._check_slope() self.pieces = ppf_sum(*self.elements)
def _check_slope(self): for slope in self.slope: if slope >= 0 or np.isinf(slope): raise PPFError("Upward-sloping or infinite-slope PPF.") def __add__(self, other): elements = self.elements + other.elements return type(self)(elements=elements) def __repr__(self): """Text representation for terminal/console.""" if len(self.pieces) == 1: piece = self.pieces[0] # PPF slopes are negative, so we negate to show as y = a - bx if piece.slope == -1: slope_part = "-x" elif piece.slope == 1: slope_part = "+x" else: slope_part = f"{piece.slope:+g}x" if piece.intercept == 0: return f"PPF: y = {slope_part.lstrip('+')}" else: return f"PPF: y = {piece.intercept:g}{slope_part}" else: return f"PPF: {len(self.pieces)}-piece frontier" def _repr_latex_(self): """LaTeX representation for Jupyter notebooks.""" if len(self.pieces) == 1: piece = self.pieces[0] if piece.slope == -1: slope_str = "x" else: slope_str = f"{-piece.slope}x" return f"$y = {piece.intercept} - {slope_str}$" else: # Piecewise representation cases = [] for piece in self.pieces: x0, x1 = piece._domain if piece.slope == -1: slope_str = "x" else: slope_str = f"{-piece.slope}x" expr = f"{piece.intercept} - {slope_str}" if x0 == 0 and np.isinf(x1): condition = f"x \\geq 0" elif x0 == 0: condition = f"0 \\leq x \\leq {x1}" elif np.isinf(x1): condition = f"x \\geq {x0}" else: condition = f"{x0} \\leq x \\leq {x1}" cases.append(f"{expr} & \\text{{if }} {condition}") cases_str = " \\\\".join(cases) return f"$y = \\begin{{cases}} {cases_str} \\end{{cases}}$" def __call__(self, x): """Return the quantity of the second good for ``x`` units of the first.""" for piece in self.pieces: if piece: a, b = piece._domain if (a <= x <= b) or (a >= x >= b): return piece(x) return np.nan
[docs] def horizontal_shift(self, delta, inplace=True): new_elements = [ e.horizontal_shift(delta, inplace=False) for e in self.elements ] if inplace: self.__init__(elements=new_elements) return self return type(self)(elements=new_elements)
[docs] def vertical_shift(self, delta, inplace=True): new_elements = [ e.vertical_shift(delta, inplace=False) for e in self.elements ] if inplace: self.__init__(elements=new_elements) return self return type(self)(elements=new_elements)
def __mul__(self, scalar): new_elements = [] for e in self.elements: new_intercept = scalar * e.intercept new_el = AffineElement( intercept=new_intercept, slope=e.slope, inverse=True, symbols=e.symbols, ) new_elements.append(new_el) return type(self)(elements=new_elements) def __rmul__(self, scalar): return self.__mul__(scalar)
[docs] def plot( self, ax=None, set_lims=True, max_q=None, label=True, backend="mpl", **kwargs, ): """ Plot the ppf. """ if backend == "bokeh": p = figure(width=400, height=400, tools="") lines_data = {"xs": [], "ys": [], "label": []} for key, piece in enumerate(self.pieces): x0, x1 = piece._domain xx = np.linspace(x0, x1) yy = [piece(u) for u in xx] lines_data["xs"].append(xx) lines_data["ys"].append(yy) lines_data["label"].append(f"Piece {key}") source = ColumnDataSource(data=lines_data) p.multi_line(xs="xs", ys="ys", source=source, line_width=2) # Add HoverTool hover = HoverTool( tooltips=[("", "@label")], renderers=[p.renderers[-1]] ) p.add_tools(hover) return p elif backend == "mpl": if ax is None: fig, ax = plt.subplots() param_names = [ "color", "linewidth", "linestyle", "lw", "ls", "marker", "markersize", ] plot_dict = { key: kwargs[key] for key in kwargs if key in param_names } # Plot each element for piece in self.pieces: if piece: piece.plot(ax=ax, label=label, max_q=max_q, **plot_dict) if label: ax.set_xlabel("Good 1") ax.set_ylabel("Good 2") # Run additional parameters as pyplot functions # xlim or ylim will overwrite the previous set_lims behavior for key, value in kwargs.items(): if hasattr(plt, key): plt_function = getattr(plt, key) if callable(plt_function): # Unpack sequences (e.g. for plt.text) if isinstance(value, tuple) or isinstance(value, list): plt_function(*value) else: plt_function(value) update_axes_limits(ax) return ax