from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data import USEquityPricing
from quantopian.pipeline.filters import QTradableStocksUS
from collections import OrderedDict
import pandas as pd
import numpy as np
from quantopian.pipeline.data import Fundamentals
from quantopian.pipeline.data import morningstar
from quantopian.pipeline import CustomFactor
# Morningstar cash-flow statement dataset; the factors below read its
# capital_expenditure and free_cash_flow fields.
# NOTE(review): the same fields are presumably also reachable directly on
# Fundamentals (e.g. Fundamentals.free_cash_flow), which would make this alias
# unnecessary -- confirm before switching.
cfs = morningstar.cash_flow_statement
class Capex_To_Cashflows(CustomFactor):
    """Per-asset ratio of the latest capital expenditure to the latest free cash flow.

    Only the last row of each input window is used. Both values are multiplied
    by 4 before dividing (presumably annualizing quarterly figures); the factor
    cancels in the ratio. No NaN handling is done: a NaN in either latest value
    yields NaN, and a zero free cash flow yields +/-inf (or NaN for 0/0) per
    numpy float division.
    """
    inputs = [cfs.capital_expenditure, cfs.free_cash_flow]
    window_length = 10  # default; callers may pass window_length=... to override

    def compute(self, today, assets, out, capital_expenditure, free_cash_flow):
        latest_capex = capital_expenditure[-1] * 4.
        latest_fcf = free_cash_flow[-1] * 4.
        out[:] = latest_capex / latest_fcf
class Capex_To_Cashflows_forwardfill(CustomFactor):
    """Capex / free-cash-flow ratio computed after forward-filling NaNs.

    Same ratio as Capex_To_Cashflows (latest row of each window, both values
    scaled by 4, which cancels), but each input window is first passed through
    nanfill() so that a missing latest value can be replaced by an earlier one.

    NOTE(review): this relies on nanfill() filling along axis 0 (rows are dates
    in a pipeline window, so [-1] is the most recent row); confirm nanfill's
    fill direction, otherwise values may be copied between assets.
    """
    inputs = [cfs.capital_expenditure, cfs.free_cash_flow]
    window_length = 10  # default; callers may pass window_length=... to override

    def compute(self, today, assets, out, capital_expenditure, free_cash_flow):
        filled_capex = nanfill(capital_expenditure)
        filled_fcf = nanfill(free_cash_flow)
        out[:] = (filled_capex[-1] * 4.) / (filled_fcf[-1] * 4.)
def initialize(context):
    """Attach the capex/FCF comparison pipeline under the name 'pipe'."""
    pipeline = make_ml_pipeline()
    attach_pipeline(pipeline, 'pipe')
def before_trading_start(context, data):
    """Log pipeline rows where the raw and forward-filled capex/FCF factors disagree.

    Every flagged row is logged together with the row before and after it, so a
    filled value can be compared with its neighbours. Rows where either value is
    NaN are flagged as well, because NaN never compares equal. On the first call
    a summary of the whole pipeline output is also logged via log_data().

    Args:
        context: algorithm context; the summary is skipped once it has a
            ``log_data_done`` attribute.
        data: unused here.
    """
    out = pipeline_output('pipe')

    raw = out['Capex_To_Cashflows']
    filled = out['Capex_To_Cashflows_filled']
    # One vectorized comparison instead of a per-row .loc lookup.
    differs = (raw != filled).values
    for pos in np.flatnonzero(differs):
        # Clamp the start at 0: for the first row, out[-1:2] would select the
        # wrong rows (usually none at all).
        log.info('.\n{}'.format(out.iloc[max(pos - 1, 0):pos + 2]))

    do_log_preview = 1  # a way to toggle this off when it becomes annoying
    if do_log_preview and not hasattr(context, 'log_data_done'):
        log_data(context, out, 9)  # show pipe info once
