diff --git a/.github/workflows/todo.yml b/.github/workflows/todo.yml index af970b4..af2e465 100644 --- a/.github/workflows/todo.yml +++ b/.github/workflows/todo.yml @@ -1,6 +1,8 @@ name: "Run TODO to Issue" on: push: + branches: + - "master" workflow_dispatch: inputs: MANUAL_COMMIT_REF: diff --git a/data/classifications.tsv b/data/classifications.tsv deleted file mode 100644 index a58acb1..0000000 --- a/data/classifications.tsv +++ /dev/null @@ -1,2 +0,0 @@ -start end encounter_ids classifications -2016-12-21T00:49:30 2016-12-21T00:50:30 ['9182'] [[0.8753612041473389], [0.746759295463562], [0.26265254616737366], [0.45787951350212097], [0.35406064987182617], [0.42348742485046387], [0.4947870969772339], [0.7287474274635315], [0.7099379897117615], [0.2122703194618225], [0.044488538056612015], [0.00849922839552164], [0.024390267208218575], [0.33750119805336], [0.6530888080596924], [0.3057247996330261], [0.1243574470281601], [0.027093390002846718], [0.011367958970367908], [0.004032353404909372], [0.026372192427515984], [0.021978065371513367], [0.006407670211046934], [0.5405446887016296], [0.34207114577293396], [0.6080849766731262], [0.5394770503044128], [0.3662146031856537], [0.16772609949111938], [0.3641503155231476], [0.060217034071683884], [0.008764371275901794], [0.012523961253464222], [0.009186000563204288], [0.022050702944397926], [0.3908870816230774], [0.15179167687892914], [0.3454047441482544], [0.4770602285861969], [0.07589100301265717], [0.5439115166664124], [0.8634722232818604], [0.985602617263794], [0.3311924636363983], [0.8832067847251892], [0.6166273951530457], [0.42301759123802185], [0.03573732450604439], [0.09752023965120316], [0.01426385436207056], [0.022987568750977516], [0.012294118292629719], [0.010207954794168472], [0.00296270614489913]] diff --git a/data/output.txt b/data/output.txt deleted file mode 100644 index de54fc9..0000000 --- a/data/output.txt +++ /dev/null @@ -1,3 +0,0 @@ -{'encounter_id': '11486', 'latitude': 36.91, 'longitude': -122.02, 'displayImgUrl': 'https://au-hw-media-m.happywhale.com/c5522187-058e-4a1a-83d7-893560ba6b2c.jpg', 'audio': array([], dtype=float32), 'start': Timestamp('2016-12-21 00:20:30'), 'end': Timestamp('2016-12-21 00:21:30'), 'classifications': []} -{'encounter_id': '9182', 'latitude': 36.91, 'longitude': -122.02, 'displayImgUrl': 'https://au-hw-media-m.happywhale.com/d40b9e6e-07cf-4f20-8cb4-4042ba22a00b.jpg', 'audio': array([-0.00352275, -0.00346267, -0.00334585, ..., -0.00339496, - -0.00333035, -0.00329852], dtype=float32), 'start': Timestamp('2016-12-21 00:49:30'), 'end': Timestamp('2016-12-21 00:50:30'), 'classifications': [[0.8753612041473389], [0.746759295463562], [0.26265254616737366], [0.45787951350212097], [0.35406064987182617], [0.42348742485046387], [0.4947870969772339], [0.7287474274635315], [0.7099379897117615], [0.2122703194618225], [0.044488538056612015], [0.00849922839552164], [0.024390267208218575], [0.33750119805336], [0.6530888080596924], [0.3057247996330261], [0.1243574470281601], [0.027093390002846718], [0.011367958970367908], [0.004032353404909372], [0.026372192427515984], [0.021978065371513367], [0.006407670211046934], [0.5405446887016296], [0.34207114577293396], [0.6080849766731262], [0.5394770503044128], [0.3662146031856537], [0.16772609949111938], [0.3641503155231476], [0.060217034071683884], [0.008764371275901794], [0.012523961253464222], [0.009186000563204288], [0.022050702944397926], [0.3908870816230774], [0.15179167687892914], [0.3454047441482544], [0.4770602285861969], [0.07589100301265717], 
[0.5439115166664124], [0.8634722232818604], [0.985602617263794], [0.3311924636363983], [0.8832067847251892], [0.6166273951530457], [0.42301759123802185], [0.03573732450604439], [0.09752023965120316], [0.01426385436207056], [0.022987568750977516], [0.012294118292629719], [0.010207954794168472], [0.00296270614489913]]} diff --git a/data/pipeline-diagram.json b/data/pipeline-diagram.json deleted file mode 100644 index 247a514..0000000 --- a/data/pipeline-diagram.json +++ /dev/null @@ -1 +0,0 @@ -[{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(fa:fa-globe Geometry Search)\n A --> C(fa:fa-music Retrive Audio)\n B --> C\n C --> E(fa:fa-filter Filter Frequency)\n E --> F(fa:fa-bullseye Classify Audio)\n F --> G(fa:fa-tag Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H \n ","mermaid":"{\n \"theme\": \"default\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487,"editorMode":"code"},"time":1726314834573,"type":"auto","id":"80a09c2c-1234-44b0-9d16-aaaae58962ff","name":"yellow-nigeria"},{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(fa:fa-globe Geometry Search)\n A --> C(fa:fa-music Retrive Audio)\n B --> C\n C --> E(fa:fa-filter Filter Frequency)\n E --> F(fa:fa-tag Classify Audio)\n F --> G(fa:fa-map Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H \n ","mermaid":"{\n \"theme\": \"default\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487,"editorMode":"code"},"time":1726314774573,"type":"auto","id":"f17806f0-86f7-49d8-9cf7-a60e91cdaf16","name":"puny-lunch"},{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(fa:fa-globe Geometry Search)\n A --> C(fa:fa-music Retrive Audio)\n B --> C\n C --> E(fa:fa-filter Filter Frequency)\n E --> F(fa:fa-tag Classify Audio)\n F --> G(Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H \n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487,"editorMode":"code"},"time":1726314714578,"type":"auto","id":"abe11adf-0ebf-40f3-a97c-b82d77455145","name":"rich-apple"},{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(fa:fa-globe Geometry Search)\n A --> C(fa:fa-music Retrive Audio)\n B --> C\n C --> E(fa:fa-gate Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H \n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487,"editorMode":"code"},"time":1726314654572,"type":"auto","id":"6ac55d30-8244-4a7d-9928-16699958b045","name":"obnoxious-woman"},{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174988485,"type":"auto","id":"f501c97c-00e0-4f34-8be6-758b3bf36e97","name":"quaint-cpu"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess 
Labels)\n G --> H[fa:fa-fish Output]\n B --> H\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174928166,"type":"auto","id":"9afa49db-e080-493d-b3e8-80d0442cf901","name":"embarrassed-evening"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess Clips)\n B --> H[Output]\n G --> H\n C -->|Three| F[fa:fa-fish Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174868110,"type":"auto","id":"c5406a00-e37a-4988-9ac3-2cae3ad529d0","name":"moldy-branch"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess Clips)\n B --> H[Output]\n G --> H\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174808113,"type":"auto","id":"94befebd-1740-4eea-9add-b3a4d456683a","name":"strong-father"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n B --> H[Output]\n G --> H\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174748112,"type":"auto","id":"8ddd6b83-8967-428d-a920-b07da7525483","name":"sour-zettabyte"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726172906224,"type":"auto","id":"fa11762a-401e-49e3-a584-268598479e52","name":"loose-knife"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":true,"pan":{"x":289.8924659991275,"y":57.030104204754096},"zoom":0.8936022236523117},"time":1726172846221,"type":"auto","id":"a9e1445d-a5cc-41f6-8e8e-9eeb7f0d02a2","name":"old-optician"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":true},"time":1726172786221,"type":"auto","id":"edf676d2-a9d6-428b-b974-1a7cf755574d","name":"abandoned-midnight"},{"state":{"code":"flowchart TD\n A[Inputs] --> 
B(Geometry Search)\n A --> C(Audio Retrival)\n B --> | windown aroudn sighting| C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172726225,"type":"auto","id":"adb79b5b-498b-4c87-8d42-6fa81df6b784","name":"rancid-nest"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n B --> D(Sightings)\n A --> C(Audio Retrival)\n D --> C\n C --> E(Frequency Detection)\n D --> E\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172666223,"type":"auto","id":"1e8e08bc-7a3b-4e73-856f-f719bd152473","name":"whining-gold"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detection)\n D --> E\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172606223,"type":"auto","id":"0c53c3f4-7124-4b76-94ad-4b6453505ccb","name":"blue-china"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172546223,"type":"auto","id":"638c3efa-6445-47ec-bf72-ede14fb952f2","name":"happy-electrician"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172486225,"type":"auto","id":"f46c484a-2050-4e3a-b935-96ace611778b","name":"cuddly-analyst"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n \n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172426228,"type":"auto","id":"434e7429-ecda-4555-aefa-9cb608cd2e00","name":"spicy-football"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detec)\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172366227,"type":"auto","id":"0a8f6c61-b028-4016-974d-043bb8a5e7ae","name":"hallowed-angle"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": 
\"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172306227,"type":"auto","id":"5afb754e-d9d3-45cc-9832-e3537ce6ba27","name":"bulky-librarian"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> E()\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172246229,"type":"auto","id":"3cd4968d-2edb-4df5-8971-2547148df82c","name":"jealous-furniture"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> \n C --> D[Laptop]\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172186227,"type":"auto","id":"0003b272-dcea-4339-bbbc-8de48cd1edcf","name":"kind-candle"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n C -->|One| D[Laptop]\n C -->|Two| E[iPhone]\n C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172126231,"type":"auto","id":"5078c8e0-2ba7-44fe-b6ab-86710238bdf3","name":"bashful-kitchen"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n C -->|One| D[Laptop]\n C -->|Two| E[iPhone]\n C -->|Three| F[fa:fa-car Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172066232,"type":"auto","id":"915bf687-f51e-4aed-a8a4-814aeffcd641","name":"hissing-eve"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Go shopping)\n B --> C{Let me think}\n C -->|One| D[Laptop]\n C -->|Two| E[iPhone]\n C -->|Three| F[fa:fa-car Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172006253,"type":"auto","id":"610fb017-4f91-4c57-8906-e0dcdcfe30ac","name":"drab-child"}] \ No newline at end of file diff --git a/docs/ladr/LADR_0005_persist_intermediate_outputs.md b/docs/ladr/LADR_0005_persist_intermediate_outputs.md new file mode 100644 index 0000000..5e6a267 --- /dev/null +++ b/docs/ladr/LADR_0005_persist_intermediate_outputs.md @@ -0,0 +1,137 @@ +# Intermediate stage outputs + +This doc discusses handling the intermediate outputs between stages. +Whether or not to store the output should be confirgurable for the end user, either via command line params or in [config.yaml](../../src/config/common.yaml) +We want to enable these storage options to support local adn cloud storage. +This means we need to consider costs and effeciency when designing output schemas. + +Some stages (like geo-search) require locally storing outputs, since the (unaltered) Happywhale API currently writes found encounters to file, and does not return a df. + +Other stages like audio retrival may make sense to keep stateless to avoid storage costs on our end. +Or storing only start and stop values for the audio, with a link to that day's data. + + +For during local development and debugging, having all intermediate data stored helps speed up iteration time. +Additionally, if data already exists for run on a particular date, the pipeline should skip these stages and load the outputs from the previous run. +While a productionized pipeline might only run once per geofile-date, and not need to store intermediate outputs, this decision should be left up to the end user. 
+ +Another point to consider is data-usage agreements with the audio providers. +Make sure to read any agreements to ensure that storing intermediate outputs is allowed. + + + +Some questions to consider: +- Exactly what data should we preserve from each stage? Will this differ from the output of the stage? +E.g. start/stop times of sifted audio, full classification arrays, or pooled results. + +- How to handle overwrites or parallel writes? Parallel writes should never occur, since we find overlapping encounter ids and group them together. Overwrites could occur if a stage does not check whether its data already exists before writing. + + +- Do we have a true key throughout our entire dataflow? Do we need one? After geo-search, we could consider a concatenation of start, end, and encounter_id as a key, though this might be misleading if sifted audio changes the start and end times. + +## Stages +For every stage, I'll discuss what outputs to store, and how they should be written locally and in the cloud. + +### 1. Geo-search +#### Local storage +We are forced to store the outputs for this stage, since the API requires a file path to write the pandas df to. +This means there is a chance of overwrites when running locally, or on a persistent server. +This can, however, be solved by providing a temporary file location to the API, loading in the old data and the temporary outputs, then writing to the final location. + +#### Cloud storage +This data is very structured, and could be stored in a database. +We should initialize and create a table in our project.dataset_id. +This can be the alternative to copying the temporary file to a more persistent final location. + + +### 2. Audio retrieval +We should likely not store the full outputs for this stage. +The data is open source and can be retrieved at any time; the only cost is the download. +The main argument for storing here would be if download costs were significantly higher than storage, e.g. on a persistent server. +Development and debugging are still easier with the data stored, so we also need to design these outputs smartly. + +#### Local storage +Writing to my local machine has been easy enough with np.save. +This makes the data easily accessible for listening to, which is extremely helpful when analysing plots. +For now, I'll assume this is good enough, and rather rely on the built-in teardown of the DataflowRunner to clean this data up if wrongly configured during cloud runs. + +#### Cloud storage +We could store the start and stop times of (and maybe a URL link to) the audio retrieved for the found encounter ids in a table in our project.dataset_id. +This can be beneficial if a user decides not to use any audio sifting. +Maybe add a config option to allow storing full audio? +If stored, it should be identified by a key (start, stop, encounter_id) to avoid overwrites, and stored in a bucket, not a table. + +### 3. Audio sifting +How much audio sift data should be persisted? +Full audio arrays with start, stop times and encounter ids? +Or just the start, stop times and encounter ids, assuming the audio will be downloaded and passed from the previous stage? +There is really no need to double storage here, but the option should still be available. + +The main argument for storing the full audio arrays is that it speeds up iteration time, and allows for easier debugging. +We likely also want this data easily accessible, if our final outputs are going to contain the audio snippets with classifications.
+That's essentially the whole point of this pipeline, so _some_ audio will eventually need to be stored. +And it's likely at this stage that we will want to store it, since this audio is truncated. + +We will need to think about the key or unique identifier here, since there are a lot of parameters that can affect how much audio was truncated. Essentially, all of these can be in the path, but that will make for extremely long paths. + +#### Local storage +Again, np.save is a good option for storing the audio arrays. + +#### Cloud storage +As before, this needs to be stored in a bucket. +We can maybe inherit the same write method from the previous stage, if we figure out how to pass classes between the stage-local files. + +### 4. Classification +After the audio has been fed through the model, we'll get an array shorter than the length of the audio array, but still of arbitrary length. +Large context windows will produce large classification arrays, meaning high storage costs. +Is all of this data necessary to save, or would a pooled score be best? It depends on the use-case ;) + +We could alternatively cut the audio to only the parts where a min and max classification above a threshold is found. +This would eliminate any real dependency on audio sifting (in case that stage turns out not to be needed later). +This would also serve as the best waste-reduction strategy, since we would only store the audio that we are confident contains a whale call. + +#### Local storage +For now, let's stick to storing the entire classification array for this stage, using np.save, with paths similar to the audio storage. + +#### Cloud storage +Since we are dealing with arbitrary lengths, I'd say stick to a bucket with parameters as path variables. + + +### 5. Postprocessing (pooling and labelling) +The final stage's output definitely needs to be stored, but the main discussion here becomes: what to store? +If we already have stored intermediate stages like sifted audio or truncated classified audio, we could avoid saving them again, and rather load from those tables when presenting aggregated results. + +Though, I like the idea of the last stage of the pipeline containing all the necessary data found through the pipeline. +This makes data sharing easier, with a concrete final product, instead of a bunch of fragmented tables that need to be joined to have any true value. +Maybe store the easily queryable data in a table, then include a link to the audio storage location (whether that be a file stored by me, MBARI, or another hydrophone provider). + +#### Local storage +I'll assume a similar structure to the expected table when saving locally. This means array data like audio and classifications is excluded. +This frees me up to store one entry per encounter id. +Paths to the audio and classification arrays can be stored in the table. + +#### Cloud storage +I feel like this table should always be written to (i.e. no config option to disable). +Outputs will include: +``` +encounter_id +longitude +latitude +start_time +stop_time +pooled_score +img_path +audio_path +classification_path +``` + +## Conclusion +- All stages (except the last) will be configurable to save or not. +- File-exists sensors should allow skipping a stage. +- Local storage of arrays w/ variable lengths with np.save, and parameter values in path.
+- Structured data will be stored in tables with relevant links to array data + + +## Resources +- Interesting blog post comparing stateful DoFns to CombineFn: https://beam.apache.org/blog/stateful-processing/ +- Beam BigQuery walkthrough: https://beam.apache.org/documentation/io/built-in/google-bigquery/ diff --git a/examples/write_to_bigquery.py b/examples/write_to_bigquery.py new file mode 100644 index 0000000..4e152eb --- /dev/null +++ b/examples/write_to_bigquery.py @@ -0,0 +1,40 @@ + +from apache_beam.io.gcp.internal.clients import bigquery +import apache_beam as beam + + +# project-id:dataset_id.table_id +table_spec = 'bioacoustics-2024.whale_speech.sample_quotes' + + +table_schema = { + 'fields': [{ + 'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE' + }, { + 'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED' + }] +} + +# Create a pipeline +temp_location = "gs://bioacoustics/whale-speech/temp" + +with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions()) as pipeline: + + quotes = pipeline | beam.Create([ + { + 'source': 'Mahatma Gandhi', 'quote': 'My life is my message.' + }, + { + 'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'." + }, + ]) + quotes | beam.io.WriteToBigQuery( + table_spec, + schema=table_schema, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + custom_gcs_temp_location=temp_location, + method=beam.io.WriteToBigQuery.Method.FILE_LOADS + ) + +print("Completed writing to BigQuery") \ No newline at end of file diff --git a/makefile b/makefile index beca38d..d7de1a3 100644 --- a/makefile +++ b/makefile @@ -1,5 +1,6 @@ local-run: bash scripts/kill_model_server.sh + python3 src/create_table.py python3 src/model_server.py & python3 src/pipeline.py bash scripts/kill_model_server.sh @@ -11,3 +12,6 @@ model-server: kill-model-server: bash scripts/kill_model_server.sh + +create-table: + python3 src/create_table.py \ No newline at end of file diff --git a/src/config/common.yaml b/src/config/common.yaml index 309cb01..4abb209 100644 --- a/src/config/common.yaml +++ b/src/config/common.yaml @@ -3,6 +3,11 @@ pipeline: verbose: true debug: true show_plots: false + is_local: false + + # gcp - bigquery + project: "bioacoustics-2024" + dataset_id: "whale_speech" input: start: "2016-12-21T00:30:00" @@ -57,16 +62,41 @@ pipeline: plot_scores: true plot_path_template: "data/plots/results/{year}/{month:02}/{plot_name}.png" classification_path: "data/classifications.tsv" - url: https://tfhub.dev/google/humpback_whale/1 - model_url: "http://127.0.0.1:5000/predict" + model_uri: https://tfhub.dev/google/humpback_whale/1 + inference_url: "http://127.0.0.1:5000/predict" med_filter_size: 3 postprocess: + confidence_threshold: 0.5 min_gap: 60 # 1 minute + output_path: "data/postprocess/output.json" pooling: "average" - confidence_threshold: 0.5 - output_path_template: "data/labels/{year}/{month:02}/{day:02}.csv" - - - - + postprocess_table_id: "mapped_audio" + postprocess_table_schema: + encounter_id: + type: 'STRING' + mode: 'REQUIRED' + latitude: + type: 'FLOAT64' + mode: 'REQUIRED' + longitude: + type: 'FLOAT64' + mode: 'REQUIRED' + start: + type: 'TIMESTAMP' + mode: 'REQUIRED' + end: + type: 'TIMESTAMP' + mode: 'REQUIRED' + pooled_score: + type: 'FLOAT64' + mode: 'REQUIRED' + img_path: + type: 'STRING' + mode: 'NULLABLE' + audio_path: + type: 'STRING' + mode: 'NULLABLE' + classification_path: + type: 'STRING' + mode: 'NULLABLE' diff --git a/src/config/local.yaml 
b/src/config/local.yaml index 050a51b..fe7ca78 100644 --- a/src/config/local.yaml +++ b/src/config/local.yaml @@ -3,6 +3,7 @@ pipeline: verbose: true debug: true show_plots: false + is_local: true search: export_template: "data/encounters/{filename}-{timeframe}.csv" diff --git a/src/create_table.py b/src/create_table.py new file mode 100644 index 0000000..0dea21f --- /dev/null +++ b/src/create_table.py @@ -0,0 +1,71 @@ +from google.cloud import bigquery +from google.api_core.exceptions import Conflict + +from config import load_pipeline_config + +config = load_pipeline_config() + +client = bigquery.Client() + + +# Define the table schema +schema = [ + { + "name": name, + "type": getattr(config.postprocess.postprocess_table_schema, name).type, + "mode": getattr(config.postprocess.postprocess_table_schema, name).mode + } + for name in vars(config.postprocess.postprocess_table_schema) +] + + +# Create a dataset +def create_dataset(dataset_id): + try: + dataset_path = f"{client.project}.{dataset_id}" + dataset = bigquery.Dataset(dataset_path) + dataset.location = "US" + dataset = client.create_dataset(dataset, timeout=30) + print(f"Created dataset {client.project}.{dataset.dataset_id}") + except Conflict as e: + if "Already Exists" in str(e): + dataset = client.get_dataset(dataset_id) + print(f"Dataset {client.project}.{dataset.dataset_id} already exists. Continuing.") + else: + raise e + + return dataset + + +# Create a table +def create_table(dataset_id, table_id, schema=schema): + try: + table_path = f"{client.project}.{dataset_id}.{table_id}" + table = bigquery.Table(table_path, schema=schema) + table = client.create_table(table) + print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}") + except Conflict as e: + if "Already Exists" in str(e): + table = client.get_table(table_path) + print(f"Table {table.project}.{table.dataset_id}.{table.table_id} already exists. 
Continuing.") + else: + raise e + + +def run(args): + dataset = create_dataset(args.dataset_id) + table = create_table(dataset.dataset_id, args.table_id) + return table + + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--dataset_id", type=str, help="BigQuery dataset ID", default=config.general.dataset_id) + parser.add_argument("--table_id", type=str, help="BigQuery table ID", default=config.postprocess.postprocess_table_id) + + args = parser.parse_args() + + run(args) diff --git a/src/model_server.py b/src/model_server.py index efd06fb..d4b613d 100644 --- a/src/model_server.py +++ b/src/model_server.py @@ -8,12 +8,9 @@ from config import load_pipeline_config config = load_pipeline_config() - - # Load the TensorFlow model logging.info("Loading model...") -# model = hub.load("https://www.kaggle.com/models/google/humpback-whale/TensorFlow2/humpback-whale/1") -model = hub.load("https://tfhub.dev/google/humpback_whale/1") +model = hub.load(config.classify.model_uri) score_fn = model.signatures["score"] logging.info("Model loaded.") diff --git a/src/pipeline.py b/src/pipeline.py index 51b8f2b..db25baa 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -5,7 +5,9 @@ from stages.audio import RetrieveAudio, WriteAudio, WriteSiftedAudio from stages.sift import Butterworth from stages.classify import WhaleClassifier, WriteClassifications -from stages.postprocess import PostprocessLabels +from stages.postprocess import PostprocessLabels, WritePostprocess + +from apache_beam.io.gcp.internal.clients import bigquery from config import load_pipeline_config @@ -13,7 +15,11 @@ def run(): # Initialize pipeline options - pipeline_options = PipelineOptions() + pipeline_options = PipelineOptions( + # runner="DataflowRunner", + project="bioacoustics-2024", + temp_location="gs://bioacoustics/whale-speech/temp", + ) pipeline_options.view_as(SetupOptions).save_main_session = True args = { "start": config.input.start, @@ -35,10 +41,7 @@ def run(): audio_output | "Store Audio (temp)" >> beam.ParDo(WriteAudio()) sifted_audio | "Store Sifted Audio" >> beam.ParDo(WriteSiftedAudio("butterworth")) classifications | "Store Classifications" >> beam.ParDo(WriteClassifications(config)) - postprocess_labels | "Write Results" >> beam.io.WriteToText("data/output.txt", shard_name_template="") - - # Output results - # postprocessed_labels | "Write Results" >> beam.io.WriteToText("output.txt") + postprocess_labels | "Write to BigQuery" >> beam.ParDo(WritePostprocess(config)) if __name__ == "__main__": diff --git a/src/stages/classify.py b/src/stages/classify.py index 1fa5802..429a39d 100644 --- a/src/stages/classify.py +++ b/src/stages/classify.py @@ -11,6 +11,7 @@ import os import time import pandas as pd +import pickle import requests import math @@ -31,7 +32,7 @@ def __init__(self, config: SimpleNamespace): self.batch_duration = config.classify.batch_duration self.model_sample_rate = config.classify.model_sample_rate - self.model_url = config.classify.model_url + self.inference_url = config.classify.inference_url # plotting parameters self.hydrophone_sensitivity = config.classify.hydrophone_sensitivity @@ -103,6 +104,7 @@ def _plot_scores(self, scores, t=None): plt.ylabel('Model Score') plt.xlabel('Seconds') plt.xlim(0, len(t)) if t is not None else None + plt.title('Model Scores') if self.med_filter_size is not None: scores_int = [int(s[0]*1000) for s in scores] @@ -147,12 +149,55 @@ def _plot_spectrogram_scipy( plt.title(f'Calibrated spectrum levels, 16 
{self.source_sample_rate / 1000.0} kHz data') return t, f, psd - def _plot_audio(self, audio, key): - plt.plot(audio) - plt.xlabel('Samples') - plt.xlim(0, len(audio)) - plt.ylabel('Energy') - plt.title(f'Raw audio signal for {key}') + def _plot_audio(self, audio, start, key): + # plt.plot(audio) + # plt.xlabel('Samples') + # plt.xlim(0, len(audio)) + # plt.ylabel('Energy') + # plt.title(f'Raw audio signal for {key}') + with open(f"data/plots/Butterworth/{start.year}/{start.month}/{start.day}/data/{key}_min_max.pkl", "rb") as f: + min_max_samples = pickle.load(f) + with open(f"data/plots/Butterworth/{start.year}/{start.month}/{start.day}/data/{key}_all.pkl", "rb") as f: + all_samples = pickle.load(f) + # plt.plot(audio) # TODO remove this if does not work properly + + def _plot_signal_detections(signal, min_max_detection_samples, all_samples): + # TODO refactor plot_signal_detections in classify + logging.info(f"Plotting signal detections: {min_max_detection_samples}") + + plt.plot(signal) + + # NOTE: for legend logic, plot min_max window first + if len(min_max_detection_samples): + # shade window that resulted in detection + plt.axvspan( + min_max_detection_samples[0], + min_max_detection_samples[-1], + alpha=0.3, + color='y', + zorder=7, # on top of all detections + ) + + if len(all_samples): + # shade window that resulted in detection + for detection in all_samples: + plt.axvspan( + detection - 512/2, # TODO replace w/ window size from config + detection + 512/2, + alpha=0.5, + color='r', + zorder=5, # on top of signal + ) + + plt.legend(['Input signal', 'detection window', 'all detections']).set_zorder(10) + plt.xlabel(f'Samples (seconds * {16000} Hz)') # TODO replace with sample rate from config + plt.ylabel('Amplitude (normalized and centered)') + + title = f"Signal detections: {start.strftime('%Y-%m-%d %H:%M:%S')}" + plt.title(title) + + + _plot_signal_detections(audio, min_max_samples, all_samples) def _plot(self, output): audio, start, end, encounter_ids, scores = output @@ -170,7 +215,8 @@ def _plot(self, output): # Plot spectrogram: plt.subplot(gs[0]) - self._plot_audio(audio, key) + # self._plot_audio(audio, key) + self._plot_audio(audio, start, key) # Plot spectrogram: plt.subplot(gs[1]) @@ -207,6 +253,8 @@ def expand(self, pcoll): if self.plot_scores: outputs | "Plot scores" >> beam.Map(self._plot) + + logging.info(f"Finished {self.name} stage: {outputs}") return outputs @@ -223,7 +271,7 @@ def _postprocess(self, pcoll, grouped_outputs): class InferenceClient(beam.DoFn): def __init__(self, config: SimpleNamespace): - self.model_url = config.classify.model_url + self.inference_url = config.classify.inference_url self.retries = config.classify.inference_retries def process(self, element): @@ -244,7 +292,7 @@ def process(self, element): wait = 0 while wait < 5: try: - response = requests.post(self.model_url, json=data) + response = requests.post(self.inference_url, json=data) response.raise_for_status() break except requests.exceptions.ConnectionError as e: @@ -253,14 +301,14 @@ def process(self, element): wait += 1 time.sleep(wait*wait) - response = requests.post(self.model_url, json=data) + response = requests.post(self.inference_url, json=data) response.raise_for_status() predictions = response.json().get("predictions", []) - logging.info(f"Received response:\n key: {key} predictions:{len(predictions)}") + logging.info(f"Inference response:\n key: {key} predictions:{len(predictions)}") - yield (key, predictions) + yield (key, predictions) # TODO fix mixing yield and return in 
DoFn class ListCombine(beam.CombineFn): @@ -300,6 +348,7 @@ def __init__(self, config: SimpleNamespace): def process(self, element): logging.info(f"Writing classifications to {self.classification_path}") + logging.debug(f"Received element: {element}") # skip if empty if self._is_empty(element): @@ -316,10 +365,8 @@ def process(self, element): classification_df = pd.concat([classification_df, row], axis=0, ignore_index=True) # drop duplicates - logging.info(f"Dropping duplicates from {len(classification_df)} rows") - logging.info(f"before: \n {classification_df}") + logging.debug(f"Dropping duplicates from {len(classification_df)} rows") classification_df = classification_df.drop_duplicates(subset=["start", "end"], keep="last") # , "encounter_ids" - logging.info(f"resulting df: \n {classification_df}") # write to file classification_df.to_csv(self.classification_path, sep='\t', index=False) @@ -328,8 +375,10 @@ def process(self, element): def _is_empty(self, element): + if len(element) == 0: + return True array, start, end, encounter_ids, classifications = element - logging.debug(f"Checking if classifications are empty for start {start.strftime('%Y-%m-%dT%H:%M:%S')}: {len(classifications)}") + logging.info(f"Checking if classifications are empty for start {start.strftime('%Y-%m-%dT%H:%M:%S')}: {len(classifications)}") return len(classifications) == 0 @@ -375,7 +424,7 @@ def sample_run(): batch_duration=30, # seconds hydrophone_sensitivity=-168.8, model_sample_rate=10_000, - model_url="http://127.0.0.1:5000/predict", + inference_url="http://127.0.0.1:5000/predict", plot_scores=True, plot_path_template="data/plots/results/{year}/{month:02}/{plot_name}.png", med_filter_size=3, diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py index 885e7bb..5d863f7 100644 --- a/src/stages/postprocess.py +++ b/src/stages/postprocess.py @@ -1,21 +1,13 @@ import apache_beam as beam - -from datetime import datetime -from typing import Dict, Any, Tuple -from types import SimpleNamespace -from matplotlib import gridspec - -import librosa import logging import numpy as np -import os -import time import pandas as pd +import os -import requests -import math -import matplotlib.pyplot as plt -import scipy.signal +from apache_beam.io.gcp.internal.clients import bigquery + +from typing import Dict, Any, Tuple +from types import SimpleNamespace class PostprocessLabels(beam.DoFn): @@ -26,34 +18,167 @@ def __init__(self, config: SimpleNamespace): self.sifted_audio_path_template = config.sift.output_path_template self.classification_path = config.classify.classification_path + self.pooling = config.postprocess.pooling + self.project = config.general.project + self.dataset_id = config.general.dataset_id + self.table_id = config.postprocess.postprocess_table_id - def process(self, element: Dict[str, Any], search_output: Dict[str, Any]): - logging.info(f"element \n{element}") - logging.info(f"search_output \n{search_output}") - breakpoint() - classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) - classifications_df = classifications_df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) - classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) - - # TODO pool classifications in postprocessing + def process(self, element, search_output): + # convert element to dataframe + classifications_df = self._build_classification_df(element) + + # clean up search_output dataframe + search_output_df = 
self._build_search_output_df(search_output) + + # join dataframes + joined_df = pd.merge(search_output_df, classifications_df, how="inner", on="encounter_id") + + # add paths + final_df = self._add_paths(joined_df) + + logging.info(f"Final output: \n{final_df.head()}") + logging.info(f"Final output columns: {final_df.columns}") + + yield final_df.to_dict(orient="records") + + def _build_classification_df(self, element: Tuple) -> pd.DataFrame: + # convert element to dataframe + df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) + df = df[df["classifications"].apply(lambda x: len(x) > 0)] # rm empty rows + + # explode encounter_ids + df = df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) + df["encounter_id"] = df["encounter_id"].astype(str) + + # pool classifications in postprocessing + df["pooled_score"] = df["classifications"].apply(self._pool_classifications) + # convert start and end to isoformat + df["start"] = df["start"].apply(lambda x: x.isoformat()) + df["end"] = df["end"].apply(lambda x: x.isoformat()) + # drop audio and classification columns + df = df.drop(columns=["audio"]) + df = df.drop(columns=["classifications"]) + + logging.info(f"Classifications: \n{df.head()}") + logging.info(f"Classifications shape: {df.shape}") + return df.reset_index(drop=True) + + def _build_search_output_df(self, search_output: Dict[str, Any]) -> pd.DataFrame: + # convert search_output to dataframe search_output = search_output.rename(columns={"id": "encounter_id"}) - search_output["encounter_id"] = search_output["encounter_id"].astype(str) # TODO do in one line + search_output["encounter_id"] = search_output["encounter_id"].astype(str) search_output = search_output[[ - # TODO refactor to confing "encounter_id", "latitude", "longitude", "displayImgUrl", # "species", # TODO add in geo search stage (require rm local file) ]] + logging.info(f"Search output: \n{search_output.head()}") + logging.info(f"Search output shape: {search_output.shape}") - # join dataframes - joined_df = pd.merge(search_output, classifications_df, how="inner", on="encounter_id") + return search_output + + def _pool_classifications(self, classifications: np.array) -> Dict[str, Any]: + if self.pooling == "mean" or self.pooling == "avg" or self.pooling == "average": + pooled_score = np.mean(classifications) + elif self.pooling == "max": + pooled_score = np.max(classifications) + elif self.pooling == "min": + pooled_score = np.min(classifications) + else: + raise ValueError(f"Pooling method {self.pooling} not supported.") + + return pooled_score + + def _add_paths(self, df: pd.DataFrame) -> pd.DataFrame: + df["audio_path"] = "NotImplemented" + df["classification_path"] = "NotImplemented" + df["img_path"] = df["displayImgUrl"] + df = df.drop(columns=["displayImgUrl"]) + return df + + +class WritePostprocess(beam.DoFn): + def __init__(self, config: SimpleNamespace): + self.config = config - logging.info(f"Final output: \n{joined_df.head()}") + self.is_local = config.general.is_local + self.output_path = config.postprocess.output_path + self.project = config.general.project + self.dataset_id = config.general.dataset_id + self.table_id = config.postprocess.postprocess_table_id + self.columns = list(vars(config.postprocess.postprocess_table_schema)) + self.schema = self._schema_to_dict(config.postprocess.postprocess_table_schema) + def process(self, element): + if len(element) == 0: + return + + if self.is_local: + return self._write_local(element) + else: + 
return self._write_gcp(element) + + def _schema_to_dict(self, schema): + return { + "fields": [ + { + "name": name, + "type": getattr(schema, name).type, + "mode": getattr(schema, name).mode + } + for name in vars(schema) + ] + } + + def _write_gcp(self, element): + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED + method=beam.io.WriteToBigQuery.Method.FILE_LOADS + custom_gcs_temp_location="gs://bioacoustics/whale-speech/temp" + + logging.info(f"Writing to BigQuery") + logging.info(f"Table: {self.table_id}") + logging.info(f"Dataset: {self.dataset_id}") + logging.info(f"Project: {self.project}") + logging.info(f"Schema: {self.schema}") + logging.info(f"Len element: {len(element)}") + logging.info(f"Element keys: {element[0].keys()}") + + element | "Write to BigQuery" >> beam.io.WriteToBigQuery( + self.table_id, + dataset=self.dataset_id, + project=self.project, + # "bioacoustics-2024.whale_speech.mapped_audio", + schema=self.schema, + write_disposition=write_disposition, + create_disposition=create_disposition, + method=method, + custom_gcs_temp_location=custom_gcs_temp_location + ) + + yield element + + def _write_local(self, element): + if os.path.exists(self.output_path): + stored_df = pd.read_json(self.output_path, orient="records") + + # convert encounter_id to str + stored_df["encounter_id"] = stored_df["encounter_id"].astype(str) + + else: + os.makedirs(os.path.dirname(self.output_path), exist_ok=True) + stored_df = pd.DataFrame([], columns=self.columns) + + element_df = pd.DataFrame(element, columns=self.columns) + final_df = pd.concat([stored_df, element_df], ignore_index=True) + final_df = final_df.drop_duplicates() + logging.debug(f"Appending df to {self.output_path} \n{final_df}") - return joined_df.to_dict(orient="records") + # store as json (hack: to remove \/\/ escapes) + final_df_json = final_df.to_json(orient="records").replace("\\/", "/") + print(final_df_json, file=open(self.output_path, "w")) diff --git a/src/stages/sift.py b/src/stages/sift.py index fdf21f9..58096d4 100644 --- a/src/stages/sift.py +++ b/src/stages/sift.py @@ -9,6 +9,7 @@ import matplotlib.pyplot as plt import numpy as np import os +import pickle from config import load_pipeline_config @@ -111,7 +112,8 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections, par signal = signal / np.max(signal) # normalize signal = signal - np.mean(signal) # center - plt.figure(figsize=(20, 10)) + # plt.figure(figsize=(20, 10)) + fig = plt.figure() plt.plot(signal) # NOTE: for legend logic, plot min_max window first @@ -146,6 +148,13 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections, par title += f"Encounters: {encounter_ids}" plt.title(title) plt.savefig(plot_path) + + # TODO remove hack to reuse sift figure later + with open(f"{plot_path.split(key)[0]}/data/{key}_min_max.pkl", 'wb') as handle: + pickle.dump(min_max_detection_samples, handle) + with open(f"{plot_path.split(key)[0]}/data/{key}_all.pkl", 'wb') as handle: + pickle.dump(all_detections[key], handle) + plt.show() if self.show_plots else plt.close() diff --git a/tests/test_classify.py b/tests/test_classify.py index 0ce0c27..d6908ff 100644 --- a/tests/test_classify.py +++ b/tests/test_classify.py @@ -20,7 +20,7 @@ def example_config(): batch_duration=30, # seconds hydrophone_sensitivity=-168.8, model_sample_rate=10_000, - model_url="http://127.0.0.1:5000/predict", + inference_url="http://127.0.0.1:5000/predict", plot_scores=True, 
plot_path_template="data/plots/results/{year}/{month:02}/{plot_name}.png", med_filter_size=3, diff --git a/tests/test_postprocess.py b/tests/test_postprocess.py new file mode 100644 index 0000000..6829f78 --- /dev/null +++ b/tests/test_postprocess.py @@ -0,0 +1,116 @@ +import pytest +import pandas as pd + +from datetime import datetime +from types import SimpleNamespace +from src.stages.postprocess import PostprocessLabels + +@pytest.fixture +def config(): + return SimpleNamespace( + search=SimpleNamespace(export_template="template"), + sift=SimpleNamespace(output_path_template="template"), + classify=SimpleNamespace(classification_path="path"), + postprocess=SimpleNamespace(pooling="mean", postprocess_table_id="table_id"), + general=SimpleNamespace(project="project", dataset_id="dataset_id") + ) + +@pytest.fixture +def element(): + return { + "audio": "audio", + "start": datetime(2024, 9, 10, 11, 12, 13), + "end": datetime(2024, 9, 10, 12, 13, 14), + "encounter_ids": ["a123", "b456"], + "classifications": [1, 2, 3] + } + +@pytest.fixture +def search_output(): + return pd.DataFrame({ + "id": ["a123", "b456", "c789"], + "latitude": [1.0, 2.0, 3.0], + "longitude": [1.1, 2.2, 3.3], + "displayImgUrl": ["example.com/a123", "example.com/b456", "example.com/c789"], + "extra_column": ["extra1", "extra2", "extra3"] + }) + + +def test_build_classification_df(config, element): + # Arrange + postprocess_labels = PostprocessLabels(config) + + expected = pd.DataFrame([ + { + "start": "2024-09-10T11:12:13", + "end": "2024-09-10T12:13:14", + "encounter_id": "a123", + "pooled_score": 2.0 + }, + { + "start": "2024-09-10T11:12:13", + "end": "2024-09-10T12:13:14", + "encounter_id": "b456", + "pooled_score": 2.0 + } + ]) + + # Act + actual = postprocess_labels._build_classification_df(element) + + # Assert + assert expected.equals(actual) + + +def test_build_search_output_df(config, search_output): + # Arrange + postprocess_labels = PostprocessLabels(config) + + expected = pd.DataFrame({ + "encounter_id": ["a123", "b456", "c789"], + "latitude": [1.0, 2.0, 3.0], + "longitude": [1.1, 2.2, 3.3], + "displayImgUrl": ["example.com/a123", "example.com/b456", "example.com/c789"], + }) + + # Act + actual = postprocess_labels._build_search_output_df(search_output) + + # Assert + assert expected.equals(actual) + + +def test_pool_classifications(config): + # Arrange + postprocess_labels = PostprocessLabels(config) + classifications = [1, 2, 3, 4] + + # Act + actual = postprocess_labels._pool_classifications(classifications) + + # Assert + assert actual == 2.5 # note only checks mean, update for more + + +def test_add_paths(config, search_output): + # Arrange + postprocess_labels = PostprocessLabels(config) + + expected = pd.DataFrame({ + # reusing same data as above for simplicity + "id": ["a123", "b456", "c789"], + "latitude": [1.0, 2.0, 3.0], + "longitude": [1.1, 2.2, 3.3], + "extra_column": ["extra1", "extra2", "extra3"], + + # added path columns + "audio_path": ["NotImplemented", "NotImplemented", "NotImplemented"], + "classification_path": ["NotImplemented", "NotImplemented", "NotImplemented"], + "img_path": ["example.com/a123", "example.com/b456", "example.com/c789"], + }) + + # Act + actual = postprocess_labels._add_paths(search_output) + + # Assert + assert expected.equals(actual)