import yaml
import pandas as pd
import numpy as np
from tulip.plots import plot_area, plot_line
from tulip_mania.helper_functions.fetch_with_cache import fetch_equity_data
from polars_bloomberg import BQuery
from tulip_mania.columns import columns
from IPython.display import Markdown
# COVID-era window bounds (ISO date strings). Not referenced in this chunk;
# presumably used for chart shading/annotation elsewhere in the file — TODO confirm.
covid_start_date = "2020-03-01"
covid_end_date = "2022-03-01"
# ===
def decompose_returns(index: str, start_date: str):
    """Render a cumulative return decomposition and a forward P/E chart for one index.

    Displays a stacked area chart splitting cumulative log total return into
    EPS growth, P/E re-rating and dividend components, side by side with the
    forward P/E level, in a two-column layout.

    Relies on module-level globals: ``eps_decomp``, ``best_pe``, ``names``,
    ``EPS_METRIC``, ``PE_METRIC``, plus the plotting helpers imported at the top.

    Args:
        index: index ticker — a level of ``eps_decomp``'s outer index and a
            column of ``best_pe``.
        start_date: first date (inclusive) of the window.
            BUG FIX: previously this parameter was ignored and the global
            ``START_DATE`` was used instead; it is now honored.
    """
    # Cumulative log-return components from start_date onward.
    data = (
        eps_decomp.xs(index)[[EPS_METRIC, PE_METRIC, "log_div_ret"]]
        .loc[start_date:]
        .cumsum()
    )
    data["tot_ret"] = data.sum(axis=1)
    # Plotted x-range: first available observation to one week past the last
    # (distinct names so the start_date argument is not shadowed).
    plot_start = data.index[0]
    plot_end = data.index[-1] + pd.DateOffset(weeks=1)
    # Forward P/E level chart (context: where valuations currently sit).
    pe_fig = plot_line(
        best_pe.loc[start_date:, index].rename(f"{index} Forward P/E Ratio"),
        title="",
        y_title="P/E Ratio",
        logo=False,
        default_x_range=(plot_start, plot_end),
        figsize=(800, 500),
    )
    # Simple (non-log) returns for hover labels; the plot itself stays in log space.
    data_normal = data.apply(np.expm1)
    fig_decomp = plot_area(
        data,
        title="",
        summary_line=True,
        tick_format=".1%",
        exclude_from_stack="tot_ret",
        source="Bloomberg, Kate Capital",
        y_title="Log Returns",
        figsize=(800, 500),
        default_x_range=(plot_start, plot_end),
        logo=False,
    )
    # Attach the simple-return series as customdata so hovers show both scales.
    for trace in fig_decomp.data:
        col_name = trace.name
        if col_name in data_normal.columns:
            trace.customdata = data_normal[col_name].values.reshape(-1, 1)
            trace.hovertemplate = (
                f"<b>{col_name}</b><br>"
                "Date: %{x}<br>"
                "Normal Return: %{customdata[0]:.2%}<br>"
                "Log Return: %{y:.2%}<extra></extra>"
            )
    fig_decomp.update_yaxes(
        title_text="Log Returns",
    )
    with columns(2, gap="10px", vertical_alignment="bottom") as cols:
        cols[0].header(f"{names[index]} YTD Decomposition")
        cols[0].plot(fig_decomp)
        cols[1].plot(pe_fig)
# ===========================
# Equity risk premium assumption (2%) — not consumed in this chunk; TODO confirm usage.
ERP = 0.02
# Assumed dividend payout ratio (50%) — not consumed in this chunk; TODO confirm usage.
DIV_PO = 0.5
# Bloomberg fundamental fields fetched per index. Later in the file these are
# converted to log month-over-month changes and renamed fwd_*/ttm_* (BEST_* ->
# forward estimates, TRAIL_12M_* -> trailing twelve months; see the rename map).
FIELDS_OF_INTEREST = [
    "BEST_SALES",
    "TRAIL_12M_SALES_PER_SH",
    "BEST_EBITDA",
    "TRAIL_12M_EBITDA_PER_SHARE",
    "BEST_EPS",
    "TRAIL_12M_EPS",
]
# Load the index universe: each entry maps a region key to its equity index
# ticker plus the associated nominal bond and breakeven tickers.
with open("equity_indices.yaml") as config_file:
    data = yaml.safe_load(config_file)

