2024-10-28 11:34:41 -05:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"os"
|
|
|
|
"sort"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/google/uuid"
|
2024-10-31 14:00:43 -05:00
|
|
|
"github.com/sirupsen/logrus"
|
2024-10-28 11:34:41 -05:00
|
|
|
)
|
|
|
|
|
|
|
|
// Job describes one OCR request and the progress made on it.
type Job struct {
	// ID is the key under which the job is stored in a JobStore.
	ID string
	// DocumentID identifies the document that OCR is run on.
	DocumentID int
	// Status is one of "pending", "in_progress", "completed" or "failed".
	Status string
	// Result carries the OCR text on success or the error message on failure.
	Result string
	// CreatedAt is the creation time; job listings are ordered by it.
	CreatedAt time.Time
	// UpdatedAt is refreshed whenever the status or page counter changes.
	UpdatedAt time.Time
	// PagesDone counts the pages processed so far.
	PagesDone int
}
|
|
|
|
|
|
|
|
// JobStore manages jobs and their statuses
|
|
|
|
type JobStore struct {
|
|
|
|
sync.RWMutex
|
|
|
|
jobs map[string]*Job
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
2024-10-31 14:00:43 -05:00
|
|
|
logger = logrus.New()
|
|
|
|
|
2024-10-28 11:34:41 -05:00
|
|
|
jobStore = &JobStore{
|
|
|
|
jobs: make(map[string]*Job),
|
|
|
|
}
|
|
|
|
jobQueue = make(chan *Job, 100) // Buffered channel with capacity of 100 jobs
|
|
|
|
)
|
|
|
|
|
2024-10-31 14:00:43 -05:00
|
|
|
func init() {
|
|
|
|
|
|
|
|
// Initialize logger
|
|
|
|
logger.SetOutput(os.Stdout)
|
|
|
|
logger.SetFormatter(&logrus.TextFormatter{
|
|
|
|
FullTimestamp: true,
|
|
|
|
})
|
|
|
|
logger.SetLevel(logrus.InfoLevel)
|
|
|
|
logger.WithField("prefix", "OCR_JOB")
|
|
|
|
}
|
|
|
|
|
2024-10-28 11:34:41 -05:00
|
|
|
func generateJobID() string {
|
|
|
|
return uuid.New().String()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (store *JobStore) addJob(job *Job) {
|
|
|
|
store.Lock()
|
|
|
|
defer store.Unlock()
|
|
|
|
job.PagesDone = 0 // Initialize PagesDone to 0
|
|
|
|
store.jobs[job.ID] = job
|
2024-10-31 14:00:43 -05:00
|
|
|
logger.Infof("Job added: %v", job)
|
2024-10-28 11:34:41 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
func (store *JobStore) getJob(jobID string) (*Job, bool) {
|
|
|
|
store.RLock()
|
|
|
|
defer store.RUnlock()
|
|
|
|
job, exists := store.jobs[jobID]
|
|
|
|
return job, exists
|
|
|
|
}
|
|
|
|
|
|
|
|
func (store *JobStore) GetAllJobs() []*Job {
|
|
|
|
store.RLock()
|
|
|
|
defer store.RUnlock()
|
|
|
|
|
|
|
|
jobs := make([]*Job, 0, len(store.jobs))
|
|
|
|
for _, job := range store.jobs {
|
|
|
|
jobs = append(jobs, job)
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Slice(jobs, func(i, j int) bool {
|
|
|
|
return jobs[i].CreatedAt.After(jobs[j].CreatedAt)
|
|
|
|
})
|
|
|
|
|
|
|
|
return jobs
|
|
|
|
}
|
|
|
|
|
|
|
|
func (store *JobStore) updateJobStatus(jobID, status, result string) {
|
|
|
|
store.Lock()
|
|
|
|
defer store.Unlock()
|
|
|
|
if job, exists := store.jobs[jobID]; exists {
|
|
|
|
job.Status = status
|
|
|
|
if result != "" {
|
|
|
|
job.Result = result
|
|
|
|
}
|
|
|
|
job.UpdatedAt = time.Now()
|
2024-10-31 14:00:43 -05:00
|
|
|
logger.Infof("Job status updated: %v", job)
|
2024-10-28 11:34:41 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (store *JobStore) updatePagesDone(jobID string, pagesDone int) {
|
|
|
|
store.Lock()
|
|
|
|
defer store.Unlock()
|
|
|
|
if job, exists := store.jobs[jobID]; exists {
|
|
|
|
job.PagesDone = pagesDone
|
|
|
|
job.UpdatedAt = time.Now()
|
2024-10-31 14:00:43 -05:00
|
|
|
logger.Infof("Job pages done updated: %v", job)
|
2024-10-28 11:34:41 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func startWorkerPool(app *App, numWorkers int) {
|
|
|
|
for i := 0; i < numWorkers; i++ {
|
|
|
|
go func(workerID int) {
|
2024-10-31 14:00:43 -05:00
|
|
|
logger.Infof("Worker %d started", workerID)
|
2024-10-28 11:34:41 -05:00
|
|
|
for job := range jobQueue {
|
2024-10-31 14:00:43 -05:00
|
|
|
logger.Infof("Worker %d processing job: %s", workerID, job.ID)
|
2024-10-28 11:34:41 -05:00
|
|
|
processJob(app, job)
|
|
|
|
}
|
|
|
|
}(i)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func processJob(app *App, job *Job) {
|
|
|
|
jobStore.updateJobStatus(job.ID, "in_progress", "")
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
2025-01-06 16:03:41 -06:00
|
|
|
fullOcrText, err := app.ProcessDocumentOCR(ctx, job.DocumentID)
|
2024-10-28 11:34:41 -05:00
|
|
|
if err != nil {
|
2025-01-06 16:03:41 -06:00
|
|
|
logger.Errorf("Error processing document OCR for job %s: %v", job.ID, err)
|
|
|
|
jobStore.updateJobStatus(job.ID, "failed", err.Error())
|
2024-10-28 11:34:41 -05:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
jobStore.updateJobStatus(job.ID, "completed", fullOcrText)
|
2024-10-31 14:00:43 -05:00
|
|
|
logger.Infof("Job completed: %s", job.ID)
|
2024-10-28 11:34:41 -05:00
|
|
|
}
|