Author: Rayan Alatab - Senior Data Engineer
At EarnIn, we leverage data to revolutionize the provision of earned wage access services and to build the best financial intelligence platform possible. With those goals in mind, EarnIn’s Data Engineering team has been supporting the company-wide initiative to design event-driven services that follow industry standards. In particular, the team adheres to the CloudEvents specification.
As more services produce events, we need to democratize those events in our data lake for data processing, analytics, and machine learning. We wanted to build a self-serve solution for stream materialization that EarnIn engineers could easily work with.
EarnIn’s existing infrastructure uses Databricks, so adopting DLT (Delta Live Tables) was a natural decision. However, DLT does not provide a built-in integration with a schema registry. This was a perfect opportunity to build a robust materialization capability that integrates with both DLT and a schema registry.
High-Level Design
The overall materializer flow is the following:
Fetch the latest schema from the Confluent schema registry
Consume the events from Kafka
Deserialize to a Spark DataFrame
Save the newly consumed data in the lake and register it in Unity Catalog
One of our requirements was to support both JSON and Protobuf formatted events.
Deserializing the Kafka payload into a Spark struct was straightforward with Protobuf but proved challenging with JSON. While many solutions for JSON schemas require users to hardcode a Spark schema for deserialization, we aimed for a completely generic approach that would be transparent to engineers, relying solely on the schema stored in the registry.
The flow of a JSON schema topic
Fetch the latest schema from the schema registry using the Confluent Python SDK
from confluent_kafka.schema_registry import SchemaRegistryClient, RegisteredSchema

schema_registry_conf = {
    'url': "<schema_registry_url>",
    '': "<schema_registry_api_key>:<schema_registry_api_secret>"
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
# Latest registered schema for the topic, as a JSON Schema string
json_schema = schema_registry_client.get_latest_version(topic_name).schema.schema_str
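Note that get_latest_version looks a schema up by subject name rather than by topic name. Under Confluent's default TopicNameStrategy, the subject for a topic's value schema is the topic name with a "-value" suffix (and "-key" for the key schema). The helper below is a hypothetical sketch of that convention: subject_for is not part of the Confluent SDK, the topic name is made up, and this post does not state which naming strategy EarnIn's registry uses.

```python
def subject_for(topic_name: str, is_key: bool = False) -> str:
    """Build a subject name following Confluent's default TopicNameStrategy.

    Hypothetical helper for illustration only; a registry configured with a
    different subject naming strategy needs a different mapping.
    """
    suffix = "key" if is_key else "value"
    return f"{topic_name}-{suffix}"


# "payments.completed" is a made-up topic name.
value_subject = subject_for("payments.completed")
key_subject = subject_for("payments.completed", is_key=True)
print(value_subject, key_subject)
```

If the registry follows this convention, the result of subject_for(topic_name) would be the argument passed to get_latest_version.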
Fetch the events from Kafka
kafka_df = self.spark.readStream.format("kafka") \
    .option("subscribe", topic_name) \
    .option("includeHeaders", "true") \
    .option("", <consumer_group_id>) \
    .option("kafka.bootstrap.servers", <bootstrap_servers>) \
    .option("startingOffsets", "earliest") \
    .load()
Convert JSON Schema to Spark schema
converter = JsonToSparkSchemaConverter()  # EarnIn's converter library (check GitHub repo)
spark_schema = converter.convert(json_schema)
As mentioned previously, there was no generic solution for converting a JSON schema to a Spark schema for deserialization. EarnIn has built a small library that performs this conversion, which we plan to release as an open source project! Once we do, this post will be updated with the new repo link.
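To give a feel for what such a conversion involves, here is a minimal sketch. It is not EarnIn's library: every name in it (PRIMITIVE_TYPES, split_type, to_spark_type, convert) is hypothetical, and it covers only primitives, nested objects, arrays, and nullable type unions. Features such as $ref, oneOf, and tuple-style items are not handled. To stay runnable without a Spark installation, it emits the dict form of Spark's JSON schema representation instead of StructType objects.

```python
import json

# JSON Schema primitive -> Spark type name in Spark's JSON schema format.
PRIMITIVE_TYPES = {
    "string": "string",
    "integer": "long",
    "number": "double",
    "boolean": "boolean",
}


def split_type(node):
    """Return (type_name, allows_null) for a JSON Schema node.

    Handles both "type": "string" and "type": ["string", "null"].
    """
    declared = node.get("type", "string")
    if isinstance(declared, list):
        non_null = [t for t in declared if t != "null"]
        return (non_null[0] if non_null else "string"), "null" in declared
    return declared, False


def to_spark_type(node):
    """Recursively map a JSON Schema node to a Spark type description."""
    type_name, _ = split_type(node)
    if type_name == "object":
        required = set(node.get("required", []))
        fields = []
        for name, prop in node.get("properties", {}).items():
            _, allows_null = split_type(prop)
            fields.append({
                "name": name,
                "type": to_spark_type(prop),
                # Optional or explicitly nullable properties become nullable.
                "nullable": allows_null or name not in required,
                "metadata": {},
            })
        return {"type": "struct", "fields": fields}
    if type_name == "array":
        return {
            "type": "array",
            "elementType": to_spark_type(node.get("items", {})),
            "containsNull": True,
        }
    # Unknown or unsupported types fall back to string in this sketch.
    return PRIMITIVE_TYPES.get(type_name, "string")


def convert(schema_str):
    """Convert a JSON Schema string into a Spark schema dict."""
    return to_spark_type(json.loads(schema_str))


# A made-up event schema for illustration.
example_schema = json.dumps({
    "type": "object",
    "required": ["id"],
    "properties": {
        "id": {"type": "string"},
        "amount": {"type": ["number", "null"]},
        "tags": {"type": "array", "items": {"type": "string"}},
    },
})
spark_schema_dict = convert(example_schema)
```

In a Spark session, StructType.fromJson(spark_schema_dict) from pyspark.sql.types turns this dict into a schema object that from_json accepts.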
Deserialize JSON payload to a Spark Struct
from pyspark.sql.functions import col, from_json

spark_df = kafka_df.withColumn("payload", from_json(col("value").cast("string"), spark_schema))
The flow of a Protobuf schema topic
For Protobuf topics, Spark has a built-in function, from_protobuf, that deserializes a binary Protobuf column into a Spark struct.
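from pyspark.sql.functions import col
from pyspark.sql.protobuf.functions import from_protobuf
schema_registry_options = {
    "schema.registry.subject": <subject_name>,
    "schema.registry.address": <registry_address>,
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "": '{}:{}'.format(<schema_registry_api_key>, <schema_registry_api_secret>)
}
# from_protobuf returns a Column, so attach it to the Kafka DataFrame
spark_df = kafka_df.withColumn("payload", from_protobuf(col("value"), options=schema_registry_options))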
For the materialization itself, we save the data in two configurable forms, an event log and a snapshot, by creating DLT pipelines in Databricks. The event log consists of all historical data while the snapshot table represents the aggregated view of the events and the latest state. The DLT tables are then registered with Unity Catalog and are available in the lake.
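The difference between the two forms can be illustrated in plain Python, outside of DLT. This is a conceptual sketch only: the event fields ("id" as the entity key, "sequence" as the ordering field, "status" as the payload) and the build_snapshot helper are assumptions made for illustration, not the actual pipeline code. The event log keeps every event, while the snapshot keeps only the most recent event per key.

```python
from typing import Any, Dict, List

Event = Dict[str, Any]


def build_snapshot(event_log: List[Event]) -> Dict[str, Event]:
    """Reduce an append-only event log to the latest event per key."""
    snapshot: Dict[str, Event] = {}
    for event in event_log:
        current = snapshot.get(event["id"])
        # Keep the event with the highest sequence number for each key.
        if current is None or event["sequence"] >= current["sequence"]:
            snapshot[event["id"]] = event
    return snapshot


# Made-up events; the log preserves all three, in arrival order.
event_log = [
    {"id": "user-1", "sequence": 1, "status": "pending"},
    {"id": "user-2", "sequence": 1, "status": "pending"},
    {"id": "user-1", "sequence": 2, "status": "approved"},
]
snapshot = build_snapshot(event_log)
```

Here the event log still holds both events for user-1, while the snapshot holds one row per user reflecting that user's most recent status.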
The democratization of data in the lake empowers EarnIn to make well-informed decisions, thereby enhancing our ability to better serve our community members. By integrating Delta Live Tables with the Confluent Schema Registry, we've established a self-serve, resilient materializer capability that allows us to have better control over data freshness in the lake.
Interested in being a part of a collaborative engineering culture? Come join us at