From 51837d6259e8704cbae37082336b9948acbd51c1 Mon Sep 17 00:00:00 2001
From: Zhichang Yu
Date: Thu, 18 Sep 2025 09:56:19 +0800
Subject: [PATCH 1/2] Fix highlight with Infinity

Fix on OpenSUSE Tumbleweed
---
 README.md                  |   8 +-
 download_deps.py           |   1 -
 pyproject.toml             |   2 +-
 rag/utils/infinity_conn.py | 175 ++++++++++++++++++-------------------
 uv.lock                    |   6 +-
 5 files changed, 94 insertions(+), 98 deletions(-)

diff --git a/README.md b/README.md
index 0a64d5c26d6..915000be677 100644
--- a/README.md
+++ b/README.md
@@ -341,11 +341,13 @@ docker build --platform linux/amd64 -f Dockerfile -t infiniflow/ragflow:nightly
 5. If your operating system does not have jemalloc, please install it as follows:
 
    ```bash
-   # ubuntu
+   # Ubuntu
    sudo apt-get install libjemalloc-dev
-   # centos
+   # CentOS
    sudo yum install jemalloc
-   # mac
+   # OpenSUSE
+   sudo zypper install jemalloc
+   # macOS
    sudo brew install jemalloc
    ```
 
diff --git a/download_deps.py b/download_deps.py
index bb815f74a89..d140be347ee 100644
--- a/download_deps.py
+++ b/download_deps.py
@@ -6,7 +6,6 @@
 # dependencies = [
 #   "huggingface-hub",
 #   "nltk",
-#   "argparse",
 # ]
 # ///
 
diff --git a/pyproject.toml b/pyproject.toml
index e46242fc758..b1f4d059c64 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -118,7 +118,7 @@ dependencies = [
     "graspologic>=3.4.1,<4.0.0",
     "mini-racer>=0.12.4,<0.13.0",
     "pyodbc>=5.2.0,<6.0.0",
-    "pyicu>=2.13.1,<3.0.0",
+    "pyicu>=2.15.3,<3.0.0",
     "flasgger>=0.9.7.1,<0.10.0",
     "xxhash>=3.5.0,<4.0.0",
     "trio>=0.29.0",
diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py
index 26a98e845fc..1ce80fdfff1 100644
--- a/rag/utils/infinity_conn.py
+++ b/rag/utils/infinity_conn.py
@@ -30,6 +30,7 @@
 from rag.utils import singleton
 import pandas as pd
 from api.utils.file_utils import get_project_base_directory
+from rag.nlp import is_english
 
 from rag.utils.doc_store_conn import (
     DocStoreConnection,
@@ -40,13 +41,15 @@
     OrderByExpr,
 )
 
-logger = logging.getLogger('ragflow.infinity_conn')
+logger = logging.getLogger("ragflow.infinity_conn")
+
 
 def field_keyword(field_name: str):
-    # The "docnm_kwd" field is always a string, not list.
-    if field_name == "source_id" or (field_name.endswith("_kwd") and field_name != "docnm_kwd" and field_name != "knowledge_graph_kwd"):
-        return True
-    return False
+    # The "docnm_kwd" field is always a string, not a list.
+    if field_name == "source_id" or (field_name.endswith("_kwd") and field_name != "docnm_kwd" and field_name != "knowledge_graph_kwd"):
+        return True
+    return False
+
 
 def equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None:
     assert "_id" not in condition
@@ -74,7 +77,7 @@ def exists(cln):
             inCond = list()
             for item in v:
                 if isinstance(item, str):
-                    item = item.replace("'","''")
+                    item = item.replace("'", "''")
                     inCond.append(f"filter_fulltext('{k}', '{item}')")
             if inCond:
                 strInCond = " or ".join(inCond)
@@ -86,7 +89,7 @@ def exists(cln):
             inCond = list()
             for item in v:
                 if isinstance(item, str):
-                    item = item.replace("'","''")
+                    item = item.replace("'", "''")
                     inCond.append(f"'{item}'")
                 else:
                     inCond.append(str(item))
@@ -112,13 +115,13 @@ def concat_dataframes(df_list: list[pd.DataFrame], selectFields: list[str]) -> p
     df_list2 = [df for df in df_list if not df.empty]
     if df_list2:
         return pd.concat(df_list2, axis=0).reset_index(drop=True)
-    
+
     schema = []
     for field_name in selectFields:
-        if field_name == 'score()': # Workaround: fix schema is changed to score()
-            schema.append('SCORE')
-        elif field_name == 'similarity()': # Workaround: fix schema is changed to similarity()
-            schema.append('SIMILARITY')
+        if field_name == "score()":  # Workaround: fix schema is changed to score()
+            schema.append("SCORE")
+        elif field_name == "similarity()":  # Workaround: fix schema is changed to similarity()
+            schema.append("SIMILARITY")
         else:
             schema.append(field_name)
     return pd.DataFrame(columns=schema)
@@ -158,9 +161,7 @@ def __init__(self):
 
     def _migrate_db(self, inf_conn):
         inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
-        fp_mapping = os.path.join(
-            get_project_base_directory(), "conf", "infinity_mapping.json"
-        )
+        fp_mapping = os.path.join(get_project_base_directory(), "conf", "infinity_mapping.json")
         if not os.path.exists(fp_mapping):
             raise Exception(f"Mapping file not found at {fp_mapping}")
         schema = json.load(open(fp_mapping))
@@ -178,16 +179,12 @@ def _migrate_db(self, inf_conn):
                     continue
                 res = inf_table.add_columns({field_name: field_info})
                 assert res.error_code == infinity.ErrorCode.OK
-                logger.info(
-                    f"INFINITY added following column to table {table_name}: {field_name} {field_info}"
-                )
+                logger.info(f"INFINITY added following column to table {table_name}: {field_name} {field_info}")
                 if field_info["type"] != "varchar" or "analyzer" not in field_info:
                     continue
                 inf_table.create_index(
                     f"text_idx_{field_name}",
-                    IndexInfo(
-                        field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
-                    ),
+                    IndexInfo(field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}),
                     ConflictType.Ignore,
                 )
 
