How to connect to Cloudflare R2, fetch files, and send them to Anaplan using the Anaplan Bulk API. Based on the ITINT anaplan-worker service and shared Go libraries.
The pattern is always the same 3-step flow:
This is orchestrated by Temporal workflows running in Kubernetes.
R2 uses the S3-compatible API. The shared library at go.cfdata.org/itint/pkg/r2 wraps the AWS SDK v2 for Go.
| Env Var | Description | Source |
|---|---|---|
R2_ACCOUNT_ID | Cloudflare account ID | Vault |
R2_ACCESS_KEY_ID | S3-compatible access key | Vault |
R2_ACCESS_KEY_SECRET | S3-compatible secret key | Vault |
R2_RETRY_TIME_BETWEEN_RETRIES_IN_SECONDS | Retry interval (e.g. 5s) | Helm |
R2_RETRY_MAX_ATTEMPTS | Max retries (e.g. 3) | Helm |
The endpoint format is https://{ACCOUNT_ID}.r2.cloudflarestorage.com with region "auto".
import (
"go.cfdata.org/itint/pkg/r2"
"go.cfdata.org/itint/pkg/retry"
)
// 1. Load config from environment variables
r2Conf, err := r2.NewConf() // reads R2_ACCOUNT_ID, R2_ACCESS_KEY_ID, R2_ACCESS_KEY_SECRET
// 2. Create retry handler
retryHandler := retry.NewHandler(retry.Conf{
RetryWait: r2Conf.R2RetryWaitInterval,
RetryMax: r2Conf.R2RetryMaxAttempts,
})
// 3. Create the R2 client (with optional metrics/tracing)
r2Client, err := r2.NewClient(r2Conf, retryHandler,
r2.WithMetrics(metricsClient), // optional: Prometheus metrics
r2.WithTracing(otelTracer), // optional: OpenTelemetry tracing
)
// 4. Wrap in a Handler for Sentry error reporting
r2Handler := r2.NewHandler(r2Client, sentryReporter)
Under the hood, the client constructs an AWS S3 client on each operation:
// From pkg/r2/client.go
cfg, err := config.LoadDefaultConfig(ctx,
config.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(accessKeyID, accessKeySecret, ""),
),
config.WithRegion("auto"),
)
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.BaseEndpoint = &c.r2BaseEndpoint // https://{accountID}.r2.cloudflarestorage.com
})
// Fetch a file (returns []byte)
fileBytes, err := r2Handler.FetchFile(ctx, "path/to/file.csv", "my-bucket")
// List files with prefix filter
files, err := r2Handler.ListFiles(ctx, r2.ListObjectsInput{
BucketName: "my-bucket",
Prefix: "folder/prefix/",
})
// files is []r2.File{ {FileName: "...", LastModifiedDate: time.Time{}} }
// Upload a file
err := r2Handler.UploadFile(ctx,
"upload-folder", // folder within bucket
fileBytes, // []byte content
"my-bucket", // bucket name
"output.csv", // filename (path becomes "upload-folder/output.csv")
r2.PushObjectOptions{ContentType: "text/csv"},
)
// Delete a file
err := r2Handler.DeleteFile(ctx, "my-bucket", "path/to/file.csv")
The shared library at go.cfdata.org/itint/pkg/anaplan handles authentication and the Anaplan REST API.
| Env Var | Description | Source |
|---|---|---|
ANAPLAN_CERTIFICATE | CA certificate for Anaplan auth | Vault |
ANAPLAN_ENCODED_STRING | Encoded data for cert auth | Vault |
ANAPLAN_ENCODED_SIGNED_STRING | Signed auth data | Vault |
ANAPLAN_AUTH_URL | Auth endpoint | https://auth.anaplan.com/token/authenticate |
ANAPLAN_WORKSPACE | Workspace ID | Anaplan admin panel |
ANAPLAN_MODEL | Model ID | Anaplan admin panel |
ANAPLAN_INSTANCE_URL | API base URL | https://api.anaplan.com/2/0 |
MAX_RETRIES | Max polling retries | Helm (e.g. 5) |
SLEEP_INTERVAL | Seconds between polls | Helm (e.g. 12) |
Anaplan uses certificate-based authentication, not OAuth2.
POST https://auth.anaplan.com/token/authenticate
Headers:
Authorization: CACertificate {certificate_value}
Content-Type: application/json
Body:
{ "encodedData": "...", "encodedSignedData": "..." }
Response (201 Created):
{ "tokenInfo": { "tokenValue": "eyJhbGci..." } }
All subsequent API calls use: Authorization: AnaplanAuthToken {tokenValue}
From the session library (pkg/anaplan/session/session.go):
func authorize(ctx context.Context, auth Auth) (*AuthResponse, int, error) {
reqBody := AuthBody{
EncodedSignedData: auth.EncodedSignedData,
EncodedData: auth.EncodedData,
}
payload, _ := json.Marshal(reqBody)
request, _ := http.NewRequestWithContext(ctx, http.MethodPost, auth.URL,
bytes.NewReader(payload))
certAuth := fmt.Sprintf("CACertificate %s", auth.Certificate)
request.Header.Set("Content-Type", "application/json")
request.Header.Set("authorization", certAuth)
// ... executes request, expects 201 Created
}
import (
"go.cfdata.org/itint/pkg/anaplan"
anaplanClient "go.cfdata.org/itint/pkg/anaplan/client"
"go.cfdata.org/itint/pkg/anaplan/session"
)
// 1. Load config from env vars
anaplanConf, err := anaplan.NewConf()
// 2. Create the low-level client
client, err := anaplanClient.NewClient(anaplanClient.Configuration{
Client: &http.Client{Timeout: 30 * time.Second},
WorkspaceID: anaplanConf.WorkspaceID,
ModelID: anaplanConf.ModelID,
InstanceURL: anaplanConf.InstanceURL, // https://api.anaplan.com/2/0
Auth: session.Auth{
Certificate: anaplanConf.Certificate,
EncodedData: anaplanConf.EncodedString,
EncodedSignedData: anaplanConf.EncodedSignedString,
URL: anaplanConf.AuthURL,
},
})
// 3. Create the high-level handler
anaplanHandler := anaplan.NewHandler(sentryReporter, client, metricsHandler, *anaplanConf)
The upload is a single PUT with the raw file bytes (no chunking):
PUT https://api.anaplan.com/2/0/workspaces/{workspaceID}/models/{modelID}/files/{fileID}
Headers:
Authorization: AnaplanAuthToken {token}
Content-Type: application/octet-stream
Body: <raw file bytes>
Expected: 204 No Content
fileID is a pre-existing file definition in Anaplan. You must create the file data source in the Anaplan model first, then use its ID here.
Using the handler:
uploadInput := anaplanClient.UploadInput{
File: fileBytes, // []byte from R2
FileName: "departments.csv", // for logging
FileID: "<ANAPLAN_FILE_ID>", // Anaplan file ID (pre-configured in model)
}
err := anaplanHandler.UploadFile(ctx, uploadInput, "my-integration-name")
The handler internally calls OpenSession() (authenticates) then Upload().
After uploading the file, trigger an Anaplan process that imports the data:
POST https://api.anaplan.com/2/0/workspaces/{wsID}/models/{modelID}/processes/{processID}/tasks/
Headers:
Authorization: AnaplanAuthToken {token}
Content-Type: application/json
Body: { "localeName": "en_US" }
Response (200 OK):
{ "task": { "taskId": "abc123", "taskState": "NOT_STARTED" } }
Then poll until complete:
GET .../processes/{processID}/tasks/{taskID}
Response: { "task": { "taskState": "COMPLETE", "currentStep": "Complete." } }
Task state machine: NOT_STARTED → IN_PROGRESS → COMPLETE (or CANCELLED)
Using the handler (does start + poll automatically):
err := anaplanHandler.ImportAndWaitForCompletion(ctx, processID, "my-integration-name")
This method:
StartProcess to create a taskGetTaskInfo every SLEEP_INTERVAL seconds (default 12s)MAX_RETRIES times (default 5)WorkflowError if any rows failed/were invalidThe internal/processor/processor.go provides UploadFileToAnaplanFromR2 — the core bridge that combines R2 fetch + Anaplan upload into a single operation:
import "go.cfdata.org/itint/anaplan-worker/internal/processor"
// Create the processor (bridges R2 and Anaplan)
proc, err := processor.NewProcessor(anaplanHandler, r2Handler, metricsHandler)
// Use it -- fetches from R2 and uploads to Anaplan in one call
recordCount, err := proc.UploadFileToAnaplanFromR2(ctx, processor.Config{
AnaplanFileID: "<ANAPLAN_FILE_ID>", // Anaplan file ID
R2BucketName: "my-r2-bucket", // R2 bucket
R2FileName: "folder/data_202461.csv", // file path in R2
IntegrationName: "my-integration-name",
})
Here's the canonical 2-activity workflow used by most integrations:
func (w MyWorkflow) MyDataToAnaplan(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
NonRetryableErrorTypes: []string{"NonRetryableError"},
MaximumAttempts: 5,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Step 1: Construct dated filename
y, m, d := time.Now().UTC().Date()
r2FileName := fmt.Sprintf("%s/mydata_%d%d%d.csv", conf.R2Folder, y, int(m), d)
// Step 2: Activity -- Fetch from R2 + Upload to Anaplan (via processor)
var recordsCounter int
err := workflow.ExecuteActivity(ctx, w.processor.UploadFileToAnaplanFromR2,
processor.Config{
AnaplanFileID: conf.AnaplanFileID,
R2BucketName: conf.R2Bucket,
R2FileName: r2FileName,
IntegrationName: "my-integration",
}).Get(ctx, &recordsCounter)
if err != nil {
return err
}
// Step 3: Activity -- Trigger Anaplan import + poll until done
err = workflow.ExecuteActivity(ctx,
w.anaplanHandler.ImportAndWaitForCompletion,
conf.AnaplanProcessID,
"my-integration",
).Get(ctx, nil)
if err != nil {
return err
}
return nil
}
Store secrets in your team's Vault path. The anaplan-worker uses:
| Key | Purpose |
|---|---|
ANAPLAN_CERTIFICATE | Anaplan CA certificate |
ANAPLAN_ENCODED_STRING | Encoded auth data |
ANAPLAN_ENCODED_SIGNED_STRING | Signed auth data |
R2_ACCESS_KEY | R2 S3-compatible access key |
R2_SECRET_KEY | R2 S3-compatible secret key |
# R2 connection
R2_ACCOUNT_ID: "<YOUR_CLOUDFLARE_ACCOUNT_ID>"
R2_RETRY_TIME_BETWEEN_RETRIES_IN_SECONDS: "5s"
R2_RETRY_MAX_ATTEMPTS: "3"
# Anaplan connection
ANAPLAN_AUTH_URL: "https://auth.anaplan.com/token/authenticate"
ANAPLAN_INSTANCE_URL: "https://api.anaplan.com/2/0"
ANAPLAN_WORKSPACE: "<YOUR_WORKSPACE_ID>"
ANAPLAN_MODEL: "<YOUR_MODEL_ID>"
MAX_RETRIES: "5"
SLEEP_INTERVAL: "12"
# Per-integration IDs (from Anaplan admin)
ANAPLAN_FILE_ID: "<FILE_ID_FROM_ANAPLAN>"
ANAPLAN_PROCESS_ID: "<PROCESS_ID_FROM_ANAPLAN>"
R2_BUCKET: "<YOUR_BUCKET_NAME>"
R2_FOLDER: "<FOLDER_PATH_IN_BUCKET>"
Add these to your go.mod:
require (
go.cfdata.org/itint/pkg/r2 latest
go.cfdata.org/itint/pkg/anaplan latest
go.cfdata.org/itint/pkg/authorization latest
go.cfdata.org/itint/pkg/retry latest
go.cfdata.org/itint/pkg/reporters latest
go.cfdata.org/itint/pkg/metrics latest
)
GONOSUMCHECK and GONOSUMDB for go.cfdata.org/*, and set GOPRIVATE=go.cfdata.org.
| Metric | Type | Labels | Package |
|---|---|---|---|
itint_r2_errors_total | Counter | operation, error | pkg/r2 |
itint_r2_operations_latency | Histogram | operation, result | pkg/r2 |
itint_anaplan_errors_total | Counter | endpoint, method, code | pkg/anaplan/client |
itint_anaplan_operations_latency | Histogram | endpoint, method, result | pkg/anaplan/client |
itint_anaplan_file_imported | Gauge | integration | pkg/anaplan |
itint_anaplan_total_rows | Gauge | integration | pkg/anaplan |
itint_anaplan_failed_rows | Gauge | integration | pkg/anaplan |
itint_anaplan_invalid_rows | Gauge | integration | pkg/anaplan |
itint_anaplan_total_warnings | Gauge | integration | pkg/anaplan |
Both R2 and Anaplan handlers accept a reporters.Reporter interface:
reporter.ReportEvent(reporters.ReportingEvent{
Err: err,
Tags: map[string]string{"severity": "1", "step": "Fetch file from R2"},
Context: reporters.Context{
Key: "File",
Value: map[string]interface{}{"path": filePath, "bucket": bucketName},
},
})
fileID and processID.pkg/r2, pkg/anaplan, pkg/retry to your go.mod.processor.UploadFileToAnaplanFromR2 + ImportAndWaitForCompletion.