자연어로 판례 데이터 검색하기

Posted on July 4, 2024
자연어로 판례 데이터 검색하기

들어가기

검색은 사용자의 의도를 이해하고 많은 데이터 속에서 관련된 정보를 찾아내는 것입니다. 의도를 이해하는 것은 늘 어려운 일입니다. 생각에서 언어로, 언어에서 다시 생각으로, 이 과정에서 정보 손실이 발생할 수밖에 없고 오해와 왜곡을 발생시킵니다. 그래서 하나의 단어, 문장으로 생각을 온전히 전달한다는 것은 불가능한 일일지도 모릅니다.

우리는 정보를 찾을 때 기술적인 한계 때문에 키워드로 정보를 찾아왔습니다. 이 방법은 Full-Text Search(이하 FTS)가 입력된 질의를 토큰으로 나누고 저장된 문서에서 토큰이 일치하는 문서들을 찾기 때문이며, 이러한 방법으로 온전히 생각을 전달하기 힘들기 때문에 "AND", "OR" 등 다양한 문법을 추가하며 의도를 파악하고자 노력했습니다. 물론 이 방법은 대부분 효율적으로 동작하고 적당히 만족스러운 결과를 보여주기도 합니다. 그러나 우리의 언어는 맥락에 따라 그 의미를 달리하며, FTS에서 불용어에 가깝게 다루는 조사에 따라서 그 의미가 반전되기도 합니다. 이것이 우리가 자연스러운 "자연어"로 검색하고자 하는 욕망을 버릴 수 없는 이유일 것 같습니다. 이러한 욕구는 단어 혹은 단락을 최소한의 정보 손실로 벡터로 표현하는 벡터 임베딩 기술의 발전과 여기에 HNSW 같은 벡터를 검색하는 기술이 합쳐지면서 벡터 서치라는 방법이 나오게 되었습니다. 이 기술로 기존의 키워드 검색이 감지하지 못했던 맥락상 의미를 조금 더 잘 파악하게 되었습니다. 물론 이 방법도 완벽하지 않고 FTS와는 서로 다른 특성의 장단점을 가지고 있습니다. 그래서 현재는 이 둘을 함께 사용하는 Hybrid 검색 방식을 선호하고 있습니다.

이 글은 FTS로 검색 시스템을 구현했던 판례 데이터를 빠르게 검색 가능한 상태로 만들기를 발전시켜 앞서 설명했던 벡터 혹은 Hybrid 검색을 활용해 자연어로 검색하는 서비스를 어떻게 구축하는지에 대한 내용을 다룹니다. 관련된 데모는 판례 검색 데모에서 확인하실 수 있습니다.

검색 시스템의 수정

이번 작업 내용은 이전 작업을 기반으로 합니다. 내용은 1. 데이터 준비, 2. 데이터 인코딩, 3. 색인, 4. 데모 개발 순서로 설명합니다. 아래에 기술되는 테스트는 MacBook Pro M3 Max, 36GB에서 진행했습니다.

1. 데이터 준비

법제처에서 제공하는 판례 데이터는 이전 작업에서 전처리된 데이터를 그대로 사용합니다.

2. 데이터 인코딩

벡터 임베딩 과정은 상품 검색에 자연어 검색을 적용하기와 유사하게 진행하며 모델은 동일하게 intfloat/multilingual-e5-large를 사용합니다.

전체적인 코드는 아래와 같이 기존 전처리된 데이터를 읽고 _encode를 호출하여 임베딩 벡터를 추출하고 저장하는 코드로 구성되어 있습니다.

