Golang tasks
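A Golang application can connect to SchedulerX by depending on the Go SDK, after which SchedulerX schedules your methods on a timed basis. Because the Go SDK runs tasks in goroutines, a running task cannot be stopped directly.
Task types
Standalone task
Write your business code and implement the Processor interface.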
type Processor interface {
	Process(ctx *jobcontext.JobContext) (*ProcessResult, error)
}
Example:
package main

import (
	"fmt"
	"time"

	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
)

// Compile-time check that HelloWorld implements processor.Processor.
var _ processor.Processor = &HelloWorld{}

type HelloWorld struct{}

func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] Start process my task: Hello world!")
	// Simulate task execution.
	time.Sleep(3 * time.Second)
	ret := new(processor.ProcessResult)
	ret.SetStatus(processor.InstanceStatusSucceed)
	fmt.Println("[Process] End process my task: Hello world!")
	return ret, nil
}
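Broadcast task
The Go SDK supports broadcast sharding tasks equivalent to those in the Java SDK. The following interfaces are supported:
PreProcess: Before any worker node runs Process, the master node runs PreProcess once.
Process: Every worker node runs Process and can return a result.
PostProcess: After all worker nodes have finished Process, the master node runs PostProcess once and can read the Process results of all nodes.
Note
Requires Golang SDK v0.0.2 or later.
Example: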
package main

import (
	"fmt"
	"math/rand"
	"strconv"

	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

type TestBroadcast struct{}

// Process is executed on every machine.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	value := rand.Intn(10)
	fmt.Printf("Total sharding num=%d, sharding=%d, value=%d\n", ctx.ShardingNum(), ctx.ShardingId(), value)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(value))
	return ret, nil
}

// PreProcess is executed on only one machine.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
	fmt.Println("TestBroadcastJob PreProcess")
	return nil
}

// PostProcess is executed on only one machine. It sums the results of
// all tasks that succeeded.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("TestBroadcastJob PostProcess")
	allTaskResults := ctx.TaskResults()
	allTaskStatuses := ctx.TaskStatuses()
	num := 0
	for key, val := range allTaskResults {
		fmt.Printf("%v:%v\n", key, val)
		if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
			valInt, _ := strconv.Atoi(val)
			num += valInt
		}
	}
	fmt.Printf("TestBroadcastJob PostProcess(), num=%d\n", num)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(num))
	return ret, nil
}
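The broadcast lifecycle described in this section (PreProcess once on the master, Process on every worker, PostProcess once on the master with all results) can be tried out without the SDK. The following is a minimal stand-alone simulation that uses only the standard library. The names broadcastJob, runBroadcast, and sumJob are hypothetical and are not part of schedulerx-worker-go; goroutines stand in for worker nodes.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// broadcastJob mirrors the three hooks of a broadcast task.
// It is a hypothetical stand-in, not the SDK's interface.
type broadcastJob interface {
	PreProcess() error
	Process(shardingID int) (string, error)
	PostProcess(results map[int]string) (string, error)
}

// runBroadcast plays the role of the master: it calls PreProcess once,
// runs Process for every simulated worker concurrently, and then calls
// PostProcess once with the collected results.
func runBroadcast(job broadcastJob, workers int) (string, error) {
	if err := job.PreProcess(); err != nil {
		return "", err
	}
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[int]string, workers)
	)
	for id := 1; id <= workers; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			res, err := job.Process(id)
			if err != nil {
				return // a failed worker contributes no result
			}
			mu.Lock()
			results[id] = res
			mu.Unlock()
		}(id)
	}
	wg.Wait()
	return job.PostProcess(results)
}

// sumJob returns each worker's sharding ID and sums them in PostProcess.
type sumJob struct{}

func (sumJob) PreProcess() error { return nil }

func (sumJob) Process(shardingID int) (string, error) {
	return strconv.Itoa(shardingID), nil
}

func (sumJob) PostProcess(results map[int]string) (string, error) {
	total := 0
	for _, v := range results {
		n, err := strconv.Atoi(v)
		if err != nil {
			return "", err
		}
		total += n
	}
	return strconv.Itoa(total), nil
}

func main() {
	out, err := runBroadcast(sumJob{}, 3)
	fmt.Println(out, err) // prints "6 <nil>"
}
```

In a real deployment the workers are separate processes and the SDK collects the results; the mutex-protected map here only models that collection step.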
MapReduce task
The Go SDK supports the Map and MapReduce models available in the Java SDK.
Note
Requires Golang SDK v0.0.4 or later.
Map task demo
Write your business code and implement MapJobProcessor.
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"time"

	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
)

type TestMapJob struct {
	*mapjob.MapJobProcessor
}

func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error {
	//TODO implement me
	panic("implement me")
}

// Process uses the Map model: the root task fans out sub-tasks, and each
// sub-task is then handled according to its task name.
func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	var (
		num = 10
		err error
	)
	taskName := jobCtx.TaskName()

	// The job parameter, if set, overrides the number of sub-tasks.
	if jobCtx.JobParameters() != "" {
		num, err = strconv.Atoi(jobCtx.JobParameters())
		if err != nil {
			return nil, err
		}
	}

	if mr.IsRootTask(jobCtx) {
		fmt.Println("start root task")
		var messageList []interface{}
		for i := 1; i <= num; i++ {
			var str = fmt.Sprintf("id_%d", i)
			messageList = append(messageList, str)
		}
		fmt.Println(messageList)
		return mr.Map(jobCtx, messageList, "Level1Dispatch")
	} else if taskName == "Level1Dispatch" {
		var task []byte = jobCtx.Task()
		var str string
		if err = json.Unmarshal(task, &str); err != nil {
			return nil, err
		}
		fmt.Printf("str=%s\n", str)
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Finish Process...")
		// Deliberately fail one sub-task to show how failures are reported.
		if str == "id_5" {
			return processor.NewProcessResult(
				processor.WithFailed(),
				processor.WithResult(str),
			), errors.New("test")
		}
		return processor.NewProcessResult(
			processor.WithSucceed(),
			processor.WithResult(str),
		), nil
	}
	return processor.NewProcessResult(processor.WithFailed()), nil
}
Register the Map task in the client.
package main

import (
	"github.com/alibaba/schedulerx-worker-go"
	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
)

func main() {
	// This is only an example; obtain the real configuration from the platform.
	cfg := &schedulerx.Config{
		Endpoint:  "acm.aliyun.com",
		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
		GroupId:   "xueren_sub",
		AppKey:    "xxxxxx",
	}
	client, err := schedulerx.GetClient(cfg)
	if err != nil {
		panic(err)
	}
	task := &TestMapJob{
		mapjob.NewMapJobProcessor(),
	}
	client.RegisterTask("TestMapJob", task)
	// Block forever so the worker keeps running.
	select {}
}
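To make the fan-out step of the Map model easier to follow, here is a small stand-alone sketch of the two halves of the demo: turning a list of items into named sub-tasks, and routing each sub-task by its name. It assumes JSON-encoded payloads, which is what the demo's json.Unmarshal call suggests. The names subTask, mapTasks, and handle are hypothetical and are not part of schedulerx-worker-go.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// subTask is a hypothetical stand-in for a dispatched sub-task: a name
// used for routing plus a JSON-encoded payload.
type subTask struct {
	Name    string
	Payload []byte
}

// mapTasks encodes each item as JSON and tags it with taskName, loosely
// modeling what a Map call hands over for distribution.
func mapTasks(items []interface{}, taskName string) ([]subTask, error) {
	if len(items) == 0 {
		return nil, errors.New("map: empty task list")
	}
	tasks := make([]subTask, 0, len(items))
	for _, item := range items {
		b, err := json.Marshal(item)
		if err != nil {
			return nil, err
		}
		tasks = append(tasks, subTask{Name: taskName, Payload: b})
	}
	return tasks, nil
}

// handle routes a sub-task by name, like the taskName branches in Process.
func handle(t subTask) (string, error) {
	switch t.Name {
	case "Level1Dispatch":
		var id string
		if err := json.Unmarshal(t.Payload, &id); err != nil {
			return "", err
		}
		return "done:" + id, nil
	default:
		return "", fmt.Errorf("unknown task name %q", t.Name)
	}
}

func main() {
	tasks, err := mapTasks([]interface{}{"id_1", "id_2"}, "Level1Dispatch")
	if err != nil {
		panic(err)
	}
	for _, t := range tasks {
		res, err := handle(t)
		fmt.Println(res, err)
	}
}
```

Routing on the task name is what allows multi-level dispatch: a handler for one name can itself map a new batch under a different name.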
MapReduce task demo
Write your business code and implement MapReduceJobProcessor.
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

type OrderInfo struct {
	Id    string `json:"id"`
	Value int    `json:"value"`
}

func NewOrderInfo(id string, value int) *OrderInfo {
	return &OrderInfo{Id: id, Value: value}
}

type TestMapReduceJob struct {
	*mapjob.MapReduceJobProcessor
}

func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error {
	//TODO implement me
	panic("implement me")
}

// Process uses the MapReduce model to scan orders in a distributed way
// for timeout confirmation.
func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	var (
		num = 1000
		err error
	)
	taskName := jobCtx.TaskName()

	// The job parameter, if set, overrides the number of sub-tasks.
	if jobCtx.JobParameters() != "" {
		num, err = strconv.Atoi(jobCtx.JobParameters())
		if err != nil {
			return nil, err
		}
	}

	if mr.IsRootTask(jobCtx) {
		fmt.Printf("start root task, taskId=%d\n", jobCtx.TaskId())
		var orderInfos []interface{}
		for i := 1; i <= num; i++ {
			orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i))
		}
		return mr.Map(jobCtx, orderInfos, "OrderInfo")
	} else if taskName == "OrderInfo" {
		orderInfo := new(OrderInfo)
		if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil {
			fmt.Printf("task is not OrderInfo, task=%+v\n", jobCtx.Task())
		}
		fmt.Printf("taskId=%d, orderInfo=%+v\n", jobCtx.TaskId(), orderInfo)
		time.Sleep(1 * time.Millisecond)
		return processor.NewProcessResult(
			processor.WithSucceed(),
			processor.WithResult(strconv.Itoa(orderInfo.Value)),
		), nil
	}
	return processor.NewProcessResult(processor.WithFailed()), nil
}

// Reduce sums the results of all sub-tasks that succeeded.
func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	allTaskResults := jobCtx.TaskResults()
	allTaskStatuses := jobCtx.TaskStatuses()
	count := 0
	fmt.Printf("reduce: all task count=%d\n", len(allTaskResults))
	for key, val := range allTaskResults {
		// Skip the entry with key 0.
		if key == 0 {
			continue
		}
		if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
			num, err := strconv.Atoi(val)
			if err != nil {
				return nil, err
			}
			count += num
		}
	}
	fmt.Printf("reduce: succeed task count=%d\n", count)
	return processor.NewProcessResult(
		processor.WithSucceed(),
		processor.WithResult(strconv.Itoa(count)),
	), nil
}
Register the MapReduce task in the client.
package main

import (
	"github.com/alibaba/schedulerx-worker-go"
	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
)

func main() {
	// This is only an example; obtain the real configuration from the platform.
	cfg := &schedulerx.Config{
		Endpoint:  "acm.aliyun.com",
		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
		GroupId:   "xueren_sub",
		AppKey:    "xxxxxx",
	}
	client, err := schedulerx.GetClient(cfg)
	if err != nil {
		panic(err)
	}
	task := &TestMapReduceJob{
		mapjob.NewMapReduceJobProcessor(),
	}
	client.RegisterTask("TestMapReduceJob", task)
	// Block forever so the worker keeps running.
	select {}
}
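The aggregation performed in Reduce can be isolated as a pure function, which makes the rule "only count sub-tasks that succeeded, and skip key 0" easy to test. The sketch below is a stand-alone approximation: the status type, its constants, the int64 task IDs, and reduceSum are hypothetical names chosen for illustration, not SDK identifiers.

```go
package main

import (
	"fmt"
	"strconv"
)

// status is a hypothetical stand-in for a task status value.
// The zero value is "unknown" so that a missing map entry never
// counts as a success.
type status int

const (
	statusUnknown status = iota
	statusSucceed
	statusFailed
)

// reduceSum adds up the integer results of all succeeded sub-tasks.
// The entry with ID 0 is skipped, mirroring the demo's `key == 0` check.
func reduceSum(results map[int64]string, statuses map[int64]status) (int, error) {
	total := 0
	for id, val := range results {
		if id == 0 {
			continue
		}
		if statuses[id] != statusSucceed {
			continue
		}
		n, err := strconv.Atoi(val)
		if err != nil {
			return 0, fmt.Errorf("task %d: %w", id, err)
		}
		total += n
	}
	return total, nil
}

func main() {
	results := map[int64]string{0: "", 1: "10", 2: "20", 3: "30"}
	statuses := map[int64]status{
		0: statusSucceed,
		1: statusSucceed,
		2: statusFailed,
		3: statusSucceed,
	}
	total, err := reduceSum(results, statuses)
	fmt.Println(total, err) // prints "40 <nil>"
}
```

Keeping the aggregation separate from the job context means it can be unit-tested without a running scheduler.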
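How to stop a task
A Go processor runs each task in a goroutine, and a goroutine cannot be stopped directly from outside. For this reason, the SDK provides the KillProcessor interface: implement its Kill method to stop your Processor yourself.
Note
Requires Golang SDK v1.0.2 or later.
Example: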
package main

import (
	"fmt"
	"time"

	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
)

// Compile-time check that HelloWorld implements processor.Processor.
var _ processor.Processor = &HelloWorld{}

type HelloWorld struct{}

// Stop is set by Kill and checked by Process between iterations.
var Stop = false

func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] Start process my task: Hello world!")
	// Simulate task execution; leave the loop once Kill has been called.
	for i := 0; i < 10; i++ {
		fmt.Printf("Hello%d\n", i)
		time.Sleep(2 * time.Second)
		if Stop {
			break
		}
	}
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	fmt.Println("[Process] End process my task: Hello world!")
	return ret, nil
}

func (h *HelloWorld) Kill(ctx *jobcontext.JobContext) error {
	fmt.Println("[Kill] Start kill my task: Hello world!")
	Stop = true
	return nil
}