incorporate wandb functionality
lfunderburk committed Oct 27, 2024
1 parent 3cb5369 commit 799523d
Showing 5 changed files with 159 additions and 39 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -189,3 +189,4 @@ uci-marketing-data
*.duckdb*
*.bkp
*.dtmp
ch6/wandb/*
187 changes: 156 additions & 31 deletions ch6/adding-observability/indexing_pipeline.py
@@ -1,17 +1,37 @@
import wandb
from haystack import Pipeline
from haystack.components.embedders import OpenAITextEmbedder, OpenAIDocumentEmbedder
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import Secret
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

from haystack import component, Document
from typing import Any, Dict, List, Union
from haystack.dataclasses import ByteStream

from dotenv import load_dotenv
import os
import time
import json
import logging

import re
from bs4 import BeautifulSoup
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

load_dotenv(".env")

# Initialize Weights and Biases
os.environ["WANDB_API_KEY"] = os.getenv("WANDB_API_KEY")
wandb.init(project="haystack-indexing", config={"task": "Indexing Pipeline"})

# Load OpenAI API key
open_ai_key = os.environ.get("OPENAI_API_KEY")

# Initialize ElasticsearchDocumentStore and embedder
@@ -23,36 +43,141 @@

def read_jsonl_file(file_path):
    """
    Reads a JSONL (JSON Lines) file and returns a list of dictionaries representing each valid JSON object.
    Lines with JSON decoding errors are skipped.

    :param file_path: The path to the JSONL file.
    :return: A list of dictionaries, each representing a parsed JSON object.
    """
    data = []

    try:
        with open(file_path, 'r') as file:
            for line in file:
                try:
                    # Attempt to load the JSON data from the current line
                    json_data = json.loads(line)
                    data.append(json_data)
                except json.JSONDecodeError as e:
                    # Log an error message for any lines that can't be decoded
                    logger.error(f"Error decoding JSON on line: {line[:30]}... - {e}")
    except FileNotFoundError as e:
        logger.error(f"File not found: {e}")

    return data
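
# Illustrative shape of a parsed record (hypothetical values; the real keys come from
# the Benzinga export): {"headline": "ACME beats estimates", "content": "<p>...</p>", ...}.
# The components below rely mainly on the "content" and "headline" fields.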


@component
class BenzingaNews:

    @component.output_types(documents=List[Document])
    def run(self, sources: Dict[str, Any]) -> None:
        logger.info("Starting BenzingaNews.run with sources")
        documents = []
        try:
            for source in sources:
                logger.debug(f"Processing source: {source.get('headline', 'Unknown headline')}")
                for key in source:
                    if isinstance(source[key], str):
                        source[key] = self.clean_text(source[key])

                if source['content'] == "":
                    logger.warning(f"Skipping source due to empty content: {source.get('headline', 'Unknown headline')}")
                    continue

                # Create a Document with the cleaned content and metadata
                content = source['content']
                document = Document(content=content, meta=source)
                documents.append(document)

            logger.info(f"Successfully processed {len(documents)} documents.")

        except Exception as e:
            logger.error(f"Error during BenzingaNews.run: {e}")

        return {"documents": documents}

    def clean_text(self, text):
        logger.debug("Cleaning text content.")
        try:
            # Remove HTML tags using BeautifulSoup
            soup = BeautifulSoup(text, "html.parser")
            text = soup.get_text()
            # Remove extra whitespace
            text = re.sub(r'\s+', ' ', text).strip()
            logger.debug("Text cleaned successfully.")
        except Exception as e:
            logger.error(f"Error during text cleaning: {e}")
            raise
        return text
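
# Illustrative example of clean_text on a hypothetical Benzinga HTML snippet:
# BeautifulSoup strips the markup, then the regex collapses the leftover whitespace.
#
#   raw = "<p>Shares of   <b>ACME</b> rose   after earnings.</p>"
#   BenzingaNews().clean_text(raw)   # -> "Shares of ACME rose after earnings."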


@component
class BenzingaEmbeder:

    def __init__(self):
        logger.info("Initializing BenzingaEmbeder pipeline.")
        try:
            get_news = BenzingaNews()
            document_store = ElasticsearchDocumentStore(embedding_similarity_function="cosine", hosts="http://localhost:9200")
            document_cleaner = DocumentCleaner(
                remove_empty_lines=True,
                remove_extra_whitespaces=True,
                remove_repeated_substrings=False
            )
            document_splitter = DocumentSplitter(split_by="passage", split_length=5)
            document_writer = DocumentWriter(document_store=document_store,
                                             policy=DuplicatePolicy.OVERWRITE)
            embedding = OpenAIDocumentEmbedder(api_key=Secret.from_token(open_ai_key))

            self.pipeline = Pipeline()
            self.pipeline.add_component("get_news", get_news)
            self.pipeline.add_component("document_cleaner", document_cleaner)
            self.pipeline.add_component("document_splitter", document_splitter)
            self.pipeline.add_component("embedding", embedding)
            self.pipeline.add_component("document_writer", document_writer)

            self.pipeline.connect("get_news", "document_cleaner")
            self.pipeline.connect("document_cleaner", "document_splitter")
            self.pipeline.connect("document_splitter", "embedding")
            self.pipeline.connect("embedding", "document_writer")

            logger.info("Pipeline initialized successfully.")
        except Exception as e:
            logger.error(f"Error during BenzingaEmbeder initialization: {e}")
            raise

    @component.output_types(documents=List[Document])
    def run(self, event: List[Union[str, Path, ByteStream]]):
        logger.info(f"Running BenzingaEmbeder with event: {event}")
        try:
            documents = self.pipeline.run({"get_news": {"sources": [event]}})
            self.pipeline.draw("benzinga_pipeline.png")
            logger.info("Pipeline executed successfully, drawing pipeline graph.")
            return documents
        except Exception as e:
            logger.error(f"Error during pipeline execution: {e}")
            raise


if __name__ == "__main__":
    document_embedder = BenzingaEmbeder()
    data = read_jsonl_file("./news_out.jsonl")

    for ite in data:
        try:
            start_time = time.time()
            documents = document_embedder.run(ite)
            end_time = time.time()

            # Log individual document metrics
            wandb.log({
                "embedding_time": end_time - start_time,
                "embedding_token_usage": documents['embedding']['meta']['usage']['total_tokens'],
            })

            print(documents['embedding']['meta']['usage']['total_tokens'])
        except Exception as e:
            logger.error(f"Error during document embedding: {e}")
            wandb.log({"embedding_status": 0})

    wandb.finish()
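
A minimal sketch, assuming a counter such as successful_docs is incremented inside the try block of the loop above: the run-level total that the pre-refactor script logged (total_documents_indexed) can still be recorded once, just before wandb.finish(). The counter name is illustrative.

    successful_docs = 0
    # ... increment after each successful document_embedder.run(ite) ...
    wandb.log({"total_documents_indexed": successful_docs})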
Binary file added ch6/benzinga_pipeline.png
10 changes: 2 additions & 8 deletions ch6/case-I-q-and-a-dataset/indexingpipeline.py
@@ -1,18 +1,14 @@
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack import Pipeline
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.utils import Secret
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

from haystack import component, Document
from typing import Any, Dict, List, Union
from haystack.dataclasses import ByteStream

import json
@@ -31,8 +27,6 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def read_jsonl_file(file_path):
"""
Reads a JSONL (JSON Lines) file and returns a list of dictionaries representing each valid JSON object.
File renamed without changes.