import re from pathlib import Path import click import pandas as pd from sentence_transformers import SentenceTransformer from tqdm import tqdm from utils import get_logger logger = get_logger(__file__) _MODEL_PATH = "intfloat/multilingual-e5-large" # 1024, 560M _CHUNK_SIZE = 512 _BATCH_SIZE = 16 _POOL_SIZE = 1 _EMPTY_LINE_PATTERN = re.compile(r"^\s+$", re.MULTILINE) # 중략 def _chunkify(df: pd.DataFrame, chunk_size: int): return [df.iloc[i : i + chunk_size] for i in range(0, len(df), chunk_size)] @click.command() @click.option( "--data_path", type=Path, help="Path of docs files", default="data/docs" ) def main(data_path): model_name = _MODEL_PATH.rsplit("/", maxsplit=1)[-1] output_path = Path(f"data/embedding/{model_name}") output_path.mkdir(exist_ok=True, parents=True) model = SentenceTransformer(_MODEL_PATH, device="mps") print(f"Dims: {model.get_sentence_embedding_dimension()}") pool = model.start_multi_process_pool( [f"mps:{id}" for id in range(_POOL_SIZE)] ) files = sorted(data_path.glob("*.pk")) for i, file in enumerate(files): logger.info("encoding docs: %s", file.name) df = pd.read_pickle(file, compression="gzip") chunks = _chunkify(df, _CHUNK_SIZE) for j, chunk in enumerate(tqdm(chunks)): output_file = output_path / f"docs_{i:02d}_{j:02d}.pk" _encode(model, pool, chunk, output_file) if __name__ == "__main__": main()

전처리된 데이터(df)는 다음과 같은 형식으로 저장되어 있습니다.

demo.law Collection

_encode 함수는 텍스트 데이터인 판결요지(judgment_summary), 판시사항(holding_statement)를 각각 하나의 벡터(judgment_summary_embed, holding_statement_embed)로 출력 후 저장합니다. 이 두 필드에 구성된 텍스트 데이터의 길이 편차가 있지만 편의상 하나의 벡터로 출력했습니다.

demo.law.content Collection

여기에 추가로 본문은 장문이기 때문에 content_split_chunks를 통해 여러 문장으로 분리하고 독립된 컬렉션에 저장하기 위해 별도의 파일로 저장합니다. 이는 하나 이상으로 분리된 본문이 동일한 컬렉션에 데이터 중복 없이 저장하는 것이 불가능하기 때문입니다.

2차원 임베딩 벡터가 저장된 데이터를 색인하여 이와 같이 컬렉션을 분리하지 않고 컬렉션에 통합하여 저장하는 방법은 우리는 왜 벡터 검색을 위해 2차원 벡터를 저장하게 되었나?에서 확인하실 수 있습니다.

def _encode( model: SentenceTransformer, pool: dict[str, any], df: pd.DataFrame, output_path: Path, ): judgment_embed = model.encode_multi_process( df.judgment_summary.tolist(), pool, batch_size=_BATCH_SIZE ) df_judgment_embed = pd.Series( list(judgment_embed), index=df.index, name="judgment_summary_embed" ) holding_embed = model.encode_multi_process( df.holding_statement.tolist(), pool, batch_size=_BATCH_SIZE ) df_holding_embed = pd.Series( list(holding_embed), index=df.index, name="holding_statement_embed" ) df = pd.concat( [df, df_judgment_embed, df_holding_embed], axis=1, ) df.to_pickle(output_path, compression="gzip") content_data = df.apply( lambda x: _split_chunks(x.doc_id, x.content, field_name="content"), axis=1, ).sum() df_content = pd.DataFrame(content_data) content_embed = model.encode_multi_process( df_content.content.tolist(), pool, batch_size=_BATCH_SIZE ) df_content_embed = pd.Series( list(content_embed), index=df_content.index, name="content_embed" ) df_content = pd.concat([df_content, df_content_embed], axis=1) df_content.to_pickle( output_path.with_suffix(".content.pk"), compression="gzip" )

