Building a Data Lakehouse for Analyzing Elon Musk Tweets using MinIO, Apache Airflow, Apache Drill and Apache Superset



Every act of conscious learning requires the willingness to suffer an injury to one's self-esteem. That is why young children, before they are aware of their own self-importance, learn so easily.
Thomas Szasz

Motivation

A Data Lakehouse is a modern data architecture that combines the scalability and flexibility of a data lake with the governance and performance of a data warehouse. This approach allows organizations to store and analyze large amounts of structured and unstructured data in a single platform, enabling more efficient and effective data-driven decision making.

To dive deeper into this, we will build a data lakehouse solution for analyzing tweets from Elon Musk, using MinIO as storage, Apache Drill as the query engine, Apache Superset for visualization, and Apache Airflow for orchestration. This article will take you through the process of building and utilizing this solution to gain insights and make data-driven decisions.

Then, we'll use Docker Compose to deploy our solution easily.

Architecture

https://github.com/mikekenneth/blogpost_resources/raw/main/twitter_data-lakehouse_drill_superset/architecture.png

Table of Contents

  • What is Apache Airflow?
  • What is MinIO?
  • What is Apache Drill?
  • What is Apache Superset?
  • Code
    • get_twitter_data()
    • clean_twitter_data()
    • write_to_bucket()
    • DAG (Directed Acyclic Graph)
    • Apache Drill Configuration
    • Apache Superset Configuration
    • docker-compose & .env files
  • Result

What is Apache Airflow

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.

Apache Airflow is an open-source workflow orchestration platform written in Python. It uses DAGs (Directed Acyclic Graphs) to represent workflows. It is highly customizable and flexible, and it has a quite active community.

💡 You can read more here.

What is MinIO

MinIO offers high-performance, S3 compatible object storage.

MinIO is an open-source, multi-cloud object storage solution that is fully compatible with AWS S3. With MinIO, you can host your own on-premises or cloud object storage.

💡 You can read more here.

What is Apache Drill

Schema-free SQL Query Engine for Hadoop, NoSQL and Cloud Storage.

Apache Drill is an open-source, schema-free SQL query engine that can query data in Hadoop, NoSQL databases, and cloud or object storage directly, without requiring schemas to be defined or data to be loaded up front.

💡 You can read more here.

What is Apache Superset

Apache Superset is a modern data exploration and visualization platform.

Superset is fast, lightweight, intuitive, and loaded with options that make it easy for users of all skill sets to explore and visualize their data, from simple line charts to highly detailed geospatial charts.

💡 You can read more here.

Code

The full source code can be accessed here:

https://github.com/mikekenneth/twitter_data-lakehouse_minio_drill_superset

get_twitter_data()

Below is the Python task that pulls Elon's tweets from the Twitter API into a Python list:

import os
import json
import requests
from airflow.decorators import dag, task

@task
def get_twitter_data():
    TWITTER_BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN")

    # Get tweets using Twitter API v2 & Bearer Token
    BASE_URL = "https://api.twitter.com/2/tweets/search/recent"
    USERNAME = "elonmusk"
    FIELDS = {"created_at", "lang", "attachments", "public_metrics", "text", "author_id"}

    url = f"{BASE_URL}?query=from:{USERNAME}&tweet.fields={','.join(FIELDS)}&expansions=author_id&max_results=50"
    response = requests.get(url=url, headers={"Authorization": f"Bearer {TWITTER_BEARER_TOKEN}"})
    response = json.loads(response.content)
    print(response)

    data = response["data"]
    includes = response["includes"]
    return data, includes
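
For reference, here is a sketch of the shape of the response this task returns; the values are illustrative, not real data. The includes part is what lets the next task look up the username:

# Illustrative shape of the Twitter API v2 response (made-up values).
sample_response = {
    "data": [
        {
            "id": "1612345678901234567",
            "author_id": "44196397",
            "text": "Example tweet text",
            "created_at": "2023-01-10T12:00:00.000Z",
            "lang": "en",
            "public_metrics": {"retweet_count": 10, "reply_count": 2, "like_count": 100, "quote_count": 1},
        }
    ],
    "includes": {"users": [{"id": "44196397", "name": "Elon Musk", "username": "elonmusk"}]},
}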

clean_twitter_data()

Below is the Python task that cleans and transforms the tweets into the right format:

from uuid import uuid4
from datetime import datetime
from airflow.decorators import dag, task

@task
def clean_twitter_data(tweets_data):
    data, includes = tweets_data

    batchId = str(uuid4()).replace("-", "")
    batchDatetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Refine tweets data
    tweet_list = []
    for tweet in data:
        refined_tweet = {
            "tweet_id": tweet["id"],
            "username": includes["users"][0]["username"],  # Get username from the included data
            "user_id": tweet["author_id"],
            "text": tweet["text"],
            "like_count": tweet["public_metrics"]["like_count"],
            "retweet_count": tweet["public_metrics"]["retweet_count"],
            "created_at": tweet["created_at"],
            "batchID": batchId,
            "batchDatetime": batchDatetime,
        }
        tweet_list.append(refined_tweet)
    return tweet_list, batchDatetime, batchId
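
After this step, each element of tweet_list is a flat dictionary that maps to one row of the Parquet file written next. An illustrative record (values are made up):

# One refined tweet record (illustrative values only).
refined_tweet_example = {
    "tweet_id": "1612345678901234567",
    "username": "elonmusk",
    "user_id": "44196397",
    "text": "Example tweet text",
    "like_count": 100,
    "retweet_count": 10,
    "created_at": "2023-01-10T12:00:00.000Z",
    "batchID": "d8f0b3c4e5a64b2f9c1d7e8a0b1c2d3e",
    "batchDatetime": "2023-01-10 12:05:00",
}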

write_to_bucket()

Below is the Python task that transforms the tweet list into a pandas DataFrame, then writes it to our MinIO object storage as a Parquet file:

import os
from datetime import datetime
from airflow.decorators import dag, task

@task
def write_to_bucket(data):
    tweet_list, batchDatetime_str, batchId = data
    batchDatetime = datetime.strptime(batchDatetime_str, "%Y-%m-%d %H:%M:%S")

    import pandas as pd
    from minio import Minio
    from io import BytesIO

    MINIO_BUCKET_NAME = os.getenv("MINIO_BUCKET_NAME")
    MINIO_ROOT_USER = os.getenv("MINIO_ROOT_USER")
    MINIO_ROOT_PASSWORD = os.getenv("MINIO_ROOT_PASSWORD")

    df = pd.DataFrame(tweet_list)
    file_data = df.to_parquet(index=False)

    client = Minio("minio:9000", access_key=MINIO_ROOT_USER, secret_key=MINIO_ROOT_PASSWORD, secure=False)

    # Make MINIO_BUCKET_NAME if not exist.
    found = client.bucket_exists(MINIO_BUCKET_NAME)
    if not found:
        client.make_bucket(MINIO_BUCKET_NAME)
    else:
        print(f"Bucket '{MINIO_BUCKET_NAME}' already exists!")

    # Put parquet data in the bucket
    filename = (
        f"tweets/{batchDatetime.strftime('%Y/%m/%d')}/elon_tweets_{batchDatetime.strftime('%H%M%S')}_{batchId}.parquet"
    )
    client.put_object(
        MINIO_BUCKET_NAME,
        filename,
        data=BytesIO(file_data),
        length=len(file_data),
        content_type="application/octet-stream",  # Parquet is binary data, not CSV
    )
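
To sanity-check what landed in the bucket, below is a minimal sketch that reads the first generated Parquet file back into pandas. It assumes the same environment variables as the task above and is written to run from the host (inside the compose network, the endpoint would be minio:9000 instead):

import os
from io import BytesIO

import pandas as pd
from minio import Minio

# "localhost:9000" from the host; "minio:9000" inside the compose network.
client = Minio(
    "localhost:9000",
    access_key=os.getenv("MINIO_ROOT_USER"),
    secret_key=os.getenv("MINIO_ROOT_PASSWORD"),
    secure=False,
)

bucket = os.getenv("MINIO_BUCKET_NAME")
objects = list(client.list_objects(bucket, prefix="tweets/", recursive=True))
if objects:
    parquet_bytes = client.get_object(bucket, objects[0].object_name).read()
    print(pd.read_parquet(BytesIO(parquet_bytes)).head())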

DAG (Directed Acyclic Graph)

Below is the DAG itself, which specifies the dependencies between the tasks:

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule="0 * * * *",
    start_date=datetime(2023, 1, 10),
    catchup=False,
    tags=["twitter", "etl"],
)
def twitter_etl():
    raw_data = get_twitter_data()
    cleaned_data = clean_twitter_data(raw_data)
    write_to_bucket(cleaned_data)

twitter_etl()
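
Since Airflow 2.5, a DAG can also be executed in-process for debugging, without a running scheduler, via DAG.test(). A minimal sketch, assuming it sits at the bottom of the same DAG file in place of the bare twitter_etl() call above:

# Run all three tasks in dependency order, in-process (Airflow 2.5+).
dag = twitter_etl()

if __name__ == "__main__":
    dag.test()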

Apache Drill Configuration

  • To grant Apache Drill access to our MinIO (S3) bucket, we need to define the access keys in Drill's core-site.xml file:

    <?xml version="1.0" encoding="UTF-8" ?>
    <configuration>
    
        <property>
            <name>fs.s3a.access.key</name>
            <value>minioadmin</value>
        </property>
    
        <property>
            <name>fs.s3a.secret.key</name>
            <value>minioadmin</value>
        </property>
    
        <property>
          <name>fs.s3a.endpoint</name>
          <value>http://minio:9000</value>
        </property>
    
        <property>
          <name>fs.s3a.connection.ssl.enabled</name>
          <value>false</value>
        </property>
    
        <property>
          <name>fs.s3a.path.style.access</name>
          <value>true</value>
        </property>
    
    </configuration>
    
  • Next, we'll configure the S3 storage plugin in the storage-plugins-override.conf file to specify the bucket to be accessed by Apache Drill:

    "storage": {
      s3: {
        type: "file",
        connection: "s3a://twitter-data",
        workspaces: {
          "root": {
            "location": "/",
            "writable": false,
            "defaultInputFormat": null,
            "allowAccessOutsideWorkspace": false
          }
        },
        formats: {
          "parquet": {
            "type": "parquet"
          },
          "csv" : {
            "type" : "text",
            "extensions" : [ "csv" ]
          }
        },
        enabled: true
      }
    }
    

💡 You can read more here.
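
Once Drill is up, the bucket can also be queried programmatically through Drill's REST API, not just from its web UI. Below is a minimal sketch using requests; it assumes the compose setup further down, with port 8047 published on localhost and our Parquet files under the tweets/ prefix:

import requests

# Query the "s3" storage plugin's root workspace via Drill's REST API.
# The back-ticked directory is the prefix our DAG writes Parquet files to.
query = "SELECT username, `text`, like_count FROM s3.root.`tweets` LIMIT 5"
response = requests.post(
    "http://localhost:8047/query.json",
    json={"queryType": "SQL", "query": query},
)
print(response.json()["rows"])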

Apache Superset Configuration

To be able to query Apache Drill from Superset, we need to build a custom image of apache/superset using superset_drill.Dockerfile:

FROM apache/superset
# Switching to root to install the required packages
USER root
# install requirements for Apache Drill
RUN pip install sqlalchemy-drill
# Switching back to using the `superset` user
USER superset

💡 You can read more here.
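
Once the custom image is running, Drill can be registered as a database in the Superset UI with a SQLAlchemy URI of the form drill+sadrill://drill:8047/s3?use_ssl=False, where drill is the service name from the compose file below and s3 is the storage plugin we configured above; the exact URI options are documented by the sqlalchemy-drill project.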

docker-compose & .env files

Below is the .env file that we need to create, which holds the environment variables needed to run our pipeline:

💡 You can read this to learn how to generate the TWITTER_BEARER_TOKEN.

# Twitter
TWITTER_BEARER_TOKEN="TOKEN"

# Minio
MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=minioadmin
MINIO_BUCKET_NAME='twitter-data'

# Superset
SUPERSET_USERNAME=admin
SUPERSET_PASSWORD=admin

# Apache Airflow
## Meta-Database
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_DB=airflow

## Airflow Core
AIRFLOW__CORE__FERNET_KEY=''
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=True
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW_UID=50000
AIRFLOW_GID=0

## Backend DB
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False

## Airflow Init
_AIRFLOW_DB_UPGRADE=True
_AIRFLOW_WWW_USER_CREATE=True
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
_PIP_ADDITIONAL_REQUIREMENTS="minio pandas requests"
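
💡 The empty AIRFLOW__CORE__FERNET_KEY above is fine for local testing (Airflow then stores connection credentials unencrypted), but a proper key can be generated with the snippet below, using the cryptography package that ships with Airflow:

# Print a fresh Fernet key to paste into AIRFLOW__CORE__FERNET_KEY.
from cryptography.fernet import Fernet

print(Fernet.generate_key().decode())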

And below is the docker-compose.yaml file that allows us to spin up the needed infrastructure for our pipeline:

version: '3.4'

x-common:
  &common
  image: apache/airflow:2.5.0
  user: "${AIRFLOW_UID}:0"
  env_file:
    - .env
  volumes:
    - ./app/dags:/opt/airflow/dags
    - ./app/logs:/opt/airflow/logs

x-depends-on:
  &depends-on
  depends_on:
    postgres:
      condition: service_healthy
    airflow-init:
      condition: service_completed_successfully

services:
  minio:
    image: minio/minio:latest
    ports:
      - '9000:9000'
      - '9090:9090'
    volumes:
      - './data:/data'
    env_file:
      - .env
    command: server --console-address ":9090" /data
    healthcheck:
      test:
        [
          "CMD",
          "curl",
          "-f",
          "http://localhost:9000/minio/health/live"
        ]
      interval: 30s
      timeout: 20s
      retries: 3

  drill:
    env_file:
      - .env
    image: apache/drill:latest
    ports:
      - '8047:8047'
      - '31010:31010'
    volumes:
      # If needed, override default settings
      - ./conf/drill/core-site.xml:/opt/drill/conf/core-site.xml
      # Register default storage plugins
      - ./conf/drill/storage-plugins-override.conf:/opt/drill/conf/storage-plugins-override.conf
    stdin_open: true
    tty: true

  superset_drill:
    env_file:
      - .env
    ports:
      - '8088:8088'
    build:
      context: .
      dockerfile: superset_drill.Dockerfile
    volumes:
      - ./superset.db:/app/superset_home/superset.db

  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - "5433:5432"
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "airflow" ]
      interval: 5s
      retries: 5
    env_file:
      - .env

  scheduler:
    # Merge both anchors under a single key (duplicate `<<` keys are invalid YAML).
    <<: [ *common, *depends-on ]
    container_name: airflow-scheduler
    command: scheduler
    restart: on-failure
    ports:
      - "8793:8793"

  webserver:
    # Merge both anchors under a single key (duplicate `<<` keys are invalid YAML).
    <<: [ *common, *depends-on ]
    container_name: airflow-webserver
    restart: always
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test:
        [
          "CMD",
          "curl",
          "--fail",
          "http://localhost:8080/health"
        ]
      interval: 30s
      timeout: 30s
      retries: 5

  airflow-init:
    <<: *common
    container_name: airflow-init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags}
        exec /entrypoint airflow version
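
With both files in place, running docker compose up -d --build builds the custom Superset image and starts MinIO, Drill, Postgres, and the Airflow services. The web UIs are then reachable on the mapped ports: Airflow on 8080, Superset on 8088, Drill on 8047, and the MinIO console on 9090.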

Result

  • When we access the Apache Airflow web UI, we can see the DAG and trigger it directly to see the results.
    DAG (Apache Airflow web UI):

    https://github.com/mikekenneth/blogpost_resources/raw/main/twitter_data-lakehouse_drill_superset/airflow.png

  • Tweets Parquet file generated in our bucket (MinIO Console):

    https://github.com/mikekenneth/blogpost_resources/raw/main/twitter_data-lakehouse_drill_superset/minio.png

  • If needed, we can query our data lakehouse directly using the Apache Drill web interface:

    https://github.com/mikekenneth/blogpost_resources/raw/main/twitter_data-lakehouse_drill_superset/drill.png

  • Finally, we can visualize our dashboard (I have already built this one, but it can easily be modified, and its data is stored in the superset.db file):

    https://github.com/mikekenneth/blogpost_resources/raw/main/twitter_data-lakehouse_drill_superset/superset.png

This is a wrap. I hope this helps you.

About Me

I am a Data Engineer with 3+ years of experience, plus more years as a Software Engineer (5+ years). I enjoy learning and teaching (mostly learning 😎).

You can get in touch with me through Twitter & LinkedIn or [email protected].
