forked from NVIDIA-AI-Blueprints/vulnerability-analysis
-
Notifications
You must be signed in to change notification settings - Fork 4
/
config.py
330 lines (211 loc) · 9.26 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tempfile
import typing
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Discriminator
from pydantic import Field
from pydantic import FilePath
from pydantic import NonNegativeInt
from pydantic import PositiveInt
from pydantic import Tag
from pydantic import field_validator
from pydantic.functional_validators import AfterValidator
from morpheus.utils.http_utils import HTTPMethod
from .common import TypedBaseModel
from .input import AgentMorpheusInput
def _llm_discriminator(v: typing.Any) -> str | None:
if isinstance(v, dict):
return v.get("service").get("_type")
return getattr(getattr(v, "service"), "_type")
class GeneralConfig(BaseModel):
    # General options for a run: working directories, retry/batch limits, threading and event-loop selection.

    # Empty protected_namespaces lets fields such as `model_max_batch_size` use pydantic's reserved `model_` prefix.
    model_config = ConfigDict(protected_namespaces=())

    # Both directories default to sub-folders of `<system temp dir>/am_cache`.
    base_vdb_dir: str = os.path.join(tempfile.gettempdir(), "am_cache", "vdb")
    base_git_dir: str = os.path.join(tempfile.gettempdir(), "am_cache", "git")
    enable_llm_list_parsing: bool = False
    cache_dir: str | None = None
    ignore_build_vdb_errors: bool = False
    max_retries: NonNegativeInt = 10
    model_max_batch_size: PositiveInt = 64
    # os.cpu_count() may return None. Default values are not validated unless `validate_default` is enabled, so the
    # None case is resolved to 1 inside the factory to keep the field a positive int.
    num_threads: PositiveInt = Field(default_factory=lambda: os.cpu_count() or 1)
    pipeline_batch_size: PositiveInt = 1024
    use_uvloop: bool = True
    """
    Whether to use uvloop for the event loop. This can provide a significant speedup in some cases. Disable to provide
    better error messages when debugging.
    """
    code_search_tool: bool = False

    @field_validator("num_threads")
    @classmethod
    def get_num_threads(cls, v: int) -> int:
        """Return `v`, or the CPU count (at least 1) when `v` is None or below 1."""
        if v is None or v < 1:
            return os.cpu_count() or 1
        return v
########################### LLM Model Configs ###########################
def check_prompt_file(prompt: str) -> str:
if prompt is not None and os.path.isfile(prompt):
with open(prompt, 'r') as prompt_file:
read_prompt = prompt_file.read()
return read_prompt
else:
return prompt
# A string that is passed through check_prompt_file after validation, so a value naming an existing file is replaced
# by that file's contents.
PromptConfig = typing.Annotated[str, AfterValidator(check_prompt_file)]
# Settings for the LLM service whose type literal is "nemo".
class NeMoLLMServiceConfig(TypedBaseModel[typing.Literal["nemo"]]):
    api_key: str | None = None  # optional; None when not supplied
    org_id: str | None = None  # optional; None when not supplied
    retry_count: NonNegativeInt = 5  # must be >= 0
# Model settings paired with a NeMoLLMServiceConfig; tagged "nemo" in the LLMModelConfig union.
class NeMoLLMModelConfig(BaseModel):
    service: NeMoLLMServiceConfig  # _llm_discriminator reads `_type` from this nested value
    model_name: str
    prompt: PromptConfig | None = None  # PromptConfig runs check_prompt_file on the value
    customization_id: str | None = None
    temperature: typing.Annotated[float, Field(ge=0.0, le=1.0)] = 0.0  # constrained to [0.0, 1.0]
    top_k: NonNegativeInt = 0
    top_p: float = 1
    random_seed: int | None = None
    tokens_to_generate: int = 300
    beam_search_diversity_rate: float = 0.0
    beam_width: int = 1
    repetition_penalty: float = 1.0
    length_penalty: float = 1.0
    logprobs: bool = True
    # Empty protected_namespaces lets `model_name` use pydantic's reserved `model_` prefix.
    model_config = ConfigDict(protected_namespaces=())
# Settings for the LLM service whose type literal is "nvfoundation".
class NVFoundationLLMServiceConfig(TypedBaseModel[typing.Literal["nvfoundation"]]):
    api_key: str | None = None  # optional; None when not supplied
    base_url: str | None = None  # optional; None when not supplied
