paperless-gpt/jobs.go
2025-01-06 23:03:41 +01:00

135 lines
2.9 KiB
Go

package main
import (
"context"
"os"
"sort"
"sync"
"time"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
// Job represents an OCR job
type Job struct {
ID string
DocumentID int
Status string // "pending", "in_progress", "completed", "failed"
Result string // OCR result or error message
CreatedAt time.Time
UpdatedAt time.Time
PagesDone int // Number of pages processed
}
// JobStore manages jobs and their statuses
type JobStore struct {
sync.RWMutex
jobs map[string]*Job
}
var (
logger = logrus.New()
jobStore = &JobStore{
jobs: make(map[string]*Job),
}
jobQueue = make(chan *Job, 100) // Buffered channel with capacity of 100 jobs
)
func init() {
// Initialize logger
logger.SetOutput(os.Stdout)
logger.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
})
logger.SetLevel(logrus.InfoLevel)
logger.WithField("prefix", "OCR_JOB")
}
func generateJobID() string {
return uuid.New().String()
}
func (store *JobStore) addJob(job *Job) {
store.Lock()
defer store.Unlock()
job.PagesDone = 0 // Initialize PagesDone to 0
store.jobs[job.ID] = job
logger.Infof("Job added: %v", job)
}
func (store *JobStore) getJob(jobID string) (*Job, bool) {
store.RLock()
defer store.RUnlock()
job, exists := store.jobs[jobID]
return job, exists
}
func (store *JobStore) GetAllJobs() []*Job {
store.RLock()
defer store.RUnlock()
jobs := make([]*Job, 0, len(store.jobs))
for _, job := range store.jobs {
jobs = append(jobs, job)
}
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].CreatedAt.After(jobs[j].CreatedAt)
})
return jobs
}
func (store *JobStore) updateJobStatus(jobID, status, result string) {
store.Lock()
defer store.Unlock()
if job, exists := store.jobs[jobID]; exists {
job.Status = status
if result != "" {
job.Result = result
}
job.UpdatedAt = time.Now()
logger.Infof("Job status updated: %v", job)
}
}
func (store *JobStore) updatePagesDone(jobID string, pagesDone int) {
store.Lock()
defer store.Unlock()
if job, exists := store.jobs[jobID]; exists {
job.PagesDone = pagesDone
job.UpdatedAt = time.Now()
logger.Infof("Job pages done updated: %v", job)
}
}
func startWorkerPool(app *App, numWorkers int) {
for i := 0; i < numWorkers; i++ {
go func(workerID int) {
logger.Infof("Worker %d started", workerID)
for job := range jobQueue {
logger.Infof("Worker %d processing job: %s", workerID, job.ID)
processJob(app, job)
}
}(i)
}
}
func processJob(app *App, job *Job) {
jobStore.updateJobStatus(job.ID, "in_progress", "")
ctx := context.Background()
fullOcrText, err := app.ProcessDocumentOCR(ctx, job.DocumentID)
if err != nil {
logger.Errorf("Error processing document OCR for job %s: %v", job.ID, err)
jobStore.updateJobStatus(job.ID, "failed", err.Error())
return
}
jobStore.updateJobStatus(job.ID, "completed", fullOcrText)
logger.Infof("Job completed: %s", job.ID)
}