Tüm sektörlerdeki kuruluşlar, aşağıdakiler gibi farklı analitik sistemlerindeki analitik kullanım durumları için karmaşık veri işleme gereksinimlerine sahiptir: AWS'de veri gölleri, veri depoları (Amazon Kırmızıya Kaydırma), aramak (Amazon Açık Arama Hizmeti), SQL (Amazon DinamoDB), makine öğrenme (Amazon Adaçayı Yapıcı), ve dahası. Analitik uzmanları, müşterileri için daha iyi, güvenli ve maliyeti optimize edilmiş deneyimler oluşturmak üzere bu dağıtılmış sistemlerde depolanan verilerden değer elde etmekle görevlendirilir. Örneğin, dijital medya şirketleri, müşteri profillerinin birleşik görünümlerini oluşturmak, yenilikçi özellikler için fikirleri teşvik etmek ve platform etkileşimini artırmak için dahili ve harici veritabanlarındaki veri kümelerini birleştirip işlemeye çalışır.
Bu senaryolarda, sunucusuz bir veri entegrasyonu teklifi arayan müşteriler, AWS Tutkal verileri işlemek ve kataloglamak için temel bir bileşen olarak. AWS Glue, AWS hizmetleri ve iş ortağı ürünleriyle iyi bir şekilde entegredir ve analitik, makine öğrenimi (ML) veya uygulama geliştirme iş akışlarını etkinleştirmek için az kodlu/kodsuz ayıklama, dönüştürme ve yükleme (ETL) seçenekleri sunar. AWS Glue ETL işleri, daha karmaşık bir işlem hattındaki bileşenlerden biri olabilir. Bu bileşenlerin işleyişini düzenlemek ve bu bileşenler arasındaki bağımlılıkları yönetmek, bir veri stratejisinde önemli bir yetenektir. Apache Airflows için Amazon Tarafından Yönetilen İş Akışları (Amazon MWAA), şirket içi kaynaklar, AWS hizmetleri ve üçüncü taraf bileşenler dahil olmak üzere dağıtılmış teknolojileri kullanarak veri işlem hatlarını düzenler.
Bu gönderide, Amazon MWAA'nın en yeni özelliklerini kullanarak Airflow tarafından yönetilen bir AWS Glue işinin izlenmesini nasıl basitleştireceğimizi gösteriyoruz.
Çözüme genel bakış
Bu gönderi aşağıdakileri tartışıyor:
- Bir Amazon MWAA ortamını 2.4.3 sürümüne yükseltme.
- Bir AWS Glue işini Airflow'dan düzenleme Yönlendirilmiş döngüsüz grafiği (DAG).
- Airflow Amazon sağlayıcı paketinin Amazon MWAA'daki gözlemlenebilirlik geliştirmeleri. Artık veri işlem hatlarında sorun gidermeyi basitleştirmek için AWS Glue işlerinin çalıştırma günlüklerini Airflow konsolunda birleştirebilirsiniz. Amazon MWAA konsolu, AWS Glue iş çalıştırmalarını izlemek ve analiz etmek için tek bir referans haline gelir. Önceden, destek ekiplerinin AWS Yönetim Konsolu ve bu görünürlük için manuel adımlar atın. Bu özellik varsayılan olarak Amazon MWAA sürüm 2.4.3'ten edinilebilir.
Aşağıdaki şema çözüm mimarimizi göstermektedir.
Önkoşullar
Aşağıdaki ön koşullara ihtiyacınız var:
Amazon MWAA ortamını kurun
Ortamınızı oluşturmaya ilişkin yönergeler için bkz. Bir Amazon MWAA ortamı oluşturun. Mevcut kullanıcılar için, bu yayında öne çıkan gözlemlenebilirlik iyileştirmelerinden yararlanmak için 2.4.3 sürümüne yükseltme yapmanızı öneririz.
Amazon MWAA'yı 2.4.3 sürümüne yükseltme adımları, geçerli sürümün 1.10.12 veya 2.2.2 olmasına bağlı olarak farklılık gösterir. Bu yazıda her iki seçeneği de tartışıyoruz.
Bir Amazon MWAA ortamı kurmak için ön koşullar
Aşağıdaki ön koşulları karşılamanız gerekir:
1.10.12 sürümünden 2.4.3 sürümüne yükseltme
Amazon MWAA sürümünü kullanıyorsanız 1.10.12bakın Yeni bir Amazon MWAA ortamına geçiş 2.4.3'e yükseltmek için.
2.0.2 veya 2.2.2 sürümünden 2.4.3 sürümüne yükseltin
Amazon MWAA ortamı sürüm 2.2.2 veya daha eski bir sürümü kullanıyorsanız aşağıdaki adımları tamamlayın:
- Hat için bir herhangi bir özel bağımlılık için gereklilikler.txt DAG'leriniz için gereken belirli sürümlerle.
- Dosyayı Amazon S3'e yükleyin Amazon MWAA ortamının bağımlılıkları yüklemek için gereklilikler.txt'ye işaret ettiği uygun konumda.
- İçindeki adımları izleyin Yeni bir Amazon MWAA ortamına geçiş ve 2.4.3 sürümünü seçin.
DAG'lerinizi güncelleyin
Daha eski bir Amazon MWAA ortamından yükseltme yapan müşterilerin mevcut DAG'lerde güncellemeler yapması gerekebilir. Airflow 2.4.3 sürümünde, Airflow ortamı varsayılan olarak Amazon sağlayıcı paketi sürüm 6.0.0'ı kullanacaktır. Bu paket, operatör adlarındaki değişiklikler gibi potansiyel olarak bazı önemli değişiklikleri içerebilir. Örneğin, AWSGlueJobOperatörü kullanımdan kaldırıldı ve yerine GlueJob Operatörü. Uyumluluğu sürdürmek için, önceki sürümlerin kullanımdan kaldırılmış veya desteklenmeyen operatörlerini yenileriyle değiştirerek Airflow DAG'lerinizi güncelleyin. Aşağıdaki adımları tamamlayın:
- Şu yöne rotayı ayarla Amazon AWS Operatörleri.
- Desteklenen Airflow operatörlerinin bir listesini bulmak için Amazon MWAA bulut sunucunuzda kurulu uygun sürümü (varsayılan olarak 6.0.0.) seçin.
- Mevcut DAG kodunda gerekli değişiklikleri yapın ve değiştirilen dosyaları Amazon S3'teki DAG konumuna yükleyin.
Airflow'dan AWS Glue işini düzenleyin
Bu bölüm, Airflow DAG'lerinde bir AWS Glue işini düzenlemenin ayrıntılarını içerir. Airflow, şirket içi süreçler, harici bağımlılıklar, diğer AWS hizmetleri ve daha fazlası gibi heterojen sistemler arasındaki bağımlılıklarla veri ardışık düzenlerinin geliştirilmesini kolaylaştırır.
AWS Glue ve Amazon MWAA ile CloudTrail günlük toplama işlemini düzenleyin
Bu örnekte, CloudTrail günlüklerine dayalı birleştirilmiş metrikleri sürdüren bir AWS Glue Python Shell işini düzenlemek için Amazon MWAA'nın kullanıldığı bir kullanım örneğini inceliyoruz.
CloudTrail, AWS hesabınızda yapılmakta olan AWS API çağrılarının görünürlüğünü sağlar. Bu verilerle ilgili yaygın bir kullanım durumu, denetim ve düzenleyici ihtiyaçlar için hesabınızın kaynaklarına göre hareket eden sorumlulara ilişkin kullanım ölçümlerini toplamak olacaktır.
CloudTrail olayları günlüğe kaydedilirken, analitik sorgular için ideal olmayan Amazon S3'te JSON dosyaları olarak teslim edilirler. Optimum sorgu performansına izin vermek için bu verileri bir araya getirmek ve Parquet dosyaları olarak kalıcı kılmak istiyoruz. İlk adım olarak, AWS Glue işimizde ek toplamalar yapmadan önce verilerin ilk sorgulamasını yapmak için Athena'yı kullanabiliriz. Bir AWS Glue Data Catalog tablosu oluşturma hakkında daha fazla bilgi için bkz. Bölüm projeksiyonunu kullanarak Athena'da CloudTrail günlükleri için tablo oluşturma veri. Athena aracılığıyla verileri inceledikten ve toplu tablolarda hangi metrikleri tutmak istediğimize karar verdikten sonra, bir AWS Glue işi oluşturabiliriz.
Athena'da bir CloudTrail tablosu oluşturun
Öncelikle Veri Kataloğumuzda CloudTrail verilerinin Athena üzerinden sorgulanabilmesini sağlayan bir tablo oluşturmamız gerekiyor. Aşağıdaki örnek sorgu, Bölge ve tarihte (anlık görüntü_tarihi olarak adlandırılır) iki bölümden oluşan bir tablo oluşturur. CloudTrail grubunuz, AWS hesap kimliğiniz ve CloudTrail tablo adınız için yer tutucuları değiştirdiğinizden emin olun:
create external table if not exists `<<<CLOUDTRAIL_TABLE_NAME>>>`( `eventversion` string comment 'from deserializer', `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> comment 'from deserializer', `eventtime` string comment 'from deserializer', `eventsource` string comment 'from deserializer', `eventname` string comment 'from deserializer', `awsregion` string comment 'from deserializer', `sourceipaddress` string comment 'from deserializer', `useragent` string comment 'from deserializer', `errorcode` string comment 'from deserializer', `errormessage` string comment 'from deserializer', `requestparameters` string comment 'from deserializer', `responseelements` string comment 'from deserializer', `additionaleventdata` string comment 'from deserializer', `requestid` string comment 'from deserializer', `eventid` string comment 'from deserializer', `resources` array<struct<arn:string,accountid:string,type:string>> comment 'from deserializer', `eventtype` string comment 'from deserializer', `apiversion` string comment 'from deserializer', `readonly` string comment 'from deserializer', `recipientaccountid` string comment 'from deserializer', `serviceeventdetails` string comment 'from deserializer', `sharedeventid` string comment 'from deserializer', `vpcendpointid` string comment 'from deserializer')
PARTITIONED BY ( `region` string, `snapshot_date` string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/'
TBLPROPERTIES ( 'projection.enabled'='true', 'projection.region.type'='enum', 'projection.region.values'='us-east-2,us-east-1,us-west-1,us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-3,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1', 'projection.snapshot_date.format'='yyyy/mm/dd', 'projection.snapshot_date.interval'='1', 'projection.snapshot_date.interval.unit'='days', 'projection.snapshot_date.range'='2020/10/01,now', 'projection.snapshot_date.type'='date', 'storage.location.template'='s3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/${region}/${snapshot_date}')
Athena konsolunda önceki sorguyu çalıştırın ve tablo adını ve oluşturulduğu AWS Glue Data Catalog veritabanını not edin. Bu değerleri daha sonra Airflow DAG kodunda kullanırız.
Örnek AWS Glue iş kodu
Aşağıdaki kod bir örnektir AWS Glue Python Kabuğu işi aşağıdakileri yapar:
- Hangi günün verilerinin işleneceğine ilişkin argümanları (Amazon MWAA DAG'ımızdan ilettiğimiz) alır
- Kullanır Pandalar için AWS SDK AWS Glue dışında CloudTrail JSON verilerinin ilk filtrelemesini yapmak için bir Athena sorgusu çalıştırmak için
- Filtrelenmiş veriler üzerinde basit toplamalar yapmak için Pandaları kullanır
- Birleştirilmiş verileri bir tablodaki AWS Glue Data Catalog'a çıkarır
- Amazon MWAA'da görünecek olan işleme sırasında günlük kaydını kullanır
import awswrangler as wr
import pandas as pd
import sys
import logging
from awsglue.utils import getResolvedOptions
from datetime import datetime, timedelta # Logging setup, redirects all logs to stdout
LOGGER = logging.getLogger()
formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')
streamHandler = logging.StreamHandler(sys.stdout)
streamHandler.setFormatter(formatter)
LOGGER.addHandler(streamHandler)
LOGGER.setLevel(logging.INFO) LOGGER.info(f"Passed Args :: {sys.argv}") sql_query_template = """
select
region,
useridentity.arn,
eventsource,
eventname,
useragent from "{cloudtrail_glue_db}"."{cloudtrail_table}"
where snapshot_date='{process_date}'
and region in ('us-east-1','us-east-2') """ required_args = ['CLOUDTRAIL_GLUE_DB', 'CLOUDTRAIL_TABLE', 'TARGET_BUCKET', 'TARGET_DB', 'TARGET_TABLE', 'ACCOUNT_ID']
arg_keys = [*required_args, 'PROCESS_DATE'] if '--PROCESS_DATE' in sys.argv else required_args
JOB_ARGS = getResolvedOptions ( sys.argv, arg_keys) LOGGER.info(f"Parsed Args :: {JOB_ARGS}") # if process date was not passed as an argument, process yesterday's data
process_date = ( JOB_ARGS['PROCESS_DATE'] if JOB_ARGS.get('PROCESS_DATE','NONE') != "NONE" else (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d") ) LOGGER.info(f"Taking snapshot for :: {process_date}") RAW_CLOUDTRAIL_DB = JOB_ARGS['CLOUDTRAIL_GLUE_DB']
RAW_CLOUDTRAIL_TABLE = JOB_ARGS['CLOUDTRAIL_TABLE']
TARGET_BUCKET = JOB_ARGS['TARGET_BUCKET']
TARGET_DB = JOB_ARGS['TARGET_DB']
TARGET_TABLE = JOB_ARGS['TARGET_TABLE']
ACCOUNT_ID = JOB_ARGS['ACCOUNT_ID'] final_query = sql_query_template.format( process_date=process_date.replace("-","/"), cloudtrail_glue_db=RAW_CLOUDTRAIL_DB, cloudtrail_table=RAW_CLOUDTRAIL_TABLE
) LOGGER.info(f"Running Query :: {final_query}") raw_cloudtrail_df = wr.athena.read_sql_query( sql=final_query, database=RAW_CLOUDTRAIL_DB, ctas_approach=False, s3_output=f"s3://{TARGET_BUCKET}/athena-results",
) raw_cloudtrail_df['ct']=1 agg_df = raw_cloudtrail_df.groupby(['arn','region','eventsource','eventname','useragent'],as_index=False).agg({'ct':'sum'})
agg_df['snapshot_date']=process_date LOGGER.info(agg_df.info(verbose=True)) upload_path = f"s3://{TARGET_BUCKET}/{TARGET_DB}/{TARGET_TABLE}" if not agg_df.empty: LOGGER.info(f"Upload to {upload_path}") try: response = wr.s3.to_parquet( df=agg_df, path=upload_path, dataset=True, database=TARGET_DB, table=TARGET_TABLE, mode="overwrite_partitions", schema_evolution=True, partition_cols=["snapshot_date"], compression="snappy", index=False ) LOGGER.info(response) except Exception as exc: LOGGER.error("Uploading to S3 failed") LOGGER.exception(exc) raise exc
else: LOGGER.info(f"Dataframe was empty, nothing to upload to {upload_path}")
Aşağıda, bu AWS Glue işindeki bazı önemli avantajlar yer almaktadır:
- İlk filtrelemenin AWS Glue işimizin dışında yapıldığından emin olmak için bir Athena sorgusu kullanıyoruz. Bu nedenle, minimum işlem içeren bir Python Kabuğu işi, büyük bir CloudTrail veri kümesini bir araya getirmek için yine de yeterlidir.
- biz sağlamak analitik kitaplık kümesi seçeneği AWS SDK for Pandas kitaplığını kullanmak için AWS Glue işimizi oluştururken açılır.
Bir AWS Glue işi oluşturun
AWS Glue işinizi oluşturmak için aşağıdaki adımları tamamlayın:
- Önceki bölümde yer alan betiği kopyalayın ve yerel bir dosyaya kaydedin. Bu gönderi için dosya denir
script.py
. - AWS Glue konsolunda seçin ETL işleri Gezinti bölmesinde.
- Yeni bir iş oluşturun ve seçin Python Shell komut dosyası düzenleyicisi.
- seç Mevcut bir komut dosyasını yükleyin ve düzenleyin ve kaydettiğiniz dosyayı yerel olarak yükleyin.
- Klinik oluşturmak.
- Üzerinde İş detayları sekmesinde, AWS Glue işiniz için bir ad girin.
- İçin IAM rolü, mevcut bir rolü seçin veya Amazon S3, AWS Glue ve Athena için gerekli izinlere sahip yeni bir rol oluşturun. Rol, daha önce oluşturduğunuz CloudTrail tablosunu sorgulamalı ve bir çıkış konumuna yazmalıdır.
Aşağıdaki örnek ilke kodunu kullanabilirsiniz. Yer tutucuları CloudTrail günlük grubunuz, çıktı tablosu adı, çıktı AWS Glue veritabanı, çıktı S3 grubu, CloudTrail tablo adı, CloudTrail tablosunu içeren AWS Glue veritabanı ve AWS hesap kimliğiniz ile değiştirin.
{ "Version": "2012-10-17", "Statement": [ { "Action": [ "s3:List*", "s3:Get*" ], "Resource": [ "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>/*", "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>*" ], "Effect": "Allow", "Sid": "GetS3CloudtrailData" }, { "Action": [ "glue:Get*", "glue:BatchGet*" ], "Resource": [ "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>/<<<CLOUDTRAIL_TABLE>>>*" ], "Effect": "Allow", "Sid": "GetGlueCatalogCloudtrailData" }, { "Action": [ "s3:PutObject*", "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:Head*" ], "Resource": [ "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>", "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>/*" ], "Effect": "Allow", "Sid": "WriteOutputToS3" }, { "Action": [ "glue:CreateTable", "glue:CreatePartition", "glue:UpdatePartition", "glue:UpdateTable", "glue:DeleteTable", "glue:DeletePartition", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:Get*", "glue:BatchGet*" ], "Resource": [ "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<OUTPUT_GLUE_DB>>>", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>*" ], "Effect": "Allow", "Sid": "AllowOutputToGlue" }, { "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:*:*:/aws-glue/*", "Effect": "Allow", "Sid": "LogsAccess" }, { "Action": [ "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:DeleteObject*", "s3:PutObject", "s3:PutObjectLegalHold", "s3:PutObjectRetention", "s3:PutObjectTagging", "s3:PutObjectVersionTagging", "s3:Abort*" ], "Resource": [ "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>", "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>/*" ], "Effect": "Allow", "Sid": "AccessToAthenaResults" }, { "Action": [ "athena:StartQueryExecution", "athena:StopQueryExecution", "athena:GetDataCatalog", "athena:GetQueryResults", "athena:GetQueryExecution" ], "Resource": [ "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog", "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:datacatalog/AwsDataCatalog", "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:workgroup/primary" ], "Effect": "Allow", "Sid": "AllowAthenaQuerying" } ]
}
İçin Python sürümü, seçmek Python 3.9.
- seç Ortak analiz kitaplıklarını yükleyin.
- İçin Veri işleme birimleri, seçmek 1 Veri Birimi.
- Diğer seçenekleri varsayılan olarak bırakın veya gerektiği gibi ayarlayın.
- Klinik İndirim iş yapılandırmanızı kaydetmek için.
AWS Glue işini düzenlemek için bir Amazon MWAA DAG yapılandırın
Aşağıdaki kod, oluşturduğumuz AWS Glue işini düzenleyebilen bir DAG içindir. Bu DAG'de aşağıdaki temel özelliklerden yararlanıyoruz:
"""Sample DAG"""
import airflow.utils
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow import DAG
from datetime import timedelta
import airflow.utils # allow backfills via DAG run parameters
process_date = '{{ dag_run.conf.get("process_date") if dag_run.conf.get("process_date") else "NONE" }}' dag = DAG( dag_id = "CLOUDTRAIL_LOGS_PROCESSING", default_args = { 'depends_on_past':False, 'start_date':airflow.utils.dates.days_ago(0), 'retries':1, 'retry_delay':timedelta(minutes=5), 'catchup': False }, schedule_interval = None, # None for unscheduled or a cron expression - E.G. "00 12 * * 2" - at 12noon Tuesday dagrun_timeout = timedelta(minutes=30), max_active_runs = 1, max_active_tasks = 1 # since there is only one task in our DAG
) ## Log ingest. Assumes Glue Job is already created
glue_ingestion_job = GlueJobOperator( task_id="<<<some-task-id>>>", job_name="<<<GLUE_JOB_NAME>>>", script_args={ "--ACCOUNT_ID":"<<<YOUR_AWS_ACCT_ID>>>", "--CLOUDTRAIL_GLUE_DB":"<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>", "--CLOUDTRAIL_TABLE":"<<<CLOUDTRAIL_TABLE>>>", "--TARGET_BUCKET": "<<<OUTPUT_S3_BUCKET>>>", "--TARGET_DB": "<<<OUTPUT_GLUE_DB>>>", # should already exist "--TARGET_TABLE": "<<<OUTPUT_TABLE_NAME>>>", "--PROCESS_DATE": process_date }, region_name="us-east-1", dag=dag, verbose=True
) glue_ingestion_job
Amazon MWAA'da AWS Glue işlerinin gözlemlenebilirliğini artırın
AWS Glue işleri, günlükleri şuraya yazar: Amazon Bulut İzleme. Airflow'un Amazon sağlayıcı paketindeki son gözlemlenebilirlik iyileştirmeleriyle, bu günlükler artık Airflow görev günlükleriyle entegre edilmiştir. Bu birleştirme, Airflow kullanıcılarına doğrudan Airflow kullanıcı arabiriminde uçtan uca görünürlük sağlayarak CloudWatch veya AWS Glue konsolunda arama yapma ihtiyacını ortadan kaldırır.
Bu özelliği kullanmak için Amazon MWAA ortamına eklenmiş IAM rolünün, gerekli günlükleri almak ve yazmak için aşağıdaki izinlere sahip olduğundan emin olun:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:DescribeLogStreams", "logs:FilterLogEvents", "logs:GetLogGroupFields", "logs:GetQueryResults", ], "Resource": [ "arn:aws:logs:*:*:log-group:airflow-243-<<<Your environment name>>>-*"--Your Amazon MWAA Log Stream Name ] } ]
}
verbose=true ise, AWS Glue iş çalıştırma günlükleri Airflow görev günlüklerinde gösterilir. Varsayılan yanlıştır. Daha fazla bilgi için bkz. parametreler.
Etkinleştirildiğinde, DAG'ler AWS Glue işinin CloudWatch günlük akışını okur ve bunları Airflow DAG AWS Glue işi adım günlüklerine aktarır. Bu, bir AWS Glue işinin DAG günlükleri aracılığıyla gerçek zamanlı olarak çalıştırılmasına ilişkin ayrıntılı içgörüler sağlar. AWS Glue işlerinin, sırasıyla işin STDOUT ve STDERR değerlerine dayalı olarak bir çıktı ve hata CloudWatch günlük grubu oluşturduğunu unutmayın. Çıkış günlüğü grubundaki tüm günlükler ve hata günlüğü grubundaki istisna veya hata günlükleri, Amazon MWAA'ya aktarılır.
AWS yöneticileri artık bir destek ekibinin erişimini yalnızca Airflow ile sınırlayabilir, bu da Amazon MWAA'yı iş düzenleme ve iş sağlığı yönetimi konusunda tek cam bölme haline getirir. Önceden, kullanıcıların Airflow DAG adımlarında AWS Glue iş çalıştırma durumunu kontrol etmesi ve iş çalıştırma tanımlayıcısını alması gerekiyordu. Ardından, iş çalıştırma geçmişini bulmak, tanımlayıcıyı kullanarak ilgilenilen işi aramak ve son olarak sorun gidermek için işin CloudWatch günlüklerine gitmek için AWS Glue konsoluna erişmeleri gerekiyordu.
DAG'ı oluşturun
DAG'yi oluşturmak için aşağıdaki adımları tamamlayın:
- Önceki DAG kodunu, belirtilen yer tutucuları değiştirerek yerel bir .py dosyasına kaydedin.
AWS hesap kimliğiniz, AWS Glue iş adınız, CloudTrail tablolu AWS Glue veritabanı ve CloudTrail tablo adınızın değerleri zaten biliniyor olmalıdır. Çıktı S3 klasörünü, çıktı AWS Glue veritabanını ve çıktı tablosu adını gerektiği gibi ayarlayabilirsiniz, ancak daha önce kullandığınız AWS Glue işinin IAM rolünün buna göre yapılandırıldığından emin olun.
- DAG kodunun nerede depolandığını görmek için Amazon MWAA konsolunda ortamınıza gidin.
DAGs klasörü, DAG dosyanızın yerleştirilmesi gereken S3 klasörünün önekidir.
- Düzenlenmiş dosyanızı oraya yükleyin.
- DAG'nin tabloda göründüğünü doğrulamak için Amazon MWAA konsolunu açın.
DAG'ı çalıştırın
DAG'ı çalıştırmak için aşağıdaki adımları tamamlayın:
- Aşağıdaki seçeneklerden birini seçin:
- Tetikleyici DAG – Bu, dünün verilerinin işlenecek veri olarak kullanılmasına neden olur
- DAG'yi yapılandırma ile tetikleyin – Bu seçenekle, potansiyel olarak kullanılarak alınan dolgular için farklı bir tarih verebilirsiniz.
dag_run.conf
DAG kodunda ve ardından parametre olarak AWS Glue işine geçirildi
Aşağıdaki ekran görüntüsü, isterseniz ek yapılandırma seçeneklerini gösterir. DAG'yi yapılandırma ile tetikleyin.
- DAG'ı çalışırken izleyin.
- DAG tamamlandığında çalıştırmanın ayrıntılarını açın.
Sağ bölmede, günlükleri görüntüleyebilir veya Görev Örneği Ayrıntıları tam bir görünüm için.
- sayesinde AWS Glue konsolunu kullanmadan Amazon MWAA'daki AWS Glue iş çıktısı günlüklerini görüntüleyin.
GlueJobOperator
ayrıntılı bayrak
AWS Glue işi, sonuçları belirttiğiniz çıktı tablosuna yazacaktır.
- Başarılı olduğunu doğrulamak için bu tabloyu Athena aracılığıyla sorgulayın.
Özet
Amazon MWAA artık AWS Glue iş durumunu izlemek için tek bir yer sağlıyor ve Airflow konsolunu iş düzenleme ve sağlık yönetimi için tek ekran olarak kullanmanızı sağlıyor. Bu gönderide, AWS Glue işlerini Airflow kullanarak düzenleme adımlarını inceledik. GlueJobOperator
. Yeni gözlemlenebilirlik iyileştirmeleriyle, birleşik bir deneyimde AWS Glue işlerini sorunsuz bir şekilde giderebilirsiniz. Ayrıca Amazon MWAA ortamınızı uyumlu bir sürüme nasıl yükselteceğinizi, bağımlılıkları nasıl güncelleyeceğinizi ve buna göre IAM rol politikasını nasıl değiştireceğinizi de gösterdik.
Yaygın sorun giderme adımları hakkında daha fazla bilgi için bkz. Sorun Giderme: Bir Amazon MWAA ortamı oluşturma ve güncelleme. Bir Amazon MWAA ortamına geçişle ilgili ayrıntılı ayrıntılar için bkz. 1.10'ten 2'e yükseltiliyor. Airflow Amazon sağlayıcı paketindeki AWS Glue işlerinin daha fazla gözlemlenebilirliği için açık kaynak kod değişiklikleri hakkında bilgi edinmek üzere bkz. AWS Glue işlerinden geçiş günlükleri.
Son olarak, ziyaret etmenizi öneririz. AWS Büyük Veri Blogu AWS'de analitik, makine öğrenimi ve veri yönetişimi ile ilgili diğer materyaller için.
Yazarlar Hakkında
Rushabh Lohande AWS Profesyonel Hizmetler Analitiği Uygulamasına sahip bir Veri ve Makine Öğrenimi Mühendisidir. Müşterilerin büyük veri, makine öğrenimi ve analitik çözümleri uygulamalarına yardımcı olur. İş dışında ailesiyle vakit geçirmekten, kitap okumaktan, koşmaktan ve golf oynamaktan hoşlanır.
ryan gomes AWS Profesyonel Hizmetler Analitiği Uygulamasına sahip bir Veri ve Makine Öğrenimi Mühendisidir. Bulutta analitik ve makine öğrenimi çözümleri aracılığıyla müşterilerin daha iyi sonuçlar elde etmesine yardımcı olma konusunda tutkulu. İş dışında spor yapmaktan, yemek yapmaktan ve arkadaşları ve ailesiyle kaliteli zaman geçirmekten hoşlanıyor.
Vişva Gupta AWS Profesyonel Hizmetler Analitiği Uygulamasına sahip Kıdemli Veri Mimarıdır. Müşterilerin büyük veri ve analitik çözümlerini uygulamalarına yardımcı olur. İş dışında ailesiyle vakit geçirmekten, seyahat etmekten ve yeni yemekler denemekten hoşlanır.
- SEO Destekli İçerik ve Halkla İlişkiler Dağıtımı. Bugün Gücünüzü Artırın.
- PlatoAiStream. Web3 Veri Zekası. Bilgi Genişletildi. Buradan Erişin.
- Adryenn Ashley ile Geleceği Basmak. Buradan Erişin.
- PREIPO® ile PRE-IPO Şirketlerinde Hisse Al ve Sat. Buradan Erişin.
- Kaynak: https://aws.amazon.com/blogs/big-data/simplify-aws-glue-job-orchestration-and-monitoring-with-amazon-mwaa/
- :vardır
- :dır-dir
- :olumsuzluk
- :Neresi
- $UP
- 1
- 10
- 100
- 12
- 8
- a
- Hakkımızda
- erişim
- göre
- Hesap
- Başarmak
- karşısında
- Action
- asiklik
- Ek
- avantaj
- avantajları
- Sonra
- toplanma
- Türkiye
- izin vermek
- veriyor
- zaten
- Ayrıca
- Amazon
- Amazon Web Servisleri
- an
- Analitik
- analytics
- çözümlemek
- ve
- herhangi
- Apache
- api
- Uygulama
- Uygulama Geliştirme
- uygun
- mimari
- ARE
- tartışma
- argümanlar
- AS
- At
- öznitelikleri
- denetleme
- mevcut
- AWS
- AWS Tutkal
- AWS Profesyonel Hizmetleri
- merkezli
- BE
- olur
- olmuştur
- önce
- olmak
- Daha iyi
- arasında
- Büyük
- büyük Veri
- her ikisi de
- Kırma
- inşa etmek
- fakat
- by
- denilen
- aramalar
- CAN
- dava
- durumlarda
- katalog
- nedenleri
- değişiklik
- değişiklikler
- Kontrol
- Klinik
- bulut
- kod
- COM
- birleştirmek
- yorum Yap
- ortak
- Şirketler
- uygunluk
- uyumlu
- tamamlamak
- karmaşık
- bileşen
- bileşenler
- hesaplamak
- yapılandırma
- Onaylamak
- konsolos
- pekiştirmek
- sağlamlaştırma
- yemek pişirme
- çekirdek
- kapaklar
- yaratmak
- çevrimiçi kurslar düzenliyorlar.
- oluşturur
- Oluşturma
- akım
- görenek
- müşteri
- Müşteriler
- DAG
- veri
- veri entegrasyonu
- veri işleme
- veri stratejisi
- veri depoları
- veritabanı
- veritabanları
- veri kümeleri
- Tarih
- Tarih
- datetime
- Günler
- karar
- Varsayılan
- teslim edilen
- gösterdi
- bağlı
- önerilmiyor
- detaylı
- ayrıntılar
- gelişme
- farklılık
- farklı
- dijital
- Dijital medya
- direkt olarak
- tartışmak
- dağıtıldı
- dağıtılmış sistemler
- do
- yok
- yapıyor
- yapılmış
- sırasında
- e
- Daha erken
- hareket hızları
- Efekt
- ortadan
- başka
- etkinleştirmek
- etkin
- sağlar
- son uca
- nişan
- mühendis
- geliştirmeleri
- sağlamak
- Keşfet
- çevre
- hata
- Eter (ETH)
- olaylar
- örnek
- Dışında
- istisna
- mevcut
- mevcut
- var
- deneyim
- Deneyimler
- keşfedilmeyi
- ifade
- dış
- çıkarmak
- başarısız
- yanlış
- aile
- Özellikler(Hazırlık aşamasında)
- özellikli
- Özellikler
- fileto
- dosyalar
- süzme
- Nihayet
- bulmak
- uygunluk
- takip etme
- Gıda
- İçin
- biçim
- arkadaşlar
- itibaren
- tam
- toplamak
- oluşturmak
- bardak
- Go
- golf
- yönetim
- grup
- Hadoop'un
- Var
- he
- Sağlık
- yardım
- yardımcı olur
- tarih
- kovan
- Ne kadar
- Nasıl Yapılır
- HTML
- http
- HTTPS
- IAM
- ID
- ideal
- fikirler
- tanımlayıcı
- if
- göstermektedir
- uygulamak
- ithalat
- in
- derinlemesine
- dahil
- Dahil olmak üzere
- Artırmak
- artmış
- belirtilen
- Endüstri
- bilgi
- bilgi
- ilk
- yenilikçi
- anlayışlar
- yükleme
- örnek
- talimatlar
- entegre
- bütünleşme
- faiz
- iç
- içine
- IT
- İş
- Mesleki Öğretiler
- jpg
- json
- anahtar
- bilinen
- büyük
- sonra
- son
- ÖĞRENİN
- öğrenme
- Kütüphane
- LİMİT
- Liste
- yük
- yerel
- lokal olarak
- yer
- log
- giriş
- günlüğü
- bakıyor
- makine
- makine öğrenme
- yapılmış
- korumak
- yapmak
- Yapımı
- yönetilen
- yönetim
- yönetme
- Manuel
- malzeme
- Mayıs..
- medya
- Neden
- mesaj
- Metrikleri
- göç
- en az
- ML
- değiştirilmiş
- modül
- izlemek
- izleme
- Daha
- şart
- isim
- isimleri
- Gezin
- Navigasyon
- gerekli
- gerek
- gerekli
- ihtiyaçlar
- yeni
- hiçbir şey değil
- şimdi
- of
- teklif
- on
- ONE
- olanlar
- bir tek
- açık
- açık kaynak
- açık kaynak kodu
- Şebeke
- operatörler
- optimum
- seçenek
- Opsiyonlar
- or
- orkestra
- orkestrasyon
- Diğer
- bizim
- sonuçlar
- çıktı
- dışında
- paket
- pandalar
- bölmesi
- parametreler
- Partner
- geçmek
- geçti
- tutkulu
- performans
- izinleri
- devam
- boru hattı
- yer
- platform
- Platon
- Plato Veri Zekası
- PlatoVeri
- noktaları
- politika
- Çivi
- potansiyel
- uygulama
- önkoşullar
- önceki
- Önceden
- süreç
- Süreçler
- işleme
- Ürünler
- profesyonel
- profesyoneller
- Profiller
- Projeksiyon
- sağlayan
- sağlayıcılar
- sağlar
- Python
- kalite
- sorgular
- yükseltmek
- menzil
- Okumak
- Okuma
- gerçek
- gerçek zaman
- son
- tavsiye etmek
- bölge
- düzenleyici
- Röle
- değiştirmek
- yerine
- gereklidir
- Yer Alan Kurallar
- kaynak
- Kaynaklar
- sırasıyla
- yanıt
- Sonuçlar
- tutmak
- krallar gibi yaşamaya
- Rol
- SIRA
- koşmak
- koşu
- s
- İndirim
- senaryolar
- sdk
- sorunsuz
- Ara
- Bölüm
- güvenli
- görmek
- Aramak
- kıdemli
- Serverless
- Hizmetler
- ayar
- kurulum
- Kabuk
- meli
- şov
- Gösteriler
- Basit
- basitleştirmek
- beri
- tek
- Enstantane fotoğraf
- çözüm
- Çözümler
- biraz
- özel
- Belirtilen
- Harcama
- Açıklama
- Durum
- adım
- Basamaklar
- Yine
- hafızası
- saklı
- Stratejileri
- dere
- dizi
- başarılı
- böyle
- yeterli
- destek
- destekli
- Sistemler
- tablo
- Bizi daha iyi tanımak için
- alma
- Görev
- takım
- Teknolojileri
- şablon
- Teşekkürler
- o
- The
- ve bazı Asya
- Onları
- sonra
- Orada.
- Bunlar
- onlar
- üçüncü şahıslara ait
- Re-Tweet
- İçinden
- zaman
- için
- iz
- Dönüştürmek
- Seyahat
- gerçek
- denemek
- Salı
- Dönük
- iki
- tip
- ui
- birleşik
- birim
- Güncelleme
- Güncellemeler
- güncellenmesi
- yükseltmek
- yükseltilmiş
- Yükleme
- kullanım
- kullanım
- kullanım durumu
- Kullanılmış
- kullanıcılar
- kullanma
- değer
- Değerler
- versiyon
- üzerinden
- Görüntüle
- Gösterim
- görünürlük
- gözle görülür
- yürüdü
- istemek
- oldu
- we
- ağ
- web hizmetleri
- İYİ
- Ne
- ne zaman
- olup olmadığını
- hangi
- DSÖ
- irade
- ile
- içinde
- olmadan
- İş
- iş akışları
- olur
- yazmak
- yazılı
- sen
- zefirnet