Usage

Installation

You can install avroc with pip:

pip install avroc

Basic Usage

Avroc is a library for reading and writing data with Avro schemas. It works a little differently than most Avro libraries: with Avroc, a schema is compiled during runtime to convert it into efficient Python code. This means that the first time a schema is encountered during runtime, it might take a little longer to handle - but every call thereafter will be very fast when compared with the official avro library or fastavro.

If you’re working mostly with Avro files (called “Avro Object Container files” in the Avro specification), compilation will probably be mostly invisible to you when using avroc. You can write a bunch of messages to a file with avroc.write_file, and things should just work. Reading should similary just work - the schema is embedded directly into any valid Avro files, and the avroc.read_file function will compile the schema in an Avro file when it opens it.

If you’re interested in encoding single messages directly into bytes (or decoding single messages, similarly), you’ll use avroc.compile_encoder and avroc.compile_decoder to construct encoder and decoder functions.

Examples

Reading a file

import avroc

with open("avro_data.avro", "rb") as f:
    for msg in avroc.read_file(f):
        print(msg)  # etc

Writing a file

import avroc

schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number",  "type": ["null", "int"]},
        {"name": "favorite_color", "type": ["null", "string"]}
    ]
}

messages = [
    {
        "name": "Alice",
        "favorite_number": 42,
        "favorite_color": "green",
    },
    {
        "name": "Bob",
        "favorite_number": 13,
        "favorite_color": "blue",
    },
]

with open("avro_data.avro", "wb") as f:
    avroc.write_file(f, schema, messages)

Writing a file message-by-message

import avroc

schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number",  "type": ["null", "int"]},
        {"name": "favorite_color", "type": ["null", "string"]}
    ]
}

messages = [
    {
        "name": "Alice",
        "favorite_number": 42,
        "favorite_color": "green",
    },
    {
        "name": "Bob",
        "favorite_number": 13,
        "favorite_color": "blue",
    },
]

with open("avro_data.avro", "wb") as f:
    writer = avroc.AvroFileWriter(f, schema)
    for m in messages:
        writer.write(m)
    writer.flush()

Reading a file using a different schema from the writer

import avroc

new_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number",  "type": ["null", "int"]},
        {"name": "favorite_color", "type": ["null", "string"]}
        {"name": "email", "type": "string", "default": "unset"}
    ]
}

with open("avro_data.avro", "wb") as f:
    for m in avroc.read_file(f, new_schema):
        print(f'name: {m["name"]}  email: {m["email"]}')

Encoding a single message to bytes

import avroc

schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number",  "type": ["null", "int"]},
        {"name": "favorite_color", "type": ["null", "string"]}
    ]
}

# Construct an encoder (don't do this for every message - it's a
# bunch of work)
encoder = avroc.compile_encoder(schema)

message = {
    "name": "Alice",
    "favorite_number": 42,
    "favorite_color": "green",
},

# encoder is a callable, so pass it a message directly. The
# return value is encoded bytes.
encoded = encoder(message)
print(repr(encoded))  #  b'\nAlice\x02T\x02\ngreen'

Decoding a single message from bytes

import avroc

schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number",  "type": ["null", "int"]},
        {"name": "favorite_color", "type": ["null", "string"]}
    ]
}

# Construct a decoder (don't do this for every message - it's
# a bunch of work)
decoder = avroc.message_decoder(schema)

encoded_bytes = io.BytesIO(b'\nAlice\x02T\x02\ngreen')
decoded = decoder(encoded_bytes)

# {'name': 'Alice', 'favorite_number': 42,
#  'favorite_color': 'green'}
print(repr(decoded))

Message Types

Avro has a bunch of types, which are the basic building blocks you use when writing a Schema. This section lays out how those Avro types map to Python objects.

Each of the Avro types is mapped to and from Python types according to this table:

Avro Type

Python Type

primitive

null

None

int

int

long

int

boolean

bool

float

float

double

float

string

string

bytes

bytes

complex

map

dict

array

list

record

dict

fixed

bytes

enum

string

union

see Unions

A bit more detail is given in the following sections.

Primitives

