Building Data Engineering Pipelines in Python

The landing zone contains raw data, the clean zone contains cleaned data, and the business zone contains domain-specific data, usually related to solving particular business problems.
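A minimal sketch of moving one dataset through the three zones, assuming local directories named landing/, clean/ and business/, a pandas-based cleaning step, and made-up column names:

import pandas as pd

# Landing zone: the raw file, exactly as delivered by the source system
raw = pd.read_csv("landing/orders.csv")

# Clean zone: deduplicated, typed, stored in an efficient columnar format
clean = (raw.drop_duplicates()
            .astype({"order_id": "int64", "amount": "float64"}))
clean.to_parquet("clean/orders.parquet")

# Business zone: a domain-specific aggregate that answers a business question
revenue_per_day = (clean.groupby("order_date")["amount"]
                        .sum()
                        .reset_index())
revenue_per_day.to_parquet("business/daily_revenue.parquet")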

A file with a .parquet extension may actually be CSV; the shell command file filename shows the file's real type.

type(catalog["diaper_reviews"].read()) shows the internal structure of the file, i.e. the Python type of the object the read returns.
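For example, the check can be scripted; the filename below is made up, and the commented catalog line simply mirrors the call above:

import subprocess

# The extension can lie: a ".parquet" file may actually contain plain CSV.
# The `file` command inspects the bytes themselves, not the filename.
result = subprocess.run(["file", "landing/ratings.parquet"],
                        capture_output=True, text=True)
print(result.stdout)  # prints the detected file type

# Once loaded, type() reveals which Python object the read returned,
# for example a pandas DataFrame.
# print(type(catalog["diaper_reviews"].read()))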

Singer’s core concepts

Aim: “The open-source standard for writing scripts that move data”

Singer is a specification

  • data exchange format: JSON

  • extract and load with taps and targets => language independent

  • communicate over streams: schema (metadata), state (process metadata), record (data); see the sketch below
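A tap is simply a script that prints these three message types as JSON to stdout, and a target reads them from stdin. With the singer-python package the three look roughly like this (the stream name and fields are illustrative):

import singer

# SCHEMA message: describes the structure of the records that follow
singer.write_schema(stream_name="purchases",
                    schema={"properties": {"amount": {"type": "number"}}},
                    key_properties=[])

# RECORD message: one row of actual data
singer.write_record(stream_name="purchases", record={"amount": 19.99})

# STATE message: process metadata, e.g. a bookmark for incremental extraction
singer.write_state({"purchases": "2021-01-01T00:00:00Z"})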

columns = ("id", "name", "age", "has_children") 

users = {(1, "Adrian", 32, False),
        (2, "Ruanne", 28, False), 
        (3, "Hillary", 29, True)}

json_schema = {
    "properties": {"age": {"maximum": 130,
                           "minimum": 1,
                           "type": "integer"},
                   "has_children": {"type": "boolean"},
                   "id": {"type": "integer"},
                   "name": {"type": "string"}},
    "$id": "http://yourdomain.com/schemas/my_user_schema.json",
    "$schema": "http://json-schema.org/draft-07/schema#"}

schema = {'properties': {
    'brand': {'type': 'string'},
    'model': {'type': 'string'},
    'price': {'type': 'number'},
    'currency': {'type': 'string'},
    'quantity': {'type': 'integer', 'minimum': 1},  
    'date': {'type': 'string', 'format': 'date'},
    'countrycode': {'type': 'string', 'pattern': "^[A-Z]{2}$"}, 
    'store_name': {'type': 'string'}}}
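The constraints in this schema (minimum, pattern, date format) become concrete when a record is validated against it. A quick sketch using the third-party jsonschema package with a made-up sample record; this is just an illustration, not part of the Singer workflow:

from jsonschema import validate, ValidationError

sample = {"brand": "Huggies", "model": "newborn", "price": 6.8,
          "currency": "EUR", "quantity": 40, "date": "2019-02-01",
          "countrycode": "DE", "store_name": "Aldi"}

try:
    validate(instance=sample, schema=schema)  # passes: all constraints hold
    print("record is valid")
except ValidationError as err:
    print("invalid record:", err.message)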

import singer

# Write the schema to stdout
singer.write_schema(stream_name='products', schema=schema, key_properties=[])

# Describing the data through its schema
singer.write_schema(schema=json_schema,
                    stream_name='DC_employees',
                    key_properties=["id"])
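Beyond the schema, singer-python can also emit the rows themselves. A minimal continuation of the example above, pairing the columns tuple with each user tuple to form RECORD messages:

# Emit every user as a RECORD message on the DC_employees stream
singer.write_records(stream_name="DC_employees",
                     records=[dict(zip(columns, user)) for user in users])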
    
# Serializing JSON
import json

json.dumps(json_schema["properties"]["age"])

with open("foo.json", mode="w") as fh:
    json.dump(obj=json_schema, fp=fh)  # writes the json-serialized object
                                       # to the open file handle
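Deserializing works the same way in reverse: json.load reads from an open file handle and json.loads parses a string. Reading back the file written above:

# Deserializing JSON
with open("foo.json", mode="r") as fh:
    restored = json.load(fh)  # parses the file back into a Python dict

assert restored == json_schema  # the round trip preserves the structure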
