diff --git a/twarc/client2.py b/twarc/client2.py index 7a76db1d..09889f0b 100644 --- a/twarc/client2.py +++ b/twarc/client2.py @@ -41,7 +41,6 @@ def __init__( access_token_secret=None, bearer_token=None, connection_errors=0, - http_errors=0, metadata=True, ): """ @@ -71,14 +70,11 @@ def __init__( Bearer Token, can be generated from API keys. connection_errors (int): Number of retries for GETs - http_errors (int): - Number of retries for sample stream. metadata (bool): Append `__twarc` metadata to results. """ self.api_version = "2" self.connection_errors = connection_errors - self.http_errors = http_errors self.metadata = metadata self.bearer_token = None @@ -144,7 +140,7 @@ def _search( made_call = time.monotonic() for response in self.get_paginated(url, params=params): - # can return without 'data' if there are no results + # can return without 'data' if there are no results if "data" in response: count += len(response["data"]) yield response @@ -436,6 +432,7 @@ def lookup_batch(users): if batch: yield (lookup_batch(batch)) + @catch_chunked_encoding_error @requires_app_auth def sample(self, event=None, record_keepalive=False): """ @@ -458,70 +455,30 @@ def sample(self, event=None, record_keepalive=False): generator[dict]: a generator, dict for each tweet.
""" url = "https://api.twitter.com/2/tweets/sample/stream" - errors = 0 - while True: - try: - log.info("Connecting to V2 sample stream") - resp = self.get(url, params=expansions.EVERYTHING.copy(), stream=True) - errors = 0 - for line in resp.iter_lines(chunk_size=512): - - # quit & close the stream if the event is set - if event and event.is_set(): - log.info("stopping sample") - resp.close() - return - - # return the JSON data w/ optional keep-alive - if not line: - log.info("keep-alive") - if record_keepalive: - yield "keep-alive" - continue - else: - data = json.loads(line.decode()) - if self.metadata: - data = _append_metadata(data, resp.url) - yield data - - # Check for an operational disconnect error in the response - if data.get("errors", []): - for error in data["errors"]: - if ( - error.get("disconnect_type") - == "OperationalDisconnect" - ): - log.info( - "Received operational disconnect message: " - "This stream has fallen too far behind in " - "processing tweets. Some data may have been " - "lost." - ) - # Sleep briefly, then break this get call and - # attempt to reconnect. 
- time.sleep(5) - break - - except requests.exceptions.RequestException as e: - errors += 1 - log.error("caught request error %s on %s try", e, errors) - - if self.http_errors and errors == self.http_errors: - log.warning("too many errors") - raise e - - if ( - isinstance(e, requests.exceptions.HTTPError) - and response.status_code == 420 - ): - if interruptible_sleep(errors * 60, event): - log.info("stopping filter") - return + log.info("Connecting to V2 sample stream") + resp = self.get(url, params=expansions.EVERYTHING.copy(), stream=True) + for line in resp.iter_lines(chunk_size=512): + + # quit & close the stream if the event is set + if event and event.is_set(): + log.info("stopping sample") + resp.close() + return + + # return the JSON data w/ optional keep-alive + if not line: + log.info("keep-alive") + if record_keepalive: + yield "keep-alive" + continue else: - if interruptible_sleep(errors * 5, event): - log.info("stopping filter") - return + data = json.loads(line.decode()) + if self.metadata: + data = _append_metadata(data, resp.url) + yield data + self._check_for_disconnect(data) + @requires_app_auth def add_stream_rules(self, rules): @@ -568,6 +525,7 @@ def delete_stream_rule_ids(self, rule_ids): url = "https://api.twitter.com/2/tweets/search/stream/rules" return self.post(url, {"delete": {"ids": rule_ids}}).json() + @catch_chunked_encoding_error @requires_app_auth def stream(self, event=None, record_keep_alives=False): """ @@ -610,6 +568,7 @@ def stream(self, event=None, record_keep_alives=False): data = _append_metadata(data, resp.url) yield data + self._check_for_disconnect(data) def _timeline( self, @@ -963,6 +922,17 @@ def id_exists(user): else: raise ValueError(f"No such user {user}") + + def _check_for_disconnect(self, data): + """ + Look for disconnect errors in a response, and reconnect if found. 
+ """ + for error in data.get("errors", []): + if error.get("disconnect_type") == "OperationalDisconnect": + log.info("Received operational disconnect message, reconnecting") + self.connect() + break + def _ts(dt): """ @@ -1007,3 +977,4 @@ def _append_metadata(result, url): """ result["__twarc"] = {"url": url, "version": version, "retrieved_at": _utcnow()} return result + diff --git a/twarc/command2.py b/twarc/command2.py index 8d5e2181..e3b2c9c3 100644 --- a/twarc/command2.py +++ b/twarc/command2.py @@ -25,6 +25,7 @@ from click_config_file import configuration_option config_provider = ConfigProvider() +log = logging.getLogger("twarc") @with_plugins(iter_entry_points("twarc.plugins")) @@ -66,7 +67,7 @@ "higher with user authentication, but not all endpoints are supported.", show_default=True, ) -@click.option("--log", default="twarc.log") +@click.option("--log", "-l", "log_file", default="twarc.log") @click.option("--verbose", is_flag=True, default=False) @click.option( "--metadata/--no-metadata", @@ -85,7 +86,7 @@ def twarc2( access_token, access_token_secret, bearer_token, - log, + log_file, metadata, app_auth, verbose, @@ -94,12 +95,12 @@ def twarc2( Collect data from the Twitter V2 API. 
""" logging.basicConfig( - filename=log, + filename=log_file, level=logging.DEBUG if verbose else logging.INFO, format="%(asctime)s %(levelname)s %(message)s", ) - logging.info("using config %s", config_provider.file_path) + log.info("using config %s", config_provider.file_path) if bearer_token or (consumer_key and consumer_secret): if app_auth and (bearer_token or (consumer_key and consumer_secret)): @@ -152,11 +153,11 @@ def configure(ctx): """ config_file = config_provider.file_path - logging.info("creating config file: %s", config_file) + log.info("creating config file: %s", config_file) config_dir = pathlib.Path(config_file).parent if not config_dir.is_dir(): - logging.info("creating config directory: %s", config_dir) + log.info("creating config directory: %s", config_dir) config_dir.mkdir(parents=True) keys = handshake() @@ -255,6 +256,10 @@ def search( query, since_id, until_id, start_time, end_time, max_results ): _write(result, outfile) + + tweet_ids = [t['id'] for t in result.get("data", [])] + log.info("archived %s", ','.join(tweet_ids)) + count += len(result["data"]) if limit != 0 and count >= limit: break @@ -436,6 +441,8 @@ def sample(T, outfile, limit): if limit != 0 and count >= limit: event.set() _write(result, outfile) + if result: + log.info("archived %s", result["data"]["id"]) @twarc2.command("hydrate") @@ -449,6 +456,8 @@ def hydrate(T, infile, outfile): """ for result in T.tweet_lookup(infile): _write(result, outfile) + tweet_ids = [t["id"] for t in result.get("data", [])] + log.info("archived %s", ','.join(tweet_ids)) @twarc2.command("users") @@ -620,7 +629,7 @@ def timelines( line_count += 1 line = line.strip() if line == "": - logging.warn("skipping blank line on line %s", line_count) + log.warn("skipping blank line on line %s", line_count) continue users = None @@ -634,7 +643,7 @@ def timelines( if isinstance(json_data, str) and json_data: users = set([json_data]) else: - logging.warn( + log.warn( "ignored line %s which didn't contain 
users", line_count ) continue @@ -657,7 +666,7 @@ def timelines( # only process a given user once if user in seen: - logging.info("already processed %s, skipping", user) + log.info("already processed %s, skipping", user) continue seen.add(user) @@ -802,24 +811,24 @@ def f(): for conv_id in conv_ids: if conv_id in seen: - logging.info(f"already fetched conversation_id {conv_id}") + log.info(f"already fetched conversation_id {conv_id}") seen.add(conv_id) conv_count = 0 - logging.info(f"fetching conversation {conv_id}") + log.info(f"fetching conversation {conv_id}") for result in search(f"conversation_id:{conv_id}"): _write(result, outfile, False) count += len(result["data"]) if limit != 0 and count >= limit: - logging.info(f"reached tweet limit of {limit}") + log.info(f"reached tweet limit of {limit}") stop = True break conv_count += len(result["data"]) if conversation_limit != 0 and conv_count >= conversation_limit: - logging.info(f"reached conversation limit {conversation_limit}") + log.info(f"reached conversation limit {conversation_limit}") break @@ -867,9 +876,11 @@ def stream(T, outfile, limit): for result in T.stream(event=event): count += 1 if limit != 0 and count == limit: - logging.info(f"reached limit {limit}") + log.info(f"reached limit {limit}") event.set() _write(result, outfile) + if "data" in result: + log.info("archived %s", result["data"]["id"]) @twarc2.group() diff --git a/twarc/decorators.py b/twarc/decorators.py index cc9abf4f..88b73b21 100644 --- a/twarc/decorators.py +++ b/twarc/decorators.py @@ -245,3 +245,4 @@ def new_f(self, *args, **kwargs): return f(self, *args, **kwargs) return new_f + diff --git a/twarc/version.py b/twarc/version.py index cc2cb42c..125a4ada 100644 --- a/twarc/version.py +++ b/twarc/version.py @@ -1 +1 @@ -version = "2.3.3" +version = "2.3.4"