قد يبدو بناء خطوط أنابيب ETL وكأنه هو الخيار المختار - فأنت تنقل البيانات من النقطة أ إلى النقطة ب، وتحولها إلى شيء مفيد، و التأكد من أن كل شيء يعمل بسلاسة. تدعمك لغة بايثون بجيش من المكتبات التي تجعل عملك أسهل - يشبه إلى حد ما وجود القوة إلى جانبك؟️. في هذا الدليل، سنلقي نظرة على بعض أفضل مكتبات بايثون لمساعدتك على الفوز في حرب ETL. ?
1. استخراج: سحب البيانات من المصدر
عندما يتعلق الأمر باستخراج البيانات، فأنت بحاجة إلى الأدوات المناسبة لسحب البيانات من مصادر مختلفة - قواعد البيانات وواجهات برمجة التطبيقات والملفات. هذا هو المكان الذي تبدأ فيه المتعة (صوت أوبي وان). فيما يلي المكتبات التي يمكنك الذهاب إليها للحصول على البيانات التي تحتاجها.
SQLAlchemy
عند استخراج البيانات من قواعد البيانات، فإن SQLAlchemy هو السيف الضوئي الموثوق به. إنها قوية وتتعامل مع أنواع متعددة من قواعد البيانات دون بذل أي جهد.
-
الايجابيات:
- يدعم مجموعة واسعة من قواعد البيانات (PostgreSQL، MySQL، SQLite، إلخ.)
- يمكنك التبديل بين قواعد البيانات بسهولة
-
مثال:
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:password@localhost/dbname')
connection = engine.connect()
result = connection.execute("SELECT * FROM jedi_order")
الباندا
Pandas هي سكين الجيش السويسري الخاص بك؟️ عندما يتعلق الأمر بالتعامل مع البيانات بتنسيق CSV أو Excel أو JSON أو حتى SQL. إنه سريع وسهل الاستخدام، ومثالي لاستخراج البيانات من الملفات.
-
الايجابيات:
- يمكن تحميل البيانات من تنسيقات ملفات مختلفة باستخدام سطر واحد من التعليمات البرمجية
- أداء رائع للبيانات الموجودة في الذاكرة
-
مثال:
import pandas as pd
data = pd.read_csv('rebels_data.csv')
الطلبات
بالنسبة للتعامل مع REST APIs، فإن الطلبات تشبه R2-D2 - فهي موثوقة وبسيطة وستوفر لك البيانات التي تحتاجها، بغض النظر عن الأمر.
-
الايجابيات:
- يجعل طلبات HTTP سهلة للغاية
- يتعامل مع مصادقة واجهة برمجة التطبيقات، والرؤوس، وما إلى ذلك.
-
مثال:
import requests
response = requests.get('https://api.example.com/data')
data = response.json()
2. التحويل: تشكيل البيانات
الآن بعد أن استخرجت البيانات، حان الوقت لتحويلها إلى شيء قابل للاستخدام. هذه المرحلة تشبه أخذ الميثريل الخام وتشكيله في الدروع؟️. دعونا نتعمق في بعض المكتبات الرائعة للتحويل.
الباندا
مرة أخرى، يعد Pandas مفيدًا لتحويل بياناتك. سواء أكان الأمر يتعلق بالتنظيف، أو التصفية، أو التجميع، فهو يوفر لك التغطية مثل عباءة الاختفاء.
الايجابيات- :
أطنان من الوظائف المضمنة لمعالجة البيانات
- مثالية للتحويلات داخل الذاكرة
-
مثال- :
# تنظيف البيانات عن طريق إزالة قيم NaN وتصفية الصفوف
data_cleaned = data.dropna().query('age > 18')
# Clean data by removing NaN values and filtering rows
data_cleaned = data.dropna().query('age > 18')
المساء
هل لديك مجموعات بيانات ضخمة من شأنها أن تجعل حتى نجمة الموت تبدو صغيرة؟ يتيح لك Dask التعامل مع بيانات أكبر من الذاكرة باستخدام المعالجة المتوازية، كل ذلك دون إعادة كتابة كود Pandas الخاص بك. ?
الايجابيات- :
مقاييس للتعامل مع مجموعات البيانات الكبيرة
- الحوسبة الموزعة، ولكن مع بناء جملة مألوف يشبه الباندا
-
مثال- :
استيراد dask.dataframe كـ dd
df = dd.read_csv('huge_data.csv')
النتيجة = df[df.age > 18].الحساب()
# Clean data by removing NaN values and filtering rows
data_cleaned = data.dropna().query('age > 18')
باي سبارك
بالنسبة للتحولات على مستوى Jedi في البيانات الضخمة، لا تنظر إلى أبعد من PySpark. إنه Luke Skywalker في معالجة البيانات الموزعة. ?♂️
الايجابيات- :
تحويلات البيانات بسرعة البرق على مجموعات البيانات الكبيرة
- رائع للعمل في الأنظمة البيئية للبيانات الضخمة (Hadoop، Spark)
-
مثال- :
من pyspark.sql قم باستيراد SparkSession
شرارة = SparkSession.builder.appName("ETL").getOrCreate()
df = spark.read.csv('galaxy_data.csv'، header=True، inferSchema=True)
df_filtered = df.filter(df.age > 18)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL").getOrCreate()
df = spark.read.csv('galaxy_data.csv', header=True, inferSchema=True)
df_filtered = df.filter(df.age > 18)
3.
التحميل: وضع البيانات في مكانها
أخيرًا، لقد قمت بتحويل بياناتك إلى شيء قابل للاستخدام. حان الوقت الآن لتحميله إلى وجهته النهائية. سواء كان ذلك مستودع بيانات، أو حاوية S3، أو قاعدة بيانات، فكر في هذا على أنه تسليم One Ring إلى Mordor؟️ - باستخدام الأدوات المناسبة، تصبح الرحلة أسهل كثيرًا.
SQLAlchemy
يجعل SQLAlchemy تحميل البيانات مرة أخرى إلى قاعدة البيانات الخاصة بك أمرًا بسيطًا. باستخدامه، يمكنك بسهولة إدراج بياناتك في قاعدة بيانات علائقية.
الايجابيات- :
يعمل مع قواعد بيانات متعددة
مثال- :
data.to_sql('jedi_council', محرك, فهرس=False, if_exists='replace')
# Clean data by removing NaN values and filtering rows
data_cleaned = data.dropna().query('age > 18')
Psycopg2
بالنسبة لقواعد بيانات PostgreSQL، فإن psycopg2 هو أفضل رفيق لك. إنه سريع وفعال ويجعل مهام SQL المعقدة أمرًا سهلاً.
الايجابيات- :
الدعم الأصلي لـ PostgreSQL
مثال- :
استيراد psycopg2
كون = psycopg2.connect(dbname = "star_wars"، المستخدم = "المستخدم"، كلمة المرور = "force123")
cur = conn.cursor()
cur.execute("INSERT INTO jedis (الاسم، العمر) VALUES (%s، %s)"، ('Luke'، 30))
conn.commit()
# Clean data by removing NaN values and filtering rows
data_cleaned = data.dropna().query('age > 18')
بوتو3
إذا كنت تعمل مع خدمات AWS مثل S3، فإن Boto3 هي أداة الانتقال لتحميل البيانات إلى السحابة. ستشعر وكأن غاندالف يستخدمها. ☁️
الايجابيات- :
متكامل تمامًا مع خدمات AWS
- سهولة التحميل/التنزيل من S3
-
مثال- :
استيراد boto3
s3 = boto3.client('s3')
s3.upload_file('local_file.csv', 'mybucket', 'file.csv')
# Clean data by removing NaN values and filtering rows
data_cleaned = data.dropna().query('age > 18')
عميل Google Cloud Storage (GCS)
بالنسبة للمطورين الذين يعملون مع Google Cloud، سيساعدك عميل GCS على تحميل البيانات إلى Google Cloud Storage بسهولة، تمامًا كما يفعل Boto3 مع AWS.
الايجابيات- :
الدعم الكامل لـ Google Cloud
مثال- :
من تخزين الاستيراد على google.cloud
العميل = التخزين. العميل ()
دلو = client.get_bucket('my_bucket')
النقطة = دلو.blob('data.csv')
blob.upload_from_filename('local_file.csv')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL").getOrCreate()
df = spark.read.csv('galaxy_data.csv', header=True, inferSchema=True)
df_filtered = df.filter(df.age > 18)
4.
التنسيق: إدارة خط أنابيب ETL الخاص بك
الآن، لن يكتمل أي خط أنابيب ETL بدون القليل من التنسيق. فكر في هذا باعتباره القوة التي توجه جميع الأجزاء المتحركة ⚙️ - جدولة المهام والمراقبة وإعادة المحاولة إذا حدث خطأ ما.
تدفق هواء أباتشي
إذا كنت تعمل على أي شيء معقد، فإن Apache Airflow هو Yoda الخاص بك لتنسيق المهام. باستخدامه، يمكنك إنشاء سير العمل وجدولته ومراقبته، مما يضمن تشغيل جميع وظائف ETL كالساعة.
الايجابيات- :
جدولة قوية وإدارة المهام
- واجهة مرئية لتتبع سير العمل
-
مثال- :
من استيراد تدفق الهواء DAG
من airflow.operators.python_operator قم باستيراد PythonOperator
تعريف extract_data ():
#منطق الاستخراج
يمر
داغ = DAG('my_etl_pipeline', start_date=datetime(2023, 1, 1))
المهمة = PythonOperator(task_id='extract_task', python_callable=extract_data, dag=dag)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL").getOrCreate()
df = spark.read.csv('galaxy_data.csv', header=True, inferSchema=True)
df_filtered = df.filter(df.age > 18)
التفاف
ليس من الضروري أن يبدو بناء خطوط أنابيب ETL وكأنك تقاتل دارث فيدر ⚔️. باستخدام الأدوات المناسبة، يمكنك أتمتة العملية بأكملها، وتحويل البيانات بكفاءة، وتحميلها إلى وجهتها النهائية. سواء كنت تتعامل مع مجموعات بيانات صغيرة أو تعمل على أنظمة ضخمة وموزعة، ستساعدك مكتبات Python هذه على إنشاء خطوط أنابيب ETL قوية مثل One Ring (لكنها أقل شرًا).
أتمنى أن تكون قوة ETL معك.
✨