def make_ml_pipeline():
    """Build a pipeline comparing the raw and forward-filled capex/FCF ratios.

    Both factors use a 252-row window and are masked to QTradableStocksUS, which
    is also the pipeline screen. Output columns are 'Capex_To_Cashflows' and
    'Capex_To_Cashflows_filled'.

    Sample log output from before_trading_start (2019-07-03), kept for
    reference. In several cases the NaN in the raw column was filled with the
    value of the preceding asset row (e.g. BCE takes BC's 0.525970, CP takes
    COT's 10.440000) rather than with an earlier value of the same asset:

                              Capex_To_Cashflows  Capex_To_Cashflows_filled
        Equity(755 [BC])                0.525970                   0.525970
        Equity(766 [BCE])                    NaN                   0.525970
        Equity(794 [BDX])              -0.332198                  -0.332198

                              Capex_To_Cashflows  Capex_To_Cashflows_filled
        Equity(980 [BMY])              -0.172007                  -0.172007
        Equity(1010 [BNS])                   NaN                  -0.172007
        Equity(1023 [BOH])             -2.092683                  -2.092683

                              Capex_To_Cashflows  Capex_To_Cashflows_filled
        Equity(1385 [CDNS])             -0.08978                  -0.089780
        Equity(1402 [CEF])                   NaN                   3.365413
        Equity(1419 [CERN])             -1.77185                  -1.771850

                              Capex_To_Cashflows  Capex_To_Cashflows_filled
        Equity(1637 [CMCS_A])          -0.574695                  -0.574695
        Equity(1655 [CMO])                   NaN                 -55.563744
        Equity(1665 [CMS])             -4.660550                  -4.660550

                              Capex_To_Cashflows  Capex_To_Cashflows_filled
        Equity(1789 [COT])             10.440000                  10.440000
        Equity(1792 [CP])                    NaN                  10.440000
        Equity(1795 [CPB])             -0.336283                  -0.336283
    """
    # Window length for both factors; raising it did not make the mismatches above go away.
    window = 252
    universe = QTradableStocksUS()
    columns = {
        'Capex_To_Cashflows': Capex_To_Cashflows(window_length=window, mask=universe),
        'Capex_To_Cashflows_filled': Capex_To_Cashflows_forwardfill(window_length=window, mask=universe),
    }
    return Pipeline(columns=columns, screen=universe)
def nanfill(arr, axis=0):
    """Return a copy of ``arr`` with NaNs forward-filled along ``axis``.

    Each NaN is replaced by the most recent non-NaN value before it along the
    fill axis. Leading NaNs, which have nothing earlier to copy, stay NaN.

    Pipeline CustomFactor inputs are (window_length, n_assets) arrays whose rows
    are dates. The default ``axis=0`` therefore fills each asset from its own
    earlier values. Filling along axis 1 would instead copy values from the
    neighbouring asset column.

    Based on https://www.quantopian.com/posts/forward-filling-nans-in-pipeline-custom-factors

    Args:
        arr: 1-D or 2-D numeric array (or array-like). It is not modified.
        axis: 0 or 1 (or -2 / -1) for 2-D input; ignored for 1-D input.

    Returns:
        A new ndarray with the same shape and dtype as ``np.array(arr)``.

    Raises:
        ValueError: if ``arr`` is not 1-D or 2-D, or ``axis`` is out of range.
    """
    filled = np.array(arr, copy=True)  # never mutate the caller's (pipeline) buffer
    # `work` is a 2-D view of `filled` whose axis 1 is the fill axis. Writes to
    # it land in `filled`.
    if filled.ndim == 1:
        work = filled[np.newaxis, :]
    elif filled.ndim == 2 and axis in (0, 1, -1, -2):
        work = filled.T if axis in (0, -2) else filled
    else:
        raise ValueError('nanfill expects a 1-D or 2-D array and axis 0 or 1, '
                         'got ndim={} axis={}'.format(filled.ndim, axis))
    mask = np.isnan(work)
    # For each cell, the index of the last non-NaN position at or before it.
    idx = np.where(~mask, np.arange(work.shape[1]), 0)
    np.maximum.accumulate(idx, axis=1, out=idx)
    work[mask] = work[np.nonzero(mask)[0], idx[mask]]
    return filled
