Welcome to the Quantopian Pipeline Tutorial! This tutorial serves as an introduction to the Pipeline API. If you are new to Quantopian, it is recommended that you start with the Getting Started Tutorial and have at least a working knowledge of Python. This tutorial is divided into a series of lessons, with each one focusing on a different part of the Pipeline API. Lessons 2-11 will operate in the research environment while lesson 12 will be done in the IDE.
Why Pipeline?
Many trading algorithms have the following structure:
1. For each asset in a known (large) set, compute N scalar values for the asset based on a trailing window of data.
2. Select a smaller tradeable set of assets based on the values computed in (1).
3. Calculate desired portfolio weights on the set of assets selected in (2).
4. Place orders to move the algorithm’s current portfolio allocations to the desired weights computed in (3).
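Stripped of any particular API, the four steps can be sketched with plain pandas. The tickers, window, and threshold below are illustrative placeholders, not part of any real dataset:

```python
import pandas as pd

# 1. Compute a scalar value per asset from a trailing window (here: a 3-day mean).
prices = pd.DataFrame(
    {"AAA": [10.0, 11.0, 12.0], "BBB": [20.0, 19.0, 18.0], "CCC": [5.0, 5.0, 5.0]}
)
mean_price = prices.mean()

# 2. Select a smaller tradeable set based on the values computed in (1).
tradeable = mean_price[mean_price > 6.0]

# 3. Calculate desired portfolio weights on the selected set (equal-weighted here).
weights = pd.Series(1.0 / len(tradeable), index=tradeable.index)

# 4. In a real algorithm, orders would now be placed to reach those weights.
```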
There are several technical challenges with doing this robustly. These include:
• efficiently querying large sets of assets
• performing computations on large sets of assets
• handling adjustments (splits and dividends)
• handling asset delistings
Pipeline exists to solve these challenges by providing a uniform API for expressing computations on a diverse collection of datasets.
Research and the IDE
An ideal algorithm design workflow involves a research phase and an implementation phase. In the research environment, we can interact with data or quickly iterate on different ideas in a notebook. Algorithms are implemented in the IDE where they can be backtested.
One feature of the Pipeline API is that constructing a pipeline is identical in research and the IDE. The only difference between using pipeline in the two environments is how it is run. This makes it easy to iterate on a pipeline design in research and then move it with a simple copy paste to an algorithm in the IDE. This is a workflow that will be discussed explicitly in later lessons but will be observed throughout the tutorial.
Computations
There are three types of computations that can be expressed in a pipeline: factors, filters, and classifiers.
Abstractly, factors, filters, and classifiers all represent functions that produce a value from an asset and a moment in time. Factors, filters, and classifiers are distinguished by the types of values they produce.
Factors
A factor is a function from an asset and a moment in time to a numerical value.
A simple example of a factor is the most recent price of a security. Given a security and a specific point in time, the most recent price is a number. Another example is the 10-day average trading volume of a security. Factors are most commonly used to assign values to securities which can then be used in a number of ways. A factor can be used in each of the following procedures:
• computing target weights
• generating an alpha signal
• constructing other, more complex factors
• constructing filters
Filters
A filter is a function from an asset and a moment in time to a boolean.
An example of a filter is a function indicating whether a security's price is below $10. Given a security and a point in time, this evaluates to either True or False. Filters are most commonly used for describing sets of assets to include or exclude for some particular purpose.
Classifiers
A classifier is a function from an asset and a moment in time to a categorical output.
More specifically, a classifier produces a string or an int that doesn't represent a numerical value (e.g. an integer label such as a sector code). Classifiers are most commonly used for grouping assets for complex transformations on Factor outputs. An example of a classifier is the exchange on which an asset is currently being traded.
Datasets
Pipeline computations can be performed using a variety of data such as pricing (OHLC) and volume data, fundamentals and consensus estimates. We will explore these datasets in later lessons.
A typical pipeline usually involves multiple computations and datasets. In this tutorial, we will build up to a pipeline that selects liquid securities with large changes between their 10-day and 30-day average prices.
Lessons 2-11 will be conducted in the research environment. To get set up in research, create a new Jupyter notebook or clone the notebook version of this lesson by clicking Get Notebook. If you're not yet familiar with research, we recommend that you go through the Introduction to Research lecture or check out the documentation.
Creating a Pipeline
Let's start by adding some import statements. First let's import the Pipeline class:
```
from quantopian.pipeline import Pipeline
```
In a new cell, let's define a function to create our pipeline. Wrapping our pipeline creation in a function sets up a structure for more complex pipelines that we will see later on. For now, this function simply returns an empty pipeline:
```
def make_pipeline():
    return Pipeline()
```
In a new cell, let's instantiate our pipeline by running make_pipeline():
```
my_pipe = make_pipeline()
```
Running a Pipeline
Now that we have a reference to an empty Pipeline, my_pipe, let's run it to see what it looks like. Before running our pipeline, we first need to import run_pipeline, a research-only function that allows us to run a pipeline over a specified time period.
```
from quantopian.research import run_pipeline
```
Let's run our pipeline for one day (2015-05-05) with run_pipeline and display it. Note that the 2nd and 3rd arguments are the start and end dates of the simulation, respectively.
`result = run_pipeline(my_pipe, '2015-05-05', '2015-05-05')`
A call to run_pipeline returns a pandas DataFrame indexed by date and securities. Let's see what the first few rows of our empty pipeline look like:
`result.head()`
The output of an empty pipeline is a DataFrame with no columns. In this example, our pipeline has an index made up of all 8000+ securities (only 5 rows displayed in the image) for May 5th, 2015, but doesn't have any columns.
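The shape of that result can be mimicked with plain pandas: an empty DataFrame over a (date, security) MultiIndex. The security labels below are made up for illustration:

```python
import pandas as pd

dates = pd.to_datetime(["2015-05-05"])
securities = ["Equity(2 [HYP_A])", "Equity(21 [HYP_B])"]  # hypothetical labels
index = pd.MultiIndex.from_product([dates, securities], names=["date", "security"])

# An empty pipeline yields one row per (date, security) pair but no columns.
result = pd.DataFrame(index=index)
```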
In the following lessons, we'll take a look at how to add columns to our pipeline output, and how to filter down to a subset of securities.
Factors
A factor is a function from an asset and a moment in time to a number.
In Pipeline, Factors are the most common type of computation, representing the result of any computation that produces a numerical value. Factors require a column of data and a window length as input.
The simplest factors in Pipeline are built-in Factors. Built-in Factors are pre-built to perform common computations. As a first example, let's make a factor that computes the average close price of each asset over a trailing 10-day window. We can use the SimpleMovingAverage built-in factor which computes the average value of the input data (close price) over the specified window length (10 days). To do this, we need to import our built-in SimpleMovingAverage factor and the USEquityPricing dataset.
```
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import SimpleMovingAverage
```
Creating a Factor
Let's go back to our make_pipeline function from the previous lesson and instantiate a SimpleMovingAverage factor. To create a SimpleMovingAverage factor, we can call the SimpleMovingAverage constructor with two arguments: inputs, which must be a list of BoundColumn objects, and window_length, which must be an integer indicating how many days worth of data our moving average calculation should receive. (We'll discuss BoundColumn in more depth later; for now we just need to know that a BoundColumn is an object indicating what kind of data should be passed to our Factor.)
The following line creates a Factor for computing the 10-day mean close price of securities.
```
mean_close_10 = SimpleMovingAverage(
    inputs=[USEquityPricing.close],
    window_length=10
)
```
It's important to note that creating the factor does not actually perform a computation. Creating a factor is like defining the function. To perform a computation, we need to add the factor to our pipeline and run it.
Adding a Factor to a Pipeline
Let's update our original empty pipeline to make it compute our new moving average factor. To start, let's move our factor instantiation into make_pipeline. Next, we can tell our pipeline to compute our factor by passing it a columns argument, which should be a dictionary mapping column names to factors, filters, or classifiers. Our updated make_pipeline function should look something like this:
```
def make_pipeline():

    mean_close_10 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=10
    )

    return Pipeline(
        columns={
            '10_day_mean_close': mean_close_10
        }
    )
```
To see what this looks like, let's make our pipeline, run it, and display the first few rows of the result.
```
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
```
Now we have a column in our pipeline output with the 10-day average close price for all 8000+ securities (display truncated). Note that each row corresponds to the result of our computation for a given security on a given date. The DataFrame has a MultiIndex where the first level is a datetime representing the date of the computation and the second level is an Equity object corresponding to the security. For example, the first row in the above DataFrame (`2015-05-05 00:00:00+00:00`, `Equity(2 [AA])`) contains the result of our mean_close_10 factor for AA on May 5th, 2015.
Note: factors can also be added to an existing Pipeline instance using the Pipeline.add method. Using add looks something like this:
```
>>> my_pipe = Pipeline()
>>> f1 = SomeFactor(...)
>>> my_pipe.add(f1, 'f1')
```
Latest
The most commonly used built-in Factor is Latest. The Latest factor gets the most recent value of a given data column. This factor is common enough that it is instantiated differently from other factors. The best way to get the latest value of a data column is by getting its .latest attribute. As an example, let's update make_pipeline to create a latest close price factor and add it to our pipeline:
```
def make_pipeline():

    mean_close_10 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=10
    )
    latest_close = USEquityPricing.close.latest

    return Pipeline(
        columns={
            '10_day_mean_close': mean_close_10,
            'latest_close_price': latest_close
        }
    )
```
And now, when we make and run our pipeline again, there are two columns in our output dataframe. One column has the 10-day mean close price of each security, and the other has the latest close price.
```
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
```
.latest can sometimes return things other than Factors. We'll see examples of other possible return types in later lessons.
Default Inputs
Some factors have default inputs that should never be changed. For example the VWAP built-in factor is always calculated from USEquityPricing.close and USEquityPricing.volume. When a factor is always calculated from the same BoundColumns, we can call the constructor without specifying inputs.
```
from quantopian.pipeline.factors import VWAP
vwap = VWAP(window_length=10)
```
In the next lesson, we will look at combining factors.
Combining Factors
Factors can be combined, both with other Factors and with scalar values, via any of the builtin mathematical operators (+, -, *, etc). This makes it easy to write complex expressions that combine multiple Factors. For example, constructing a Factor that computes the average of two other Factors is simply:
```
>>> f1 = SomeFactor(...)
>>> f2 = SomeOtherFactor(...)
>>> average = (f1 + f2) / 2.0
```
In this lesson, we will create a pipeline that creates a percent_difference factor by combining a 10-day average price factor and a 30-day one. Let's start out by making the two factors.
```
mean_close_10 = SimpleMovingAverage(
    inputs=[USEquityPricing.close],
    window_length=10
)
mean_close_30 = SimpleMovingAverage(
    inputs=[USEquityPricing.close],
    window_length=30
)
```
Then, let's create a percent difference factor by combining our mean_close_30 factor with our mean_close_10 factor.
`percent_difference = (mean_close_10 - mean_close_30) / mean_close_30`
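The expression is an element-wise relative difference; evaluated on concrete numpy arrays (sample values, one per security), it behaves like this:

```python
import numpy as np

# Hypothetical 10-day and 30-day mean close prices for three securities.
mean_close_10 = np.array([110.0, 50.0, 9.0])
mean_close_30 = np.array([100.0, 50.0, 10.0])

# (10-day mean - 30-day mean) / 30-day mean, one value per security.
percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
```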
In this example, percent_difference is still a Factor even though it's composed as a combination of more primitive factors. We can add percent_difference as a column in our pipeline. Let's define make_pipeline to create a pipeline with percent_difference as a column (and not the mean close factors):
```
def make_pipeline():

    mean_close_10 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=10
    )
    mean_close_30 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=30
    )

    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30

    return Pipeline(
        columns={
            'percent_difference': percent_difference
        }
    )
```
Let's see what the new output looks like.
```
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
```
In the next lesson, we will learn about filters.
Filters
A filter is a function from an asset and a moment in time to a boolean:
In Pipeline, Filters are used for narrowing down the set of securities included in a computation or in the final output of a pipeline. There are two common ways to create a Filter: comparison operators and Factor/Classifier methods.
Comparison Operators
Comparison operators on Factors and Classifiers produce Filters. Since we haven't looked at Classifiers yet, let's stick to examples using Factors. The following example produces a filter that returns True whenever the latest close price is above $20.
```
last_close_price = USEquityPricing.close.latest
close_price_filter = last_close_price > 20
```
And this example produces a filter that returns True whenever the 10-day mean is below the 30-day mean.
```
mean_close_10 = SimpleMovingAverage(
    inputs=[USEquityPricing.close],
    window_length=10
)
mean_close_30 = SimpleMovingAverage(
    inputs=[USEquityPricing.close],
    window_length=30
)
mean_crossover_filter = mean_close_10 < mean_close_30
```
Remember, a filter produces a True or False value for each security every day.
Factor/Classifier Methods
Various methods of the Factor and Classifier classes return Filters. Again, since we haven't yet looked at Classifiers, let's stick to Factor methods for now (we'll look at Classifier methods later). The Factor.top(n) method produces a Filter that returns True for the top n securities of a given factor each day. The following example produces a filter that returns True for exactly 200 securities every day, indicating that those securities were in the top 200 by last close price across all known securities.
```
last_close_price = USEquityPricing.close.latest
top_close_price_filter = last_close_price.top(200)
```
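Conceptually, `top(n)` ranks each day's factor values and marks the n largest. A pandas sketch of the same selection (four made-up securities and a top-2 cutoff instead of top 200, for brevity):

```python
import pandas as pd

# Hypothetical latest close prices for four securities.
last_close_price = pd.Series({"A": 120.0, "B": 15.0, "C": 310.0, "D": 47.0})

# True for the 2 securities with the highest close price, False elsewhere.
top_close_price_filter = last_close_price.rank(ascending=False) <= 2
```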
For a full list of Factor methods that return Filters, see this link.
For a full list of Classifier methods that return Filters, see this link.
Dollar Volume Filter
Let's create a filter that returns True if a security's 30-day average dollar volume is above $10,000,000. To do this, we'll first need to create an AverageDollarVolume factor to compute the 30-day average dollar volume.
To import the built-in AverageDollarVolume factor, we can add to the line that we used to import SimpleMovingAverage.
`from quantopian.pipeline.factors import AverageDollarVolume, SimpleMovingAverage`
And then we can instantiate the Factor.
`dollar_volume = AverageDollarVolume(window_length=30)`
By default, AverageDollarVolume uses USEquityPricing.close and USEquityPricing.volume as its inputs, so we don't specify them.
Now that we have a dollar volume factor, we can create a filter with a boolean expression. The following line creates a filter returning True for securities with a dollar_volume greater than 10,000,000:
`high_dollar_volume = (dollar_volume > 10000000)`
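To make the threshold concrete: average dollar volume is the trailing mean of close price times volume. A numpy sketch for a single security over a 3-day window (sample numbers; the built-in factor's exact definition may differ in detail):

```python
import numpy as np

# Three days of close prices and share volumes for one security.
close = np.array([10.0, 10.5, 11.0])
volume = np.array([1_000_000, 2_000_000, 1_000_000])

# Mean of daily (close * volume) over the trailing window.
dollar_volume = np.mean(close * volume)
high_dollar_volume = dollar_volume > 10_000_000
```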
To see what this filter looks like, we can add it as a column to our pipeline.
```
def make_pipeline():

    mean_close_10 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=10
    )
    mean_close_30 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=30
    )

    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30

    dollar_volume = AverageDollarVolume(window_length=30)

    high_dollar_volume = (dollar_volume > 10000000)

    return Pipeline(
        columns={
            'percent_difference': percent_difference,
            'high_dollar_volume': high_dollar_volume
        },
    )
```
If we make and run our pipeline, we now have a column high_dollar_volume with a boolean value corresponding to the result of the expression for each security.
```
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
```
Applying a Screen
By default, a pipeline produces computed values each day for every asset in the Quantopian database. Very often however, we only care about a subset of securities that meet specific criteria (for example, we might only care about securities that have enough daily trading volume to fill our orders quickly). We can tell our Pipeline to ignore securities for which a filter produces False by passing that filter to our Pipeline via the screen keyword.
To screen our pipeline output for securities with a 30-day average dollar volume greater than $10,000,000, we can simply pass our high_dollar_volume filter as the screen argument. This is what our make_pipeline function now looks like:
```
def make_pipeline():

    mean_close_10 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=10
    )
    mean_close_30 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=30
    )

    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30

    dollar_volume = AverageDollarVolume(window_length=30)
    high_dollar_volume = (dollar_volume > 10000000)

    return Pipeline(
        columns={
            'percent_difference': percent_difference
        },
        screen=high_dollar_volume
    )
```
Running this will produce an output only for the securities that passed the high_dollar_volume filter on a given day. For example, running this pipeline on May 5th, 2015 results in an output for ~2,100 securities.
```
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
print('Number of securities that passed the filter: %d' % len(result))
```
Inverting a Filter
The ~ operator is used to invert a filter, swapping all True values with Falses and vice-versa. For example, we can write the following to filter for low dollar volume securities:
`low_dollar_volume = ~high_dollar_volume`
This will return True for all securities with an average dollar volume below or equal to $10,000,000 over the last 30 days.
In the next lesson, we will look at combining filters.
Combining Filters
Like factors, filters can be combined. Combining filters is done using the & (and) and | (or) operators. For example, let's say we want to screen for securities that are in the top 10% of average dollar volume and have a latest close price above $20. To start, let's make a high dollar volume filter using an AverageDollarVolume factor and percentile_between:
`high_dollar_volume = dollar_volume.percentile_between(90, 100)`
Note: percentile_between is a Factor method returning a Filter.
Next, let's create a latest_close factor and define a filter for securities that closed above $20:
```
latest_close = USEquityPricing.close.latest
above_20 = latest_close > 20
```
Now we can combine our high_dollar_volume filter with our above_20 filter using the & operator:
```
is_tradeable = high_dollar_volume & above_20
```
This filter will evaluate to True for securities where both high_dollar_volume and above_20 are True. Otherwise, it will evaluate to False. A similar computation can be made with the | (or) operator.
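The same operators behave identically on plain boolean pandas Series, which is a handy way to sanity-check filter logic (values made up for illustration):

```python
import pandas as pd

# Hypothetical filter outputs for three securities on one day.
high_dollar_volume = pd.Series({"A": True, "B": True, "C": False})
above_20 = pd.Series({"A": True, "B": False, "C": True})

is_tradeable = high_dollar_volume & above_20   # True only where both are True
either = high_dollar_volume | above_20         # True where at least one is True
low_dollar_volume = ~high_dollar_volume        # inverted filter
```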
If we want to use this filter as a screen in our pipeline, we can set the screen to be is_tradeable.
```
def make_pipeline():

    mean_close_10 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=10
    )
    mean_close_30 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=30
    )

    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30

    dollar_volume = AverageDollarVolume(window_length=30)
    high_dollar_volume = dollar_volume.percentile_between(90, 100)

    latest_close = USEquityPricing.close.latest
    above_20 = latest_close > 20

    is_tradeable = high_dollar_volume & above_20

    return Pipeline(
        columns={
            'percent_difference': percent_difference
        },
        screen=is_tradeable
    )
```
Running this pipeline on May 5th, 2015 outputs around 700 securities.
```
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
print('Number of securities that passed the filter: %d' % len(result))
```
In the next lesson, we'll look at masking factors and filters.
Masking
Sometimes we want to ignore certain assets when computing pipeline expressions. There are two common cases where ignoring assets is useful:
1. We want to compute an expression that's computationally expensive, and we know we only care about results for certain assets. A common example of such an expensive expression is a Factor computing the coefficients of a regression (RollingLinearRegressionOfReturns).
2. We want to compute an expression that performs comparisons between assets, but we only want those comparisons to be performed against a subset of all assets. For example, we might want to use the top method of Factor to compute the top 200 assets by earnings yield, ignoring assets that don't meet some liquidity constraint.
To support these two use-cases, all Factors and many Factor methods can accept a mask argument, which must be a Filter indicating which assets to consider when computing.
Let's say we want our pipeline to output securities with a high or low percent difference but we also only want to consider securities with a dollar volume above $10,000,000. To do this, let's rearrange our make_pipeline function so that we first create the high_dollar_volume filter. We can then use this filter as a mask for moving average factors by passing high_dollar_volume as the mask argument to SimpleMovingAverage.
```
# Dollar volume factor
dollar_volume = AverageDollarVolume(window_length=30)

# High dollar volume filter
high_dollar_volume = (dollar_volume > 10000000)

# Average close price factors
mean_close_10 = SimpleMovingAverage(
    inputs=[USEquityPricing.close],
    window_length=10,
    mask=high_dollar_volume
)
mean_close_30 = SimpleMovingAverage(
    inputs=[USEquityPricing.close],
    window_length=30,
    mask=high_dollar_volume
)

# Relative difference factor
percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
```
Applying the mask to SimpleMovingAverage restricts the average close price factors to a computation over the ~2000 securities passing the high_dollar_volume filter, as opposed to ~8000 without a mask. When we combine mean_close_10 and mean_close_30 to form percent_difference, the computation is performed on the same ~2000 securities.
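The effect of a mask can be pictured with numpy boolean indexing: the windowed computation only ever touches columns where the mask is True (a toy 2-day window over 3 securities here):

```python
import numpy as np

# 2-day trailing window (rows) for 3 securities (columns).
close = np.array([
    [10.0, 200.0, 5.0],
    [11.0, 210.0, 4.0],
])

# e.g. a dollar-volume filter that excludes the third security.
mask = np.array([True, True, False])

# The mean close is computed only over the masked-in securities.
masked_mean = close[:, mask].mean(axis=0)
```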
Masks can be also be applied to methods that return filters like top, bottom, and percentile_between.
Masks are most useful when we want to apply a filter in the earlier steps of a combined computation. For example, suppose we want to get the 50 securities with the highest open price that are also in the top 10% of dollar volume. Suppose that we then want the 90th-100th percentile of these securities by close price. We can do this with the following:
```
# Dollar volume factor
dollar_volume = AverageDollarVolume(window_length=30)

# High dollar volume filter
high_dollar_volume = dollar_volume.percentile_between(90, 100)

# Top open price filter (high dollar volume securities)
top_open_price = USEquityPricing.open.latest.top(50, mask=high_dollar_volume)

# Top percentile close price filter (high dollar volume, top 50 open price)
high_close_price = USEquityPricing.close.latest.percentile_between(90, 100, mask=top_open_price)
```
Let's put this into make_pipeline and output an empty pipeline screened with our high_close_price filter.
```
def make_pipeline():

    # Dollar volume factor
    dollar_volume = AverageDollarVolume(window_length=30)

    # High dollar volume filter
    high_dollar_volume = dollar_volume.percentile_between(90, 100)

    # Top open price filter (high dollar volume securities)
    top_open_price = USEquityPricing.open.latest.top(50, mask=high_dollar_volume)

    # Top percentile close price filter (high dollar volume, top 50 open price)
    high_close_price = USEquityPricing.close.latest.percentile_between(90, 100, mask=top_open_price)

    return Pipeline(
        screen=high_close_price
    )
```
Running this pipeline outputs 5 securities on May 5th, 2015.
```
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
print('Number of securities that passed the filter: %d' % len(result))
```
Note that applying masks in layers as we did above can be thought of as an "asset funnel".
In the next lesson, we'll look at classifiers.
Classifiers
A classifier is a function from an asset and a moment in time to a categorical output such as a string or integer label:
An example of a classifier producing a string output is the exchange ID of a security. To create this classifier, we'll have to import Fundamentals.exchange_id and use the latest attribute to instantiate our classifier:
```
from quantopian.pipeline.data import Fundamentals

# Since the underlying data of Fundamentals.exchange_id is of type
# string, .latest returns a Classifier
exchange = Fundamentals.exchange_id.latest
```
Previously, we saw that the latest attribute produced an instance of a Factor. In this case, since the underlying data is of type string, latest produces a Classifier.
Similarly, a computation producing the latest Morningstar sector code of a security is a Classifier. In this case, the underlying type is an int, but the integer doesn't represent a numerical value (it's a category) so it produces a classifier. To get the latest sector code, we can use the built-in Sector classifier.
```
from quantopian.pipeline.classifiers.fundamentals import Sector
morningstar_sector = Sector()
```
Using Sector is equivalent to Fundamentals.morningstar_sector_code.latest.
Building Filters from Classifiers
Classifiers can also be used to produce filters with methods like isnull, eq, and startswith. The full list of Classifier methods producing Filters can be found here.
As an example, if we wanted a filter to select securities trading on the New York Stock Exchange, we can use the eq method of our exchange classifier.
`nyse_filter = exchange.eq('NYS')`
This filter will return True for securities having 'NYS' as their most recent exchange_id.
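On a plain pandas Series of exchange labels, the same selection is just an element-wise equality test (labels made up for illustration):

```python
import pandas as pd

# Hypothetical most-recent exchange labels for three securities.
exchange = pd.Series({"A": "NYS", "B": "NAS", "C": "NYS"})

# True for securities whose most recent exchange label is 'NYS'.
nyse_filter = exchange.eq("NYS")
```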
Quantiles
Classifiers can also be produced from various Factor methods. The most general of these is the quantiles method, which accepts a bin count as an argument. The quantiles classifier assigns a label from 0 to (bins - 1) to every non-NaN data point in the factor output. NaNs are labeled with -1. Aliases are available for quartiles (quantiles(4)), quintiles (quantiles(5)), and deciles (quantiles(10)). As an example, this is what a filter for the top decile of a factor might look like:
```
dollar_volume_decile = AverageDollarVolume(window_length=10).deciles()
top_decile = (dollar_volume_decile.eq(9))
```
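The labeling scheme (0 through bins - 1, with NaN mapped to -1) can be reproduced with pandas `qcut` (quartiles on a tiny sample here, rather than deciles over thousands of securities):

```python
import numpy as np
import pandas as pd

# Sample factor output for five securities; one value is missing.
values = pd.Series([1.0, 2.0, 3.0, 4.0, np.nan])

# Quartile labels 0..3 for non-NaN values; NaN gets -1, mirroring quantiles(4).
labels = pd.qcut(values, 4, labels=False).fillna(-1).astype(int)
```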
Let's put each of our classifiers into a pipeline and run it to see what they look like.
```
def make_pipeline():

    exchange = Fundamentals.exchange_id.latest
    nyse_filter = exchange.eq('NYS')

    morningstar_sector = Sector()

    dollar_volume_decile = AverageDollarVolume(window_length=10).deciles()
    top_decile = (dollar_volume_decile.eq(9))

    return Pipeline(
        columns={
            'exchange': exchange,
            'sector_code': morningstar_sector,
            'dollar_volume_decile': dollar_volume_decile
        },
        screen=(nyse_filter & top_decile)
    )
```
```
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
print('Number of securities that passed the filter: %d' % len(result))
```
Classifiers are also useful for describing grouping keys for complex transformations on Factor outputs. Grouping operations such as demean and rank are outside the scope of this tutorial. A future tutorial will cover more advanced uses for classifiers.
In the next lesson, we'll look at the different datasets that we can use in pipeline.
When building a pipeline, we need a way to identify the inputs to our computations. The input to a pipeline computation is specified using DataSets and BoundColumns.
Datasets and BoundColumns
DataSets are simply collections of objects that tell the Pipeline API where and how to find the inputs to computations. An example of a DataSet that we have already seen is USEquityPricing.
A BoundColumn is a column of data that is concretely bound to a DataSet. Instances of BoundColumns are dynamically created upon access to attributes of DataSets. Inputs to pipeline computations must be of type BoundColumn. An example of a BoundColumn that we have already seen is USEquityPricing.close.
It is important to understand that DataSets and BoundColumns do not hold actual data. Remember that when computations are created and added to a pipeline, they don't actually perform the computation until the pipeline is run. DataSet and BoundColumns can be thought of in a similar way; they are simply used to identify the inputs of a computation. The data is populated later when the pipeline is run.
dtypes
When defining pipeline computations, we need to know the types of our inputs in order to know which operations and functions we can use. The dtype of a BoundColumn tells a computation what the type of the data will be when the pipeline is run. For example, USEquityPricing has a float dtype so a factor may perform arithmetic operations on USEquityPricing.close (e.g. compute the 5-day mean). The importance of this will become more clear in the next lesson.
The dtype of a BoundColumn can also determine the type of a computation. In the case of the latest computation, the dtype determines whether the computation is a factor (float), a filter (bool), or a classifier (string, int).
Pricing Data
US equity pricing data is stored in the USEquityPricing dataset. USEquityPricing provides five columns:
• USEquityPricing.open
• USEquityPricing.high
• USEquityPricing.low
• USEquityPricing.close
• USEquityPricing.volume
Each of these columns has a float dtype.
Fundamental Data
Quantopian provides a number of fundamental data fields sourced from Morningstar. Each field exists as a BoundColumn under the Fundamentals dataset, and there are over 900 columns available. See the Quantopian Fundamentals Reference for a full description of each of these columns.
The dtypes of the columns vary. For example, Fundamentals.market_cap is a column representing the most recently reported market cap for each asset on each date, and has a float dtype.
Partner Data
Many datasets besides USEquityPricing and Morningstar fundamentals are available on Quantopian. These include consensus estimates data, news sentiment, and more. Most datasets are namespaced by provider under quantopian.pipeline.data.
Similar to USEquityPricing, the other datasets have columns (BoundColumns) that can be used in pipeline computations. The columns, along with example pipelines that use the data can be found on the Data Reference. The dtypes of the columns vary.
BoundColumns are most commonly used in CustomFactors which we will explore in the next lesson.
Custom Factors
When we first looked at factors, we explored the set of built-in factors. Frequently, however, a desired computation isn't included as a built-in. One of the most powerful features of the Pipeline API is that it allows us to define our own custom factors: when a computation doesn't exist as a built-in, we can define it ourselves.
Conceptually, a custom factor is identical to a built-in factor. It accepts inputs, window_length, and mask as constructor arguments, and returns a Factor object each day.
Let's take an example of a computation that doesn't exist as a built-in: standard deviation. To create a factor that computes the standard deviation over a trailing window, we can subclass quantopian.pipeline.CustomFactor and implement a compute method whose signature is:
```
def compute(self, today, asset_ids, out, *inputs):
    ...
```
• *inputs are M x N numpy arrays, where M is the window_length and N is the number of securities (usually around ~8000 unless a mask is provided). *inputs are trailing data windows. Note that there will be one M x N array for each BoundColumn provided in the factor's inputs list. The data type of each array will be the dtype of the corresponding BoundColumn.
• out is an empty array of length N. out will be the output of our custom factor each day. The job of compute is to write output values into out.
• asset_ids will be an integer array of length N containing security ids corresponding to the columns in our *inputs arrays.
• today will be a pandas Timestamp representing the day for which compute is being called.
Of these, *inputs and out are most commonly used.
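The calling convention above can be sketched with plain numpy. The toy engine below (ToyStdDev and run_toy_engine are illustrative names, not part of the Quantopian API) slices a trailing M x N window for each day and hands it to compute along with an empty output buffer:

```python
import numpy as np

class ToyStdDev:
    """Illustrative stand-in for a CustomFactor subclass."""
    window_length = 3

    def compute(self, today, asset_ids, out, values):
        # values is an M x N trailing window; write one value per column.
        out[:] = np.nanstd(values, axis=0)

def run_toy_engine(factor, prices, asset_ids):
    """Call factor.compute once per 'day' with a trailing M x N window."""
    M = factor.window_length
    results = {}
    for day in range(M - 1, prices.shape[0]):
        window = prices[day - M + 1 : day + 1]  # M x N trailing window
        out = np.empty(prices.shape[1])         # length-N output buffer
        factor.compute(day, asset_ids, out, window)
        results[day] = out
    return results

# 4 days of prices for 2 securities.
prices = np.array([
    [10.0, 20.0],
    [11.0, 19.0],
    [12.0, 21.0],
    [13.0, 18.0],
])
results = run_toy_engine(ToyStdDev(), prices, np.array([0, 1]))
```

The real pipeline engine does much more (adjustments, masking, chunking), but the compute contract is the same: read from the trailing windows, write into out.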
An instance of CustomFactor that’s been added to a pipeline will have its compute method called every day. For example, let's define a custom factor that computes the standard deviation of the close price over the last 5 days. To start, let's add CustomFactor and numpy to our import statements.
```from quantopian.pipeline import CustomFactor, Pipeline
import numpy```
Next, let's define our custom factor to calculate the standard deviation over a trailing window using numpy.nanstd:
```class StdDev(CustomFactor):
    def compute(self, today, asset_ids, out, values):
        # Calculates the column-wise standard deviation, ignoring NaNs
        out[:] = numpy.nanstd(values, axis=0)
```
Finally, let's instantiate our factor in make_pipeline():
```def make_pipeline():
    std_dev = StdDev(
        inputs=[USEquityPricing.close],
        window_length=5
    )

    return Pipeline(
        columns={
            'std_dev': std_dev
        }
    )
```
When this pipeline is run, StdDev.compute() will be called every day with data as follows:
• values: An M x N numpy array, where M is 5 (window_length), and N is ~8000 (the number of securities in our database on the day in question).
• out: An empty array of length N (~8000). In this example, the job of compute is to populate out with the 5-day close price standard deviation of each security.
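To see what the column-wise computation looks like, here is a standalone numpy check (outside of pipeline) showing that axis=0 collapses the day axis into one value per security:

```python
import numpy as np

# A 5 x 3 window: 5 days of close prices for 3 securities, with one NaN.
window = np.array([
    [10.0, 50.0, np.nan],
    [11.0, 51.0, 7.0],
    [12.0, 49.0, 7.0],
    [13.0, 52.0, 7.0],
    [14.0, 50.0, 7.0],
])

# axis=0 collapses the day axis, producing one value per security (column);
# nanstd skips the NaN instead of propagating it.
per_security_std = np.nanstd(window, axis=0)
print(per_security_std.shape)  # (3,)
```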
```result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
```
Default Inputs
When writing a custom factor, we can set default inputs and window_length in our CustomFactor subclass. For example, let's define the TenDayMeanDifference custom factor to compute the mean difference between two data columns over a trailing window using numpy.nanmean. Let's set the default inputs to [USEquityPricing.close, USEquityPricing.open] and the default window_length to 10:
```class TenDayMeanDifference(CustomFactor):
    # Default inputs.
    inputs = [USEquityPricing.close, USEquityPricing.open]
    window_length = 10

    def compute(self, today, asset_ids, out, close, open):
        # Calculates the column-wise mean difference, ignoring NaNs
        out[:] = numpy.nanmean(close - open, axis=0)
```
Remember in this case that close and open are each 10 x ~8000 2D numpy arrays.
Now, if we call TenDayMeanDifference without providing any arguments, it will use the defaults.
```# Computes the 10-day mean difference between the daily open and close prices.
close_open_diff = TenDayMeanDifference()```
The defaults can be manually overridden by specifying arguments in the constructor call.
```# Computes the 10-day mean difference between the daily high and low prices.
high_low_diff = TenDayMeanDifference(inputs=[USEquityPricing.high, USEquityPricing.low])```
Further Example
Let's take another example where we build a momentum custom factor and use it to create a filter. We will then use that filter as a screen on our pipeline.
Let's start by defining a Momentum factor to be the division of the most recent close price by the close price from n days ago where n is the window_length.
```class Momentum(CustomFactor):
    # Default inputs
    inputs = [USEquityPricing.close]

    # Compute momentum
    def compute(self, today, assets, out, close):
        out[:] = close[-1] / close[0]
```
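Since close is an M x N window, close[-1] and close[0] are each rows of length N, so the division happens column-wise, producing one momentum value per security. A quick standalone numpy check:

```python
import numpy as np

# 10 x 2 window of close prices for two securities: one trending up
# from 100 to 110, one trending down from 50 to 45.
close = np.linspace(
    [100.0, 50.0],   # prices 10 days ago
    [110.0, 45.0],   # most recent prices
    num=10,
)

# Row division: one momentum value per security.
momentum = close[-1] / close[0]  # -> [1.1, 0.9]
```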
Now, let's instantiate our Momentum factor (twice) to create a 10-day momentum factor and a 20-day momentum factor. Let's also create a positive_momentum filter returning True for securities with both a positive 10-day momentum and a positive 20-day momentum.
```ten_day_momentum = Momentum(window_length=10)
twenty_day_momentum = Momentum(window_length=20)

positive_momentum = ((ten_day_momentum > 1) & (twenty_day_momentum > 1))
```
Next, let's add our momentum factors and our positive_momentum filter to make_pipeline. Let's also set positive_momentum to be the screen of our pipeline.
```def make_pipeline():

    ten_day_momentum = Momentum(window_length=10)
    twenty_day_momentum = Momentum(window_length=20)

    positive_momentum = ((ten_day_momentum > 1) & (twenty_day_momentum > 1))

    std_dev = StdDev(
        inputs=[USEquityPricing.close],
        window_length=5
    )

    return Pipeline(
        columns={
            'std_dev': std_dev,
            'ten_day_momentum': ten_day_momentum,
            'twenty_day_momentum': twenty_day_momentum
        },
        screen=positive_momentum
    )
```
Running this pipeline outputs the standard deviation and each of our momentum computations for securities with positive 10-day and 20-day momentum.
```result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
```
Custom factors allow us to define custom computations in a pipeline. They are frequently the best way to perform computations on multiple data columns. More information on CustomFactors is available here.
In the next lesson, we'll use everything we've learned so far to create a pipeline for an algorithm.
Putting It All Together
Now that we've covered the basic components of the Pipeline API, let's construct a pipeline that we might want to use in an algorithm.
To start, let's first create a filter to narrow down the types of securities coming out of our pipeline. In this example, we will create a filter to select securities that meet the following criteria:
• Is a primary share
• Is listed as a common stock
• Is not a depositary receipt (ADR/GDR)
• Is not traded over-the-counter (OTC)
• Is not when-issued (WI)
• Doesn't have a name indicating it's a limited partnership (LP)
• Has a Morningstar market cap (and is therefore not an ETF)
Why These Criteria?
Selecting for primary shares and common stock helps us to select only a single security for each company. In general, primary shares are a good representative asset of a company so we will select for these in our pipeline.
ADRs and GDRs are issuances in the US equity market for stocks that trade on other exchanges. Frequently, there is inherent risk associated with depositary receipts due to currency fluctuations so we exclude them from our pipeline.
OTC, WI, and LP securities are not tradeable with most brokers. As a result, we exclude them from our pipeline.
When it comes to ranking and comparing securities, it rarely makes sense to compare ETFs with regular stocks. ETFs are composites without fundamental data. They derive their value from a larger group of securities. To avoid comparing apples and oranges, we exclude them from our pipeline.
Creating Our Pipeline
Let's create a filter for each criterion and combine them together to create a tradeable_stocks filter. First, we need to import the Morningstar DataSet as well as the IsPrimaryShare built-in filter.
```from quantopian.pipeline.data import Fundamentals
from quantopian.pipeline.filters.fundamentals import IsPrimaryShare
```
Now we can define our filters:
```# Filter for primary share equities. IsPrimaryShare is a built-in filter.
primary_share = IsPrimaryShare()

# Equities listed as common stock (as opposed to, say, preferred stock).
# 'ST00000001' indicates common stock.
common_stock = Fundamentals.security_type.latest.eq('ST00000001')

# Non-depositary receipts. Recall that the ~ operator inverts filters,
# turning Trues into Falses and vice versa
not_depositary = ~Fundamentals.is_depositary_receipt.latest

# Not OTC-listed equities.
not_otc = ~Fundamentals.exchange_id.latest.startswith('OTC')

# Not when-issued equities.
not_wi = ~Fundamentals.symbol.latest.endswith('.WI')

# Equities without LP in their name. .matches does a match using a regular
# expression.
not_lp_name = ~Fundamentals.standard_name.latest.matches('.* L[. ]?P.?$')

# Equities whose most recent Morningstar market cap is not null have
# fundamental data and therefore are not ETFs.
have_market_cap = Fundamentals.market_cap.latest.notnull()

# Filter for stocks that pass all of our previous filters.
tradeable_stocks = (
    primary_share
    & common_stock
    & not_depositary
    & not_otc
    & not_wi
    & not_lp_name
    & have_market_cap
)```
Note that when defining our filters, we used several Classifier methods that we haven't seen yet, including notnull, startswith, endswith, and matches. Documentation on these methods is available here.
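These classifier methods behave much like the column-wise string operations in pandas. As a rough standalone analogy (plain pandas on a Series of names, not pipeline code):

```python
import pandas as pd

# Hypothetical security names; None stands in for missing data.
names = pd.Series(['ACME CORP', 'FOO L.P.', 'BAR.WI', None])

# Analogous to notnull(): True where data is present.
not_null = names.notnull()

# Analogous to ~...endswith('.WI'): exclude when-issued symbols.
not_wi = ~names.str.endswith('.WI', na=False)

# Analogous to ~...matches(...): exclude limited-partnership names
# using the same regular expression as the filter above.
not_lp = ~names.str.match(r'.* L[. ]?P.?$', na=False)
```

The pipeline versions apply the same idea cross-sectionally, producing a Filter with one boolean per asset per day.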
Next, let's create a filter for the top 30% of tradeable stocks by 20-day average dollar volume. We'll call this our base_universe.
```base_universe = AverageDollarVolume(window_length=20, mask=tradeable_stocks).percentile_between(70, 100)
```
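Conceptually, percentile_between(70, 100) keeps the assets whose factor value falls at or above the 70th cross-sectional percentile each day. A rough standalone numpy sketch of that selection (the dollar volume figures are made up):

```python
import numpy as np

# Hypothetical 20-day average dollar volumes for five securities.
dollar_volume = np.array([1e6, 5e6, 2e7, 3e5, 8e6])

# Keep values at or above the 70th percentile of the cross-section.
cutoff = np.nanpercentile(dollar_volume, 70)
in_top_30_pct = dollar_volume >= cutoff
```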
Built-in Base Universe
We have just defined our own base universe to select 'tradeable' securities with high dollar volume. However, Quantopian has several built-in filters that do something similar, the best and newest of which is the QTradableStocksUS. The QTradableStocksUS is a built-in pipeline filter that selects a daily universe of stocks using several passes of liquidity and tradability criteria, without imposing a fixed size cutoff. More detail on its selection criteria can be found here.
To simplify our pipeline, let's replace what we've already written for our base_universe with the QTradableStocksUS built-in filter. First, we need to import it.
`from quantopian.pipeline.filters import QTradableStocksUS`
Then, let's set our base_universe to the QTradableStocksUS.
```base_universe = QTradableStocksUS()
```
Mean Reversion Factors
Now that we have a filter base_universe that we can use to select a subset of securities, let's focus on creating factors for this subset. For this example, let's create a pipeline for a mean reversion strategy. In this strategy, we'll look at the percent difference between the 10-day and 30-day moving averages of the close price. Let's plan to open equally weighted long positions in the 75 securities with the least (most negative) percent difference and equally weighted short positions in the 75 with the greatest percent difference. To do this, let's create two moving average factors using our base_universe filter as a mask. Then let's combine them into a factor computing the percent difference.
```# 10-day close price average.
mean_10 = SimpleMovingAverage(
    inputs=[USEquityPricing.close],
    window_length=10,
    mask=base_universe
)

# 30-day close price average.
mean_30 = SimpleMovingAverage(
    inputs=[USEquityPricing.close],
    window_length=30,
    mask=base_universe
)

percent_difference = (mean_10 - mean_30) / mean_30```
Next, let's create filters for the top 75 and bottom 75 equities by percent_difference.
```# Create a filter to select securities to short.
shorts = percent_difference.top(75)

# Create a filter to select securities to long.
longs = percent_difference.bottom(75)```
Let's then combine shorts and longs to create a new filter that we can use as the screen of our pipeline:
```securities_to_trade = (shorts | longs)
```
Since our earlier filters were used as masks as we built up to this final filter, when we use securities_to_trade as a screen, the output securities will meet the criteria outlined at the beginning of the lesson (primary shares, non-ETFs, etc.). They will also have high dollar volume.
Finally, let's instantiate our pipeline. Since we are planning on opening equally weighted long and short positions later, the only information that we actually need from our pipeline is which securities we want to trade (the pipeline index) and whether or not to open a long or a short position. Let's add our longs and shorts filters to our pipeline and set our screen to be securities_to_trade.
```def make_pipeline():

    # Base universe filter.
    base_universe = QTradableStocksUS()

    # 10-day close price average.
    mean_10 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=10,
        mask=base_universe
    )

    # 30-day close price average.
    mean_30 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=30,
        mask=base_universe
    )

    # Percent difference factor.
    percent_difference = (mean_10 - mean_30) / mean_30

    # Create a filter to select securities to short.
    shorts = percent_difference.top(75)

    # Create a filter to select securities to long.
    longs = percent_difference.bottom(75)

    # Filter for the securities that we want to trade.
    securities_to_trade = (shorts | longs)

    return Pipeline(
        columns={
            'longs': longs,
            'shorts': shorts
        },
        screen=securities_to_trade
    )
```
Running this pipeline will result in a DataFrame containing 2 columns. Each day, the columns will contain boolean values that we can use to decide whether we want to open a long or short position in each security.
```result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
```
In the next lesson, we'll add this pipeline to an algorithm.
Moving to the IDE
So far, we have created and run a pipeline in research. Now, let's move to the IDE. To start out, let's create a skeleton algorithm, import Pipeline, and add a make_pipeline function to it that creates an empty pipeline.
```from quantopian.pipeline import Pipeline

def initialize(context):
    my_pipe = make_pipeline()

def make_pipeline():
    return Pipeline()
```
Attaching a Pipeline
Recall that in research, we ran make_pipeline to create an instance of our pipeline object and used run_pipeline to run it over a specified date range. We can't do this in an algorithm; instead, we have to let the simulation run our pipeline for us, which requires attaching the pipeline with attach_pipeline.
The attach_pipeline function requires two arguments: a reference to our Pipeline object, and a string name for the pipeline which can be decided arbitrarily. Let's import attach_pipeline and attach our empty pipeline in our skeleton example.
```import quantopian.algorithm as algo
from quantopian.pipeline import Pipeline

def initialize(context):
    my_pipe = make_pipeline()
    algo.attach_pipeline(my_pipe, 'my_pipeline')

def make_pipeline():
    return Pipeline()
```
Now that our pipeline is attached, it will be run once each day of the simulation. For example, if an algorithm is backtested or live traded from Monday, June 6, 2016 to Friday, June 10, 2016, our pipeline will be run once on each day of that week (5 times total), producing a new output dataframe each day. The daily output is similar to the output of run_pipeline in research. However, the output dataframe in our algorithm will not include the date in the index, as the current simulation date is implied to be the date of our pipeline computations.
Pipeline Output
The daily output of our pipeline can be retrieved using pipeline_output in before_trading_start. pipeline_output requires the name of the attached pipeline as an argument and returns the output dataframe for the current date in the simulation. Let's import pipeline_output and modify our skeleton example to store our pipeline output in context each day.
```import quantopian.algorithm as algo
from quantopian.pipeline import Pipeline

def initialize(context):
    my_pipe = make_pipeline()
    algo.attach_pipeline(my_pipe, 'my_pipeline')

def make_pipeline():
    return Pipeline()

def before_trading_start(context, data):
    # Store our pipeline output DataFrame in context.
    context.output = algo.pipeline_output('my_pipeline')
```
Our skeleton example now produces an empty dataframe with 8000+ rows and 0 columns each day. Note that the output's index is no longer a MultiIndex as it was in research; it contains only the assets, since the date is implied by the simulation.
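The difference between the two index shapes can be illustrated with plain pandas (the data and asset labels here are hypothetical stand-ins for Equity objects):

```python
import pandas as pd

# Research-style output: a (date, asset) MultiIndex.
research_output = pd.DataFrame(
    {'longs': [True, False, True, False]},
    index=pd.MultiIndex.from_product(
        [pd.to_datetime(['2015-05-05', '2015-05-06']), ['AAPL', 'MSFT']],
        names=['date', 'asset'],
    ),
)

# Algorithm-style daily output: the date level is dropped,
# leaving only the assets in the index.
daily_output = research_output.xs(pd.Timestamp('2015-05-05'), level='date')
```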
Using Our Pipeline From Research
To include the pipeline that we created in the previous lesson in our algorithm, we can simply copy the make_pipeline function that we wrote in research to our algorithm as well as the required import statements. The following will run our pipeline and store the output dataframe with 150 rows and 2 columns (longs and shorts) in context each day.
```import quantopian.algorithm as algo
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import SimpleMovingAverage
from quantopian.pipeline.filters import QTradableStocksUS

def initialize(context):
    my_pipe = make_pipeline()
    algo.attach_pipeline(my_pipe, 'my_pipeline')

def make_pipeline():
    """
    Create our pipeline.
    """

    # Base universe set to the QTradableStocksUS.
    base_universe = QTradableStocksUS()

    # 10-day close price average.
    mean_10 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=10,
        mask=base_universe
    )

    # 30-day close price average.
    mean_30 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=30,
        mask=base_universe
    )

    percent_difference = (mean_10 - mean_30) / mean_30

    # Filter to select securities to short.
    shorts = percent_difference.top(75)

    # Filter to select securities to long.
    longs = percent_difference.bottom(75)

    # Filter for all securities that we want to trade.
    securities_to_trade = (shorts | longs)

    return Pipeline(
        columns={
            'longs': longs,
            'shorts': shorts
        },
        screen=securities_to_trade
    )

def before_trading_start(context, data):
    # Store our pipeline output DataFrame in context.
    context.output = algo.pipeline_output('my_pipeline')
```
Each day, the output from this pipeline will be a dataframe of up to 150 rows, indexed by asset, with boolean longs and shorts columns.
We can then define some functions to compute target weights and place orders for our long and short positions as specified by our pipeline output. Let's use some of the basics that we learned in the Getting Started Tutorial to implement weight calculations and ordering.
```def compute_target_weights(context, data):
    """
    Compute ordering weights.
    """

    # Initialize empty target weights dictionary.
    # This will map securities to their target weight.
    weights = {}

    # If there are securities in our longs and shorts lists,
    # compute even target weights for each security.
    if context.longs and context.shorts:
        long_weight = 0.5 / len(context.longs)
        short_weight = -0.5 / len(context.shorts)
    else:
        return weights

    # Exit positions in our portfolio if they are not
    # in our longs or shorts lists.
    for security in context.portfolio.positions:
        if security not in context.longs and security not in context.shorts and data.can_trade(security):
            weights[security] = 0

    for security in context.longs:
        weights[security] = long_weight

    for security in context.shorts:
        weights[security] = short_weight

    return weights

def before_trading_start(context, data):
    """
    Get pipeline results.
    """

    # Gets our pipeline output every day.
    pipe_results = algo.pipeline_output('my_pipeline')

    # Go long in securities for which the 'longs' value is True,
    # and check if they can be traded.
    context.longs = []
    for sec in pipe_results[pipe_results['longs']].index.tolist():
        if data.can_trade(sec):
            context.longs.append(sec)

    # Go short in securities for which the 'shorts' value is True,
    # and check if they can be traded.
    context.shorts = []
    for sec in pipe_results[pipe_results['shorts']].index.tolist():
        if data.can_trade(sec):
            context.shorts.append(sec)

def my_rebalance(context, data):
    """
    Rebalance weekly.
    """

    # Calculate target weights to rebalance.
    target_weights = compute_target_weights(context, data)

    # If we have target weights, rebalance our portfolio.
    if target_weights:
        algo.order_optimal_portfolio(
            objective=opt.TargetWeights(target_weights),
            constraints=[],
        )
```
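The pipe_results[pipe_results['longs']] idiom above is ordinary pandas boolean indexing; here is a standalone check using string tickers as stand-ins for Equity objects:

```python
import pandas as pd

# A miniature stand-in for the daily pipeline output.
pipe_results = pd.DataFrame(
    {'longs': [True, False, True], 'shorts': [False, True, False]},
    index=['AAPL', 'MSFT', 'XOM'],  # stand-ins for Equity objects
)

# Boolean mask selects the rows where the column is True;
# .index.tolist() then extracts the assets themselves.
longs = pipe_results[pipe_results['longs']].index.tolist()
shorts = pipe_results[pipe_results['shorts']].index.tolist()
```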
Finally, let's put everything together. We'll specify our algorithm to rebalance weekly.
```from quantopian.algorithm import order_optimal_portfolio
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import SimpleMovingAverage
from quantopian.pipeline.filters import QTradableStocksUS
import quantopian.optimize as opt

def initialize(context):
    # Schedule our rebalance function to run at the start of
    # each week, when the market opens.
    schedule_function(
        my_rebalance,
        date_rules.week_start(),
        time_rules.market_open()
    )

    # Create our pipeline and attach it to our algorithm.
    my_pipe = make_pipeline()
    attach_pipeline(my_pipe, 'my_pipeline')

def make_pipeline():
    """
    Create our pipeline.
    """

    # Base universe set to the QTradableStocksUS.
    base_universe = QTradableStocksUS()

    # 10-day close price average.
    mean_10 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=10,
        mask=base_universe
    )

    # 30-day close price average.
    mean_30 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=30,
        mask=base_universe
    )

    percent_difference = (mean_10 - mean_30) / mean_30

    # Filter to select securities to short.
    shorts = percent_difference.top(75)

    # Filter to select securities to long.
    longs = percent_difference.bottom(75)

    # Filter for all securities that we want to trade.
    securities_to_trade = (shorts | longs)

    return Pipeline(
        columns={
            'longs': longs,
            'shorts': shorts
        },
        screen=securities_to_trade
    )

def compute_target_weights(context, data):
    """
    Compute ordering weights.
    """

    # Initialize empty target weights dictionary.
    # This will map securities to their target weight.
    weights = {}

    # If there are securities in our longs and shorts lists,
    # compute even target weights for each security.
    if context.longs and context.shorts:
        long_weight = 0.5 / len(context.longs)
        short_weight = -0.5 / len(context.shorts)
    else:
        return weights

    # Exit positions in our portfolio if they are not
    # in our longs or shorts lists.
    for security in context.portfolio.positions:
        if security not in context.longs and security not in context.shorts and data.can_trade(security):
            weights[security] = 0

    for security in context.longs:
        weights[security] = long_weight

    for security in context.shorts:
        weights[security] = short_weight

    return weights

def before_trading_start(context, data):
    """
    Get pipeline results.
    """

    # Gets our pipeline output every day.
    pipe_results = pipeline_output('my_pipeline')

    # Go long in securities for which the 'longs' value is True,
    # and check if they can be traded.
    context.longs = []
    for sec in pipe_results[pipe_results['longs']].index.tolist():
        if data.can_trade(sec):
            context.longs.append(sec)

    # Go short in securities for which the 'shorts' value is True,
    # and check if they can be traded.
    context.shorts = []
    for sec in pipe_results[pipe_results['shorts']].index.tolist():
        if data.can_trade(sec):
            context.shorts.append(sec)

def my_rebalance(context, data):
    """
    Rebalance weekly.
    """

    # Calculate target weights to rebalance.
    target_weights = compute_target_weights(context, data)

    # If we have target weights, rebalance our portfolio.
    if target_weights:
        order_optimal_portfolio(
            objective=opt.TargetWeights(target_weights),
            constraints=[],
        )
```
Note: When a pipeline is run in a backtest, the computations are performed in batches to increase overall computation speed. Because the computations are performed in batches, the performance chart will appear to pause periodically.
Congratulations on completing the Pipeline tutorial! Try designing a pipeline in research and use it in an algorithm of your own.
