Skip to content

Commit

Permalink
Merge pull request #13 from flightaware/BCK-5393-message-replay
Browse files Browse the repository at this point in the history
Cleanup, no particular theme
  • Loading branch information
conej730 authored Dec 2, 2020
2 parents 94ce028 + 75f0ef0 commit d27ba38
Show file tree
Hide file tree
Showing 15 changed files with 102 additions and 116 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ __pycache__/
.env
venv/
.python-version
.mypy_cache/
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ jobs:
env:
- INIT_CMD_TIME="pitr 1584126630"
- INIT_CMD_ARGS="events \"flightplan departure arrival cancellation\""
- KAFKA_FLIFO_TOPIC_NAME=feed1
- KAFKA_POSITION_TOPIC_NAME=position_feed1
install:
- cd connector
- make pip-sync-travis

- stage: Db-updater
env:
- KAFKA_TOPIC_NAME=feed1
- KAFKA_GROUP_NAME=group1
install:
- cd db-updater
- make pip-sync-travis
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
run:
docker-compose up --build
docker-compose build --parallel && docker-compose up

run-background:
docker-compose up -d --build
docker-compose build --parallel && docker-compose up -d
1 change: 0 additions & 1 deletion Makefile.inc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pip-sync: venv pip-compile

pip-sync-travis:
$(PYTHON) -m pip install pip-tools
make -C requirements all
pip-sync $(wildcard requirements/*.txt)

docker-setup: venv pip-sync
Expand Down
67 changes: 36 additions & 31 deletions connector/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
CONNECTION_ERROR_LIMIT = 3

COMPRESSION: str
USERNAME: Optional[str]
APIKEY: Optional[str]
USERNAME: str
APIKEY: str
KEEPALIVE: int
INIT_CMD_ARGS: str
INIT_CMD_TIME: str
Expand Down Expand Up @@ -78,8 +78,7 @@ async def open_connection(


def build_init_cmd(time_mode: str) -> str:
"""Builds the init command based on the environment variables provided in docker-compose
"""
"""Builds the init command based on the environment variables provided in docker-compose"""
initiation_command = f"{time_mode} username {USERNAME} password {APIKEY}"
if COMPRESSION != "":
initiation_command += f" compression {COMPRESSION}"
Expand All @@ -93,29 +92,28 @@ def build_init_cmd(time_mode: str) -> str:


def parse_script_args() -> None:
"""Sets global variables based on the environment variables provided in docker-compose
"""
"""Sets global variables based on the environment variables provided in docker-compose"""
# pylint: disable=global-statement
# pylint: disable=line-too-long
global USERNAME, APIKEY, SERVERNAME, COMPRESSION, STATS_PERIOD, KEEPALIVE, INIT_CMD_TIME, INIT_CMD_ARGS

# **** REQUIRED ****
USERNAME = os.getenv("FH_USERNAME")
APIKEY = os.getenv("FH_APIKEY")
USERNAME = os.environ["FH_USERNAME"]
APIKEY = os.environ["FH_APIKEY"]
# **** NOT REQUIRED ****
SERVERNAME = os.getenv("SERVER") # type: ignore
COMPRESSION = os.getenv("COMPRESSION") # type: ignore
STATS_PERIOD = int(os.getenv("PRINT_STATS_PERIOD")) # type: ignore
KEEPALIVE = int(os.getenv("KEEPALIVE")) # type: ignore
INIT_CMD_TIME = os.getenv("INIT_CMD_TIME") # type: ignore
SERVERNAME = os.environ["SERVER"]
COMPRESSION = os.environ["COMPRESSION"]
STATS_PERIOD = int(os.environ["PRINT_STATS_PERIOD"])
KEEPALIVE = int(os.environ["KEEPALIVE"])
INIT_CMD_TIME = os.environ["INIT_CMD_TIME"]
if INIT_CMD_TIME.split()[0] not in ["live", "pitr"]:
raise ValueError(f'$INIT_CMD_TIME value is invalid, should be "live" or "pitr <pitr>"')
INIT_CMD_ARGS = os.getenv("INIT_CMD_ARGS") # type: ignore
INIT_CMD_ARGS = os.environ["INIT_CMD_ARGS"]
for command in ["live", "pitr", "compression", "keepalive", "username", "password"]:
if command in INIT_CMD_ARGS.split():
raise ValueError(
f'$INIT_CMD_ARGS should not contain the "{command}" command. \
It belongs in its own variable.'
f'$INIT_CMD_ARGS should not contain the "{command}" command. '
"It belongs in its own variable."
)


Expand Down Expand Up @@ -158,13 +156,13 @@ async def print_stats(period: int) -> None:
total_bytes += bytes_read
if period_seconds:
print(
f"Period messages/s {lines_read / period_seconds:>5.0f}, \
period bytes/s {bytes_read / period_seconds:>5.0f}"
f"Period messages/s {lines_read / period_seconds:>5.0f}, "
f"period bytes/s {bytes_read / period_seconds:>5.0f}"
)
if total_seconds:
print(
f"Total messages/s {total_lines / total_seconds:>5.0f}, \
total bytes/s {total_bytes / total_seconds:>5.0f}"
f"Total messages/s {total_lines / total_seconds:>5.0f}, "
f"total bytes/s {total_bytes / total_seconds:>5.0f}"
)
if catchup_rate:
print(f"Total catchup rate: {catchup_rate:.2f}x")
Expand All @@ -175,8 +173,8 @@ async def print_stats(period: int) -> None:


async def read_firehose(time_mode: str) -> Optional[str]:
"""Open a connection to Firehose and read from it forever,
passing all messages along to all connected clients.
"""Open a connection to Firehose and read from it forever, passing all
messages along to our kafka queues.
Any errors will result in the function returning a string pitr value that
can be passed to the function on a future call to allow for a reconnection
Expand All @@ -192,7 +190,11 @@ async def read_firehose(time_mode: str) -> Optional[str]:

context = ssl.create_default_context()
context.minimum_version = ssl.TLSVersion.TLSv1_2
fh_reader, fh_writer = await open_connection(host=SERVERNAME, port=1501, ssl=context)
try:
fh_reader, fh_writer = await open_connection(host=SERVERNAME, port=1501, ssl=context)
except (AttributeError, OSError) as error:
print("Initial connection failed:", error)
return None
print(f"Opened connection to Firehose at {SERVERNAME}:1501")

initiation_command = build_init_cmd(time_mode)
Expand Down Expand Up @@ -233,11 +235,14 @@ def delivery_report(err, _):
key = message.get("id", "").encode() or None
try:
if message["type"] == "keepalive":
topics = [os.getenv("KAFKA_POSITION_TOPIC_NAME"), os.getenv("KAFKA_FLIFO_TOPIC_NAME")]
topics = [
os.environ["KAFKA_POSITION_TOPIC_NAME"],
os.environ["KAFKA_FLIFO_TOPIC_NAME"],
]
elif message["type"] == "position":
topics = [os.getenv("KAFKA_POSITION_TOPIC_NAME")]
topics = [os.environ["KAFKA_POSITION_TOPIC_NAME"]]
else:
topics = [os.getenv("KAFKA_FLIFO_TOPIC_NAME")]
topics = [os.environ["KAFKA_FLIFO_TOPIC_NAME"]]
for topic in topics:
producer.produce(
topic,
Expand All @@ -247,11 +252,12 @@ def delivery_report(err, _):
)
except BufferError as e:
print(f"Encountered full outgoing buffer, should resolve itself: {e}")
time.sleep(1)
except KafkaException as e:
if not e.args[0].retriable():
raise
print(
f"Encountered retriable kafka error (e.args[0].str()), "
f"Encountered retriable kafka error ({e.args[0].str()}), "
"waiting a moment and trying again"
)
time.sleep(1)
Expand All @@ -263,8 +269,7 @@ def delivery_report(err, _):


async def main():
"""Connect to Firehose and write the output to kafka
"""
"""Connect to Firehose and write the output to kafka"""
# pylint: disable=global-statement
global producer, stats_lock, finished, last_good_pitr

Expand Down Expand Up @@ -298,8 +303,8 @@ async def main():
errors += 1
else:
print(
f"Connection failed {CONNECTION_ERROR_LIMIT} \
times before getting a non-error message, quitting"
f"Connection failed {CONNECTION_ERROR_LIMIT} "
"times before getting a non-error message, quitting"
)
break

Expand Down
2 changes: 1 addition & 1 deletion connector/requirements/dev.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-c base.txt
mypy==0.740
black==19.10b0
black
pylint==2.4.4
19 changes: 7 additions & 12 deletions connector/requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,8 @@ astroid==2.3.3 \
--hash=sha256:71ea07f44df9568a75d0f354c49143a4575d90645e9fead6dfb52c26a85ed13a \
--hash=sha256:840947ebfa8b58f318d42301cf8c0a20fd794a33b61cc4638e28e9e61ba32f42 \
# via pylint
attrs==19.3.0 \
--hash=sha256:08a96c641c3a74e44eb59afb61a24f2cb9f4d7188748e76ba4bb5edfa3cb7d1c \
--hash=sha256:f7b7ce16570fe9965acd6d30101a28f62fb4a7f9e926b3bbc9b61f8b04247e72 \
# via black
black==19.10b0 \
--hash=sha256:1b30e59be925fafc1ee4565e5e08abef6b03fe455102883820fe5ee2e4734e0b \
--hash=sha256:c2edb73a08e9e0e6f65a0e6af18b059b8b1cdd5bef997d7a0b181df93dc81539 \
black==20.8b1 \
--hash=sha256:1c02557aa099101b9d21496f8a914e9ed2222ef70336404eeeac8edba836fbea \
# via -r dev.in
click==7.1.2 \
--hash=sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a \
Expand Down Expand Up @@ -58,7 +53,7 @@ mccabe==0.6.1 \
mypy-extensions==0.4.3 \
--hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \
--hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8 \
# via mypy
# via black, mypy
mypy==0.740 \
--hash=sha256:1521c186a3d200c399bd5573c828ea2db1362af7209b2adb1bb8532cea2fb36f \
--hash=sha256:31a046ab040a84a0fc38bc93694876398e62bc9f35eca8ccbf6418b7297f4c00 \
Expand Down Expand Up @@ -110,9 +105,9 @@ six==1.14.0 \
--hash=sha256:236bdbdce46e6e6a3d61a337c0f8b763ca1e8717c03b369e87a7ec7ce1319c0a \
--hash=sha256:8f3cd2e254d8f793e7f3d6d9df77b92252b52637291d0f0da013c76ea2724b6c \
# via astroid
toml==0.10.0 \
--hash=sha256:229f81c57791a41d65e399fc06bf0848bab550a9dfd5ed66df18ce5f05e73d5c \
--hash=sha256:235682dd292d5899d361a811df37e04a8828a5b1da3115886b73cf81ebc9100e \
toml==0.10.2 \
--hash=sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b \
--hash=sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f \
# via black
typed-ast==1.4.1 \
--hash=sha256:0666aa36131496aed8f7be0410ff974562ab7eeac11ef351def9ea6fa28f6355 \
Expand Down Expand Up @@ -141,7 +136,7 @@ typing-extensions==3.7.4.2 \
--hash=sha256:6e95524d8a547a91e08f404ae485bbb71962de46967e1b71a0cb89af24e761c5 \
--hash=sha256:79ee589a3caca649a9bfd2a8de4709837400dfa00b6cc81962a1e6a1815969ae \
--hash=sha256:f8d2bd89d25bc39dabe7d23df520442fa1d8969b82544370e03d88b5a591c392 \
# via mypy
# via black, mypy
wrapt==1.11.2 \
--hash=sha256:565a021fd19419476b9362b05eeaa094178de64f8361e44468f9e9d7843901e1 \
# via astroid
Loading

0 comments on commit d27ba38

Please sign in to comment.