Skip to main content
  1. Projects/
  2. AC Data Platform (ACDP)/

AC Data Platform Introduction

The AC Data Platform (ACDP henceforth) is a streaming data pipeline designed to work alongside the ACEmulator emulator for Asheron’s Call. It uses Debezium to stream database changes into a Kafka cluster where ksqlDB stream processors enrich and structure the data for downstream consumption. This article covers the same ground as the talk I presented at an internal conference.

High-level architecture diagram #

Architectural diagram showing the high-level design of ACDP, including the emulator with its MySQL database, Debezium pulling from that database and writing into Kafka, ksqlDB running queries to create materialized views, and a SvelteKit application which uses Server-Sent Events to stream data to a frontend.

Project Overview #

Below I’ll cover the main pieces of the project:

Debezium and Kafka Connect #

Debezium can run as a Kafka Connect connector to connect to MySQL and stream database log changes into Kafka. Conventiently, ksqlDB servers can embed Kafka Connect and then you can create connectors using the same interface used to create materialized views, streams, and tables. For example,

CREATE SOURCE CONNECTOR IF NOT EXISTS ace_reader WITH (
    'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname' = 'host.lima.internal',
    'database.port' = '3306',
    'database.user' = 'debezium',
    ...
);

Static Data Extractors #

The raw data in MySQL has a lot of integer values which represent enumerations in the emulator codebase. To pull this and other data, I wrote a set of what I’m calling static data extractors which pull information from the codebase using reflection, use parts of the emulator codebase which read Asheron’s Call-specific data file formats, or pull information directly from the database for later use enriching data.

  • Enumerations: I wrote a C# program which uses reflection to, uh, enumerate the enumerations and save the labels.
  • Asheron’s Call-specific file formats: Again, I wrote a C# program which this time piggy-backs on code that already exists in the emulator codebase to read AC file formats like client_portal.dat and save the table data therein.
  • Static data from MySQL: I save the points_of_interest table which, when joined with the weenie position properties table, allows me to assign a landblock cell ID and x, y, z, coordinates to all the POIs on the in-game map.

All data is serialized into ProtoBuf files for loading in the ksqlDB user-defined functions (UDFs).

ksqlDB Stream Processing #

Debezium writes Avro schemas into Confluent Schema Registry which makes it easy to get started creating streams and tables to process the change-data-capture data.

As an example, here is an excerpt from one of the ksqlDB migration files that creates some resources around the MySQL ace_shard.character table, which has information on the characters created on the server:

ASSERT TOPIC 'ace.ace_shard.character' TIMEOUT 1 MINUTE;

CREATE SOURCE STREAM IF NOT EXISTS shard_character_stream WITH (
    kafka_topic = 'ace.ace_shard.character', format = 'avro', timestamp='__source_ts_ms'
);

CREATE STREAM IF NOT EXISTS shard_character AS
    SELECT ROWKEY->id AS object_id,
           account_id,
           name,
           is_plussed,
           is_deleted,
           total_logins,
           NULLIF(FORMAT_TIMESTAMP(FROM_UNIXTIME(CAST(delete_time*1000 AS BIGINT)), 'yyyy-MM-dd''T''HH:mm:ssX', 'UTC'), FORMAT_TIMESTAMP(FROM_UNIXTIME(0), 'yyyy-MM-dd''T''HH:mm:ssX', 'UTC')) AS delete_timestamp,
           NULLIF(FORMAT_TIMESTAMP(FROM_UNIXTIME(CAST(last_login_timestamp*1000 AS BIGINT)), 'yyyy-MM-dd''T''HH:mm:ssX', 'UTC'), FORMAT_TIMESTAMP(FROM_UNIXTIME(0), 'yyyy-MM-dd''T''HH:mm:ssX', 'UTC')) AS last_login_timestamp,
           FORMAT_TIMESTAMP(FROM_UNIXTIME(ROWTIME), 'yyyy-MM-dd''T''HH:mm:ssX', 'UTC') AS timestamp
    FROM shard_character_stream
    PARTITION BY ROWKEY->id
    EMIT CHANGES;

I’ll go into more detail on the specifics in later posts, but this example shows how you can do transformations on incoming data and emit a stream of changes.

ksqlDB UDFs #

This part has been the most fun to create. ksqlDB allows you to write Java code that creates functions you can use in your ksqlDB statements. For example:

  • ace_enum(<enumeration name>, <enumeration value>) which will resolve something like ace_enum('PropertyInt', 24) to AvailableSkillCredits.
  • ace_enum(<enumeration name>, <enumeration value>, <value>) which will resolve so-called ’extended enumerations’ where the value for the specific enumeration is itself the value of another enumeration. For example, ace_enum('PropertyInt', 27) would resolve to ArmorType, and the value in the table might be 2; if you evaluated ace_enum('ArmorType', 2) you’d receive Leather - you can do this lookup in one step with this variant; ace_enum('PropertyInt', 27, 2) evaluates directly to Leather.
  • ace_attribute_collect(<attribute details struct>) is an aggregate UDF which maintains an aggregate of the attribute values seen so far, allowing you to see an up-to-date picture of the values of all 6 attributes.
  • ace_skill(<skill details struct>, <aggregated attributes>) which looks up the provided skill to find its formula and uses the attributes to calculate the current skill value.

SvelteKit Dashboard #

This is more of a proof of concept right now for my presentation, but I’d like to expand it in the future. The backend of the SvelteKit app issues push queries to ksqlDB and streams the responses to the frontend using Server-Sent Events. I found this sveltekit-sse library which made it very easy to get started. Frontend development is not something I have a lot of experience with, so this was a fun way to get into it. I’ll have more details on this piece on my blog in the future.

Author
Michael Ayoub