Embeddings
In order to store the data in our vector database, we need to convert the free text into embeddings. Let's look at GitHub first. First when we are extracting data from GitHub, we will want to be careful to avoid rate limiting. We can do this by partitioning our asset into weekly chunks using a WeeklyPartitionsDefinition:
START_TIME = "2023-01-01"
weekly_partition = dg.WeeklyPartitionsDefinition(start_date=START_TIME)
We will supply that partition in the decorator of our asset, as well as an AutomationCondition to ensure that the weekly partition is updated every Monday. The rest of our asset will use the GithubResource we defined earlier and return a list of LangChain Documents:
@dg.asset(
    group_name="ingestion",
    kinds={"github"},
    partitions_def=weekly_partition,
    io_manager_key="document_io_manager",
    automation_condition=dg.AutomationCondition.on_cron("0 0 * * 1"),
    description="""
   Ingests raw GitHub issues data from the Dagster repository on a weekly basis.
   
   This asset fetches GitHub issues, including:
   - Issue title and body
   - Comments and discussion threads
   - Issue metadata (status, labels, assignees)
   - Creation and update timestamps
   
   Technical Details:
       - Runs weekly (Mondays at midnight)
       - Processes issues in weekly partitions
       - Converts issues to Document format for embedding
       - Preserves all issue metadata for search context
       
   Returns:
       List[Document]: Collection of Document objects containing issue content 
       and associated metadata for each weekly partition
   """,
)
def github_issues_raw(
    context: dg.AssetExecutionContext,
    github: GithubResource,
) -> list[Document]:
    start, end = context.partition_time_window
    context.log.info(f"Finding issues from {start} to {end}")
    issues = github.get_issues(
        start_date=start.strftime("%Y-%m-%d"), end_date=end.strftime("%Y-%m-%d")
    )
    return github.convert_issues_to_documents(issues)
The next asset will convert those Documents to vectors and upload them to Pinecone. In order to generate the embeddings, we will need an AI model. In this case, we will use OpenAI's text-embedding-3-small model to transform the text we have collected from GitHub into embeddings. Dagster provides an OpenAIResource to interact with the OpenAI client ,and we will use that to create the embeddings. This asset will also create the index in Pinecone:
@dg.asset(
    group_name="embeddings",
    kinds={"github", "openai", "pinecone"},
    partitions_def=weekly_partition,
    io_manager_key="document_io_manager",
    automation_condition=dg.AutomationCondition.any_deps_updated(),
    description="""
   Creates and stores vector embeddings for GitHub issues in Pinecone.
   
   This asset processes weekly batches of GitHub issues by:
   1. Converting issue content to OpenAI embeddings
   2. Storing embeddings and metadata in Pinecone vector database
   3. Using namespace 'dagster-github' for unified GitHub content storage
   
   Dependencies:
       - github_issues_raw: Raw issue documents from weekly partition
       
   Technical Details:
       - Uses OpenAI's text-embedding-3-small model
       - Embedding dimension: 1536
       - Stores in Pinecone index: 'dagster-knowledge'
       - Preserves metadata like issue status, labels, and timestamps
       - Processes issues in weekly batches
       
   Vector Storage:
       - Each vector contains issue content embedding and metadata
       - Uses auto-generated sequential IDs
       - Stored in 'dagster-github' namespace for consolidated search
       
   Returns:
       MaterializeResult with metadata about number of issues processed
   """,
)
def github_issues_embeddings(
    context: dg.AssetExecutionContext,
    openai: OpenAIResource,
    pinecone: PineconeResource,
    github_issues_raw: list[Document],
) -> dg.MaterializeResult:
    # Create index if doesn't exist
    pinecone.create_index("dagster-knowledge", dimension=1536)
    index, namespace_kwargs = pinecone.get_index("dagster-knowledge", namespace="dagster-github")
    texts = [doc.page_content for doc in github_issues_raw]
    with openai.get_client(context) as client:
        embeddings = [
            item.embedding
            for item in client.embeddings.create(model="text-embedding-3-small", input=texts).data
        ]
    # Prepare metadata
    metadata = [
        {k: v for k, v in doc.metadata.items() if isinstance(v, (str, int, float, bool))}
        for doc in github_issues_raw
    ]
    # Upsert to Pinecone with namespace
    index.upsert(
        vectors=zip(
            [str(i) for i in range(len(texts))],  # IDs
            embeddings,
            metadata,
        ),
        **namespace_kwargs,  # Include namespace parameters
    )
    return dg.MaterializeResult(
        metadata={
            "number_of_issues": len(github_issues_raw),
        }
    )
