# Apibara --- # Source: https://www.apibara.com/docs/dna/add-new-chain # Adding a new chain This page explains how to add support for a new chain to the DNA protocol. We recommend being familiar with the high-level [DNA architecture](/docs/dna/architecture) and the [DNA streaming protocol](/docs/dna/protocol) before reading this page. ## Overview Adding a new chain is relatively straightforward. Most of the code you need to write describes the type of data stored on your chain. The guide is split into the following sections: - **gRPC Protocol**: describes how to augment the gRPC protocol with filters and data types specific to the new chain. - **Storage**: describes how data is stored on disk and S3. - **Data filtering**: describes how to filter data based on the client's request. ## gRPC Protocol The first step is to define the root `Filter` and `Block` protobuf messages. There are a few hard requirements on the messages: - The `header` field of the block must have tag `1`. - All other fields can have any tag. - Add one message type for each of the chain's resources (transactions, receipts, logs, etc.). - Each resource must have a `filter_ids` field with tag `1`. - Add a `Filter.id: uint32` property. Indexers use this id to know which filters matched a specific piece of data, and it is used to populate the `filter_ids` field. The following items are optional: - Add an option to the `Filter` to request all block headers. Users use this option to debug their indexers. - Think about how users are going to use the data. For example, developers often need the hash of the transaction that emitted a log, so we include the transaction hash in the `Log` message. - Avoid excessive nesting of messages. ## Storage The goal of the ingestion service is to fetch data from the chain (using the chain's RPC protocol), preprocess and index it, and then store it in Object Storage. DNA stores block data as pre-serialized protobuf messages. This makes it possible to send data to clients by copying bytes directly, without expensive serialization and deserialization. Since the DNA server is chain-agnostic, it needs a way to filter data without scanning the entire block. This is done with _indices_. The chain-specific ingestion service is responsible for creating these indices. The next section goes into detail about how indices work; the important points are: - Indices are grouped by the type of data they index (for example transactions, logs, and traces). - For each type of data, there can be multiple indices. - Indices point to one or more pre-serialized protobuf messages. ## Data filtering As mentioned in the previous section, the DNA server uses indices to look up data without scanning the entire block. This is done by compiling the protobuf filter sent by the client into a special representation. This `Filter` specifies: - What resource to filter (for example transactions, logs, and traces). - The list of conditions to match. A _condition_ is a tuple with the filter id and the lookup key. --- # Source: https://www.apibara.com/docs/dna/architecture # DNA v2 architecture This page describes the architecture of DNA v2 in detail. At a high level, the goals for DNA v2 are: - serve onchain data through a protocol that's optimized for building indexers. - provide a scalable and cost-efficient way to access onchain data.
- decouple compute from storage. This is achieved by building a _cloud native_ service that ingests onchain data from an archive node and stores it into Object Storage (for example Amazon S3, Cloudflare R2). Data is served by stateless workers that read and filter data from Object Storage before sending it to the indexers. The diagram below shows all the high-level components that make up a production deployment of DNA v2. Communication between components is done through etcd. ```txt ┌─────────────────────────────────────────────┐ \ │ Archive Node │░ \ └─────────────────────────────────────────────┘░ \ ░░░░░░░░░░░░░░░░░░░░░░│░░░░░░░░░░░░░░░░░░░░░░░░ \ │ \ │ \ ╔═ DNA Cluster ═══════════════════════╬══════════════════════════════════════╗ \ ║ │ ║░ ║ ┌──────┐ ▼ ┌──────┐ ║░ ║ │ │ ┌─────────────────────────────────────────────┐ │ │ ║░ ║ │ │ │ │ │ │ ║░ ║ │ │◀────│ Ingestion Service │────▶│ │ ║░ ║ │ │ │ │ │ │ ║░ ║ │ │ └─────────────────────────────────────────────┘ │ │ ║░ ║ │ │ ┌─────────────────────────────────────────────┐ │ │ ║░ ║ │ │ │ │ │ │ ║░ ║ │ │◀────│ Compaction Service │────▶│ │ ║░ ║ │ │ │ │ │ │ ║░ ║ │ │ └─────────────────────────────────────────────┘ │ │ ║░ ║ │ S3 │ ┌─────────────────────────────────────────────┐ │ etcd │ ║░ ║ │ │ │ │ │ │ ║░ ║ │ │◀────│ Pruning Service │────▶│ │ ║░ ║ │ │ │ │ │ │ ║░ ║ │ │ └─────────────────────────────────────────────┘ │ │ ║░ ║ │ │ ┌───────────────────────────────────────────┐ │ │ ║░ ║ │ │ │┌──────────────────────────────────────────┴┐ │ │ ║░ ║ │ │ ││┌──────────────────────────────────────────┴┐ │ │ ║░ ║ │ │ │││ │ │ │ ║░ ║ │ │ │││ Stream │ │ │ ║░ ║ │ │◀────┤││ ├────▶│ │ ║░ ║ │ │ │││ Service │ │ │ ║░ ║ └──────┘ └┤│ │ └──────┘ ║░ ║ └┤ │ ║░ ║ └───────────────────────────────────────────┘ ║░ ║ ║░ ╚════════════════════════════════════════════════════════════════════════════╝░ ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░\\ ``` ## DNA service The DNA service is made up of several components: - ingestion service: listens for new blocks on the network and stores them into Object Storage. - compaction service: combines multiple blocks together into _segments_. Segments are grouped by data type (like logs, transactions, and receipts). - pruner service: removes blocks that have been compacted to reduce storage cost. - stream service: receives streaming requests from clients (indexers) and serves onchain data by filtering objects stored on S3. ### Ingestion service The ingestion service fetches blocks from the network and stores them into Object Storage. This service is the only chain-specific service in DNA; all other components work on generic data structures. Serving onchain data requires serving a high volume of data filtered by a relatively small number of columns. When designing DNA, we made a few decisions to make this process as efficient as possible: - data is stored as pre-serialized protobuf messages to avoid wasting CPU cycles serializing the same data over and over again. - filtering is entirely done using indices to reduce reads. - joins (for example, including a log's transaction) are also achieved with indices. The ingestion service is responsible for creating this data and the indices. Data is grouped into `blocks`. Blocks are made up of `fragments`, that is, groups of related data. All fragments have a unique numerical id used to identify them. There are four different types of fragments: - index: a collection of indices, the fragment id is `0`. Indices are grouped by the fragment they index.
- join: a collection of join indices, the fragment id is `254`. Join indices are also grouped by the source fragment index. - header: the block header, the fragment id is `1`. Headers are stored as pre-serialized protobuf messages. - body: the chain-specific block data, grouped by fragment id. Note that we call block number + hash a _cursor_ since it uniquely identifies a block in the chain. ```txt ╔═ Block ══════════════════════════════════════════════════════════════╗ ║ ┌─ Index ──────────────────────────────────────────────────────────┐ ║░ ║ │ ┌─ Fragment 0 ─────────────────────────────────────────────────┐ │ ║░ ║ │ │┌────────────────────────────────────────────────────────────┐│ │ ║░ ║ │ ││ Index 0 ││ │ ║░ ║ │ │├────────────────────────────────────────────────────────────┤│ │ ║░ ║ │ ││ Index 1 ││ │ ║░ ║ │ │├────────────────────────────────────────────────────────────┤│ │ ║░ ║ │ │ │ │ ║░ ║ │ │├────────────────────────────────────────────────────────────┤│ │ ║░ ║ │ ││ Index N ││ │ ║░ ║ │ │└────────────────────────────────────────────────────────────┘│ │ ║░ ║ │ └──────────────────────────────────────────────────────────────┘ │ ║░ ║ └──────────────────────────────────────────────────────────────────┘ ║░ ║ ┌─ Join ───────────────────────────────────────────────────────────┐ ║░ ║ │ ┌─ Fragment 0 ─────────────────────────────────────────────────┐ │ ║░ ║ │ │┌────────────────────────────────────────────────────────────┐│ │ ║░ ║ │ ││ Fragment 1 ││ │ ║░ ║ │ │├────────────────────────────────────────────────────────────┤│ │ ║░ ║ │ ││ Fragment 2 ││ │ ║░ ║ │ │└────────────────────────────────────────────────────────────┘│ │ ║░ ║ │ └──────────────────────────────────────────────────────────────┘ │ ║░ ║ └──────────────────────────────────────────────────────────────────┘ ║░ ║ ┌─ Body ───────────────────────────────────────────────────────────┐ ║░ ║ │ ┌──────────────────────────────────────────────────────────────┐ │ ║░ ║ │ │ │ │ ║░ ║ │ │ Fragment 0 │ │ ║░ ║ │ │ │ │ ║░ ║ │ └──────────────────────────────────────────────────────────────┘ │ ║░ ║ │ ┌──────────────────────────────────────────────────────────────┐ │ ║░ ║ │ │ │ │ ║░ ║ │ │ Fragment 1 │ │ ║░ ║ │ │ │ │ ║░ ║ │ └──────────────────────────────────────────────────────────────┘ │ ║░ ║ └──────────────────────────────────────────────────────────────────┘ ║░ ╚══════════════════════════════════════════════════════════════════════╝░ ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░\\ ``` On supported networks, the ingestion service is also responsible for periodically refreshing the mempool (pending) block data and uploading it into Object Storage. This works exactly as it does for all other blocks. The ingestion service tracks the canonical chain and uploads it to Object Storage. This data is used by the stream service to track online and offline chain reorganizations. The ingestion service stores its state in etcd. Stream services subscribe to etcd updates to receive notifications about newly ingested blocks and other changes to the chain (for example changes to the finalized block). Finally, the ingestion service is _fault tolerant_. When the ingestion service starts, it acquires a distributed lock from etcd to ensure only one instance is running at a time. When running multiple deployments of DNA, all other instances will wait for the lock to be released (following a service restart or crash) and will try to take over the ingestion.
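To make the fragment layout described above more concrete, here is a minimal TypeScript sketch of the logical structure of an ingested block. The names below are illustrative only and are not part of the DNA codebase; the real data is stored as pre-serialized protobuf bytes in Object Storage.

```ts
// Well-known fragment ids from the description above.
export const INDEX_FRAGMENT_ID = 0; // indices, grouped by the fragment they index
export const HEADER_FRAGMENT_ID = 1; // the pre-serialized block header
export const JOIN_FRAGMENT_ID = 254; // join indices, grouped by the source fragment

export type FragmentId = number;

// Block number + hash uniquely identify a block (a "cursor").
export type Cursor = {
  orderKey: bigint; // block number
  uniqueKey: string; // block hash
};

// Logical view of one ingested block. Every payload is an opaque,
// pre-serialized protobuf message so it can be copied to clients as-is.
export type IngestedBlock = {
  cursor: Cursor;
  header: Uint8Array;
  index: Map<FragmentId, Uint8Array>; // indices for each body fragment
  join: Map<FragmentId, Uint8Array>; // join indices for each body fragment
  body: Map<FragmentId, Uint8Array[]>; // chain-specific data (transactions, logs, ...)
};
```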
## Compaction service The compaction service groups together data from several blocks (usually 100 or 1000) into _segments_. Segments only contain data for one fragment type (for example headers, indices, and transactions). In other words, the compaction service groups `N` blocks into `M` segments. Only data that has been finalized is compacted into segments. The compaction service also creates block-level indices called _groups_. Groups combine indices from multiple blocks/segments to quickly look up which blocks contain specific data. This type of index is very useful for increasing performance on sparse datasets. ```txt ╔═ Index Segment ═══════════════════════╗ ╔═ Transaction Segment ═════════════════╗ ║ ┌─ Block ───────────────────────────┐ ║░ ║ ┌─ Block ───────────────────────────┐ ║░ ║ │ ┌─ Fragment 0 ──────────────────┐ │ ║░ ║ │ ┌───────────────────────────────┐ ║░ ║ │ │┌─────────────────────────────┐│ │ ║░ ║ │ │ │ ║░ ║ │ ││ Index 0 ││ │ ║░ ║ │ │ │ ║░ ║ │ │├─────────────────────────────┤│ │ ║░ ║ │ │ Fragment 2 │ ║░ ║ │ ││ Index 1 ││ │ ║░ ║ │ │ │ ║░ ║ │ │├─────────────────────────────┤│ │ ║░ ║ │ │ │ ║░ ║ │ │ │ │ ║░ ║ │ └───────────────────────────────┘ ║░ ║ │ │├─────────────────────────────┤│ │ ║░ ║ └───────────────────────────────────┘ ║░ ║ │ ││ Index N ││ │ ║░ ║ ┌─ Block ───────────────────────────┐ ║░ ║ │ │└─────────────────────────────┘│ │ ║░ ║ │ ┌───────────────────────────────┐ ║░ ║ │ └───────────────────────────────┘ │ ║░ ║ │ │ │ ║░ ║ └───────────────────────────────────┘ ║░ ║ │ │ │ ║░ ║ ┌─ Block ───────────────────────────┐ ║░ ║ │ │ Fragment 2 │ ║░ ║ │ ┌─ Fragment 0 ──────────────────┐ │ ║░ ║ │ │ │ ║░ ║ │ │┌─────────────────────────────┐│ │ ║░ ║ │ │ │ ║░ ║ │ ││ Index 0 ││ │ ║░ ║ │ └───────────────────────────────┘ ║░ ║ │ │├─────────────────────────────┤│ │ ║░ ║ └───────────────────────────────────┘ ║░ ║ │ ││ Index 1 ││ │ ║░ ║ ┌─ Block ───────────────────────────┐ ║░ ║ │ │├─────────────────────────────┤│ │ ║░ ║ │ ┌───────────────────────────────┐ ║░ ║ │ │ │ │ ║░ ║ │ │ │ ║░ ║ │ │├─────────────────────────────┤│ │ ║░ ║ │ │ Fragment 2 │ ║░ ║ │ ││ Index N ││ │ ║░ ║ │ │ │ ║░ ║ │ │└─────────────────────────────┘│ │ ║░ ║ │ │ │ ║░ ║ │ └───────────────────────────────┘ │ ║░ ║ │ └───────────────────────────────┘ ║░ ║ └───────────────────────────────────┘ ║░ ║ └───────────────────────────────────┘ ║░ ╚═══════════════════════════════════════╝░ ╚═══════════════════════════════════════╝░ ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░\\ ``` ## Pruner service The pruner service cleans up block data that has been included in segments. This is done to reduce the storage used by DNA. ## Object hierarchy We now have all the elements needed to understand the objects uploaded to Object Storage by the ingestion service. If you run DNA pointed at your bucket, you will eventually see a folder structure that looks like the following. ```txt my-chain ├── blocks │ ├── 000020908017 │ │ └── 0xc137607affd53bd9e857af372429762f77eaff0fe32f0e49224e9fc0e439118d │ │ ├── pending-0 │ │ ├── pending-1 │ │ └── pending-2 │ ├── 000020908018 │ │ └── ... same as above │ └── 000020908019 │ └── ... same as above ├── chain │ ├── recent │ ├── z-000020906000 │ ├── z-000020907000 │ └── z-000020908000 ├── groups │ └── 000020905000 │ └── index └── segments ├── 000020906000 │ ├── header │ ├── index │ ├── join │ ├── log │ ├── receipt │ └── transaction ├── 000020907000 │ └── ... same as above └── 000020908000 └── ...
same as above ``` ## Stream service The stream service is responsible for serving data to clients. The raw onchain data stored in Object Storage is filtered by the stream service before being sent over the network; this results in lower egress fees compared to solutions that filter data on the client. Upon receiving a stream request, the service validates and compiles the request into a _query_. A query is simply a list of index lookup requests that are applied to each block. The stream service then keeps repeating the following steps: - check if it should send a new block of data or inform the client of a chain reorganization. - load the indices from the segment or the block and use them to compute what data to send the client. - load the pre-serialized protobuf messages and copy them to the output stream. One critical aspect of the stream service is how it loads blocks and segments. Reading from Object Storage has virtually unlimited throughput, but also high latency. The service is also very likely to access data closer to the chain's tip more frequently, and we should cache Object Storage requests to avoid unnecessarily increasing our cloud spending. We achieve all of this (and more!) by using a hybrid cache that stores frequently accessed data in memory and on disk. This may come as a surprise: isn't the point of DNA to avoid expensive disks and rely on cheap Object Storage? There are multiple reasons why this design still makes sense: - we can use cheaper and higher-performance temporary NVMe disks attached directly to our server. - we can quickly scale the stream service horizontally without re-indexing all data. - we can use disks that are much smaller than the full chain's data. The cache dynamically stores the most frequently accessed data. Unused or rarely used data lives on Object Storage. The following table, inspired [by the table in this article by Vantage](https://www.vantage.sh/blog/ebs-vs-nvme-pricing-performance), shows the difference in performance and price between an AWS EC2 instance using (temporary) NVMe disks and two using EBS (one with a general purpose `gp3` volume, and one with a higher performance `io1` volume). All prices as of April 2024, US East, 1 year reserved with no upfront payment.

| Metric                | EBS (gp3)   | EBS (io1)   | NVMe              |
| --------------------- | ----------- | ----------- | ----------------- |
| Instance Type         | r6i.8xlarge | r6i.8xlarge | i4i.8xlarge       |
| vCPU                  | 32          | 32          | 32                |
| Memory (GiB)          | 256         | 256         | 245               |
| Network (Gbps)        | 12.50       | 12.50       | 18.75             |
| Storage (GiB)         | 7500        | 7500        | 2x3750            |
| IOPS (read/write)     | 16,000      | 40,000      | 800,000 / 440,000 |
| Cost - Compute ($/mo) | 973         | 973         | 1,300             |
| Cost - Storage ($/mo) | 665         | 3,537       | 0                 |
| Cost - Total ($/mo)   | 1,638       | 4,510       | 1,300             |

Notice how the NVMe instance has 30-50x the IOPS per dollar. This price difference means that Apibara users benefit from lower costs and/or higher performance.
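As a rough illustration of the hybrid cache described in this section, here is a minimal TypeScript sketch of a two-tier (memory plus local disk) cache sitting in front of Object Storage. The class name, the `fetchFromObjectStorage` helper, and the eviction policy are all hypothetical and greatly simplified; this is not the actual DNA implementation.

```ts
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { join } from "node:path";

type Fetcher = (key: string) => Promise<Uint8Array>;

export class HybridCache {
  private memory = new Map<string, Uint8Array>();

  constructor(
    private diskDir: string,
    private fetchFromObjectStorage: Fetcher, // hypothetical S3/R2 client call
    private maxMemoryEntries = 1024,
  ) {}

  async get(key: string): Promise<Uint8Array> {
    // 1. Hot path: recently used blocks and segments live in memory.
    const inMemory = this.memory.get(key);
    if (inMemory) return inMemory;

    // 2. Warm path: fall back to the local (NVMe) disk.
    const diskPath = join(this.diskDir, encodeURIComponent(key));
    try {
      const onDisk = await readFile(diskPath);
      this.remember(key, onDisk);
      return onDisk;
    } catch {
      // Not on disk: fall through to Object Storage.
    }

    // 3. Cold path: fetch from Object Storage and populate both tiers.
    const data = await this.fetchFromObjectStorage(key);
    await mkdir(this.diskDir, { recursive: true });
    await writeFile(diskPath, data);
    this.remember(key, data);
    return data;
  }

  private remember(key: string, value: Uint8Array) {
    // Naive eviction: drop the oldest entry once the memory tier is full.
    if (this.memory.size >= this.maxMemoryEntries) {
      const oldest = this.memory.keys().next().value;
      if (oldest !== undefined) this.memory.delete(oldest);
    }
    this.memory.set(key, value);
  }
}
```

The important property is that a miss in both tiers falls back to Object Storage and repopulates the faster tiers, so data near the chain's tip naturally ends up in memory and on NVMe while cold historical data stays only in Object Storage.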
--- # Source: https://www.apibara.com/docs/networks/beaconchain/data # Beacon Chain data reference This page contains reference information about the available data in Beacon Chain DNA streams. ## Related pages - [Beacon Chain data filter reference](/docs/networks/beaconchain/filter) ## Filter ID All filters have an associated ID. To help clients correlate filters with data, the filter ID is included in the `filterIds` field of all data objects. This field contains the list of _all filter IDs_ that matched a piece of data. ## Nullable fields **Important**: most fields are nullable to allow the protocol to evolve. You should always assert the presence of a field in critical indexers. ## Scalar types The `@apibara/beaconchain` package defines the following scalar types: - `Address`: a 20-byte Ethereum address, represented as a `0x${string}` type. - `B256`: a 32-byte Ethereum value, represented as a `0x${string}` type. - `B384`: a 48-byte Ethereum value, represented as a `0x${string}` type. - `Bytes`: arbitrary-length bytes, represented as a `0x${string}` type. ## Data type ### Block The root object is the `Block`. ```ts export type Block = { header?: BlockHeader; transactions: Transaction[]; blobs: Blob[]; validators: Validator[]; }; ``` ### Header This is the block header, which contains information about the block. ```ts export type BlockHeader = { slot?: bigint; proposerIndex?: number; parentRoot?: B256; stateRoot?: B256; randaoReveal?: Bytes; depositCount?: bigint; depositRoot?: B256; blockHash?: B256; graffiti?: B256; executionPayload?: ExecutionPayload; blobKzgCommitments: B384[]; }; ``` ## Properties - `slot`: the slot number. - `proposerIndex`: the index of the validator that proposed the block. - `parentRoot`: the parent root. - `stateRoot`: the state root. - `randaoReveal`: the randao reveal. - `depositCount`: the number of deposits. - `depositRoot`: the deposit root. - `blockHash`: the block hash. - `graffiti`: the graffiti. - `executionPayload`: the execution payload. - `blobKzgCommitments`: the blob KZG commitments. ### Transaction An EVM transaction.
```ts export type Transaction = { filterIds: number[]; transactionIndex?: number; transactionHash?: B256; nonce?: bigint; from?: Address; to?: Address; value?: bigint; gasPrice?: bigint; gas?: bigint; maxFeePerGas?: bigint; maxPriorityFeePerGas?: bigint; input: Bytes; signature?: Signature; chainId?: bigint; accessList: AccessListItem[]; transactionType?: bigint; maxFeePerBlobGas?: bigint; blobVersionedHashes?: B256[]; }; ``` ### AccessListItem ```ts export type AccessListItem = { address?: Address; storageKeys: B256[]; }; ``` ## Properties - `transactionIndex`: the index of the transaction in the block. - `transactionHash`: the hash of the transaction. - `nonce`: the nonce of the transaction. - `from`: the sender of the transaction. - `to`: the recipient of the transaction. Empty if it's a create transaction. - `value`: the value of the transaction, in wei. - `gasPrice`: the gas price of the transaction. - `gas`: the gas limit of the transaction. - `maxFeePerGas`: the max fee per gas of the transaction. - `maxPriorityFeePerGas`: the max priority fee per gas of the transaction. - `input`: the input data of the transaction. - `signature`: the signature of the transaction. - `chainId`: the chain ID of the transaction. - `accessList`: the access list of the transaction. - `transactionType`: the transaction type. - `maxFeePerBlobGas`: the max fee per blob gas of the transaction. - `blobVersionedHashes`: the hashes of blobs posted by the transaction. - `transactionStatus`: the status of the transaction. ## Relevant filters - `filter.transactions` - `filter.blobs[].includeTransaction` ## Validator Data about validators. ```ts export type ValidatorStatus = | "pending_initialized" | "pending_queued" | "active_ongoing" | "active_exiting" | "active_slashed" | "exited_unslashed" | "exited_slashed" | "withdrawal_possible" | "withdrawal_done"; export type Validator = { filterIds: number[]; validatorIndex?: number; balance?: bigint; status?: ValidatorStatus; pubkey?: B384; withdrawalCredentials?: B256; effectiveBalance?: bigint; slashed?: boolean; activationEligibilityEpoch?: bigint; activationEpoch?: bigint; exitEpoch?: bigint; withdrawableEpoch?: bigint; }; ``` ## Properties - `validatorIndex`: the index of the validator. - `balance`: the balance of the validator. - `status`: the status of the validator. - `pubkey`: the validator's public key. - `withdrawalCredentials`: the withdrawal credentials. - `effectiveBalance`: the effective balance of the validator. - `slashed`: whether the validator is slashed. - `activationEligibilityEpoch`: the epoch at which the validator can be activated. - `activationEpoch`: the epoch at which the validator was activated. - `exitEpoch`: the epoch at which the validator exited. - `withdrawableEpoch`: the epoch at which the validator can withdraw. ## Relevant filters - `filter.validators` --- # Source: https://www.apibara.com/docs/storage/drizzle-pg # Drizzle with PostgreSQL The Apibara Indexer SDK supports Drizzle ORM for storing data to PostgreSQL. ## Installation ### Using the CLI You can add an indexer that uses Drizzle for storage by selecting "PostgreSQL" in the "Storage" section when creating an indexer. The CLI automatically updates your `package.json` to add all necessary dependencies. ### Manually To use Drizzle with PostgreSQL, you need to install the following dependencies: ```bash npm install drizzle-orm pg @apibara/plugin-drizzle@next ``` We recommend using Drizzle Kit to manage the database schema. 
```bash npm install --save-dev drizzle-kit ``` Additionally, if you want to use PGLite to run a Postgres-compatible database without a full Postgres installation, you should install that package too. ```bash npm install @electric-sql/pglite ``` ## Persisting the indexer's state The Drizzle plugin automatically persists the indexer's state to the database. You can explicitly configure this option with the `persistState` flag. Read more [about state persistence on the internals page](/docs/storage/drizzle-pg/internals#state-persistence). ## Adding the plugin to your indexer Add the `drizzleStorage` plugin to your indexer's `plugins`. Notice the following: - Use the `drizzle` helper exported by `@apibara/plugin-drizzle` to create a drizzle instance. This method supports creating an in-memory database (powered by PGLite) by specifying the `memory:` connection string. - Always specify the database schema. This schema is used by the indexer to know which tables it needs to protect against chain reorganizations. - By default, the connection string is read from the `POSTGRES_CONNECTION_STRING` environment variable. If left empty, a local PGLite database will be created. This is great because it means you don't need to start Postgres on your machine to develop locally! ```ts import { drizzle, drizzleStorage, useDrizzleStorage } from "@apibara/plugin-drizzle"; import { transfers } from "@/lib/schema"; const db = drizzle({ schema: { transfers, }, }); export default defineIndexer(EvmStream)({ // ... plugins: [ drizzleStorage({ db, idColumn: "_id", }), ], // ... }); ``` ## Schema configuration You can use the `pgTable` function from `drizzle-orm/pg-core` to define the schema; no changes are required. The only important thing to notice is that your table **must have an `id` column (name configurable)** that uniquely identifies each row. This requirement is necessary to handle chain reorganizations. Read more about how the plugin handles chain reorganizations [on the internals page](/docs/storage/drizzle-pg/internals). ```ts import { bigint, pgTable, text, uuid } from "drizzle-orm/pg-core"; export const transfers = pgTable("transfers", { id: uuid("id").primaryKey().defaultRandom(), amount: bigint("amount", { mode: "number" }), transactionHash: text("transaction_hash"), }); ``` ## Specifying the id column As mentioned in the previous section, the id column is required by the plugin to handle chain reorganizations. The plugin allows you to specify the id column name for each table in the schema. You can do this by passing the `idColumn` option to the `drizzleStorage` plugin. This option accepts either a string value or a record mapping table names to column names. You can use the special `*` table name to define the default id column name for all tables. ### Example This example uses the same id column name (`_id`) for all tables. ```ts export default defineIndexer(EvmStream)({ // ... plugins: [ drizzleStorage({ db, idColumn: "_id", }), ], // ... }); ``` This example uses different id column names for each table. The `transfers` table will use `transfer_id` as the id column, while all other tables will use `_id`. ```ts export default defineIndexer(EvmStream)({ // ... plugins: [ drizzleStorage({ db, idColumn: { transfers: "transfer_id", "*": "_id", }, }), ], // ... }); ``` ## Writing and reading data from within the indexer Use the `useDrizzleStorage` hook to access the current database transaction. This transaction behaves exactly like a regular Drizzle ORM transaction because it is.
Thanks to the way the plugin works and handles chain reorganizations, it can expose the full Drizzle ORM API without any limitations. ```ts export default defineIndexer(EvmStream)({ // ... async transform({ endCursor, block, context, finality }) { const { db } = useDrizzleStorage(); for (const event of block.events) { await db.insert(transfers).values(decodeEvent(event)); } }, }); ``` You are not limited to inserting data; you can also update and delete rows. ### Drizzle query Using the [Drizzle Query interface](https://orm.drizzle.team/docs/rqb) is easy. Pass the database instance to `useDrizzleStorage`: in this case, the database type is used to automatically infer the database schema. **Note**: the database instance is not used to query data but only for type inference. ```ts const database = drizzle({ schema }); export default defineIndexer(EvmStream)({ // ... async transform({ endCursor, block, context, finality }) { const { db } = useDrizzleStorage(database); const existingToken = await db.query.tokens.findFirst({ where: (tokens, { eq }) => eq(tokens.address, address), }); }, }); ``` ## Querying data from outside the indexer You can query data from your application like you always do, using the standard Drizzle ORM library. ## Database migrations There are two strategies you can adopt for database migrations: - run migrations separately, for example using the drizzle-kit CLI. - run migrations automatically upon starting the indexer. If you decide to adopt the latter strategy, use the `migrate` option. Notice that the `migrationsFolder` path is relative to the project's root. ```ts import { drizzle } from "@apibara/plugin-drizzle"; const db = drizzle({ schema }); export default defineIndexer(EvmStream)({ // ... plugins: [ drizzleStorage({ db, migrate: { // Path relative to the project's root. migrationsFolder: "./migrations", }, }), ], // ... }); ``` --- # Source: https://www.apibara.com/docs/networks/evm/filter # EVM filter reference This page contains reference information about the available data filters for EVM DNA streams. ## Related pages - [EVM block data reference](/docs/networks/evm/data) ## Filter ID All filters have an associated ID. When the server filters a block, it returns each piece of data together with the list of all filters that matched it. You can use this ID to build powerful abstractions in your indexers. ## Usage with viem Most types are compatible with [viem](https://viem.sh/). For example, you can generate log filters with the following code: ```ts import { encodeEventTopics, parseAbi } from 'viem'; const abi = parseAbi([ 'event Transfer(address indexed from, address indexed to, uint256 value)', ]); const filter = { logs: [ { topics: encodeEventTopics({ abi, eventName: 'Transfer', args: { from: null, to: null }, }), strict: true, }, ], }; ``` ## Filter types ### Root The root filter object contains a collection of filters. Notice that providing an empty filter object is an error.
```ts export type Filter = { header?: HeaderFilter; logs?: LogFilter[]; transactions?: TransactionFilter[]; withdrawals?: WithdrawalFilter[]; }; ``` ### Header The `HeaderFilter` object controls when the block header is returned to the client. ```ts export type HeaderFilter = 'always' | 'on_data' | 'on_data_or_on_new_block'; ``` The values have the following meaning: - `always`: Always return the header, even if no other filter matches. - `on_data`: Return the header only if any other filter matches. This is the default value. - `on_data_or_on_new_block`: Return the header if any other filter matches. If no other filter matches, return the header only if the block is a new block. ## Logs Logs are the most common type of DNA filter. Use this filter to get the logs and their associated data like transactions, receipts, and sibling logs. ```ts export type LogFilter = { id?: number; address?: `0x${string}`; topics?: (`0x${string}` | null)[]; strict?: boolean; transactionStatus?: 'succeeded' | 'reverted' | 'all'; includeTransaction?: boolean; includeReceipt?: boolean; includeSiblings?: boolean; }; ``` **Properties** - `address`: filter by contract address. If empty, matches any contract address. - `topics`: filter by topic. Use `null` to match _any_ value. - `strict`: only return logs whose number of topics matches the filter. By default, the filter does a prefix match on the topics. - `transactionStatus`: return logs emitted by transactions with the provided status. Defaults to `succeeded`. - `includeTransaction`: also return the transaction that emitted the log. - `includeReceipt`: also return the receipt of the transaction that emitted the log. - `includeSiblings`: also return all other logs emitted by the same transaction that emitted the matched log. **Examples** - All logs in a block emitted by successful transactions. ```ts const filter = { logs: [], }; ``` - All `Transfer` events emitted by successful transactions. Notice that this will match logs from ERC-20, ERC-721, and other contracts that emit `Transfer`. ```ts const filter = { logs: [ { topics: [ '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', ], }, ], }; ``` - All `Transfer` events that follow the ERC-721 standard. Notice that this will not match logs from ERC-20 since the number of indexed parameters is different. ```ts const filter = { logs: [ { topics: [ '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', null, // from null, // to null, // tokenId ], strict: true, }, ], }; ``` - All logs emitted by `CONTRACT_A` OR `CONTRACT_B`. ```ts const filter = { logs: [ { address: CONTRACT_A, }, { address: CONTRACT_B, }, ], }; ``` ## Transactions Request Ethereum transactions. ```ts export type TransactionFilter = { id?: number; from?: `0x${string}`; to?: `0x${string}`; create?: true; transactionStatus?: 'succeeded' | 'reverted' | 'all'; includeReceipt?: boolean; includeLogs?: boolean; }; ``` **Properties** - `from`: filter by sender address. If empty, matches any sender address. - `to`: filter by receiver address. If empty, matches any receiver address. - `create`: filter by whether the transaction is a create transaction. - `transactionStatus`: return transactions with the provided status. Defaults to `succeeded`. - `includeReceipt`: also return the receipt of the transaction. - `includeLogs`: also return the logs emitted by the transaction. **Examples** - All transactions in a block. ```ts const filter = { transactions: [], }; ``` - All transactions from `0xAB...`.
```ts const filter = { transactions: [ { from: '0xAB...', }, ], }; ``` - All create transactions. ```ts const filter = { transactions: [ { create: true, }, ], }; ``` ## Withdrawals Request Ethereum withdrawals. ```ts export type WithdrawalFilter = { id?: number; validatorIndex?: number; address?: string; }; ``` **Properties** - `validatorIndex`: filter by validator's index. If empty, matches any validator's index. - `address`: filter by withdrawal address. If empty, matches any withdrawal address. **Examples** - All withdrawals. ```ts const filter = { withdrawals: [], }; ``` - All withdrawals from validator with index `1234`. ```ts const filter = { withdrawals: [ { validatorIndex: 1234, }, ], }; ``` - All withdrawals from validators with index `1234` OR `7890`. ```ts const filter = { withdrawals: [ { validatorIndex: 1234, }, { validatorIndex: 7890, }, ], }; ``` - All withdrawals to address `0xAB...`. ```ts const filter = { withdrawals: [ { address: '0xAB...', }, ], }; ``` --- # Source: https://www.apibara.com/docs/getting-started/indexers # Building indexers Indexers are created using the `defineIndexer` higher-order function. This function takes a _stream definition_ and returns a function to define the indexer. The job of an indexer is to stream and process historical data (backfilling) and then switch to real-time mode. Indexers built using our SDK are designed to handle chain reorganizations automatically. If, for any reason, you need to receive notifications about reorgs, you can define [a custom `message:invalidate` hook](/docs/getting-started/plugins#hooks) to handle them. By default, the indexer is stateless (it restarts from the beginning on every restart) and does not provide any storage. You can add persistence and storage by using one of the provided storage plugins. ## Examples The following examples show how to create indexers for the Beacon Chain, EVM (Ethereum), and Starknet. ### Beacon Chain indexer ```ts import { BeaconChainStream } from "@apibara/beaconchain"; import { defineIndexer } from "@apibara/indexer"; export default defineIndexer(BeaconChainStream)({ /* ... */ }); ``` ### EVM (Ethereum) indexer ```ts import { EvmStream } from "@apibara/evm"; import { defineIndexer } from "@apibara/indexer"; export default defineIndexer(EvmStream)({ /* ... */ }); ``` ### Starknet indexer ```ts import { StarknetStream } from "@apibara/starknet"; import { defineIndexer } from "@apibara/indexer"; export default defineIndexer(StarknetStream)({ /* ... */ }); ``` ## With runtime config To configure the indexer at runtime, export a function that takes the configuration and returns the indexer's definition. ```ts import { EvmStream } from "@apibara/evm"; import type { ApibaraRuntimeConfig } from "apibara/types"; import { defineIndexer } from "@apibara/indexer"; export default function (runtimeConfig: ApibaraRuntimeConfig) { return defineIndexer(EvmStream)({ // ... }); } ``` ## Indexer configuration All indexers take the same configuration options.
- **`streamUrl`** `string` - **`filter`** `TFilter` - **`finality`** `string | "finalized" | "accepted" | "pending"` - **`startingCursor`** `{ orderKey: bigint, uniqueKey?: string }` - **`debug`** `boolean` - **`transform`** `({ block, cursor, endCursor, finality, context }) => Promise` - **`factory`** `({ block, context }) => Promise<{ filter?: TFilter }>` - **`hooks`** `object` - **`plugins`** `array` ### The transform function The `transform` function is invoked for each block received from the DNA stream. This function is where you should implement your business logic. **Arguments** - **`block`** `TBlock` - **`cursor`** `{ orderKey: bigint, uniqueKey?: string }` - **`endCursor`** `{ orderKey: bigint, uniqueKey?: string }` - **`finality`** `string | "finalized" | "accepted" | "pending"` - **`context`** `object` The following example shows a minimal indexer that streams block headers and prints them to the console. ```ts import { EvmStream } from "@apibara/evm"; import { defineIndexer } from "@apibara/indexer"; export default defineIndexer(EvmStream)({ streamUrl: "https://mainnet.ethereum.a5a.ch", filter: { header: "always", }, async transform({ block }) { const { header } = block; console.log(header); }, }); ``` ### The factory function The `factory` function is used to add data filters at runtime. This is useful for creating indexers for smart contracts that deploy other smart contracts like Uniswap V2 and its forks. **Arguments** - **`block`** `TBlock` - **`context`** `object` The following example shows a minimal indexer that streams `PairCreated` events from Uniswap V2 to detect new pools, and then streams the pool's events. ```ts import { EvmStream } from "@apibara/evm"; import { defineIndexer } from "@apibara/indexer"; export default defineIndexer(EvmStream)({ streamUrl: "https://mainnet.ethereum.a5a.ch", filter: { logs: [ { /* ... */ }, ], }, async factory({ block }) { const { logs } = block; return { /* ... */ }; }, async transform({ block }) { const { header, logs } = block; console.log(header); console.log(logs); }, }); ``` --- # Source: https://www.apibara.com/docs/getting-started/installation # Installation This tutorial shows how to set up an Apibara project from scratch. The goal is to start indexing data as quickly as possible and to understand the basic structure of a project. By the end of this tutorial, you will have a basic indexer that streams data from two networks (Ethereum and Starknet). ## Installation This tutorial starts with a fresh TypeScript project. In the examples, we use `pnpm` as the package manager, but you can use any package manager you prefer. Let's start by creating the project. The `--language` flag specifies which language to use to implement indexers, while the `--no-create-indexer` flag is used to delay the creation of the indexer. ```bash mkdir my-indexer cd my-indexer pnpm dlx apibara@next init . --language=ts --no-create-indexer ``` The generated project contains an `apibara.config.ts` file with an empty configuration: ```typescript import { defineConfig } from 'apibara/config'; export default defineConfig({ runtimeConfig: {}, }); ``` ## API Key The streams hosted by Apibara require an API key. - [Sign up for a free account](https://app.apibara.com/) - Create an API key - Export the API key as the `DNA_TOKEN` environment variable ## EVM Indexer Let's create the first EVM indexer. All indexers must go in the `indexers` directory and have a name that ends with `.indexer.ts` or `.indexer.js`. The Apibara CLI will automatically detect the indexers in this directory and make them available to the project.
You can use the `apibara add` command to add an indexer to your project. This command does the following: - gathers information about the chain you want to index. - asks about your preferred storage solution. - creates the indexer. - adds dependencies to your `package.json`. ```bash pnpm apibara add ``` The generated indexer looks like the following: ```typescript import { defineIndexer } from 'apibara/indexer'; import { useLogger } from 'apibara/plugins'; import { EvmStream } from '@apibara/evm'; import type { ApibaraRuntimeConfig } from 'apibara/types'; export default function (runtimeConfig: ApibaraRuntimeConfig) { const { startingBlock, streamUrl } = runtimeConfig.rocketPool; return defineIndexer(EvmStream)({ streamUrl, finality: 'accepted', startingBlock: BigInt(startingBlock), filter: { logs: [ { address: '0xae78736Cd615f374D3085123A210448E74Fc6393' }, ], }, plugins: [], async transform({ block }) { const logger = useLogger(); const { logs, header } = block; logger.log(`Block number ${header?.blockNumber}`); for (const log of logs) { logger.log( `Log ${log.logIndex} from ${log.address} tx=${log.transactionHash}` ); } }, }); } ``` Notice the following: - The indexer file exports a single indexer. - The `defineIndexer` function takes the stream as a parameter. In this case, the `EvmStream` is used. This is needed because Apibara supports multiple networks with different data types. - `streamUrl` specifies where the data comes from. You can connect to streams hosted by us, or to self-hosted streams. - `startingBlock` specifies from which block to start streaming. - These two properties are read from the `runtimeConfig` object. Use the runtime configuration object to have multiple presets for the same indexer. - The `filter` specifies which data to receive. You can read more about the available data for EVM chains in the [EVM documentation](/docs/networks/evm/filter). - The `transform` function is called for each block. It receives the block as a parameter. This is where your indexer processes the data. - The `useLogger` hook returns an indexer-specific logger. There are more indexer options available; you can find them [in the documentation](/docs/getting-started/indexers). ## Running the indexer During development, you will use the `apibara` CLI to build and run indexers. For convenience, the template adds the following scripts to your `package.json`: ```json { "scripts": { "dev": "apibara dev", "build": "apibara build", "start": "apibara start" } } ``` - `dev`: runs all indexers in development mode. Indexers are automatically reloaded and restarted when they change. - `build`: builds the indexers for production. - `start`: runs a _single indexer_ in production mode. Notice you must first build the indexers. Before running the indexer, you must set the `DNA_TOKEN` environment variable to your DNA API key, created from the dashboard. You can store the environment variable in a `.env` file, but make sure not to commit it to git! Now, run the indexer in development mode.
```bash pnpm run dev ``` ```bash apibara-app@0.1.0 dev /tmp/my-indexer apibara dev ``` ```bash ✔ Output directory .apibara/build cleaned ✔ Types written to .apibara/types ✔ Indexers built in 19369 ms ✔ Restarting indexers rocket-pool | log Block number 21000071 rocket-pool | log Log 239 from 0xae78736cd615f374d3085123a210448e74fc6393 tx=0xe3b7e285c02e9a1dad654ba095ee517cf4c15bf0c2c0adec555045e86ea1de89 rocket-pool | log Block number 21000097 rocket-pool | log Log 265 from 0xae78736cd615f374d3085123a210448e74fc6393 tx=0x8946aaa1ae303a19576d6dca9abe0f774709ff6c3f2de40c11dfda2ab276fbba rocket-pool | log Log 266 from 0xae78736cd615f374d3085123a210448e74fc6393 tx=0x8946aaa1ae303a19576d6dca9abe0f774709ff6c3f2de40c11dfda2ab276fbba rocket-pool | log Block number 21000111 ``` ## Starknet indexer You can index data on different networks in the same project. Let's add an indexer for Starknet. Like before, you can use the `apibara add` command to add an indexer to your project. ```bash pnpm apibara add ``` ```typescript import { defineIndexer } from 'apibara/indexer'; import { useLogger } from 'apibara/plugins'; import { StarknetStream } from '@apibara/starknet'; import { ApibaraRuntimeConfig } from 'apibara/types'; export default function (runtimeConfig: ApibaraRuntimeConfig) { const { startingBlock, streamUrl } = runtimeConfig.strkStaking; return defineIndexer(StarknetStream)({ streamUrl, finality: 'accepted', startingBlock: BigInt(startingBlock), filter: { events: [ { address: '0x028d709c875c0ceac3dce7065bec5328186dc89fe254527084d1689910954b0a' }, ], }, plugins: [], async transform({ block }) { // Unchanged. }, }); } ``` You can now run the indexer. In this case, you can specify which indexer you want to run with the `--indexers` option. When the flag is omitted, all indexers are run concurrently. ```bash pnpm run dev --indexers strk-staking ``` ```bash ... apibara-app@0.1.0 dev /tmp/my-indexer apibara dev --indexers=strk-staking ``` ```bash ✔ Output directory .apibara/build cleaned ✔ Types written to .apibara/types ✔ Indexers built in 20072 ms ✔ Restarting indexers strk-staking | log Block number 929092 strk-staking | log Event 233 tx=0x012f8356ef02c36ed1ffddd5252c4f03707166cabcccb49046acf4ab565051c7 strk-staking | log Event 234 tx=0x012f8356ef02c36ed1ffddd5252c4f03707166cabcccb49046acf4ab565051c7 strk-staking | log Event 235 tx=0x012f8356ef02c36ed1ffddd5252c4f03707166cabcccb49046acf4ab565051c7 strk-staking | log Event 236 tx=0x012f8356ef02c36ed1ffddd5252c4f03707166cabcccb49046acf4ab565051c7 strk-staking | log Block number 929119 strk-staking | log Event 122 tx=0x01078c3bb0f339eeaf303bc5c47ea03b781841f7b4628f79bb9886ad4c170be7 strk-staking | log Event 123 tx=0x01078c3bb0f339eeaf303bc5c47ea03b781841f7b4628f79bb9886ad4c170be7 ``` ## Production build The `apibara build` command is used to build a production version of the indexer. There are two main changes for the production build: - No hot code reloading is available. - Only one indexer is started. If your project has multiple indexers, it should start them independently. ```bash pnpm run build ``` ```bash apibara-app@0.1.0 build /tmp/my-indexer apibara build ``` ```bash ✔ Output directory .apibara/build cleaned ✔ Types written to .apibara/types ◐ Building 2 indexers ✔ Build succeeded! 
ℹ You can start the indexers with apibara start ``` ## Runtime configuration & presets Apibara provides a mechanism for indexers to load their configuration from the `apibara.config.ts` file: - Add the configuration under the `runtimeConfig` key in `apibara.config.ts`. - Change your indexer's module to return a function that, given the runtime configuration, returns the indexer. You can update the configuration to define values that are configurable by your indexer. This example uses the runtime configuration to store the DNA stream URL and contract address. ```ts import { defineConfig } from 'apibara/config'; export default defineConfig({ runtimeConfig: { strkStaking: { startingBlock: 900_000, streamUrl: 'https://mainnet.starknet.a5a.ch', contractAddress: '0x028d709c875c0ceac3dce7065bec5328186dc89fe254527084d1689910954b0a' as `0x${string}`, }, }, }); ``` Then update the indexer to return a function that returns the indexer. Your editor is going to show a type error since the types of `config.streamUrl` and `config.contractAddress` are unknown; the next section explains how to solve this issue. ```ts import { defineIndexer } from 'apibara/indexer'; import { useLogger } from 'apibara/plugins'; import { StarknetStream } from '@apibara/starknet'; import { ApibaraRuntimeConfig } from 'apibara/types'; export default function (runtimeConfig: ApibaraRuntimeConfig) { const config = runtimeConfig.strkStaking; const { startingBlock, streamUrl } = config; return defineIndexer(StarknetStream)({ streamUrl, startingBlock: BigInt(startingBlock), filter: { events: [ { address: config.contractAddress as `0x${string}` }, ], }, plugins: [], async transform({ block }) { // Unchanged. }, }); } ``` ### TypeScript & type safety You may have noticed that the CLI generates types in `.apibara/types` before building the indexers (both in development and production mode). These types contain the type definition of your runtime configuration. You can instruct TypeScript to use them by adding the following `tsconfig.json` to your project. ```json { "$schema": "https://json.schemastore.org/tsconfig", "compilerOptions": { "target": "ES2022", "module": "ESNext", "moduleResolution": "bundler" }, "include": ["**/*.ts", ".apibara/types"], "exclude": ["node_modules"] } ``` After restarting the TypeScript language server you will have a type-safe runtime configuration right in your indexer! ### Presets Having a single runtime configuration is useful but not enough for real-world indexers. The CLI provides a way to have multiple `presets` and select which one to use at runtime. This is useful, for example, if you're deploying the same indexers on multiple networks where only the DNA stream URL and contract addresses change. You can have any number of presets in the configuration and use the `--preset` flag to select which one to use. For example, you can add a `sepolia` preset that contains the URL of the Starknet Sepolia DNA stream. If a preset doesn't specify a key, then the value from the root configuration is used. ```ts import { defineConfig } from 'apibara/config'; export default defineConfig({ runtimeConfig: { streamUrl: 'https://mainnet.starknet.a5a.ch', contractAddress: '0x028d709c875c0ceac3dce7065bec5328186dc89fe254527084d1689910954b0a' as `0x${string}`, }, presets: { sepolia: { runtimeConfig: { streamUrl: 'https://sepolia.starknet.a5a.ch', }, }, }, }); ``` You can then run the indexer in development mode using the `sepolia` preset.
```bash pnpm run dev -- --indexers=strk-staking --preset=sepolia ``` ```bash ... apibara-app@0.1.0 dev /tmp/my-indexer apibara dev --indexers=strk-staking --preset=sepolia ``` ```bash ✔ Output directory .apibara/build cleaned ✔ Types written to .apibara/types ✔ Indexers built in 3858 ms ✔ Restarting indexers strk-staking | log Block number 100092 strk-staking | log Event 233 tx=0x012f8356ef02c36ed1ffddd5252c4f03707166cabcccb49046acf4ab565051c7 strk-staking | log Event 234 tx=0x012f8356ef02c36ed1ffddd5252c4f03707166cabcccb49046acf4ab565051c7 strk-staking | log Event 235 tx=0x012f8356ef02c36ed1ffddd5252c4f03707166cabcccb49046acf4ab565051c7 strk-staking | log Event 236 tx=0x012f8356ef02c36ed1ffddd5252c4f03707166cabcccb49046acf4ab565051c7 strk-staking | log Block number 100119 strk-staking | log Event 122 tx=0x01078c3bb0f339eeaf303bc5c47ea03b781841f7b4628f79bb9886ad4c170be7 strk-staking | log Event 123 tx=0x01078c3bb0f339eeaf303bc5c47ea03b781841f7b4628f79bb9886ad4c170be7 ``` ## Storing data & persisting state across restarts All indexers implemented in this tutorial are stateless. They don't store any data in a database, and if you restart them they will restart indexing from the beginning. You can refer to our storage section to learn more about writing data to a database and persisting the indexer's state across restarts. - [Drizzle with PostgreSQL](/docs/storage/drizzle-pg) --- # Source: https://www.apibara.com/docs/storage/drizzle-pg/internals#state-persistence # Drizzle's plugin internals This section describes how the Drizzle plugin works. Understanding the content of this page is not needed for using the plugin. ## Drizzle and the indexer The plugin wraps all database operations in the `transform` and `factory` functions in a database transaction. This ensures that the indexer's state is always consistent and that data is never lost due to crashes or network failures. More specifically, the plugin is implemented as a [middleware](/docs/getting-started/plugins#middleware). At a very high level, the plugin looks like the following: ```ts indexer.hooks.hook("handler:middleware", async ({ use }) => { use(async (context, next) => { await db.transaction(async (txn) => { // Assign the transaction to the context, to be accessed using useDrizzleStorage context.db = txn; await next(); delete context.db; // Update the indexer's state with the cursor. await updateState(txn); }); }); }); ``` ## Chain reorganizations The indexer needs to be able to roll back state after a chain reorganization. The behavior described in this section is only relevant for non-finalized blocks. Finalized blocks don't need special handling since they are, by definition, not going to be part of a chain reorganization. The main idea is to create an ["audit table"](https://supabase.com/blog/postgres-audit) with all changes to the indexer's schema. The audit table is named `airfoil.reorg_rollback` and has the following schema.
```txt
+------------+--------------+-----------------------+
| Column     | Type         | Modifiers             |
|------------+--------------+-----------------------|
| n          | integer      | not null default ...  |
| op         | character(1) | not null              |
| table_name | text         | not null              |
| cursor     | integer      | not null              |
| row_id     | text         |                       |
| row_value  | jsonb        |                       |
| indexer_id | text         | not null              |
+------------+--------------+-----------------------+
```
The data stored in the `row_value` column is specific to each operation (INSERT, DELETE, UPDATE) and contains the data needed to revert the operation.
Notice that the table's rows must be JSON-serializable. At each block, the plugin registers a trigger for each table managed by the indexer. At the end of the transaction, the trigger inserts data into the audit table. The audit table is periodically pruned to remove snapshots of data that is now finalized. ### Reverting a block When a chain reorganization is detected, all operations in the audit table where `cursor` is greater than the new chain's head are reverted in reverse order. - `op = INSERT`: the row with id `row_id` is deleted from the table. - `op = DELETE`: the row with id `row_id` is inserted back into the table, with the value stored in `row_value`. - `op = UPDATE`: the row with id `row_id` is updated in the table, with the value stored in `row_value`. ## State persistence The state of the indexer is persisted in the database, in the `airfoil.checkpoints` and `airfoil.filters` tables. The checkpoints table contains the last indexed block for each indexer.
```txt
+------------+--------------+-----------------------+
| Column     | Type         | Modifiers             |
|------------+--------------+-----------------------|
| id         | text         | primary key           |
| order_key  | integer      | not null              |
| unique_key | text         |                       |
+------------+--------------+-----------------------+
```
The filters table is used to manage the dynamic filter of factory indexers. It contains the JSON-serialized filter together with the block range it applies to.
```txt
+------------+--------------+-----------------------+
| Column     | Type         | Modifiers             |
|------------+--------------+-----------------------|
| id         | text         | not null              |
| filter     | text         | not null              |
| from_block | integer      | not null              |
| to_block   | integer      | default null          |
+------------+--------------+-----------------------+
```
--- # Source: https://www.apibara.com/docs/getting-started/plugins # Plugins & Hooks Indexers are extensible through hooks and plugins. Hooks are functions that are called at specific points in the indexer's lifecycle. Plugins are components that contain reusable hook callbacks. ## Hooks The following hooks are available in all indexers. - **`run:before`** - `() => void` - Called before the indexer starts running. - **`run:after`** - `() => void` - Called after the indexer has finished running. - **`connect:before`** - `({ request, options }) => void` - Called before the indexer connects to the DNA stream. Can be used to change the request or stream options. - **`connect:after`** - `({ request }) => void` - Called after the indexer has connected to the DNA stream. - **`connect:factory`** - `({ request, endCursor: { orderKey: bigint, uniqueKey?: string } }) => void` - Called before the indexer reconnects to the DNA stream with a new filter (in factory mode). - **`message`** - `({ message }) => void` - Called for each message received from the DNA stream. Additionally, message-specific hooks are available: `message:invalidate`, `message:finalize`, `message:heartbeat`, `message:systemMessage`. - **`handler:middleware`** - `({ use: MiddlewareFunction }) => void` - Called to register the indexer's middlewares. If you want to use a hook without creating a plugin, you can use the `hooks` property of the indexer; inline hooks (described below) are the recommended way to add hooks to a single indexer. If the same hook is needed in multiple indexers, it is better to create a plugin.
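As a quick illustration of the hooks listed above, the sketch below registers a `message:invalidate` hook inline on an EVM indexer (the inline form is described in the Inline hooks section further down). It assumes the message-specific hooks receive the same `{ message }` payload as the generic `message` hook; treat the exact payload shape as an assumption rather than a reference.

```ts
import { EvmStream } from "@apibara/evm";
import { defineIndexer } from "@apibara/indexer";

export default defineIndexer(EvmStream)({
  streamUrl: "https://mainnet.ethereum.a5a.ch",
  filter: { header: "always" },
  async transform({ block }) {
    console.log(block.header);
  },
  hooks: {
    // Called when the DNA stream invalidates blocks after a chain reorganization.
    // The payload shape here is assumed to mirror the generic `message` hook.
    async "message:invalidate"({ message }) {
      console.log("chain reorganization detected", message);
    },
  },
});
```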
Usually, plugins live in the `lib` folder, for example `lib/my-plugin.ts`.

## Using plugins

You can register plugins in the indexer's configuration, under the `plugins` key.

```ts
import { BeaconChainStream } from "@apibara/beaconchain";
import { defineIndexer } from "@apibara/indexer";

import { myAwesomePlugin } from "@/lib/my-plugin.ts";

export default defineIndexer(BeaconChainStream)({
  streamUrl: "https://beaconchain.preview.apibara.org",
  filter: { /* ... */ },
  plugins: [myAwesomePlugin()],
  async transform({ block: { header, validators } }) {
    /* ... */
  },
});
```

## Building plugins

Developers can create new plugins to be shared across multiple indexers or projects.

Plugins use the available hooks to extend the functionality of indexers. The main way to define a plugin is by using the `defineIndexerPlugin` function. This function takes a callback with the indexer as its parameter; the plugin should register itself with the indexer's hooks. When the runner runs the indexer, all the relevant hooks are called.

```ts
import type { Cursor } from "@apibara/protocol";
import { defineIndexerPlugin } from "@apibara/indexer/plugins";

export function myAwesomePlugin() {
  return defineIndexerPlugin((indexer) => {
    indexer.hooks.hook("connect:before", ({ request, options }) => {
      // Do something before the indexer connects to the DNA stream.
    });

    indexer.hooks.hook("run:after", () => {
      // Do something after the indexer has finished running.
    });
  });
}
```

## Middleware

Apibara indexers support wrapping the `transform` function in middleware. This is used, for example, to wrap all database operations in a transaction.

The middleware is registered using the `handler:middleware` hook. This hook takes a `use` argument to register the middleware with the indexer. The argument to `use` is a function that takes the indexer's context and a `next` function to call the next middleware or the transform function.

```ts
import type { Cursor } from "@apibara/protocol";
import { defineIndexerPlugin } from "@apibara/indexer/plugins";

export function myAwesomePlugin() {
  return defineIndexerPlugin((indexer) => {
    const db = openDatabase();

    indexer.hooks.hook("handler:middleware", ({ use }) => {
      use(async (context, next) => {
        // Start a transaction.
        await db.transaction(async (txn) => {
          // Add the transaction to the context.
          context.db = txn;
          try {
            // Call the next middleware or the transform function.
            await next();
          } finally {
            // Remove the transaction from the context.
            context.db = undefined;
          }
        });
      });
    });
  });
}
```

## Inline hooks

For all cases where you want to use a hook without creating a plugin, you can use the `hooks` property of the indexer.

```ts
import { BeaconChainStream } from "@apibara/beaconchain";
import { defineIndexer } from "@apibara/indexer";

export default defineIndexer(BeaconChainStream)({
  streamUrl: "https://beaconchain.preview.apibara.org",
  filter: { /* ... */ },
  async transform({ block: { header, validators } }) {
    /* ... */
  },
  hooks: {
    async "connect:before"({ request, options }) {
      // Do something before the indexer connects to the DNA stream.
    },
  },
});
```

## Indexer lifecycle

The following JavaScript pseudocode shows the indexer's lifecycle. This should give you a good understanding of when hooks are called.

```js
function run(indexer) {
  indexer.callHook("run:before");

  const { use, middleware } = registerMiddleware(indexer);
  indexer.callHook("handler:middleware", { use });

  // Create the request based on the indexer's configuration.
  const request = Request.create({
    filter: indexer.filter,
    startingCursor: indexer.startingCursor,
    finality: indexer.finality,
  });

  // Stream options.
  const options = {};

  indexer.callHook("connect:before", { request, options });

  let stream = indexer.streamData(request, options);

  indexer.callHook("connect:after");

  while (true) {
    const { message, done } = stream.next();
    if (done) {
      break;
    }

    indexer.callHook("message", { message });

    switch (message._tag) {
      case "data": {
        const { block, endCursor, finality } = message.data;
        middleware(() => {
          if (indexer.isFactoryMode()) {
            // Handle the factory portion of the indexer data.
            // Implementation detail is not important here.
            const newFilter = indexer.factory();
            const request = Request.create(/* ... */);
            indexer.callHook("connect:factory", { request, endCursor });
            stream = indexer.streamData(request, options);
          }
          indexer.transform({ block, endCursor, finality });
        });
        break;
      }
      case "invalidate": {
        indexer.callHook("message:invalidate", { message });
        break;
      }
      case "finalize": {
        indexer.callHook("message:finalize", { message });
        break;
      }
      case "heartbeat": {
        indexer.callHook("message:heartbeat", { message });
        break;
      }
      case "systemMessage": {
        indexer.callHook("message:systemMessage", { message });
        break;
      }
    }
  }

  indexer.callHook("run:after");
}
```

---

# Source: https://www.apibara.com/docs/dna/protocol

# DNA wire protocol

## `Cursor` message

Before explaining the DNA protocol in more detail, we're going to discuss the `Cursor` message type. This type is used by all methods discussed later and plays a central role in how DNA works.

DNA models a blockchain as a sequence of blocks. The distance of a block from the first block in the chain (the genesis block) is known as the chain height. The genesis block has height `0`.

Ideally, a blockchain should always build a block on top of the most recent block, but that's not always the case. For this reason, a block's height isn't enough to uniquely identify a block in the blockchain. A _chain reorganization_ is when a chain produces blocks that don't build on top of the most recent block. As we will see later, the DNA protocol detects and handles chain reorganizations. A block that can't be part of a chain reorganization is _finalized_.

DNA uses a _cursor_ to uniquely identify blocks on the chain. A cursor contains two fields:

- `order_key`: the block's height.
- `unique_key`: the block's unique identifier. Depending on the chain, it's the block hash or state root.

## `Status` method

The `Status` method is used to retrieve the state of the DNA server. The request is an empty message. The response has the following fields:

- `last_ingested`: the last block ingested by the server. This is the most recent block available for streaming.
- `finalized`: the most recent finalized block.
- `starting`: the first available block. Usually this is the genesis block, but DNA server operators can prune older blocks to save on storage space.

## `StreamData` method

The `StreamData` method is used to start a DNA stream. It accepts a `StreamDataRequest` message and returns an infinite stream of `StreamDataResponse` messages.

### Request

The request message is used to configure the stream. All fields except `filter` are optional.

- `starting_cursor`: resume the stream from the provided cursor. The first block received in the stream will be the block following the provided cursor. If no cursor is provided, the stream will start from the genesis block.
  Notice that since `starting_cursor` is a cursor, the DNA server can detect if that block has been part of a chain reorganization while the indexer was offline.
- `finality`: the stream contains data with at least the specified finality. Possible values are _finalized_ (only receive finalized data), _accepted_ (receive finalized and non-finalized blocks), and _pending_ (receive finalized, non-finalized, and pending blocks).
- `filter`: a non-empty list of chain-specific data filters.
- `heartbeat_interval`: the stream will send a heartbeat message if there are no messages for the specified amount of time. This is useful to detect if the stream hangs. The value must be between 10 and 60 seconds.

### Response

Once the server validates and accepts the request, it starts streaming data. Each stream message can be one of the following message types:

- `data`: receive data about a block.
- `invalidate`: the specified blocks don't belong to the canonical chain anymore because they were part of a chain reorganization.
- `finalize`: the most recent finalized block moved forward.
- `heartbeat`: a heartbeat message.
- `system_message`: used to send messages from the server to the client.

#### `Data` message

Contains the requested data for a single block. All data message cursors are monotonically increasing, unless an `Invalidate` message is received. The message contains the following fields:

- `cursor`: the cursor of the block before this message. If the client reconnects using this cursor, the first message will be the same as this message.
- `end_cursor`: this block's cursor. Reconnecting to the stream using this cursor will resume the stream.
- `finality`: finality status of this block.
- `production`: how the block was produced. Either `backfill` or `live`.
- `data`: a list of encoded block data.

Notice how the `data` field is a _list of block data_. This sounds counter-intuitive since the `Data` message contains data about a _single block_. The reason is that, as we've seen in the _"Request"_ section, the client can specify a list of filters. The `data` field has the same length as the request's `filter` field. In most cases, the client specifies a single filter and receives a single block of data. For advanced use cases (like tracking contracts deployed by a factory), the client uses multiple filters to have parallel streams of data synced on the block number.

#### `Invalidate` message

This message warns the client about a chain reorganization. It contains the following fields:

- `cursor`: the new chain's head. All previously received messages where the `end_cursor.order_key` was greater than (`>`) this message's `cursor.order_key` should be considered invalid/recalled.
- `removed`: a list of cursors that used to belong to the canonical chain.

#### `Finalize` message

This message contains a single `cursor` field with the cursor of the most recent finalized block. All data at or before this block can't be part of a chain reorganization. This message is useful to prune old data.

#### `Heartbeat` message

This message is sent at regular intervals once the stream reaches the chain's head. Clients can detect if the stream hangs by adding a timeout to the stream's _receive_ method.

#### `SystemMessage` message

This message is used by the server to send out-of-band messages to the client. It contains text messages such as data usage, warnings about reaching the free quota, or information about upcoming system upgrades.
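As an illustration of how a client reacts to these message types, here is a minimal sketch using the TypeScript client shown later in this page (`@apibara/protocol` and `@apibara/starknet`). The filter shape and address are the same example values used in the upgrade guide below; the comments only indicate where an indexer would handle each message type.

```ts
import { createClient } from "@apibara/protocol";
import { StarknetStream } from "@apibara/starknet";

const client = createClient(StarknetStream, process.env.STREAM_URL);

const request = StarknetStream.Request.make({
  filter: [
    {
      events: [
        {
          address:
            "0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7",
        },
      ],
    },
  ],
  finality: "accepted",
});

for await (const message of client.streamData(request)) {
  switch (message._tag) {
    case "data": {
      // One block of data per filter in the request.
      // Persist the block's end cursor so the stream can be resumed later.
      break;
    }
    case "invalidate": {
      // Chain reorganization: discard previously received data
      // with a cursor greater than the new head.
      break;
    }
    case "finalize": {
      // Data at or before this cursor is final and can be pruned
      // from any reorg-handling bookkeeping.
      break;
    }
    case "heartbeat": {
      // No new data; the stream is still alive.
      break;
    }
    case "systemMessage": {
      // Out-of-band server notices (quota, upgrades, ...).
      break;
    }
  }
}
```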
## Protobuf definition

This section contains the protobuf definition used by the DNA server and clients. If you're implementing a new SDK for DNA, you can use this as the starting point.

```proto
syntax = "proto3";

package dna.v2.stream;

import "google/protobuf/duration.proto";

service DnaStream {
  // Stream data from the server.
  rpc StreamData(StreamDataRequest) returns (stream StreamDataResponse);
  // Get DNA server status.
  rpc Status(StatusRequest) returns (StatusResponse);
}

// A cursor over the stream content.
message Cursor {
  // Key used for ordering messages in the stream.
  // This is usually the block or slot number.
  uint64 order_key = 1;
  // Key used to discriminate branches in the stream.
  // This is usually the hash of the block.
  bytes unique_key = 2;
}

// Request for the `Status` method.
message StatusRequest {}

// Response for the `Status` method.
message StatusResponse {
  // The current head of the chain.
  Cursor current_head = 1;
  // The last cursor that was ingested by the node.
  Cursor last_ingested = 2;
  // The finalized block.
  Cursor finalized = 3;
  // The first block available.
  Cursor starting = 4;
}

// Request data to be streamed.
message StreamDataRequest {
  // Cursor to start streaming from.
  // If not specified, starts from the genesis block.
  // Use the data's message `end_cursor` field to resume streaming.
  optional Cursor starting_cursor = 1;
  // Return data with the specified finality.
  // If not specified, defaults to `DATA_FINALITY_ACCEPTED`.
  optional DataFinality finality = 2;
  // Filters used to generate data.
  repeated bytes filter = 3;
  // Heartbeat interval.
  // Value must be between 10 and 60 seconds.
  // If not specified, defaults to 30 seconds.
  optional google.protobuf.Duration heartbeat_interval = 4;
}

// Contains a piece of streamed data.
message StreamDataResponse {
  oneof message {
    Data data = 1;
    Invalidate invalidate = 2;
    Finalize finalize = 3;
    Heartbeat heartbeat = 4;
    SystemMessage system_message = 5;
  }
}

// Invalidate data after the given cursor.
message Invalidate {
  // The cursor of the new chain's head.
  // All data after this cursor should be considered invalid.
  Cursor cursor = 1;
  // List of blocks that were removed from the chain.
  repeated Cursor removed = 2;
}

// Move the finalized block forward.
message Finalize {
  // The cursor of the new finalized block.
  // All data before this cursor cannot be invalidated.
  Cursor cursor = 1;
}

// A single block of data.
message Data {
  // Cursor that generated this block of data.
  optional Cursor cursor = 1;
  // Block cursor. Use this cursor to resume the stream.
  Cursor end_cursor = 2;
  // The finality status of the block.
  DataFinality finality = 3;
  // The block data.
  // This message contains chain-specific data serialized using protobuf.
  repeated bytes data = 4;
  // The production mode of the block.
  DataProduction production = 5;
}

// Sent to clients to check if stream is still connected.
message Heartbeat {}

// Message from the server to the client.
message SystemMessage {
  oneof output {
    // Output to stdout.
    string stdout = 1;
    // Output to stderr.
    string stderr = 2;
  }
}

// Data finality.
enum DataFinality {
  DATA_FINALITY_UNKNOWN = 0;
  // Data was received, but is not part of the canonical chain yet.
  DATA_FINALITY_PENDING = 1;
  // Data is now part of the canonical chain, but could still be invalidated.
  DATA_FINALITY_ACCEPTED = 2;
  // Data is finalized and cannot be invalidated.
  DATA_FINALITY_FINALIZED = 3;
}

// Data production mode.
enum DataProduction {
  DATA_PRODUCTION_UNKNOWN = 0;
  // Data is for a backfilled block.
  DATA_PRODUCTION_BACKFILL = 1;
  // Data is for a live block.
  DATA_PRODUCTION_LIVE = 2;
}
```

---

# Source: https://www.apibara.com/docs/networks/starknet/upgrade-from-v1

# Upgrading from v1

This page contains a list of changes between DNA v1 and DNA v2.

## @apibara/starknet package

This package now works in combination with `@apibara/protocol` to provide a DNA stream that automatically encodes and decodes the Protobuf data. This means that field elements are automatically converted to `0x${string}` values.

Notice that the stream request is now unary: the client sends a single request and receives a stream of responses.

```ts
import { createClient } from "@apibara/protocol";
import { Filter, StarknetStream } from "@apibara/starknet";

const client = createClient(StarknetStream, process.env.STREAM_URL);

const filter = {
  events: [
    {
      address:
        "0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7",
    },
  ],
} satisfies Filter;

const request = StarknetStream.Request.make({
  filter: [filter],
  finality: "accepted",
  startingCursor: {
    orderKey: 800_000n,
  },
});

for await (const message of client.streamData(request)) {
  switch (message._tag) {
    case "data": {
      break;
    }
    case "invalidate": {
      break;
    }
    default: {
      break;
    }
  }
}
```

### Reconnecting on error

**NOTE:** this section only applies if you're using the gRPC client directly.

The client no longer reconnects automatically on error. This is because the reconnection step is delicate and depends on your indexer's implementation. The recommended approach is to wrap your indexer's main loop in a `try/catch` block.

```ts
import { createClient, ClientError, Status } from "@apibara/protocol";
import { Filter, StarknetStream } from "@apibara/starknet";

const client = createClient(StarknetStream, process.env.STREAM_URL);

const filter = {
  events: [
    {
      address:
        "0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7",
    },
  ],
} satisfies Filter;

while (true) {
  try {
    const startingCursor = await loadCursorFromDatabase();
    const request = StarknetStream.Request.make({
      filter: [filter],
      finality: "accepted",
      startingCursor,
    });

    for await (const message of client.streamData(request)) {
      // Handle the message.
    }
  } catch (err) {
    if (err instanceof ClientError) {
      // It's a gRPC error.
      if (err.status !== Status.INTERNAL) {
        // NON-INTERNAL errors are not recoverable.
        throw err;
      }
      // INTERNAL errors are caused by a disconnection.
      // Sleep and reconnect.
      await new Promise((r) => setTimeout(r, 2_000));
    }
  }
}
```

## Filter

### Header

- The `header` field is now an enum. See the [dedicated section](/docs/networks/starknet/filter#header) in the filter documentation for more information.

### Events

- `fromAddress` is now `address`.
- The `keys` field accepts `null` values to match any key at that position.
- The `data` field was removed.
- Use `transactionStatus: "all"` instead of `includeReverted` to include reverted transactions.
- `includeReceipt` and `includeTransaction` are now `false` by default.

### Transactions

- Now you can only filter by transaction type.
- We will add transaction-specific filters in the future.
- Use `transactionStatus: "all"` instead of `includeReverted` to include reverted transactions.
- `includeReceipt` is now `false` by default.

### Messages

- Can now filter by `fromAddress` and `toAddress`.
- Use `transactionStatus: "all"` instead of `includeReverted` to include reverted transactions.
- `includeReceipt` and `includeTransaction` are now `false` by default.

### State Update

- State update has been split into separate filters for storage diffs, contract changes, and nonce updates.
- Declared and deployed contracts, declared classes, and replaced classes are now a single `contractChanges` filter.

## Block data

- Block data has been _flattened_. Use the `*Index` fields to access related data. For example, the following code iterates over all events and looks up their transactions.

```js
for (const event of block.events) {
  const transaction = block.transactions.find(
    (tx) => tx.transactionIndex === event.transactionIndex,
  );
}
```

## Events

- `fromAddress` is now `address`.
- `index` is now `eventIndex`.
- Events now include `transactionIndex`, `transactionHash`, and `transactionStatus`.

## Transactions

- `TransactionMeta` now includes `transactionIndex`, `transactionHash`, and `transactionStatus`.
- The transaction type is now an enum using the `_tag` field as discriminator.
- For other minor changes, see the [transaction documentation](/docs/networks/starknet/data#transaction).

## Receipts

- Transaction receipts are now transaction-specific.
- For other minor changes, see the [receipts documentation](/docs/networks/starknet/data#transaction-receipt).

## Messages

- `index` is now `messageIndex`.
- Messages now include `transactionIndex`, `transactionHash`, and `transactionStatus`.
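For example, reading the renamed message fields inside a `transform` handler could look like the sketch below. It assumes messages are exposed on the decoded block alongside events and transactions; only the field names come from the list above.

```ts
// Sketch: log the renamed message fields (assumes `block.messages` is populated).
for (const message of block.messages ?? []) {
  console.log(
    message.messageIndex, // previously `index`
    message.transactionHash, // now included directly on the message
    message.transactionStatus,
  );
}
```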