Um die Cloud -Industrie aufgrund ihres frühen Markteintritts, ihrer robusten Technologie und ihres umfassenden Serviceangebots mit einem Anteil von 32% zu führen. Viele Benutzer sind jedoch eine Herausforderung für die Navigation, und diese Unzufriedenheit führt dazu, dass mehr Unternehmen und Organisationen seine Konkurrenten Microsoft Azure und Google Cloud Platform bevorzugen.

Trotz seiner steileren Lernkurve und weniger intuitiver Schnittstelle bleibt AWS aufgrund seiner Zuverlässigkeit, Hybrid -Cloud und maximalen Serviceoptionen der Prime -Cloud -Service. Noch wichtiger ist, dass die Auswahl der richtigen Strategien die Konfigurationskomplexität erheblich verringern, Workflows optimieren und die Leistung steigern kann.

In diesem Artikel werde ich eine effiziente Möglichkeit einführen, eine vollständige ETL -Pipeline mit Orchestrierung auf AWS einzurichten, die auf meiner eigenen Erfahrung basiert. Es gibt Ihnen auch eine aktualisierte Ansicht über die Datenerzeugung mit AWS oder Sie fühlen sich bei der Durchführung von Konfiguration weniger Probleme, wenn dies zum ersten Mal für bestimmte Aufgaben AWS verwendet wird.

Strategie zur Gestaltung einer effizienten Datenpipeline

AWS hat das umfassendste Ökosystem mit seinen enormen Dienstleistungen. Um ein produktionsbereites Information Warehouse auf AWS zu erstellen, erfordert zumindest die folgenden Dienste:

  • IAM – Obwohl dieser Service nicht in einen Teil des Workflows enthalten ist, ist dies die Grundlage für den Zugriff auf alle anderen Dienste.
  • AWS S3 – Datenseespeicher
  • AWS -Kleber – ETL -Verarbeitung
  • Amazon RedShift – Information Warehouse
  • CloudWatch – Überwachung und Protokollierung

Sie benötigen außerdem Zugriff auf den Luftstrom, wenn Sie komplexere Abhängigkeiten planen und erweiterte Wiederholungen hinsichtlich der Fehlerbehandlung durchführen müssen, obwohl Rotverschiebung einige grundlegende Cron -Jobs erledigen kann.

Um Ihre Arbeit zu erleichtern, empfehle ich dringend, eine IDE zu installieren (Visible Studio Code oder Pycharm und natürlich können Sie Ihre eigene Lieblings -IDE auswählen). Eine IDE verbessert Ihre Effizienz für komplexe Pythoncode, lokale Exams/Debugging, Versionskontrollintegration und Teamzusammenarbeit dramatisch. In der nächsten Sitzung werde ich Schritt -für -Schritt -Konfigurationen bereitstellen.

Erstes Setup

Hier sind die Schritte der ersten Konfigurationen:

  • Starten Sie eine virtuelle Umgebung in Ihrer IDE
  • Installieren Sie Abhängigkeiten – Grundsätzlich müssen wir die Bibliotheken installieren, die später verwendet werden.
pip set up apache-airflow==2.7.0 boto3 pandas pyspark sqlalchemy
  • Installieren Sie die AWS CLI – Mit diesem Schritt können Sie Skripte schreiben, um verschiedene AWS -Vorgänge zu automatisieren, und die Verwaltung von AWS -Ressourcen effizienter gestaltet.
  • AWS -Konfiguration – Geben Sie diese IAM -Benutzer -Anmeldeinformationen ein, wenn Sie aufgefordert werden:
    • AWS Entry Key ID: Von Ihrem IAM -Benutzer.
    • AWS Secret Entry Key: Von Ihrem IAM -Benutzer.
    • Standardregion: us-east-1 (oder Ihre bevorzugte Area)
    • Standardausgabeformat: json.
  • Luftstrom integrieren – hier sind die Schritte:
    • Luftstrom initialisieren
    • Erstellen Sie DAG -Dateien im Luftstrom
    • Führen Sie den Webserver unter http: // localhost: 8080 aus (Login: admin/admin).
    • Öffnen Sie eine weitere Registerkarte „Terminal“ und starten Sie den Scheduler
export AIRFLOW_HOME=$(pwd)/airflow
airflow db init
airflow customers create 
  --username admin 
  --password admin 
  --firstname Admin 
  --lastname Person 
  --role Admin 
  --email (e mail protected)
#Initialize Airflow
airflow webserver --port 8080 ##run the webserver
airflow scheduler #begin the scheduler

Entwicklungsworkflow: COVID-19-Datenfallstudie

Ich verwende den öffentlichen Covid-19-Datensatz von JHU (CC nach 4.0 lizenziert) zum Demonstrationszweck. Sie können auf Daten verweisen HierAnwesend

Das folgende Diagramm zeigt den Workflow von der Aufnahme von Daten bis hin zum Laden von Daten bis hin zu Rotverschiebungstabellen in der Entwicklungsumgebung.

Entwicklungsworkflow vom Autor erstellt

Datenaufnahme