This process will be very similar for the GitHub discussions content.
Custom IO Manager
Looking at the code, you may have noticed the document_io_manager. Because LangChain Documents are a special object type, we need to do some work to serialize and deserialize the data. I/O Managers are responsible for handling the inputs and outputs of assets and how the data is persisted. This I/O manager will use the local file system to save the output of assets returning Documents as JSON files. It will then read those JSON files back into Documents in assets that take in those inputs:
class DocumentIOManager(dg.IOManager):
    def __init__(self, base_dir):
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)
    def handle_output(self, context, obj):
        # Convert documents to simple dicts
        file_path = os.path.join(self.base_dir, f"{context.asset_key.path[-1]}.json")
        # Convert documents to simple dicts
        serialized_docs = [
            {"page_content": doc.page_content, "metadata": doc.metadata} for doc in obj
        ]
        # Save as JSON
        with open(file_path, "w") as f:
            json.dump(serialized_docs, f)
    def load_input(self, context):
        file_path = os.path.join(self.base_dir, f"{context.asset_key.path[-1]}.json")
        if not os.path.exists(file_path):
            return []
        # Load and reconstruct Documents
        with open(file_path) as f:
            data = json.load(f)
        return [
            Document(page_content=doc["page_content"], metadata=doc["metadata"]) for doc in data
        ]
This I/O manager will be attached to the Definitions of the project, which also contains our assets and resources:
defs = dg.Definitions(
    assets=[
        docs_embedding,
        docs_scrape_raw,
        github_discussions_embeddings,
        github_discussions_raw,
        github_issues_embeddings,
        github_issues_raw,
        query,
    ],
    resources={
        "github": github_resource,
        "scraper": scraper_resource,
        "pinecone": pinecone_resource,
        "openai": OpenAIResource(api_key=dg.EnvVar("OPENAI_API_KEY")),
        "document_io_manager": document_io_manager.configured({"base_dir": "data/documents"}),
    },
)
Scraping embeddings
The assets for the documentation scraping will behave similar to the GitHub assets. We will not partition by date like Github, so we can leave out that out of the asset. But like the GitHub assets, our ingestion asset will return a collection of Documents that will be handled by the I/O manager. This asset will also include the AutomationCondition to update data on the same cadence as our GitHub source.
The asset that generates the embeddings with the documentation site will need one additional change. Because the content of the documentation pages is so large, we need to split data into chunks. The split_text function ensures that we split the text into equal length chunks. We also want to keep similar chunks together and associated with the page they were on so we will hash the index of the URL to ensure data stays together. correctly Once the data is chunked, it can be batched and sent to Pinecone:
    # Create vectors with metadata
    vectors = []
    for i, embedding in enumerate(all_embeddings):
        doc_idx = chunk_to_doc[i]
        doc = docs_scrape_raw[doc_idx]
        meta = {k: v for k, v in doc.metadata.items() if isinstance(v, (str, int, float, bool))}
        meta["chunk_index"] = i
        doc_id = f"{hashlib.md5(doc.metadata['source'].encode()).hexdigest()}_{i}"
        vectors.append((doc_id, embedding, meta))
        # Upsert when batch is full or at end
        if len(vectors) >= PINECONE_BATCH_SIZE or i == len(all_embeddings) - 1:
            index.upsert(vectors=vectors, **namespace_kwargs)
            vectors = []
            time.sleep(1)
Dagster is now set to continuously ingest data from all of our configured sources and populate the Pinecone index. We have now completed the main half of our RAG system. Next we need to ensure we can pull relevant information when it is answering questions. We will add one final asset to query our system.
Next steps
- Continue this example with retrieval