A szervezetek minden iparágban összetett adatfeldolgozási követelményekkel rendelkeznek az analitikai felhasználási eseteikhez a különböző analitikai rendszerekben, mint pl. adattavak az AWS-en, adattárházak (Amazon RedShift), keresés (Amazon OpenSearch szolgáltatás), NoSQL (Amazon DynamoDB), gépi tanulás (Amazon SageMaker), és több. Az analitikai szakemberek feladata, hogy értéket merítsenek az ezekben az elosztott rendszerekben tárolt adatokból, hogy jobb, biztonságosabb és költségoptimalizált élményt teremtsenek ügyfeleik számára. A digitális médiavállalatok például arra törekszenek, hogy egyesítsék és dolgozzák fel a belső és külső adatbázisokban lévő adatkészleteket, hogy egységes nézeteket alkossanak ügyfélprofiljaikról, ösztönözzék az innovatív funkciókra vonatkozó ötleteket, és növeljék a platform elkötelezettségét.
Ezekben a forgatókönyvekben a kiszolgáló nélküli adatintegrációs ajánlatot kereső ügyfelek használhatják AWS ragasztó mint az adatok feldolgozásának és katalogizálásának alapvető összetevője. Az AWS Glue jól integrálható az AWS-szolgáltatásokkal és partnertermékekkel, és alacsony kódú/kód nélküli kivonatolási, átalakítási és betöltési (ETL) lehetőségeket kínál az elemzési, gépi tanulási (ML) vagy alkalmazásfejlesztési munkafolyamatok lehetővé tételéhez. Az AWS Glue ETL feladatok egy összetettebb folyamat egyik összetevője lehetnek. Az ezen összetevők futtatásának összehangolása és a függőségek kezelése kulcsfontosságú az adatstratégiában. Amazon által felügyelt munkafolyamatok az Apache Airflows számára (Amazon MWAA) az adatfolyamokat elosztott technológiák segítségével hangszereli, beleértve a helyszíni erőforrásokat, az AWS-szolgáltatásokat és a harmadik féltől származó összetevőket.
Ebben a bejegyzésben bemutatjuk, hogyan lehet leegyszerűsíteni az Airflow által levezényelt AWS ragasztófeladatok figyelését az Amazon MWAA legújabb funkcióival.
A megoldás áttekintése
Ez a bejegyzés a következőket tárgyalja:
- Az Amazon MWAA környezet frissítése a 2.4.3-as verzióra.
- Az AWS ragasztófeladat megszervezése Airflow-ból Irányított aciklikus grafikon (DAG).
- Az Airflow Amazon szolgáltatói csomag megfigyelhetőségi fejlesztései az Amazon MWAA-ban. Mostantól konszolidálhatja az AWS Glue-feladatok futtatási naplóit az Airflow konzolon az adatfolyamok hibaelhárításának egyszerűsítése érdekében. Az Amazon MWAA konzol egyetlen referenciává válik az AWS Glue-feladat-futtatások figyeléséhez és elemzéséhez. Korábban a támogató csapatoknak kellett hozzáférniük a AWS felügyeleti konzol és tegyen manuális lépéseket ehhez a láthatósághoz. Ez a funkció alapértelmezés szerint az Amazon MWAA 2.4.3-as verziójától érhető el.
Az alábbi ábra szemlélteti megoldásunk architektúráját.
Előfeltételek
A következő előfeltételekre van szüksége:
Állítsa be az Amazon MWAA környezetet
A környezet létrehozására vonatkozó utasításokért lásd: Hozzon létre egy Amazon MWAA környezetet. A meglévő felhasználók számára azt javasoljuk, hogy frissítsenek a 2.4.3-as verzióra, hogy kihasználhassák az ebben a bejegyzésben bemutatott megfigyelhetőségi fejlesztéseket.
Az Amazon MWAA 2.4.3-as verzióra történő frissítésének lépései attól függően változnak, hogy az aktuális verzió 1.10.12 vagy 2.2.2. Ebben a bejegyzésben mindkét lehetőséget megvitatjuk.
Amazon MWAA környezet beállításának előfeltételei
A következő előfeltételeknek kell megfelelnie:
Frissítés az 1.10.12-es verzióról a 2.4.3-ra
Ha az Amazon MWAA verzióját használja 1.10.12, hivatkozni Migráció egy új Amazon MWAA környezetre frissíteni 2.4.3-ra.
Frissítés a 2.0.2-es vagy 2.2.2-es verzióról 2.4.3-ra
Ha az Amazon MWAA környezet 2.2.2-es vagy régebbi verzióját használja, hajtsa végre a következő lépéseket:
- Hozzon létre egy követelmények.txt fájlt az egyéni függőségekhez a DAG-okhoz szükséges speciális verziókkal.
- Töltse fel a fájlt az Amazon S3-ra a megfelelő helyen, ahol az Amazon MWAA környezet a függőségek telepítéséhez szükséges követelményekre mutat.txt.
- Kövesse a következő lépéseket: Migráció egy új Amazon MWAA környezetre és válassza ki a 2.4.3 verziót.
Frissítse a DAG-jait
Előfordulhat, hogy azoknak az ügyfeleknek, akik egy régebbi Amazon MWAA környezetről frissítettek, frissíteniük kell a meglévő DAG-okat. Az Airflow 2.4.3-as verziójában az Airflow környezet alapértelmezés szerint az Amazon szolgáltatói csomag 6.0.0-s verzióját fogja használni. Ez a csomag tartalmazhat néhány potenciálisan törést okozó változtatást, például az operátornevek módosítását. Például a AWSGlueJobOperator elavult, és helyére a GlueJobOperator. A kompatibilitás fenntartása érdekében frissítse Airflow DAG-jait úgy, hogy a korábbi verziók elavult vagy nem támogatott operátorait újakra cseréli. Hajtsa végre a következő lépéseket:
- navigáljon Amazon AWS üzemeltetők.
- Válassza ki az Amazon MWAA példányára telepített megfelelő verziót (alapértelmezés szerint 6.0.0) a támogatott Airflow szolgáltatók listájának megtekintéséhez.
- Végezze el a szükséges módosításokat a meglévő DAG-kódban, és töltse fel a módosított fájlokat az Amazon S3 DAG-helyére.
Hangszerelje az AWS ragasztó feladatát az Airflow-ból
Ez a szakasz egy AWS ragasztófeladat megszervezésének részleteit ismerteti az Airflow DAG-kon belül. Az Airflow megkönnyíti az adatfolyamok fejlesztését a heterogén rendszerek, például a helyszíni folyamatok, a külső függőségek, az egyéb AWS-szolgáltatások és egyebek közötti függőségekkel.
A CloudTrail naplóösszesítésének összehangolása az AWS Glue és az Amazon MWAA segítségével
Ebben a példában az Amazon MWAA használatával egy olyan AWS Glue Python Shell-feladat megszervezésére szolgáló használati esetet mutatunk be, amely a CloudTrail-naplókon alapuló összesített metrikákat is megőrzi.
A CloudTrail lehetővé teszi az AWS-fiókjában végrehajtott AWS API-hívások láthatóságát. Ezekkel az adatokkal egy gyakori felhasználási eset az, hogy használati mutatókat gyűjtenek a fiókja erőforrásaira ható megbízókról az ellenőrzési és szabályozási igények miatt.
A CloudTrail események naplózása során JSON-fájlokként jelennek meg az Amazon S3-ban, amelyek nem ideálisak analitikai lekérdezésekhez. Szeretnénk ezeket az adatokat összesíteni, és Parquet fájlokként megőrizni az optimális lekérdezési teljesítmény érdekében. Első lépésként az Athena segítségével elvégezhetjük az adatok kezdeti lekérdezését, mielőtt további összesítést végeznénk az AWS ragasztófeladatunkban. Az AWS ragasztóadat-katalógus tábla létrehozásával kapcsolatos további információkért lásd: A CloudTrail-naplók táblájának létrehozása az Athenában partícióvetítéssel adat. Miután megvizsgáltuk az adatokat az Athénán keresztül, és eldöntöttük, hogy milyen mutatókat szeretnénk megtartani az összesített táblázatokban, létrehozhatunk egy AWS-ragasztófeladatot.
Hozzon létre egy CloudTrail táblázatot az Athénában
Először is létre kell hoznunk egy táblázatot az adatkatalógusunkban, amely lehetővé teszi a CloudTrail adatok lekérdezését az Athénán keresztül. A következő mintalekérdezés létrehoz egy táblát két partícióval a Régióban és a dátumon (snapshot_date néven). Ügyeljen arra, hogy cserélje ki a CloudTrail-csoport, az AWS-fiókazonosító és a CloudTrail-táblanév helyőrzőit:
create external table if not exists `<<<CLOUDTRAIL_TABLE_NAME>>>`( `eventversion` string comment 'from deserializer', `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> comment 'from deserializer', `eventtime` string comment 'from deserializer', `eventsource` string comment 'from deserializer', `eventname` string comment 'from deserializer', `awsregion` string comment 'from deserializer', `sourceipaddress` string comment 'from deserializer', `useragent` string comment 'from deserializer', `errorcode` string comment 'from deserializer', `errormessage` string comment 'from deserializer', `requestparameters` string comment 'from deserializer', `responseelements` string comment 'from deserializer', `additionaleventdata` string comment 'from deserializer', `requestid` string comment 'from deserializer', `eventid` string comment 'from deserializer', `resources` array<struct<arn:string,accountid:string,type:string>> comment 'from deserializer', `eventtype` string comment 'from deserializer', `apiversion` string comment 'from deserializer', `readonly` string comment 'from deserializer', `recipientaccountid` string comment 'from deserializer', `serviceeventdetails` string comment 'from deserializer', `sharedeventid` string comment 'from deserializer', `vpcendpointid` string comment 'from deserializer')
PARTITIONED BY ( `region` string, `snapshot_date` string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/'
TBLPROPERTIES ( 'projection.enabled'='true', 'projection.region.type'='enum', 'projection.region.values'='us-east-2,us-east-1,us-west-1,us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-3,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1', 'projection.snapshot_date.format'='yyyy/mm/dd', 'projection.snapshot_date.interval'='1', 'projection.snapshot_date.interval.unit'='days', 'projection.snapshot_date.range'='2020/10/01,now', 'projection.snapshot_date.type'='date', 'storage.location.template'='s3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/${region}/${snapshot_date}')
Futtassa az előző lekérdezést az Athena konzolon, és jegyezze fel a tábla nevét és az AWS Glue Data Catalog adatbázisát, ahol létrehozták. Ezeket az értékeket később az Airflow DAG kódban használjuk.
Minta AWS Glue munkakód
A következő kód egy minta AWS Glue Python Shell munka amely a következőket teszi:
- Érveket vesz fel (amelyeket az Amazon MWAA DAG-tól adunk át), hogy melyik nap adatait dolgozza fel
- Használja a AWS SDK Pandákhoz Athena-lekérdezés futtatása a CloudTrail JSON-adatok kezdeti szűréséhez az AWS Glue-n kívül
- A Pandas segítségével egyszerű összesítést végez a szűrt adatokon
- Az összesített adatokat egy táblázatban adja ki az AWS ragasztóadat-katalógusába
- A feldolgozás során naplózást használ, amely látható lesz az Amazon MWAA-ban
import awswrangler as wr
import pandas as pd
import sys
import logging
from awsglue.utils import getResolvedOptions
from datetime import datetime, timedelta # Logging setup, redirects all logs to stdout
LOGGER = logging.getLogger()
formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')
streamHandler = logging.StreamHandler(sys.stdout)
streamHandler.setFormatter(formatter)
LOGGER.addHandler(streamHandler)
LOGGER.setLevel(logging.INFO) LOGGER.info(f"Passed Args :: {sys.argv}") sql_query_template = """
select
region,
useridentity.arn,
eventsource,
eventname,
useragent from "{cloudtrail_glue_db}"."{cloudtrail_table}"
where snapshot_date='{process_date}'
and region in ('us-east-1','us-east-2') """ required_args = ['CLOUDTRAIL_GLUE_DB', 'CLOUDTRAIL_TABLE', 'TARGET_BUCKET', 'TARGET_DB', 'TARGET_TABLE', 'ACCOUNT_ID']
arg_keys = [*required_args, 'PROCESS_DATE'] if '--PROCESS_DATE' in sys.argv else required_args
JOB_ARGS = getResolvedOptions ( sys.argv, arg_keys) LOGGER.info(f"Parsed Args :: {JOB_ARGS}") # if process date was not passed as an argument, process yesterday's data
process_date = ( JOB_ARGS['PROCESS_DATE'] if JOB_ARGS.get('PROCESS_DATE','NONE') != "NONE" else (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d") ) LOGGER.info(f"Taking snapshot for :: {process_date}") RAW_CLOUDTRAIL_DB = JOB_ARGS['CLOUDTRAIL_GLUE_DB']
RAW_CLOUDTRAIL_TABLE = JOB_ARGS['CLOUDTRAIL_TABLE']
TARGET_BUCKET = JOB_ARGS['TARGET_BUCKET']
TARGET_DB = JOB_ARGS['TARGET_DB']
TARGET_TABLE = JOB_ARGS['TARGET_TABLE']
ACCOUNT_ID = JOB_ARGS['ACCOUNT_ID'] final_query = sql_query_template.format( process_date=process_date.replace("-","/"), cloudtrail_glue_db=RAW_CLOUDTRAIL_DB, cloudtrail_table=RAW_CLOUDTRAIL_TABLE
) LOGGER.info(f"Running Query :: {final_query}") raw_cloudtrail_df = wr.athena.read_sql_query( sql=final_query, database=RAW_CLOUDTRAIL_DB, ctas_approach=False, s3_output=f"s3://{TARGET_BUCKET}/athena-results",
) raw_cloudtrail_df['ct']=1 agg_df = raw_cloudtrail_df.groupby(['arn','region','eventsource','eventname','useragent'],as_index=False).agg({'ct':'sum'})
agg_df['snapshot_date']=process_date LOGGER.info(agg_df.info(verbose=True)) upload_path = f"s3://{TARGET_BUCKET}/{TARGET_DB}/{TARGET_TABLE}" if not agg_df.empty: LOGGER.info(f"Upload to {upload_path}") try: response = wr.s3.to_parquet( df=agg_df, path=upload_path, dataset=True, database=TARGET_DB, table=TARGET_TABLE, mode="overwrite_partitions", schema_evolution=True, partition_cols=["snapshot_date"], compression="snappy", index=False ) LOGGER.info(response) except Exception as exc: LOGGER.error("Uploading to S3 failed") LOGGER.exception(exc) raise exc
else: LOGGER.info(f"Dataframe was empty, nothing to upload to {upload_path}")
Az alábbiakban felsorolunk néhány fő előnyt az AWS ragasztómunkában:
- Egy Athena lekérdezést használunk annak biztosítására, hogy a kezdeti szűrést az AWS ragasztófeladatunkon kívül végezzük. Mint ilyen, egy minimális számítási igényű Python Shell-feladat továbbra is elegendő egy nagy CloudTrail-adatkészlet összesítéséhez.
- Biztosítjuk a elemzőkönyvtár-beállítási lehetőség be van kapcsolva, amikor létrehozzuk az AWS Glue feladatunkat az AWS SDK for Pandas könyvtár használatához.
Hozzon létre egy AWS ragasztófeladatot
Az AWS ragasztófeladat létrehozásához hajtsa végre a következő lépéseket:
- Másolja ki az előző szakaszban található szkriptet, és mentse el egy helyi fájlba. Ennél a bejegyzésnél a fájl neve
script.py
. - Az AWS Glue konzolon válassza a lehetőséget ETL állások a navigációs ablaktáblában.
- Hozzon létre egy új munkát, és válassza ki Python Shell szkriptszerkesztő.
- választ Töltsön fel és szerkesszen egy meglévő szkriptet és töltse fel a helyileg mentett fájlt.
- A pop-art design, négy időzóna kijelzése egyszerre és méretének arányai azok az érvek, amelyek a NeXtime Time Zones-t kiváló választássá teszik. Válassza a Teremt.
- A Munka részletei lapon adja meg az AWS ragasztófeladatának nevét.
- A IAM szerepkör, válasszon egy meglévő szerepet, vagy hozzon létre egy új szerepet, amely rendelkezik az Amazon S3, az AWS Glue és az Athena számára szükséges engedélyekkel. A szerepkörnek le kell kérdeznie a korábban létrehozott CloudTrail táblát, és írnia kell egy kimeneti helyre.
Használhatja a következő minta házirend-kódot. Cserélje ki a helyőrzőket a CloudTrail naplózónára, a kimeneti tábla nevére, a kimeneti AWS Glue adatbázisára, a kimeneti S3 gyűjtőre, a CloudTrail tábla nevére, a CloudTrail táblát tartalmazó AWS Glue adatbázisára és az AWS-fiókazonosítójára.
{ "Version": "2012-10-17", "Statement": [ { "Action": [ "s3:List*", "s3:Get*" ], "Resource": [ "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>/*", "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>*" ], "Effect": "Allow", "Sid": "GetS3CloudtrailData" }, { "Action": [ "glue:Get*", "glue:BatchGet*" ], "Resource": [ "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>/<<<CLOUDTRAIL_TABLE>>>*" ], "Effect": "Allow", "Sid": "GetGlueCatalogCloudtrailData" }, { "Action": [ "s3:PutObject*", "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:Head*" ], "Resource": [ "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>", "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>/*" ], "Effect": "Allow", "Sid": "WriteOutputToS3" }, { "Action": [ "glue:CreateTable", "glue:CreatePartition", "glue:UpdatePartition", "glue:UpdateTable", "glue:DeleteTable", "glue:DeletePartition", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:Get*", "glue:BatchGet*" ], "Resource": [ "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<OUTPUT_GLUE_DB>>>", "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>*" ], "Effect": "Allow", "Sid": "AllowOutputToGlue" }, { "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:*:*:/aws-glue/*", "Effect": "Allow", "Sid": "LogsAccess" }, { "Action": [ "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:DeleteObject*", "s3:PutObject", "s3:PutObjectLegalHold", "s3:PutObjectRetention", "s3:PutObjectTagging", "s3:PutObjectVersionTagging", "s3:Abort*" ], "Resource": [ "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>", "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>/*" ], "Effect": "Allow", "Sid": "AccessToAthenaResults" }, { "Action": [ "athena:StartQueryExecution", "athena:StopQueryExecution", "athena:GetDataCatalog", "athena:GetQueryResults", "athena:GetQueryExecution" ], "Resource": [ "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog", "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:datacatalog/AwsDataCatalog", "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:workgroup/primary" ], "Effect": "Allow", "Sid": "AllowAthenaQuerying" } ]
}
A Python verzió, választ Python 3.9.
- választ Töltsön be általános analitikai könyvtárakat.
- A Adatfeldolgozó egységek, választ 1 DPU.
- Hagyja a többi beállítást alapértelmezettként, vagy módosítsa szükség szerint.
- A pop-art design, négy időzóna kijelzése egyszerre és méretének arányai azok az érvek, amelyek a NeXtime Time Zones-t kiváló választássá teszik. Válassza a Megtakarítás a munkakonfiguráció mentéséhez.
Konfiguráljon egy Amazon MWAA DAG-ot az AWS ragasztófeladat irányításához
A következő kód egy DAG-ra vonatkozik, amely képes levezényelni az általunk létrehozott AWS ragasztófeladatot. Ebben a DAG-ban a következő főbb jellemzőket használjuk ki:
"""Sample DAG"""
import airflow.utils
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow import DAG
from datetime import timedelta
import airflow.utils # allow backfills via DAG run parameters
process_date = '{{ dag_run.conf.get("process_date") if dag_run.conf.get("process_date") else "NONE" }}' dag = DAG( dag_id = "CLOUDTRAIL_LOGS_PROCESSING", default_args = { 'depends_on_past':False, 'start_date':airflow.utils.dates.days_ago(0), 'retries':1, 'retry_delay':timedelta(minutes=5), 'catchup': False }, schedule_interval = None, # None for unscheduled or a cron expression - E.G. "00 12 * * 2" - at 12noon Tuesday dagrun_timeout = timedelta(minutes=30), max_active_runs = 1, max_active_tasks = 1 # since there is only one task in our DAG
) ## Log ingest. Assumes Glue Job is already created
glue_ingestion_job = GlueJobOperator( task_id="<<<some-task-id>>>", job_name="<<<GLUE_JOB_NAME>>>", script_args={ "--ACCOUNT_ID":"<<<YOUR_AWS_ACCT_ID>>>", "--CLOUDTRAIL_GLUE_DB":"<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>", "--CLOUDTRAIL_TABLE":"<<<CLOUDTRAIL_TABLE>>>", "--TARGET_BUCKET": "<<<OUTPUT_S3_BUCKET>>>", "--TARGET_DB": "<<<OUTPUT_GLUE_DB>>>", # should already exist "--TARGET_TABLE": "<<<OUTPUT_TABLE_NAME>>>", "--PROCESS_DATE": process_date }, region_name="us-east-1", dag=dag, verbose=True
) glue_ingestion_job
Növelje az AWS ragasztófeladatok megfigyelhetőségét az Amazon MWAA-ban
Az AWS ragasztófeladatok naplókat írnak ide amazonfelhőóra. Az Airflow Amazon szolgáltatói csomagjának közelmúltbeli megfigyelhetőségi fejlesztéseivel ezek a naplók immár integrálva vannak az Airflow feladatnaplókkal. Ez a konszolidáció az Airflow felhasználóinak végpontok közötti láthatóságot biztosít közvetlenül az Airflow felhasználói felületén, így nincs szükség a CloudWatch-ban vagy az AWS Glue konzolban való keresésre.
A funkció használatához győződjön meg arról, hogy az Amazon MWAA környezethez csatolt IAM szerepkör rendelkezik a következő jogosultságokkal a szükséges naplók lekéréséhez és írásához:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:DescribeLogStreams", "logs:FilterLogEvents", "logs:GetLogGroupFields", "logs:GetQueryResults", ], "Resource": [ "arn:aws:logs:*:*:log-group:airflow-243-<<<Your environment name>>>-*"--Your Amazon MWAA Log Stream Name ] } ]
}
Ha verbose=true, az AWS Glue-feladat futtatási naplói megjelennek az Airflow feladatnaplóiban. Az alapértelmezett érték false. További információkért lásd: paraméterek.
Ha engedélyezve van, a DAG-ok beolvasnak az AWS ragasztófeladat CloudWatch naplófolyamából, és továbbítják őket az Airflow DAG AWS ragasztófeladat lépésnaplóihoz. Ez részletes betekintést nyújt egy AWS ragasztófeladat valós idejű futtatásába a DAG-naplókon keresztül. Ne feledje, hogy az AWS ragasztófeladatok kimeneti és hibaüzenet CloudWatch naplócsoportot generálnak a feladat STDOUT és STDERR alapján. A kimeneti naplócsoportban lévő összes napló és a hibanaplócsoport kivétel- vagy hibanaplója továbbítva az Amazon MWAA-ba.
Az AWS-adminisztrátorok mostantól csak az Airflow-ra korlátozhatják a támogatási csoportok hozzáférését, így az Amazon MWAA a munkaszervezés és a munkaegészségügyi kezelés egyetlen üvegtáblája. Korábban a felhasználóknak ellenőrizniük kellett az AWS Glue-feladat futási állapotát az Airflow DAG lépéseiben, és le kellett kérniük a feladat futtatásának azonosítóját. Ezután hozzá kellett férniük az AWS Glue konzolhoz, hogy megtalálják a munkavégzési előzményeket, az azonosító segítségével megkereshessék a számunkra érdekes munkát, végül pedig a feladat CloudWatch naplóihoz kellett navigálniuk a hibaelhárításhoz.
Hozd létre a DAG-t
A DAG létrehozásához hajtsa végre a következő lépéseket:
- Mentse el az előző DAG-kódot egy helyi .py fájlba, lecserélve a jelzett helyőrzőket.
Az AWS-fiókazonosító, az AWS-ragasztó-feladat neve, a CloudTrail-táblázattal rendelkező AWS-ragasztó-adatbázis és a CloudTrail-táblanév értékeinek már ismertnek kell lenniük. Igény szerint módosíthatja a kimeneti S3 tárolót, a kimeneti AWS ragasztó adatbázist és a kimeneti tábla nevét, de győződjön meg arról, hogy az AWS ragasztófeladat korábban használt IAM-szerepe megfelelően van konfigurálva.
- Az Amazon MWAA konzolon navigáljon a környezetébe, és nézze meg, hol van tárolva a DAG-kód.
A DAGs mappa az S3 tároló előtagja, ahová a DAG fájlt el kell helyezni.
- Töltsd fel oda a szerkesztett fájlt.
- Nyissa meg az Amazon MWAA konzolt, és ellenőrizze, hogy a DAG megjelenik-e a táblázatban.
Futtassa a DAG-ot
A DAG futtatásához hajtsa végre a következő lépéseket:
- Válasszon az alábbi lehetőségek közül:
- Trigger DAG – Emiatt a tegnapi adatok lesznek feldolgozandó adatok
- Trigger DAG w/ config – Ezzel a lehetőséggel más dátumot is átadhat, potenciálisan a háttérkitöltésekhez, amelyet a rendszer a használatával kér le
dag_run.conf
a DAG kódban, majd paraméterként átkerült az AWS Glue jobba
A következő képernyőképen a további konfigurációs lehetőségek láthatók, ha úgy dönt Trigger DAG w/ config.
- Figyelje a DAG-ot futás közben.
- Amikor a DAG befejeződött, nyissa meg a futtatás részleteit.
A jobb oldali panelen megtekintheti a naplókat, vagy választhat Feladatpéldány részletei a teljes nézetért.
- Tekintse meg az AWS ragasztófeladat kimeneti naplóit az Amazon MWAA-ban az AWS ragasztókonzol használata nélkül, köszönhetően
GlueJobOperator
bőbeszédű zászló.
Az AWS ragasztófeladat eredményeit a megadott kimeneti táblába írják.
- Kérdezze le ezt a táblázatot az Athénén keresztül, hogy megerősítse, hogy sikeres volt.
Összegzésként
Az Amazon MWAA mostantól egyetlen helyen követi nyomon az AWS Glue-feladatok állapotát, és lehetővé teszi, hogy az Airflow konzolt egyetlen üvegtáblaként használja a munkaszervezéshez és az egészségügyi kezeléshez. Ebben a bejegyzésben végigjártuk az AWS ragasztófeladatok Airflow segítségével történő megszervezésének lépéseit GlueJobOperator
. Az új megfigyelhetőségi fejlesztésekkel zökkenőmentesen, egységes élményben oldhatja meg az AWS ragasztófeladatokat. Azt is bemutattuk, hogyan frissítheti az Amazon MWAA környezetét egy kompatibilis verzióra, frissítheti a függőségeket, és ennek megfelelően módosíthatja az IAM szerepkör házirendjét.
A gyakori hibaelhárítási lépésekkel kapcsolatos további információkért lásd: Hibaelhárítás: Amazon MWAA környezet létrehozása és frissítése. Az Amazon MWAA-környezetbe való átállás részletesebb részleteiért lásd: Frissítés 1.10-ról 2-ra. Ha többet szeretne megtudni az Airflow Amazon szolgáltatói csomagban található AWS ragasztófeladatok jobb megfigyelhetőségét célzó nyílt forráskódú módosításokról, tekintse meg a továbbítási naplók az AWS ragasztófeladatokból.
Végül javasoljuk, hogy látogassa meg a AWS Big Data Blog az elemzésről, az ML-ről és az AWS adatkezeléséről szóló egyéb anyagokhoz.
A szerzőkről
Rushabh Lokhande az AWS Professional Services Analytics gyakorlattal rendelkező adat- és ML mérnök. Segít ügyfeleinek a big data, a gépi tanulási és az elemzési megoldások megvalósításában. A munkán kívül szívesen tölt időt a családjával, olvas, fut és golfozik.
Ryan Gomes az AWS Professional Services Analytics gyakorlattal rendelkező adat- és ML mérnök. Szenvedélyesen törekszik arra, hogy segítse ügyfeleit jobb eredmények elérésében a felhőben található elemzések és gépi tanulási megoldások révén. A munkán kívül szívesen fitnesz, főz, és minőségi időt tölt barátaival és családjával.
Vishwa Gupta vezető adatépítész az AWS Professional Services Analytics gyakorlattal. Segít ügyfeleinek a big data és analitikai megoldások megvalósításában. A munkán kívül szívesen tölt időt a családjával, utazik és új ételeket próbál ki.
- SEO által támogatott tartalom és PR terjesztés. Erősödjön még ma.
- PlatoAiStream. Web3 adatintelligencia. Felerősített tudás. Hozzáférés itt.
- A jövő pénzverése – Adryenn Ashley. Hozzáférés itt.
- Részvények vásárlása és eladása PRE-IPO társaságokban a PREIPO® segítségével. Hozzáférés itt.
- Forrás: https://aws.amazon.com/blogs/big-data/simplify-aws-glue-job-orchestration-and-monitoring-with-amazon-mwaa/
- :van
- :is
- :nem
- :ahol
- $ UP
- 1
- 10
- 100
- 12
- 8
- a
- Rólunk
- hozzáférés
- Eszerint
- Fiók
- Elérése
- át
- Akció
- aciklikus
- További
- Előny
- előnyei
- Után
- összesítés
- Minden termék
- lehetővé
- lehetővé teszi, hogy
- már
- Is
- amazon
- Az Amazon Web Services
- an
- Analitikai
- analitika
- elemez
- és a
- bármilyen
- Apache
- api
- Alkalmazás
- Application Development
- megfelelő
- építészet
- VANNAK
- érv
- érvek
- AS
- At
- attribútumok
- könyvvizsgálat
- elérhető
- AWS
- AWS ragasztó
- AWS professzionális szolgáltatások
- alapján
- BE
- válik
- óta
- előtt
- hogy
- Jobb
- között
- Nagy
- Big adatok
- mindkét
- Törés
- épít
- de
- by
- hívott
- kéri
- TUD
- eset
- esetek
- katalógus
- okai
- változik
- Változások
- ellenőrizze
- A pop-art design, négy időzóna kijelzése egyszerre és méretének arányai azok az érvek, amelyek a NeXtime Time Zones-t kiváló választássá teszik. Válassza a
- felhő
- kód
- COM
- össze
- megjegyzés
- Közös
- Companies
- kompatibilitás
- összeegyeztethető
- teljes
- bonyolult
- összetevő
- alkatrészek
- Kiszámít
- Configuration
- megerősít
- Konzol
- megszilárdítása
- konszolidáció
- főzés
- Mag
- burkolatok
- teremt
- készítette
- teremt
- létrehozása
- Jelenlegi
- szokás
- vevő
- Ügyfelek
- DAG
- dátum
- adatintegráció
- adatfeldolgozás
- adatstratégia
- adattárházak
- adatbázis
- adatbázisok
- adatkészletek
- találka
- Időpontok
- dátum idő
- Nap
- határozott
- alapértelmezett
- szállított
- igazolták
- attól
- elavult
- részletes
- részletek
- Fejlesztés
- különbözik
- különböző
- digitális
- Digitális média
- közvetlenül
- megvitatni
- megosztott
- elosztott rendszerek
- do
- nem
- Ennek
- csinált
- alatt
- e
- Korábban
- Megkönnyíti
- hatás
- megszüntetése
- más
- lehetővé
- engedélyezve
- lehetővé teszi
- végtől végig
- eljegyzés
- mérnök
- fejlesztések
- biztosítására
- belép
- Környezet
- hiba
- Eter (ETH)
- események
- példa
- Kivéve
- kivétel
- létezik
- létező
- létezik
- tapasztalat
- Tapasztalatok
- feltárt
- kifejezés
- külső
- kivonat
- Sikertelen
- hamis
- család
- Funkció
- jellegű
- Jellemzők
- filé
- Fájlok
- szűrő
- Végül
- Találjon
- alkalmasság
- következő
- élelmiszer
- A
- formátum
- barátok
- ból ből
- Tele
- gyűjt
- generál
- üveg
- Go
- golf
- kormányzás
- Csoport
- Hadoop
- Legyen
- he
- Egészség
- segít
- segít
- történelem
- Kaptár
- Hogyan
- How To
- HTML
- http
- HTTPS
- IAM
- ID
- ideális
- ötletek
- azonosító
- if
- illusztrálja
- végre
- importál
- in
- mélyreható
- tartalmaz
- Beleértve
- Növelje
- <p></p>
- jelzett
- iparágak
- info
- információ
- kezdetben
- újító
- meglátások
- telepítése
- példa
- utasítás
- integrált
- integráció
- kamat
- belső
- bele
- IT
- Munka
- Állások
- jpg
- json
- Kulcs
- ismert
- nagy
- a későbbiekben
- legutolsó
- TANUL
- tanulás
- könyvtár
- LIMIT
- Lista
- kiszámításának
- helyi
- helyileg
- elhelyezkedés
- log
- bejelentkezve
- fakitermelés
- keres
- gép
- gépi tanulás
- készült
- fenntartása
- csinál
- Gyártás
- sikerült
- vezetés
- kezelése
- kézikönyv
- anyag
- Lehet..
- Média
- Találkozik
- üzenet
- Metrics
- vándorló
- minimális
- ML
- módosított
- modul
- monitor
- ellenőrzés
- több
- kell
- név
- nevek
- Keresse
- Navigáció
- elengedhetetlen
- Szükség
- szükséges
- igények
- Új
- semmi
- Most
- of
- felajánlás
- on
- ONE
- azok
- csak
- nyitva
- nyílt forráskódú
- nyílt forráskód
- operátor
- üzemeltetők
- optimálisan
- opció
- Opciók
- or
- hangszerelt
- hangszerelés
- Más
- mi
- eredmények
- teljesítmény
- kívül
- csomag
- pandák
- üvegtábla
- paraméterek
- partner
- elhalad
- Elmúlt
- szenvedélyes
- teljesítmény
- engedélyek
- fennáll
- csővezeték
- Hely
- emelvény
- Plató
- Platón adatintelligencia
- PlatoData
- pont
- politika
- állás
- potenciálisan
- gyakorlat
- előfeltételek
- előző
- korábban
- folyamat
- Folyamatok
- feldolgozás
- Termékek
- szakmai
- tehetséges alkalmazottal
- Profilok
- Vetítés
- ellátó
- szolgáltatók
- biztosít
- Piton
- világítás
- lekérdezések
- emel
- hatótávolság
- Olvass
- Olvasás
- igazi
- real-time
- új
- ajánl
- vidék
- szabályozók
- Relé
- cserélni
- helyébe
- kötelező
- követelmények
- forrás
- Tudástár
- illetőleg
- válasz
- Eredmények
- megtartása
- jobb
- Szerep
- SOR
- futás
- futás
- s
- Megtakarítás
- forgatókönyvek
- sdk
- zökkenőmentesen
- Keresés
- Rész
- biztonság
- lát
- Keresnek
- idősebb
- vagy szerver
- Szolgáltatások
- beállítás
- felépítés
- Héj
- kellene
- előadás
- Műsorok
- Egyszerű
- egyszerűsítése
- óta
- egyetlen
- Pillanatkép
- megoldások
- Megoldások
- néhány
- különleges
- meghatározott
- Költési
- nyilatkozat
- Állapot
- Lépés
- Lépései
- Még mindig
- tárolás
- memorizált
- Stratégia
- folyam
- Húr
- sikeres
- ilyen
- elegendő
- támogatás
- Támogatott
- Systems
- táblázat
- Vesz
- bevétel
- Feladat
- csapat
- Technologies
- sablon
- köszönöm
- hogy
- A
- azok
- Őket
- akkor
- Ott.
- Ezek
- ők
- harmadik fél
- ezt
- Keresztül
- idő
- nak nek
- vágány
- Átalakítás
- Utazó
- igaz
- megpróbál
- Kedd
- Fordult
- kettő
- típus
- ui
- egységes
- egység
- Frissítések
- Frissítés
- frissítése
- frissítés
- frissített
- Feltöltés
- Használat
- használ
- használati eset
- használt
- Felhasználók
- segítségével
- érték
- Értékek
- változat
- keresztül
- Megnézem
- nézetek
- láthatóság
- látható
- sétált
- akar
- volt
- we
- háló
- webes szolgáltatások
- JÓL
- Mit
- amikor
- vajon
- ami
- WHO
- lesz
- val vel
- belül
- nélkül
- Munka
- munkafolyamatok
- lenne
- ír
- írott
- te
- A te
- zephyrnet