Implementing As Partition Processing via Azure Functions–Part 2

In the previous blog post we walked through how to implement the AsPartitionProcessing (open-source) solution using Azure Functions, which generated some good questions and discussion. As a result, I’ve decided to take this a step further and show you how to implement the solution using the Visual Studio Azure Function project type and modify it to use an Azure Service Principal.

Step 1: Create an Azure Service Principal

Not sure how to do this? Start here.

When you’re done, you’ll need to keep the following pieces of information handy for steps 2 & 3.

  • Application ID
  • Application Key

You will also want to give this Service Principal permissions to process models by either making it an instance admin or a member of a role with processing permissions.

Step 2: Update As Partition Processing Solution

This section contains the steps needed to update the AsPartitionProcessing solution to allow for processing models using an Azure Service Principal. There are several ways to do this, and until last week I was doing it a different way that had several disadvantages compared with what you see now (h/t Christian Wade for another course correction). Full source code is available here.

First you’ll need to add a new column to the ModelConfiguration table of the AsPartitionProcessing.ConfigurationLogging database project:

CREATE TABLE [dbo].[ModelConfiguration] (
    [ModelConfigurationID]     INT           NOT NULL,
    [AnalysisServicesServer]   VARCHAR (255) NOT NULL,
    [AnalysisServicesDatabase] VARCHAR (255) NOT NULL,
    [InitialSetUp]             BIT           NOT NULL,
    [IncrementalOnline]        BIT           NOT NULL,
    [IntegratedAuth]           BIT           NOT NULL,
    [ServicePrincipal]         BIT           NOT NULL,
    [MaxParallelism]           INT           NOT NULL,
    [CommitTimeout]            INT           NOT NULL,
    [RetryAttempts]            TINYINT       NOT NULL,
    [RetryWaitTimeSeconds]     INT           NOT NULL,
    CONSTRAINT [PK_ModelConfiguration] PRIMARY KEY CLUSTERED ([ModelConfigurationID] ASC)
);

Next, you’ll add that new column to the [dbo].[vPartitioningConfiguration] view in the same project:

CREATE VIEW [dbo].[vPartitioningConfiguration]
AS
SELECT m.[ModelConfigurationID]
      ,m.[AnalysisServicesServer]
      ,m.[AnalysisServicesDatabase]
      ,m.[InitialSetUp]
      ,m.[IncrementalOnline]
      ,m.[IntegratedAuth]
      ,m.[ServicePrincipal]
      ,m.[MaxParallelism]
      ,m.[CommitTimeout]
      ,m.[RetryAttempts]
      ,m.[RetryWaitTimeSeconds]
      ,t.[TableConfigurationID]
      ,t.[AnalysisServicesTable]
      ,t.[DoNotProcess]
      ,CASE
        WHEN p.[TableConfigurationID] IS NULL THEN 0
        ELSE 1
       END [Partitioned]
      ,p.[PartitioningConfigurationID]
      ,p.[Granularity]
      ,p.[NumberOfPartitionsFull]
      ,p.[NumberOfPartitionsForIncrementalProcess]
      ,p.[MaxDateIsNow]
      ,p.[MaxDate]
      ,p.[IntegerDateKey]
      ,p.[TemplateSourceQuery]
  FROM [dbo].[ModelConfiguration] m
INNER JOIN [dbo].[TableConfiguration] t ON m.[ModelConfigurationID] = t.[ModelConfigurationID]
LEFT JOIN [dbo].[PartitioningConfiguration] p ON t.[TableConfigurationID] = p.[TableConfigurationID]

Then, you’ll update the ModelConfiguration class in the AsPartitionProcessing project:

        /// <summary>
        /// Should always be set to true for SSAS implementations that will run under the current process account. For Azure AS, normally set to false.
        /// </summary>
        public bool IntegratedAuth { get; set; }

        /// <summary>
        /// Only applies when IntegratedAuth=false. True to connect as a service principal; false to connect with an Azure AD UPN.
        /// </summary>
        public bool ServicePrincipalTokenAuth { get; set; }

<skipping a bunch of lines>

        /// <summary>
        /// Parameters normally from configuration database to determine partitioning ranges and design.
        /// </summary>
        /// <param name="modelConfigurationID">ID of the ModelConfiguration table.</param>
        /// <param name="analysisServicesServer">Name of the Analysis Services instance. Can be SSAS or an Azure AS URL.</param>
        /// <param name="analysisServicesDatabase">Name of the Analysis Services database.</param>
        /// <param name="initialSetUp">True for initial set up to create partitions and process them sequentially. False for incremental processing.</param>
        /// <param name="incrementalOnline">When initialSetUp=false, determines if processing is performed as an online operation, which may require more memory, but users can still query the model.</param>
        /// <param name="integratedAuth">Should always be set to true for SSAS implementations that will run under the current process account. For Azure AS, normally set to false.</param>
        /// <param name="serviceprincipal">Only applies when integratedAuth=false. Set to true when using a service principal with Azure AS.</param>
        /// <param name="userName">Only applies when integratedAuth=false. Used for Azure AD UPNs to connect to Azure AS.</param>
        /// <param name="password">Only applies when integratedAuth=false. Used for Azure AD UPNs to connect to Azure AS.</param>
        /// <param name="maxParallelism">Sets the maximum number of threads on which to run processing commands in parallel. -1 will not set the value.</param>
        /// <param name="commitTimeout">Set to override of CommitTimeout server property value for the connection. -1 will not override; the server value will be used.</param>
        /// <param name="retryAttempts">Number of times a retry of the processing operation will be performed if an error occurs. Use for near-real time scenarios and environments with network reliability issues.</param>
        /// <param name="retryWaitTimeSeconds">Number of seconds to wait before a retry attempt.</param>
        /// <param name="tableConfigurations">Collection of partitioned tables containing configuration information.</param>
        public ModelConfiguration(
            int modelConfigurationID,
            string analysisServicesServer,
            string analysisServicesDatabase,
            bool initialSetUp,
            bool incrementalOnline,
            bool integratedAuth,
            bool serviceprincipal,
            string userName,
            string password,
            int maxParallelism,
            int commitTimeout,
            int retryAttempts,
            int retryWaitTimeSeconds,
            List<TableConfiguration> tableConfigurations
        )
        {
            ModelConfigurationID = modelConfigurationID;
            AnalysisServicesServer = analysisServicesServer;
            AnalysisServicesDatabase = analysisServicesDatabase;
            InitialSetUp = initialSetUp;
            IncrementalOnline = incrementalOnline;
            IntegratedAuth = integratedAuth;
            ServicePrincipalTokenAuth = serviceprincipal;
            UserName = userName;
            Password = password;
            MaxParallelism = maxParallelism;
            CommitTimeout = commitTimeout;
            RetryAttempts = retryAttempts;
            RetryWaitTimeSeconds = retryWaitTimeSeconds;
            TableConfigurations = tableConfigurations;
            ExecutionID = Guid.NewGuid().ToString();
        }

After that, you’ll update the ReadConfig method in the ConfigDatabaseHelper class of the AsPartitionProcessing project:

        public static List<ModelConfiguration> ReadConfig(ConfigDatabaseConnectionInfo connectionInfo, string modelConfigurationIDs)
        {
            string commandText = String.Format(@"  
                        SELECT [ModelConfigurationID]
                              ,[AnalysisServicesServer]
                              ,[AnalysisServicesDatabase]
                              ,[InitialSetUp]
                              ,[IncrementalOnline]
                              ,[IntegratedAuth]
                              ,[ServicePrincipal]
                              ,[MaxParallelism]
                              ,[CommitTimeout]
                              ,[RetryAttempts]
                              ,[RetryWaitTimeSeconds]
                              ,[TableConfigurationID]
                              ,[AnalysisServicesTable]
                              ,[Partitioned]
                              ,[PartitioningConfigurationID]
                              ,[Granularity]
                              ,[NumberOfPartitionsFull]
                              ,[NumberOfPartitionsForIncrementalProcess]
                              ,[MaxDateIsNow]
                              ,[MaxDate]
                              ,[IntegerDateKey]
                              ,[TemplateSourceQuery]
                          FROM [dbo].[vPartitioningConfiguration]
                          WHERE [DoNotProcess] = 0 {0}
                          ORDER BY
                               [ModelConfigurationID],
                               [TableConfigurationID],
                               [PartitioningConfigurationID];",
                               (String.IsNullOrEmpty(modelConfigurationIDs) ? "" : $" AND [ModelConfigurationID] IN ({modelConfigurationIDs}) "));

            using (SqlConnection connection = new SqlConnection(GetConnectionString(connectionInfo)))
            {
                connection.Open();
                using (SqlCommand command = new SqlCommand())
                {
                    command.Connection = connection;
                    command.CommandType = CommandType.Text;
                    command.CommandText = commandText;

                    List<ModelConfiguration> modelConfigs = new List<ModelConfiguration>();
                    ModelConfiguration modelConfig = null;
                    int currentModelConfigurationID = -1;
                    TableConfiguration tableConfig = null;
                    int currentTableConfigurationID = -1;

                    SqlDataReader reader = command.ExecuteReader();

                    while (reader.Read())
                    {
                        if (modelConfig == null || currentModelConfigurationID != Convert.ToInt32(reader["ModelConfigurationID"]))
                        {
                            modelConfig = new ModelConfiguration();
                            modelConfig.TableConfigurations = new List<TableConfiguration>();
                            modelConfigs.Add(modelConfig);

                            modelConfig.ModelConfigurationID = Convert.ToInt32(reader["ModelConfigurationID"]);
                            modelConfig.AnalysisServicesServer = Convert.ToString(reader["AnalysisServicesServer"]);
                            modelConfig.AnalysisServicesDatabase = Convert.ToString(reader["AnalysisServicesDatabase"]);
                            modelConfig.InitialSetUp = Convert.ToBoolean(reader["InitialSetUp"]);
                            modelConfig.IncrementalOnline = Convert.ToBoolean(reader["IncrementalOnline"]);
                            modelConfig.IntegratedAuth = Convert.ToBoolean(reader["IntegratedAuth"]);
                            modelConfig.ServicePrincipalTokenAuth = Convert.ToBoolean(reader["ServicePrincipal"]);
                            modelConfig.MaxParallelism = Convert.ToInt32(reader["MaxParallelism"]);
                            modelConfig.CommitTimeout = Convert.ToInt32(reader["CommitTimeout"]);
                            modelConfig.RetryAttempts = Convert.ToInt32(reader["RetryAttempts"]);
                            modelConfig.RetryWaitTimeSeconds = Convert.ToInt32(reader["RetryWaitTimeSeconds"]);
                            modelConfig.ConfigDatabaseConnectionInfo = connectionInfo;

                            currentModelConfigurationID = modelConfig.ModelConfigurationID;
                        }

                        if (tableConfig == null || currentTableConfigurationID != Convert.ToInt32(reader["TableConfigurationID"]))
                        {
                            tableConfig = new TableConfiguration();
                            tableConfig.PartitioningConfigurations = new List<PartitioningConfiguration>();
                            modelConfig.TableConfigurations.Add(tableConfig);
                            tableConfig.TableConfigurationID = Convert.ToInt32(reader["TableConfigurationID"]);
                            tableConfig.AnalysisServicesTable = Convert.ToString(reader["AnalysisServicesTable"]);
                            currentTableConfigurationID = tableConfig.TableConfigurationID;
                        }

                        if (Convert.ToBoolean(reader["Partitioned"]))
                        {
                            tableConfig.PartitioningConfigurations.Add(
                                new PartitioningConfiguration(
                                    Convert.ToInt32(reader["PartitioningConfigurationID"]),
                                    (Granularity)Convert.ToInt32(reader["Granularity"]),
                                    Convert.ToInt32(reader["NumberOfPartitionsFull"]),
                                    Convert.ToInt32(reader["NumberOfPartitionsForIncrementalProcess"]),
                                    Convert.ToBoolean(reader["MaxDateIsNow"]),
                                    (reader["MaxDate"] == DBNull.Value ? DateTime.MinValue : Convert.ToDateTime(reader["MaxDate"])),
                                    Convert.ToBoolean(reader["IntegerDateKey"]),
                                    Convert.ToString(reader["TemplateSourceQuery"])
                                )
                            );
                        }
                    }
                    
                    return modelConfigs;
                }
            }
        }
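One thing to watch in ReadConfig: the modelConfigurationIDs string is spliced directly into the SQL text. A minimal sketch of validating it first, assuming the caller passes a comma-separated list of integer IDs. The ModelIdFilter class and its Normalize method are hypothetical names for illustration and are not part of AsPartitionProcessing.

```csharp
using System;
using System.Globalization;
using System.Linq;

public static class ModelIdFilter
{
    // Hypothetical helper (not part of AsPartitionProcessing).
    // Returns a normalized list such as "1,2,3", or null when no filter was supplied.
    public static string Normalize(string modelConfigurationIDs)
    {
        if (String.IsNullOrWhiteSpace(modelConfigurationIDs))
        {
            return null;
        }

        // Int32.Parse throws FormatException for anything that is not an integer,
        // so arbitrary SQL text never reaches the IN (...) clause.
        var ids = modelConfigurationIDs
            .Split(',')
            .Select(part => Int32.Parse(part.Trim(), CultureInfo.InvariantCulture))
            .Select(id => id.ToString(CultureInfo.InvariantCulture));

        return String.Join(",", ids);
    }
}
```

The result could then be passed as the second argument to ReadConfig, which already treats a null or empty value as “no filter”.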

Finally, you’ll update the Connect method in the PartitionProcessor class of the AsPartitionProcessing project:

IMPORTANT: currently this will only work when using version 15 of TOM from this NuGet package.

        private static void Connect(Server server, out Database database)
        {
            //Connect and get main objects
            string serverConnectionString = $"Provider=MSOLAP;{(_modelConfiguration.CommitTimeout == -1 ? "" : $"CommitTimeout={Convert.ToString(_modelConfiguration.CommitTimeout)};")}Data Source={_modelConfiguration.AnalysisServicesServer};";
            if (_modelConfiguration.IntegratedAuth)
            {
                serverConnectionString += "Integrated Security=SSPI;";
            }
            else if (_modelConfiguration.ServicePrincipalTokenAuth)
            {
                serverConnectionString += $"User ID=app:{_modelConfiguration.UserName};Password={_modelConfiguration.Password};";
            }
            else
            {
                serverConnectionString += $"User ID={_modelConfiguration.UserName};Password={_modelConfiguration.Password};Persist Security Info=True;Impersonation Level=Impersonate;";
            }
            server.Connect(serverConnectionString);

            database = server.Databases.FindByName(_modelConfiguration.AnalysisServicesDatabase);
            if (database == null)
            {
                throw new Microsoft.AnalysisServices.ConnectionException($"Could not connect to database {_modelConfiguration.AnalysisServicesDatabase}.");
            }
        }
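To see what each branch of Connect produces without needing a server, the string-building logic can be pulled into a pure function. This is only a sketch: ConnectionStringSketch.Build is a hypothetical stand-in whose parameters mirror the ModelConfiguration properties used above, and the server URL and credentials in the usage line are placeholder values.

```csharp
using System;

public static class ConnectionStringSketch
{
    // Hypothetical stand-in for the string building inside Connect.
    public static string Build(
        string server,
        int commitTimeout,
        bool integratedAuth,
        bool servicePrincipalTokenAuth,
        string userName,
        string password)
    {
        string connectionString = "Provider=MSOLAP;";

        // -1 means "do not override the server's CommitTimeout".
        if (commitTimeout != -1)
        {
            connectionString += $"CommitTimeout={commitTimeout};";
        }
        connectionString += $"Data Source={server};";

        if (integratedAuth)
        {
            // Integrated auth takes precedence over the service principal flag.
            connectionString += "Integrated Security=SSPI;";
        }
        else if (servicePrincipalTokenAuth)
        {
            // Service principal: the App ID is prefixed with "app:".
            connectionString += $"User ID=app:{userName};Password={password};";
        }
        else
        {
            // Azure AD UPN.
            connectionString += $"User ID={userName};Password={password};Persist Security Info=True;Impersonation Level=Impersonate;";
        }

        return connectionString;
    }
}
```

For example, Build("asazure://westus.asazure.windows.net/myserver", -1, false, true, "my-app-id", "my-app-key") takes the service principal branch, so the ServicePrincipal column only has an effect when IntegratedAuth is 0.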

At this point, if you want to test using the AsPartitionProcessing.SampleClient console app, you’ll need to modify the SetCredentials function as shown below and enter the AppID/AppKey values from Step 1.

[Image: the modified SetCredentials function in AsPartitionProcessing.SampleClient]

Step 3: Create the Azure Function Project in Visual Studio

I found the MSFT documentation on creating an Azure Function via Visual Studio more than adequate… so I’m not going to walk through the basics. Instead I’ll just share my source code and point out the hurdles I encountered.

After creating the Azure Function project, we need to add a reference to the AsPartitionProcessing.dll:

[Image: the AsPartitionProcessing.dll reference added to the Azure Function project]

Next we can open up the main C# file and add the following:

using System;
using System.Configuration;
using System.Collections.Generic;
using AsPartitionProcessing;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;

namespace fAsPartitionProcessing
{
    public static class fAsPartitionProcessingTimer
    {

        private static string _modelConfigurationIDs;

        [FunctionName("fAsPartitionProcessingTimer")]
        public static void Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer, TraceWriter log)
        {
            log.Info($"C# Timer trigger function (fAsPartitionProcessingTimer) started at: {DateTime.Now}");

            try
            {
                /* read from ASPP_ConfigurationLoggingDB */
                List<ModelConfiguration> modelsConfig = InitializeFromDatabase();

                /* loop through Model Config */
                foreach (ModelConfiguration modelConfig in modelsConfig)
                {
                    /* grab user/pw for AzureAS authentication */
                    String azure_AppID = System.Environment.GetEnvironmentVariable("CUSTOMCONNSTR_AzureAS_SvcPrincipal_AppID");
                    String azure_AppKey = System.Environment.GetEnvironmentVariable("CUSTOMCONNSTR_AzureAS_SvcPrincipal_AppKey");

                    /* apparently you can do it this way as well */
                    modelConfig.UserName = azure_AppID;
                    modelConfig.Password = azure_AppKey;

                    /* perform processing */
                    PartitionProcessor.PerformProcessing(modelConfig, ConfigDatabaseHelper.LogMessage);
                }
            }
            catch (Exception e)
            {
                /* log at error level and pass the exception along so the stack trace is kept */
                log.Error($"C# Timer trigger function (fAsPartitionProcessingTimer) exception: {e.Message}", e);
            }

            log.Info($"C# Timer trigger function (fAsPartitionProcessingTimer) finished at: {DateTime.Now}");

        }
        public static List<ModelConfiguration> InitializeFromDatabase()
        {
            ConfigDatabaseConnectionInfo connectionInfo = new ConfigDatabaseConnectionInfo();
            
            connectionInfo.Server = System.Environment.GetEnvironmentVariable("CUSTOMCONNSTR_connstr_ASPP_ConfigurationLogging_SERVER");
            connectionInfo.Database = System.Environment.GetEnvironmentVariable("CUSTOMCONNSTR_connstr_ASPP_ConfigurationLogging_DB");
            connectionInfo.UserName = System.Environment.GetEnvironmentVariable("CUSTOMCONNSTR_connstr_ASPP_ConfigurationLogging_USER");
            connectionInfo.Password = System.Environment.GetEnvironmentVariable("CUSTOMCONNSTR_connstr_ASPP_ConfigurationLogging_PW");

            return ConfigDatabaseHelper.ReadConfig(connectionInfo, _modelConfigurationIDs);
        }
    }
}

Note the “CUSTOMCONNSTR_” prefix on the names passed to GetEnvironmentVariable above. This was the only real stumbling block: in order to access the connection string values, you have to prepend a prefix to the name of the connection string you’re trying to retrieve. In the previous blog post, where we created the Azure Function via the portal, this was not required.

In this case, because I defined my connection strings as “Custom” (see image below) I had to prepend “CUSTOMCONNSTR_” to the name.

[Image: connection strings defined with type “Custom” in the function app settings]
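To keep the prefix in one place and fail loudly when a setting is missing, the lookup could be wrapped in a small helper. A minimal sketch, assuming the connection strings are defined with type “Custom”; AppSettingsSketch and GetCustomConnectionString are hypothetical names, not part of the Azure Functions SDK.

```csharp
using System;

public static class AppSettingsSketch
{
    // Hypothetical helper: reads a "Custom" connection string by its portal name.
    public static string GetCustomConnectionString(string name)
    {
        // Custom connection strings are exposed to the app as environment
        // variables named CUSTOMCONNSTR_<name>.
        string variableName = "CUSTOMCONNSTR_" + name;
        string value = Environment.GetEnvironmentVariable(variableName);

        if (String.IsNullOrEmpty(value))
        {
            throw new InvalidOperationException(
                $"Connection string '{name}' was not found (looked for environment variable '{variableName}').");
        }

        return value;
    }
}
```

With this in place, a call such as GetCustomConnectionString("AzureAS_SvcPrincipal_AppID") would replace the direct GetEnvironmentVariable call, and a missing setting surfaces as a clear exception instead of a null user name at connect time.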

As far as I’m currently aware… these connection strings need to be created after publishing the Azure Function (which is a bit annoying)… if there is a way to add these variables as part of the deployment/publish process, please let me know.

The last thing to note… when using the Azure Function Visual Studio project, the app is read-only in the portal, so any changes will need to be made in Visual Studio and re-published.

[Image: the read-only notice for the function app in the Azure portal]
