لدى المؤسسات في جميع الصناعات متطلبات معالجة البيانات المعقدة لحالات الاستخدام التحليلي عبر أنظمة التحليلات المختلفة ، مثل بحيرات البيانات على AWS، مستودعات البيانات (الأمازون الأحمر)، يبحث (خدمة Amazon OpenSearch) ، NoSQL (الأمازون DynamoDB)، التعلم الالي (الأمازون SageMaker)، و اكثر. يتم تكليف محترفي التحليلات باستخلاص القيمة من البيانات المخزنة في هذه الأنظمة الموزعة لإنشاء تجارب أفضل وآمنة ومحسّنة من حيث التكلفة لعملائهم. على سبيل المثال ، تسعى شركات الوسائط الرقمية إلى دمج مجموعات البيانات ومعالجتها في قواعد البيانات الداخلية والخارجية لبناء طرق عرض موحدة لملفات تعريف العملاء ، وتحفيز الأفكار للميزات المبتكرة ، وزيادة مشاركة النظام الأساسي.
في هذه السيناريوهات ، يبحث العملاء عن تكامل بيانات بدون خادم يوفر الاستخدام غراء AWS كمكون أساسي لمعالجة البيانات وفهرستها. تتكامل AWS Glue جيدًا مع خدمات AWS والمنتجات الشريكة ، وتوفر خيارات استخراج وتحويل وتحميل (ETL) منخفضة الكود / بدون كود لتمكين التحليلات أو التعلم الآلي (ML) أو سير عمل تطوير التطبيقات. قد تكون وظائف AWS Glue ETL مكونًا واحدًا في خط أنابيب أكثر تعقيدًا. يعد تنظيم تشغيل وإدارة التبعيات بين هذه المكونات قدرة أساسية في إستراتيجية البيانات. تدفقات عمل أمازون المدارة لتدفقات أباتشي الجوية (Amazon MWAA) ينظم خطوط البيانات باستخدام التقنيات الموزعة بما في ذلك الموارد المحلية وخدمات AWS ومكونات الجهات الخارجية.
في هذا المنشور ، نوضح كيفية تبسيط مراقبة مهمة AWS Glue التي تنظمها Airflow باستخدام أحدث ميزات Amazon MWAA.
نظرة عامة على الحل
يناقش هذا المنشور ما يلي:
- كيفية ترقية بيئة Amazon MWAA إلى الإصدار 2.4.3.
- كيفية تنظيم مهمة AWS Glue من تدفق الهواء إخراج الحلقية الرسم البياني (داج).
- تحسينات الملاحظة الخاصة بحزمة موفر Airflow Amazon في Amazon MWAA. يمكنك الآن دمج سجلات التشغيل الخاصة بمهام AWS Glue على وحدة التحكم Airflow لتبسيط استكشاف الأخطاء وإصلاحها في خطوط أنابيب البيانات. تصبح وحدة تحكم Amazon MWAA مرجعًا واحدًا لمراقبة عمليات تشغيل مهام AWS Glue وتحليلها. في السابق ، كانت فرق الدعم بحاجة إلى الوصول إلى وحدة تحكم إدارة AWS واتخذ خطوات يدوية لهذه الرؤية. تتوفر هذه الميزة افتراضيًا من الإصدار 2.4.3 من Amazon MWAA.
يوضح الرسم البياني التالي بنية الحلول لدينا.
المتطلبات الأساسية المسبقة
أنت بحاجة إلى المتطلبات الأساسية التالية:
قم بإعداد بيئة Amazon MWAA
للحصول على إرشادات حول إنشاء بيئتك ، يرجى الرجوع إلى قم بإنشاء بيئة Amazon MWAA. بالنسبة للمستخدمين الحاليين ، نوصي بالترقية إلى الإصدار 2.4.3 للاستفادة من تحسينات الملاحظة الواردة في هذا المنشور.
تختلف خطوات ترقية Amazon MWAA إلى الإصدار 2.4.3 اعتمادًا على ما إذا كان الإصدار الحالي هو 1.10.12 أو 2.2.2. نناقش كلا الخيارين في هذا المنشور.
المتطلبات الأساسية لإعداد بيئة Amazon MWAA
يجب أن تفي بالمتطلبات التالية:
قم بالترقية من الإصدار 1.10.12 إلى 2.4.3
إذا كنت تستخدم إصدار Amazon MWAA 1.10.12، تشير إلى الانتقال إلى بيئة Amazon MWAA جديدة للترقية إلى 2.4.3.
قم بالترقية من الإصدار 2.0.2 أو 2.2.2 إلى 2.4.3
إذا كنت تستخدم الإصدار 2.2.2 من بيئة Amazon MWAA أو إصدار أقل ، فأكمل الخطوات التالية:
- إنشاء Requirements.txt لأي تبعيات مخصصة مع إصدارات محددة مطلوبة لـ DAGs الخاصة بك.
- قم بتحميل الملف إلى Amazon S3 في الموقع المناسب حيث تشير بيئة Amazon MWAA إلى متطلبات .txt لتثبيت التبعيات.
- اتبع الخطوات الواردة الانتقال إلى بيئة Amazon MWAA جديدة وحدد الإصدار 2.4.3.
قم بتحديث DAGs الخاصة بك
قد يحتاج العملاء الذين قاموا بالترقية من بيئة Amazon MWAA القديمة إلى إجراء تحديثات على DAGs الموجودة. في الإصدار 2.4.3 من Airflow ، ستستخدم بيئة Airflow إصدار حزمة مزود Amazon 6.0.0 افتراضيًا. قد تتضمن هذه الحزمة بعض التغييرات المحتملة ، مثل التغييرات في أسماء المشغلين. على سبيل المثال ، ملف AWSGlueJobOperator تم إهماله واستبداله بـ GlueJobOperator. للحفاظ على التوافق ، قم بتحديث Airflow DAGs عن طريق استبدال أي مشغلين مهملين أو غير مدعومين من الإصدارات السابقة بأخرى جديدة. أكمل الخطوات التالية:
- انتقل إلى مشغلي Amazon AWS.
- حدد الإصدار المناسب المثبت في مثيل Amazon MWAA (6.0.0. افتراضيًا) للعثور على قائمة بمشغلي Airflow المدعومين.
- قم بإجراء التغييرات اللازمة في كود DAG الموجود وقم بتحميل الملفات المعدلة إلى موقع DAG في Amazon S3.
نسق مهمة AWS Glue من Airflow
يغطي هذا القسم تفاصيل تنظيم مهمة AWS Glue داخل Airflow DAGs. يعمل Airflow على تسهيل تطوير خطوط أنابيب البيانات مع التبعيات بين الأنظمة غير المتجانسة مثل العمليات المحلية والاعتمادات الخارجية وخدمات AWS الأخرى والمزيد.
تنسيق تجميع سجلات CloudTrail باستخدام AWS Glue و Amazon MWAA
في هذا المثال ، نراجع حالة استخدام استخدام Amazon MWAA لتنظيم مهمة AWS Glue Python Shell التي تستمر في المقاييس المجمعة استنادًا إلى سجلات CloudTrail.
تتيح CloudTrail إمكانية الرؤية في مكالمات AWS API التي يتم إجراؤها في حساب AWS الخاص بك. تتمثل إحدى حالات الاستخدام الشائع لهذه البيانات في جمع مقاييس الاستخدام على المبادئ التي تعمل على موارد حسابك للتدقيق والاحتياجات التنظيمية.
أثناء تسجيل أحداث CloudTrail ، يتم تسليمها كملفات JSON في Amazon S3 ، وهي ليست مثالية للاستعلامات التحليلية. نريد تجميع هذه البيانات واستمرارها كملفات باركيه للسماح بأداء الاستعلام الأمثل. كخطوة أولية ، يمكننا استخدام Athena لإجراء الاستعلام الأولي عن البيانات قبل القيام بتجميعات إضافية في مهمة AWS Glue الخاصة بنا. لمزيد من المعلومات حول إنشاء جدول AWS Glue Data Catalog ، يرجى الرجوع إلى إنشاء جدول سجلات CloudTrail في أثينا باستخدام عرض التقسيم بيانات. بعد أن استكشفنا البيانات عبر Athena وقررنا المقاييس التي نريد الاحتفاظ بها في جداول مجمعة ، يمكننا إنشاء وظيفة AWS Glue.
أنشئ جدول CloudTrail في أثينا
أولاً ، نحتاج إلى إنشاء جدول في كتالوج البيانات لدينا يسمح بالاستعلام عن بيانات CloudTrail عبر أثينا. ينشئ نموذج الاستعلام التالي جدولاً يحتوي على قسمين في المنطقة والتاريخ (يسمى snapshot_date). تأكد من استبدال العناصر النائبة لحاوية CloudTrail ومعرف حساب AWS واسم جدول CloudTrail:
create external table if not exists `<<<CLOUDTRAIL_TABLE_NAME>>>`( `eventversion` string comment 'from deserializer', `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> comment 'from deserializer', `eventtime` string comment 'from deserializer', `eventsource` string comment 'from deserializer', `eventname` string comment 'from deserializer', `awsregion` string comment 'from deserializer', `sourceipaddress` string comment 'from deserializer', `useragent` string comment 'from deserializer', `errorcode` string comment 'from deserializer', `errormessage` string comment 'from deserializer', `requestparameters` string comment 'from deserializer', `responseelements` string comment 'from deserializer', `additionaleventdata` string comment 'from deserializer', `requestid` string comment 'from deserializer', `eventid` string comment 'from deserializer', `resources` array<struct<arn:string,accountid:string,type:string>> comment 'from deserializer', `eventtype` string comment 'from deserializer', `apiversion` string comment 'from deserializer', `readonly` string comment 'from deserializer', `recipientaccountid` string comment 'from deserializer', `serviceeventdetails` string comment 'from deserializer', `sharedeventid` string comment 'from deserializer', `vpcendpointid` string comment 'from deserializer')
PARTITIONED BY ( `region` string, `snapshot_date` string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/'
TBLPROPERTIES ( 'projection.enabled'='true', 'projection.region.type'='enum', 'projection.region.values'='us-east-2,us-east-1,us-west-1,us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-3,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1', 'projection.snapshot_date.format'='yyyy/mm/dd', 'projection.snapshot_date.interval'='1', 'projection.snapshot_date.interval.unit'='days', 'projection.snapshot_date.range'='2020/10/01,now', 'projection.snapshot_date.type'='date', 'storage.location.template'='s3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/${region}/${snapshot_date}')
قم بتشغيل الاستعلام السابق على وحدة تحكم Athena ، ولاحظ اسم الجدول وقاعدة بيانات AWS Glue Data Catalog حيث تم إنشاؤه. نستخدم هذه القيم لاحقًا في كود Airflow DAG.
نموذج لرمز مهمة AWS Glue
التعليمات البرمجية التالية هي عينة مهمة AWS Glue Python Shell يقوم بما يلي:
- يأخذ الحجج (التي نمررها من Amazon MWAA DAG) حول بيانات اليوم التي يجب معالجتها
- يستخدم AWS SDK لباندا لتشغيل استعلام Athena لإجراء التصفية الأولية لبيانات CloudTrail JSON خارج AWS Glue
- يستخدم Pandas للقيام بتجميعات بسيطة على البيانات التي تمت تصفيتها
- تُخرج البيانات المجمعة إلى AWS Glue Data Catalog في جدول
- يستخدم التسجيل أثناء المعالجة ، والذي سيكون مرئيًا في Amazon MWAA
import awswrangler as wr
import pandas as pd
import sys
import logging
from awsglue.utils import getResolvedOptions
from datetime import datetime, timedelta # Logging setup, redirects all logs to stdout
LOGGER = logging.getLogger()
formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')
streamHandler = logging.StreamHandler(sys.stdout)
streamHandler.setFormatter(formatter)
LOGGER.addHandler(streamHandler)
LOGGER.setLevel(logging.INFO) LOGGER.info(f"Passed Args :: {sys.argv}") sql_query_template = """
select
region,
useridentity.arn,
eventsource,
eventname,
useragent from "{cloudtrail_glue_db}"."{cloudtrail_table}"
where snapshot_date='{process_date}'
and region in ('us-east-1','us-east-2') """ required_args = ['CLOUDTRAIL_GLUE_DB', 'CLOUDTRAIL_TABLE', 'TARGET_BUCKET', 'TARGET_DB', 'TARGET_TABLE', 'ACCOUNT_ID']
arg_keys = [*required_args, 'PROCESS_DATE'] if '--PROCESS_DATE' in sys.argv else required_args
JOB_ARGS = getResolvedOptions ( sys.argv, arg_keys) LOGGER.info(f"Parsed Args :: {JOB_ARGS}") # if process date was not passed as an argument, process yesterday's data
process_date = ( JOB_ARGS['PROCESS_DATE'] if JOB_ARGS.get('PROCESS_DATE','NONE') != "NONE" else (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d") ) LOGGER.info(f"Taking snapshot for :: {process_date}") RAW_CLOUDTRAIL_DB = JOB_ARGS['CLOUDTRAIL_GLUE_DB']
RAW_CLOUDTRAIL_TABLE = JOB_ARGS['CLOUDTRAIL_TABLE']
TARGET_BUCKET = JOB_ARGS['TARGET_BUCKET']
TARGET_DB = JOB_ARGS['TARGET_DB']
TARGET_TABLE = JOB_ARGS['TARGET_TABLE']
ACCOUNT_ID = JOB_ARGS['ACCOUNT_ID'] final_query = sql_query_template.format( process_date=process_date.replace("-","/"), cloudtrail_glue_db=RAW_CLOUDTRAIL_DB, cloudtrail_table=RAW_CLOUDTRAIL_TABLE
) LOGGER.info(f"Running Query :: {final_query}") raw_cloudtrail_df = wr.athena.read_sql_query( sql=final_query, database=RAW_CLOUDTRAIL_DB, ctas_approach=False, s3_output=f"s3://{TARGET_BUCKET}/athena-results",
) raw_cloudtrail_df['ct']=1 agg_df = raw_cloudtrail_df.groupby(['arn','region','eventsource','eventname','useragent'],as_index=False).agg({'ct':'sum'})
agg_df['snapshot_date']=process_date LOGGER.info(agg_df.info(verbose=True)) upload_path = f"s3://{TARGET_BUCKET}/{TARGET_DB}/{TARGET_TABLE}" if not agg_df.empty: LOGGER.info(f"Upload to {upload_path}") try: response = wr.s3.to_parquet( df=agg_df, path=upload_path, dataset=True, database=TARGET_DB, table=TARGET_TABLE, mode="overwrite_partitions", schema_evolution=True, partition_cols=["snapshot_date"], compression="snappy", index=False ) LOGGER.info(response) except Exception as exc: LOGGER.error("Uploading to S3 failed") LOGGER.exception(exc) raise exc
else: LOGGER.info(f"Dataframe was empty, nothing to upload to {upload_path}")
فيما يلي بعض المزايا الرئيسية في وظيفة AWS Glue هذه:
- نستخدم استعلام Athena لضمان إجراء التصفية الأولية خارج مهمة AWS Glue. على هذا النحو ، لا تزال مهمة Python Shell ذات الحد الأدنى من الحوسبة كافية لتجميع مجموعة بيانات CloudTrail كبيرة.
- نحن نضمن خيار مجموعة مكتبة التحليلات قيد التشغيل عند إنشاء مهمة AWS Glue لاستخدام AWS SDK لمكتبة Pandas.
أنشئ مهمة AWS Glue
أكمل الخطوات التالية لإنشاء مهمة AWS Glue الخاصة بك:
- انسخ البرنامج النصي في القسم السابق واحفظه في ملف محلي. لهذا المنشور ، يسمى الملف
script.py
. - في وحدة تحكم AWS Glue ، اختر وظائف ETL في جزء التنقل.
- قم بإنشاء وظيفة جديدة وحدد محرر نصوص Python Shell.
- أختار تحميل وتحرير نص موجود وقم بتحميل الملف الذي حفظته محليًا.
- اختار إنشاء.
- على تفاصيل الوظيفة علامة التبويب ، أدخل اسمًا لمهمة AWS Glue الخاصة بك.
- في حالة دور IAM، اختر دورًا موجودًا أو أنشئ دورًا جديدًا لديه الأذونات المطلوبة لـ Amazon S3 و AWS Glue و Athena. يحتاج الدور إلى الاستعلام عن جدول CloudTrail الذي أنشأته مسبقًا والكتابة إلى موقع الإخراج.
يمكنك استخدام نموذج رمز السياسة التالي. استبدل العناصر النائبة بحاوية سجلات CloudTrail ، واسم جدول الإخراج ، وقاعدة بيانات AWS Glue ، وحاوية الإخراج S3 ، واسم جدول CloudTrail ، وقاعدة بيانات AWS Glue التي تحتوي على جدول CloudTrail ، ومعرف حساب AWS الخاص بك.
{ "Version": "2012-10-17", "Statement": [ { "Action": [ "s3:List*", "s3:Get*" ], "Resource": [ "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>/*", "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>*" ], "Effect": "Allow", "Sid": "GetS3CloudtrailData" }, { "Action": [ "glue:Get*", "glue:BatchGet*" ], "Resource": [ "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>/<<<CLOUDTRAIL_TABLE>>>*" ], "Effect": "Allow", "Sid": "GetGlueCatalogCloudtrailData" }, { "Action": [ "s3:PutObject*", "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:Head*" ], "Resource": [ "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>", "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>/*" ], "Effect": "Allow", "Sid": "WriteOutputToS3" }, { "Action": [ "glue:CreateTable", "glue:CreatePartition", "glue:UpdatePartition", "glue:UpdateTable", "glue:DeleteTable", "glue:DeletePartition", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:Get*", "glue:BatchGet*" ], "Resource": [ "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<OUTPUT_GLUE_DB>>>", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>*" ], "Effect": "Allow", "Sid": "AllowOutputToGlue" }, { "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:*:*:/aws-glue/*", "Effect": "Allow", "Sid": "LogsAccess" }, { "Action": [ "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:DeleteObject*", "s3:PutObject", "s3:PutObjectLegalHold", "s3:PutObjectRetention", "s3:PutObjectTagging", "s3:PutObjectVersionTagging", "s3:Abort*" ], "Resource": [ "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>", "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>/*" ], "Effect": "Allow", "Sid": "AccessToAthenaResults" }, { "Action": [ "athena:StartQueryExecution", "athena:StopQueryExecution", "athena:GetDataCatalog", "athena:GetQueryResults", "athena:GetQueryExecution" ], "Resource": [ "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog", "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:datacatalog/AwsDataCatalog", "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:workgroup/primary" ], "Effect": "Allow", "Sid": "AllowAthenaQuerying" } ]
}
في حالة نسخة بايثون، اختر بيثون 3.9.
- أختار تحميل مكتبات التحليلات المشتركة.
- في حالة وحدات معالجة البيانات، اختر 1 وحدة DPU.
- اترك الخيارات الأخرى كخيار افتراضي أو اضبطها حسب الحاجة.
- اختار حفظ لحفظ تكوين عملك.
قم بتكوين Amazon MWAA DAG لتنظيم مهمة AWS Glue
الكود التالي مخصص لـ DAG الذي يمكنه تنسيق مهمة AWS Glue التي أنشأناها. نحن نستفيد من الميزات الرئيسية التالية في DAG هذا:
"""Sample DAG"""
import airflow.utils
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow import DAG
from datetime import timedelta
import airflow.utils # allow backfills via DAG run parameters
process_date = '{{ dag_run.conf.get("process_date") if dag_run.conf.get("process_date") else "NONE" }}' dag = DAG( dag_id = "CLOUDTRAIL_LOGS_PROCESSING", default_args = { 'depends_on_past':False, 'start_date':airflow.utils.dates.days_ago(0), 'retries':1, 'retry_delay':timedelta(minutes=5), 'catchup': False }, schedule_interval = None, # None for unscheduled or a cron expression - E.G. "00 12 * * 2" - at 12noon Tuesday dagrun_timeout = timedelta(minutes=30), max_active_runs = 1, max_active_tasks = 1 # since there is only one task in our DAG
) ## Log ingest. Assumes Glue Job is already created
glue_ingestion_job = GlueJobOperator( task_id="<<<some-task-id>>>", job_name="<<<GLUE_JOB_NAME>>>", script_args={ "--ACCOUNT_ID":"<<<YOUR_AWS_ACCT_ID>>>", "--CLOUDTRAIL_GLUE_DB":"<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>", "--CLOUDTRAIL_TABLE":"<<<CLOUDTRAIL_TABLE>>>", "--TARGET_BUCKET": "<<<OUTPUT_S3_BUCKET>>>", "--TARGET_DB": "<<<OUTPUT_GLUE_DB>>>", # should already exist "--TARGET_TABLE": "<<<OUTPUT_TABLE_NAME>>>", "--PROCESS_DATE": process_date }, region_name="us-east-1", dag=dag, verbose=True
) glue_ingestion_job
زيادة إمكانية ملاحظة وظائف AWS Glue في Amazon MWAA
تكتب مهام AWS Glue السجلات إلى الأمازون CloudWatch. مع التحسينات الأخيرة في إمكانية الملاحظة لحزمة مزود Airflow's Amazon ، تم دمج هذه السجلات الآن مع سجلات مهام Airflow. يوفر هذا الدمج لمستخدمي Airflow رؤية شاملة مباشرة في Airflow UI ، مما يلغي الحاجة إلى البحث في CloudWatch أو وحدة تحكم AWS Glue.
لاستخدام هذه الميزة ، تأكد من أن دور IAM المرفق ببيئة Amazon MWAA لديه الأذونات التالية لاسترداد السجلات الضرورية وكتابتها:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:DescribeLogStreams", "logs:FilterLogEvents", "logs:GetLogGroupFields", "logs:GetQueryResults", ], "Resource": [ "arn:aws:logs:*:*:log-group:airflow-243-<<<Your environment name>>>-*"--Your Amazon MWAA Log Stream Name ] } ]
}
إذا كانت القيمة المطولة = true ، فستظهر سجلات تشغيل مهمة AWS Glue في سجلات مهام Airflow. الافتراضي هو خطأ. لمزيد من المعلومات ، يرجى الرجوع إلى المعلمات.
عند التمكين ، تقرأ DAGs من تدفق سجل CloudWatch الخاص بمهمة AWS Glue وترحيلها إلى سجلات خطوات مهمة Airflow DAG AWS Glue. يوفر هذا رؤى تفصيلية حول تشغيل مهمة AWS Glue في الوقت الفعلي عبر سجلات DAG. لاحظ أن مهام AWS Glue تولد ناتجًا وخطأ مجموعة سجل CloudWatch بناءً على STDOUT و STDERR للوظيفة ، على التوالي. يتم ترحيل جميع السجلات في مجموعة سجل الإخراج والاستثناءات أو سجلات الأخطاء من مجموعة سجل الأخطاء إلى Amazon MWAA.
يمكن لمسؤولي AWS الآن تقييد وصول فريق الدعم إلى Airflow فقط ، مما يجعل Amazon MWAA الجزء الوحيد من الزجاج في تنسيق العمل وإدارة الصحة الوظيفية. في السابق ، كان المستخدمون بحاجة إلى التحقق من حالة تشغيل مهمة AWS Glue في خطوات Airflow DAG واسترداد معرف تشغيل المهمة. ثم احتاجوا إلى الوصول إلى وحدة تحكم AWS Glue للعثور على سجل تشغيل الوظيفة ، والبحث عن الوظيفة التي تهمهم باستخدام المعرّف ، ثم الانتقال أخيرًا إلى سجلات CloudWatch الخاصة بالمهمة لاستكشاف الأخطاء وإصلاحها.
قم بإنشاء DAG
لإنشاء DAG ، أكمل الخطوات التالية:
- احفظ رمز DAG السابق في ملف .py محلي ، مع استبدال العناصر النائبة المشار إليها.
يجب أن تكون قيم معرف حساب AWS واسم وظيفة AWS Glue وقاعدة بيانات AWS Glue مع جدول CloudTrail واسم جدول CloudTrail معروفة بالفعل. يمكنك ضبط حاوية S3 للإخراج ، وقاعدة بيانات AWS Glue ، واسم جدول الإخراج حسب الحاجة ، ولكن تأكد من تكوين دور IAM لوظيفة AWS Glue الذي استخدمته مسبقًا وفقًا لذلك.
- في وحدة تحكم Amazon MWAA ، انتقل إلى بيئتك لمعرفة مكان تخزين رمز DAG.
مجلد DAGs هو البادئة داخل حاوية S3 حيث يجب وضع ملف DAG الخاص بك.
- قم بتحميل الملف المحرر الخاص بك هناك.
- افتح وحدة تحكم Amazon MWAA للتأكد من ظهور DAG في الجدول.
قم بتشغيل DAG
لتشغيل DAG ، أكمل الخطوات التالية:
- اختر من بين الخيارات التالية:
- الزناد DAG - يؤدي هذا إلى استخدام بيانات الأمس كبيانات للمعالجة
- الزناد DAG w / config - باستخدام هذا الخيار ، يمكنك المرور في تاريخ مختلف ، ربما لعمليات الردم ، والتي يتم استرجاعها باستخدام
dag_run.conf
في كود DAG ثم تم تمريره إلى وظيفة AWS Glue كمعامل
تُظهر لقطة الشاشة التالية خيارات التكوين الإضافية إذا اخترت ذلك الزناد DAG w / config.
- مراقبة DAG أثناء تشغيله.
- عند اكتمال DAG ، افتح تفاصيل التشغيل.
في الجزء الأيسر ، يمكنك عرض السجلات أو الاختيار تفاصيل مثيل المهمة للحصول على عرض كامل.
- اعرض سجلات مخرجات مهمة AWS Glue في Amazon MWAA دون استخدام وحدة تحكم AWS Glue بفضل ملف
GlueJobOperator
علم مطول.
ستكون نتائج مهمة AWS Glue مكتوبة في جدول الإخراج الذي حددته.
- استعلم عن هذا الجدول عبر أثينا للتأكد من نجاحه.
نبذة عامة
توفر Amazon MWAA الآن مكانًا واحدًا لتتبع حالة مهمة AWS Glue وتمكنك من استخدام وحدة تحكم Airflow كجزء واحد من الزجاج لتنسيق العمل وإدارة الصحة. في هذا المنشور ، انتقلنا عبر خطوات تنظيم مهام AWS Glue عبر استخدام Airflow GlueJobOperator
. من خلال تحسينات الملاحظة الجديدة ، يمكنك استكشاف أخطاء AWS Glue وإصلاحها بسلاسة في تجربة موحدة. لقد أوضحنا أيضًا كيفية ترقية بيئة Amazon MWAA الخاصة بك إلى إصدار متوافق ، وتحديث التبعيات ، وتغيير سياسة دور IAM وفقًا لذلك.
لمزيد من المعلومات حول خطوات استكشاف الأخطاء وإصلاحها الشائعة ، راجع استكشاف الأخطاء وإصلاحها: إنشاء وتحديث بيئة Amazon MWAA. للحصول على تفاصيل متعمقة عن الترحيل إلى بيئة Amazon MWAA ، يرجى الرجوع إلى الترقية من 1.10 إلى 2. للتعرف على تغييرات التعليمات البرمجية مفتوحة المصدر لزيادة إمكانية ملاحظة وظائف AWS Glue في حزمة مزود Airflow Amazon ، يرجى الرجوع إلى سجلات الترحيل من وظائف AWS Glue.
أخيرًا ، نوصي بزيارة مدونة البيانات الضخمة في AWS للمواد الأخرى المتعلقة بالتحليلات وتعلم الآلة وحوكمة البيانات على AWS.
حول المؤلف
رشابه لوخاندي مهندس بيانات وتعلم آلي مع ممارسة تحليلات خدمات AWS الاحترافية. يساعد العملاء في تنفيذ البيانات الضخمة والتعلم الآلي وحلول التحليلات. خارج العمل ، يستمتع بقضاء الوقت مع العائلة والقراءة والجري والجولف.
ريان جوميز مهندس بيانات وتعلم آلي مع ممارسة تحليلات خدمات AWS الاحترافية. إنه متحمس لمساعدة العملاء على تحقيق نتائج أفضل من خلال التحليلات وحلول التعلم الآلي في السحابة. خارج العمل ، يستمتع باللياقة البدنية والطبخ وقضاء وقت ممتع مع الأصدقاء والعائلة.
فيشوا جوبتا هو مهندس بيانات أقدم مع AWS Professional Services Analytics Practice. يساعد العملاء في تنفيذ حلول البيانات والتحليلات الضخمة. خارج العمل ، يستمتع بقضاء الوقت مع العائلة والسفر وتجربة طعام جديد.
- محتوى مدعوم من تحسين محركات البحث وتوزيع العلاقات العامة. تضخيم اليوم.
- أفلاطونايستريم. ذكاء بيانات Web3. تضخيم المعرفة. الوصول هنا.
- سك المستقبل مع أدرين أشلي. الوصول هنا.
- شراء وبيع الأسهم في شركات ما قبل الاكتتاب مع PREIPO®. الوصول هنا.
- المصدر https://aws.amazon.com/blogs/big-data/simplify-aws-glue-job-orchestration-and-monitoring-with-amazon-mwaa/
- :لديها
- :يكون
- :ليس
- :أين
- $ UP
- 1
- 10
- 100
- 12
- 8
- a
- من نحن
- الوصول
- وفقا لذلك
- حسابي
- التأهيل
- في
- اكشن
- اسيكليك
- إضافي
- مميزات
- مزايا
- بعد
- تجميع
- الكل
- السماح
- يسمح
- سابقا
- أيضا
- أمازون
- أمازون ويب سيرفيسز
- an
- تحليلية
- تحليلات
- تحليل
- و
- أي وقت
- أباتشي
- API
- تطبيق
- تطوير التطبيقات
- مناسب
- هندسة معمارية
- هي
- حجة
- الحجج
- AS
- At
- سمات
- التدقيق
- متاح
- AWS
- غراء AWS
- الخدمات المهنية AWS
- على أساس
- BE
- يصبح
- كان
- قبل
- يجري
- أفضل
- ما بين
- كبير
- البيانات الكبيرة
- على حد سواء
- كسر
- نساعدك في بناء
- لكن
- by
- تسمى
- دعوات
- CAN
- حقيبة
- الحالات
- الأقسام
- الأسباب
- تغيير
- التغييرات
- التحقق
- اختار
- سحابة
- الكود
- COM
- دمج
- التعليق
- مشترك
- الشركات
- التوافق
- متوافق
- إكمال
- مجمع
- عنصر
- مكونات
- إحصاء
- الاعداد
- أكد
- كنسولات
- دعم
- توحيد
- الطهي
- جوهر
- ويغطي
- خلق
- خلق
- يخلق
- خلق
- حالياًّ
- على
- زبون
- العملاء
- DAG
- البيانات
- تكامل البيانات
- معالجة المعلومات
- استراتيجية البيانات
- مستودعات البيانات
- قاعدة البيانات
- قواعد البيانات
- قواعد البيانات
- التاريخ
- تمور
- التاريخ والوقت
- أيام
- قررت
- الترتيب
- تم التوصيل
- تظاهر
- اعتمادا
- إهمال
- مفصلة
- تفاصيل
- التطوير التجاري
- اختلف
- مختلف
- رقمي
- وسائل الإعلام الرقمية
- مباشرة
- بحث
- وزعت
- الانظمة الموزعة
- do
- هل
- فعل
- فعل
- أثناء
- e
- في وقت سابق
- يخفف
- تأثير
- القضاء
- آخر
- تمكين
- تمكين
- تمكن
- النهائي إلى نهاية
- اشتباك
- مهندس
- التحسينات
- ضمان
- أدخل
- البيئة
- خطأ
- الأثير (ETH)
- أحداث
- مثال
- إلا
- استثناء
- يوجد
- القائمة
- موجود
- الخبره في مجال الغطس
- خبرة
- استكشاف
- التعبير
- خارجي
- استخراج
- فشل
- زائف
- للعائلات
- الميزات
- عقار مميز
- المميزات
- قم بتقديم
- ملفات
- تصفية
- أخيرا
- اللياقة البدنية
- متابعيك
- طعام
- في حالة
- شكل
- الاصدقاء
- تبدأ من
- بالإضافة إلى
- جمع
- توليد
- زجاج
- Go
- الغولف
- الحكم
- تجمع
- Hadoop
- يملك
- he
- صحة الإنسان
- مساعدة
- يساعد
- تاريخ
- خلية النحل
- كيفية
- كيفية
- HTML
- HTTP
- HTTPS
- IAM
- ID
- المثالي
- الأفكار
- معرف
- if
- يوضح
- تنفيذ
- استيراد
- in
- في العمق
- تتضمن
- بما فيه
- القيمة الاسمية
- زيادة
- وأشار
- الصناعات
- info
- معلومات
- في البداية
- مبتكرة
- رؤى
- تركيب
- مثل
- تعليمات
- المتكاملة
- التكامل
- مصلحة
- داخلي
- إلى
- IT
- وظيفة
- المشــاريــع
- JPG
- جسون
- القفل
- معروف
- كبير
- الى وقت لاحق
- آخر
- تعلم
- تعلم
- المكتبة
- مما سيحدث
- قائمة
- تحميل
- محلي
- محليا
- موقع
- سجل
- تسجيل الدخول
- تسجيل
- أبحث
- آلة
- آلة التعلم
- صنع
- المحافظة
- جعل
- القيام ب
- تمكن
- إدارة
- إدارة
- كتيب
- مادة
- مايو..
- الوسائط
- تعرف علي
- الرسالة
- المقاييس
- المهاجرة
- أدنى
- ML
- تم التعديل
- وحدة
- مراقبة
- مراقبة
- الأكثر من ذلك
- يجب
- الاسم
- أسماء
- التنقل
- قائمة الإختيارات
- ضروري
- حاجة
- بحاجة
- إحتياجات
- جديد
- لا شى
- الآن
- of
- الوهب
- on
- ONE
- منها
- فقط
- جاكيت
- المصدر المفتوح
- كود مفتوح المصدر
- عامل
- مشغلي
- الأمثل
- خيار
- مزيد من الخيارات
- or
- مدبرة
- تزامن
- أخرى
- لنا
- النتائج
- الناتج
- في الخارج
- صفقة
- الباندا
- خبز
- المعلمات
- الشريكة
- pass
- مرت
- عاطفي
- أداء
- أذونات
- لا يزال قائما
- خط أنابيب
- المكان
- المنصة
- أفلاطون
- الذكاء افلاطون البيانات
- أفلاطون داتا
- نقاط
- سياسة
- منشور
- يحتمل
- ممارسة
- الشروط
- سابق
- سابقا
- عملية المعالجة
- العمليات
- معالجة
- المنتجات
- محترف
- المهنيين
- ملامح
- إسقاط
- مزود
- مقدمي
- ويوفر
- بايثون
- جودة
- الاستفسارات
- رفع
- نطاق
- عرض
- نادي القراءة
- حقيقي
- في الوقت الحقيقي
- الأخيرة
- نوصي
- منطقة
- المنظمين
- المرحل
- يحل محل
- استبدال
- مطلوب
- المتطلبات الأساسية
- مورد
- الموارد
- على التوالي
- استجابة
- النتائج
- احتفظ
- حق
- النوع
- صف
- يجري
- تشغيل
- s
- حفظ
- سيناريوهات
- الإستراحة
- بسلاسة
- بحث
- القسم
- تأمين
- انظر تعريف
- طلب
- كبير
- Serverless
- خدماتنا
- ضبط
- الإعداد
- قذيفة
- ينبغي
- إظهار
- يظهر
- الاشارات
- تبسيط
- منذ
- عزباء
- لقطة
- حل
- الحلول
- بعض
- محدد
- محدد
- الإنفاق
- ملخص الحساب
- الحالة
- خطوة
- خطوات
- لا يزال
- تخزين
- تخزين
- الإستراتيجيات
- مجرى
- خيط
- ناجح
- هذه
- كاف
- الدعم
- مدعومة
- أنظمة
- جدول
- أخذ
- مع الأخذ
- مهمة
- فريق
- التكنولوجيا
- قالب
- شكر
- أن
- •
- من مشاركة
- منهم
- then
- هناك.
- تشبه
- هم
- طرف ثالث
- عبر
- الوقت
- إلى
- مسار
- تحول
- السفر
- صحيح
- محاولة
- الثلاثاء
- تحول
- اثنان
- نوع
- ui
- موحد
- وحدة
- تحديث
- آخر التحديثات
- تحديث
- ترقية
- ترقية
- تحميل
- الأستعمال
- تستخدم
- حالة الاستخدام
- مستعمل
- المستخدمين
- استخدام
- قيمنا
- القيم
- الإصدار
- بواسطة
- المزيد
- الرؤى
- رؤية
- مرئي
- مشى
- تريد
- وكان
- we
- الويب
- خدمات ويب
- حسن
- ابحث عن
- متى
- سواء
- التي
- من الذى
- سوف
- مع
- في غضون
- بدون
- للعمل
- سير العمل
- سوف
- اكتب
- مكتوب
- لصحتك!
- حل متجر العقارات الشامل الخاص بك في جورجيا
- زفيرنت