EQUITY_INDICES = {}
for region, spec in data["equity_indices"].items():
    EQUITY_INDICES[region] = [spec["index"], spec["nominal_bond"], spec["breakeven"]]

# First element of each triple is the equity index ticker itself.
equity_indices = [triple[0] for triple in EQUITY_INDICES.values()]

# Fundamentals history for every index since 1980 (cached fetch helper).
equity_df = fetch_equity_data(
    tuple(equity_indices), pd.Timestamp("1980-01-01"), pd.Timestamp.today()
)
# Pull daily price, total-return and valuation history plus display names
# from Bloomberg in a single session.
with BQuery() as bq:
    history_fields = [
        "PX_LAST",
        "TOT_RETURN_INDEX_GROSS_DVDS",
        "BEST_PE_RATIO",
        "BEST_PX_SALES_RATIO",
        "BEST_NI_ADJ_TO_SALES",
    ]
    return_df = bq.bdh(
        equity_indices,
        history_fields,
        start_date=pd.Timestamp("1980-01-01"),
        end_date=pd.Timestamp.today(),
    )
    # security ticker -> long display name
    name_frame = bq.bdp(equity_indices, ["NAME"]).to_pandas().set_index("security")
    names = name_frame["NAME"].to_dict()
def _pivot_field(bdh_result, field: str):
    """Pivot a long-format BDH result to a wide (date x security) frame for one
    field, forward-filling gaps. Replaces five copy-pasted pipelines."""
    return (
        bdh_result.to_pandas()
        .set_index(["security", "date"])
        .unstack("security")
        .xs(field, axis=1)
        .ffill()
    )


# Wide daily frames, one per Bloomberg field.
total_return_index = _pivot_field(return_df, "TOT_RETURN_INDEX_GROSS_DVDS")
price_index = _pivot_field(return_df, "PX_LAST")
best_pe = _pivot_field(return_df, "BEST_PE_RATIO")
best_sps = _pivot_field(return_df, "BEST_PX_SALES_RATIO")
best_ni_adj_to_sales = _pivot_field(return_df, "BEST_NI_ADJ_TO_SALES")

# Daily log returns, stacked back to a (security, date) MultiIndex so they can
# be joined onto equity_df below.
log_tot_ret = (
    np.log1p(total_return_index.pct_change()).stack().rename("log_tot_ret").swaplevel()
)
log_prc_ret = (
    np.log1p(price_index.pct_change()).stack().rename("log_prc_ret").swaplevel()
)
# Attach daily log total/price returns and back out the dividend component
# (log total return minus log price return).
equity_df = equity_df.join(log_tot_ret).join(log_prc_ret)
equity_df["log_div_ret"] = equity_df["log_tot_ret"] - equity_df["log_prc_ret"]

# Month-end snapshot per security: last fundamental print within the month,
# log returns summed over the month.
_agg_spec = {field: "last" for field in FIELDS_OF_INTEREST}
_agg_spec.update(
    {ret_col: "sum" for ret_col in ("log_tot_ret", "log_prc_ret", "log_div_ret")}
)
monthly_equity_df = equity_df.groupby(
    ["security", pd.Grouper(level="date", freq="ME")]
).agg(_agg_spec)

# Convert fundamental levels to log month-over-month changes per security.
# Mask non-positive levels to NaN first so np.log never raises a RuntimeWarning.
fields_data = monthly_equity_df[FIELDS_OF_INTEREST]
monthly_equity_df[FIELDS_OF_INTEREST] = np.log(fields_data.where(fields_data > 0))
monthly_equity_df[FIELDS_OF_INTEREST] = monthly_equity_df.groupby(
    "security", as_index=False
)[FIELDS_OF_INTEREST].diff()

# The fundamental columns now hold changes, not levels — rename to say so.
monthly_equity_df = monthly_equity_df.rename(
    columns={
        "BEST_SALES": "fwd_sales_chg",
        "TRAIL_12M_SALES_PER_SH": "ttm_sales_chg",
        "BEST_EBITDA": "fwd_ebitda_chg",
        "TRAIL_12M_EBITDA_PER_SHARE": "ttm_ebitda_chg",
        "BEST_EPS": "fwd_eps_chg",
        "TRAIL_12M_EPS": "ttm_eps_chg",
    },
)