diff --git a/environment.yml b/environment.yml index d56824be..db0da63d 100755 --- a/environment.yml +++ b/environment.yml @@ -2,16 +2,16 @@ name: polyglotdb-dev channels: - conda-forge dependencies: + - openjdk=21 + - pip - librosa - - scipy - - praatio ~= 5.0 + - scipy<=1.12.0 + - praatio<=5.0 - textgrid - influxdb - tqdm - future - requests - - openjdk=11 - - pip + - neo4j-python-driver - pip: - - conch_sounds - - neo4j-driver ~= 4.3 + - conch-sounds diff --git a/polyglotdb/.DS_Store b/polyglotdb/.DS_Store deleted file mode 100755 index f5df3716..00000000 Binary files a/polyglotdb/.DS_Store and /dev/null differ diff --git a/polyglotdb/__init__.py b/polyglotdb/__init__.py index 3d2d7a5a..74c3b702 100755 --- a/polyglotdb/__init__.py +++ b/polyglotdb/__init__.py @@ -1,8 +1,3 @@ -__ver_major__ = 1 -__ver_minor__ = 2 -__ver_patch__ = 1 -__version__ = f"{__ver_major__}.{__ver_minor__}.{__ver_patch__}" - __all__ = ['query', 'io', 'corpus', 'config', 'exceptions', 'CorpusContext', 'CorpusConfig'] import polyglotdb.query.annotations as graph diff --git a/polyglotdb/acoustics/io.py b/polyglotdb/acoustics/io.py index 43b275f2..64c7ec13 100755 --- a/polyglotdb/acoustics/io.py +++ b/polyglotdb/acoustics/io.py @@ -145,10 +145,11 @@ def point_measures_from_csv(corpus_context, header_info, annotation_type="phone" import_path = 'file:///{}'.format(make_path_safe(path)) import_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" AS csvLine - MATCH (n:{annotation_type}:{corpus_name}) where n.id = csvLine.id - SET {new_properties}''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (n:{annotation_type}:{corpus_name}) WHERE n.id = csvLine.id + SET {new_properties} + }} IN TRANSACTIONS OF 2000 ROWS''' statement = import_statement.format(path=import_path, corpus_name=corpus_context.cypher_safe_name, @@ -159,7 +160,7 @@ def point_measures_from_csv(corpus_context, header_info, annotation_type="phone" if h == 'id': continue try: - corpus_context.execute_cypher('CREATE INDEX ON :%s(%s)' % (annotation_type, h)) + corpus_context.execute_cypher('CREATE INDEX FOR (n:%s) ON (n.%s)' % (annotation_type, h)) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise diff --git a/polyglotdb/config.py b/polyglotdb/config.py index 2fae31da..15ff5c0d 100755 --- a/polyglotdb/config.py +++ b/polyglotdb/config.py @@ -2,7 +2,7 @@ import logging import configparser -CONFIG_DIR = os.path.expanduser('~/.pgdb') +CONFIG_DIR = os.environ.get('PGDB_HOME', os.path.expanduser('~/.pgdb')) BASE_DIR = os.path.join(CONFIG_DIR, 'data') diff --git a/polyglotdb/corpus/importable.py b/polyglotdb/corpus/importable.py index 38bf7b03..56b6d05d 100755 --- a/polyglotdb/corpus/importable.py +++ b/polyglotdb/corpus/importable.py @@ -52,13 +52,13 @@ def initialize_import(self, speakers, token_headers, subannotations=None): w.writeheader() def _corpus_index(tx): - tx.run('CREATE CONSTRAINT ON (node:Corpus) ASSERT node.name IS UNIQUE') + tx.run('CREATE CONSTRAINT FOR (node:Corpus) REQUIRE node.name IS UNIQUE') def _discourse_index(tx): - tx.run('CREATE INDEX ON :Discourse(name)') + tx.run('CREATE INDEX FOR (d:Discourse) ON (d.name)') def _speaker_index(tx): - tx.run('CREATE INDEX ON :Speaker(name)') + tx.run('CREATE INDEX FOR (s:Speaker) ON (s.name)') def _corpus_create(tx, corpus_name): tx.run('MERGE (n:Corpus {name: $corpus_name}) return n', corpus_name=corpus_name) diff --git a/polyglotdb/databases/neo4j.conf b/polyglotdb/databases/neo4j.conf 
index 4ac96c79..d4572397 100644 --- a/polyglotdb/databases/neo4j.conf +++ b/polyglotdb/databases/neo4j.conf @@ -32,8 +32,8 @@ dbms.security.auth_enabled=false # calculated based on available system resources. # Uncomment these lines to set specific initial and maximum # heap size. -dbms.memory.heap.initial_size=512m -dbms.memory.heap.max_size=512m +server.memory.heap.initial_size=512m +server.memory.heap.max_size=512m # The amount of memory to use for mapping the store files, in bytes (or # kilobytes with the 'k' suffix, megabytes with 'm' and gigabytes with 'g'). @@ -66,17 +66,17 @@ dbms.memory.heap.max_size=512m # individual advertised_address. # Bolt connector -dbms.connector.bolt.enabled=true +server.bolt.enabled=true #dbms.connector.bolt.tls_level=OPTIONAL -dbms.connector.bolt.listen_address=:{bolt_port} +server.bolt.listen_address=:{bolt_port} # HTTP Connector. There can be zero or one HTTP connectors. -dbms.connector.http.enabled=true -dbms.connector.http.listen_address=:{http_port} +server.http.enabled=true +server.http.listen_address=:{http_port} # HTTPS Connector. There can be zero or one HTTPS connectors. -dbms.connector.https.enabled=false -dbms.connector.https.listen_address=:{https_port} +server.https.enabled=false +server.https.listen_address=:{https_port} # Number of Neo4j worker threads. #dbms.threads.worker_count= @@ -255,7 +255,7 @@ dbms.security.allow_csv_import_from_file_urls=true #dbms.security.http_strict_transport_security= # Retention policy for transaction logs needed to perform recovery and backups. -dbms.tx_log.rotation.retention_policy=false +db.tx_log.rotation.retention_policy=false # Only allow read operations from this Neo4j instance. This mode still requires # write access to the directory for lock purposes. @@ -283,12 +283,8 @@ dbms.tx_log.rotation.retention_policy=false # G1GC generally strikes a good balance between throughput and tail # latency, without too much tuning. -dbms.jvm.additional=-XX:+UseG1GC - # Have common exceptions keep producing stack traces, so they can be # debugged regardless of how often logs are rotated. -dbms.jvm.additional=-XX:-OmitStackTraceInFastThrow - # Make sure that `initmemory` is not only allocated, but committed to # the process, before starting the database. This reduces memory # fragmentation, increasing the effectiveness of transparent huge @@ -296,18 +292,11 @@ dbms.jvm.additional=-XX:-OmitStackTraceInFastThrow # due to heap-growing GC events, where a decrease in available page # cache leads to an increase in mean IO response time. # Try reducing the heap memory, if this flag degrades performance. -dbms.jvm.additional=-XX:+AlwaysPreTouch - # Trust that non-static final fields are really final. # This allows more optimizations and improves overall performance. # NOTE: Disable this if you use embedded mode, or have extensions or dependencies that may use reflection or # serialization to change the value of final fields! -dbms.jvm.additional=-XX:+UnlockExperimentalVMOptions -dbms.jvm.additional=-XX:+TrustFinalNonStaticFields - # Disable explicit garbage collection, which is occasionally invoked by the JDK itself. -dbms.jvm.additional=-XX:+DisableExplicitGC - # Remote JMX monitoring, uncomment and adjust the following lines as needed. Absolute paths to jmx.access and # jmx.password files are required. 
# Also make sure to update the jmx.access and jmx.password files with appropriate permission roles and passwords, @@ -328,10 +317,23 @@ dbms.jvm.additional=-XX:+DisableExplicitGC # Expand Diffie Hellman (DH) key size from default 1024 to 2048 for DH-RSA cipher suites used in server TLS handshakes. # This is to protect the server from any potential passive eavesdropping. -dbms.jvm.additional=-Djdk.tls.ephemeralDHKeySize=2048 - # This mitigates a DDoS vector. -dbms.jvm.additional=-Djdk.tls.rejectClientInitiatedRenegotiation=true +#******************************************************************** +# Other Neo4j system properties +#******************************************************************** +server.jvm.additional=-XX:+UseG1GC +server.jvm.additional=-XX:-OmitStackTraceInFastThrow +server.jvm.additional=-XX:+AlwaysPreTouch +server.jvm.additional=-XX:+UnlockExperimentalVMOptions +server.jvm.additional=-XX:+TrustFinalNonStaticFields +server.jvm.additional=-XX:+DisableExplicitGC +server.jvm.additional=-Djdk.tls.ephemeralDHKeySize=2048 +server.jvm.additional=-Djdk.tls.rejectClientInitiatedRenegotiation=true +server.jvm.additional=-Dunsupported.dbms.udc.source=tarball +server.jvm.additional=--add-opens=java.base/java.nio=ALL-UNNAMED +server.jvm.additional=--add-opens=java.base/java.io=ALL-UNNAMED +server.jvm.additional=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED +server.jvm.additional=-Dlog4j2.disable.jmx=true #******************************************************************** # Wrapper Windows NT/2000/XP Service Properties @@ -342,9 +344,4 @@ dbms.jvm.additional=-Djdk.tls.rejectClientInitiatedRenegotiation=true # service can then be reinstalled. # Name of the service -dbms.windows_service_name=neo4j - -#******************************************************************** -# Other Neo4j system properties -#******************************************************************** -dbms.jvm.additional=-Dunsupported.dbms.udc.source=tarball +server.windows_service_name=neo4j diff --git a/polyglotdb/io/.DS_Store b/polyglotdb/io/.DS_Store deleted file mode 100755 index 933d934e..00000000 Binary files a/polyglotdb/io/.DS_Store and /dev/null differ diff --git a/polyglotdb/io/importer/from_csv.py b/polyglotdb/io/importer/from_csv.py index ecce0b1b..b8e9e097 100755 --- a/polyglotdb/io/importer/from_csv.py +++ b/polyglotdb/io/importer/from_csv.py @@ -35,9 +35,8 @@ def import_type_csvs(corpus_context, type_headers): type_path = 'file:///site/proj/{}'.format(make_path_safe(path)) else: type_path = 'file:///{}'.format(make_path_safe(path)) - try: - corpus_context.execute_cypher('CREATE CONSTRAINT ON (node:%s_type) ASSERT node.id IS UNIQUE' % at) + corpus_context.execute_cypher('CREATE CONSTRAINT FOR (node:%s_type) REQUIRE node.id IS UNIQUE' % at) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -48,14 +47,14 @@ def import_type_csvs(corpus_context, type_headers): if 'label' in h: properties.append('label_insensitive: toLower(csvLine.label)') try: - corpus_context.execute_cypher('CREATE INDEX ON :%s_type(label_insensitive)' % at) + corpus_context.execute_cypher('CREATE INDEX FOR (n:%s_type) ON (n.label_insensitive)' % at) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise for x in h: if x != 'id': try: - corpus_context.execute_cypher('CREATE INDEX ON :%s_type(%s)' % (at, x)) + corpus_context.execute_cypher('CREATE INDEX FOR (n:%s_type) ON (n.%s)' % (at, x)) except 
neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -63,13 +62,15 @@ def import_type_csvs(corpus_context, type_headers): type_prop_string = ', '.join(properties) else: type_prop_string = '' - type_import_statement = '''USING PERIODIC COMMIT 2000 -LOAD CSV WITH HEADERS FROM "{path}" AS csvLine -MERGE (n:{annotation_type}_type:{corpus_name} {{ {type_property_string} }}) + type_import_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MERGE (n:{annotation_type}_type:{corpus_name} {{ {type_property_string} }}) + }} IN TRANSACTIONS OF 2000 ROWS ''' kwargs = {'path': type_path, 'annotation_type': at, - 'type_property_string': type_prop_string, - 'corpus_name': corpus_context.cypher_safe_name} + 'type_property_string': type_prop_string, + 'corpus_name': corpus_context.cypher_safe_name} statement = type_import_statement.format(**kwargs) log.info('Loading {} types...'.format(at)) begin = time.time() @@ -116,19 +117,19 @@ def import_csvs(corpus_context, speakers, token_headers, hierarchy, call_back=No statements = [] def _unique_function(tx, at): - tx.run('CREATE CONSTRAINT ON (node:%s) ASSERT node.id IS UNIQUE' % at) + tx.run('CREATE CONSTRAINT FOR (node:%s) REQUIRE node.id IS UNIQUE' % at) def _prop_index(tx, at, prop): - tx.run('CREATE INDEX ON :%s(%s)' % (at, prop)) + tx.run('CREATE INDEX FOR (n:%s) ON (n.%s)' % (at, prop)) def _label_index(tx, at): - tx.run('CREATE INDEX ON :%s(label_insensitive)' % at) + tx.run('CREATE INDEX FOR (n:%s) ON (n.label_insensitive)' % at) def _begin_index(tx, at): - tx.run('CREATE INDEX ON :%s(begin)' % (at,)) + tx.run('CREATE INDEX FOR (n:%s) ON (n.begin)' % at) def _end_index(tx, at): - tx.run('CREATE INDEX ON :%s(end)' % (at,)) + tx.run('CREATE INDEX FOR (n:%s) ON (n.end)' % at) with corpus_context.graph_driver.session() as session: for i, s in enumerate(speakers): @@ -176,47 +177,55 @@ def _end_index(tx, at): token_prop_string = ', ' + ', '.join(properties) else: token_prop_string = '' - node_import_statement = '''USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM '{path}' AS csvLine - CREATE (t:{annotation_type}:{corpus_name}:speech {{id: csvLine.id, begin: toFloat(csvLine.begin), - end: toFloat(csvLine.end){token_property_string} }}) + node_import_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM '{path}' AS csvLine + CREATE (t:{annotation_type}:{corpus_name}:speech {{ + id: csvLine.id, + begin: toFloat(csvLine.begin), + end: toFloat(csvLine.end){token_property_string} + }}) + }} IN TRANSACTIONS OF 2000 ROWS ''' + node_kwargs = {'path': rel_path, 'annotation_type': at, 'token_property_string': token_prop_string, 'corpus_name': corpus_context.cypher_safe_name} if st is not None: - rel_import_statement = '''USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM '{path}' AS csvLine - MATCH (n:{annotation_type}_type:{corpus_name} {{id: csvLine.type_id}}), (super:{stype}:{corpus_name} {{id: csvLine.{stype}}}), - (d:Discourse:{corpus_name} {{name: csvLine.discourse}}), - (s:Speaker:{corpus_name} {{name: csvLine.speaker}}), - (t:{annotation_type}:{corpus_name}:speech {{id: csvLine.id}}) - CREATE (t)-[:is_a]->(n), - (t)-[:contained_by]->(super), - (t)-[:spoken_in]->(d), - (t)-[:spoken_by]->(s) - WITH t, csvLine - MATCH (p:{annotation_type}:{corpus_name}:speech {{id: csvLine.previous_id}}) - CREATE (p)-[:precedes]->(t) - ''' + rel_import_statement = ''' + CALL{{ + LOAD CSV WITH HEADERS FROM '{path}' AS csvLine + MATCH (n:{annotation_type}_type:{corpus_name} {{id: 
csvLine.type_id}}), (super:{stype}:{corpus_name} {{id: csvLine.{stype}}}), + (d:Discourse:{corpus_name} {{name: csvLine.discourse}}), + (s:Speaker:{corpus_name} {{name: csvLine.speaker}}), + (t:{annotation_type}:{corpus_name}:speech {{id: csvLine.id}}) + CREATE (t)-[:is_a]->(n), + (t)-[:contained_by]->(super), + (t)-[:spoken_in]->(d), + (t)-[:spoken_by]->(s) + WITH t, csvLine + MATCH (p:{annotation_type}:{corpus_name}:speech {{id: csvLine.previous_id}}) + CREATE (p)-[:precedes]->(t) + }} IN TRANSACTIONS OF 2000 ROWS''' rel_kwargs = {'path': rel_path, 'annotation_type': at, 'corpus_name': corpus_context.cypher_safe_name, 'stype': st} else: - rel_import_statement = '''USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM '{path}' AS csvLine - MATCH (n:{annotation_type}_type:{corpus_name} {{id: csvLine.type_id}}), - (d:Discourse:{corpus_name} {{name: csvLine.discourse}}), - (s:Speaker:{corpus_name} {{ name: csvLine.speaker}}), - (t:{annotation_type}:{corpus_name}:speech {{id: csvLine.id}}) - CREATE (t)-[:is_a]->(n), - (t)-[:spoken_in]->(d), - (t)-[:spoken_by]->(s) - WITH t, csvLine - MATCH (p:{annotation_type}:{corpus_name}:speech {{id: csvLine.previous_id}}) - CREATE (p)-[:precedes]->(t) - ''' + rel_import_statement = ''' + CALL{{ + LOAD CSV WITH HEADERS FROM '{path}' AS csvLine + MATCH (n:{annotation_type}_type:{corpus_name} {{id: csvLine.type_id}}), + (d:Discourse:{corpus_name} {{name: csvLine.discourse}}), + (s:Speaker:{corpus_name} {{ name: csvLine.speaker}}), + (t:{annotation_type}:{corpus_name}:speech {{id: csvLine.id}}) + CREATE (t)-[:is_a]->(n), + (t)-[:spoken_in]->(d), + (t)-[:spoken_by]->(s) + WITH t, csvLine + MATCH (p:{annotation_type}:{corpus_name}:speech {{id: csvLine.previous_id}}) + CREATE (p)-[:precedes]->(t) + }} IN TRANSACTIONS OF 2000 ROWS''' rel_kwargs = {'path': rel_path, 'annotation_type': at, 'corpus_name': corpus_context.cypher_safe_name} node_statement = node_import_statement.format(**node_kwargs) @@ -260,7 +269,7 @@ def _end_index(tx, at): for s in v: path = os.path.join(directory, '{}_{}_{}.csv'.format(re.sub(r'\W', '_', sp), k, s)) try: - corpus_context.execute_cypher('CREATE CONSTRAINT ON (node:%s) ASSERT node.id IS UNIQUE' % s) + corpus_context.execute_cypher('CREATE CONSTRAINT FOR (node:%s) REQUIRE node.id IS UNIQUE' % s) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -270,12 +279,21 @@ def _end_index(tx, at): else: sub_path = 'file:///{}'.format(make_path_safe(path)) - rel_import_statement = '''USING PERIODIC COMMIT 1000 - LOAD CSV WITH HEADERS FROM "{path}" AS csvLine - MATCH (n:{annotation_type} {{id: csvLine.annotation_id}}) - CREATE (t:{subannotation_type}:{corpus_name}:speech {{id: csvLine.id, type: $subannotation_type, begin: toFloat(csvLine.begin), - end: toFloat(csvLine.end), label: CASE csvLine.label WHEN NULL THEN '' ELSE csvLine.label END }}) - CREATE (t)-[:annotates]->(n)''' + rel_import_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (n:{annotation_type} {{id: csvLine.annotation_id}}) + CREATE (t:{subannotation_type}:{corpus_name}:speech {{ + id: csvLine.id, + type: $subannotation_type, + begin: toFloat(csvLine.begin), + end: toFloat(csvLine.end), + label: CASE csvLine.label WHEN NULL THEN '' ELSE csvLine.label END + }}) + CREATE (t)-[:annotates]->(n) + }} IN TRANSACTIONS OF 1000 ROWS + ''' + kwargs = {'path': sub_path, 'annotation_type': k, 'subannotation_type': s, 'corpus_name': corpus_context.cypher_safe_name} @@ -328,16 +346,24 @@ 
def import_lexicon_csvs(corpus_context, typed_data, case_sensitive=False): else: lex_path = 'file:///{}'.format(make_path_safe(path)) if case_sensitive: - import_statement = '''USING PERIODIC COMMIT 3000 - LOAD CSV WITH HEADERS FROM "{path}" AS csvLine - with csvLine - MATCH (n:{word_type}_type:{corpus_name}) where n.label = csvLine.label - SET {new_properties}''' + import_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + WITH csvLine + MATCH (n:{word_type}_type:{corpus_name}) WHERE n.label = csvLine.label + SET {new_properties} + }} IN TRANSACTIONS OF 3000 ROWS + ''' + else: - import_statement = '''USING PERIODIC COMMIT 3000 - LOAD CSV WITH HEADERS FROM "{path}" AS csvLine - MATCH (n:{word_type}_type:{corpus_name}) where n.label_insensitive = csvLine.label - SET {new_properties}''' + import_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (n:{word_type}_type:{corpus_name}) WHERE n.label_insensitive = csvLine.label + SET {new_properties} + }} IN TRANSACTIONS OF 3000 ROWS + ''' + statement = import_statement.format(path=lex_path, corpus_name=corpus_context.cypher_safe_name, @@ -346,7 +372,7 @@ def import_lexicon_csvs(corpus_context, typed_data, case_sensitive=False): corpus_context.execute_cypher(statement) for h, v in typed_data.items(): try: - corpus_context.execute_cypher('CREATE INDEX ON :%s(%s)' % (corpus_context.word_name, h)) + corpus_context.execute_cypher('CREATE INDEX FOR (n:%s) ON (n.%s)' % (corpus_context.word_name, h)) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -401,7 +427,7 @@ def import_feature_csvs(corpus_context, typed_data): corpus_context.execute_cypher(statement) for h, v in typed_data.items(): try: - corpus_context.execute_cypher('CREATE INDEX ON :%s(%s)' % (corpus_context.phone_name, h)) + corpus_context.execute_cypher('CREATE INDEX FOR (n:%s) ON (n.%s)' % (corpus_context.phone_name, h)) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -456,7 +482,7 @@ def import_syllable_enrichment_csvs(corpus_context, typed_data): corpus_context.execute_cypher(statement) for h, v in typed_data.items(): try: - corpus_context.execute_cypher('CREATE INDEX ON :%s(%s)' % ("syllable", h)) + corpus_context.execute_cypher('CREATE INDEX FOR (n:%s) ON (n.%s)' % ("syllable", h)) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -510,7 +536,7 @@ def import_utterance_enrichment_csvs(corpus_context, typed_data): corpus_context.execute_cypher(statement) for h, v in typed_data.items(): try: - corpus_context.execute_cypher('CREATE INDEX ON :%s(%s)' % ("utterance", h)) + corpus_context.execute_cypher('CREATE INDEX FOR (n:%s) ON (n.%s)' % ("utterance", h)) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -563,7 +589,7 @@ def import_speaker_csvs(corpus_context, typed_data): corpus_context.execute_cypher(statement) for h, v in typed_data.items(): try: - corpus_context.execute_cypher('CREATE INDEX ON :Speaker(%s)' % h) + corpus_context.execute_cypher('CREATE INDEX FOR (s:Speaker) ON (s.%s)' % h) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -617,7 +643,7 @@ def import_discourse_csvs(corpus_context, typed_data): corpus_context.execute_cypher(statement) for h, v in
typed_data.items(): try: - corpus_context.execute_cypher('CREATE INDEX ON :Discourse(%s)' % h) + corpus_context.execute_cypher('CREATE INDEX FOR (d:Discourse) ON (d.%s)' % h) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -641,7 +667,7 @@ def import_utterance_csv(corpus_context, call_back=None, stop_check=None): call_back('Importing data...') call_back(0, len(speakers)) try: - corpus_context.execute_cypher('CREATE CONSTRAINT ON (node:utterance) ASSERT node.id IS UNIQUE') + corpus_context.execute_cypher('CREATE CONSTRAINT FOR (node:utterance) REQUIRE node.id IS UNIQUE') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -666,12 +692,17 @@ def import_utterance_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() node_statement = ''' - USING PERIODIC COMMIT 1000 - LOAD CSV WITH HEADERS FROM "{path}" AS csvLine - MATCH (begin:{word_type}:{corpus}:speech {{id: csvLine.begin_word_id}}), + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (begin:{word_type}:{corpus}:speech {{id: csvLine.begin_word_id}}), (end:{word_type}:{corpus}:speech {{id: csvLine.end_word_id}}) - WITH csvLine, begin, end - CREATE (utt:utterance:{corpus}:speech {{id: csvLine.id, begin: begin.begin, end: end.end}})-[:is_a]->(u_type:utterance_type:{corpus}) + WITH csvLine, begin, end + CREATE (utt:utterance:{corpus}:speech {{ + id: csvLine.id, + begin: begin.begin, + end: end.end + }})-[:is_a]->(u_type:utterance_type:{corpus}) + }} IN TRANSACTIONS OF 1000 ROWS ''' statement = node_statement.format(path=csv_path, @@ -682,14 +713,17 @@ def import_utterance_csv(corpus_context, call_back=None, stop_check=None): print('Utterance node creation took {} seconds.'.format(time.time() - begin)) begin = time.time() - rel_statement = '''USING PERIODIC COMMIT 1000 - LOAD CSV WITH HEADERS FROM "{path}" AS csvLine - MATCH (d:Discourse:{corpus})<-[:spoken_in]-(begin:{word_type}:{corpus}:speech {{id: csvLine.begin_word_id}})-[:spoken_by]->(s:Speaker:{corpus}), + rel_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (d:Discourse:{corpus})<-[:spoken_in]-(begin:{word_type}:{corpus}:speech {{id: csvLine.begin_word_id}})-[:spoken_by]->(s:Speaker:{corpus}), (utt:utterance:{corpus}:speech {{id: csvLine.id}}) - CREATE - (d)<-[:spoken_in]-(utt), - (s)<-[:spoken_by]-(utt) + CREATE + (d)<-[:spoken_in]-(utt), + (s)<-[:spoken_by]-(utt) + }} IN TRANSACTIONS OF 1000 ROWS ''' + statement = rel_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, word_type=corpus_context.word_name) @@ -698,12 +732,14 @@ def import_utterance_csv(corpus_context, call_back=None, stop_check=None): print('Spoken relationship creation took {} seconds.'.format(time.time() - begin)) begin = time.time() - rel_statement = '''USING PERIODIC COMMIT 1000 - LOAD CSV WITH HEADERS FROM "{path}" AS csvLine - MATCH (begin:{word_type}:{corpus}:speech {{id: csvLine.begin_word_id}}), + rel_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (begin:{word_type}:{corpus}:speech {{id: csvLine.begin_word_id}}), (utt:utterance:{corpus}:speech {{id: csvLine.id}}), - (prev:utterance {{id:csvLine.prev_id}}) - CREATE (prev)-[:precedes]->(utt) + (prev:utterance {{id: csvLine.prev_id}}) + CREATE (prev)-[:precedes]->(utt) + }} IN TRANSACTIONS OF 1000 ROWS ''' statement = rel_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ 
-713,15 +749,17 @@ def import_utterance_csv(corpus_context, call_back=None, stop_check=None): print('Precedence relationship creation took {} seconds.'.format(time.time() - begin)) begin = time.time() - word_statement = '''USING PERIODIC COMMIT 1000 - LOAD CSV WITH HEADERS FROM "{path}" AS csvLine - MATCH (begin:{word_type}:{corpus}:speech {{id: csvLine.begin_word_id}}), + word_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (begin:{word_type}:{corpus}:speech {{id: csvLine.begin_word_id}}), (utt:utterance:{corpus}:speech {{id: csvLine.id}}), (end:{word_type}:{corpus}:speech {{id: csvLine.end_word_id}}), path = shortestPath((begin)-[:precedes*0..]->(end)) - WITH utt, nodes(path) as words - UNWIND words as w - CREATE (w)-[:contained_by]->(utt) + WITH utt, nodes(path) AS words + UNWIND words AS w + CREATE (w)-[:contained_by]->(utt) + }} IN TRANSACTIONS OF 1000 ROWS ''' statement = word_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -747,37 +785,37 @@ def import_syllable_csv(corpus_context, call_back=None, stop_check=None): call_back('Importing syllables...') call_back(0, len(speakers)) try: - corpus_context.execute_cypher('CREATE CONSTRAINT ON (node:syllable) ASSERT node.id IS UNIQUE') + corpus_context.execute_cypher('CREATE CONSTRAINT FOR (node:syllable) REQUIRE node.id IS UNIQUE') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE CONSTRAINT ON (node:syllable_type) ASSERT node.id IS UNIQUE') + corpus_context.execute_cypher('CREATE CONSTRAINT FOR (node:syllable_type) REQUIRE node.id IS UNIQUE') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE INDEX ON :syllable(begin)') + corpus_context.execute_cypher('CREATE INDEX FOR (s:syllable) ON (s.begin)') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE INDEX ON :syllable(prev_id)') + corpus_context.execute_cypher('CREATE INDEX FOR (s:syllable) ON (s.prev_id)') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE INDEX ON :syllable(end)') + corpus_context.execute_cypher('CREATE INDEX FOR (s:syllable) ON (s.end)') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE INDEX ON :syllable(label)') + corpus_context.execute_cypher('CREATE INDEX FOR (s:syllable) ON (s.label)') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE INDEX ON :syllable_type(label)') + corpus_context.execute_cypher('CREATE INDEX FOR (s:syllable_type) ON (s.label)') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -801,10 +839,11 @@ def import_syllable_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() nucleus_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (n:{phone_name}:{corpus}:speech {{id: csvLine.vowel_id}})-[r:contained_by]->(w:{word_name}:{corpus}:speech) - SET n :nucleus, 
n.syllable_position = 'nucleus' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (n:{phone_name}:{corpus}:speech {{id: csvLine.vowel_id}})-[r:contained_by]->(w:{word_name}:{corpus}:speech) + SET n :nucleus, n.syllable_position = 'nucleus' + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = nucleus_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -816,15 +855,20 @@ def import_syllable_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() node_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MERGE (s_type:syllable_type:{corpus} {{id: csvLine.type_id}}) - ON CREATE SET s_type.label = csvLine.label - WITH s_type, csvLine - CREATE (s:syllable:{corpus}:speech {{id: csvLine.id, prev_id: csvLine.prev_id, - label: csvLine.label, - begin: toFloat(csvLine.begin), end: toFloat(csvLine.end)}}), - (s)-[:is_a]->(s_type) + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MERGE (s_type:syllable_type:{corpus} {{id: csvLine.type_id}}) + ON CREATE SET s_type.label = csvLine.label + WITH s_type, csvLine + CREATE (s:syllable:{corpus}:speech {{ + id: csvLine.id, + prev_id: csvLine.prev_id, + label: csvLine.label, + begin: toFloat(csvLine.begin), + end: toFloat(csvLine.end) + }}), + (s)-[:is_a]->(s_type) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = node_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name) @@ -834,13 +878,14 @@ def import_syllable_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() rel_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (n:{phone_name}:{corpus}:speech:nucleus {{id: csvLine.vowel_id}})-[:contained_by]->(w:{word_name}:{corpus}:speech), + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (n:{phone_name}:{corpus}:speech:nucleus {{id: csvLine.vowel_id}})-[:contained_by]->(w:{word_name}:{corpus}:speech), (s:syllable:{corpus}:speech {{id: csvLine.id}}) - WITH n, w, s - CREATE (s)-[:contained_by]->(w), + WITH n, w, s + CREATE (s)-[:contained_by]->(w), (n)-[:contained_by]->(s) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = rel_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -852,15 +897,16 @@ def import_syllable_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() rel_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (n:{phone_name}:{corpus}:speech:nucleus {{id: csvLine.vowel_id}}), + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (n:{phone_name}:{corpus}:speech:nucleus {{id: csvLine.vowel_id}}), (s:syllable:{corpus}:speech {{id: csvLine.id}}), (n)-[:spoken_by]->(sp:Speaker), (n)-[:spoken_in]->(d:Discourse) - WITH sp, d, s - CREATE (s)-[:spoken_by]->(sp), + WITH sp, d, s + CREATE (s)-[:spoken_by]->(sp), (s)-[:spoken_in]->(d) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = rel_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -872,12 +918,13 @@ def import_syllable_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() prev_rel_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (s:syllable:{corpus}:speech {{id: csvLine.id}}) - with csvLine, s - MATCH (prev:syllable {{id:csvLine.prev_id}}) - CREATE (prev)-[:precedes]->(s) + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (s:syllable:{corpus}:speech {{id: csvLine.id}}) + WITH csvLine, s 
+ MATCH (prev:syllable {{id: csvLine.prev_id}}) + CREATE (prev)-[:precedes]->(s) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = prev_rel_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -889,10 +936,11 @@ def import_syllable_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() del_rel_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (n:{phone_name}:{corpus}:speech:nucleus {{id: csvLine.vowel_id}})-[r:contained_by]->(w:{word_name}:{corpus}:speech) - DELETE r + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (n:{phone_name}:{corpus}:speech:nucleus {{id: csvLine.vowel_id}})-[r:contained_by]->(w:{word_name}:{corpus}:speech) + DELETE r + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = del_rel_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -904,23 +952,22 @@ def import_syllable_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() onset_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (n:{phone_name}:nucleus:{corpus}:speech)-[:contained_by]->(s:syllable:{corpus}:speech {{id: csvLine.id}})-[:contained_by]->(w:{word_name}:{corpus}:speech) - WITH csvLine, s, w, n - OPTIONAL MATCH + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (n:{phone_name}:nucleus:{corpus}:speech)-[:contained_by]->(s:syllable:{corpus}:speech {{id: csvLine.id}})-[:contained_by]->(w:{word_name}:{corpus}:speech) + WITH csvLine, s, w, n + OPTIONAL MATCH (onset:{phone_name}:{corpus} {{id: csvLine.onset_id}}), onspath = (onset)-[:precedes*1..10]->(n) - - with n, w,s, csvLine, onspath - UNWIND (case when onspath is not null then nodes(onspath)[0..-1] else [null] end) as o - - OPTIONAL MATCH (o)-[r:contained_by]->(w) - with n, w,s, csvLine, [x in collect(o) WHERE x is not NULL| x] as ons, - [x in collect(r) WHERE x is not NULL | x] as rels - FOREACH (o in ons | SET o :onset, o.syllable_position = 'onset') - FOREACH (o in ons | CREATE (o)-[:contained_by]->(s)) - FOREACH (r in rels | DELETE r) + WITH n, w, s, csvLine, onspath + UNWIND (CASE WHEN onspath IS NOT NULL THEN nodes(onspath)[0..-1] ELSE [NULL] END) AS o + OPTIONAL MATCH (o)-[r:contained_by]->(w) + WITH n, w, s, csvLine, [x IN collect(o) WHERE x IS NOT NULL | x] AS ons, + [x IN collect(r) WHERE x IS NOT NULL | x] AS rels + FOREACH (o IN ons | SET o :onset, o.syllable_position = 'onset') + FOREACH (o IN ons | CREATE (o)-[:contained_by]->(s)) + FOREACH (r IN rels | DELETE r) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = onset_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -931,25 +978,25 @@ def import_syllable_csv(corpus_context, call_back=None, stop_check=None): print('Onset hierarchical relationship creation took {} seconds.'.format(time.time() - begin)) begin = time.time() - coda_statment = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (n:nucleus:{corpus}:speech)-[:contained_by]->(s:syllable:{corpus}:speech {{id: csvLine.id}})-[:contained_by]->(w:{word_name}:{corpus}:speech) - WITH csvLine, s, w, n - OPTIONAL MATCH + coda_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (n:nucleus:{corpus}:speech)-[:contained_by]->(s:syllable:{corpus}:speech {{id: csvLine.id}})-[:contained_by]->(w:{word_name}:{corpus}:speech) + WITH csvLine, s, w, n + OPTIONAL MATCH (coda:{phone_name}:{corpus} {{id: csvLine.coda_id}}), - codapath = 
(n)-[:precedes*1..10]->(coda) - WITH n, w, s, codapath - UNWIND (case when codapath is not null then nodes(codapath)[1..] else [null] end) as c - - OPTIONAL MATCH (c)-[r:contained_by]->(w) - WITH n, w,s, [x in collect(c) WHERE x is not NULL | x] as cod, - [x in collect(r) WHERE x is not NULL | x] as rels - FOREACH (c in cod | SET c :coda, c.syllable_position = 'coda') - FOREACH (c in cod | CREATE (c)-[:contained_by]->(s)) - FOREACH (r in rels | DELETE r) + codapath = (n)-[:precedes*1..10]->(coda) + WITH n, w, s, codapath + UNWIND (CASE WHEN codapath IS NOT NULL THEN nodes(codapath)[1..] ELSE [NULL] END) AS c + OPTIONAL MATCH (c)-[r:contained_by]->(w) + WITH n, w, s, [x IN collect(c) WHERE x IS NOT NULL | x] AS cod, + [x IN collect(r) WHERE x IS NOT NULL | x] AS rels + FOREACH (c IN cod | SET c :coda, c.syllable_position = 'coda') + FOREACH (c IN cod | CREATE (c)-[:contained_by]->(s)) + FOREACH (r IN rels | DELETE r) + }} IN TRANSACTIONS OF 2000 ROWS ''' - statement = coda_statment.format(path=csv_path, + statement = coda_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, word_name=corpus_context.word_name, phone_name=corpus_context.phone_name) @@ -976,32 +1023,32 @@ def import_nonsyl_csv(corpus_context, call_back=None, stop_check=None): call_back('Importing degenerate syllables...') call_back(0, len(speakers)) try: - corpus_context.execute_cypher('CREATE CONSTRAINT ON (node:syllable) ASSERT node.id IS UNIQUE') + corpus_context.execute_cypher('CREATE CONSTRAINT FOR (node:syllable) REQUIRE node.id IS UNIQUE') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE CONSTRAINT ON (node:syllable_type) ASSERT node.id IS UNIQUE') + corpus_context.execute_cypher('CREATE CONSTRAINT FOR (node:syllable_type) REQUIRE node.id IS UNIQUE') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE INDEX ON :syllable(begin)') + corpus_context.execute_cypher('CREATE INDEX FOR (s:syllable) ON (s.begin)') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE INDEX ON :syllable(end)') + corpus_context.execute_cypher('CREATE INDEX FOR (s:syllable) ON (s.end)') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE INDEX ON :syllable(label)') + corpus_context.execute_cypher('CREATE INDEX FOR (s:syllable) ON (s.label)') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise try: - corpus_context.execute_cypher('CREATE INDEX ON :syllable_type(label)') + corpus_context.execute_cypher('CREATE INDEX FOR (s:syllable_type) ON (s.label)') except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -1026,15 +1073,21 @@ def import_nonsyl_csv(corpus_context, call_back=None, stop_check=None): csv_path = 'file:///{}'.format(make_path_safe(path)) begin = time.time() - node_statement = '''USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MERGE (s_type:syllable_type:{corpus} {{id: csvLine.type_id}}) - ON CREATE SET s_type.label = csvLine.label - WITH s_type, csvLine - CREATE (s:syllable:{corpus}:speech {{id: csvLine.id, 
prev_id: csvLine.prev_id, - begin: toFloat(csvLine.begin), end: toFloat(csvLine.end), - label: csvLine.label}}), - (s)-[:is_a]->(s_type) + node_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MERGE (s_type:syllable_type:{corpus} {{id: csvLine.type_id}}) + ON CREATE SET s_type.label = csvLine.label + WITH s_type, csvLine + CREATE (s:syllable:{corpus}:speech {{ + id: csvLine.id, + prev_id: csvLine.prev_id, + begin: toFloat(csvLine.begin), + end: toFloat(csvLine.end), + label: csvLine.label + }}), + (s)-[:is_a]->(s_type) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = node_statement.format(path=csv_path, @@ -1045,16 +1098,17 @@ def import_nonsyl_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() rel_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (o:{phone_name}:{corpus}:speech {{id: csvLine.onset_id}})-[r:contained_by]->(w:{word_name}:{corpus}:speech), + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (o:{phone_name}:{corpus}:speech {{id: csvLine.onset_id}})-[r:contained_by]->(w:{word_name}:{corpus}:speech), (o)-[:spoken_by]->(sp:Speaker), (o)-[:spoken_in]->(d:Discourse), (s:syllable:{corpus}:speech {{id: csvLine.id}}) - WITH w, csvLine, sp, d, s - CREATE (s)-[:contained_by]->(w), + WITH w, csvLine, sp, d, s + CREATE (s)-[:contained_by]->(w), (s)-[:spoken_by]->(sp), (s)-[:spoken_in]->(d) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = rel_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -1066,12 +1120,13 @@ def import_nonsyl_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() rel_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (s:syllable:{corpus}:speech {{id: csvLine.id}}) - with csvLine, s - MATCH (prev:syllable {{id:csvLine.prev_id}}) - CREATE (prev)-[:precedes]->(s) + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (s:syllable:{corpus}:speech {{id: csvLine.id}}) + WITH csvLine, s + MATCH (prev:syllable {{id: csvLine.prev_id}}) + CREATE (prev)-[:precedes]->(s) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = rel_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -1083,12 +1138,13 @@ def import_nonsyl_csv(corpus_context, call_back=None, stop_check=None): begin = time.time() rel_statement = ''' - USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (s:syllable:{corpus}:speech {{id: csvLine.id}}) - with csvLine, s - MATCH (foll:syllable {{prev_id:csvLine.id}}) - CREATE (s)-[:precedes]->(foll) + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (s:syllable:{corpus}:speech {{id: csvLine.id}}) + WITH csvLine, s + MATCH (foll:syllable {{prev_id: csvLine.id}}) + CREATE (s)-[:precedes]->(foll) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = rel_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -1099,24 +1155,25 @@ def import_nonsyl_csv(corpus_context, call_back=None, stop_check=None): print('Second precedence relationship creation took {} seconds.'.format(time.time() - begin)) begin = time.time() - phone_statement = '''USING PERIODIC COMMIT 2000 - LOAD CSV WITH HEADERS FROM "{path}" as csvLine - MATCH (o:{phone_name}:{corpus}:speech {{id: csvLine.onset_id}}), - (s:syllable:{corpus}:speech {{id: csvLine.id}})-[:contained_by]->(w:{word_name}:{corpus}:speech) - with o, w, csvLine, s - OPTIONAL MATCH - (c:{phone_name}:{corpus}:speech {{id:
csvLine.coda_id}})-[:contained_by]->(w), - p = (o)-[:precedes*..10]->(c) - with o, w, s, p, csvLine - UNWIND (case when p is not null then nodes(p) else [o] end) as c - - OPTIONAL MATCH (c)-[r:contained_by]->(w) - with w,s, toInteger(csvLine.break) as break, [x in collect(c) WHERE x is not NULL | x] as cod, - [x in collect(r) WHERE x is not NULL| x] as rels - FOREACH (c in cod[break..] | SET c :coda, c.syllable_position = 'coda') - FOREACH (c in cod[..break] | SET c :onset, c.syllable_position = 'onset') - FOREACH (c in cod | CREATE (c)-[:contained_by]->(s)) - FOREACH (r in rels | DELETE r) + phone_statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (o:{phone_name}:{corpus}:speech {{id: csvLine.onset_id}}), + (s:syllable:{corpus}:speech {{id: csvLine.id}})-[:contained_by]->(w:{word_name}:{corpus}:speech) + WITH o, w, csvLine, s + OPTIONAL MATCH + (c:{phone_name}:{corpus}:speech {{id: csvLine.coda_id}})-[:contained_by]->(w), + p = (o)-[:precedes*..10]->(c) + WITH o, w, s, p, csvLine + UNWIND (CASE WHEN p IS NOT NULL THEN nodes(p) ELSE [o] END) AS c + OPTIONAL MATCH (c)-[r:contained_by]->(w) + WITH w, s, toInteger(csvLine.break) AS break, [x IN collect(c) WHERE x IS NOT NULL | x] AS cod, + [x IN collect(r) WHERE x IS NOT NULL | x] AS rels + FOREACH (c IN cod[break..] | SET c :coda, c.syllable_position = 'coda') + FOREACH (c IN cod[..break] | SET c :onset, c.syllable_position = 'onset') + FOREACH (c IN cod | CREATE (c)-[:contained_by]->(s)) + FOREACH (r IN rels | DELETE r) + }} IN TRANSACTIONS OF 2000 ROWS ''' statement = phone_statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, @@ -1155,7 +1212,7 @@ def import_subannotation_csv(corpus_context, type, annotated_type, props): prop_temp = '''{name}: csvLine.{name}''' properties = [] try: - corpus_context.execute_cypher('CREATE CONSTRAINT ON (node:%s) ASSERT node.id IS UNIQUE' % type) + corpus_context.execute_cypher('CREATE CONSTRAINT FOR (node:%s) REQUIRE node.id IS UNIQUE' % type) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -1168,13 +1225,18 @@ def import_subannotation_csv(corpus_context, type, annotated_type, props): properties = ', ' + ', '.join(properties) else: properties = '' - statement = '''USING PERIODIC COMMIT 500 - LOAD CSV WITH HEADERS FROM "{path}" AS csvLine - MATCH (annotated:{a_type}:{corpus} {{id: csvLine.annotated_id}}) - CREATE (annotated) <-[:annotates]-(annotation:{type}:{corpus} - {{id: csvLine.id, type: $type, begin: toFloat(csvLine.begin), - end: toFloat(csvLine.end){properties}}}) - ''' + statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "{path}" AS csvLine + MATCH (annotated:{a_type}:{corpus} {{id: csvLine.annotated_id}}) + CREATE (annotated) <-[:annotates]-(annotation:{type}:{corpus} {{ + id: csvLine.id, + type: $type, + begin: toFloat(csvLine.begin), + end: toFloat(csvLine.end){properties} + }}) + }} IN TRANSACTIONS OF 500 ROWS + ''' statement = statement.format(path=csv_path, corpus=corpus_context.cypher_safe_name, a_type=annotated_type, @@ -1185,7 +1247,7 @@ def import_subannotation_csv(corpus_context, type, annotated_type, props): if p in ['id', 'annotated_id']: continue try: - corpus_context.execute_cypher('CREATE INDEX ON :%s(%s)' % (type, p)) + corpus_context.execute_cypher('CREATE INDEX FOR (n:%s) ON (n.%s)' % (type, p)) except neo4j.exceptions.ClientError as e: if e.code != 'Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists': raise @@ -1240,11 +1302,13 @@ def 
import_token_csv(corpus_context, path, annotated_type, id_column, properties corpus_context.encode_hierarchy() property_update = ', '.join(["x.{} = csvLine.{}".format(p, p) for p in properties]) - statement = '''USING PERIODIC COMMIT 500 - LOAD CSV WITH HEADERS FROM "file://{path}" AS csvLine - MATCH (x:{a_type}:{corpus} {{id: csvLine.{id_column}}}) - SET {property_update} - '''.format(path=path, a_type=annotated_type, corpus=corpus_context.cypher_safe_name, + statement = ''' + CALL {{ + LOAD CSV WITH HEADERS FROM "file://{path}" AS csvLine + MATCH (x:{a_type}:{corpus} {{id: csvLine.{id_column}}}) + SET {property_update} + }} IN TRANSACTIONS OF 500 ROWS + '''.format(path=path, a_type=annotated_type, corpus=corpus_context.cypher_safe_name, id_column=id_column, property_update=property_update) corpus_context.execute_cypher(statement) os.remove(path) diff --git a/bin/pgdb b/polyglotdb/pgdb.py similarity index 85% rename from bin/pgdb rename to polyglotdb/pgdb.py index 0ff50ffe..624ac51e 100644 --- a/bin/pgdb +++ b/polyglotdb/pgdb.py @@ -7,19 +7,9 @@ import configparser import subprocess import signal - from urllib.request import urlretrieve -import requests from tqdm import tqdm -CONFIG_DIR = os.path.expanduser('~/.pgdb') - -DEFAULT_DATA_DIR = os.path.join(CONFIG_DIR, 'data') - -CONFIG_PATH = os.path.join(CONFIG_DIR, 'config.ini') - -CONFIG_CHANGED = True - def load_config(): c = configparser.ConfigParser() @@ -48,11 +38,7 @@ def save_config(c): c.write(configfile) -CONFIG = load_config() - -TEMP_DIR = os.path.join(CONFIG_DIR, 'downloads') - -NEO4J_VERSION = '4.3.3' +NEO4J_VERSION = '5.21.0' INFLUXDB_VERSION = '1.8.9' @@ -98,7 +84,7 @@ def download_neo4j(data_directory, overwrite=False): import win32com.shell.shell as shell exe = 'neo4j.bat' neo4j_bin = os.path.join(CONFIG['Data']['directory'], 'neo4j', 'bin', exe) - params = 'install-service' + params = 'windows-service install' shell.ShellExecuteEx(lpVerb='runas', lpFile=neo4j_bin, lpParameters=params) return True @@ -108,34 +94,33 @@ def download_influxdb(data_directory, overwrite=False): if not overwrite and os.path.exists(influxdb_directory): print('Using existing InfluxDB installation.') return - if sys.platform != 'darwin': - os.makedirs(TEMP_DIR, exist_ok=True) - print('Downloading InfluxDB...') + os.makedirs(TEMP_DIR, exist_ok=True) + print('Downloading InfluxDB...') - if sys.platform.startswith('win'): - dist_string = 'windows_amd64.zip' - path = os.path.join(TEMP_DIR, 'influxdb.zip') - else: - dist_string = 'linux_amd64.tar.gz' - path = os.path.join(TEMP_DIR, 'influxdb.tar.gz') - - download_link = 'https://dl.influxdata.com/influxdb/releases/influxdb-{version}_{dist_string}'.format( - version=INFLUXDB_VERSION, dist_string=dist_string) - - with requests.get(download_link, stream=True) as r: - with open(path, 'wb') as f: - shutil.copyfileobj(r.raw, f) - shutil.unpack_archive(path, data_directory) - for d in os.listdir(data_directory): - if d.startswith(('influxdb')): - os.rename(os.path.join(data_directory, d), influxdb_directory) + if sys.platform.startswith('win'): + dist_string = 'windows_amd64.zip' + path = os.path.join(TEMP_DIR, 'influxdb.zip') + elif sys.platform == 'darwin': + dist_string = 'darwin_amd64.tar.gz' + path = os.path.join(TEMP_DIR, 'influxdb.tar.gz') else: - print('Installing InfluxDB...') - subprocess.call(['brew', 'update']) - subprocess.call(['brew', 'install', 'influxdb@1']) + dist_string = 'linux_amd64.tar.gz' + path = os.path.join(TEMP_DIR, 'influxdb.tar.gz') + + download_link = 
'https://dl.influxdata.com/influxdb/releases/influxdb-{version}_{dist_string}'.format( + version=INFLUXDB_VERSION, dist_string=dist_string) + + with tqdm(unit='B', unit_scale=True, miniters=1) as t: + filename, headers = urlretrieve(download_link, path, reporthook=tqdm_hook(t), data=None) + shutil.unpack_archive(filename, data_directory) + for d in os.listdir(data_directory): + if d.startswith(('influxdb')): + os.rename(os.path.join(data_directory, d), influxdb_directory) + return True + def configure_neo4j(data_directory): from polyglotdb.databases.config import neo4j_template_path neo4j_conf_path = os.path.join(data_directory, 'neo4j', 'conf', 'neo4j.conf') @@ -177,11 +162,12 @@ def uninstall(): import win32com.shell.shell as shell exe = 'neo4j.bat' neo4j_bin = os.path.join(CONFIG['Data']['directory'], 'neo4j', 'bin', exe) - params = 'uninstall-service asadmin' + params = 'windows-service uninstall' shell.ShellExecuteEx(lpVerb='runas', lpFile=neo4j_bin, lpParameters=params) try: shutil.rmtree(directory) + shutil.rmtree(CONFIG_DIR) except FileNotFoundError: pass @@ -202,8 +188,6 @@ def start(): print(neo4j_bin) if sys.platform.startswith('win'): influxdb_bin = os.path.join(CONFIG['Data']['directory'], 'influxdb', 'influxd.exe') - elif sys.platform == 'darwin': - influxdb_bin = '/usr/local/opt/influxdb@1/bin/influxd' else: influxdb_bin = os.path.join(CONFIG['Data']['directory'], 'influxdb', 'usr', 'bin', 'influxd') influxdb_conf = os.path.join(CONFIG['Data']['directory'], 'influxdb', 'influxdb.conf') @@ -242,7 +226,26 @@ def status(name): pass -if __name__ == '__main__': +def main(): + global CONFIG_DIR + CONFIG_DIR = os.environ.get('PGDB_HOME', os.path.expanduser('~/.pgdb')) + + global DEFAULT_DATA_DIR + DEFAULT_DATA_DIR = os.path.join(CONFIG_DIR, 'data') + + global CONFIG_PATH + CONFIG_PATH = os.path.join(CONFIG_DIR, 'config.ini') + + global CONFIG_CHANGED + CONFIG_CHANGED = False + + global CONFIG + CONFIG = load_config() + + global TEMP_DIR + TEMP_DIR = os.path.join(CONFIG_DIR, 'downloads') + + parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(help='Command to use') install_parser = subparsers.add_parser("install") @@ -311,3 +314,6 @@ def status(name): if CONFIG_CHANGED: save_config(CONFIG) + +if __name__ == '__main__': + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..93d9d895 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = ["setuptools>=64", "setuptools_scm>=8"] +build-backend = "setuptools.build_meta" + +[tool.setuptools_scm] +version_file = "polyglotdb/_version.py" \ No newline at end of file diff --git a/pytest.ini b/pytest.ini index 0aa1db69..88dc11bd 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,5 @@ [pytest] +addopts = --strict --verbose --tb=long -x +testpaths = tests markers = acoustic: mark a test as an acoustic test \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 22bd9a03..329622a9 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ -neo4j-driver ~= 4.3 +neo4j librosa -scipy +scipy ~= 1.12.0 praatio ~= 5.0 textgrid influxdb diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..9db30a4b --- /dev/null +++ b/setup.cfg @@ -0,0 +1,49 @@ +[metadata] +name = polyglotdb +long_description = file: README.md +long_description_content_type = text/markdown +url = https://github.com/MontrealCorpusTools/PolyglotDB +classifiers = + Development Status :: 3 - Alpha + Programming Language :: Python + Programming Language :: 
Python :: 3 + Intended Audience :: Science/Research + License :: OSI Approved :: MIT License + Operating System :: OS Independent + Topic :: Scientific/Engineering + Topic :: Text Processing :: Linguistic +keywords = + phonology + corpus + phonetics +author = Montreal Corpus Tools +author_email = michael.e.mcauliffe@gmail.com +license = MIT +license_file = LICENSE + +[options] +packages = find: +install_requires = + neo4j + praatio<=5.0 + textgrid + conch_sounds + librosa + influxdb + tqdm + requests + scipy<=1.12.0 + pywin32; os_name == 'nt' +include_package_data = True + +[options.entry_points] +console_scripts = + pgdb = polyglotdb.pgdb:main + +[options.extras_require] +test = pytest + +[options.package_data] +polyglotdb = + databases/*.conf + acoustics/formants/*.praat diff --git a/setup.py b/setup.py index 7a7ba6e8..fc1f76c8 100755 --- a/setup.py +++ b/setup.py @@ -1,107 +1,3 @@ -import sys -import os from setuptools import setup -from setuptools.command.test import test as TestCommand -import codecs - -def readme(): - with open('README.md') as f: - return f.read() - - -class PyTest(TestCommand): - def finalize_options(self): - TestCommand.finalize_options(self) - self.test_args = ['--strict', '--verbose', '--tb=long', 'tests', '-x'] - self.test_suite = True - - def run_tests(self): - if __name__ == '__main__': # Fix for multiprocessing infinite recursion on Windows - import pytest - errcode = pytest.main(self.test_args) - sys.exit(errcode) - -def read(rel_path): - here = os.path.abspath(os.path.dirname(__file__)) - with codecs.open(os.path.join(here, rel_path), 'r') as fp: - return fp.read() - - -def get_version(rel_path): - delim = ' = ' - for line in read(rel_path).splitlines(): - if line.startswith('__ver_major__'): - major_version = line.split(delim)[1] - elif line.startswith('__ver_minor__'): - minor_version = line.split(delim)[1] - elif line.startswith('__ver_patch__'): - patch_version = line.split(delim)[1].replace("'", '') - break - else: - raise RuntimeError("Unable to find version string.") - return "{}.{}.{}".format(major_version, minor_version, patch_version) - - -if __name__ == '__main__': - setup(name='polyglotdb', - version=get_version("polyglotdb/__init__.py"), - description='', - long_description=readme(), - long_description_content_type='text/markdown', - classifiers=[ - 'Development Status :: 3 - Alpha', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Intended Audience :: Science/Research', - 'License :: OSI Approved :: MIT License', - 'Operating System :: OS Independent', - 'Topic :: Scientific/Engineering', - 'Topic :: Text Processing :: Linguistic', - ], - keywords='phonology corpus phonetics', - url='https://github.com/MontrealCorpusTools/PolyglotDB', - author='Montreal Corpus Tools', - author_email='michael.e.mcauliffe@gmail.com', - packages=['polyglotdb', - 'polyglotdb.acoustics', - 'polyglotdb.acoustics.formants', - 'polyglotdb.acoustics.pitch', - 'polyglotdb.acoustics.vot', - 'polyglotdb.client', - 'polyglotdb.corpus', - 'polyglotdb.databases', - 'polyglotdb.io', - 'polyglotdb.io.types', - 'polyglotdb.io.parsers', - 'polyglotdb.io.inspect', - 'polyglotdb.io.exporters', - 'polyglotdb.io.importer', - 'polyglotdb.io.enrichment', - 'polyglotdb.query', - 'polyglotdb.query.base', - 'polyglotdb.query.annotations', - 'polyglotdb.query.annotations.attributes', - 'polyglotdb.query.discourse', - 'polyglotdb.query.speaker', - 'polyglotdb.query.lexicon', - 'polyglotdb.query.metadata', - 'polyglotdb.syllabification'],
package_data={'polyglotdb.databases': ['*.conf'], - 'polyglotdb.acoustics.formants': ['*.praat']}, - install_requires=[ - 'neo4j-driver~=4.3', - 'praatio~=5.0', - 'textgrid', - 'conch_sounds', - 'librosa', - 'influxdb', - 'tqdm', - 'requests' - ], - scripts=['bin/pgdb'], - cmdclass={'test': PyTest}, - extras_require={ - 'testing': ['pytest'], - } - ) +setup() \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index ccf327a4..4f38bef3 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -538,6 +538,8 @@ def praat_path(): return 'praat.exe' elif os.environ.get('TRAVIS', False): return os.path.join(os.environ.get('HOME'), 'tools', 'praat') + elif sys.platform == 'darwin': + return '/Applications/Praat.app/Contents/MacOS/Praat' else: return 'praat'
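A note on the rewritten batch imports (a minimal sketch, not code from this changeset): Neo4j 5 removes USING PERIODIC COMMIT, and its replacement, CALL { ... } IN TRANSACTIONS, is only allowed in an implicit (auto-commit) transaction, so the statements above are assumed to go through a plain session.run() on the 5.x Python driver rather than a managed transaction function. The label, property, and CSV names below are purely illustrative:

    from neo4j import GraphDatabase

    # Single braces are fine here because no str.format() is applied to this
    # string, unlike the doubled {{ }} in the statements in the diff above.
    statement = '''
    CALL {
        LOAD CSV WITH HEADERS FROM "file:///phones.csv" AS csvLine
        MATCH (n:phone) WHERE n.id = csvLine.id
        SET n.duration = toFloat(csvLine.duration)
    } IN TRANSACTIONS OF 2000 ROWS
    '''

    # auth=None matches the bundled neo4j.conf, which disables authentication.
    driver = GraphDatabase.driver('bolt://localhost:7687', auth=None)
    with driver.session() as session:
        # CALL ... IN TRANSACTIONS requires an implicit transaction, so run it
        # with session.run() directly rather than session.execute_write().
        session.run(statement).consume()
    driver.close()

Neo4j 5 also supports IF NOT EXISTS on schema statements (e.g. CREATE INDEX IF NOT EXISTS FOR (n:phone) ON (n.begin)), which would avoid catching Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists around every CREATE INDEX/CONSTRAINT call.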
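Since polyglotdb.config now resolves CONFIG_DIR from PGDB_HOME at import time, pointing PolyglotDB at a different configuration/data directory is a matter of setting the variable before the first import. A sketch, with the path purely illustrative:

    import os

    os.environ['PGDB_HOME'] = '/opt/pgdb'  # illustrative location
    import polyglotdb.config as config     # must come after the assignment

    print(config.CONFIG_DIR)  # /opt/pgdb
    print(config.BASE_DIR)    # /opt/pgdb/data

The pgdb command reads the same variable, but inside main(), so each invocation picks up the current environment.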
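With the __version__ bookkeeping dropped from polyglotdb/__init__.py in favor of setuptools_scm (pyproject.toml writes polyglotdb/_version.py at build time), the version string is derived from git tags rather than hand-edited. At runtime it can still be queried through the standard metadata API; a minimal sketch:

    from importlib.metadata import version

    # Reports whatever setuptools_scm computed from the most recent git tag
    # when the installed wheel or sdist was built.
    print(version('polyglotdb'))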