Primitives mostly work as you’d expect. null becomes None, boolean becomes bool, and so on.

The only tricky thing is around Avro’s distinction between 32-bit numeric types (int, float) and 64-bit numeric types (long, double). All integers just become Python int values; int can hold integers of _any_ size. Floating point numbers become Python float values, which always are 64-bit.

This is never a problem when reading data - we can happily take a 32-bit integer and store it in Python’s int. But when writing data, you might get an error if you try to write an integer which is bigger than the 32-bit maximum. The same applies to floating point numbers.

Records

Records are represented in Python as plain old dictionaries. The keys are the field names. So, for example this schema:

{
  "type": "record",
  "name": "ExampleRecord",
  "fields": [
    {"name": "some_field", "type": "boolean"},
    {"name": "another_cooler_field", "type": "int"},
    {"name": "yet_another_field", "type": "long"},
  ]
}

corresponds to this Python object:

value = {
  "some_field": False,
  "another_cooler_field": 12,
  "yet_another_field": 3214,
}

Maps

Maps are represented in Python as plain old dictionaries. For example:

{
  "type": "map",
  "values": "float"
}

corresponds to this Python object:

value = {
  "k1": 3.21,
  "k2": 4.56,
  "k3": 8.1243,
}

Arrays

Arrays are represented in Python with lists. For example:

{
  "type": "array",
  "items": "string"
}

corresponds to this Python object:

value = ["hello", "world"]

Enums

Enums are represented in Python with the string value of the selected Enum symbol. For example:

{
  "type": "enum",
  "name": "ExampleEnum",
  "symbols": ["RED", "YELLOW", "BLUE"],
}

corresponds to this Python object:

value = "YELLOW"

Unions

Unions are implemented transparently. When you’re reading union-typed Avro data, you’ll just get the actual concretely typed value that was stored. To put it another way, you won’t explicitly know which branch of the union was stored, but it shouldn’t matter.

When you’re writing a message with a union-typed schema, avroc will attempt to infer the type to use. It does this greedily: it will encode the data with the first schema in the union that appears to be “valid.”

Validity is checked using the code found in the avroc.runtime.typetest module.

This can be easier to understand by looking at some of the generated code for unions. Let’s take a very simple record schema with just one field: a union of “int”, “float”, and “string”:

{
  "name": "ExampleRecord",
  "type": "record",
  "fields": [
    {
      "type": ["int", "float", "string"],
      "name": "example_union_field",
    },
  ]
}

The reader will produce a dictionary with one key, example_union_field. It will hold either an int, a float, or a string, depending on the bytes being read. Here’s what the generated code looks like:

import datetime
import decimal
import uuid
from avroc.runtime.encoding import *
from avroc.runtime.blocks import decode_block

def decoder(src):
    ExampleRecord = {}
    union_choice = decode_long(src)
    if union_choice == 0:
        ExampleRecord['example_union_field'] = decode_int(src)
    elif union_choice == 1:
        ExampleRecord['example_union_field'] = decode_float(src)
    elif union_choice == 2:
        ExampleRecord['example_union_field'] = decode_string(src)
    result = ExampleRecord
    return result

And the writer will take in a dictionary, and decide how to encode based on type tests. The writer function here expects a msg shapedc like {"example_union_field": 8}.

import numbers
from avroc.runtime.encoding import *
from avroc.runtime.typetest import *

def writer(msg):
    buf = bytes()
    if is_int(msg['example_union_field']):
        buf += encode_long(0)
        buf += encode_int(msg['example_union_field'])
    elif is_float(msg['example_union_field']):
        buf += encode_long(1)
        buf += encode_float(msg['example_union_field'])
    elif is_string(msg['example_union_field']):
        buf += encode_long(2)
        buf += encode_string(msg['example_union_field'])
    else:
        raise ValueError("message type doesn't match any options in the union")
    return buf

These cases are relatively straightforward. But type matching can be more complicated for record types. If multiple record types are possible in a union, the Avro specification leaves it up to the implementation to decide what to do.

Avroc decides to pick the first record type with field names that match the dictionary keys for the input record, in this case. Another example may be useful. Here’s a schema which represents a union over three possible record types:

