# CI pipeline: lint -> validate compute block config -> integration tests ->
# build and push the container image. Each job only starts once the previous
# one has succeeded (see the `needs:` chain).
name: CI
on:
  push:
    branches:
      - "main"
  pull_request:

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}
  # Pinned so lint, validation and tests all run on the same interpreter
  # instead of whatever default the runner image happens to ship.
  PYTHON_VERSION: "3.12"

jobs:
  lint-python:
    name: Lint Python
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
          cache: "pip"

      - name: Run flake8
        uses: py-actions/flake8@v2

  validate-compute-block:
    name: Validate Compute Block Config
    runs-on: ubuntu-latest
    needs: lint-python
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
          cache: "pip"

      - name: Install dependencies
        run: |
          pip install -r requirements.txt

      # Fails the job when cbc.yaml has drifted from the definition in code.
      - name: Check cbcs
        run: |
          python3 - <<'EOF'
          # Imported for its side effects only -- presumably this registers
          # the entrypoints that get_compute_block() reports (TODO confirm
          # against the SDK).
          import main

          from scystream.sdk.config import load_config, get_compute_block
          from scystream.sdk.config.config_loader import _compare_configs
          from pathlib import Path

          CBC_PATH = Path("cbc.yaml")

          if not CBC_PATH.exists():
              raise FileNotFoundError("cbc.yaml not found in repo root.")

          block_from_code = get_compute_block()
          block_from_yaml = load_config(str(CBC_PATH))

          _compare_configs(block_from_code, block_from_yaml)

          print("cbc.yaml matches python code definition")
          EOF

  run-test:
    name: Run Tests
    runs-on: ubuntu-latest
    needs: validate-compute-block
    # Backing services for the integration test; ports and credentials must
    # match the constants used in test/test_lda_entrypoint.py.
    services:
      minio:
        image: lazybit/minio
        ports:
          - 9000:9000
        env:
          MINIO_ROOT_USER: minioadmin
          MINIO_ROOT_PASSWORD: minioadmin
        options: >-
          --health-cmd "curl -f http://localhost:9000/minio/health/live || exit 1"
          --health-interval 5s
          --health-retries 5
          --health-timeout 5s
      postgres:
        image: postgres:15
        ports:
          - 5432:5432
        env:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: postgres
        options: >-
          --health-cmd="pg_isready -U postgres"
          --health-interval=5s
          --health-retries=10
          --health-timeout=5s
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
          cache: "pip"

      - name: Install dependencies
        run: |
          pip install -r requirements.txt

      - name: Run Tests
        run: pytest -vv

  build:
    name: Build docker image
    runs-on: ubuntu-latest
    needs: run-test
    permissions:
      contents: read
      packages: write
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4

      # REGISTRY is ghcr.io, so this authenticates against the GitHub
      # Container Registry using the workflow token.
      - name: Log in to GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata for docker
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/topic-modeling
          # PR builds are tagged with the PR ref; `latest` is only emitted
          # for builds of the main branch.
          tags: |
            type=ref, event=pr
            type=raw, value=latest, enable=${{ (github.ref == format('refs/heads/{0}', 'main')) }}

      - name: Build and push Docker image
        uses: docker/build-push-action@v5
        with:
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
def fit(self):
    """Fit the wrapped LDA estimator on the stored document-term matrix."""
    matrix = self.dtm
    logger.info(f"Running LDA fit(), DTM shape {matrix.shape}")
    self.lda.fit(matrix)
    logger.info("LDA model fitted successfully")

def extract_doc_topics(self) -> pd.DataFrame:
    """
    Build the document-topic distribution as a DataFrame.

    The transformed matrix is also kept on ``self.doc_topic_dist``. The
    result has one ``topic_<i>`` column per configured topic.
    """
    logger.info("Extracting doc-topics...")
    distribution = self.lda.transform(self.dtm)
    self.doc_topic_dist = distribution

    column_names = [f"topic_{i}" for i in range(self.n_topics)]
    frame = pd.DataFrame(distribution, columns=column_names)

    logger.debug(
        f"Extracted doc-topic distribution DataFrame shape={frame.shape}")
    return frame
