Searching Case Law Data with Natural Language
Introduction
Search is about understanding the user's intent and finding relevant information among vast amounts of data. Understanding intent is always challenging. Information loss is inevitable on the way from thought to language and back from language to thought, leading to misunderstandings and distortions. It might therefore be impossible to fully convey one's thoughts with just a single word or sentence.
Due to technical limitations, we have long searched for information using keywords. In this approach, Full-Text Search (FTS) breaks the input query into tokens and finds documents where those tokens match. Because it is hard to fully convey a thought this way, we have tried to capture intent by adding operators such as "AND" and "OR." This method is often efficient and produces reasonably satisfactory results, but language changes its meaning with context, and sometimes the meaning is even reversed by particles that FTS treats almost as stop words. This is probably why we cannot give up the desire to search in natural language.
This desire, combined with advances in vector embedding technology, which represents words or paragraphs as vectors with minimal information loss, and in vector-search algorithms such as HNSW, has led to the emergence of vector search. Vector search can capture contextual meaning that traditional keyword search misses. Of course, it is not perfect either, and its strengths and weaknesses differ from those of FTS, which is why we currently prefer a hybrid search method that uses both together.
This article builds upon Making Case Law Data Quickly Searchable, where we implemented a search system using FTS, and discusses how to build a service that searches in natural language using the vector or Hybrid search methods described earlier. You can check out the related demo at Case Law Search Demo.
Modifying the Search System
This work is based on the previous project. We will explain the content in the order of 1. Data Preparation, 2. Data Encoding, 3. Indexing, and 4. Demo Development. The tests described below were conducted on a MacBook Pro M3 Max with 36GB of RAM.
1. Data Preparation
We use the case law data provided by the Ministry of Legislation, which was preprocessed in the previous project.
2. Data Encoding
The vector embedding process is conducted similarly to Applying Natural Language Search to Product Search, using the same model, intfloat/multilingual-e5-large. The overall code reads the previously preprocessed data and calls _encode to extract and save the embedding vectors.
import re
from pathlib import Path

import click
import pandas as pd
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

from utils import get_logger

logger = get_logger(__file__)

_MODEL_PATH = "intfloat/multilingual-e5-large"  # 1024, 560M
_CHUNK_SIZE = 512
_BATCH_SIZE = 16
_POOL_SIZE = 1
_EMPTY_LINE_PATTERN = re.compile(r"^\s+$", re.MULTILINE)

# ... (omitted)

def _chunkify(df: pd.DataFrame, chunk_size: int):
    return [df.iloc[i : i + chunk_size] for i in range(0, len(df), chunk_size)]


@click.command()
@click.option(
    "--data_path", type=Path, help="Path of docs files", default="data/docs"
)
def main(data_path):
    model_name = _MODEL_PATH.rsplit("/", maxsplit=1)[-1]
    output_path = Path(f"data/embedding/{model_name}")
    output_path.mkdir(exist_ok=True, parents=True)

    model = SentenceTransformer(_MODEL_PATH, device="mps")
    print(f"Dims: {model.get_sentence_embedding_dimension()}")
    pool = model.start_multi_process_pool(
        [f"mps:{id}" for id in range(_POOL_SIZE)]
    )

    files = sorted(data_path.glob("*.pk"))
    for i, file in enumerate(files):
        logger.info("encoding docs: %s", file.name)
        df = pd.read_pickle(file, compression="gzip")
        chunks = _chunkify(df, _CHUNK_SIZE)
        for j, chunk in enumerate(tqdm(chunks)):
            output_file = output_path / f"docs_{i:02d}_{j:02d}.pk"
            _encode(model, pool, chunk, output_file)


if __name__ == "__main__":
    main()
The preprocessed data (df) is stored in the following format.
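The original table showing this format is not reproduced here. As a rough sketch, inferring the column names from the fields that are indexed later in this article (so treat it as an assumption rather than the exact schema), the columns of df look roughly like this:

# A sketch of the preprocessed DataFrame's columns, inferred from the index
# definitions later in this article; the list is illustrative, not the exact schema.
print(df.columns.tolist())
# ['doc_id', 'name', 'number', 'judgment_date', 'judgment', 'court_name',
#  'court_type_code', 'type_name', 'type_code', 'judgment_type',
#  'holding_statement', 'judgment_summary', 'reference_provisions',
#  'reference_cases', 'content']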
The _encode function encodes the two text fields, the judgment summary (judgment_summary) and the holding statement (holding_statement), and saves them as vectors (judgment_summary_embed, holding_statement_embed). Although the text length in these two fields varies, we output each field as a single vector for convenience.
Additionally, since the main text is long, we split content into multiple sentences using _split_chunks and save them as a separate file so they can be stored in an independent collection. This is because the split main text cannot be stored in the same collection without duplicating data.
In a future article, we will cover how to index embedding vectors stored as two-dimensional arrays and integrate everything into a single collection without splitting collections like this.
def _encode(
    model: SentenceTransformer,
    pool: dict[str, any],
    df: pd.DataFrame,
    output_path: Path,
):
    judgment_embed = model.encode_multi_process(
        df.judgment_summary.tolist(), pool, batch_size=_BATCH_SIZE
    )
    df_judgment_embed = pd.Series(
        list(judgment_embed), index=df.index, name="judgment_summary_embed"
    )

    holding_embed = model.encode_multi_process(
        df.holding_statement.tolist(), pool, batch_size=_BATCH_SIZE
    )
    df_holding_embed = pd.Series(
        list(holding_embed), index=df.index, name="holding_statement_embed"
    )

    df = pd.concat(
        [df, df_judgment_embed, df_holding_embed],
        axis=1,
    )
    df.to_pickle(output_path, compression="gzip")

    content_data = df.apply(
        lambda x: _split_chunks(x.doc_id, x.content, field_name="content"),
        axis=1,
    ).sum()
    df_content = pd.DataFrame(content_data)
    content_embed = model.encode_multi_process(
        df_content.content.tolist(), pool, batch_size=_BATCH_SIZE
    )
    df_content_embed = pd.Series(
        list(content_embed), index=df_content.index, name="content_embed"
    )
    df_content = pd.concat([df_content, df_content_embed], axis=1)
    df_content.to_pickle(
        output_path.with_suffix(".content.pk"), compression="gzip"
    )
We used SentenceSplitter from llama_index in _split_chunks to split the text into passages (sentence fragments). There is no single optimal way to split sentences for embedding, but the following points are worth considering:
- Training Data of the Embedding Model
  - If the training data consists only of short sentences, long inputs may lose information during embedding.
  - It is advantageous if the chunk length is similar to or shorter than the sentences in the embedding model's training data.
  - If the input exceeds the context window size defined by the model, the excess may be discarded (see the sketch after this list).
- Original Sentence Structure
  - If the original data contains structure such as headings or paragraphs, it is best to preserve it as much as possible when splitting.
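As a quick sanity check on the context-window point, the sentence_transformers API exposes the model's maximum sequence length. A minimal sketch, not part of the original pipeline, for spotting chunks that would be truncated:

# Minimal sketch: check whether a candidate chunk exceeds the embedding
# model's context window (not part of the original pipeline).
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("intfloat/multilingual-e5-large")
print(model.max_seq_length)  # maximum number of tokens the model will encode

chunk = "..."  # a candidate passage produced by the splitter
n_tokens = len(model.tokenizer.encode(chunk))
if n_tokens > model.max_seq_length:
    print(f"{n_tokens} tokens: the tail of this chunk will be truncated")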
For the reasons above, the <br> tags handled during preprocessing were converted to line breaks, and unnecessary spaces before and after newlines (\n) were removed. To distinguish paragraphs, we set paragraph_separator to \n\n so that the separation information in the original text is used as much as possible.
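That normalization was done in the earlier preprocessing step and its code is not shown in this article; a minimal sketch of what it might look like, with hypothetical helper names rather than the original implementation:

import re

# Hypothetical sketch of the normalization described above; not the original
# preprocessing code from the previous article.
_BR_PATTERN = re.compile(r"<br\s*/?>", re.IGNORECASE)
_SPACE_AROUND_NEWLINE_PATTERN = re.compile(r"[ \t]+\n|\n[ \t]+")


def _normalize_text(text: str) -> str:
    text = _BR_PATTERN.sub("\n", text)  # <br> tags -> line breaks
    text = _SPACE_AROUND_NEWLINE_PATTERN.sub("\n", text)  # strip spaces around \n
    return text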
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter


def _split_chunks(
    doc_id: str,
    text: str,
    field_name: str = "text",
    chunk_size: int = 1024,
    chunk_overlap: int = 40,
) -> list[dict[str, any]]:
    text = _EMPTY_LINE_PATTERN.sub("", text)
    spliter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        paragraph_separator="\n\n",
    )
    doc = Document(id_=doc_id, text=text)
    chunks = spliter.get_nodes_from_documents([doc])

    result = []
    for i, chunk in enumerate(chunks):
        item = {
            "doc_id": doc_id,
            "sent_id": i,
            field_name: chunk.get_content(),
            "start_char_index": chunk.start_char_idx,
            "end_char_index": chunk.end_char_idx,
        }
        result.append(item)

    return result
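For illustration, calling the function above on a document might look like this; the document id and text are made up:

# Hypothetical usage of _split_chunks defined above; id and text are made up.
sample_text = "Main text of a judgment...\n\nSecond paragraph of the judgment..."
chunks = _split_chunks(doc_id="12345", text=sample_text, field_name="content")
for chunk in chunks:
    print(chunk["sent_id"], chunk["start_char_index"], chunk["end_char_index"])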
By proceeding as explained, we obtained the following results. The time below can be reduced by using a more specialized GPU.
- Time Taken: 9h 51m 59.5s, using MPS
- Data File: GZip-compressed pickle, 2.5GB
- 87,491 case documents
- 435,263 separated main text sentences
3. Indexing
Data storage and indexing are not significantly different from the previous project. In the _insert function, the demo.law collection is stored with doc_id as its primary key, and the main text in demo.law.content is stored with doc_id and sent_id as its primary key. A simple preprocessing step that converts np.ndarray values to list is included during storage.
from pathlib import Path

import click
import numpy as np
import pandas as pd
from aeca import Channel, DocumentDB
from tqdm import tqdm

from utils import get_logger

logger = get_logger(__file__)

_COLLECTION_NAME = "demo.law"
_AECA_HOST = "localhost"
_AECA_PORT = 10080

# ... (omitted)

def _insert(
    docdb: DocumentDB,
    collection_name: str,
    files: list[Path],
    pk: list[str],
    embed_fields: list[str] | None = None,
) -> None:
    if collection_name in docdb.list_collections():
        docdb.drop_collection(collection_name)

    indexes = [
        {
            "fields": pk,
            "unique": True,
            "index_type": "kPrimaryKey",
        },
    ]
    docdb.create_collection(collection=collection_name, indexes=indexes)

    for file in tqdm(files):
        df = pd.read_pickle(file, compression="gzip")
        if embed_fields:
            for field in embed_fields:
                df[field] = df[field].apply(
                    lambda embed: (
                        [float(x) for x in embed]
                        if isinstance(embed, (np.ndarray, np.generic, list))
                        else None
                    )
                )
        data = df.to_dict(orient="records")
        docdb.insert(collection_name, data)


@click.command()
@click.option(
    "--data_path",
    type=Path,
    help="Data path",
    default="data/embedding/multilingual-e5-large",
)
def main(data_path):
    collection_name = _COLLECTION_NAME
    content_collection_name = f"{_COLLECTION_NAME}.content"

    channel = Channel(_AECA_HOST, _AECA_PORT)
    docdb = DocumentDB(channel)

    all_files = sorted(data_path.glob("*.pk"))
    files = [x for x in all_files if "content" not in x.name]
    content_files = [x for x in all_files if "content" in x.name]

    _insert(
        docdb,
        collection_name,
        files,
        ["doc_id"],
        ["judgment_summary_embed", "holding_statement_embed"],
    )
    _insert(
        docdb,
        content_collection_name,
        content_files,
        ["doc_id", "sent_id"],
        ["content_embed"],
    )
    _create_index(docdb, collection_name)
    _create_content_index(docdb, content_collection_name)
The following is the index creation code. In _create_index, you can see that judgment_summary_embed and holding_statement_embed have been added, and _create_content_index creates the index for the main text.
_DEFAULT_ANALYZER = {
    "analyzer": {"type": "standard_cjk", "options": {"tokenizer": "mecab"}},
    "index_options": "offsets",
}
_INT_ANALYZER = {"analyzer": {"type": "int64"}, "index_options": "doc_freqs"}
_KEYWORD_ANALYZER = {
    "analyzer": {"type": "keyword"},
    "index_options": "doc_freqs",
}
_DATETIME_ANALYZER = {
    "analyzer": {"type": "datetime"},
    "index_options": "doc_freqs",
}
_HNSW_OPTIONS = {
    "index_type": "HNSW",
    "dims": 1024,
    "m": 64,
    "ef_construction": 100,
    "ef_search": 32,
    "metric": "inner_product",
    "normalize": True,
    "shards": 1,
}
_HNSW_ANALYZER = {
    "analyzer": {
        "type": "DenseVectorAnalyzer",
        "options": _HNSW_OPTIONS,
    },
    "index_options": "doc_freqs",
}


def _create_index(docdb: DocumentDB, collection_name: str) -> None:
    index = {
        "index_name": "sk_fts",
        "fields": [
            "doc_id",
            "name",
            "number",
            "judgment_date",
            "judgment",
            "court_name",
            "court_type_code",
            "type_name",
            "type_code",
            "judgment_type",
            "holding_statement",
            "holding_statement_embed",
            "judgment_summary",
            "judgment_summary_embed",
            "reference_provisions",
            "reference_cases",
            "content",
        ],
        "index_type": "kFullTextSearchIndex",
        "unique": False,
        "options": {
            "doc_id": _INT_ANALYZER,
            "name": _DEFAULT_ANALYZER,
            "number": _KEYWORD_ANALYZER,
            "judgment_date": _DATETIME_ANALYZER,
            "judgment": _KEYWORD_ANALYZER,
            "court_name": _KEYWORD_ANALYZER,
            "court_type_code": _KEYWORD_ANALYZER,
            "type_name": _KEYWORD_ANALYZER,
            "type_code": _KEYWORD_ANALYZER,
            "judgment_type": _KEYWORD_ANALYZER,
            "holding_statement": _DEFAULT_ANALYZER,
            "holding_statement_embed": _HNSW_ANALYZER,
            "judgment_summary": _DEFAULT_ANALYZER,
            "judgment_summary_embed": _HNSW_ANALYZER,
            "reference_provisions": _DEFAULT_ANALYZER,
            "reference_cases": _DEFAULT_ANALYZER,
            "content": _DEFAULT_ANALYZER,
        },
    }
    docdb.create_index(collection_name, **index)


def _create_content_index(docdb: DocumentDB, collection_name: str) -> None:
    index = {
        "index_name": "sk_fts",
        "fields": [
            "doc_id",
            "sent_id",
            "content",
            "content_embed",
        ],
        "index_type": "kFullTextSearchIndex",
        "unique": False,
        "options": {
            "doc_id": _INT_ANALYZER,
            "sent_id": _INT_ANALYZER,
            "content": _DEFAULT_ANALYZER,
            "content_embed": _HNSW_ANALYZER,
        },
    }
    docdb.create_index(collection_name, **index)
This work yielded the following results. The stored raw data is similar in size to the GZip-compressed source file. By using the quantization option of the DenseVectorAnalyzer, storage usage and speed can be improved further.
- Data File: GZip-compressed pickle, 2.5GB
- Data Input: 14m 49.4s
- demo.law: 967.5MB
- demo.law.content: 2.3GB
- Indexing: 33.6m
- demo.law: 3.7GB, 9.4m
- demo.law.content: 8GB, 24.2m
4. Demo Development
The demo is a slightly modified version of the one in Making Case Law Data Quickly Searchable. The following is the corresponding API code in Next.js.
import { StatusCode, statusSuccess } from "@_api/_lib/status"
import { COLLECTION_COLUMNS, SearchResult } from "@_api/law/_lib/document"
import config from "@_app/config"
import { Channel, DocumentDB, SentenceTransformer } from "@aeca/client"
import { NextResponse } from "next/server"
import { NextRequest } from "next/server"

const _COLLECTION = "demo.law"
const _COLLECTION_CONTENT = "demo.law.content"
const _COLLECTION_COLUMNS = COLLECTION_COLUMNS.filter(
  (x) => !x.endsWith("_embed"),
)
const _SEARCH_LIMIT = 10000

export async function GET(
  request: NextRequest,
): Promise<NextResponse<SearchResult>> {
  const channel = new Channel(config.host, config.port)
  const docdb = new DocumentDB(channel)
  const model = new SentenceTransformer(channel, config.model)

  const searchParams = request.nextUrl.searchParams
  const query = searchParams.get("query")
  const filterQuery = searchParams.get("filter")
  const queryWithFilter = filterQuery
    ? `(${query}) AND (${filterQuery})`
    : query

  if (!query) {
    return NextResponse.json(
      { code: StatusCode.INVALID_ARGUMENT, message: "query is empty" },
      { status: 500 },
    )
  }

  const queryEmbed = await model.encode([query])
  const embedQueryString = queryEmbed[0].data.join(",")
  const embedFields = ["judgment_summary_embed", "holding_statement_embed"]
  const embedFieldsQuery = embedFields
    .map((x) => `${x}:[${embedQueryString}]`)
    .join(" OR ")

  const aql = [
    {
      $search: {
        query: `(${queryWithFilter}) AND (${embedFieldsQuery})^10`,
        highlight: true,
        limit: _SEARCH_LIMIT,
      },
      $project: [..._COLLECTION_COLUMNS, "_meta", "_highlights"],
    },
    {
      $join: {
        type: "inner",
        collection: _COLLECTION_CONTENT,
        query: [
          {
            $search: {
              query: `(${query}) AND (content_embed:[${embedQueryString}])^10`,
              highlight: true,
              limit: _SEARCH_LIMIT,
            },
          },
          {
            $project: [
              "doc_id",
              { _meta: "passage._meta" },
              { _highlights: "passage._highlights" },
              { sent_id: "passage.sent_id" },
              { content: "passage.content" },
              { start_char_index: "passage.start_char_index" },
              { end_char_index: "passage.end_char_index" },
            ],
          },
        ],
        on: ["doc_id"],
      },
    },
    {
      $limit: 30,
    },
  ]

  const findStartTime = performance.now()
  const df = await docdb.find(_COLLECTION, aql)
  const findElapsedTime = performance.now() - findStartTime

  return NextResponse.json({
    ...statusSuccess,
    docs: df?.data || [],
    info: {
      findElapsedTime: findElapsedTime,
    },
  })
}
Looking at it step by step, the case law search lets users narrow down cases with filters separately from the query, as shown below. We therefore use separate variables to distinguish the user query (query) from the filter (filterQuery), so that the embedding vector in the next step is extracted only from the user query.
We assign queryWithFilter by merging the filter and the user query into one. This variable becomes the query used for FTS later.
const queryWithFilter = filterQuery ? `(${query}) AND (${filterQuery})` : query
Then we use Aeca's ML model serving feature to convert query into a vector and turn it into a string. Note that the filter information (filterQuery) received above is not passed here.
const model = new SentenceTransformer(channel, config.model)

const queryEmbed = await model.encode([query])
const embedQueryString = queryEmbed[0].data.join(",")
const embedFields = ["judgment_summary_embed", "holding_statement_embed"]
const embedFieldsQuery = embedFields
  .map((x) => `${x}:[${embedQueryString}]`)
  .join(" OR ")
Now, embedFieldsQuery contains the following content.
judgment_summary_embed:[0.024,0.0031, ...] OR holding_statement_embed:[0.024,0.0031,...]
We merge the FTS query queryWithFilter with embedFieldsQuery and weight the vector search using the boosting operator ^. This weight is one of the variables that must be determined experimentally depending on the shape of the service.
const aql = [
  {
    $search: {
      query: `(${queryWithFilter}) AND (${embedFieldsQuery})^10`,
      highlight: true,
      limit: _SEARCH_LIMIT,
    },
    $project: [..._COLLECTION_COLUMNS, "_meta", "_highlights"],
  },
  ...
]
Next, we write the query that searches the main text. The query stays the same, and in $project we move the results into nested fields such as passage._highlights before merging, restructuring them so they do not collide with the same columns in the parent collection. To simplify data processing, we use an inner $join so that both the main text in the demo.law.content collection and the other fields of demo.law must be satisfied simultaneously.
This part can be simplified if the split main text is stored in the same collection as a two-dimensional array; we will cover this in a future article.
{
  $join: {
    type: "inner",
    collection: _COLLECTION_CONTENT,
    query: [
      {
        $search: {
          query: `(${query}) AND (content_embed:[${embedQueryString}])^10`,
          highlight: true,
          limit: _SEARCH_LIMIT,
        },
      },
      {
        $project: [
          "doc_id",
          { _meta: "passage._meta" },
          { _highlights: "passage._highlights" },
          { sent_id: "passage.sent_id" },
          { content: "passage.content" },
          { start_char_index: "passage.start_char_index" },
          { end_char_index: "passage.end_char_index" },
        ],
      },
    ],
    on: ["doc_id"],
  },
},
Conclusion
With this implementation, we can now search cases with natural language queries such as "Can someone be punished for refusing a test after drunk driving?" However, this comes at the cost of embedding sentences or paragraphs and storing them separately. Compared with using FTS alone, the advantages and disadvantages are clear, so each service needs to choose and refine the approach that suits it.
As seen in the example above, there are no particular constraints on defining one or more fields as embedding fields and merging them with FTS to compute scores. Moreover, everything operates organically within a single system, without separating a DBMS for data storage, a search engine for FTS, and a vector DB for vector search. This can be a significant advantage in terms of experimentation, cost, and operation.
Getting to Know Aeca
If you'd like to explore Aeca further, you can easily install it with Docker and start using it right away. For more detailed explanations about adopting Aeca or to request a product brochure, please contact us via Customer Support.
Read more
Making Case Law Data Quickly Searchable
Explains the process of downloading case law data and building a case law search service in just one day using Aeca.
By Aeca Team|2024-06-21
Applying Natural Language Search to Product Search
We explain the process of data collection and processing, search, and service development for product search using Aeca. Learn how to index when structured and unstructured data are mixed, and how to transform queries for search using LLM.
By Aeca Team|2024-06-12