run_pipeline in chunks and 2 bugs

Dear Quantopian,

I wrote a convenient drop-in replacement for run_pipeline that splits the time period into chunks to avoid crashes due to high memory usage. While the code runs fine, I found 2 possible bugs in the official run_pipeline; please see the attached NB.

The first bug is that run_pipeline sometimes returns values for dates after the requested ones. It's not a severe bug, but it might imply other underlying issues. It also means we cannot simply concatenate results from subsequent calls to run_pipeline when splitting a time period into chunks, because we would get duplicated rows (which in turn leads to wrong data and breaks code that expects no duplicated rows).

The second bug is that the default classifier Sector returns the wrong value '-1' for certain date ranges.

Finally, it would be great to make the chunked run_pipeline the default behaviour in research, as the current one doesn't have any advantage other than crashing too often ;)

Thanks!

Hi Luca,

Thank you for bringing up these issues in such an easy-to-follow manner. It looks like the issue with run_pipeline is that if the end_date is set to be a non-trading day, the pipeline computes values for an extra day after the end_date. This is a bug and I've notified our engineers.

For the second issue, is it possible that the problem has since been resolved? Looking at the last cell, it seems that p1 and p2 are exactly the same. If I'm misunderstanding, do you think you could point me to where I should be looking in the notebook?

Disclaimer

The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by Quantopian. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. No information contained herein should be regarded as a suggestion to engage in or refrain from any investment-related course of action as none of Quantopian nor any of its affiliates is undertaking to provide investment advice, act as an adviser to any plan or entity subject to the Employee Retirement Income Security Act of 1974, as amended, individual retirement account or individual retirement annuity, or give advice in a fiduciary capacity with respect to the materials presented herein. If you are an individual retirement or other investor, contact your financial advisor or other fiduciary unrelated to Quantopian about whether any given investment idea, strategy, product or service described herein may be appropriate for your circumstances. All investments involve risk, including loss of principal. Quantopian makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances.

The first bug is that run_pipeline sometimes returns values for dates after the requested ones.

The behavior here is that when you call run_pipeline(pipe, start_date, end_date), if start_date or end_date are not trading days, they're rolled forward to the next trading day in the US Equity calendar. This is intended, since it's the only behavior that makes it easy to run single-day pipelines (a common task when debugging or testing a new filter/factor/classifier) without the user having to know the historical trading calendar by heart.

Consider the case where a user wants to run a pipeline for the first day of 2016. The obvious thing to write here is something like run_pipeline(pipe, '2016-01-01', '2016-01-01'). As it turns out, however, the first trading day of 2016 was January 4th, so we have a few options for what we can do:

1. Return an empty dataframe.
2. Raise an exception informing the user that there are no trading days between the requested dates.
3. Roll the start and end dates to the next or previous trading day and then compute.

Option 1 is likely to just confuse users, and Option 2 forces users to know the historical trading calendar by heart in order to use run_pipeline without errors, so Option 3 seems like the most friendly behavior for a function that will be invoked interactively. Within Option 3, we have a few possible choices:

1. Roll both start_date and end_date forward.
2. Roll both start_date and end_date backward.
3. Roll start_date backward and end_date forward. (This gives the largest possible window).
4. Roll start_date forward and end_date backward. (This gives the shortest possible window).

Option (4) seems like the natural choice, but that doesn't solve the problem in the case that there are no trading days between start_date and end_date, so we'd still end up blowing up or returning an empty result in many cases.

Option (3) has the confusing behavior that run_pipeline(pipe, '2014', '2014') would return two days of data rather than just one.

That leaves Option (1) and Option (2). The choice here is more or less arbitrary. I think rolling forward (Option (1)) is slightly more intuitive because it has the nice property that run_pipeline(pipe, '2014', '2014') still gives you data from 2014, rather than rolling back to a previous year.
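The roll-forward rule described above can be sketched in a few lines. This is only an illustration under stated assumptions: the session list is hand-written for the days around New Year 2016, roll_forward is a hypothetical helper, and none of this is the actual run_pipeline implementation.

```python
from bisect import bisect_left
from datetime import date

# Hand-written list of US equity sessions around New Year 2016.
# Illustration only: a real calendar would come from the platform.
SESSIONS = [
    date(2015, 12, 30),
    date(2015, 12, 31),
    date(2016, 1, 4),
    date(2016, 1, 5),
]


def roll_forward(day, sessions=SESSIONS):
    """Return the first session on or after `day` (sessions must be sorted)."""
    pos = bisect_left(sessions, day)
    if pos == len(sessions):
        raise ValueError('no session on or after %s' % day)
    return sessions[pos]


# A single-day request on a holiday: both ends roll to the same session,
# so the request still resolves to exactly one trading day.
start = roll_forward(date(2016, 1, 1))
end = roll_forward(date(2016, 1, 1))
```

With both ends rolled forward, a request for '2016-01-01' to '2016-01-01' becomes a one-day request for 2016-01-04, while a date that is already a session is left unchanged.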

Finally, it would be great to make the chunked run_pipeline the default behaviour in research, as the current one doesn't have any advantage other than crashing too often ;)

This isn't quite true. Chunking will be slower if you have terms whose window_lengths are longer than your chunk size, because you'll end up querying for the same data more than once.

Suppose, for example, that I compute a 1-year moving average of some field for the full year of 2015. If I compute in one chunk, I'll perform exactly one load of all the data from 2014 and 2015. If I compute in two 6-month chunks, the first chunk will load all the data from 2014-01 to 2015-07, and the second chunk will load the data from 2014-07 to the end of 2015. This means that I end up double-loading the data from the middle of 2014 to the middle of 2015. The tradeoff here is that my high-water memory usage in the chunked version is lower, because I never hold the full 2-year dataset in RAM simultaneously.
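The tradeoff in the example above can be put into rough numbers. The sketch below is a simplified model, not a measurement of the pipeline engine: it assumes 252 trading days per year, counts days of input per chunk, and assumes a window of N days needs N - 1 days of history before the chunk's first day. The function name rows_loaded is hypothetical.

```python
def rows_loaded(chunk_lengths, window_length):
    """Days of input each chunk must load: its own days plus the lookback.

    Assumes a window of `window_length` days ending on a chunk's first day
    needs `window_length - 1` days of history before that chunk starts.
    """
    return [n + window_length - 1 for n in chunk_lengths]


WINDOW = 252  # assumed number of trading days in a 1-year window

one_chunk = rows_loaded([252], WINDOW)        # the whole year at once
two_chunks = rows_loaded([126, 126], WINDOW)  # two half-year chunks

total_one, peak_one = sum(one_chunk), max(one_chunk)    # 503 total, 503 peak
total_two, peak_two = sum(two_chunks), max(two_chunks)  # 754 total, 377 peak
```

Under these assumptions the chunked run loads about 50% more data in total, but its peak is about 25% lower, which is the trade between runtime and high-water memory usage described above.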

All that said, chunking is definitely useful for cases where you're computing over a very long window, since pipeline doesn't do a good job of releasing intermediate computations right now, which means it has very high peak memory consumption. That could be addressed with a bit of work (basically the internal pipeline engine would have to do some reference counting to determine when it can release an intermediate result), but even with that change there are pipelines that would need to be chunked to run in research.

Overall, I'm +1 on adding a chunksize argument to run_pipeline, but we'd probably have to do some careful benchmarking to understand the performance tradeoffs.

@Jamie, the second bug can be seen with this line:

print 'mean ', (p1-p2).mean(), 'std ', (p1-p2).std()


factor mean 0.000000 std 0.000000
sector mean -0.069757 std 2.679579 <---- the values returned by the two pipelines differ

And these lines print the rows where the values differ:

print p1[ (p1-p2).any(axis=1) ]
print p2[ (p1-p2).any(axis=1) ]


@Scott, I disagree with the implementation decision but I understand your point. All in all you can't make everybody happy and in this case I am the one disappointed ;)

Thanks for considering the additional chunksize argument to run_pipeline.

I disagree with the implementation decision but I understand your point.

I'd be curious to hear more about which piece you disagree with. The rolling behavior for start_date/end_date, or the default (non) chunking behavior? Would your preferred behavior be one of the other options I outlined above, or is there another alternative I missed?

All in all you can't make everybody happy and in this case I am the one disappointed ;)

I preferred option 1, "Return an empty dataframe."; it's more deterministic. However, I can see that it's the one you discarded very quickly (probably the one you like the least), and that makes me think there is no such thing as the "best" solution; it's just a matter of personal preference (even though every programmer likes to think their view is the best ;)

I always appreciate your detailed answers though, no doubts are left at the end.

I'm with Luca on this - just hit the same thing by trying to do:

results = [run_pipeline(pipeline, '%s-01-01' % year, '%s-12-31' % year) for year in range(2010,2016)]
results.append(run_pipeline(pipeline, '2016-01-01', '2016-06-30'))
results = pd.concat(results).fillna(value=0.)


to get around memory exhaustion in pipeline. It works for some years but not for others (presumably depending on whether 12-31 is a trading day or not), as you get duplicate index entries. The call for 2011 returns data for 2012-01-03 despite it being clearly outside of the specified range. That seems like very unintuitive behaviour to me - principle of least surprise, etc.

I can see the argument for option (2) above to throw an exception instead of returning an empty frame - but it seems like the API should do exactly what the user asks it to do, not pad.
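One possible workaround for the duplicate index entries, sketched here with made-up data, is to drop repeated (date, asset) rows after concatenating. The fake_result helper and the asset labels are hypothetical stand-ins for real run_pipeline output; only the pandas calls are real.

```python
import pandas as pd


def fake_result(dates, assets=('A', 'B')):
    """Build a stand-in for a pipeline result: a (date, asset) MultiIndex frame."""
    idx = pd.MultiIndex.from_product([pd.to_datetime(dates), list(assets)])
    return pd.DataFrame({'factor': list(range(len(idx)))}, index=idx)


# The 2011 request ends on a non-trading day, so it spills over to 2012-01-03,
# which is also the first day returned by the 2012 request.
chunk_2011 = fake_result(['2011-12-30', '2012-01-03'])
chunk_2012 = fake_result(['2012-01-03', '2012-01-04'])

combined = pd.concat([chunk_2011, chunk_2012])

# Keep the first occurrence of every (date, asset) pair.
deduped = combined[~combined.index.duplicated(keep='first')]
```

This assumes both calls return identical values for the overlapping day; if they could differ, it would be safer to trim each chunk to its requested range before concatenating.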

Well, you can use my function above that works fine. It prints the unexpected return dates but it manages to get the next chunk from the right date to avoid duplicates.

Hi Luca -

I'm trying out the run_pipeline_chunks you posted above. Is it the most up-to-date version? Is any post-processing of the result required? Or should I expect it to be the same as the output of run_pipeline?

No big deal, but the required chunks_len = pd.Timedelta(days=126) threw me, since I was expecting to be able to put in chunks_len = 126. If this does get incorporated into the Quantopian API, I'd recommend the latter, versus requiring users to do the time formatting within the function call.

Also, for chunks_len = None you could just not chunk at all. The function would just call the existing run_pipeline for backward compatibility.
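A minimal sketch of the interface suggested here, with chunks_len given as a plain number of calendar days and None meaning "don't chunk". The helper chunk_ranges is hypothetical and only computes the date ranges; it does not call run_pipeline and does not account for dates being rolled forward to the next trading day.

```python
from datetime import date, timedelta


def chunk_ranges(start, end, chunks_len=None):
    """Split [start, end] into consecutive, non-overlapping date ranges.

    chunks_len is a number of calendar days; None means a single range,
    i.e. the same request a plain run_pipeline call would make.
    """
    if chunks_len is None:
        return [(start, end)]
    if chunks_len < 1:
        raise ValueError('chunks_len must be a positive number of days')
    ranges = []
    current = start
    while current <= end:
        # Each range covers chunks_len days, clipped to the requested end.
        current_end = min(current + timedelta(days=chunks_len - 1), end)
        ranges.append((current, current_end))
        current = current_end + timedelta(days=1)
    return ranges


ranges = chunk_ranges(date(2015, 1, 1), date(2015, 12, 31), chunks_len=126)
unchunked = chunk_ranges(date(2015, 1, 1), date(2015, 12, 31))
```

Each range could then be passed to run_pipeline in turn, with the caveat that a range ending on a non-trading day may return an extra day that overlaps the next range.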

@Grant

I am currently using the following code, but that version works fine too. You don't need to do any post processing, it's a drop-in replacement.
I agree with you regarding the chunks_len parameter and I hope Q will add this feature eventually but they certainly won't use this code.

import pandas as pd
from quantopian.research import run_pipeline  # available in the research environment

def run_pipeline_chunks(pipe, start_date, end_date, chunks_len=None):
    """
    Drop-in replacement for run_pipeline.
    run_pipeline fails over a very long period of time (memory usage),
    so we need to split the pipeline into chunks and concatenate the results.
    chunks_len is a pd.Timedelta; it defaults to 26 weeks.
    """
    chunks  = []
    current = pd.Timestamp(start_date)
    end     = pd.Timestamp(end_date)
    step    = pd.Timedelta(weeks=26) if chunks_len is None else chunks_len
    while current <= end:
        # Clip the chunk so it never extends past the requested end date.
        current_end = current + step
        if current_end > end:
            current_end = end
        print('Running pipeline: %s - %s' % (current, current_end))
        results = run_pipeline(pipe, current.strftime("%Y-%m-%d"), current_end.strftime("%Y-%m-%d"))
        chunks.append(results)
        # pipeline returns more days than requested (if no trading day), so get last date from the results
        current_end = results.index.get_level_values(0)[-1].tz_localize(None)
        # Start the next chunk the day after the last date actually returned, to avoid duplicates.
        current = current_end + pd.Timedelta(days=1)

    return pd.concat(chunks)


Thanks Luca. --Grant

zipline:

    def attach_pipeline(self, pipeline, name, chunks=None, eager=True):
        """Register a pipeline to be computed at the start of each day.

        Parameters
        ----------
        pipeline : Pipeline
            The pipeline to have computed.
        name : str
            The name of the pipeline.
        chunks : int or iterator, optional
            The number of days to compute pipeline results for. Increasing
            this number will make it longer to get the first results but
            may improve the total runtime of the simulation. If an iterator
            is passed, we will run in chunks based on values of the iterator.
            Default is True.
        eager : bool, optional
            Whether or not to compute this pipeline prior to

attach_pipeline(make_pipeline(), 'pipe', 10)

And then there's eager. Nice. :)