# Model settings paired with an NVFoundationLLMServiceConfig; tagged "nvfoundation" in the LLMModelConfig union.
# NOTE(review): unlike NeMoLLMModelConfig and OpenAIModelConfig, which derive from BaseModel, this class derives from
# TypedBaseModel -- confirm the difference is intentional.
class NVFoundationLLMModelConfig(TypedBaseModel[typing.Literal["nvfoundation"]]):
    service: NVFoundationLLMServiceConfig  # _llm_discriminator reads `_type` from this nested value
    model_name: str
    prompt: PromptConfig | None = None  # PromptConfig runs check_prompt_file on the value
    temperature: float = 0.0
    top_p: float | None = None
    max_tokens: PositiveInt = 300  # must be > 0
    seed: int | None = None
    # Empty protected_namespaces lets `model_name` use pydantic's reserved `model_` prefix.
    model_config = ConfigDict(protected_namespaces=())
# Settings for the LLM service whose type literal is "openai".
class OpenAIServiceConfig(TypedBaseModel[typing.Literal["openai"]]):
    api_key: str | None = None  # optional; None when not supplied
    base_url: str | None = None  # optional; None when not supplied
# Model settings paired with an OpenAIServiceConfig; tagged "openai" in the LLMModelConfig union.
class OpenAIModelConfig(BaseModel):
    service: OpenAIServiceConfig  # _llm_discriminator reads `_type` from this nested value
    model_name: str
    prompt: PromptConfig | None = None  # PromptConfig runs check_prompt_file on the value
    temperature: float = 0.0
    top_p: float = 1.0
    seed: int | None = None
    max_retries: int = 10
    # Supplied in input under the alias `json`; `populate_by_name=True` below also accepts `json_output`.
    json_output: bool = Field(False, alias='json')
    # Empty protected_namespaces lets `model_name` use pydantic's reserved `model_` prefix.
    model_config = ConfigDict(protected_namespaces=(), populate_by_name=True)
# Tagged union of the LLM model configs. _llm_discriminator selects the member by returning one of the tags
# "nemo", "openai" or "nvfoundation".
LLMModelConfig = typing.Annotated[typing.Annotated[NeMoLLMModelConfig, Tag("nemo")]
                                  | typing.Annotated[OpenAIModelConfig, Tag("openai")]
                                  | typing.Annotated[NVFoundationLLMModelConfig, Tag("nvfoundation")],
                                  Discriminator(_llm_discriminator)]
########################### Input Configs ###########################
# Input config with type literal "manual": the input message is embedded directly in the configuration.
class ManualInputConfig(TypedBaseModel[typing.Literal["manual"]]):
    message: AgentMorpheusInput
    repeat_count: int = 1  # presumably how many times the message is emitted -- confirm against the input stage
# Input config with type literal "file".
class FileInputConfig(TypedBaseModel[typing.Literal["file"]]):
    file: str  # plain string; no existence check is performed by this model
    repeat_count: int = 1  # presumably how many times the file is read -- confirm against the input stage
# Input config with type literal "http"; defaults describe a POST to 127.0.0.1:8080/scan.
class HttpInputConfig(TypedBaseModel[typing.Literal["http"]]):
    address: str = "127.0.0.1"
    endpoint: str = "/scan"
    port: int = 8080
    http_method: HTTPMethod = HTTPMethod.POST
    stop_after: int = 0  # NOTE(review): 0 presumably means "no limit" -- confirm against the HTTP source
# Input config with type literal "plugin": names a plugin and carries free-form settings for it.
class PluginInputConfig(TypedBaseModel[typing.Literal["plugin"]]):
    plugin_name: str
    plugin_config: dict[str, typing.Any] = {}  # arbitrary key/value settings; empty by default
# Tagged union of the input configs. Each member is tagged with its own static_type() and the member is selected
# by TypedBaseModel.discriminator.
InputConfig = typing.Annotated[typing.Annotated[ManualInputConfig, Tag(ManualInputConfig.static_type())]
                               | typing.Annotated[FileInputConfig, Tag(FileInputConfig.static_type())]
                               | typing.Annotated[HttpInputConfig, Tag(HttpInputConfig.static_type())]
                               | typing.Annotated[PluginInputConfig, Tag(PluginInputConfig.static_type())],
                               Discriminator(TypedBaseModel.discriminator)]
# Embedding model config with type literal "huggingface".
class HuggingFaceEmbeddingModelConfig(TypedBaseModel[typing.Literal["huggingface"]]):
    # Empty protected_namespaces lets `model_name` / `model_kwargs` use pydantic's reserved `model_` prefix.
    model_config = ConfigDict(protected_namespaces=())
    model_name: str
    model_kwargs: dict[str, typing.Any] = {}  # free-form; presumably forwarded to the embedding model -- confirm
    encode_kwargs: dict[str, typing.Any] = {}  # free-form; presumably forwarded to the encode call -- confirm
# Embedding model config with type literal "openai".
class OpenAIEmbeddingModelConfig(TypedBaseModel[typing.Literal["openai"]]):
    openai_api_key: str | None = None  # optional; None when not supplied
    max_retries: int = 10
    chunk_size: int = 128