def log_data(context, z, num, fields=None):
    ''' Log info about pipeline output or, z can be any DataFrame or Series
    https://quantopian.com/posts/overview-of-pipeline-content-easy-to-add-to-your-backtest

    Logs min/mean/max (and NaN counts) per numeric column. For a DataFrame it
    optionally also logs the `num` highest and lowest rows sorted by each column.

    Args:
        context: algorithm context. On the first call the starting cash is
            logged and ``context.log_data_done`` is set.
        z: DataFrame or Series whose index entries have a ``.symbol`` attribute
            (used to relabel rows). Dicts and MultiIndex frames are rejected.
        num: number of rows shown in each "high" / "low" listing.
        fields: optional list of column names used for the sorted high/low
            detail section; defaults to all columns of ``z``.
    '''
    if not len(z):
        log.info('Empty pipe')
        return
    # The attribute doubles as a "first call" marker.
    if not hasattr(context, 'log_data_done'):
        log.info('starting_cash ${:,}'.format(int(context.portfolio.cash)))
        context.log_data_done = 1

    # Options
    log_nan_only = 0         # Only log if nans are present.
    show_sectors = 0         # If sectors, see them or not.
    show_sorted_details = 1  # [num] high & low securities sorted, each column.
    padmax = 6               # num characters for each field, starting point.

    def out(lines):  # log data lines of output efficiently
        # Batch lines into chunks shorter than buffer_len so each log.info call
        # carries many lines.
        buffer_len = 1024  # each group
        chunk = ':'
        for line in lines:
            if line is None or not len(line):
                continue  # skip if empty string for example
            if len(chunk) + len(line) < buffer_len:
                # Add to chunk if will still be under buffer_len
                chunk += '\n{}'.format(line)
            else:  # Or log chunk and start over with new line.
                log.info(chunk)
                chunk = ':\n{}'.format(line)
        if len(chunk) > 2:  # if anything remaining
            log.info(chunk)

    if isinstance(z, dict):
        log.info('Not set up to handle a dictionary, only dataframe & series, bailing out of log_data()')
        return
    elif isinstance(z.index, pd.MultiIndex):
        log.info('Found MultiIndex, not set up to handle it, bailing out of log_data()')
        return

    # Change index to just symbols for readability, meanwhile, right-aligned
    z = z.rename(index={asset: asset.symbol.rjust(6) for asset in z.index})

    # Series ......
    if isinstance(z, pd.Series):
        nan_count = len(z[z != z])  # NaN is the only value unequal to itself
        nan_count = 'NaNs {}/{}'.format(nan_count, len(z)) if nan_count else ''
        if (log_nan_only and nan_count) or not log_nan_only:
            pad = max(padmax, len('%.5f' % z.max()))
            log.info('{}{}{} Series len {}'.format('min'.rjust(pad+5),
                'mean'.rjust(pad+5), 'max'.rjust(pad+5), len(z)))
            log.info('{}{}{} {}'.format(
                ('%.5f' % z.round(6).min()).rjust(pad+5),
                ('%.5f' % z.round(6).mean()).rjust(pad+5),
                ('%.5f' % z.round(6).max()).rjust(pad+5),
                nan_count
            ))
            log.info('High\n{}'.format(z.sort_values(ascending=False).head(num)))
            log.info('Low\n{}'.format(z.sort_values(ascending=False).tail(num)))
        return

    # DataFrame ......
    content_min_max = [['', 'min', 'mean', 'max', '']]
    content = []
    nans_found = False  # whether ANY summarized column contains NaN
    for col in z.columns:
        try:
            z[col].max()
        except (TypeError, ValueError):
            continue  # skip columns whose values can't be compared (non-numeric)
        if col == 'sector' and not show_sectors:
            continue
        nan_count = len(z[col][z[col] != z[col]])
        nans_found = nans_found or bool(nan_count)
        nan_count = 'NaNs {}/{}'.format(nan_count, len(z)) if nan_count else ''
        mean_ = ''
        if len(str(z[col].max())) > 8 and 'float' in str(z[col].dtype):
            z[col] = z[col].round(6)  # Reduce number of decimal places for floating point values
        if 'float' in str(z[col].dtype):
            mean_ = str(round(z[col].mean(), 6))
        elif 'int' in str(z[col].dtype):
            mean_ = str(round(z[col].mean(), 1))
        content_min_max.append([str(col), str(z[col].min()), mean_, str(z[col].max()), nan_count])

    if nans_found or not log_nan_only:
        log.info('Rows: {} Columns: {}'.format(z.shape[0], z.shape[1]))
        if len(z.columns) == 1:
            content.append('Rows: {}'.format(z.shape[0]))
        paddings = [6 for _ in range(4)]
        for lst in content_min_max:  # set max lengths
            for i, val in enumerate(lst[:4]):  # value in each sub-list
                paddings[i] = max(paddings[i], len(str(val)))
        headr = content_min_max[0]
        content.append(('{}{}{}{}{}'.format(
            headr[0].rjust(paddings[0]),
            (headr[1]).rjust(paddings[1]+5),
            (headr[2]).rjust(paddings[2]+5),
            (headr[3]).rjust(paddings[3]+5),
            ''
        )))
        for lst in content_min_max[1:]:  # populate content using max lengths
            content.append(('{}{}{}{} {}'.format(
                lst[0].rjust(paddings[0]),
                lst[1].rjust(paddings[1]+5),
                lst[2].rjust(paddings[2]+5),
                lst[3].rjust(paddings[3]+5),
                lst[4],
            )))
        out(content)

    if not show_sorted_details:
        return
    if len(z.columns) == 1:
        return  # skip detail if only 1 column
    # Previously `details` was only bound when fields was None (NameError otherwise).
    details = z.columns if fields is None else fields
    content = []
    for detail in details:
        if detail == 'sector' and not show_sectors:
            continue
        ranked = z[details].sort_values(by=detail, ascending=False)
        hi = ranked.head(num)
        lo = ranked.tail(num)
        # NaNs sort last, so any NaN in this column shows up in `lo`. Check
        # before appending so the skip actually takes effect.
        if log_nan_only and not len(lo[lo[detail] != lo[detail]]):
            continue  # skip if no nans
        content.append(('_ _ _ {} _ _ _'.format(detail)))
        content.append(('{} highs ...\n{}'.format(detail, str(hi))))
        content.append(('{} lows ...\n{}'.format(detail, str(lo))))
    out(content)