Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DB/ Created Airdrop table in DB, Added crud methods #112

Merged
merged 17 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ def upgrade() -> None:
if column_exists('position', 'start_price'):
print("Column 'start_price' already exists, skipping creation.")
else:
op.add_column('position', sa.Column('start_price', sa.DECIMAL(), nullable=False))
op.add_column('position', sa.Column('start_price', sa.DECIMAL(), nullable=False,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change old migration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without this config its giving error

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which error, please ,be more specific, if you have any error, attach it here with explanation what did you run the command to get this error and traceback of this error

server_default='0.0'))
print("Column 'start_price' added to the 'position' table.")
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('position', 'start_price',
existing_type=sa.DOUBLE_PRECISION(precision=53),
type_=sa.DECIMAL(),
existing_nullable=False)
existing_nullable=False,
server_default=None)
# ### end Alembic commands ###


Expand Down
67 changes: 67 additions & 0 deletions web_app/alembic/versions/e69320e12cc7_add_airdrop_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Add AirDrop model

Revision ID: e69320e12cc7
Revises: a009512f5362
Create Date: 2024-10-25 16:47:37.723379

"""

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'e69320e12cc7'
down_revision = 'a009512f5362'
branch_labels = None
depends_on = None


def upgrade() -> None:
"""
Perform the upgrade migration to create the 'airdrop' table in the database.

This migration creates a new table, 'airdrop', with the following columns:

- `id`: Primary key, UUID type, non-nullable.
- `user_id`: Foreign key referencing `user.id`, UUID type, non-nullable.
- `created_at`: Timestamp for when the airdrop was created, DateTime type, non-nullable.
- `amount`: Decimal type, nullable, representing the amount associated with the airdrop.
- `is_claimed`: Boolean type, indicating if the airdrop has been claimed, nullable.
- `claimed_at`: Timestamp for when the airdrop was claimed, DateTime type, nullable.

Additional configuration:

- Foreign key constraint on `user_id` to reference the `user` table.
- Primary key constraint on `id`.
- Index on `user_id` for optimized querying based on the `user_id` field.

This function is part of the Alembic migration and is auto-generated.
Adjustments may be made if additional configuration or constraints are needed.
"""
op.create_table('airdrop',
binayak9932 marked this conversation as resolved.
Show resolved Hide resolved
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('user_id', sa.UUID(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('amount', sa.DECIMAL(), nullable=True),
sa.Column('is_claimed', sa.Boolean(), nullable=True),
sa.Column('claimed_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_airdrop_user_id'), 'airdrop', ['user_id'], unique=False)
# ### end Alembic commands ###


def downgrade() -> None:
"""
Perform the downgrade migration to remove the 'airdrop' table from the database.

This migration drops the 'airdrop' table and its associated index on `user_id`.
It is intended to reverse the changes made in the `upgrade` function, allowing
for a rollback of the database schema to the state before the 'airdrop' table was added.
"""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_airdrop_user_id'), table_name='airdrop')
binayak9932 marked this conversation as resolved.
Show resolved Hide resolved
op.drop_table('airdrop')
# ### end Alembic commands ###
66 changes: 58 additions & 8 deletions web_app/db/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

import logging
import uuid
from typing import Type, TypeVar
from datetime import datetime
from decimal import Decimal
from typing import List, Type, TypeVar

from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
from web_app.db.database import SQLALCHEMY_DATABASE_URL
from web_app.db.models import Base, Position, Status, User
from web_app.db.models import AirDrop, Base, Position, Status, User

logger = logging.getLogger(__name__)
ModelType = TypeVar("ModelType", bound=Base)
Expand Down Expand Up @@ -112,6 +114,16 @@ def delete_object(self, model: Type[Base] = None, obj_id: uuid = None) -> None:
finally:
db.close()

def create_empty_claim(self, user_id: uuid.UUID) -> AirDrop:
"""
Creates a new empty AirDrop instance for the given user_id.
:param user_id: uuid.UUID
:return: AirDrop
"""
airdrop = AirDrop(user_id=user_id)
self.write_to_db(airdrop)
return airdrop


class UserDBConnector(DBConnector):
"""
Expand Down Expand Up @@ -317,17 +329,21 @@ def close_position(self, position_id: uuid) -> Position | None:
self.write_to_db(position)
return position.status

