1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use jfs;

use std::path;

use sda_protocol::Id;
use sda_protocol::{AgentId, ClerkingJob, ClerkingJobId, ClerkingResult, SnapshotId};

use stores::{BaseStore, ClerkingJobsStore};
use jfs_stores::JfsStoreExt;

use SdaServerResult;

/// Clerking-jobs store that keeps its data in `jfs` stores located under the
/// wrapped root path.
pub struct JfsClerkingJobsStore(path::PathBuf);

impl JfsClerkingJobsStore {
    /// Builds a store rooted at `prefix`.
    ///
    /// The path is only copied into the new value here; nothing is touched on
    /// disk by this constructor itself.
    pub fn new<P: AsRef<path::Path>>(prefix: P) -> SdaServerResult<JfsClerkingJobsStore> {
        let root = prefix.as_ref().to_path_buf();
        Ok(JfsClerkingJobsStore(root))
    }

    /// Opens the `jfs` store located at `<root>/<prefix>/<id>`.
    ///
    /// # Errors
    ///
    /// Fails with "pathbuf to string" when the joined path cannot be expressed
    /// as a UTF-8 string, and forwards any error reported by `jfs::Store::new`.
    fn store<I: Id>(&self, prefix: &str, id: &I) -> SdaServerResult<jfs::Store> {
        let directory = self.0.join(prefix).join(id.to_string());
        let directory_str = directory.to_str().ok_or("pathbuf to string")?;
        let opened = jfs::Store::new(directory_str)?;
        Ok(opened)
    }
}

impl BaseStore for JfsClerkingJobsStore {
    /// Health check: unconditionally reports success without inspecting the
    /// underlying path.
    fn ping(&self) -> SdaServerResult<()> {
        Ok(())
    }
}

impl ClerkingJobsStore for JfsClerkingJobsStore {
    /// Records `job` in the "queue" store belonging to the job's clerk.
    fn enqueue_clerking_job(&self, job: &ClerkingJob) -> SdaServerResult<()> {
        let queue = self.store("queue", &job.clerk)?;
        queue.create(job)
    }

    /// Returns one job from the clerk's "queue" store, or `None` when the
    /// queue holds nothing.
    ///
    /// The whole queue is loaded and the first entry produced by iterating it
    /// is handed back; the job is not removed from the queue.
    fn poll_clerking_job(&self, clerk: &AgentId) -> SdaServerResult<Option<ClerkingJob>> {
        let pending = self.store("queue", clerk)?.all::<ClerkingJob>()?;
        let first = pending.into_iter().next();
        Ok(first.map(|(_, queued)| queued))
    }

    /// Looks up the job with id `job` in the "queue" store of `clerk`.
    fn get_clerking_job(&self,
                        clerk: &AgentId,
                        job: &ClerkingJobId)
                        -> SdaServerResult<Option<ClerkingJob>> {
        let queue = self.store("queue", clerk)?;
        queue.get_option(job)
    }

    /// Stores `result` and retires the job it answers.
    ///
    /// The steps run in this order and are not atomic:
    /// 1. load the job from the clerk's "queue" store (error "Job not found"
    ///    if it is absent),
    /// 2. write the result into the "results" store of the job's snapshot,
    /// 3. write the job into the clerk's "done" store,
    /// 4. delete the job from the clerk's "queue" store.
    fn create_clerking_result(&self, result: &ClerkingResult) -> SdaServerResult<()> {
        let pending: Option<ClerkingJob> = self.store("queue", &result.clerk)?
            .get_option(&result.job)?;
        let job = pending.ok_or("Job not found")?;

        let results = self.store("results", &job.snapshot)?;
        results.upsert_with_id(result, &result.job)?;

        let done = self.store("done", &result.clerk)?;
        done.upsert_with_id(&job, &result.job)?;

        // Removal happens last so the job stays queued if an earlier write fails.
        let queue = self.store("queue", &result.clerk)?;
        let job_key = result.job.to_string();
        queue.delete(job_key.as_str())?;
        Ok(())
    }

    /// Lists the job ids of every result held in the "results" store of
    /// `snapshot`.
    fn list_results(&self, snapshot: &SnapshotId) -> SdaServerResult<Vec<ClerkingJobId>> {
        let stored = self.store("results", snapshot)?.all::<ClerkingResult>()?;
        let job_ids: Vec<ClerkingJobId> = stored
            .into_iter()
            .map(|(_, result)| result.job)
            .collect();
        Ok(job_ids)
    }

    /// Looks up the result recorded for `job` in the "results" store of
    /// `snapshot`.
    fn get_result(&self,
                  snapshot: &SnapshotId,
                  job: &ClerkingJobId)
                  -> SdaServerResult<Option<ClerkingResult>> {
        let results = self.store("results", snapshot)?;
        results.get_option(job)
    }
}