2121import subprocess
2222from pathlib import Path
2323
24+ import dask .dataframe as dd
2425import pandas as pd
2526
2627from ..utils .file_io import BaseFileIO
2728
2829__all__ = [
29- "PandasParquetIO " ,
30+ "DaskParquetIO " ,
3031 "PandasCSVIO" ,
3132 "PandasExcelIO" ,
33+ "PandasParquetIO" ,
3234 "SimpleStringIO" ,
3335 "copy_csv2md" ,
3436]
3537
3638_log = logging .getLogger (__name__ )
3739
3840
39- class PandasParquetIO (BaseFileIO ):
40- """Simple helper class to read/write pandas to parquet, including path and
41- extension checking.
41+ class DaskParquetIO (BaseFileIO ):
42+ """Simple helper class to read/write dask dataframes to parquet, including
43+ path and extension checking.
4244 """
4345
4446 def __init__ (self , * args , ** kwargs ):
4547 """Inherit super"""
4648 super ().__init__ (* args , ** kwargs )
4749
48- def read (self , fn : str , * args , ** kwargs ) -> pd .DataFrame :
49- """Read parquet fn from rootdir, pass args kwargs to pd .read_parquet"""
50+ def read (self , fn : str , * args , ** kwargs ) -> dd .DataFrame :
51+ """Read parquet fn from rootdir, pass args kwargs to dask .read_parquet"""
5052 fn = Path (fn ).with_suffix (".parquet" )
5153 fqn = self .get_path_read (fn )
5254 _log .info (f"Read from { str (fqn .resolve ())} " )
53- return pd .read_parquet (str ( fqn ) , * args , ** kwargs )
55+ return dd .read_parquet (path = fqn , * args , ** kwargs )
5456
55- def write (self , df : pd .DataFrame , fn : str , * args , ** kwargs ) -> Path :
56- """Accept pandas DataFrame and fn e.g. `df.parquet`, write to fqn"""
57- fqn = self .get_path_write (Path (self .snl .clean (fn )).with_suffix (".parquet" ))
58- df .to_parquet (str (fqn ), * args , ** kwargs )
59- _log .info (f"Written to { str (fqn .resolve ())} " )
60- return fqn
57+ def write (self , ddf : dd .DataFrame , fn : str , * args , ** kwargs ) -> Path :
58+ """Accept dask DataFrame and fn e.g. `df.parquet`, write to fqn"""
59+ raise NotImplementedError
60+ # fqn = self.get_path_write(Path(self.snl.clean(fn)).with_suffix(".parquet"))
61+ # ddf.to_parquet(path=fqn, *args, **kwargs)
62+ # _log.info(f"Written to {str(fqn.resolve())}")
63+ # return fqn
6164
6265
6366class PandasCSVIO (BaseFileIO ):
@@ -74,7 +77,7 @@ def read(self, fn: str, *args, **kwargs) -> pd.DataFrame:
7477 fn = Path (fn ).with_suffix (".csv" )
7578 fqn = self .get_path_read (fn )
7679 _log .info (f"Read from { str (fqn .resolve ())} " )
77- return pd .read_csv (str ( fqn ) , * args , ** kwargs )
80+ return pd .read_csv (fqn , * args , ** kwargs )
7881
7982 def write (self , df : pd .DataFrame , fn : str , * args , ** kwargs ) -> str :
8083 """Accept pandas DataFrame and fn e.g. `df`, write to fn.csv
@@ -85,7 +88,7 @@ def write(self, df: pd.DataFrame, fn: str, *args, **kwargs) -> str:
8588 kws .update (quoting = csv .QUOTE_NONNUMERIC )
8689 if (len (df .index .names ) == 1 ) & (df .index .names [0 ] is None ):
8790 kws .update (index_label = "rowid" )
88- df .to_csv (str ( fqn ) , * args , ** kws )
91+ df .to_csv (fqn , * args , ** kws )
8992 _log .info (f"Written to { str (fqn .resolve ())} " )
9093 return fqn
9194
@@ -106,12 +109,12 @@ def read(self, fn: str, *args, **kwargs) -> pd.DataFrame:
106109 fn = Path (fn ).with_suffix (".xlsx" )
107110 fqn = self .get_path_read (fn )
108111 _log .info (f"Read from { str (fqn .resolve ())} " )
109- return pd .read_excel (str ( fqn ) , * args , ** kwargs )
112+ return pd .read_excel (fqn , * args , ** kwargs )
110113
111114 def write (self , df : pd .DataFrame , fn : str , * args , ** kwargs ) -> Path :
112115 """Accept pandas DataFrame and fn e.g. `df.xlsx`, write to fqn."""
113116 fqn = self .get_path_write (Path (self .snl .clean (fn )).with_suffix (".xlsx" ))
114- writer = pd .ExcelWriter (str ( fqn ) , engine = "xlsxwriter" )
117+ writer = pd .ExcelWriter (fqn , engine = "xlsxwriter" )
115118 df .to_excel (writer , * args , ** kwargs )
116119 writer .close ()
117120 _log .info (f"Written to { str (fqn .resolve ())} " )
@@ -142,6 +145,30 @@ def writer_close(self) -> Path:
142145 return fqn
143146
144147
148+ class PandasParquetIO (BaseFileIO ):
149+ """Simple helper class to read/write pandas to parquet, including path and
150+ extension checking.
151+ """
152+
153+ def __init__ (self , * args , ** kwargs ):
154+ """Inherit super"""
155+ super ().__init__ (* args , ** kwargs )
156+
157+ def read (self , fn : str , * args , ** kwargs ) -> pd .DataFrame :
158+ """Read parquet fn from rootdir, pass args kwargs to pd.read_parquet"""
159+ fn = Path (fn ).with_suffix (".parquet" )
160+ fqn = self .get_path_read (fn )
161+ _log .info (f"Read from { str (fqn .resolve ())} " )
162+ return pd .read_parquet (path = fqn , * args , ** kwargs )
163+
164+ def write (self , df : pd .DataFrame , fn : str , * args , ** kwargs ) -> Path :
165+ """Accept pandas DataFrame and fn e.g. `df.parquet`, write to fqn"""
166+ fqn = self .get_path_write (Path (self .snl .clean (fn )).with_suffix (".parquet" ))
167+ df .to_parquet (path = fqn , * args , ** kwargs )
168+ _log .info (f"Written to { str (fqn .resolve ())} " )
169+ return fqn
170+
171+
145172class SimpleStringIO (BaseFileIO ):
146173 """Helper class to read/write stringlike objects to txt or json files
147174 Set kind to
0 commit comments