def open_position(self, position_id: uuid) -> Position | None:
def open_position(self, position_id: uuid.UUID) -> str | None:
"""
Retrieves a position by its contract address.
:param position_id: uuid
:return: Position | None
Opens a position by updating its status and creating an AirDrop claim.
:param position_id: uuid.UUID
:return: str | None
"""
position = self.get_object(Position, position_id)
if position:
position.status = Status.OPENED.value
self.write_to_db(position)
return position.status
self.create_empty_claim(position.user_id)
return position.status
else:
logger.error(f"Position with ID {position_id} not found")
return None

def get_unique_users_count(self) -> int:
"""
Expand Down Expand Up @@ -360,4 +376,38 @@ def get_total_amounts_for_open_positions(self) -> float | None:
return total_opened_amount
except SQLAlchemyError as e:
logger.error(f"Error calculating total amount for open positions: {e}")
return None
return None


class AirDropDBConnector(DBConnector):
"""
Provides database connection and operations management for the AirDrop model.
"""

def save_claim_data(self, airdrop_id: uuid.UUID, amount: Decimal) -> None:
"""
Updates the AirDrop instance with claim data.
:param airdrop_id: uuid.UUID
:param amount: Decimal
"""
airdrop = self.get_object(AirDrop, airdrop_id)
if airdrop:
airdrop.amount = amount
airdrop.is_claimed = True
airdrop.claimed_at = datetime.now()
self.write_to_db(airdrop)
else:
logger.error(f"AirDrop with ID {airdrop_id} not found")

def get_all_unclaimed(self) -> List[AirDrop]:
"""
Returns all AirDrop instances where is_claimed=False.
:return: List[AirDrop]
"""
with self.Session() as db:
try:
binayak9932 marked this conversation as resolved.
Show resolved Hide resolved
unclaimed = db.query(AirDrop).filter(AirDrop.is_claimed == False).all()
return unclaimed
except SQLAlchemyError as e:
logger.error(f"Failed to retrieve unclaimed AirDrops: {str(e)}")
return []
17 changes: 17 additions & 0 deletions web_app/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,20 @@ class Position(Base):
default="pending",
)
start_price = Column(DECIMAL, nullable=False)


class AirDrop(Base):
"""
SQLAlchemy model for the airdrop table.
"""

__tablename__ = "airdrop"

id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
user_id = Column(
UUID(as_uuid=True), ForeignKey("user.id"), index=True, nullable=False
)
created_at = Column(DateTime, nullable=False, default=func.now())
amount = Column(DECIMAL, nullable=True)
is_claimed = Column(Boolean, default=False)
binayak9932 marked this conversation as resolved.
Show resolved Hide resolved
claimed_at = Column(DateTime, nullable=True)
171 changes: 171 additions & 0 deletions web_app/tests/test_airdrop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""
Tests for the AirDropDBConnector class, covering key database operations for airdrops.

Fixtures:
- db_connector: Provides an AirDropDBConnector instance with test user and airdrop data.

Test Cases:
- test_create_empty_claim_positive: Verifies airdrop creation for an existing user.
- test_create_empty_claim_non_existent_user: Checks error handling for invalid user IDs.
- test_save_claim_data_positive: Ensures claim data updates correctly.
- test_save_claim_data_non_existent_airdrop: Confirms logging for invalid airdrop IDs.
- test_get_all_unclaimed_positive: Retrieves unclaimed airdrops.
- test_get_all_unclaimed_after_claiming: Excludes claimed airdrops from unclaimed results.
"""

import uuid
from datetime import datetime
from decimal import Decimal

import pytest
from sqlalchemy.exc import SQLAlchemyError
from web_app.db.crud import AirDropDBConnector
from web_app.db.models import AirDrop, User


@pytest.fixture
def db_connector():
"""
Sets up an AirDropDBConnector with a test user and airdrop record, then cleans
up after the test.
This fixture:
- Initializes an AirDropDBConnector instance.
- Creates and saves a test user and associated airdrop record.
- Yields the connector, user, and airdrop instances for test use.
- Cleans up the database by removing the test user and airdrop after the test.

Yields:
tuple: (AirDropDBConnector, User, AirDrop)
"""
connector = AirDropDBConnector()
test_user = User(wallet_id="test_wallet_id")
connector.write_to_db(test_user)
airdrop = AirDrop(user_id=test_user.id)
connector.write_to_db(airdrop)
yield connector, test_user, airdrop
connector.delete_object(AirDrop, airdrop.id)
connector.delete_object(User, test_user.id)