_split_chunks는 다음과 같이 passage(문장 조각)를 나누기 위해 llama_indexSentenceSplitter를 사용했습니다. 임베딩을 위해 최선의 문장 분리 방법은 없으나 다음을 고려해 볼 수 있습니다.

  • 임베딩 모델의 학습 데이터
    • 학습 데이터가 짧은 문장으로만 구성되어 있다면 긴 문장의 입력 시 임베딩 과정에서 손실이 있을 수 있습니다.
    • 문장의 길이는 임베딩 모델의 학습 데이터의 구성과 유사하거나 작을수록 유리합니다.
    • 모델에서 정의한 context window 크기를 초과하면 입력이 버려질 수 있습니다.
  • 원본의 문장 구조
    • 원본 데이터에 제목(heading), 단락 등의 정보가 있다면 이 내용을 최대한 살려 분리하는 것이 좋습니다.

위와 같은 이유로 전처리 과정에 포함된 <br> 태그는 개행으로 변경하고 \n, 개행 전후의 불필요한 공백을 제거했습니다. 그리고 단락 구별을 위해 paragraph_separator\n\n로 전달하여 가능한 원본의 분리 정보를 활용했습니다.

from llama_index.core import Document from llama_index.core.node_parser import SentenceSplitter def _split_chunks( doc_id: str, text: str, field_name: str = "text", chunk_size: int = 1024, chunk_overlap: int = 40, ) -> list[dict[str, any]]: text = _EMPTY_LINE_PATTERN.sub("", text) spliter = SentenceSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, paragraph_separator="\n\n", ) doc = Document(id_=doc_id, text=text) chunks = spliter.get_nodes_from_documents([doc]) result = [] for i, chunk in enumerate(chunks): item = { "doc_id": doc_id, "sent_id": i, field_name: chunk.get_content(), "start_char_index": chunk.start_char_idx, "end_char_index": chunk.end_char_idx, } result.append(item) return result

위에 설명된 내용을 진행하여 다음과 같은 내용을 얻을 수 있었습니다. 아래의 시간은 조금 더 전문적인 GPU를 사용하여 줄일 수 있습니다.

  • 소요 시간: 9h 51m 59.5s, MPS 사용
  • 데이터 파일: GZip으로 압축된 pickle, 2.5GB
    • 87,491개의 판례 문서
    • 435,263개의 분리된 본문 문장

3. 색인

데이터 저장과 색인은 기존 진행했던 내용과 큰 차이는 없습니다. _insert 함수에서 demo.law 컬렉션은 primary key로 doc_id, 본문을 저장하는 demo.law.content는 primary key로 doc_idsent_id로 저장합니다. 저장 과정에서 np.ndarraylist로 변경하는 간단한 전처리가 포함되어 있습니다.

from pathlib import Path import click import numpy as np import pandas as pd from aeca import Channel, DocumentDB from tqdm import tqdm from utils import get_logger logger = get_logger(__file__) _COLLECTION_NAME = "demo.law" _AECA_HOST = "localhost" _AECA_PORT = 10080 # 중략 def _insert( docdb: DocumentDB, collection_name: str, files: list[Path], pk: list[str], embed_fields: list[str] | None = None, ) -> None: if collection_name in docdb.list_collections(): docdb.drop_collection(collection_name) indexes = [ { "fields": pk, "unique": True, "index_type": "kPrimaryKey", }, ] docdb.create_collection(collection=collection_name, indexes=indexes) for file in tqdm(files): df = pd.read_pickle(file, compression="gzip") if embed_fields: for field in embed_fields: df[field] = df[field].apply( lambda embed: ( [float(x) for x in embed] if isinstance(embed, (np.ndarray, np.generic, list)) else None ) ) data = df.to_dict(orient="records") docdb.insert(collection_name, data) @click.command() @click.option( "--data_path", type=Path, help="Data path", default="data/embedding/multilingual-e5-large", ) def main(data_path): collection_name = _COLLECTION_NAME content_collection_name = f"{_COLLECTION_NAME}.content" channel = Channel(_AECA_HOST, _AECA_PORT) docdb = DocumentDB(channel) all_files = sorted(data_path.glob("*.pk")) files = [x for x in all_files if "content" not in x.name] content_files = [x for x in all_files if "content" in x.name] _insert( docdb, collection_name, files, ["doc_id"], ["judgment_summary_embed", "holding_statement_embed"], ) _insert( docdb, content_collection_name, content_files, ["doc_id", "sent_id"], ["content_embed"], ) _create_index(docdb, collection_name) _create_content_index(docdb, content_collection_name)