[
     {
         "type": "record",
         "name": "CelsiusTemperature",
         "fields": [
             {"name": "temperature", "type": "double"},
             {"name": "measurement_error", "type": "double"}
         ]
     },
     {
         "type": "record",
         "name": "WindSpeed",
         "fields": [
             {"name": "speed", "type": "double"},
             {"name": "measurement_error", "type": "double"}
         ]
     },
     {
         "type": "record",
         "name": "FahrenheitTemperature",
         "fields": [
             {"name": "temperature", "type": "double"},
             {"name": "measurement_error", "type": "double"}
         ]
     }
 ]

Here’s the generated writer code:

import numbers
from avroc.runtime.encoding import *
from avroc.runtime.typetest import *

def writer(msg):
    buf = bytes()
    if is_record(msg, {'temperature', 'measurement_error'}):
        buf += encode_long(0)
        buf += encode_double(msg['temperature'])
        buf += encode_double(msg['measurement_error'])
    elif is_record(msg, {'speed', 'measurement_error'}):
        buf += encode_long(1)
        buf += encode_double(msg['speed'])
        buf += encode_double(msg['measurement_error'])
    elif is_record(msg, {'temperature', 'measurement_error'}):
        buf += encode_long(2)
        buf += encode_double(msg['temperature'])
        buf += encode_double(msg['measurement_error'])
    else:
        raise ValueError("message type doesn't match any options in the union")
    return buf

Using that code, any of the following are valid:

# Write a Celsius temperature measurement:
writer({"temperature": 21.5, "measurement_error": 0.4})

# Write a Windspeed measurement:
writer({"speed": 3.21, "measurement_error": 0.04})

# Write a Fahrenheit measurement - BUT this actually writes as "CelsiusTemperature"
writer({"temperatuire": 73.2, "measurement_error": 2.1})

Note that, since the CelsiusTemperature and the FahrenheitTemperature record types in the schema have exactly the same field names, the writer can’t tell which one is intended. In this case, it just takes the first one which matches.

If you need to disambiguate in cases like this, you might want to either add a field name to act as a flag, or store an additional enum-typed value to help out.

Logical Types

Avro supports “logical types.” These are annotations on types which indicate the semantic intent of a field. Avroc uses logicalType annotations to encode and decode values into certain types provided by the Python standard library. Specifically:

logicalType

Python type

“decimal”

decimal.Decimal

“uuid”

uuid.UUID

“date”

datetime.Date

“time-millis”, “time-micros”

datetime.time

“timestamp-millis”, “timestamp-micros”

datetime.datetime

If a logicalType is not recognized, or its arguments are invalid, then it will be encoded or decoded as the underlying type.

Schema Types

The schemas passed in to avroc APIs are the plain old dictionaries (or strings or lists) you’d get from JSON-decoding an Avro Schema. For example, this is a record schema:

schema = {
    "type": "record",
    "name": "WeatherData",
    "fields": [
        {"name": "temperature", "type": "float"},
        {"name": "location", "type": {
            "type": "record",
            "name": "Location",
            "fields": [
                {"name": "latitude", "type": "float"},
                {"name": "longitude", "type": "float"},
            ]
        }},

    ]
}

That schema can be compiled by avroc. The associated messages that avroc expects when writing, and that it will output when reading, will be dictionaries of a similar shape:

msg = {
    "temperature": 71.4,
    "location": {
        "latitude": 40.213,
        "longitude": 45.231,
    },
}

Schema Resolution

One of Avro’s most distinctive features is schema resolution. This is the feature that allows for safe upgrades (or downgrades) of a data schema: you can read data with a different schema than was used to write it.

The way this works in avroc is that you provide a second reader_schema when you’re calling a function that reads Avro data.

All the rules in the Avro specification’s Schema Resolution section apply.

The resulting objects, when read, will match the reader_schema, rather than the writer’s schema.

Note that some sorts of errors in schema resolution can only be detected during decoding. In particular, if a writer uses a union schema, and the reader’s schema is not compatible with every possible option in the union, then avroc will not raise an error unless the actual incompatible data type is encountered during decoding.