def test_create_empty_claim_positive(db_connector):
"""
Tests that create_empty_claim successfully creates a new airdrop for an
existing user.

Steps:
- Calls create_empty_claim with a valid user ID.
- Asserts the airdrop is created with the correct user_id and
is initially unclaimed.

Args:
db_connector (fixture): Provides the AirDropDBConnector, test user,
and test airdrop.
"""
connector, test_user, _ = db_connector
new_airdrop = connector.create_empty_claim(test_user.id)
assert new_airdrop is not None
assert new_airdrop.user_id == test_user.id
assert not new_airdrop.is_claimed
connector.delete_object(AirDrop, new_airdrop.id)


def test_create_empty_claim_non_existent_user(db_connector):
"""
Tests that create_empty_claim raises an error when called with
a non-existent user ID.

Steps:
- Generates a fake user ID that does not exist in the database.
- Verifies that calling create_empty_claim with this ID raises
an SQLAlchemyError.

Args:
db_connector (fixture): Provides the AirDropDBConnector
and test setup.
"""
connector, _, _ = db_connector
fake_user_id = uuid.uuid4()
with pytest.raises(SQLAlchemyError):
connector.create_empty_claim(fake_user_id)


def test_save_claim_data_positive(db_connector):
"""
Tests that save_claim_data correctly updates an existing airdrop
with claim details.

Steps:
- Calls save_claim_data with a valid airdrop ID and amount.
- Asserts the airdrop's amount, is_claimed status, and claimed_at
timestamp are updated correctly.

Args:
db_connector (fixture): Provides the AirDropDBConnector, test user,
and test airdrop.
"""
connector, _, airdrop = db_connector
amount = Decimal("100.50")
connector.save_claim_data(airdrop.id, amount)
updated_airdrop = connector.get_object(AirDrop, airdrop.id)
assert updated_airdrop.amount == amount
assert updated_airdrop.is_claimed
assert updated_airdrop.claimed_at is not None


def test_save_claim_data_non_existent_airdrop(db_connector, caplog):
"""
Tests that save_claim_data logs an error when called with a non-existent
airdrop ID.

Steps:
- Generates a fake airdrop ID that is not in the database.
- Calls save_claim_data with this ID and checks that the appropriate
error message is logged.

Args:
db_connector (fixture): Provides the AirDropDBConnector and
test setup.
caplog (fixture): Captures log output for verification.
"""
connector, _, _ = db_connector
fake_airdrop_id = uuid.uuid4()
connector.save_claim_data(fake_airdrop_id, Decimal("50.00"))
assert f"AirDrop with ID {fake_airdrop_id} not found" in caplog.text


def test_get_all_unclaimed_positive(db_connector):
"""
Tests that get_all_unclaimed retrieves unclaimed airdrops correctly.

Steps:
- Calls get_all_unclaimed to fetch unclaimed airdrops.
- Asserts that the test airdrop (unclaimed) is present in the retrieved
list by matching IDs.

Args:
db_connector (fixture): Provides the AirDropDBConnector, test user,
and test airdrop.
"""
connector, _, airdrop = db_connector
unclaimed_airdrops = connector.get_all_unclaimed()
assert any(airdrop.id == unclaimed.id for unclaimed in unclaimed_airdrops)


def test_get_all_unclaimed_after_claiming(db_connector):
"""
Tests that get_all_unclaimed excludes airdrops that have been claimed.

Steps:
- Marks the test airdrop as claimed using save_claim_data.
- Calls get_all_unclaimed to fetch unclaimed airdrops.
- Asserts that the claimed airdrop is not included in the
returned list.

Args:
db_connector (fixture): Provides the AirDropDBConnector,
test user, and test airdrop.
"""
connector, _, airdrop = db_connector
connector.save_claim_data(airdrop.id, Decimal("50.00"))
unclaimed_airdrops = connector.get_all_unclaimed()
assert airdrop not in unclaimed_airdrops
Loading