Skip to content

Commit

Permalink
add observability query pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
lfunderburk committed Oct 27, 2024
1 parent 799523d commit e7709e6
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 61 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,4 @@ uci-marketing-data
*.bkp
*.dtmp
ch6/wandb/*
ch6/adding-observability/wandb/*
2 changes: 0 additions & 2 deletions ch6/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ Exercises for Chapter 6. "Setting up a reproducible project: question and answer

* Optimizing performance through a feedback loop

* Bringing all pieces together

## Technical requirements

To follow along and successfully implement the concepts discussed in this chapter, you need to set up a proper development environment. The main technical requirement is Python, which will be used to manage the dependencies and run the necessary code for the project. Specifically, you need:
Expand Down
10 changes: 1 addition & 9 deletions ch6/adding-observability/indexing_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,6 @@
# Load OpenAI API key
# NOTE(review): os.environ.get returns None when OPENAI_API_KEY is unset, and
# Secret.from_token(None) below would then fail — confirm the .env was loaded first.
open_ai_key = os.environ.get("OPENAI_API_KEY")

# Initialize ElasticsearchDocumentStore and embedder
# NOTE(review): per the diff header (1 addition & 9 deletions) this span appears to
# be the PRE-change text removed by this commit — verify against the actual file.
document_store = ElasticsearchDocumentStore(hosts="http://localhost:9200")
text_embedder = OpenAITextEmbedder(api_key=Secret.from_token(open_ai_key))

# Load dataset and log dataset metadata
# Each line of news_out.jsonl is one JSON document (JSON Lines format).
with open("news_out.jsonl", 'r') as file:
    data = [json.loads(line) for line in file]
# Record the corpus size in the W&B run config for this indexing run.
wandb.config.update({"dataset_size": len(data)})

def read_jsonl_file(file_path):
"""
Reads a JSONL (JSON Lines) file and returns a list of dictionaries representing each valid JSON object.
Expand Down Expand Up @@ -164,6 +155,7 @@ def run(self, event: List[Union[str, Path, ByteStream]]):
if __name__ == "__main__":
document_embedder = BenzingaEmbeder()
data = read_jsonl_file("./news_out.jsonl")
wandb.config.update({"dataset_size": len(data)})

for ite in data:
try:
Expand Down
Binary file added ch6/adding-observability/query_pipeline.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
87 changes: 37 additions & 50 deletions ch6/adding-observability/query_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
import wandb
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack import Pipeline
from haystack.components.embedders import OpenAITextEmbedder
from haystack.components.embedders import OpenAITextEmbedder
from haystack.utils import Secret
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack.utils import Secret

from dotenv import load_dotenv
import os
import wandb
import time

# Initialize Weights and Biases
# NOTE(review): this span interleaves two revisions from the diff view — wandb.init
# appears twice (old "Querying Pipeline" config here, new "Query Pipeline" config
# below) and document_store is constructed twice; keep exactly one of each.
wandb.init(project="haystack-querying", config={"task": "Querying Pipeline"})

# Load environment variables; OPENAI_API_KEY feeds the embedder below.
load_dotenv(".env")
open_ai_key = os.environ.get("OPENAI_API_KEY")
# NOTE(review): os.getenv returns None if WANDB_API_KEY is unset, and assigning
# None into os.environ raises TypeError — confirm the key is present in .env.
os.environ["WANDB_API_KEY"] = os.getenv("WANDB_API_KEY")
wandb.init(project="haystack-querying", config={"task": "Query Pipeline"})

# Initialize components
document_store = ElasticsearchDocumentStore(hosts="http://localhost:9200")
# Initialize ElasticsearchDocumentStore
# NOTE(review): duplicate construction of document_store (diff artifact) — see above.
document_store = ElasticsearchDocumentStore(hosts = "http://localhost:9200")

# Initialize a text embedder to create an embedding for the user query.
text_embedder = OpenAITextEmbedder(api_key=Secret.from_token(open_ai_key))

# Initialize retriever
retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)
# NOTE(review): a second generator is constructed further down without api_key=;
# only one of the two should survive — confirm against the actual file.
generator = OpenAIGenerator(model="gpt-4o-mini", api_key=open_ai_key)

# Define template
# Define the template prompt
template = """
Given the following information, answer the question.
Context:
Expand All @@ -34,51 +37,35 @@
"""
# Wrap the RAG template in a PromptBuilder component.
prompt_builder = PromptBuilder(template=template)

# Build and connect components in the pipeline
# NOTE(review): the lines below interleave two diff revisions — the generator is
# constructed a second time and registered under BOTH the old component name
# ("generator") and the new one ("llm"), and prompt_builder is connected to both.
# Keep exactly one of each pair; confirm against the actual committed file.
# Initialize Generator (Replace 'your-api-key' with your OpenAI API Key)
generator = OpenAIGenerator(model="gpt-4o-mini")
# NOTE(review): setting .api_key as a plain attribute may be ignored by the
# component — the constructor's api_key parameter is the documented path; verify.
generator.api_key = open_ai_key

# Build the Pipeline
query_pipeline = Pipeline()
query_pipeline.add_component("text_embedder", text_embedder)
query_pipeline.add_component("retriever", retriever)
query_pipeline.add_component("prompt_builder", prompt_builder)
query_pipeline.add_component("generator", generator)
query_pipeline.add_component("llm", generator)
# Query embedding flows from the embedder into the retriever...
query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
# ...retrieved documents fill the prompt template...
query_pipeline.connect("retriever", "prompt_builder.documents")
# ...and the rendered prompt feeds the generator (old and new names — see note).
query_pipeline.connect("prompt_builder", "generator")
query_pipeline.connect("prompt_builder", "llm")

# Run a query through the pipeline
def query_pipeline_run(question):
    """Answer *question* through embed -> retrieve -> prompt -> generate,
    logging per-stage wall-clock timings and retrieval stats to Weights & Biases.

    Parameters
    ----------
    question : str
        Natural-language question to embed, retrieve context for, and answer.

    Returns
    -------
    Whatever ``generator.generate`` returns (the LLM response object).

    Side effects
    ------------
    Emits one ``wandb.log`` record with timing metrics for this query.

    NOTE(review): the component calls below (.embed/.retrieve/.build/.generate)
    do not match the Haystack 2.x ``run`` API used elsewhere in this file —
    confirm against the installed Haystack version.
    """
    # Fix: removed the unused `start_time = time.time()` assignment that was
    # never read (each stage is timed individually below).

    # Embed the question and time the embedding stage.
    embedding_start = time.time()
    question_embedding = text_embedder.embed({"text": question})
    embedding_end = time.time()

    # Retrieve documents and measure retrieval time.
    retrieval_start = time.time()
    retrieved_docs = retriever.retrieve(question_embedding)
    retrieval_end = time.time()

    # Build the prompt, then time generation only — prompt construction is
    # deliberately excluded from generation_time.
    prompt = prompt_builder.build({"documents": retrieved_docs, "question": question})
    generation_start = time.time()
    response = generator.generate({"prompt": prompt})
    generation_end = time.time()

    # Log query pipeline metrics to Weights and Biases.
    wandb.log({
        "question_length": len(question),
        "embedding_time": embedding_end - embedding_start,
        "retrieval_time": retrieval_end - retrieval_start,
        "generation_time": generation_end - generation_start,
        "retrieved_documents_count": len(retrieved_docs),
        "response_accuracy": "placeholder_accuracy_metric"  # Replace with your accuracy measurement if available
    })

    return response
# NOTE(review): this guard appears to be the PRE-change entry point shown by the
# diff view; indentation restored here because the scrape flattened it — confirm
# against the actual committed file.
if __name__ == "__main__":
    # Render the assembled pipeline graph to an image file.
    query_pipeline.draw(path="query_pipeline.png")

    # Test with a question
    question = "Tell me about the latest news in the dataset."
    response = query_pipeline_run(question)
    print(response["replies"][0])

    # Close the Weights & Biases run so buffered metrics are flushed.
    wandb.finish()
# Running the pipeline
# Ask one demo question and time the end-to-end Pipeline.run call.
question = "Tell me about what you know"
start_time = time.time()
# A single run: the embedder receives the raw question text and the prompt
# builder receives the same question for template substitution.
response = query_pipeline.run({"text_embedder": {"text": question}, "prompt_builder": {"question": question}})
end_time = time.time()
# Log wall-clock latency plus the token usage each component reported.
# NOTE(review): assumes response['text_embedder']['meta'] and
# response['llm']['meta'][0] expose OpenAI-style 'usage' dicts — confirm
# against the installed Haystack version.
wandb.log({
"query_time": end_time - start_time,
"embedding_token_usage": response['text_embedder']['meta']['usage']['total_tokens'],
"llm_prompt_token_usage": response['llm']['meta'][0]['usage']['prompt_tokens'],
"llm_completion_token_usage": response['llm']['meta'][0]['usage']['completion_tokens'],
"llm_total_token_usage": response['llm']['meta'][0]['usage']['total_tokens']
})

# Print the first generated reply from the LLM component.
print(response["llm"]["replies"][0])
Binary file added ch6/query_pipeline.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit e7709e6

Please sign in to comment.