Im ersten Schritt der Datenaufnahme in AWS S3 habe ich Daten verarbeitet, indem ich sie zum langen Format schmelze und das Datumsformat konvertierte. Ich habe die Daten im Parkettformat gespeichert, um die Speicherungseffizienz zu verbessern, die Abfrageleistung zu verbessern und die Speicherkosten zu senken. Der Code für diesen Schritt ist wie unten:

import pandas as pd
from datetime import datetime
import os
import boto3
import sys

def process_covid_data():
    attempt:
        # Load uncooked knowledge
        url = "https://github.com/CSSEGISandData/COVID-19/uncooked/grasp/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv"
        df = pd.read_csv(url)
        
        # --- Information Processing ---
        # 1. Soften to lengthy format
        df = df.soften(
            id_vars=('Province/State', 'Nation/Area', 'Lat', 'Lengthy'), 
            var_name='date_str',
            value_name='confirmed_cases'
        )
        
        # 2. Convert dates (JHU format: MM/DD/YY)
        df('date') = pd.to_datetime(
            df('date_str'), 
            format='%m/%d/%y',
            errors='coerce'
        ).dropna()
        
        # 3. Save as partitioned Parquet
        output_dir = "covid_processed"
        df.to_parquet(
            output_dir,
            engine='pyarrow',
            compression='snappy',
            partition_cols=('date')
        )
        
        # 4. Add to S3
        s3 = boto3.shopper('s3')
        total_files = 0
        
        for root, _, recordsdata in os.stroll(output_dir):
            for file in recordsdata:
                local_path = os.path.be part of(root, file)
                s3_path = os.path.be part of(
                    'uncooked/covid/',
                    os.path.relpath(local_path, output_dir)
                )
                s3.upload_file(
                    Filename=local_path,
                    Bucket='my-dev-bucket',
                    Key=s3_path
                )
            total_files += len(recordsdata)
        
        print(f"Efficiently processed and uploaded {total_files} Parquet recordsdata")
        print(f"Information covers from {df('date').min()} to {df('date').max()}")
        return True

    besides Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        return False

if __name__ == "__main__":
    process_covid_data()

Nachdem Sie den Python -Code ausgeführt haben, sollten Sie in der Lage sein, die Parquetdateien in den S3 -Eimer unter dem Ordner von ‚RAW/Covid/‘ zu sehen.

Screenshot vom Autor

ETL -Pipeline -Entwicklung

AWS -Kleber wird hauptsächlich für die Entwicklung von ETL -Pipeline verwendet. Obwohl es auch für die Aufnahme von Daten verwendet werden kann, auch wenn die Daten nicht auf S3 geladen wurden, liegt seine Stärke in der Verarbeitung von Daten, sobald sie für Information Warehousing -Zwecke in S3 sind. Hier finden Sie PYSPARK -Skripte für die Datenumwandlung:

# transform_covid.py
from awsglue.context import GlueContext
from pyspark.sql.features import *

glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": ("s3://my-dev-bucket/uncooked/covid/")},
    format="parquet"
).toDF()

# Add transformations right here
df_transformed = df.withColumn("load_date", current_date())

# Write to processed zone
df_transformed.write.parquet(
    "s3://my-dev-bucket/processed/covid/",
    mode="overwrite"
)
Screenshot vom Autor

Der nächste Schritt besteht darin, Daten in Rotverschiebung zu laden. Klicken Sie in Redshift -Konsole auf der linken Seite auf „Question Editor Q2“. Sie können Ihren SQL -Code bearbeiten und die Redshift -Kopie beenden.

# Create a desk covid_data in dev schema
CREATE TABLE dev.covid_data (
    "Province/State" VARCHAR(100),  
    "Nation/Area" VARCHAR(100),
    "Lat" FLOAT8,
    "Lengthy" FLOAT8,
    date_str VARCHAR(100),
    confirmed_cases FLOAT8  
)
DISTKEY("Nation/Area")   
SORTKEY(date_str);
# COPY knowledge to redshift
COPY dev.covid_data (
    "Province/State",
    "Nation/Area",
    "Lat",
    "Lengthy",
    date_str,
    confirmed_cases
)
FROM 's3://my-dev-bucket/processed/covid/'
IAM_ROLE 'arn:aws:iam::your-account-id:function/RedshiftLoadRole'
REGION 'your-region'
FORMAT PARQUET;

Dann sehen Sie die Daten erfolgreich in das Information Warehouse hochgeladen.

Screenshot vom Autor

Pipeline -Automatisierung

Der einfachste Weg, um Ihre Datenpipeline zu automatisieren, besteht darin, Jobs unter RedShift Question Editor V2 durch Erstellen eines gespeicherten Verfahrens zu planen (ich habe eine detailliertere Einführung in die SQL -gespeicherte Prozedur. Dieser Artikel).

CREATE OR REPLACE PROCEDURE dev.run_covid_etl()
AS $$
BEGIN
  TRUNCATE TABLE dev.covid_data;
  COPY dev.covid_data 
  FROM 's3://simba-dev-bucket/uncooked/covid'
  IAM_ROLE 'arn:aws:iam::your-account-id:function/RedshiftLoadRole'
  REGION 'your-region'
  FORMAT PARQUET;