@@ -221,9 +218,7 @@ def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
         inf_conn = self.connPool.get_conn()
         inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
 
-        fp_mapping = os.path.join(
-            get_project_base_directory(), "conf", "infinity_mapping.json"
-        )
+        fp_mapping = os.path.join(get_project_base_directory(), "conf", "infinity_mapping.json")
         if not os.path.exists(fp_mapping):
             raise Exception(f"Mapping file not found at {fp_mapping}")
         schema = json.load(open(fp_mapping))
@@ -253,15 +248,11 @@ def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
                 continue
             inf_table.create_index(
                 f"text_idx_{field_name}",
-                IndexInfo(
-                    field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
-                ),
+                IndexInfo(field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}),
                 ConflictType.Ignore,
             )
         self.connPool.release_conn(inf_conn)
-        logger.info(
-            f"INFINITY created table {table_name}, vector size {vectorSize}"
-        )
+        logger.info(f"INFINITY created table {table_name}, vector size {vectorSize}")
 
     def deleteIdx(self, indexName: str, knowledgebaseId: str):
         table_name = f"{indexName}_{knowledgebaseId}"
@@ -288,20 +279,21 @@ def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
     """
 
     def search(
-        self, selectFields: list[str],
-        highlightFields: list[str],
-        condition: dict,
-        matchExprs: list[MatchExpr],
-        orderBy: OrderByExpr,
-        offset: int,
-        limit: int,
-        indexNames: str | list[str],
-        knowledgebaseIds: list[str],
-        aggFields: list[str] = [],
-        rank_feature: dict | None = None
+        self,
+        selectFields: list[str],
+        highlightFields: list[str],
+        condition: dict,
+        matchExprs: list[MatchExpr],
+        orderBy: OrderByExpr,
+        offset: int,
+        limit: int,
+        indexNames: str | list[str],
+        knowledgebaseIds: list[str],
+        aggFields: list[str] = [],
+        rank_feature: dict | None = None,
     ) -> tuple[pd.DataFrame, int]:
         """
