EDA and ETL Convention
Exploratory Data Analysis (EDA)
EDA should be a fun and free-flowing process 😇, but it’s also important to have a structured approach so you build a comprehensive understanding of the dataset. This template provides a systematic guide for conducting EDA and preparing the data for further analysis or modeling.
0. Mount Google Drive
First, you need to mount your Google Drive to the Colab environment. This will allow you to access files stored in your Google Drive.
from google.colab import drive
drive.mount('/content/drive')
Running this code will prompt you to authorize access to your Google Drive. Follow the link provided, select your Google account, and copy the authorization code back into the Colab prompt.
0.1. Set Directory to a Specific Folder
Once your Google Drive is mounted, you can set the working directory to a specific folder within your Google Drive. For example, if you have a folder named MyFolder in MyDrive, you can set the directory as follows:
import os
# Set the directory to a specific folder in your Google Drive
os.chdir('/content/drive/MyDrive/MyFolder')
# Verify the current working directory
print(os.getcwd())
This code changes the current working directory to MyFolder within your Google Drive and prints the current working directory to confirm the change.
1. Data Loading and Initial Inspection
1.1 Import Libraries
Start by importing the necessary libraries for data manipulation and visualization.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
1.2 Load the Data
Load your dataset into a Pandas DataFrame.
df = pd.read_csv('your_dataset.csv')
1.3 Initial Inspection
Inspect the first few rows, data types, and basic statistics.
# Display the first few rows
print(df.head())
# Display data types and non-null counts
print(df.info())
# Display basic statistics
print(df.describe())
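Two quick additional checks are often useful at this stage; a light sketch, not an exhaustive list:

# Dataset dimensions and column names
print(df.shape)
print(df.columns.tolist())

# Number of unique values per column (helps spot IDs and categoricals)
print(df.nunique())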
2. Data Cleaning and Preprocessing
2.1 Handling Missing Values
Identify and handle missing values.
# Check for missing values
print(df.isnull().sum())
# Example: Fill missing values with the mean
df.fillna(df.mean(numeric_only=True), inplace=True)
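Mean imputation only applies to numeric columns. A minimal sketch of a per-type strategy (the choice of median and mode here is illustrative, not prescriptive):

# Impute numeric columns with the median and categorical columns with the mode
numeric_cols = df.select_dtypes(include='number').columns
categorical_cols = df.select_dtypes(include=['object', 'category']).columns

df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())
for col in categorical_cols:
    if df[col].notna().any():  # mode() is empty if the column is all-NaN
        df[col] = df[col].fillna(df[col].mode().iloc[0])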
2.2 Handling Duplicates
Check for and remove duplicate rows.
# Check for duplicates
print(df.duplicated().sum())
# Remove duplicates
df.drop_duplicates(inplace=True)
2.3 Data Type Conversion
Convert data types if necessary.
# Example: Convert a column to datetime
df['date_column'] = pd.to_datetime(df['date_column'])
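If the column contains malformed values, pd.to_datetime raises by default. A hedged variant that coerces unparseable entries to NaT instead (same illustrative column name):

# Coerce unparseable dates to NaT rather than raising an error
df['date_column'] = pd.to_datetime(df['date_column'], errors='coerce')
print(df['date_column'].isnull().sum())  # How many values failed to parse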
2.4 Feature Engineering
Create new features if needed.
# Example: Create a new feature from existing columns
df['new_feature'] = df['feature1'] / df['feature2']
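Datetime columns are another common source of derived features. A small sketch, assuming the date_column converted in 2.3:

# Derive calendar features from an existing datetime column
df['year'] = df['date_column'].dt.year
df['month'] = df['date_column'].dt.month
df['day_of_week'] = df['date_column'].dt.dayofweek  # Monday = 0, Sunday = 6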
3. Univariate, Bivariate, and Multivariate Analyses
3.1 Univariate Analysis
Analyze individual variables.
3.1.1 Categorical Variables
Visualize the distribution of categorical variables.
# Example: Bar plot for a categorical variable
sns.countplot(x='categorical_column', data=df)
plt.show()
3.1.2 Numerical Variables
Visualize the distribution of numerical variables.
# Example: Histogram for a numerical variable
sns.histplot(df['numerical_column'], kde=True)
plt.show()
3.2 Bivariate Analysis
Analyze relationships between two variables.
3.2.1 Categorical vs. Numerical
Visualize the relationship between categorical and numerical variables.
# Example: Box plot
sns.boxplot(x='categorical_column', y='numerical_column', data=df)
plt.show()
3.2.2 Numerical vs. Numerical
Visualize the relationship between two numerical variables.
# Example: Scatter plot
sns.scatterplot(x='numerical_column1', y='numerical_column2', data=df)
plt.show()
3.3 Multivariate Analysis
Analyze relationships between multiple variables.
3.3.1 Correlation Matrix
Visualize the correlation between numerical variables.
# Compute the correlation matrix
corr_matrix = df.corr(numeric_only=True)

# Plot the heatmap
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm')
plt.show()
3.3.2 Pair Plot
Visualize pairwise relationships in the dataset.
# Pair plot
sns.pairplot(df)
plt.show()
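Pair plots become slow and cluttered on large or wide datasets. A hedged variant that samples rows and restricts the plot to a few columns (the column list is illustrative):

# Pair plot on a random sample of rows and a subset of numeric columns
subset_cols = ['numerical_column1', 'numerical_column2', 'new_feature']
sns.pairplot(df[subset_cols].sample(n=min(len(df), 1000), random_state=42))
plt.show()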
Conclusion
Summarize your findings and insights from the EDA. This structured approach ensures a comprehensive understanding of the dataset and prepares it for further analysis or modeling.
ETL Convention
To construct an ETL (Extract, Transform, Load) pipeline, it’s essential to follow a structured approach that ensures data is efficiently moved from source systems to a destination system, typically a data warehouse. Here is a detailed convention for building an ETL pipeline:
The following table shows the key libraries used in a typical Python-based ETL pipeline:
Libraries Used
| Library | Purpose |
|---|---|
| pandas | Data manipulation and analysis |
| numpy | Numerical computing and array operations |
| requests | Making HTTP requests to APIs |
| sqlalchemy | SQL toolkit and Object-Relational Mapping (ORM) |
| airflow | Workflow management and scheduling |
| logging | Logging for monitoring and debugging |
| multiprocessing | Parallel processing for performance optimization |
1. Setup and Import Libraries
# Import necessary libraries
import pandas as pd
import numpy as np
import requests
from sqlalchemy import create_engine
import logging
from multiprocessing import Pool
2. Define ETL Functions
2.1 Extract Data
def extract_data_from_api(api_url):
    response = requests.get(api_url)
    data = response.json()
    df = pd.DataFrame(data)
    return df
def extract_data_from_db(connection_string, query):
    engine = create_engine(connection_string)
    df = pd.read_sql(query, con=engine)
    return df
2.2 Transform Data
def clean_data(df):
    df.dropna(inplace=True)           # Remove missing values
    df.drop_duplicates(inplace=True)  # Remove duplicates
    return df
def transform_data(df):
    df['date'] = pd.to_datetime(df['date'])              # Convert date column to datetime
    df['new_feature'] = df['feature1'] / df['feature2']  # Create a new feature
    return df
2.3 Load Data
def load_data_to_db(df, connection_string, table_name):
    engine = create_engine(connection_string)
    df.to_sql(table_name, con=engine, if_exists='replace', index=False)
3. Define the ETL Pipeline
def etl_pipeline(api_url, db_connection_string, query, table_name):
    # Extract
    df_api = extract_data_from_api(api_url)
    df_db = extract_data_from_db(db_connection_string, query)

    # Transform
    df_api = clean_data(df_api)
    df_db = clean_data(df_db)
    df_api = transform_data(df_api)
    df_db = transform_data(df_db)

    # Load
    load_data_to_db(df_api, db_connection_string, table_name)
    load_data_to_db(df_db, db_connection_string, table_name)
4. Execute the ETL Pipeline
# Define parameters
api_url = 'https://api.example.com/data'
db_connection_string = 'mysql+pymysql://user:password@host/dbname'
query = 'SELECT * FROM source_table'
table_name = 'destination_table'
# Run the ETL pipeline
etl_pipeline(api_url, db_connection_string, query, table_name)
5. Automate and Schedule the Pipeline
5.1 Using Apache Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

# DataFrames cannot be passed directly between Airflow tasks (each task runs in its
# own process), so these thin wrappers hand the data between tasks as JSON via XCom.
def extract(**context):
    return extract_data_from_api(api_url).to_json()

def transform(ti, **context):
    df = pd.read_json(ti.xcom_pull(task_ids='extract'))
    return transform_data(clean_data(df)).to_json()

def load(ti, **context):
    df = pd.read_json(ti.xcom_pull(task_ids='transform'))
    load_data_to_db(df, db_connection_string, table_name)

extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load, dag=dag)

extract_task >> transform_task >> load_task
6. Logging and Error Handling
logging.basicConfig(level=logging.INFO)
def etl_pipeline_with_logging(api_url, db_connection_string, query, table_name):
    try:
        logging.info('Starting ETL pipeline')

        # Extract
        df_api = extract_data_from_api(api_url)
        df_db = extract_data_from_db(db_connection_string, query)

        # Transform
        df_api = clean_data(df_api)
        df_db = clean_data(df_db)
        df_api = transform_data(df_api)
        df_db = transform_data(df_db)

        # Load
        load_data_to_db(df_api, db_connection_string, table_name)
        load_data_to_db(df_db, db_connection_string, table_name)

        logging.info('ETL pipeline completed successfully')
    except Exception as e:
        logging.error(f'Error in ETL pipeline: {e}')
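If transient failures such as flaky API calls are expected, the pipeline can also be wrapped in a simple retry loop. A minimal sketch with an assumed retry count and delay (it calls etl_pipeline rather than the logging variant so that exceptions propagate to the loop):

import time

def etl_pipeline_with_retries(api_url, db_connection_string, query, table_name,
                              max_retries=3, delay_seconds=30):
    for attempt in range(1, max_retries + 1):
        try:
            etl_pipeline(api_url, db_connection_string, query, table_name)
            logging.info(f'ETL pipeline succeeded on attempt {attempt}')
            return
        except Exception as e:
            logging.warning(f'Attempt {attempt} failed: {e}')
            time.sleep(delay_seconds)
    logging.error(f'ETL pipeline failed after {max_retries} attempts')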
7. Parallel Processing (Optional)
def process_chunk(chunk):
    chunk = clean_data(chunk)
    chunk = transform_data(chunk)
    return chunk

def parallel_processing(df):
    chunks = [df[i:i + 1000] for i in range(0, len(df), 1000)]
    with Pool(4) as p:
        processed_chunks = p.map(process_chunk, chunks)
    return pd.concat(processed_chunks)

# Example usage
df = parallel_processing(df)
By following this template, you can create a robust and efficient ETL pipeline using Python in a Jupyter Notebook. This template covers the essential steps of extracting, transforming, and loading data, along with automation, logging, and parallel processing for performance optimization.