다음은 색인 생성 코드이며 _create_index에서 judgment_summary_embed, holding_statement_embed가 추가된 것을 확인하실 수 있습니다. 그리고 _create_content_index에서 본문에 해당하는 색인을 생성하고 있습니다.

_DEFAULT_ANALYZER = { "analyzer": {"type": "standard_cjk", "options": {"tokenizer": "mecab"}}, "index_options": "offsets", } _INT_ANALYZER = {"analyzer": {"type": "int64"}, "index_options": "doc_freqs"} _KEYWORD_ANALYZER = { "analyzer": {"type": "keyword"}, "index_options": "doc_freqs", } _DATETIME_ANALYZER = { "analyzer": {"type": "datetime"}, "index_options": "doc_freqs", } _HNSW_OPTIONS = { "index_type": "HNSW", "dims": 1024, "m": 64, "ef_construction": 100, "ef_search": 32, "metric": "inner_product", "normalize": True, "shards": 1, } _HNSW_ANALYZER = { "analyzer": { "type": "DenseVectorAnalyzer", "options": _HNSW_OPTIONS, }, "index_options": "doc_freqs", } def _create_index(docdb: DocumentDB, collection_name: str) -> None: index = { "index_name": "sk_fts", "fields": [ "doc_id", "name", "number", "judgment_date", "judgment", "court_name", "court_type_code", "type_name", "type_code", "judgment_type", "holding_statement", "holding_statement_embed", "judgment_summary", "judgment_summary_embed", "reference_provisions", "reference_cases", "content", ], "index_type": "kFullTextSearchIndex", "unique": False, "options": { "doc_id": _INT_ANALYZER, "name": _DEFAULT_ANALYZER, "number": _KEYWORD_ANALYZER, "judgment_date": _DATETIME_ANALYZER, "judgment": _KEYWORD_ANALYZER, "court_name": _KEYWORD_ANALYZER, "court_type_code": _KEYWORD_ANALYZER, "type_name": _KEYWORD_ANALYZER, "type_code": _KEYWORD_ANALYZER, "judgment_type": _KEYWORD_ANALYZER, "holding_statement": _DEFAULT_ANALYZER, "holding_statement_embed": _HNSW_ANALYZER, "judgment_summary": _DEFAULT_ANALYZER, "judgment_summary_embed": _HNSW_ANALYZER, "reference_provisions": _DEFAULT_ANALYZER, "reference_cases": _DEFAULT_ANALYZER, "content": _DEFAULT_ANALYZER, }, } docdb.create_index(collection_name, **index) def _create_content_index(docdb: DocumentDB, collection_name: str) -> None: index = { "index_name": "sk_fts", "fields": [ "doc_id", "sent_id", "content", "content_embed", ], "index_type": "kFullTextSearchIndex", "unique": False, "options": { "doc_id": _INT_ANALYZER, "sent_id": _INT_ANALYZER, "content": _DEFAULT_ANALYZER, "content_embed": _HNSW_ANALYZER, }, } docdb.create_index(collection_name, **index)

이 작업은 다음과 같은 결과를 얻을 수 있었습니다. 저장된 원본 데이터는 압축된 GZip과 용량이 비슷함을 알 수 있습니다. 여기서 DenseVectorAnalyzer의 양자화 옵션을 사용하면 저장용량과 속도를 추가적으로 개선할 수 있습니다.

  • 데이터 파일: GZip으로 압축된 pickle, 2.5GB
  • 데이터 입력: 14m 49.4s
    • demo.law: 967.5MB
    • demo.law.content: 2.3GB
  • 색인: 33.6m
    • demo.law: 3.7GB, 9.4m
    • demo.law.content: 8GB, 24.2m

4. 데모 개발

데모는 판례 데이터를 빠르게 검색 가능한 상태로 만들기에서 조금 수정하여 구성했습니다. 다음 Next.js의 API에 대응하는 코드입니다.

