Initial commit

This commit is contained in:
2025-04-24 20:54:50 +02:00
commit 240d64023b
13 changed files with 811 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
# Commands package initialization
+193
View File
@@ -0,0 +1,193 @@
import json
import os
import sys
import re
from pathlib import Path
def run(args):
    """Entry point of the convert command.

    Reads ``args.output_file`` and ``args.json_file``. When an output file is
    given, ``args.single_file`` is forced to ``True`` before the conversion
    is started.
    """
    # A named output file implies combined (single file) output
    wants_named_output = bool(args.output_file)
    if wants_named_output:
        args.single_file = True

    source_json = args.json_file
    convert_json_to_markdown(source_json, args)
# OCR response structure classes
class OCRResponseImage:
    """An image attached to an OCR page: an identifier plus its base64 payload."""

    def __init__(self, id, image_base64):
        # Both values are stored exactly as given; nothing is validated here
        self.id, self.image_base64 = id, image_base64
class OCRResponsePage:
    """One page of an OCR response.

    Holds the page index, its markdown text, an optional page image, the list
    of images attached to the page and optional dimensions.
    """

    def __init__(self, index, markdown, image=None, images=None, dimensions=None):
        self.index = index
        self.markdown = markdown
        self.image = image
        self.dimensions = dimensions
        # A falsy `images` (None or empty) is replaced by a fresh list so that
        # pages never share one default list
        self.images = images if images else []
class OCRResponseMetadata:
    """Document-level metadata of an OCR response; every field is optional."""

    def __init__(self, title=None, author=None, creation_date=None, page_count=None):
        # Values are kept as passed in; missing fields stay None
        self.title, self.author = title, author
        self.creation_date, self.page_count = creation_date, page_count
class OCRResponse:
    """A parsed OCR response: a list of pages plus document metadata."""

    def __init__(self, pages=None, metadata=None):
        # Falsy arguments fall back to fresh defaults: an empty page list and
        # an empty OCRResponseMetadata
        self.pages = pages if pages else []
        self.metadata = metadata if metadata else OCRResponseMetadata()
def replace_image_references(content, images, include_images):
    """Inline base64 image data into Markdown image references.

    Every ``![<id>](<id>)`` reference in *content* whose id matches an entry
    of *images* is rewritten to ``![<id>](<data URI>)``.

    Args:
        content: Markdown text to rewrite.
        images: Iterable of objects exposing ``id`` and ``image_base64``.
            Entries with an empty ``image_base64`` are skipped; if an id
            occurs more than once, the last entry wins.
        include_images: When falsy, *content* is returned unchanged.

    Returns:
        The Markdown text with all matching references replaced.
    """
    if not include_images or not images:
        return content

    # Map each image id to the data URI that should replace its reference
    image_map = {}
    for img in images:
        if not img.image_base64:
            continue
        img_data = img.image_base64
        # Payloads that are not already data URIs get a JPEG data-URI prefix
        if not img_data.startswith("data:"):
            img_data = "data:image/jpeg;base64," + img_data
        image_map[img.id] = img_data

    # The reference is a literal string, so plain str.replace is used instead
    # of re.sub: re.sub would interpret backslash sequences such as "\1" or
    # "\g<0>" in the replacement (which contains the id and the payload) and
    # either raise re.error or corrupt the output.
    for image_id, data_uri in image_map.items():
        content = content.replace(
            f"![{image_id}]({image_id})",
            f"![{image_id}]({data_uri})",
        )

    return content
def _parse_ocr_response(data):
    """Build an OCRResponse from the decoded OCR JSON document."""
    ocr_response = OCRResponse()

    if "pages" in data:
        for page_data in data["pages"]:
            page = OCRResponsePage(
                index=page_data.get("index", 0),
                markdown=page_data.get("markdown", ""),
                image=page_data.get("image", ""),
            )
            if "images" in page_data:
                page.images.extend(
                    OCRResponseImage(
                        id=img_data.get("id", ""),
                        image_base64=img_data.get("image_base64", ""),
                    )
                    for img_data in page_data["images"]
                )
            ocr_response.pages.append(page)

    if "metadata" in data:
        metadata = data["metadata"]
        ocr_response.metadata = OCRResponseMetadata(
            title=metadata.get("title", ""),
            author=metadata.get("author", ""),
            creation_date=metadata.get("creation_date", ""),
            page_count=metadata.get("page_count", 0),
        )

    return ocr_response


