Skip to content

Commit

Permalink
Week 4 Challenge (#60)
Browse files Browse the repository at this point in the history
* Week 4 Challenge
  • Loading branch information
dehume authored Sep 7, 2022
1 parent e62d4ae commit 750ee61
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 1 deletion.
1 change: 1 addition & 0 deletions week_4/project/dbt_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Absolute filesystem path to the dbt project directory (dbt_test_project).
DBT_PROJECT_PATH = "/opt/dagster/dagster_home/project/dbt_test_project"
22 changes: 22 additions & 0 deletions week_4/project/dbt_test_project/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# dbt project configuration for the `test_dbt` project.
name: 'test_dbt'
version: '1.0.0'
config-version: 2

# Name of the connection profile this project uses.
profile: 'test_dbt'

# Directories (relative to this file) for each kind of project resource.
source-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
data-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

# Output directory, followed by the directories listed as clean targets.
target-path: "target"
clean-targets:
- "target"
- "dbt_modules"

# Default materialization settings for models.
# NOTE(review): indentation appears to have been lost in this view; the keys
# below are presumably nested (models > test_dbt > example > +materialized)
# -- confirm against the real file.
models:
test_dbt:
example:
+materialized: view
5 changes: 5 additions & 0 deletions week_4/project/dbt_test_project/models/my_first_dbt_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Materialized as a table: selects every column from the `dbt_table` table
-- of the `postgresql` source.
{{ config(materialized='table') }}


SELECT *
FROM {{ source('postgresql', 'dbt_table') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Materialized as a table: exposes `column_2` of `my_first_dbt_model`
-- under the name `my_column`.
{{ config(materialized='table') }}


SELECT column_2 AS my_column
FROM {{ ref('my_first_dbt_model') }}
22 changes: 22 additions & 0 deletions week_4/project/dbt_test_project/models/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Model descriptions and column-level tests for the dbt models.
version: 2

models:
# Each of column_1..column_3 carries a not_null test.
- name: my_first_dbt_model
description: "A starter dbt model"
columns:
- name: column_1
tests:
- not_null
- name: column_2
tests:
- not_null
- name: column_3
tests:
- not_null

# The single column `my_column` carries a not_null test.
- name: my_second_dbt_model
description: "A starter dbt model"
columns:
- name: my_column
tests:
- not_null
7 changes: 7 additions & 0 deletions week_4/project/dbt_test_project/models/sources.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Declares the `postgresql` source: table `dbt_table` in schema `analytics`.
version: 2

sources:
- name: postgresql
schema: analytics
tables:
- name: dbt_table
12 changes: 12 additions & 0 deletions week_4/project/dbt_test_project/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Connection profile `test_dbt` with one output, `test`, selected as target.
# NOTE(review): the credentials below are stored in plain text -- presumably
# acceptable only for a local/demo environment; confirm before reuse.
test_dbt:
target: test
outputs:
test:
type: postgres
host: postgresql
user: postgres_user
password: postgres_password
port: 5432
dbname: postgres_db
schema: analytics
threads: 4
11 changes: 10 additions & 1 deletion week_4/project/repo.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
from dagster import repository
from dagster import repository, with_resources
from dagster_dbt import dbt_cli_resource
from project.dbt_config import DBT_PROJECT_PATH
from project.resources import postgres_resource
from project.week_4 import (
get_s3_data_docker,
process_data_docker,
put_redis_data_docker,
)
from project.week_4_challenge import create_dbt_table, insert_dbt_data


@repository
def repo():
    """Return the definitions that make up the ``repo`` repository."""
    definitions = [
        get_s3_data_docker,
        process_data_docker,
        put_redis_data_docker,
    ]
    return definitions


@repository
def assets_dbt():
    """Placeholder repository for the dbt assets; it currently returns nothing.

    NOTE(review): presumably meant to be completed as part of the Week 4
    challenge -- confirm what definitions it should return.
    """
    return None
36 changes: 36 additions & 0 deletions week_4/project/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,26 @@

import boto3
import redis
import sqlalchemy
from dagster import Field, Int, String, resource


class Postgres:
    """Thin wrapper around a SQLAlchemy engine for a single Postgres database.

    Args:
        host: Host part of the connection URI.
        user: Login user name.
        password: Login password.
        database: Name of the database to connect to.
    """

    def __init__(self, host: str, user: str, password: str, database: str):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        # The engine is created once from the URI and reused by every query.
        self._engine = sqlalchemy.create_engine(self.uri)

    @property
    def uri(self) -> str:
        """Return the ``postgresql://`` connection URI for these settings.

        The user name and password are percent-encoded so that characters
        with a meaning inside URLs (``@``, ``:``, ``/``, ...) cannot corrupt
        the URI. Credentials made only of letters, digits and ``_.-~`` are
        emitted unchanged.
        """
        # Function-scope import: the file's import header is not fully
        # visible from this block, so the dependency is kept local.
        from urllib.parse import quote

        # safe="" also encodes "/" which quote() would otherwise leave as-is.
        user = quote(self.user, safe="")
        password = quote(self.password, safe="")
        return f"postgresql://{user}:{password}@{self.host}/{self.database}"

    def execute_query(self, query: str):
        """Execute ``query`` on the engine; any result is discarded."""
        # NOTE(review): ``Engine.execute`` is deprecated in SQLAlchemy 1.4 and
        # removed in 2.0 -- confirm the pinned SQLAlchemy version before
        # upgrading.
        self._engine.execute(query)


class S3:
def __init__(self, bucket: str, access_key: str, secret_key: str, endpoint_url: str = None):
self.bucket = bucket
Expand Down Expand Up @@ -39,6 +56,25 @@ def put_data(self, name: str, value: str):
self.client.set(name, value)


@resource(
    description="A resource that can run Postgres",
    config_schema={
        "host": Field(String),
        "user": Field(String),
        "password": Field(String),
        "database": Field(String),
    },
)
def postgres_resource(context) -> Postgres:
    """Build a ``Postgres`` client from the resource configuration.

    Reads the ``host``, ``user``, ``password`` and ``database`` entries of
    ``context.resource_config`` and passes them to ``Postgres``.
    """
    settings = context.resource_config
    host = settings["host"]
    user = settings["user"]
    password = settings["password"]
    database = settings["database"]
    return Postgres(host=host, user=user, password=password, database=database)


@resource
def mock_s3_resource(context):
stocks = [
Expand Down
36 changes: 36 additions & 0 deletions week_4/project/week_4_challenge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from random import randint

from dagster import AssetIn, asset
from dagster_dbt import load_assets_from_dbt_project
from project.dbt_config import DBT_PROJECT_PATH


@asset(
    required_resource_keys={"database"},
    op_tags={"kind": "postgres"},
)
def create_dbt_table(context):
    """Create the ``analytics`` schema and ``analytics.dbt_table`` if absent.

    Both statements are sent, in order, through the ``database`` resource.
    """
    statements = (
        "CREATE SCHEMA IF NOT EXISTS analytics;",
        "CREATE TABLE IF NOT EXISTS analytics.dbt_table (column_1 VARCHAR(100), column_2 VARCHAR(100), column_3 VARCHAR(100));",
    )
    # The schema must exist before the table that lives in it.
    for statement in statements:
        context.resources.database.execute_query(statement)


@asset(
    required_resource_keys={"database"},
    op_tags={"kind": "postgres"},
)
def insert_dbt_data(context, create_dbt_table):
    """Insert between 1 and 10 identical rows into ``analytics.dbt_table``.

    The row count is drawn with ``randint(1, 10)``; each row is inserted by
    its own call to the ``database`` resource and logged individually.
    ``create_dbt_table`` is not read inside the body.
    NOTE(review): the parameter presumably exists only to make this asset
    depend on the asset of that name -- confirm.
    """
    insert_statement = "INSERT INTO analytics.dbt_table (column_1, column_2, column_3) VALUES ('A', 'B', 'C');"

    for _ in range(randint(1, 10)):
        context.resources.database.execute_query(insert_statement)
        context.log.info("Inserted a row")

    context.log.info("Batch inserted")


@asset
def final(context):
    """Log that the Week 4 challenge has completed."""
    message = "Week 4 Challenge completed"
    context.log.info(message)

0 comments on commit 750ee61

Please sign in to comment.