# Embedding model config with type literal "nim".
class NIMEmbeddingModelConfig(TypedBaseModel[typing.Literal["nim"]]):
    api_key: str | None = None  # optional; None when not supplied
    model: str
    truncate: typing.Literal["NONE", "START", "END"] = "END"  # only these three values are accepted
    max_batch_size: int = 128
# Tagged union of the embedding model configs. Each member is tagged with its own static_type() and the member is
# selected by TypedBaseModel.discriminator.
EmbeddingModelConfig = typing.Annotated[
    typing.Annotated[HuggingFaceEmbeddingModelConfig, Tag(HuggingFaceEmbeddingModelConfig.static_type())]
    | typing.Annotated[OpenAIEmbeddingModelConfig, Tag(OpenAIEmbeddingModelConfig.static_type())]
    | typing.Annotated[NIMEmbeddingModelConfig, Tag(NIMEmbeddingModelConfig.static_type())],
    Discriminator(TypedBaseModel.discriminator)]
# Agent section of the engine configuration: the LLM to use plus boolean switches, all off by default.
class EngineAgentConfig(BaseModel):
    model: LLMModelConfig  # any member of the LLMModelConfig union
    version_compare_tool: bool = False
    prompt_examples: bool = False
    verbose: bool = False
    return_intermediate_steps: bool = False
    return_source_documents: bool = False
# Engine section of the run configuration: the agent, three separately configured LLMs and the embedding model.
class EngineConfig(BaseModel):
    agent: EngineAgentConfig
    checklist_model: LLMModelConfig
    justification_model: LLMModelConfig
    rag_embedding: EmbeddingModelConfig
    sbom_faiss_dir: str | None = None  # optional; None when not supplied
    summary_model: LLMModelConfig
# Output config with type literal "print"; it declares no fields of its own.
class OutputPrintConfig(TypedBaseModel[typing.Literal["print"]]):
    pass
# Output config with type literal "file"; both paths default to locations under ./.tmp and may be set to None.
class OutputFileConfig(TypedBaseModel[typing.Literal["file"]]):
    file_path: str | None = "./.tmp/agent_morpheus_output.json"
    markdown_dir: str | None = "./.tmp/vulnerability_markdown_reports"
    overwrite: bool = False
# Output config with type literal "http"; both fields are required.
class OutputHttpConfig(TypedBaseModel[typing.Literal["http"]]):
    url: str
    endpoint: str
# Output config with type literal "elasticsearch".
class OutputElasticsearchConfig(TypedBaseModel[typing.Literal["elasticsearch"]]):
    url: str
    index: str
    conf_file: FilePath  # pydantic FilePath: the path must exist and be a file at validation time
    raise_on_exception: bool = False
# Output config with type literal "plugin": names a plugin and carries free-form settings for it.
class OutputPluginConfig(TypedBaseModel[typing.Literal["plugin"]]):
    plugin_name: str
    plugin_config: dict[str, typing.Any] = {}  # arbitrary key/value settings; empty by default
# Tagged union of the output configs. Each member is tagged with its own static_type() and the member is selected
# by TypedBaseModel.discriminator.
# NOTE(review): OutputElasticsearchConfig is defined in this file but is not a member of this union, so it cannot be
# selected as `output` -- confirm whether that omission is intentional.
OutputConfig = typing.Annotated[typing.Annotated[OutputPrintConfig, Tag(OutputPrintConfig.static_type())]
                                | typing.Annotated[OutputFileConfig, Tag(OutputFileConfig.static_type())]
                                | typing.Annotated[OutputHttpConfig, Tag(OutputHttpConfig.static_type())]
                                | typing.Annotated[OutputPluginConfig, Tag(OutputPluginConfig.static_type())],
                                Discriminator(TypedBaseModel.discriminator)]
class RunConfig(BaseModel):
    # Root configuration object combining the general, input, engine and output sections.

    # Empty protected_namespaces disables pydantic's reserved `model_` prefix check for this model.
    model_config = ConfigDict(protected_namespaces=())

    # Global Options
    general: GeneralConfig = GeneralConfig()

    # Input Configuration
    input: InputConfig

    # Engine Configuration
    engine: EngineConfig

    # Output Configuration
    output: OutputConfig

    @classmethod
    def generate_json_schema(cls) -> dict[str, typing.Any]:
        """Return the JSON schema of this configuration class as a dictionary.

        Implemented as a classmethod so that a subclass produces its own schema rather than the schema of
        ``RunConfig``.
        """
        return cls.model_json_schema()

    @classmethod
    def write_json_schema(cls, schema_path: str) -> None:
        """Write the JSON schema of this configuration class to ``schema_path``.

        The file is written as UTF-8 with a two-space indent and is overwritten if it already exists.
        """
        import json

        schema = cls.generate_json_schema()
        with open(schema_path, "w", encoding="utf-8") as f:
            json.dump(schema, f, indent=2)