import { StatusCode, statusSuccess } from "@_api/_lib/status" import { COLLECTION_COLUMNS, SearchResult } from "@_api/law/_lib/document" import config from "@_app/config" import { Channel, DocumentDB, SentenceTransformer } from "@aeca/client" import { NextResponse } from "next/server" import { NextRequest } from "next/server" const _COLLECTION = "demo.law" const _COLLECTION_CONTENT = "demo.law.content" const _COLLECTION_COLUMNS = COLLECTION_COLUMNS.filter( (x) => !x.endsWith("_embed"), ) const _SEARCH_LIMIT = 10000 export async function GET( request: NextRequest, ): Promise<NextResponse<SearchResult>> { const channel = new Channel(config.host, config.port) const docdb = new DocumentDB(channel) const model = new SentenceTransformer(channel, config.model) const searchParams = request.nextUrl.searchParams const query = searchParams.get("query") const filterQuery = searchParams.get("filter") const queryWithFilter = filterQuery ? `(${query}) AND (${filterQuery})` : query if (!query) { return NextResponse.json( { code: StatusCode.INVALID_ARGUMENT, message: "query is empty" }, { status: 500 }, ) } const queryEmbed = await model.encode([query]) const embedQueryString = queryEmbed[0].data.join(",") const embedFields = ["judgment_summary_embed", "holding_statement_embed"] const embedFieldsQuery = embedFields .map((x) => `${x}:[${embedQueryString}]`) .join(" OR ") const aql = [ { $search: { query: `(${queryWithFilter}) AND (${embedFieldsQuery})^10`, highlight: true, limit: _SEARCH_LIMIT, }, $project: [..._COLLECTION_COLUMNS, "_meta", "_highlights"], }, { $join: { type: "inner", collection: _COLLECTION_CONTENT, query: [ { $search: { query: `(${query}) AND (content_embed:[${embedQueryString}])^10`, highlight: true, limit: _SEARCH_LIMIT, }, }, { $project: [ "doc_id", { _meta: "passage._meta" }, { _highlights: "passage._highlights" }, { sent_id: "passage.sent_id" }, { content: "passage.content" }, { start_char_index: "passage.start_char_index" }, { end_char_index: "passage.end_char_index" }, ], }, ], on: ["doc_id"], }, }, { $limit: 30, }, ] const findStartTime = performance.now() const df = await docdb.find(_COLLECTION, aql) const findElapsedTime = performance.now() - findStartTime return NextResponse.json({ ...statusSuccess, docs: df?.data || [], info: { findElapsedTime: findElapsedTime, }, }) }

하나씩 살펴보면 판례 검색에는 다음과 같이 질의와 별도로 필터로 판례를 선별할 수 있도록 구성되어 있습니다. 따라서 사용자 질의(query)와 필터(filterQuery)를 구별하기 위해 별도의 변수로 할당합니다. 이는 다음 단계에서 사용자 질의만 임베딩 벡터로 추출하기 위함입니다.

filters

queryWithFilter를 필터와 사용자 쿼리를 하나로 병합하여 할당합니다. 이 변수는 이후 FTS를 위한 질의가 됩니다.

const queryWithFilter = filterQuery ? `(${query}) AND (${filterQuery})` : query

그리고 Aeca의 ML model serving 기능을 활용하여 query를 벡터로 변환합니다. 이후 스트링 형태로 변경합니다. 이때 위에서 전달 받은 필터 정보(filterQuery)는 전달하지 않습니다.

const model = new SentenceTransformer(channel, config.model) const queryEmbed = await model.encode([query]) const embedQueryString = queryEmbed[0].data.join(",") const embedFields = ["judgment_summary_embed", "holding_statement_embed"] const embedFieldsQuery = embedFields .map((x) => `${x}:[${embedQueryString}]`) .join(" OR ")

위 코드를 실행하면 embeddingFieldsQuery 다음과 같은 내용을 가집니다.

