Skip to content

Commit

Permalink
#461: beginnings of push logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Charlotte committed Jul 5, 2018
1 parent 11d001f commit b6bba1e
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 79 deletions.
202 changes: 123 additions & 79 deletions cmd/dotmesh-server/pkg/main/statemachines.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func activeState(f *fsMachine) stateFn {

log.Printf("GOT S3 TRANSFER REQUEST %+v", f.lastS3TransferRequest)
if f.lastS3TransferRequest.Direction == "push" {
return pushInitiatorState
return s3PushInitiatorState
} else if f.lastS3TransferRequest.Direction == "pull" {
return s3PullInitiatorState
}
Expand Down Expand Up @@ -1881,6 +1881,41 @@ func TransferPollResultFromTransferRequest(
}
}

func s3PushInitiatorState(f *fsMachine) stateFn {
f.transitionedTo("s3PushInitiatorState", "requesting")
transferRequest := f.lastS3TransferRequest
transferRequestId := f.lastTransferRequestId
pollResult := TransferPollResult{
TransferRequestId: transferRequestId,
Direction: transferRequest.Direction,
InitiatorNodeId: f.state.myNodeId,
Index: 1,
Status: "starting",
}
svc, err := getS3Client(transferRequest)
if err != nil {
f.sendEventUpdateUser(err, "couldnt-create-s3-client", "Couldn't create s3 client - check credentials are correct", transferRequestId, pollResult)
return backoffState
}
latestSnap, err := f.getLastNonMetadataSnapshot()
if err != nil {
f.sendEventUpdateUser(err, "s3-push-initiator-cant-get-snapshot-data", "cant get snapshot data", transferRequestId, pollResult)
return backoffState
}
latestMeta := make(map[string]string)

if latestSnap != nil {
pathToFile := fmt.Sprintf("%s/dm.s3-versions/%s", mnt(f.filesystemId), latestSnap.Id)
if _, err := os.Stat(pathToFile); err == nil {
f.sendEventUpdateUser(&EventArgs{"path": pathToFile}, "commit-already-in-s3", "Found s3 metadata for latest snap - nothing to push!", transferRequestId, pollResult)
return backoffState
} else if !os.IsNotExist(err) {
f.sendEventUpdateUser(err, "couldnt-stat-s3-meta-file", fmt.Sprintf("Could not stat s3 meta file %s", pathToFile), transferRequestId, pollResult)
return backoffState
}
}
return discoveringState
}
func pushInitiatorState(f *fsMachine) stateFn {
// Deduce the latest snapshot in
// f.lastTransferRequest.LocalFilesystemName:LocalCloneName
Expand Down Expand Up @@ -3118,118 +3153,129 @@ func createSubDot(filesystemId, subDir string) (string, error) {
return destPath, nil
}

func updateUser(message, transferRequestId string, pollResult TransferPollResult) {
pollResult.Status = "error"
pollResult.Message = message
return updatePollResult(transferId, pollResult)
}

func sendEvent(f *fsMachine) (params *EventArgs, eventName, loggerString string) {
log.Printf(loggerString)
f.innerResponses <- &Event{
Name: eventName,
Args: params,
}
}

func sendEventUpdateUser(f *fsMachine) (err error, eventName, loggerString, transferRequestId string, pollResult TransferPollResult) {
f.sendEvent(&EventArgs{"err": err}, eventName, loggerString)
err := updateUser(loggerString, transferRequestId, pollResult)
if err != nil {
f.sendEvent(&EventArgs{"err": err}, "cant-write-to-etcd", "Cannot write to etcd")
}
}

func sendEventUpdateUser(f *fsMachine) (args *EventArgs, eventName, loggerString, transferRequestId string, pollResult TransferPollResult) {
f.sendEvent(args, eventName, loggerString)
err := updateUser(loggerString, transferRequestId, pollResult)
if err != nil {
f.sendEvent(&EventArgs{"err": err}, "cant-write-to-etcd", "Cannot write to etcd")
}
}

func getLastNonMetadataSnapshot(f *fsMachine) (*snapshot, error) {
snaps, err := f.state.snapshotsForCurrentMaster(f.filesystemId)
if err != nil {
return err
}
latestMeta := make(map[string]string)
var latestSnap *snapshot
for idx := len(snaps)-1;idx--;idx>0 {
commitType, ok := snaps[idx].Metadata["type"]
if !ok || commitType != "dotmesh.metadata_only" {
latestSnap = &snaps[idx]
}
}
return latestSnap
}

func loadS3Meta(filesystemId, latestSnapId string, latestMeta *map[string]string) error {
pathToCommitMeta := fmt.Sprintf("%s/dm.s3-versions/%s", mnt(f.filesystemId), latestSnap.Id)
data, err := ioutil.ReadFile(pathToCommitMeta)
if err != nil {
return err
}
err = json.Unmarshal(data, &latestMeta)
if err != nil {
return err
}
return nil
}

func s3PullInitiatorState(f *fsMachine) stateFn {
f.transitionedTo("s3PullInitiatorState", "requesting")
containers, err := f.containersRunning()
if err != nil {
log.Printf(
"Can't pull into filesystem while we can't list whether containers are using it",
)
f.innerResponses <- &Event{
Name: "error-listing-containers-during-pull",
Args: &EventArgs{"err": err},
}
f.sendAndLogError(err, "error-listing-containers-during-pull", "Can't list containers running")
return backoffState
}
if len(containers) > 0 {
log.Printf("Can't pull into filesystem while containers are using it")
f.innerResponses <- &Event{
Name: "cannot-pull-while-containers-running",
Args: &EventArgs{"containers": containers},
}
f.sendAndLog(&EventArgs{"containers": containers}, "cannot-pull-while-containers-running", "Can't pull into filesystem while containers are using it")
return backoffState
}
transferRequest := f.lastS3TransferRequest
transferRequestId := f.lastTransferRequestId
pollResult := TransferPollResult{
TransferRequestId: transferRequestId,
Direction: transferRequest.Direction,
InitiatorNodeId: f.state.myNodeId,
Index: 1,
Status: "starting",
}
// TODO pull this out somewhere as I've duplicated this in rpc.go

// create the default paths
destPath, err := createSubDot(f.filesystemId, "__default__")
if err != nil {
f.innerResponses <- &Event{
Name: "cannot-create-default-dir",
Args: &EventArgs{"err": err},
}
f.sendEventUpdateUser(err, "cannot-create-default-dir", "Could not create default directory", transferRequestId, pollResult)
return backoffState
}
svc, err := getS3Client(transferRequest)
if err != nil {
f.innerResponses <- &Event{
Name: "couldnt-create-s3-client",
Args: &EventArgs{"err": err},
}
f.sendEventUpdateUser(err, "couldnt-create-s3-client", "Couldn't create s3 client - check credentials are correct", transferRequestId, pollResult)
return backoffState
}
pollResult := TransferPollResult{
TransferRequestId: transferRequestId,
Direction: transferRequest.Direction,
InitiatorNodeId: f.state.myNodeId,
Index: 1,
Status: "starting",
}

f.lastPollResult = &pollResult
err = updatePollResult(transferRequestId, pollResult)
if err != nil {
f.innerResponses <- &Event{
Name: "s3-pull-initiator-cant-write-to-etcd",
Args: &EventArgs{"err": err},
}
f.sendEvent(err, "s3-pull-initiator-cant-write-to-etcd", "cannot write to etcd")
return backoffState
}

snaps, err := f.state.snapshotsForCurrentMaster(f.filesystemId)
latestMeta := make(map[string]string)
latestSnap, err := f.getLastNonMetadataSnapshot()
if err != nil {
f.innerResponses <- &Event{
Name: "s3-pull-initiator-cant-get-snapshot-data",
Args: &EventArgs{"err": err},
}
f.sendEventUpdateUser(err, "s3-pull-initiator-cant-get-snapshot-data", "cant get snapshot data", transferRequestId, pollResult)
return backoffState
}
latestMeta := make(map[string]string)
var latestSnap *snapshot
for idx := len(snaps)-1;idx--;idx>0 {
commitType, ok := snaps[idx].Metadata["type"]
if !ok || commitType != "metadata-only" {
latestSnap = &snaps[idx]
}
}
}
if latestSnap != nil {
// todo:
// if "type" == "metadata-only" in commit ignore it
// go back to the one before it until we find one that isn't that type


pathToCommitMeta := fmt.Sprintf("%s/dm.s3-versions/%s", mnt(f.filesystemId), latestSnap.Id)
data, err := ioutil.ReadFile(pathToCommitMeta)
err := loadS3Meta(f.filesystemId, latestSnap.Id, &latestMeta)
if err != nil {
message := "s3 pull initiator can't read metadata"
if os.IsNotExist(err) {
pollResult.Status = "error"
pollResult.Message = "Could not read commit s3 metadata - you have changes which have not been pushed to s3."
updatePollResult(transferRequestId, pollResult)
}
f.innerResponses <- &Event{
Name: "s3-pull-initiator-cant-read-metadata",
Args: &EventArgs{"err": err},
}
return backoffState
}
err = json.Unmarshal(data, &latestMeta)
fmt.Printf("serial data: %s", string(data))
fmt.Printf("latest meta read: %#v", latestMeta)
if err != nil {
f.innerResponses <- &Event{
Name: "cannot-unmarshal-metadata-json",
Args: &EventArgs{"err": err},
message = "Could not read commit s3 metadata - you have changes which have not been pushed to s3."
}
f.sendEventUpdateUser(err, "s3-pull-initiator-cant-read-metadata", message, transferRequestId, pollResult)
return backoffState
}
}
bucketChanged, keyVersions, err := downloadS3Bucket(svc, transferRequest.RemoteName, destPath, transferRequestId, &pollResult, latestMeta)
if err != nil {
f.innerResponses <- &Event{
Name: "s3-pull-initiator-cant-pull-from-s3",
Args: &EventArgs{"err": err},
}
f.sendEventUpdateUser(err, "cant-pull-from-s3", "cant pull from s3", transferRequestId, pollResult)
return backoffState
}
if bucketChanged {
Expand All @@ -3253,25 +3299,23 @@ func s3PullInitiatorState(f *fsMachine) stateFn {
pathToCommitMeta := fmt.Sprintf("%s/%s", subPath, snapshotId)
data, err := json.Marshal(keyVersions)
if err != nil {
f.innerResponses <- &Event{
Name: "failed-marshalling-meta-json",
Args: &EventArgs{"err": err},
}
f.sendEventUpdateUser(err, "failed-marshalling-metadata-json", "cant marshal metadata json", transferRequestId, pollResult)
return backoffState
}
err = ioutil.WriteFile(pathToCommitMeta, data, 0600)
if err != nil {
f.innerResponses <- &Event{
Name: "failed-writing-meta-json",
Args: &EventArgs{"err": err},
}
f.sendEventUpdateUser(err, "failed-writing-metadata", "cant write metadata file", transferRequestId, pollResult)
return backoffState
}
response, _ := f.snapshot(&Event{Name: "snapshot",
Args: &EventArgs{"metadata": metadata{"message": "s3 content"},
"snapshotId": snapshotId}})
if response.Name != "snapshotted" {
f.innerResponses <- response
err = updateUser("Could not take snapshot", transferRequestId, pollResult)
if err != nil {
f.sendEvent(&EventArgs{"err": err}, "cant-write-to-etcd", "cant write to etcd")
}
return backoffState
}
}
Expand Down
20 changes: 20 additions & 0 deletions tests/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,24 @@ func TestS3Remote(t *testing.T) {
t.Error("Pull command did not detect extra commits")
}
})

t.Run("Push", func(t *testing.T) {
fsname := citools.UniqName()
citools.RunOnNode(t, node1, "dm clone test-real-s3 test.dotmesh --local-name="+fsname)
citools.RunOnNode(t, node1, citools.DockerRun(fsname)+" touch /foo/committedfile.txt")
citools.TryUntilSucceeds(func() error {
output := citools.OutputFromRunOnNode(t, node1, "dm dot show "+fsname+" -H | grep dirty")
if strings.Contains(output, "\t0") {
return fmt.Errorf("not dirty yet")
}
return nil
}, "waiting for dirty data...")
citools.RunOnNode(t, node1, "dm switch "+fsname)
citools.RunOnNode(t, node1, "dm commit -m 'non-s3 pushed data'")
citools.RunOnNode(t, node1, "dm push test-real-s3 "+fsname)
output := citools.OutputFromRunOnNode(t, node1, s3cmd+" ls s3://test.dotmesh")
if !strings.Contains(output, "committedfile.txt") {
t.Error("Did not push to s3")
}
})
}

0 comments on commit b6bba1e

Please sign in to comment.