def write_df_to_postgres(df, settings: PostgresSettings):
    """Write ``df`` to the Postgres table named by ``settings.DB_TABLE``.

    An existing table with that name is replaced (``if_exists="replace"``)
    and the DataFrame index is not written.

    Args:
        df: DataFrame to persist.
        settings: Output settings providing ``PG_USER``, ``PG_PASS``,
            ``PG_HOST``, ``PG_PORT`` and ``DB_TABLE``.

    Raises:
        Whatever SQLAlchemy / pandas raise when connecting or writing fails;
        the engine is disposed in either case.
    """
    # Local import so this block stays self-contained; SQLAlchemy is already
    # a dependency of this module via create_engine.
    from sqlalchemy.engine import URL

    logger.info(f"Writing DataFrame to DB table '{settings.DB_TABLE}'…")

    # Build the URL from components instead of string interpolation so that
    # credentials containing "@", ":" or "/" cannot corrupt it. No database
    # name is given, exactly as in the previous hand-built URL.
    url = URL.create(
        "postgresql+psycopg2",
        username=str(settings.PG_USER),
        password=str(settings.PG_PASS),
        host=str(settings.PG_HOST),
        port=int(settings.PG_PORT),
    )
    engine = create_engine(url)
    try:
        df.to_sql(settings.DB_TABLE, engine, if_exists="replace", index=False)
    finally:
        # Release pooled connections; a fresh engine is created per call.
        engine.dispose()
    logger.info(f"Successfully wrote {len(df)} rows to '{settings.DB_TABLE}'.")
"vocab.pkl") + + logger.info("Loading vocab.pkl from disk...") with open("vocab.pkl", "rb") as f: vocab = pickle.load(f) + logger.info(f"Loaded vocab with {len(vocab)} terms.") + + logger.info("Downloading DTM file...") S3Operations.download(settings.dtm, "dtm.pkl") + + logger.info("Loading dtm.pkl from disk...") with open("dtm.pkl", "rb") as f: dtm = pickle.load(f) + logger.info(f"Loaded DTM with shape {dtm.shape}") + # TODO: Check if dtm and vocab is of correct schema lda = LDAModeler( dtm=dtm, diff --git a/requirements.txt b/requirements.txt index 6bbed29..b6ac3b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ numpy==2.3.3 pandas==2.3.2 SQLAlchemy==2.0.43 psycopg2-binary==2.9.10 +pytest==9.0.1 diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/test_lda_entrypoint.py b/test/test_lda_entrypoint.py new file mode 100644 index 0000000..c3d92ba --- /dev/null +++ b/test/test_lda_entrypoint.py @@ -0,0 +1,152 @@ +import os +import boto3 +import pytest +import psycopg2 +import time +import pandas as pd + +from pathlib import Path +from botocore.exceptions import ClientError +from main import lda_topic_modeling + +MINIO_USER = "minioadmin" +MINIO_PWD = "minioadmin" +BUCKET_NAME = "testbucket" + +POSTGRES_USER = "postgres" +POSTGRES_PWD = "postgres" + +N_TOPICS = 5 + + +def ensure_bucket(s3, bucket): + try: + s3.head_bucket(Bucket=bucket) + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code in ("404", "NoSuchBucket"): + s3.create_bucket(Bucket=bucket) + else: + raise + + +def download_to_tmp(s3, bucket, key): + tmp_path = Path("/tmp") / key.replace("/", "_") + s3.download_file(bucket, key, str(tmp_path)) + return tmp_path + + +@pytest.fixture +def s3_minio(): + client = boto3.client( + "s3", + endpoint_url="http://localhost:9000", + aws_access_key_id=MINIO_USER, + aws_secret_access_key=MINIO_PWD + ) + ensure_bucket(client, BUCKET_NAME) + return client + + 
@pytest.fixture(scope="session")
def postgres_conn():
    """Wait until postgres is ready, then yield a live connection.

    Tries to connect once per second for up to 30 attempts. The connection
    is switched to autocommit and closed when the session ends.

    Raises:
        RuntimeError: if no connection could be established; the last
            connection error is attached as the cause.
    """
    conn = None
    last_error = None
    for _ in range(30):
        try:
            conn = psycopg2.connect(
                host="127.0.0.1",
                port=5432,
                user=POSTGRES_USER,
                password=POSTGRES_PWD,
                database="postgres",
            )
            break
        except psycopg2.OperationalError as exc:
            # Server not accepting connections yet: remember why and retry.
            last_error = exc
            time.sleep(1)

    if conn is None:
        raise RuntimeError("Postgres did not start") from last_error

    # The yield lives outside the retry loop on purpose: an error during
    # setup or teardown must surface instead of triggering another connect
    # attempt and a second yield from the same fixture.
    conn.autocommit = True
    try:
        yield conn
    finally:
        conn.close()