-        TODO: Infinity doesn't provide highlight
+        BUG: Infinity returns empty for a highlight field if the query string doesn't use that field.
         """
         if isinstance(indexNames, str):
             indexNames = indexNames.split(",")
@@ -438,9 +430,7 @@ def search(
                     matchExpr.extra_options.copy(),
                 )
             elif isinstance(matchExpr, FusionExpr):
-                builder = builder.fusion(
-                    matchExpr.method, matchExpr.topn, matchExpr.fusion_params
-                )
+                builder = builder.fusion(matchExpr.method, matchExpr.topn, matchExpr.fusion_params)
         else:
             if filter_cond and len(filter_cond) > 0:
                 builder.filter(filter_cond)
@@ -455,15 +445,13 @@ def search(
         self.connPool.release_conn(inf_conn)
         res = concat_dataframes(df_list, output)
         if matchExprs:
-            res['Sum'] = res[score_column] + res[PAGERANK_FLD]
-            res = res.sort_values(by='Sum', ascending=False).reset_index(drop=True).drop(columns=['Sum'])
+            res["Sum"] = res[score_column] + res[PAGERANK_FLD]
+            res = res.sort_values(by="Sum", ascending=False).reset_index(drop=True).drop(columns=["Sum"])
         res = res.head(limit)
         logger.debug(f"INFINITY search final result: {str(res)}")
         return res, total_hits_count
 
-    def get(
-        self, chunkId: str, indexName: str, knowledgebaseIds: list[str]
-    ) -> dict | None:
+    def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
         inf_conn = self.connPool.get_conn()
         db_instance = inf_conn.get_database(self.dbName)
         df_list = list()
@@ -476,8 +464,7 @@ def get(
             try:
                 table_instance = db_instance.get_table(table_name)
             except Exception:
-                logger.warning(
-                    f"Table not found: {table_name}, this knowledge base isn't created in Infinity. Maybe it is created in other document engine.")
+                logger.warning(f"Table not found: {table_name}, this knowledge base isn't created in Infinity. Maybe it is created in other document engine.")
                 continue
             kb_res, _ = table_instance.output(["*"]).filter(f"id = '{chunkId}'").to_df()
             logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(kb_res)}")
@@ -487,9 +474,7 @@ def get(
         res_fields = self.getFields(res, res.columns.tolist())
         return res_fields.get(chunkId, None)
 
-    def insert(
-        self, documents: list[dict], indexName: str, knowledgebaseId: str = None
-    ) -> list[str]:
+    def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]:
         inf_conn = self.connPool.get_conn()
         db_instance = inf_conn.get_database(self.dbName)
         table_name = f"{indexName}_{knowledgebaseId}"
@@ -532,7 +517,7 @@ def insert(
                     d[k] = v
                 elif re.search(r"_feas$", k):
                     d[k] = json.dumps(v)
-                elif k == 'kb_id':
+                elif k == "kb_id":
                     if isinstance(d[k], list):
                         d[k] = d[k][0]  # since d[k] is a list, but we need a str
                 elif k == "position_int":
@@ -561,18 +546,16 @@ def insert(
         logger.debug(f"INFINITY inserted into {table_name} {str_ids}.")
         return []
 
-    def update(
-        self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str
-    ) -> bool:
+    def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool:
         # if 'position_int' in newValue:
         #     logger.info(f"update position_int: {newValue['position_int']}")
         inf_conn = self.connPool.get_conn()
         db_instance = inf_conn.get_database(self.dbName)
         table_name = f"{indexName}_{knowledgebaseId}"
         table_instance = db_instance.get_table(table_name)
-        #if "exists" in condition:
+        # if "exists" in condition:
         #     del condition["exists"]
-        
+
         clmns = {}
         if table_instance:
             for n, ty, de, _ in table_instance.show_columns().rows():
@@ -587,7 +570,7 @@ def update(
                 newValue[k] = v
             elif re.search(r"_feas$", k):
                 newValue[k] = json.dumps(v)
-            elif k == 'kb_id':
+            elif k == "kb_id":
                 if isinstance(newValue[k], list):
                     newValue[k] = newValue[k][0]  # since d[k] is a list, but we need a str
             elif k == "position_int":
@@ -611,11 +594,11 @@ def update(
                 del newValue[k]
             else:
                 newValue[k] = v
-        
-        remove_opt = {} # "[k,new_value]": [id_to_update, ...]
+
+        remove_opt = {}  # "[k,new_value]": [id_to_update, ...]
         if removeValue:
             col_to_remove = list(removeValue.keys())
-            row_to_opt = table_instance.output(col_to_remove + ['id']).filter(filter).to_df()
+            row_to_opt = table_instance.output(col_to_remove + ["id"]).filter(filter).to_df()
             logger.debug(f"INFINITY search table {str(table_name)}, filter {filter}, result: {str(row_to_opt[0])}")
             row_to_opt = self.getFields(row_to_opt, col_to_remove)
             for id, old_v in row_to_opt.items():
@@ -632,8 +615,8 @@ def update(
         logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
         for update_kv, ids in remove_opt.items():
             k, v = json.loads(update_kv)
-            table_instance.update(filter + " AND id in ({0})".format(",".join([f"'{id}'" for id in ids])), {k:"###".join(v)})
-        
+            table_instance.update(filter + " AND id in ({0})".format(",".join([f"'{id}'" for id in ids])), {k: "###".join(v)})
+
         table_instance.update(filter, newValue)
         self.connPool.release_conn(inf_conn)
         return True
@@ -645,9 +628,7 @@ def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int:
         try:
             table_instance = db_instance.get_table(table_name)
         except Exception:
-            logger.warning(
-                f"Skipped deleting from table {table_name} since the table doesn't exist."
-            )
+            logger.warning(f"Skipped deleting from table {table_name} since the table doesn't exist.")
             return 0
         filter = equivalent_condition_to_str(condition, table_instance)
         logger.debug(f"INFINITY delete table {table_name}, filter {filter}.")
@@ -675,37 +656,39 @@ def getFields(self, res: tuple[pd.DataFrame, int] | pd.DataFrame, fields: list[s
         if not fields:
             return {}
         fieldsAll = fields.copy()
-        fieldsAll.append('id')
+        fieldsAll.append("id")
         column_map = {col.lower(): col for col in res.columns}
-        matched_columns = {column_map[col.lower()]:col for col in set(fieldsAll) if col.lower() in column_map}
+        matched_columns = {column_map[col.lower()]: col for col in set(fieldsAll) if col.lower() in column_map}
         none_columns = [col for col in set(fieldsAll) if col.lower() not in column_map]
 
         res2 = res[matched_columns.keys()]
         res2 = res2.rename(columns=matched_columns)
-        res2.drop_duplicates(subset=['id'], inplace=True)
+        res2.drop_duplicates(subset=["id"], inplace=True)
 
         for column in res2.columns:
             k = column.lower()
             if field_keyword(k):
-                res2[column] = res2[column].apply(lambda v:[kwd for kwd in v.split("###") if kwd])
+                res2[column] = res2[column].apply(lambda v: [kwd for kwd in v.split("###") if kwd])
             elif re.search(r"_feas$", k):
                 res2[column] = res2[column].apply(lambda v: json.loads(v) if v else {})
             elif k == "position_int":
+
                 def to_position_int(v):
                     if v:
-                        arr = [int(hex_val, 16) for hex_val in v.split('_')]
-                        v = [arr[i:i + 5] for i in range(0, len(arr), 5)]
+                        arr = [int(hex_val, 16) for hex_val in v.split("_")]
+                        v = [arr[i : i + 5] for i in range(0, len(arr), 5)]
                     else:
                         v = []
                     return v
+
                 res2[column] = res2[column].apply(to_position_int)
             elif k in ["page_num_int", "top_int"]:
-                res2[column] = res2[column].apply(lambda v:[int(hex_val, 16) for hex_val in v.split('_')] if v else [])
+                res2[column] = res2[column].apply(lambda v: [int(hex_val, 16) for hex_val in v.split("_")] if v else [])
             else:
                 pass
         for column in none_columns:
             res2[column] = None
-        
+
         return res2.set_index("id").to_dict(orient="index")
 
     def getHighlight(self, res: tuple[pd.DataFrame, int] | pd.DataFrame, keywords: list[str], fieldnm: str):
@@ -719,23 +702,35 @@ def getHighlight(self, res: tuple[pd.DataFrame, int] | pd.DataFrame, keywords: l
         for i in range(num_rows):
             id = column_id[i]
             txt = res[fieldnm][i]
+            if re.search(r"<em>[^<>]+</em>", txt, flags=re.IGNORECASE | re.MULTILINE):
+                ans[id] = txt
+                continue
             txt = re.sub(r"[\r\n]", " ", txt, flags=re.IGNORECASE | re.MULTILINE)
             txts = []
             for t in re.split(r"[.?!;\n]", txt):
-                for w in keywords:
-                    t = re.sub(
-                        r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])"
-                        % re.escape(w),
-                        r"\1<em>\2</em>\3",
-                        t,
-                        flags=re.IGNORECASE | re.MULTILINE,
-                    )
-                if not re.search(
-                    r"<em>[^<>]+</em>", t, flags=re.IGNORECASE | re.MULTILINE
-                ):
+                if is_english([t]):
+                    for w in keywords:
+                        t = re.sub(
+                            r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])" % re.escape(w),
+                            r"\1<em>\2</em>\3",
+                            t,
+                            flags=re.IGNORECASE | re.MULTILINE,
+                        )
+                else:
+                    for w in sorted(keywords, key=len, reverse=True):
+                        t = re.sub(
+                            w,
+                            f"<em>{w}</em>",
+                            t,
+                            flags=re.IGNORECASE | re.MULTILINE,
+                        )
+                if not re.search(r"<em>[^<>]+</em>", t, flags=re.IGNORECASE | re.MULTILINE):
                     continue
                 txts.append(t)
-            ans[id] = "...".join(txts)
+            if txts:
+                ans[id] = "...".join(txts)
+            else:
+                ans[id] = txt
         return ans
 
     def getAggregation(self, res: tuple[pd.DataFrame, int] | pd.DataFrame, fieldnm: str):
diff --git a/uv.lock b/uv.lock
index 328d3aac7c5..c0d5a04737b 100644
--- a/uv.lock
+++ b/uv.lock
@@ -4854,9 +4854,9 @@ wheels = [
 
 [[package]]
 name = "pyicu"
-version = "2.15.2"
+version = "2.15.3"
"2.15.3" source = { registry = "https://mirrors.aliyun.com/pypi/simple" } -sdist = { url = "https://mirrors.aliyun.com/pypi/packages/9f/57/9db810ab75133a1c87ac2e327fb59199d78d233f575fbb63bfd3492b769c/pyicu-2.15.2.tar.gz", hash = "sha256:561e77eedff17cec6839f26211f7a5ce3c071b776e8a0ec9d1207f46cbce598f" } +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/88/b0/c8b61bac55424e2ff80e20d7251c3f002baff3c07c34cee3849e3505d8f5/pyicu-2.15.3.tar.gz", hash = "sha256:f32e78e1cb64d0aeb14f027e037a8944861d3114548818a6adf0081ef51aefc3" } [[package]] name = "pyjwt" @@ -5514,7 +5514,7 @@ requires-dist = [ { name = "psycopg2-binary", specifier = "==2.9.9" }, { name = "pyclipper", specifier = "==1.3.0.post5" }, { name = "pycryptodomex", specifier = "==3.20.0" }, - { name = "pyicu", specifier = ">=2.13.1,<3.0.0" }, + { name = "pyicu", specifier = ">=2.15.3,<3.0.0" }, { name = "pymysql", specifier = ">=1.1.1,<2.0.0" }, { name = "pyodbc", specifier = ">=5.2.0,<6.0.0" }, { name = "pypdf", specifier = "==6.0.0" }, From 15e444801a48f22dd3502daf6374f187750bd004 Mon Sep 17 00:00:00 2001 From: Zhichang Yu Date: Sun, 28 Sep 2025 17:54:20 +0800 Subject: [PATCH 2/2] Resolve review comments --- rag/utils/infinity_conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py index 1ce80fdfff1..001531a28e4 100644 --- a/rag/utils/infinity_conn.py +++ b/rag/utils/infinity_conn.py @@ -719,7 +719,7 @@ def getHighlight(self, res: tuple[pd.DataFrame, int] | pd.DataFrame, keywords: l else: for w in sorted(keywords, key=len, reverse=True): t = re.sub( - w, + re.escape(w), f"{w}", t, flags=re.IGNORECASE | re.MULTILINE,