Async Recursion with backoff

It’s been a while since I published anything. More than three years! A lot of things have happened since then. The most relevant one to mention at the beginning of this post is that I have been super busy building a lot of cool tech with a very talented team here at EPAM Anywhere. We are doing full-stack TypeScript with Next.js and native AWS serverless services and can’t get enough of it. This experience has been challenging me to learn new things every day and I have a lot to share!

Today I want to show you one particular technique that I found super useful when I need to safely use aws-sdk batch APIs and ensure delivery.

When you work with AWS, you will certainly use the aws-sdk and the APIs of different services. Need to send a message to an SQS queue? That’s an HTTP API call and you will use the sdk. Need to update a document in DynamoDB? The same. Need to push a message to Firehose? The same. Many of these APIs have batch equivalents: sendMessageBatch() for SQS, batchWrite() for DynamoDB, putRecordBatch() for Firehose.

These batch APIs will throw if something fundamental is wrong: say, your auth is no good, you don’t have enough permissions, or you have no connectivity to the service. If the sdk connected successfully to the service but failed to perform some or all of the operations in your batch, the call won’t throw. It will return an object that tells you which operations succeeded and which ones failed. The most likely reason to get partial failures is throttling. All of these APIs have soft and hard limits, and sooner or later you will attempt to do more than AWS feels comfortable letting you get away with.
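Schematically, the contract looks like this (a sketch with the SQS batch API; assume sqs, queueUrl, and entries are in scope and we are inside an async function):

// The call resolves even when some entries fail; failures are reported
// in the Failed list instead of being thrown
const { Failed } = await sqs.sendMessageBatch({ QueueUrl: queueUrl, Entries: entries }).promise();

if (Failed.length > 0) {
  // these entries were not delivered (throttling is the usual suspect)
  // and retrying them is on you
}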

We learned it the hard way. It’s all documented, obviously, but things like this one are only obvious in hindsight. Let me show you a neat technique to batch safely but first, some background.

Recursion

I have always liked recursion. When you need to scroll through something that paginates, you can employ while loops, or you can recursively repeat and accumulate results as you go. Recursion always felt much cleaner to me, but it comes with a gotcha - no stack is infinite. Consider the following simple example:

let iteration = 0;

const op = () => {
  ++iteration;

  return iteration === 100000 ? iteration : op();
};

try {
  console.log(op());
} catch (error) {
  console.error(iteration);
}

This snippet won’t print 100000. When I run it with node sample.js, I get 15707 printed in the console. Your mileage may vary but you know you can reach the deep end and go no further. The error that I am not reporting is Maximum call stack size exceeded.

Async Recursion

What if op() was performing a network operation? Let’s simulate it and convert op() to an async op():

let iteration = 0;

const op = async () => {
  await Promise.resolve(++iteration);

  return iteration === 100000 ? iteration : op();
};

op().then(console.log).catch(console.error);

It prints 100000 and we do not exhaust the stack. Let’s understand why, and we will be well on our way to leveraging this technique in real-world scenarios.

The trick is in how promises (and async functions that return them) use the event loop to schedule continuations. I highly recommend this article to get a deeper insight into how it all works. And here’s one specifically about promises.

Basically, promises use microtasks just like process.nextTick() does, and since the continuation runs via the event loop, each stack frame is short-lived and every recursive invocation gets its own.

Let me do the same but this time I will be more explicit:

let iteration = 0;

const op = () => {
  ++iteration;

  return iteration === 100000
    ? iteration
    : new Promise((resolve) => process.nextTick(() => resolve(op())));
};

op().then(console.log).catch(console.error);

It also prints 100000 but here you can see how I “delay” promise resolution via the callback scheduled on the event loop. It adds one more ingredient that I need to explain.

I am using the promise nesting trick when I do resolve(op()). When a promise A resolves with a promise B, the result of A is the resolved value of B. Since my op() keeps recursing onto itself, the value that the first call to op() ultimately resolves with is the one produced by the deepest recursive call.
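If this adoption behavior is new to you, here it is in isolation:

// Promise adoption: a resolves with b, so a's consumers receive b's eventual value
const b = new Promise((resolve) => setTimeout(() => resolve(42), 10));
const a = new Promise((resolve) => resolve(b));

a.then(console.log); // prints 42 once b resolves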

Async Recursion with backoff

The last thing that I want to illustrate before I show you how I use this technique with the aws-sdk APIs is recursion with a backoff strategy. Take a look:

const { performance } = require("perf_hooks");

let iteration = 0;
const start = performance.now();

const op = () => {
  ++iteration;

  return iteration === 10
    ? performance.now() - start
    : new Promise((resolve) => setTimeout(() => resolve(op()), iteration));
};

op().then(console.log).catch(console.error);

It prints a value somewhere around 50. The code goes through 10 executions of op() and delays each next run by iteration milliseconds: +1, then +2, then +3, up to +9 for the last run. We stop when ++iteration is equal to 10, so we only run through 9 delays via setTimeout(). The sum of the arithmetic progression from 1 to 9 with a step of 1 is 45, but op() doesn’t run exactly at the interval we ask for, plus performance.now() isn’t read at exactly 0ms, so let’s call the difference an overhead.

AWS batch APIs with retries

We are now ready to put it all together and employ the async recursion with backoff technique with the batch APIs to ensure delivery.

First, the backoff strategies:

export type BackoffStrategy = (retryCount: number) => number;

export const attemptTimesOneHundredMs: BackoffStrategy = (attempt: number) =>
  100 * attempt;

export const attemptSquaredTimesOneHundredMs: BackoffStrategy = (
  attempt: number
) => Math.pow(attempt, 2) * 100;
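With the defaults, the retry waits 100ms after the first failed attempt, 200ms after the second, and so on; the quadratic strategy backs off faster: 100ms, 400ms, 900ms.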

SQS

interface SQSBatchDeliveryOptions {
  readonly sqs: SQS;
  readonly queueUrl: string;
  readonly retries?: number;
  readonly backoff?: BackoffStrategy;
}

/**
 * Perform a batch write to SQS. All records that failed will be retried with a backoff.
 *
 * The method will throw if we failed to deliver the batch after we exhausted the allowed retries.
 *
 * This factory method returns a reusable function that you can use over and over again if you are sending batches to the same `queue`.
 *
 * **NOTE** We are not checking any limits imposed by AWS/SQS. As of the time of this writing:
 * - no more than 10 messages per batch
 * - a message (and a batch as a whole) cannot be larger than 256Kb. We have helpers that do an S3 bypass.
 * - a message in each batch can only be ASCII with a few exceptions. Base64 encoding is recommended and we have helpers that you can use
 *
 * @param sqs instantiated sqs client
 * @param queueUrl the queue URL to work with
 * @param retries how many times to retry. default is 5
 * @param backoff backoff strategy that can be calculated based on the attempt number. the default is 100ms * attempt
 * @returns the reusable function that you call with your batch details
 */
export const deliverBatchToSqs =
  ({ sqs, queueUrl, retries = 5, backoff = attemptTimesOneHundredMs }: SQSBatchDeliveryOptions) =>
  (batch: SQS.SendMessageBatchRequestEntryList): Promise<void> => {
    const run = async (batch: SQS.SendMessageBatchRequestEntryList, attempt: number): Promise<void> => {
      if (attempt > retries) {
        throw new Error(`Failed to deliver batch after ${attempt} attempts`);
      }

      const { Failed } = await sqs.sendMessageBatch({ QueueUrl: queueUrl, Entries: batch }).promise();

      if (Failed.length > 0) {
        const retry = batch.filter((entry) => Failed.find((failed) => failed.Id === entry.Id));
        if (retry.length > 0) {
          return new Promise((resolve) => setTimeout(() => resolve(run(retry, attempt + 1)), backoff(attempt)));
        }
      }
    };

    return run(batch, 1);
  };

And then somewhere else in the code:

async dispatch(events: Message[]): Promise<void> {
  const batches = batchBy10AndNoMoreThan256KbEach(events);

  const deliver = deliverBatchToSqs({ sqs: this.sqs, queueUrl: this.queue });

  await Promise.all(
    batches.map((batch) =>
      deliver(
        batch.map((message) => ({
          Id: nanoid(),
          MessageGroupId: message.groupId,
          MessageDeduplicationId: message.key,
          MessageBody: message.payload,
        }))
      )
    )
  );
}
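batchBy10AndNoMoreThan256KbEach is one of our internal helpers and isn’t shown here. A count-only sketch of it could look like this (batchBy10 is hypothetical; the real helper also tracks the cumulative payload size against the 256Kb limit):

// Count-based chunking only; size accounting is omitted for brevity
const batchBy10 = <T>(items: T[]): T[][] => {
  const batches: T[][] = [];

  for (let i = 0; i < items.length; i += 10) {
    batches.push(items.slice(i, i + 10));
  }

  return batches;
};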

DynamoDB

interface DynamoBatchDeliveryOptions {
  readonly db: DocumentClient;
  readonly table: string;
  readonly retries?: number;
  readonly backoff?: BackoffStrategy;
}

/**
 * Perform a batch write/delete to DynamoDB. All records that failed will be retried with a backoff.
 * The method will throw if we failed to deliver the batch after the specified number of retries. Default is 5.
 *
 * This factory method returns a reusable function that you can use over and over again if you are sending batches to the same `table`.
 *
 * **NOTE** We are not checking any limits imposed by AWS/Dynamo. As of the time of this writing:
 * - 25 requests per batch
 * - max document size is 400Kb including attribute name lengths
 * - the total size of items written cannot exceed 16Mb (400x25 is less btw)
 *
 * @param db instantiated document client
 * @param table the name of the DynamoDB table to work with
 * @param retries how many times to retry. default is 5
 * @param backoff backoff strategy that can be calculated based on the attempt number. the default is 100ms * attempt
 * @returns the reusable function that you call with your batch details
 */
export const deliverBatchToDynamo =
  ({ db, table, retries = 5, backoff = attemptTimesOneHundredMs }: DynamoBatchDeliveryOptions) =>
  (batch: DocumentClient.WriteRequests): Promise<void> => {
    const run = async (batch: DocumentClient.WriteRequests, attempt: number): Promise<void> => {
      if (attempt > retries) {
        throw new Error(`Failed to deliver batch after ${attempt} attempts`);
      }

      const { UnprocessedItems } = await db.batchWrite({ RequestItems: { [table]: batch } }).promise();

      if (UnprocessedItems) {
        const retry = UnprocessedItems[table];
        if (retry?.length) {
          return new Promise((resolve) => setTimeout(() => resolve(run(retry, attempt + 1)), backoff(attempt)));
        }
      }
    };

    return run(batch, 1);
  };
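And, mirroring the SQS example, somewhere else in the code. This is a sketch: persist and chunkBy25 are hypothetical, with chunkBy25 splitting write requests into DynamoDB’s 25-item batches:

async persist(records: DocumentClient.WriteRequests): Promise<void> {
  const deliver = deliverBatchToDynamo({ db: this.db, table: this.table });

  // chunkBy25 is a hypothetical helper: DynamoDB accepts at most 25
  // requests per batch, so we split before we deliver
  await Promise.all(chunkBy25(records).map((batch) => deliver(batch)));
}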

I have to say that we are not using this technique in our request/response APIs. We are pretty serious about building fast and pleasant experiences, so we target sub-half-second latency for user-facing APIs. We use this technique everywhere else though - async step functions, batch operations, code that responds to external events.

That’s it for today. I hope you found it useful. More to come soon!

OAuth 2 with Passport - 10 Steps Recipe

Recently I found myself integrating OAuth 2 into a React/node.js app. The web is full of blog posts and answered questions on how to do it, but I still had to scratch my head a few times to get everything right. This post is a simple step-by-step recipe on how to do it with Passport.

Step 1. Express boilerplate

I am using express so first things first:

const express = require('express');
const cors = require('cors');
const bodyParser = require('body-parser');

const app = express();

app.use(cors());
app.use(bodyParser.urlencoded({ extended: false }));

Step 2. Session middleware first

If you are using sessions and plan on using passport with sessions, make sure that session middleware goes first:

const session = require('express-session');

app.use(session({ ... }));

Step 3. Passport

Now we can wire in passport:

const passport = require('passport');

app.use(passport.initialize());
app.use(passport.session());

Step 4. Authentication route

This wasn’t apparent to me from the beginning and from the simple examples that I looked at, but:

passport is designed to only facilitate the authentication process. In other words, the only route that should have passport middleware on it is the /login route where you would send your unauthenticated users to

app.get('/login', passport.authenticate('oauth2', {
  session: true,
  successReturnToOrRedirect: '/'
}));

Actually, you should also add the passport middleware to your callback route or just make /login your OAuth callback. That’s what I did. The OAuth strategy looks at ?code= in the URL to decide whether to initiate the authentication sequence or process the callback:

OAuth2Strategy.prototype.authenticate = function(req, options) {
  // ...

  if (req.query && req.query.error) {
    // fail authentication sequence
  }

  if (req.query && req.query.code) {
    // process the callback from the identity provider
  } else {
    // start the authentication sequence
  }
};

Step 5. Protected routes

To protect your routes and ensure authenticated access, you can use something like connect-ensure-login. Passport itself can’t help you with that:

const { ensureLoggedIn } = require('connect-ensure-login');
const api = require('./endpoints/api');

app.use(`/api/v1`, ensureLoggedIn('/login'), api);

Step 6. OAuth 2 strategy

To be able to do passport.authenticate('oauth2', {...}) as I showed in step 4, you should set up passport with the OAuth 2 strategy first:

const OAuth2Strategy = require('passport-oauth2');

const tokenToProfile = async () => {} // <-- I will explain in step 9

const strategy = new OAuth2Strategy({
  state: true,
  authorizationURL: process.env.AUTH_AUTHORIZATION_URL,
  tokenURL: process.env.AUTH_TOKEN_URL,
  clientID: process.env.AUTH_CLIENT_ID,
  clientSecret: process.env.AUTH_CLIENT_SECRET,
  callbackURL: process.env.AUTH_CALLBACK_URL,
  passReqToCallback: true // <-- I will explain in step 9
}, tokenToProfile);

passport.use(strategy);

Step 7. De/Serialize user

In order not to run the authentication sequence on every request, you would typically store the authenticated user ID in the session and then trust it for as long as the session is active. You need to implement serializeUser and deserializeUser to do it. Passport doesn’t do it automatically:

passport.serializeUser((user, done) => {
  // The user object in the arguments is the result of your authentication process
  // (see step 9)

  done(null, user);

  // If your user object is large or has transient state,
  // you may want to only store the user id in the session instead:

  // done(null, user.user_id)
});

passport.deserializeUser(async (user, done) => {
  // The user object in the arguments is what you have stored in the session

  // If you stored the entire user object when you serialized it to session,
  // you can skip re-querying your user store on every request

  user = await User.getUserByID(user.user_id);

  done(null, user);
});

Step 8. Tokens

OAuth 2 can send back access_token and it can also send the id_token. The latter is always a JWT token and the former is typically an opaque string.

Sometimes all you need is the access_token that you pass on to the back-end APIs. I, however, needed to authenticate the user and match the user’s identity with the application’s user record.

Two options:

  • Use the /userinfo endpoint with the access_token to retrieve the profile from your identity provider
  • Ask for the id_token and get profile attributes from there. To receive the id_token in the callback, you need to add scope=openid to your authorization request. If you need the user’s email or additional attributes like name, for example, you will need to ask for more scopes (scope=openid email or scope=openid profile), as sketched right after this list.
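Here is how asking for those scopes could look with the /login route from step 4 (a sketch; the exact scopes depend on your identity provider):

app.get('/login', passport.authenticate('oauth2', {
  // ask for the id_token plus email and profile claims
  scope: ['openid', 'email', 'profile'],
  session: true,
  successReturnToOrRedirect: '/'
}));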

OAuth 2.0 is not an authentication protocol, apparently. Read the User Authentication article on oauth.net if you want to learn more. The id_token, claims, scopes, and /userinfo are all part of OpenID Connect.

Step 9. Retrieve Profile

When we set up the OAuth 2 strategy in step 6, we had to supply a tokenToProfile callback. If you read the documentation, you will see that it has the following signature:

function (accessToken, refreshToken, profile, cb) {
  // Note: no id_token passed in the arguments
}

Don’t be surprised to always receive an empty object in profile:

OAuth 2 strategy for passport does not implement retrieval of the user profile

Here’s how it looks in the library:

OAuth2Strategy.prototype.userProfile = function(accessToken, done) {
  return done(null, {}); // <-- always {}, oops!

  // Note: no id_token passed in the arguments
};

You can either override it and use the /userinfo endpoint, or you can rely on the id_token. Here’s how you would do the former:

const strategy = new OAuth2Strategy({ ... });

strategy.userProfile = function(accessToken, done) {
  // access /userinfo with the accessToken via the strategy's OAuth2 client
  // (AUTH_USERINFO_URL is an assumption for this sketch)
  this._oauth2.get(process.env.AUTH_USERINFO_URL, accessToken, (error, body) => {
    if (error) { return done(error); }
    try { done(null, JSON.parse(body)); } catch (e) { done(e); }
  });
};

The latter requires you to not only ask for the id_token from your identity provider using scope=openid, but to also have it exposed to you by the OAuth 2 strategy in passport. To do so, you need to set passReqToCallback to true when you instantiate the strategy (we did in step 6), and then you can use a different signature for your callback:

const jwt = require('jsonwebtoken');

const tokenToProfile = async (req, accessToken, refreshToken, params, profile, done) => {
  const idToken = params['id_token'];

  // !!<-- Make sure you validate the token's signature -->!!
  // And make sure you handle errors. I simplified the code for the blog post

  const employeeID = jwt.decode(idToken)[process.env.AUTH_EMPLOYEE_ID || 'sub'];

  profile = await User.getUserByEmployeeID(employeeID);

  done(null, profile);
};

Step 10. Logout

The easiest and the most effective way to log out a user is to destroy the session:

router.get('/logout', function (req, res) {
  req.session.destroy(() => res.redirect('/'));
});

Step 11 (Bonus). Spoof Authentication

If you have gotten this far, I have a bonus step for you. I found it very helpful to be able to spoof authentication in the local environment for development and testing.

First, the environment variable in my .env file to signal that the auth should be bypassed and to tell the app what user to run on behalf of:

AUTH_LOCAL_SPOOF_USER={"user_id": 2, "employeeID": "pavel@dontemailme.com", "role_id": 0}

And then a bypass strategy:

const strategy = new OAuth2Strategy({ ... });

if (process.env.AUTH_LOCAL_SPOOF_USER) {
  passport.use({
    name: 'oauth2',
    authenticate: function () {
      try {
        this.success(JSON.parse(process.env.AUTH_LOCAL_SPOOF_USER));
      } catch (error) {
        this.error(error);
      }
    }
  });
} else {
  passport.use(strategy);
}

And that’s it! Enjoy!

Promiseland and Async/Await Kingdom

Last night I finally got a chance to publish the remaining setup scripts for my E-Commerce Chatbot. A few days ago, I added the script to load up products and variants into Azure Search and now also the catalog and historical transactions for Azure Recommendations. I basically had to script what I originally did as a one-off with curl.

Training the recommender model takes time and when you create a new recommendation build, it won’t be ready right away. I wanted my script to wait and keep polling the API until the training has finished. The whole script is basically a series of asynchronous HTTP requests so I wired it all up as a chain of promises:

sdk.model.list()
  .then(({ models }) => {
    // ...
  }).then(() => {
    return sdk.model.create(modelName, description);
  }).then(() => {
    return sdk.upload.catalog(...);
  }).then(() => {
    return sdk.upload.usage(...);
  }).then(() => {
    return sdk.build.fbt(...)
  }).then(result => {

    // <--
    // ToDo: need to wait until the training is finished
    // <--

  }).then(() => {
    console.log(`Set RECOMMENDATION_MODEL to ${model.id}`);
    console.log(`Set RECOMMENDATION_BUILD to ${build.buildId}`);
  }).catch(error => {
    console.error(error);
  });

You can see the full listing here.

Promiseland

Here’s how I implemented the wait-and-see:

// ...
}).then(build => {
  const check = (timeout) => new Promise((resolve, reject) => {
    setTimeout(() => sdk.build.get(model.id, build.buildId)
      .then(response => {
        if (!['NotStarted', 'Running'].includes(response.status)) {
          console.log(`Build training finished: ${response.status}`);
          resolve();
        } else {
          console.log(`Training is ${response.status}. Wait 30 seconds...`);
          resolve(check(30000));
        }
      })
      .catch(reject), timeout);
  });

  return check();
}).then(() => {
  // ...
});

It’s basically a recursive promise. The function in the main then() will return a promise that will always resolve unless there’s an error, but the key is in what it resolves with and how it runs. The executor that the returned promise wraps schedules itself via setTimeout() and exits the stack frame. Then, when the response is received, it will either resolve and signal that the training has completed, or it will resolve with another promise that recursively repeats this process. That other promise basically inserts itself into the main then chain, which keeps waiting until it resolves. Vicious circle.

It worked nicely and I even factored out the repeater so that my code looked like this:

// ...
}).then(build => {
  return repeater.repeat(() => sdk.build.get(model.id, build.buildId), {
    delay: 30000,
    until: (response) => !['NotStarted', 'Running'].includes(response.status),
    done: (response) => console.log(`Build training finished: ${response.status}`),
    next: (response, delay) => console.log(`Training is ${response.status}. Wait ${delay / 1000} seconds...`)
  });
}).then(() => {
  // ...
});
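The repeater module itself didn’t make it into the post. For completeness, here is a minimal sketch of what it could look like (my reconstruction under the same contract as the usage above, not the original code):

// A sketch: poll `operation` until `until(response)` is true, waiting
// `delay` ms between polls and reporting progress via `next` and `done`
const repeat = (operation, options) =>
  operation().then((response) => {
    const { delay, until, done, next } = options;

    if (until(response)) {
      done(response);
      return response;
    }

    next(response, delay);

    return new Promise((resolve) =>
      setTimeout(() => resolve(repeat(operation, options)), delay));
  });

module.exports = { repeat };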

Async/Await Kingdom

I really thought that I was very clever, but then I decided to rewrite it with async/await and run it with the latest node that now natively supports it.

Here’s what this code became:

// inside an async function
let trained = false;
while (!trained) {
  let check = await sdk.build.get(model.id, build.buildId);

  if (!['NotStarted', 'Running'].includes(check.status)) {
    trained = true;
    console.log(`Build training finished: ${check.status}`);
  } else {
    console.log(`Training is ${check.status}. Wait 30 seconds...`);
    await new Promise(resolve => setTimeout(resolve, 30000));
  }
}

Here. Compare both versions: Original vs. Async/Await

It’s not even funny! The code is so boring now, boring and simple. Just like it should be. No need to be clever and I bet I will know exactly what it’s doing and why when I look at it a year later.

I have officially converted.

Cheers!

My Go-To Scaffold for React + API

A few weeks ago I found myself building a simple app, a prototype actually. It has a nice interface to request that a certain job (or multiple) be executed in the background. It also provides real-time updates about those jobs. Nothing that you can’t do with JavaScript. I quickly settled on a node.js back-end with a React front-end and a socket.io channel in between.

This post is about how I set up my solution and my dev environment to nicely bundle my client and my server together to make everything work smoothly locally (including the compound end-to-end debugging) as well as to be ready for production deployment to heroku.

The overall solution looks like this:

solution/
├── server/
│   └── package.json
├── client/
│   └── package.json
└── package.json

The first three things that I did after I created the solution folder were:

$ cd solution && npm init
$ create-react-app client
$ mkdir server && cd server && npm init

In development, I would like my client to start up using react-scripts with webpack server on :3000 with hot reloading and other awesomeness. In production, however, my server will be serving up all front-end assets. And it will run on a different port locally when executed side by side with the webpack server. From server/app.js:

const app = express();
app.use(express.static('./client/build'));
app.get('/', function (req, res) {
  res.redirect('/index.html');
});

const http = require('http').Server(app);
const io = require('socket.io')(http, { path: '/api' });

http.listen(process.env.PORT || process.env.port || 3001, () => {
  console.log('Express/Socket.io server is ready and is accepting connections');
});

First, I installed concurrently in the root of the solution so that I could run both server and client with one command:

$ npm install concurrently --save-dev

Then, I added the following command to the solution level package.json:

"scripts": {
"debug": "concurrently \"cd server && node --inspect=7244 app.js\" \"cd client && npm start\""
},

Now when I do npm run debug in the solution root, I get two processes spun up - one runs server/app.js on :3001 and the other runs the client on :3000. I also run the server in debug mode, which will come in handy when we get to setting up local debugging.

By the way, I used debug and not start command because I need npm start to be the way heroku launches this setup in production where server handles it all:

"scripts": {
"debug": "...",
"start": "node server/app.js"
}

I also need heroku to install all dependencies and build the front-end every time I push a new version up. That’s one more npm command in the solution-level package.json:

"scripts": {
"debug": "...",
"start": "...",
"postinstall": "cd server && npm install && cd ../client && npm install && npm run build"
}

The client expects socket.io to be accessible on the /api endpoint from the same server. From the App.js:

import io from 'socket.io-client';

class App extends Component {
  constructor(props) {
    super(props);

    this.socket = io({ path: '/api' });
  }
}

Easy in a production setting where there is only one server. This is where proxy comes in to aid the development setup. We need to tell the webpack dev server to proxy everything it can’t handle to the server over at :3001. We need to add one little line to client/package.json:

{
  "proxy": "http://localhost:3001/"
}

Last but not least, I would really like to be able to debug both client and server in one place. Visual Studio Code has supported compound debugging since late last year. Here’s my launch configuration:

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Server",
      "type": "node",
      "request": "attach",
      "port": 7244
    },
    {
      "name": "Chrome",
      "type": "chrome",
      "request": "launch",
      "url": "http://localhost:3000",
      "webRoot": "${workspaceRoot}/client/src"
    }
  ],
  "compounds": [
    {
      "name": "Hybrid",
      "configurations": [
        "Server",
        "Chrome"
      ]
    }
  ]
}

You will need the Debugger for Chrome extension. Now you can npm run debug and then F5 to attach to both processes.

Nirvana.


Anthony Accomazzo’s post - Using create-react-app with a server - made it very easy for me to set it all up. I am very happy to share it a little further with a thin layer of heroku and VS code debugging.

Enjoy!

How To Promisify Moltin APIs

If you’ve read my last post, then you know that I am having all kinds of geeky fun with Moltin and its APIs. Today I will show you how you can quickly promisify them all.

Moltin APIs

Moltin APIs are all asynchronous HTTP calls with very lightweight wrappers for JavaScript, Python, and many other languages. I am using it with node.js and the main pattern is fairly straightforward (look for js examples).

I am writing a batch import to create a playground product catalog so I find myself doing a lot of this:

const inventory = Promise.all(goGetTheData());

inventory.then((data) => {
  return Promise.all(data.products.map(product => {
    return new Promise((resolve, reject) => {
      moltin.Authenticate(function() {
        moltin.Product.Create({
          // .. product attributes
        }, (result) => {
          resolve(result);
        }, (error, details) => {
          reject(details);
        });
      });
    });
  }));
}).then((products) => {
  return Promise.all(products.map(p => {
    return new Promise((resolve, reject) => {
      moltin.Authenticate(function() {
        // ...
      });
    });
  }));
}).then((modifiers) => {
  // ... you got the idea, a lot of noise
});

I wish I could instead write:

inventory.then((data) => Promise.all(data.products.map(p => {
  return moltin.Product.Create(...);
}))).then((products) => Promise.all(products.map(p => {
  return moltin.Modifier.Create(...);
}))).then((modifiers) => {
  // ... a lot cleaner and more readable, isn't it?
});

Promisification

There’s moltin-util on NPM that uses Promises but it seems to introduce a new API and I would like to retain the original. Here’s what I quickly put together and I now wonder if it’s worth posting to NPM. Is it? Let me know!

const promisify = (moltin) => {
  const promisified = {};

  const executor = (actor, action) => function () {
    const args = [...arguments];

    let success = (result, pagination) => {
      if (result && pagination) {
        result.pagination = pagination;
      }

      return result;
    };

    let error = (error, details) => details;

    if (typeof (args[args.length - 1]) === 'function') {
      if (typeof (args[args.length - 2]) === 'function') {
        error = args.pop();
        success = args.pop();
      } else {
        success = args.pop();
      }
    }

    return new Promise((resolve, reject) => {
      moltin.Authenticate(function () {
        actor[action].call(actor, ...args,
          (result, pagination) => {
            resolve(success.call(null, result, pagination));
          },
          (err, details) => {
            console.error(details);
            reject(error.call(null, details));
          });
      });
    });
  };

  Object.keys(moltin)
    .filter(key => key !== 'options' && typeof (moltin[key]) === 'object')
    .forEach(member => {
      promisified[member] = {};
      let actor = moltin[member];

      Object.keys(actor.__proto__)
        .concat(Object.keys(actor.__proto__.__proto__))
        .filter(action => typeof (actor[action]) === 'function')
        .forEach(action => {
          promisified[member][action] = executor(actor, action);
        });
    });

  return promisified;
};

module.exports = function (moltin) {
  return promisify(moltin);
};

My code is now a whole lot cleaner and smaller too. I will soon post it on Github so stay tuned! Here is, for example, how I would go about deleting a whole bunch of products:

const moltin = require('moltin')({
  publicId: process.env.MOLTIN_PUBLIC_ID,
  secretKey: process.env.MOLTIN_SECRET_KEY
});
const moltin_p = require('./promisify-moltin')(moltin);

moltin_p.Product.List(null)
  .then((products) => Promise.all(products.map(p => {
    console.log('Requesting a delete of %s', p.title);
    return moltin_p.Product.Delete(p.id);
  })))
  .then((result) => {
    console.log('Deleted %s products', result.length);
  })
  .catch((error) => {
    console.error(error);
  });

Sequencing Asynchronous Calls in JavaScript

I am playing with Moltin for my upcoming talk at the API Strategy conference and it generates all kinds of blog post ideas.

Context

Moltin is the API-first (or I would even argue the API-only) commerce platform. A “new kid on the block”, a recent Y Combinator graduate with a little more than $2M in seed funding. My talk is about cognitive APIs and smarter apps and I will be using a conversational e-commerce chatbot as an example. I picked Moltin as my commerce backend because it’s ridiculously easy to get started with, requires no upfront setup, and seems to provide a simple and yet a rich API that covers all my scenarios. Plus, their free tier gives me 30,000 requests per month.

You can’t transact with a commerce platform if it doesn’t have a product catalog. Products have variants (e.g. a t-shirt can come in different sizes and different colors) and different commerce platforms approach setting up this hierarchy differently. In Moltin, you first create a main product. Then you add modifiers (in my case - color and size). And then you add variations for each modifier. Moltin will then create the actual variants matrix behind the scenes. If, for example, you add blue, red, and white variations to the color modifier and S, M, L to the size modifier, you will end up with nine total variations (every size available in every color).

Moltin APIs

Moltin APIs are lean HTTP endpoints that understand application/x-www-form-urlencoded and multipart/form-data and return JSON back. The team also supplies lightweight wrappers for JavaScript, Python, and other languages. Here’s, for example, how the creation of a variation looks in JavaScript:

const moltin = require('moltin')({
  publicId: process.env.MOLTIN_PUBLIC_ID,
  secretKey: process.env.MOLTIN_SECRET_KEY
});

moltin.Authenticate(function() {
  moltin.Variation.Create(productId, modifierId, {
    title: value
  },
  function(result) {
    // result is the successfully created variation
  },
  function(error, details) {
    // oops
  });
});

I am scripting the creation of the product catalog using Adventure Works as my sample dataset, so I need to run a lot of these asynchronous callback-style requests in order. I do it with Promises:

const addVariation = (productId, modifierId, value) => {
  return new Promise((resolve, reject) => {
    moltin.Authenticate(function() {
      moltin.Variation.Create(productId, modifierId, {
        title: value
      },
      function(result) {
        resolve(result);
      },
      function(error, details) {
        reject(details);
      });
    });
  });
};

And now I can chain all my actions with .then().

Problem

I faced an interesting challenge as I was creating the variations for my products. Here’s how I do it. First, I figure out what modifiers I need to create and then for each modifier I collect the values. The result looks something like this:

const mods = [{
  title: 'color',
  values: ['red', 'blue', 'white']
}, {
  title: 'size',
  values: ['S', 'M', 'L']
}];

Now I can .map() this structure into a flat array of Promises, each creating a required variation in Moltin:

// somewhere on the chain
.then((mods) => {
  return Promise.all(_.flatMap(mods, mod => {
    return mod.values.map(value => {
      return new Promise((resolve, reject) => {
        // create the variation in Moltin
      });
    });
  }));
})

“Looking good”, I thought, until I found out that creating all the variations asynchronously in no particular order and maybe even in parallel confuses the logic on the Moltin side that creates the product matrix (details).

Since I can’t trigger a matrix rebuild via the API, the solution was to sequence the variants creation.

Solution

Instead of just mapping the modifiers to a list of Promises running somewhat concurrently, I needed to chain the variant creation calls one after another. I also needed to collect all created variations into a list for the next step in the bigger chain.

Nested reduce to the rescue. First, the addVariation now keeps track of the results:

const addVariation = (productId, modifierId, value, bag) => {
  return new Promise((resolve, reject) => {
    moltin.Authenticate(function() {
      moltin.Variation.Create(productId, modifierId, {
        title: value
      },
      function(result) {
        bag.push(result);
        resolve(result);
      });
      // error handling skipped for brevity
    });
  });
};

And the Promise.all() has to be converted into a linear chain of promises, each creating a single variation:

// somewhere on the chain
.then(mods => {
  var variations = [];

  const chained = mods.reduce((chain, mod) => {
    return mod.values.reduce((chain, value) => {
      return chain.then(() => addVariation(productId, mod, value, variations));
    }, chain);
  }, Promise.resolve());

  return chained.then(() => Promise.resolve(variations));
})

Works great but I feel like it can be cleaner with Rx. If you know how to convert this to observables and not explicitly manage the collection of created variants, please drop me a line. Thanks!