Skip to content

Commit

Permalink
Add getCompressedMintTokenHolders endpoint and fix pagination for sig…
Browse files Browse the repository at this point in the history
…natures and compressed token accounts (#241)

* Increase poller timeout

* Fix poller bug

* Intermediate commit

* Intermediate commit

* Intermediate commit

* Intermediate commit

* Intermediate commit

* Update docs
  • Loading branch information
pmantica11 authored Nov 8, 2024
1 parent 15495a9 commit cf6c6eb
Show file tree
Hide file tree
Showing 34 changed files with 517 additions and 97 deletions.
15 changes: 15 additions & 0 deletions src/api/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use super::method::get_compressed_account::AccountResponse;
use super::method::get_compressed_balance_by_owner::{
get_compressed_balance_by_owner, GetCompressedBalanceByOwnerRequest,
};
use super::method::get_compressed_mint_token_holders::{
get_compressed_mint_token_holders, GetCompressedMintTokenHoldersRequest, OwnerBalancesResponse,
};
use super::method::get_compressed_token_balances_by_owner::{
get_compressed_token_balances_by_owner, GetCompressedTokenBalancesByOwnerRequest,
TokenBalancesResponse,
Expand Down Expand Up @@ -210,6 +213,13 @@ impl PhotonApi {
get_compressed_accounts_by_owner(self.db_conn.as_ref(), request).await
}

pub async fn get_compressed_mint_token_holders(
&self,
request: GetCompressedMintTokenHoldersRequest,
) -> Result<OwnerBalancesResponse, PhotonApiError> {
get_compressed_mint_token_holders(self.db_conn.as_ref(), request).await
}

pub async fn get_multiple_compressed_accounts(
&self,
request: GetMultipleCompressedAccountsRequest,
Expand Down Expand Up @@ -306,6 +316,11 @@ impl PhotonApi {
request: Some(GetCompressedAccountsByOwnerRequest::schema().1),
response: GetCompressedAccountsByOwnerResponse::schema().1,
},
OpenApiSpec {
name: "getCompressedMintTokenHolders".to_string(),
request: Some(GetCompressedMintTokenHoldersRequest::schema().1),
response: OwnerBalancesResponse::schema().1,
},
OpenApiSpec {
name: "getMultipleCompressedAccounts".to_string(),
request: Some(GetMultipleCompressedAccountsRequest::adjusted_schema()),
Expand Down
110 changes: 110 additions & 0 deletions src/api/method/get_compressed_mint_token_holders.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use byteorder::{ByteOrder, LittleEndian};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::common::typedefs::bs58_string::Base58String;
use crate::common::typedefs::serializable_pubkey::SerializablePubkey;
use crate::common::typedefs::unsigned_integer::UnsignedInteger;
use crate::dao::generated::token_owner_balances;

use super::super::error::PhotonApiError;
use super::utils::{parse_decimal, Context, Limit, PAGE_LIMIT};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct OwnerBalance {
pub owner: SerializablePubkey,
pub balance: UnsignedInteger,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct OwnerBalanceList {
pub items: Vec<OwnerBalance>,
pub cursor: Option<Base58String>,
}

// We do not use generics to simplify documentation generation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct OwnerBalancesResponse {
pub context: Context,
pub value: OwnerBalanceList,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema, Default)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct GetCompressedMintTokenHoldersRequest {
pub mint: SerializablePubkey,
pub cursor: Option<Base58String>,
pub limit: Option<Limit>,
}

pub async fn get_compressed_mint_token_holders(
conn: &DatabaseConnection,
request: GetCompressedMintTokenHoldersRequest,
) -> Result<OwnerBalancesResponse, PhotonApiError> {
let context = Context::extract(conn).await?;
let GetCompressedMintTokenHoldersRequest {
mint,
cursor,
limit,
} = request;
let mut filter = token_owner_balances::Column::Mint.eq::<Vec<u8>>(mint.into());
if let Some(cursor) = cursor {
let bytes = cursor.0;
let expected_cursor_length = 40;
let (balance, owner) = if bytes.len() == expected_cursor_length {
let (balance, owner) = bytes.split_at(expected_cursor_length);
(balance, owner)
} else {
return Err(PhotonApiError::ValidationError(format!(
"Invalid cursor length. Expected {}. Received {}.",
expected_cursor_length,
bytes.len()
)));
};
let balance = LittleEndian::read_u64(&balance);
filter = filter.and(
token_owner_balances::Column::Amount.lt(balance).or(
token_owner_balances::Column::Amount
.eq(balance)
.and(token_owner_balances::Column::Owner.gt::<Vec<u8>>(owner.into())),
),
);
}
let limit = limit.map(|l| l.value()).unwrap_or(PAGE_LIMIT);

let items = token_owner_balances::Entity::find()
.filter(filter)
.order_by_desc(token_owner_balances::Column::Amount)
.order_by_asc(token_owner_balances::Column::Mint)
.limit(limit)
.all(conn)
.await?
.drain(..)
.map(|token_owner_balance| {
Ok(OwnerBalance {
owner: token_owner_balance.owner.try_into()?,
balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?),
})
})
.collect::<Result<Vec<OwnerBalance>, PhotonApiError>>()?;

let mut cursor = items.last().map(|item| {
Base58String({
let item = item.clone();
let mut bytes: Vec<u8> = Vec::new();
bytes.extend_from_slice(&item.balance.0.to_le_bytes());
bytes.extend_from_slice(&item.owner.0.to_bytes());
bytes
})
});
if items.len() < limit as usize {
cursor = None;
}

Ok(OwnerBalancesResponse {
value: OwnerBalanceList { items, cursor },
context,
})
}
81 changes: 54 additions & 27 deletions src/api/method/get_compressed_token_balances_by_owner.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::collections::HashMap;

use sea_orm::DatabaseConnection;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::common::typedefs::bs58_string::Base58String;
use crate::common::typedefs::serializable_pubkey::SerializablePubkey;
use crate::common::typedefs::unsigned_integer::UnsignedInteger;
use crate::dao::generated::token_owner_balances;

use super::utils::{Authority, Context, GetCompressedTokenAccountsByAuthorityOptions, Limit};
use super::{super::error::PhotonApiError, utils::fetch_token_accounts};
use super::utils::{
parse_decimal, Context, Limit,
PAGE_LIMIT,
};
use super::super::error::PhotonApiError;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct TokenBalance {
Expand All @@ -20,7 +23,7 @@ pub struct TokenBalance {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct TokenBalanceList {
pub token_balances: Vec<TokenBalance>,
pub cursor: Option<String>,
pub cursor: Option<Base58String>,
}

// We do not use generics to simplify documentation generation.
Expand All @@ -44,40 +47,64 @@ pub async fn get_compressed_token_balances_by_owner(
conn: &DatabaseConnection,
request: GetCompressedTokenBalancesByOwnerRequest,
) -> Result<TokenBalancesResponse, PhotonApiError> {
let context = Context::extract(conn).await?;
let GetCompressedTokenBalancesByOwnerRequest {
owner,
mint,
cursor,
limit,
} = request;
let mut filter = token_owner_balances::Column::Owner.eq::<Vec<u8>>(owner.into());
if let Some(mint) = mint {
filter = filter.and(token_owner_balances::Column::Mint.eq::<Vec<u8>>(mint.into()));
}
if let Some(cursor) = cursor {
let bytes = cursor.0;
let expected_cursor_length = 32;
let mint = if bytes.len() == expected_cursor_length {
bytes.to_vec()
} else {
return Err(PhotonApiError::ValidationError(format!(
"Invalid cursor length. Expected {}. Received {}.",
expected_cursor_length,
bytes.len()
)));
};
filter = filter.and(token_owner_balances::Column::Mint.gte::<Vec<u8>>(mint.into()));
}
let limit = limit.map(|l| l.value()).unwrap_or(PAGE_LIMIT);

let options = GetCompressedTokenAccountsByAuthorityOptions {
mint,
cursor,
limit,
};
let token_accounts = fetch_token_accounts(conn, Authority::Owner(owner), options).await?;
let mut mint_to_balance: HashMap<SerializablePubkey, u64> = HashMap::new();
let items = token_owner_balances::Entity::find()
.filter(filter)
.order_by_asc(token_owner_balances::Column::Mint)
.limit(limit)
.all(conn)
.await?
.drain(..)
.map(|token_owner_balance| {
Ok(TokenBalance {
mint: token_owner_balance.mint.try_into()?,
balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?),
})
})
.collect::<Result<Vec<TokenBalance>, PhotonApiError>>()?;

for token_account in token_accounts.value.items.iter() {
let balance = mint_to_balance
.entry(token_account.token_data.mint)
.or_insert(0);
*balance += token_account.token_data.amount.0;
}
let token_balances: Vec<TokenBalance> = mint_to_balance
.into_iter()
.map(|(mint, balance)| TokenBalance {
mint,
balance: UnsignedInteger(balance),
let mut cursor = items.last().map(|item| {
Base58String({
let item = item.clone();
let bytes: Vec<u8> = item.mint.into();
bytes
})
.collect();
});
if items.len() < limit as usize {
cursor = None;
}

Ok(TokenBalancesResponse {
context: token_accounts.context,
value: TokenBalanceList {
token_balances,
cursor: None,
token_balances: items,
cursor,
},
context,
})
}
1 change: 1 addition & 0 deletions src/api/method/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod get_compressed_account_balance;
pub mod get_compressed_account_proof;
pub mod get_compressed_accounts_by_owner;
pub mod get_compressed_balance_by_owner;
pub mod get_compressed_mint_token_holders;
pub mod get_compressed_token_account_balance;
pub mod get_compressed_token_accounts_by_delegate;
pub mod get_compressed_token_accounts_by_owner;
Expand Down
24 changes: 15 additions & 9 deletions src/api/method/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,14 @@ pub async fn fetch_token_accounts(
)));
}
let (mint, hash) = bytes.split_at(32);
filter = filter
.and(token_accounts::Column::Mint.gte::<Vec<u8>>(mint.into()))
.and(token_accounts::Column::Hash.gt::<Vec<u8>>(hash.into()));

filter = filter.and(
token_accounts::Column::Mint.gt::<Vec<u8>>(mint.into()).or(
token_accounts::Column::Mint
.eq::<Vec<u8>>(mint.into())
.and(token_accounts::Column::Hash.gt::<Vec<u8>>(hash.into())),
),
);
}
if let Some(l) = options.limit {
limit = l.value();
Expand All @@ -282,9 +287,11 @@ pub async fn fetch_token_accounts(
let items = token_accounts::Entity::find()
.find_also_related(accounts::Entity)
.filter(filter)
.order_by_asc(token_accounts::Column::Mint)
.order_by_asc(token_accounts::Column::Hash)
.order_by(token_accounts::Column::Mint, sea_orm::Order::Asc)
.order_by(token_accounts::Column::Hash, sea_orm::Order::Asc)
.limit(limit)
.order_by(token_accounts::Column::Mint, sea_orm::Order::Asc)
.order_by(token_accounts::Column::Hash, sea_orm::Order::Asc)
.all(conn)
.await?
.drain(..)
Expand Down Expand Up @@ -572,15 +579,14 @@ fn compute_cursor_filter(
let signature = Signature::try_from(signature).map_err(|_| {
PhotonApiError::ValidationError("Invalid signature in cursor".to_string())
})?;
let (slot_arg_index, signature_arg_index) =
(num_preceding_args + 1, num_preceding_args + 2);

Ok((
format!(
"AND transactions.slot <= ${} AND transactions.signature < ${}",
slot_arg_index, signature_arg_index
"AND (transactions.slot < ${} OR (transactions.slot = ${} AND transactions.signature < ${}))",
num_preceding_args + 1, num_preceding_args + 2, num_preceding_args + 3
),
vec![
slot.into(),
slot.into(),
Into::<Vec<u8>>::into(Into::<[u8; 64]>::into(signature)).into(),
],
Expand Down
10 changes: 10 additions & 0 deletions src/api/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,16 @@ fn build_rpc_module(api_and_indexer: PhotonApi) -> Result<RpcModule<PhotonApi>,
.map_err(Into::into)
},
)?;
module.register_async_method(
"getCompressedMintTokenHolders",
|rpc_params, rpc_context| async move {
let api = rpc_context.as_ref();
let payload = rpc_params.parse()?;
api.get_compressed_mint_token_holders(payload)
.await
.map_err(Into::into)
},
)?;

Ok(module)
}
54 changes: 54 additions & 0 deletions src/migration/m20241008_000006_init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use sea_orm_migration::prelude::*;
use sea_orm_migration::sea_orm::{ConnectionTrait, DatabaseBackend, Statement};

use crate::migration::model::table::TokenOwnerBalances;

#[derive(DeriveMigrationName)]
pub struct Migration;

async fn execute_sql<'a>(manager: &SchemaManager<'_>, sql: &str) -> Result<(), DbErr> {
manager
.get_connection()
.execute(Statement::from_string(
manager.get_database_backend(),
sql.to_string(),
))
.await?;
Ok(())
}

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
if manager.get_database_backend() == DatabaseBackend::Postgres {
// Create index concurrently for Postgres
execute_sql(
manager,
"CREATE INDEX CONCURRENTLY IF NOT EXISTS token_holder_mint_balance_owner_idx ON token_owner_balances (mint, amount, owner);",
)
.await?;
} else {
// For other databases, create index normally
execute_sql(
manager,
"CREATE INDEX IF NOT EXISTS token_holder_mint_balance_owner_idx ON token_owner_balances (mint, amount, owner);",
)
.await?;
}

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(
Index::drop()
.name("token_holder_mint_balance_owner_idx")
.table(TokenOwnerBalances::Table)
.to_owned(),
)
.await?;

Ok(())
}
}
Loading

0 comments on commit cf6c6eb

Please sign in to comment.