"top_terms_per_topic_PG_PASS": POSTGRES_PWD, + "top_terms_per_topic_DB_TABLE": topic_terms_table_name, + } + + for k, v in env.items(): + os.environ[k] = v + + lda_topic_modeling() + + cur = postgres_conn.cursor() + + # 1. doc-topic distribution + cur.execute(f"SELECT * FROM {doc_topic_table_name} ORDER BY 1;") + doc_topics = pd.DataFrame(cur.fetchall(), columns=[ + desc[0] for desc in cur.description]) + assert len(doc_topics) == 26 + assert doc_topics.shape[1] == N_TOPICS + + # 2. topic-term listing + cur.execute( + f"SELECT * FROM { + topic_terms_table_name} ORDER BY topic_id, weight DESC;") + topic_terms = pd.DataFrame(cur.fetchall(), columns=[ + desc[0] for desc in cur.description]) + assert len(topic_terms) > 0 + assert "term" in topic_terms.columns diff --git a/test/test_lda_modeler.py b/test/test_lda_modeler.py new file mode 100644 index 0000000..e3cdae3 --- /dev/null +++ b/test/test_lda_modeler.py @@ -0,0 +1,52 @@ +import pandas as pd +from algorithms.lda import LDAModeler + +import numpy as np +import pytest + + +@pytest.fixture +def small_vocab(): + # term -> index + return {"apple": 1, "banana": 1, "carrot": 2, "date": 3} + + +@pytest.fixture +def small_dtm(): + # 2 documents, 4 terms + return np.array([ + [0, 0, 2, 0], + [1, 1, 0, 3], + [0, 1, 1, 1], + ]) + + +def test_lda_fit(small_dtm, small_vocab): + lda = LDAModeler(dtm=small_dtm, vocab=small_vocab, n_topics=2) + lda.fit() + + assert lda.lda.components_.shape == (2, 4) + + +def test_extract_doc_topics(small_dtm, small_vocab): + lda = LDAModeler(dtm=small_dtm, vocab=small_vocab, n_topics=2) + lda.fit() + df = lda.extract_doc_topics() + + assert isinstance(df, pd.DataFrame) + assert df.shape == (3, 2) + assert list(df.columns) == ["topic_0", "topic_1"] + + +def test_extract_topic_terms(small_dtm, small_vocab): + lda = LDAModeler(dtm=small_dtm, vocab=small_vocab, + n_topics=2, n_top_words=2) + lda.fit() + df = lda.extract_topic_terms() + + # 2 topics x 2 top-words each = 4 rows + assert df.shape[0] == 
4 + assert set(df.columns) == {"topic_id", "term", "weight"} + + # All terms must come from vocab + assert set(df["term"]).issubset(set(small_vocab.keys())) diff --git a/vocab.pkl b/vocab.pkl new file mode 100644 index 0000000..eeaed7f Binary files /dev/null and b/vocab.pkl differ