Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][feat] Add Async Streams abstraction #2273

Merged
merged 9 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fixcore/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ ignored-modules=

# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).
ignored-classes=SQLObject, optparse.Values, thread._local, _thread._local, aiostream.pipe
ignored-classes=SQLObject, optparse.Values, thread._local, _thread._local

# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
Expand Down
5 changes: 2 additions & 3 deletions fixcore/fixcore/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
AsyncIterable,
)

from aiostream import stream
from aiostream.core import Stream
from parsy import Parser, regex, string

from fixcore.model.graph_access import Section
from fixcore.types import JsonElement, Json
from fixcore.util import utc, parse_utc, AnyT
from fixlib.asynchronous.stream import Stream
from fixlib.durations import parse_duration, DurationRe
from fixlib.parse_util import (
make_parser,
Expand All @@ -47,7 +46,7 @@
# A sink function takes a stream and creates a result
Sink = Callable[[JsStream], Awaitable[T]]

list_sink: Callable[[JsGen], Awaitable[Any]] = stream.list # type: ignore
list_sink: Callable[[JsGen], Awaitable[List[Any]]] = Stream.as_list


@make_parser
Expand Down
34 changes: 17 additions & 17 deletions fixcore/fixcore/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
from typing import Dict, List, Tuple, Union, Sequence
from typing import Optional, Any, TYPE_CHECKING

from aiostream import stream
from attrs import evolve
from parsy import Parser
from rich.padding import Padding

from fixcore import version
from fixcore.analytics import CoreEvent
from fixcore.cli import cmd_with_args_parser, key_values_parser, T, Sink, args_values_parser, JsGen
from fixcore.cli import cmd_with_args_parser, key_values_parser, T, Sink, args_values_parser, JsStream
from fixcore.cli.command import (
SearchPart,
PredecessorsPart,
Expand Down Expand Up @@ -78,6 +77,7 @@
from fixcore.types import JsonElement
from fixcore.user.model import Permission
from fixcore.util import group_by
from fixlib.asynchronous.stream import Stream
from fixlib.parse_util import make_parser, pipe_p, semicolon_p

if TYPE_CHECKING:
Expand All @@ -104,7 +104,7 @@ def command_line_parser() -> Parser:
return ParsedCommands(commands, maybe_env if maybe_env else {})


# multiple piped commands are separated by semicolon
# semicolon separates multiple piped commands
multi_command_parser = command_line_parser.sep_by(semicolon_p)


Expand Down Expand Up @@ -187,7 +187,7 @@ def overview() -> str:
logo = ctx.render_console(Padding(WelcomeCommand.ck, pad=(0, 0, 0, middle))) if ctx.supports_color() else ""
return headline + logo + ctx.render_console(result)

def help_command() -> JsGen:
def help_command() -> JsStream:
if not arg:
result = overview()
elif arg == "placeholders":
Expand All @@ -209,7 +209,7 @@ def help_command() -> JsGen:
else:
result = f"No command found with this name: {arg}"

return stream.just(result)
return Stream.just(result)

return CLISource.single(help_command, required_permissions={Permission.read})

Expand Down Expand Up @@ -352,11 +352,11 @@ def command(
self, name: str, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any
) -> ExecutableCommand:
"""
Create an executable command for given command name, args and context.
:param name: the name of the command to execute (must be a known command)
:param arg: the arg of the command (must be parsable by the command)
:param ctx: the context of this command.
:return: the ready to run executable command.
Create an executable command for given command name, args, and context.
:param name: The name of the command to execute (must be a known command).
:param arg: The arg of the command (must be parsable by the command).
:param ctx: The context of this command.
:return: The ready to run executable command.
:raises:
CLIParseError: if the name of the command is not known, or the argument fails to parse.
"""
Expand All @@ -377,9 +377,9 @@ async def create_query(
Takes a list of query part commands and combine them to a single executable query command.
This process can also introduce new commands that should run after the query is finished.
Therefore, a list of executable commands is returned.
:param commands: the incoming executable commands, which actions are all instances of SearchCLIPart.
:param ctx: the context to execute within.
:return: the resulting list of commands to execute.
:param commands: The incoming executable commands, which actions are all instances of SearchCLIPart.
:param ctx: The context to execute within.
:return: The resulting list of commands to execute.
"""

# Pass parsed options to execute query
Expand Down Expand Up @@ -484,8 +484,8 @@ async def parse_query(query_arg: str) -> Query:
first_head_tail_in_a_row = None
head_tail_keep_order = True

# Define default sort order, if not already defined
# A sort order is required to always return the result in a deterministic way to the user.
# Define default sort order, if not already defined.
# A sort order is required to always return the result deterministically to the user.
# Deterministic order is required for head/tail to work
if query.is_simple_fulltext_search():
# Do not define any additional sort order for fulltext searches
Expand All @@ -494,7 +494,7 @@ async def parse_query(query_arg: str) -> Query:
parts = [pt if pt.sort else evolve(pt, sort=default_sort) for pt in query.parts]
query = evolve(query, parts=parts)

# If the last part is a navigation, we need to add sort which will ingest a new part.
# If the last part is a navigation, we need to add a sort which will ingest a new part.
with_sort = query.set_sort(*default_sort) if query.current_part.navigation else query
section = ctx.env.get("section", PathRoot)
# If this is an aggregate query, the default sort needs to be changed
Expand Down Expand Up @@ -534,7 +534,7 @@ def rewrite_command_line(cmds: List[ExecutableCommand], ctx: CLIContext) -> List
Rules:
- add the list command if no output format is defined
- add a format to write commands if no output format is defined
- report benchmark run will be formatted as benchmark result automatically
- report benchmark run will be formatted as a benchmark result automatically
"""
if ctx.env.get("no_rewrite") or len(cmds) == 0:
return cmds
Expand Down
Loading
Loading