def _render_single_document(ocr_response, json_file, args):
    """Render all pages of the response into one Markdown string."""
    metadata = ocr_response.metadata
    combined = []

    # Title precedence: metadata title, then (optionally) the JSON file stem,
    # then the generic fallback
    title = "Document"
    if metadata.title:
        title = metadata.title
    elif args.title_from_filename:
        title = Path(json_file).stem
    combined.append(f"# {title}\n")

    # The metadata section is emitted only when at least one field is set
    if metadata.author or metadata.creation_date or metadata.page_count:
        combined.append("## Document Metadata\n")
        if metadata.author:
            combined.append(f"**Author:** {metadata.author}\n")
        if metadata.creation_date:
            combined.append(f"**Creation Date:** {metadata.creation_date}\n")
        if metadata.page_count:
            combined.append(f"**Page Count:** {metadata.page_count}\n")
        combined.append("\n")

    last_position = len(ocr_response.pages) - 1
    for position, page in enumerate(ocr_response.pages):
        combined.append(f"## Page {page.index + 1}\n")
        # replace_image_references returns the text untouched when
        # args.images is falsy, so no extra guard is needed here
        combined.append(replace_image_references(page.markdown, page.images, args.images))
        combined.append("\n")
        # Separator between pages, never after the last one
        if args.page_breaks and position < last_position:
            combined.append("\n---\n")

    return "\n".join(combined)


def _write_page_files(pages, args):
    """Write each page to ``<output_dir>/<page.index>.md``."""
    for page in pages:
        output_path = Path(args.output_dir) / f"{page.index}.md"
        markdown_content = replace_image_references(page.markdown, page.images, args.images)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(markdown_content)
        print(f"Created markdown file: {output_path}")


def convert_json_to_markdown(json_file, args):
    """Convert an OCR JSON file into one or several Markdown files.

    Args:
        json_file: Path of the OCR JSON file to read (UTF-8).
        args: Options object. The attributes read are ``output_dir``,
            ``single_file``, ``output_file``, ``title_from_filename``,
            ``images`` and ``page_breaks``.

    In single-file mode all pages are combined into
    ``<output_dir>/<output_file>`` (``document.md`` when no output file is
    given); otherwise each page is written to ``<output_dir>/<index>.md``.

    Any exception is reported on stderr and the process exits with status 1.
    """
    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        ocr_response = _parse_ocr_response(data)

        os.makedirs(args.output_dir, exist_ok=True)

        if args.single_file:
            document = _render_single_document(ocr_response, json_file, args)

            # output_file may contain directory components, so make sure the
            # parent exists (a no-op for the default name)
            output_path = Path(args.output_dir) / (args.output_file or "document.md")
            output_path.parent.mkdir(parents=True, exist_ok=True)

            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(document)
            print(f"Created single markdown file: {output_path}")
        else:
            _write_page_files(ocr_response.pages, args)

        print(f"Successfully converted {json_file} to markdown files in {args.output_dir}/")
        print(f"Total pages: {len(ocr_response.pages)}")

    except Exception as e:
        # Command boundary: report the failure and stop with a non-zero status
        print(f"Error converting JSON to markdown: {e}", file=sys.stderr)
        sys.exit(1)