judgment_summary_embed:[0.024,0.0031, ...] OR holding_statement_embed:[0.024,0.0031,...]

이제 FTS를 위한 질의 queryWithFilterembedFieldsQuery를 병합하고 Boosting 연산자 ^를 통해 벡터 검색에 가중치를 부여합니다. 이 가중치는 서비스의 형태에 따라 실험적으로 찾아야 하는 변수중 하나입니다.

const aql = [ { $search: { query: `(${queryWithFilter}) AND (${embedFieldsQuery})^10`, highlight: true, limit: _SEARCH_LIMIT, }, $project: [..._COLLECTION_COLUMNS, "_meta", "_highlights"], }, ... ]

이제 본문에서 검색할 질의를 작성합니다. query는 동일하며 $project에서 병합하기 전 passage._highlights와 같은 형식으로 nested fields로 변환하여 구조적으로 변경하고 동일 칼럼에 중복이 발생되지 않도록 합니다. 데이터 처리를 간단히 하기 위해 $join에서 innerdemo.law.content 컬렉션의 본문과 demo.law의 다른 필드가 동시에 만족해야 하는 조건으로 구성되어 있습니다.

이는 분리된 본문이 2차원 배열로 같은 컬렉션에 저장되면 단순화할 수 있는 부분이며 향후에 다시 다루도록 하겠습니다.

{ $join: { type: "inner", collection: _COLLECTION_CONTENT, query: [ { $search: { query: `(${query}) AND (content_embed:[${embedQueryString}])^10`, highlight: true, limit: _SEARCH_LIMIT, }, }, { $project: [ "doc_id", { _meta: "passage._meta" }, { _highlights: "passage._highlights" }, { sent_id: "passage.sent_id" }, { content: "passage.content" }, { start_char_index: "passage.start_char_index" }, { end_char_index: "passage.end_char_index" }, ], }, ], on: ["doc_id"], }, },

나가기

이제 이러한 구현으로 음주 운전을 했으나 검사를 거부한 경우 처벌 가능한가?과 같은 자연어 질의로 판례를 검색할 수 있게 되었습니다. 그러나 문장 혹은 단락을 임베딩하는 비용과 이를 별도로 저장하는 비용이 필요하게 됩니다. FTS만 사용했던 사례와 비교하면 장단점이 명확해 보입니다. 각 서비스에 맞는 적절한 방법을 선택하여 고도화하는 전략이 필요합니다.

위의 사례에서 볼 수 있듯 하나 이상의 필드를 임베딩 필드로 정의하고 FTS와 병합하여 점수를 계산하는 과정에서 별도의 제약 조건이 없다는 것을 확인할 수 있습니다. 그리고 데이터 저장을 위한 DBMS, FTS를 처리하기 위한 검색 엔진, 벡터 검색을 위한 벡터 DB의 분리 없이 하나의 시스템에서 유기적으로 동작합니다. 이는 실험과 비용/운영에 큰 장점이 될 수 있습니다.

Aeca 만나보기

Aeca를 더 사용해 보고 싶다면 Docker로 간단히 설치하여 바로 사용할 수 있습니다. Aeca 도입에 대한 더 자세한 설명이나 제품 소개서 요청은 고객 지원에서 문의해 주세요.

함께 보면 좋을 글

판례 데이터를 빠르게 검색 가능한 상태로 만들기

판례 데이터를 다운로드 받고 Aeca를 통해 판례 검색 서비스를 하루만에 구축한 과정을 설명합니다.

By Aeca Team|2024-06-21

상품 검색에 자연어 검색을 적용하기

Aeca를 활용하여 상품 검색을 위한 데이터 수집 및 가공, 검색과 서비스 개발 과정을 설명합니다. 정형, 비정형 데이터가 혼합되어 있을 때 어떻게 색인하고 LLM을 활용하여 어떻게 쿼리를 변환하여 검색하는지를 알아봅니다.

By Aeca Team|2024-06-12

Copyright © 2024 Aeca, Inc.

Made with ☕️ and 😽 in San Francisco, CA.