-
Notifications
You must be signed in to change notification settings - Fork 1
/
lightkurve_ext.py
1860 lines (1466 loc) · 68.4 KB
/
lightkurve_ext.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Convenience helpers for `lightkurve` package.
"""
# so that TransitTimeSpec can be referenced in type annotation in the class itself
# see: https://stackoverflow.com/a/49872353
from __future__ import annotations
import itertools
import os
import logging
import math
import json
import re
import warnings
from collections import OrderedDict
from types import SimpleNamespace
from retry import retry
import astropy
from astropy.io import fits
from astropy import coordinates as coord
from astropy.coordinates import SkyCoord
from astropy.table import QTable, Table
from astropy.time import Time
import astropy.units as u
import numpy as np
from scipy.interpolate import UnivariateSpline
from IPython.display import display, HTML
import lightkurve as lk
import asyncio_compat
log = logging.getLogger(__name__)
def of_lcs(lc_coll, filter_func):
"""Filter a LightCurveCollection using the given filter_func.
Example
--------
Only retain TESS SPOC 2 minute cadence lightcurves
> of_lcs(lc_coll, lambda lc: lc.author == "SPOC")
"""
return type(lc_coll)([lc for lc in lc_coll if filter_func(lc)])
def of_sector(lcf_coll, sectorNum):
res_list = of_sectors(lcf_coll, sectorNum)
return res_list[0] if len(res_list) > 0 else None
def of_sectors(*args):
lk_coll_or_sr = args[0]
if lk_coll_or_sr is None:
return []
if len(args) == 1:
# when no sectors are specified, return entire collection
# For convenience: when a notebooks is modified such that
# a user sometimes use a subset of sectors , and sometimes everything
# the user can can still use of_sectors() wrapper regardless
return lk_coll_or_sr
sector_nums = args[1:]
if hasattr(lk_coll_or_sr, "sector"):
return lk_coll_or_sr[np.in1d(lk_coll_or_sr.sector, sector_nums)]
elif hasattr(lk_coll_or_sr, "table") and lk_coll_or_sr.table["sequence_number"] is not None:
res = lk.SearchResult(lk_coll_or_sr.table[np.in1d(lk_coll_or_sr.table["sequence_number"], sector_nums)])
res = _sort_chronologically(res)
return res
else:
raise TypeError(f"Unsupported type of collection: {type(lk_coll_or_sr)}")
def of_sector_n_around(lk_coll_or_sr, sector_num, num_additions=8):
def get_sector_for_lk_coll(lk_coll):
return lk_coll.sector
def get_sector_for_sr(sr):
return sr.table["sequence_number"]
sector_accessor_func = None
if hasattr(lk_coll_or_sr, "sector"):
sector_accessor_func = get_sector_for_lk_coll
elif hasattr(lk_coll_or_sr, "table") and lk_coll_or_sr.table["sequence_number"] is not None:
sector_accessor_func = get_sector_for_sr
else:
raise TypeError(f"Unsupported type of collection: {type(lk_coll_or_sr)}")
subset_slice = _get_slice_for_of_sector_n_around(
lk_coll_or_sr,
sector_accessor_func,
sector_num,
num_additions=num_additions,
)
if subset_slice is not None:
return lk_coll_or_sr[subset_slice]
else:
return type(lk_coll_or_sr)([])
def _get_slice_for_of_sector_n_around(coll, sector_accessor_func, sector_num, num_additions):
if sector_num not in sector_accessor_func(coll):
return None
idx = np.where(sector_accessor_func(coll) == sector_num)[0][0]
# if num_additions is odd number, we add one to older sector
start = max(idx - math.ceil(num_additions / 2), 0)
end = min(idx + math.floor(num_additions / 2) + 1, len(coll))
# case the start:end slice does not fill up the requested num_additions,
# try to fill it up
cur_slice_size = end - start - 1
if cur_slice_size < num_additions:
num_more_needed = num_additions - cur_slice_size
if start > 0:
start = max(start - num_more_needed, 0)
else:
end = min(end + num_more_needed, len(coll))
return slice(start, end)
def of_2min_cadences(lcf_coll):
"""Return LightCurveFiles of short, typically 2-minute cadence, only.
Primary use case is to filter out 20-second files.
"""
filtered = [lcf for lcf in lcf_coll if "short" == estimate_cadence_type(lcf)]
return lk.LightCurveCollection(filtered)
def estimate_cadence(lc, unit=None, round_unit_result=True):
"""Estimate the cadence of a lightcurve by returning the median of a sample"""
if isinstance(lc, lk.FoldedLightCurve):
time_vals = lc.time_original.value
time_vals = np.sort(time_vals)
else:
time_vals = lc.time.value
res = np.nanmedian(np.diff(time_vals[:100]))
if unit is not None:
res = (res * u.day).to(unit) # LATER: handle cases lc.time is not in days
if round_unit_result:
res = res.round()
return res
def map_cadence_type(cadence_in_days):
long_minimum = 9.9 / 60 / 24 # 10 minutes in days, with some margin of error
short_minimum = 0.9 / 60 / 24 # 1 minute in days, with some margin of error
if cadence_in_days is None:
return None
if cadence_in_days >= long_minimum:
return "long"
if cadence_in_days >= short_minimum:
return "short"
return "fast"
def estimate_cadence_type(lc):
"""Estimate the type of cadence to be one of long, short, or fast.
The definition is the same as ``exptime`` in `lightkurve.search_lightcurve()`.
"""
return map_cadence_type(estimate_cadence(lc))
def of_tic(lcf_coll, tic):
"""Return LightCurveFiles of the given TIC.
Useful in case the default MAST result returned nearby targets.
"""
filtered = [lcf for lcf in lcf_coll if lcf.meta.get("TICID", None) == tic]
return lk.LightCurveCollection(filtered)
def select(lcf_coll_or_sr, filter_func):
"""Filter the given LightCurveCollection or SearchResult with the filter_func."""
return type(lcf_coll_or_sr)([obj for obj in lcf_coll_or_sr if filter_func(obj)])
def exclude_range(lc, start, end, column="time"):
"""Exclude the specified range of time from the given lightcurve."""
tmask = (lc[column].value >= start) & (lc[column].value < end)
return lc[~tmask]
def get_obs_date_range(lcf_coll):
"""Return the observation date span and the number of days with observation."""
# the code assumes the time are in all in BTJD, or other consistent format in days
if isinstance(lcf_coll, lk.LightCurve):
lcf_coll = lk.LightCurveCollection([lcf_coll])
# to support folded lightcurve
time_colname = "time_original" if "time_original" in lcf_coll[0].colnames else "time"
t_start = lcf_coll[0][time_colname].min().value
t_end = lcf_coll[-1][time_colname].max().value
obs_span = t_end - t_start
obs_actual = len(set(np.concatenate([lc[time_colname].value.astype("int") for lc in lcf_coll])))
return obs_span, obs_actual
def estimate_object_radius_in_r_jupiter(lc, depth):
"""Return a back of envelope estimate of a companion object's radius."""
R_JUPITER_IN_R_SUN = 71492 / 695700
r_star = lc.meta.get("RADIUS") # assumed to be in R_sun
if r_star is None or r_star < 0 or depth <= 0:
return None # cannot estimate
r_obj = math.sqrt(r_star * r_star * depth)
r_obj_in_r_jupiter = r_obj / R_JUPITER_IN_R_SUN
return r_obj_in_r_jupiter
def _sort_chronologically(sr: lk.SearchResult):
# Resort the SearchResult rows, because
# lightkurve v2.4.2 does not honor mission (chronological)
# the workaround here is to resort with pre-v2.4.2 criteria
# Note: we must use a copy of the table
# because if the underlying table is actually a subset of the underlying table,
# the sorting seems to affect the underlying table in some cases,
# creating really weird / corrupted results.
if len(sr) < 1:
return sr
res = lk.SearchResult(sr.table.copy())
res.table.sort(["distance", "year", "mission", "sort_order", "exptime"])
return res
# BEGIN lightkurve search with retry
LK_SEARCH_NUM_RETRIES = 4
LK_DOWNLOAD_NUM_RETRIES = 2
@retry(IOError, tries=LK_SEARCH_NUM_RETRIES, delay=0.5, backoff=2, jitter=(0, 0.5))
def search_lightcurve(*args, **kwargs):
return lk.search_lightcurve(*args, **kwargs)
@retry(IOError, tries=LK_SEARCH_NUM_RETRIES, delay=0.5, backoff=2, jitter=(0, 0.5))
def search_targetpixelfile(*args, **kwargs):
return lk.search_targetpixelfile(*args, **kwargs)
@retry(IOError, tries=LK_SEARCH_NUM_RETRIES, delay=0.5, backoff=2, jitter=(0, 0.5))
def search_tesscut(*args, **kwargs):
return lk.search_tesscut(*args, **kwargs)
@retry(IOError, tries=LK_DOWNLOAD_NUM_RETRIES, delay=0.5, backoff=2, jitter=(0, 0.5))
def download_all(search_result, *args, **kwargs):
return search_result.download_all(*args, **kwargs)
# END lightkurve search with retry
def download_lightcurves_of_tic_with_priority(
tic, author_priority=["SPOC", "QLP", "TESS-SPOC"], download_filter_func=None, download_dir=None
):
"""For a given TIC, download lightcurves across all sectors.
For each sector, download one based on pre-set priority.
"""
sr_unfiltered = search_lightcurve(f"TIC{tic}", mission="TESS")
if len(sr_unfiltered) < 1:
print(f"WARNING: no result found for TIC {tic}")
return None, None, None
sr_unfiltered = sr_unfiltered[sr_unfiltered.target_name == str(tic)] # in case we get some other nearby TICs
sr_unfiltered = _sort_chronologically(sr_unfiltered)
# filter out HLSPs not supported by lightkurve yet
sr = sr_unfiltered[sr_unfiltered.author != "DIAMANTE"]
if len(sr) < len(sr_unfiltered):
print("Note: there are products not supported by Lightkurve, which are excluded from download.")
# for each sector, filter based on the given priority.
# - note: prefer QLP over TESS-SPOC because QLP is detrended, with multiple apertures within 1 file
sr = filter_by_priority(
sr,
author_priority=author_priority,
exptime_priority=["short", "long", "fast"],
)
num_filtered = len(sr_unfiltered) - len(sr)
num_fast = len(sr_unfiltered[sr_unfiltered.exptime < 60 * u.second])
if num_filtered > 0:
msg = f"{num_filtered} rows filtered"
if num_fast > 0:
msg = msg + f" ; {num_fast} fast (20secs) products."
print(msg)
with astropy.conf.set_temp("max_lines", -1):
display(sr)
# let caller to optionally further restrict a subset to be downloaded
sr_to_download = sr
if download_filter_func is not None:
sr_to_download = download_filter_func(sr)
sr_to_download = _sort_chronologically(sr_to_download)
if len(sr_to_download) < len(sr):
display(
HTML(
"""<font style="background-color: yellow;">Note</font>:
SearchResult is further filtered - only a subset will be downloaded."""
)
)
lcf_coll = download_all(sr_to_download, download_dir=download_dir)
print(f"TIC {tic} \t, all available sectors: {abbrev_sector_list(sr)}")
if lcf_coll is not None and len(lcf_coll) > 0:
print(f"downloaded #sectors: {len(lcf_coll)} ; {abbrev_sector_list(lcf_coll)}")
print(
(
f" sector {lcf_coll[-1].meta['SECTOR']}: \t"
f"camera = {lcf_coll[-1].meta['CAMERA']} ; ccd = {lcf_coll[-1].meta['CCD']}"
)
)
else:
print(f"TIC {tic}: no data")
return lcf_coll, sr, sr_unfiltered
def download_lightcurve(
target,
mission=("Kepler", "K2", "TESS"),
exptime="short",
author="SPOC",
download_dir=None,
use_cache="yes",
display_search_result=True,
):
"""
Wraps `lightkurve.search_lightcurve()` and the
subsequent `lightkurve.search.SearchResult.download_all()` calls,
with the option of caching, so that for a given search,
if the the result has been downloaded, the cache will be used.
The parameters all propagate to the underlying `search_lightcurvefile()`
and `download_all()` calls. The lone exception is `use_cache`.
Parameters
----------
use_cache : str, must be one of 'yes', or 'no'\n
OPEN: an option of 'fallback': cache will be used when offline.\n
OPEN: for now, actual download lightcurve cache will still be used if
available irrespective of the settings.
Returns
-------
collection : `~lightkurve.collections.Collection` object
Returns a `~lightkurve.collections.LightCurveCollection`
containing all lightcurve files that match the criteria
"""
if use_cache == "no":
return _search_and_cache(target, mission, exptime, author, download_dir, display_search_result)
if use_cache == "yes":
result_file_ids = _load_from_cache_if_any(target, mission, download_dir)
if result_file_ids is not None:
result_files = list(map(lambda e: f"{download_dir}/mastDownload/{e}", result_file_ids))
return lk.collections.LightCurveCollection(list(map(lambda f: lk.read(f), result_files)))
# else
return _search_and_cache(target, mission, exptime, author, download_dir, display_search_result)
# else
raise ValueError("invalid value for argument use_cache")
# Private helpers for `download_lightcurvefiles`
def _search_and_cache(target, mission, exptime, author, download_dir, display_search_result):
search_res = lk.search_lightcurve(target=target, mission=mission, exptime=exptime, author=author)
if len(search_res) < 1:
return None
if display_search_result:
_display_search_result(search_res)
_cache_search_result_product_identifiers(search_res, download_dir, target, mission)
return search_res.download_all(quality_bitmask="default", download_dir=download_dir)
def _display_search_result(search_res):
from IPython.core.display import display
tab = search_res.table
# move useful columns to the front
preferred_cols = ["proposal_id", "target_name", "sequence_number", "t_exptime"]
colnames_reordered = preferred_cols + [c for c in tab.colnames if c not in preferred_cols]
display(tab[colnames_reordered])
def _load_from_cache_if_any(target, mission, download_dir):
key = _get_cache_key(target, mission)
return _load_search_result_product_identifiers(download_dir, key)
def _cache_search_result_product_identifiers(search_res, download_dir, target, mission):
key = _get_cache_key(target, mission)
identifiers = _to_product_identifiers(search_res)
_save_search_result_product_identifiers(identifiers, download_dir, key)
return key
def _get_search_result_cache_dir(download_dir):
# TODO: handle download_dir is None (defaults)
cache_dir = f"{download_dir}/mastQueries"
if os.path.isdir(cache_dir):
return cache_dir
# else it doesn't exist, make a new cache directory
try:
os.mkdir(cache_dir)
# downloads locally if OS error occurs
except OSError:
log.warning(
"Warning: unable to create {}. "
"Cache MAST query results to the current "
"working directory instead.".format(cache_dir)
)
cache_dir = "."
return cache_dir
def _get_cache_key(target, mission):
# TODO: handle cases the generated key is not a valid filename
return f"{target}_{mission}_ids"
def _to_product_identifiers(search_res):
"""
Returns
-------
A list of str, constructed from `(obs_collection, obs_id, productFilename)` tuples, that can
identify cached lightcurve file,s if any.
"""
return list(
map(
lambda e: e["obs_collection"] + "/" + e["obs_id"] + "/" + e["productFilename"],
search_res.table,
)
)
def _save_search_result_product_identifiers(identifiers, download_dir, key):
resolved_cache_dir = _get_search_result_cache_dir(download_dir)
filepath = f"{resolved_cache_dir}/{key}.json"
fp = open(filepath, "w+")
json.dump(identifiers, fp)
return filepath
def _load_search_result_product_identifiers(download_dir, key):
resolved_cache_dir = _get_search_result_cache_dir(download_dir)
filepath = f"{resolved_cache_dir}/{key}.json"
try:
fp = open(filepath, "r")
return json.load(fp)
except OSError as err:
# errno == 2: file not found, typical case of cache miss
# errno != 2: unexpected error, log a warning
if err.errno != 2:
log.warning("Unexpected OSError in retrieving cached search result: {}".format(err))
return None
def filter_by_priority(
sr,
author_priority=["SPOC", "TESS-SPOC", "QLP"],
exptime_priority=["short", "long", "fast"],
):
if sr is None or len(sr) < 1:
return sr
author_sort_keys = {}
for idx, author in enumerate(author_priority):
author_sort_keys[author] = idx + 1
exptime_sort_keys = {}
for idx, exptime in enumerate(exptime_priority):
exptime_sort_keys[exptime] = idx + 1
def calc_filter_priority(row):
# Overall priority key is in the form of <author_key><exptime_key>, e.g., 101
# - "01" is the exptime_key
# - the leading "1" is the author_key, given it is the primary one
author_default = max(dict(author_sort_keys).values()) + 1
author_key = author_sort_keys.get(row["author"], author_default) * 100
# secondary priority
exptime_default = max(dict(exptime_sort_keys).values()) + 1
exptime_key = exptime_sort_keys.get(map_cadence_type(row["exptime"] / 60 / 60 / 24), exptime_default)
return author_key + exptime_key
sr.table["_filter_priority"] = [calc_filter_priority(r) for r in sr.table]
# A temporary table that sorts the table by the priority
sorted_t = sr.table.copy()
sorted_t.sort(["mission", "_filter_priority"])
# create an empty table for results, with the same set of columns
res_t = sr.table[np.zeros(len(sr), dtype=bool)].copy()
# for each mission (e.g., TESS Sector 01), select a row based on specified priority
# - select the first row given the table has been sorted by priority
uniq_missions = list(OrderedDict.fromkeys(sorted_t["mission"]))
for m in uniq_missions:
mission_t = sorted_t[sorted_t["mission"] == m]
# OPEN: if for a given mission, the only row available is not listed in the priorities,
# the logic still add a row to the result.
# We might want it to be an option specified by the user.
res_t.add_row(mission_t[0])
sr = lk.SearchResult(table=res_t)
sr = _sort_chronologically(sr)
return sr
# Download TPF asynchronously
def search_and_download_tpf(*args, **kwargs):
"""Search and Download a TPFs.
All parameters are passed on ``search_targetpixelfile()``,
with the exception of ``download_dir`` and ``quality_bitmask``,
which are passed to ``download_all()`
"""
# extract download_all() parameters
download_dir = kwargs.pop("download_dir", None)
quality_bitmask = kwargs.pop("quality_bitmask", None)
sr = search_targetpixelfile(*args, **kwargs) # pass the rest of the argument to search_targetpixelfile
sr = _sort_chronologically(sr)
tpf_coll = download_all(sr, download_dir=download_dir, quality_bitmask=quality_bitmask)
return tpf_coll, sr
def create_download_tpf_task(*args, **kwargs):
return asyncio_compat.create_background_task(search_and_download_tpf, *args, **kwargs)
#
# Other misc. extensions
#
def get_bkg_lightcurve(lcf):
"""Returns the background flux, i.e., ``SAP_BKG`` in the file"""
lc = lcf.copy()
lc["flux"] = lc["sap_bkg"]
lc["flux_err"] = lc["sap_bkg_err"]
lc.label = lc.label + " BKG"
return lc
def _do_create_quality_issues_mask(quality, flux, flags_included=0b0101001010111111):
"""Returns a boolean array which flags cadences with *issues*.
The default `flags_included` is a TESS default, based on
https://outerspace.stsci.edu/display/TESS/2.0+-+Data+Product+Overview#id-2.0DataProductOverview-Table:CadenceQualityFlags
"""
if np.issubdtype(quality.dtype, np.integer):
return np.logical_and(quality & flags_included, np.isfinite(flux))
else:
# quality column is not an integer, probably a non-standard product
return np.zeros_like(quality, dtype=bool)
def create_quality_issues_mask(lc, flags_included=0b0101001010111111):
"""Returns a boolean array which flags cadences with *issues*.
The default `flags_included` is a TESS default, based on
https://outerspace.stsci.edu/display/TESS/2.0+-+Data+Product+Overview#id-2.0DataProductOverview-Table:CadenceQualityFlags
"""
# use sap_flux when available (it may not be there in some HLSP)
# we prefer sap_flux over pdcsap_flux as
# pdcsap_flux is more likely to be NaN (due to exclusion by quality flags)
if "sap_flux" in lc.colnames:
flux = lc["sap_flux"]
else:
flux = lc.flux
return _do_create_quality_issues_mask(lc.quality, flux)
def _get_n_truncate_fits_data(lc, before, after, return_columns, return_mask=False):
with fits.open(lc.filename) as hdu:
time = hdu[1].data["TIME"]
mask = (time >= before) & (time < after)
res = dict()
for col in return_columns:
res[col] = hdu[1].data[col][mask]
if return_mask:
return res, mask
else:
return res
def list_times_w_quality_issues(lc, include_excluded_cadences=False):
if not include_excluded_cadences:
mask = create_quality_issues_mask(lc)
return lc.time[mask], lc.quality[mask]
else:
# case we want cadences that have been excluded in the lc object
# use the underlying fits file
flux_colname = lc.meta.get("FLUX_ORIGIN", "sap_flux")
if flux_colname == "pdcsap_flux":
flux_colname = "sap_flux" # pdcsap_flux would not have values in excluded cadences, defeating the purpose
# data is truncated if the given lc is truncated
# TODO: cadences excluded before lc.time.min() or after lc.time.max() will still be missing.
data = _get_n_truncate_fits_data(lc, lc.time.min().value, lc.time.max().value, ["time", "quality", flux_colname])
mask = _do_create_quality_issues_mask(data["quality"], data[flux_colname])
return data["time"][mask], data["quality"][mask]
def list_transit_times(t0, period, steps_or_num_transits=range(0, 10), return_string=False):
"""List the transit times based on the supplied transit parameters"""
if isinstance(steps_or_num_transits, int):
steps = range(0, steps_or_num_transits)
else:
steps = steps_or_num_transits
times = [t0 + period * i for i in steps]
if return_string:
return ",".join(map(str, times))
else:
return times
def get_segment_times_idx(times, break_tolerance=5):
"""Segment the input array of times into segments due to data gaps. Return the indices of the segments.
The minimal gap size is determined by `break_tolerance`.
The logic is adapted from `LightCurve.flatten`
"""
if hasattr(times, "value"): # convert astropy Time to raw values if needed
times = times.value
if len(times) < 1:
return (None, None)
dt = times[1:] - times[0:-1]
median_cadence = np.nanmedian(dt)
# print(f"TRACE median_cadence={median_cadence * 24 * 60} min, break_tolerance={break_tolerance}")
with warnings.catch_warnings(): # Ignore warnings due to NaNs
warnings.simplefilter("ignore", RuntimeWarning)
cut = np.where(dt > break_tolerance * median_cadence)[0] + 1
low = np.append([0], cut)
high = np.append(cut, len(times))
return (low, high)
def get_segment_times(times, **kwargs):
if hasattr(times, "value"): # convert astropy Time to raw values if needed
times = times.value
low, high = get_segment_times_idx(times, **kwargs)
if low is None: # edge case times is empty
return []
else: # normal case
# add a small 1e-10 to end so that the end time is exclusive (follow convention in range)
return [(times[lo], times[hi - 1] + 1e-10) for lo, hi in zip(low, high)]
def get_transit_times_in_range(t0, period, start, end):
t_start = t0 + math.ceil((start - t0) / period) * period
num_t = math.ceil((end - t_start) / period)
return [t_start + period * i for i in range(num_t)]
def get_transit_times_in_lc(lc, t0, period, return_string=False, **kwargs):
"""Get the transit times with observations of the given lightcurve, based on the supplied transit parameters.
The method will exclude the times where there is no observation due to data gaps.
"""
lc = lc.remove_nans() # exclude cadences with no flux.
# break up the times to exclude times in gap
times_list = get_segment_times(lc.time, **kwargs)
# print(f"TRACE times_list={times_list}")
transit_times = []
for start, end in times_list:
transit_times.extend(get_transit_times_in_range(t0, period, start, end))
if return_string:
return ",".join(map(str, transit_times))
else:
return transit_times
def _concatenate_list_of_lists(lists):
# https://stackoverflow.com/a/14807729
combined = list(itertools.chain.from_iterable(lists))
# equivalent to using list comprehension, but itertools is supposed to be faster
# combined = [item for sublist in lists for item in sublist]
return combined
def get_compatible_periods_of_2dips(
epoch1, epoch2, lcc: lk.LightCurveCollection, flux_column="flux", min_period=10, verbose=False
):
"""For the case where 2 dips are observed. Return the compatible periods that fit the observations.
Note: Parameter `lcc` accepts a `LightCurveCollection` instead of a `LightCurve`. This is because
the logic of segmenting lightcurve used internally is sensitive to the lightcurve cadence,
and could yield unexpected results if the cadence is mixed.
E.g., if users have 1 30-minute cadence LC and 2 2-min cadence LCs and stitches them together,
the median cadence determined would be 2 min.
The segmenting logic would then (by default) segmenting the lightcurve when there is
a 10 min gap (2min X 5). As a result, the 30-min cadence portion of the lightcurve will be be broken
up such that each cadence is its own segment.
Counting of transit times could miss some values due to rounding issues in those 1-cadence segments.
"""
num_cycles = 2
compat_p_list = []
while True:
trial_p = abs(epoch2 - epoch1) / num_cycles
if trial_p <= min_period:
break
trial_ttimes = _concatenate_list_of_lists(
[get_transit_times_in_lc(lc.select_flux(flux_column), epoch1, trial_p) for lc in lcc]
)
if len(trial_ttimes) <= 2: # assuming epoch1, epoch2 is always in the trial_ttimes.
compat_p_list.append(trial_p)
if verbose:
print(f" Period {trial_p} compatible. {trial_ttimes}")
else:
if verbose:
print(f" Period {trial_p} is incompatible. Has unexpected transits at times {trial_ttimes}")
num_cycles += 1
return compat_p_list
def get_compatible_periods_of_3dips(
epoch1, epoch2, epoch3, lcc: lk.LightCurveCollection, flux_column="flux", min_period=10, verbose=False
):
"""For the case where 2 dips are observed. Return the compatible periods that fit the observations.
Note: Parameter `lcc` accepts a `LightCurveCollection` instead of a `LightCurve`. This is because
the logic of segmenting lightcurve used internally is sensitive to the lightcurve cadence,
and could yield unexpected results if the cadence is mixed.
E.g., if users have 1 30-minute cadence LC and 2 2-min cadence LCs and stitches them together,
the median cadence determined would be 2 min.
The segmenting logic would then (by default) segmenting the lightcurve when there is
a 10 min gap (2min X 5). As a result, the 30-min cadence portion of the lightcurve will be be broken
up such that each cadence is its own segment.
Counting of transit times could miss some values due to rounding issues in those 1-cadence segments.
"""
num_cycles = 2
compat_p_list = []
while True:
trial_p = abs(epoch3 - epoch1) / num_cycles
if trial_p <= min_period:
break
# check if trial_p fits epoch2
num_cycles_for_epoch2 = round((epoch2 - epoch1) / trial_p)
epoch2_from_trial_p = epoch1 + trial_p * num_cycles_for_epoch2
epoch2_diff = abs(epoch2 - epoch2_from_trial_p)
tolerance = 0.05 # observed epoch2 needs to be within tolerance (in days) with predicted epoch 2 time
if epoch2_diff > tolerance:
if verbose:
print(f" Period {trial_p} is incompatible. Does not fit epoch2 (off by {epoch2_diff:.2f} days).")
num_cycles += 1
continue
trial_ttimes = _concatenate_list_of_lists(
[get_transit_times_in_lc(lc.select_flux(flux_column), epoch1, trial_p) for lc in lcc]
)
if len(trial_ttimes) <= 3: # assuming epoch1, epoch2, epoch3 is always in the trial_ttimes.
compat_p_list.append(trial_p)
if verbose:
print(f" Period {trial_p} compatible. {trial_ttimes}")
else:
if verbose:
print(f" Period {trial_p} is incompatible. Has unexpected transits at times {trial_ttimes}")
num_cycles += 1
return compat_p_list
class TransitTimeSpec(dict):
def __init__(
self,
epoch: float = None,
period: float = None,
duration_hr: float = None,
transit_depth_percent: float = None,
sector: int = None,
steps_to_show: list = None,
surround_time: float = None,
label: str = None,
defaults: TransitTimeSpec = None,
):
# core parameters
self["epoch"] = epoch
self["period"] = period
self["duration_hr"] = duration_hr
# depth is used occasionally, for typical plotting, it is not used
self["transit_depth_percent"] = transit_depth_percent
# used for plotting
self["sector"] = sector
self["steps_to_show"] = steps_to_show
self["surround_time"] = surround_time
self["label"] = label
if defaults is None:
defaults = {}
self._defaults = defaults # put it as a custom attribute
def __getitem__(self, key):
res = super().get(key)
if res is None:
res = self._defaults.get(key)
return res
def get(self, key, default=None):
res = self.__getitem__(key)
if res is None:
res = default
return res
class TransitTimeSpecList(list):
def __init__(self, *tt_spec_dict_list, defaults={}):
self._defaults = TransitTimeSpec(**defaults)
for tt_spec_dict in tt_spec_dict_list:
self.append(TransitTimeSpec(**tt_spec_dict, defaults=self._defaults))
def _spec_property_values(self, property_name):
return np.array([tt[property_name] for tt in self])
#
# The following properties return the specific transit parameters
# in an array. Together they can be used to create a mask
# for the transits using ``LightCurve.create_transit_mask()``
#
@property
def epoch(self):
return self._spec_property_values("epoch")
@property
def period(self):
return self._spec_property_values("period")
@property
def duration_hr(self):
return self._spec_property_values("duration_hr")
@property
def duration(self):
return self.duration_hr / 24
@property
def transit_depth_percent(self):
return self._spec_property_values("transit_depth_percent")
@property
def label(self):
return self._spec_property_values("label")
def to_table(self, columns=("label", "epoch", "duration_hr", "period")):
"""Convert the specs to an ``astropy.Table``"""
data = [getattr(self, col) for col in columns]
return Table(data, names=columns)
def stitch(lcf_coll, ignore_incompatible_column_warning=False, **kwargs):
"""Wrapper over native stitch(), and tweak the metadata so that it behaves like a typical single-sector lightcurve."""
def update_meta_if_exists_in(lc_src, keys):
for key in keys:
val = lc_src.meta.get(key, None)
if val is not None:
lc_stitched.meta[key] = val
def safe_del_meta(key):
if lc_stitched.meta.get(key, None) is not None:
del lc_stitched.meta[key]
if ignore_incompatible_column_warning:
with warnings.catch_warnings():
# suppress useless warning. Use cases: stitching QLP lightcurves with SPOC lightcurves (sap_flux is incompatible)
warnings.filterwarnings(
"ignore",
category=lk.LightkurveWarning,
message="The following columns will be excluded from stitching because the column types are incompatible:.*",
)
lc_stitched = lcf_coll.stitch(**kwargs)
else:
lc_stitched = lcf_coll.stitch(**kwargs)
# now update the metadata
lc_stitched.meta["STITCHED"] = True
# update observation start/stop dates
update_meta_if_exists_in(lcf_coll[0], ("TSTART", "DATE-OBS"))
update_meta_if_exists_in(lcf_coll[-1], ("TSTOP", "DATE-END"))
# TODO: recalculate TELAPSE, LIVETIME, DEADC (which is LIVETIME / TELAPSE)
safe_del_meta("FILENAME") # don't associate it with a file anymore
# record the sectors stitched and the associated metadata
sector_list = [lc.meta.get("SECTOR") for lc in lcf_coll if lc.meta.get("SECTOR") is not None]
meta_list = [lc.meta for lc in lcf_coll if lc.meta.get("SECTOR") is not None]
if len(sector_list) > 0:
lc_stitched.meta["SECTORS"] = sector_list
meta_dict = dict()
for sector, meta in zip(sector_list, meta_list):
meta_dict[sector] = meta.copy()
lc_stitched.meta["HEADERS_ORIGINAL"] = meta_dict
return lc_stitched
def to_window_length_for_2min_cadence(length_day):
"""Helper for LightCurve.flatten().
Return a `window_length` for the given number of days, assuming the data has 2-minute cadence."""
return to_window_length_for_cadence(length_day * u.day, 2 * u.min)
def to_window_length_for_cadence(length, cadence):
"""Helper for LightCurve.flatten().
Return a `window_length` for the given length and cadence.
Parameters length and cadence should be ~~astropy.quantity.Quantity~~.
If they are unitless number, they should be in the same unit.
"""
res = math.floor(length / cadence)
if res % 2 == 0:
res += 1 # savgol_filter window length must be odd number
return res
# detrend using spline
# Based on: https://github.com/barentsen/kepler-athenaeum-tutorial/blob/master/how-to-find-a-planet-tutorial.ipynb
def flatten_with_spline_normalized(lc, return_trend=False, **kwargs):
lc = lc.remove_nans()
spline = UnivariateSpline(lc.time, lc.flux, **kwargs)
trend = spline(lc.time)
# detrended = lc.flux - trend
detrended_relative = 100 * ((lc.flux / trend) - 1) + 100 # in percentage
lc_flattened = lc.copy()
lc_flattened.flux = detrended_relative
lc_flattened.flux_unit = "percent"
if not return_trend:
return lc_flattened
else:
lc_trend = lc.copy()
lc_trend.flux = trend
return (lc_flattened, lc_trend)
def _lksl_statistics(ts):
"""Compute LKSL Statistics of the given (time-series) values.
Useful to compare the noises in a folded lightcurve.
Based on https://arxiv.org/pdf/1901.00009.pdf , equation 4.
See https://www.aanda.org/articles/aa/pdf/2002/17/aa2208.pdf for more information.
"""
ts = ts[~np.isnan(ts)]
vector_length_sq_sum = np.square(np.diff(ts)).sum()
if len(ts) > 2:
# to fully utilize the data by including the vector length between the last and first measurement
# section 2 of Clarke, 2002
vector_length_sq_sum += np.square(ts[-1] - ts[0])
diff_from_mean_sq_sum = np.square(ts - np.mean(ts)).sum()
if diff_from_mean_sq_sum == 0: # to avoid boundary case that'd cause division by zero
diff_from_mean_sq_sum = 1e-10
return (vector_length_sq_sum / diff_from_mean_sq_sum) * (len(ts) - 1) / (2 * len(ts))
def lksl_statistics(lc, column="flux"):
return _lksl_statistics(lc[column].value)
#
# TODO: util to estimate transit depth
#
def estimate_snr(
lc,
signal_depth,
signal_duration,
num_signals,
savgol_to_transit_window_ratio=4,