END;
$$ LANGUAGE plpgsql;
Screenshot vom Autor

Alternativ können Sie Luftstrom für geplante Jobs ausführen.

from datetime import datetime
from airflow import DAG
from airflow.suppliers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

default_args = {
    'proprietor': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 2
}

with DAG(
    'redshift_etl_dev',
    default_args=default_args,
    schedule_interval='@every day',
    catchup=False
) as dag:

    run_etl = RedshiftSQLOperator(
        task_id='run_covid_etl',
        redshift_conn_id='redshift_dev',
        sql='CALL dev.run_covid_etl()',
    )

Produktionsworkflow

Die Luftstrom -Dag ist leistungsstark, um Ihre gesamte ETL -Pipeline zu orchestrieren, wenn es viele Abhängigkeiten gibt, und es ist auch eine gute Praxis in der Produktionsumgebung.

Nachdem Sie Ihre ETL -Pipeline entwickelt und getestet haben, können Sie Ihre Aufgaben in der Produktionsumgebung mit dem Luftstrom automatisieren.

Produktionsworkflow vom Autor erstellt

Hier finden Sie die Checkliste der wichtigsten Schritte für die Vorbereitung, um den erfolgreichen Einsatz im Luftstrom zu unterstützen:

  • Erstellen Sie S3 -Eimer my-prod-bucket
  • Kleberjob erstellen prod_covid_transformation In AWS -Konsole
  • Erstellen Sie eine Redshift -gespeicherte Prozedur prod.load_covid_data()
  • Luftstrom konfigurieren
  • Konfigurieren Sie SMTP für E -Mails in airflow.cfg

Anschließend ist die Bereitstellung der Datenpipeline im Luftstrom:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.suppliers.amazon.aws.operators.glue import GlueJobOperator
from airflow.suppliers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.operators.e mail import EmailOperator

# 1. DAG CONFIGURATION
default_args = {
    'proprietor': 'data_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 1)
}

# 2. DATA INGESTION FUNCTION
def load_covid_data():
    import pandas as pd
    import boto3
    
    url = "https://github.com/CSSEGISandData/COVID-19/uncooked/grasp/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv"
    df = pd.read_csv(url)

    df = df.soften(
        id_vars=('Province/State', 'Nation/Area', 'Lat', 'Lengthy'), 
        var_name='date_str',
        value_name='confirmed_cases'
    )
    df('date') = pd.to_datetime(df('date_str'), format='%m/%d/%y')
    
    df.to_parquet(
        's3://my-prod-bucket/uncooked/covid/',
        engine='pyarrow',
        partition_cols=('date')
    )

# 3. DAG DEFINITION
with DAG(
    'covid_etl',
    default_args=default_args,
    schedule_interval='@every day',
    catchup=False
) as dag:

    # Job 1: Ingest Information
    ingest = PythonOperator(
        task_id='ingest_data',
        python_callable=load_covid_data
    )

    # Job 2: Rework with Glue
    remodel = GlueJobOperator(
        task_id='transform_data',
        job_name='prod_covid_transformation',
        script_args={
            '--input_path': 's3://my-prod-bucket/uncooked/covid/',
            '--output_path': 's3://my-prod-bucket/processed/covid/'
        }
    )

    # Job 3: Load to Redshift
    load = RedshiftSQLOperator(
        task_id='load_data',
        sql="CALL prod.load_covid_data()"
    )

    # Job 4: Notifications
    notify = EmailOperator(
        task_id='send_email',
        to='you-email-address',
        topic='ETL Standing: {{ ds }}',
        html_content='ETL job accomplished: <a href="{{ ti.log_url }}">View Logs</a>'
    )

Meine letzten Gedanken

Obwohl einige Benutzer, insbesondere diejenigen, die neu in der Cloud sind und einfache Lösungen suchen, in der Regel durch AWS ‚hohe Eintrittsbarriere entmutigt werden und von den massiven Auswahlmöglichkeiten der Dienste überwältigt werden, ist es die Zeit und die Bemühungen wert und hier sind die Gründe:

  • Der Konfigurationsvorgang und das Entwerfen, Erstellen und Testen der Datenpipelines erhalten Sie ein tiefes Verständnis für einen typischen Datenentwicklungs -Workflow. Die Fähigkeiten kommen Ihnen zugute, auch wenn Sie Ihre Projekte mit anderen Cloud -Diensten wie Azure, GCP und Alibaba Cloud produzieren.
  • Das ausgereifte Ökosystem, das AWS hat, und eine Vielzahl von Diensten, die es Benutzern ermöglicht, ihre Datenarchitekturstrategien anzupassen und mehr Flexibilität und Skalierbarkeit in ihren Projekten zu genießen.

Danke fürs Lesen! Hoffe, dieser Artikel hilfreich, um Ihre Cloud-Base-Datenpipeline zu erstellen!

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert