Twitter Data Pipeline with Apache Airflow + MinIO (S3 compatible Object Storage)

Twitter Data Pipeline with Apache Airflow + MinIO (S3 compatible Object Storage)

The more that you read, the more things you will know. The more that you learn, the more places youโ€™ll go.
Dr. Seuss


During my journey as a Data Engineer, I stumbled upon many tools.
One that caught my attention is MinIO, a Multi-cloud Object Storage that is AWS s3 Compatible.

To learn more about it, I built a Data Pipeline that uses Apache Airflow to pull Elon Musk tweets using the Twitter API and store the result in a CSV stored in a MinIO (OSS alternative to AWS s3) Object Storage bucket.

Then, weโ€™ll use Docker-Compose to easily deploy our code.

Table of Content

  • What is Apache Airflow?
  • What is MinIO ?
  • Code
    • get_twitter_data()
    • dump_data_to_bucket()
    • DAG (Direct Acyclic Graph)
    • docker-compose & .env files

What is Apache Airflow

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.

Apache Airflow is an opensource workflow orchestration written in Python. It uses DAG (Direct Acyclic Graphs) to represent workflows. It is highly customizable/flexible and have a quite active community.

You can read more here.

What is MinIO

MinIO offers high-performance, S3 compatible object storage.

MinIO is an opensource Multi-cloud Object Storage and fully compatible with AWS s3. With MinIO you can host your own on-premises or cloud Object Storage.

You can read more here.


The full code can be accessed.

Source code:


Below is the python Task that pulls Elonโ€™s tweets from Twitter API into a python list:

import os
import json
import requests
from airflow.decorators import dag, task

def get_twitter_data():

    # Get tweets using Twitter API v2 & Bearer Token
    BASE_URL = ""
    USERNAME = "elonmusk"
    FIELDS = {"created_at", "lang", "attachments", "public_metrics", "text", "author_id"}

    url = f"{BASE_URL}?query=from:{USERNAME}&tweet.fields={','.join(FIELDS)}&expansions=author_id&max_results=50"
    response = requests.get(url=url, headers={"Authorization": f"Bearer {TWITTER_BEARER_TOKEN}"})
    response = json.loads(response.content)

    data = response["data"]
    includes = response["includes"]

    # Refine tweets data
    tweet_list = []
    for tweet in data:
        refined_tweet = {
            "tweet_id": tweet["id"],
            "username": includes["users"][0]["username"],  # Get username from the included data
            "user_id": tweet["author_id"],
            "text": tweet["text"],
            "like_count": tweet["public_metrics"]["like_count"],
            "retweet_count": tweet["public_metrics"]["retweet_count"],
            "created_at": tweet["created_at"],
    return tweet_list


Below is the python Task that transforms the tweets list into a Pandas dataframe, then dumps it in our MinIO Object Storage as a CSV file:

import os
from airflow.decorators import dag, task

def dump_data_to_bucket(tweet_list: list):
    import pandas as pd
    from minio import Minio
    from io import BytesIO

    df = pd.DataFrame(tweet_list)
    csv = df.to_csv(index=False).encode("utf-8")

    client = Minio("minio:9000", access_key=MINIO_ROOT_USER, secret_key=MINIO_ROOT_PASSWORD, secure=False)

    # Make MINIO_BUCKET_NAME if not exist.
    found = client.bucket_exists(MINIO_BUCKET_NAME)
    if not found:
        print(f"Bucket '{MINIO_BUCKET_NAME}' already exists!")

    # Put csv data in the bucket
        "airflow-bucket", "twitter_elon_musk.csv", data=BytesIO(csv), length=len(csv), content_type="application/csv"

DAG (Direct Acyclic Graph)

Below is the DAG itself that allows specifying the dependencies between tasks:

from datetime import datetime
from airflow.decorators import dag, task

    schedule="0 */2 * * *",
    start_date=datetime(2022, 12, 26),
    tags=["twitter", "etl"],
def twitter_etl():


docker-compose & .env files

Below is the .env file that we need to create that hold the environmental variables needed to run our pipeline:

You can read this to learn our to generate the TWITTER_BEARER_TOKEN.

# Twitter (Must not be empty)

# Meta-Database

# Airflow Core

# Backend DB

# Airflow Init
_PIP_ADDITIONAL_REQUIREMENTS= "minio pandas requests"

# Minio

And below is the docker-compose.yaml file that allow to spin up the needed infrastructure for our pipeline:

version: '3.4'

  image: apache/airflow:2.5.0
  user: "${AIRFLOW_UID}:0"
    - .env
    - ./app/dags:/opt/airflow/dags
    - ./app/logs:/opt/airflow/logs

      condition: service_healthy
      condition: service_completed_successfully

    image: minio/minio:latest
      - '9000:9000'
      - '9090:9090'
      - './minio_data:/data'
      - .env
    command: server --console-address ":9090" /data

    image: postgres:13
    container_name: postgres
      - "5433:5432"
      test: [ "CMD", "pg_isready", "-U", "airflow" ]
      interval: 5s
      retries: 5
      - .env

    <<: *common
    <<: *depends-on
    container_name: airflow-scheduler
    command: scheduler
    restart: on-failure
      - "8793:8793"

    <<: *common
    <<: *depends-on
    container_name: airflow-webserver
    restart: always
    command: webserver
      - "8080:8080"
      interval: 30s
      timeout: 30s
      retries: 5

    <<: *common
    container_name: airflow-init
    entrypoint: /bin/bash
      - -c
      - |
        mkdir -p /sources/logs /sources/dags
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags}
        exec /entrypoint airflow version

When we access the Apache-Airflow Web UI, we can see the DAG and we can run it directly to see the results.

DAG (Apache-Airflow web UI):

Tweets file generated in our bucket (MinIO Console):

This is a wrap. I hope this helps you.