+44
View File
@@ -0,0 +1,44 @@
import os
import sys
import tempfile
from pathlib import Path
from mistral_ocr.commands import process, convert
def run(args):
    """Run OCR on a file or URL and convert the result to Markdown.

    Args:
        args: Options object. The attributes read here are ``images``,
            ``output_file``, ``json_file`` and ``file_or_url``; the whole
            object is also passed on to the conversion step.

    The intermediate JSON is written to ``args.json_file`` when given,
    otherwise to a temporary file that is removed afterwards. Any exception
    is reported on stderr and the process exits with status 1.
    """
    # Base64 image payloads are requested from OCR exactly when images are
    # wanted in the Markdown output
    include_image_base64 = args.images

    # If output file is specified, enable single file mode
    if args.output_file:
        args.single_file = True

    json_output_path = args.json_file
    temp_path = None
    if not json_output_path:
        # delete=False: only the path is handed to the later steps, so the
        # file must outlive this handle; it is removed in `finally` below
        with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as temp_file:
            temp_path = temp_file.name
        json_output_path = temp_path

    try:
        # Step 1: Process the document
        if args.file_or_url.startswith(("http://", "https://")):
            process.process_url(args.file_or_url, json_output_path, include_image_base64)
        else:
            process.process_local_file(args.file_or_url, json_output_path, include_image_base64)

        # Step 2: Convert the JSON to markdown
        print("Converting JSON to Markdown...")
        convert.convert_json_to_markdown(json_output_path, args)

    except Exception as e:
        print(f"Error processing and converting document: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Clean up the temporary file if we created one
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                # Best-effort cleanup: a leftover temp file must not mask the
                # real outcome. Only OSError is ignored so that
                # KeyboardInterrupt/SystemExit are not swallowed.
                pass
+89
View File
@@ -0,0 +1,89 @@
import os
import json
import sys
from pathlib import Path
import urllib.parse
from mistral_ocr.client import MistralClient
def run(args):
    """Entry point of the process command.

    Dispatches ``args.file`` to the URL handler when it starts with
    ``http://`` or ``https://`` and to the local-file handler otherwise,
    forwarding ``args.output_file`` and ``args.include_images``.
    """
    source = args.file

    # Anything that is not an http(s) URL is treated as a local path
    is_remote = source.startswith(("http://", "https://"))
    handler = process_url if is_remote else process_local_file
    handler(source, args.output_file, args.include_images)
def process_url(url, output_file, include_image_base64):
    """Run OCR on a document or image reachable by URL.

    Args:
        url: URL of the document or image.
        output_file: Path the JSON result is written to; when falsy the
            result is printed to stdout instead.
        include_image_base64: Forwarded to the OCR request.

    Any exception is reported on stderr and the process exits with status 1.
    """
    image_extensions = (".jpg", ".jpeg", ".png", ".webp", ".gif")
    try:
        client = MistralClient()

        # Look at the URL path as well as the raw URL: an image URL carrying a
        # query string or fragment (e.g. ".../scan.png?sig=...") does not end
        # with the extension, but its path does.
        url_path = urllib.parse.urlsplit(url).path.lower()
        is_image = (
            url.lower().endswith(image_extensions)
            or url_path.endswith(image_extensions)
        )
        doc_type = "image_url" if is_image else "document_url"

        # Process the document
        resp_data = client.process_ocr(doc_type, url, include_image_base64)

        # Handle the output
        handle_output(resp_data, output_file)

    except Exception as e:
        print(f"Error processing document: {e}", file=sys.stderr)
        sys.exit(1)
def process_local_file(file_path, output_file, include_image_base64):
    """Upload a local file and run OCR on it.

    The file is uploaded through the client, a signed URL is requested for
    the upload, and that URL is submitted for OCR as ``image_url`` (for
    .jpg/.jpeg/.png/.webp/.gif paths) or ``document_url`` (everything else).
    The response is passed to ``handle_output`` together with *output_file*.

    A missing file, or any exception, is reported on stderr and the process
    exits with status 1.
    """
    try:
        print(f"Processing local file: {file_path}")

        # Fail early, before any client is created
        if not os.path.exists(file_path):
            print(f"Error: file '{file_path}' does not exist", file=sys.stderr)
            sys.exit(1)

        client = MistralClient()

        # Upload, then obtain the signed URL used for processing
        file_id = client.upload_file(file_path)
        print(f"File uploaded successfully with ID: {file_id}")
        file_url = client.get_file_url(file_id)

        # Pick the document type from the (case-insensitive) file extension
        image_suffixes = (".jpg", ".jpeg", ".png", ".webp", ".gif")
        if file_path.lower().endswith(image_suffixes):
            doc_type = "image_url"
        else:
            doc_type = "document_url"

        print(f"Processing with signed file URL (type: {doc_type})")

        resp_data = client.process_ocr(doc_type, file_url, include_image_base64)
        handle_output(resp_data, output_file)

    except Exception as error:
        print(f"Error processing document: {error}", file=sys.stderr)
        sys.exit(1)
def handle_output(data, output_file):
    """Pretty-print a JSON payload to a file or to stdout.

    Args:
        data: JSON text; it is parsed and re-serialized with a two-space
            indent.
        output_file: Destination path. Missing parent directories are
            created. When falsy, the formatted JSON is printed to stdout.
    """
    formatted = json.dumps(json.loads(data), indent=2)

    # No destination given: emit on stdout and stop
    if not output_file:
        print(formatted)
        return

    destination = Path(output_file)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(formatted, encoding='utf-8')
    print(f"OCR results saved to {output_file}")
+6
View File
@@ -0,0 +1,6 @@
import sys
# Version string reported by the CLI
VERSION = "0.1.0"


def run(args):
    """Print the CLI name and version; *args* is accepted but not used."""
    banner = f"Mistral OCR CLI v{VERSION}"
    print(banner)