"""Define affine demand and supply curve utilities."""
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Latex, display
from .base import PolyBase
from .plotting import textbook_axes, AREA_FILLS, update_axes_limits
from freeride.formula import _formula
[docs]
class AffineElement(PolyBase):
"""
This class extends the PolyBase class and represents an affine function commonly
used in supply and demand curves. This does allow for negative quantities.
Parameters
--------
intercept (float): The intercept of the affine function.
slope (float): The slope of the affine function.
inverse (bool, optional): If True, interprets the parameters as inverse slope
and intercept. Defaults to True.
Methods
--------
vertical_shift(delta: float):
Shift the curve vertically by the given amount.
horizontal_shift(delta: float):
Shift the curve horizontally by the given amount.
price_elasticity(p: float) -> float:
Calculate the point price elasticity at a given price.
midpoint_elasticity(p1: float, p2: float) -> float:
Calculate the price elasticity between two prices using the midpoint formula.
plot(ax=None, textbook_style=True, max_q=10, color='black', linewidth=2, label=True):
Plot the supply or demand curve.
Attributes
--------
intercept (float): The intercept of the affine function.
slope (float): The slope of the affine function.
q_intercept (float): The quantity intercept of the affine function.
Example
--------
To create an AffineElement object and use its methods:
>>> demand_curve = Affine(10.0, -1.0)
>>> demand_curve.q(4.0) # Calculate the quantity at price p=4.0
6.0
"""
[docs]
def __init__(self, intercept, slope, inverse=True, symbols=None):
"""
Initialize an AffineElement with the given intercept and slope.
This method creates an instance of the class with the specified intercept and slope.
The parameters can be interpreted as inverse slope and intercept if the
`inverse` parameter is True.
Parameters
--------
intercept (float): The intercept of the affine function.
slope (float): The slope of the affine function.
inverse (bool, optional): If True, interprets the parameters as inverse slope
and intercept. Defaults to True.
Returns
--------
AffineElement: An AffineElement object representing the supply or demand curve.
Example
--------
>>> supply_curve = AffineElement(10.0, 2.0)
"""
if symbols is None:
x, y = "q", "p"
elif isinstance(symbols, str):
x, y = "q", "p"
else:
x, y = symbols
self.symbols = symbols
if slope == 0:
if inverse: # perfectly elastic
self.intercept = intercept
self.q_intercept = np.nan
self.slope = 0
self.inverse_expression = f"{self.intercept:g}"
self.expression = "undefined"
self._symbol = x # rhs is 0*q
else: # perfectly inelastic
self.q_intercept = intercept
self.slope = np.inf
self.intercept = np.nan
self.inverse_expression = "undefined"
self.expression = f"{self.q_intercept:g}"
self._symbol = y # rhs is 0*p
self.coef = (self.intercept, self.slope)
super().__init__(self.coef, symbols=symbols)
else:
if not inverse:
slope, intercept = 1 / slope, -intercept / slope
coef = (intercept, slope)
super().__init__(coef, symbols=symbols)
self.intercept = intercept
self.slope = slope
self.q_intercept = -intercept / slope
self.inverse_expression = f"{intercept:g}{slope:+g}{x}"
self.expression = f"{self.q_intercept:g}{1/slope:+g}{y}"
def __call__(self, x):
if self.slope == np.inf:
raise ValueError(
f"Undefined (perfectly inelastic at {self.q_intercept})"
)
else:
return self.intercept + self.slope * x
def __mul__(self, scalar):
return type(self)(
intercept=self.intercept,
slope=self.slope * (1 / scalar),
inverse=True,
symbols=self.symbols,
)
def __rmul__(self, scalar):
return self.__mul__(scalar)
[docs]
def vertical_shift(self, delta, inplace=True):
"""
Shift the curve vertically by the given amount.
This method shifts the supply or demand curve vertically by the specified
amount `delta`.
A positive `delta` shifts the demand curve to the right. A negative
`delta` shifts the supply curve to the left.
Parameters
--------
delta (float): The amount to shift the curve vertically.
Returns
--------
None
Example
--------
>>> supply_curve = Affine(10.0, -2.0)
>>> supply_curve.vertical_shift(2.0)
"""
new_intercept = self.intercept + delta
# if self.slope != 0:
# self.q_intercept = -self.intercept / self.slope
if inplace:
self.__init__(new_intercept, self.slope)
else:
return self.__class__(new_intercept, self.slope, symbols=self.symbols)
[docs]
def horizontal_shift(self, delta, inplace=True):
"""
Shift the curve horizontally by the given amount.
This method shifts the supply or demand curve horizontally by the specified amount `delta`.
Positive values of `delta` shift the curve to the right.
Parameters
--------
delta (float): The amount to shift the curve horizontally.
Returns
--------
None
Example
--------
>>> demand_curve = Affine(10.0, -2.0)
>>> demand_curve.horizontal_shift(1.0)
"""
if self.slope == np.inf:
new_q_intercept = self.q_intercept + delta
if inplace:
self.__init__(new_q_intercept, 0, inverse=False)
else:
return self.__class__(
new_q_intercept, 0, inverse=False, symbols=self.symbols
)
else:
equiv_vert = delta * -self.slope
new_intercept = self.intercept + equiv_vert
# if self.slope != 0:
# self.q_intercept = -self.intercept / self.slope
if inplace:
self.__init__(new_intercept, self.slope)
else:
return self.__class__(new_intercept, self.slope, symbols=self.symbols)
[docs]
def price_elasticity(self, p):
"""
Calculate the point price elasticity at a given price.
This method calculates the point price elasticity at the specified price `p`.
Parameters
--------
p (float): The price at which to calculate the elasticity.
Returns
--------
float: The point price elasticity.
Example
--------
>>> demand_curve = Affine(10.0, -2.0)
>>> demand_curve.price_elasticity(4.0)
"""
if p < 0:
raise ValueError("Negative price.")
if p > self.intercept:
raise ValueError("Price above choke price.")
q = self.q(p)
e = (1 / self.slope) * (p / q)
return e
[docs]
def midpoint_elasticity(self, p1, p2):
"""
Find price elasticity between two prices using the midpoint formula.
This method calculates the price elasticity between two prices, `p1` and `p2`,
using the midpoint formula.
Parameters
--------
p1 (float): The first price.
p2 (float): The second price.
Returns
--------
float: The price elasticity between the two prices.
Example
--------
>>> demand_curve = Affine(10.0, -2.0)
>>> demand_curve.midpoint_elasticity(3.0, 5.0)
"""
if (p1 < 0) or (p2 < 0):
raise ValueError("Negative price.")
if (p1 > self.intercept) or (p2 > self.intercept):
raise ValueError("Price above choke price.")
mean_p = 0.5 * p1 + 0.5 * p2
return self.price_elasticity(mean_p)
[docs]
def plot(self, ax=None, textbook_style=True, max_q=None, label=True, **kwargs):
"""
Plot the affine curve.
Parameters
----------
ax : matplotlib.axes._axes.Axes, optional
The matplotlib axis to use for plotting. If not provided, the current
axes will be used or a new figure will be created.
textbook_style : bool, optional
If True, use textbook-style plot formatting with clean axes and
appropriate labels. Defaults to True.
max_q : float, optional
The maximum quantity value for the plot. If not specified, a sensible
default will be calculated based on the curve's intercepts.
label : bool, optional
If True, label the curve and axes. Defaults to True.
**kwargs : dict
Additional keyword arguments passed to matplotlib's plot function.
Common options include:
- color : str, default 'black'
- linewidth : int, default 2
- linestyle : str, default '-'
- alpha : float, default 1.0
Returns
-------
matplotlib.axes._axes.Axes
The axes object containing the plot, which can be further customized.
Example
--------
>>> demand_curve = AffineElement(10.0, -2.0)
>>> demand_curve.plot()
"""
if ax is None:
ax = plt.gca()
# core plot
if self._domain:
x1, x2 = self._domain
if x2 == np.inf:
x2 = max_q if max_q else x1 * 2 + 1
else:
x1 = 0
q_ = self.q_intercept
if np.isnan(q_): # if slope is 0
q_ = 10**10
if type(self).__name__ == "Supply":
x2 = np.max([10, q_ * 2])
else:
x2 = q_
y1 = self(x1)
y2 = self(x2)
xs = np.linspace(x1, x2, 2)
ys = np.linspace(y1, y2, 2)
if "color" not in kwargs:
kwargs["color"] = "black"
ax.plot(xs, ys, **kwargs)
if textbook_style:
textbook_axes(ax)
if label:
# Label Curves
# if type(self).__name__ == 'Demand':
# x0 = self.q_intercept * .95
# else:
# if name:
# x0 = ax.get_xlim()[1] * .9
# y_delta = (ax.get_ylim()[1] - ax.get_ylim()[0])/30
# y0 = self.p(x0) + y_delta
# ax.text(x0, y0, name, va = 'bottom', ha = 'center', size = 14)
# Label Axes
ax.set_ylabel("Price")
ax.set_xlabel("Quantity")
update_axes_limits(ax)
return ax
[docs]
def plot_area(
self, p, q=None, ax=None, zorder=-1, color=None, alpha=None, force=False
):
"""
Plot surplus region
"""
if ax is None:
ax = self.plot()
if q is None:
q0 = np.min(self._domain)
q1 = np.max(self._domain)
q = q0, q1
else:
q0, q1 = q
if not force:
qstar = self.q(p)
if q0 < qstar <= q1:
q = q0, qstar
elif q1 < qstar:
q = q0, q1
elif qstar < q0: # plot nothing if no surplus in region
return ax
p01 = self.p(q[0]), self.p(q[1])
ax.fill_between(q, p01, p, zorder=zorder, color=color, alpha=alpha)
update_axes_limits(ax)
return ax
[docs]
def intersection(element1, element2):
"""Return the intersection of two affine elements.
The result is a 1D array ``[p, q]`` giving the price and quantity at the
intersection. When either line is perfectly vertical (``slope == np.inf``)
or perfectly horizontal (``slope == 0``) the intersection is computed
directly. If both lines are vertical or both horizontal, a
``LinAlgError`` is raised.
Parameters
----------
element1, element2 : :class:`AffineElement`
Lines for which to compute the intersection.
Returns
-------
numpy.ndarray
``[p, q]`` of the intersection point.
Raises
------
numpy.linalg.LinAlgError
If the lines are parallel.
Examples
--------
>>> line1 = AffineElement(intercept=12, slope=-1)
>>> line2 = AffineElement(intercept=0, slope=2)
>>> intersection(line1, line2)
array([8., 4.])
"""
# Parallel vertical or horizontal lines
if (element1.slope == np.inf and element2.slope == np.inf) or (
element1.slope == 0 and element2.slope == 0
):
raise np.linalg.LinAlgError("Lines are parallel")
# Handle a vertical line (perfectly inelastic)
if element1.slope == np.inf:
q = element1.q_intercept
p = element2(q)
return np.array([p, q])
if element2.slope == np.inf:
q = element2.q_intercept
p = element1(q)
return np.array([p, q])
# Handle a horizontal line (perfectly elastic)
if element1.slope == 0:
p = element1.intercept
q = element2.q(p)
return np.array([p, q])
if element2.slope == 0:
p = element2.intercept
q = element1.q(p)
return np.array([p, q])
# Generic case
A = np.array([[1, -element1.slope], [1, -element2.slope]])
b = np.array([[element1.intercept], [element2.intercept]])
yx = np.matmul(np.linalg.inv(A), b)
return np.squeeze(yx)
[docs]
def blind_sum(*curves):
'''
Computes the horizontal summation of AffineElement objects.
Parameters
----------
*curves : AffineElement
The objects to be summed.
Returns
-------
AffineElement
The horizontal summation of the input curves represented as an AffineElement object.
Returns None if no curves are provided.
'''
if len(curves) == 0:
return None
elastic_curves = [c for c in curves if c.slope == 0]
inelastic_curves = [c for c in curves if c.slope == np.inf]
regular_curves = [c for c in curves if c not in elastic_curves + inelastic_curves]
if not elastic_curves and not inelastic_curves:
qintercept = np.sum([-c.intercept/c.slope for c in curves])
qslope = np.sum([1/c.slope for c in curves])
return AffineElement(qintercept, qslope, inverse = False)
else:
raise ValueError("Perfectly Elastic and Inelastic curves not supported")
[docs]
def horizontal_sum(*curves):
"""
Compute active curves at different price midpoints based on the p-intercepts of input curves.
Parameters
----------
*curves : sequence of AffineElements
Variable-length argument list of Affine curve objects for which
the active curves are to be found.
Returns
-------
tuple
A tuple containing three elements:
- active_curves (list): List of AffineElement objects representing
the active curves at each price midpoint.
- cutoffs (list): List of unique p-intercepts sorted in ascending order.
- midpoints (list): List of midpoints computed based on the cutoffs.
"""
elastic_curves = [c for c in curves if c.slope == 0]
inelastic_curves = [c for c in curves if c.slope == np.inf]
regular_curves = [c for c in curves if c not in elastic_curves + inelastic_curves]
intercepts = [0] + [c.intercept for c in regular_curves + elastic_curves]
cutoffs = sorted(list(set(intercepts)))
# remove negative intercepts
cutoffs = [c for c in cutoffs if c>=0]
# get a point in each region
midpoints = [(a + b) / 2 for a, b in zip(cutoffs[:-1], cutoffs[1:])] + [cutoffs[-1]+1]
# get curves with positive quantity for each region
active_curves = [blind_sum(*[c for c in curves if c.q(price)>0]) for price in midpoints]
return active_curves, cutoffs, midpoints
def ppf_sum(*curves, comparative_advantage=True):
"""Combine production possibilities frontiers.
Parameters
----------
*curves : sequence of :class:`AffineElement`
PPF curves to aggregate.
comparative_advantage : bool, optional
When ``True`` curves are ordered by slope from steepest to
flattest before summation, highlighting comparative advantage.
Returns
-------
list of :class:`AffineElement`
The shifted and vertically stacked PPF segments forming the
aggregate frontier.
"""
slope_and_curves = sorted([(s.slope, s) for s in curves], reverse=comparative_advantage)
curves = [t[1] for t in slope_and_curves]
x_intercepts = [c.q_intercept for c in curves]
y_intercepts = [c.intercept for c in curves]
for key, ppf in enumerate(curves):
previous_x = sum(x_intercepts[0:key])
below_y = sum(y_intercepts[key+1:])
new = ppf.vertical_shift(below_y, inplace=False)
new.horizontal_shift(previous_x)
curves[key] = new
new._domain = previous_x + ppf.q_intercept, previous_x
return curves
class BaseAffine:
def __init__(self, intercept=None, slope=None, elements=None, inverse=True, sum_elements=True):
"""
Initialize the BaseAffine object.
Parameters
----------
intercept : float or list of floats, optional
The intercept(s) of the affine transformation. Default is None.
slope : float or list of floats, optional
The slope(s) of the affine transformation. Default is None.
elements : list of AffineElement, optional
List of AffineElement objects. If provided, it overrides
`intercept` and `slope`. Default is None.
inverse : bool, optional
Indicates if the transformation should be inverted. Default is True.
sum_elements : bool, optional
Whether to sum elements together (True) or keep them as separate
pieces (False). Default is True.
Raises
------
ValueError
If the lengths of `slope` and `intercept` do not match.
"""
if elements is None:
if isinstance(slope, (int, float)):
slope = [slope]
if isinstance(intercept, (int, float)):
intercept = [intercept]
if len(slope) != len(intercept):
raise ValueError("Slope and intercept lengths do not match.")
zipped = zip(slope, intercept)
elements = [AffineElement(slope=m, intercept=b, inverse=inverse) for m, b in zipped]
self.elements = elements
self.sum_elements = sum_elements
if intercept is None:
intercept = [c.intercept for c in elements]
if slope is None:
slope = [c.slope for c in elements]
self.intercept = intercept
self.slope = slope
def __bool__(self):
return bool(np.any([bool(el) for el in self.elements]))
def _has_perfectly_elastic_segment(self):
"""Return ``True`` if any element is perfectly elastic (zero slope)."""
return any(el.slope == 0 for el in self.elements)
def _has_perfectly_inelastic_segment(self):
"""Return ``True`` if any element is perfectly inelastic (infinite slope)."""
return any(np.isinf(el.slope) for el in self.elements)
@property
def has_perfectly_elastic_segment(self):
"""bool: Whether the curve contains a perfectly elastic segment."""
return self._has_perfectly_elastic_segment()
@property
def has_perfectly_inelastic_segment(self):
"""bool: Whether the curve contains a perfectly inelastic segment."""
return self._has_perfectly_inelastic_segment()
@property
def has_perfect_segment(self):
"""bool: Whether the curve contains a perfectly elastic or inelastic segment."""
return self.has_perfectly_elastic_segment or self.has_perfectly_inelastic_segment
@classmethod
def from_two_points(cls, x1, y1, x2, y2):
"""
Creates an Affine object from two points.
"""
A = np.array([[x1, 1], [x2, 1]])
b = np.array([y1, y2])
slope, intercept = np.linalg.solve(A, b)
return cls(slope=slope, intercept=intercept)
@classmethod
def from_points(cls, xy_points):
"""
Creates an Affine object from two points.
In the future, this might be extended to allow for three or more points.
"""
A_array = [[qp[0], 1] for qp in xy_points]
p_vals = [qp[1] for qp in xy_points]
A = np.array(A_array)
b = np.array(p_vals)
slope, intercept = np.linalg.solve(A, b)
return cls(slope=slope, intercept=intercept)
@classmethod
def from_formula(cls, equation: str):
intercept, slope = _formula(equation)
return cls(slope=slope, intercept=intercept)
def horizontal_shift(self, delta, inplace=True):
new_elements = [e.horizontal_shift(delta, inplace=False) for e in self.elements]
if inplace:
self.__init__(elements=new_elements)
else:
return type(self)(elements=new_elements)
def vertical_shift(self, delta, inplace=True):
new_elements = [e.vertical_shift(delta, inplace=False) for e in self.elements]
if inplace:
self.__init__(elements=new_elements)
else:
return type(self)(elements=new_elements)
@property
def q_intercept(self):
"""Return the quantity intercept(s) for the affine elements."""
q_int = [-b / m for b, m in zip(self.intercept, self.slope)]
return q_int[0] if len(q_int) == 1 else q_int
[docs]
class Affine(BaseAffine):
"""
A class to represent a piecewise affine function.
"""
[docs]
def __init__(self, intercept=None, slope=None, elements=None, inverse=True, sum_elements=True):
"""
Initializes an Affine object with given slopes and intercepts or elements.
The slopes correspond to elements, which are differentiated from pieces.
When sum_elements=True: elements are horizontally summed to create aggregate pieces.
When sum_elements=False: elements are kept as separate pieces (for discontinuous functions).
Parameters
----------
intercept : float, list of float, optional
The y-intercept(s) of the elements.
slope : float, list of float, optional
The slope(s) of the elements.
elements : list of AffineElement, optional
A list of AffineElements whose horizontal sum defines the Affine object.
inverse : bool, optional
When inverse is True, it is assumed that equations are in the form P(Q).
sum_elements : bool, optional
Whether to sum elements together (True) or keep them as separate
pieces (False). Default is True.
Raises
------
ValueError
If the lengths of `slope` and `intercept` do not match.
"""
super().__init__(intercept, slope, elements, inverse, sum_elements)
# Special handling for perfectly elastic curves - they can't be summed
# so they'll only have one element and don't need horizontal_sum
if self.has_perfectly_elastic_segment:
# Set up minimal structure for a single horizontal curve
pieces = [self.elements[0]]
self.pieces = pieces
cuts = [0, np.inf]
mids = [self.elements[0].intercept]
sections = [(0, np.inf)]
qsections = [(0, np.inf)]
elif not self.sum_elements:
# Non-summing mode: keep elements as separate pieces
pieces = self.elements
self.pieces = pieces
# Create sections based on element domains
sections = []
qsections = []
cuts = []
for element in self.elements:
if hasattr(element, '_domain') and element._domain:
domain = element._domain
sections.append((min(domain), max(domain)))
qsections.append((min(domain), max(domain)))
cuts.extend([min(domain), max(domain)])
# Remove duplicates and sort cuts
cuts = sorted(list(set(cuts))) if cuts else [0, np.inf]
mids = [(a + b) / 2 for a, b in sections] if sections else [0]
else:
# Normal processing for non-horizontal curves
pieces, cuts, mids = horizontal_sum(*self.elements)
self.pieces = pieces
# store piecewise info
sections = [(cuts[i], cuts[i+1]) for i in range(len(cuts)-1)]
qsections = [ (self.q(ps[0]), self.q(ps[1])) for ps in sections]
sections.append( (cuts[-1], np.inf) )
# Skip the q() calls for perfectly elastic curves to avoid warnings
if not self.has_perfectly_elastic_segment:
if self.q(cuts[-1]+1) <= 0: # demand
qsections.append((0,0))
elif len(qsections): # supply
maxq = np.max(qsections[-1])
qsections.append((maxq, np.inf))
else: # supply
qsections.append((0, np.inf))
self.psections = sections
self.qsections = qsections
self._set_piece_domains()
# for display _repr_latex_ behavior
cond = [f"{cuts[i]} \\leq p \\leq {cuts[i+1]}" for i in range(len(cuts)-1)]
cond += [f'p \\geq {cuts[-1]}']
self.conditions = cond
#self.expressions = [f"{c.q_intercept:g}{1/c.slope:+g}p" if c else '0' for c in pieces]
self.expressions = [c.expression if c else '0' for c in pieces]
self.inverse_expressions = [c.inverse_expression if c else '0' for c in pieces]
intersections = list()
if len(pieces):
intersections = [
intersection(pieces[i], pieces[i + 1])
for i in range(len(pieces) - 1)
if pieces[i] and pieces[i + 1]
]
#maxm = np.max([intercept]), np.min([intercept])
#choke = np.max([])
# inverse conditions and expressions
self.intersections = intersections
#self.intercept = intercept
#self.slope = slope
def _set_piece_domains(self):
for piece, qs in zip(self.pieces, self.qsections):
if piece:
piece._domain = qs
piece._domain_length = np.max(qs) - np.min(qs)
def _get_active_piece(self, q):
# Filter out any None pieces
valid_pieces = [piece for piece in self.pieces if piece]
if not valid_pieces:
return None
# Check if this is the last piece
last_piece = valid_pieces[-1]
for piece in valid_pieces:
q0, q1 = np.min(piece._domain), np.max(piece._domain)
if piece is last_piece:
# Last piece: use [a,b] (closed interval)
if q0 <= q <= q1:
return piece
else:
# Not the last piece: use [a,b) (right-open interval)
if q0 <= q < q1:
return piece
return None
def __call__(self, x):
"""
Computes p given q=x.
Parameters
----------
x : float
Returns
-------
float
"""
valid_pieces = [piece for piece in self.pieces if piece]
if valid_pieces:
last_piece = valid_pieces[-1]
for piece in self.pieces:
if piece:
a, b = piece._domain
if piece is last_piece:
# Last piece: use [a,b] convention
if (a <= x <= b) or (a >= x >= b):
return piece(x)
else:
# Interior piece: use [a,b) convention
if (a <= x < b) or (a >= x > b):
return piece(x)
# might be x out of limits
return np.nan
#return np.sum([np.max([0, c(x)]) for c in self.elements])
[docs]
def q(self, p):
# returns q given p
if not self.sum_elements:
# Non-summing mode: find the piece that contains this price
valid_pieces = [
piece
for piece in self.pieces
if piece and hasattr(piece, '_domain') and piece._domain
]
if valid_pieces:
last_piece = valid_pieces[-1]
for piece in valid_pieces:
domain = piece._domain
a, b = np.min(domain), np.max(domain)
if piece is last_piece:
# Last piece: use [a,b] convention
if a <= p <= b:
try:
q_val = piece.q(p)
if np.isfinite(q_val) and q_val >= 0:
return q_val
except:
continue
else:
# Interior piece: use [a,b) convention
if a <= p < b:
try:
q_val = piece.q(p)
if np.isfinite(q_val) and q_val >= 0:
return q_val
except:
continue
return 0
else:
return np.sum([np.max([0,c.q(p)]) for c in self.elements])
[docs]
def p(self, q):
# returns p given q
return self.__call__(q)
[docs]
def equation(self, inverse=False):
if inverse:
latex_str = r"p = \begin{cases} "
for expr, cond in zip(self.inverse_expressions, self.conditions):
latex_str += f"{expr} & \\text{{if }} {cond} \\\\"
latex_str += r"\end{cases}"
return f"${latex_str}$"
else:
latex_str = r"q = \begin{cases} "
for expr, cond in zip(self.expressions, self.conditions):
latex_str += f"{expr} & \\text{{if }} {cond} \\\\"
latex_str += r"\end{cases}"
return f"${latex_str}$"
[docs]
def price_elasticity(self, p, delta=.000001):
q = self.q(p)
pt = np.array([p,q])
if self.intersections and np.any(pt == self.intersections, axis=1).max():
below = self.price_elasticity(p - delta)
above = self.price_elasticity(p + delta)
s = f"\nElasticity is {below:+.3f} below P={p} and {above:+.3f} above."
raise ValueError("Point elasticity is not defined at a kink point."+s)
else:
# Get q-domains
# Use [a,b) convention: left inclusive, right exclusive
pc = [p for p in self.pieces if p and (np.min(p._domain) <= q < np.max(p._domain))]
assert len(pc) == 1
return pc[0].price_elasticity(p)
def _repr_latex_(self):
return self.equation(inverse=False)
@property
def inverse_equation(self):
display(Latex(self.equation(inverse=True)))
def __add__(self, other):
elements = self.elements + other.elements
return type(self)(elements=elements)
def __mul__(self, scalar):
elements = [e*scalar for e in self.elements]
return type(self)(elements=elements)
def __rmul__(self, scalar):
elements = [e*scalar for e in self.elements]
return type(self)(elements=elements)
[docs]
def plot(self, ax=None, set_lims=True, max_q=None, label=True, **kwargs):
'''
Plot the Affine object.
Parameters
----------
ax : matplotlib.axes.Axes, optional
The axes on which to plot. If None, a new figure and axes will be created.
set_lims : bool, optional
Whether to automatically set the limits for the axes. Default is True.
max_q : float, optional
The maximum quantity to consider for setting the x-axis limit. If
None, it will be automatically determined.
label : bool, optional
Whether to add curve/axis labels. Default True.
**kwargs : dict
Additional keyword arguments for controlling line color, style, etc.
Returns
-------
ax : matplotlib.axes.Axes
'''
def safe_eval(x):
# Evaluate self(x) or return np.nan if out of domain or invalid
try:
val = self.__call__(x)
if np.isfinite(val):
return val
else:
return np.nan
except:
return np.nan
if ax is None:
fig, ax = plt.subplots()
# Plot each piece in self.pieces
param_names = ['color', 'linewidth', 'linestyle', 'lw', 'ls']
plot_dict = {key: kwargs[key] for key in kwargs if key in param_names}
for piece in self.pieces:
if piece:
piece.plot(ax=ax, label=label, max_q=max_q, **plot_dict)
if set_lims:
update_axes_limits(ax)
# Apply any leftover kwargs that might be e.g. title, xlim, ylim
for key, value in kwargs.items():
if hasattr(plt, key):
plt_function = getattr(plt, key)
if callable(plt_function):
if isinstance(value, (tuple, list)):
plt_function(*value)
else:
plt_function(value)
# Simple labeling
if label:
ax.set_xlabel("Quantity")
ax.set_ylabel("Price")
return ax
[docs]
def plot_surplus(self, p, q=None, ax=None, color=None, max_q=None, alpha=None):
if color is None:
color = AREA_FILLS[0] # Default color
if ax is None:
ax = self.plot(max_q=max_q)
for piece in self.pieces:
if piece:
piece.plot_area(p,
q=q,
ax=ax,
color=color,
alpha=alpha)
update_axes_limits(ax)
return ax
[docs]
def surplus(self, p, q=None):
'''
Returns surplus area. The areas are negative for producer surplus.
'''
if q is None:
q = self.q(p)
if q > 0:
# find inframarginal surplus
# Find pieces completely to the left of q (using [a,b) convention)
trapezoids = [piece for piece in self.pieces if piece and (np.max(piece._domain) <= q)]
trap_areas = [piece._domain_length*(np.mean([piece.p(piece._domain[0]),piece.p(piece._domain[1])])-p) for piece in trapezoids]
# find the last unit demanded and get surplus from that curve
last_piece = self._get_active_piece(q)
q0 = np.min(last_piece._domain)
base = q - q0
ht1 = last_piece.p(q0) - p
ht2 = last_piece.p(q) - p
area = base * 0.5 * (ht1 + ht2)
return area + np.sum(trap_areas)
else:
return 0