Kafka Connect — ลองทำ Change Data Capture (CDC) บน MongoDB ไปยัง Elasticsearch แบบเร็ว ๆ ⚡️ [Concepts Part]
หลังจากที่ไปลองเล่น Kafka Connect มาแบบไว ๆ ก็รู้สึกว่าเป็นเครื่องมือที่น่าสนใจ ช่วยให้เราสามารถทำ data integration ระหว่าง database ได้ไวขึ้น โดยที่ไม่ต้องคอยสร้าง producer หรือ consumer ตัวใหม่และ logic มีแค่การ serialize/deserialize ลงถัง database บทความนี้เราเลยจะลองเล่น Kafka Connect เพื่อทำ Change Data Capture (CDC) จาก MongoDB ไปยัง Elasticsearch กันน!!
ตอนแรกจะรวมทั้ง concepts และ demo ในบทความเดียว เขียนไปเขียนมายาวจัดเอาเป็นว่าบทความนี้จะมีแต่ concept ที่ไปอ่านมาจาก official document และที่ไปทำ PoC มานะครับ 🎉
ส่วนสำหรับใครที่มีไปอ่านจาก official document แล้วอยากลองขึ้น Kafka Connect แบบไว ๆ สามารถเข้าไป clone repository ด้านล่าง ลองเล่นได้เลยครับ
บทความนี้จะไม่มีการอธิบายพื้นฐานของ Kafka นะครับ 😅
Change Data Capture (CDC)
เป็นวิธีการ capture record ที่มีการเปลี่ยนแปลงบน database/data system ที่เราสนใจและส่งข้อมูลการเปลี่ยนแปลงนั้นไปยัง database/data system ปลายทางเพื่อเพิ่มความ consistency ให้กับระบบที่เรามีอยู่ โดยปกติเเล้วการทำ data capture นั้นมีหลากหลายวิธี
- Timestamp-based
- Trigger-based
- Log-based
Kafka Connect
เป็นหนึ่งในเครื่องมือที่ช่วยในการ streaming data จาก database/data system หนึ่งไปอีกที่หนึ่ง ด้วยการใช้ Kafka เป็นตัวกลางในการรับ/ส่ง data ซึ่งจะต้องสร้างสิ่งที่เรียกว่า source/sink connector เพื่อให้ Kafka Connect รู้ต้นทางและปลายทางของข้อมูลที่เราอยากจะรับส่ง
Kafka Connect มี concepts หลัก ๆ ที่เราควรรู้ เพื่อใช้ config conector ดังนี้
- Connector
- Task
- Converter
- Worker
- Transform
- Dead Letter Queue
ผมเองก็ไม่ได้ไป PoC ถึงการปรับ config เพื่อวัด benchmark มากเท่าไหร่ครับแค่ focus การ config และสร้าง demo มาลองเล่น หากใครสนใจ concepts ทั้งหมดสามารถเข้าไปอ่านต่อที่ลิงก์ด้านล่างครับ
Connector 🔌
เป็นสิ่งที่ใช้บอก Kafka Connect ให้รู้ว่าเราอยากได้ข้อมูลมาจาก database/data system ไหน และส่งไปยัง database/data system อะไร
connector เลยแบ่งออกได้เป็น 2 ประเภทดังนี้
I) Source Connector — เป็น connector ที่เราใช้อ่าน/รับ ข้อมูลจาก database/data system ที่เราต้องการ ไปยัง Kafka Topic
II) Sink connector — เป็น connector ที่เราใช้เขียน/ส่ง ข้อมูลจาก Kafka topic ไปยัง database/data system ที่เราต้องการ
ยกตัวอย่างใน demo ที่เราจะลองเล่นคือการทำ CDC บน MongoDB ไปยัง Elasticsearch เราจึงต้องไปหา connector plugin มาติดตั้งบน connect service เราดังนี้
- MongoDB Source Connector
- Elasticsearch Sink Connector
หากพูดถึง “connector” เราสามารถแบ่งออกมาได้ 2 contexts ดังนี้ครับ
I) Connector Plugin — ประกอบไปด้วย connector classes ทั้งหมดที่เราจะต้อง install ไว้บน connect service ก่อนที่จะเริ่มสร้าง connector ผ่าน REST API ที่ Kafka Connect มีให้เราใช้จัดการ connector
เราสามารถหา connector plugin ผ่านเว็บของ Confluent ได้ด้านล่างนี้ หรือบางครั้งใน database document ที่เราสนใจก็จะมีหัวข้อเกี่ยวกับ CDC และมี connector plugin ให้เราใช้งานได้เช่นกัน
หรืออีกทางเราสามารถสร้าง connector plugin เองได้เช่นกัน หากสนใจสามารถอ่านต่อจากลิงก์ด้านล่างนี้ครับ
ผมยังไม่ได้เข้าไปดู เเต่รายละเอียดลึก ๆ น่าจะมีให้อ่านเยอะเลยครับ
II) Connector Instance — หลังจากที่เราสร้าง conecctor ผ่าน REST API เราจะได้ สิ่งที่เรียกว่า connector instance ซึ่งสิ่งนี้เราจะเจอบ่อยในมุมของการ config เรื่องจำนวน workers, tasks
ใน demo part เราจะมีการใช้ Redpanda ในการจัดการตัว connector น่าจะทำให้เราเห็นภาพตรงนี้มากขึ้นครับ
Task 💣
Task เป็นหน่วยของงานที่ workers คอยดูแล มีหน้าที่ในการคัดลอกข้อมูลเข้าออก Kafka ระหว่าง source และ sink เลยทำให้เราเพิ่มความสามารถ parallelism ได้ด้วยการ config จำนวน task per connector
การ config ค่า task ไม่ได้ตรงไปตรงมามากเท่าไหร่ เพราะมีข้อจำกัดในส่วนของการ implementation ของ connector ที่เราใช้ครับ เช่น หาก source ของเราใช้แค่ single cursor ในการทำ change stream หมายความว่าเราเพิ่มจำนวน task ไปก็ไม่ได้ช่วยอะไร หรือฝั่งของ sink เราสร้าง parition topic แค่ 4 partition การเพิ่มจำนวน task ให้มากกว่าจำนวน partition ก็ไม่ได้เพิ่ม throughput ให้เราอยู่ดีครับ ตามดั้งเดิม relation ระหว่างจำนวน partition และจำนวน member ของ consumer group
Worker 🖥
Kafka Connect จะใช้ workers ในการจัดการดูแล connector instances และ tasks ที่ทำงานเป็น thread ดังภาพ โดย woker แบ่งออกเป็นสองประเภทครับ
I) Standalone Workers — ใช้ single process ดูแลทุกอย่างทั้ง connectors และ tasks แน่นอนว่าเราควรใช้แค่ใน development environment มากกว่า
II) Distributed Workers — ใช้ หลาย process ในการดูแล connectors และ tasks เหมือนในภาพด้านบน ที่ทำให้มีความสามารถ scalability และ automatic fault tolerance โดยปกติจำนวนเริ่มต้นของ worker จะเท่าจำนวน node ของ Kafka Connect หมายความว่าการเพิ่มจำนวน worker ไม่ได้มีผลต่อ throughput การ produce/consumer message แต่มีผลต่อ latency ในการทำงานต่อ 1 message เพราะ woker จะคอย redistribute connectors และ tasks ให้กระจายตามแต่ละเครื่อง และใช้หลักการเดียวกันกับ consumer group rebalancing ครับ หากมี worker เข้าออกจาก worker group ไม่ว่าจากเหตุผลอะไร จะมีการ rebalance workers และ redistribute connectors/tasks เกิดขึ้นเสมอ
ค่า confg บางอย่างเราสามารถทำได้ในระดับของ worker และสามารถ override จาก config ใน connector level ได้อีกทีครับ ดังนั้นเราควรระมัดระวังในการ config ที่ worker ให้ดีด้วยเช่นกัน เดียวการพูดถึงเรื่องนี้อีกทีใน demo part ครับ
connector instance thread จะคอยจัดการ/รับคำสั่งการเปลี่ยนแปลง config ของตัว connector และ task ก็มีหน้าที่เหมือนที่อธิบายในหัวข้อก่อนหน้า
Converter 🔧
เป็นส่วนสำคัญที่เราใช้กำหนดหน้าตา format ของ message ที่เราจะเขียน/อ่าน บน Kafka topic ซึ่งปกติเเล้ว Kafka จะอาศัย filesystem ในการจัดเก็บ messages เป็นรูปแบบ bytes ทำให้เราสามารถกำหนดได้ว่า เราอยากจะ serialize/deserialize ข้อมูลที่จะเข้าออก Kafka topic เป็น format อะไรได้หลากหลาย format ครับ
โดย Kafka Connect จะมี converter ให้เราใช้ดังนี้
- AvroConverter
io.confluent.connect.avro.AvroConverter
- ProtobufConverter
io.confluent.connect.protobuf.ProtobufConverter
- JsonSchemaConverter
io.confluent.connect.json.JsonSchemaConverter
- JsonConverter
org.apache.kafka.connect.json.JsonConverter
- StringConverter
org.apache.kafka.connect.storage.StringConverter
- ByteArrayConverter
org.apache.kafka.connect.converters.ByteArrayConverter
converter บางตัวจะต้องมีการใช้งานคู่กับสิ่งที่เรียกว่า Schema Register ซึ่งเราจะมาเล่ากันอีกทีว่ามันมีบทบาทอะไรกับ converter ใน demo part อีกที
ตัวอย่างจากภาพด้านบน source connector ได้ข้อมูลมาจาก MySQL และส่งต่อไปหา converter เพื่อที่จะ serialize ข้อมูลก้อนนั้นเป็น Arvo format ในรูปแบบ Bytes หลังจากนั้น sink connector ทำการ consumer และ deserializes ข้อมูลก้อนนั้นออกมา โดยที่ sink connector รู้ว่า bytes ที่รับมาเป็น Arvo format
เคสนี้ค่อนข้างตรงไปตรงมา ที่เราจะ serialize/deserialize ด้วย converter ตัวเดียวกัน แต่หากเรามีเคสที่ data destination หรือ secondary database ของเราต้องการข้อมูลจาก topic อื่นและใช้ converter คนละตัวกับที่เราทำอยู่ เราจะทำยังไงดีนะ 🤔?
โดยปกติ converter จะถูกแยกออกมาจาก connector หรือก็คือไม่ได้ขึ้นต่อกัน เพื่อให้สามารถ reuse converter ในหลาย connector ครับ (ใน doc เขาบอกแบบนั้นนะครับแต่ผมไม่ได้ดูต่อว่ามัน reuse ยังไง 😅) นั่นเลยทำให้ converter จะถูก config ผ่าน worker เป็น default และสามารถถูก connector override ตัว config ได้ จากคำถามด้านบนเราจึงสามารถสร้าง connector อีกตัวเพื่อที่จะใช้ convertor อีก format ในการ consumer ข้อมูลจากอีก topic นั่นเอง
เรื่อง level ของการ config เราน่าจะเห็นภาพมากขึ้นใน demo part นะครับ
Transform 🦠
เป็นส่วนที่เราสามารถ transform ข้อมูลให้อยู่ในรูปที่เราต้องการ ผ่าน connector config เช่น การลบ/เพิ่ม field ตัวอย่างเช่นหากเราไม่อยากได้ field ใด field หนึ่งเข้าเก็บบน secondary database เราก็ต้องมีการเขียน transform ให้มีการลบ field นั้นทิ้งบน sink connector config ครับ
ตัวอย่างในการ config ผมจะพาไปดูใน demo part นะครับ
เราสามารถสร้าง transformation ของตัวได้เหมือนกัน จะคล้าย ๆ connector plugin โดยการไป implement interface ที่เขากำหนดไว้
หากตัว connect service ของเรามี base image เป็นของ Confluent เราสามารถใช้สิ่งที่เรียกว่า Single Message Transforms (SMT) ในการ config transformation ได้ สามารถดู SMT functions ทั้งหมดได้จากลิงก์ด้านล่างครับ
Dead Letter Queue (DLQ) 👻
เป็นส่วนที่ผมไม่ได้ไปลองใน PoC แต่จะเล่าจากที่อ่านมาจาก official document
แน่นอนว่าค่อนข้างคล้ายกับเครื่องมือหรือ service ตัวอื่น ๆ ที่เรารู้จักกันครับ หน้าที่ของมันใน Kafka Connect คือคอยเก็บ record/message ที่ผิดพลาดในฝั่ง sink connector เช่น sink connector ได้รับข้อมูลที่ถูก serialized เป็น JSON format แต่เรา config ที่ connector ไว้ให้ deserialize จาก Arvo format
โดยการทำงานมันจะขึ้นอยู่กับค่า config ที่ชื่อว่า errors.tolerance ดังนี้
errors.tolerance = none — หากมี error/invalid record ตัว task จะถูกหยุดเเละเปลี่ยน state เป็น fail โดยเราสามารถหาสาเหตุจากการดู worker log และ manual restart connector เอง
errors.tolerance = all —จะทำการ ignore พวก error/invalid record และทำงานต่อไปเรื่อย ๆ และไม่มี error เขียนลง worker log ด้วยเช่นกัน
Next chapter >> Demo Part 💣
หากใครพร้อมจะไปมือเปื้อนกันต่อเเล้ว สามารถไปอ่าน Demo Part ต่อจากลิงก์ด้านล่างกันได้เลยคร้าบบ !!
Author 📝
ถึงจะ title ไว้ว่า “แบบเร็ว ๆ” ก็น่าจะไม่เร็วตามที่คิดไว้เท่าไหร่ 5555 😂
ขอบคุณทุกคนที่อ่านมาจบถึงตรงนี้ด้วยนะครับ 😁 ไม่ได้เขียนอะไรแบบนี้มานานและปกติก็ไม่ได้เขียนบ่อยอยู่แล้ว 😂 ถ้าผิดพลาดหรือข้อมูลตกหล่นตรงไหนสามารถเพิ่มเติมรายละเอียดได้เลยนะครับ ยินดีมาก ๆ เลยครับ เอาไว้เจอกันต่อใน demo part นะคร้าบบบ 🚀