Skip to content

Commit 89de45c

Browse files
committed
🧪 Add unit tests for svdtriplet function in algebra.py
1 parent 524901c commit 89de45c

File tree

4 files changed

+318
-280
lines changed

4 files changed

+318
-280
lines changed

qolmat/imputations/mimca/imputer_mca.py

Lines changed: 7 additions & 212 deletions
Original file line numberDiff line numberDiff line change
@@ -1,218 +1,13 @@
11
import numpy as np # noqa: D100
22
import pandas as pd
33

4-
5-
def moy_p(V, weights):
6-
"""Compute the weighted mean of a vector, ignoring NaNs.
7-
8-
Parameters
9-
----------
10-
V : array-like
11-
Input vector with possible NaN values.
12-
weights : array-like
13-
Weights corresponding to each element in V.
14-
15-
Returns
16-
-------
17-
float
18-
Weighted mean of non-NaN elements.
19-
20-
"""
21-
mask = ~np.isnan(V)
22-
total_weight = np.sum(weights[mask])
23-
if total_weight == 0:
24-
return 0.0 # or use np.finfo(float).eps for a small positive value
25-
return np.sum(V[mask] * weights[mask]) / total_weight
26-
27-
28-
def tab_disjonctif_NA(df):
29-
"""Create a disjunctive (one-hot encoded).
30-
31-
Parameters
32-
----------
33-
df : DataFrame
34-
Input DataFrame with categorical and numeric variables.
35-
36-
Returns
37-
-------
38-
DataFrame
39-
Disjunctive table with one-hot encoding.
40-
41-
""" # noqa: E501
42-
df_encoded_list = []
43-
for col in df.columns:
44-
if df[col].dtype.name == "category" or df[col].dtype == object:
45-
df[col] = df[col].astype("category")
46-
# Include '__MISSING__' as a category if not already present
47-
if "__MISSING__" not in df[col].cat.categories:
48-
df[col] = df[col].cat.add_categories(["__MISSING__"])
49-
# Fill missing values with '__MISSING__'
50-
df[col] = df[col].fillna("__MISSING__")
51-
# One-hot encode the categorical variable
52-
encoded = pd.get_dummies(
53-
df[col],
54-
prefix=col,
55-
prefix_sep="_",
56-
dummy_na=False,
57-
dtype=float,
58-
)
59-
df_encoded_list.append(encoded)
60-
else:
61-
# Numeric column; keep as is
62-
df_encoded_list.append(df[[col]])
63-
# Concatenate all encoded columns
64-
df_encoded = pd.concat(df_encoded_list, axis=1)
65-
return df_encoded
66-
67-
68-
def tab_disjonctif_prop(df, seed=None):
69-
"""Perform probabilistic imputation for categorical columns using observed
70-
value distributions, without creating a separate missing category.
71-
72-
Parameters
73-
----------
74-
df : DataFrame
75-
DataFrame with categorical columns to impute.
76-
seed : int, optional
77-
Random seed for reproducibility. Default is None.
78-
79-
Returns
80-
-------
81-
DataFrame
82-
Disjunctive coded DataFrame with missing values probabilistically
83-
imputed.
84-
85-
""" # noqa: D205
86-
if seed is not None:
87-
np.random.seed(seed)
88-
df = df.copy()
89-
df_encoded_list = []
90-
for col in df.columns:
91-
if df[col].dtype.name == "category" or df[col].dtype == object:
92-
# Ensure categories are strings
93-
df[col] = df[col].cat.rename_categories(
94-
df[col].cat.categories.astype(str)
95-
)
96-
observed = df[col][df[col].notna()]
97-
categories = df[col].cat.categories.tolist()
98-
# Get observed frequencies
99-
freqs = observed.value_counts(normalize=True)
100-
# Impute missing values based on observed frequencies
101-
missing_indices = df[col][df[col].isna()].index
102-
if len(missing_indices) > 0:
103-
imputed_values = np.random.choice(
104-
freqs.index, size=len(missing_indices), p=freqs.values
105-
)
106-
df.loc[missing_indices, col] = imputed_values
107-
# One-hot encode without creating missing category
108-
encoded = pd.get_dummies(
109-
df[col],
110-
prefix=col,
111-
prefix_sep="_",
112-
dummy_na=False,
113-
dtype=float,
114-
)
115-
col_names = [f"{col}_{cat}" for cat in categories]
116-
encoded = encoded.reindex(columns=col_names, fill_value=0.0)
117-
df_encoded_list.append(encoded)
118-
else:
119-
df_encoded_list.append(df[[col]])
120-
df_encoded = pd.concat(df_encoded_list, axis=1)
121-
return df_encoded
122-
123-
124-
def find_category(df_original, tab_disj):
125-
"""Reconstruct the original categorical variables from the disjunctive.
126-
127-
Parameters
128-
----------
129-
df_original : DataFrame
130-
Original DataFrame with categorical variables.
131-
tab_disj : DataFrame
132-
Disjunctive table after imputation.
133-
134-
Returns
135-
-------
136-
DataFrame
137-
Reconstructed DataFrame with imputed categorical variables.
138-
139-
"""
140-
df_reconstructed = df_original.copy()
141-
start_idx = 0
142-
for col in df_original.columns:
143-
if (
144-
df_original[col].dtype.name == "category"
145-
or df_original[col].dtype == object
146-
): # noqa: E501
147-
categories = df_original[col].cat.categories.tolist()
148-
if "__MISSING__" in categories:
149-
missing_cat_index = categories.index("__MISSING__")
150-
else:
151-
missing_cat_index = None
152-
num_categories = len(categories)
153-
sub_tab = tab_disj.iloc[:, start_idx : start_idx + num_categories]
154-
if missing_cat_index is not None:
155-
sub_tab.iloc[:, missing_cat_index] = -np.inf
156-
# Find the category with the maximum value for each row
157-
max_indices = sub_tab.values.argmax(axis=1)
158-
df_reconstructed[col] = [categories[idx] for idx in max_indices]
159-
# Replace '__MISSING__' back to NaN
160-
df_reconstructed[col].replace("__MISSING__", np.nan, inplace=True)
161-
start_idx += num_categories
162-
else:
163-
# For numeric variables, keep as is
164-
start_idx += 1 # Increment start_idx by 1 for numeric columns
165-
return df_reconstructed
166-
167-
168-
def svdtriplet(X, row_w=None, ncp=np.inf):
169-
"""Perform weighted SVD on matrix X with row weights.
170-
171-
Parameters
172-
----------
173-
X : ndarray
174-
Data matrix of shape (n_samples, n_features).
175-
row_w : array-like, optional
176-
Row weights. If None, uniform weights are assumed. Default is None.
177-
ncp : int
178-
Number of principal components to retain. Default is infinity.
179-
180-
Returns
181-
-------
182-
s : ndarray
183-
Singular values.
184-
U : ndarray
185-
Left singular vectors.
186-
V : ndarray
187-
Right singular vectors.
188-
189-
"""
190-
if not isinstance(X, np.ndarray):
191-
X = np.array(X, dtype=float)
192-
else:
193-
X = X.astype(float)
194-
if row_w is None:
195-
row_w = np.ones(X.shape[0]) / X.shape[0]
196-
else:
197-
row_w = np.array(row_w, dtype=float)
198-
row_w /= row_w.sum()
199-
ncp = int(min(ncp, X.shape[0] - 1, X.shape[1]))
200-
# Apply weights to rows
201-
X_weighted = X * np.sqrt(row_w[:, None])
202-
# Perform SVD
203-
U, s, Vt = np.linalg.svd(X_weighted, full_matrices=False)
204-
V = Vt.T
205-
U = U[:, :ncp]
206-
V = V[:, :ncp]
207-
s = s[:ncp]
208-
# Adjust signs to ensure consistency
209-
mult = np.sign(np.sum(V, axis=0))
210-
mult[mult == 0] = 1
211-
U *= mult
212-
V *= mult
213-
# Rescale U by the square root of row weights
214-
U /= np.sqrt(row_w[:, None])
215-
return s, U, V
4+
from qolmat.utils.algebra import svdtriplet
5+
from qolmat.utils.utils import (
6+
find_category,
7+
moy_p,
8+
tab_disjonctif_NA,
9+
tab_disjonctif_prop,
10+
)
21611

