Exchanging AVRO messages between confluent_kafka and PySpark
A common pattern to exchange data between (offline) batch jobs and micro-services is to use Kafka as the communication layer. The data is wrapped in a message with a specific contract and is written to a Kafka topic so that it can later be consumed. The message needs to be serialized in a way that both the producer and the consumer understand; one way of specifying both the message contract (schema) and the serialization format is the AVRO specification.
An AVRO schema is a JSON document that describes which fields the message contains and the types of those fields. Take for example the following schema:
"type": "record",
"name": "ExampleMessage",
"namespace": "example.avro",
"fields": [
{
"name": "field_1",
"type": "string"
},
{
"name": "field_2",
"type": "long"
},
{
"name": "field_3",
"type": {
"type": "array",
"items": "long"
}
}
]
This contract says that our messages will have a string, a (long) integer and an array of (long) integers, named field_1, field_2 and field_3 respectively. By using this contract, both the producer and the consumer know what to expect regarding the content of each message.
The contract is also important when transforming the data into bytes that can be transmitted over the network or kept in storage by Kafka in an efficient way. Those bytes might also need to be read back into memory and transformed into a representation that your programs understand. This whole process of SerDe (Serialization/Deserialization) is easily done using your favourite implementation of the AVRO specification.
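For example, a round trip with the fastavro package (just one possible implementation, not the one used in the tests below) looks roughly like this:

import io

import fastavro

# The schema from above, parsed once and reused for both directions.
schema = fastavro.parse_schema({
    "type": "record",
    "name": "ExampleMessage",
    "namespace": "example.avro",
    "fields": [
        {"name": "field_1", "type": "string"},
        {"name": "field_2", "type": "long"},
        {"name": "field_3", "type": {"type": "array", "items": "long"}},
    ],
})

record = {"field_1": "hello", "field_2": 42, "field_3": [1, 2, 3]}

# Serialize: record -> bytes (plain AVRO binary encoding, no header).
buffer = io.BytesIO()
fastavro.schemaless_writer(buffer, schema, record)
payload = buffer.getvalue()

# Deserialize: bytes -> record, using the same schema.
decoded = fastavro.schemaless_reader(io.BytesIO(payload), schema)
assert decoded == record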
Spark is the de facto batch processing framework used across the industry. Since AVRO and Spark are widely used, it is not surprising that Spark has native support for serializing and deserializing AVRO messages given an AVRO schema. Kafka is also supported by Spark, which makes writing and reading data easy; check this example.
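As a rough sketch of that native route (it assumes the spark-avro and spark-sql-kafka packages are on the classpath and uses placeholder broker and topic names):

import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.avro.functions import from_avro, to_avro

spark = SparkSession.builder.getOrCreate()

schema_json = json.dumps({
    "type": "record",
    "name": "ExampleMessage",
    "namespace": "example.avro",
    "fields": [
        {"name": "field_1", "type": "string"},
        {"name": "field_2", "type": "long"},
        {"name": "field_3", "type": {"type": "array", "items": "long"}},
    ],
})

df = spark.createDataFrame(
    [("a", 1, [10, 11]), ("b", 2, [12, 13])],
    ["field_1", "field_2", "field_3"],
)

# Producer side: pack the fields into a struct, serialize it to AVRO bytes
# and write the bytes as the Kafka message value.
(
    df.select(
        to_avro(F.struct("field_1", "field_2", "field_3"), schema_json).alias("value")
    )
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "example-topic")
    .save()
)

# Consumer side: read the raw bytes back and deserialize with the same schema.
df_read = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "example-topic")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_avro("value", schema_json).alias("message"))
)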
The question is: how well does this connection between Spark, AVRO and Kafka work with the rest of the Python ecosystem?
To test it, we created two simple scenarios:
- Producer written in PySpark. Consumer written in Python using the confluent-kafka-python client.
- Producer written in Python using the confluent-kafka-python client. Consumer written in PySpark.
(All the code used in these tests can be found in this repo)
Scenario 1: PySpark Producer & confluent-kafka-python Consumer
In this scenario, we create a column in a Spark DataFrame that contains structs with the same schema as shown before. The serialization of this column to AVRO is done using the to_avro function provided by Spark. The messages are written to Kafka using Spark, and a confluent_kafka consumer is used to read the messages back from Kafka.
You can see the whole test below; if you want more details you can check the code here.
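For context, the snippets roughly assume the following imports; the fixtures and helpers that are not shown (KafkaOptions, write_spark_kafka, read_spark_kafka, read_avro_confluent_kafka, write_avro_confluent_kafka and, later, the custom ABRiS wrappers) are small utilities defined in that repo, not part of any library.

import json
from typing import Any, Dict, List

import pandas as pd
import pytest
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.serialization import SerializationError
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as spark_func
from pyspark.sql.avro.functions import from_avro as spark_from_avro
from pyspark.sql.avro.functions import to_avro as spark_to_avro

# KafkaOptions, write_spark_kafka, read_spark_kafka, read_avro_confluent_kafka,
# write_avro_confluent_kafka, etc. are thin wrappers defined in the linked repo.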
@pytest.fixture()
def example_data(spark_session: SparkSession) -> DataFrame:
    num_rows = 10
    pdf = pd.DataFrame(
        {
            "id": list(range(num_rows)),
            "field_1": [f"field_1_{value}" for value in range(num_rows)],
            "field_2": list(range(num_rows)),
            "field_3": [
                [i for i in range(10, 10 + row_number)]
                for row_number in range(num_rows)
            ],
        }
    )
    df = spark_session.createDataFrame(pdf)
    df = df.withColumn("message", spark_func.struct("field_1", "field_2", "field_3"))
    return df
def test_write_spark_read_confluent(
    kafka_topic: KafkaOptions,
    schema_registry_client: SchemaRegistryClient,
    example_schema: Dict[str, Any],
    example_data: DataFrame,
) -> None:
    """
    This test shows that if we write a message to Kafka serialized in avro
    using the pyspark.sql.avro.functions.to_avro function, we cannot read it in
    using the confluent_kafka consumer.
    We will get a deserialization error!
    """
    schema_json = json.dumps(example_schema)
    kafka_conf = {
        "bootstrap.servers": kafka_topic.host,
        "compression.type": kafka_topic.compression_type,
        "group.id": "pytest-tests",
        "auto.offset.reset": "earliest",
    }
    df_message_avro = example_data.withColumn(
        "message_avro",
        spark_to_avro(example_data["message"], schema_json),
    )
    write_spark_kafka(df_message_avro, "message_avro", kafka_topic)
    with pytest.raises(SerializationError) as e_info:
        _ = read_avro_confluent_kafka(
            kafka_conf,
            kafka_topic.topic,
            schema_registry_client,
            schema_json,
        )
    assert (
        " This message was not produced with a Confluent Schema Registry serializer"
        in str(e_info)
    )
When reading the messages back from Kafka using a confluent_kafka consumer, we get a SerializationError saying that the message was not produced with a Confluent Schema Registry serializer. This happens because the confluent_kafka deserializer expects every message to start with a small header (a magic byte followed by the 4-byte id of the schema registered in the Schema Registry), which Spark's to_avro does not write.
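For illustration, this is roughly what the Confluent wire format adds on top of the plain AVRO payload (a sketch with a hard-coded, hypothetical schema id):

import struct

MAGIC_BYTE = 0
schema_id = 42  # hypothetical id assigned by the Schema Registry

avro_payload = b"..."  # the plain AVRO-encoded record, as produced by to_avro

# Confluent framing: 1 magic byte + 4-byte big-endian schema id + AVRO bytes.
confluent_message = struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

# A confluent_kafka deserializer checks this 5-byte header first, which is why
# a message produced with Spark's to_avro fails with a SerializationError.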
Scenario 2: confluent-kafka-python Producer & PySpark Consumer
In this scenario, we produce messages using a confluent_kafka producer and we try to read them with a Spark consumer. The deserialization is done using the from_avro Spark function.
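The write_avro_confluent_kafka helper is defined in the repo; conceptually it does something along these lines (a sketch using confluent_kafka's SerializingProducer and AvroSerializer, with placeholder addresses, assuming a recent confluent-kafka-python where AvroSerializer takes the Schema Registry client first, and reusing schema_json and example_messages from the snippets above):

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(schema_registry_client, schema_json)

producer = SerializingProducer(
    {
        "bootstrap.servers": "localhost:9092",
        "key.serializer": StringSerializer("utf_8"),
        "value.serializer": avro_serializer,
    }
)

# Each dict is serialized to AVRO and framed with the Confluent header
# (magic byte + schema id) before being written to the topic.
for message in example_messages:
    producer.produce(topic="example-topic", key=str(message["id"]), value=message)
producer.flush()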
@pytest.fixture()
def example_messages() -> List[Dict[str, Any]]:
    num_messages = 10
    return [
        {
            "id": int(msg_num),
            "field_1": f"field_1_{msg_num}",
            "field_2": msg_num,
            "field_3": [i for i in range(10, 10 + msg_num)],
        }
        for msg_num in range(num_messages)
    ]
def test_write_confluent_read_spark(
    kafka_topic: KafkaOptions,
    schema_registry_client: SchemaRegistryClient,
    example_schema: Dict[str, Any],
    example_messages: List[Dict[str, Any]],
    spark_session: SparkSession,
) -> None:
    """
    This test shows that if we write a message to Kafka serialized in avro
    using the confluent_kafka producer, we cannot deserialize it correctly using
    pyspark.sql.avro.functions.from_avro.
    We will get messages populated with default fields, and not the correct data.
    """
    kafka_conf = {
        "bootstrap.servers": kafka_topic.host,
        "compression.type": kafka_topic.compression_type,
        "group.id": "pytest-tests",
        "auto.offset.reset": "earliest",
    }
    schema_json = json.dumps(example_schema)
    write_avro_confluent_kafka(
        example_messages,
        kafka_conf,
        kafka_topic.topic,
        schema_registry_client,
        schema_json,
    )
    df_messages_read = read_spark_kafka(spark_session, kafka_topic)
    pdf_messages_read = df_messages_read.withColumn(
        "message",
        spark_from_avro(df_messages_read["value"], schema_json),
    ).toPandas()
    field_1_read_messages = set(
        [x["field_1"] for x in pdf_messages_read["message"].to_list()]
    )
    field_1_original_messages = set([x["field_1"] for x in example_messages])
    assert field_1_original_messages.intersection(field_1_read_messages) == set()
(You can find the code for this test here.)
When reading the messages using Spark, we see a weird behaviour: there are no deserialization errors, but the information in the messages we read back is not correct. For example, in the test above the overlap between the field_1 values that were written and the ones that were read is empty. The reason is the same header as before: Spark's from_avro starts decoding at the first byte, so the Confluent header is interpreted as part of the AVRO payload and the decoded values end up misaligned.
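To convince yourself that the 5-byte header is the culprit, one crude experiment (a sketch, not the approach used in the rest of this post) is to strip it manually before handing the bytes to from_avro:

from pyspark.sql import functions as F
from pyspark.sql.avro.functions import from_avro

# Drop the first 5 bytes (magic byte + schema id) so that from_avro starts
# decoding at the actual AVRO payload. Note that this ignores the schema id
# embedded in each message.
df_decoded = df_messages_read.withColumn(
    "message",
    from_avro(F.expr("substring(value, 6, length(value) - 5)"), schema_json),
)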
Solution
From the two scenarios above, it might seem that there is no easy way to exchange data between PySpark and confluent-kafka-python and that a custom solution is needed. In reality, the whole issue comes down to the SerDe format expected by confluent-kafka-python, so we need to work at the level of the from_avro and to_avro implementations provided by Spark to make them compliant with what confluent_kafka expects.
Thankfully, someone already did the heavy lifting for us and we can use the custom ABRiS SerDe functions. This package is only available in Scala, but we can make it work in PySpark by using the Py4j connection present in the SparkContext and adding some configuration to our SparkSession. With it, we can create our own custom from_avro and to_avro functions that replace the Spark-specific ones.
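A sketch of those wrappers, following the PySpark example in the ABRiS documentation (it assumes the ABRiS jar is available to the session, e.g. via spark.jars.packages, and mirrors the from_avro_abris_config and custom_from_avro names used in the tests below; the to_avro side is built analogously from AbrisConfig.toConfluentAvro()):

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column


def from_avro_abris_config(config_map, topic, is_key):
    """Build an ABRiS config that reads Confluent-framed AVRO via the Schema Registry."""
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)
    return (
        jvm_gateway.za.co.absa.abris.config.AbrisConfig
        .fromConfluentAvro()
        .downloadReaderSchemaByLatestVersion()
        .andTopicNameStrategy(topic, is_key)
        .usingSchemaRegistry(scala_map)
    )


def custom_from_avro(col, config):
    """Deserialize a binary column containing Confluent AVRO using the ABRiS from_avro."""
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro
    return Column(abris_avro.functions.from_avro(_to_java_column(col), config))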
Using these new functions, in scenario 1 we are able to read the messages written by Spark with confluent_kafka without any SerializationError.
def test_write_spark_custom_read_confluent(
    kafka_topic: KafkaOptions,
    schema_registry_client: SchemaRegistryClient,
    example_schema: Dict[str, Any],
    example_data: DataFrame,
    schema_registry_config: Dict[str, str],
    example_messages: List[Dict[str, Any]],
) -> None:
    """
    Test that shows correctness if we use the Abris package to write an Avro serialized message.
    If we serialize the messages with Abris, when we read from Kafka using confluent_kafka we
    get the right data.
    """
    schema_json = json.dumps(example_schema)
    kafka_conf = {
        "bootstrap.servers": kafka_topic.host,
        "compression.type": kafka_topic.compression_type,
        "group.id": "pytest-tests",
        "auto.offset.reset": "earliest",
    }
    # this will register the schema automatically
    # check https://github.com/AbsaOSS/ABRiS
    abris_config = to_avro_abris_config(
        {"schema.registry.url": schema_registry_config["url"]},
        kafka_topic.topic,
        False,
        schema_json=schema_json,
    )
    df_message_avro = example_data.withColumn(
        "message_avro",
        custom_to_avro(example_data["message"], abris_config),
    )
    write_spark_kafka(df_message_avro, "message_avro", kafka_topic)
    read_messages = read_avro_confluent_kafka(
        kafka_conf,
        kafka_topic.topic,
        schema_registry_client,
        schema_json,
    )
    field_1_read_messages = set([x["field_1"] for x in read_messages])
    field_1_original_messages = set([x["field_1"] for x in example_messages])
    assert len(read_messages) == len(example_messages)
    assert field_1_read_messages == field_1_original_messages
(You can find the code for this snippet here.)
The same goes for scenario 2: the messages read into the Spark DataFrame now contain the correct information that was written by the confluent_kafka producer.
def test_write_confluent_read_spark_custom(
    kafka_topic: KafkaOptions,
    schema_registry_client: SchemaRegistryClient,
    example_schema: Dict[str, Any],
    example_messages: List[Dict[str, Any]],
    spark_session: SparkSession,
    schema_registry_config: Dict[str, str],
) -> None:
    """
    Test that shows correctness if we use the Abris package to read a confluent Avro serialized message.
    If we use the Abris from_avro, we get the correct data when reading.
    """
    kafka_conf = {
        "bootstrap.servers": kafka_topic.host,
        "compression.type": kafka_topic.compression_type,
        "group.id": "pytest-tests",
        "auto.offset.reset": "earliest",
    }
    schema_json = json.dumps(example_schema)
    write_avro_confluent_kafka(
        example_messages,
        kafka_conf,
        kafka_topic.topic,
        schema_registry_client,
        schema_json,
    )
    df_messages_read = read_spark_kafka(spark_session, kafka_topic)
    abris_config = from_avro_abris_config(
        {"schema.registry.url": schema_registry_config["url"]}, kafka_topic.topic, False
    )
    pdf_messages_read = df_messages_read.withColumn(
        "message",
        custom_from_avro(df_messages_read["value"], abris_config),
    ).toPandas()
    field_1_read_messages = set(
        [x["field_1"] for x in pdf_messages_read["message"].to_list()]
    )
    field_1_original_messages = set([x["field_1"] for x in example_messages])
    assert (
        field_1_original_messages.intersection(field_1_read_messages)
        == field_1_original_messages
    )
(You can find the code for this snippet here.)
An alternative to using ABRiS would be to implement the SerDe logic ourselves in custom Python UDFs. While possible (and probably resulting in more pythonic or cleaner code), Python UDFs usually add overhead due to the Python-JVM communication, making your workloads run slower.
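For reference, such a UDF could look roughly like this (a sketch that assumes the writer schema is known up front: it strips the 5-byte Confluent header and decodes the rest with fastavro, instead of resolving the schema id against the registry; example_schema and df_messages_read are the objects from the earlier snippets):

import io

import fastavro
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

message_type = StructType(
    [
        StructField("field_1", StringType()),
        StructField("field_2", LongType()),
        StructField("field_3", ArrayType(LongType())),
    ]
)
parsed_schema = fastavro.parse_schema(example_schema)  # the schema dict from above


@F.udf(returnType=message_type)
def decode_confluent_avro(value: bytes) -> dict:
    # Skip the Confluent header (magic byte + 4-byte schema id) and decode the
    # remaining bytes with the known writer schema.
    return fastavro.schemaless_reader(io.BytesIO(value[5:]), parsed_schema)


df_decoded = df_messages_read.withColumn("message", decode_confluent_avro("value"))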
To conclude, I hope this post saves you some headaches the next time you have to exchange data between confluent_kafka and PySpark. If you feel that a lot of information is missing from this post, such as the specific implementations of the functions you see in the code snippets, you can check this repo for more details.
Thanks for reading :)