Valoarea datelor este sensibilă la timp. Procesarea în timp real face ca deciziile bazate pe date să fie precise și acționabile în secunde sau minute în loc de ore sau zile. Captura de date de modificare (CDC) se referă la procesul de identificare și captare a modificărilor aduse datelor dintr-o bază de date și apoi livrarea acestor modificări în timp real către un sistem din aval. Captarea fiecărei modificări din tranzacțiile dintr-o bază de date sursă și mutarea lor la țintă în timp real menține sistemele sincronizate și ajută la cazurile de utilizare a analizei în timp real și la migrarea bazelor de date fără timpi de nefuncționare. Următoarele sunt câteva beneficii ale CDC:
- Elimină necesitatea actualizării încărcăturii în bloc și ferestrelor batch incomode, permițând încărcarea incrementală sau transmiterea în timp real a modificărilor datelor în depozitul țintă.
- Se asigură că datele din mai multe sisteme rămân sincronizate. Acest lucru este important în special dacă luați decizii sensibile la timp într-un mediu de date de mare viteză.
Kafka Connect este o componentă open-source a Apache Kafka care funcționează ca un hub de date centralizat pentru integrarea simplă a datelor între baze de date, depozite cheie-valoare, indici de căutare și sisteme de fișiere. The Registrul AWS Glue Schema vă permite să descoperiți, să controlați și să evoluați centralizat schemele fluxului de date. Kafka Connect și Schema Registry se integrează pentru a captura informații despre schemă de la conectori. Kafka Connect oferă un mecanism pentru conversia datelor din tipurile de date interne utilizate de Kafka Connect în tipuri de date reprezentate ca Avro, Protobuf sau Schema JSON. AvroConverter, ProtobufConverter și JsonSchemaConverter înregistrează automat schemele generate de conectorii Kafka (sursă) care produc date către Kafka. Conectorii (chiuveta) care consumă date de la Kafka primesc informații despre schemă în plus față de datele pentru fiecare mesaj. Acest lucru permite conectorilor receptor să cunoască structura datelor pentru a oferi capabilități precum menținerea unei scheme de tabel de bază de date într-un catalog de date.
Postarea demonstrează cum să construiți un CDC end-to-end folosind Amazon MSK Connect, un serviciu gestionat de AWS pentru implementarea și rularea aplicațiilor Kafka Connect și AWS Glue Schema Registry, care vă permite să descoperiți, să controlați și să evoluați centralizat schemele fluxului de date.
Prezentare generală a soluțiilor
Din partea producătorului, pentru acest exemplu alegem un compatibil MySQL Amazon Aurora baza de date ca sursă de date și avem un Debezium Conector MySQL pentru a efectua CDC. Conectorul Debezium monitorizează continuu bazele de date și împinge modificări la nivel de rând la un subiect Kafka. Conectorul preia schema din baza de date pentru a serializa înregistrările într-o formă binară. Dacă schema nu există deja în registru, schema va fi înregistrată. Dacă schema există, dar serializatorul folosește o versiune nouă, registrul schemei verifică Mod de compatibilitate a schemei înainte de a actualiza schema. În această soluție, folosim modul de compatibilitate inversă. Registrul de schemă returnează o eroare dacă o nouă versiune a schemei nu este compatibilă cu versiunea anterioară și putem configura Kafka Connect să trimită mesaje incompatibile în coada de mesaje moarte.
Pe partea de consumator, folosim un Serviciul Amazon de stocare simplă (Amazon S3) conector pentru a deserializa înregistrarea și a stoca modificările în Amazon S3. Construim și implementăm conectorul Debezium și chiuveta Amazon S3 folosind MSK Connect.
Exemplu de schemă
Pentru această postare, folosim următoarea schemă ca primă versiune a tabelului:
Cerințe preliminare
Înainte de a configura conectorii producătorului și consumatorilor MSK, trebuie să setăm mai întâi o sursă de date, un cluster MSK și un nou registru de schemă. Oferim un Formarea AWS Cloud șablon pentru a genera resursele suport necesare pentru soluție:
- O bază de date Aurora compatibilă cu MySQL ca sursă de date. Pentru a efectua CDC, activăm înregistrarea binară în Grupul de parametri DB cluster.
- Un cluster MSK. Pentru a simplifica conexiunea la rețea, folosim același VPC pentru baza de date Aurora și clusterul MSK.
- Două registre de schemă pentru a gestiona scheme pentru cheia mesajului și valoarea mesajului.
- O găleată S3 ca rezervor de date.
- Pluginurile MSK Connect și configurația lucrătorilor sunt necesare pentru această demonstrație.
- O Cloud Elastic de calcul Amazon (Amazon EC2) pentru a rula comenzile bazei de date.
Pentru a configura resurse în contul dvs. AWS, parcurgeți următorii pași într-o regiune AWS care acceptă Amazon MSK, MSK Connect și AWS Glue Schema Registry:
- Alege Lansați Stack:
- Alege Pagina Următoare →.
- Pentru Numele stivei, introduceți numele potrivit.
- Pentru Parola bazei de date, introduceți parola dorită pentru utilizatorul bazei de date.
- Păstrați alte valori ca implicite.
- Alege Pagina Următoare →.
- Pe pagina următoare, alegeți Pagina Următoare →.
- Consultați detaliile de pe pagina finală și selectați Recunosc că AWS CloudFormation ar putea crea resurse IAM.
- Alege Creați stivă.
Plugin personalizat pentru conectorul sursă și destinație
Un plugin personalizat este un set de fișiere JAR care conțin implementarea unuia sau mai multor conectori, transformări sau convertoare. Amazon MSK va instala pluginul pe lucrătorii clusterului MSK Connect unde rulează conectorul. Ca parte a acestei demonstrații, pentru conectorul sursă folosim open-source JAR-urile conectorului Debezium MySQL, iar pentru conectorul de destinație folosim comunitatea Confluent cu licență JAR-uri conector de chiuvetă Amazon S3. Ambele plugin-uri sunt adăugate și cu biblioteci pentru Serializatoare și deserializatoare Avro din Registrul AWS Glue Schema. Aceste pluginuri personalizate sunt deja create ca parte a șablonului CloudFormation implementat în pasul anterior.
Utilizați AWS Glue Schema Registry cu conectorul Debezium pe MSK Connect ca producător MSK
Mai întâi implementăm conectorul sursă folosind pluginul Debezium MySQL pentru a transmite date de la un Ediție compatibilă cu Amazon Aurora MySQL baza de date către Amazon MSK. Parcurgeți următorii pași:
- Pe consola Amazon MSK, în panoul de navigare, sub MSK Connect, alege conectori.
- Alege Creați conector.
- Alege Utilizați pluginul personalizat existent apoi alegeți pluginul personalizat cu numele care începe
msk-blog-debezium-source-plugin
. - Alege Pagina Următoare →.
- Introduceți un nume potrivit, cum ar fi
debezium-mysql-connector
și o descriere opțională. - Pentru cluster Apache Kafka, alege cluster MSK și alegeți clusterul creat de șablonul CloudFormation.
- In Configurarea conectorului, ștergeți valorile implicite și utilizați următoarele perechi cheie-valoare de configurare și cu valorile corespunzătoare:
- nume – Numele folosit pentru conector.
- baza de date.nume gazdă – Ieșirea CloudFormation pentru Punct final al bazei de date.
- baza de date.utilizator și bază de date.parolă – Parametrii trecuți în șablonul CloudFormation.
- database.history.kafka.bootstrap.servere – Ieșirea CloudFormation pentru Kafka Bootstrap.
- cheie.regiune.convertor și valoare.regiune.convertor – Regiunea ta.
Unele dintre aceste setări sunt generice și ar trebui specificate pentru orice conector. De exemplu:
- connector.class este clasa Java a conectorului
- tasks.max este numărul maxim de sarcini care ar trebui create pentru acest conector
Unele setari (database.*
, transforms.*
) sunt specifice conectorului Debezium MySQL. A se referi la Proprietăți de configurare a conectorului sursă Debezium MySQL pentru mai multe informatii.
Unele setari (key.converter.*
și value.converter.*
) sunt specifice Registrului Schema. Noi folosim AWSKafkaAvroConverter
de la Biblioteca de registru a schemelor AWS Glue ca convertor de format. Pentru a configura AWSKafkaAvroConverter
, folosim valoarea proprietăților constantei șirului în AWSschemaRegistryConstants clasă:
key.converter
șivalue.converter
controlați formatul datelor care vor fi scrise în Kafka pentru conectorii sursă sau citite de la Kafka pentru conectorii receptor. FolosimAWSKafkaAvroConverter
pentru formatul Avro.key.converter.registry.name
șivalue.converter.registry.name
definiți ce registru de schemă să utilizați.key.converter.compatibility
șivalue.converter.compatibility
definiți modelul de compatibilitate.
A se referi la Utilizarea Kafka Connect cu AWS Glue Schema Registry pentru mai multe informatii.
- În continuare, configuram Capacitatea conectorului. Putem alege Aprovizionat și lăsați alte proprietăți ca implicite
- Pentru Configurația lucrătorului, alegeți configurația personalizată a lucrătorului cu numele care începe
msk-gsr-blog
creat ca parte a șablonului CloudFormation. - Pentru Permisii de acces, folosește Gestionarea identității și accesului AWS (IAM) generat de șablonul CloudFormation
MSKConnectRole
. - Alege Pagina Următoare →.
- Pentru Securitate, alegeți valorile implicite.
- Alege Pagina Următoare →.
- Pentru Livrare bușteni, Selectați Livrați la Amazon CloudWatch Logs și căutați grupul de jurnal creat de șablonul CloudFormation (
msk-connector-logs
). - Alege Pagina Următoare →.
- Examinați setările și alegeți Creați conector.
După câteva minute, conectorul trece în starea de funcționare.
Utilizați AWS Glue Schema Registry cu conectorul receptor Confluent S3 care rulează pe MSK Connect ca consumator MSK
Implementăm conectorul receptor utilizând pluginul Confluent S3 pentru a transmite date de la Amazon MSK către Amazon S3. Parcurgeți următorii pași:
-
- Pe consola Amazon MSK, în panoul de navigare, sub MSK Connect, alege conectori.
- Alege Creați conector.
- Alege Utilizați pluginul personalizat existent și alegeți pluginul personalizat cu numele care începe
msk-blog-S3sink-plugin
. - Alege Pagina Următoare →.
- Introduceți un nume potrivit, cum ar fi
s3-sink-connector
și o descriere opțională. - Pentru cluster Apache Kafka, alege cluster MSK și selectați clusterul creat de șablonul CloudFormation.
- In Configurarea conectorului, ștergeți valorile prestabilite furnizate și utilizați următoarele perechi cheie-valoare de configurare cu valorile adecvate:
-
- nume – Același nume folosit pentru conector.
- s3.găleată.nume – Ieșirea CloudFormation pentru Numele cupei.
- s3.region, key.converter.region și value.converter.region – Regiunea ta.
-
- În continuare, configuram Capacitatea conectorului. Putem alege Aprovizionat și lăsați alte proprietăți ca implicite
- Pentru Configurația lucrătorului, alegeți configurația personalizată a lucrătorului cu numele care începe
msk-gsr-blog
creat ca parte a șablonului CloudFormation. - Pentru Permisii de acces, utilizați rolul IAM generat de șablonul CloudFormation
MSKConnectRole
. - Alege Pagina Următoare →.
- Pentru Securitate, alegeți valorile implicite.
- Alege Pagina Următoare →.
- Pentru Livrare bușteni, Selectați Livrați la Amazon CloudWatch Logs și căutați grupul de jurnal creat de șablonul CloudFormation
msk-connector-logs
. - Alege Pagina Următoare →.
- Examinați setările și alegeți Creați conector.
După câteva minute, conectorul funcționează.
Testați fluxul de jurnal CDC de la capăt la capăt
Acum că ambii conectori Debezium și S3 sunt porniți și funcționează, parcurgeți următorii pași pentru a testa CDC-ul end-to-end:
- Pe consola Amazon EC2, navigați la Grupuri de securitate .
- Selectați grupul de securitate
ClientInstanceSecurityGroup
Și alegeți Editați regulile de intrare. - Adăugați o regulă de intrare care să permită conexiunea SSH din rețeaua locală.
- Pe Instanțe pagina, selectați instanța
ClientInstance
Și alegeți Conectați. - Pe EC2 Instance Connect fila, alegeți Conectați.
- Asigurați-vă că directorul de lucru actual este
/home/ec2-user
și are fișierelecreate_table.sql
,alter_table.sql
,initial_insert.sql
, șiinsert_data_with_new_column.sql
. - Creați un tabel în baza de date MySQL rulând următoarea comandă (furnizați numele gazdei bazei de date din ieșirile șablonului CloudFormation):
- Când vi se solicită o parolă, introduceți parola din parametrii șablonului CloudFormation.
- Introduceți câteva date eșantion în tabel cu următoarea comandă:
- Când vi se solicită o parolă, introduceți parola din parametrii șablonului CloudFormation.
- Pe consola AWS Glue, alegeți Registre de schemă în panoul de navigare, apoi alegeți scheme.
- Navigheaza catre
db1.sampledatabase.movies
versiunea 1 pentru a verifica noua schemă creată pentru tabelul de filme:
Un folder S3 separat este creat pentru fiecare partiție a subiectului Kafka, iar datele pentru subiect sunt scrise în acel folder.
- Pe consola Amazon S3, verificați dacă există date scrise în format Parquet în folderul pentru subiectul dvs. Kafka.
Evoluția schemei
După ce schema inițială este definită, aplicațiile ar putea avea nevoie să o evolueze în timp. Când se întâmplă acest lucru, este esențial ca consumatorii din aval să poată gestiona fără probleme datele codificate atât cu vechea schemă, cât și cu cea nouă. Modurile de compatibilitate vă permit să controlați modul în care schemele pot sau nu pot evolua în timp. Aceste moduri formează contractul dintre aplicațiile care produc și consumă date. Pentru informații detaliate despre diferitele moduri de compatibilitate disponibile în AWS Glue Schema Registry, consultați Registrul AWS Glue Schema. În exemplul nostru, folosim pieptănabilitatea înapoi pentru a ne asigura că consumatorii pot citi atât versiunea actuală, cât și cea anterioară. Parcurgeți următorii pași:
- Adăugați o nouă coloană la tabel rulând următoarea comandă:
- Introduceți date noi în tabel rulând următoarea comandă:
- Pe consola AWS Glue, alegeți Registre de schemă în panoul de navigare, apoi alegeți scheme.
- Navigați la schemă
db1.sampledatabase.movies
versiunea 2 pentru a verifica noua versiune a schemei create pentru filmele din tabelul de filme, inclusiv coloana de țară pe care ați adăugat-o:
- Pe consola Amazon S3, verificați dacă există date scrise în format Parquet în folderul pentru subiectul Kafka.
A curăța
Pentru a preveni debitările nedorite din contul dvs. AWS, ștergeți resursele AWS pe care le-ați folosit în această postare:
- Pe consola Amazon S3, navigați la compartimentul S3 creat de șablonul CloudFormation.
- Selectați toate fișierele și folderele și alegeți Șterge.
- Introduceți ștergeți definitiv conform instrucțiunilor și alegeți Ștergeți obiecte.
- Pe consola AWS CloudFormation, ștergeți stiva pe care ați creat-o.
- Așteptați ca starea stivei să se schimbe în DELETE_COMPLETE.
Concluzie
Această postare a demonstrat cum să utilizați Amazon MSK, MSK Connect și AWS Glue Schema Registry pentru a construi un flux de jurnal CDC și a evolua scheme pentru fluxurile de date pe măsură ce nevoile afacerii se schimbă. Puteți aplica acest model de arhitectură altor surse de date cu conectori Kafka diferiți. Pentru mai multe informații, consultați Exemple MSK Connect.
Despre autor
Kalyan Janaki este Senior Big Data & Analytics Specialist cu Amazon Web Services. El îi ajută pe clienți să proiecteze și să construiască soluții în cloud foarte scalabile, performante și sigure pe AWS.
- Distribuție de conținut bazat pe SEO și PR. Amplifică-te astăzi.
- Platoblockchain. Web3 Metaverse Intelligence. Cunoștințe amplificate. Accesați Aici.
- Sursa: https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/
- :este
- $UP
- 1
- 10
- 11
- 7
- 8
- a
- Capabil
- Despre Noi
- acces
- Cont
- precis
- recunoaște
- adăugat
- plus
- TOATE
- Permiterea
- permite
- deja
- Amazon
- Amazon EC2
- Amazon Web Services
- Google Analytics
- și
- Apache
- Apache Kafka
- aplicatii
- Aplică
- adecvat
- arhitectură
- SUNT
- AS
- Auroră
- în mod automat
- disponibil
- AWS
- Formarea AWS Cloud
- AWS Adeziv
- BE
- înainte
- Beneficiile
- între
- Mare
- Datele mari
- Bootstrap
- construi
- afaceri
- by
- CAN
- capacități
- captura
- capturarea
- cazuri
- catalog
- CDC
- centralizat
- Schimbare
- Modificări
- taxe
- verifica
- Verificări
- Alege
- clasă
- Grup
- Coloană
- comunitate
- compatibilitate
- compatibil
- Completă
- component
- Calcula
- Configuraţie
- joncțiune
- Conectați
- conexiune
- Consoleze
- constant
- consuma
- consumator
- Consumatorii
- continuu
- contract
- Control
- ţară
- crea
- a creat
- critic
- Curent
- personalizat
- clienţii care
- de date
- integrarea datelor
- Pe bază de date
- Baza de date
- baze de date
- Zi
- Deciziile
- Mod implicit
- implicite
- definit
- livrarea
- Demo
- demonstrat
- demonstrează
- implementa
- dislocate
- descriere
- destinație
- detaliat
- detalii
- diferit
- descoperi
- Nu
- Picătură
- fiecare
- elimină
- permițând
- un capăt la altul
- asigura
- asigură
- Intrați
- Mediu inconjurator
- eroare
- mai ales
- Eter (ETH)
- Fiecare
- evolua
- exemplu
- existent
- există
- puțini
- Domenii
- Fișier
- Fişiere
- final
- First
- următor
- Pentru
- formă
- format
- din
- genera
- generată
- grup
- Grupului
- manipula
- Manipularea
- se întâmplă
- Avea
- ajutor
- ajută
- extrem de
- istorie
- gazdă
- ORE
- Cum
- Cum Pentru a
- HTML
- http
- HTTPS
- Butuc
- IAM
- identificarea
- Identitate
- implementarea
- important
- in
- Inclusiv
- indexurile
- informații
- inițială
- instala
- instanță
- in schimb
- integra
- integrare
- intern
- IT
- Java
- jpg
- JSON
- Kafka
- Cheie
- Cunoaște
- Părăsi
- biblioteci
- Autorizat
- ca
- încărca
- încărcare
- local
- Lung
- făcut
- FACE
- Efectuarea
- gestionate
- maestru
- max
- maxim
- mecanism
- mesaj
- mesaje
- ar putea
- minute
- model
- moduri de
- monitoare
- mai mult
- Filme
- în mişcare
- multiplu
- MySQL
- nume
- Navigaţi
- Navigare
- Nevoie
- necesar
- nevoilor
- reţea
- Nou
- următor
- număr
- of
- Vechi
- on
- ONE
- open-source
- Altele
- producție
- pagină
- perechi
- pâine
- parametru
- parametrii
- parte
- Trecut
- Parolă
- Model
- efectua
- permanent
- alege
- Plato
- Informații despre date Platon
- PlatoData
- conecteaza
- Plugin-uri
- Post
- împiedica
- precedent
- proces
- prelucrare
- produce
- producător
- proprietăţi
- furniza
- prevăzut
- furnizează
- Citeste
- real
- în timp real
- a primi
- record
- înregistrări
- se referă
- regiune
- Inregistreaza-te
- înregistrată
- registru
- depozit
- reprezentate
- Resurse
- Returnează
- Rol
- Regula
- Alerga
- funcţionare
- acelaşi
- scalabil
- perfect
- Caută
- secunde
- sigur
- securitate
- senior
- sensibil
- distinct
- serviciu
- Servicii
- set
- setări
- să
- simplu
- simplifica
- soluţie
- soluţii
- unele
- Sursă
- Surse
- specialist
- specific
- specificată
- stivui
- Pornire
- Stare
- Pas
- paşi
- depozitare
- stoca
- magazine
- curent
- de streaming
- fluxuri
- structura
- potrivit
- De sprijin
- Sprijină
- sincronizare
- sistem
- sisteme
- tabel
- Ţintă
- sarcini
- șablon
- test
- acea
- Sursa
- Lor
- Acestea
- timp
- sensibil la timp
- Titlu
- la
- subiect
- Tranzacții
- ÎNTORCĂ
- Tipuri
- în
- nedorit
- actualizarea
- utilizare
- Utilizator
- valoare
- Valori
- versiune
- web
- servicii web
- care
- voi
- ferestre
- cu
- lucrător
- muncitorii
- de lucru
- fabrică
- scris
- Ta
- zephyrnet