Skip to content

Commit

Permalink
Merge branch 'main' into collapse-create-base-text-units
Browse files Browse the repository at this point in the history
  • Loading branch information
natoverse authored Sep 23, 2024
2 parents 744b172 + ea46820 commit f11f5be
Show file tree
Hide file tree
Showing 14 changed files with 397 additions and 101 deletions.
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20240911201935470388.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Calculate new inputs and deleted inputs on update"
}
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20240918221118566693.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Merge existing and new entities, updating values accordingly"
}
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20240920000120463201.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Collapse create-final-nodes."
}
2 changes: 1 addition & 1 deletion docsite/posts/config/env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ If the embedding target is `all`, and you want to only embed a subset of these f

## Input Data

Our pipeline can ingest .csv or .txt data from an input folder. These files can be nested within subfolders. To configure how input data is handled, what fields are mapped over, and how timestamps are parsed, look for configuration values starting with `GRAPHRAG_INPUT_` below. In general, CSV-based data provides the most customizeability. Each CSV should at least contain a `text` field (which can be mapped with environment variables), but it's helpful if they also have `title`, `timestamp`, and `source` fields. Additional fields can be included as well, which will land as extra fields on the `Document` table.
Our pipeline can ingest .csv or .txt data from an input folder. These files can be nested within subfolders. To configure how input data is handled, what fields are mapped over, and how timestamps are parsed, look for configuration values starting with `GRAPHRAG_INPUT_` below. In general, CSV-based data provides the most customizability. Each CSV should at least contain a `text` field (which can be mapped with environment variables), but it's helpful if they also have `title`, `timestamp`, and `source` fields. Additional fields can be included as well, which will land as extra fields on the `Document` table.

## Base LLM Settings

Expand Down
60 changes: 42 additions & 18 deletions graphrag/index/run/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from graphrag.index.typing import PipelineRunResult

