I am a big proponent of Polars for data analysis. It combines the ease of Python with the speed of Rust, and in many cases it makes working with very large datasets very convenient. I have even written about it before on this blog.
Despite the nice things I wrote about Polars above, it is still a work in progress. One big issue is less-than-ideal memory management when working with very large dataframes, as discussed in this ticket on GitHub. I have seen similar issues myself, where for no explicable reason Polars will run my machine out of memory. Below is a simplified example of a situation where I have run into this problem.
This example comes from the middle of a processing pipeline. I have created a set of Parquet files that include an ff column specifically so that I can filter on it. The entire set of Parquet files in dirname would not fit in memory, but a subset will, so I can process the subsets separately and combine the reduced dataframes at the end to achieve the same result. The distribution of values in ff is very even, meaning that each subset uses roughly the same amount of memory. Because ff is in the Parquet files, Polars can use predicate pushdown to filter on ff at the read step, meaning that the full dataset is never read into memory.
Indeed, the first few loops of this process work quite well: the filtered subsets fit into memory, and the reduced dataframes are calculated. Eventually, however, Polars stops managing memory correctly and the machine runs out of memory.
import polars as pl

data = pl.scan_parquet(dirname)
reduced = []
# this will eventually run out of memory
for ff in range(N):
    print("doing", ff)
    # note that data is untouched each cycle
    data1 = data.filter(pl.col("ff") == ff)
    # Do some various LazyFrame manipulations on data1; we don't call
    # .collect() until below
    reduced.append(data1.collect())
reduced_data = pl.concat(reduced)
If the situation is amenable, there is a way around this. By running the Polars steps in a subprocess using multiprocessing, we can force Polars to free memory each cycle by shutting down the process that runs Polars. Of course, there is a little bit of additional overhead here. Starting the subprocess takes a little time, as does re-scanning the directory of files in scan_parquet(). There is also some cost in un/pickling the reduced dataframe between the main process and the Polars subprocess. But if the data is large enough to resort to chunking like this, these extra costs are rounding errors, and not worth worrying about.
import polars as pl
from multiprocessing import Process, Queue, set_start_method

def worker(dirname, ff, out_q):
    data = pl.scan_parquet(dirname)
    data = data.filter(pl.col("ff") == ff)
    # Do some various LazyFrame manipulations on data; we don't call
    # .collect() until below
    out_q.put(data.collect())

if __name__ == "__main__":
    set_start_method("spawn")
    out_q = Queue()
    reduced = []
    # this works to completion
    for ff in range(N):
        print("doing", ff)
        p = Process(target=worker, args=(dirname, ff, out_q), daemon=True)
        p.start()
        reduced.append(out_q.get())
        p.join()
    reduced_data = pl.concat(reduced)
I think this trick neatly illustrates the way that Polars is currently mismanaging memory. The two examples above do the same basic thing, but one works and the other doesn't. In fact, if I run the two methods and watch memory use in real time, they behave similarly for some number of cycles: memory use goes up and down as the filtered data is processed and cleaned up each cycle. This indicates that Polars can manage memory correctly, but after too many cycles it stops freeing memory. In the GitHub ticket linked above, it's suggested that a fix might have a performance cost. Hopefully something can be done that both prevents the memory leak and doesn't cost too much in performance. For now, this trick can be useful in the right situations.
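For anyone who wants to watch memory use per cycle the same way, here is one rough, Linux-only way to sample the process's current resident set size from inside the loop, without any third-party dependencies. (Tools like top or psutil work just as well.)

```python
import os

# Linux-only: field 2 of /proc/self/statm is the current resident set
# size in pages; multiply by the page size to get bytes.  Printing this
# once per loop cycle shows memory going up and down -- or, in the
# leaking case, ratcheting up.
def rss_bytes():
    with open("/proc/self/statm") as f:
        pages = int(f.read().split()[1])
    return pages * os.sysconf("SC_PAGE_SIZE")

print("current RSS:", rss_bytes() // 1024, "kB")
```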