21712

21813
def imputeMCA(

qolmat/utils/algebra.py

Lines changed: 37 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -100,82 +100,51 @@ def kl_divergence_gaussian_exact(
100100
return div_kl
101101

102102

103-
def svdtriplet(
104-
X: NDArray[np.float64],
105-
row_weights: Optional[NDArray[np.float64]] = None,
106-
col_weights: Optional[NDArray[np.float64]] = None,
107-
ncp: int = np.inf,
108-
) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
109-
"""Perform a weighted Singular Value Decomposition (SVD) of matrix X.
110-
111-
This function computes the SVD of a weighted matrix, where weights are
112-
applied to both the rows and columns. Row and column weights are optional,
113-
and if not provided, uniform weights are applied by default.
103+
def svdtriplet(X, row_w=None, ncp=np.inf):
104+
"""Perform weighted SVD on matrix X with row weights.
114105
115106
Parameters
116107
----------
117-
X : NDArray[np.float64]
118-
Input matrix to decompose with SVD.
119-
row_weights : Optional[NDArray[np.float64]], optional
120-
Weights for the rows of the matrix, by default None (uniform weights).
121-
col_weights : Optional[NDArray[np.float64]], optional
122-
Weights for the columns of the matrix, by default None (uniform
123-
weights).
124-
ncp : int, optional
125-
The number of components to retain, by default np.inf. This will be
126-
capped at min(rows-1, cols).
108+
X : ndarray
109+
Data matrix of shape (n_samples, n_features).
110+
row_w : array-like, optional
111+
Row weights. If None, uniform weights are assumed. Default is None.
112+
ncp : int
113+
Number of principal components to retain. Default is infinity.
127114
128115
Returns
129116
-------
130-
Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]
131-
A tuple containing:
132-
- Singular values (s)
133-
- Left singular vectors (U)
134-
- Right singular vectors (V)
117+
s : ndarray
118+
Singular values.
119+
U : ndarray
120+
Left singular vectors.
121+
V : ndarray
122+
Right singular vectors.
135123
136124
"""
137-
X = np.asarray(X, dtype=np.float64)
138-
if X.ndim != 2:
139-
raise ValueError("Input matrix X must be 2-dimensional")
140-
n_rows, n_cols = X.shape
141-
if row_weights is None:
142-
row_weights = np.ones(n_rows) / n_rows
125+
if not isinstance(X, np.ndarray):
126+
X = np.array(X, dtype=float)
143127
else:
144-
row_weights = np.asarray(row_weights, dtype=np.float64)
145-
if row_weights.shape[0] != n_rows:
146-
raise ValueError("Row weights must match the number of rows in X")
147-
148-
if col_weights is None:
149-
col_weights = np.ones(n_cols)
150-
else:
151-
col_weights = np.asarray(col_weights, dtype=np.float64)
152-
if col_weights.shape[0] != n_cols:
153-
raise ValueError(
154-
"Column weights must match the number of columns in X"
155-
)
156-
157-
row_weights /= row_weights.sum()
158-
X_weighted = X * np.sqrt(col_weights) # Column weights
159-
X_weighted *= np.sqrt(row_weights[:, None]) # Row weights
160-
161-
ncp = min(ncp, n_rows - 1, n_cols)
162-
163-
if n_cols <= n_rows:
164-
U, s, Vt = np.linalg.svd(X_weighted, full_matrices=False)
165-
V = Vt.T
128+
X = X.astype(float)
129+
if row_w is None:
130+
row_w = np.ones(X.shape[0]) / X.shape[0]
166131
else:
167-
Vt, s, U = np.linalg.svd(X_weighted.T, full_matrices=False)
168-
V = Vt.T
169-
U = U.T
170-
171-
# Truncate U, V, and s to the top ncp components
172-
U, V, s = U[:, :ncp], V[:, :ncp], s[:ncp]
173-
174-
sign_correction = np.sign(np.sum(V, axis=0))
175-
sign_correction[sign_correction == 0] = 1
176-
U *= sign_correction
177-
V *= sign_correction
178-
U /= np.sqrt(row_weights[:, None])
179-
V /= np.sqrt(col_weights[:, None])
180-
132+
row_w = np.array(row_w, dtype=float)
133+
row_w /= row_w.sum()
134+
ncp = int(min(ncp, X.shape[0] - 1, X.shape[1]))
135+
# Apply weights to rows
136+
X_weighted = X * np.sqrt(row_w[:, None])
137+
# Perform SVD
138+
U, s, Vt = np.linalg.svd(X_weighted, full_matrices=False)
139+
V = Vt.T
140+
U = U[:, :ncp]
141+
V = V[:, :ncp]
142+
s = s[:ncp]
143+
# Adjust signs to ensure consistency
144+
mult = np.sign(np.sum(V, axis=0))
145+
mult[mult == 0] = 1
146+
U *= mult
147+
V *= mult
148+
# Rescale U by the square root of row weights
149+
U /= np.sqrt(row_w[:, None])
181150
return s, U, V

0 commit comments

Comments
 (0)