# Register all verbs
from graphrag.index.update.dataframes import get_delta_docs, update_dataframe_outputs
from graphrag.index.verbs import * # noqa
from graphrag.index.workflows import (
VerbDefinitions,
Expand Down Expand Up @@ -111,9 +112,6 @@ async def run_pipeline_with_config(
else await _create_input(config.input, progress_reporter, root_dir)
)

if is_update_run:
# TODO: Filter dataset to only include new data (this should be done in the input module)
pass
post_process_steps = input_post_process_steps or _create_postprocess_steps(
config.input
)
Expand All @@ -123,21 +121,47 @@ async def run_pipeline_with_config(
msg = "No dataset provided!"
raise ValueError(msg)

async for table in run_pipeline(
workflows=workflows,
dataset=dataset,
storage=storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=is_resume_run,
):
yield table
if is_update_run:
delta_dataset = await get_delta_docs(dataset, storage)

delta_storage = storage.child("delta")

# Run the pipeline on the new documents
tables_dict = {}
async for table in run_pipeline(
workflows=workflows,
dataset=delta_dataset.new_inputs,
storage=delta_storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=False,
):
tables_dict[table.workflow] = table.result

await update_dataframe_outputs(tables_dict, storage)

else:
async for table in run_pipeline(
workflows=workflows,
dataset=dataset,
storage=storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=is_resume_run,
):
yield table


async def run_pipeline(
Expand Down
4 changes: 4 additions & 0 deletions graphrag/index/update/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""Incremental Indexing main module definition."""
192 changes: 192 additions & 0 deletions graphrag/index/update/dataframes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""Dataframe operations and utils for Incremental Indexing."""

import os
from dataclasses import dataclass

import numpy as np
import pandas as pd

from graphrag.index.storage.typing import PipelineStorage
from graphrag.utils.storage import _load_table_from_storage

mergeable_outputs = [
"create_final_documents",
"create_final_entities",
"create_final_relationships",
]


@dataclass
class InputDelta:
"""Dataclass to hold the input delta.
Attributes
----------
new_inputs : pd.DataFrame
The new inputs.
deleted_inputs : pd.DataFrame
The deleted inputs.
"""

new_inputs: pd.DataFrame
deleted_inputs: pd.DataFrame


async def get_delta_docs(
input_dataset: pd.DataFrame, storage: PipelineStorage
) -> InputDelta:
"""Get the delta between the input dataset and the final documents.
Parameters
----------
input_dataset : pd.DataFrame
The input dataset.
storage : PipelineStorage
The Pipeline storage.
Returns
-------
InputDelta
The input delta. With new inputs and deleted inputs.
"""
final_docs = await _load_table_from_storage(
"create_final_documents.parquet", storage
)

# Select distinct title from final docs and from dataset
previous_docs: list[str] = final_docs["title"].unique().tolist()
dataset_docs: list[str] = input_dataset["title"].unique().tolist()

# Get the new documents (using loc to ensure DataFrame)
new_docs = input_dataset.loc[~input_dataset["title"].isin(previous_docs)]

# Get the deleted documents (again using loc to ensure DataFrame)
deleted_docs = final_docs.loc[~final_docs["title"].isin(dataset_docs)]

return InputDelta(new_docs, deleted_docs)


async def update_dataframe_outputs(
dataframe_dict: dict[str, pd.DataFrame],
storage: PipelineStorage,
) -> None:
"""Update the mergeable outputs.
Parameters
----------
dataframe_dict : dict[str, pd.DataFrame]
The dictionary of dataframes.
storage : PipelineStorage
The storage used to store the dataframes.
"""
await _concat_dataframes("create_base_text_units", dataframe_dict, storage)
await _concat_dataframes("create_final_documents", dataframe_dict, storage)

old_entities = await _load_table_from_storage(
"create_final_entities.parquet", storage
)
delta_entities = dataframe_dict["create_final_entities"]

merged_entities_df, _ = _group_and_resolve_entities(old_entities, delta_entities)
# Save the updated entities back to storage
# TODO: Using _new in the mean time, to compare outputs without overwriting the original
await storage.set(
"create_final_entities_new.parquet", merged_entities_df.to_parquet()
)


async def _concat_dataframes(name, dataframe_dict, storage):
"""Concatenate the dataframes.
Parameters
----------
name : str
The name of the dataframe to concatenate.
dataframe_dict : dict[str, pd.DataFrame]
The dictionary of dataframes from a pipeline run.
storage : PipelineStorage
The storage used to store the dataframes.
"""
old_df = await _load_table_from_storage(f"{name}.parquet", storage)
delta_df = dataframe_dict[name]

# Merge the final documents
final_df = pd.concat([old_df, delta_df], copy=False)

# TODO: Using _new in the mean time, to compare outputs without overwriting the original
await storage.set(f"{name}_new.parquet", final_df.to_parquet())


def _group_and_resolve_entities(
df_a: pd.DataFrame, df_b: pd.DataFrame
) -> tuple[pd.DataFrame, dict]:
"""Group and resolve entities.
Parameters
----------
df_a : pd.DataFrame
The first dataframe.
df_b : pd.DataFrame
The second dataframe.
Returns
-------
pd.DataFrame
The resolved dataframe.
dict
The id mapping for existing entities. In the form of {df_b.id: df_a.id}.
"""
# If a name exists in A and B, make a dictionary for {B.id : A.id}
merged = df_b[["id", "name"]].merge(
df_a[["id", "name"]],
on="name",
suffixes=("_B", "_A"),
copy=False,
)
id_mapping = dict(zip(merged["id_B"], merged["id_A"], strict=True))

# Concat A and B
combined = pd.concat([df_a, df_b], copy=False)

# Group by name and resolve conflicts
aggregated = (
combined.groupby("name")
.agg({
"id": "first",
"type": "first",
"human_readable_id": "first",
"graph_embedding": "first",
"description": lambda x: os.linesep.join(x.astype(str)), # Ensure str
# Concatenate nd.array into a single list
"text_unit_ids": lambda x: ",".join(str(i) for j in x.tolist() for i in j),
# Keep only descriptions where the original value wasn't modified
"description_embedding": lambda x: x.iloc[0] if len(x) == 1 else np.nan,
})
.reset_index()
)

# Force the result into a DataFrame
resolved: pd.DataFrame = pd.DataFrame(aggregated)

# Recreate humand readable id with an autonumeric
resolved["human_readable_id"] = range(len(resolved))

# Modify column order to keep consistency
resolved = resolved.loc[
:,
[
"id",
"name",
"description",
"type",
"human_readable_id",
"graph_embedding",
"text_unit_ids",
"description_embedding",
],
]

return resolved, id_mapping
21 changes: 19 additions & 2 deletions graphrag/index/verbs/graph/layout/layout_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,25 @@ def layout_graph(
min_dist: 0.75 # Optional, The min distance to use for the umap algorithm, default: 0.75
```
"""
output_df = cast(pd.DataFrame, input.get_input())
input_df = cast(pd.DataFrame, input.get_input())
output_df = layout_graph_df(
input_df, callbacks, strategy, embeddings_column, graph_column, to, graph_to
)

return TableContainer(table=output_df)


def layout_graph_df(
input_df: pd.DataFrame,
callbacks: VerbCallbacks,
strategy: dict[str, Any],
embeddings_column: str,
graph_column: str,
to: str,
graph_to: str | None = None,
):
"""Apply a layout algorithm to a graph."""
output_df = input_df
num_items = len(output_df)
strategy_type = strategy.get("type", LayoutGraphStrategyType.umap)
strategy_args = {**strategy}
Expand Down Expand Up @@ -93,7 +110,7 @@ def layout_graph(
),
axis=1,
)
return TableContainer(table=output_df)
return output_df


def _run_layout(
Expand Down
Loading

0 comments on commit f11f5be

Please sign in to comment.